基于DQN和TensorFlow的LunarLander实现(全代码)

发布时间:2024年01月18日

使用深度Q网络(Deep Q-Network, DQN)来训练一个在openai-gym的LunarLander-v2环境中的强化学习agent,让小火箭成功着陆。
下面代码直接扔到jupyter notebook或CoLab上就能跑起来。

在这里插入图片描述

安装和导入所需的库和环境

安装和设置所需的库和环境,使其能够在Jupyter Notebook中运行。

!pip install gym
!apt-get install xvfb -y
!pip install pyvirtualdisplay   #用于在没有显示器的环境中创建虚拟显示
!pip install Pillow             #一个图像处理库
!pip install swig
!pip install "gym[box2d]"

创建并启动一个虚拟显示,在没有图形界面的服务器上运行强化学习环境:

from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()

引入所需库:

import gym
import time
import tqdm
import numpy as np
from IPython import display as ipydisplay
from PIL import Image

创建一个LunarLander-v2环境的DQN代理:

agent = DQNAgent('LunarLander-v2')

total_score, records = agent.simulate(visualize=True)
print(f'Total score {total_score:.2f}')
record_list = []
for i in tqdm.tqdm(range(100)):
    total_score, _ = agent.simulate(visualize=False)
    record_list.append(total_score)

print(f'Average score in 100 episode {np.mean(record_list):.2f}')

?

Q网络搭建

import tensorflow as tf

L = tf.keras.layers

def create_network_model(input_shape: np.ndarray,
                         action_space: np.ndarray,
                         learning_rate=0.001) -> tf.keras.Sequential:
    model = tf.keras.Sequential([
        L.Dense(512, input_shape=input_shape, activation="relu"),
        L.Dense(256, input_shape=input_shape, activation="relu"),
        L.Dense(action_space)
    ])
    model.compile(loss="mse",
                  optimizer=tf.optimizers.Adam(lr=learning_rate))
    return model

?

经验回放实现

经验回放是一种在深度强化学习中常用的技术,主要用于打破数据的相关性和减少过拟合。
在强化学习中,代理通常会在训练过程中与环境进行大量交互,经验回放允许代理存储这些经验,并在后续的训练中反复利用这些数据。这种机制有助于改善学习效率减少数据样本间的时间相关性,提高训练过程的稳定性。

import random
import numpy as np
from collections import namedtuple

# 代表每一个样本的 namedtuple,方便存储和读取数据
Experience = namedtuple('Experience', ('state', 'action', 'reward', 'next_state', 'done'))

class ReplayMemory:

    def __init__(self, max_size):
        self.max_size = max_size
        self.memory = []

    def append(self, state, action, reward, next_state, done):
        """记录一个新的样本"""
        sample = Experience(state, action, reward, next_state, done)
        self.memory.append(sample)
        # 只留下最新记录的 self.max_size 个样本
        self.memory = self.memory[-self.max_size:]

    def sample(self, batch_size):
        """按照给定批次大小取样"""
        samples = random.sample(self.memory, batch_size)
        batch = Experience(*zip(*samples))

        # 转换数据为 numpy 张量返回
        states = np.array(batch.state)
        actions = np.array(batch.action)
        rewards = np.array(batch.reward)
        states_next = np.array(batch.next_state)
        dones = np.array(batch.done)

        return states, actions, rewards, states_next, dones

    def __len__(self):
        return len(self.memory)

?

DQNAgent实现

DQNAgent类是DQN算法的核心实现。它包含以下关键部分:
1、初始化:初始化环境、神经网络模型和经验回放缓存。
2、行为选择(choose_action):根据当前状态和ε-greedy策略选择行为。
3、经验回放(replay):从记忆中随机抽取小批量经验进行学习。
4、训练(train):进行多个episode的训练。

from IPython import display
from PIL import Image

# 定义超参数
LEARNING_RATE = 0.001
GAMMA = 0.99
EPSILON_DECAY = 0.995
EPSILON_MIN = 0.01


