
#4: MinIO Multipart Upload and Cluster Deployment

This article describes how to upload large files to MinIO in parts and how to deploy a MinIO cluster. For multipart upload, it walks through the flow implemented with the AWS S3 SDK (initialize, upload parts, merge), including the Java configuration, the API design, and a Vue front end. For cluster deployment, it covers building a 4-node MinIO cluster, including disk preparation, erasure coding, and load balancing with Nginx. It also shows basic usage of the MinIO client, such as simple file upload and a Docker deployment configuration. The approach supports large-file upload, resumable upload, and instant upload, and suits scenarios that need highly available distributed storage.

I. Multipart Upload

Multipart upload is appropriate when uploading large files (>100 MB), when a file exceeds the 5 GB single-upload limit, or when you need resumable upload, instant upload, or a progress bar; uploading parts in parallel also makes better use of the available bandwidth.

Note: with newer versions of the MinIO Java SDK, the original create/init multipart methods are no longer exposed, so the multipart flow here is implemented with the AWS S3 SDK instead.

Sample project: https://github.com/dev-wangchao/MinIO.git

1. Multipart Flow

Initialize the multipart upload

Upload the parts

Merge the parts

2. Dependencies
<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.4.3</version>
</dependency>
<!-- AWS S3 SDK for multipart upload -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>1.12.565</version>
</dependency>
3. Basic Configuration

application.yml configuration file

minio:
  endpoint: http://1.27.236.84:9000
  username: minioadmin
  password: minioadmin
  defaultBucket: bucket
  presignedUrlExpiry: 7

Properties class

@Component
@ConfigurationProperties(prefix = "minio")
@Data
public class MinioProperties {
    private String endpoint;
    private String username;
    private String password;
    private String defaultBucket;
    private Integer presignedUrlExpiry;
}

AmazonS3 configuration

@Bean
public AmazonS3 amazonS3Client() {
    // AWS credentials
    AWSCredentials credentials = new BasicAWSCredentials(
            minioProperties.getUsername(), minioProperties.getPassword());
    // Client configuration
    ClientConfiguration clientConfiguration = new ClientConfiguration();
    clientConfiguration.setSignerOverride("AWSS3V4SignerType");
    // Build an S3 client that points at MinIO
    return AmazonS3ClientBuilder.standard()
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                    minioProperties.getEndpoint(), "us-east-1"))
            .withCredentials(new AWSStaticCredentialsProvider(credentials))
            .withPathStyleAccessEnabled(true) // MinIO uses path-style access
            .withClientConfiguration(clientConfiguration)
            .build();
}
4. Initializing the Upload
@PostMapping("/init")public String initMultipartUpload(@RequestParam String fileName,@RequestParam String contentType) {String newFileName = String.format("%s/%s",LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE), fileName);ObjectMetadata metadata = new ObjectMetadata();metadata.setContentType(contentType);InitiateMultipartUploadRequest uploadRequest = new InitiateMultipartUploadRequest(minioProperties.getDefaultBucket(), newFileName, metadata);InitiateMultipartUploadResult uploadResult = amazonS3.initiateMultipartUpload(uploadRequest);String uploadId = uploadResult.getUploadId();log.info("初始化分片上传成功,uploadId: {}", uploadId);String redisKey = String.format("minio:multipart:upload:%s", uploadId);Map<String, Object> partInfo = Map.of("bucket", minioProperties.getDefaultBucket(),"originFileName", fileName, "newFileName", newFileName, "uploadId", uploadId);stringRedisTemplate.opsForHash().putAll(redisKey, partInfo);stringRedisTemplate.expire(redisKey, Duration.ofMinutes(10));return uploadId;}
5. Uploading the Parts
@PostMapping("/upload")public void uploadMultipartFile(@RequestParam MultipartFile partFile,@RequestParam String uploadId,@RequestParam Integer partNumber) throws Exception {Assert.isTrue(1 <= partNumber && partNumber < 10000, "分片号必须在1-10000之间");String redisKey = String.format("minio:multipart:upload:%s", uploadId);Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(redisKey);UploadPartRequest uploadPartRequest = new UploadPartRequest().withInputStream(partFile.getInputStream()).withPartSize(partFile.getSize()).withUploadId(uploadId).withPartNumber(partNumber).withBucketName((String) entries.get("bucket"))// 确保所有分片上传到同一个对象.withKey((String) entries.get("newFileName"));UploadPartResult result = amazonS3.uploadPart(uploadPartRequest);log.info("分片上传成功: uploadId={}, partNumber={}, etag={}",uploadId, partNumber, result.getPartETag().getETag());String redisPartKey = String.format("minio:multipart:%s:%s", uploadId, partNumber);Map<String, Object> partInfo = Map.of("partNumber", partNumber.toString(), "etag", result.getPartETag().getETag());stringRedisTemplate.opsForHash().putAll(redisPartKey, partInfo);}
6. Merging the Parts
    @PostMapping("/marge")public void margeMultipartUpload(@RequestParam String uploadId) {String redisKey = String.format("minio:multipart:upload:%s", uploadId);Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(redisKey);// 获取所有分片信息(排除上传信息本身)String partKeys = String.format("minio:multipart:%s:*", uploadId);Set<String> keys = stringRedisTemplate.keys(partKeys);Assert.isTrue(keys != null && !keys.isEmpty(), "请先上传分片");// 过滤掉上传信息本身,只保留分片信息Set<String> partKeysOnly = keys.stream().filter(key -> !key.equals(String.format("minio:multipart:upload:%s", uploadId))).collect(java.util.stream.Collectors.toSet());Assert.isTrue(!partKeysOnly.isEmpty(), "没有找到有效的分片信息");// 构建分片ETagList<Map<Object, Object>> partInfoMap = partKeysOnly.stream().map(key -> stringRedisTemplate.opsForHash().entries(key)).sorted(Comparator.comparingInt(p -> Integer.parseInt(p.get("partNumber").toString()))).toList();List<PartETag> partETags = partInfoMap.stream().map(partInfo ->new PartETag(Integer.parseInt(partInfo.get("partNumber").toString()) , partInfo.get("etag").toString())).toList();CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest().withUploadId(uploadId).withBucketName(entries.get("bucket").toString()).withKey(entries.get("newFileName").toString()).withPartETags(partETags);CompleteMultipartUploadResult result = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);// 清理Redis数据stringRedisTemplate.delete(redisKey);log.info("合并分片成功: uploadId={}, objectKey={}, etag={}", uploadId, result.getKey(), result.getETag());}
7. Vue Code

Only part of the front-end code is shown here; for front-end testing you may need to generate your own test files.

<template>
  <div class="multipart-upload-demo">
    <el-card>
      <template #header>
        <div class="card-header"><span>AWS S3 Multipart Upload Demo</span></div>
      </template>
      <div class="upload-section">
        <el-upload
          ref="uploadRef"
          class="upload-demo"
          :auto-upload="false"
          :on-change="handleFileChange"
          :file-list="fileList"
          :show-file-list="false"
          drag>
          <el-icon class="el-icon--upload"><UploadFilled /></el-icon>
          <div class="el-upload__text">Drag a file here or <em>click to select one</em></div>
          <template #tip>
            <div class="el-upload__tip">Large files are supported and are chunked automatically</div>
          </template>
        </el-upload>

        <div class="file-info" v-if="selectedFile">
          <h4>Selected file:</h4>
          <p><strong>Name:</strong>{{ selectedFile.name }}</p>
          <p><strong>Size:</strong>{{ formatFileSize(selectedFile.size) }}</p>
          <p><strong>Type:</strong>{{ selectedFile.type || 'unknown' }}</p>
          <p><strong>Estimated chunks:</strong>{{ estimatedChunks }}</p>
        </div>

        <div class="upload-controls" v-if="selectedFile">
          <el-button type="primary" @click="startUpload" :loading="uploading">Start Upload</el-button>
          <el-button @click="clearFile">Clear File</el-button>
          <el-button v-if="currentUploadId" type="danger" @click="abortUpload" :loading="aborting">Cancel Upload</el-button>
        </div>

        <div class="upload-progress" v-if="uploading || uploadProgress.total > 0">
          <h4>Upload progress:</h4>
          <el-progress :percentage="uploadPercentage" :status="uploadStatus" :stroke-width="20"/>
          <p>Uploaded: {{ uploadProgress.completed }} / {{ uploadProgress.total }} chunks</p>
          <div v-if="uploadSpeed > 0" class="upload-stats">
            <p>Speed: {{ formatFileSize(uploadSpeed) }}/s</p>
            <p>Time remaining: {{ estimatedTime }}</p>
          </div>
        </div>

        <div class="upload-result" v-if="uploadResult">
          <el-alert title="Upload succeeded" type="success" :closable="false" show-icon>
            <p><strong>Upload ID:</strong>{{ uploadResult.uploadId }}</p>
            <p><strong>File name:</strong>{{ uploadResult.fileName }}</p>
            <p><strong>File size:</strong>{{ formatFileSize(uploadResult.fileSize) }}</p>
            <p><strong>Chunk count:</strong>{{ uploadResult.chunkCount }}</p>
            <p><strong>Status:</strong>{{ uploadResult.message }}</p>
          </el-alert>
        </div>

        <div class="error-message" v-if="errorMessage">
          <el-alert :title="errorMessage" type="error" :closable="true" @close="clearError" show-icon/>
        </div>
      </div>
    </el-card>
  </div>
</template>

<script>
import { ref, computed } from 'vue'
import { ElMessage } from 'element-plus'
import { UploadFilled } from '@element-plus/icons-vue'
import MultipartUpload from '@/utils/MultipartUpload'

export default {
  name: 'MultipartUploadDemo',
  components: { UploadFilled },
  setup() {
    const selectedFile = ref(null)
    const fileList = ref([])
    const uploading = ref(false)
    const aborting = ref(false)
    const uploadProgress = ref({ completed: 0, total: 0 })
    const uploadResult = ref(null)
    const errorMessage = ref('')
    const currentUploadId = ref('')
    const uploadStartTime = ref(0)
    const uploadedBytes = ref(0)
    const uploadSpeed = ref(0)

    // Chunk size (10 MB)
    const chunkSize = 10 * 1024 * 1024

    // Estimated number of chunks
    const estimatedChunks = computed(() => {
      if (!selectedFile.value) return 0
      return Math.ceil(selectedFile.value.size / chunkSize)
    })

    // Upload percentage
    const uploadPercentage = computed(() => {
      if (uploadProgress.value.total === 0) return 0
      return Math.round((uploadProgress.value.completed / uploadProgress.value.total) * 100)
    })

    // Upload status
    const uploadStatus = computed(() => {
      if (errorMessage.value) return 'exception'
      if (uploadPercentage.value === 100) return 'success'
      return undefined
    })

    // Estimated time remaining
    const estimatedTime = computed(() => {
      if (uploadSpeed.value === 0 || !selectedFile.value) return 'calculating...'
      const remainingBytes = selectedFile.value.size - uploadedBytes.value
      const remainingSeconds = remainingBytes / uploadSpeed.value
      if (remainingSeconds < 60) {
        return `${Math.round(remainingSeconds)} s`
      } else if (remainingSeconds < 3600) {
        return `${Math.round(remainingSeconds / 60)} min`
      } else {
        return `${Math.round(remainingSeconds / 3600)} h`
      }
    })

    // File selection handler
    const handleFileChange = (file) => {
      selectedFile.value = file.raw
      clearResult()
    }

    // Start the upload
    const startUpload = async () => {
      if (!selectedFile.value) {
        ElMessage.warning('Please select a file first')
        return
      }
      uploading.value = true
      uploadProgress.value = { completed: 0, total: estimatedChunks.value }
      uploadStartTime.value = Date.now()
      uploadedBytes.value = 0
      clearError()
      clearResult()
      try {
        const multipartUpload = new MultipartUpload({
          chunkSize: chunkSize,
          onProgress: (progress) => {
            uploadProgress.value.completed = progress.partNumber
            // Compute the upload speed
            const currentTime = Date.now()
            const elapsedTime = (currentTime - uploadStartTime.value) / 1000 // seconds
            uploadedBytes.value = progress.partNumber * chunkSize
            uploadSpeed.value = uploadedBytes.value / elapsedTime
          },
          onError: (error) => {
            errorMessage.value = error.message || 'Upload failed'
            uploading.value = false
          },
          onSuccess: (result) => {
            uploadResult.value = result
            ElMessage.success('File uploaded successfully')
            uploading.value = false
          }
        })
        const result = await multipartUpload.upload(selectedFile.value)
        console.log('Upload finished:', result)
        // Keep the uploadId for a possible cancel operation
        currentUploadId.value = result.uploadId
      } catch (error) {
        console.error('Upload failed:', error)
        errorMessage.value = error.message || 'Upload failed'
        ElMessage.error('File upload failed')
      } finally {
        uploading.value = false
      }
    }

    // Cancel the upload
    const abortUpload = async () => {
      if (!currentUploadId.value) return
      aborting.value = true
      try {
        // MinioChunkUploadController has no abort endpoint, so we can only cancel on the client side;
        // parts that were already uploaded remain in MinIO
        ElMessage.info('Upload cancelled (note: parts that were already uploaded remain on the server)')
        uploading.value = false
        currentUploadId.value = ''
        uploadProgress.value = { completed: 0, total: 0 }
      } catch (error) {
        console.error('Failed to cancel the upload:', error)
        ElMessage.error('Failed to cancel the upload')
      } finally {
        aborting.value = false
      }
    }

    // Clear the selected file
    const clearFile = () => {
      selectedFile.value = null
      fileList.value = []
      clearResult()
      clearError()
      uploadProgress.value = { completed: 0, total: 0 }
    }

    // Clear the result
    const clearResult = () => {
      uploadResult.value = null
    }

    // Clear the error
    const clearError = () => {
      errorMessage.value = ''
    }

    // Format a byte count for display
    const formatFileSize = (bytes) => {
      if (bytes === 0) return '0 B'
      const k = 1024
      const sizes = ['B', 'KB', 'MB', 'GB']
      const i = Math.floor(Math.log(bytes) / Math.log(k))
      return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i]
    }

    return {
      selectedFile, fileList, uploading, aborting, uploadProgress, uploadResult,
      errorMessage, currentUploadId, uploadSpeed, estimatedChunks, uploadPercentage,
      uploadStatus, estimatedTime, handleFileChange, startUpload, abortUpload,
      clearFile, clearError, formatFileSize
    }
  }
}
</script>

<style scoped>
.multipart-upload-demo { padding: 20px; }
.upload-section { max-width: 800px; margin: 0 auto; }
.upload-demo { margin-bottom: 20px; }
.file-info { background: #f5f7fa; padding: 15px; border-radius: 4px; margin: 20px 0; }
.file-info h4 { margin-top: 0; color: #303133; }
.file-info p { margin: 8px 0; color: #606266; }
.upload-controls { margin: 20px 0; text-align: center; }
.upload-controls .el-button { margin: 0 10px; }
.upload-progress { margin: 20px 0; }
.upload-progress h4 { color: #303133; margin-bottom: 15px; }
.upload-stats { margin-top: 10px; font-size: 14px; color: #909399; }
.upload-result { margin: 20px 0; }
.error-message { margin: 20px 0; }
</style>
8. Testing the Result

You can observe that the multipart upload completes successfully.

II. Cluster Deployment

1. Cluster Prerequisites

MinIO's distributed deployment relies on erasure coding for data redundancy and high availability. It requires a minimum cluster size of 4 nodes, so that each object can be striped across the cluster as K data blocks plus M parity blocks; with M parity blocks, the data can still be reconstructed even after losing up to M drives.

2. Adding a Disk

Erasure coding should be deployed on a freshly mounted, dedicated disk, so add a brand-new disk to each node.

3. Check the Block Devices

lsblk

4. Format and Mount

mkfs.xfs /dev/sdb

mkdir -p /opt/minio/data

mount /dev/sdb /opt/minio/data
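A small optional addition, not in the original steps: to make the mount persist across reboots, add an entry to /etc/fstab. This assumes the /dev/sdb device name used above; in practice, referencing the filesystem UUID reported by blkid is more robust.

/dev/sdb    /opt/minio/data    xfs    defaults    0 0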

5. Install MinIO

Run all of the following commands on each of the four servers.

mkdir -p /usr/local/minio

mkdir -p /etc/minio

cd /usr/local/minio

wget http://dl.minio.org.cn/server/minio/release/linux-amd64/minio

chmod +x minio

vim start_minio.sh

The shell script:

#!/bin/bash
# Set the MinIO admin credentials (use a strong password in production)
export MINIO_ROOT_USER=minioadmin
export MINIO_ROOT_PASSWORD=minioadmin

# Start the MinIO distributed cluster.
#   --config-dir        configuration directory
#   --address           API listen port
#   --console-address   web console listen port
# Each of the four nodes contributes 4 data directories.
# (Comments are kept out of the continued command: a "#" after a trailing "\"
# would break the line continuation.)
/usr/local/minio/minio server \
  --config-dir /etc/minio \
  --address :9000 \
  --console-address :9001 \
  http://192.168.88.131/opt/minio/data/data1 \
  http://192.168.88.131/opt/minio/data/data2 \
  http://192.168.88.131/opt/minio/data/data3 \
  http://192.168.88.131/opt/minio/data/data4 \
  http://192.168.88.132/opt/minio/data/data1 \
  http://192.168.88.132/opt/minio/data/data2 \
  http://192.168.88.132/opt/minio/data/data3 \
  http://192.168.88.132/opt/minio/data/data4 \
  http://192.168.88.133/opt/minio/data/data1 \
  http://192.168.88.133/opt/minio/data/data2 \
  http://192.168.88.133/opt/minio/data/data3 \
  http://192.168.88.133/opt/minio/data/data4 \
  http://192.168.88.134/opt/minio/data/data1 \
  http://192.168.88.134/opt/minio/data/data2 \
  http://192.168.88.134/opt/minio/data/data3 \
  http://192.168.88.134/opt/minio/data/data4 &   # run in the background

chmod +x start_minio.sh

chmod +x /usr/local/minio/minio

mkdir -p /opt/minio/data/data1 /opt/minio/data/data2 /opt/minio/data/data3 /opt/minio/data/data4

./start_minio.sh

At this point the cluster environment is up. If you upload a file on one node, you will see the same object on the other nodes.
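You can also verify cluster health with the MinIO client (mc). This is a quick sketch and assumes mc is installed and that one of the nodes is reachable at 192.168.88.131 with the credentials from the start script:

# Register the cluster under an alias, then print node and drive status
mc alias set mycluster http://192.168.88.131:9000 minioadmin minioadmin
mc admin info mycluster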

Deployment layout diagram:

6. Load Balancing

Set up Nginx on the primary node to load-balance requests across the four MinIO nodes.

yum install gcc openssl openssl-devel pcre pcre-devel zlib zlib-devel -y

wget http://nginx.org/download/nginx-1.24.0.tar.gz

tar -zxvf nginx-1.24.0.tar.gz

cd nginx-1.24.0/

./configure --prefix=/usr/local/nginx

make

make install

/usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf

cd  /usr/local/nginx/conf

Add upstream server groups to the Nginx configuration; Nginx balances across them with round-robin by default.

A reference configuration:

#user  nobody;
worker_processes  1;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;

    upstream api {
        server 192.168.88.131:9000;
        server 192.168.88.132:9000;
        server 192.168.88.133:9000;
        server 192.168.88.134:9000;
    }

    upstream webapi {
        server 192.168.88.131:9001;
        server 192.168.88.132:9001;
        server 192.168.88.133:9001;
        server 192.168.88.134:9001;
    }

    server {
        listen 80;
        server_name localhost;

        # MinIO API
        location /minio/api/ {
            proxy_pass http://api;
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }

        # Redirect the root path to the console
        location = / {
            return 302 /console/;
        }
    }

    server {
        listen 90;
        server_name localhost;

        # Key part: path rewrite + proxy to the console
        location /console/ {
            # Strip the prefix: /console/xxx -> /xxx
            rewrite ^/console/(.*)$ /$1 break;
            proxy_pass http://webapi;

            # Required request headers
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Important: WebSocket support
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";

            # Fix static resource paths
            sub_filter_once off;
            sub_filter_types *;
            sub_filter 'href="/' 'href="/console/';
            sub_filter 'src="/' 'src="/console/';
            sub_filter '="/' '="/console/';   # fix relative resource paths
        }

        # Avoid a 404 when the root path is accessed directly
        location = / {
            return 302 /console/;
        }
    }
}

With that, all of the cluster setup work is complete.

III. MinIO Utilities

1. MinioClient Configuration
@Bean
public MinioClient minioClient() throws Exception {
    return MinioClient.builder()
            .endpoint(minioProperties.getEndpoint())
            .credentials(minioProperties.getUsername(), minioProperties.getPassword())
            .httpClient(new OkHttpClient())
            .build();
}
2. File Upload

The MinIO client splits large files into parts internally, so a plain putObject call is enough.

    @PostMapping("pdf")public String uploadPdf(MultipartFile file) throws Exception{String fileName = getObjName(file.getOriginalFilename(), PdfType);try(InputStream inputStream = file.getInputStream()) {minioClient.putObject(PutObjectArgs.builder().bucket(minioProperties.getDefaultBucket()).object(fileName)// 文件流;文件的总大小;每个分片的大小,-1表示默认分片5或者64MB.stream(inputStream, file.getSize(), -1).build());}// 获取文件对象进行验证minioClient.getObject(GetObjectArgs.builder().bucket(minioProperties.getDefaultBucket()).object(fileName).build());return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder().bucket(minioProperties.getDefaultBucket()).object(fileName).method(Method.GET).expiry(minioProperties.getPresignedUrlExpiry(), TimeUnit.DAYS).build());}
3. docker-compose.yml
version: '3.7'

services:
  minio:
    image: minio/minio:latest
    container_name: minio
    ports:
      - "9000:9000"   # API port
      - "9001:9001"   # Console port
    environment:
      MINIO_ROOT_USER: minioadmin       # default username (change this!)
      MINIO_ROOT_PASSWORD: minioadmin   # default password (change this!)
    volumes:
      - /opt/minio-server/minio-data/:/data   # persistent storage directory
    command: server /data --console-address ":9001"   # explicitly set the console port
    restart: unless-stopped
    networks:
      - minio_network

networks:
  minio_network:
    driver: bridge

These are my MinIO study notes; corrections and suggestions are welcome.
