技术要学会分享、交流,不建议闭门造车。一个人可以走的很快、一堆人可以走的更远。
本文完整代码、相关资料、技术交流&答疑,均可加我们的交流群获取,群友已超过2000人,添加时最好的备注方式为:来源+兴趣方向,方便找到志同道合的朋友。
?方式①、微信搜索公众号:Python学习与数据挖掘,后台回复:加群
方式②、添加微信号:dkl88194,备注:来自CSDN + 技术交流
首先导入代码的基本函数和数据类型,在exp文件下exp_informer import中的类Exp_Informer中,定义了模型参数、get_data、model_optim、train、test、eval等函数
from exp.exp_informer import Exp_Informer
进一步解析模型需要的参数,参数的含义如下表格:
参数名称 | ?? ?参数类型 | ?? ?参数讲解 |
model?? | str?? | 这是一个用于实验的参数设置,其中包含了三个选项: informer, informerstack, informerlight。根据实验需求,可以选择其中之一来进行实验,默认是使用informer模型。 |
data | str?? ? | 数据,这个并不是你理解的你的数据集文件,而是你想要用官方定义的方法还是你自己的数据集进行定义数据加载器,如果是自己的数据集就输入custom |
root_path | str | 这个才是你文件的路径,不要到具体的文件,到目录级别即可。 |
data_path | str? | 这个填写你文件的名称。 |
features? | str?? ? | 这个是特征有三个选项M,MS,S。分别是多元预测多元,多元预测单元,单元预测单元。 |
target? | str?? ? | 这个是你数据集中你想要预测那一列数据,假设我预测的是油温OT列就输入OT即可。 |
freq? | str? | 时间的间隔,你数据集每一条数据之间的时间间隔。 |
checkpoints | str | 训练出来的模型保存路径 |
seq_len | int | 用过去的多少条数据来预测未来的数据 |
label_len | int? | 可以裂解为更高的权重占比的部分要小于seq_len |
pred_len | int?? | 预测未来多少个时间点的数据 ? |
enc_in?? ? | int?? | 你数据有多少列,要减去时间那一列,这里我是输入8列数据但是有一列是时间所以就填写7 |
dec_in | int | 同上 |
c_out | int? | 这里有一些不同如果你的features填写的是M那么和上面就一样,如果填写的MS那么这里要输入1因为你的输出只有一列数据。 |
d_model | int | 用于设置模型的维度,默认值为512。可以根据需要调整该参数的数值来改变模型的维度 |
n_heads?? ? | int?? | 用于设置模型中的注意力头数。默认值为8,表示模型会使用8个注意力头,我建议和的输入数据的总体保持一致,列如我输入的是8列数据不用刨去时间的那一列就输入8即可。 |
e_layers | int?? ? ? | 用于设置编码器的层数 |
d_layers? ? | int?? | 用于设置解码器的层数 |
s_layers | str?? ? | 用于设置堆叠编码器的层数 |
d_ff? ? | int | 模型中全连接网络(FCN)的维度,默认值为2048 |
factor | int?? ? | ProbSparse自注意力中的因子,默认值为5 ? |
padding | int | 填充类型,默认值为0,这个应该大家都理解,如果不够数据就填写0. ? |
distil ? | bool? | 是否在编码器中使用蒸馏操作。使用--distil参数表示不使用蒸馏操作,默认为True也是我们的论文中比较重要的一个改进。 |
dropout | float | 这个应该都理解不说了,丢弃的概率,防止过拟合的。 |
?attn | str ? | 编码器中使用的注意力类型,默认为"prob"我们论文的主要改进点,提出的注意力机制。 |
embed | str?? | ?时间特征的编码方式,默认为"timeF" |
activation?? | str?? | 激活函数 |
output_attention?? ? ? | bool?? ? | 是否在编码器中输出注意力,默认为False |
do_predict?? ? | bool | 是否进行预测,这里模型中没有给添加算是一个小bug我们需要填写一个default=True在其中。 |
mix ? | bool? | 在生成式解码器中是否使用混合注意力,默认为True |
cols? ? | str | 从数据文件中选择特定的列作为输入特征,应该用不到 |
num_workers?? ? ? | int | 线程windows大家最好设置成0否则会报线程错误,linux系统随便设置。 |
itr | int? ? | 实验运行的次数,默认为2,我们这里改成数字1. |
train_epochs?? ? | int | 训练的次数 |
batch_size?? ? | int?? | 一次往模型力输入多少条数据 |
patience ? | int? | 早停机制,如果损失多少个epochs没有改变就停止训练。 |
learning_rate?? ? ? | float | 学习率。 |
des? ? | str | 实验描述,默认为"test" |
loss? ? | str | 损失函数,默认为"mse" |
?lradj | str | 学习率的调整方式,默认为"type1" |
use_amp?? ? ? | bool?? | ?混合精度训练, |
inverse? ? | bool | 我们的数据输入之前会被进行归一化处理,这里默认为False,算是一个小bug,因为输出的数据模型没有给我们转化成我们的数据,我们要改成True。 |
use_gpu?? ? ? | bool | 是否使用GPU训练,根据自身来选择 |
gpu? ? | int?? | GPU的编号 |
use_multi_gpu?? ? | bool?? ? | 是否使用多个GPU训练。 |
devices? | str?? ? | GPU的编号 |
接下来判断是否使用GPU设备进行训练
args.use_gpu = True if torch.cuda.is_available() and args.use_gpu else False
if args.use_gpu and args.use_multi_gpu:
args.devices = args.devices.replace(' ','')
device_ids = args.devices.split(',')
args.device_ids = [int(id_) for id_ in device_ids]
args.gpu = args.device_ids[0]
然后解析数据集的信息,字典data_parser中包含了不同数据集的信息,键值为数据集名称('ETTh1’等),对应一个包含.csv数据文件名。接着遍历字典data_parser,将数据信息存储在data_info变量中,并将相关信息存储在args中。最后将将args.s_layers中的字符串转换为整数列表。
data_parser = {
'ETTh1':{'data':'ETTh1.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
'ETTh2':{'data':'ETTh2.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
'ETTm1':{'data':'ETTm1.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
'ETTm2':{'data':'ETTm2.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
'WTH':{'data':'WTH.csv','T':'WetBulbCelsius','M':[12,12,12],'S':[1,1,1],'MS':[12,12,1]},
'ECL':{'data':'ECL.csv','T':'MT_320','M':[321,321,321],'S':[1,1,1],'MS':[321,321,1]},
'Solar':{'data':'solar_AL.csv','T':'POWER_136','M':[137,137,137],'S':[1,1,1],'MS':[137,137,1]},
}
if args.data in data_parser.keys():
data_info = data_parser[args.data]
args.data_path = data_info['data']
args.target = data_info['T']
args.enc_in, args.dec_in, args.c_out = data_info[args.features]
args.s_layers = [int(s_l) for s_l in args.s_layers.replace(' ','').split(',')]
args.detail_freq = args.freq
args.freq = args.freq[-1:]
print('Args in experiment:')
print(args)
最后声明Informer模型,开始训练、验证和预测,最后清楚GPU缓存。
Exp = Exp_Informer
for ii in range(args.itr):
setting = '{}_{}_ft{}_sl{}_ll{}_pl{}_dm{}_nh{}_el{}_dl{}_df{}_at{}_fc{}_eb{}_dt{}_mx{}_{}_{}'.format(args.model, args.data, args.features,
args.seq_len, args.label_len, args.pred_len,
args.d_model, args.n_heads, args.e_layers, args.d_layers, args.d_ff, args.attn, args.factor,
args.embed, args.distil, args.mix, args.des, ii)
exp = Exp(args)
print('>>>>>>>start training : {}>>>>>>>>>>>>>>>>>>>>>>>>>>'.format(setting))
exp.train(setting)
print('>>>>>>>testing : {}<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<'.format(setting))
exp.test(setting)
if args.do_predict:
print('>>>>>>>predicting : {}<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<'.format(setting))
exp.predict(setting, True)
torch.cuda.empty_cache()
train代码主要实现了Informer模型的训练过程,包括数据加载、模型训练、损失计算、学习率调整和提前停止训练过程。
def train(self, setting):
train_data, train_loader = self._get_data(flag = 'train')
vali_data, vali_loader = self._get_data(flag = 'val')
test_data, test_loader = self._get_data(flag = 'test')
path = os.path.join(self.args.checkpoints, setting)
if not os.path.exists(path):
os.makedirs(path)
time_now = time.time()
train_steps = len(train_loader)
early_stopping = EarlyStopping(patience=self.args.patience, verbose=True)
model_optim = self._select_optimizer()
criterion = self._select_criterion()
if self.args.use_amp:
scaler = torch.cuda.amp.GradScaler()
for epoch in range(self.args.train_epochs):
iter_count = 0
train_loss = []
self.model.train()
epoch_time = time.time()
for i, (batch_x,batch_y,batch_x_mark,batch_y_mark) in enumerate(train_loader):
iter_count += 1
model_optim.zero_grad()
pred, true = self._process_one_batch(
train_data, batch_x, batch_y, batch_x_mark, batch_y_mark)
loss = criterion(pred, true)
train_loss.append(loss.item())
if (i+1) % 100==0:
print("\titers: {0}, epoch: {1} | loss: {2:.7f}".format(i + 1, epoch + 1, loss.item()))
speed = (time.time()-time_now)/iter_count
left_time = speed*((self.args.train_epochs - epoch)*train_steps - i)
print('\tspeed: {:.4f}s/iter; left time: {:.4f}s'.format(speed, left_time))
iter_count = 0
time_now = time.time()
if self.args.use_amp:
scaler.scale(loss).backward()
scaler.step(model_optim)
scaler.update()
else:
loss.backward()
model_optim.step()
print("Epoch: {} cost time: {}".format(epoch+1, time.time()-epoch_time))
train_loss = np.average(train_loss)
vali_loss = self.vali(vali_data, vali_loader, criterion)
test_loss = self.vali(test_data, test_loader, criterion)
print("Epoch: {0}, Steps: {1} | Train Loss: {2:.7f} Vali Loss: {3:.7f} Test Loss: {4:.7f}".format(
epoch + 1, train_steps, train_loss, vali_loss, test_loss))
early_stopping(vali_loss, self.model, path)
if early_stopping.early_stop:
print("Early stopping")
break
adjust_learning_rate(model_optim, epoch+1, self.args)
best_model_path = path+'/'+'checkpoint.pth'
self.model.load_state_dict(torch.load(best_model_path))
return self.model
test函数主要实现了Informer模型的测试过程,包括测试数据加载、模型测试、损失计算、保存测试结果的过程。
def test(self, setting):
test_data, test_loader = self._get_data(flag='test')
self.model.eval()
preds = []
trues = []
for i, (batch_x,batch_y,batch_x_mark,batch_y_mark) in enumerate(test_loader):
pred, true = self._process_one_batch(
test_data, batch_x, batch_y, batch_x_mark, batch_y_mark)
preds.append(pred.detach().cpu().numpy())
trues.append(true.detach().cpu().numpy())
preds = np.array(preds)
trues = np.array(trues)
print('test shape:', preds.shape, trues.shape)
preds = preds.reshape(-1, preds.shape[-2], preds.shape[-1])
trues = trues.reshape(-1, trues.shape[-2], trues.shape[-1])
print('test shape:', preds.shape, trues.shape)
folder_path = './results/' + setting +'/'
if not os.path.exists(folder_path):
os.makedirs(folder_path)
mae, mse, rmse, mape, mspe = metric(preds, trues)
print('mse:{}, mae:{}'.format(mse, mae))
np.save(folder_path+'metrics.npy', np.array([mae, mse, rmse, mape, mspe]))
np.save(folder_path+'pred.npy', preds)
np.save(folder_path+'true.npy', trues)
return
predict函数利用Informer模型的最佳训练参数进行预测,包括预测数据加载、模型预测、保存预测结果的过程。
def predict(self, setting, load=False):
pred_data, pred_loader = self._get_data(flag='pred')
if load:
path = os.path.join(self.args.checkpoints, setting)
best_model_path = path+'/'+'checkpoint.pth'
self.model.load_state_dict(torch.load(best_model_path))
self.model.eval()
preds = []
for i, (batch_x,batch_y,batch_x_mark,batch_y_mark) in enumerate(pred_loader):
pred, true = self._process_one_batch(
pred_data, batch_x, batch_y, batch_x_mark, batch_y_mark)
preds.append(pred.detach().cpu().numpy())
preds = np.array(preds)
preds = preds.reshape(-1, preds.shape[-2], preds.shape[-1])
folder_path = './results/' + setting +'/'
if not os.path.exists(folder_path):
os.makedirs(folder_path)
np.save(folder_path+'real_prediction.npy', preds)
return
前面介绍了训练、测试和预测的的流程,那么每一批次是数据是如何利用Informer模型进行训练的呢?首先看一下每一个批次的训练函数**process_one_batch,**包括数据加载、数据处理、模型训练、保存预测结果的过程。
def _process_one_batch(self, dataset_object, batch_x, batch_y, batch_x_mark, batch_y_mark):
batch_x = batch_x.float().to(self.device)
batch_y = batch_y.float()
batch_x_mark = batch_x_mark.float().to(self.device)
batch_y_mark = batch_y_mark.float().to(self.device)
if self.args.padding==0:
dec_inp = torch.zeros([batch_y.shape[0], self.args.pred_len, batch_y.shape[-1]]).float()
elif self.args.padding==1:
dec_inp = torch.ones([batch_y.shape[0], self.args.pred_len, batch_y.shape[-1]]).float()
dec_inp = torch.cat([batch_y[:,:self.args.label_len,:], dec_inp], dim=1).float().to(self.device)
if self.args.use_amp:
with torch.cuda.amp.autocast():
if self.args.output_attention:
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
else:
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
else:
if self.args.output_attention:
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
else:
outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
if self.args.inverse:
outputs = dataset_object.inverse_transform(outputs)
f_dim = -1 if self.args.features=='MS' else 0
batch_y = batch_y[:,-self.args.pred_len:,f_dim:].to(self.device)
return outputs, batch_y
Informer模型主函数进行Encoding -> Attention -> Encoder -> Decoder - > Linear过程,对应论文的流程图。
class Informer(nn.Module):
def __init__(self, enc_in, dec_in, c_out, seq_len, label_len, out_len,
factor=5, d_model=512, n_heads=8, e_layers=3, d_layers=2, d_ff=512,
dropout=0.0, attn='prob', embed='fixed', freq='h', activation='gelu',
output_attention = False, distil=True, mix=True,
device=torch.device('cuda:0')):
super(Informer, self).__init__()
self.pred_len = out_len
self.attn = attn
self.output_attention = output_attention
self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout)
self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout)
Attn = ProbAttention if attn=='prob' else FullAttention
self.encoder = Encoder(
[
EncoderLayer(
AttentionLayer(Attn(False, factor, attention_dropout=dropout, output_attention=output_attention),
d_model, n_heads, mix=False),
d_model,
d_ff,
dropout=dropout,
activation=activation
) for l in range(e_layers)
],
[
ConvLayer(
d_model
) for l in range(e_layers-1)
] if distil else None,
norm_layer=torch.nn.LayerNorm(d_model)
)
self.decoder = Decoder(
[
DecoderLayer(
AttentionLayer(Attn(True, factor, attention_dropout=dropout, output_attention=False),
d_model, n_heads, mix=mix),
AttentionLayer(FullAttention(False, factor, attention_dropout=dropout, output_attention=False),
d_model, n_heads, mix=False),
d_model,
d_ff,
dropout=dropout,
activation=activation,
)
for l in range(d_layers)
],
norm_layer=torch.nn.LayerNorm(d_model)
)
self.projection = nn.Linear(d_model, c_out, bias=True)
def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
enc_out = self.enc_embedding(x_enc, x_mark_enc)
enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
dec_out = self.dec_embedding(x_dec, x_mark_dec)
dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
dec_out = self.projection(dec_out)
if self.output_attention:
return dec_out[:,-self.pred_len:,:], attns
else:
return dec_out[:,-self.pred_len:,:]
一般Transformer框架的第一层都是embedding,把各种特征信息融合在一起,作者从3个角度进行特征融合,并执行两步DataEmbedding操作,对应论文原理图中有两部分输入 X_en 和 X_de:
self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout)
self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout)
DataEmbedding操作过程如下:
class DataEmbedding(nn.Module):
def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
super(DataEmbedding, self).__init__()
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
self.position_embedding = PositionalEmbedding(d_model=d_model)
self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
d_model=d_model, embed_type=embed_type, freq=freq)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark):
x = self.value_embedding(x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
return self.dropout(x)
class TokenEmbedding(nn.Module):
def __init__(self, c_in, d_model):
super(TokenEmbedding, self).__init__()
padding = 1 if torch.__version__ >= '1.5.0' else 2
self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
kernel_size=3, padding=padding, padding_mode='circular', bias=False)
for m in self.modules():
if isinstance(m, nn.Conv1d):
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')
def forward(self, x):
x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
return x
class PositionalEmbedding(nn.Module):
def __init__(self, d_model, max_len=5000):
super(PositionalEmbedding, self).__init__()
pe = torch.zeros(max_len, d_model).float()
pe.require_grad = False
position = torch.arange(0, max_len).float().unsqueeze(1)
div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
return self.pe[:, :x.size(1)]
class TemporalEmbedding(nn.Module):
def __init__(self, d_model, embed_type='fixed', freq='h'):
super(TemporalEmbedding, self).__init__()
minute_size = 4
hour_size = 24
weekday_size = 7
day_size = 32
month_size = 13
Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
if freq == 't':
self.minute_embed = Embed(minute_size, d_model)
self.hour_embed = Embed(hour_size, d_model)
self.weekday_embed = Embed(weekday_size, d_model)
self.day_embed = Embed(day_size, d_model)
self.month_embed = Embed(month_size, d_model)
主要思想是在计算每个quey稀疏性得分时,只需采样出的部分和key计算就可以了。就是找到这些重要的/稀疏的query,从而只计算这些queryl的attention值,来优化计算效率。
class ProbAttention(nn.Module):
def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
super(ProbAttention, self).__init__()
self.factor = factor
self.scale = scale
self.mask_flag = mask_flag
self.output_attention = output_attention
self.dropout = nn.Dropout(attention_dropout)
def _prob_QK(self, Q, K, sample_k, n_top):
B, H, L_K, E = K.shape
_, _, L_Q, _ = Q.shape
K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
index_sample = torch.randint(L_K, (L_Q, sample_k))
K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze(-2)
M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)
M_top = M.topk(n_top, sorted=False)[1]
Q_reduce = Q[torch.arange(B)[:, None, None],
torch.arange(H)[None, :, None],
M_top, :]
Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))
return Q_K, M_top
def _get_initial_context(self, V, L_Q):
B, H, L_V, D = V.shape
if not self.mask_flag:
V_sum = V.mean(dim=-2)
contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone()
else:
assert(L_Q == L_V)
contex = V.cumsum(dim=-2)
return contex
def _update_context(self, context_in, V, scores, index, L_Q, attn_mask):
B, H, L_V, D = V.shape
if self.mask_flag:
attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)
scores.masked_fill_(attn_mask.mask, -np.inf)
attn = torch.softmax(scores, dim=-1)
context_in[torch.arange(B)[:, None, None],
torch.arange(H)[None, :, None],
index, :] = torch.matmul(attn, V).type_as(context_in)
if self.output_attention:
attns = (torch.ones([B, H, L_V, L_V])/L_V).type_as(attn).to(attn.device)
attns[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], index, :] = attn
return (context_in, attns)
else:
return (context_in, None)
def forward(self, queries, keys, values, attn_mask):
B, L_Q, H, D = queries.shape
_, L_K, _, _ = keys.shape
queries = queries.transpose(2,1)
keys = keys.transpose(2,1)
values = values.transpose(2,1)
U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item()
u = self.factor * np.ceil(np.log(L_Q)).astype('int').item()
U_part = U_part if U_part<L_K else L_K
u = u if u<L_Q else L_Q
scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)
scale = self.scale or 1./sqrt(D)
if scale is not None:
scores_top = scores_top * scale
context = self._get_initial_context(values, L_Q)
context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)
return context.transpose(2,1).contiguous(), attn
编码器旨在提取长顺序输入的稳健长程依赖性,Encoder编码器的实现
class Encoder(nn.Module):
def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
super(Encoder, self).__init__()
self.attn_layers = nn.ModuleList(attn_layers)
self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
self.norm = norm_layer
def forward(self, x, attn_mask=None):
attns = []
if self.conv_layers is not None:
for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
x, attn = attn_layer(x, attn_mask=attn_mask)
x = conv_layer(x)
attns.append(attn)
x, attn = self.attn_layers[-1](x, attn_mask=attn_mask)
attns.append(attn)
else:
for attn_layer in self.attn_layers:
x, attn = attn_layer(x, attn_mask=attn_mask)
attns.append(attn)
if self.norm is not None:
x = self.norm(x)
return x, attns
其中编码器层的实现如下:
class EncoderLayer(nn.Module):
def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
super(EncoderLayer, self).__init__()
d_ff = d_ff or 4*d_model
self.attention = attention
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
def forward(self, x, attn_mask=None):
new_x, attn = self.attention(
x, x, x,
attn_mask = attn_mask
)
x = x + self.dropout(new_x)
y = x = self.norm1(x)
y = self.dropout(self.activation(self.conv1(y.transpose(-1,1))))
y = self.dropout(self.conv2(y).transpose(-1,1))
return self.norm2(x+y), attn
提出了生成式的decoder机制,在预测序列(也包括inferencel阶段)时一步得到结果,而不是step-by-step,直接将预测时间复杂度降低。
class Encoder(nn.Module):
def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
super(Encoder, self).__init__()
self.attn_layers = nn.ModuleList(attn_layers)
self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
self.norm = norm_layer
def forward(self, x, attn_mask=None):
attns = []
if self.conv_layers is not None:
for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
x, attn = attn_layer(x, attn_mask=attn_mask)
x = conv_layer(x)
attns.append(attn)
x, attn = self.attn_layers[-1](x, attn_mask=attn_mask)
attns.append(attn)
else:
for attn_layer in self.attn_layers:
x, attn = attn_layer(x, attn_mask=attn_mask)
attns.append(attn)
if self.norm is not None:
x = self.norm(x)
return x, attns
其中解码器层的实现如下:
class DecoderLayer(nn.Module):
def __init__(self, self_attention, cross_attention, d_model, d_ff=None,
dropout=0.1, activation="relu"):
super(DecoderLayer, self).__init__()
d_ff = d_ff or 4*d_model
self.self_attention = self_attention
self.cross_attention = cross_attention
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
def forward(self, x, cross, x_mask=None, cross_mask=None):
x = x + self.dropout(self.self_attention(
x, x, x,
attn_mask=x_mask
)[0])
x = self.norm1(x)
x = x + self.dropout(self.cross_attention(
x, cross, cross,
attn_mask=cross_mask
)[0])
y = x = self.norm2(x)
y = self.dropout(self.activation(self.conv1(y.transpose(-1,1))))
y = self.dropout(self.conv2(y).transpose(-1,1))
return self.norm3(x+y)
其中定义了许多参数,在其中存在一些bug有如下的->
这个bug是因为头两行参数的,中的required=True导致的,我们将其删除掉即可,改为如下:
parser.add_argument('--model', type=str, default='informer',help='model of experiment, options: [informer, informerstack, informerlight(TBD)]')
parser.add_argument('--data', type=str, default='ETTh1', help='data')
最后写如下的脚本文件可视化预测结果:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
file_path1 = "results/informer_ETTh1_ftM_sl96_ll48_pl24_dm512_nh8_el2_dl1_df2048_atprob_fc5_ebtimeF_dtTrue_mxTrue_test_0/true.npy"
file_path2 = "results/informer_ETTh1_ftM_sl96_ll48_pl24_dm512_nh8_el2_dl1_df2048_atprob_fc5_ebtimeF_dtTrue_mxTrue_test_1/pred.npy"
true_value = []
pred_value = []
data1 = np.load(file_path1)
data2 = np.load(file_path2)
print(data2)
for i in range(24):
true_value.append(data2[0][i][6])
pred_value.append(data1[0][i][6])
print(true_value)
print(pred_value)
df = pd.DataFrame({'real': true_value, 'pred': pred_value})
df.to_csv('results.csv', index=False)
fig = plt.figure(figsize=( 16, 8))
plt.plot(df['real'], marker='o', markersize=8)
plt.plot(df['pred'], marker='o', markersize=8)
plt.tick_params(labelsize = 28)
plt.legend(['real','pred'],fontsize=28)
plt.show()