奖励模型和策略的价值头不仅仅查看响应。相反,它们将 query 和 response 连接在一起,作为 query_response 输入。
def get_rewards_op(self, queries, responses):
    """Build the reward op for a batch of query/response pairs.

    The queries and responses are concatenated along axis 1 and the joined
    token tensor is passed to ``self._build``; whatever ``_build`` returns is
    returned unchanged.
    """
    query_responses = tf.concat([queries, responses], axis=1)
    return self._build(query_responses)
举例来说,如果 query = “他在想某事,但他的眼神很难读懂。”,response = “他看着他的左手,手臂伸在他的前面。”,那么奖励模型和策略的价值头会对 query_response = “他在想某事,但他的眼神很难读懂。他看着他的左手,手臂伸在他的前面。” 进行前向传递,并产生形状为 (B, T, 1) 的奖励和价值,其中 B 是批量大小 (batch size),T 是序列长度,而 1 表示奖励头的输出维度为 1。
def respond_op(self, queries, length):
    """Sample responses for ``queries`` and return them with per-step extras.

    Args:
        queries: query tokens; embedded via ``self.embed_queries`` before
            sampling.
        length: forwarded to ``sample.sample_sequence`` as ``length``.

    Returns:
        A dict with keys ``responses`` (sampled tokens with the context
        columns sliced off), ``logprobs`` and ``values`` (both taken as-is
        from the sampler's result).
    """
    embedded = self.embed_queries(queries)
    # Number of context columns; used below to drop the prompt part from
    # the sampled token matrix.
    n_context = tf.shape(embedded)[1]
    sampled = sample.sample_sequence(
        step=self.step_core,
        context=embedded,
        length=length,
        model_hparams=self.model_hparams,
        temperature=self.temperature,
        extra_outputs={'values':tf.float32},
    )
    return {
        'responses': sampled['tokens'][:, n_context:],
        'logprobs': sampled['logprobs'],
        'values': sampled['values'],
    }
T 意味着每个 token 都有一个与其自身及前文相关联的奖励。例如,“眼神” 这个 token 将有一个与 “他在想某事,但他的眼神” 相对应的奖励。
def _generator():
    """Yield fixed-length token lists built from the wrapped text generator.

    This is a closure: ``self``, ``mode``, ``seed``, ``shuffle``, ``comm``,
    ``encoder``, ``start_token``, ``end_token``, ``sequence_length`` and
    ``padding_token`` come from the enclosing scope.

    For each text:
      1. encode it;
      2. if ``start_token`` is set, drop everything up to and including its
         first occurrence (texts without it are skipped);
      3. truncate to ``sequence_length`` tokens;
      4. if ``end_token`` is set, cut right after its last occurrence
         (texts without it are skipped);
      5. right-pad with ``padding_token`` up to ``sequence_length``.

    Yields:
        ``dict(tokens=<list of exactly sequence_length token ids>)``.
    """
    inner_gen = self.generator(mode, seed=seed, shuffle=shuffle, comm=comm)
    for text in inner_gen:
        tokens = encoder.encode(text)
        if start_token is not None:
            # Only a missing token (ValueError from list.index) means "skip
            # this text"; any other exception is a real error and propagates.
            try:
                first_index = tokens.index(start_token) + 1
            except ValueError:
                continue
            # When the start token is the very last token there is nothing
            # after it, so the tokens are left untouched.
            if first_index < len(tokens):
                tokens = tokens[first_index:]
        tokens = tokens[:sequence_length]  # token truncation
        if end_token is not None:
            try:
                # Position just past the LAST end token in the truncated text.
                last_index = len(tokens) - tokens[::-1].index(end_token)
            except ValueError:
                continue
            tokens = tokens[:last_index]
        if len(tokens) < sequence_length:
            # Right-pad to the fixed length.
            tokens = tokens + [padding_token] * (sequence_length - len(tokens))
        assert len(tokens) == sequence_length
        yield dict(tokens=tokens)
# The padding id is placed past the encoder's size (len(encoder) + 2).
self.padding_token = len(encoder) + 2 # +2 unnecessary, for historical reasons
# Register "[PAD]" as the tokenizer's pad token.
tokenizer.add_special_tokens({"pad_token": "[PAD]"})
# Worked example: right-padding GPT-2 queries to a fixed length.
import transformers

