TOIS-2023 CCF-A
This post introduces LightFR, a lightweight federated recommender system built on privacy-preserving matrix factorization. The paper first reviews related work on matrix factorization, learning to hash, and federated recommender systems; it then details the LightFR framework and algorithm and argues its advantages in storage/communication efficiency, recommendation (inference) efficiency, and privacy protection. Finally, experiments verify LightFR's effectiveness and efficiency, including a discussion of how well it protects user privacy.
At its core is an efficient federated discrete optimization algorithm between the server and the clients, which embeds user preferences into a discrete Hamming space, cutting resource usage on both sides while protecting user privacy. LightFR thus combines matrix factorization with privacy-preserving learning to hash to achieve lightweight, efficient, and secure recommendation.
The experimental design covers effectiveness (recommendation quality), efficiency (storage, communication, and inference cost), and an analysis of privacy protection.
The paper's core innovations: (1) compact binary codes for users and items, learned via matrix factorization plus learning to hash; (2) an efficient federated discrete optimization algorithm between server and clients in which only intermediate gradients are exchanged, never raw ratings or user codes; (3) large savings in storage/communication and in recommendation cost; (4) improved privacy, since private data never leaves the client. A back-of-the-envelope sketch of the efficiency argument follows.
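To make the lightweight claim concrete, here is a minimal sketch of my own (not from the paper): it compares the memory footprint of float32 embeddings against binary codes for a hypothetical catalogue of one million items, and checks the identity relating the code inner product to the Hamming distance, which is what allows ranking with bit operations.

import numpy as np

num_items, dim = 1_000_000, 64  # hypothetical catalogue size, 64-bit codes

# A conventional MF model stores float32 embeddings:
float_bytes = num_items * dim * 4    # ~244 MB

# Binary codes need only 1 bit per dimension:
binary_bytes = num_items * dim // 8  # ~7.6 MB, a 32x reduction
print(float_bytes / binary_bytes)    # 32.0

# Scoring with +/-1 codes: <b_u, d_i> = dim - 2 * hamming(b_u, d_i),
# so ranking by inner product can be done with XOR/popcount.
bu = np.sign(np.random.randn(dim))
di = np.sign(np.random.randn(dim))
hamming = np.sum(bu != di)
assert int(np.dot(bu, di)) == dim - 2 * hamming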
Client.py
import numpy as np
from Metrics import Metrics


class Client:
    def __init__(self, configs):
        self.bu = None          # the client's hash code (binary user representation)
        self.D = None           # global item-related parameters (item code matrix)
        self.data_u = None      # this client's ratings
        self.data_bin_u = None  # binary indicator of which items this client rated
        self.data_len_u = None  # number of ratings held by this client
        self.configs = configs

    def client_update(self, client, master_flag):
        """
        Client-side process; could be implemented in parallel.
        Sweeps over the bits of the user code by discrete coordinate descent
        until no bit changes, then returns the code and the dirty flag.
        """
        while True:
            flag = 0
            for k in range(self.configs.code_len):
                dk = client.D[:, k]
                buk_hat = np.sum((client.data_u - np.dot(client.D, client.bu.T)) * dk * client.data_bin_u) \
                          + 2 * self.configs.lambdaa * client.data_len_u * client.bu[k]
                buk_new = np.sign(self.K(buk_hat, client.bu[k]))
                if client.bu[k] != buk_new:
                    flag = 1
                    client.bu[k] = buk_new
            if flag == 0:
                break
            master_flag = 1
        return client.bu, master_flag

    def get_inter_params(self, i, k):
        # intermediate gradients for item i, bit k; these, not raw ratings,
        # are what the client uploads to the server
        di = self.D[i, :]
        grads = (self.data_u[i] - np.dot(self.bu, di.T)) * self.bu[k] * self.data_bin_u[i]
        grads_len = self.data_bin_u[i]
        return grads, grads_len

    def K(self, x, y):
        # keep the previous bit y when the update signal x is zero
        return x if x != 0 else y

    def calculate_loss(self):
        # squared reconstruction error over this client's observed ratings
        local_loss = np.sum((self.data_u - np.dot(self.D, self.bu)) ** 2 * self.data_bin_u)
        return local_loss

    def evaluate_local(self, items, val_data):
        configs = {'top_k': 10, 'num_negative_test': 49}
        metric = Metrics(configs)
        bus = self.bu
        dis = self.D[items]
        rating_pred = np.multiply(bus, dis)
        preds = np.sum(rating_pred, axis=1)
        val_data['pred'] = preds.tolist()
        hr = metric.get_hit_ratio(val_data)
        ndcg = metric.get_ndcg(val_data)
        return hr, ndcg
Client.py walkthrough
Attributes:
self.bu: the client's hash code, representing the user's features.
self.D: global item-related parameters, likely the item feature matrix maintained by the server and shared with the clients.
self.data_u: this client's user data, such as ratings or interactions.
self.data_bin_u: a binary indicator of which items the user has rated, used to mask computations.
self.data_len_u: the number of the user's ratings, used in the regularization term.
self.configs: the client's configuration, such as the hash code length and the regularization weight.
Methods:
client_update: sweeps over the bits of the user's hash code, flipping each bit by discrete coordinate descent until a full sweep changes nothing.
get_inter_params: computes the intermediate gradients for item i and bit k that are uploaded to the server in place of raw ratings.
K: returns x unless it is zero, in which case it returns y; this keeps a bit unchanged when the update signal vanishes.
calculate_loss: computes the local squared reconstruction error over the client's observed ratings.
evaluate_local: ranks candidate items by the inner product of binary codes and reports HR@k and NDCG@k.
Summary: the Client class embodies federated learning in a recommender setting: each client independently updates its own model (its hash code) and can evaluate performance locally. The procedure optimizes the user's hash representation so it better captures the user's preferences and behavior, while the federated setup keeps the user's data private. A quick sanity check follows.
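The sketch below builds a toy client from synthetic data (random ±1 codes and a ~10% rated mask; all values are made up for illustration) and verifies that client_update returns a code that is still binary. It assumes the Configs class defined later in this post.

import numpy as np
from Client import Client
from Configs import Configs

configs = Configs()
num_items = 200

client = Client(configs)
client.bu = np.sign(np.random.randn(configs.code_len))             # user code in {-1, +1}
client.D = np.sign(np.random.randn(num_items, configs.code_len))   # item codes
client.data_bin_u = (np.random.rand(num_items) < 0.1).astype(int)  # ~10% of items rated
client.data_u = client.data_bin_u * np.random.uniform(
    -configs.code_len, configs.code_len, num_items)                # scaled ratings, as in Base.trainSet
client.data_len_u = int(client.data_bin_u.sum())

bu, master_flag = client.client_update(client, master_flag=0)
assert set(np.unique(bu)) <= {-1.0, 1.0}                           # the code stays binary
print('code changed this round:', master_flag == 1)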
Base.py
# -*- coding: utf-8 -*-
import numpy as np
import scipy.linalg as la
from collections import defaultdict
from math import log
import pandas as pd
import torch
from DataLoader import DataLoaderCenter
from Metrics import Metrics


class Base:
    def __init__(self):
        self.user = {}
        self.item = {}
        self.id2user = {}
        self.id2item = {}
        self.u_i_r = defaultdict(dict)
        self.i_u_r = defaultdict(dict)
        self.minVal = 0.5
        self.maxVal = 4
        self.dataset_name = 'filmtrust'
        self.federated_train_data_path = 'data/' + self.dataset_name + '/' + self.dataset_name + '_train.csv'
        self.federated_valid_data_path = 'data/' + self.dataset_name + '/' + self.dataset_name + '_val.csv'
        self.federated_test_data_path = 'data/' + self.dataset_name + '/' + self.dataset_name + '_test.csv'

    def init_model(self):
        self.generate_vocabulary()
        self.rating_matrix, self.rating_matrix_bin, self.globalmean = self.get_rating_matrix()
        # random +/-1 codes for users (B) and items (D)
        self.B = np.sign(np.array(np.random.randn(len(self.user), self.configs.code_len) / (self.configs.code_len ** 0.5)))
        self.D = np.sign(np.array(np.random.randn(len(self.item), self.configs.code_len) / (self.configs.code_len ** 0.5)))
        self.loss, self.last_delta_loss = 0.0, 0.0

    def trainSet(self):
        with open(self.federated_train_data_path, 'r') as f:
            for index, line in enumerate(f):
                if index != 0:  # skip the header row
                    u, i, r = line.strip('\r\n').split(',')
                    # map the normalized rating to [-code_len, code_len], the range of the code inner product
                    r = 2 * self.configs.code_len * float(r) - self.configs.code_len
                    yield (int(u), int(i), float(r))

    def containUser(self, user_id):
        return user_id in self.user

    def containItem(self, item_id):
        return item_id in self.item

    def valid_test_Set(self, path):
        with open(path, 'r') as f:
            for index, line in enumerate(f):
                if index != 0:  # skip the header row
                    u, i, r = line.strip('\r\n').split(',')
                    # r = 2 * self.code_len * (float(int(r) - self.minVal) / (self.maxVal - self.minVal) + 0.01) - self.code_len
                    yield (int(u), int(i), float(r))

    def read_federated_valid_dataset(self, path):
        return pd.read_csv(path)

    def generate_vocabulary(self):
        # build id <-> index mappings and rating dictionaries from all three splits
        sources = (self.trainSet(),
                   self.valid_test_Set(self.federated_valid_data_path),
                   self.valid_test_Set(self.federated_test_data_path))
        for source in sources:
            for user_id, item_id, rating in source:
                self.u_i_r[user_id][item_id] = rating
                self.i_u_r[item_id][user_id] = rating
                if user_id not in self.user:
                    self.user[user_id] = len(self.user)
                    self.id2user[self.user[user_id]] = user_id
                if item_id not in self.item:
                    self.item[item_id] = len(self.item)
                    self.id2item[self.item[item_id]] = item_id

    def get_rating_matrix(self):
        rating_matrix = np.zeros((len(self.user), len(self.item)))  # e.g. (943, 1596)
        globalmean = 0.0
        lens = 0
        for user_id, item_id, rating in self.trainSet():
            lens += 1
            globalmean += rating
            rating_matrix[self.user[user_id]][self.item[item_id]] = int(rating)
        rating_matrix_bin = (rating_matrix > 0).astype('int')
        globalmean = globalmean / lens
        return rating_matrix, rating_matrix_bin, globalmean

    def K(self, x, y):
        # keep the previous bit y when the update signal x is zero
        return x if x != 0 else y

    def valid_test_model(self, path):
        pre_true_dict = defaultdict(list)
        for user_id, item_id, rating in self.valid_test_Set(path):
            if self.containUser(user_id) and self.containItem(item_id):
                bu = self.B[self.user[user_id], :]
                di = self.D[self.item[item_id], :]
                pre = np.dot(bu, di)
            elif self.containUser(user_id) and not self.containItem(item_id):
                # cold item: fall back to the user's mean rating
                pre = sum(self.u_i_r[user_id].values()) / float(len(self.u_i_r[user_id]))
            elif not self.containUser(user_id) and self.containItem(item_id):
                # cold user: fall back to the item's mean rating
                pre = sum(self.i_u_r[item_id].values()) / float(len(self.i_u_r[item_id]))
            else:
                pre = self.globalmean
            pre_true_dict[user_id].append([pre, rating])
        metrics = Metrics({'top_k': 10})  # Metrics.__init__ requires a configs dict; calDCG_k does not read it
        ndcg_10 = metrics.calDCG_k(pre_true_dict, 10)
        return ndcg_10
Base.py walkthrough
This code defines a class named Base, the basic scaffolding of the recommender. It covers initialization, vocabulary construction, rating-matrix generation, and model evaluation:
Class initialization (__init__): sets up the dictionaries and path variables that store user and item information along with the paths of the training, validation, and test data.
Vocabulary construction (generate_vocabulary): extracts user-item-rating triples from the training, validation, and test sets and builds two bidirectional mappings: user ID to internal index (self.user and self.id2user) and item ID to internal index (self.item and self.id2item). It also builds the user-to-item (self.u_i_r) and item-to-user (self.i_u_r) rating dictionaries.
Rating matrix (get_rating_matrix): creates a user-item rating matrix (rating_matrix) whose entries are the users' ratings, a binary matrix (rating_matrix_bin) indicating whether a user rated an item, and the global mean rating (globalmean).
Model initialization (init_model): calls generate_vocabulary, builds the rating matrices, and initializes the latent user and item matrices (self.B and self.D) with random values passed through the sign function, so every entry is +1 or -1.
Data iterators (trainSet, valid_test_Set): read the training and validation/test splits from the given paths.
Existence checks (containUser, containItem): check whether a given user ID or item ID is present in the user or item dictionary.
Evaluation (valid_test_model): scores the validation or test data, pairing each predicted rating with the true rating per user, and computes the normalized discounted cumulative gain (NDCG@10) from those pairs.
Summary
The Base class provides the basic framework of a recommender: data handling, model initialization, and evaluation. It processes user-item interactions, builds the rating matrix, and measures model performance, giving more elaborate models a foundation to build on.
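One detail of trainSet deserves a worked example: the rating read from the CSV (apparently pre-normalized to [0, 1], judging by the commented-out min-max line in valid_test_Set) is mapped to [-code_len, code_len], exactly the range of the inner product of two ±1 codes:

code_len = 64
for r in (0.0, 0.5, 0.75, 1.0):
    print(r, '->', 2 * code_len * r - code_len)
# 0.0 -> -64.0, 0.5 -> 0.0, 0.75 -> 32.0, 1.0 -> 64.0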
Configs.py
class Configs:
    def __init__(self):
        self.code_len = 64       # length of the binary hash codes
        self.threshold = 1e-4    # convergence threshold on the loss change
        self.global_rounds = 50  # number of global federated rounds
        self.client_ratio = 0.6  # fraction of clients sampled per round
        self.lambdaa = 0.6       # regularization weight
Configs class attributes
self.code_len: the length of the hash code, i.e. the number of binary bits in each user or item representation. Here it is set to 64, so every user and item is represented by a 64-bit vector.
self.threshold: a threshold parameter, likely the convergence criterion of training. Set to 1e-4 (0.0001), it plausibly means training stops once the improvement between consecutive iterations falls below this value.
self.global_rounds: the number of global training rounds; in the federated setting, the total number of rounds in which clients participate in model updates. Set to 50, the whole training runs for at most 50 iterations.
self.client_ratio: the fraction of clients participating in each round; 0.6 means 60% of the clients are randomly selected to update the model per round.
self.lambdaa: the regularization parameter, which controls model complexity to avoid overfitting. Set to 0.6, it weights the regularization term in the loss and in parameter updates.
Summary:
The Configs class is a configuration store, a convenient way to manage and tune the parameters used during training. Adjusting them controls training behavior such as the number of iterations, client participation, and the degree of regularization, which matters for optimizing both performance and efficiency. A small usage sketch follows.
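For instance, with the defaults above, each round samples roughly 60% of all clients. The user count below is hypothetical (FilmTrust is commonly reported with about 1,508 users; the exact number here is only for illustration):

from Configs import Configs

configs = Configs()
num_users = 1508  # illustrative FilmTrust-sized user base
clients_per_round = int(num_users * configs.client_ratio)
print(clients_per_round)  # 904 clients sampled per round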
DataLoader.py
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader as TDataLoader


class DataLoader():
    def __init__(self, configs, client_data):
        self.configs = configs
        self.train_data, self.val_data, self.test_data = \
            client_data['train'], client_data['val'], client_data['test']

    def get_train_dataloader(self):
        users, items, labels = torch.LongTensor(np.array(self.train_data['user_id'])), torch.LongTensor(
            np.array(self.train_data['item_id'])), torch.FloatTensor(np.array(self.train_data['ratings']))
        dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
        return TDataLoader(dataset, batch_size=self.configs['local_batch_size'], shuffle=True)

    def get_val_dataloader(self):
        if self.val_data.empty:
            users, items, labels = torch.LongTensor(self.val_data['user_id']), torch.LongTensor(
                self.val_data['item_id']), torch.FloatTensor(self.val_data['ratings'])
        else:
            users, items, labels = torch.LongTensor(np.array(self.val_data['user_id'])), torch.LongTensor(
                np.array(self.val_data['item_id'])), torch.FloatTensor(np.array(self.val_data['ratings']))
        dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
        # 100 for implicit feedback; the actual length for explicit feedback during validation on each client
        client_data_len = len(items)
        return TDataLoader(dataset, batch_size=client_data_len, shuffle=False)

    def get_test_dataloader(self):
        if self.test_data.empty:
            users, items, labels = torch.LongTensor(self.test_data['user_id']), torch.LongTensor(
                self.test_data['item_id']), torch.FloatTensor(self.test_data['ratings'])
        else:
            users, items, labels = torch.LongTensor(np.array(self.test_data['user_id'])), torch.LongTensor(
                np.array(self.test_data['item_id'])), torch.FloatTensor(np.array(self.test_data['ratings']))
        dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
        client_data_len = len(items)
        return TDataLoader(dataset, batch_size=client_data_len, shuffle=False)


class DataLoaderCenter():
    # note: only val_data is stored, so only get_val_dataloader is usable here;
    # get_train_dataloader / get_test_dataloader reference attributes that are never set
    def __init__(self, configs, val_data):
        self.configs = configs
        self.val_data = val_data

    def get_train_dataloader(self):
        users, items, labels = torch.LongTensor(np.array(self.train_data['user_id'], dtype='int32')), torch.LongTensor(
            np.array(self.train_data['item_id'], dtype='int32')), torch.FloatTensor(
            np.array(self.train_data['ratings'], dtype='float32'))
        dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
        return TDataLoader(dataset, batch_size=self.configs['local_batch_size'], shuffle=True)

    def get_val_dataloader(self):
        if self.val_data.empty:
            users, items, labels = torch.LongTensor(self.val_data['user_id']), torch.LongTensor(
                self.val_data['item_id']), torch.FloatTensor(self.val_data['ratings'])
        else:
            users, items, labels = torch.LongTensor(np.array(self.val_data['user_id'], dtype='int32')), torch.LongTensor(
                np.array(self.val_data['item_id'], dtype='int32')), torch.FloatTensor(
                np.array(self.val_data['ratings'], dtype='float32'))
        dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
        # one positive plus num_negative_test negatives per batch, i.e. one batch per client
        data_len = self.configs['num_negative_test'] + 1
        return TDataLoader(dataset, batch_size=data_len, shuffle=False)

    def get_test_dataloader(self):
        if self.test_data.empty:
            users, items, labels = torch.LongTensor(self.test_data['user_id']), torch.LongTensor(
                self.test_data['item_id']), torch.FloatTensor(self.test_data['ratings'])
        else:
            users, items, labels = torch.LongTensor(np.array(self.test_data['user_id'], dtype='int32')), torch.LongTensor(
                np.array(self.test_data['item_id'], dtype='int32')), torch.FloatTensor(
                np.array(self.test_data['ratings'], dtype='float32'))
        dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
        data_len = self.configs['num_negative_test'] + 1
        return TDataLoader(dataset, batch_size=data_len, shuffle=False)


class UserItemRatingDataset(Dataset):
    """Wrapper converting <user, item, rating> tensors into a PyTorch Dataset."""

    def __init__(self, user_tensor, item_tensor, target_tensor):
        """
        args:
            target_tensor: torch.Tensor, the rating for each <user, item> pair
        """
        self.user_tensor = user_tensor
        self.item_tensor = item_tensor
        self.target_tensor = target_tensor

    def __getitem__(self, index):
        return self.user_tensor[index], self.item_tensor[index], self.target_tensor[index]

    def __len__(self):
        return self.user_tensor.size(0)


if __name__ == '__main__':
    from reader.data_reader import DataReader  # external module assumed by this demo

    configs = {
        'dataset': 'ml-1m',
        'data_type': 'explicit',
        'num_negative_train': 4,
        'num_negative_test': 49,
        'local_batch_size': 100,
        'cold_nums': 10
    }
    dr = DataReader(configs)
    # client_data = dr.get_data_by_client(0)
    data = dr.get_train_val_test_data()
    dl_center = DataLoaderCenter(configs, data)
    td = dl_center.get_train_dataloader()
    vd = dl_center.get_val_dataloader()
    for index, data in enumerate(vd):
        if index == 0:
            print(data)
This code defines the data-loading classes DataLoader and DataLoaderCenter, plus a PyTorch Dataset subclass, UserItemRatingDataset. Together they convert the recommender's user, item, and rating data into a format suitable for model training and evaluation. Walkthrough:
DataLoader and DataLoaderCenter classes
Both build PyTorch data loaders for training, validation, and testing from the data they are given.
1. Initialization: DataLoader receives a per-client dict with train/val/test splits; DataLoaderCenter receives only the validation data used for centralized evaluation.
2. Obtaining loaders: get_train_dataloader, get_val_dataloader, and get_test_dataloader create the loaders for the training, validation, and test sets respectively. Each converts the data into PyTorch tensors, wraps them in a UserItemRatingDataset, and returns a TDataLoader.
3. The UserItemRatingDataset class wraps the <user, item, rating> tensors into a PyTorch Dataset so they can be indexed and batched.
Summary: this code handles data preprocessing and loading, supplying the inputs that model training and evaluation need. By converting raw data into PyTorch-ready tensors, these classes make the training pipeline more efficient and flexible. A standalone usage sketch follows.
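To see the wrapper in action without the external DataReader pipeline, the following self-contained sketch (synthetic data, my own example) pushes a tiny hand-made split through UserItemRatingDataset and a PyTorch loader:

import torch
from torch.utils.data import DataLoader as TDataLoader
from DataLoader import UserItemRatingDataset

users = torch.LongTensor([0, 0, 1, 1])
items = torch.LongTensor([10, 11, 10, 12])
ratings = torch.FloatTensor([1.0, 0.0, 1.0, 0.0])

dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=ratings)
loader = TDataLoader(dataset, batch_size=2, shuffle=False)
for u, i, r in loader:
    print(u.tolist(), i.tolist(), r.tolist())
# [0, 0] [10, 11] [1.0, 0.0]
# [1, 1] [10, 12] [1.0, 0.0]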
main.py
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import torch
from Base import Base
from Client import Client
from Configs import Configs
from DataLoader import DataLoaderCenter


class LightFR(Base):
    def __init__(self):
        super(LightFR, self).__init__()
        self.configs = Configs()

    def get_random_client_list(self):
        # sample client_ratio of all users for this round
        size = int(len(self.user) * self.configs.client_ratio)
        random_client_list = np.random.choice(list(self.user.values()), size)
        return random_client_list

    def get_client_data(self, client_id):
        # assemble a simulated client holding its code, the shared item matrix, and its local ratings
        client = Client(self.configs)
        client.bu = self.B[client_id, :]
        client.D = self.D
        client.data_u = self.rating_matrix[client_id, :]
        client.data_bin_u = self.rating_matrix_bin[client_id, :]
        client.data_len_u = len(self.u_i_r[self.id2user[client_id]])
        return client

    def train_model(self):
        current_round = 0
        last_loss = 0.0
        # the original code looped while current_round < self.configs.global_rounds - 40,
        # which caps training at 10 rounds and looks like a debugging leftover
        while current_round < self.configs.global_rounds:
            master_flag = 0
            current_round += 1
            sampled_clients = self.get_random_client_list()
            # running on the clients; could be implemented in parallel
            for u in sampled_clients:
                client = self.get_client_data(u)
                bu, master_flag = client.client_update(client, master_flag)
            # running on the server
            for i in range(len(self.item)):
                while True:
                    flag = 0
                    di = self.D[i, :]
                    for k in range(self.configs.code_len):
                        # The following can be uploaded by the client side: we upload the intermediate
                        # gradients, i.e., grads_a and grads_b, instead of the raw ratings or the user codes.
                        # We could use the client-style computation described in the paper, such as
                        # B[u, k], rating_matrix[u, i] and rating_matrix_bin[u, i], but it runs slowly.
                        # For efficient training, we use the batch-style computation to calculate the gradients.
                        # The intermediate gradients can be divided across clients: loss_total =
                        # (self.rating_matrix[:, i] - np.dot(self.B, di.T)) can be reformulated as
                        # loss_user = (self.rating_matrix[u, i] - np.dot(self.B[u, :], di.T)), so loss_total
                        # is the aggregation of the local loss_user terms.
                        bk = self.B[sampled_clients, k]
                        grads_a = (self.rating_matrix[sampled_clients, i] - np.dot(self.B[sampled_clients], di.T)) * bk * self.rating_matrix_bin[sampled_clients, i]
                        grads_b = len(self.rating_matrix_bin[sampled_clients, i])
                        # the following performs the simulated aggregation process
                        dik_hat = np.sum(grads_a) + grads_b * di[k]
                        dik_new = np.sign(self.K(dik_hat, di[k]))
                        if di[k] != dik_new:
                            flag = 1
                            di[k] = dik_new
                    if flag == 0:
                        break
                    self.D[i, :] = di
                    master_flag = 1
            # each client computes its local loss and uploads it; the server aggregates them
            self.loss = 0.0
            for u in range(len(self.user)):
                client = self.get_client_data(u)
                local_loss = client.calculate_loss()
                self.loss += local_loss
            federated_valid_hr_10, federated_valid_ndcg_10 = self.federated_valid_test_model(
                self.federated_valid_data_path)
            delta_loss = self.loss - last_loss
            print('current_round %d: current_loss = %.5f, delta_loss = %.5f valid_HR@10=%.5f valid_NDCG@10=%.5f' %
                  (current_round, self.loss, delta_loss, federated_valid_hr_10, federated_valid_ndcg_10))
            if master_flag == 0:
                break
            if abs(delta_loss) < self.configs.threshold or abs(delta_loss) == abs(self.last_delta_loss):
                break
            self.last_delta_loss = delta_loss
            last_loss = self.loss
        federated_test_hr_10, federated_test_ndcg_10 = self.federated_valid_test_model(self.federated_test_data_path)
        print('test HR@10 = %.5f, NDCG@10 = %.5f' % (federated_test_hr_10, federated_test_ndcg_10))

    def federated_valid_test_model(self, path):
        val_data = self.read_federated_valid_dataset(path)
        configs = {'top_k': 10, 'num_negative_test': 49}
        dl = DataLoaderCenter(configs, val_data)
        val_dataloader = dl.get_val_dataloader()
        hr_10, ndcg_10 = 0.0, 0.0
        num_batches = 0  # one batch represents one client, since a batch holds a single user
        for batch_id, batch in enumerate(val_dataloader):
            num_batches += 1
            assert isinstance(batch[0], torch.LongTensor)
            users, items, ratings = batch[0], batch[1], batch[2]
            val_data = pd.DataFrame(zip(users.tolist(), items.tolist(), ratings.tolist()),
                                    columns=['user_id', 'item_id', 'ratings'])
            items = [self.item[item] for item in items.tolist()]
            user_id = self.user[int(users[0])]
            client = self.get_client_data(user_id)
            hr, ndcg = client.evaluate_local(items, val_data)
            hr_10 += hr[10]
            ndcg_10 += ndcg[10]
        hr_10 /= num_batches
        ndcg_10 /= num_batches
        return hr_10, ndcg_10

    def main(self):
        self.init_model()
        self.train_model()


if __name__ == '__main__':
    model = LightFR()
    model.main()
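Before the walkthrough, one remark on the comments inside train_model: they assert that the server's batch-style gradient is just the aggregation of per-client gradients. The following synthetic-data sketch (mine, for illustration) checks that equivalence for a single item and bit k:

import numpy as np

num_users, code_len, k = 8, 16, 3
B = np.sign(np.random.randn(num_users, code_len))      # user codes
di = np.sign(np.random.randn(code_len))                # code of one item i
ratings = np.random.randn(num_users)                   # stands in for rating_matrix[:, i]
mask = (np.random.rand(num_users) < 0.5).astype(int)   # stands in for rating_matrix_bin[:, i]

# batch-style computation, as run on the server in train_model
grads_batch = (ratings - B.dot(di)) * B[:, k] * mask

# client-style computation: each user u contributes only its own term
grads_clients = np.array([(ratings[u] - np.dot(B[u], di)) * B[u, k] * mask[u]
                          for u in range(num_users)])

assert np.allclose(grads_batch, grads_clients)
print('aggregated gradient:', grads_batch.sum())       # what the server sums into dik_hat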
main.py walkthrough
This code defines the LightFR class, which inherits from Base and implements the federated learning framework: model initialization, the training procedure, and validation/testing. The main methods:
__init__: builds on Base and attaches a Configs instance.
get_random_client_list: randomly samples a client_ratio fraction of users to take part in the current round.
get_client_data: assembles a simulated Client carrying its hash code, the shared item matrix D, and its local rating data.
train_model: alternates client-side updates of the user codes with server-side updates of the item codes, aggregates the per-client losses each round, and stops once the loss change drops below the threshold or no bit changes.
federated_valid_test_model: evaluates HR@10 and NDCG@10, treating each validation batch as a single client.
Metrics.py
import pandas as pd
import torch
import numpy as np
import math
from sklearn import metrics as sk_metrics
# from reader.data_reader import DataReader
# from loader.data_loader import DataLoader
class Metrics(object):
    def __init__(self, configs):
        super(Metrics, self).__init__()
        self.configs = configs

    def get_hit_ratio(self, test_data: pd.DataFrame):  # for implicit feedback
        top_k = self.configs['top_k']
        hrs = {}
        if test_data.empty:
            for current_top_k in range(1, top_k + 1):
                hrs[current_top_k] = 0.0
            return hrs
        assert 'pred' in test_data.columns, "missing prediction column"
        # the first row of each batch is the positive item; rank it among all candidates
        test_data['rank'] = test_data['pred'].rank(method='first', ascending=False)
        test_data_rank = int(test_data.head(1)['rank'])
        for current_top_k in range(1, top_k + 1):
            hrs[current_top_k] = 1.0 if test_data_rank <= current_top_k else 0.0
        return hrs

    def get_ndcg(self, test_data: pd.DataFrame):  # for implicit feedback
        top_k = self.configs['top_k']
        ndcgs = {}
        if test_data.empty:
            for current_top_k in range(1, top_k + 1):
                ndcgs[current_top_k] = 0.0
            return ndcgs
        assert 'pred' in test_data.columns, "missing prediction column"
        test_data['rank'] = test_data['pred'].rank(method='first', ascending=False)
        test_data_rank = int(test_data.head(1)['rank'])
        for current_top_k in range(1, top_k + 1):
            if test_data_rank <= current_top_k:
                ndcgs[current_top_k] = math.log(2) * 1.0 / math.log(1 + test_data_rank)
            else:
                ndcgs[current_top_k] = 0.0
        return ndcgs

    def get_hit_ratio_explicit_client(self, test_data: pd.DataFrame):  # for explicit feedback
        top_k = self.configs['top_k']
        hrs = {}
        if test_data.empty:
            for current_top_k in range(1, top_k + 1):
                hrs[current_top_k] = 0.0
            return hrs
        assert 'pred' in test_data.columns, "missing prediction column"
        data = test_data[['pred', 'ratings']].to_numpy()
        real_value_list = sorted(data, key=lambda x: x[1], reverse=True)
        predict_value_list = sorted(data, key=lambda x: x[0], reverse=True)
        test_data['rank'] = test_data['pred'].rank(method='first', ascending=False)
        test_data_rank = int(test_data.head(1)['rank'])
        for current_top_k in range(1, top_k + 1):
            hrs[current_top_k] = 1.0 if test_data_rank <= current_top_k else 0.0
        return hrs

    def get_ndcg_explicit_client(self, test_data: pd.DataFrame):  # for explicit feedback
        top_k = self.configs['top_k']
        ndcgs = {}
        if test_data.empty:
            for current_top_k in range(1, top_k + 1):
                ndcgs[current_top_k] = 0.0
            return ndcgs
        assert 'pred' in test_data.columns, "missing prediction column"
        data = test_data[['pred', 'ratings']].to_numpy()
        real_value_list = sorted(data, key=lambda x: x[1], reverse=True)     # ideal ranking
        predict_value_list = sorted(data, key=lambda x: x[0], reverse=True)  # predicted ranking
        for current_top_k in range(1, top_k + 1):
            if len(real_value_list) >= current_top_k:
                idcg, dcg = 0.0, 0.0
                for i in range(current_top_k):
                    idcg += (pow(2, real_value_list[i][1]) - 1) / math.log(i + 2, 2)
                    dcg += (pow(2, predict_value_list[i][1]) - 1) / math.log(i + 2, 2)
                ndcgs[current_top_k] = float(dcg / idcg) if idcg != 0 else 0.0
            else:
                ndcgs[current_top_k] = 0.0
        return ndcgs

    def get_auc(self, test_data: pd.DataFrame):
        pass

    def get_mrr(self, test_data: pd.DataFrame):
        pass

    def get_rmse(self, test_data: pd.DataFrame):
        assert 'pred' in test_data.columns, "missing prediction column"
        y = test_data['ratings']
        y_hat = test_data['pred']
        value = sk_metrics.mean_squared_error(y, y_hat) ** 0.5
        return value

    def get_mae(self, test_data: pd.DataFrame):
        assert 'pred' in test_data.columns, "missing prediction column"
        y = test_data['ratings']
        y_hat = test_data['pred']
        value = sk_metrics.mean_absolute_error(y, y_hat)
        return value

    def get_rmse_client(self, test_data: pd.DataFrame):
        assert 'pred' in test_data.columns, "missing prediction column"
        y = test_data['ratings']
        y_hat = test_data['pred']
        l = len(y)
        value = (abs(y - y_hat) ** 2).sum()
        return math.sqrt(value / l)

    def get_mae_client(self, test_data: pd.DataFrame):
        assert 'pred' in test_data.columns, "missing prediction column"
        y = test_data['ratings']
        y_hat = test_data['pred']
        l = len(y)
        value = abs(y - y_hat).sum()
        return value / l

    def calDCG_k(self, dictdata, k):
        nDCG = []
        for key in dictdata.keys():
            listdata = dictdata[key]
            real_value_list = sorted(listdata, key=lambda x: x[1], reverse=True)
            predict_value_list = sorted(listdata, key=lambda x: x[0], reverse=True)
            idcg, dcg = 0.0, 0.0
            if len(listdata) >= k:
                for i in range(k):
                    # math.log is used here; the original called a bare log() that was never imported
                    idcg += (pow(2, real_value_list[i][1]) - 1) / math.log(i + 2, 2)
                    dcg += (pow(2, predict_value_list[i][1]) - 1) / math.log(i + 2, 2)
                if idcg != 0:
                    nDCG.append(float(dcg / idcg))
            else:
                continue
        ave_ndcg = np.mean(nDCG)
        return ave_ndcg
if __name__ == '__main__':
    # this demo assumes the DataReader module from the commented import above,
    # plus the DataLoader class defined in this repo's DataLoader.py
    from reader.data_reader import DataReader
    from DataLoader import DataLoader

    configs = {
        'dataset': 'filmtrust',
        'data_type': 'implicit',
        'num_negative_train': 4,
        'num_negative_test': 99,
        'local_batch_size': 100,
        'top_k': 10
    }
    dr = DataReader(configs)
    client_data = dr.get_data_by_client(0)
    dl = DataLoader(configs, client_data)
    test_data = dl.get_test_dataloader()
    metric = Metrics(configs)
    for batch_id, batch in enumerate(test_data):
        assert isinstance(batch[0], torch.LongTensor)
        users, items, labels = batch[0], batch[1], batch[2]
        if batch_id == 0:
            pred = np.random.uniform(0, 1, 100)
            test_data = pd.DataFrame(
                {'user_id': users,
                 'item_id': items,
                 'ratings': labels,  # named 'ratings' so get_rmse/get_mae can find it (the original used 'label')
                 'pred': pred}
            )
            print(test_data)
            value1 = metric.get_hit_ratio(test_data)
            print(f'value1:{value1}')
            value2 = metric.get_ndcg(test_data)
            print(f'value2:{value2}')
            value3 = metric.get_rmse(test_data)
            print(f'value3:{value3}')
            value4 = metric.get_mae(test_data)
            print(f'value4:{value4}')
This code defines a class named Metrics that computes several performance measures for the recommender: hit ratio (HR), normalized discounted cumulative gain (NDCG), root-mean-square error (RMSE), and mean absolute error (MAE). Its initializer (__init__) receives the configs dictionary with whatever the metrics need, such as top_k. The implicit-feedback methods (get_hit_ratio, get_ndcg) rank a single positive item against sampled negatives; the explicit _client variants work from the full rating lists; and calDCG_k computes a per-user NDCG@k from predicted/true rating pairs.
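A concrete worked example of the implicit-feedback metrics: each evaluation batch holds one positive item (the first row) plus num_negative_test sampled negatives. If the positive ends up ranked 3rd by pred, then HR@10 = 1 and NDCG@10 = log 2 / log(1 + 3) = 0.5. The sketch below reproduces exactly that with the Metrics class (the item IDs and scores are invented):

import pandas as pd
from Metrics import Metrics

metric = Metrics({'top_k': 10})
# first row is the positive item; two of the negatives score higher, so its rank is 3
test_data = pd.DataFrame({
    'user_id': [0] * 5,
    'item_id': [7, 3, 9, 4, 6],
    'ratings': [1.0, 0.0, 0.0, 0.0, 0.0],
    'pred':    [0.8, 0.9, 0.95, 0.1, 0.2],
})
print(metric.get_hit_ratio(test_data)[10])  # 1.0  (positive is inside the top 10)
print(metric.get_ndcg(test_data)[10])       # 0.5  (log 2 / log 4)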
Summary:
In the code shown here, each client updates its hash representation using only its local data; those updates stay in memory and are consumed by the server to update the global model. Although the code does not spell out the client-server communication, in a complete federated learning system the clients' updates would be transmitted to the server, which aggregates them into the global model. This design optimizes overall model performance while preserving the privacy of each client's data.
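For completeness, here is one way the omitted exchange could look, reusing Client.get_inter_params as the upload payload. This is an illustrative sketch of the protocol, not code from the repository; note that train_model's batch version uses the number of sampled clients where this per-client variant aggregates the uploaded grads_len flags.

import numpy as np

def server_update_bit(clients, D, i, k, K):
    """One simulated upload-and-aggregate step for item i, bit k (illustrative only)."""
    grads_sum, count = 0.0, 0
    for client in clients:
        # each client uploads only intermediate gradients, never raw ratings or its code
        grads, grads_len = client.get_inter_params(i, k)
        grads_sum += grads
        count += grads_len
    dik_hat = grads_sum + count * D[i, k]
    D[i, k] = np.sign(K(dik_hat, D[i, k]))  # K keeps the old bit when the signal is zero
    return D[i, k]

A server would call this for every item i and bit k per round, e.g. server_update_bit(sampled_clients, self.D, i, k, self.K), mirroring the inner loop of train_model.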