* Q-Learning and Monte Carlo search both "worked backwards" from the end
* Wandering around before the goal decreases Q(s,a) by the -1 step penalty
* The first tile to get a meaninful utility is the one adjacent to the goal
* The others are full of wandering
---
## Intuition
* The tile two away from the goal could have a much more negative Q estimate
* Imagine states $A \leftrightarrow B \leftrightarrow Start \leftrightarrow C \leftrightarrow D \leftrightarrow Goal$
* If we have a loop, for example, connecting A and D, exploration can get unlucky
---
## Example
* We initialize Q with -1 for all states and actions
* Let's set $\alpha=1, \gamma=1$ in our update rule:
* $Q(S,A) \leftarrow Q(S,A) + \alpha[R + \gamma \underset{a}{max}Q(S',a) - Q(S,A)]$
* In the first episode, let's say we wander the path: $S \rightarrow C \rightarrow D \rightarrow C \rightarrow S \rightarrow B \rightarrow A \rightarrow D \rightarrow G$
* What Q values do we estimate?
---
## Example
* $S \rightarrow C \rightarrow S \rightarrow B \rightarrow A \rightarrow D \rightarrow G$
* Step 1:
* $Q(S, right) \leftarrow Q(S, right) + [-1 + \underset{a}{max}Q(C,a) - Q(S, right)] = -1 + [-1 + -1 -(-1)] = -2$
* Step 2:
* $Q(C, right) \leftarrow Q(C, right) + [-1 + \underset{a}{max}Q(S,a) - Q(C, right)] = -1 + [-1 + -1 -(-1)] = -2$
---
## Example
* $S \rightarrow C \rightarrow S \rightarrow B \rightarrow A \rightarrow D \rightarrow G$
* Step 2:
* $Q(D, left) \leftarrow Q(S, right) + [-1 + \underset{a}{max}Q(C,a) - Q(S, right)] = -1 + [-1 + -1 -(-1)] = -2$
* Step 3:
* $Q(C, left) \leftarrow Q(C, left) + [-1 + \underset{a}{max}Q(S,a) - Q(C, left)] = -1 + [-1 + -1 -(-1)] = -2$
---
## Example
* $S \rightarrow C \rightarrow S \rightarrow B \rightarrow A \rightarrow D \rightarrow G$
* Step 4:
* $Q(S, left) \leftarrow Q(S, left) + [-1 + \underset{a}{max}Q(B,a) - Q(B, left)] = -1 + [-1 + -1 -(-1)] = -2$
* Step 5:
* $Q(B, left) \leftarrow Q(B, left) + [-1 + \underset{a}{max}Q(A,a) - Q(B, left)] = -1 + [-1 + -1 -(-1)] = -2$
---
## Example
* $S \rightarrow C \rightarrow S \rightarrow B \rightarrow A \rightarrow D \rightarrow G$
* Step 6:
* $Q(A, loop) \leftarrow Q(A, loop) + [-1 + \underset{a}{max}Q(D,a) - Q(A, loop)] = -1 + [-1 + -1 -(-1)] = -2$
* Step 7:
* $Q(D, right) \leftarrow Q(D, left) + [0 + \underset{a}{max}Q(G,a) - Q(D, right)] = -1 + [0 + 0 -(-1)] = 0$
---
## Exploration
* Notice that exploration could have gone right from C, but going left was equally valid
* AFter that, going right from S looked worse than going left from S
* So the rest of this is logical
* If the next episode begins with going left from S, state A's cost will go down to -1
* If the next episode goes from S to C and back to S, they'll look even worse
---
## Exploration
* It is worthwhile to work that out on the board/on pen and paper to be sure you understand the algorithm
* Notice that we actually have enough information to find the better path after the first episode
* We can go right from C to reach D, and right from there to reach the Goal
* Value iteration would find it, but we started with no information, so Q(C, right) is -2
* Despite Q(D, right) being 0
---
## Other States
* We can see that going right twice will reach the goal
* And value iteration would find with with the current Q estimatesx
* This is the point of the planning simulation step
* We gather information with Q-Learning, and update estimates by replaying past experiences
---
## Complex maze example
* Let's say that we have a complicated maze
```
WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW
WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW
WWWW W W W W WWWWW
WWWW W W W W WW
WWWW W W W WWWWWWW W WWWWWWWW
WW W W W W
WW W WWWWWWWWWWWWWWWWWW W W W
WW W WWWWWWWWWWWWWWWWWWWWWW WW W
WW W WWWWWWWWWWWW WW W
WW W WWWWWWWWWWWWWWWWWWWWWWWWWWW W
WW W WWW WWFWWWW WWW W
WW WWWWW WWW W WWWWWWW WW
WW WWWW WWWW W WWWWWWWWWWWWWWW WW
WW WWWW WWW WWWWWWWWWWW WW WW
WW WWW WWW WW WW WW
WW WW WWWW WWWWWWWWWWWWWW WW WW
WW W WWWWWWWWW W WWWW
WW WW WWWW W W W WWWWW
WW WW W W WWWWW
WW WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW
WWSSSSSSWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW
```
---
## Maze Rules
* We can only move up, down, left, right, or stay
* Bumping into a wall leaves us in place
* This makes the problem tractable with previous techniques
---
## Comparison with Q-Learning
* We care about two things
* How quickly do we learn per episode?
* How good are the Q estimates?
---
## Code: Double Q
```python
class DoubleQ:
def __init__(self, actions, default_q, alpha, gamma):
"""
Arguments:
actions (list): Possible actions
default_q (float): Mean of default Q values. Will be added to a uniform value.
alpha (float): Learning rate
gamma (float): Discount value
"""
super(DoubleQ, self).__init__()
# We will dynamically initialize estimates
# The Q tables will store arrays, indexed by action numbers
self.Q1 = {}
self.Q2 = {}
# Notice that while we could have an unlimited number of states, this
# will limit the total number of action since we need to allocate an
# array every time we initialize estimates for a new state.
self.actions = actions
self.alpha = alpha
self.gamma = gamma
self.default_q = default_q
def _verify(self, state):
if state not in self.Q1:
self.Q1[state] = [self.default_q - 0.5 + random.random() for _ in range(len(self.actions))]
self.Q2[state] = [self.default_q - 0.5 + random.random() for _ in range(len(self.actions))]
def updateQ(self, state, new_state, action_idx, reward):
self._verify(state)
self._verify(new_state)
# Update q(s,a) = q(s,a) + alpha(R + gamma*\underset{a}{Q(s',a)} - q(s,a))
# Double Q-learning uses two q lookups to prevent bias
if random.random() < 0.5:
self.Q1[state][action_idx] = self.Q1[state][action_idx] + self.alpha * (reward + self.gamma * self.Q2[new_state][numpy.argmax(self.Q1[new_state])] - self.Q1[state][action_idx])
else:
self.Q2[state][action_idx] = self.Q2[state][action_idx] + self.alpha * (reward + self.gamma * self.Q1[new_state][numpy.argmax(self.Q2[new_state])] - self.Q2[state][action_idx])
def bestAction(self, state):
self._verify(state)
# Get the action with the best Q estimate for the given state
return numpy.argmax([a + b for a,b in zip(self.Q1[state], self.Q2[state])])
def maxReward(self, state):
self._verify(state)
# Get the action with the best Q estimate for the given state
return numpy.max([(a + b)/2 for a,b in zip(self.Q1[state], self.Q2[state])])
def meanReward(self, state):
self._verify(state)
# Get the action with the best Q estimate for the given state
return numpy.mean([(a + b)/2 for a,b in zip(self.Q1[state], self.Q2[state])])
```
---
## Code: Dyna-Q
```python
class DynaQ:
def __init__(self, actions, default_q, alpha, gamma, n_simulate):
"""
Arguments:
actions (list): Possible actions
default_q (float): Mean of default Q values. Will be added to a uniform value.
alpha (float): Learning rate
gamma (float): Discount value
n_simulate (int): Number of simulations to do after each action
"""
super(DynaQ, self).__init__()
self.Q = DoubleQ(actions, default_q, alpha, gamma)
self.model = {}
self.n_simulate = n_simulate
self.actions = actions
def updateQ(self, state, new_state, action_idx, reward):
# Update Q estimates, then run n simulation steps
self.Q.updateQ(state, new_state, action_idx, reward)
if state not in self.model:
self.model[state] = [None] * len(self.actions)
# Here we assume that the same action in the same state will always
# lead to the same new state and reward.
self.model[state][action_idx] = (reward, new_state)
sim_states = random.choices(list(self.model.keys()), k=self.n_simulate)
for sim_state in sim_states:
possible_actions = [idx for idx in range(len(self.actions)) if self.model[sim_state][idx] is not None]
sim_action = random.choice(possible_actions)
R, next_sim_state = self.model[sim_state][sim_action]
self.Q.updateQ(sim_state, next_sim_state, sim_action, R)
def bestAction(self, state):
return self.Q.bestAction(state)
def maxReward(self, state):
return self.Q.maxReward(state)
def meanReward(self, state):
return self.Q.meanReward(state)
```
---
## Learning
```python
###########################################################
# Estimate a policy for a maze using Q-Learning or Dyna-Q #
###########################################################
import argparse
import random
import sys
import numpy
import pickle
import q_map
import maze_sim
def softProbs(target_policy, epsilon, state, actions):
# Get the probability of the soft policy selecting an action in a state
# b(A_t|S_t)
# Our behavioral policy will be similar to the target policy
# Action probabilities sum to 1, with epsilon distributed over all possible actions
action_probs = [epsilon / len(actions)]*len(actions)
action_probs[target_policy.getAction(state)] += 1 - epsilon
return action_probs
def softPolicy(target_policy, epsilon, state, actions):
# Select from one of the possible actions based upon our behavior policy.
action_probs = softProbs(target_policy, epsilon, state, actions)
return random.choices(actions, action_probs, k=1)[0]
class DoubleQ:
def __init__(self, actions, default_q, alpha, gamma):
"""
Arguments:
actions (list): Possible actions
default_q (float): Mean of default Q values. Will be added to a uniform value.
alpha (float): Learning rate
gamma (float): Discount value
"""
super(DoubleQ, self).__init__()
# We will dynamically initialize estimates
# The Q tables will store arrays, indexed by action numbers
self.Q1 = {}
self.Q2 = {}
# Notice that while we could have an unlimited number of states, this
# will limit the total number of action since we need to allocate an
# array every time we initialize estimates for a new state.
self.actions = actions
self.alpha = alpha
self.gamma = gamma
self.default_q = default_q
def _verify(self, state):
if state not in self.Q1:
self.Q1[state] = [self.default_q - 0.5 + random.random() for _ in range(len(self.actions))]
self.Q2[state] = [self.default_q - 0.5 + random.random() for _ in range(len(self.actions))]
def updateQ(self, state, new_state, action_idx, reward):
self._verify(state)
self._verify(new_state)
# Update q(s,a) = q(s,a) + alpha(R + gamma*\underset{a}{Q(s',a)} - q(s,a))
# Double Q-learning uses two q lookups to prevent bias
if random.random() < 0.5:
self.Q1[state][action_idx] = self.Q1[state][action_idx] + self.alpha * (reward + self.gamma * self.Q2[new_state][numpy.argmax(self.Q1[new_state])] - self.Q1[state][action_idx])
else:
self.Q2[state][action_idx] = self.Q2[state][action_idx] + self.alpha * (reward + self.gamma * self.Q1[new_state][numpy.argmax(self.Q2[new_state])] - self.Q2[state][action_idx])
def bestAction(self, state):
self._verify(state)
# Get the action with the best Q estimate for the given state
return numpy.argmax([a + b for a,b in zip(self.Q1[state], self.Q2[state])])
def maxReward(self, state):
self._verify(state)
# Get the action with the best Q estimate for the given state
return numpy.max([(a + b)/2 for a,b in zip(self.Q1[state], self.Q2[state])])
def meanReward(self, state):
self._verify(state)
# Get the action with the best Q estimate for the given state
return numpy.mean([(a + b)/2 for a,b in zip(self.Q1[state], self.Q2[state])])
class DynaQ:
def __init__(self, actions, default_q, alpha, gamma, n_simulate):
"""
Arguments:
actions (list): Possible actions
default_q (float): Mean of default Q values. Will be added to a uniform value.
alpha (float): Learning rate
gamma (float): Discount value
n_simulate (int): Number of simulations to do after each action
"""
super(DynaQ, self).__init__()
self.Q = DoubleQ(actions, default_q, alpha, gamma)
self.model = {}
self.n_simulate = n_simulate
self.actions = actions
def updateQ(self, state, new_state, action_idx, reward):
# Update Q estimates, then run n simulation steps
self.Q.updateQ(state, new_state, action_idx, reward)
if state not in self.model:
self.model[state] = [None] * len(self.actions)
# Here we assume that the same action in the same state will always
# lead to the same new state and reward.
self.model[state][action_idx] = (reward, new_state)
sim_states = random.choices(list(self.model.keys()), k=self.n_simulate)
for sim_state in sim_states:
possible_actions = [idx for idx in range(len(self.actions)) if self.model[sim_state][idx] is not None]
sim_action = random.choice(possible_actions)
R, next_sim_state = self.model[sim_state][sim_action]
self.Q.updateQ(sim_state, next_sim_state, sim_action, R)
def bestAction(self, state):
return self.Q.bestAction(state)
def maxReward(self, state):
return self.Q.maxReward(state)
def meanReward(self, state):
return self.Q.meanReward(state)
class Policy:
def __init__(self, actions, Q):
super(Policy, self).__init__()
self.actions = actions
self.Q = Q
# We will lazy-initialize the actions in our policy
self.policy = {}
def getAction(self, state):
if state not in self.policy:
self.updatePolicy(state)
return self.policy[state]
def updatePolicy(self, state):
self.policy[state] = Q.bestAction(state)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--map",
required=True,
type=str,
help="Map file")
parser.add_argument(
"--policy",
required=False,
type=str,
default=None,
help="Pickle file with an existing policy")
parser.add_argument(
"--algorithm",
required=False,
type=str,
default='doubleq',
choices=['doubleq', 'dynaq'],
help="Policy search algorithm.")
args = parser.parse_args()
maze = []
with open(args.map, 'r') as mapfile:
for line in mapfile:
maze.append(line[:-1])
# Find all of the starting and finish locations
starts = [(y, x) for y, row in enumerate(maze)
for x, val in enumerate(row) if val == 'S']
finishes = [(y, x) for y, row in enumerate(maze)
for x, val in enumerate(row) if val == 'F']
# Make a list of all possible actions
# Just up, down, left, right, immobile
actions = [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]
# The initial value of the Q function
DEFAULT_Q = -30
# Do we need to train, or was a policy provided?
if args.policy is None:
# Need to train
# Initialize our starting variables
epsilon = 0.1
# Alpha is the rate at which we adjust Q(S,A) estimates
alpha = 0.3
gamma = 1
if args.algorithm == "doubleq":
Q = DoubleQ(actions, DEFAULT_Q, alpha=0.3, gamma=1)
elif args.algorithm == "dynaq":
Q = DynaQ(actions, DEFAULT_Q, alpha=0.3, gamma=1, n_simulate=5)
policy = Policy(actions, Q)
print("Beginning episodes.")
## Keep maze of how long the trials are taking
episode_durations = []
# Create a policy map visualizer
vis = q_map.Visualizer(len(maze), len(maze[0]), actions)
for episode in range(5000):
if episode % 100 == 0:
if episode != 0:
vis.plotMax(Q, f"double_q_learning_maxQ_{episode}.png")
# Convert to a video later with a command like: convert -delay 10 -layers Optimize `ls -tr double_q_learning_maxQ_*.png` foo.gif
print(f"Beginning episode {episode}")
if episode > 0:
print(f"Mean of last 10 episode durations was {numpy.mean(episode_durations[-10:])}")
# Run the simulation
finished = False
# Pick a random starting location
location = random.choice(starts)
states = []
past_actions = []
rewards = [0]
while not finished:
# Remember the state
states.append(location)
# Get the action from the behavior policy
action = softPolicy(policy, epsilon, location, actions)
# Remember the action
past_actions.append(action)
# Update the location
new_y = location[0] + action[0]
new_x = location[1] + action[1]
# Handle bumping into a wall (no movement)
if new_y < 0 or new_y >= len(maze) or new_x < 0 or new_x >= len(maze[0]) or maze[new_y][new_x] == "W":
# Hit a wall. Don't move.
new_y, new_x = location[0], location[1]
elif maze[new_y][new_x] == "F":
# Finished with this episode.
finished = True
rewards.append(0)
# Move to the next location
old_location = location
location = new_y, new_x
if finished:
# For Q-learning we need a special state for being finished
location = (-10, -10)
# Negative reward for every step that still hasn't finished
if not finished:
rewards.append(-1)
# Q learning will update the Q(S, A) estimate immediately
action_idx = actions.index(action)
Q.updateQ(old_location, location, action_idx, rewards[-1])
# Update the policy for the previous state
policy.updatePolicy(old_location)
# For illustration
episode_durations.append(len(states))
# Save the policy
with open("policy.pkl", "wb") as outfile:
pickle.dump((policy, Q), outfile)
else:
with open(args.policy, 'rb') as policyfile:
policy, Q = pickle.load(policyfile)
# debugging/information
print(f"Actions are {actions}")
for start in starts:
print(f"At location {start} Q has max action scores {Q.maxReward(start)}")
print(f"At location {start} policy is {policy.getAction(start)}, which is {actions[policy.getAction(start)]}")
maze_sim.simulate(policy, actions, maze, starts)
```
-v-
## Plotting
```python
####################
# Matplotlib code to visualize policy learning.
####################
import matplotlib.pyplot as plt
from matplotlib.colors import SymLogNorm
import matplotlib
import matplotlib as mpl
import numpy
class Visualizer:
def __init__(self, height, width, speeds):
self.height = height
self.width = width
self.speeds = speeds
self.ax = plt.gca()
def plotMax(self, Q, outname):
self.ax.clear()
max_values = [[Q.maxReward((y, x)) for x in range(self.width)] for y in range(self.height)]
im = self.ax.imshow(max_values, norm=SymLogNorm(linthresh=1))
cbar = self.ax.figure.colorbar(im, ax=self.ax, cmap="YlGn")
self.ax.figure.tight_layout()
plt.savefig(outname, dpi=2*96)
cbar.remove()
def plotMean(self, Q, outname):
self.ax.clear()
max_values = [[Q.meanReward((y, x)) for x in range(self.width)] for y in range(self.height)]
im = self.ax.imshow(avg_values)
cbar = self.ax.figure.colorbar(im, ax=self.ax, cmap="YlGn")
self.ax.figure.tight_layout()
plt.savefig(outname, dpi=2*96)
cbar.remove()
```
-v-
## Maze Simulation
```python
####################################
# Simulator for a maze and policy. #
####################################
import curses
import random
import time
def curses_simulate(stdscr, policy, actions, maze, starts):
# Clear screen and hide the cursor
curses.curs_set(0)
stdscr.clear()
# Get screen dimensions, but we're ignoring them for now
scr_height, scr_width = stdscr.getmaxyx()
# Demonstrate a maze run.
finished = False
# Pick a random starting location
location = random.choice(starts)
while not finished:
# Print out the maze
for y in range(len(maze)):
for x in range(len(maze[0])):
if (y, x) == location:
stdscr.addch(y, x, 'C')
else:
stdscr.addch(y, x, maze[y][x])
# Refresh the screen to show changes
stdscr.refresh()
time.sleep(0.2)
# Get the action from the target policy
a_idx = policy.getAction(location)
action = actions[a_idx]
# Update the location
new_y = location[0] + action[0]
new_x = location[1] + action[1]
# Handle bumping into a wall (no movement)
if new_y < 0 or new_y >= len(maze) or new_x < 0 or new_x >= len(maze[0]) or maze[new_y][new_x] == "W":
# Hit a wall. Don't move.
new_y, new_x = location[0], location[1]
elif maze[new_y][new_x] == "F":
# Finished.
finished = True
# Move to the next location
location = new_y, new_x
def simulate(policy, actions, maze, starts):
curses.wrapper(curses_simulate, policy, actions, maze, starts)
```
---
## Comparison: Exploration