(A Deep Dive Into Reinforcement Learning)
It’s 20–20, and the first player to 21 wins. “I’m going to beat you, Daniel,” says your little brother. You serve. And you lose. This is the reality for many people, including myself. So I decided to make an AI agent that could win for me.
In this project, I decided to replicate what Google DeepMind did in their 2015 reinforcement learning paper. I got a lot of my guidance from this Udemy course on Deep Reinforcement Learning, so if you are interested in going deeper, I would suggest checking that out here.
Since I do not have the same computational power or the same amount of time to spend on training, I did not achieve quite the same results; however, you will see at the end that there is unequivocal proof of learning.
Before reading this article, I highly recommend checking out my other article, “Introduction To Reinforcement Learning,” since it explains most of the concepts discussed here.
I have broken this article into two sections. In the first, I will give a brief overview of reinforcement learning and the concepts incorporated into the code. The second half of this article will be the actual code that I implemented. Feel free to skip that part if you do not want to get into all of the details.
So What Really Is Reinforcement Learning?
To put it simply, it’s trial and error for artificial intelligence. The goal is to create an algorithm (the Agent) that learns from its experiences so that it can perform a certain task. In this example, the goal is to learn how to play Atari games.
How it works
The agent learns from its experiences through what is called the Markov Decision Process (MDP).
The agent takes an action (A) in a state (S) and receives a reward (R). It is then given the updated state (St+1).
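To make that loop concrete, here is a minimal sketch of the agent-environment cycle in OpenAI Gym (the environment name and the random action choice are placeholders for illustration, not the trained agent):

import gym

env = gym.make('PongNoFrameskip-v4')
state = env.reset()
done = False
while not done:
    action = env.action_space.sample()             # the agent picks an action (A) in state (S)
    state_, reward, done, info = env.step(action)  # the environment returns a reward (R) and the new state (St+1)
    state = state_                                 # the new state becomes the current state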
Optimal Policy
It is important to understand that a policy is simply the strategy the agent follows when picking actions in each state. Once the agent has figured out the best action to take in every state, that strategy is called the optimal policy.
Let Me Give An Example
Imagine you are a baby first trying to walk. In this case, the agent would be the baby. The environment would be the room/place that you are trying to walk in. The state would be your situation at any given point in time. And finally, the actions would be the movements of your body. The baby quickly learns that some actions cause negative rewards (falling over, for example). The agent will also figure out that other actions lead to success. Eventually, after a given number of tries, the baby will have learned how to walk.
Q Learning
Now, the way ordinary (tabular) reinforcement learning works is that we record a value for every action in every state in a table known as the Q-table.
This can work well for environments that do not have that many possible states. However, it runs into problems with much larger environments such as the Atari library: it would simply take too much space and time to map out every single state-action pair.
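For reference, the update rule that fills in this table is the standard tabular Q-learning update (written here in LaTeX; it does not appear explicitly in the article's figures):

Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t) \right]

where \alpha is the learning rate and \gamma is the discount factor for future rewards.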
In Comes Deep Q Learning
To fix this problem of time and space, we use neural networks to approximate the value of each action. Then the agent selects the action with the highest value.
What are Neural Networks
Neural networks are function approximators. They take some input, in our case screen pixels, and output something that you want to learn about that input. In our case, we want to know the value of each action in a given state. Because we are just estimating values rather than mapping out every single combination, the neural network predicts each action's value and compares that prediction against a target. Depending on how far the predictions are from that target, it uses backpropagation to update its coefficients. These coefficients are called weights and biases. They are randomly initialized at the beginning of training; however, as training progresses, the weights and biases improve and lead the network to make more accurate predictions.
The Loss Function
The neural networks update their weights and biases by combining a loss function and stochastic gradient descent.
First, the neural network approximates each action's Q-value; these are the approximated values of the Q-network (the Q(s, a) term in the loss function below). You then find the error between that and the Q-learning target, which is just the reward from the transition plus the discounted Q-value of the best action available in the next state (the r + γ max Q(s', a') term in the loss function below).
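Since the figure of the loss is not reproduced here, this is the loss as written in DeepMind's 2015 paper:

L_i(\theta_i) = \mathbb{E}_{(s, a, r, s') \sim U(D)} \Big[ \big( r + \gamma \max_{a'} Q(s', a'; \theta_i^{-}) - Q(s, a; \theta_i) \big)^2 \Big]

where \theta_i are the weights of the Q-network, \theta_i^{-} are the frozen weights of the target network (explained later), and the transitions (s, a, r, s') are sampled uniformly from the replay memory D.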
Once you have calculated the loss, you compute its gradient (the partial derivatives with respect to all of the parameters, i.e. the weights and biases), multiply that gradient by the learning rate, and subtract the result from the current parameters. (The learning rate is just another hyperparameter that tells you how far to move the weights and biases on each update.)
Eventually, after a high number of iterations, you will have optimal parameters that allow you to predict the correct values for each action.
What Happens Once We’ve Made The Prediction
This next part is simple. Once the neural network has approximated each action’s value in a certain state based on a target, the agent will take the action with the max value.
However, since we want the agent to explore the environment, we use an epsilon parameter, which determines whether the agent takes a greedy action or a random action. Epsilon starts at 1.0 at the beginning of training and decreases to 0.1 over a certain number of training steps. This is called an epsilon-greedy policy: the higher the epsilon value, the more random actions the agent takes.
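Written out, epsilon-greedy action selection looks like this:

a_t = \begin{cases} \arg\max_{a} Q(s_t, a) & \text{with probability } 1 - \epsilon \\ \text{a random action} & \text{with probability } \epsilon \end{cases}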
Eventually, after the agent has iterated through hundreds of thousands of training steps, it can predict which actions have the highest value and will be able to play Atari video games!
Time to code
The code is broken into different sections. The Utils file, the Agent’s Memory, the Neural Network Architecture, the Agent, and then the Main file.
But before everything else, like always, we need to do our imports.
import numpy as np
import gym
import atari_py
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
import collections
import cv2
import matplotlib.pyplot as plt
import imageio
Utils File
Before doing any of the coding, I had to deal with the preprocessing of the frames.
These are the things that had to be changed before we could actually play Atari games (a sketch of one of these steps follows the list):
- Sometimes in the OpenAI Gym environments there is flickering between frames, so I had to take the max value of the two previous frames.
- Repeating each chosen action for 4 frames.
- Dealing with the fact that PyTorch expects images with channels first, while OpenAI Gym returns images with channels last.
- Converting the image from 3 channels (color) to 1 channel (grayscale).
- Resizing the images from 210x160x3 to 84x84x1.
- Scaling the pixel values from 0–255 to 0–1.
- Stacking the previous 4 frames. This is done so that the agent can see which way the ball and paddles are moving. (How would you know what is going on in a game from just a single frame?)
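The utils file itself is not reproduced in full here, but as an illustration, this is a minimal sketch of the grayscale/resize/scale step written as a gym.ObservationWrapper, using the numpy, gym, and cv2 imports from above (the class name and default shape are my own; the flicker fix, action repeat, and frame stacking would be separate wrappers):

class PreprocessFrame(gym.ObservationWrapper):
    # convert to grayscale, resize to 84x84, move channels first, scale to [0, 1]
    def __init__(self, env, shape=(84, 84, 1)):
        super(PreprocessFrame, self).__init__(env)
        self.shape = (shape[2], shape[0], shape[1])  # channels first for PyTorch
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0,
                                                shape=self.shape, dtype=np.float32)

    def observation(self, obs):
        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)    # 3 color channels -> 1 grayscale
        resized = cv2.resize(gray, self.shape[1:],      # 210x160 -> 84x84
                             interpolation=cv2.INTER_AREA)
        return np.array(resized, dtype=np.float32).reshape(self.shape) / 255.0  # 0-255 -> 0-1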
The Agent’s Memory
The agent learns from its past transitions through what is called experience replay. The agent fills up its replay memory with states, actions, rewards, new states, and dones (a flag for whether the state is terminal, meaning it is the final state before the agent loses/dies). It then randomly samples from this memory in batch sizes of 32.
There are two key reasons why this is done.
- Avoids correlations between data points
- Much more efficient use of training data
First, we have to create empty arrays of zeros for the state memory, new state memory, action memory, reward memory, and the terminal memory.
class ReplayBuffer():
    def __init__(self, max_size, input_shape, n_actions):
        self.mem_size = max_size
        self.mem_cntr = 0
        self.state_memory = np.zeros((self.mem_size, *input_shape), dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_shape), dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int64)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.uint8)
We then have to write code that will store each transition into the arrays of zeros that we just created.
    def store_transition(self, state, action, reward, state_, done):
        index = self.mem_cntr % self.mem_size
        self.state_memory[index] = state
        self.action_memory[index] = action
        self.reward_memory[index] = reward
        self.new_state_memory[index] = state_
        self.terminal_memory[index] = done
        self.mem_cntr += 1
This next function allows us to sample states, actions, rewards, and dones in batch sizes of 32 from our replay memory.
    def sample_buffer(self, batch_size):
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, batch_size, replace=False)

        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        states_ = self.new_state_memory[batch]
        dones = self.terminal_memory[batch]

        return states, actions, rewards, states_, dones
The Neural Network Architecture
For the architecture and the other hyperparameters, I decided to follow Google DeepMind's deep Q-learning paper.
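All of the snippets in this section live inside a PyTorch nn.Module subclass. As a rough sketch, the constructor looks something like the following, with the argument names inferred from how the class is instantiated in the Agent section later (the checkpoint path handling is my assumption):

class DeepQNetwork(nn.Module):
    def __init__(self, lr, n_actions, name, input_dims, chkpt_dir):
        super(DeepQNetwork, self).__init__()
        # file path used by save_checkpoint/load_checkpoint
        self.checkpoint_dir = chkpt_dir
        self.checkpoint_file = os.path.join(self.checkpoint_dir, name)
        # the convolutional layers, fully connected layers, loss, optimizer,
        # and device setup shown in the rest of this section go here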
To start, there are 3 convolutional layers. The first layer takes the input frame dimensions; the 32 stands for the number of outgoing filters, with an 8×8 kernel and a stride of 4. The second layer takes 32 input filters and outputs 64, with a 4×4 kernel and a stride of 2. The final convolutional layer takes in 64 filters and outputs 64 again, with a 3×3 kernel and a stride of 1. Between all three of these convolutional layers, we also apply ReLU activation functions.
        self.conv1 = nn.Conv2d(input_dims[0], 32, 8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, 4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, 3, stride=1)

        fc_input_dims = self.calculate_conv_output_dims(input_dims)
After the three convolutional layers, we calculate the size of the flattened convolutional output so that it can be fed into the first of two fully connected layers.
    def calculate_conv_output_dims(self, input_dims):
        state = T.zeros(1, *input_dims)
        dims1 = self.conv1(state)
        dims2 = self.conv2(dims1)
        dims3 = self.conv3(dims2)
        return int(np.prod(dims3.size()))
Like I mentioned, the first fully connected layer takes the flattened convolutional output as input and outputs 512 units. The second fully connected layer takes the 512 units as input and outputs one value per action.
        self.fc1 = nn.Linear(fc_input_dims, 512)
        self.fc2 = nn.Linear(512, n_actions)
In this section of my code, I also set the loss function to MSELoss (mean squared error) and the optimizer to RMSprop (root mean squared propagation). I then check whether CUDA is available and, if so, move everything onto my GPU.
        self.optimizer = optim.RMSprop(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)
After all of this is initialized, I can create my forward function that ties it all together and returns the value of each possible action. As you can see, I used a ReLU activation function between all of the layers; it zeroes out negative activations and provides the non-linearity the network needs to approximate complex functions.
    def forward(self, state):
        conv1 = F.relu(self.conv1(state))
        conv2 = F.relu(self.conv2(conv1))
        conv3 = F.relu(self.conv3(conv2))

        conv_state = conv3.view(conv3.size()[0], -1)
        flat1 = F.relu(self.fc1(conv_state))
        actions = self.fc2(flat1)

        return actions
The Agent
In this section of the code, I had to initialize all of the different parameters:
- The learning rate
self.lr = lr
- Gamma (the discount of future rewards)
self.gamma = gamma
- Epsilon (whether the agent takes greedy actions or random actions)
self.epsilon = epsilon
self.input_dims = input_dims
self.batch_size = batch_size
- The minimum value of epsilon and how much it decreases by each step
self.eps_min = eps_min
self.eps_dec = eps_dec
- When to replace the target network
self.replace_target_cnt = replace
self.env_name = env_name
self.algo = algo
- A learn step counter (used to determine when to update the weights of the target network with the weights of the evaluation network)
self.learn_step_counter = 0
self.n_actions = n_actions
self.chkpt_dir = chkpt_dir
self.action_space = [i for i in range(self.n_actions)]
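One attribute is not shown above but is needed for the replay-memory calls later in this section. Based on how it is used, and on the mem_size argument passed to the agent in the Main file, it is presumably created like this:

self.memory = ReplayBuffer(mem_size, input_dims, n_actions)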
After initializing all the parameters, I created the target network and the evaluation network. This uses a technique known as fixed target networks.
This is done so that the same weights and biases are not being updated and used to predict the target values and the new values at the same time. Instead, we freeze the parameters of the target network for a set number of steps (I set this to 1000). Once those steps are over, we update the target network with the evaluation network's weights and biases.
        self.q_eval = DeepQNetwork(self.lr, self.n_actions,
                                   input_dims=self.input_dims,
                                   name=self.env_name+'_'+self.algo+'_q_eval',
                                   chkpt_dir=self.chkpt_dir)

        self.q_next = DeepQNetwork(self.lr, self.n_actions,
                                   input_dims=self.input_dims,
                                   name=self.env_name+'_'+self.algo+'_q_next',
                                   chkpt_dir=self.chkpt_dir)
After initializing the target and evaluation networks, I defined the choose_action function, which takes the observation (state) as input. It returns either a greedy action (the action with the max value) or a random action, depending on epsilon.
    def choose_action(self, observation):
        if np.random.random() > self.epsilon:
            state = T.tensor([observation], dtype=T.float).to(self.q_eval.device)
            actions = self.q_eval.forward(state)
            action = T.argmax(actions).item()
        else:
            action = np.random.choice(self.action_space)

        return action
After taking the action, I then store the transition in the replay memory and write a function to sample from the replay memory.
    def store_transition(self, state, action, reward, state_, done):
        self.memory.store_transition(state, action, reward, state_, done)

    def sample_memory(self):
        state, action, reward, new_state, done = self.memory.sample_buffer(self.batch_size)

        states = T.tensor(state).to(self.q_eval.device)
        rewards = T.tensor(reward).to(self.q_eval.device)
        dones = T.tensor(done).to(self.q_eval.device)
        actions = T.tensor(action).to(self.q_eval.device)
        states_ = T.tensor(new_state).to(self.q_eval.device)

        return states, actions, rewards, states_, dones
This next function tells the algorithm when to update the target network weights and biases. Like I mentioned before, this is done every 1000 steps.
    def replace_target_network(self):
        if self.learn_step_counter % self.replace_target_cnt == 0:
            self.q_next.load_state_dict(self.q_eval.state_dict())
Next, we have a function to save and load the models.
    def save_models(self):
        self.q_eval.save_checkpoint()
        self.q_next.save_checkpoint()

    def load_models(self):
        self.q_eval.load_checkpoint()
        self.q_next.load_checkpoint()
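The save_checkpoint and load_checkpoint methods called here belong to the DeepQNetwork class and were not shown in the architecture section; a minimal sketch, assuming the checkpoint_file path from the constructor sketch earlier:

    def save_checkpoint(self):
        # write the network's weights and biases to disk
        T.save(self.state_dict(), self.checkpoint_file)

    def load_checkpoint(self):
        # restore previously saved weights and biases
        self.load_state_dict(T.load(self.checkpoint_file))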
And finally, we define the learn function that ties all of this together.
In the first part of the learn function, we zero out the gradients, and if needed, we replace the weights of the target network.
    def learn(self):
        if self.memory.mem_cntr < self.batch_size:
            return

        self.q_eval.optimizer.zero_grad()
        self.replace_target_network()
Next, we have to deal with the actual calculations of the two networks. We first sample 32 states, actions, rewards, new states, and dones from our replay memory and line them up by their batch indices. We then do a forward pass on each network: the evaluation network gives us the predicted values of the actions we actually took, and the target network gives us the value of the best action in each new state.
At the end, we store the rewards plus the discounted future values in the q_target variable.
        states, actions, rewards, states_, dones = self.sample_memory()
        indices = np.arange(self.batch_size)

        q_pred = self.q_eval.forward(states)[indices, actions]
        q_next = self.q_next.forward(states_).max(dim=1)[0]

        q_next[dones] = 0.0
        q_target = rewards + self.gamma*q_next
Finally, we compute the loss between the target values and the evaluation network's predicted values, keep it on the network's device, and save it in a variable called loss.
We then backpropagate using the .backward() function from the PyTorch library.
        loss = self.q_eval.loss(q_target, q_pred).to(self.q_eval.device)
        loss.backward()

        self.q_eval.optimizer.step()
        self.learn_step_counter += 1

        self.decrement_epsilon()
In the end, we do the optimizer step, which changes all the weights and biases based on the gradients of the loss. Then we increase learn_step_counter by 1 and decrement epsilon (the decrement_epsilon helper is sketched below).
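decrement_epsilon is called above but does not appear in the snippets; a minimal sketch consistent with the eps_dec and eps_min parameters initialized earlier:

    def decrement_epsilon(self):
        # linearly decay epsilon each learn step until it reaches its minimum
        self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min else self.eps_min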
Main File
Almost there!
The first thing that we have to do is create our environment, initiate a best score so that the model saves, and initialize all of our parameters.
if __name__ == '__main__':
    env = make_env('PongNoFrameskip-v4')
    best_score = -np.inf
    load_checkpoint = False
    n_games = 500

    agent = DQNAgent(gamma=0.99, epsilon=1.0, lr=0.0001,
                     input_dims=(env.observation_space.shape),
                     n_actions=env.action_space.n, mem_size=30000, eps_min=0.1,
                     batch_size=32, replace=1000, eps_dec=1e-5,
                     chkpt_dir='models/', algo='DQNAgent',
                     env_name='PongNoFrameskip-v4')
Here you can see that I used PongNoFrameskip-v4 as my environment. I made the best score negative infinity so that as soon as the first game is played, the model will immediately save. I chose to play 500 games. The gamma (discount of future rewards) was 0.99, and epsilon (whether to take a random action or not) was set to 1; however, this decreases over time. The learning rate was set to 0.0001.
After initializing the parameters, I made variables for the plot's filename and where to save the figure.
    fname = agent.algo + '_' + agent.env_name + '_lr' + str(agent.lr) + '_' + str(n_games) + 'games'
    figure_file = 'plots/' + fname + '.png'
After that, I set the number of steps to 0 and had three empty lists for the score, the epsilon history, and steps taken.
    n_steps = 0
    scores, eps_history, steps_array = [], [], []
Now we can start playing our games.
Like always, we have to set done to False, score to 0, and reset the environment.
    for i in range(n_games):
        done = False
        score = 0
        observation = env.reset()
Now, while done is false, the agent chooses an action based on the current state. The environment then returns the new state, the reward, done (whether the state is terminal), and debugging info.
After that, we add the reward to the score, set the current state to the new state, and add 1 to the step counter.
        while not done:
            action = agent.choose_action(observation)
            observation_, reward, done, info = env.step(action)

            score += reward
            observation = observation_
            n_steps += 1
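For the agent to actually train, the transition also has to be stored and the learn step called inside this while loop, right after env.step and before the observation is updated. Those calls are not shown in the snippet above; a sketch of what they would look like, guarded by the load_checkpoint flag so a loaded model can be evaluated without training:

            if not load_checkpoint:
                agent.store_transition(observation, action, reward, observation_, int(done))
                agent.learn()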
Finally, we append the score to the scores list and the number of steps to the steps_array that we defined earlier, and calculate the average score of the previous 100 games.
        scores.append(score)
        steps_array.append(n_steps)

        avg_score = np.mean(scores[-100:])
With the main loop in place, we can print our progress to the terminal after each episode.
        print('episode ', i, 'score: ', score,
              'average score %.1f best score %.1f epsilon %.2f' %
              (avg_score, best_score, agent.epsilon), 'steps', n_steps)
This is what it looks like in the terminal.
The final thing that must be done is that if the average score is greater than the best score, we save the model. The best_score is then set to the average score, and we append the current epsilon to the epsilon history.
        if avg_score > best_score:
            if not load_checkpoint:
                agent.save_models()
            best_score = avg_score

        eps_history.append(agent.epsilon)
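The figure_file defined earlier is presumably used to save a learning curve once all of the games have been played; the plotting helper is not shown above. A minimal sketch using the matplotlib import from the top of the article (the function name plot_learning_curve and the exact styling are my own):

def plot_learning_curve(x, scores, epsilons, filename):
    # plot the running average of the last 100 scores against training steps,
    # with epsilon on a second y-axis to show how exploration decays
    running_avg = [np.mean(scores[max(0, i-100):i+1]) for i in range(len(scores))]
    fig, ax = plt.subplots()
    ax.plot(x, running_avg, color='C0', label='average score (last 100 games)')
    ax.set_xlabel('training steps')
    ax.set_ylabel('score', color='C0')
    ax2 = ax.twinx()
    ax2.plot(x, epsilons, color='C1', label='epsilon')
    ax2.set_ylabel('epsilon', color='C1')
    fig.savefig(filename)

After the loop over n_games finishes, it would be called as plot_learning_curve(steps_array, scores, eps_history, figure_file).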
Results
Pong