代码地址:https://github.com/marlbenchmark/on-policy
官方出品轻量化mappo代码:https://github.com/tinyzqh/light_mappo
对应论文:The Surprising Effectiveness of MAPPO in Cooperative, Multi-Agent Games
代码运行
使用conda新建环境,python版本推荐为3.6.7,并且安装readme中的MPE环境,然后将pycharm中train_mpe.py文件的configuration中的参数设置为--env_name "MPE" --algorithm_name "rmappo" --experiment_name "check" --scenario_name "simple_spread" --num_agents 3 --num_landmarks 3 --seed 1 --n_training_threads 1 --n_rollout_threads 4 --num_mini_batch 1 --episode_length 25 --num_env_steps 10000 --ppo_epoch 10 --use_ReLU --gain 0.01 --lr 7e-4 --critic_lr 7e-4 --wandb_name "leafii" --user_name "leafii"
,以免因为电脑配置不足无法运行。安装其他包时要参考文件夹中requirement.txt中的版本,以免由于版本兼容问题造成代码无法正常运行,debug。
MAPPO算法伪代码
有两个网络,分别是策略$\pi_{\theta}$和值函数$V_{\phi}$。(作者在文中说如果智能体是同种类的就采用相同的网络参数,对于每个智能体内部也可以采用各自的actor和critic网络,为了符号的便利性,作者直接使用一个网络参数来表示)。值函数$V_{\phi}$需要学习一个映射: $S \rightarrow \mathbb R$。策略函数$\pi_{\theta}$学习一个映射从观测$o_t^{a}$到一个范围的分布或者是映射到一个高斯函数的动作均值和方差用于之后采样动作。
- Actor的优化目标为:
$$L(\theta) = [\frac{1}{B_n}\sum_{i=1}^{B}\sum_{k-1}^{n}min(r_{\theta,i}^{(k)}A_{i}^{(k)},clip(r_{\theta,i}^{(k)},1-\epsilon,1+\epsilon)A_i^{(k)})]+\sigma \frac{1}{B_n}\sum_{i=1}^{B}\sum_{k-1}^{n}S[\pi_{\theta}(o_i^{(k)})],where r_{\theta,i}^{(k)}=\frac{\pi_{\theta}(a_i^{(k)}|o_i^{(k)})}{\pi_{\theta_{old}}(a_i^{(k)}|o_i^{(k)})}$$
其中优势函数$A_i^{(k)}$是采用GAE方法的,S表示策略的熵,$\sigma$是控制熵系数的一个超参数。
- Critic网络优化目标为:
$$L(\phi)=\frac{1}{B_n}\sum_{i=1}^{B}\sum_{k=1}^{n}(max[(V_\phi(s_i^{(k)})-\hat R_i)^2,(clip(V_\phi(s_i^{(k)}),V_{\phi_{old}}(s_i^{(k)}-\varepsilon),V_{\phi_{old}}(s_i^{(k)}+\varepsilon)-\hat R_i)^2]$$
其中$\hat R_i$是折扣奖励。B表示batch_size的大小,n表示智能体的数量。
代码整体流程
每个局部智能体接收一个局部的观察obs,输出一个动作概率,所有的actor智能体都采用一个actor网络。critic网络接收所有智能体的观测obs,cent_obs_space = n * obs_space
,其中n为智能体的个数,输出一个V值,V值用于actor的更新。actor的loss和PPO的loss类似,有添加一个熵的loss。Critic的loss更多的是对value的值做normalizer,并且在计算episode的折扣奖励的时候不是单纯的算折扣奖励,有采用gae算折扣回报的方式。
网络定义
代码定义在
onpolicy/algorithms/r_mappo/algorithm/rMAPPOPolicy.py
每一个智能体的观测obs_space为一个18维的向量,有3个智能体,cent_obs_space为一个54维的向量,单个智能体的动作空间act_space 为一个离散的5维的向量
actor
在
onpolicy/algorithms/utils/act.py
中,输入一个观测(18维),输出一个确切的动作actions和这个动作对数概率。
action_dim = action_space.n
self.action_out = Categorical(inputs_dim, action_dim, use_orthogonal, gain)
action_logits = self.action_out(x, available_actions)
actions = action_logits.mode() if deterministic else action_logits.sample()
action_log_probs = action_logits.log_probs(actions)
critic
critic输入维度为
cent_obs_space = n * obs_space = 54
,输出维度为1。
critic_features = self.base(cent_obs)
if self._use_naive_recurrent_policy or self._use_recurrent_policy:
critic_features, rnn_states = self.rnn(critic_features, rnn_states, masks)
values = self.v_out(critic_features)
采样流程
- 初始化obs
在on policy/scripts/train/train_mpe.py的make_train_env(all_args)函数中实例化4个环境:
return SubprocVecEnv([get_env_fn(i) for i in range(all_args.n_rollout_threads)])
在onpolicy/runner/shared/mpe_runner.py中的的warmup函数中,如果采用centralized_V值函数的训练方式(???),那么需要初始化的时候构造出多个智能体的share_obs:
obs = self.envs.reset() # shape = (4, 3, 18)
share_obs = obs.reshape(self.n_rollout_threads, -1) # shape = (4, 54)
# 指定3个智能体
share_obs = np.expand_dims(share_obs, 1).repeat(self.num_agents, axis=1) # shape = (4, 3, 54)
在share_obs中会将n=3个智能体的obs叠加在一起作为share_obs。
collect()
采用rollout
方式采样数据
在onpolicy/runner/shared/mpe_runner.py中的的collect函数中,调用self.trainer.prep_rollout()
函数将actor和critic都设置为eval()
格式,然后用np.concatenate()
函数将并行的环境的数据拼接在一起,这一步是将并行采样的那个纬度降掉:
value, action, action_log_prob, rnn_states, rnn_states_critic \
= self.trainer.policy.get_actions(np.concatenate(self.buffer.share_obs[step]),
np.concatenate(self.buffer.obs[step]),
np.concatenate(self.buffer.rnn_states[step]),# ?
np.concatenate(self.buffer.rnn_states_critic[step]),
np.concatenate(self.buffer.masks[step]))
将数据传入总的MAPPO策略网络R_MAPPOPolicy(onpolicy/algorithms/r_mappo/algorithm/rMAPPOPolicy.py)中去获取一个时间步的数据。在get_actions()里面调用actor去获取动作以及动作的对数概率,critic网络去获取对于cent_obs的状态值函数的输出:
# 调用actor去获取动作和动作的对数概率
actions, action_log_probs, rnn_states_actor = self.actor(obs,rnn_states_actor, masks, available_actions, deterministic)
在这里obs的shape是(4*3, 18),输出actions和action_log_probs的shape都为(12,1).
values, rnn_states_critic = self.critic(cent_obs, rnn_states_critic, masks) # 调用critic去对动作打分,得到values
cent_obs’s shape is (5*2, 14),values’s shape is (12,1),and rnn_states’s shape is (12, 1, 64).
values = np.array(np.split(_t2n(value), self.n_rollout_threads)) # 将value转化成4层3行1列的数据
actions = np.array(np.split(_t2n(action), self.n_rollout_threads)) # 这action转化成4层3行1列的数据
action_log_probs = np.array(np.split(_t2n(action_log_prob), self.n_rollout_threads))
rnn_states = np.array(np.split(_t2n(rnn_states), self.n_rollout_threads))
rnn_states_critic = np.array(np.split(_t2n(rnn_states_critic), self.n_rollout_threads))
...
actions_env = np.squeeze(np.eye(self.envs.action_space[0].n)[actions], 2) # 5维 (4, 3, 5)
最后将(12 , 1
)的actions
转换成(4, 3, 1
)的形式,方便之后并行送到并行的环境中去,作者这里还将动作进行了one-hot
编码,最后变成了(4, 3, 5
)的形式送入到环境中去。
# Obser reward and next obs
obs, rewards, dones, infos = self.envs.step(actions_env)
data = obs, rewards, dones, infos, values, actions, action_log_probs, rnn_states, rnn_states_critic
# insert data into buffer
self.insert(data)
环境下一次输出的obs
还是(4, 3, 18
)的形式,之后调insert
方法将数据添加到buffer
里面,在insert
方法里面会将局部观测构造一个全局观测share_obs
其shape=(4, 3, 54
)出来:
def insert(self, data):
obs, rewards, dones, infos, values, actions, action_log_probs, rnn_states, rnn_states_critic = data
rnn_states[dones == True] = np.zeros(((dones == True).sum(), self.recurrent_N, self.hidden_size), dtype=np.float32)
rnn_states_critic[dones == True] = np.zeros(((dones == True).sum(), *self.buffer.rnn_states_critic.shape[3:]), dtype=np.float32)
masks = np.ones((self.n_rollout_threads, self.num_agents, 1), dtype=np.float32)
masks[dones == True] = np.zeros(((dones == True).sum(), 1), dtype=np.float32)
if self.use_centralized_V:
share_obs = obs.reshape(self.n_rollout_threads, -1)
share_obs = np.expand_dims(share_obs, 1).repeat(self.num_agents, axis=1)
else:
share_obs = obs
self.buffer.insert(share_obs, obs, rnn_states, rnn_states_critic, actions, action_log_probs, values, rewards, masks)
上述过程循环迭代self.episode_length=100
次。
训练流程
- 计算优势函数
在训练之前,首先调用self.compute()
函数计算episode
的折扣回报,在计算折扣回报之前,先算这个episode
最后一个状态的状态值函数next_values
,其shape=(12, 1)
然后调用compute_returns
函数计算折扣回报:
def compute(self): # 计算这个episode的折扣回报,先用rMAPPOPolicy.py里面的get_values计算一下next_values
"""Calculate returns for the collected data."""
self.trainer.prep_rollout()
next_values = self.trainer.policy.get_values(np.concatenate(self.buffer.share_obs[-1]),
np.concatenate(self.buffer.rnn_states_critic[-1]),
np.concatenate(self.buffer.masks[-1]))
next_values = np.array(np.split(_t2n(next_values), self.n_rollout_threads))
self.buffer.compute_returns(next_values, self.trainer.value_normalizer) # 折扣回报的的计算方式
有了数据之后就可以开始计算折扣回报了(在这里有采用gae
算折扣回报的方式,并且有将value
做normalizer
)。compute_returns
函数在onpolicy/utils/shared_buffer.py
中,核心代码如下:
self.value_preds[-1] = next_value
gae = 0
for step in reversed(range(self.rewards.shape[0])):
if self._use_popart or self._use_valuenorm:
delta = self.rewards[step] + self.gamma * value_normalizer.denormalize(
self.value_preds[step + 1]) * self.masks[step + 1] \
- value_normalizer.denormalize(self.value_preds[step])
gae = delta + self.gamma * self.gae_lambda * self.masks[step + 1] * gae
self.returns[step] = gae + value_normalizer.denormalize(self.value_preds[step])
算完折扣回报后调用self.train()
函数进行训练:
def train(self): # 算完折扣回报之后调用self.train()函数进行训练
"""Train policies with data in buffer. """
self.trainer.prep_training() # 将网络设置为train()的格式
train_infos = self.trainer.train(self.buffer)
self.buffer.after_update() # 将buffer的第一个元素设置为其episode最后的一个元素
return train_infos
在self.trainer.train(self.buffer)
函数中先基于数据,计算优势函数(优势函数是针对全局的观测信息所得到的):
advantages = buffer.returns[:-1] - self.value_normalizer.denormalize(buffer.value_preds[:-1])
advantages_copy = advantages.copy()
advantages_copy[buffer.active_masks[:-1] == 0.0] = np.nan
mean_advantages = np.nanmean(advantages_copy) # float, shape = (1)
std_advantages = np.nanstd(advantages_copy) # float, shape = (1)
advantages = (advantages - mean_advantages) / (std_advantages + 1e-5)
然后从buffer
中采样数据,把线程、智能体的纬度全部降掉
# 从 data_generator 中采样
for sample in data_generator:
# 进行一次 PPO 更新
value_loss, critic_grad_norm, policy_loss, dist_entropy, actor_grad_norm, imp_weights \
= self.ppo_update(sample, update_actor)
ppo_update
函数大体流程是:
- 从buffer中抽样建立sample
- 将抽样的数据传递给rMAPPOPolicy.py中的evaluate_actions函数,得到 values, action_log_probs, dist_entropy
- 计算actor的loss
- 计算critic的loss
然后在onpolicy/algorithms/r_mappo/r_mappo.py
中
share_obs_batch, obs_batch, rnn_states_batch, rnn_states_critic_batch, actions_batch,\
value_preds_batch, return_batch,masks_batch,active_masks_batch,old_action_log_probs_batch, \
adv_targ, available_actions_batch = sample
拿到采样之后的数据,把obs
送给actor
网络,得到action_log_probs
, dist_entropy
。把cent_obs
送到critic
得到新的values
。
- 计算actor的loss
在ppo_update()
中,利用新老动作的概率分布和优势函数之后就可以更新actor
网络了:
# actor update
imp_weights = torch.exp(action_log_probs - old_action_log_probs_batch)
surr1 = imp_weights * adv_targ
surr2 = torch.clamp(imp_weights, 1.0 - self.clip_param, 1.0 + self.clip_param) * adv_targ
policy_action_loss = (-torch.sum(torch.min(surr1, surr2),
dim=-1,
keepdim=True) * active_masks_batch).sum() / active_masks_batch.sum()
(policy_loss - dist_entropy * self.entropy_coef).backward()
- 计算critic的loss
新的value
和老的value_preds_batch
和计算的return_batch
送到onpolicy/algorithms/r_mappo/r_mappo.py
文件的cal_value_loss
函数中去计算critic
的loss
:
value_loss = self.cal_value_loss(values, value_preds_batch, return_batch, active_masks_batch)
and then
# 对value做一个clipped
value_pred_clipped = value_preds_batch + (values - value_preds_batch).clamp(-self.clip_param, self.clip_param)
# 然后计算误差的clip
error_clipped = return_batch - value_pred_clipped
error_original = return_batch - values
# 然后直接计算loss
value_loss_clipped = mse_loss(error_clipped)
value_loss_original = mse_loss(error_original)
# 算出loss之后反向传播即可
(value_loss * self.value_loss_coef).backward()
实验设计
以下实验来源于Policy Regularization via Noisy Advantage Values for
Cooperative Multi-agent Actor-Critic methods(arXiv:2106.14334v13 )
算法表现对比
首先展示了论文介绍的算法以及其对比算法在SMAC各种场景下的胜率,将自己算法效果好于或者等于其他算法的效果的场景下的胜率进行加粗展示:
论文对该表的解释:
表 2 中的实验结果表明 (1) NV-MAPPO 在大多数困难场景 2 上的性能显着超过 MAPPO,例如 5m_vs_6m (+65%)、走廊 (+97%)、6h_vs_8z (+87%) 和 3s5z_vs_3s6z (+31%)。 (2)NV-IPPO在Super Hard场景3s5z_vs_3s6z(96%)和6h_vs_8z(94%)中取得了超高的胜率;我们推测这是因为噪声还可以防止 IPPO 由于非平稳性而过拟合。 (3) NV-MAPPO 在硬场景上的平均性能优于 Fine-tuned QMIX 和 MAPPO-FP。 (4) 我们在附录 B.1 中比较了 MAPG 和 NV-MAPG,发现 NV-MAPG 的性能也明显优于 MAPG。
所有这些结果表明,噪声值函数在实际任务中效果很好。由于我们使用 Fine-tuned QMIX [5] 作为基线,QMIX 的中位测试获胜率明显优于过去文献中的实验结果 [14, 19, 20, 26]。至此,NV-MAPPO 和 NV-IPPO 在 SMAC 中共同实现了 SOTA。具体来说,NV-IPPO(适用于 3s5z_vs_3s6z 和 6h_vs_8z)和 NVMAPPO(适用于其他硬场景)在所有硬场景中的平均胜率为 97%。
NA-MAPPO和NV-MAPPO的对比
接着又对NA-MAPPO和NV-MAPPO进行了对比,并对NA-MAPPO算法胜率的较大方差进行了解释,算法胜率的方差可以显示出算法的稳定性.
如图4所示,我们发现噪声优势方法在某些情况下可能会损害算法的稳定性,即噪声优势方法的胜率有很大的方差。我们推测,可能是显性噪声破坏了政策梯度的原有方向。 但是,在smac的某些困难情况下,NA-MAPPO的性能仍可与NV-MAPPO媲美; 我们注意到NA-MAPPO非常容易实现。所有这些结果表明,噪声优势值确实改善了vanilla MAPPO的性能。
然后对NV-MAPPO的噪声值函数如何影响性能进行进一步的实验分析:
接下来,我们对NV-MAPPO的噪声值函数如何影响性能进行进一步的实验分析。我们在图 5 中展示了一些 Hard 场景下代理维度中值函数𝑣𝑖的标准差。我们发现,在某些场景中,𝑣𝑖 的大方差意味着 NV-MAPPO 在这些场景中相对于 vanilla MAPPO 的性能提升也很大,如3s5z_vs_3s6z和6h_vs_8z(见图5和图4)。该定律表明,NV-MAPPO 的性能提升确实来自价值函数的噪声扰动。
最后分析了噪声值函数在3𝑠5𝑧_𝑣𝑠_3𝑠6𝑧场景下对策略熵的影响
最后分析了噪声值函数对方案3 𝑠 5 𝑧 _ 𝑣𝑠 _ 3 𝑠 6 𝑧 策略熵的影响。如图6所示,vanilla MAPPO策略的熵迅速下降,并落入局部最优解,因此胜率始终为零。对于NV-MAPPO,我们平滑了采样的优势值,并且噪声可以防止策略过拟合,因此策略的熵会更加谨慎地降低。
策略熵和好奇心探索的异同
policy entropy,从动作空间的角度出发,尽可能探索各种不同的动作,使得策略熵最大化,隐含着各种状态空间也会被探索到。policy entropy依据action的Q值来分配探索的概率,而不是$\epsilon-greedy$中固定概率$\epsilon/|A|$探索较低Q值的action.另外policy entropy不简单是一个正则项,有另一个完整的RL框架来描述它,参见此链接
curiosity,从状态空间的角度出发,尽可能探索环境的状态转移方式,并把未知的状态转移方式作为内部rewards,以期改变agent的行为。参考此链接
应用上,policy entropy适合连续动作空间,如操作机械臂等,得到的策略更鲁棒;curiosity适合sparse rewards的问题,如走迷宫,它能为训练提供更丰富的学习信号。
参考文献
- 多智能体强化学习(二) MAPPO算法详解 https://blog.csdn.net/weixin_39059031/article/details/117283800
- 多智能体强化学习MAPPO源代码解读 https://blog.csdn.net/onlyyyyyyee/article/details/118888711
- 如何理解强化学习中的”好奇心探索”和”策略熵”的异同? - bigiceberg M的回答 - 知乎 https://www.zhihu.com/question/311267842/answer/631757642