#4:MinIO分片上传和集群部署
本文介绍了MinIO大文件分片上传和集群部署的实现方案。对于分片上传,详细说明了使用AWSS3 SDK实现的分片流程(初始化、上传、合并),包括Java配置、API设计和Vue前端实现。在集群部署方面,介绍了4节点MinIO集群的搭建过程,包括磁盘准备、纠删码配置、负载均衡(Nginx)等关键步骤。此外还提供了MinIO工具的简单使用方法,如文件上传和Docker部署配置。该方案支持大文件上传、断点续传、秒传等功能,适用于需要高可用分布式存储的场景。
一、分片上传
当上传大文件(>100M)时,或者大于单次上传限额5GB时,需要断点续传、秒传、进度条功能时,并可以利用宽带加速实现分片上传传输。
注意:如果你使用的为新版本的依赖,minio弃用了原先的create和init方法,需要结合AWS S3
案例项目地址:https://github.com/dev-wangchao/MinIO.git
1、分片流程
分片初始化
分片上传
分片合并
2、依赖引入
<dependency><groupId>io.minio</groupId><artifactId>minio</artifactId><version>8.4.3</version></dependency><!-- AWS S3 SDK for multipart upload --><dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-s3</artifactId><version>1.12.565</version></dependency>
3、基本配置
application.yml配置文件
minio:endpoint: http://1.27.236.84:9000username: minioadminpassword: minioadmindefaultBucket: bucketpresignedUrlExpiry: 7
属性配置类
@Component
@ConfigurationProperties(prefix = "minio")
@Data
public class MinioProperties {private String endpoint;private String username;private String password;private String defaultBucket;private Integer presignedUrlExpiry;}
AmazonS3配置
@Beanpublic AmazonS3 amazonS3Client() {// 创建AWS凭证AWSCredentials credentials = new BasicAWSCredentials(minioProperties.getUsername(),minioProperties.getPassword());// 客户端配置ClientConfiguration clientConfiguration = new ClientConfiguration();clientConfiguration.setSignerOverride("AWSS3V4SignerType");// 构建S3客户端,配置为使用MinIOreturn AmazonS3ClientBuilder.standard().withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(minioProperties.getEndpoint(), "us-east-1")).withCredentials(new AWSStaticCredentialsProvider(credentials)).withPathStyleAccessEnabled(true) // MinIO使用路径样式访问.withClientConfiguration(clientConfiguration).build();}
4、初始化分片
@PostMapping("/init")public String initMultipartUpload(@RequestParam String fileName,@RequestParam String contentType) {String newFileName = String.format("%s/%s",LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE), fileName);ObjectMetadata metadata = new ObjectMetadata();metadata.setContentType(contentType);InitiateMultipartUploadRequest uploadRequest = new InitiateMultipartUploadRequest(minioProperties.getDefaultBucket(), newFileName, metadata);InitiateMultipartUploadResult uploadResult = amazonS3.initiateMultipartUpload(uploadRequest);String uploadId = uploadResult.getUploadId();log.info("初始化分片上传成功,uploadId: {}", uploadId);String redisKey = String.format("minio:multipart:upload:%s", uploadId);Map<String, Object> partInfo = Map.of("bucket", minioProperties.getDefaultBucket(),"originFileName", fileName, "newFileName", newFileName, "uploadId", uploadId);stringRedisTemplate.opsForHash().putAll(redisKey, partInfo);stringRedisTemplate.expire(redisKey, Duration.ofMinutes(10));return uploadId;}
5、分片上传文件
@PostMapping("/upload")public void uploadMultipartFile(@RequestParam MultipartFile partFile,@RequestParam String uploadId,@RequestParam Integer partNumber) throws Exception {Assert.isTrue(1 <= partNumber && partNumber < 10000, "分片号必须在1-10000之间");String redisKey = String.format("minio:multipart:upload:%s", uploadId);Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(redisKey);UploadPartRequest uploadPartRequest = new UploadPartRequest().withInputStream(partFile.getInputStream()).withPartSize(partFile.getSize()).withUploadId(uploadId).withPartNumber(partNumber).withBucketName((String) entries.get("bucket"))// 确保所有分片上传到同一个对象.withKey((String) entries.get("newFileName"));UploadPartResult result = amazonS3.uploadPart(uploadPartRequest);log.info("分片上传成功: uploadId={}, partNumber={}, etag={}",uploadId, partNumber, result.getPartETag().getETag());String redisPartKey = String.format("minio:multipart:%s:%s", uploadId, partNumber);Map<String, Object> partInfo = Map.of("partNumber", partNumber.toString(), "etag", result.getPartETag().getETag());stringRedisTemplate.opsForHash().putAll(redisPartKey, partInfo);}
6、合并分片
@PostMapping("/marge")public void margeMultipartUpload(@RequestParam String uploadId) {String redisKey = String.format("minio:multipart:upload:%s", uploadId);Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(redisKey);// 获取所有分片信息(排除上传信息本身)String partKeys = String.format("minio:multipart:%s:*", uploadId);Set<String> keys = stringRedisTemplate.keys(partKeys);Assert.isTrue(keys != null && !keys.isEmpty(), "请先上传分片");// 过滤掉上传信息本身,只保留分片信息Set<String> partKeysOnly = keys.stream().filter(key -> !key.equals(String.format("minio:multipart:upload:%s", uploadId))).collect(java.util.stream.Collectors.toSet());Assert.isTrue(!partKeysOnly.isEmpty(), "没有找到有效的分片信息");// 构建分片ETagList<Map<Object, Object>> partInfoMap = partKeysOnly.stream().map(key -> stringRedisTemplate.opsForHash().entries(key)).sorted(Comparator.comparingInt(p -> Integer.parseInt(p.get("partNumber").toString()))).toList();List<PartETag> partETags = partInfoMap.stream().map(partInfo ->new PartETag(Integer.parseInt(partInfo.get("partNumber").toString()) , partInfo.get("etag").toString())).toList();CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest().withUploadId(uploadId).withBucketName(entries.get("bucket").toString()).withKey(entries.get("newFileName").toString()).withPartETags(partETags);CompleteMultipartUploadResult result = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);// 清理Redis数据stringRedisTemplate.delete(redisKey);log.info("合并分片成功: uploadId={}, objectKey={}, etag={}", uploadId, result.getKey(), result.getETag());}
7、Vue代码
前端代码我只贴出了部分,前端测试可能需要你们自己生成测试文件啦
<template><div class="multipart-upload-demo"><el-card><template #header><div class="card-header"><span>AWS S3 分片上传演示</span></div></template><div class="upload-section"><el-uploadref="uploadRef"class="upload-demo":auto-upload="false":on-change="handleFileChange":file-list="fileList":show-file-list="false"drag><el-icon class="el-icon--upload"><UploadFilled /></el-icon><div class="el-upload__text">拖拽文件到此处或 <em>点击选择文件</em></div><template #tip><div class="el-upload__tip">支持大文件上传,自动分片处理</div></template></el-upload><div class="file-info" v-if="selectedFile"><h4>选择的文件:</h4><p><strong>文件名:</strong>{{ selectedFile.name }}</p><p><strong>文件大小:</strong>{{ formatFileSize(selectedFile.size) }}</p><p><strong>文件类型:</strong>{{ selectedFile.type || '未知' }}</p><p><strong>预计分片数:</strong>{{ estimatedChunks }}</p></div><div class="upload-controls" v-if="selectedFile"><el-button type="primary" @click="startUpload" :loading="uploading">开始上传</el-button><el-button @click="clearFile">清除文件</el-button><el-button v-if="currentUploadId" type="danger" @click="abortUpload":loading="aborting">取消上传</el-button></div><div class="upload-progress" v-if="uploading || uploadProgress.total > 0"><h4>上传进度:</h4><el-progress :percentage="uploadPercentage" :status="uploadStatus":stroke-width="20"/><p>已上传:{{ uploadProgress.completed }} / {{ uploadProgress.total }} 分片</p><div v-if="uploadSpeed > 0" class="upload-stats"><p>上传速度:{{ formatFileSize(uploadSpeed) }}/s</p><p>剩余时间:{{ estimatedTime }}</p></div></div><div class="upload-result" v-if="uploadResult"><el-alert title="上传成功" type="success" :closable="false"show-icon><p><strong>上传ID:</strong>{{ uploadResult.uploadId }}</p><p><strong>文件名:</strong>{{ uploadResult.fileName }}</p><p><strong>文件大小:</strong>{{ formatFileSize(uploadResult.fileSize) }}</p><p><strong>分片数量:</strong>{{ uploadResult.chunkCount }}</p><p><strong>状态:</strong>{{ uploadResult.message }}</p></el-alert></div><div class="error-message" v-if="errorMessage"><el-alert :title="errorMessage" type="error" :closable="true"@close="clearError"show-icon/></div></div></el-card></div>
</template><script>
import { ref, computed } from 'vue'
import { ElMessage } from 'element-plus'
import { UploadFilled } from '@element-plus/icons-vue'
import MultipartUpload from '@/utils/MultipartUpload'export default {name: 'MultipartUploadDemo',components: {UploadFilled},setup() {const selectedFile = ref(null)const fileList = ref([])const uploading = ref(false)const aborting = ref(false)const uploadProgress = ref({ completed: 0, total: 0 })const uploadResult = ref(null)const errorMessage = ref('')const currentUploadId = ref('')const uploadStartTime = ref(0)const uploadedBytes = ref(0)const uploadSpeed = ref(0)// 分片大小 (10MB)const chunkSize = 10 * 1024 * 1024// 计算预计分片数const estimatedChunks = computed(() => {if (!selectedFile.value) return 0return Math.ceil(selectedFile.value.size / chunkSize)})// 计算上传百分比const uploadPercentage = computed(() => {if (uploadProgress.value.total === 0) return 0return Math.round((uploadProgress.value.completed / uploadProgress.value.total) * 100)})// 上传状态const uploadStatus = computed(() => {if (errorMessage.value) return 'exception'if (uploadPercentage.value === 100) return 'success'return undefined})// 预计剩余时间const estimatedTime = computed(() => {if (uploadSpeed.value === 0 || !selectedFile.value) return '计算中...'const remainingBytes = selectedFile.value.size - uploadedBytes.valueconst remainingSeconds = remainingBytes / uploadSpeed.valueif (remainingSeconds < 60) {return `${Math.round(remainingSeconds)}秒`} else if (remainingSeconds < 3600) {return `${Math.round(remainingSeconds / 60)}分钟`} else {return `${Math.round(remainingSeconds / 3600)}小时`}})// 文件选择处理const handleFileChange = (file) => {selectedFile.value = file.rawclearResult()}// 开始上传const startUpload = async () => {if (!selectedFile.value) {ElMessage.warning('请先选择文件')return}uploading.value = trueuploadProgress.value = { completed: 0, total: estimatedChunks.value }uploadStartTime.value = Date.now()uploadedBytes.value = 0clearError()clearResult()try {const multipartUpload = new MultipartUpload({chunkSize: chunkSize,onProgress: (progress) => {uploadProgress.value.completed = progress.partNumber// 计算上传速度const currentTime = Date.now()const elapsedTime = (currentTime - uploadStartTime.value) / 1000 // 秒uploadedBytes.value = progress.partNumber * chunkSizeuploadSpeed.value = uploadedBytes.value / elapsedTime},onError: (error) => {errorMessage.value = error.message || '上传失败'uploading.value = false},onSuccess: (result) => {uploadResult.value = resultElMessage.success('文件上传成功')uploading.value = false}})const result = await multipartUpload.upload(selectedFile.value)console.log('上传完成:', result)// 保存uploadId用于可能的取消操作currentUploadId.value = result.uploadId} catch (error) {console.error('上传失败:', error)errorMessage.value = error.message || '上传失败'ElMessage.error('文件上传失败')} finally {uploading.value = false}}// 取消上传const abortUpload = async () => {if (!currentUploadId.value) returnaborting.value = truetry {// 由于MinioChunkUploadController没有abort方法,这里只能在前端取消// 实际上已经上传的分片仍然会保留在Minio中ElMessage.info('上传已取消(注意:已上传的分片仍保留在服务器中)')uploading.value = falsecurrentUploadId.value = ''uploadProgress.value = { completed: 0, total: 0 }} catch (error) {console.error('取消上传失败:', error)ElMessage.error('取消上传失败')} finally {aborting.value = false}}// 清除文件const clearFile = () => {selectedFile.value = nullfileList.value = []clearResult()clearError()uploadProgress.value = { completed: 0, total: 0 }}// 清除结果const clearResult = () => {uploadResult.value = null}// 清除错误const clearError = () => {errorMessage.value = ''}// 格式化文件大小const formatFileSize = (bytes) => {if (bytes === 0) return '0 B'const k = 1024const sizes = ['B', 'KB', 'MB', 'GB']const i = Math.floor(Math.log(bytes) / Math.log(k))return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i]}return {selectedFile,fileList,uploading,aborting,uploadProgress,uploadResult,errorMessage,currentUploadId,uploadSpeed,estimatedChunks,uploadPercentage,uploadStatus,estimatedTime,handleFileChange,startUpload,abortUpload,clearFile,clearError,formatFileSize}}
}
</script><style scoped>
.multipart-upload-demo {padding: 20px;
}.upload-section {max-width: 800px;margin: 0 auto;
}.upload-demo {margin-bottom: 20px;
}.file-info {background: #f5f7fa;padding: 15px;border-radius: 4px;margin: 20px 0;
}.file-info h4 {margin-top: 0;color: #303133;
}.file-info p {margin: 8px 0;color: #606266;
}.upload-controls {margin: 20px 0;text-align: center;
}.upload-controls .el-button {margin: 0 10px;
}.upload-progress {margin: 20px 0;
}.upload-progress h4 {color: #303133;margin-bottom: 15px;
}.upload-stats {margin-top: 10px;font-size: 14px;color: #909399;
}.upload-result {margin: 20px 0;
}.error-message {margin: 20px 0;
}
</style>
8、结果测试
可以观察到分片上传成功
二、集群部署
1、集群准备
MinIO的分布式部署依赖纠删码实现数据冗余和高可用,要求最小集群规模为4节点,以支持数据分片(K个数据库)和(M个校验块)的分布式存储。
2、磁盘添加
部署纠删码需要挂载到一块全新的磁盘中,故需要添加一块全新的磁盘
3、检查磁盘块
lsblk
4、格式化挂载
mkfs.xfs /dev/sdb
mkdir -p /opt/minio/data
mount /dev/sdb /opt/minio/data
5.安装MinIO
以下命令都要同时在四台服务器中执行
mkdir -p /usr/local/minio
mkdir -p /etc/minio
cd /usr/local/minio
wget http://dl.minio.org.cn/server/minio/release/linux-amd64/minio
chmod +x minio
vim start_minio.sh
sh脚本命令:
#!/bin/bash
# 设置MinIO的管理员账号密码(建议生产环境修改为强密码)
export MINIO_ROOT_USER=minioadmin
export MINIO_ROOT_PASSWORD=minioadmin# 启动MinIO分布式集群
/usr/local/minio/minio server \--config-dir /etc/minio \ # 指定配置文件目录--address :9000 \ # 设置API服务监听端口--console-address :9001 \ # 设置控制台监听端口\# 节点1的4个数据目录http://192.168.88.131/opt/minio/data/data1 \http://192.168.88.131/opt/minio/data/data2 \http://192.168.88.131/opt/minio/data/data3 \http://192.168.88.131/opt/minio/data/data4 \\# 节点2的4个数据目录http://192.168.88.132/opt/minio/data/data1 \http://192.168.88.132/opt/minio/data/data2 \http://192.168.88.132/opt/minio/data/data3 \http://192.168.88.132/opt/minio/data/data4 \\# 节点3的4个数据目录http://192.168.88.133/opt/minio/data/data1 \http://192.168.88.133/opt/minio/data/data2 \http://192.168.88.133/opt/minio/data/data3 \http://192.168.88.133/opt/minio/data/data4 \\# 节点4的4个数据目录http://192.168.88.134/opt/minio/data/data1 \http://192.168.88.134/opt/minio/data/data2 \http://192.168.88.134/opt/minio/data/data3 \http://192.168.88.134/opt/minio/data/data4 \& # 后台运行
chmod +x start_minio.sh
chmod +x /usr/local/minio/minio
mkdir -p /opt/minio/data/data1 /opt/minio/data/data2 /opt/minio/data/data3 /opt/minio/data/data4
./start_minio.sh
至此集群环境已经搭建完毕,你可以在其中一个节点上传文件,会发现另外一个节点也可以看见文件数据
部署分布图:
6.负载均衡
在主节点里搭建Nginx来负载均衡到四个从节点
yum install gcc openssl openssl-devel pcre pcre-devel zlib zlib-devel -y
wget http://nginx.org/download/nginx-1.24.0.tar.gz
tar -zxvf nginx-1.24.0.tar.gz
cd nginx-1.24.0/
./configure --prefix=/usr/local/nginx
make install
/usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf
cd /usr/local/nginx/conf
在Nginx的配置文件中添加数据组,默认使用轮询
配置文件参考如下:
#user nobody;
worker_processes 1;events {worker_connections 1024;
}http {include mime.types;default_type application/octet-stream;sendfile on;keepalive_timeout 65;upstream api {server 192.168.88.131:9000;server 192.168.88.132:9000;server 192.168.88.133:9000;server 192.168.88.134:9000;}upstream webapi {server 192.168.88.131:9001;server 192.168.88.132:9001;server 192.168.88.133:9001;server 192.168.88.134:9001;}server {listen 80;server_name localhost;# MinIO APIlocation /minio/api/ {proxy_pass http://api;proxy_set_header Host $http_host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}# 根路径跳转到控制台location = / {return 302 /console/;}}server {listen 90;server_name localhost;# 关键修改:路径重写 + 根路径代理location /console/ {# 移除路径前缀:将 /console/xxx -> /xxxrewrite ^/console/(.*)$ /$1 break;proxy_pass http://webapi;# 必须添加的请求头proxy_set_header Host $http_host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;# 关键:支持WebSocketproxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";# 解决静态资源路径问题sub_filter_once off;sub_filter_types *;sub_filter 'href="/' 'href="/console/';sub_filter 'src="/' 'src="/console/';sub_filter '="/' '="/console/'; # 修复相对路径资源}# 防止直接访问根路径404location = / {return 302 /console/;}}
}
遂至,所有的集群工作都已经完成。
三、MinIO工具
1、MinioCient配置
@Beanpublic MinioClient minioClient() throws Exception {return MinioClient.builder().endpoint(minioProperties.getEndpoint()).credentials(minioProperties.getUsername(), minioProperties.getPassword()).httpClient(new OkHttpClient()).build();}
2、文件上传
minio内部会自动对大文件分片处理封装
@PostMapping("pdf")public String uploadPdf(MultipartFile file) throws Exception{String fileName = getObjName(file.getOriginalFilename(), PdfType);try(InputStream inputStream = file.getInputStream()) {minioClient.putObject(PutObjectArgs.builder().bucket(minioProperties.getDefaultBucket()).object(fileName)// 文件流;文件的总大小;每个分片的大小,-1表示默认分片5或者64MB.stream(inputStream, file.getSize(), -1).build());}// 获取文件对象进行验证minioClient.getObject(GetObjectArgs.builder().bucket(minioProperties.getDefaultBucket()).object(fileName).build());return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder().bucket(minioProperties.getDefaultBucket()).object(fileName).method(Method.GET).expiry(minioProperties.getPresignedUrlExpiry(), TimeUnit.DAYS).build());}
3、docker-compose.yml
version: '3.7'services:minio:image: minio/minio:latestcontainer_name: minioports:- "9000:9000" # API端口- "9001:9001" # 控制台端口environment:MINIO_ROOT_USER: minioadmin # 默认用户名(强烈建议修改!)MINIO_ROOT_PASSWORD: minioadmin # 默认密码(强烈建议修改!)volumes:- /opt/minio-server/minio-data/:/data # 持久化存储目录command: server /data --console-address ":9001" # 明确指定控制台端口restart: unless-stoppednetworks:- minio_networknetworks:minio_network:driver: bridge
以上就是我整理的MinIO学习笔记,如有不足之处,欢迎指正。