计科210X 甘晴void 202108010XXX
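Node classification is a common learning task on graph data: a model predicts the class of every node in the graph. Link prediction generally refers to inferring the interactions and dependencies between pairs of objects in a population of many objects.
There are 7 files in total, as follows.
They are described in detail as follows:
In simpler terms: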
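GeneList stores the name of gene i. Positive_LinkSL stores the links between genes i and j: each line of that file gives a link between two genes together with the confidence score of that link. (Network2_CPDB.tsv and Network3_string.tsv are similar.) ★ However, since Network2_CPDB and Network3_string come without the corresponding node features, I considered the information they provide incomplete and did not use them.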
The information above can be summarized as follows:
Task 1
Task 2
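DGL (Deep Graph Library) is an open-source deep learning library for graph neural networks (GNNs). It gives researchers and developers tools and interfaces for deep learning on graph-structured data. DGL supports many GNN models, including GCN (Graph Convolutional Network), GraphSAGE (Graph Sample and Aggregation) and GAT (Graph Attention Network).
The main features of DGL include: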
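Reading the gene data and building the graph:
The open function reads the gene list file ('GeneList.txt') and stores the gene name on each line in the gene_list list.
A dictionary, gene_dict, maps each gene name to its index.
torch.tensor is used to create the graph data structure graph, which holds the edge indices and the confidence scores.
The node features are converted to PyTorch tensors with torch.tensor.
The code for this part is as follows: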
import dgl
import numpy as np
import torch

# Read the gene list
with open('GeneList.txt', 'r') as f:
    gene_list = [line.strip() for line in f]
# Build the mapping from gene name to index
gene_dict = {gene: idx for idx, gene in enumerate(gene_list)}
# Read the gene links and their confidence scores
with open('Positive_LinkSL.txt', 'r') as f:
    edges = [line.strip().split() for line in f]
# Extract source nodes, destination nodes and confidence scores.
# Each link is stored in both directions.
src_nodes = [gene_dict[edge[0]] for edge in edges] + [gene_dict[edge[1]] for edge in edges]
dst_nodes = [gene_dict[edge[1]] for edge in edges] + [gene_dict[edge[0]] for edge in edges]
confidence_scores = [float(edge[2]) for edge in edges] + [float(edge[2]) for edge in edges]
# Read the node features
with open('feature1_go.txt', 'r') as file:
    feature1_go = np.array([list(map(float, line.split())) for line in file])
with open('feature2_ppi.txt', 'r') as file:
    feature2_ppi = np.array([list(map(float, line.split())) for line in file])
# Build the graph
edges = torch.tensor(src_nodes), torch.tensor(dst_nodes)
graph = dgl.graph(edges)
graph.edata['confidence'] = torch.tensor(confidence_scores, dtype=torch.float32)
graph.ndata['feature1_go'] = torch.tensor(feature1_go, dtype=torch.float32)
graph.ndata['feature2_ppi'] = torch.tensor(feature2_ppi, dtype=torch.float32)
"""print(graph)
# Print the edge weights
edge_weights = graph.edata['confidence'].squeeze().numpy()
print("Edge Weights:")
print(edge_weights)
# Print the node feature 'feature1_go'
feature1_go_values = graph.ndata['feature1_go'].squeeze().numpy()
print("Node Feature 'feature1_go':")
print(feature1_go_values)
# Print the node feature 'feature2_ppi'
feature2_ppi_values = graph.ndata['feature2_ppi'].squeeze().numpy()
print("Node Feature 'feature2_ppi':")
print(feature2_ppi_values)"""
print(graph)
The output is as follows:
E:\anaconda\envs\python3-11\python.exe E:\python_files\数据挖掘\exp4\my.py
Graph(num_nodes=6375, num_edges=39334,
ndata_schemes={'feature1_go': Scheme(shape=(128,), dtype=torch.float32), 'feature2_ppi': Scheme(shape=(128,), dtype=torch.float32)}
edata_schemes={'confidence': Scheme(shape=(), dtype=torch.float32)})
This part works: all the information we need has been added to the graph.
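As a minimal sketch of the mapping and edge-doubling steps, the following uses a made-up three-gene toy list (the gene names and scores are hypothetical, not taken from the dataset). It illustrates why the printed num_edges is twice the number of lines in the link file: each link is stored once in each direction.

```python
# Toy stand-ins for GeneList.txt and Positive_LinkSL.txt (hypothetical values)
gene_list = ["G1", "G2", "G3"]
links = [("G1", "G2", 0.9), ("G2", "G3", 0.4)]

# Map each gene name to its row index
gene_dict = {gene: idx for idx, gene in enumerate(gene_list)}

# Store every link in both directions, duplicating its confidence score
src_nodes = [gene_dict[a] for a, b, _ in links] + [gene_dict[b] for a, b, _ in links]
dst_nodes = [gene_dict[b] for a, b, _ in links] + [gene_dict[a] for a, b, _ in links]
confidence = [s for _, _, s in links] * 2

print(list(zip(src_nodes, dst_nodes, confidence)))
```

By the same reasoning, num_edges=39334 in the output corresponds to 19667 lines read from Positive_LinkSL.txt.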
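After preprocessing, the graph neural network model is built:
A two-layer GraphSAGE model, SAGE, is defined.
The construct_negative_graph function builds the negative-sample graph.
DotProductPredictor scores each edge by the dot product of its endpoint representations.
Model is the overall model, combining the SAGE convolutions and the score computation module. The code is as follows: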
# Build a two-layer GNN model
import dgl.function as fn
import dgl.nn as dglnn
import torch.nn as nn
import torch.nn.functional as F


class SAGE(nn.Module):
    def __init__(self, in_feats, hid_feats, out_feats):
        super().__init__()
        # Instantiate SAGEConv: in_feats is the input feature size, out_feats the
        # output feature size, aggregator_type the type of aggregation function
        self.conv1 = dglnn.SAGEConv(
            in_feats=in_feats, out_feats=hid_feats, aggregator_type='mean')
        self.conv2 = dglnn.SAGEConv(
            in_feats=hid_feats, out_feats=out_feats, aggregator_type='mean')

    def forward(self, graph, inputs):
        # The inputs are the node features
        h = self.conv1(graph, inputs)
        h = F.relu(h)
        h = self.conv2(graph, h)
        return h


def construct_negative_graph(graph, k):
    # For every existing edge, keep its source and draw k random destinations
    src, dst = graph.edges()
    neg_src = src.repeat_interleave(k)
    neg_dst = torch.randint(0, graph.num_nodes(), (len(src) * k,))
    return dgl.graph((neg_src, neg_dst), num_nodes=graph.num_nodes())


class DotProductPredictor(nn.Module):
    def forward(self, graph, h):
        # h contains the node representations computed by the GNN model above
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            return graph.edata['score']


def compute_loss(pos_score, neg_score):
    # Margin loss.
    # Note: u_dot_v scores have shape (E, 1), so unsqueeze(1) gives (E, 1, 1)
    # and the sum below broadcasts against (E, k) to shape (E, E, k).
    n_edges = pos_score.shape[0]
    return (1 - pos_score.unsqueeze(1) + neg_score.view(n_edges, -1)).clamp(min=0).mean()


class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.sage = SAGE(in_features, hidden_features, out_features)
        self.pred = DotProductPredictor()

    def forward(self, g, neg_g, x):
        h = self.sage(g, x)
        # return self.pred(g, h), self.pred(neg_g, h)
        pos_score = self.pred(g, h)
        neg_score = self.pred(neg_g, h)
        return pos_score, neg_score
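The graph model structure at this step should be fine.
With the model defined, training can begin:
In each epoch, construct_negative_graph generates a negative-sample graph. The code is as follows: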
node_features = graph.ndata['feature1_go']
n_features = node_features.shape[1]
k = 5
model = Model(n_features, 10, 5)
opt = torch.optim.Adam(model.parameters())
for epoch in range(1):
    negative_graph = construct_negative_graph(graph, k)
    pos_score, neg_score = model(graph, negative_graph, node_features)
    loss = compute_loss(pos_score, neg_score)
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(f'Epoch {epoch + 1}, Loss: {loss.item()}')
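Here k is the parameter used to build the negative-sample graph. Specifically, for every positive edge, the construct_negative_graph function generates k negative edges. Negative samples are needed to train the GNN model: they give the model contrasting examples so that it can better tell positive edges from negative ones, which improves its performance.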
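The sampling step can be sketched in NumPy as follows. This mirrors what construct_negative_graph does with repeat_interleave and randint; the helper name sample_negative_edges and the toy sizes are assumptions made for illustration only.

```python
import numpy as np

def sample_negative_edges(src, num_nodes, k, seed=0):
    """Pair every positive source node with k uniformly random destinations."""
    rng = np.random.default_rng(seed)
    neg_src = np.repeat(src, k)  # each source repeated k times in a row
    neg_dst = rng.integers(0, num_nodes, size=len(src) * k)
    return neg_src, neg_dst

src = np.array([0, 1, 2])  # sources of three positive edges (toy data)
neg_src, neg_dst = sample_negative_edges(src, num_nodes=4, k=2)
print(neg_src)        # [0 0 1 1 2 2]
print(neg_dst.shape)  # (6,)
```

Because destinations are drawn uniformly at random, a sampled "negative" edge can occasionally coincide with a real link; this simple scheme accepts that noise.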
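In general k should not be too small, but a larger k increases both the computational cost and the memory footprint.
Even with k=5, my local machine ran into serious trouble.
First, the memory cost is unacceptable: the run requests 30943271120 bytes, about 28.81 GB, far beyond the 16 GB of RAM on my local machine. (This figure equals 39334 × 39334 × 5 × 4 bytes, i.e. a float32 tensor of shape (E, E, k) for the E = 39334 edges, which points at the broadcast inside compute_loss rather than at the size of the graph itself.)
I then lowered k to 1. Even so, although the code runs, it uses up essentially all available resources.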
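The 30943271120-byte figure can be reproduced from tensor shapes alone. The sketch below assumes that the scores returned by DotProductPredictor have shape (E, 1), and it uses NumPy's shape rules, which match PyTorch's broadcasting rules for this case. The alternative at the end is a suggestion under that assumption, not something verified on the original setup.

```python
import numpy as np

E, k = 39334, 5  # edge count from the printed graph, and the k used for training

# Shapes as they appear in compute_loss when scores have shape (E, 1)
pos_shape = (E, 1, 1)  # pos_score.unsqueeze(1)
neg_shape = (E, k)     # neg_score.view(n_edges, -1)

# Broadcasting aligns trailing dimensions, so the sum has shape (E, E, k)
result_shape = np.broadcast_shapes(pos_shape, neg_shape)
n_bytes = int(np.prod(result_shape, dtype=np.int64)) * 4  # float32 takes 4 bytes
print(result_shape, n_bytes)

# Assumed alternative: leave pos_score as (E, 1), so the result is only (E, k)
small_shape = np.broadcast_shapes((E, 1), neg_shape)
print(small_shape)
```

With k=1 the same arithmetic gives about 6.2 GB, which fits the observation that the run succeeds but occupies most of the memory.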
In addition, I reduced the number of network layers to 1, but
Assuming all the steps above are correct, the next step is the visualization.
The predicted edge scores are printed, and the graph is drawn with random node labels stored in true_labels.
# Print the distribution of edge scores
print("Edge Confidence Distribution:")
print(pos_score.detach().numpy())
import networkx as nx
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
true_labels = torch.randint(0, 3, (len(gene_list),))  # random labels in {0, 1, 2}
# Get the node representations
with torch.no_grad():
    node_embeddings = model.sage(graph, node_features).numpy()
# Reduce the node representations to 2-D for visualization
tsne = TSNE(n_components=2, random_state=42)
node_embeddings_2d = tsne.fit_transform(node_embeddings)
# Build the NetworkX graph
G = nx.Graph()
for i, gene in enumerate(gene_list):
    G.add_node(gene, label=true_labels[i].item(), color=true_labels[i].item())
# `edges` was reassigned to a pair of index tensors when the graph was built,
# so iterate over (source, destination, score) triples taken from the graph
edge_src, edge_dst = graph.edges()
edge_scores = pos_score.detach().squeeze(1).numpy()
for u, v, score in zip(edge_src.tolist(), edge_dst.tolist(), edge_scores):
    G.add_edge(gene_list[u], gene_list[v], score=float(score))
# Draw the graph
plt.figure(figsize=(12, 8))
pos = nx.spring_layout(G, seed=42)
node_color = [true_labels[i].item() for i in range(len(gene_list))]
# Draw the nodes
nx.draw_networkx_nodes(G, pos, node_size=100, node_color=node_color, cmap='viridis')
# Draw the predicted links
edge_color = ['b' if score > 0.5 else 'r' for score in nx.get_edge_attributes(G, 'score').values()]
nx.draw_networkx_edges(G, pos, edge_color=edge_color, width=1.5, alpha=0.6)
# Draw the node labels
labels = nx.get_node_attributes(G, 'label')
nx.draw_networkx_labels(G, pos, labels=labels, font_size=8)
plt.title('Link Prediction Visualization')
plt.show()
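To make the nodes distinguishable from each other, they are given randomly assigned colors here.
If the previous steps are correct, the model can be evaluated at this point.
For Accuracy, Precision, Recall and F1 Score: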
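# Model evaluation
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
model.eval()  # switch to evaluation mode; this affects layers such as Dropout
with torch.no_grad():
    # node_features should be the test-set features; the training graph is reused here as a placeholder
    test_pos_score, test_neg_score = model(graph, negative_graph, node_features)
# Existing edges are labeled 1 and sampled negative edges 0
# (replace with the real test-set labels when they are available)
test_scores = torch.cat([test_pos_score, test_neg_score]).squeeze(1)
test_true_labels = torch.cat([torch.ones(len(test_pos_score), dtype=torch.long),
                              torch.zeros(len(test_neg_score), dtype=torch.long)])
test_predicted_labels = torch.where(test_scores > 0.5, 1, 0).numpy()
# Compute the evaluation metrics
accuracy = accuracy_score(test_true_labels.numpy(), test_predicted_labels)
precision = precision_score(test_true_labels.numpy(), test_predicted_labels)
recall = recall_score(test_true_labels.numpy(), test_predicted_labels)
f1 = f1_score(test_true_labels.numpy(), test_predicted_labels)
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Test F1 Score: {f1:.4f}")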
For ROC, AUC and AUPR:
# Compute ROC and AUC
import numpy as np
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve, average_precision_score
# Score every positive and sampled negative edge; positives are labeled 1, negatives 0
edge_scores = torch.cat([pos_score, neg_score]).detach().squeeze(1).numpy()
edge_labels = np.concatenate([np.ones(len(pos_score)), np.zeros(len(neg_score))])
fpr, tpr, _ = roc_curve(edge_labels, edge_scores)
roc_auc = roc_auc_score(edge_labels, edge_scores)
# Plot the ROC curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()
# Compute AUPR
precision, recall, _ = precision_recall_curve(edge_labels, edge_scores)
aupr = average_precision_score(edge_labels, edge_scores)
# Plot the Precision-Recall curve
plt.figure(figsize=(8, 6))
plt.step(recall, precision, color='b', alpha=0.2, where='post')
plt.fill_between(recall, precision, step='post', alpha=0.2, color='b')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve (AUPR = {0:.2f})'.format(aupr))
plt.show()
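For link prediction, the labels fed to ROC and PR curves are per edge: 1 for a real link and 0 for a sampled negative. The sketch below builds such labels and computes AUC directly from its pairwise definition in NumPy; the helper names and toy scores are assumptions made for illustration.

```python
import numpy as np

def edge_labels_and_scores(pos_score, neg_score):
    """Label positive edges 1 and sampled negative edges 0."""
    scores = np.concatenate([pos_score, neg_score])
    labels = np.concatenate([np.ones(len(pos_score)), np.zeros(len(neg_score))])
    return labels, scores

def auc_by_pairs(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = scores[labels == 1]
    neg = scores[labels == 0]
    diff = pos[:, None] - neg[None, :]
    return float(((diff > 0).sum() + 0.5 * (diff == 0).sum()) / diff.size)

labels, scores = edge_labels_and_scores(np.array([0.9, 0.7, 0.2]),
                                        np.array([0.1, 0.3, 0.8]))
print(auc_by_pairs(labels, scores))
```

In this toy case 6 of the 9 positive/negative pairs are ordered correctly, so the AUC is 6/9.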
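Because the DGL-based approach as written demands far more memory and compute than my local machine can provide, it did not produce satisfactory results in this experiment.
DGL is a very handy tool, but it was not a good fit for running on my local machine.
The code and reasoning above should in principle be correct, and should work where compute and memory are plentiful.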
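read_data(file_path): reads the data in a file and returns a list with one entry per line.
build_graph_data(gene_list, link_list, feature1, feature2): builds the graph data, including the node features (feature1 and feature2), the edge indices (edge_index) and the edge attributes (edge_attr). It also builds a gene dictionary, gene_dict, that maps gene names to indices.
The file-reading function is defined as follows: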
def read_data(file_path):
    with open(file_path, 'r') as f:
        data = f.read().splitlines()
    return data
The graph data is built as follows:
# Build the graph data
def build_graph_data(gene_list, link_list, feature1, feature2):
    edge_index = []
    edge_attr = []
    x1 = []
    x2 = []
    # Map each gene name to its position in gene_list
    gene_dict = {gene: idx for idx, gene in enumerate(gene_list)}
    # Keep only the links whose two endpoints are both in gene_list
    for link in link_list:
        gene1, gene2, confidence = link.split('\t')
        if gene1 in gene_dict and gene2 in gene_dict:
            edge_index.append([gene_dict[gene1], gene_dict[gene2]])
            edge_attr.append(float(confidence))
    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
    edge_attr = torch.tensor(edge_attr, dtype=torch.float).view(-1, 1)
    # Collect the two feature vectors of every gene
    for gene in gene_list:
        if gene in gene_dict:
            x1.append(feature1[gene_dict[gene]])
            x2.append(feature2[gene_dict[gene]])
    x1 = torch.tensor(x1, dtype=torch.float)
    x2 = torch.tensor(x2, dtype=torch.float)
    data = Data(x1=x1, x2=x2, edge_index=edge_index, edge_attr=edge_attr)
    return data
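The gene list (GeneList.txt), the link list (Positive_LinkSL.txt) and the two feature files (feature1_go.txt and feature2_ppi.txt) are read. The data is then split into a training set and a test set, and the corresponding graph data is built for each. The reading code called from the main routine is as follows: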
# Read the data
gene_list = read_data('GeneList.txt')
link_list = read_data('Positive_LinkSL.txt')
feature1 = np.loadtxt('feature1_go.txt')
feature2 = np.loadtxt('feature2_ppi.txt')
# Split into a training set and a test set
train_gene_list, test_gene_list = train_test_split(gene_list, test_size=0.2, random_state=42)
# Build the graph data for the training set and the test set
train_data = build_graph_data(train_gene_list, link_list, feature1, feature2)
test_data = build_graph_data(test_gene_list, link_list, feature1, feature2)
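One detail worth checking in this split: inside build_graph_data, gene_dict numbers genes by their position in the sub-list it receives, while the rows of feature1 and feature2 follow the order of the full gene list (assuming row i of each feature file belongs to gene i of GeneList.txt, which the source does not state explicitly). A minimal sketch of keeping the two index spaces apart, with hypothetical toy data:

```python
import numpy as np

gene_list = ["G1", "G2", "G3", "G4"]   # hypothetical gene names, full list
features = np.arange(8).reshape(4, 2)  # row i belongs to gene_list[i]
subset = ["G3", "G1"]                  # e.g. one side of a train/test split

global_index = {gene: idx for idx, gene in enumerate(gene_list)}
local_index = {gene: idx for idx, gene in enumerate(subset)}

# Node ids come from the local index, feature rows from the global index
x_subset = np.stack([features[global_index[g]] for g in subset])
print(x_subset)                      # feature rows of G3 and G1

# Looking up features with the local index picks the wrong row
print(features[local_index["G3"]])  # row 0, which belongs to G1
```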
GATModel(nn.Module): defines a simple GAT model that uses a GATConv layer.
# GAT model definition
class GATModel(nn.Module):
    def __init__(self, in_channels, out_channels, heads):
        super(GATModel, self).__init__()
        self.conv1 = GATConv(in_channels, out_channels, heads=heads)

    def forward(self, x, edge_index, edge_attr):
        x = self.conv1(x, edge_index, edge_attr)
        return x
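To give an idea of what a graph attention layer computes, here is a NumPy sketch of single-head attention for one node, following the standard GAT formulation (project, score each neighbor with a LeakyReLU of a learned vector applied to the concatenated pair, softmax over the neighborhood, then take the weighted sum). It is not the GATConv implementation; the function names, the toy sizes and the slope of 0.2 are assumptions made for illustration.

```python
import numpy as np

def leaky_relu(z, slope=0.2):
    return np.where(z > 0, z, slope * z)

def gat_attention(h, W, a, i, neighbors):
    """Attention weights of node i over `neighbors`, and its updated feature (one head)."""
    z = h @ W  # linear projection of all node features
    logits = np.array([leaky_relu(a @ np.concatenate([z[i], z[j]])) for j in neighbors])
    weights = np.exp(logits - logits.max())  # numerically stable softmax
    weights /= weights.sum()
    new_hi = (weights[:, None] * z[neighbors]).sum(axis=0)
    return weights, new_hi

rng = np.random.default_rng(0)
h = rng.normal(size=(4, 3))  # 4 nodes with 3 input features each (toy sizes)
W = rng.normal(size=(3, 2))  # projects to 2 output features
a = rng.normal(size=4)       # attention vector over the concatenated pair
weights, new_h0 = gat_attention(h, W, a, i=0, neighbors=[1, 2, 3])
print(weights, weights.sum())
```

The attention weights are positive and sum to 1 over the neighborhood, so the updated feature is a convex combination of the projected neighbor features.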
train(model, data, optimizer, criterion, epochs): trains the GAT model. In every epoch the loss is computed and recorded in the losses list. After training, the loss curve is plotted with Matplotlib.
# Train the model
def train(model, data, optimizer, criterion, epochs):
    model.train()
    losses = []  # records the loss of every epoch
    for epoch in range(epochs):
        optimizer.zero_grad()
        out = model(data.x1, data.edge_index, data.edge_attr)
        loss = criterion(out, data.x2)
        loss.backward()
        optimizer.step()
        losses.append(loss.item())  # record the loss of the current epoch
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}')
    # Plot the loss curve
    plt.plot(losses)
    plt.title('Training Loss Over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()
evaluate(y_true, y_pred): evaluates the link prediction results with metrics from sklearn, including accuracy, precision, recall, F1 score, ROC AUC and average precision (AUPR).
def evaluate(y_true, y_pred):
    # Binarize targets and predictions with the same threshold
    y_true = (y_true > 0.3).int().cpu().numpy()
    y_pred = (y_pred > 0.3).int().cpu().numpy()
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='micro')
    recall = recall_score(y_true, y_pred, average='micro')
    f1 = f1_score(y_true, y_pred, average='micro')
    roc_auc = roc_auc_score(y_true, y_pred)
    aupr = average_precision_score(y_true, y_pred)
    return accuracy, precision, recall, f1, roc_auc, aupr
The train function is called to train the model.
# Create and train the GAT model
model = GATModel(in_channels=128, out_channels=128, heads=1)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
train(model, train_data, optimizer, criterion, epochs=200)
The number of training epochs is set to 200 and the learning rate to 0.001.
The evaluate function is used to assess the prediction results.
# Run link prediction
pred_scores = model(test_data.x1, test_data.edge_index, test_data.edge_attr)
# Evaluate the link prediction results
accuracy, precision, recall, f1, roc_auc, aupr = evaluate(test_data.x2, pred_scores)
print(f'Accuracy: {accuracy} \nPrecision: {precision} \nRecall: {recall} \nF1 Score: {f1}')
print(f'ROC AUC: {roc_auc} \nAUPR: {aupr}')
import networkx as nx
import torch
from torch_geometric.data import Data
# Convert the PyTorch Geometric graph data to a NetworkX graph
G = nx.Graph()
G.add_nodes_from(range(test_data.num_nodes))
G.add_edges_from(test_data.edge_index.t().tolist())
# Draw the graph with NetworkX
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=True, font_weight='bold', node_color='lightblue', node_size=1000, font_size=8, edge_color='gray')
plt.show()
Running the model above gives the following loss curve.
Some of the loss values are as follows:
Epoch 1/200, Loss: 0.8566558361053467
Epoch 2/200, Loss: 0.7528260350227356
Epoch 3/200, Loss: 0.6675369143486023
Epoch 4/200, Loss: 0.5916386842727661
Epoch 5/200, Loss: 0.5249260067939758
Epoch 6/200, Loss: 0.46694767475128174
Epoch 7/200, Loss: 0.41712379455566406
Epoch 8/200, Loss: 0.37475287914276123
Epoch 9/200, Loss: 0.3390277028083801
Epoch 10/200, Loss: 0.309112012386322
Epoch 11/200, Loss: 0.284216046333313
Epoch 12/200, Loss: 0.2636083960533142
Epoch 13/200, Loss: 0.2465600073337555
Epoch 14/200, Loss: 0.23244094848632812
……
Epoch 195/200, Loss: 0.10945269465446472
Epoch 196/200, Loss: 0.10929632186889648
Epoch 197/200, Loss: 0.10914068669080734
Epoch 198/200, Loss: 0.1089857891201973
Epoch 199/200, Loss: 0.10883160680532455
Epoch 200/200, Loss: 0.10867814719676971
After 200 epochs the loss is around 0.1.
The model evaluation results are as follows:
Accuracy: 0.4549019607843137
Precision: 0.8565955895528382
Recall: 0.9963490534849291
F1 Score: 0.9212020532584679
ROC AUC: 0.5012495279165683
AUPR: 0.8531546660454162
They can be interpreted as follows:
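One way to read the gap between Accuracy and the other metrics, offered as a sketch: in evaluate, y_true and y_pred are 2-D 0/1 matrices (one row per gene, one column per feature dimension). For inputs of that kind scikit-learn's accuracy_score computes subset accuracy, where a row counts only if every entry matches, whereas average='micro' pools all matrix entries. The NumPy functions below re-implement both definitions on toy data; their names are assumptions made for illustration.

```python
import numpy as np

def subset_accuracy(y_true, y_pred):
    """Fraction of rows whose entire label vector is predicted exactly."""
    return float((y_true == y_pred).all(axis=1).mean())

def micro_precision_recall(y_true, y_pred):
    """Precision and recall pooled over every element of the label matrix."""
    tp = int(((y_true == 1) & (y_pred == 1)).sum())
    fp = int(((y_true == 0) & (y_pred == 1)).sum())
    fn = int(((y_true == 1) & (y_pred == 0)).sum())
    return tp / (tp + fp), tp / (tp + fn)

y_true = np.array([[1, 1, 0], [1, 1, 1]])
y_pred = np.array([[1, 1, 1], [1, 1, 1]])
print(subset_accuracy(y_true, y_pred))         # 0.5
print(micro_precision_recall(y_true, y_pred))  # precision 5/6, recall 1.0
```

A single wrong entry makes a whole row count as wrong for accuracy while barely moving the pooled precision and recall, which is the same pattern as an Accuracy near 0.45 next to an F1 above 0.9.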
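The gene link prediction graph is constructed as follows
(links whose predicted score exceeds a given threshold are taken as predicted links).
This is the overall picture; zooming in on parts of it shows the links between the genes of interest.
For the nodes in the central region that have many links to their surroundings, the gene names can be looked up by node index: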
1027 CLDN23
116 ADRB1
740 CBR3
617 C1QBP
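Below are some other local structures.
Using multiple channels: the model and the training are modified on top of the previous version.
Here we use a multi-channel convolutional network, so the model definition has to change: the original single channel is extended to several, which are merged where appropriate.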
# Multi-Channel Graph Convolutional Network model definition
class MultiChannelGCN(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(MultiChannelGCN, self).__init__()
        self.conv1 = GCNConv(in_channels, out_channels)
        self.conv2 = GCNConv(in_channels, out_channels)

    def forward(self, x1, x2, edge_index, edge_attr):
        # Each feature set goes through its own convolution
        x1 = self.conv1(x1, edge_index, edge_attr)
        x2 = self.conv2(x2, edge_index, edge_attr)
        return x1, x2
Besides the model definition, the training function and the calling code also need to be changed.
The modified training function:
# Train the model
def train(model, data, optimizer, criterion, epochs):
    model.train()
    losses = []  # records the loss of every epoch
    for epoch in range(epochs):
        optimizer.zero_grad()
        out1, out2 = model(data.x1, data.x2, data.edge_index, data.edge_attr)
        loss1 = criterion(out1, data.x1)
        loss2 = criterion(out2, data.x2)
        loss = loss1 + loss2
        loss.backward()
        optimizer.step()
        losses.append(loss.item())  # record the loss of the current epoch
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}')
    # Plot the loss curve
    plt.plot(losses)
    plt.title('Training Loss Over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()
The modified calling code:
# Run link prediction
pred_scores1, pred_scores2 = model(test_data.x1, test_data.x2, test_data.edge_index, test_data.edge_attr)
pred_scores = (pred_scores1 + pred_scores2) / 2  # average of the two channels
This turns it into a graph convolutional network with two channels.
The losses are as follows:
Epoch 1/200, Loss: 1.9401469230651855
Epoch 2/200, Loss: 1.682145357131958
Epoch 3/200, Loss: 1.4546871185302734
Epoch 4/200, Loss: 1.2563203573226929
Epoch 5/200, Loss: 1.084963083267212
Epoch 6/200, Loss: 0.9381833076477051
Epoch 7/200, Loss: 0.8134356737136841
Epoch 8/200, Loss: 0.708167552947998
Epoch 9/200, Loss: 0.6199674606323242
Epoch 10/200, Loss: 0.5466182827949524
Epoch 11/200, Loss: 0.48613178730010986
Epoch 12/200, Loss: 0.4367343485355377
Epoch 13/200, Loss: 0.39682072401046753
Epoch 14/200, Loss: 0.36491310596466064
……
Epoch 195/200, Loss: 0.15746958553791046
Epoch 196/200, Loss: 0.1571885496377945
Epoch 197/200, Loss: 0.15690799057483673
Epoch 198/200, Loss: 0.15662789344787598
Epoch 199/200, Loss: 0.15634828805923462
Epoch 200/200, Loss: 0.15606917440891266
The evaluation metrics are as follows:
Accuracy: 0.5427450980392157
Precision: 0.8652827615217433
Recall: 0.9757082692501186
F1 Score: 0.9171837684645032
ROC AUC: 0.5324953459502417
AUPR: 0.8606581811658711
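Overall view:
Some local views:
With two channels, more of the information that distinguishes the original features from each other is preserved, so prediction accuracy rises noticeably (Accuracy from about 0.45 to about 0.54, ROC AUC from about 0.50 to about 0.53), while Precision, Recall and F1 change little. Two-channel and multi-channel graph neural networks therefore do bring a benefit.
Continuing to modify the code, x1 and x2 in the model are replaced by a list so as to get n channels, as follows: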
# Multi-Channel Graph Convolutional Network model definition
class MultiChannelGCN(nn.Module):
    def __init__(self, in_channels, out_channels, num_channels):
        super(MultiChannelGCN, self).__init__()
        self.channels = nn.ModuleList([GCNConv(in_channels, out_channels) for _ in range(num_channels)])

    def forward(self, *inputs):
        # The last two inputs are edge_index and edge_attr; the rest are feature tensors
        output_channels = [channel(x, inputs[-2], inputs[-1]) for channel, x in zip(self.channels, inputs[:-2])]
        return output_channels
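The full code is attached at the end; change num_channels = in the code to the desired number of channels.
Raising the number of channels from 1 to 2 improves accuracy noticeably; raising it further brings no obvious improvement.
This is the result with 10 channels: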
Accuracy: 0.5435294117647059
Precision: 0.8650597497897928
Recall: 0.976010119158845
F1 Score: 0.9171917738830919
ROC AUC: 0.5317604380292384
AUPR: 0.8605590887908858
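The gain is not significant: accuracy stays at about 0.54, and the other metrics change only slightly. I therefore consider 2 channels basically sufficient.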
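A possible contributing factor to this plateau, offered as an observation about the code rather than a measured result: the forward method pairs channels with feature tensors using zip, which stops at the shorter sequence. Since only x1 and x2 are passed in, only the first two channels produce output regardless of num_channels. The pure-Python sketch below mimics that pairing with strings standing in for the layers and tensors; pair_channels is a hypothetical helper.

```python
def pair_channels(channels, *inputs):
    """Mimic the forward pass: the last two inputs are edge data, the rest are features."""
    features = inputs[:-2]
    return [(channel, x) for channel, x in zip(channels, features)]

channels = [f"conv{i}" for i in range(10)]  # stand-ins for 10 GCNConv layers
pairs = pair_channels(channels, "x1", "x2", "edge_index", "edge_attr")
print(len(pairs))  # 2
print(pairs)       # [('conv0', 'x1'), ('conv1', 'x2')]
```

Using more than two channels would require passing one feature tensor per channel, or reusing the same tensors across several channels.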
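Because the teacher postponed the submission deadline, I had more time to explore, and I came away with a more intuitive feel for graph neural networks. However, I still have not dug into the underlying principles; staying at the code level is not enough, and there is a lot left to learn.
I did learn a great deal about data mining this semester.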
import dgl
import torch
import numpy as np
# Read the gene list
with open('GeneList.txt', 'r') as f:
    gene_list = [line.strip() for line in f]
# Build the mapping from gene name to index
gene_dict = {gene: idx for idx, gene in enumerate(gene_list)}
# Read the gene links and their confidence scores
with open('Positive_LinkSL.txt', 'r') as f:
    edges = [line.strip().split() for line in f]
# Extract source nodes, destination nodes and confidence scores.
# Each link is stored in both directions.
src_nodes = [gene_dict[edge[0]] for edge in edges] + [gene_dict[edge[1]] for edge in edges]
dst_nodes = [gene_dict[edge[1]] for edge in edges] + [gene_dict[edge[0]] for edge in edges]
confidence_scores = [float(edge[2]) for edge in edges] + [float(edge[2]) for edge in edges]
# Read the node features
with open('feature1_go.txt', 'r') as file:
    feature1_go = np.array([list(map(float, line.split())) for line in file])
with open('feature2_ppi.txt', 'r') as file:
    feature2_ppi = np.array([list(map(float, line.split())) for line in file])
# Build the graph
edges = torch.tensor(src_nodes), torch.tensor(dst_nodes)
graph = dgl.graph(edges)
graph.edata['confidence'] = torch.tensor(confidence_scores, dtype=torch.float32)
graph.ndata['feature1_go'] = torch.tensor(feature1_go, dtype=torch.float32)
graph.ndata['feature2_ppi'] = torch.tensor(feature2_ppi, dtype=torch.float32)
"""print(graph)
# Print the edge weights
edge_weights = graph.edata['confidence'].squeeze().numpy()
print("Edge Weights:")
print(edge_weights)
# Print the node feature 'feature1_go'
feature1_go_values = graph.ndata['feature1_go'].squeeze().numpy()
print("Node Feature 'feature1_go':")
print(feature1_go_values)
# Print the node feature 'feature2_ppi'
feature2_ppi_values = graph.ndata['feature2_ppi'].squeeze().numpy()
print("Node Feature 'feature2_ppi':")
print(feature2_ppi_values)"""
print(graph)
# Build a two-layer GNN model
import dgl.nn as dglnn
import torch.nn as nn
import torch.nn.functional as F


class SAGE(nn.Module):
    def __init__(self, in_feats, hid_feats, out_feats):
        super().__init__()
        # Instantiate SAGEConv: in_feats is the input feature size, out_feats the
        # output feature size, aggregator_type the type of aggregation function
        self.conv1 = dglnn.SAGEConv(
            in_feats=in_feats, out_feats=hid_feats, aggregator_type='mean')
        self.conv2 = dglnn.SAGEConv(
            in_feats=hid_feats, out_feats=out_feats, aggregator_type='mean')

    def forward(self, graph, inputs):
        # The inputs are the node features
        h = self.conv1(graph, inputs)
        h = F.relu(h)
        h = self.conv2(graph, h)
        return h


def construct_negative_graph(graph, k):
    src, dst = graph.edges()
    neg_src = src.repeat_interleave(k)
    neg_dst = torch.randint(0, graph.num_nodes(), (len(src) * k,))
    return dgl.graph((neg_src, neg_dst), num_nodes=graph.num_nodes())


import dgl.function as fn


class DotProductPredictor(nn.Module):
    def forward(self, graph, h):
        # h contains the node representations computed by the GNN model above
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            return graph.edata['score']


def compute_loss(pos_score, neg_score):
    # Margin loss
    n_edges = pos_score.shape[0]
    return (1 - pos_score.unsqueeze(1) + neg_score.view(n_edges, -1)).clamp(min=0).mean()


class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.sage = SAGE(in_features, hidden_features, out_features)
        self.pred = DotProductPredictor()

    def forward(self, g, neg_g, x):
        h = self.sage(g, x)
        # return self.pred(g, h), self.pred(neg_g, h)
        pos_score = self.pred(g, h)
        neg_score = self.pred(neg_g, h)
        return pos_score, neg_score


node_features = graph.ndata['feature1_go']
n_features = node_features.shape[1]
k = 1
model = Model(n_features, 10, 5)
opt = torch.optim.Adam(model.parameters())
for epoch in range(1):
    negative_graph = construct_negative_graph(graph, k)
    pos_score, neg_score = model(graph, negative_graph, node_features)
    loss = compute_loss(pos_score, neg_score)
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(f'Epoch {epoch + 1}, Loss: {loss.item()}')
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import Data
from torch_geometric.nn import GATConv
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, average_precision_score, roc_curve, auc
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt


# Read the data
def read_data(file_path):
    with open(file_path, 'r') as f:
        data = f.read().splitlines()
    return data


# Build the graph data
def build_graph_data(gene_list, link_list, feature1, feature2):
    edge_index = []
    edge_attr = []
    x1 = []
    x2 = []
    gene_dict = {gene: idx for idx, gene in enumerate(gene_list)}
    for link in link_list:
        gene1, gene2, confidence = link.split('\t')
        if gene1 in gene_dict and gene2 in gene_dict:
            edge_index.append([gene_dict[gene1], gene_dict[gene2]])
            edge_attr.append(float(confidence))
    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
    edge_attr = torch.tensor(edge_attr, dtype=torch.float).view(-1, 1)
    for gene in gene_list:
        if gene in gene_dict:
            x1.append(feature1[gene_dict[gene]])
            x2.append(feature2[gene_dict[gene]])
    x1 = torch.tensor(x1, dtype=torch.float)
    x2 = torch.tensor(x2, dtype=torch.float)
    data = Data(x1=x1, x2=x2, edge_index=edge_index, edge_attr=edge_attr)
    return data


# GAT model definition
class GATModel(nn.Module):
    def __init__(self, in_channels, out_channels, heads):
        super(GATModel, self).__init__()
        self.conv1 = GATConv(in_channels, out_channels, heads=heads)

    def forward(self, x, edge_index, edge_attr):
        x = self.conv1(x, edge_index, edge_attr)
        return x


# Train the model
def train(model, data, optimizer, criterion, epochs):
    model.train()
    losses = []  # records the loss of every epoch
    for epoch in range(epochs):
        optimizer.zero_grad()
        out = model(data.x1, data.edge_index, data.edge_attr)
        loss = criterion(out, data.x2)
        loss.backward()
        optimizer.step()
        losses.append(loss.item())  # record the loss of the current epoch
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}')
    # Plot the loss curve
    plt.plot(losses)
    plt.title('Training Loss Over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()


# Evaluate the link prediction results
def evaluate(y_true, y_pred):
    y_true = (y_true > 0.5).int().cpu().numpy()
    y_pred = (y_pred > 0.5).int().cpu().numpy()
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='micro')
    recall = recall_score(y_true, y_pred, average='micro')
    f1 = f1_score(y_true, y_pred, average='micro')
    roc_auc = roc_auc_score(y_true, y_pred)
    aupr = average_precision_score(y_true, y_pred)
    return accuracy, precision, recall, f1, roc_auc, aupr


# Read the data
gene_list = read_data('GeneList.txt')
link_list = read_data('Positive_LinkSL.txt')
feature1 = np.loadtxt('feature1_go.txt')
feature2 = np.loadtxt('feature2_ppi.txt')
# Split into a training set and a test set
train_gene_list, test_gene_list = train_test_split(gene_list, test_size=0.2, random_state=42)
# Build the graph data for the training set and the test set
train_data = build_graph_data(train_gene_list, link_list, feature1, feature2)
test_data = build_graph_data(test_gene_list, link_list, feature1, feature2)
# Create and train the GAT model
model = GATModel(in_channels=128, out_channels=128, heads=1)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
train(model, train_data, optimizer, criterion, epochs=200)
# Run link prediction
pred_scores = model(test_data.x1, test_data.edge_index, test_data.edge_attr)
# Evaluate the link prediction results
accuracy, precision, recall, f1, roc_auc, aupr = evaluate(test_data.x2, pred_scores)
print(f'Accuracy: {accuracy} \nPrecision: {precision} \nRecall: {recall} \nF1 Score: {f1}')
print(f'ROC AUC: {roc_auc} \nAUPR: {aupr}')
import networkx as nx
import torch
from torch_geometric.data import Data
# Convert the PyTorch Geometric graph data to a NetworkX graph
G = nx.Graph()
G.add_nodes_from(range(test_data.num_nodes))
G.add_edges_from(test_data.edge_index.t().tolist())
# Draw the graph with NetworkX
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=True, font_weight='bold', node_color='lightblue', node_size=1000, font_size=8, edge_color='gray')
plt.show()
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, average_precision_score, roc_curve, auc
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt


# Read the data
def read_data(file_path):
    with open(file_path, 'r') as f:
        data = f.read().splitlines()
    return data


# Build the graph data
def build_graph_data(gene_list, link_list, feature1, feature2):
    edge_index = []
    edge_attr = []
    x1 = []
    x2 = []
    gene_dict = {gene: idx for idx, gene in enumerate(gene_list)}
    for link in link_list:
        gene1, gene2, confidence = link.split('\t')
        if gene1 in gene_dict and gene2 in gene_dict:
            edge_index.append([gene_dict[gene1], gene_dict[gene2]])
            edge_attr.append(float(confidence))
    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
    edge_attr = torch.tensor(edge_attr, dtype=torch.float).view(-1, 1)
    for gene in gene_list:
        if gene in gene_dict:
            x1.append(feature1[gene_dict[gene]])
            x2.append(feature2[gene_dict[gene]])
    x1 = torch.tensor(x1, dtype=torch.float)
    x2 = torch.tensor(x2, dtype=torch.float)
    data = Data(x1=x1, x2=x2, edge_index=edge_index, edge_attr=edge_attr)
    return data


# Multi-Channel Graph Convolutional Network model definition
class MultiChannelGCN(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(MultiChannelGCN, self).__init__()
        self.conv1 = GCNConv(in_channels, out_channels)
        self.conv2 = GCNConv(in_channels, out_channels)

    def forward(self, x1, x2, edge_index, edge_attr):
        x1 = self.conv1(x1, edge_index, edge_attr)
        x2 = self.conv2(x2, edge_index, edge_attr)
        return x1, x2


# Train the model
def train(model, data, optimizer, criterion, epochs):
    model.train()
    losses = []  # records the loss of every epoch
    for epoch in range(epochs):
        optimizer.zero_grad()
        out1, out2 = model(data.x1, data.x2, data.edge_index, data.edge_attr)
        loss1 = criterion(out1, data.x1)
        loss2 = criterion(out2, data.x2)
        loss = loss1 + loss2
        loss.backward()
        optimizer.step()
        losses.append(loss.item())  # record the loss of the current epoch
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}')
    # Plot the loss curve
    plt.plot(losses)
    plt.title('Training Loss Over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()


# Evaluate the link prediction results
def evaluate(y_true, y_pred):
    y_true = (y_true > 0.3).int().cpu().numpy()
    y_pred = (y_pred > 0.3).int().cpu().numpy()
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='micro')
    recall = recall_score(y_true, y_pred, average='micro')
    f1 = f1_score(y_true, y_pred, average='micro')
    roc_auc = roc_auc_score(y_true, y_pred)
    aupr = average_precision_score(y_true, y_pred)
    return accuracy, precision, recall, f1, roc_auc, aupr


# Read the data
gene_list = read_data('GeneList.txt')
link_list = read_data('Positive_LinkSL.txt')
feature1 = np.loadtxt('feature1_go.txt')
feature2 = np.loadtxt('feature2_ppi.txt')
# Split into a training set and a test set
train_gene_list, test_gene_list = train_test_split(gene_list, test_size=0.2, random_state=42)
# Build the graph data for the training set and the test set
train_data = build_graph_data(train_gene_list, link_list, feature1, feature2)
test_data = build_graph_data(test_gene_list, link_list, feature1, feature2)
# Create and train the Multi-Channel GCN model
model = MultiChannelGCN(in_channels=128, out_channels=128)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
train(model, train_data, optimizer, criterion, epochs=200)
# Run link prediction
pred_scores1, pred_scores2 = model(test_data.x1, test_data.x2, test_data.edge_index, test_data.edge_attr)
pred_scores = (pred_scores1 + pred_scores2) / 2  # average of the two channels
# Evaluate the link prediction results
accuracy, precision, recall, f1, roc_auc, aupr = evaluate(test_data.x2, pred_scores)
print(f'Accuracy: {accuracy} \nPrecision: {precision} \nRecall: {recall} \nF1 Score: {f1}')
print(f'ROC AUC: {roc_auc} \nAUPR: {aupr}')
import networkx as nx
import torch
from torch_geometric.data import Data
# Convert the PyTorch Geometric graph data to a NetworkX graph
G = nx.Graph()
G.add_nodes_from(range(test_data.num_nodes))
G.add_edges_from(test_data.edge_index.t().tolist())
# Draw the graph with NetworkX
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=True, font_weight='bold', node_color='lightblue', node_size=1000, font_size=8, edge_color='gray')
plt.show()
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, \
    average_precision_score
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt


# Read the data
def read_data(file_path):
    with open(file_path, 'r') as f:
        data = f.read().splitlines()
    return data


# Build the graph data
def build_graph_data(gene_list, link_list, feature1, feature2):
    edge_index = []
    edge_attr = []
    x1 = []
    x2 = []
    gene_dict = {gene: idx for idx, gene in enumerate(gene_list)}
    for link in link_list:
        gene1, gene2, confidence = link.split('\t')
        if gene1 in gene_dict and gene2 in gene_dict:
            edge_index.append([gene_dict[gene1], gene_dict[gene2]])
            edge_attr.append(float(confidence))
    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
    edge_attr = torch.tensor(edge_attr, dtype=torch.float).view(-1, 1)
    for gene in gene_list:
        if gene in gene_dict:
            x1.append(feature1[gene_dict[gene]])
            x2.append(feature2[gene_dict[gene]])
    x1 = torch.tensor(x1, dtype=torch.float)
    x2 = torch.tensor(x2, dtype=torch.float)
    data = Data(x1=x1, x2=x2, edge_index=edge_index, edge_attr=edge_attr)
    return data


# Multi-Channel Graph Convolutional Network model definition
class MultiChannelGCN(nn.Module):
    def __init__(self, in_channels, out_channels, num_channels):
        super(MultiChannelGCN, self).__init__()
        self.channels = nn.ModuleList([GCNConv(in_channels, out_channels) for _ in range(num_channels)])

    def forward(self, *inputs):
        output_channels = [channel(x, inputs[-2], inputs[-1]) for channel, x in zip(self.channels, inputs[:-2])]
        return output_channels


# Train the model
def train(model, data, optimizer, criterion, epochs):
    model.train()
    losses = []  # records the loss of every epoch
    for epoch in range(epochs):
        optimizer.zero_grad()
        output_channels = model(data.x1, data.x2, data.edge_index, data.edge_attr)
        # Assuming that data.x1 and data.x2 are the target values for each channel
        loss = sum(criterion(output, data.x1 if i == 0 else data.x2) for i, output in enumerate(output_channels))
        loss.backward()
        optimizer.step()
        losses.append(loss.item())  # record the loss of the current epoch
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}')
    # Plot the loss curve
    plt.plot(losses)
    plt.title('Training Loss Over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()


# Evaluate the link prediction results
def evaluate(y_true, y_pred):
    y_true = (y_true > 0.3).int().cpu().numpy()
    y_pred = (y_pred > 0.3).int().cpu().numpy()
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='micro')
    recall = recall_score(y_true, y_pred, average='micro')
    f1 = f1_score(y_true, y_pred, average='micro')
    roc_auc = roc_auc_score(y_true, y_pred)
    aupr = average_precision_score(y_true, y_pred)
    return accuracy, precision, recall, f1, roc_auc, aupr


# Read the data
gene_list = read_data('GeneList.txt')
link_list = read_data('Positive_LinkSL.txt')
feature1 = np.loadtxt('feature1_go.txt')
feature2 = np.loadtxt('feature2_ppi.txt')
# Split into a training set and a test set
train_gene_list, test_gene_list = train_test_split(gene_list, test_size=0.2, random_state=42)
# Build the graph data for the training set and the test set
train_data = build_graph_data(train_gene_list, link_list, feature1, feature2)
test_data = build_graph_data(test_gene_list, link_list, feature1, feature2)
# Create and train the Multi-Channel GCN model
num_channels = 150  # Set the number of channels (adjust as needed)
model = MultiChannelGCN(in_channels=128, out_channels=128, num_channels=num_channels)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
train(model, train_data, optimizer, criterion, epochs=200)
# Run link prediction
pred_scores_list = model(test_data.x1, test_data.x2, test_data.edge_index, test_data.edge_attr)
pred_scores = torch.stack(pred_scores_list).mean(dim=0)  # Take the mean across channels
# Evaluate the link prediction results
accuracy, precision, recall, f1, roc_auc, aupr = evaluate(test_data.x2, pred_scores)
print(f'Accuracy: {accuracy} \nPrecision: {precision} \nRecall: {recall} \nF1 Score: {f1}')
print(f'ROC AUC: {roc_auc} \nAUPR: {aupr}')
import networkx as nx
import torch
from torch_geometric.data import Data
# Convert the PyTorch Geometric graph data to a NetworkX graph
G = nx.Graph()
G.add_nodes_from(range(test_data.num_nodes))
G.add_edges_from(test_data.edge_index.t().tolist())
# Draw the graph with NetworkX
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=True, font_weight='bold', node_color='lightblue', node_size=1000, font_size=8,
        edge_color='gray')
plt.show()
Link prediction using graph neural networks
https://docs.dgl.ai/tutorials/blitz/4_link_predict.html
https://docs.dgl.ai/en/0.8.x/guide_cn/training-link.html
https://github.com/Giantjc/LinkPrediction
https://zhuanlan.zhihu.com/p/599510610?utm_id=0
https://docs.dgl.ai/en/latest/guide_cn/training-node.html