您好,登錄后才能下訂單哦!
本篇內容主要講解“通過CartPole游戲詳解PPO優化的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“通過CartPole游戲詳解PPO優化的方法”吧!
在一個光滑的軌道上有個推車,桿子垂直微置在推車上,隨時有倒的風險。系統每次對推車施加向左或者向右的力,但我們的目標是讓桿子保持直立。桿子保持直立的每個時間單位都會獲得 +1 的獎勵。但是當桿子與垂直方向成 15 度以上的位置,或者推車偏離中心點超過 2.4 個單位后,這一輪局游戲結束。因此我們可以獲得的最高回報等于 200 。我們這里就是要通過使用 PPO 算法來訓練一個強化學習模型 actor-critic ,通過對比模型訓練前后的游戲運行 gif 圖,可以看出來我們訓練好的模型能長時間保持桿子處于垂直狀態。
python==3.10.9 tensorflow-gpu==2.10.0 imageio==2.26.1 keras==2.10,0 gym==0.20.0 pyglet==1.5.20 scipy==1.10.1
這段代碼主要是導入所需的庫,并設置了一些超參數。
import numpy as np import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers import gym import scipy.signal import time from tqdm import tqdm steps_per_epoch = 5000 # 每個 epoch 中訓練的步數 epochs = 20 # 用于訓練的 epoch 數 gamma = 0.90 # 折扣因子,用于計算回報 clip_ratio = 0.2 # PPO 算法中用于限制策略更新的比率 policy_learning_rate = 3e-4 # 策略網絡的學習率 value_function_learning_rate = 3e-4 # 值函數網絡的學習率 train_policy_iterations = 80 # 策略網絡的訓練迭代次數 train_value_iterations = 80 # 值函數網絡的訓練迭代次數 lam = 0.97 # PPO 算法中的 λ 參數 target_kl = 0.01 # PPO 算法中的目標 KL 散度 hidden_sizes = (64, 64) # 神經網絡的隱藏層維度 render = False # 是否開啟畫面渲染,False 表示不開啟
(1)這里定義了一個函數 discounted_cumulative_sums
,接受兩個參數 x
和 discount
,該函數的作用是計算給定獎勵序列 x
的折扣累計和,折扣因子 discount
是一個介于 0 和 1 之間的值,表示對未來獎勵的折扣程度。 在強化學習中,折扣累計和是一個常用的概念,表示對未來獎勵的折扣累加。
def discounted_cumulative_sums(x, discount): return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]
(2)這里定義了一個Buffer類,用于存儲訓練數據。類中有如下主要的函數:
init: 初始化函數,用于設置成員變量的初始值
store: 將觀測值、行為、獎勵、價值和對數概率存儲到對應的緩沖區中
finish_trajectory: 結束一條軌跡,用于計算優勢和回報,并更新 trajectory_start_index 的值
get: 獲取所有緩沖區的值,用在訓練模型過程中。在返回緩沖區的值之前,將優勢緩沖區的值進行標準化處理,使其均值為 0 ,方差為 1
class Buffer: def __init__(self, observation_dimensions, size, gamma=0.99, lam=0.95): self.observation_buffer = np.zeros( (size, observation_dimensions), dtype=np.float32 ) self.action_buffer = np.zeros(size, dtype=np.int32) self.advantage_buffer = np.zeros(size, dtype=np.float32) self.reward_buffer = np.zeros(size, dtype=np.float32) self.return_buffer = np.zeros(size, dtype=np.float32) self.value_buffer = np.zeros(size, dtype=np.float32) self.logprobability_buffer = np.zeros(size, dtype=np.float32) self.gamma, self.lam = gamma, lam self.pointer, self.trajectory_start_index = 0, 0 def store(self, observation, action, reward, value, logprobability): self.observation_buffer[self.pointer] = observation self.action_buffer[self.pointer] = action self.reward_buffer[self.pointer] = reward self.value_buffer[self.pointer] = value self.logprobability_buffer[self.pointer] = logprobability self.pointer += 1 def finish_trajectory(self, last_value=0): path_slice = slice(self.trajectory_start_index, self.pointer) rewards = np.append(self.reward_buffer[path_slice], last_value) values = np.append(self.value_buffer[path_slice], last_value) deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1] self.advantage_buffer[path_slice] = discounted_cumulative_sums( deltas, self.gamma * self.lam ) self.return_buffer[path_slice] = discounted_cumulative_sums( rewards, self.gamma )[:-1] self.trajectory_start_index = self.pointer def get(self): self.pointer, self.trajectory_start_index = 0, 0 advantage_mean, advantage_std = ( np.mean(self.advantage_buffer), np.std(self.advantage_buffer), ) self.advantage_buffer = (self.advantage_buffer - advantage_mean) / advantage_std return ( self.observation_buffer, self.action_buffer, self.advantage_buffer, self.return_buffer, self.logprobability_buffer, )
(3)這里定義了一個多層感知機(Multi-Layer Perceptron,MLP)的網絡結構,有如下參數:
x
:輸入的張量
sizes
:一個包含每一層的神經元個數的列表
activation
:激活函數,用于中間層的神經元
output_activation
:輸出層的激活函數
該函數通過循環生成相應個數的全連接層,并將 x
作為輸入傳入。其中,units
指定每一層的神經元個數,activation
指定該層使用的激活函數,返回最后一層的結果。
def mlp(x, sizes, activation=tf.tanh, output_activation=None): for size in sizes[:-1]: x = layers.Dense(units=size, activation=activation)(x) return layers.Dense(units=sizes[-1], activation=output_activation)(x)
(4)這里定義了一個函數 logprobabilities
,用于計算給定動作 a
的對數概率。函數接受兩個參數,logits
和 a
,其中 logits
表示模型輸出的未歸一化的概率分布,a
表示當前采取的動作。函數首先對 logits
進行 softmax 歸一化,然后對歸一化后的概率分布取對數,得到所有動作的對數概率。接著,函數使用 tf.one_hot
函數生成一個 one-hot 編碼的動作向量,并與所有動作的對數概率向量相乘,最后對結果進行求和得到給定動作的對數概率。
def logprobabilities(logits, a): logprobabilities_all = tf.nn.log_softmax(logits) logprobability = tf.reduce_sum( tf.one_hot(a, num_actions) * logprobabilities_all, axis=1 ) return logprobability
(5)這里定義了一個函數 sample_action
。該函數接受一個 observation
(觀測值)參數,并在 actor 網絡上運行該觀測值以獲得動作 logits(邏輯值)。然后使用邏輯值(logits)來隨機采樣出一個動作,并將結果作為函數的輸出。
@tf.function def sample_action(observation): logits = actor(observation) action = tf.squeeze(tf.random.categorical(logits, 1), axis=1) return logits, action
(6)這里定義了一個用于訓練策略的函數train_policy
。該函數使用帶權重裁剪的 PPO 算法,用于更新 actor 的權重。
observation_buffer
:輸入的觀測緩沖區
action_buffer
:輸入的動作緩沖區
logprobability_buffer
:輸入的對數概率緩沖區
advantage_buffer
:輸入的優勢值緩沖區
在該函數內部,使用tf.GradientTape
記錄執行的操作,用于計算梯度并更新策略網絡。計算的策略損失是策略梯度和剪裁比率的交集和。使用優化器policy_optimizer
來更新actor的權重。最后,計算并返回 kl 散度的平均值,該值用于監控訓練的過程。
@tf.function def train_policy( observation_buffer, action_buffer, logprobability_buffer, advantage_buffer): with tf.GradientTape() as tape: ratio = tf.exp( logprobabilities(actor(observation_buffer), action_buffer) - logprobability_buffer ) min_advantage = tf.where( advantage_buffer > 0, (1 + clip_ratio) * advantage_buffer, (1 - clip_ratio) * advantage_buffer, ) policy_loss = -tf.reduce_mean( tf.minimum(ratio * advantage_buffer, min_advantage) ) policy_grads = tape.gradient(policy_loss, actor.trainable_variables) policy_optimizer.apply_gradients(zip(policy_grads, actor.trainable_variables)) kl = tf.reduce_mean( logprobability_buffer - logprobabilities(actor(observation_buffer), action_buffer) ) kl = tf.reduce_sum(kl) return kl
(7)這里實現了價值函數(critic)的訓練過程,函數接受兩個參數:一個是 observation_buffer
,表示當前存儲的狀態觀察序列;另一個是 return_buffer
,表示狀態序列對應的回報序列。在函數內部,首先使用 critic
模型來預測當前狀態序列對應的狀態值(V), 然后計算當前狀態序列的平均回報與 V 之間的均方誤差,并對其進行求和取平均得到損失函數 value_loss
。接下來計算梯度來更新可訓練的變量值。
@tf.function def train_value_function(observation_buffer, return_buffer): with tf.GradientTape() as tape: value_loss = tf.reduce_mean((return_buffer - critic(observation_buffer)) ** 2) value_grads = tape.gradient(value_loss, critic.trainable_variables) value_optimizer.apply_gradients(zip(value_grads, critic.trainable_variables))
這里用于構建強化學習中的 Actor-Critic 網絡模型。首先,使用 gy m庫中的 CartPole-v0 環境創建一個環境實例 env 。然后,定義了兩個變量,分別表示觀測空間的維度 observation_dimensions 和動作空間的大小 num_actions,這些信息都可以從 env 中獲取。接著,定義了一個 Buffer 類的實例,用于存儲每個時間步的觀測、動作、獎勵、下一個觀測和 done 信號,以便后面的訓練使用。
然后,使用 Keras 庫定義了一個神經網絡模型 Actor ,用于近似模仿策略函數,該模型輸入是當前的觀測,輸出是每個動作的概率分布的對數。
另外,還定義了一個神經網絡模型 Critic ,用于近似模仿值函數,該模型輸入是當前的觀測,輸出是一個值,表示這個觀測的價值。最后,定義了兩個優化器,policy_optimizer 用于更新 Actor 網絡的參數,value_optimizer 用于更新 Critic 網絡的參數。
env = gym.make("CartPole-v0") observation_dimensions = env.observation_space.shape[0] num_actions = env.action_space.n buffer = Buffer(observation_dimensions, steps_per_epoch) observation_input = keras.Input(shape=(observation_dimensions,), dtype=tf.float32) logits = mlp(observation_input, list(hidden_sizes) + [num_actions], tf.tanh, None) actor = keras.Model(inputs=observation_input, outputs=logits) value = tf.squeeze( mlp(observation_input, list(hidden_sizes) + [1], tf.tanh, None), axis=1 ) critic = keras.Model(inputs=observation_input, outputs=value) policy_optimizer = keras.optimizers.Adam(learning_rate=policy_learning_rate) value_optimizer = keras.optimizers.Adam(learning_rate=value_function_learning_rate)
在未訓練模型之前,將模型控制游戲的情況保存是 gif ,可以看出來技術很糟糕,很快就結束了游戲。
import imageio start = env.reset() frames = [] for t in range(steps_per_epoch): frames.append(env.render(mode='rgb_array')) start = start.reshape(1, -1) logits, action = sample_action(start) start, reward, done, _ = env.step(action[0].numpy()) if done: break with imageio.get_writer('未訓練前的樣子.gif', mode='I') as writer: for frame in frames: writer.append_data(frame)
這里主要是訓練模型,執行 eopch 輪,每一輪中循環 steps_per_epoch 步,每一步就是根據當前的觀測結果 observation 來抽樣得到下一步動作,然后將得到的各種觀測結果、動作、獎勵、value 值、對數概率值保存在 buffer 對象中,待這一輪執行游戲運行完畢,收集了一輪的數據之后,就開始訓練策略和值函數,并打印本輪的訓練結果,不斷重復這個過程,
observation, episode_return, episode_length = env.reset(), 0, 0 for epoch in tqdm(range(epochs)): sum_return = 0 sum_length = 0 num_episodes = 0 for t in range(steps_per_epoch): if render: env.render() observation = observation.reshape(1, -1) logits, action = sample_action(observation) observation_new, reward, done, _ = env.step(action[0].numpy()) episode_return += reward episode_length += 1 value_t = critic(observation) logprobability_t = logprobabilities(logits, action) buffer.store(observation, action, reward, value_t, logprobability_t) observation = observation_new terminal = done if terminal or (t == steps_per_epoch - 1): last_value = 0 if done else critic(observation.reshape(1, -1)) buffer.finish_trajectory(last_value) sum_return += episode_return sum_length += episode_length num_episodes += 1 observation, episode_return, episode_length = env.reset(), 0, 0 ( observation_buffer, action_buffer, advantage_buffer, return_buffer, logprobability_buffer, ) = buffer.get() for _ in range(train_policy_iterations): kl = train_policy( observation_buffer, action_buffer, logprobability_buffer, advantage_buffer ) if kl > 1.5 * target_kl: break for _ in range(train_value_iterations): train_value_function(observation_buffer, return_buffer) print( f"完成第 {epoch + 1} 輪訓練, 平均獎勵: {sum_length / num_episodes}" )
打印:完成第 1 輪訓練, 平均獎勵: 30.864197530864196
完成第 2 輪訓練, 平均獎勵: 40.32258064516129
...
完成第 9 輪訓練, 平均獎勵: 185.1851851851852
完成第 11 輪訓練, 平均獎勵: 172.41379310344828
...
完成第 14 輪訓練, 平均獎勵: 172.41379310344828
...
完成第 18 輪訓練, 平均獎勵: 185.1851851851852
...
完成第 20 輪訓練, 平均獎勵: 200.0
在訓練模型之后,將模型控制游戲的情況保存是 gif ,可以看出來技術很嫻熟,可以在很長的時間內使得棒子始終保持近似垂直的狀態。
import imageio start = env.reset() frames = [] for t in range(steps_per_epoch): frames.append(env.render(mode='rgb_array')) start = start.reshape(1, -1) logits, action = sample_action(start) start, reward, done, _ = env.step(action[0].numpy()) if done: break with imageio.get_writer('訓練后的樣子.gif', mode='I') as writer: for frame in frames: writer.append_data(frame)
到此,相信大家對“通過CartPole游戲詳解PPO優化的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。