{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# PPO - Proximal Policy Optimization" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At the end of the _policy gradient_ notebook, we identified the techniques used in PPO. The\n", "[PPO algorithm](https://arxiv.org/abs/1707.06347) was inspired by trying to simplify _TRPO_, for\n", "more details please check [this video](https://www.youtube.com/watch?v=KjWF8VIMGiY&list=PLwRJQ4m4UJjNymuBM9RdmB3Z9N5-0IlY0&index=4).\n", "\n", "Here are the techniques we are going to implement:\n", "\n", " 1. Learn from multiple parallel / vectorized environments.\n", " 2. Use _Generalized Advantage Estimation_, or [GAE](https://arxiv.org/abs/1506.02438).\n", " 3. Reuse recent experiences to learn in mini-batches (even if technically off-policy).\n", " 4. Clip the policy loss and gradients to prevent the policy from derailing.\n", " 5. Use _trajectory segments_ (instead of entire episodes) for learning.\n", "\n", "With all of that, we'll be able to land on the Moon with PPO!" 
] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "\n", "from dataclasses import dataclass\n", "from torch.distributions.normal import Normal\n", "\n", "import gymnasium as gym\n", "\n", "from util.gymnastics import DEVICE, gym_simulation, init_random" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Environment" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "lunar_lander_kwargs = {\n", "    'continuous': True,\n", "    'gravity': -10.0,\n", "    'enable_wind': False,\n", "    'wind_power': 15.0,\n", "    'turbulence_power': 1.5,\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "gym_simulation(\"LunarLander-v2\", env_kwargs=lunar_lander_kwargs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Trajectory Segments\n", "\n", "In PPO learning we don't loop over episodes (each capped at a maximum number of steps). Instead, we\n", "learn across _trajectory segments_, which are stitched together when an episode ends and the next\n", "one begins. That has various advantages, one of which is the ability to better solve long-lasting\n", "environments.\n", "\n", "For a more in-depth explanation, check out the [CleanRL PPO implementation explanation](https://iclr-blog-track.github.io/2022/03/25/ppo-implementation-details/),\n", "specifically point 1. Many parts of this implementation are inspired by the CleanRL one." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "@dataclass\n", "class TrajectorySegment:\n", "    \"\"\"A trajectory segment collected for PPO training.\n", "\n", "    In particular, the segment contains M timesteps for N bots performing actions in the\n", "    environment. 
So for example `states` has dimension: (M, N, S_dim).\n", "    \"\"\"\n", "\n", "    states: torch.Tensor  # (M, N, S_dim)\n", "    \"\"\"The states for the N bots collected for M timesteps.\"\"\"\n", "    actions: torch.Tensor  # (M, N, A_dim)\n", "    \"\"\"The actions taken by the N bots in the M timesteps.\"\"\"\n", "    logprobs: torch.Tensor  # (M, N)\n", "    \"\"\"The log-probability of the action for the N bots in the M timesteps.\"\"\"\n", "    values: torch.Tensor  # (M, N)\n", "    \"\"\"The estimated state value for the N bots for the M timesteps.\"\"\"\n", "    rewards: torch.Tensor  # (M, N)\n", "    \"\"\"The rewards for the N bots at the M timesteps for the action taken.\"\"\"\n", "    dones: torch.Tensor  # (M, N)\n", "    \"\"\"Whether the action taken at the current state transitioned to a terminal state.\n", "\n", "    Note that dones[t] refers to whether the state[t+1] is a terminal state. Also, we\n", "    never really store the terminal state in `states` because if the episode completes\n", "    the environment resets and returns the next starting state for a new episode.\n", "    \"\"\"\n", "    next_start_state: torch.Tensor  # (N, S_dim)\n", "    \"\"\"The next state from which to start the next trajectory segment collection.\"\"\"\n", "\n", "    def __len__(self):\n", "        return self.states.shape[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Mini-batch Learning\n", "\n", "Remember that in REINFORCE we threw away the data collected by the agent every time? That is because\n", "once we learn from the experiences, the policy has changed and we cannot reuse the data sampled by\n", "an old policy to learn... but that sounds quite wasteful...\n", "\n", "In PPO, for each trajectory segment we can perform multiple epochs of learning in mini-batches\n", "instead! 
How is that possible?!\n", "\n", "It turns out that, if the old and new policies are \"similar enough\", we can do gradient ascent using\n", "the gradient of the _ratio_ of the new policy over the old policy times the return (a.k.a., the\n", "_surrogate_ function):\n", "\n", "$$\n", "g = \\nabla_{\\theta'} \\sum_{t} \\frac{\\pi_{\\theta'}(a_t|s_t)}{\\pi_{\\theta}(a_t|s_t)} R_t^{future}\n", "$$\n", "\n", "Here $\\theta$ are the old parameters and $\\theta'$ the new ones. The only difference in the\n", "implementation is that we will use [log probabilities](https://en.wikipedia.org/wiki/Log_probability)\n", "for speed and numerical stability. For an in-depth explanation, see the appendix.\n", "\n", "Now that we know that, let's focus on implementing some convenient batching utilities!" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "@dataclass\n", "class LearningBatch:\n", "    \"\"\"Data used for learning by the Agent.\n", "\n", "    The learning batch contains (shuffled and flattened) collected experiences for N environment\n", "    bots, and their corresponding advantages and returns. 
Assuming the size of the batch is B, then\n", "    for example the `states` dimension is (B, S_dim).\n", "\n", "    Effectively, the learning batch is a bag of B tuples of:\n", "\n", "        (state, action, logprob, advantage, return)\n", "\n", "    Each tuple is a single experience collected by one environment bot, hence a (mini-)batch may\n", "    mix experiences from different bots, in different proportions.\n", "    \"\"\"\n", "    states: torch.Tensor  # (B, S_dim)\n", "    actions: torch.Tensor  # (B, A_dim)\n", "    logprobs: torch.Tensor  # (B)\n", "    advantages: torch.Tensor  # (B)\n", "    returns: torch.Tensor  # (B)\n", "\n", "    def __len__(self):\n", "        return self.states.shape[0]\n", "\n", "    def __getitem__(self, key):\n", "        \"\"\"Ability to slice a learning batch in mini-batches.\"\"\"\n", "        return LearningBatch(self.states[key], self.actions[key], self.logprobs[key],\n", "                             self.advantages[key], self.returns[key])" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "class Batcher:\n", "    \"\"\"Helper class to perform mini-batch learning after a trajectory segment is collected.\n", "\n", "    In particular, the Batcher:\n", "      * stores the data relevant for learning (segment, advantages, returns)\n", "      * flattens learning data (thanks to the Markovian property)\n", "      * shuffles the (flattened) segment and iterates through it in mini-batches.\n", "    \"\"\"\n", "    def __init__(self, seg: TrajectorySegment, advantages: torch.Tensor, returns: torch.Tensor,\n", "                 n_mini_batches: int):\n", "        # TODO: Calculate batch_size as the product of the length of rollout times num_bots.\n", "        #       Hint: those values are the sizes of the first two dimensions of `states`!\n", "        self.batch_size = None\n", "        # TODO: Calculate mini_batch_size as the integer division of batch_size and n_mini_batches.\n", "        self.mini_batch_size = None\n", "        # TODO: Flatten the states, actions, logprobs in the trajectory, plus advantages and 
returns\n", "        #       in a single LearningBatch that will be sliced. Hint: use the `flatten` function\n", "        #       defined below.\n", "        self.experiences = None\n", "\n", "    def shuffle(self):\n", "        \"\"\"Shuffles the learning data and returns a new mini-batch iterator.\"\"\"\n", "        # TODO: Get all the indices in the batch and shuffle them. Hint: np.random.shuffle\n", "        # ...\n", "        # TODO: Return a Batcher.MiniBatchIterator on the shuffled indices.\n", "        return None\n", "\n", "    @staticmethod\n", "    def flatten(t: tuple[torch.Tensor, ...]) -> tuple:\n", "        \"\"\"Utility function to flatten a multi-agents / bots trajectory segment.\n", "\n", "        In particular input tensors have shape (segment_length, num_bots, ...), and they\n", "        are flattened to (segment_length * num_bots, ...).\n", "\n", "        That is useful to probe independent experiences (Markovian) from a trajectory.\n", "        \"\"\"\n", "        return tuple(x.flatten(0, 1) for x in t)\n", "\n", "    class MiniBatchIterator:\n", "        \"\"\"Iterator for a learning batch that loops over mini-batches.\"\"\"\n", "        def __init__(self, experiences: LearningBatch, indices: list[int], mini_batch_size: int):\n", "            self.experiences = experiences\n", "            self.indices = indices\n", "            self.mini_batch_size = mini_batch_size\n", "            self.start = 0\n", "\n", "        def __iter__(self):\n", "            return self\n", "\n", "        def __next__(self):\n", "            if self.start >= len(self.experiences):\n", "                raise StopIteration()\n", "            # TODO: Return experiences at the indices[start : (start + mini_batch_size)].\n", "            #       Hint: make sure to update self.start before returning :)\n", "            # ...\n", "            return None" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PPO Agent\n", "\n", "Let's implement the PPO agent! 
In particular, we will implement the loss function as described in\n", "the paper:\n", "\n", "$$\n", "L_t^{CLIP + VF + S}(\\theta) = \\hat{\\mathbb{E}}_t[L_t^{CLIP}(\\theta) - c_1 L_t^{VF}(\\theta) + c_2 S[\\pi_\\theta](s_t)]\n", "$$\n", "\n", "This combines the clipped policy objective (actor), the value loss (critic), and an entropy bonus.\n", "The paper _maximizes_ this objective; in the code we _minimize_ its negation, so the value loss is\n", "added and the entropy bonus is subtracted. It is important to optimize a single loss function when\n", "the actor and the critic share layers and weights (they do not in this notebook, where the two\n", "networks are separate).\n", "\n", "### Clipping\n", "\n", "As mentioned above, we want the policies to remain \"similar\". To do that, we clip the _ratio_ to the\n", "interval $[1 - \\epsilon, 1 + \\epsilon]$, and we also clip the norm of the gradient of the loss. Check\n", "out the paper itself for more details about ratio clipping!" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "class Agent(nn.Module):\n", "    \"\"\"The PPO agent implementing actor-critic learning.\"\"\"\n", "    def __init__(self, state_size, action_size, lr=2.5e-4, weight_mul=1e-3):\n", "        super().__init__()\n", "\n", "        def layer_init(layer, std=np.sqrt(2)):\n", "            \"\"\"Layer initialization for the neural-network linear layers.\n", "\n", "            Scaling the weights might affect learning speed.\n", "            \"\"\"\n", "            torch.nn.init.orthogonal_(layer.weight, std)\n", "            torch.nn.init.constant_(layer.bias, 0.0)\n", "            layer.weight.data.mul_(weight_mul)\n", "            return layer\n", "\n", "        # Critic network\n", "        # TODO: Make a Sequential network with three linear layers (state_size, 32, 1), with tanh\n", "        #       non-linearity. Use `layer_init` above to initialize linear layers. The output is\n", "        #       directly the output of the last linear layer.\n", "        self.critic = None\n", "\n", "        # Actor network for the mean... and (log)std deviation parameter below.\n", "        # TODO: Make a Sequential network with three linear layers (state_size, 32, action_size),\n", "        #       with tanh non-linearity. Use `layer_init` above to initialize linear layers. 
The\n", "        #       output is directly the output of the last linear layer.\n", "        self.actor_mean = None\n", "        # TODO: Create a Parameter for the actor logstd.\n", "        self.actor_logstd = None\n", "        # TODO: Create the Adam optimizer.\n", "        self.optimizer = None\n", "\n", "    @torch.no_grad()\n", "    def act(self, state: np.ndarray):\n", "        \"\"\"Returns a single action (assuming a single agent) for simulation purposes.\"\"\"\n", "        # Assumes the agent lives on DEVICE (the PPO class moves it there).\n", "        action, _ = self.sample_action(torch.from_numpy(state).unsqueeze(0).to(DEVICE))\n", "        return action.cpu().numpy()[0]\n", "\n", "    def get_value(self, states):\n", "        \"\"\"Returns the estimated value of a state given by the critic.\"\"\"\n", "        # TODO: Return the value from the critic network.\n", "        pass\n", "\n", "    def get_action_probs(self, states) -> torch.distributions.Distribution:\n", "        \"\"\"Returns the probability distribution over the action space.\n", "\n", "        Generally, policy-gradient methods assume the continuous actions are sampled from a normal\n", "        distribution (hence, our neural network outputs mean and std of the gaussian).\n", "        \"\"\"\n", "        # TODO: Get the action mean via the actor network.\n", "        action_mean = None\n", "        # TODO: Get the logstd from the parameter. Hint: use `expand_as` the action_mean\n", "        action_logstd = None\n", "        # TODO: Get the standard deviation exponentiating the logstd.\n", "        action_std = None\n", "        # TODO: Return a Normal distribution based on mean and std.\n", "        return None\n", "\n", "    def sample_action(self, states):\n", "        \"\"\"Samples an action using the current policy for the `states` passed as input.\n", "\n", "        It returns the action itself, and its log-probability over the action space.\n", "        \"\"\"\n", "        # TODO: Get the action probabilities calling `get_action_probs`.\n", "        probs = None\n", "        # TODO: Sample the action.\n", "        action = None\n", "        # TODO: Return the action and the log-probability of the action. Hint: you may want to sum\n", "        #       over dimension 1 (the action dimension)... 
do you know why?\n", "        return None\n", "\n", "    def eval_action(self, states, action):\n", "        \"\"\"Evaluates an action using the *current* (possibly updated) policy.\n", "\n", "        It returns the log-probability of the action, along with the entropy (for entropy loss).\n", "        \"\"\"\n", "        # TODO: Get the action probabilities for the state.\n", "        probs = None\n", "        # TODO: Return the logprob of the action, as well as the entropy.\n", "        return None\n", "\n", "    def learn(self, batch: LearningBatch, entropy_coeff=0.01, vf_coeff=0.5, clip_coeff=0.1,\n", "              max_grad_norm=0.75):\n", "        \"\"\"PPO learning step on a mini-batch. Paper: https://arxiv.org/abs/1707.06347.\"\"\"\n", "        # TODO: Get the newlogprobs and entropy via eval_action (of the most recent policy).\n", "        newlogprobs, entropy = None\n", "\n", "        # TODO: Compute the ratio for the surrogate function. Hint: with logs division becomes\n", "        #       subtraction (we have newlogprobs and the batch.logprobs)...so that we can just\n", "        #       exponentiate afterwards...\n", "        ratio = None\n", "        # TODO: Compute the clipped ratio between [1 - clip_coeff, 1 + clip_coeff].\n", "        #       Hint: you may use torch.clamp\n", "        clipped_ratio = None\n", "        # TODO: Get the advantages from the batch for convenience.\n", "        advantages = None\n", "\n", "        # Policy loss. 
The actor effectively maximizes the advantages scaled by the probability\n", "        # ratio (~reweighting factor in importance sampling to be able to share previous experiences\n", "        # in a new version of the policy) clipped to effectively keep the policy updates in the\n", "        # vicinity of the previous version.\n", "        # TODO: Compute entropy loss as mean of the entropy times the entropy coefficient.\n", "        L_entropy = None\n", "        # TODO: Compute the clipped loss as the min of the advantages times ratio and clipped ratio.\n", "        #       Get the mean, and make sure to _NEGATE_ it.\n", "        L_clipped = None\n", "        # TODO: Compute the actor loss as difference between clipped loss and entropy loss.\n", "        L_actor = None\n", "\n", "        # TODO: Compute the value loss as the mean squared error of the predicted values vs. the\n", "        #       actual returns.\n", "        L_critic = None\n", "\n", "        # TODO: Compute the PPO loss as sum of actor loss plus critic loss times its coefficient.\n", "        L_ppo = None\n", "        # TODO: zero_grad the optimizer and call backward.\n", "        # ...\n", "        # TODO: Clip the gradients too to `max_grad_norm`, to avoid too large of an update.\n", "        #       Hint: use nn.utils.clip_grad_norm_ on the parameters.\n", "        # ...\n", "        # TODO: Run a step of the optimizer.\n", "        # ..." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Advantages and Returns\n", "\n", "Recall that intuitively the _advantage_ tells us how much better we do by taking action $a$ in\n", "state $s$ compared to the \"value\" of that state (which itself represents the expected return):\n", "\n", "$$\n", "A(s_t, a_t) = G_t - V(s_t)\n", "$$\n", "\n", "That is the usual / \"basic\" way of computing advantages: it uses the entire trajectory to compute\n", "the return (in a Monte Carlo fashion). To reduce _variance_, we might want to use bootstrapping and\n", "a current estimate instead. 
GAE allows us to do that, with the trade-off tuned via an extra parameter $\\lambda^{GAE}$.\n", "\n", "The code for advantages and returns computation is provided. You can easily map the\n", "`basic_advantages_and_returns` method to the formula above, and use the GAE paper to instead dig\n", "deeper into `gae_advantages_and_returns` :)\n" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "@torch.no_grad()\n", "def basic_advantages_and_returns(segment: TrajectorySegment, next_return: torch.Tensor, gamma=0.99):\n", "    \"\"\"Computes returns and advantages for a segment in the \"standard\" way.\"\"\"\n", "    returns = torch.zeros_like(segment.rewards).to(DEVICE).detach()\n", "\n", "    for t in reversed(range(len(segment))):\n", "        next_non_terminal = 1.0 - segment.dones[t]\n", "        # G_t = R_t + gamma * R_t+1 + gamma^2 * R_t+2 + ...\n", "        returns[t] = segment.rewards[t] + gamma * next_non_terminal * next_return\n", "        # Carry G_t back to step t-1. `next_non_terminal` above already drops the future return\n", "        # when an episode terminates half-way (per bot); masking again here would wrongly hide\n", "        # the terminal reward from the earlier steps of the same episode.\n", "        next_return = returns[t]\n", "\n", "    advantages = returns - segment.values\n", "    return advantages, returns" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "@torch.no_grad()\n", "def gae_advantages_and_returns(segment: TrajectorySegment, next_return: torch.Tensor, gamma=0.999,\n", "                               gae_lambda=0.98):\n", "    \"\"\"Compute advantages via generalized advantage estimation (GAE).\"\"\"\n", "    advantages = torch.zeros_like(segment.rewards).to(DEVICE)\n", "    last_gae_lambda = 0\n", "\n", "    for t in reversed(range(len(segment))):\n", "        next_non_terminal = 1.0 - segment.dones[t]\n", "        td_error = segment.rewards[t] + (\n", "            gamma * next_return * next_non_terminal) - segment.values[t]\n", "        advantages[t] = td_error + gamma * gae_lambda * next_non_terminal * last_gae_lambda\n", "        next_return = segment.values[t]\n", "        # Carry the advantage back to step t-1. `next_non_terminal` above already cuts the\n", "        # accumulation when an episode terminates half-way (per bot).\n", "        last_gae_lambda = 
advantages[t]\n", "\n", "    returns = advantages + segment.values\n", "    return advantages, returns" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PPO Training Loop\n", "\n", "No surprises here! The loop is basically:\n", "\n", " * Collect a trajectory segment.\n", " * Compute advantages and returns.\n", " * Learn in mini-batches for $n$ epochs.\n", " * Repeat until the environment is solved (or max steps)." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "class PPO:\n", "    \"\"\"PPO implementation.\"\"\"\n", "    def __init__(self, env: gym.vector.VectorEnv, agent: Agent, rollout_size=1_024,\n", "                 mini_batch_size=64, n_update_epochs=4, gae_enabled=True, solved_score=200.0):\n", "        self.env = env\n", "        self.agent = agent.to(DEVICE)\n", "        self.num_bots = env.num_envs\n", "        self.n_mini_batches = (rollout_size * self.num_bots) // mini_batch_size\n", "        self.n_update_epochs = n_update_epochs\n", "        self.gae_enabled = gae_enabled\n", "        self.action_size = env.action_space.shape[1]\n", "        self.state_size = env.observation_space.shape[1]\n", "        self.rollout_size = rollout_size\n", "        self.solved_score = solved_score\n", "        self.n_episode = 0  # Number of episodes (across all bots)\n", "\n", "    def train(self) -> list[float]:\n", "        \"\"\"PPO training loop.\"\"\"\n", "        # TODO: Reset the environment.\n", "        start_state, _ = None\n", "        # TODO: Convert the start state into a tensor.\n", "        start_state = None\n", "\n", "        while True:\n", "            # Policy rollout. The \"vectorized\" environment collects experiences from several bots\n", "            # in parallel (10 in this example), which reduces noise. 
\n", "            # TODO: Collect a trajectory segment calling `collect_trajectory_segment`.\n", "            segment = None\n", "\n", "            # Advantages and returns computation for the learning phase (GAE or standard).\n", "            # TODO: Compute the `next_return` (estimate) using the critic (i.e., the agent get_value\n", "            #       method).\n", "            next_return = None\n", "            # TODO: Gets the advantages and returns using one of the methods above. Hint: GAE is\n", "            #       usually a better choice :)\n", "            advantages, returns = None\n", "\n", "            # Policy learning. The agent learns on mini-batches provided by the Batcher.\n", "            # TODO: Create a Batcher for mini-batch learning.\n", "            batcher = None\n", "            for _ in range(self.n_update_epochs):\n", "                # TODO: Iterate over mini_batch via the Batcher shuffle() method, and call `learn`\n", "                #       on the agent.\n", "                pass\n", "\n", "            # Prepare for next rollout.\n", "            start_state = segment.next_start_state\n", "\n", "            # Checking scores and overall episode.\n", "            if self.training_checkpoint(segment):\n", "                break\n", "\n", "    def collect_trajectory_segment(self, start_state):\n", "        \"\"\"Collect a trajectory segment for a round of PPO policy rollout.\"\"\"\n", "        batch_dim = (self.rollout_size, self.num_bots)\n", "\n", "        s_states = torch.zeros(batch_dim + (self.state_size,)).to(DEVICE)\n", "        s_actions = torch.zeros(batch_dim + (self.action_size,)).to(DEVICE)\n", "        s_logprobs = torch.zeros(batch_dim).to(DEVICE)\n", "        s_values = torch.zeros(batch_dim).to(DEVICE)\n", "        s_rewards = torch.zeros(batch_dim).to(DEVICE)\n", "        s_dones = torch.zeros(batch_dim).to(DEVICE)\n", "\n", "        state = start_state\n", "        for step in range(self.rollout_size):\n", "            with torch.no_grad():  # Do not track gradients on policy rollout.\n", "                # TODO: sample_action from the agent.\n", "                action, logprob = None\n", "                # TODO: clip the action between -1 and 1.\n", "                #       https://gymnasium.farama.org/environments/box2d/lunar_lander/\n", "                clipped_action = None\n", "                # TODO: get_value from the agent.\n", "                value = 
None\n", "\n", "            # TODO: Make a step in the environment.\n", "            # NOTE: if done, the next_state is the new state from which to start a new episode.\n", "            next_state, reward, term, trunc, _ = None\n", "\n", "            s_states[step] = state\n", "            s_actions[step] = action\n", "            s_logprobs[step] = logprob\n", "            s_values[step] = value.flatten()\n", "            s_rewards[step] = torch.Tensor(reward).to(DEVICE)\n", "            # dones[t] corresponds to whether the state[t+1] was done. But if that's the case, we\n", "            # do not store such state b/c what the environment returns as `next_state` is the reset\n", "            # start state for the new episode. This is relevant for advantage / return computation.\n", "            # https://gymnasium.farama.org/api/vector/#gymnasium-vector-vectorenv\n", "            s_dones[step] = torch.Tensor(np.logical_or(term, trunc)).to(DEVICE)\n", "\n", "            state = torch.Tensor(next_state).to(DEVICE)  # roll over states to next time step\n", "\n", "        # TODO: Return a new trajectory segment.\n", "        return None\n", "\n", "    @torch.no_grad()\n", "    def training_checkpoint(self, segment: TrajectorySegment) -> bool:\n", "        \"\"\"Prints statistics and determines whether to terminate training.\"\"\"\n", "        for t in range(len(segment)):\n", "            for b in range(self.num_bots):\n", "                is_episode_end = segment.dones[t, b] == True\n", "                if is_episode_end:\n", "                    self.n_episode += 1\n", "                    avg_score = np.mean(self.env.return_queue)\n", "                    print(f'\\rEpisode {self.n_episode}\\tAverage Score: {avg_score:.2f}',\n", "                          end=\"\\n\" if self.n_episode % 250 == 0 else \"\")\n", "                    if avg_score > self.solved_score:\n", "                        print(f'\\rEpisode {self.n_episode} solved environment! 
' +\n", "                              f'Average Score: {avg_score:.2f}')\n", "                        return True\n", "        return False" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "vector_env = gym.vector.make(\"LunarLander-v2\", num_envs=10, **lunar_lander_kwargs)\n", "vector_env = gym.wrappers.RecordEpisodeStatistics(vector_env)\n", "with init_random(vector_env) as env:\n", "    agent = Agent(env.observation_space.shape[1], env.action_space.shape[1])\n", "    ppo = PPO(env, agent)\n", "    ppo.train()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "gym_simulation(\"LunarLander-v2\", agent, env_kwargs=lunar_lander_kwargs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Appendix\n", "\n", "### Off-Policy Policy Gradient (Intuition)\n", "\n", "Recall that the objective of reinforcement learning is to maximize the _expected_ return. Hence, we\n", "work with expectations.\n", "\n", "If we want to find out the expected value for a function evaluated on multiple trajectories (e.g.,\n", "the return of a reinforcement learning environment), we sum over all trajectories, weighting each\n", "return by the probability of that trajectory:\n", "\n", "$$\n", "\\mathbb{E}[R] = \\sum_\\tau P(\\tau)r(\\tau)\n", "$$\n", "\n", "Where $r$ gives you the return for the trajectory $\\tau$ and $P$ its probability. Let's say that the\n", "problem is _parameterized_ by the parameters $\\theta$, the equation becomes:\n", "\n", "$$\n", "\\mathbb{E}[R_\\theta] = \\sum_\\tau P_{\\theta}(\\tau)r(\\tau)\n", "$$\n", "\n", "This becomes interesting if we consider new parameters $\\theta'$. Let's assume the sampling of\n", "$r(\\tau)$ happened under old parameters $\\theta$, but we now have updated the parameters to\n", "$\\theta'$. 
We cannot compute the _expected value_ just using the old $r$ with the new $P$, but what\n", "we can do is use a trick:\n", "\n", "$$\n", "\\mathbb{E}[R_{\\theta'}] = \\sum_\\tau P_{\\theta}(\\tau) \\frac{P_{\\theta'}(\\tau)}{P_{\\theta}(\\tau)} r(\\tau)\n", "$$\n", "\n", "The $\\frac{P_{\\theta'}(\\tau)}{P_{\\theta}(\\tau)}$ is called the _reweighting factor_, and with it we\n", "can compute updated _expectations_ using old sampled data (a technique called _importance sampling_).\n", "\n", "-----\n", "\n", "Let's go back to the policy gradient update, the gradient $g$ is:\n", "\n", "$$\n", "g = \\sum_t \\nabla_{\\theta} \\log \\pi_{\\theta}(a_t | s_t) R_t^{future}\n", "$$\n", "\n", "Since $\\nabla \\log f = \\nabla f / f$, this is equivalent to:\n", "\n", "$$\n", "g = \\sum_t \\frac{\\nabla_{\\theta} \\pi_{\\theta}(a_t | s_t)}{\\pi_{\\theta}(a_t | s_t)} R_t^{future}\n", "$$\n", "\n", "Now let's assume we have trajectories (hence, $R_t$) collected under an old policy $\\pi_{\\theta'}$\n", "(note that from here on $\\theta'$ denotes the _old_ parameters and $\\theta$ the ones being optimized).\n", "How do we compute the gradient? 
We can use the reweighting factor:\n", "\n", "$$\n", "g = \\frac{P_{\\theta}(\\tau)}{P_{\\theta'}(\\tau)} \\sum_t \\frac{\\nabla_{\\theta} \\pi_{\\theta}(a_t | s_t)}{\\pi_{\\theta}(a_t | s_t)} R_t^{future}\n", "$$\n", "\n", "$P$ represents the probability of a trajectory under a certain parameter $\\theta$:\n", "\n", "$$\n", "P_{\\theta}(\\tau) = p(s_1) \\prod_{t=1}^T \\pi_{\\theta}(a_t|s_t)p(s_{t+1}|s_t,a_t)\n", "$$\n", "\n", "Given that the dynamics of the MDP (i.e., the probabilities $p$) are the same under both policies,\n", "they cancel out and we get:\n", "\n", "\n", "$$\n", "\\frac{P_{\\theta}(\\tau)}{P_{\\theta'}(\\tau)} = \\prod_{t=1}^T \\frac{\\pi_{\\theta}(a_t|s_t)}{\\pi_{\\theta'}(a_t|s_t)}\n", "$$\n", "\n", "Let's move this term inside the sum considering causality: the term at time $t$ only keeps the\n", "ratios up to $t$ (the product of the future ratios, which would multiply the reward term, is\n", "discarded because it turns out we still obtain a good approximation):\n", "\n", "$$\n", "g = \\sum_t \\frac{\\nabla_{\\theta} \\pi_{\\theta}(a_t | s_t)}{\\pi_{\\theta}(a_t | s_t)}\n", " \\prod_{t'=1}^t \\frac{\\pi_{\\theta}(a_{t'}|s_{t'})}{\\pi_{\\theta'}(a_{t'}|s_{t'})}\n", " R_t^{future}\n", "$$\n", "\n", "That doesn't help much per se. But if we think about what that product represents, we can replace\n", "it with the _marginal_ probabilities of the overall dynamics under the parameterized policies:\n", "\n", "$$\n", "g = \\sum_t \\frac{\\nabla_{\\theta} \\pi_{\\theta}(a_t | s_t)}{\\pi_{\\theta}(a_t | s_t)}\n", " \\frac{\\pi_{\\theta}^{mrg}(s_t, a_t)}{\\pi_{\\theta'}^{mrg}(s_t, a_t)}\n", " R_t^{future}\n", "$$\n", "\n", "This still is not useful by itself: to compute the _marginals_ we would need access to the entire\n", "dynamics of the system. 
But we can rewrite that in this form (by the chain rule of probability):\n", "\n", "$$\n", "g = \\sum_t \\frac{\\nabla_{\\theta} \\pi_{\\theta}(a_t | s_t)}{\\cancel{\\pi_{\\theta}(a_t | s_t)}}\n", " \\frac{\\pi_{\\theta}^{mrg}(s_t)}{\\pi_{\\theta'}^{mrg}(s_t)}\n", " \\frac{\\cancel{\\pi_{\\theta}(a_t|s_t)}}{\\pi_{\\theta'}(a_t|s_t)}\n", " R_t^{future}\n", "$$\n", "\n", "Now, if we could just ignore the _state marginals_ we would get a manageable update for\n", "off-policy / importance-sampling policy gradient. [It turns out](https://www.youtube.com/watch?v=LtAt5M_a0dI&list=PL_iWQOsE6TfX7MaC6C3HcdOf1g337dlC9&index=40)\n", "that, if the policies are \"close\" enough (this is where \"proximity\" plays a critical role), we can\n", "indeed ignore those marginals (in the sense that we get a bounded error). This gives us the final\n", "update:\n", "\n", "$$\n", "g = \\nabla_{\\theta} \\sum_t \\frac{\\pi_{\\theta}(a_t | s_t)}{\\pi_{\\theta'}(a_t | s_t)} R_t^{future}\n", "$$\n", "\n", "For a formal explanation, see [CS 285: Lecture 5](https://www.youtube.com/watch?v=KZd508qGFt0&list=PL_iWQOsE6TfX7MaC6C3HcdOf1g337dlC9&index=20)." ] } ], "metadata": { "kernelspec": { "display_name": "drlzh", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.7" } }, "nbformat": 4, "nbformat_minor": 2 }