当前位置: 首页 > news >正文

CTFd 文件上传模块解读

CTFd 的 utils.uploads 上传模块

ctfd 的文件上传模块是一种支持多 backend 的文件存储抽象。用户可以在 config 里修改 UPLOAD_PROVIDER 切换最终的存储位置到 s3 和 filesystem。一言以蔽之,就是封装一个 store 成员函数,然后根据不同 sdk 的 api 填充不同实现

目录结构

uploads/
├── __init__.py
└── uploaders.py

对外暴露的 api 定义在 __init__.py 里,uploaders.py 定义了工具类。用户使用时的代码 pattern 形如:

from CTFd.utils.uploads import upload_file

其中 upload_file 定义在 __init__

upload_file 定义

将文件上传到后端数据存储 backend 里,并检查数据库,当文件元数据缺失时创建元数据。

def upload_file(*args, **kwargs):"""upload the file to data storage backend and create file metadata in `Files`, `ChallengeFiles`, or `PageFiles` table.Check if backend updated file metadata in database. If not, create a new file metadata entry."""file_obj = kwargs.get("file")challenge_id = kwargs.get("challenge_id") or kwargs.get("challenge")page_id = kwargs.get("page_id") or kwargs.get("page")file_type = kwargs.get("type", "standard")location = kwargs.get("location")# Validate location and default filename to uploaded file's nameparent = Nonefilename = file_obj.filenameif location:path = Path(location)if len(path.parts) != 2:raise ValueError("Location must contain two parts, a directory and a filename")# Allow location to override the directory and filenameparent = path.parts[0]filename = path.parts[1]location = parent + "/" + filenamemodel_args = {"type": file_type, "location": location}model = Filesif file_type == "challenge":model = ChallengeFilesmodel_args["challenge_id"] = challenge_idif file_type == "page":model = PageFilesmodel_args["page_id"] = page_id# Hash is calculated before upload since S3 file upload closes file objectsha1sum = hash_file(fp=file_obj)uploader = get_uploader()location = uploader.upload(file_obj=file_obj, filename=filename, path=parent)model_args["location"] = locationmodel_args["sha1sum"] = sha1sumexisting_file = Files.query.filter_by(location=location).first()if existing_file:for k, v in model_args.items():setattr(existing_file, k, v)db.session.commit()file_row = existing_fileelse:file_row = model(**model_args)db.session.add(file_row)db.session.commit()return file_row

uploader 的定义

主要成员方法 :store() 方法接受一个文件路径,并实际负责把文件内容存储到文件路径;upload() 方法是对用户暴露的方法,在保存前对用户入参做过滤和检查。

