基于联邦学习的MNIST手写数字识别-Pytorch实现
基于联邦学习的MNIST手写数字识别-Pytorch实现
基于联邦学习的MNIST手写数字识别-Pytorch实现
背景知识
联邦学习 :联邦学习是为了解决“数据孤岛”的问题,即不同机构由于隐私保护等限制无法获取更多的数据来完善模型,从而形成孤岛。联邦学习让不同的机构通过一个中心服务器传输模型参数,在一定程度上达到了共享数据集的效果。各个机构通过联邦学习框架进行合作,最大化其收益。
MNIST数据集 :MNIST数据集是计算机视觉领域中比较常用的数据集,它包含60000个训练数据和10000个测试数据。其内容是0-9的手写数字,且每张图片都是灰度图像,像素为28×28,具体见下图:
Pytorch :Pytorch是开源的机器学习库,在近几年的论文中得到广泛应用。
导入库
import torch
import torchvision
import torchvision.transforms as transforms
import torch.utils.data.dataloader as dataloader
from torch.utils.data import Subset
import torch.nn as nn
import torch.optim as optim
from torch.nn.parameter import Parameter
读取MNIST数据集
使用Subset函数对训练数据集进行划分,这里总共有ABC三个机构,每个机构的训练集数目为1000。然后将训练数据集放入Dataloader里,这里batch_size即每次往神经网络放入多少数据,比如batch_size为100就是每次放入100个数据,总共放10次。shuffle为True意为将数据集打乱。这里我们把整个训练集作为一个batch_size,所以不需要打乱。
train_set = torchvision.datasets.MNIST(root="./data",train=True,transform=transforms.ToTensor(),download=True)
train_set_A=Subset(train_set,range(0,1000))
train_set_B=Subset(train_set,range(1000,2000))
train_set_C=Subset(train_set,range(2000,3000))
train_loader_A = dataloader.DataLoader(dataset=train_set_A,batch_size=1000,shuffle=False)
train_loader_B = dataloader.DataLoader(dataset=train_set_B,batch_size=1000,shuffle=False)
train_loader_C = dataloader.DataLoader(dataset=train_set_C,batch_size=1000,shuffle=False)
test_set = torchvision.datasets.MNIST(root="./data",train=False,transform=transforms.ToTensor(),download=True)
test_set=Subset(test_set,range(0,2000))
test_loader = dataloader.DataLoader(dataset=test_set,shuffle=True)
训练和测试
train_and_test_1 :普通的训练测试过程。
首先定义神经网络的类型,这里用的是最简单的三层神经网络(也可以说是两层,不算输入层),输入层28×28,隐藏层12个神经元,输出层10个神经元。
def train_and_test_1(train_loader,test_loader):
class NeuralNet(nn.Module):
def __init__(self, input_num, hidden_num, output_num):
super(NeuralNet, self).__init__()
self.fc1 = nn.Linear(input_num, hidden_num) # 服从正态分布的权重w
self.fc2 = nn.Linear(hidden_num, output_num)
nn.init.normal_(self.fc1.weight)
nn.init.normal_(self.fc2.weight)
nn.init.constant_(self.fc1.bias, val=0) # 初始化bias为0
nn.init.constant_(self.fc2.bias, val=0)
self.relu = nn.ReLU() # Relu激励函数
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
y = self.fc2(x)
return y
epoches = 20 # 迭代20轮
lr = 0.01 # 学习率,即步长
input_num = 784
hidden_num = 12
output_num = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NeuralNet(input_num, hidden_num, output_num)
model.to(device)
loss_func = nn.CrossEntropyLoss() # 损失函数的类型:交叉熵损失函数
optimizer = optim.Adam(model.parameters(), lr=lr) # Adam优化,也可以用SGD随机梯度下降法
# optimizer = optim.SGD(model.parameters(), lr=lr)
for epoch in range(epoches):
flag = 0
for images, labels in train_loader:
images = images.reshape(-1, 28 * 28).to(device)
labels = labels.to(device)
output = model(images)
loss = loss_func(output, labels)
optimizer.zero_grad()
loss.backward() # 误差反向传播,计算参数更新值
optimizer.step() # 将参数更新值施加到net的parameters上
# 以下两步可以看每轮损失函数具体的变化情况
# if (flag + 1) % 10 == 0:
# print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, epoches, loss.item()))
flag += 1
params = list(model.named_parameters()) # 获取模型参数
# 测试,评估准确率
correct = 0
total = 0
for images, labels in test_loader:
images = images.reshape(-1, 28 * 28).to(device)
labels = labels.to(device)
output = model(images)
values, predicte = torch.max(output, 1) # 0是每列的最大值,1是每行的最大值
total += labels.size(0)
# predicte == labels 返回每张图片的布尔类型
correct += (predicte == labels).sum().item()
print("The accuracy of total {} images: {}%".format(total, 100 * correct / total))
return params
train*and_test_2 :联邦后的训练测试过程。
注意每次联邦前,bias 都要重置为 0,这样效果会好。并且需要把做完平均后的 w 传入到模型中。
def train_and_test_2(train_loader,test_loader,com_para_fc1,com_para_fc2):
class NeuralNet(nn.Module):
def **init**(self, input_num, hidden_num, output_num,com_para_fc1,com_para_fc2):
super(NeuralNet, self).**init**()
self.fc1 = nn.Linear(input_num, hidden_num)
self.fc2 = nn.Linear(hidden_num, output_num)
self.fc1.weight=Parameter(com_para_fc1)
self.fc2.weight=Parameter(com_para_fc2)
nn.init.constant*(self.fc1.bias, val=0)
nn.init.constant_(self.fc2.bias, val=0)
self.relu = nn.ReLU()
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
y = self.fc2(x)
return y
epoches = 20
lr = 0.01
input_num = 784
hidden_num = 12
output_num = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NeuralNet(input_num, hidden_num, output_num,com_para_fc1,com_para_fc2)
model.to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
# optimizer = optim.SGD(model.parameters(), lr=lr)
for epoch in range(epoches):
flag = 0
for images, labels in train_loader:
# (images, labels) = data
images = images.reshape(-1, 28 * 28).to(device)
labels = labels.to(device)
output = model(images)
loss = loss_func(output, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# if (flag + 1) % 10 == 0:
# print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, epoches, loss.item()))
flag += 1
params = list(model.named_parameters())#get the index by debuging
correct = 0
total = 0
for images, labels in test_loader:
images = images.reshape(-1, 28 * 28).to(device)
labels = labels.to(device)
output = model(images)
values, predicte = torch.max(output, 1)
total += labels.size(0)
correct += (predicte == labels).sum().item()
print("The accuracy of total {} images: {}%".format(total, 100 * correct / total))
return params
combine_params :对模型参数做平均,只对权重 w 做平均。
def combine_params(para_A,para_B,para_C):
fc1_wA=para_A[0][1].data
fc1_wB=para_B[0][1].data
fc1_wC=para_C[0][1].data
fc2_wA=para_A[2][1].data
fc2_wB=para_B[2][1].data
fc2_wC=para_C[2][1].data
com_para_fc1=(fc1_wA+fc1_wB+fc1_wC)/3
com_para_fc2=(fc2_wA+fc2_wB+fc2_wC)/3
return com_para_fc1,com_para_fc2
主函数 :先进行正常的训练测试,再进行联邦后的训练测试。
para_A=train_and_test_1(train_loader_A,test_loader)
para_B=train_and_test_1(train_loader_B,test_loader)
para_C=train_and_test_1(train_loader_C,test_loader)
for i in range(10):
print("The {} round to be federated!!!".format(i+1))
com_para_fc1,com_para_fc2=combine_params(para_A,para_B,para_C)
para_A=train_and_test_2(train_loader_A,test_loader,com_para_fc1,com_para_fc2)
para_B=train_and_test_2(train_loader_B,test_loader,com_para_fc1,com_para_fc2)
para_C=train_and_test_2(train_loader_C,test_loader,com_para_fc1,com_para_fc2)
实验结果
三个机构 :
可以看到,刚开始三个机构的准确率分别为 24.85%、17.95%和 20%,而联邦以后,其准确率有明显提高,且随着联邦轮数的增加,最终三个机构的准确率均能达到 80%以上。
五个机构 :对于五个机构,只需改一下对参数做平均的函数以及主函数。
机构数目越多,意味着可共享的数据越多,当第六轮联邦后,各机构的准确率均达到 82%以上,继续联邦,其准确率提高不大。由于我们采用的是最简单的三层神经网络(输入层、隐藏层、输出层),这样的准确率还是令人满意的。
结语
以前碰到配置环境中的 bug、程序的 bug 或者无从下手得项目,都是看 csdn 博客来学习。本着分享知识和见证自己成长的初心,我写下了自己的第一篇博客!