tokenizer = transformers.AutoTokenizer.from_pretrained("gpt2", padding_side="right")
tokenizer.add_special_tokens({"pad_token": "[PAD]"})
query_length = 5
texts = [
    "usually, he would",
    "she thought about it",
]
# Encode every text and keep at most `query_length` token ids of each.
tokens = [tokenizer.encode(text)[:query_length] for text in texts]
print("tokens", tokens)
# Pad every sequence to exactly `query_length` ids and also request the
# attention mask that marks the padded positions.
inputs = tokenizer.pad(
    dict(input_ids=tokens),
    padding="max_length",
    max_length=query_length,
    return_tensors="pt",
    return_attention_mask=True,
)
print("inputs", inputs)
"""prints are
tokens [[23073, 11, 339, 561], [7091, 1807, 546, 340]]
inputs {'input_ids': tensor([[23073, 11, 339, 561, 50257],
[ 7091, 1807, 546, 340, 50257]]), 'attention_mask': tensor([[1, 1, 1, 1, 0],
[1, 1, 1, 1, 0]])}
"""
def body(past, prev, output, logprobs, *extras):
    """Run one sampling step and return the grown loop state.

    ``step``, ``model_hparams``, ``beta``, ``top_k``, ``top_p``, ``utils``
    and ``extra_outputs`` come from the enclosing scope.

    Returns a list, in order: ``past`` extended with the step's
    ``presents`` (axis -2), the new sample squeezed on axis 1, ``output``
    extended with the new sample, ``logprobs`` extended with the sample's
    log-probability, then one extended tensor per entry of ``extras``.
    """
    step_out = step(model_hparams, prev[:, tf.newaxis], past=past,
                    past_tokens=output[:, :-1])
    # Logits are cast to float32 and multiplied by `beta` before filtering.
    scaled = tf.cast(step_out['logits'], tf.float32) * beta
    if top_k != 0:
        unfiltered = scaled
        # The Python-level check above is repeated in-graph via tf.equal,
        # exactly as in the original.
        scaled = tf.cond(tf.equal(top_k, 0),
                         lambda: unfiltered,
                         lambda: utils.take_top_k_logits(unfiltered, top_k))
    if top_p != 1.0:
        scaled = utils.take_top_p_logits(scaled, top_p)
    # Sampling
    sampled = utils.sample_from_logits(scaled, dtype=tf.int32)
    sampled_logprob = utils.logprobs_from_logits(logits=scaled, labels=sampled)
    new_state = [
        tf.concat([past, step_out['presents']], axis=-2),
        tf.squeeze(sampled, axis=[1]),
        tf.concat([output, sampled], axis=1),
        tf.concat([logprobs, sampled_logprob], axis=1),
    ]
    # Each extra accumulator grows along axis 1 with the step output stored
    # under its key in `extra_outputs`.
    new_state.extend(
        tf.concat([accumulated, step_out[key]], axis=1)
        for key, accumulated in zip(extra_outputs, extras)
    )
    return new_state
