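Offline Large Files and Resumable Uploads: ABP + TUS + MinIO/S3 🚀
Audience: .NET/ABP teams that deal with weak networks, interrupted transfers, and very large files (GB to TB), and that want browser-direct uploads to take load off the backend while keeping enterprise-grade compliance and observability.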
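📚 Contents
- 1. Scenario and Goals 🎯
- 2. Architecture and Data Flow 🧱
  - 2.1 Overall Architecture Diagram
  - 2.2 Upload Sequence Diagram ⏱️
- 3. Protocol and Constraint Cheat Sheet 📜
- 4. Data Model and State Machine 🔄
  - 4.1 Upload State Machine
- 5. Directory Layout 📦
- 6. Environment Setup ⚙️
  - 6.1 MinIO + ClamAV (docker-compose)
  - 6.2 ABP Host (Module Initialization + tusdotnet)
- 7. Extension Self-Check: Confirming Concatenation/Checksum 🧪
- 8. Backend Implementation (S3/MinIO + End-to-End Verification) 🧩
  - 8.1 Adaptive Part Size (Staying Under 10,000 Parts)
  - 8.2 CRC32C Aggregation (Reused Across Parts, No Extra I/O) 🧮
  - 8.3 S3 Adapter (Multipart + Per-Part/Full-Object Checksums)
  - 8.4 Completion, Quarantine, and AV Scanning (with Backfill on Failure)
- 9. Frontend Implementation (tus-js-client) and a Resumable Fingerprint 🧑‍💻
- 10. Multi-Tenancy, Security, and Rate Limiting (Relaxed for TUS Routes) 🛡️
  - 10.1 Deployment and Rate-Limiting Diagram
- 11. Presigned Direct Upload (Fallback/Small Files) 🪪
- 12. Load Testing and Verification (scripts/bench) 🧪
- 13. Common Pitfalls and Best Practices ✅
- References and Further Reading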
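1. Scenario and Goals 🎯
Target capabilities
- Resumable uploads (continue after a network drop, a page refresh, or a closed window);
- Parallel parts for higher throughput;
- Browser-direct uploads to reduce gateway load;
- Multi-tenant isolation and auditing;
- A closed security loop: tamper protection + end-to-end verification + sandboxed AV scanning.
Protocol/storage choices
- TUS 1.0 (with the creation/checksum/concatenation extensions)
- S3/MinIO Multipart (5 MiB–5 GiB per part, ≤10,000 parts, objects ≤5 TiB; the last part may be smaller than 5 MiB)
- Integrity: enable S3 per-part/full-object checksums (Checksum-*); do not treat the ETag as an MD5
- Compliance: a completed upload goes to a quarantine area and is "published" only after it passes a ClamAV (clamd/3310) scan
Reference links are collected in "References and Further Reading" at the end; the body keeps only the explanations it needs. 📚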
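2. Architecture and Data Flow 🧱
2.1 Overall Architecture Diagram
Key points: the frontend resumes uploads over TUS; the gateway uses tusdotnet to handle the protocol and intermediate state; the completion step submits a full-object CRC32C; the object is published (or exposed through a presigned download URL) only after it passes the ClamAV scan.
2.2 Upload Sequence Diagram ⏱️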
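3. Protocol and Constraint Cheat Sheet 📜
- TUS: POST creates an upload, PATCH appends data (at Upload-Offset), and HEAD queries progress. Parallel uploads require the Concatenation extension. The Checksum extension is optional (if computing a checksum per chunk in the browser is too expensive, S3-side checksums can close the integrity loop instead).
- S3/MinIO Multipart: part size 5 MiB–5 GiB (the last part may be smaller than 5 MiB), ≤10,000 parts, objects ≤5 TiB; do not treat the Multipart ETag as the MD5 of the whole object.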
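The three core TUS requests differ mainly in their headers, so a small sketch of header builders may make the cheat sheet more concrete. The function names here (creationHeaders, headHeaders, patchHeaders, encodeUploadMetadata) are illustrative and not part of any library; the header names and the Upload-Metadata encoding follow the TUS 1.0 specification.

```typescript
// Header builders for the core TUS 1.0 requests (illustrative helper names).
const TUS_RESUMABLE = "1.0.0";

// Base64 of the UTF-8 bytes of a string (TextEncoder/btoa exist in browsers and modern Node).
function base64Utf8(text: string): string {
  const bytes = new TextEncoder().encode(text);
  let binary = "";
  for (let i = 0; i < bytes.length; i++) binary += String.fromCharCode(bytes[i]);
  return btoa(binary);
}

// Upload-Metadata is a comma-separated list of "key base64(value)" pairs.
function encodeUploadMetadata(meta: Record<string, string>): string {
  return Object.keys(meta)
    .map((key) => `${key} ${base64Utf8(meta[key])}`)
    .join(",");
}

// POST: create an upload whose total length is known up front.
function creationHeaders(totalBytes: number, meta: Record<string, string>): Record<string, string> {
  return {
    "Tus-Resumable": TUS_RESUMABLE,
    "Upload-Length": String(totalBytes),
    "Upload-Metadata": encodeUploadMetadata(meta),
  };
}

// HEAD: ask the server for the current Upload-Offset.
function headHeaders(): Record<string, string> {
  return { "Tus-Resumable": TUS_RESUMABLE };
}

// PATCH: append bytes starting at the offset the server last reported.
function patchHeaders(offset: number): Record<string, string> {
  return {
    "Tus-Resumable": TUS_RESUMABLE,
    "Upload-Offset": String(offset),
    "Content-Type": "application/offset+octet-stream",
  };
}
```

After a dropped connection, a client would send HEAD, read Upload-Offset from the response, and continue with PATCH from that offset; tus-js-client performs this sequence internally.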
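4. Data Model and State Machine 🔄
Table Uploads
UploadId (TUS) | TenantId | Status (Created/Uploading/Scanning/Quarantined/Published/Failed) | BytesReceived | PartSize | S3UploadId | Parts (PartNumber, ETag, Checksum)[] | Sha256 | MetaHmac | ExpiresAt
Reconciliation and backfill
- Prefer the locally recorded (PartNumber, ETag) list when completing the upload.
- When necessary, use ListParts for reconciliation or disaster recovery: results are paginated (at most 1,000 per page) and come back in ascending part-number order; if the same PartNumber was uploaded successfully more than once, the ETag of the last successful upload wins.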
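The reconciliation rule above can be sketched as a pure function. This is a minimal illustration, assuming the pages have already been fetched and that each entry carries a lastModified timestamp; the ListedPart shape and the function names are hypothetical, not an SDK type.

```typescript
// Hypothetical shape of one entry gathered from paginated part listings.
interface ListedPart {
  partNumber: number;
  etag: string;
  lastModified: number; // epoch milliseconds of the successful upload
}

// Merge all pages, keep the latest entry per part number, and sort ascending.
function mergeListedParts(pages: ListedPart[][]): ListedPart[] {
  const latest: Record<number, ListedPart> = {};
  for (const page of pages) {
    for (const part of page) {
      const seen = latest[part.partNumber];
      if (!seen || part.lastModified >= seen.lastModified) {
        latest[part.partNumber] = part;
      }
    }
  }
  return Object.keys(latest)
    .map((key) => latest[Number(key)])
    .sort((a, b) => a.partNumber - b.partNumber);
}

// Part numbers the local record expects but the listing does not contain.
function findMissingParts(expected: number[], listed: ListedPart[]): number[] {
  const present: Record<number, boolean> = {};
  for (const part of listed) present[part.partNumber] = true;
  return expected.filter((n) => !present[n]).sort((a, b) => a - b);
}
```

The missing part numbers are the candidates for backfill; everything else is completed with the merged ETags.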
4.1 Upload State Machine
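One way to encode the statuses from the Uploads table is a transition table. The status names come from the table above; the allowed transitions below are an assumption inferred from the upload, scan, and publish flow described in this article, not a definitive specification.

```typescript
type UploadStatus =
  | "Created"
  | "Uploading"
  | "Scanning"
  | "Quarantined"
  | "Published"
  | "Failed";

// Assumed transitions: upload, then scan, then either publish or stay quarantined.
const TRANSITIONS: Record<UploadStatus, UploadStatus[]> = {
  Created: ["Uploading", "Failed"],
  Uploading: ["Scanning", "Failed"],
  Scanning: ["Published", "Quarantined", "Failed"],
  Quarantined: [],
  Published: [],
  Failed: [],
};

function canTransition(from: UploadStatus, to: UploadStatus): boolean {
  return TRANSITIONS[from].indexOf(to) >= 0;
}

// Returns the next status, or throws when the move is not allowed.
function transition(current: UploadStatus, next: UploadStatus): UploadStatus {
  if (!canTransition(current, next)) {
    throw new Error(`Illegal transition: ${current} -> ${next}`);
  }
  return next;
}
```

Keeping the table in one place makes it easy to reject out-of-order updates, for example a late PATCH arriving after the record is already Published.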
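5. Directory Layout 📦
abp-tus-s3-upload/
  modules/
    Abp.Uploads.Tus/          # TUS middleware wrapper + custom Store (optional)
    Abp.ObjectStorage/        # S3/MinIO Multipart and presigned URLs
    Abp.Uploads.Scan/         # ClamAV scanning and quarantine/publish
  src/Abp.Uploads.Gateway/    # ABP Host (aggregated API / routing)
  web/demo-uploader/          # tus-js-client frontend (PWA/offline queue)
  scripts/bench/              # parallel load-test scripts (Node)
  infra/docker-compose.yaml   # one-command startup for MinIO + ClamAV
  docs/ops.md                 # ops parameters (part size/concurrency/TTL/quotas/metrics/proxy timeouts/large headers)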
6. Environment Setup ⚙️
6.1 MinIO + ClamAV (docker-compose)
# infra/docker-compose.yaml
version: "3.8"
services:
  minio:
    image: minio/minio
    command: server /data --console-address ":9001"
    ports: ["9000:9000", "9001:9001"]
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes: ["./data/minio:/data"]
  clamav:
    image: mkodockx/docker-clamav:latest
    ports: ["3310:3310"] # clamd (TCP)
    # freshclam updates the virus database automatically inside the container
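ClamAV settings for large files (suggested addition to ops.md):
# clamd.conf (example values; tune them based on load-test results)
StreamMaxLength 2000M
MaxFileSize 2000M
MaxScanSize 4000M
The clamd defaults are small; unless they are raised, very large files are skipped or fail to scan. Files above the configured limits are still not scanned in full, so decide explicitly how such files are handled. At least 3 GiB of RAM is recommended for the host.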
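Reverse-proxy timeouts and large headers (suggested addition to ops.md)
- Nginx:
  proxy_read_timeout 900s;
  proxy_send_timeout 900s;
  client_max_body_size 0;
  large_client_header_buffers 8 64k; # TUS requests carry many headers, so raise this limit
- AWS ALB: idle timeout ≥ 300 s (raise it further to match part duration and weak networks).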
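6.2 ABP Host (Module Initialization + tusdotnet)
// src/Abp.Uploads.Gateway/YourHostModule.cs
using tusdotnet;
using tusdotnet.Models;
using tusdotnet.Models.Configuration;
using tusdotnet.Stores;
using Volo.Abp;
using Volo.Abp.AspNetCore.Mvc;
using Volo.Abp.Modularity;

[DependsOn(typeof(AbpAspNetCoreMvcModule))]
public class YourHostModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var app = context.GetApplicationBuilder();
        app.UseRouting();

        // ★ Use a more lenient rate-limiting policy for TUS routes (see §10)
        app.UseRateLimiter();

        // Register the TUS endpoint
        app.UseTus(ctx => new DefaultTusConfiguration
        {
            Store = new TusDiskStore(Path.Combine(AppContext.BaseDirectory, "tus-temp")),
            UrlPath = "/files",
            Events = new Events
            {
                // Placeholder that allows every request; replace with real tenant/user checks
                OnAuthorizeAsync = _ => Task.CompletedTask,
                OnFileCompleteAsync = async e =>
                {
                    var svc = e.HttpContext.RequestServices.GetRequiredService<IUploadFinalizeService>();
                    // The completed file and its metadata are loaded through the event context
                    var file = await e.GetFileAsync();
                    var metadata = await file.GetMetadataAsync(e.CancellationToken);
                    await svc.FinalizeAndScanAsync(file.Id, metadata);
                }
            }
        });
    }
}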
7. Extension Self-Check: Confirming Concatenation/Checksum 🧪
curl -i -X OPTIONS http://localhost:5000/files
# Expected response headers include:
# Tus-Extension: creation,creation-with-upload,checksum,concatenation
If concatenation is missing, parallelUploads fails (501). A custom Store must implement ITusConcatenationStore / ITusChecksumStore.
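The same check can be automated before enabling parallelUploads. This is a minimal sketch that only parses the header value; fetching it (for example with an OPTIONS request) is left to the caller, and the function names are illustrative.

```typescript
// Split a Tus-Extension header value into a list of extension names.
function parseTusExtensions(header: string | null | undefined): string[] {
  if (!header) return [];
  return header
    .split(",")
    .map((name) => name.trim().toLowerCase())
    .filter((name) => name.length > 0);
}

// Return the required extensions that the server did not advertise.
function missingExtensions(header: string | null | undefined, required: string[]): string[] {
  const advertised = parseTusExtensions(header);
  return required.filter((name) => advertised.indexOf(name.toLowerCase()) < 0);
}
```

A client could fall back to parallelUploads: 1 whenever missingExtensions(header, ["concatenation"]) is non-empty.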
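8. Backend Implementation (S3/MinIO + End-to-End Verification) 🧩
8.1 Adaptive Part Size (Staying Under 10,000 Parts)
static long CalcPartSize(long fileSize)
{
    const long MiB = 1024L * 1024L;
    long min = 8 * MiB; // slightly above the 5 MiB minimum, for better throughput
    long byCount = (long)Math.Ceiling((double)fileSize / 9000); // leave headroom below the 10,000-part limit
    return Math.Max(min, AlignToMiB(byCount));
}

// Round up to the next whole MiB
static long AlignToMiB(long bytes) =>
    ((bytes + (1024 * 1024 - 1)) / (1024 * 1024)) * (1024 * 1024);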
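To sanity-check the arithmetic, here is a TypeScript port of the same calculation with a worked example. It is a sketch for quick experimentation (for instance in the frontend or the bench scripts); the names calcPartSize and partCount are illustrative.

```typescript
const MiB = 1024 * 1024;

// Round up to the next whole MiB.
function alignToMiB(bytes: number): number {
  return Math.ceil(bytes / MiB) * MiB;
}

// Same rule as above: at least 8 MiB, and aim for roughly 9,000 parts at most.
function calcPartSize(fileSize: number): number {
  const byCount = Math.ceil(fileSize / 9000);
  return Math.max(8 * MiB, alignToMiB(byCount));
}

// Number of parts the chosen size produces.
function partCount(fileSize: number): number {
  return Math.ceil(fileSize / calcPartSize(fileSize));
}

// Worked example: a 1 TiB file.
// 1 TiB / 9000 is about 116.5 MiB, which rounds up to 117 MiB per part,
// giving 8,963 parts, comfortably under the 10,000-part limit.
const oneTiB = 1024 * 1024 * MiB;
console.log(calcPartSize(oneTiB) / MiB, partCount(oneTiB));
```

Small files simply get the 8 MiB floor: a 100 MiB file is split into 13 parts, the last one 4 MiB.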
8.2 CRC32C Aggregation (Reused Across Parts, No Extra I/O) 🧮
Key point: a single CRC32C aggregator lives for the whole UploadId. Every part streams its bytes through it during upload, and Finalize is called once after all parts have finished to produce the full-object CRC32C (Base64), which is submitted at the Complete step. Because a CRC depends on byte order, this only yields the correct full-object value when bytes reach the aggregator in object order, exactly once each; parts uploaded in parallel or retried have to be serialized before they are fed in.
public interface ICrc32CAggregator
{
    void Append(ReadOnlySpan<byte> data);
    string FinalizeBase64(); // call once, after all parts have finished
}

public sealed class Crc32CAggregator : ICrc32CAggregator
{
    private readonly Force.Crc32.Crc32CAlgorithm _crc = new();
    private bool _finalized;

    public void Append(ReadOnlySpan<byte> data)
    {
        if (_finalized) throw new InvalidOperationException("Already finalized");
        if (!data.IsEmpty)
        {
            var tmp = data.ToArray(); // TransformBlock needs a byte[]
            _crc.TransformBlock(tmp, 0, tmp.Length, null, 0);
        }
    }

    public string FinalizeBase64()
    {
        if (!_finalized)
        {
            _crc.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
            _finalized = true;
        }
        return Convert.ToBase64String(_crc.Hash!);
    }
}

// Counting stream: forwards every byte it reads to the aggregator
public sealed class Crc32CCountingStream : Stream
{
    private readonly Stream _inner;
    private readonly ICrc32CAggregator _agg;

    public Crc32CCountingStream(Stream inner, ICrc32CAggregator agg) { _inner = inner; _agg = agg; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int n = _inner.Read(buffer, offset, count);
        if (n > 0) _agg.Append(buffer.AsSpan(offset, n));
        return n;
    }

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        int n = await _inner.ReadAsync(buffer.AsMemory(offset, count), ct);
        if (n > 0) _agg.Append(buffer.AsSpan(offset, n));
        return n;
    }

    // Remaining members delegate to the inner stream or are unsupported
    public override bool CanRead => _inner.CanRead;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => _inner.CanSeek ? _inner.Length : 0;
    public override long Position { get => _inner.CanSeek ? _inner.Position : 0; set => throw new NotSupportedException(); }
    public override void Flush() => _inner.Flush();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
On .NET 8 and later, the same interface can be backed by a framework-provided CRC32C implementation instead of a third-party package; check which type your target version actually ships (the System.IO.Hashing package or System.Numerics.BitOperations.Crc32C).
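To illustrate why one aggregator can span several parts, here is a minimal table-driven CRC32C in TypeScript. It is a standalone sketch rather than a port of the library used above: feeding the same bytes in two calls gives the same result as one call, provided the order is preserved. The class and method names are illustrative.

```typescript
// Lookup table for CRC-32C (Castagnoli), reflected polynomial 0x82F63B78.
const CRC32C_TABLE: number[] = (() => {
  const table: number[] = [];
  for (let n = 0; n < 256; n++) {
    let c = n;
    for (let k = 0; k < 8; k++) {
      c = c & 1 ? 0x82f63b78 ^ (c >>> 1) : c >>> 1;
    }
    table.push(c >>> 0);
  }
  return table;
})();

class Crc32cAggregator {
  private state = 0xffffffff;

  // Feed the next bytes of the object; calls must follow object order.
  append(data: Uint8Array): void {
    let c = this.state;
    for (let i = 0; i < data.length; i++) {
      c = CRC32C_TABLE[(c ^ data[i]) & 0xff] ^ (c >>> 8);
    }
    this.state = c >>> 0;
  }

  // Current CRC32C as an unsigned 32-bit number.
  value(): number {
    return (this.state ^ 0xffffffff) >>> 0;
  }

  // Base64 of the four big-endian CRC bytes, the form S3 checksum fields use.
  base64(): string {
    const v = this.value();
    const binary = String.fromCharCode(
      (v >>> 24) & 0xff, (v >>> 16) & 0xff, (v >>> 8) & 0xff, v & 0xff
    );
    return btoa(binary);
  }
}
```

Feeding the ASCII bytes of "1234" and then "56789" yields 0xE3069283, the standard CRC-32C check value for "123456789". Swapping the two calls gives a different value, which is why parallel parts cannot share the aggregator without being serialized first.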
8.3 S3 Adapter (Multipart + Per-Part/Full-Object Checksums)
public sealed record UploadedPart(int PartNumber, string ETag, string? Checksum);

public interface IObjectStorageService
{
    Task<string> CreateMultipartAsync(string bucket, string key, IDictionary<string, string>? meta);
    Task<UploadedPart> UploadPartAsync(string bucket, string key, string uploadId, int partNo,
        Stream stream, long size, ICrc32CAggregator globalAgg, CancellationToken ct);
    Task CompleteAsync(string bucket, string key, string uploadId,
        IEnumerable<UploadedPart> parts, ICrc32CAggregator agg, CancellationToken ct);
}

public class S3StorageService : IObjectStorageService
{
    private readonly IAmazonS3 _s3;
    public S3StorageService(IAmazonS3 s3) => _s3 = s3;

    public async Task<string> CreateMultipartAsync(string bucket, string key, IDictionary<string, string>? meta)
    {
        var req = new InitiateMultipartUploadRequest
        {
            BucketName = bucket,
            Key = key,
            ChecksumAlgorithm = ChecksumAlgorithm.CRC32C // the parts below carry CRC32C checksums
        };
        // Metadata is a collection on the request, so copy the entries into it
        if (meta != null)
        {
            foreach (var kv in meta) req.Metadata[kv.Key] = kv.Value;
        }
        var resp = await _s3.InitiateMultipartUploadAsync(req);
        return resp.UploadId;
    }

    public async Task<UploadedPart> UploadPartAsync(string bucket, string key, string uploadId, int partNo,
        Stream stream, long size, ICrc32CAggregator globalAgg, CancellationToken ct)
    {
        using var counting = new Crc32CCountingStream(stream, globalAgg);
        var req = new UploadPartRequest
        {
            BucketName = bucket, Key = key, UploadId = uploadId,
            PartNumber = partNo, InputStream = counting, PartSize = size,
            ChecksumAlgorithm = ChecksumAlgorithm.CRC32C // per-part checksum
        };
        var resp = await _s3.UploadPartAsync(req, ct);
        return new(partNo, resp.ETag, resp.ChecksumCRC32C ?? resp.ChecksumSHA256);
    }

    public Task CompleteAsync(string bucket, string key, string uploadId,
        IEnumerable<UploadedPart> parts, ICrc32CAggregator agg, CancellationToken ct)
    {
        var completed = parts
            .OrderBy(p => p.PartNumber)
            .Select(p => new Amazon.S3.Model.PartETag(p.PartNumber, p.ETag))
            .ToList();
        var req = new CompleteMultipartUploadRequest
        {
            BucketName = bucket, Key = key, UploadId = uploadId, PartETags = completed,
            // ★ Submit the full-object checksum. The service only accepts a full-object value
            // when the upload was created with the full-object checksum type; verify that your
            // SDK version and MinIO release support it.
            ChecksumCRC32C = agg.FinalizeBase64()
        };
        return _s3.CompleteMultipartUploadAsync(req, ct);
    }
}
8.4 Completion, Quarantine, and AV Scanning (with Backfill on Failure)
public class UploadFinalizeService : IUploadFinalizeService
{
    private readonly IObjectStorageService _s3;
    private readonly IUploadsRepository _repo;
    private readonly IAntivirus _av;
    private readonly IQuarantineService _quarantine;

    public UploadFinalizeService(IObjectStorageService s3, IUploadsRepository repo,
        IAntivirus av, IQuarantineService quarantine)
    {
        _s3 = s3; _repo = repo; _av = av; _quarantine = quarantine;
    }

    public async Task FinalizeAndScanAsync(string tusFileId, IDictionary<string, Metadata> meta)
    {
        var rec = await _repo.GetAsync(tusFileId);
        var agg = await _repo.GetOrCreateCrc32CAggregatorAsync(rec.UploadDbId); // one aggregator per UploadId

        // 1) Complete with the locally confirmed (PartNumber, ETag) list and attach the full-object checksum
        await _s3.CompleteAsync(rec.Bucket, rec.ObjectKey, rec.S3UploadId, rec.Parts, agg, default);

        // 2) Scan in the quarantine area (clamd on port 3310)
        await _quarantine.MoveToQuarantineAsync(rec.Bucket, rec.ObjectKey);
        await using var stream = await _quarantine.OpenStreamAsync(rec.Bucket, rec.ObjectKey);
        var verdict = await _av.ScanStreamAsync(stream);
        if (verdict.IsClean)
        {
            await _quarantine.PublishAsync(rec.Bucket, rec.ObjectKey);
            await _repo.MarkPublishedAsync(tusFileId);
        }
        else
        {
            await _repo.MarkQuarantinedAsync(tusFileId, verdict.Signature);
        }
    }

    // Scheduled reconciliation and backfill (using ListParts when necessary)
    public async Task ReconcileAsync(Guid uploadDbId, CancellationToken ct)
    {
        var rec = await _repo.GetAsync(uploadDbId);
        var expected = rec.Parts.ToDictionary(p => p.PartNumber, p => p);

        // Read ListParts page by page (at most 1,000 per page), merged and sorted by part number
        var listed = await _repo.ListAllPartsAsync(rec);
        var missing = expected.Keys.Except(listed.Select(p => p.PartNumber));
        foreach (var partNo in missing)
        {
            await using var part = await _repo.OpenTempPartAsync(rec, partNo, ct);
            // Caution: the shared aggregator is only correct if each byte is fed once and in order,
            // so a backfilled part must not be counted twice.
            var agg = await _repo.GetOrCreateCrc32CAggregatorAsync(rec.UploadDbId);
            await _s3.UploadPartAsync(rec.Bucket, rec.ObjectKey, rec.S3UploadId, partNo, part, part.Length, agg, ct);
        }
    }
}
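The IAntivirus implementation is not shown in this article. As a rough sketch of what the scan step sends to clamd on port 3310, the helpers below frame data for clamd's INSTREAM command and interpret the reply. The socket handling is omitted, and the function and type names are illustrative assumptions.

```typescript
// Command that opens a streamed scan (the "z" prefix means null-terminated commands and replies).
const INSTREAM_COMMAND = "zINSTREAM\0";

// Each chunk is sent as a 4-byte big-endian length followed by the data.
function frameChunk(data: Uint8Array): Uint8Array {
  const framed = new Uint8Array(4 + data.length);
  new DataView(framed.buffer).setUint32(0, data.length, false); // false = big-endian
  framed.set(data, 4);
  return framed;
}

// A zero-length chunk tells clamd that the stream is complete.
const END_OF_STREAM = new Uint8Array(4);

interface ScanVerdict {
  isClean: boolean;
  signature: string | null;
}

// Typical replies: "stream: OK" or "stream: <signature name> FOUND".
function parseVerdict(reply: string): ScanVerdict {
  const text = reply.replace(/\0+$/, "").trim();
  if (text === "stream: OK") return { isClean: true, signature: null };
  const found = /^stream: (.+) FOUND$/.exec(text);
  if (found) return { isClean: false, signature: found[1] };
  // Anything else, such as a size-limit error, is treated as a failed scan.
  throw new Error(`clamd error or unexpected reply: ${text}`);
}
```

A stream longer than StreamMaxLength makes clamd reply with an error instead of a verdict, so the caller should treat the thrown error as "not scanned" rather than "clean".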
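9. Frontend Implementation (tus-js-client) and a Resumable Fingerprint 🧑‍💻
⚠️ The fingerprint must be stable and reusable. A random salt has to be persisted (IndexedDB); otherwise resumption breaks.
import * as tus from "tus-js-client";
import { openDB } from "idb"; // IndexedDB wrapper that provides openDB

// IndexedDB example: load or create a stable salt keyed by tenant + user + file characteristics
async function loadOrCreateSaltFromIDB(tenant: string, userId: string, name: string, size: number, lastModified: number) {
  const key = `${tenant}:${userId}:${name}:${size}:${lastModified}`;
  const db = await openDB("tus-salts", 1, { upgrade(db) { db.createObjectStore("salts"); } });
  const exist = await db.get("salts", key);
  if (exist) return exist as string;
  const salt = crypto.randomUUID();
  await db.put("salts", salt, key);
  return salt;
}

const salt = await loadOrCreateSaltFromIDB(currentTenantId, currentUserId, file.name, file.size, file.lastModified);

// sha256() and signMeta() are application helpers that are not shown here
const upload = new tus.Upload(file, {
  endpoint: "/files",
  retryDelays: [0, 1000, 3000, 5000],
  metadata: {
    name: file.name,
    size: String(file.size),
    sha256: await sha256(file), // used for the HMAC and backend verification
    tenant: currentTenantId,
    ts: String(Date.now()),
    hmac: await signMeta(/* name|size|sha256|tenant|ts */)
  },
  parallelUploads: 3, // ★ requires server-side concatenation support
  chunkSize: 8 * 1024 * 1024,
  fingerprint: async () =>
    `${currentTenantId}/${currentUserId}/${file.name}/${file.size}/${file.lastModified}/${salt}`
});

// Resume a previous upload with the same fingerprint if one exists, then start
const previousUploads = await upload.findPreviousUploads();
if (previousUploads.length > 0) upload.resumeFromPreviousUpload(previousUploads[0]);
upload.start();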
10. Multi-Tenancy, Security, and Rate Limiting (Relaxed for TUS Routes) 🛡️
builder.Services.AddRateLimiter(o =>
{
    // Global default
    o.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(ctx =>
        RateLimitPartition.GetTokenBucket(
            ctx.User?.Identity?.Name ?? ctx.Connection.RemoteIpAddress?.ToString() ?? "anon",
            _ => new TokenBucketRateLimiterOptions
            {
                TokenLimit = 100, TokensPerPeriod = 100,
                ReplenishmentPeriod = TimeSpan.FromSeconds(10),
                QueueLimit = 100, AutoReplenishment = true
            }));
});

// Dedicated "soft" limiter for TUS routes (partitioned by Upload-Token, falling back to user + IP)
var tusSoft = new RateLimiterOptions
{
    GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(ctx =>
    {
        // StringValues.ToString() returns "" rather than null, so test for empty explicitly
        var token = ctx.Request.Headers["Upload-Token"].ToString();
        var partition = string.IsNullOrEmpty(token)
            ? $"{ctx.User?.Identity?.Name}:{ctx.Connection.RemoteIpAddress}"
            : token;
        return RateLimitPartition.GetTokenBucket("tus:" + partition,
            _ => new TokenBucketRateLimiterOptions
            {
                TokenLimit = 200, TokensPerPeriod = 200,
                ReplenishmentPeriod = TimeSpan.FromSeconds(10),
                QueueLimit = 500, AutoReplenishment = true
            });
    })
};

var app = builder.Build();
app.UseRouting();
// /files gets the "tus-soft" limiter; every other path goes through the global limiter
app.UseWhen(ctx => ctx.Request.Path.StartsWithSegments("/files"), b => b.UseRateLimiter(tusSoft));
app.UseWhen(ctx => !ctx.Request.Path.StartsWithSegments("/files"), b => b.UseRateLimiter());
app.MapControllers();
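To build intuition for the numbers in the soft policy (200 tokens, refilled by 200 every 10 seconds), here is a simplified token-bucket model. It is a standalone sketch, not the ASP.NET Core implementation: queueing (QueueLimit) is left out, and the clock is passed in explicitly so the behavior is deterministic.

```typescript
class TokenBucket {
  private readonly limit: number;
  private readonly tokensPerPeriod: number;
  private readonly periodMs: number;
  private tokens: number;
  private lastRefillMs: number;

  constructor(limit: number, tokensPerPeriod: number, periodMs: number, nowMs: number) {
    this.limit = limit;
    this.tokensPerPeriod = tokensPerPeriod;
    this.periodMs = periodMs;
    this.tokens = limit; // the bucket starts full
    this.lastRefillMs = nowMs;
  }

  // Take one token if available; refills happen in whole periods.
  tryAcquire(nowMs: number): boolean {
    const periods = Math.floor((nowMs - this.lastRefillMs) / this.periodMs);
    if (periods > 0) {
      this.tokens = Math.min(this.limit, this.tokens + periods * this.tokensPerPeriod);
      this.lastRefillMs += periods * this.periodMs;
    }
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

// One bucket per partition key, for example "tus:<upload token>".
const softBucket = new TokenBucket(200, 200, 10_000, 0);
```

With 8 MiB chunks, 200 PATCH requests per 10 seconds corresponds to roughly 160 MiB/s per partition, which gives a feel for whether the limit is soft enough for a given link.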
10.1 Deployment and Rate-Limiting Diagram
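11. Presigned Direct Upload (Fallback/Small Files) 🪪
- Small files can be PUT straight to MinIO/S3 through a presigned URL, which can require Content-MD5, Content-Type, and similar headers.
- For large files that need presigned multipart uploads: the server keeps the UploadId and generates a URL for each PartNumber; the frontend runs UploadPart in parallel and then calls back to complete the upload (outside the scope of this article).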
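For the presigned multipart variant, the server first has to decide which byte range each PartNumber covers. A minimal sketch of that planning step follows; the URL signing itself is left out, and planParts and PartRange are illustrative names.

```typescript
const MIB = 1024 * 1024;
const MAX_PARTS = 10000;

interface PartRange {
  partNumber: number; // S3 part numbers start at 1
  offset: number;     // first byte of the part within the file
  length: number;     // number of bytes in the part
}

// Split a file into consecutive parts; only the last part may be shorter than partSize.
function planParts(fileSize: number, partSize: number): PartRange[] {
  if (fileSize <= 0) throw new Error("fileSize must be positive");
  if (partSize < 5 * MIB) throw new Error("partSize must be at least 5 MiB");
  const count = Math.ceil(fileSize / partSize);
  if (count > MAX_PARTS) throw new Error(`too many parts: ${count}`);
  const parts: PartRange[] = [];
  for (let i = 0; i < count; i++) {
    const offset = i * partSize;
    parts.push({
      partNumber: i + 1,
      offset,
      length: Math.min(partSize, fileSize - offset),
    });
  }
  return parts;
}
```

The frontend can then slice the file with file.slice(offset, offset + length) for each entry and upload it to the URL the server issued for that part number.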
12. Load Testing and Verification (scripts/bench) 🧪
- Throughput: compare the gains across chunkSize values of 8/16/32/64 MiB and parallelUploads=1/3/5.
- Stability: simulate disconnects, reconnects, and resume after refresh.
- Constraint checks: verify the ≤10,000-part limit, the 5 MiB minimum (the last part may be smaller), and checksum reconciliation (per part and full object).
// scripts/bench/tus-bench.js
import * as tus from "tus-js-client";
import fs from "node:fs";

const filePath = process.argv[2];
const concurrency = Number(process.env.N || 50);
const endpoint = process.env.ENDPOINT || "http://localhost:5000/files";

function startOne(i) {
  return new Promise((resolve, reject) => {
    const stream = fs.createReadStream(filePath);
    const upload = new tus.Upload(stream, {
      endpoint,
      metadata: { name: `bench-${i}` },
      parallelUploads: 3,
      retryDelays: [0, 1000, 3000],
      chunkSize: 8 * 1024 * 1024,
      // tus-js-client reads its callbacks from the options object
      onError: reject,
      onSuccess: resolve
    });
    upload.start();
  });
}

await Promise.all([...Array(concurrency)].map((_, i) => startOne(i)));
console.log("ALL DONE");
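13. Common Pitfalls and Best Practices ✅
- Parallel uploads need Concatenation: without it, requests fail with 501; self-check with OPTIONS /files.
- ETag ≠ MD5 (especially for Multipart): use Checksum-* or your own SHA-256 for integrity.
- Complete with the local part list; ListParts is only for reconciliation/recovery, so mind pagination and ascending order; for a part uploaded more than once, the last successful upload wins.
- clamd settings: raise StreamMaxLength/MaxFileSize/MaxScanSize; RAM ≥ 3 GiB.
- Rate limiting: relax it separately for TUS routes; partition by UploadId/user.
- Reverse proxy: raise timeouts; allow large headers (large_client_header_buffers); make sure HEAD/PATCH pass through.
- Fingerprint: include userId and a persisted salt so that resumption is stable and collisions are unlikely.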
References and Further Reading
- TUS protocol and extensions (Concatenation/Checksum): https://tus.io/protocols/resumable-upload
- tus-js-client (parallel/resumable uploads): https://github.com/tus/tus-js-client
- tusdotnet (.NET server, extensions and events): https://github.com/tusdotnet/tusdotnet
- S3 multipart upload limits (part size/part count/object size/last part): https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
- S3 integrity checking (per-part/full-object, Checksum-* API, ETag caveats): https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
- S3 multipart upload overview (API/pagination/completion): https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
- MinIO presigned upload from the browser (PUT): https://docs.min.io/community/minio-object-store/integrations/presigned-put-upload-via-browser.html
- ASP.NET Core Rate Limiting (registration/policies/pipeline): https://learn.microsoft.com/aspnet/core/performance/rate-limit
- OpenTelemetry + Prometheus (.NET metrics export): https://learn.microsoft.com/dotnet/core/diagnostics/metrics-collection#prometheus
- ClamAV (clamd/3310, usage and configuration notes): https://docs.clamav.net/manual/Usage.html