最近在做推荐算法相关研究,
先拿一个协同过滤代码练手。
网上找代码很容易,但是大多是讲原理的示例代码,在真实数据集上运行问题特别多。
以一个2w节点的开源数据集为例(baby.inter)
https://github.com/enoche/MMRec/
https://drive.google.com/drive/folders/13cBy1EA_saTUuXxVllKgtfci2A09jyaG?usp=sharing
列举一些我遇到的问题:
1.稀疏矩阵占用内存过大,使用coo_matrix可以极大减少占用
2.矩阵类型转换过程中维度经常出错,矩阵读取某列的方法和稀疏矩阵读某列的方法返回格式不同
3.使用多线程提速的过程中内存溢出,导致部分线程崩溃,运行结束后只得到一半结果。
4.获取结果的语句嵌套过多难以理解,例如:recommendations = unrated_items[np.argsort(-pred[unrated_items])][0:top_k]
5.格式转换不当导致内存溢出(64g内存的电脑啊,直接蓝屏了)
6.np.save np.savez不会用。
下面直接上代码
1.读取数据,这里数据的最后一位是训练集、验证集、测试集的划分标签。chatGPT写了几遍也不对,还是自己写的。
# 数据读取
def load_data(file_path):
train_data_u = []
train_data_i = []
train_data_r = []
val_data_u = []
val_data_i = []
val_data_r = []
test_data_u = []
test_data_i = []
test_data_r = []
with open(file_path) as file:
next(file) # 跳过第一行
for line in file:
line = line.strip().split("\t")
userID = int(line[0])
itemID = int(line[1])
rating = float(line[2])
x_label = line[4]
if x_label == '0':
train_data_u.append(userID)
train_data_i.append(itemID)
train_data_r.append(rating)
elif x_label == '1':
val_data_u.append(userID)
val_data_i.append(itemID)
val_data_r.append(rating)
elif x_label == '2':
test_data_u.append(userID)
test_data_i.append(itemID)
test_data_r.append(rating)
num_users = max(train_data_u) + 1
num_items = max(train_data_i) + 1
train_data_matrix = coo_matrix((train_data_r, (train_data_u,train_data_i)), shape=(num_users, num_items))
val_data_matrix = coo_matrix((val_data_r, (val_data_u,val_data_i)), shape=(num_users, num_items))
test_data_matrix = coo_matrix((test_data_r, (test_data_u,test_data_i)), shape=(num_users, num_items))
return train_data_matrix, val_data_matrix,test_data_matrix
2.训练模型,这里很多代码都只获取用户相似度,然后在预测过程中用户相似度乘评分矩阵得到相似用户的评分和。但是把矩阵乘法运算放在这里只需要计算1次,减少了约2w次的大规模矩阵乘法运算。
函数返回的矩阵每行表示某个用户对所有物品的评分。(这里把每个矩阵的维度都列举出来就能分析清楚了,我只是做笔记,懒得写。)矩阵每列表示一个商品。
# 模型训练
def train_model(data_matrix):
similarity_matrix = cosine_similarity(data_matrix, dense_output=False)
pred=similarity_matrix.dot(data_matrix)
print(pred)
return pred
3.预测用户评分,这里注意tocsr(),被坑了好久,coo_matrix可以直接进行点乘运算,但是不能做切片运算,切片出来某一行也和普通矩阵索引出来某行不一样。
然后就是非常复杂的排序运算,这里实在无法优化,也是整个代码最耗时的部分,因为要执行大约2w次。
def predict(ratings,pred, user_id, top_k):
pred1=pred.tocsr()[user_id].toarray()[0]
ratings1=ratings.tocsr()[user_id].toarray()[0]
unrated_items = np.where(ratings1 == 0)[0]
sort_index=np.argsort(-pred1[unrated_items])
sort_items=unrated_items[sort_index]
# 根据预测评分生成推荐列表
recommendations =sort_items [0:top_k]
return recommendations
4.评价函数,没什么说的,gpt写的直接拿来用
# 评估指标
def evaluate_recommendations(recommendations, test_matrix, k):
num_users, num_items = test_matrix.shape
recall = 0.0
ndcg = 0.0
precision = 0.0
map_value = 0.0
for user_id in range(num_users):
test_ratings = test_matrix[user_id]
if np.sum(test_ratings) > 0:
relevant_items = np.where(test_ratings > 0)[0]
recall += len(set(recommendations[user_id]) & set(relevant_items)) / len(relevant_items)
dcg = 0.0
idcg = np.log2(2)
for i in range(k):
item_id = recommendations[user_id][i]
if item_id in relevant_items:
dcg += 1 / np.log2(i + 2)
ndcg += dcg / idcg
precision += len(set(recommendations[user_id]) & set(relevant_items)) / k
ap = 0.0
for i in range(k):
item_id = recommendations[user_id][i]
if item_id in relevant_items:
ap += len(set(recommendations[user_id]) & set(relevant_items)) / (i + 1)
ap /= len(relevant_items)
map_value += ap
recall /= num_users
ndcg /= num_users
precision /= num_users
map_value /= num_users
return recall, ndcg, precision, map_value
5.主函数,写主函数一部分是c语言的习惯,另一部分是随时可以用return打断,否则当执行到thread.join时代码无法ctrl+c打断,只能直接关控制台。(哦,我都是记事本写代码,在控制台执行,没有IDE)
def printTime():
print(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}]")
def main():
print('start')
printTime()
# 数据路径
file_path = "..\\MMRec-master\\data\\baby\\baby.inter"
# 加载数据
train_matrix, val_matrix,test_matrix = load_data(file_path)
# 训练
pred = train_model(train_matrix)
print(pred.toarray()[0])
# 生成推荐
top_k = 10
recommendations = {}
recommendations[0]=(predict(train_matrix,pred,0, top_k))
print(recommendations[0])
print(len(recommendations))
#return
# 创建一个线程列表
threads = []
num_threads=16
# 将用户数量按照线程数进行划分
user_per_thread = test_matrix.shape[0] // num_threads
# 定义一个函数,用于并行计算推荐结果
def calculate_recommendations(start, end):
print(start,end)
for user_id in range(start, end):
recommendations[user_id] = predict(train_matrix, pred, user_id, top_k)#2.82G
if(user_id%500==0):
print(user_id)
for i in range(num_threads):
start = i * user_per_thread
end = (i+1) * user_per_thread
# 创建并启动一个新线程
t = threading.Thread(target=calculate_recommendations, args=(start, end))
t.start()
threads.append(t)
# 等待所有线程完成
for t in threads:
t.join()
start= int( test_matrix.shape[0] // num_threads)*num_threads
end=test_matrix.shape[0]
calculate_recommendations(start, end)
printTime()
np.savez('data.npz',recommendations=recommendations,test_matrix=test_matrix)
'''data=np.load('data.npz',allow_pickle=True)
recommendations=data['recommendations'].item();
test_matrix=data['test_matrix'].item();'''
# 评估指标
recall, ndcg, precision, map_value = evaluate_recommendations(recommendations, test_matrix.toarray(), top_k)
printTime()
print("Recall =", recall)
print("NDCG =", ndcg)
print("Precision =", precision)
print("MAP =", map_value)
main()
6.上面的np.savez是因为在花费了11个小时运行完代码后在评分环节可能崩溃,为了避免重新运行11个小时,第一时间保存结果。
后记:
这个代码在单线程第一次跑通时占用2.8g内存,11个小时。
后来4个线程,2.8*4+2.8+2=16,刚好让一台16g内存的电脑随时崩溃,4个子线程,1个主线程,2g系统占用。
后来换了一台电脑8个线程占用1个半小时,内存占用25g。
现在这版同时开16个线程,只需要20秒,
我去挑战更大的数据集了。