
LLM Fundamentals: Source Code, Part 1

A walkthrough of the core transformers source code

The Trainer class:

The __init__() initializer:
def __init__(xxx):
    if args is None:
        output_dir = "tmp_trainer"
        args = TrainingArguments(output_dir=output_dir)
    self.args = args
    self.compute_loss_func = compute_loss_func
    # Seed must be set before instantiating the model when using model_init.
    enable_full_determinism(self.args.seed) if self.args.full_determinism else set_seed(self.args.seed)
    self.hp_name = None
    self.deepspeed = None
    self.is_in_train = False
    self.model = model
    self.create_accelerator_and_postprocess()  # set up acceleration-related state, e.g. self.accelerator (an Accelerator)
    self._memory_tracker = TrainerMemoryTracker(self.args.skip_memory_metrics)  # CPU/GPU memory tracker; apparently the psutil module needs to be installed first
    self._memory_tracker.start()
    # set the correct log level depending on the node
    log_level = args.get_process_log_level()
    logging.set_verbosity(log_level)
    # force device and distributed setup init explicitly
    args._setup_devices
    if model is None:
        if model_init is not None:  # model-init function that is re-run on every call to train()
            self.model_init = model_init
            model = self.call_model_init()
    xxxxx
    if model.__class__.__name__ in MODEL_MAPPING_NAMES:  # the model class must not be one of the bare base classes, e.g. albert
        raise ValueError(xxx)
    if getattr(model, "is_parallelizable", False) and getattr(model, "model_parallel", False):
        self.is_model_parallel = True
    if getattr(model, "hf_device_map", None) is not None:
        devices = [device for device in set(model.hf_device_map.values()) if device not in ["cpu", "disk"]]
        if len(devices) > 1:
            self.is_model_parallel = True
        elif len(devices) == 1:
            self.is_model_parallel = self.args.device != torch.device(devices[0])
        else:
            self.is_model_parallel = False
        # warn users
        if self.is_model_parallel:
            logger.info(
                "You have loaded a model on multiple GPUs. `is_model_parallel` attribute will be force-set"
                " to `True` to avoid any unexpected behavior such as device placement mismatching."
            )
    if self.args.use_liger_kernel:  # Patch the model with liger kernels.
        xxx
    self.is_fsdp_xla_enabled = args.fsdp_config["xla"]  # FSDP shards the model and places each shard on a different GPU, instead of the default of keeping a full copy of the model on every GPU; it mainly addresses the excessive memory usage of traditional data parallelism (e.g. DDP) when training large models
    # one place to sort out whether to place the model on device or not
    # postpone switching model to cuda when:
    # 1. MP - since we are trying to fit a much bigger than 1 gpu model
    # 2. fp16-enabled DeepSpeed loads the model in half the size and it doesn't need .to() anyway,
    #    and we only use deepspeed for training at the moment
    # 3. full bf16 or fp16 eval - since the model needs to be cast to the right dtype first
    # 4. FSDP - same as MP
    if (
        self.is_model_parallel
        or self.is_deepspeed_enabled
        or ((args.fp16_full_eval or args.bf16_full_eval) and not args.do_train)
        or self.is_fsdp_xla_enabled
        or self.is_fsdp_enabled
    ):
        self.place_model_on_device = False
    default_collator = (
        DataCollatorWithPadding(processing_class)
        if processing_class is not None
        and isinstance(processing_class, (PreTrainedTokenizerBase, SequenceFeatureExtractor))
        else default_data_collator
    )
    self.data_collator = data_collator if data_collator is not None else default_collator
    self.train_dataset = train_dataset
    self.eval_dataset = eval_dataset
    self.processing_class = processing_class
    xxxx
    # later use `self.model is self.model_wrapped` to check if it's wrapped or not
    self.model_wrapped = model
    self.model = model
    self.neftune_noise_alpha = args.neftune_noise_alpha
    self.compute_metrics = compute_metrics
    self.preprocess_logits_for_metrics = preprocess_logits_for_metrics
    self.optimizer, self.lr_scheduler = optimizers
    self.optimizer_cls_and_kwargs = optimizer_cls_and_kwargs
    default_callbacks = DEFAULT_CALLBACKS + get_reporting_integration_callbacks(self.args.report_to)
    callbacks = default_callbacks if callbacks is None else default_callbacks + callbacks
    self.callback_handler = CallbackHandler(
        callbacks, self.model, self.processing_class, self.optimizer, self.lr_scheduler
    )
    self.add_callback(PrinterCallback if self.args.disable_tqdm else DEFAULT_PROGRESS_CALLBACK)
    # Will be set to True by `self._setup_loggers()` on first call to `self.log()`.
    self._loggers_initialized = False
    # Label smoothing
    if self.args.label_smoothing_factor != 0:
        self.label_smoother = LabelSmoother(epsilon=self.args.label_smoothing_factor)
    self.control = TrainerControl()  # training-flow control object: stores control flags and is used together with TrainerCallback
    self.state = TrainerState(  # training-state record, updated on every gradient update (once per batch when gradient_accumulation_steps=1)
        is_local_process_zero=self.is_local_process_zero(),
        is_world_process_zero=self.is_world_process_zero(),
        stateful_callbacks=[
            cb for cb in self.callback_handler.callbacks + [self.control] if isinstance(cb, ExportableState)
        ],
    )
    # Internal variable to count flos in each process, will be accumulated in `self.state.total_flos` then
    # returned to 0 every time flos need to be logged
    self.current_flos = 0
    self.hp_search_backend = None
    default_label_names = find_labels(self.model.__class__)  # find the label argument names from the torch model's forward() signature
    self.label_names = default_label_names if self.args.label_names is None else self.args.label_names
    self.can_return_loss = can_return_loss(self.model.__class__)  # whether forward() takes a return_loss argument
    self.control = self.callback_handler.on_init_end(self.args, self.state, self.control)
    # Internal variables to help with automatic batch size reduction
    self._train_batch_size = args.train_batch_size
    self._created_lr_scheduler = False
    # very last
    self._memory_tracker.stop_and_update_metrics()
    self.is_fsdp_xla_v2_enabled = args.fsdp_config.get("xla_fsdp_v2", False)
    self.is_fsdp_xla_v1_enabled = self.is_fsdp_xla_enabled and not self.is_fsdp_xla_v2_enabled
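To see these defaults concretely, here is a minimal sketch of constructing a Trainer (the checkpoint name and hyperparameter values are illustrative, not taken from the source above; processing_class replaces the older tokenizer argument in recent transformers versions):

from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased")

args = TrainingArguments(
    output_dir="out",            # if no args are passed at all, __init__ falls back to TrainingArguments(output_dir="tmp_trainer")
    label_smoothing_factor=0.1,  # non-zero value -> __init__ creates a LabelSmoother
    report_to="none",            # feeds get_reporting_integration_callbacks()
)

trainer = Trainer(model=model, args=args, processing_class=tokenizer)

# a few of the attributes populated by __init__ as walked through above:
print(trainer.place_model_on_device)         # False only for MP / DeepSpeed / FSDP / full fp16/bf16 eval
print(type(trainer.data_collator).__name__)  # DataCollatorWithPadding, because a tokenizer was passed
print(trainer.label_names)                   # ["labels"], inferred via find_labels(model.__class__)
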
The train() function:
def train(
    self,
    resume_from_checkpoint: Optional[Union[str, bool]] = None,
    trial: Union["optuna.Trial", dict[str, Any], None] = None,
    ignore_keys_for_eval: Optional[list[str]] = None,
    **kwargs,
):
    # core of the implementation
    self._memory_tracker.start()
    args = self.args
    self.is_in_train = True
    # This might change the seed so needs to run first.
    self._hp_search_setup(trial)  # hyperparameter-search setup
    # Model re-init: model_init is re-run on every call to train()
    model_reloaded = False
    if self.model_init is not None:
        # Seed must be set before instantiating the model when using model_init.
        enable_full_determinism(self.args.seed) if self.args.full_determinism else set_seed(self.args.seed)
        self.model = self.call_model_init(trial)
        model_reloaded = True
        # Reinitializes optimizer and scheduler
        self.optimizer, self.lr_scheduler = None, None
    # Load potential model checkpoint: takes precedence over model_init
    if isinstance(resume_from_checkpoint, bool) and resume_from_checkpoint:
        resume_from_checkpoint = get_last_checkpoint(args.output_dir)
    if resume_from_checkpoint is not None:
        if not is_sagemaker_mp_enabled() and not self.is_deepspeed_enabled and not self.is_fsdp_enabled:
            self._load_from_checkpoint(resume_from_checkpoint)
    # If model was re-initialized, put it on the right device and update self.model_wrapped
    if model_reloaded:
        if self.place_model_on_device:
            self._move_model_to_device(self.model, args.device)
        self.model_wrapped = self.model
    inner_training_loop = find_executable_batch_size(
        self._inner_training_loop, self._train_batch_size, args.auto_find_batch_size
    )
    return inner_training_loop(  # finds a batch size that fits in GPU memory, then runs _inner_training_loop() to train
        args=args,
        resume_from_checkpoint=resume_from_checkpoint,
        trial=trial,
        ignore_keys_for_eval=ignore_keys_for_eval,
    )
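The auto-batch-size retry behind that last call is backed by accelerate's find_executable_batch_size utility. A standalone sketch of the mechanism follows; the function name inner_loop and the simulated out-of-memory error are purely illustrative:

from accelerate.utils import find_executable_batch_size

@find_executable_batch_size(starting_batch_size=128)
def inner_loop(batch_size):
    # stand-in for _inner_training_loop(): pretend anything above 32 overflows GPU memory
    if batch_size > 32:
        raise RuntimeError("CUDA out of memory (simulated)")
    print(f"training with batch_size={batch_size}")

inner_loop()  # retries 128 -> 64 -> 32, halving the batch size after each OOM, then trains
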
The _inner_training_loop() function:
def _inner_training_loop(
    self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None
):
    # core code, expanded
    self.accelerator.free_memory()
    self._train_batch_size = batch_size
    train_dataloader = self.get_train_dataloader()  # data-loading wrapper
    total_train_batch_size = self._train_batch_size * args.gradient_accumulation_steps * args.world_size
    (num_train_epochs, num_update_steps_per_epoch, xxx) = self.set_initial_training_values(
        args, train_dataloader, total_train_batch_size
    )  # work out the concrete training schedule (epochs, update steps, ...)
    num_train_tokens = None  # total number of training tokens seen
    if self.is_deepspeed_enabled:
        self.optimizer, self.lr_scheduler = deepspeed_init(self, num_training_steps=max_steps)
    self.state = TrainerState(xxx)  # records the current training state
    if args.gradient_checkpointing:
        self.model.gradient_checkpointing_enable(gradient_checkpointing_kwargs=args.gradient_checkpointing_kwargs)
    model = self._wrap_model(self.model_wrapped)
    use_accelerator_prepare = True if model is self.model else False
    if delay_optimizer_creation:
        xxx
    self.create_optimizer_and_scheduler(num_training_steps=max_steps)  # set up the optimizer and LR scheduler
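As a quick illustration of how the effective batch size used above comes together (the numbers are made up for the example):

per_device_train_batch_size = 8   # self._train_batch_size on each process
gradient_accumulation_steps = 4
world_size = 2                    # number of distributed processes / GPUs

total_train_batch_size = per_device_train_batch_size * gradient_accumulation_steps * world_size
print(total_train_batch_size)      # 64 samples contribute to each optimizer step

# with, say, 6400 training samples, that means roughly
num_update_steps_per_epoch = 6400 // total_train_batch_size
print(num_update_steps_per_epoch)  # ~100 gradient updates per epoch
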