实战案例:
# Worked example: sampling a fixed number of new tokens from GPT-2 for a
# query that contains padding ids.
import torch
import transformers
tokenizer = transformers.AutoTokenizer.from_pretrained("gpt2", padding_side="right")
tokenizer.add_special_tokens({"pad_token": "[PAD]"}) # register the [PAD] special token
pad_id = tokenizer.pad_token_id
# The query puts two pad ids in front of the single real token.
query = torch.tensor([
[pad_id, pad_id, 23073],
])
# NOTE(review): `response` is defined but not used further down in this example.
response = torch.tensor([
[11, 339, 561],
])
response_length = 4
temperature = 0.7
pretrained_model = transformers.AutoModelForCausalLM.from_pretrained("gpt2")
pretrained_model.generation_config.eos_token_id = None # disable `pad_token_id` and `eos_token_id` because we just want to
pretrained_model.generation_config.pad_token_id = None # generate tokens without truncation / padding
# max_new_tokens == min_new_tokens, so exactly `response_length` tokens are requested.
generation_config = transformers.GenerationConfig(
max_new_tokens=response_length,
min_new_tokens=response_length,
temperature=temperature,
top_k=0.0,
top_p=1.0,
do_sample=True,
)
# NOTE(review): `context_length` is computed but not used further down in this example.
context_length = query.shape[1]
# True where the query holds a real token, False where it holds the pad id.
attention_mask = query != tokenizer.pad_token_id
input_ids = query.clone()
input_ids[~attention_mask] = 0 # set padding tokens to 0
output = pretrained_model.generate(
input_ids=input_ids,
attention_mask=attention_mask,
# position_ids=attention_mask.cumsum(1) - attention_mask.long(), # generation collapsed if this was turned on. TODO: why does generation collapse with this?
generation_config=generation_config,
return_dict_in_generate=True,
)
print(output.sequences)
"""
tensor([[ 0, 0, 23073, 16851, 11, 475, 991]])
"""
奖励模型只训练一个 epoch,以避免过度拟合有限量的人类注释数据 (例如,descriptiveness 任务只有大约 5000 个标签)。在这个单一的 epoch 中,学习率会退火至零。
类似于奖励模型训练,策略训练的学习率也会退火至零。
def train(self):
    """Run a single pass of reward-model training over the labels.

    Steps, in order: download the labels and add them to the buffer;
    optionally normalize before training; shuffle the label indices once
    (the permutation is passed through ``self.comm.bcast``); train batch by
    batch with a learning rate that shrinks linearly with the batch start
    index; optionally normalize again afterwards with target mean 0 and
    target std 1.
    """
    hparams = self.hparams
    self.add_to_buffer(download_labels(
        hparams.labels.source,
        label_type=self.label_type,
        question_schemas=self.question_schemas,
        total_labels=hparams.labels.num_train,
        comm=self.comm
    ))

    if hparams.normalize_before:
        target_mean, target_std = self.target_mean_std()
        self.normalize(self.sample_policy_responses, target_mean, target_std)

    # Collect training data for reward model training. train_indices will include the indices
    # trained on across all ranks, and its size must be a multiple of minibatch_size.
    per_rank_batch_size = utils.exact_div(hparams.batch_size, self.num_ranks)

    # Make sure each rank gets the same shuffle so we train on each point exactly once
    num_train = hparams.labels.num_train
    train_indices = self.comm.bcast(np.random.permutation(num_train))

    # Train on train_indices
    print(self.rank, "training on", num_train, "in batches of", per_rank_batch_size)
    batch_size = hparams.batch_size
    for batch_start in range(0, num_train, batch_size):
        global_batch = train_indices[batch_start:batch_start + batch_size]
        # Strided slice: this rank takes every num_ranks-th index of the batch.
        rank_batch = global_batch[self.rank::self.num_ranks]
        # Learning-rate annealing: full lr at index 0, shrinking linearly
        # with the start index of the batch.
        annealed_lr = (1 - batch_start / num_train) * hparams.lr
        self.train_batch(rank_batch, annealed_lr)

    if hparams.normalize_after:
        target_mean, target_std = np.zeros([]), np.ones([])
        self.normalize(self.sample_policy_responses, target_mean, target_std)
def _build(self, X):
    """Build the normalized reward for the token batch ``X``.

    The model's ``'reward'`` output is sliced at the last position of axis 1
    and then rescaled as ``reward_gain * reward + reward_bias``. The gain
    and bias variables are created here with initial values 1 and 0.
    """
    model_out = self.model(X=X, padding_token=self.padding_token)
    last_reward = model_out['reward'][:, -1]  # take the last token
    # NOTE(review): indentation was lost in the source; the affine step is
    # assumed to sit outside the variable scope -- confirm against upstream.
    with tf.variable_scope(f'{self.scope}/reward_norm'):
        self.reward_gain = tf.get_variable(
            'gain', shape=(), initializer=tf.constant_initializer(1))
        self.reward_bias = tf.get_variable(
            'bias', shape=(), initializer=tf.constant_initializer(0))
    normalized = self.reward_gain * last_reward + self.reward_bias
    self._set_initializers()
    return normalized
$$\text{initial} = \mathcal{N}\left(0,\ \frac{1}{\sqrt{d_{\text{model}} + 1}}\right)$$
def fc_layer(x, outshape, *, in_axes=1, scale=None):
    """Fully connected layer returning ``(output, reg_loss)``.

    Args:
        x: input tensor; its last ``in_axes`` dimensions are contracted.
        outshape: iterable giving the output dimensions (``()`` for a scalar).
        in_axes: number of trailing input axes consumed by the weights.
        scale: stddev of the normal weight initializer; when ``None`` it
            defaults to ``1 / sqrt(prod(inshape) + 1)``.

    Returns:
        ``tensordot(x, w, in_axes) + b`` and the L2 regularization loss of
        the weights ``w``.
    """
    if in_axes > 0:
        inshape = tuple(int(dim) for dim in x.shape[-in_axes:])
    else:
        inshape = ()
    outshape = tuple(outshape)
    if scale is None:
        scale = 1 / np.sqrt(np.prod(inshape) + 1)
    # Weights: normal initializer with stddev `scale`.
    w = tf.get_variable(
        'w', inshape + outshape,
        initializer=tf.random_normal_initializer(stddev=scale))
    # Bias: initialized to 0.
    b = tf.get_variable('b', outshape, initializer=tf.constant_initializer(0))
    # Call the regularizer manually so that it works correctly with GradientTape.
    # The 1/prod(outshape) scale is, per the original author's note, chosen
    # so that the initial value of the regularizer is 1.
    l2 = tf.contrib.layers.l2_regularizer(scale=1/np.prod(outshape))
    reg_loss = l2(w)
    return tensordot(x, w, in_axes) + b, reg_loss
