阿里云OSS 上传文件 Python版本
单文件:
import alibabacloud_oss_v2 as oss# 配置accessKeyId = "***************"
accessKeySecret = "#########"
customRegion = "cn-%%%%%%"
customeBucket = "&&&&&&&&&"# 配置静态访问凭证(生产环境不推荐硬编码)
static_cred = oss.credentials.StaticCredentialsProvider(access_key_id=accessKeyId,access_key_secret=accessKeySecret,
)def init_client():cfg = oss.config.load_default()cfg.credentials_provider = static_credcfg.region = customRegionreturn oss.Client(cfg)def multi_upload(files_map):""":param files_map: 字典类型 { "目标object路径": "本地文件绝对路径" }"""client = init_client()uploader = client.uploader()for obj_key, local_path in files_map.items():try:request = oss.PutObjectRequest(bucket=customeBucket, key=obj_key)result = uploader.upload_file(request,filepath=local_path,part_size=10 * 1024 * 1024, # 分片大小10MBpart_num=3, # 并发分片数)print(f"文件 {local_path} 上传成功,ETag: {result.etag}")except oss as e:print(f"上传失败:{e.message}")if __name__ == "__main__":# 定义多文件映射(目标路径:本地路径)file_mapping = {"sunwukongtest/index.html": "C:/Users/57307/Desktop/Test/TestHtml/index.html","sunwukongtest/index1.html": "C:/Users/57307/Desktop/Test/TestHtml/index1.html","sunwukongtest/index2.html": "C:/Users/57307/Desktop/Test/TestHtml/index2.html","sunwukongtest/index3.html": "C:/Users/57307/Desktop/Test/TestHtml/index3.html","sunwukongtest/index4.html": "C:/Users/57307/Desktop/Test/TestHtml/index4.html",}multi_upload(file_mapping)
分片上传
import os
import alibabacloud_oss_v2 as oss# 配置accessKeyId = "***************"
accessKeySecret = "#########"
customRegion = "cn-%%%%%%"
customeBucket = "&&&&&&&&&"# 配置静态访问凭证(生产环境不推荐硬编码)
static_cred = oss.credentials.StaticCredentialsProvider(access_key_id=accessKeyId,access_key_secret=accessKeySecret,
)def init_client():cfg = oss.config.load_default()cfg.credentials_provider = static_credcfg.region = customRegionreturn oss.Client(cfg)def multi_upload(files_map):""":param files_map: 字典类型 { "目标object路径": "本地文件绝对路径" }"""client = init_client()for obj_key, local_path in files_map.items():# 初始化分片上传请求,获取upload_id用于后续分片上传result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(bucket=customeBucket,key=obj_key,))# 定义每个分片的大小为5MBpart_size = 5 * 1024 * 1024# 获取要上传文件的总大小data_size = os.path.getsize(local_path)# 初始化分片编号,从1开始part_number = 1# 存储每个分片上传的结果upload_parts = []# 打开文件以二进制模式读取with open(local_path, "rb") as f:# 遍历文件,按照part_size分片上传for start in range(0, data_size, part_size):n = part_sizeif start + n > data_size: # 处理最后一个分片可能小于part_size的情况n = data_size - start# 创建SectionReader来读取文件的特定部分reader = oss.io_utils.SectionReader(oss.io_utils.ReadAtReader(f), start, n)# 上传分片up_result = client.upload_part(oss.UploadPartRequest(bucket=customeBucket,key=obj_key,upload_id=result.upload_id,part_number=part_number,body=reader,))# 打印每个分片上传的结果信息print(f"status code: {up_result.status_code},"f" request id: {up_result.request_id},"f" part number: {part_number},"f" content md5: {up_result.content_md5},"f" etag: {up_result.etag},"f" hash crc64: {up_result.hash_crc64},")# 将分片上传结果保存到列表中upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))# 增加分片编号part_number += 1# 对上传的分片按照分片编号排序parts = sorted(upload_parts, key=lambda p: p.part_number)# 发送完成分片上传请求,合并所有分片为一个完整的对象result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(bucket=customeBucket,key=obj_key,upload_id=result.upload_id,complete_multipart_upload=oss.CompleteMultipartUpload(parts=parts),))# 下面的代码是另一种方式,通过服务器端列出并合并所有分片数据为一个完整的对象# 这种方法适用于当您不确定所有分片是否都已成功上传时# Merge fragmented data into a complete Object through the server-side List method# result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(# bucket=args.bucket,# key=args.key,# upload_id=result.upload_id,# complete_all='yes'# ))# 输出完成分片上传的结果信息print(f"status code: {result.status_code},"f" request id: {result.request_id},"f" bucket: {result.bucket},"f" key: {result.key},"f" location: {result.location},"f" etag: {result.etag},"f" encoding type: {result.encoding_type},"f" hash crc64: {result.hash_crc64},"f" version id: {result.version_id},")if __name__ == "__main__":# 定义多文件映射(目标路径:本地路径)file_mapping = {"sunwukongtest/indx.html": "C:/Users/57307/Desktop/Test/TestHtml/index.html",}multi_upload(file_mapping)