目录

MOEFeedForward-模块

MOEFeedForward 模块

代码

class FeedForward(nn.Module): def init(self, config: LMConfig): super().init() if config.hidden_dim is None: hidden_dim = 4 * config.dim hidden_dim = int(2 * hidden_dim / 3) config.hidden_dim = config.multiple_of * ((hidden_dim + config.multiple_of - 1) // config.multiple_of) self.w1 = nn.Linear(config.dim, config.hidden_dim, bias=False) self.w2 = nn.Linear(config.hidden_dim, config.dim, bias=False) self.w3 = nn.Linear(config.dim, config.hidden_dim, bias=False) self.dropout = nn.Dropout(config.dropout) def forward(self, x): return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x))) class MoEGate(nn.Module): def init(self, config: LMConfig): super().init() self.config = config self.top_k = config.num_experts_per_tok self.n_routed_experts = config.n_routed_experts self.scoring_func = config.scoring_func self.alpha = config.aux_loss_alpha self.seq_aux = config.seq_aux self.norm_topk_prob = config.norm_topk_prob self.gating_dim = config.dim self.weight = nn.Parameter(torch.empty((self.n_routed_experts, self.gating_dim))) self.reset_parameters() def reset_parameters(self) -> None: import torch.nn.init as init init.kaiming_uniform_(self.weight, a=math.sqrt(5)) def forward(self, hidden_states): bsz, seq_len, h = hidden_states.shape hidden_states = hidden_states.view(-1, h) logits = F.linear(hidden_states, self.weight, None) if self.scoring_func == ‘softmax’: scores = logits.softmax(dim=-1) else: raise NotImplementedError(f’insupportable scoring function for MoE gating: {self.scoring_func}’) topk_weight, topk_idx = torch.topk(scores, k=self.top_k, dim=-1, sorted=False) if self.top_k > 1 and self.norm_topk_prob: denominator = topk_weight.sum(dim=-1, keepdim=True) + 1e-20 topk_weight = topk_weight / denominator if self.training and self.alpha > 0.0: scores_for_aux = scores aux_topk = self.top_k topk_idx_for_aux_loss = topk_idx.view(bsz, -1) if self.seq_aux: scores_for_seq_aux = scores_for_aux.view(bsz, seq_len, -1) ce = torch.zeros(bsz, self.n_routed_experts, device=hidden_states.device) ce.scatter_add_(1, topk_idx_for_aux_loss, torch.ones(bsz, seq_len * aux_topk, device=hidden_states.device)).div_( seq_len * aux_topk / self.n_routed_experts) aux_loss = (ce * scores_for_seq_aux.mean(dim=1)).sum(dim=1).mean() * self.alpha else: mask_ce = F.one_hot(topk_idx_for_aux_loss.view(-1), num_classes=self.n_routed_experts) ce = mask_ce.float().mean(0) Pi = scores_for_aux.mean(0) fi = ce * self.n_routed_experts aux_loss = (Pi * fi).sum() * self.alpha else: aux_loss = 0 return topk_idx, topk_weight, aux_loss class MOEFeedForward(nn.Module): def init(self, config: LMConfig): super().init() self.config = config self.experts = nn.ModuleList([ FeedForward(config) for _ in range(config.n_routed_experts) ]) self.gate = MoEGate(config) if config.n_shared_experts is not None: self.shared_experts = FeedForward(config) def forward(self, x): identity = x orig_shape = x.shape bsz, seq_len, _ = x.shape

使用门控机制选择专家

topk_idx, topk_weight, aux_loss = self.gate(x) x = x.view(-1, x.shape[-1]) flat_topk_idx = topk_idx.view(-1) if self.training:

训练模式下,重复输入数据

x = x.repeat_interleave(self.config.num_experts_per_tok, dim=0) y = torch.empty_like(x, dtype=torch.float16) for i, expert in enumerate(self.experts): y[flat_topk_idx == i] = expert(x[flat_topk_idx == i]).to(y.dtype) # 确保类型一致 y = (y.view(*topk_weight.shape, -1) * topk_weight.unsqueeze(-1)).sum(dim=1) y = y.view(*orig_shape) else:

推理模式下,只选择最优专家

y = self.moe_infer(x, flat_topk_idx, topk_weight.view(-1, 1)).view(*orig_shape) if self.config.n_shared_experts is not None: y = y + self.shared_experts(identity) self.aux_loss = aux_loss return y @torch.no_grad() def moe_infer(self, x, flat_expert_indices, flat_expert_weights): expert_cache = torch.zeros_like(x) idxs = flat_expert_indices.argsort() tokens_per_expert = flat_expert_indices.bincount().cpu().numpy().cumsum(0) token_idxs = idxs // self.config.num_experts_per_tok

例如当tokens_per_expert=[6, 15, 20, 26, 33, 38, 46, 52]

当token_idxs=[3, 7, 19, 21, 24, 25, 4, 5, 6, 10, 11, 12…]

意味着当token_idxs[:6] -> [3, 7, 19, 21, 24, 25, 4]位置的token都由专家0处理,token_idxs[6:15]位置的token都由专家1处理……

for i, end_idx in enumerate(tokens_per_expert): start_idx = 0 if i == 0 else tokens_per_expert[i - 1] if start_idx == end_idx: continue expert = self.experts[i] exp_token_idx = token_idxs[start_idx:end_idx] expert_tokens = x[exp_token_idx] expert_out = expert(expert_tokens).to(expert_cache.dtype) expert_out.mul_(flat_expert_weights[idxs[start_idx:end_idx]])

使用 scatter_add_ 进行 sum 操作

expert_cache.scatter_add_(0, exp_token_idx.view(-1, 1).repeat(1, x.shape[-1]), expert_out) return expert_cache

代码解释

解释一下这段代码的主要组成部分:

  1. FeedForward 类:
  • 实现了一个基础的前馈网络
  • 使用 SwiGLU 激活函数(F.silu(self.w1(x)) * self.w3(x)
  • 包含三个线性层(w1、w2、w3)和一个 dropout 层
  1. MoEGate 类(门控机制):
  • 负责决定每个 token 应该由哪些专家处理
  • 主要步骤:
  1. 计算每个 token 对应每个专家的分数(使用 softmax)
  2. 选择 top-k 个最高分的专家
  3. 计算辅助损失(aux_loss)来平衡专家的使用
  4. MOEFeedForward 类(混合专家系统):
  • 包含多个专家(FeedForward)和一个门控网络(MoEGate)
  • 训练模式:
  1. 使用门控网络选择每个 token 的专家
  2. 将输入数据复制多份,分发给不同专家
  3. 专家并行处理数据
  4. 根据门控权重合并结果
  • 推理模式(moe_infer):
  1. 对专家索引排序,将相同专家的 token 批量处理
  2. 使用 scatter_add_ 将专家输出累加到正确位置
  3. 更高效的推理实现,避免了数据重复
  4. 特殊功能:
  • 支持共享专家(n_shared_experts
  • 实现了专家负载均衡(通过辅助损失)
  • 支持每个 token 选择多个专家(num_experts_per_tok) 这是一个典型的 MoE(Mixture of Experts)实现,用于大型语言模型中提高模型容量和计算效率。

示例

创建 MoE 实例

dim = 512 # 输入维度 n_routed_experts = 4 # 专家数量 num_experts_per_tok = 2 # 每个token选择的专家数量 moe = MOEFeedForward( dim=dim, n_routed_experts=n_routed_experts, num_experts_per_tok=num_experts_per_tok, hidden_dim=None, # FFN隐藏层维度,None时自动计算 dropout=0.1 # dropout比率 )

创建示例输入

batch_size = 2 seq_len = 10 x = torch.randn(batch_size, seq_len, dim) # 形状: [2, 10, 512] moe(x) 输出 After gate - topk_idx.shape: torch.Size([20, 2]), topk_weight.shape: torch.Size([20, 2]) After view - x.shape: torch.Size([20, 512]), flat_topk_idx.shape: torch.Size([40]) After repeat_interleave - x.shape: torch.Size([40, 512]) Empty y tensor shape: torch.Size([40, 512]) Expert 0 - input shape: torch.Size([9, 512]) Expert 0 - output shape: torch.Size([9, 512]) Expert 1 - input shape: torch.Size([13, 512]) Expert 1 - output shape: torch.Size([13, 512]) Expert 2 - input shape: torch.Size([11, 512]) Expert 2 - output shape: torch.Size([11, 512]) Expert 3 - input shape: torch.Size([7, 512]) Expert 3 - output shape: torch.Size([7, 512]) Before view - y.shape: torch.Size([40, 512]) topk_weight.shape: torch.Size([20, 2]) After view and sum - y.shape: torch.Size([20, 512]) Final y.shape: torch.Size([2, 10, 512])

相应的torch函数

import torch

empty: 创建未初始化的张量

x = torch.empty((2, 3)) # 创建形状为 2x3 的未初始化张量

zeros_like: 创建与输入相同形状的全零张量

a = torch.tensor([[1, 2], [3, 4]]) b = torch.zeros_like(a) # 创建形状为 2x2 的全零张量 print(b) # tensor([[0, 0], [0, 0]]) tensor([[0, 0], [0, 0]]) import torch.nn.functional as F x = torch.tensor([[1, 2, 3, 4], [5, 6, 7, 8]])

view: 改变张量形状

y = x.view(-1) # 展平为一维 print(y) # tensor([1, 2, 3, 4, 5, 6, 7, 8])

-1 表示自动计算该维度大小

z = x.view(-1, 2) # 重塑为 4x2 print(z) # tensor([[1, 2], [3, 4], [5, 6], [7, 8]]) tensor([1, 2, 3, 4, 5, 6, 7, 8]) tensor([[1, 2], [3, 4], [5, 6], [7, 8]])

linear: 线性变换 y = xA^T + b

input = torch.randn(2, 3) # 2个样本,每个3维 weight = torch.randn(4, 3) # 输出4维 output = F.linear(input, weight) # 形状变为 [2, 4]

softmax: 将数值转换为概率分布

logits = torch.tensor([1.0, 2.0, 3.0]) probs = F.softmax(logits, dim=0) print(probs) # tensor([0.0900, 0.2447, 0.6652]) tensor([0.0900, 0.2447, 0.6652])

找出最大的k个值及其索引

x = torch.tensor([1, 5, 2, 8, 3]) values, indices = torch.topk(x, k=2) print(values) # tensor([8, 5]) print(indices) # tensor([3, 1]) tensor([8, 5]) tensor([3, 1]) x = torch.tensor([1, 2, 3])

每个元素重复2次

y = x.repeat_interleave(2) print(y) # tensor([1, 1, 2, 2, 3, 3]) tensor([1, 1, 2, 2, 3, 3])

统计每个数字出现的次数

x = torch.tensor([1, 1, 2, 3, 1, 2]) counts = x.bincount() print(counts) # tensor([0, 3, 2, 1]) # 0出现0次,1出现3次,2出现2次,3出现1次 tensor([0, 3, 2, 1])

在指定位置累加值

src = torch.tensor([[1, 2], [3, 4]], dtype=torch.float) # 指定数据类型为 float index = torch.tensor([[0, 1], [0, 1]]) out = torch.zeros(2, 2, dtype=torch.float) # 确保与 src 的数据类型相同 out.scatter_add_(0, index, src) print(out) tensor([[4., 0.], [0., 6.]])

返回排序后的索引

x = torch.tensor([3, 1, 4, 1, 5]) indices = x.argsort() print(indices) # tensor([1, 3, 0, 2, 4]) # 最小值在位置1和3,然后是0,2,4 tensor([1, 3, 0, 2, 4])