# Apply dropout to `h` before the head projection.
dropped_h = dropout(h, self.hparams.head_pdrop, do_dropout=do_dropout, seed=head_seed, name='drop')
# TODO: refactor this, perhaps move to Policy
# The 'value' head passes scale=0 to fc_layer; every other head passes
# scale=None.
res, reg_loss = fc_layer(dropped_h, (), scale=0 if head_name == 'value' else None)
# Affine rescaling of the reward by the gain and bias.
reward = reward * reward_gain + reward_bias
# (Article text that was fused onto the next line in the source: "...is used
# to compute the reward value.")
def _build(self, tokens, do_dropout=False, name=None):
    """Build the normalized reward for ``tokens``.

    On the first call (``self.built`` is false) this also creates the
    ``gain``/``bias`` normalization variables (initial values 1 and 0), two
    scalar placeholders and the grouped assign op ``self._set_reward_norm``
    that writes the placeholders into those variables, and then calls
    ``self._set_initializers()``. Later calls reuse the variable scope.

    Args:
        tokens: token batch passed to the model as ``X``.
        do_dropout: forwarded to the model.
        name: accepted for interface compatibility; not used in this body.

    Returns:
        ``reward_gain * reward + reward_bias`` where ``reward`` is the
        model's ``'reward'`` output sliced at the last position of axis 1.
    """
    # NOTE(review): indentation was lost in the source; the nesting below
    # is reconstructed -- confirm against upstream.
    with tf.variable_scope(self.scope, reuse=self.built, auxiliary_name_scope=not self.built, use_resource=self.use_resource):
        lm_output = self.model(X=tokens, do_dropout=do_dropout, padding_token=self.padding_token)

        reward = lm_output['reward'][:, -1]  # reward of the last token
        with tf.variable_scope('reward_norm'):
            if not self.built:
                self.reward_gain = tf.get_variable('gain', shape=(), initializer=tf.constant_initializer(1))  # reward gain
                self.reward_bias = tf.get_variable('bias', shape=(), initializer=tf.constant_initializer(0))  # reward bias
                self._reward_gain_p = tf.placeholder(name='gain_p', dtype=tf.float32, shape=())
                self._reward_bias_p = tf.placeholder(name='bias_p', dtype=tf.float32, shape=())
                self._set_reward_norm = tf.group(self.reward_gain.assign(self._reward_gain_p),
                                                 self.reward_bias.assign(self._reward_bias_p))
            # The former `if reward is not None` guard was dropped: the slice
            # above would already have raised if the reward output were None.
            reward = self.reward_gain * reward + self.reward_bias
        if not self.built:
            self._set_initializers()
        self.built = True
        return reward
def normalize(self, sample_fn, target_means, target_stds):
    """Re-fit the reward normalization from freshly sampled data.

    Does nothing when ``self.hparams.normalize_samples`` is falsy. Otherwise
    the reward scales are reset, ``sample_fn`` is asked for
    ``normalize_samples`` samples, their statistics are computed with
    ``self.stats`` and handed to ``self.set_reward_norms`` together with the
    targets. When ``self.hparams.debug_normalize`` is truthy, a second
    sample of that size is drawn and its statistics are logged.

    Args:
        sample_fn: callable taking a sample count and returning samples.
        target_means: target mean passed on to ``set_reward_norms``.
        target_stds: target std passed on to ``set_reward_norms``.
    """
    sample_count = self.hparams.normalize_samples
    if sample_count:
        # Start from the un-normalized reward before measuring statistics.
        self.reset_reward_scales()
        observed_means, observed_stds = self.stats(sample_fn(sample_count))
        self.set_reward_norms(observed_means, observed_stds, target_means, target_stds)

        debug_count = self.hparams.debug_normalize
        if debug_count:
            # Measure again after normalization, for logging only.
            self.log_stats_after_normalize(self.stats(sample_fn(debug_count)))
