本篇博客是本人参加Datawhale组队学习第一次任务的笔记
【教程地址】https://github.com/datawhalechina/joyrl-book
【强化学习库JoyRL】https://github.com/datawhalechina/joyrl/tree/main
【JoyRL开发周报】 https://datawhale.feishu.cn/docx/OM8fdsNl0o5omoxB5nXcyzsInGe?from=from_copylink
【教程参考地址】https://github.com/datawhalechina/easy-rl
为什么学习强化学习?
对于任意问题,只要能够建模成序列决策问题或者带有鲜明的试错学习特征,就可以使用强化学习来解决,并且这是截至目前最为高效的方法之一。
强化学习的应用有哪些?
强化学习的方向有哪些?
强化学习与深度学习的联系?
深度学习在强化学习中扮演的角色主要是提供了一个强大的函数拟合能力,使得智能体能够处理复杂、高维度和非线性的环境。深度学习与强化学习之间的关系相当于眼睛和大脑的关键,眼睛是帮助大脑决策更好地观测世界的工具,对于一些没有眼睛的动物例如蚯蚓也可以通过其他的感官来观测并解析状态。
强化学习的算法大致可以分为哪几类?
强化学习算法基本上就分为两类,即基于价值的和基于策略梯度的算法
马尔可夫决策过程是强化学习的基本问题模型之一,它能够以数学的形式来描述智能体在与环境交互的过程中学到一个目标的过程。
智能体充当的是作出决策或动作,并且在交互过程中学习的角色
环境指的是智能体与之交互的一切外在事物,不包括智能体本身。
\qquad
确切地说,智能体与环境之间是在一系列离散的时步 ①(
time?step
\text{time step}
time?step )交互的,一般用
t
t
t 来表示,
t
=
0
,
1
,
2
,
?
t=0,1,2,\cdots
t=0,1,2,?②。在每个时步
t
t
t, 智能体会观测或者接收到当前环境的状态
s
t
s_t
st?,根据这个状态
s
t
s_t
st? 执行动作
a
t
a_t
at?。执行完动作之后会收到一个奖励
r
t
+
1
r_{t+1}
rt+1? ③,同时环境也会收到动作
a
t
a_t
at? 的影响会变成新的状态
s
t
+
1
s_{t+1}
st+1?,并且在
t
+
1
t+1
t+1 时步被智能体观测到。如此循环下去,我们就可以在这个交互过程中得到一串轨迹,如下式所示。
s
0
,
a
0
,
r
1
,
s
1
,
a
1
,
r
2
,
?
?
,
s
t
,
a
t
,
r
t
+
1
,
?
s_0,a_0,r_1,s_1,a_1,r_2,\cdots,s_t,a_t,r_{t+1},\cdots
s0?,a0?,r1?,s1?,a1?,r2?,?,st?,at?,rt+1?,?
以学习弹钢琴为例:
···
流程如下——学生根据老师的评价和琴声的反馈来更新自己演奏方式智能体:学生
环境:钢琴和教师
动作:弹琴
状态:所弹琴键的位置顺序
奖励:钢琴的声音和教师的反馈
···
1.智能体接收上一步环境的状态和奖励
2.智能体做出动作与环境进行交互
3.更新环境的状态和奖励
马尔可夫性质:某个状态的未来只与当前状态有关,与历史的状态无关。
P
(
s
t
+
1
∣
s
t
)
=
P
(
s
t
+
1
∣
s
0
,
s
1
,
?
?
,
s
t
)
P(s_{t+1}|s_t) = P(s_{t+1}|s_0, s_1,\cdots,s_t)
P(st+1?∣st?)=P(st+1?∣s0?,s1?,?,st?)
马尔可夫决策过程中智能体的目标是最大化累积的奖励(回报),用 G t G_t Gt? 表示,最简单的回报公式可以写成:
G t = r t + 1 + r t + 2 + ? + r T G_t = r_{t+1} + r_{t+2} + \cdots + r_T Gt?=rt+1?+rt+2?+?+rT?
其中 T T T 表示最后一个时步,也就是每回合的最大步数。
还有一种情况是智能体和环境持续交互,此时 T = ∞ T=\infty T=∞,针对这种情况我们引入一个折扣因子(discount factor) γ \gamma γ,并可以将回报表示:
G t = r t + 1 + γ r t + 2 + γ 2 r t + 3 + ? = ∑ k = 0 T = ∞ γ k r t + k + 1 G_t = r_{t+1} + \gamma r_{t+2} + \gamma^2 r_{t+3} + \cdots = \sum_{k=0}^{T=\infty} \gamma^k r_{t+k+1} Gt?=rt+1?+γrt+2?+γ2rt+3?+?=k=0∑T=∞?γkrt+k+1?
其中 γ \gamma γ 取值范围在 0 0 0 到 1 1 1 之间,它表示了我们在考虑未来奖励时的重要程度,控制着当前奖励和未来奖励之间的权衡。换句话说,它体现了我们对长远目标的关注度。当 γ = 0 \gamma=0 γ=0 时,我们只会关心当前的奖励,而不会关心将来的任何奖励。而当 γ \gamma γ 接近 1 1 1 时,我们会对所有未来奖励都给予较高的关注度。
这样做的好处是会让当前时步的回报 G t G_t Gt? 跟下一个时步 G t + 1 G_{t+1} Gt+1? 的回报是有所关联的,我们可以得到递推式:
G t ? r t + 1 + γ r t + 2 + γ 2 r t + 3 + γ 3 r t + 4 + ? = r t + 1 + γ ( r t + 2 + γ r t + 3 + γ 2 r t + 4 + ? ? ) = r t + 1 + γ G t + 1 \begin{aligned} G_t & \doteq r_{t+1}+\gamma r_{t+2}+\gamma^2 r_{t+3}+\gamma^3 r_{t+4}+\cdots \\ & =r_{t+1}+\gamma\left(r_{t+2}+\gamma r_{t+3}+\gamma^2 r_{t+4}+\cdots\right) \\ & =r_{t+1}+\gamma G_{t+1} \end{aligned} Gt???rt+1?+γrt+2?+γ2rt+3?+γ3rt+4?+?=rt+1?+γ(rt+2?+γrt+3?+γ2rt+4?+?)=rt+1?+γGt+1??
状态转移矩阵是指在马尔可夫过程中,描述系统从一个状态到另一个状态的概率转移矩阵。
图中每个曲线箭头表示指向自己,比如当学生在认真听讲即处于状态
s
1
s_1
s1? 时,会有
0.2
0.2
0.2 的概率继续认真听讲。当然也会分别有
0.4
0.4
0.4 和
0.4
0.4
0.4 的概率玩手机(
s
2
s_2
s2?)或者睡觉(
s
3
s_3
s3?)。此外,当学生处于状态
s
2
s_2
s2? 时,也会有
0.2
0.2
0.2 的概率会到认真听讲的状态(
s
1
s_1
s1?),像这种两个状态之间能互相切换的情况我们用一条没有箭头的线连接起来。
于状态数是有限的,我们可以把这些概率绘制成表格的形式:
状态转移矩阵是环境的一部分,跟智能体是没什么关系的,而智能体会根据状态转移矩阵来做出决策。在马尔可夫链(马尔可夫过程)的基础上增加奖励元素就会形成马尔可夫奖励过程(Markov reward process, MRP)
实际问题中,有很多例子其实是不符合马尔可夫性质的,比如我们所熟知的棋类游戏,因为在我们决策的过程中不仅需要考虑到当前棋子的位置和对手的情况,还需要考虑历史走子的位置例如吃子等。
MDP基于一组交互对象,即智能体和环境进行构建,所具有的要素包括状态、动作、策略和奖励
马尔可夫链 ? 奖励元素 马尔可夫奖励过程 ? 动作 马尔可夫决策过程 马尔可夫链 \stackrel{奖励元素}{\longrightarrow}马尔可夫奖励过程\stackrel{动作}{\longrightarrow} 马尔可夫决策过程 马尔可夫链?奖励元素?马尔可夫奖励过程?动作?马尔可夫决策过程
在马尔可夫链(马尔可夫过程)的基础上增加奖励元素就会形成马尔可夫奖励过程(Markov reward process,MRP), 在马尔可夫奖励过程基础上增加动作的元素就会形成马尔可夫决策过程,也就是强化学习的基本问题模型之一。
其中马尔可夫链和马尔可夫奖励过程在其他领域例如金融分析会用的比较多,强化学习则重在决策
相比于Q learning,DQN本质上是为了适应更为复杂的环境,并且经过不断的改良迭代,到了Nature DQN(即Volodymyr Mnih发表的Nature论文)这里才算是基本完善。DQN主要改动的点有三个:
设计初衷:强化学习每次产生的样本之间是相互关联的 , Q-learning算法训练的方式就是把每次通过与环境交互一次的样本直接喂入网络中训练。而在DQN中,我们会把每次与环境交互得到的样本都存储在一个经验回放中,然后每次从经验池中随机抽取一批样本来训练网络
使用一堆历史数据去训练,比之前用一次就扔掉好多了,大大提高样本效率
减少样本之间的相关性,原则上获取经验跟学习阶段是分开的,原来时序的训练数据有可能是不稳定的,打乱之后再学习有助于提高训练的稳定性,跟深度学习中划分训练测试集时打乱样本是一个道理。
首先是定义模型,就是定义两个神经网路,即当前网络和目标网络,它们的网络结构相同
class MLP(nn.Module): # 所有网络必须继承 nn.Module 类,这是 PyTorch 的特性
def __init__(self, input_dim,output_dim,hidden_dim=128):
super(MLP, self).__init__()
# 定义网络的层,这里都是线性层
self.fc1 = nn.Linear(input_dim, hidden_dim) # 输入层
self.fc2 = nn.Linear(hidden_dim,hidden_dim) # 隐藏层
self.fc3 = nn.Linear(hidden_dim, output_dim) # 输出层
def forward(self, x):
# 各层对应的激活函数
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x) # 输出层不需要激活函数
经验回放的功能比较简单,主要实现缓存样本和取出样本等两个功能:
class ReplayBuffer:
def __init__(self, capacity):
self.capacity = capacity # 经验回放的容量
self.buffer = [] # 用列表存放样本
self.position = 0 # 样本下标,便于覆盖旧样本
def push(self, state, action, reward, next_state, done):
''' 缓存样本
'''
if len(self.buffer) < self.capacity: # 如果样本数小于容量
self.buffer.append(None)
self.buffer[self.position] = (state, action, reward, next_state, done)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
''' 取出样本,即采样
'''
batch = random.sample(self.buffer, batch_size) # 随机采出小批量转移
state, action, reward, next_state, done = zip(*batch) # 解压成状态,动作等
return state, action, reward, next_state, done
def __len__(self):
''' 返回当前样本数
'''
return len(self.buffer)
智能体即策略的载体,因此有的时候也会称为策略。智能体的主要功能就是根据当前状态输出动作和更新策略,分别跟伪代码中的交互采样和模型更新过程相对应。
import torch
import torch.optim as optim
import math
import numpy as np
class DQN:
def __init__(self,model,memory,cfg):
self.action_dim = cfg.action_dim
self.device = torch.device(cfg.device)
self.gamma = cfg.gamma # 奖励的折扣因子
# e-greedy策略相关参数
self.sample_count = 0 # 用于epsilon的衰减计数
self.epsilon_start = cfg.epsilon_start
self.epsilon_end =cfg.epsilon_end
self.epsilon_decay = cfg.epsilon_decay
self.batch_size = cfg.batch_size
self.policy_net = model.to(self.device)
self.target_net = model.to(self.device)
# 复制参数到目标网络
for target_param, param in zip(self.target_net.parameters(),self.policy_net.parameters()):
target_param.data.copy_(param.data)
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr) # 优化器
self.memory = memory # 经验回放
def sample_action(self, state):
''' 采样动作
'''
self.sample_count += 1
# epsilon指数衰减
self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
math.exp(-1. * self.sample_count / self.epsilon_decay)
if random.random() > self.epsilon:
with torch.no_grad():
state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
q_values = self.policy_net(state)
action = q_values.max(1)[1].item() # choose action corresponding to the maximum q value
else:
action = random.randrange(self.action_dim)
return action
@torch.no_grad() # 不计算梯度,该装饰器效果等同于with torch.no_grad():
def predict_action(self, state):
''' 预测动作
'''
state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
q_values = self.policy_net(state)
action = q_values.max(1)[1].item() # choose action corresponding to the maximum q value
return action
def update(self):
if len(self.memory) < self.batch_size: # 当经验回放中不满足一个批量时,不更新策略
return
# 从经验回放中随机采样一个批量的转移(transition)
state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(
self.batch_size)
# 将数据转换为tensor
state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)
action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)
reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)
next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)
done_batch = torch.tensor(np.float32(done_batch), device=self.device)
q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch) # 计算当前状态(s_t,a)对应的Q(s_t, a)
next_q_values = self.target_net(next_state_batch).max(1)[0].detach() # 计算下一时刻的状态(s_t_,a)对应的Q值
# 计算期望的Q值,对于终止状态,此时done_batch[0]=1, 对应的expected_q_value等于reward
expected_q_values = reward_batch + self.gamma * next_q_values * (1-done_batch)
loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1)) # 计算均方根损失
# 优化更新模型
self.optimizer.zero_grad()
loss.backward()
# clip防止梯度爆炸
for param in self.policy_net.parameters():
param.grad.data.clamp_(-1, 1)
self.optimizer.step()
所以我们简单介绍一下该环境。环境名称叫做 推车杆游戏。如图 所示,我们的目标是持续左右推动保持倒立的杆一直不倒。
环境的状态数是4(推车的位置、速度、杆的角度、角速度) , 动作数是 2(倒、不倒)
import gymnasium as gym
import os
def all_seed(seed = 1):
''' 万能的seed函数
'''
if seed == 0: # 不设置seed
return
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed) # config for CPU
torch.cuda.manual_seed(seed) # config for GPU
os.environ['PYTHONHASHSEED'] = str(seed) # config for python scripts
# config for cudnn
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.enabled = False
def env_agent_config(cfg):
env = gym.make(cfg.env_id) # 创建环境
all_seed(seed=cfg.seed)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
print(f"状态空间维度:{state_dim},动作空间维度:{action_dim}")
setattr(cfg,"state_dim",state_dim) # 更新state_dim到cfg参数中
setattr(cfg,"action_dim",action_dim) # 更新action_dim到cfg参数中
model = MLP(state_dim, action_dim, hidden_dim = cfg.hidden_dim) # 创建模型
memory = ReplayBuffer(cfg.memory_capacity)
agent = DQN(model,memory,cfg)
return env,agent
参数设置:
self.epsilon_start = 0.95 # epsilon 起始值
self.epsilon_end = 0.01 # epsilon 终止值
self.epsilon_decay = 500 # epsilon 衰减率
self.gamma = 0.95 # 折扣因子
self.lr = 0.0001 # 学习率
self.buffer_size = 100000 # 经验回放容量(经验值)
self.batch_size = 64 # 批大小
self.target_update = 4 # 目标网络更新频率
训练函数:
def train(cfg, env, agent):
''' 训练
'''
print("开始训练!")
rewards = [] # 记录所有回合的奖励
steps = []
for i_ep in range(cfg.train_eps):
ep_reward = 0 # 记录一回合内的奖励
ep_step = 0
state, info = env.reset(seed = cfg.seed) # 重置环境,返回初始状态
for _ in range(cfg.max_steps):
ep_step += 1
action = agent.sample_action(state) # 选择动作
next_state, reward, terminated, truncated , info = env.step(action) # 更新环境,返回transition
agent.memory.push((state, action, reward, next_state, terminated)) # 保存transition
state = next_state # 更新下一个状态
agent.update() # 更新智能体
ep_reward += reward # 累加奖励
if terminated:
break
if (i_ep + 1) % cfg.target_update == 0: # 智能体目标网络更新
agent.target_net.load_state_dict(agent.policy_net.state_dict())
steps.append(ep_step)
rewards.append(ep_reward)
if (i_ep + 1) % 10 == 0:
print(f"回合:{i_ep+1}/{cfg.train_eps},奖励:{ep_reward:.2f},Epislon:{agent.epsilon:.3f}")
print("完成训练!")
env.close()
return {'rewards':rewards}
def test(cfg, env, agent):
print("开始测试!")
rewards = [] # 记录所有回合的奖励
steps = []
for i_ep in range(cfg.test_eps):
ep_reward = 0 # 记录一回合内的奖励
ep_step = 0
state, info = env.reset(seed = cfg.seed) # 重置环境,返回初始状态
for _ in range(cfg.max_steps):
ep_step+=1
action = agent.predict_action(state) # 选择动作
next_state, reward, terminated, truncated , info = env.step(action) # 更新环境,返回transition
state = next_state # 更新下一个状态
ep_reward += reward # 累加奖励
if terminated:
break
steps.append(ep_step)
rewards.append(ep_reward)
print(f"回合:{i_ep+1}/{cfg.test_eps},奖励:{ep_reward:.2f}")
print("完成测试")
env.close()
return {'rewards':rewards}
结果展示:
# 获取参数
cfg = Config()
# 训练
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="train")
# 测试
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="test") # 画出结果
状态空间维度:4,动作空间维度:2
开始训练!
回合:10/100,奖励:19.00,Epislon:0.638
回合:20/100,奖励:18.00,Epislon:0.487
回合:30/100,奖励:13.00,Epislon:0.366
回合:40/100,奖励:9.00,Epislon:0.296
回合:50/100,奖励:16.00,Epislon:0.239
回合:60/100,奖励:9.00,Epislon:0.198
回合:70/100,奖励:24.00,Epislon:0.135
回合:80/100,奖励:70.00,Epislon:0.049
回合:90/100,奖励:91.00,Epislon:0.019
回合:100/100,奖励:200.00,Epislon:0.010
完成训练!
开始测试!
回合:1/20,奖励:200.00
回合:2/20,奖励:200.00
回合:3/20,奖励:200.00
回合:4/20,奖励:200.00
回合:5/20,奖励:200.00
回合:6/20,奖励:200.00
回合:7/20,奖励:200.00
回合:8/20,奖励:200.00
回合:9/20,奖励:200.00
回合:10/20,奖励:200.00
回合:11/20,奖励:200.00
回合:12/20,奖励:200.00
回合:13/20,奖励:200.00
回合:14/20,奖励:200.00
回合:15/20,奖励:200.00
回合:16/20,奖励:200.00
回合:17/20,奖励:200.00
回合:18/20,奖励:200.00
回合:19/20,奖励:200.00
回合:20/20,奖励:200.00
完成测试
- 用深度网络代替 Q表,它的输入可以是连续的值,因此只需要把每个维度的坐标看作一个输入,就可以处理高维的状态空间了
- Q-leanring算法训练的方式就是把每次通过与环境交互一次的样本直接喂入网络中训练。而在 DQN中,我们会把每次与环境交互得到的样本都存储在一个经验回放中,然后每次从经验池中随机抽取一批样本来训练网络
引入策略是为了平衡探索与利用的过程,如果智能体只进行利用,那么它可能会陷入局部最优,而忽略了更好的动作或状态;如果智能体只进行探索,那么它可能会浪费时间和资源,而无法收敛到最优的策略。因此,智能体需要在探索和利用之间进行权衡,以达到最佳的学习效果。
在实践中每隔若干步才把每步更新的当前网络参数复制给目标网络,这样做的好处是保证训练的稳定,避免 Q 值的估计发散。
首先每次迭代的样本都是从经验池中随机抽取的,因此每次迭代的样本都是近似独立同分布的,这样就满足了梯度下降法的假设。
其次,经验池中的样本是从环境中实时交互得到的,因此每次迭代的样本都是相互关联的,这样的方式相当于是把每次迭代的样本都进行了一个打乱的操作,这样也能够有效地避免训练的不稳定性。
class DoubleDQN(object):
def __init__(self,cfg):
self.states = cfg.n_states
self.n_actions = cfg.n_actions
self.device = torch.device(cfg.device)
self.gamma = cfg.gamma # 折扣因子
# e-greedy策略相关参数
self.sample_count = 0 # 用于epsilon的衰减计数
self.epsilon = cfg.epsilon_start
self.sample_count = 0
self.epsilon_start = cfg.epsilon_start
self.epsilon_end = cfg.epsilon_end
self.epsilon_decay = cfg.epsilon_decay
self.batch_size = cfg.batch_size
self.target_update = cfg.target_update
self.policy_net = MLP(cfg.n_states,cfg.n_actions,hidden_dim=cfg.hidden_dim).to(self.device)
self.target_net = MLP(cfg.n_states,cfg.n_actions,hidden_dim=cfg.hidden_dim).to(self.device)
# 复制参数到目标网络
for target_param, param in zip(self.target_net.parameters(),self.policy_net.parameters()):
target_param.data.copy_(param.data)
# self.target_net.load_state_dict(self.policy_net.state_dict()) # or use this to copy parameters
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr) # 优化器
self.memory = ReplayBuffer(cfg.buffer_size) # 经验回放
self.update_flag = False
def sample_action(self, state):
''' 采样动作
'''
self.sample_count += 1
# epsilon指数衰减
self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
math.exp(-1. * self.sample_count / self.epsilon_decay)
if random.random() > self.epsilon:
with torch.no_grad():
state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
q_values = self.policy_net(state)
action = q_values.max(1)[1].item() # choose action corresponding to the maximum q value
else:
action = random.randrange(self.n_actions)
return action
@torch.no_grad() # 不计算梯度,该装饰器效果等同于with torch.no_grad():
def predict_action(self, state):
''' 预测动作
'''
state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
q_values = self.policy_net(state)
action = q_values.max(1)[1].item() # choose action corresponding to the maximum q value
return action
def update(self):
if len(self.memory) < self.batch_size: # 当经验回放中不满足一个批量时,不更新策略
return
else:
if not self.update_flag:
print("开始更新策略!")
self.update_flag = True
# 从经验回放中随机采样一个批量的转移(transition)
state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(
self.batch_size)
# 将数据转换为tensor
state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)
action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)
reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float).unsqueeze(1)
next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)
done_batch = torch.tensor(np.float32(done_batch), device=self.device).unsqueeze(1)
q_value_batch = self.policy_net(state_batch).gather(dim=1, index=action_batch) # 实际的Q值
next_q_value_batch = self.policy_net(next_state_batch) # 下一个状态对应的实际策略网络Q值
##################################################################################
next_target_value_batch = self.target_net(next_state_batch) # 下一个状态对应的目标网络Q值
# 将策略网络Q值最大的动作对应的目标网络Q值作为期望的Q值
next_target_q_value_batch = next_target_value_batch.gather(1, torch.max(next_q_value_batch, 1)[1].unsqueeze(1))
expected_q_value_batch = reward_batch + self.gamma * next_target_q_value_batch* (1-done_batch) # 期望的Q值
##################################################################################
# 计算损失
loss = nn.MSELoss()(q_value_batch, expected_q_value_batch)
# 优化更新模型
self.optimizer.zero_grad()
loss.backward()
# clip防止梯度爆炸
for param in self.policy_net.parameters():
param.grad.data.clamp_(-1, 1)
self.optimizer.step()
if self.sample_count % self.target_update == 0: # 每隔一段时间,将策略网络的参数复制到目标网络
self.target_net.load_state_dict(self.policy_net.state_dict())
import torch.nn as nn
import torch.nn.functional as F
class DuelingNet(nn.Module):
def __init__(self, n_states, n_actions,hidden_dim=128):
super(DuelingNet, self).__init__()
# hidden layer
self.hidden_layer = nn.Sequential(
nn.Linear(n_states, hidden_dim),
nn.ReLU()
)
# advantage
self.advantage_layer = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, n_actions)
)
# value
self.value_layer = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
def forward(self, state):
x = self.hidden_layer(state)
advantage = self.advantage_layer(x)
value = self.value_layer(x)
return value + advantage - advantage.mean()
Noisy DQN算法也是通过优化网络结构的方法来提升DQN算法的性能,但与Dueling DQN算法不同的是,它的目的并不是为了提高Q值的估计,而是增强网络的探索能力。
在神经网络中引入了噪声层来提高网络性能的,即将随机性应用到神经网络中的参数或者说权重,增加了 网络对于状态和动作空间的探索能力,从而提高收敛速度和稳定性
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
class NoisyLinear(nn.Module):
def __init__(self, input_dim, output_dim, std_init=0.4):
super(NoisyLinear, self).__init__()
self.input_dim = input_dim
self.output_dim = output_dim
self.std_init = std_init
self.weight_mu = nn.Parameter(torch.FloatTensor(output_dim, input_dim))
self.weight_sigma = nn.Parameter(torch.FloatTensor(output_dim, input_dim))
self.register_buffer('weight_epsilon', torch.FloatTensor(output_dim, input_dim))
self.bias_mu = nn.Parameter(torch.FloatTensor(output_dim))
self.bias_sigma = nn.Parameter(torch.FloatTensor(output_dim))
self.register_buffer('bias_epsilon', torch.FloatTensor(output_dim))
self.reset_parameters()
self.reset_noise()
def forward(self, x):
if self.training:
weight = self.weight_mu + self.weight_sigma.mul(torch.tensor(self.weight_epsilon))
bias = self.bias_mu + self.bias_sigma.mul(torch.tensor(self.bias_epsilon))
else:
weight = self.weight_mu
bias = self.bias_mu
return F.linear(x, weight, bias)
def reset_parameters(self):
mu_range = 1 / math.sqrt(self.weight_mu.size(1))
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.std_init / math.sqrt(self.weight_sigma.size(1)))
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.std_init / math.sqrt(self.bias_sigma.size(0)))
def reset_noise(self):
epsilon_in = self._scale_noise(self.input_dim)
epsilon_out = self._scale_noise(self.output_dim)
self.weight_epsilon.copy_(epsilon_out.ger(epsilon_in))
self.bias_epsilon.copy_(self._scale_noise(self.output_dim))
def _scale_noise(self, size):
x = torch.randn(size)
x = x.sign().mul(x.abs().sqrt())
return x
class NoisyMLP(nn.Module):
def __init__(self, input_dim,output_dim,hidden_dim=128):
super(NoisyMLP, self).__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.noisy_fc2 = NoisyLinear(hidden_dim, hidden_dim)
self.noisy_fc3 = NoisyLinear(hidden_dim, output_dim)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.noisy_fc2(x))
x = self.noisy_fc3(x)
return x
def reset_noise(self):
self.noisy_fc2.reset_noise()
self.noisy_fc3.reset_noise()
优化了经验回放的设计从而提高模型的收敛能力和鲁棒性。PER 可以翻译为优先经验回放(prioritized experience replay),跟数据结构中优先队列与普通队列一样,会在采样过程中赋予经验回放中样本的优先级。
TD误差越大,损失函数的值也越大,对于反向传播的作用也就越大。 。因此我们只需要设计一个经验回放,根据经验回放中的每个样本计算出的TD误差赋予对应的优先级,然后在采样的时候取出优先级较大的样本。
DQN算法为什么会产生Q值的过估计问题?
即使目标网络已经更新,选择最优动作的神经网络仍然可能会选择一个过高估计的动作,毕竟它们本来源自同一个权重,只不过稍有变化,所以会使得Q值过高。
同样是提高探索,Noisy DQN与ε?greedy策略有什么区别
深度强化学习既要考虑与环境交互过程中的探索能力,也要考虑深度模型本身的探索能力,从而尽量避免陷入局部最优解的困境之中,Noisy DQN是在神经网络的权重上添加参数化的噪音,从而使Q函数的输出具有随机性,而ε-greedy策略是在选择动作时以一定的概率随机选择一个动作,而不是最优的动作。同时Noisy DQN的探索是基于权重的,可以影响所有的动作的选择,而ε-greedy策略的探索是基于动作的,只影响单个的动作的选择。因此,Noisy DQN的探索更具有全局性和多样性,而ε-greedy策略的探索更具有局部性和随机性。
本篇文章首先介绍了什么是强化学习,强化学习的作用以及应用场景。之后介绍了马尔可夫决策这一基本强化学习问题,了解了马尔可夫决策过程主要包含哪些要素,以及它和马尔可夫链之间的关系。之后我们学习了DQN算法,它首次将深度学习引入强化学习中,通过与Q-learning的对比介绍了DQN算法的基本流程以及代码实现。最后介绍了一些DQN算法的变种。