描述: 使用上下文嵌入进行结构化数据分类。
本示例演示了如何使用TabTransformer进行结构化数据分类,TabTransformer是一种用于监督和半监督学习的深度表格数据建模架构。TabTransformer基于自注意力的Transformer构建而成。Transformer层将分类特征的嵌入转换为强大的上下文嵌入,以实现更高的预测准确性。
import keras
from keras import layers
from keras import ops
import math
import numpy as np
import pandas as pd
from tensorflow import data as tf_data
import matplotlib.pyplot as plt
from functools import partial
本示例使用由UC Irvine Machine Learning Repository提供的美国人口普查收入数据集。任务是进行二元分类,预测一个人是否可能年收入超过5万美元。
数据集包括48,842个实例,具有14个输入特征:5个数值特征和9个分类特征。
首先,让我们将数据集从UCI Machine Learning Repository加载到一个Pandas DataFrame中:
# 导入必要的库
import pandas as pd
# 定义CSV文件的列名
CSV_HEADER = [
"age",
"workclass",
"fnlwgt",
"education",
"education_num",
"marital_status",
"occupation",
"relationship",
"race",
"gender",
"capital_gain",
"capital_loss",
"hours_per_week",
"native_country",
"income_bracket",
]
# 定义训练数据的URL
train_data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
# 从URL中读取训练数据,并指定列名
train_data = pd.read_csv(train_data_url, header=None, names=CSV_HEADER)
# 定义测试数据的URL
test_data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test"
# 从URL中读取测试数据,并指定列名
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)
# 打印训练数据集的形状
print(f"Train dataset shape: {train_data.shape}")
# 打印测试数据集的形状
print(f"Test dataset shape: {test_data.shape}")
移除第一条记录(因为它不是一个有效的数据示例),以及类标签中的一个尾随的“点”。
# 从第二行开始,将test_data的数据赋值给test_data,相当于去掉了第一行的数据
test_data = test_data[1:]
# 将test_data的income_bracket列的每个值应用lambda函数进行处理
# lambda函数的作用是将value中的"."替换为空字符串""
test_data.income_bracket = test_data.income_bracket.apply(
lambda value: value.replace(".", "")
)
现在我们将训练数据和测试数据分别存储在不同的CSV文件中。
# 定义训练数据文件名为 "train_data.csv"
train_data_file = "train_data.csv"
# 定义测试数据文件名为 "test_data.csv"
test_data_file = "test_data.csv"
# 将训练数据保存为CSV文件,不包含索引和表头
train_data.to_csv(train_data_file, index=False, header=False)
# 将测试数据保存为CSV文件,不包含索引和表头
test_data.to_csv(test_data_file, index=False, header=False)
在这里,我们定义数据集的元数据,这些元数据对于读取和解析数据为输入特征以及根据其类型对输入特征进行编码非常有用。
# 数值特征的名称列表
NUMERIC_FEATURE_NAMES = [
"age", # 年龄
"education_num", # 受教育年限
"capital_gain", # 资本收益
"capital_loss", # 资本损失
"hours_per_week", # 每周工作小时数
]
# 分类特征及其词汇表的字典
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
"workclass": sorted(list(train_data["workclass"].unique())), # 工作类别
"education": sorted(list(train_data["education"].unique())), # 教育程度
"marital_status": sorted(list(train_data["marital_status"].unique())), # 婚姻状况
"occupation": sorted(list(train_data["occupation"].unique())), # 职业
"relationship": sorted(list(train_data["relationship"].unique())), # 家庭关系
"race": sorted(list(train_data["race"].unique())), # 种族
"gender": sorted(list(train_data["gender"].unique())), # 性别
"native_country": sorted(list(train_data["native_country"].unique())), # 出生国家
}
# 用作实例权重的列名
WEIGHT_COLUMN_NAME = "fnlwgt"
# 分类特征的名称列表
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())
# 所有输入特征的名称列表
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
# 每个特征的列默认值列表
COLUMN_DEFAULTS = [
[0.0] if feature_name in NUMERIC_FEATURE_NAMES + [WEIGHT_COLUMN_NAME] else ["NA"]
for feature_name in CSV_HEADER
]
# 目标特征的名称
TARGET_FEATURE_NAME = "income_bracket"
# 目标特征的标签列表
TARGET_LABELS = [" <=50K", " >50K"]
超参数包括模型架构和训练配置。
# 定义超参数
LEARNING_RATE = 0.001 # 学习率
WEIGHT_DECAY = 0.0001 # 权重衰减
DROPOUT_RATE = 0.2 # Dropout概率
BATCH_SIZE = 265 # 批次大小
NUM_EPOCHS = 15 # 迭代次数
# 定义Transformer模型的参数
NUM_TRANSFORMER_BLOCKS = 3 # Transformer块的数量
NUM_HEADS = 4 # 注意力头的数量
EMBEDDING_DIMS = 16 # 类别特征的嵌入维度
# 定义MLP模型的参数
MLP_HIDDEN_UNITS_FACTORS = [
2,
1,
] # MLP隐藏层单元数,作为输入数量的因子
NUM_MLP_BLOCKS = 2 # 基线模型中MLP块的数量
我们定义一个输入函数,该函数读取和解析文件,然后将特征和标签转换为tf.data.Dataset
以供训练或评估使用。
# 导入必要的库
import tensorflow as tf
from tensorflow.keras import layers
# 创建一个StringLookup对象,用于将目标标签转换为整数索引
target_label_lookup = layers.StringLookup(
vocabulary=TARGET_LABELS, mask_token=None, num_oov_indices=0
)
# 准备样本的函数,将特征和目标转换为相应的索引,并返回特征、目标索引和权重
def prepare_example(features, target):
# 将目标标签转换为整数索引
target_index = target_label_lookup(target)
# 获取权重
weights = features.pop(WEIGHT_COLUMN_NAME)
return features, target_index, weights
# 创建一个字典,用于存储特征名称和对应的StringLookup对象
lookup_dict = {}
for feature_name in CATEGORICAL_FEATURE_NAMES:
# 获取特征对应的词汇表
vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
# 创建一个StringLookup对象,用于将字符串值转换为整数索引
# 由于我们不使用掩码标记,也不期望任何未知词汇(oov)标记,所以将mask_token设置为None,num_oov_indices设置为0
lookup = layers.StringLookup(
vocabulary=vocabulary, mask_token=None, num_oov_indices=0
)
# 将特征名称和StringLookup对象存储到字典中
lookup_dict[feature_name] = lookup
# 对分类特征进行编码的函数
def encode_categorical(batch_x, batch_y, weights):
for feature_name in CATEGORICAL_FEATURE_NAMES:
# 将分类特征转换为整数索引
batch_x[feature_name] = lookup_dict[feature_name](batch_x[feature_name])
return batch_x, batch_y, weights
# 从CSV文件中获取数据集的函数
def get_dataset_from_csv(csv_file_path, batch_size=128, shuffle=False):
# 使用tf_data.experimental.make_csv_dataset函数从CSV文件中创建数据集
dataset = (
tf_data.experimental.make_csv_dataset(
csv_file_path,
batch_size=batch_size,
column_names=CSV_HEADER,
column_defaults=COLUMN_DEFAULTS,
label_name=TARGET_FEATURE_NAME,
num_epochs=1,
header=False,
na_value="?",
shuffle=shuffle,
)
# 使用map函数将数据集中的每个样本应用prepare_example函数进行预处理
.map(prepare_example, num_parallel_calls=tf_data.AUTOTUNE, deterministic=False)
# 使用map函数将数据集中的每个样本应用encode_categorical函数进行编码
.map(encode_categorical)
)
# 对数据集进行缓存
return dataset.cache()
import keras
# 定义一个函数,用于运行实验
def run_experiment(
model, # 模型
train_data_file, # 训练数据文件
test_data_file, # 测试数据文件
num_epochs, # 迭代次数
learning_rate, # 学习率
weight_decay, # 权重衰减
batch_size, # 批次大小
):
# 使用AdamW优化器,设置学习率和权重衰减
optimizer = keras.optimizers.AdamW(
learning_rate=learning_rate, weight_decay=weight_decay
)
# 编译模型,使用二元交叉熵作为损失函数,使用二元准确率作为评估指标
model.compile(
optimizer=optimizer,
loss=keras.losses.BinaryCrossentropy(),
metrics=[keras.metrics.BinaryAccuracy(name="accuracy")],
)
# 从CSV文件中获取训练数据集,设置批次大小,并进行随机打乱
train_dataset = get_dataset_from_csv(train_data_file, batch_size, shuffle=True)
# 从CSV文件中获取验证数据集,设置批次大小
validation_dataset = get_dataset_from_csv(test_data_file, batch_size)
# 输出训练开始的提示信息
print("Start training the model...")
# 使用训练数据集进行模型训练,设置迭代次数和验证数据集
history = model.fit(
train_dataset, epochs=num_epochs, validation_data=validation_dataset
)
# 输出训练结束的提示信息
print("Model training finished")
# 在验证数据集上评估模型,获取准确率
_, accuracy = model.evaluate(validation_dataset, verbose=0)
# 输出验证准确率
print(f"Validation accuracy: {round(accuracy * 100, 2)}%")
# 返回训练历史记录
return history
现在,将模型的输入定义为一个字典,其中键是特征名称,值是具有相应特征形状和数据类型的keras.layers.Input
张量。
# 定义一个函数create_model_inputs,用于创建模型的输入
def create_model_inputs():
# 创建一个空字典inputs,用于存储输入特征
inputs = {}
# 遍历FEATURE_NAMES列表中的每个特征名
for feature_name in FEATURE_NAMES:
# 如果特征名在NUMERIC_FEATURE_NAMES列表中
if feature_name in NUMERIC_FEATURE_NAMES:
# 创建一个数值型输入层,名称为feature_name,形状为空,数据类型为float32,并将其添加到inputs字典中
inputs[feature_name] = layers.Input(
name=feature_name, shape=(), dtype="float32"
)
else:
# 创建一个非数值型输入层,名称为feature_name,形状为空,数据类型为float32,并将其添加到inputs字典中
inputs[feature_name] = layers.Input(
name=feature_name, shape=(), dtype="float32"
)
# 返回inputs字典作为模型的输入
return inputs
encode_inputs
方法返回 encoded_categorical_feature_list
和 numerical_feature_list
。
我们将分类特征编码为嵌入向量,对于所有特征使用固定的 embedding_dims
,而不考虑它们的词汇大小。
这对于 Transformer 模型是必需的。
# 定义一个函数encode_inputs,用于将输入数据进行编码
# 输入参数包括inputs(输入数据)和embedding_dims(嵌入维度)
def encode_inputs(inputs, embedding_dims):
# 定义一个空列表,用于存储编码后的分类特征
encoded_categorical_feature_list = []
# 定义一个空列表,用于存储数值特征
numerical_feature_list = []
# 遍历输入数据中的每个特征
for feature_name in inputs:
# 判断特征名是否在CATEGORICAL_FEATURE_NAMES中
if feature_name in CATEGORICAL_FEATURE_NAMES:
# 获取特征名对应的词汇表
vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
# 创建一个查找表,用于将字符串值转换为整数索引
# 由于我们不使用掩码标记,也不期望有任何未知词汇(oov)标记,因此将mask_token设置为None,num_oov_indices设置为0
# 将字符串输入值转换为整数索引
# 创建一个指定维度的嵌入层
embedding = layers.Embedding(
input_dim=len(vocabulary), output_dim=embedding_dims
)
# 将索引值转换为嵌入表示
encoded_categorical_feature = embedding(inputs[feature_name])
# 将编码后的分类特征添加到列表中
encoded_categorical_feature_list.append(encoded_categorical_feature)
else:
# 将数值特征保持不变
numerical_feature = ops.expand_dims(inputs[feature_name], -1)
# 将数值特征添加到列表中
numerical_feature_list.append(numerical_feature)
# 返回编码后的分类特征列表和数值特征列表
return encoded_categorical_feature_list, numerical_feature_list
# 定义一个函数create_mlp,用于创建多层感知机模型
# 参数说明:
# - hidden_units: 一个列表,表示每个隐藏层的神经元个数
# - dropout_rate: 一个浮点数,表示在训练过程中随机丢弃神经元的比例
# - activation: 一个字符串,表示激活函数的名称
# - normalization_layer: 一个函数,用于对输入数据进行归一化处理
# - name: 一个字符串,表示模型的名称
def create_mlp(hidden_units, dropout_rate, activation, normalization_layer, name=None):
# 创建一个空列表,用于存储MLP的各个层
mlp_layers = []
# 遍历隐藏层的神经元个数
for units in hidden_units:
# 将归一化层添加到mlp_layers列表中
mlp_layers.append(normalization_layer())
# 将全连接层添加到mlp_layers列表中,指定神经元个数和激活函数
mlp_layers.append(layers.Dense(units, activation=activation))
# 将Dropout层添加到mlp_layers列表中,指定丢弃神经元的比例
mlp_layers.append(layers.Dropout(dropout_rate))
# 使用Sequential函数将mlp_layers列表中的层按顺序连接起来,创建一个Sequential模型
# 并指定模型的名称为name
return keras.Sequential(mlp_layers, name=name)
在第一个实验中,我们创建了一个简单的多层前馈网络。
def create_baseline_model(
embedding_dims, num_mlp_blocks, mlp_hidden_units_factors, dropout_rate
):
# 创建模型输入
inputs = create_model_inputs()
# 编码特征
encoded_categorical_feature_list, numerical_feature_list = encode_inputs(
inputs, embedding_dims
)
# 将所有特征连接起来
features = layers.concatenate(
encoded_categorical_feature_list + numerical_feature_list
)
# 计算前馈层的单元数
feedforward_units = [features.shape[-1]]
# 创建多个带有跳跃连接的前馈层
for layer_idx in range(num_mlp_blocks):
features = create_mlp(
hidden_units=feedforward_units,
dropout_rate=dropout_rate,
activation=keras.activations.gelu,
normalization_layer=layers.LayerNormalization,
name=f"feedforward_{layer_idx}",
)(features)
# 计算MLP的隐藏单元数
mlp_hidden_units = [
factor * features.shape[-1] for factor in mlp_hidden_units_factors
]
# 创建最终的MLP
features = create_mlp(
hidden_units=mlp_hidden_units,
dropout_rate=dropout_rate,
activation=keras.activations.selu,
normalization_layer=layers.BatchNormalization,
name="MLP",
)(features)
# 添加一个sigmoid作为二分类器
outputs = layers.Dense(units=1, activation="sigmoid", name="sigmoid")(features)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
baseline_model = create_baseline_model(
embedding_dims=EMBEDDING_DIMS,
num_mlp_blocks=NUM_MLP_BLOCKS,
mlp_hidden_units_factors=MLP_HIDDEN_UNITS_FACTORS,
dropout_rate=DROPOUT_RATE,
)
print("Total model weights:", baseline_model.count_params())
keras.utils.plot_model(baseline_model, show_shapes=True, rankdir="LR")
让我们训练和评估基准模型:
# 运行实验
history = run_experiment(
model=baseline_model(),
train_data_file='train_data.pt',
test_data_file='test_data.pt',
num_epochs=NUM_EPOCHS,
learning_rate=LEARNING_RATE,
weight_decay=WEIGHT_DECAY,
batch_size=BATCH_SIZE,
)
基准线性模型实现了约81%的验证准确率。
TabTransformer架构的工作原理如下:
embedding_dims
。softmax
分类器。论文在附录:实验和模型细节部分讨论了列嵌入的加法和连接。
TabTransformer的架构如下图所示,与论文中的展示一致。
def create_tabtransformer_classifier(
num_transformer_blocks,
num_heads,
embedding_dims,
mlp_hidden_units_factors,
dropout_rate,
use_column_embedding=False,
):
# 创建模型的输入
inputs = create_model_inputs()
# 对特征进行编码
encoded_categorical_feature_list, numerical_feature_list = encode_inputs(
inputs, embedding_dims
)
# 将编码后的分类特征堆叠起来,用于Transformer
encoded_categorical_features = ops.stack(encoded_categorical_feature_list, axis=1)
# 将数值特征进行拼接
numerical_features = layers.concatenate(numerical_feature_list)
# 如果使用列嵌入,将列嵌入添加到分类特征嵌入中
if use_column_embedding:
num_columns = encoded_categorical_features.shape[1]
column_embedding = layers.Embedding(
input_dim=num_columns, output_dim=embedding_dims
)
column_indices = ops.arange(start=0, stop=num_columns, step=1)
encoded_categorical_features = encoded_categorical_features + column_embedding(
column_indices
)
# 创建多个Transformer块的层
for block_idx in range(num_transformer_blocks):
# 创建多头注意力层
attention_output = layers.MultiHeadAttention(
num_heads=num_heads,
key_dim=embedding_dims,
dropout=dropout_rate,
name=f"multihead_attention_{block_idx}",
)(encoded_categorical_features, encoded_categorical_features)
# 跳跃连接1
x = layers.Add(name=f"skip_connection1_{block_idx}")(
[attention_output, encoded_categorical_features]
)
# 层归一化1
x = layers.LayerNormalization(name=f"layer_norm1_{block_idx}", epsilon=1e-6)(x)
# 前馈神经网络
feedforward_output = create_mlp(
hidden_units=[embedding_dims],
dropout_rate=dropout_rate,
activation=keras.activations.gelu,
normalization_layer=partial(
layers.LayerNormalization, epsilon=1e-6
), # 使用partial在初始化之前提供关键字参数
name=f"feedforward_{block_idx}",
)(x)
# 跳跃连接2
x = layers.Add(name=f"skip_connection2_{block_idx}")([feedforward_output, x])
# 层归一化2
encoded_categorical_features = layers.LayerNormalization(
name=f"layer_norm2_{block_idx}", epsilon=1e-6
)(x)
# 将分类特征的"上下文化"嵌入展平
categorical_features = layers.Flatten()(encoded_categorical_features)
# 对数值特征应用层归一化
numerical_features = layers.LayerNormalization(epsilon=1e-6)(numerical_features)
# 准备最终MLP块的输入
features = layers.concatenate([categorical_features, numerical_features])
# 计算MLP的隐藏单元
mlp_hidden_units = [
factor * features.shape[-1] for factor in mlp_hidden_units_factors
]
# 创建最终的MLP
features = create_mlp(
hidden_units=mlp_hidden_units,
dropout_rate=dropout_rate,
activation=keras.activations.selu,
normalization_layer=layers.BatchNormalization,
name="MLP",
)(features)
# 添加一个sigmoid作为二分类器
outputs = layers.Dense(units=1, activation="sigmoid", name="sigmoid")(features)
# 创建模型
model = keras.Model(inputs=inputs, outputs=outputs)
return model
# 创建TabTransformer分类器模型
tabtransformer_model = create_tabtransformer_classifier(
num_transformer_blocks=NUM_TRANSFORMER_BLOCKS,
num_heads=NUM_HEADS,
embedding_dims=EMBEDDING_DIMS,
mlp_hidden_units_factors=MLP_HIDDEN_UNITS_FACTORS,
dropout_rate=DROPOUT_RATE,
)
# 打印模型的总参数数量
print("Total model weights:", tabtransformer_model.count_params())
# 绘制模型结构图
keras.utils.plot_model(tabtransformer_model, show_shapes=True, rankdir="LR")
让我们训练和评估TabTransformer模型:
# 运行实验
history = run_experiment(
model=tabtransformer_model,
train_data_file=train_data_file,
test_data_file=test_data_file,
num_epochs=NUM_EPOCHS,
learning_rate=LEARNING_RATE,
weight_decay=WEIGHT_DECAY,
batch_size=BATCH_SIZE,
)
TabTransformer模型达到了约85%的验证准确率。
需要注意的是,默认参数配置下,基准模型和TabTransformer模型的可训练权重数量相似,分别为109,629和92,151,并且两者使用相同的训练超参数。
TabTransformer在表格数据上明显优于MLP和最近的深度网络,同时与基于树的集成模型的性能相匹配。
TabTransformer可以通过使用标记的示例进行端到端监督训练来学习。
对于有少量标记示例和大量未标记示例的情况,可以使用预训练过程使用未标记数据来训练Transformer层。
然后,使用标记数据对预训练的Transformer层和顶部MLP层进行微调。