我们用 $\mu_{\mathcal{D}}$ 来表示实证均值,用 $\sigma_{\mathcal{D}}$ 表示实证标准差,用 $g$ 表示 reward_gain,用 $b$ 表示 reward_bias,用 $\mu_{\mathcal{T}} = 0$ 表示目标均值,用 $\sigma_{\mathcal{T}} = 1$ 表示目标标准差。然后我们有以下公式。
$$\begin{aligned} g\,\mathcal{N}(\mu_{\mathcal{D}}, \sigma_{\mathcal{D}}) + b &= \mathcal{N}(g\mu_{\mathcal{D}}, g\sigma_{\mathcal{D}}) + b = \mathcal{N}(g\mu_{\mathcal{D}} + b, g\sigma_{\mathcal{D}}) = \mathcal{N}(\mu_{\mathcal{T}}, \sigma_{\mathcal{T}}) \\ g &= \frac{\sigma_{\mathcal{T}}}{\sigma_{\mathcal{D}}} \\ b &= \mu_{\mathcal{T}} - g\mu_{\mathcal{D}} \end{aligned}$$
def train(self):
    """Train the reward model for one pass over the downloaded labels.

    Labels are downloaded and buffered, the reward normalization is
    optionally re-fitted before training, every batch of a single shuffled
    permutation is trained with a linearly shrinking learning rate, and the
    normalization is optionally re-fitted afterwards (target mean 0,
    target std 1).
    """
    hp = self.hparams
    downloaded = download_labels(
        hp.labels.source,
        label_type=self.label_type,
        question_schemas=self.question_schemas,
        total_labels=hp.labels.num_train,
        comm=self.comm
    )
    self.add_to_buffer(downloaded)

    # Mean/std normalization before training.
    if hp.normalize_before:
        target_mean, target_std = self.target_mean_std()
        self.normalize(self.sample_policy_responses, target_mean, target_std)

    # Collect training data for reward model training. train_indices will include the indices
    # trained on across all ranks, and its size must be a multiple of minibatch_size.
    per_rank_batch_size = utils.exact_div(hp.batch_size, self.num_ranks)

    # Make sure each rank gets the same shuffle so we train on each point exactly once
    total = hp.labels.num_train
    train_indices = self.comm.bcast(np.random.permutation(total))

    # Train on train_indices
    print(self.rank, "training on", total, "in batches of", per_rank_batch_size)
    stride = hp.batch_size
    for offset in range(0, total, stride):
        all_ranks_indices = train_indices[offset:offset + stride]
        # This rank's share: every num_ranks-th index, starting at its rank.
        ours = all_ranks_indices[self.rank::self.num_ranks]
        # Linear decay with the batch's start offset.
        step_lr = (1 - offset / total) * hp.lr
        self.train_batch(ours, step_lr)

    # Mean/std normalization after training.
    if hp.normalize_after:
        target_mean, target_std = np.zeros([]), np.ones([])
        self.normalize(self.sample_policy_responses, target_mean, target_std)
# Build the reward-model trainer.
reward_trainer = RewardModelTrainer(
reward_model=reward_model,
policy=ref_policy, # the reference policy is passed as the trainer's `policy` (presumably used to sample responses -- TODO confirm)
query_sampler=query_sampler,
hparams=hparams,
comm=comm,
)
# Divide the logits in place by the sampling temperature.
logits /= self.temperature
数据预处理的细节:
def _generator():
    """Yield ``dict(tokens=...)`` items of exactly ``sequence_length`` ids.

    Closure over ``self``, ``mode``, ``seed``, ``shuffle``, ``comm``,
    ``encoder``, ``start_token``, ``end_token``, ``sequence_length`` and
    ``padding_token`` from the enclosing scope.

    Each text is encoded, optionally cropped to start right after the first
    ``start_token``, truncated to ``sequence_length``, optionally cropped to
    end right after the last ``end_token``, and finally right-padded with
    ``padding_token``. Texts lacking a required start or end token are
    skipped.
    """
    inner_gen = self.generator(mode, seed=seed, shuffle=shuffle, comm=comm)
    for text in inner_gen:
        tokens = encoder.encode(text)
        if start_token is not None:  # e.g. start_text="."
            # list.index raises ValueError when the token is absent; that is
            # the only condition that should skip the text, so nothing
            # broader is caught.
            try:
                first_index = tokens.index(start_token) + 1
            except ValueError:
                continue
            # A start token in last position leaves the tokens unchanged.
            if first_index < len(tokens):
                tokens = tokens[first_index:]
        tokens = tokens[:sequence_length]
        if end_token is not None:  # e.g. end_text="."
            try:
                # Index just past the last end token of the truncated text.
                last_index = len(tokens) - tokens[::-1].index(end_token)
            except ValueError:
                continue
            tokens = tokens[:last_index]
        if len(tokens) < sequence_length:  # pad the text
            tokens = tokens + [padding_token] * (sequence_length - len(tokens))
        assert len(tokens) == sequence_length
        yield dict(tokens=tokens)