class BaseUploader(object):def __init__(self):"""Initialize the uploader with any required information"""raise NotImplementedErrordef store(self, fileobj, filename):"""Directly store a file object at the specified filename"""raise NotImplementedErrordef upload(self, file_obj, filename):"""Upload a file while handling any security protections or file renaming"""raise NotImplementedErrordef download(self, filename):"""Generate a Flask response to download the requested file"""raise NotImplementedErrordef delete(self, filename):"""Delete an uploaded file from the file store"""raise NotImplementedErrordef sync(self):"""Download all remotely hosted files for the purpose of exporting"""raise NotImplementedErrordef open(self, mode="rb"):"""Return a file pointer for an uploaded file.In the case of remotely hosted files, download the target file and thenreturn the file pointer for the local copy."""raise NotImplementedErrorclass FilesystemUploader(BaseUploader):def __init__(self, base_path=None):super(BaseUploader, self).__init__()self.base_path = base_path or current_app.config.get("UPLOAD_FOLDER")def store(self, fileobj, filename):location = os.path.join(self.base_path, filename)directory = os.path.dirname(location)if not os.path.exists(directory):os.makedirs(directory)with open(location, "wb") as dst:copyfileobj(fileobj, dst, 16384)return filenamedef upload(self, file_obj, filename, path=None):if len(filename) == 0:raise Exception("Empty filenames cannot be used")# Sanitize directory nameif path:path = secure_filename(path) or hexencode(os.urandom(16))path = path.replace(".", "")else:path = hexencode(os.urandom(16))# Sanitize file namefilename = secure_filename(filename)file_path = posixpath.join(path, filename)return self.store(file_obj, file_path)def download(self, filename):return send_file(safe_join(self.base_path, filename), as_attachment=True)def delete(self, filename):if os.path.exists(os.path.join(self.base_path, filename)):file_path = PurePath(filename).parts[0]rmtree(os.path.join(self.base_path, file_path))return Truereturn Falsedef sync(self):passdef open(self, filename, mode="rb"):path = Path(safe_join(self.base_path, filename))return path.open(mode=mode)class S3Uploader(BaseUploader):def __init__(self):super(BaseUploader, self).__init__()self.s3 = self._get_s3_connection()self.bucket = get_app_config("AWS_S3_BUCKET")# If the custom prefix is provided, add a slash if it's missingcustom_prefix = get_app_config("AWS_S3_CUSTOM_PREFIX")if custom_prefix and custom_prefix.endswith("/") is False:custom_prefix += "/"self.s3_prefix: str = custom_prefixdef _get_s3_connection(self):access_key = get_app_config("AWS_ACCESS_KEY_ID")secret_key = get_app_config("AWS_SECRET_ACCESS_KEY")endpoint = get_app_config("AWS_S3_ENDPOINT_URL")region = get_app_config("AWS_S3_REGION")addressing_style = get_app_config("AWS_S3_ADDRESSING_STYLE")client = boto3.client("s3",config=Config(signature_version="s3v4", s3={"addressing_style": addressing_style}),aws_access_key_id=access_key,aws_secret_access_key=secret_key,endpoint_url=endpoint,region_name=region,)return clientdef _clean_filename(self, c):if c in string.ascii_letters + string.digits + "-" + "_" + ".":return Truedef store(self, fileobj, filename):if self.s3_prefix:filename = self.s3_prefix + filenameself.s3.upload_fileobj(fileobj, self.bucket, filename)return filenamedef upload(self, file_obj, filename, path=None):# Sanitize directory nameif path:path = secure_filename(path) or hexencode(os.urandom(16))path = path.replace(".", "")# Sanitize pathpath = filter(self._clean_filename, secure_filename(path).replace(" ", "_"))path = "".join(path)else:path = hexencode(os.urandom(16))# Sanitize file namefilename = filter(self._clean_filename, secure_filename(filename).replace(" ", "_"))filename = "".join(filename)if len(filename) <= 0:return Falsedst = path + "/" + filenames3_dst = dstif self.s3_prefix:s3_dst = self.s3_prefix + dstself.s3.upload_fileobj(file_obj, self.bucket, s3_dst)return dstdef download(self, filename):# S3 URLs by default are valid for one hour.# We round the timestamp down to the previous hour and generate the link at that timecurrent_timestamp = int(time.time())truncated_timestamp = current_timestamp - (current_timestamp % 3600)if self.s3_prefix:filename = self.s3_prefix + filenamekey = filenamefilename = filename.split("/").pop()with freeze_time(datetime.datetime.utcfromtimestamp(truncated_timestamp)):url = self.s3.generate_presigned_url("get_object",Params={"Bucket": self.bucket,"Key": key,"ResponseContentDisposition": "attachment; filename={}".format(filename),"ResponseCacheControl": "max-age=3600",},ExpiresIn=3600,)custom_domain = get_app_config("AWS_S3_CUSTOM_DOMAIN")if custom_domain:url = urlparse(url)._replace(netloc=custom_domain).geturl()return redirect(url)def delete(self, filename):if self.s3_prefix:filename = self.s3_prefix + filenameself.s3.delete_object(Bucket=self.bucket, Key=filename)return Truedef sync(self):local_folder = current_app.config.get("UPLOAD_FOLDER")# If the bucket is empty then Contents will not be in the responseif self.s3_prefix:bucket_list = self.s3.list_objects(Bucket=self.bucket, Prefix=self.s3_prefix).get("Contents", [])else:bucket_list = self.s3.list_objects(Bucket=self.bucket).get("Contents", [])for s3_key in bucket_list:s3_object = s3_key["Key"]# We don't want to download any directoriesif s3_object.endswith("/") is False:local_s3_object = s3_objectif self.s3_prefix:local_s3_object = local_s3_object.removeprefix(self.s3_prefix)local_path = os.path.join(local_folder, local_s3_object)directory = os.path.dirname(local_path)if not os.path.exists(directory):os.makedirs(directory)self.s3.download_file(self.bucket, s3_object, local_path)def open(self, filename, mode="rb"):local_folder = current_app.config.get("UPLOAD_FOLDER")local_path = os.path.join(local_folder, filename)self.s3.download_file(self.bucket, filename, local_path)return Path(local_path).open(mode=mode)
http://www.xdnf.cn/news/383725.html

相关文章:

  • CSDN 中 LaTeX 数学公式输入方法
  • NVMe控制器之仿真平台搭建
  • 深入探究 InnoDB 的写失效问题
  • C34-递归函数编码实战
  • Profinet转CanOpen协议转换网关,破解工业设备“语言障碍”
  • 前端CSS场景题篇【持续更新】
  • Pass@1、EM、LC-winrate/winrate、CSL—— 大模型评测指标
  • Linux时间同步服务
  • Java多线程(超详细版!!)
  • 智能指针:C++内存管理的现代解决方案
  • 专业级软件卸载工具:免费使用,彻底卸载无残留!
  • 【CF】Day56——Codeforces Round 940 (Div. 2) and CodeCraft-23 BCD
  • 警备,TRO风向预警,In-N-Out Burgers维权风暴来袭
  • 25.K个一组翻转链表
  • 2025年PMP 学习七 -第5章 项目范围管理 (5.4,5.5,5.6 )
  • 多线程获取VI模块的YUV数据
  • 21、DeepSeekMath论文笔记(GRPO)
  • 十七、统一建模语言 UML
  • Win11安装APK方法详解
  • Trex -用 Python生成特定的流量模式
  • C++:this指针
  • CMake 入门实践
  • 牛客练习赛138
  • 8.5 表格进阶
  • (四)毛子整洁架构(Presentation层/Authentiacation)
  • 批量修改json文件中的标签
  • 【MCAL】TC397+EB-tresos之I2c配置实战(同步、异步)
  • 2025年客运从业资格证备考单选练习题
  • Wallcraft 3.53.0 | 提供高质量动态4D壁纸,解锁高级版,无广告干扰
  • 《Python星球日记》 第50天:深度学习概述与环境搭建