Code
```python
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

# LMConfig is the model's configuration class (dim, hidden_dim, multiple_of,
# n_routed_experts, num_experts_per_tok, ...), defined elsewhere in the project.


class FeedForward(nn.Module):
    def __init__(self, config: LMConfig):
        super().__init__()
        if config.hidden_dim is None:
            hidden_dim = 4 * config.dim
            hidden_dim = int(2 * hidden_dim / 3)
            config.hidden_dim = config.multiple_of * ((hidden_dim + config.multiple_of - 1) // config.multiple_of)
        self.w1 = nn.Linear(config.dim, config.hidden_dim, bias=False)
        self.w2 = nn.Linear(config.hidden_dim, config.dim, bias=False)
        self.w3 = nn.Linear(config.dim, config.hidden_dim, bias=False)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x)))


class MoEGate(nn.Module):
    def __init__(self, config: LMConfig):
        super().__init__()
        self.config = config
        self.top_k = config.num_experts_per_tok
        self.n_routed_experts = config.n_routed_experts
        self.scoring_func = config.scoring_func
        self.alpha = config.aux_loss_alpha
        self.seq_aux = config.seq_aux
        self.norm_topk_prob = config.norm_topk_prob
        self.gating_dim = config.dim
        self.weight = nn.Parameter(torch.empty((self.n_routed_experts, self.gating_dim)))
        self.reset_parameters()

    def reset_parameters(self) -> None:
        import torch.nn.init as init
        init.kaiming_uniform_(self.weight, a=math.sqrt(5))

    def forward(self, hidden_states):
        bsz, seq_len, h = hidden_states.shape
        hidden_states = hidden_states.view(-1, h)
        logits = F.linear(hidden_states, self.weight, None)
        if self.scoring_func == 'softmax':
            scores = logits.softmax(dim=-1)
        else:
            raise NotImplementedError(f'insupportable scoring function for MoE gating: {self.scoring_func}')

        topk_weight, topk_idx = torch.topk(scores, k=self.top_k, dim=-1, sorted=False)

        if self.top_k > 1 and self.norm_topk_prob:
            denominator = topk_weight.sum(dim=-1, keepdim=True) + 1e-20
            topk_weight = topk_weight / denominator

        if self.training and self.alpha > 0.0:
            scores_for_aux = scores
            aux_topk = self.top_k
            topk_idx_for_aux_loss = topk_idx.view(bsz, -1)
            if self.seq_aux:
                scores_for_seq_aux = scores_for_aux.view(bsz, seq_len, -1)
                ce = torch.zeros(bsz, self.n_routed_experts, device=hidden_states.device)
                ce.scatter_add_(1, topk_idx_for_aux_loss,
                                torch.ones(bsz, seq_len * aux_topk, device=hidden_states.device)).div_(
                    seq_len * aux_topk / self.n_routed_experts)
                aux_loss = (ce * scores_for_seq_aux.mean(dim=1)).sum(dim=1).mean() * self.alpha
            else:
                mask_ce = F.one_hot(topk_idx_for_aux_loss.view(-1), num_classes=self.n_routed_experts)
                ce = mask_ce.float().mean(0)
                Pi = scores_for_aux.mean(0)
                fi = ce * self.n_routed_experts
                aux_loss = (Pi * fi).sum() * self.alpha
        else:
            aux_loss = 0
        return topk_idx, topk_weight, aux_loss


class MOEFeedForward(nn.Module):
    def __init__(self, config: LMConfig):
        super().__init__()
        self.config = config
        self.experts = nn.ModuleList([
            FeedForward(config)
            for _ in range(config.n_routed_experts)
        ])
        self.gate = MoEGate(config)
        if config.n_shared_experts is not None:
            self.shared_experts = FeedForward(config)

    def forward(self, x):
        identity = x
        orig_shape = x.shape
        bsz, seq_len, _ = x.shape
        # Use the gating network to pick experts for every token
        topk_idx, topk_weight, aux_loss = self.gate(x)
        x = x.view(-1, x.shape[-1])
        flat_topk_idx = topk_idx.view(-1)
        if self.training:
            # Training mode: repeat each token once per selected expert
            x = x.repeat_interleave(self.config.num_experts_per_tok, dim=0)
            y = torch.empty_like(x, dtype=torch.float16)
            for i, expert in enumerate(self.experts):
                y[flat_topk_idx == i] = expert(x[flat_topk_idx == i]).to(y.dtype)  # keep dtypes consistent
            y = (y.view(*topk_weight.shape, -1) * topk_weight.unsqueeze(-1)).sum(dim=1)
            y = y.view(*orig_shape)
        else:
            # Inference mode: dispatch tokens to their selected experts without duplicating the input
            y = self.moe_infer(x, flat_topk_idx, topk_weight.view(-1, 1)).view(*orig_shape)
        if self.config.n_shared_experts is not None:
            y = y + self.shared_experts(identity)
        self.aux_loss = aux_loss
        return y

    @torch.no_grad()
    def moe_infer(self, x, flat_expert_indices, flat_expert_weights):
        expert_cache = torch.zeros_like(x)
        idxs = flat_expert_indices.argsort()
        tokens_per_expert = flat_expert_indices.bincount().cpu().numpy().cumsum(0)
        token_idxs = idxs // self.config.num_experts_per_tok
        # Example: if tokens_per_expert = [6, 15, 20, 26, 33, 38, 46, 52]
        # and token_idxs = [3, 7, 19, 21, 24, 25, 4, 5, 6, 10, 11, 12, ...],
        # then the tokens at token_idxs[:6] = [3, 7, 19, 21, 24, 25] are handled by expert 0,
        # the tokens at token_idxs[6:15] are handled by expert 1, and so on.
        for i, end_idx in enumerate(tokens_per_expert):
            start_idx = 0 if i == 0 else tokens_per_expert[i - 1]
            if start_idx == end_idx:
                continue
            expert = self.experts[i]
            exp_token_idx = token_idxs[start_idx:end_idx]
            expert_tokens = x[exp_token_idx]
            expert_out = expert(expert_tokens).to(expert_cache.dtype)
            expert_out.mul_(flat_expert_weights[idxs[start_idx:end_idx]])
            # Use scatter_add_ to sum the weighted expert outputs back per token
            expert_cache.scatter_add_(0, exp_token_idx.view(-1, 1).repeat(1, x.shape[-1]), expert_out)
        return expert_cache
```
Code Explanation
The main components of this code:
The `FeedForward` class:

- Implements a basic feed-forward network
- Uses the SwiGLU activation (`F.silu(self.w1(x)) * self.w3(x)`)
- Contains three linear layers (w1, w2, w3) and a dropout layer
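When `config.hidden_dim` is None, the constructor derives it from `dim`: expand by 4x, scale by 2/3 (so the three-projection SwiGLU block keeps roughly the parameter count of a standard two-projection FFN), then round up to a multiple of `multiple_of`. A quick check of that arithmetic with illustrative values (dim=512, multiple_of=64; these values are not fixed by the code above):

```python
dim, multiple_of = 512, 64            # illustrative values
hidden_dim = 4 * dim                  # 2048
hidden_dim = int(2 * hidden_dim / 3)  # 1365
hidden_dim = multiple_of * ((hidden_dim + multiple_of - 1) // multiple_of)
print(hidden_dim)                     # 1408, the smallest multiple of 64 >= 1365
```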
The `MoEGate` class (the gating mechanism):

- Decides which experts should process each token
- Main steps:
  - Compute a score for every (token, expert) pair via softmax over the gating logits
  - Select the top-k highest-scoring experts per token
  - Compute an auxiliary loss (aux_loss) that balances load across the experts
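A minimal sketch of the routing step in isolation, with made-up sizes (not part of the original code):

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
n_experts, top_k = 4, 2
logits = torch.randn(3, n_experts)             # 3 tokens, one logit per expert
scores = logits.softmax(dim=-1)                # per-token probability over experts
topk_weight, topk_idx = torch.topk(scores, k=top_k, dim=-1, sorted=False)
# Optionally renormalize the kept weights so they sum to 1 per token
topk_weight = topk_weight / (topk_weight.sum(dim=-1, keepdim=True) + 1e-20)
print(topk_idx)      # which 2 experts each token is routed to
print(topk_weight)   # the mixing weights for those experts
```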
The `MOEFeedForward` class (the mixture-of-experts layer):

- Holds multiple experts (`FeedForward` modules) and one gating network (`MoEGate`)
- Training mode:
  - Uses the gate to select experts for each token
  - Repeats the input once per selected expert and dispatches the copies to the experts
  - Each expert processes its share of tokens
  - The expert outputs are combined using the gating weights
- Inference mode (`moe_infer`):
  - Sorts tokens by expert index so each expert processes its tokens in one batch
  - Uses `scatter_add_` to accumulate the weighted expert outputs back to the right token positions
  - A more efficient path that avoids duplicating the input
- Extras:
  - Optional shared experts (`n_shared_experts`) applied to every token
  - Expert load balancing via the auxiliary loss
  - Each token can be routed to several experts (`num_experts_per_tok`)
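To make the inference path concrete, here is a small, self-contained sketch of the sort-then-scatter pattern that `moe_infer` relies on, with toy tensors and a stand-in "expert" in place of real networks (all shapes and values are illustrative):

```python
import torch

num_experts_per_tok = 2
n_experts = 3
# Flat (token, slot) -> expert assignments for 4 tokens, 2 experts each (8 entries)
flat_expert_indices = torch.tensor([0, 2, 1, 0, 0, 2, 2, 1])
flat_expert_weights = torch.rand(8, 1)
x = torch.randn(4, 5)                       # 4 tokens, hidden size 5
out = torch.zeros_like(x)

idxs = flat_expert_indices.argsort()        # group assignments by expert id
ends = flat_expert_indices.bincount(minlength=n_experts).cumsum(0)
token_idxs = idxs // num_experts_per_tok    # map each assignment back to its token

start = 0
for e, end in enumerate(ends.tolist()):
    if start != end:
        tok = token_idxs[start:end]         # tokens routed to expert e
        # A real implementation calls self.experts[e]; here the "expert" just scales by (e + 1)
        expert_out = x[tok] * (e + 1)
        expert_out = expert_out * flat_expert_weights[idxs[start:end]]
        out.scatter_add_(0, tok.view(-1, 1).repeat(1, x.shape[-1]), expert_out)
    start = end

print(out.shape)  # torch.Size([4, 5]); each row is the weighted sum over its 2 experts
```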
This is a typical MoE (Mixture of Experts) implementation, used in large language models to increase model capacity while keeping computation efficient.
Example
```python
# Build an MoE instance. MOEFeedForward takes an LMConfig, so the hyperparameters
# below go into the config object; any fields not listed keep their defaults.
config = LMConfig(
    dim=512,                 # input dimension
    n_routed_experts=4,      # number of experts
    num_experts_per_tok=2,   # experts selected per token
    hidden_dim=None,         # FFN hidden size; computed automatically when None
    dropout=0.1,             # dropout rate
)
moe = MOEFeedForward(config)

# Build a sample input
batch_size = 2
seq_len = 10
x = torch.randn(batch_size, seq_len, config.dim)  # shape: [2, 10, 512]
moe(x)
```
Output (these shape traces come from debug `print` statements added to `forward`; they are not part of the code listed above)
```
After gate - topk_idx.shape: torch.Size([20, 2]), topk_weight.shape: torch.Size([20, 2])
After view - x.shape: torch.Size([20, 512]), flat_topk_idx.shape: torch.Size([40])
After repeat_interleave - x.shape: torch.Size([40, 512])
Empty y tensor shape: torch.Size([40, 512])
Expert 0 - input shape: torch.Size([9, 512])
Expert 0 - output shape: torch.Size([9, 512])
Expert 1 - input shape: torch.Size([13, 512])
Expert 1 - output shape: torch.Size([13, 512])
Expert 2 - input shape: torch.Size([11, 512])
Expert 2 - output shape: torch.Size([11, 512])
Expert 3 - input shape: torch.Size([7, 512])
Expert 3 - output shape: torch.Size([7, 512])
Before view - y.shape: torch.Size([40, 512])
topk_weight.shape: torch.Size([20, 2])
After view and sum - y.shape: torch.Size([20, 512])
Final y.shape: torch.Size([2, 10, 512])
```
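Note that `forward` stores the gate's auxiliary loss on the module as `self.aux_loss` rather than returning it. A minimal sketch of folding it into the training loss (the names `model`, `input_ids`, `targets` are illustrative, not from the original code):

```python
# Hypothetical training step: combine the language-model loss with the MoE auxiliary loss
logits = model(input_ids)                                            # forward pass
main_loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
# Collect aux_loss from every MOEFeedForward layer touched in this forward pass
aux_loss = sum(m.aux_loss for m in model.modules() if isinstance(m, MOEFeedForward))
loss = main_loss + aux_loss
loss.backward()
```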
Related torch functions
```python
import torch

# empty: create an uninitialized tensor
x = torch.empty((2, 3))  # uninitialized tensor of shape 2x3

# zeros_like: create an all-zero tensor with the same shape as the input
a = torch.tensor([[1, 2], [3, 4]])
b = torch.zeros_like(a)  # all-zero tensor of shape 2x2
print(b)
```

```
tensor([[0, 0],
        [0, 0]])
```
```python
import torch.nn.functional as F

x = torch.tensor([[1, 2, 3, 4], [5, 6, 7, 8]])

# view: reshape a tensor
y = x.view(-1)     # flatten to 1D
print(y)

# -1 means that dimension's size is inferred automatically
z = x.view(-1, 2)  # reshape to 4x2
print(z)
```

```
tensor([1, 2, 3, 4, 5, 6, 7, 8])
tensor([[1, 2],
        [3, 4],
        [5, 6],
        [7, 8]])
```
```python
# linear: linear transformation y = xA^T + b
input = torch.randn(2, 3)         # 2 samples, 3 features each
weight = torch.randn(4, 3)        # maps 3 features to 4 outputs
output = F.linear(input, weight)  # shape becomes [2, 4]

# softmax: turn raw scores into a probability distribution
logits = torch.tensor([1.0, 2.0, 3.0])
probs = F.softmax(logits, dim=0)
print(probs)
```

```
tensor([0.0900, 0.2447, 0.6652])
```
```python
# topk: find the k largest values and their indices
x = torch.tensor([1, 5, 2, 8, 3])
values, indices = torch.topk(x, k=2)
print(values)
print(indices)
```

```
tensor([8, 5])
tensor([3, 1])
```
```python
# repeat_interleave: repeat each element in place
x = torch.tensor([1, 2, 3])
y = x.repeat_interleave(2)  # repeat each element 2 times
print(y)
```

```
tensor([1, 1, 2, 2, 3, 3])
```
```python
# bincount: count how many times each integer occurs
x = torch.tensor([1, 1, 2, 3, 1, 2])
counts = x.bincount()
print(counts)  # 0 occurs 0 times, 1 occurs 3 times, 2 occurs twice, 3 occurs once
```

```
tensor([0, 3, 2, 1])
```
```python
# scatter_add_: accumulate values at the specified positions
src = torch.tensor([[1, 2], [3, 4]], dtype=torch.float)  # float dtype
index = torch.tensor([[0, 1], [0, 1]])
out = torch.zeros(2, 2, dtype=torch.float)               # must match src's dtype
out.scatter_add_(0, index, src)                          # out[index[i][j]][j] += src[i][j]
print(out)
```

```
tensor([[4., 0.],
        [0., 6.]])
```
```python
# argsort: return the indices that would sort the tensor
x = torch.tensor([3, 1, 4, 1, 5])
indices = x.argsort()
print(indices)  # the smallest values sit at positions 1 and 3, then 0, 2, 4
```

```
tensor([1, 3, 0, 2, 4])
```
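Two more functions used in the MoE code above but not demonstrated yet are `F.one_hot` (used in the non-sequence auxiliary loss) and `cumsum` (used in `moe_infer` to turn per-expert token counts into segment boundaries); a quick illustration:

```python
import torch
import torch.nn.functional as F

# one_hot: turn class indices into one-hot vectors
idx = torch.tensor([0, 2, 1])
print(F.one_hot(idx, num_classes=3))
# tensor([[1, 0, 0],
#         [0, 0, 1],
#         [0, 1, 0]])

# cumsum: running sum; turns per-expert token counts into segment end offsets
counts = torch.tensor([6, 9, 5])
print(counts.cumsum(0))  # tensor([ 6, 15, 20])
```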