def step_core(self, model_hparams, tokens, past=None, past_tokens=None, do_dropout=False, name=None):
    """Run the model on ``tokens`` and return logits, values and presents.

    On the first call (``self.built`` is false) ``self._set_initializers()``
    is invoked; afterwards ``self.built`` is set to True so later calls
    reuse the variable scope.

    Args:
        model_hparams: accepted for the sampler's step interface; this body
            reads the vocabulary size from ``self.model_hparams`` instead.
        tokens: passed to the model as ``X``.
        past, past_tokens, do_dropout: forwarded to the model.
        name: name for the enclosing ``tf.name_scope`` (default ``'step'``).

    Returns:
        A dict with keys ``'logits'``, ``'values'`` and ``'presents'``.
    """
    # NOTE(review): indentation was lost in the source; everything is
    # assumed to sit inside both scopes -- confirm against upstream.
    with tf.name_scope(name, 'step'):
        with tf.variable_scope(
                self.scope,
                reuse=self.built,
                auxiliary_name_scope=not self.built,
                use_resource=self.use_resource):
            model_out = self.model(X=tokens, past=past, past_tokens=past_tokens,
                                   do_dropout=do_dropout, padding_token=self.padding_token)

            # need to slice logits since we don't want to generate special tokens
            vocab_logits = model_out['lm_logits'][:,:,:self.model_hparams.n_vocab]
            step_out = {
                'logits': vocab_logits,
                'values': model_out['value'],
                'presents': model_out['present'],
            }
            if not self.built:
                self._set_initializers()
            self.built = True
            return step_out
如何实现?
token 截断:我们想要在第一个出现在响应的 truncate_after 位置之后的 truncate_token 处截断,将截断 token 后的所有 token 替换为填充 token
在截断响应上运行奖励模型: 在 token 截断过程将响应截断后,代码然后在 截断的响应 上运行奖励模型。
拒绝采样: 如果在第 16 和 24 个 token 之间没有句号,那么将响应的分数替换为固定的低值
def make_score_fn(hparams, score_model):
    """Create the scoring function used on sampled responses.

    The returned ``score_fn(queries, responses)`` post-processes the
    responses, scores the post-processed responses with
    ``score_model.score_fn``, and replaces the score of every response that
    fails the filter with ``hparams.penalty_reward_value``.

    Returns:
        ``score_fn``, which returns ``(score, responses, dict(score=score))``
        and carries a ``stat_schemas`` attribute describing ``score``.
    """
    postprocess_fn = lm_tasks.postprocess_fn_from_hparams(hparams, score_model.padding_token)

    #decorate requires a named function, postprocess_fn can be anonymous
    @utils.graph_function(responses=Schema(tf.int32, (None, None)))
    def postprocess(responses):
        return postprocess_fn(responses)

    filter_fn = lm_tasks.filter_fn_from_hparams(hparams)

    @utils.graph_function(
        responses=Schema(tf.int32, (None, None)),
        rewards=Schema(tf.float32, (None,)))
    def penalize(responses, rewards):
        passes_filter = filter_fn(responses)
        # Same shape as `rewards`, filled with the fixed penalty value.
        penalty = hparams.penalty_reward_value * tf.ones_like(rewards)
        return tf.where(passes_filter, rewards, penalty)

    @utils.graph_function(
        queries=Schema(tf.int32, (None, None)),
        responses=Schema(tf.int32, (None, None))
    )
    def unpenalized_score_fn(queries, responses):
        return score_model.score_fn(queries, responses)

    # Scoring function
    def score_fn(queries, responses):
        processed = postprocess(responses)
        raw_score = unpenalized_score_fn(queries, processed)
        score = penalize(processed, raw_score)
        return score, processed, dict(score=score)

    score_fn.stat_schemas = dict(score=Schema(tf.float32, (None,)))
    return score_fn
# Demo of how one batch is split into mini-batches (one optimizer step each)
# and micro-batches (one forward pass each, gradients accumulated).
import numpy as np

batch_size = 8
nminibatches = 2
gradient_accumulation_steps = 2
mini_batch_size = batch_size // nminibatches
micro_batch_size = mini_batch_size // gradient_accumulation_steps
data = np.arange(batch_size).astype(np.float32)
print("data:", data)
print("batch_size:", batch_size)
print("mini_batch_size:", mini_batch_size)
print("micro_batch_size:", micro_batch_size)
for epoch in range(4):
    # A fresh shuffle of the whole batch every epoch.
    batch_inds = np.random.permutation(batch_size)
    print("epoch:", epoch, "batch_inds:", batch_inds)
    for mini_batch_start in range(0, batch_size, mini_batch_size):
        mini_batch_end = mini_batch_start + mini_batch_size
        mini_batch_inds = batch_inds[mini_batch_start:mini_batch_end]

        # `optimizer.zero_grad()` would go here: reset the gradients before
        # accumulating them over the micro-batches below.
        for micro_batch_start in range(0, mini_batch_size, micro_batch_size):
            micro_batch_end = micro_batch_start + micro_batch_size
            micro_batch_inds = mini_batch_inds[micro_batch_start:micro_batch_end]
            print("____⏩ a forward pass on", data[micro_batch_inds])
        # `optimizer.step()` would go here: one update per mini-batch.
        print("⏪ a backward pass on", data[mini_batch_inds])

