
Megatron: Expert Parallelism (EP)

1. Expert Parallelism (EP): Applicable Scenarios


Definition:
Expert parallelism means that, in a Mixture of Experts (MoE) model, the different experts (i.e., sub-models) are placed on different devices, and each device is responsible for computing only a subset of the experts. This makes effective use of device resources and increases model capacity and efficiency.

Implementation mechanism:

  1. Expert partitioning: the model's experts (sub-models) are distributed across different devices.
  2. Routing: during the forward pass, input tokens are routed to different experts, and each expert processes only part of the data.
  3. Result aggregation: the outputs of the individual experts are aggregated into the final output (see the sketch after this list).
  4. Backward pass: gradients are likewise routed to the corresponding experts; each expert computes its portion of the gradients, which are then aggregated.
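
As a concrete illustration of steps 1 to 3, here is a minimal, self-contained PyTorch sketch (toy code, not Megatron's implementation; all names are illustrative):

    import torch
    import torch.nn.functional as F

    num_tokens, hidden, num_experts, topk = 6, 4, 4, 2
    x = torch.randn(num_tokens, hidden)
    gate = torch.nn.Linear(hidden, num_experts, bias=False)  # router
    experts = torch.nn.ModuleList(
        [torch.nn.Linear(hidden, hidden) for _ in range(num_experts)]  # "sub-models"
    )

    # 1. Routing: pick top-k experts per token, with their routing weights.
    probs = F.softmax(gate(x), dim=-1)
    weights, indices = probs.topk(topk, dim=-1)  # both [num_tokens, topk]

    # 2. Each expert processes only the tokens routed to it.
    out = torch.zeros_like(x)
    for e, expert in enumerate(experts):
        token_ids, slot = torch.where(indices == e)  # tokens sent to expert e
        if token_ids.numel() > 0:
            # 3. Aggregation: weighted sum of the selected experts' outputs.
            out[token_ids] += weights[token_ids, slot].unsqueeze(-1) * expert(x[token_ids])

Under expert parallelism, the experts list would be sharded across devices, and step 2 would involve communication to move each token to the devices owning its selected experts.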

Advantages:

  1. Model capacity: capacity can be increased significantly, since each device only needs to host a subset of the experts.
  2. Compute efficiency: processing different experts in parallel improves computational efficiency.

Disadvantages:

  1. Communication overhead: data and gradients must be exchanged between devices, adding communication cost.
  2. Load balancing: an effective routing mechanism is needed to keep the load even across experts and avoid overloading some devices.
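
For reference, one widely used load-balancing device (the aux_loss routing type that appears in the router code in section 3) adds an auxiliary loss nudging the token distribution toward uniform. A common Switch-Transformer-style formulation, shown here for orientation (the exact form in a given codebase may differ), is:

    \mathcal{L}_{\text{aux}} = \alpha \cdot N \cdot \sum_{i=1}^{N} f_i \, P_i

where N is the number of experts, f_i is the fraction of tokens dispatched to expert i, P_i is the mean router probability assigned to expert i, and \alpha is a small scaling coefficient.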

Summary

Tensor parallelism splits a model's tensors across multiple devices, reducing the memory burden on each device and speeding up computation; it applies to most deep learning models.
Expert parallelism targets mixture-of-experts models: assigning different experts to different devices increases model capacity and compute efficiency, and it is particularly suited to large sparse models.
Which parallelization technique to choose depends on the model structure and application scenario: for conventional deep learning models, tensor parallelism is usually the better fit, while for mixture-of-experts models, expert parallelism is more effective.
Original article: https://blog.csdn.net/weixin_41012399/article/details/147421449

2. Initialization Implementation

(1) pretrain_gpt.py

    model = GPTModel(
        config=config,
        transformer_layer_spec=transformer_layer_spec,
        vocab_size=args.padded_vocab_size,
        max_sequence_length=args.max_position_embeddings,
        pre_process=pre_process,
        post_process=post_process,
        fp16_lm_cross_entropy=args.fp16_lm_cross_entropy,
        parallel_output=True,
        share_embeddings_and_output_weights=not args.untie_embeddings_and_output_weights,
        position_embedding_type=args.position_embedding_type,
        rotary_percent=args.rotary_percent,
        seq_len_interpolation_factor=args.rotary_seq_len_interpolation_factor,
    )

gpt_model.py 

    # Transformer.
    self.decoder = TransformerBlock(
        config=self.config,
        spec=transformer_layer_spec,
        pre_process=self.pre_process,
        post_process=self.post_process,
    )

transformer_block.py

    # Snippet 1
    self._build_layers()
    self.num_layers_per_pipeline_rank = len(self.layers)

    # Snippet 2
    num_layers_to_build = num_layers_per_pipeline_rank

    # Snippet 3
    elif isinstance(spec, ModuleSpec):
        if issubclass(spec.module, TransformerBlock):
            return spec.submodules
        elif issubclass(spec.module, BaseTransformerLayer):
            num_layers = get_num_layers_to_build(config)
            return TransformerBlockSubmodules(
                layer_specs=[spec] * num_layers,
                layer_norm=LayerNormImpl,
            )

From these snippets we can see that each rank builds and computes only its own subset of layers (a minimal sketch of the split follows), while the rest of the code and compute logic stays unchanged. The spec here is passed in at initialization; DeepSeek's spec, i.e. the decoder module, follows the sketch.
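
A minimal sketch of what snippet 2 amounts to, assuming an even split with no standalone-embedding or uneven pipeline-stage options (the real get_num_layers_to_build handles more cases):

    # Simplified view: an even split of the total layer count over pipeline ranks.
    def num_layers_to_build(total_layers: int, pp_size: int) -> int:
        assert total_layers % pp_size == 0, "layers must divide evenly over pipeline ranks"
        return total_layers // pp_size

    # e.g. a 60-layer model on 4 pipeline ranks builds 15 layers per rank
    assert num_layers_to_build(60, 4) == 15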

tasks/models/spec/deepseek.py


    args = get_args()
    num_experts, moe_grouped_gemm, qk_layernorm, mla_mm_split = (
        args.num_experts,
        args.moe_grouped_gemm,
        args.qk_layernorm,
        args.mla_mm_split,
    )
    layer_spec = ModuleSpec(
        module=TransformerLayer,
        submodules=TransformerLayerSubmodules(
            input_layernorm=PTNorm,
            self_attention=ModuleSpec(
                module=MultiHeadLatentAttention,
                params={"attn_mask_type": AttnMaskType.causal},
                submodules=MLASelfAttentionSubmodules(
                    linear_qkv=LinearNoTP,
                    core_attention=MlaDotProductAttention,
                    linear_proj=RowParallelLinear,
                    q_layernorm=PTNorm if qk_layernorm else IdentityOp,
                    k_layernorm=PTNorm if qk_layernorm else IdentityOp,
                    linear_qb=ColumnParallelLinear,
                    linear_kvb=ColumnParallelLinear,
                )
                if not mla_mm_split
                else MLASelfAttentionWithMMSplitSubmodules(
                    linear_qkv=LinearNoTP,
                    core_attention=MlaDotProductAttention,
                    linear_proj=RowParallelLinear,
                    q_layernorm=PTNorm if qk_layernorm else IdentityOp,
                    k_layernorm=PTNorm if qk_layernorm else IdentityOp,
                    linear_qk_nope=ColumnParallelLinear,
                    linear_qk_rope=ColumnParallelLinear,
                    linear_kv_nope=ColumnParallelLinear,
                    linear_v=ColumnParallelLinear,
                ),
            ),
            self_attn_bda=get_bias_dropout_add,
            pre_mlp_layernorm=PTNorm,
            # The MLP spec varies across layers, so the real deepseek_mlp_spec
            # is defined in build_layer of the TransformerBlock.
            mlp=_get_mlp_module_spec(
                use_te=False, num_experts=num_experts, moe_grouped_gemm=moe_grouped_gemm
            ),
            mlp_bda=get_bias_dropout_add,
            sharded_state_dict_keys_map={
                "input_layernorm.": "self_attention.linear_qkv.layer_norm_",
                "pre_mlp_layernorm.": "mlp.linear_fc1.layer_norm_",
            },
        ),
    )

This spec defines the MLA and MLP implementations. Following _get_mlp_module_spec leads to the MoE structure we are after.

gpt_layer_specs.py

    def _get_mlp_module_spec(
        use_te: bool = True, num_experts: int = None, moe_grouped_gemm: bool = False
    ) -> ModuleSpec:
        if num_experts is None:
            # Dense MLP w/ or w/o TE modules.
            return ModuleSpec(
                module=MLP,
                submodules=MLPSubmodules(
                    linear_fc1=TELayerNormColumnParallelLinear if use_te else ColumnParallelLinear,
                    linear_fc2=TERowParallelLinear if use_te else RowParallelLinear,
                ),
            )
        else:
            # Mixture of experts with modules in megatron core.
            if use_te and moe_grouped_gemm:
                linear_fc1 = TEColumnParallelGroupedLinear
                linear_fc2 = TERowParallelGroupedLinear
            else:
                linear_fc1 = ColumnParallelLinear
                linear_fc2 = RowParallelLinear

            use_te_grouped_gemm = use_te and TEColumnParallelGroupedLinear is not None

            return ModuleSpec(
                module=MoELayer,
                submodules=(
                    MLPSubmodules(linear_fc1=linear_fc1, linear_fc2=linear_fc2)
                    if not moe_grouped_gemm or use_te_grouped_gemm
                    else None
                ),
            )

If num_experts is set, the MLP module becomes a MoELayer (a quick illustration of the branch selection below); we then follow it into moe_layer.py.
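
Hypothetical usage, assuming the names imported at the top of gpt_layer_specs.py are in scope:

    # num_experts=None -> dense MLP spec; num_experts set -> MoELayer spec.
    dense_spec = _get_mlp_module_spec(use_te=False, num_experts=None)
    assert dense_spec.module is MLP

    moe_spec = _get_mlp_module_spec(use_te=False, num_experts=8, moe_grouped_gemm=False)
    assert moe_spec.module is MoELayer  # the MLP slot is now a MoE layer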

moe_layer.py

(1) Initialization

    def __init__(self, config: TransformerConfig, submodules: MLPSubmodules = None, layer_number: int = None):
        self.submodules = submodules
        super(MoELayer, self).__init__(config=config, layer_number=layer_number)
        self.router = TopKRouter(config=self.config)
        if self.config.moe_grouped_gemm:
            if isinstance(self.submodules, MLPSubmodules):
                self.experts = TEGroupedMLP(self.num_local_experts, self.config, self.submodules)
            else:
                self.experts = GroupedMLP(self.num_local_experts, self.config)
        else:
            assert isinstance(self.submodules, MLPSubmodules)
            self.experts = SequentialMLP(self.num_local_experts, self.config, self.submodules)
        if config.moe_token_dispatcher_type == "allgather":
            self.token_dispatcher = MoEAllGatherTokenDispatcher(
                self.num_local_experts, self.local_expert_indices, config=self.config
            )
        elif config.moe_token_dispatcher_type == "alltoall":
            self.token_dispatcher = MoEAlltoAllTokenDispatcher(
                self.num_local_experts, self.local_expert_indices, config=self.config
            )
        else:
            raise ValueError(f"Unsupported token dispatcher type: {config.moe_token_dispatcher_type}")
        self.moe_layer_recompute = config.moe_layer_recompute

This constructs the router and the experts and selects one of several token-dispatch methods, all driven by config fields (a hedged sketch below). MoELayer also inherits from BaseMoELayer, which we look at right after.
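
A sketch of the relevant TransformerConfig fields (illustrative values only; other required fields are omitted and the exact set may vary by version):

    from megatron.core.transformer import TransformerConfig

    config = TransformerConfig(
        num_layers=2,
        hidden_size=1024,
        num_attention_heads=8,
        num_moe_experts=8,                     # total experts, split across EP ranks
        moe_token_dispatcher_type="alltoall",  # or "allgather", per the branches above
        moe_grouped_gemm=False,                # False -> SequentialMLP experts
    )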

    class BaseMoELayer(MegatronModule, ABC):
        """Base class for a mixture of experts layer.

        Args:
            config (TransformerConfig): Configuration object for the transformer model.
        """

        def __init__(self, config: TransformerConfig, layer_number: int = None):
            super(BaseMoELayer, self).__init__(config)
            self.config = config
            self.expert_parallel_size = parallel_state.get_expert_model_parallel_world_size()
            assert self.expert_parallel_size > 0, "Expected non-negative expert parallel size"

            if self.config.moe_extended_tp:
                self.num_local_experts = self.config.num_moe_experts
                local_expert_indices_offset = 0
            else:
                assert self.config.num_moe_experts % self.expert_parallel_size == 0
                self.num_local_experts = self.config.num_moe_experts // self.expert_parallel_size
                local_expert_indices_offset = (
                    parallel_state.get_expert_model_parallel_rank() * self.num_local_experts
                )

            self.local_expert_indices = [
                local_expert_indices_offset + i for i in range(self.num_local_experts)
            ]
            assert all(map(lambda x: x < self.config.num_moe_experts, self.local_expert_indices))
            self.router = None
            self.experts = None
            self.token_dispatcher = None
            self.layer_number = layer_number

The key line:

    self.num_local_experts = self.config.num_moe_experts // self.expert_parallel_size

This is where the number of experts on each rank is computed (a worked example below). The value is then used to build the actual experts attribute, as seen in the classic SequentialMLP implementation that follows.
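
A worked example with hypothetical numbers, 16 experts split over EP=4 ranks:

    num_moe_experts, expert_parallel_size = 16, 4
    num_local_experts = num_moe_experts // expert_parallel_size  # 4 experts per rank

    for rank in range(expert_parallel_size):
        offset = rank * num_local_experts
        print(rank, [offset + i for i in range(num_local_experts)])
    # 0 [0, 1, 2, 3]
    # 1 [4, 5, 6, 7]
    # 2 [8, 9, 10, 11]
    # 3 [12, 13, 14, 15]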

experts.py

    class SequentialMLP(MegatronModule):
        """An implementation of the Experts layer using a sequence of MLP layers.

        This class executes each expert sequentially.
        """

        def __init__(self, num_local_experts, config: TransformerConfig, submodules: MLPSubmodules):
            super().__init__(config=config)
            self.add_bias = config.add_bias_linear
            self.moe_extended_tp = config.moe_extended_tp
            self.num_local_experts = num_local_experts
            self.local_experts = torch.nn.ModuleList()
            for _ in range(self.num_local_experts):
                expert = MLP(self.config, submodules, is_expert=True)
                self.local_experts.append(expert)

Other variants, such as TEGroupedMLP, merge the experts into larger, batched linear layers to increase parallelism, as follows:

    class TEGroupedMLP(MegatronModule):
        """An efficient implementation of the Experts layer using TE's GroupedLinear.

        This class is designed to execute multiple experts in parallel, thereby maximizing
        computational efficiency.
        """

        def __init__(self, num_local_experts, config: TransformerConfig, submodules: MLPSubmodules):
            super().__init__(config=config)
            self.moe_extended_tp = config.moe_extended_tp
            self.num_local_experts = num_local_experts
            self.input_size = self.config.hidden_size

            # If this is a gated linear unit we double the output width,
            # see https://arxiv.org/pdf/2002.05202.pdf
            ffn_hidden_size = self.config.ffn_hidden_size
            if self.config.gated_linear_unit:
                ffn_hidden_size *= 2

            self.linear_fc1 = build_module(
                submodules.linear_fc1,
                self.num_local_experts,
                self.input_size,
                ffn_hidden_size,
                config=self.config,
                init_method=self.config.init_method,
                bias=self.config.add_bias_linear,
                skip_bias_add=True,
                is_expert=True,
                tp_comm_buffer_name='fc1',
            )

            self.activation_func = self.config.activation_func

            self.linear_fc2 = build_module(
                submodules.linear_fc2,
                self.num_local_experts,
                self.config.ffn_hidden_size,
                self.config.hidden_size,
                config=self.config,
                init_method=self.config.output_layer_init_method,
                bias=self.config.add_bias_linear,
                skip_bias_add=True,
                is_expert=True,
                tp_comm_buffer_name='fc2',
            )
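
The intuition behind grouping, as a toy contrast (illustrative PyTorch only, not TE's GroupedLinear, which also supports unequal token counts per expert): running experts one by one issues one GEMM per expert, while equal-sized groups can be batched into a single kernel:

    import torch

    num_experts, tokens_per_expert, hidden, ffn = 4, 8, 16, 32
    x = torch.randn(num_experts, tokens_per_expert, hidden)
    w = torch.randn(num_experts, hidden, ffn)

    # Sequential: one matmul per expert (what SequentialMLP's loop amounts to).
    seq_out = torch.stack([x[e] @ w[e] for e in range(num_experts)])

    # Grouped: a single batched matmul over all local experts.
    grouped_out = torch.bmm(x, w)

    assert torch.allclose(seq_out, grouped_out, atol=1e-5)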

3. Forward Computation

The forward pass of MoELayer:

    def forward(self, hidden_states: torch.Tensor):
        if (
            self.training
            and self.config.tensor_model_parallel_size > 1
            and not self.config.sequence_parallel
        ):
            raise ValueError(
                "During training, performance may degrade if MoE and tensor parallelism"
                "are enabled without also enabling sequence parallelism."
            )

        # process MoE
        def custom_forward(hidden_states):
            probs, indices = self.router(hidden_states)
            (dispatched_input, tokens_per_expert) = self.token_dispatcher.token_permutation(
                hidden_states, probs, indices
            )
            expert_output, mlp_bias = self.experts(dispatched_input, tokens_per_expert)
            output, mlp_bias = self.token_dispatcher.token_unpermutation(expert_output, mlp_bias)
            return output, mlp_bias

        if self.moe_layer_recompute:
            output, mlp_bias = tensor_parallel.checkpoint(custom_forward, False, hidden_states)
        else:
            output, mlp_bias = custom_forward(hidden_states)

        return output, mlp_bias

(1) The input first goes through self.router.

The core of the router's forward pass is as follows:

    def forward(self, input: torch.Tensor):
        """Forward pass of the router.

        Args:
            input (torch.Tensor): Input tensor.
        """
        self.hidden = input.shape[-1]

        # Apply input jitter
        input = self.apply_input_jitter(input)
        logits = self.gating(input)
        logits = logits.view(-1, self.config.num_moe_experts)

        scores, indices = self.routing(logits)

        return scores, indices

gating is a linear layer; the logits it returns have the total number of experts as their last dimension.

    def routing(self, logits: torch.Tensor):
        """Top-k routing function

        Args:
            logits (torch.Tensor): Logits tensor after gating.

        Returns:
            probs (torch.Tensor): the probabilities tensor after load balancing.
            indices (torch.Tensor): the indices tensor after top-k selection.
        """
        logits = logits.view(-1, self.config.num_moe_experts)

        # Apply Z-Loss
        logits = self.apply_z_loss(logits)

        if (
            parallel_state.get_tensor_model_parallel_world_size() > 1
            and self.config.moe_token_dispatcher_type == "alltoall"
        ):
            # Gather the logits from the TP region
            logits = gather_from_sequence_parallel_region(logits)

        if self.routing_type == "sinkhorn":
            scores, indices = self.sinkhorn_load_balancing(logits)
        elif self.routing_type == "aux_loss":
            scores, indices = self.aux_loss_load_balancing(logits)
        elif self.routing_type == "none":
            # A naive top-k routing without load balancing
            scores, indices, _ = topk_softmax_with_capacity(
                logits,
                self.topk,
                capacity_factor=self.config.moe_expert_capacity_factor,
                pad_to_capacity=self.config.moe_pad_expert_input_to_capacity,
                drop_policy=self.config.moe_token_drop_policy,
                use_pre_softmax=self.config.moe_router_pre_softmax,
            )
        else:
            raise ValueError(f"Unsupported MoE routing type: {self.routing_type}")

        return scores, indices

Three load-balancing strategies are implemented (sinkhorn, aux_loss, and none); each returns the scores and indices of the top-k experts.
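
Stripped of the capacity limits and token dropping that topk_softmax_with_capacity handles, the "none" branch reduces to softmax plus top-k:

    import torch

    logits = torch.randn(5, 8)  # [num_tokens, num_moe_experts]
    topk = 2

    probs = torch.softmax(logits, dim=-1)
    scores, indices = probs.topk(topk, dim=-1)
    # scores:  [5, 2] routing weights for the chosen experts
    # indices: [5, 2] ids of the chosen experts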

(2) Back in the forward pass of MoELayer:

    (dispatched_input, tokens_per_expert) = self.token_dispatcher.token_permutation(
        hidden_states, probs, indices
    )
    expert_output, mlp_bias = self.experts(dispatched_input, tokens_per_expert)
    output, mlp_bias = self.token_dispatcher.token_unpermutation(expert_output, mlp_bias)
    return output, mlp_bias

The logic of these lines: some of a token's top-k experts may live on other ranks, so token_permutation performs an all-to-all exchange, sending tokens routed to remote experts to the ranks that own them while simultaneously receiving the tokens that other ranks routed to this rank's local experts. The received, expert-grouped tokens are then fed to the local experts in self.experts.

token_unpermutation is the reverse: after the local experts finish, their outputs are sent back through a second all-to-all to the ranks that own the original tokens, restored to the original order, and combined per token using the routing weights.
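
A minimal sketch of the exchange idea (not Megatron's dispatcher; assumes an initialized process group, one expert per rank, equal send/receive sizes, and a hypothetical function name):

    import torch
    import torch.distributed as dist

    def moe_exchange(tokens_for_each_rank: torch.Tensor) -> torch.Tensor:
        # Forward all-to-all: chunk i of the input goes to rank i; we receive
        # the tokens that every other rank routed to our local expert.
        received = torch.empty_like(tokens_for_each_rank)
        dist.all_to_all_single(received, tokens_for_each_rank)

        expert_out = received * 2.0  # stand-in for the local expert MLP

        # Reverse all-to-all ("unpermutation"): send results back to the
        # ranks that own the original tokens.
        returned = torch.empty_like(expert_out)
        dist.all_to_all_single(returned, expert_out)
        return returned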

4. Other Code Notes

(1) topk-group

From the group_limited_topk function below we can see:

  • num_groups is the EP size: the experts are split into EP groups.
  • group_topk is the number of groups selected, set by the --topk-group argument in the training script (default 4). Note that on a single device with no EP split, --topk-group must be set to 1.
  • Consequently, topk-group must not exceed the number of groups (the EP size).
    def group_limited_topk(
        scores: torch.Tensor,
        topk: int,
        num_tokens: int,
        num_experts: int,
        num_groups: int,
        group_topk: int,
    ):
        """Perform top-k routing on a subset of expert groups.

        When using group-limited routing:
        1. Experts are divided into 'moe_router_num_groups' equal-sized groups
        2. For each token, 'moe_router_group_topk' groups are selected based on routing scores
           (specifically, the sum of top-2 expert scores within each group)
        3. From these selected groups, 'moe_router_topk' individual experts are chosen

        Two common use cases:
        - Device-limited routing: Set 'moe_router_num_groups' equal to expert parallel size (EP)
          to limit each token to experts on a subset of devices
        - Node-limited routing: Set 'moe_router_num_groups' equal to number of nodes in EP group
          to limit each token to experts on a subset of nodes

        Args:
            scores (torch.Tensor): Softmax scores from the router.
            topk (int): The number of experts to select for each token.
            num_tokens (int): The number of tokens.
            num_experts (int): The number of experts.
            num_groups (int): Number of groups for routed experts.
            group_topk (int): Number of groups selected for each token.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: Probs and indices tensor.
        """
        # Organize the experts into groups
        group_scores = scores.view(num_tokens, num_groups, -1).topk(2, dim=-1)[0].sum(dim=-1)
        group_idx = torch.topk(group_scores, k=group_topk, dim=-1, sorted=False)[1]
        group_mask = torch.zeros_like(group_scores)
        group_mask.scatter_(1, group_idx, 1)

        # Mask the experts based on selection groups
        score_mask = (
            group_mask.unsqueeze(-1)
            .expand(num_tokens, num_groups, num_experts // num_groups)
            .reshape(num_tokens, -1)
        )

        masked_scores = scores.masked_fill(~score_mask.bool(), float('-inf'))
        probs, top_indices = torch.topk(masked_scores, k=topk, dim=-1)

        return probs, top_indices

(2) moe-router-topk

In DeepSeek-V3 this defaults to 8; it is the number of experts each token is actually routed to. The difference from the parameter above: moe-router-topk (topk in the code) selects individual experts per token, while topk-group (group_topk) only restricts which expert groups those selections may come from; group_topk groups are chosen first, and the topk experts are then picked from within them.

