《Python批量删除阿里云OSS文件:多线程删除与关键词过滤全解析》
上一篇文章介绍了如何批量下载阿里云oss里的文件,
这次的问题是, oss里有大量已经过时的日志文件,占用了大量空间,需要删除。所以写了一个批量删除 名称里包含某些字符的工具。
所需库内容如下
requirements.txt
aliyun-python-sdk-core==2.16.0
aliyun-python-sdk-kms==2.16.5
certifi==2025.4.26
cffi==1.17.1
charset-normalizer==3.4.2
crcmod==1.7
cryptography==45.0.3
idna==3.10
jmespath==0.10.0
oss2==2.19.1
pycparser==2.22
pycryptodome==3.23.0
python-dotenv==1.1.0
requests==2.32.3
six==1.17.0
urllib3==1.26
请自行新建.env文件
格式如下
OSS_ACCESS_KEY_ID=阿里KEY
OSS_ACCESS_KEY_SECRET=阿里KEY_SECRET
OSS_BUCKET_NAME = OSS包名
OSS_ENDPOINT = 所属节点
脚本代码如下
import oss2
from concurrent.futures import ThreadPoolExecutor
import time
import sys
import os
from dotenv import load_dotenv# 加载环境变量
load_dotenv()# 获取配置
access_key_id = os.getenv('OSS_ACCESS_KEY_ID')
access_key_secret = os.getenv('OSS_ACCESS_KEY_SECRET')
bucket_name = os.getenv('OSS_BUCKET_NAME')
endpoint = os.getenv('OSS_ENDPOINT')# 验证配置
if None in [access_key_id, access_key_secret, bucket_name, endpoint]:raise ValueError("Missing required OSS configuration in .env file")# 初始化OSS客户端
auth = oss2.Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, endpoint, bucket_name)# 要匹配的文件名关键词
KEYWORD = 'rizhi-changshan20'def delete_files(max_workers=8, batch_size=1000):"""删除包含指定关键词的所有文件参数:max_workers: 删除线程数 (建议4-16)batch_size: 每页获取的对象数量 (必须1-1000)"""# 参数验证if not 1 <= batch_size <= 1000:raise ValueError("batch_size must be between 1 and 1000")total_files = 0deleted_files = 0skipped_files = 0start_time = time.time()# 进度显示def show_progress():elapsed = time.time() - start_timesys.stdout.write(f"\r处理中: 总数 {total_files} | "f"已删除 {deleted_files} | "f"跳过 {skipped_files} | "f"耗时 {elapsed:.1f}s")sys.stdout.flush()print(f"开始删除OSS中包含 '{KEYWORD}' 的文件...")print("按Ctrl+C可中断操作\n")try:with ThreadPoolExecutor(max_workers=max_workers) as executor:next_marker = ''while True:# 分页获取文件列表result = bucket.list_objects(max_keys=batch_size,marker=next_marker)if not result.object_list:break# 提交删除任务futures = []for obj in result.object_list:if KEYWORD in obj.key:futures.append(executor.submit(bucket.delete_object, obj.key))total_files += 1else:skipped_files += 1# 等待删除完成for future in futures:try:future.result()deleted_files += 1except Exception as e:print(f"\n删除失败: {str(e)}")if total_files % 10 == 0:show_progress()next_marker = result.next_markerif not next_marker:break# 最终结果show_progress()print("\n\n删除操作完成!")print(f"总扫描文件: {total_files + skipped_files}")print(f"匹配到文件: {total_files}")print(f"成功删除: {deleted_files}")print(f"删除失败: {total_files - deleted_files}")except KeyboardInterrupt:print("\n\n用户中断操作,已删除的文件无法恢复!")except Exception as e:print(f"\n发生错误: {str(e)}")finally:print("\n操作结束")if __name__ == '__main__':# 安全确认delete_files(max_workers=8, batch_size=1000)
有请AI助理解释代码
这段代码用于从阿里云OSS(对象存储服务)中批量删除文件,删除的条件是文件名包含指定的关键词。它通过多线程和分页的方式提高删除操作的效率。接下来,我会逐行解释代码的作用。
导入库
import oss2
from concurrent.futures import ThreadPoolExecutor
import time
import sys
import os
from dotenv import load_dotenv
- oss2:阿里云OSS的Python SDK,用于与阿里云OSS进行交互。
- ThreadPoolExecutor:来自
concurrent.futures
库,用于创建线程池,支持并发执行任务。 - time:提供时间操作,主要用来计算运行时长和展示进度。
- sys:用于输出控制台信息(比如实时显示进度)。
- os:用于访问操作系统功能(例如环境变量的读取)。
- dotenv:用于加载
.env
文件中的环境变量(例如OSS的凭证、存储桶信息等)。
加载环境变量
load_dotenv()
加载.env
文件中的环境变量,通常存放在项目根目录下,包含阿里云OSS访问的凭证和其他配置信息。
获取配置
access_key_id = os.getenv('OSS_ACCESS_KEY_ID')
access_key_secret = os.getenv('OSS_ACCESS_KEY_SECRET')
bucket_name = os.getenv('OSS_BUCKET_NAME')
endpoint = os.getenv('OSS_ENDPOINT')
从.env
文件中获取以下配置:
OSS_ACCESS_KEY_ID
:阿里云OSS的Access Key ID。OSS_ACCESS_KEY_SECRET
:阿里云OSS的Access Key Secret。OSS_BUCKET_NAME
:OSS存储桶的名称。OSS_ENDPOINT
:OSS的访问终端节点。
验证配置
if None in [access_key_id, access_key_secret, bucket_name, endpoint]:raise ValueError("Missing required OSS configuration in .env file")
检查加载的环境变量是否有效,如果缺少任何配置项,抛出异常提示用户配置错误。
初始化OSS客户端
auth = oss2.Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
- 使用
access_key_id
和access_key_secret
初始化阿里云OSS的认证。 - 使用认证信息、存储桶名称和终端节点来初始化
oss2.Bucket
对象,之后可以用它执行文件操作(例如列出文件、删除文件等)。
定义关键词
KEYWORD = 'rizhi-changshan20'
定义要匹配的文件名关键词。所有文件名中包含该关键词的文件将会被删除。
删除文件函数
def delete_files(max_workers=8, batch_size=1000):...
delete_files
函数是实现文件删除的主要功能,它接受两个参数:
max_workers
: 多线程执行的最大线程数,默认值为8。batch_size
: 每次列出文件时获取的最大文件数量,默认为1000。
参数验证
if not 1 <= batch_size <= 1000:raise ValueError("batch_size must be between 1 and 1000")
验证batch_size
是否在合理范围(1到1000之间)。
进度显示
def show_progress():elapsed = time.time() - start_timesys.stdout.write(f"\r处理中: 总数 {total_files} | "f"已删除 {deleted_files} | "f"跳过 {skipped_files} | "f"耗时 {elapsed:.1f}s")sys.stdout.flush()
show_progress
函数用于在控制台实时显示处理进度,包括:
- 总文件数(
total_files
) - 已删除的文件数(
deleted_files
) - 跳过的文件数(
skipped_files
) - 已用时间(
elapsed
)
分页获取文件列表并删除
with ThreadPoolExecutor(max_workers=max_workers) as executor:next_marker = ''while True:result = bucket.list_objects(max_keys=batch_size,marker=next_marker)
使用分页方式获取存储桶中的文件列表。bucket.list_objects
方法返回文件列表,每次最多返回batch_size
个文件。当文件列表超过batch_size
时,使用next_marker
进行分页。
提交删除任务
futures = []
for obj in result.object_list:if KEYWORD in obj.key:futures.append(executor.submit(bucket.delete_object, obj.key))total_files += 1else:skipped_files += 1
遍历返回的文件列表,检查文件名是否包含KEYWORD
。如果是,则将删除任务提交到线程池,executor.submit
返回一个future
对象,代表异步执行的任务。
等待删除任务完成
for future in futures:try:future.result()deleted_files += 1except Exception as e:print(f"\n删除失败: {str(e)}")
等待所有异步删除任务完成。如果删除成功,更新deleted_files
计数。如果出现异常,则打印错误信息。
处理分页
next_marker = result.next_marker
if not next_marker:break
处理分页,当next_marker
为None
时表示已经处理完所有文件,退出循环。
完成删除操作后输出统计信息
show_progress()
print("\n\n删除操作完成!")
print(f"总扫描文件: {total_files + skipped_files}")
print(f"匹配到文件: {total_files}")
print(f"成功删除: {deleted_files}")
print(f"删除失败: {total_files - deleted_files}")
在所有文件删除完成后,调用show_progress
显示最终进度,并输出文件扫描、匹配、成功删除和失败删除的统计信息。
捕获中断和异常
except KeyboardInterrupt:print("\n\n用户中断操作,已删除的文件无法恢复!")
except Exception as e:print(f"\n发生错误: {str(e)}")
finally:print("\n操作结束")
- 捕获用户的中断操作(
Ctrl+C
),并提示已删除的文件无法恢复。 - 捕获其他异常并打印错误信息,最后打印“操作结束”。
主程序
if __name__ == '__main__':delete_files(max_workers=8, batch_size=1000)
总结
这段代码的主要功能是从阿里云OSS中批量删除符合指定关键词的文件。它通过分页和多线程的方式加速删除过程,同时提供了进度跟踪、错误处理和用户确认机制。