# Example output (the permutations are random, so the indices differ per run):
# data: [0. 1. 2. 3. 4. 5. 6. 7.]
# batch_size: 8
# mini_batch_size: 4
# micro_batch_size: 2
# epoch: 0 batch_inds: [6 4 0 7 3 5 1 2]
# ____⏩ a forward pass on [6. 4.]
# ____⏩ a forward pass on [0. 7.]
# ⏪ a backward pass on [6. 4. 0. 7.]
# ____⏩ a forward pass on [3. 5.]
# ____⏩ a forward pass on [1. 2.]
# ⏪ a backward pass on [3. 5. 1. 2.]
# epoch: 1 batch_inds: [6 7 3 2 0 4 5 1]
# ____⏩ a forward pass on [6. 7.]
# ____⏩ a forward pass on [3. 2.]
# ⏪ a backward pass on [6. 7. 3. 2.]
# ____⏩ a forward pass on [0. 4.]
# ____⏩ a forward pass on [5. 1.]
# ⏪ a backward pass on [0. 4. 5. 1.]
# epoch: 2 batch_inds: [1 4 5 6 0 7 3 2]
# ____⏩ a forward pass on [1. 4.]
# ____⏩ a forward pass on [5. 6.]
# ⏪ a backward pass on [1. 4. 5. 6.]
# ____⏩ a forward pass on [0. 7.]
# ____⏩ a forward pass on [3. 2.]
# ⏪ a backward pass on [0. 7. 3. 2.]
# epoch: 3 batch_inds: [7 2 4 1 3 0 6 5]
# ____⏩ a forward pass on [7. 2.]
# ____⏩ a forward pass on [4. 1.]
# ⏪ a backward pass on [7. 2. 4. 1.]
# ____⏩ a forward pass on [3. 0.]
# ____⏩ a forward pass on [6. 5.]
# ⏪ a backward pass on [3. 0. 6. 5.]
def compute_rewards(scores, logprobs, ref_logprobs):
    """Combine the KL penalty with the scores into per-token rewards.

    ``self`` comes from the enclosing scope.

    Returns:
        ``(rewards, non_score_reward, kl_coefficient)`` where
        ``non_score_reward`` is ``-self.kl_ctl.value * (logprobs - ref_logprobs)``
        and ``rewards`` is a copy of it with ``scores`` added to the last
        column.
    """
    non_score_reward = -self.kl_ctl.value * (logprobs - ref_logprobs)
    # Copy first so that adding the score does not alter non_score_reward.
    rewards = non_score_reward.copy()
    rewards[:, -1] += scores
    return rewards, non_score_reward, self.kl_ctl.value

# Expose the closure on the instance.
self.compute_rewards = compute_rewards
以 “usually, he would” 为例,它被标记化为 [23073, 11, 339, 561] 。假设我们使用 [23073] 作为查询,[11, 339, 561] 作为响应。然后在默认的 gpt2 参数下,响应标记将具有参考策略的对数概率 logprobs=[-3.3213, -4.9980, -3.8690] 。
def whiten(values, shift_mean=True):
    """Rescale ``values`` to unit variance, optionally keeping the mean.

    Args:
        values: tensor to whiten; mean and (population) variance are taken
            over all elements.
        shift_mean: when True the result has zero mean; when False the
            original mean is added back after rescaling.

    Returns:
        The whitened tensor.
    """
    mean = torch.mean(values)
    # 1e-8 keeps rsqrt finite when the variance is zero.
    inv_std = torch.rsqrt(torch.var(values, unbiased=False) + 1e-8)
    centered = (values - mean) * inv_std
    return centered if shift_mean else centered + mean