class DQNAgent:
    def __init__(self, env_name):
        self.env = gym.make(env_name)
        self.observation_shape = self.env.observation_space.shape
        self.action_count = self.env.action_space.n
        self.model = create_network_model(self.observation_shape, self.action_count)
        self.memory = ReplayMemory(500000)
        self.epsilon = 1.0
        self.batch_size = 64

    def choose_action(self, state, epsilon=None):
        """
        根据给定状态选择行为
        - epsilon == 0 完全使用模型选择行为
        - epsilon == 1 完全随机选择行为
        """
        if epsilon is None:
            epsilon = self.epsilon
        if np.random.rand() < epsilon:
            return np.random.randint(self.action_count)
        else:
            q_values = self.model.predict(np.expand_dims(state, axis=0))
            return np.argmax(q_values[0])

    def replay(self):
        """进行经验回放学习"""

        # 如果当前经验池经验数量少于批次大小,则跳过
        if len(self.memory) < self.batch_size:
            return

        states, actions, rewards, states_next, dones = self.memory.sample(self.batch_size)
        q_pred = self.model.predict(states)

        q_next = self.model.predict(states_next).max(axis=1)
        q_next = q_next * (1 - dones)
        q_update = rewards + GAMMA * q_next

        indices = np.arange(self.batch_size)
        q_pred[[indices], [actions]] = q_update

        self.model.train_on_batch(states, q_pred)

    def simulate(self, epsilon=None, visualize=True):
        records = []
        state = self.env.reset()
        is_done = False
        total_score = 0
        total_step  = 0
        while not is_done:
            action = self.choose_action(state, epsilon)
            state, reward, is_done, info = self.env.step(action)
            total_score += reward
            total_step += 1

            rgb_array = self.env.render(mode='rgb_array')
            records.append((rgb_array, action, reward, total_score))

            if visualize:
                display.clear_output(wait=True)
                img = Image.fromarray(rgb_array)
                # 当前 Cell 中展示图片
                display.display(img)
                print(f'Action {action} Action reward {reward:.2f} | Total score {total_score:.2f} | Step {total_step}')

                time.sleep(0.01)
        self.env.close()
        return total_score, records

    def train(self, episode_count: int, log_dir: str):
        """
        训练方法,按照给定 episode 数量进行训练,并记录训练过程关键参数到 TensorBoard
        """
        # 初始化一个 TensorBoard 记录器
        file_writer = tf.summary.create_file_writer(log_dir)
        file_writer.set_as_default()

        score_list = []
        best_avg_score = -np.inf

        for episode_index in range(episode_count):
            state = self.env.reset()
            score, step = 0, 0
            is_done = False
            while not is_done:
                # 根据状态选择一个行为
                action = self.choose_action(state)
                # 执行行为,记录行为和结果到经验池
                state_next, reward, is_done, info = self.env.step(action)
                self.memory.append(state, action, reward, state_next, is_done)
                score += reward

                state = state_next
                # 每 6 步进行一次回放训练
                # 此处也可以选择每一步回放训练,但会降低训练速度,这个是一个经验技巧
                if step % 1 == 0:
                    self.replay()
                step += 1

            # 记录当前 Episode 的得分,计算最后 100 Episode 的平均得分
            score_list.append(score)
            avg_score = np.mean(score_list[-100:])

            # 记录当前 Episode 得分,epsilon 和最后 100 Episode 的平均得分到 TensorBoard
            tf.summary.scalar('score', data=score, step=episode_index)
            tf.summary.scalar('average score', data=avg_score, step=episode_index)
            tf.summary.scalar('epsilon', data=self.epsilon, step=episode_index)

            # 终端输出训练进度
            print(f'Episode: {episode_index} Reward: {score:03.2f} '
                  f'Average Reward: {avg_score:03.2f} Epsilon: {self.epsilon:.3f}')

            # 调整 epsilon 值,逐渐减少随机探索比例
            if self.epsilon > EPSILON_MIN:
                self.epsilon *= EPSILON_DECAY

            # 如果当前平均得分比之前有改善,保存模型
            # 确保提前创建目录 outputs/chapter_15
            if avg_score > best_avg_score:
                best_avg_score = avg_score
                self.model.save(f'outputs/chapter_15/dqn_best_{episode_index}.h5')

?

训练

# 使用 LunarLander 初始化 Agent
agent = DQNAgent('LunarLander-v2')
import glob
# 读取现在已经记录的日志数量,避免日志重复记录
tf_log_index = len(glob.glob('tf_dir/lunar_lander/run_*'))
log_dir = f'tf_dir/lunar_lander/run_{tf_log_index}'

# 训练 2000 个 Episode
agent.train(20, log_dir)

agent.model.summary()
文章来源:https://blog.csdn.net/weixin_45116099/article/details/135681308
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。