In the previous two articles, we saw that an RL model's objective is built on observations and on maximizing the sum of rewards.
If we can fit a function that solves this problem, we can avoid storing an enormous Q_table (in Double Q-Learning, even two of them).
Tabular -> Function
For a Q_table of size `len(state) * len(action)`, people initially tried all sorts of methods to fit this function, such as kernel methods and linear functions; once deep neural networks appeared, research quickly shifted to fitting it with a DNN.
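To see why the tabular approach breaks down here, consider LunarLander: its observation is an 8-dimensional continuous vector, so a table would require discretizing every dimension. A rough back-of-the-envelope sketch (the 10-bins-per-dimension granularity is an arbitrary assumption for illustration):

# Hypothetical illustration: discretizing LunarLander's 8-dimensional
# continuous state with a modest 10 bins per dimension already explodes
# the table size; a function approximator sidesteps this entirely.
bins_per_dim = 10   # assumed discretization granularity
state_dims = 8      # LunarLander observation dimensions
num_actions = 4     # LunarLander discrete actions
print(bins_per_dim ** state_dims * num_actions)  # 400,000,000 Q-values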
However, this fitting approach runs into an unavoidable problem. We want a DNN such that DNN(state) -> Q-value. But in reinforcement learning, the best Q-values are unknown at the start (this is also where RL differs from supervised machine learning: we don't know whether a given Q-value is even attainable until someone trains a policy that achieves it), which means we have no ready-made targets to regress against during training.
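DQN's way out is to bootstrap: it builds its own regression target from the Bellman equation, using the (target) network's estimate of the next state's value in place of the unknown optimal Q-value. The standard per-transition target is

y = r + γ · max_a' Q_target(s', a')

(the implementation later in this post plugs in the actually-collected next action a_n instead of the max, making it closer to a deep-SARSA target).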
So the first step must be to derive an effective representation of the environment from high-dimensional sensory inputs.
Deep Q-Network (DQN) is a reinforcement learning method that combines deep learning with Q-learning. DQN was proposed by DeepMind in 2015 and achieved remarkable success playing Atari video games. Its core principle is to use a deep neural network to approximate the Q-function, i.e., the expected cumulative reward of taking a given action in a given state.
Approximating the Q-function with a neural network:
Instead of a table lookup, the network takes a state as input and outputs one Q-value per action.
Experience replay:
DQN introduces an experience replay mechanism: the agent's experience tuples (state, action, reward, next state) are stored in a replay buffer.
During training, small mini-batches are sampled at random from this buffer. This increases data diversity and reduces correlation between samples, which stabilizes training.
Target network:
DQN also keeps a second copy of the Q-network whose weights are synchronized only periodically; TD targets are computed against this frozen copy so the regression target stays stable between updates (see target_model and target_fix_period in the code below).
import time
from collections import defaultdict
import gymnasium as gym
import numpy as np
import random
from matplotlib import pyplot as plt, animation
from IPython.display import display, clear_output
env = gym.make("LunarLander-v2", continuous=False, render_mode='rgb_array')
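Before wiring up the network, it may help to sanity-check the environment's spaces; this quick inspection (not part of training) is where the state_size=8 and action_size=4 used later come from:

# Inspect the environment: LunarLander has an 8-dimensional continuous
# observation and 4 discrete actions.
print(env.observation_space)  # Box(..., (8,), float32)
print(env.action_space)       # Discrete(4)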
class ExperienceBuffer:
    def __init__(self):
        # Parallel lists, one entry per stored transition:
        # (state, action, reward, next state, next action).
        self.states = []
        self.actions = []
        self.rewards = []
        self.states_next = []
        self.actions_next = []
        self.size = 0
    def clear(self):
        self.__init__()
def append(self, s, a, r, s_n, a_n):
self.states.append(s)
self.actions.append(a)
self.rewards.append(r)
self.states_next.append(s_n)
self.actions_next.append(a_n)
self.size += 1
    def batch(self, batch_size=128):
        # Sample a random mini-batch of transitions, with replacement.
        indices = np.random.choice(self.size, size=batch_size, replace=True)
return (
np.array(self.states)[indices],
np.array(self.actions)[indices],
np.array(self.rewards)[indices],
np.array(self.states_next)[indices],
np.array(self.actions_next)[indices],
)
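A quick usage sketch of the buffer with dummy data (shapes assume LunarLander's 8-dim states and 4 actions):

# Fill the buffer with random dummy transitions, then draw a batch.
buf = ExperienceBuffer()
for _ in range(256):
    buf.append(np.random.randn(8), np.random.randint(4), 0.0,
               np.random.randn(8), np.random.randint(4))
s, a, r, s_n, a_n = buf.batch(batch_size=128)
print(s.shape, a.shape)  # (128, 8) (128,)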
import torch
from torch import nn
from torch.nn.functional import relu
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class DQN(nn.Module):
def __init__(self, state_size, action_size):
super().__init__()
self.state_size = state_size
self.action_size = action_size
self.hidden_size = 32
self.linear_1 = nn.Linear(self.state_size, self.hidden_size)
self.linear_2 = nn.Linear(self.hidden_size, self.action_size)
nn.init.uniform_(self.linear_1.weight, a=-0.1, b=0.1)
nn.init.uniform_(self.linear_2.weight, a=-0.1, b=0.1)
    def forward(self, state):
        # Accept either a raw observation or an already-batched tensor.
        if not isinstance(state, torch.Tensor):
            state = torch.tensor([state], dtype=torch.float)
        state = state.to(device)
        # Returns one Q-value per action for each state in the batch.
        return self.linear_2(relu(self.linear_1(state)))
def policy(model, state, eval=False):
    eps = 0.1
    if not eval and random.random() < eps:
        # epsilon-greedy: occasionally take a uniformly random action
        return random.randint(0, model.action_size - 1)
    else:
        # Sample from a softmax over the Q-values (Boltzmann exploration)
        # rather than a hard argmax; dim=-1 normalizes over actions.
        q_values = model(torch.tensor([state], dtype=torch.float))
        action = torch.multinomial(F.softmax(q_values, dim=-1), num_samples=1)
        return int(action[0])
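The non-random branch above samples from a softmax over the Q-values, so even eval mode is stochastic. If you prefer the more common deterministic greedy choice, a hypothetical variant could look like this (not the approach used in the rest of this post):

# Epsilon-greedy variant: argmax over Q-values instead of softmax sampling.
def greedy_policy(model, state, eval=False):
    eps = 0.1
    if not eval and random.random() < eps:
        return random.randint(0, model.action_size - 1)
    with torch.no_grad():
        q_values = model(state)  # forward() wraps raw observations itself
    return int(q_values.argmax(dim=-1)[0])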
dqn_model = DQN(state_size=8, action_size=4).to(device)
target_model = DQN(state_size=8, action_size=4).to(device)
from tqdm.notebook import tqdm
# learning rate (unused below: the Adam optimizer sets its own lr)
alpha = 0.9
# discount factor
gamma = 0.95
# number of episodes to collect
episode = 1000
experience_buffer = ExperienceBuffer()
eval_iter = 100
eval_num = 100
# Roll out the current policy and store (s, a, r, s', a') transitions.
def collect():
for e in tqdm(range(episode)):
state, info = env.reset()
action = policy(dqn_model, state)
sum_reward = 0
while True:
state_next, reward, terminated, truncated, info_next = env.step(action)
            action_next = policy(dqn_model, state_next)
sum_reward += reward
experience_buffer.append(
state, action, reward, state_next, action_next
)
if terminated or truncated:
break
state = state_next
info = info_next
action = action_next
## learning
from torch.optim import Adam
loss_fn = nn.MSELoss()
optimizer = Adam(lr=1e-5, params=dqn_model.parameters())
losses = []
target_fix_period = 5
epoch = 3
def train():
for e in range(epoch):
batch_size = 128
for i in range(experience_buffer.size // batch_size):
s, a, r, s_n, a_n = experience_buffer.batch(batch_size)
s = torch.tensor(s, dtype=torch.float).to(device)
s_n = torch.tensor(s_n, dtype=torch.float).to(device)
r = torch.tensor(r, dtype=torch.float).to(device)
a = torch.tensor(a, dtype=torch.long).to(device)
a_n = torch.tensor(a_n, dtype=torch.long).to(device)
            # TD target from the frozen target network; no gradients flow
            # through it. The stored next action a_n is used instead of a
            # max over actions, making this a deep-SARSA-style target.
            with torch.no_grad():
                y = r + gamma * target_model(s_n).gather(1, a_n.unsqueeze(1)).squeeze(1)
            y_hat = dqn_model(s).gather(1, a.unsqueeze(1)).squeeze(1)
            loss = loss_fn(y_hat, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
            if i % 500 == 0:
                print(f'i == {i}, loss = {loss.item()}')
if i % target_fix_period == 0:
target_model.load_state_dict(dqn_model.state_dict())
Here a_n is the next action and s_n the next state. Given s_n as input, target_model outputs one Q-value per possible action; if s_n holds multiple states (i.e., a batch), the output is a batch of Q-value rows, and gather then picks out the entry for a_n in each row.
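The gather call is easy to get wrong, so here is a toy shape walkthrough (the values are arbitrary):

# q holds one row of Q-values per state in a batch of 2; gather picks,
# per row, the column given by that row's action index.
q = torch.tensor([[1.0, 2.0, 3.0, 4.0],
                  [5.0, 6.0, 7.0, 8.0]])  # shape (2, 4)
a = torch.tensor([2, 0])                  # chosen action per state
print(q.gather(1, a.unsqueeze(1)).squeeze(1))  # tensor([3., 5.])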
# Alternate between collecting fresh experience and training on it.
for i in range(10):
print(f'collect/train: {i}')
experience_buffer.clear()
collect()
train()
task_num = 10
frames = []
for _ in range(task_num):
state, _ = env.reset()
    while True:
        action = policy(dqn_model, state, eval=True)
        state_next, reward, terminated, truncated, info_next = env.step(action)
        frames.append(env.render())
        if terminated or truncated:
            break
        state = state_next
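The collected frames are RGB arrays; one way to view them inline, a sketch using the matplotlib/IPython imports from the top of the post:

# Render the collected RGB frames as an inline animation.
from IPython.display import HTML

fig = plt.figure()
im = plt.imshow(frames[0])
plt.axis('off')

def update(frame):
    im.set_data(frame)
    return [im]

anim = animation.FuncAnimation(fig, update, frames=frames, interval=50, blit=True)
plt.close(fig)  # avoid showing the static figure alongside the animation
display(HTML(anim.to_jshtml()))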