def loss(self, rollouts):
    """Build the PPO loss and its statistics for a batch of rollouts.

    Args:
        rollouts: mapping with keys ``'values'``, ``'logprobs'``,
            ``'rewards'``, ``'queries'`` and ``'responses'``.

    Returns:
        ``(loss, stats)`` where ``loss`` is the policy loss plus
        ``vf_coef`` times the value loss, and ``stats`` is the flattened
        (``'/'``-separated) dictionary of diagnostics.
    """
    ppo = self.hparams.ppo
    values = rollouts['values']
    old_logprob = rollouts['logprobs']
    rewards = rollouts['rewards']
    with tf.name_scope('ppo_loss'):
        if ppo.whiten_rewards:
            # Reward whitening (the mean is kept: shift_mean=False).
            rewards = utils.whiten(rewards, shift_mean=False)

        # Advantage recursion, walked backwards over the response positions;
        # beyond the last position the next value is taken as 0.
        gen_length = self.hparams.task.response_length
        final_step = gen_length - 1
        running_adv = 0
        step_advantages = [None] * gen_length
        for t in range(final_step, -1, -1):
            next_value = values[:, t + 1] if t < final_step else 0.0
            td_error = rewards[:, t] + ppo.gamma * next_value - values[:, t]
            running_adv = td_error + ppo.gamma * ppo.lam * running_adv
            step_advantages[t] = running_adv
        advantages = tf.stack(step_advantages, axis=1)
        returns = advantages + values

        advantages = utils.whiten(advantages)
        advantages = tf.stop_gradient(advantages)  # Shouldn't do anything, but better not to think about it

        outputs = self.policy.analyze_responses_op(rollouts['queries'], rollouts['responses'])

        # Value loss: element-wise maximum of the unclipped and clipped
        # squared errors, where the prediction is clipped to within
        # cliprange_value of the rollout values.
        vpred = outputs['values']
        value_clip = ppo.cliprange_value
        vpred_clipped = tf.clip_by_value(vpred, values - value_clip, values + value_clip)
        vf_unclipped = tf.square(vpred - returns)
        vf_clipped = tf.square(vpred_clipped - returns)
        vf_loss = .5 * tf.reduce_mean(tf.maximum(vf_unclipped, vf_clipped))
        vf_clipfrac = tf.reduce_mean(tf.cast(tf.greater(vf_clipped, vf_unclipped), tf.float32))

        # Policy loss: element-wise maximum of the unclipped and clipped
        # surrogate losses on the probability ratio.
        logprob = outputs['logprobs']
        ratio = tf.exp(logprob - old_logprob)
        policy_clip = ppo.cliprange
        pg_unclipped = -advantages * ratio
        pg_clipped = -advantages * tf.clip_by_value(ratio, 1.0 - policy_clip, 1.0 + policy_clip)
        pg_loss = tf.reduce_mean(tf.maximum(pg_unclipped, pg_clipped))
        pg_clipfrac = tf.reduce_mean(tf.cast(tf.greater(pg_clipped, pg_unclipped), tf.float32))

        total_loss = pg_loss + ppo.vf_coef * vf_loss

        entropy = tf.reduce_mean(outputs['entropies'])
        approxkl = .5 * tf.reduce_mean(tf.square(logprob - old_logprob))

        # Moments are taken over every axis of the tensor.
        return_mean, return_var = tf.nn.moments(returns, axes=list(range(returns.shape.ndims)))
        value_mean, value_var = tf.nn.moments(values, axes=list(range(values.shape.ndims)))

        stats = {
            'loss': {'policy': pg_loss, 'value': vf_loss, 'total': total_loss},
            'policy': {'entropy': entropy, 'approxkl': approxkl, 'clipfrac': pg_clipfrac},
            'returns': {'mean': return_mean, 'var': return_var},
            'val': {
                'vpred': tf.reduce_mean(vpred),
                'error': tf.reduce_mean((vpred - returns) ** 2),
                'clipfrac': vf_clipfrac,
                'mean': value_mean,
                'var': value_var,
            },
        }
        return total_loss, utils.flatten_dict(stats, sep='/')
# Value-loss excerpt: new value predictions from the analyzed responses.
vpred = outputs['values']
# Clip the new prediction to within cliprange_value of `values`.
vpredclipped = tf.clip_by_value(vpred, values - self.hparams.ppo.cliprange_value, values + self.hparams.ppo.cliprange_value)
# Squared error of the unclipped and of the clipped prediction.
vf_losses1 = tf.square(vpred - returns)
vf_losses2 = tf.square(vpredclipped - returns)
# Half the mean of the element-wise maximum of the two errors.
vf_loss = .5 * tf.reduce_mean(tf.maximum(vf_losses1, vf_losses2))
# Fraction of elements where the clipped error exceeds the unclipped one.
vf_clipfrac = tf.reduce_mean(tf.cast(tf.greater(vf_losses2, vf_losses1), tf.float32))