Reinforcement Learning (the manual way)
Reinforcement Learning is fundamentally interesting to me — a computer getting a series of inputs, being told what is good/bad and learning how to do better.
In this case, RL was done just with a few algorithms and data structures to understand how RL works. No fancy AI libraries to be seen here…
I started from this great post: https://towardsdatascience.com/hands-on-introduction-to-reinforcement-learning-in-python-da07f7aaca88 and extended the project a fair bit.
Here is a maze where the robot (a “.”) starts in the top left corner and gets a yummy reward when it makes it past all the walls (“X”) to the bottom right-hand corner.
The dots show all of the places that the robot visited during an attempt.
It has 4,000 attempts (episodes) at reaching the end of the maze. The best case is to complete the maze in 24 moves; however, at the start (when it has no knowledge and is 100% experimentation), an attempt can take a few hundred or even thousands of moves.
We control the balance between exploitation (where the robot uses its experience to make decisions) and experimentation (where it walks in a random direction) as the program runs, starting with 100% experimentation and finishing with 100% exploitation.
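To make that schedule concrete, here is a minimal sketch of a linear decay like the one used in this run. The helper names `random_factor_at` and `first_fully_exploiting_episode` are my own illustrations, not part of the project; the defaults mirror the values passed to `Agent` in main.py (start at 1.0, change by -0.00027 per episode, floor at 0.0).

```python
def random_factor_at(episode, start=1.0, adjust=-0.00027, minimum=0.0):
    """Random factor after `episode` completed episodes of linear decay."""
    return max(minimum, start + adjust * episode)


def first_fully_exploiting_episode(start=1.0, adjust=-0.00027, minimum=0.0):
    """Count episodes until the random factor reaches its floor."""
    episode = 0
    while random_factor_at(episode, start, adjust, minimum) > minimum:
        episode += 1
    return episode


print(random_factor_at(0))                # 1.0, pure experimentation
print(first_fully_exploiting_episode())   # 3704
```

With these values the factor reaches zero after roughly 3,700 of the 4,000 episodes, so the last few hundred attempts are pure exploitation. The real agent subtracts the adjustment once per episode rather than computing it in closed form, so tiny floating-point differences are possible.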
Anyway, without further ado, here is the result and then we’ll explain the RL parts of the code:
Reinforcement Learning relies on us giving a reward (the carrot) or a penalty (the stick).
def get_reward(self):
    # if at end give 0 reward
    # if not at end give -1 reward
    if self.robot_position == (self.maze_size[0]-1, self.maze_size[1]-1):
        return 0
    else:
        return -1
In this case, the machine is simply penalised for every turn it takes per episode. We want it to perform as few moves as possible to complete the maze.
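As a rough illustration of why this reward favours short routes, the sketch below sums the rewards along two paths. It is a standalone version of the rule above, not the project's code: `episode_return` and the 3x3 open grid are assumptions made up for the example.

```python
def get_reward(position, maze_size):
    """0 at the bottom-right goal cell, -1 everywhere else."""
    goal = (maze_size[0] - 1, maze_size[1] - 1)
    return 0 if position == goal else -1


def episode_return(positions, maze_size):
    """Sum of the rewards collected along a sequence of visited positions."""
    return sum(get_reward(p, maze_size) for p in positions)


# A hypothetical 3x3 open grid: four moves from (0, 0) to the goal at (2, 2).
short_path = [(0, 1), (0, 2), (1, 2), (2, 2)]
# The same trip with a wasted step back to the start.
long_path = [(0, 1), (0, 0), (0, 1), (0, 2), (1, 2), (2, 2)]

print(episode_return(short_path, (3, 3)))  # -3
print(episode_return(long_path, (3, 3)))   # -5
```

Every extra move costs one more point, so the only way to score better is to reach the goal sooner.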
Next, we randomly choose whether to go in a random direction (experiment) or to use our knowledge (exploit):
def choose_action(self, state, allowedMoves):
    maxG = -10e15
    next_move = None
    randomN = np.random.random()
    if randomN < self.random_factor:
        # if random number below random factor, choose random action
        next_move = np.random.choice(allowedMoves)
    else:
        # if exploiting, gather all possible actions and choose one with the highest G (reward)
        for action in allowedMoves:
            new_state = tuple([sum(x) for x in zip(state, ACTIONS[action])])
            if self.G[new_state] >= maxG:
                next_move = action
                maxG = self.G[new_state]
    return next_move
The exploit part of this is where some of the RL magic happens. G holds the robot's current estimate of how good each position is. If the next state is one that has led to better rewards (or in this case, smaller penalties) in past episodes, then we follow that path.
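The choice above can be tried in isolation with a small sketch. This is a standalone rewrite, not the project's method: it uses the standard library's `random` instead of NumPy, takes `G` and `random_factor` as arguments, and the G values are made up for the example.

```python
import random

ACTIONS = {'U': (-1, 0), 'D': (1, 0), 'L': (0, -1), 'R': (0, 1)}


def choose_action(G, state, allowed_moves, random_factor, rng=random):
    """Explore with probability `random_factor`, otherwise exploit."""
    if rng.random() < random_factor:
        return rng.choice(allowed_moves)

    # Exploit: pick the move whose destination has the highest G.
    def value_of(action):
        dy, dx = ACTIONS[action]
        return G[(state[0] + dy, state[1] + dx)]

    return max(allowed_moves, key=value_of)


# Hypothetical values for the two cells reachable from (0, 0).
G = {(0, 1): -3.0, (1, 0): -7.5}
print(choose_action(G, (0, 0), ['D', 'R'], random_factor=0.0))  # R
```

With a random factor of 0 the robot always moves right here, because the cell to the right has the smaller penalty. One difference to be aware of: on a tie, `max` keeps the first best move, whereas the `>=` comparison in the original loop keeps the last.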
And the learning part:
def learn(self):
    target = 0
    #this is the important bit - re-affirming positive G
    for prev, reward in reversed(self.state_history):
        self.G[prev] = self.G[prev] + self.alpha * (target - self.G[prev])
        target += reward
    self.state_history = []
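This is called at the end of each episode. It walks back through every state visited during the episode and nudges that state's G towards the total reward collected from that point to the end of the episode, so states that led to the goal in fewer moves end up with higher values. In this case a “state” is a position in the maze (one that isn’t a wall); the environment separately records every direction that the robot can take from each position.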
In this way, we reinforce the pathways that were most successful.
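A tiny worked example may help here. The sketch below applies the same update rule to a made-up three-cell corridor; the function signature, the cell names and the starting values of 0.0 are assumptions for illustration (the real agent starts from random values and stores G on the object).

```python
def learn(G, state_history, alpha):
    """Walk the episode backwards, nudging each state's G towards the
    total reward collected after that state was reached."""
    target = 0
    for state, reward in reversed(state_history):
        G[state] += alpha * (target - G[state])
        target += reward
    return G


# Hypothetical corridor A -> B -> C, where C is the goal.
# Each entry is (state, reward received on arriving there).
history = [('A', -1), ('B', -1), ('C', 0)]
G = learn({'A': 0.0, 'B': 0.0, 'C': 0.0}, history, alpha=0.1)
print(G)  # {'A': -0.1, 'B': 0.0, 'C': 0.0}
```

B is one move from the goal, so its target is 0; A is two moves away, so its target is -1 and its value drops by alpha times that. Cells further from the goal are pulled towards more negative values, which is what steers the exploit step towards the exit.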
I had a lot of fun building upon https://medium.com/@nehadesaraju’s work here.
Complete files are here: https://github.com/nfitbh/ai-rl
src/agent.py:
import numpy as np

ACTIONS = {'U': (-1, 0), 'D': (1, 0), 'L': (0, -1), 'R': (0, 1)}


class Agent(object):
    def __init__(self, states, alpha=0.15, random_factor=0.8, random_adjust=-0.01, min_random_factor=0.1): # 80% explore, 20% exploit
        self.state_history = [((0, 0), 0)] # state, reward
        self.alpha = alpha
        self.random_factor = random_factor
        self.G = {}
        self.init_reward(states)
        self.random_adjust = random_adjust
        self.min_random_factor = min_random_factor

    def init_reward(self, states):
        # give every cell a small random starting value
        for i, row in enumerate(states):
            for j, col in enumerate(row):
                # key is (row, column), matching the maze's (y, x) states
                self.G[(i, j)] = np.random.uniform(low=0.1, high=1.0)

    def choose_action(self, state, allowedMoves):
        maxG = -10e15
        next_move = None
        randomN = np.random.random()
        if randomN < self.random_factor:
            # if random number below random factor, choose random action
            next_move = np.random.choice(allowedMoves)
        else:
            # if exploiting, gather all possible actions and choose one with the highest G (reward)
            for action in allowedMoves:
                new_state = tuple([sum(x) for x in zip(state, ACTIONS[action])])
                if self.G[new_state] >= maxG:
                    next_move = action
                    maxG = self.G[new_state]
        return next_move

    def update_state_history(self, state, reward):
        self.state_history.append((state, reward))

    def learn(self):
        target = 0
        #this is the important bit - re-affirming positive G
        for prev, reward in reversed(self.state_history):
            self.G[prev] = self.G[prev] + self.alpha * (target - self.G[prev])
            target += reward
        self.state_history = []

    def update_random_factor(self):
        self.random_factor += self.random_adjust # decrease random factor each episode of play
        if self.random_factor < self.min_random_factor:
            self.random_factor = self.min_random_factor
src/environment.py:
import numpy as np

ACTIONS = {'U': (-1, 0), 'D': (1, 0), 'L': (0, -1), 'R': (0, 1)}


class Maze(object):
    def __init__(self):
        self.maze_size = (9, 9)
        self.maze = np.zeros(self.maze_size)
        #construct the walls of the maze (1 = wall, 2 = robot)
        self.maze[0, 0] = 2
        self.maze[5, :5] = 1
        self.maze[:4, 5] = 1
        self.maze[2, 2:6] = 1
        self.maze[3, 2] = 1
        self.maze[1:8, 7] = 1
        self.maze[7, 2:8] = 1
        #robot start
        self.robot_position = (0, 0)
        self.steps = 0
        self.construct_allowed_states()
        self.maze_visited = self.maze.copy()
        #used for screen shotting the maze before action begins
        #self.maze_visited[0, 0] = 0
        #print(self.st())
        #exit()

    def is_allowed_move(self, state, action):
        # check allowed move from a given state
        y, x = state
        y += ACTIONS[action][0]
        x += ACTIONS[action][1]
        if y < 0 or x < 0 or y > self.maze_size[0]-1 or x > self.maze_size[1]-1:
            # if robot will move off the board
            return False
        return (self.maze[y, x] != 1) # not a wall

    def construct_allowed_states(self):
        # create a dictionary of allowed moves from any position
        # using the is_allowed_move() function
        # this is so that you don't have to call the function every time
        allowed_states = {}
        for y, row in enumerate(self.maze):
            for x, col in enumerate(row):
                # iterate through all spaces
                #print(x, y)
                if self.maze[(y,x)] != 1:
                    # if the space is not a wall, add it to the allowed states dictionary
                    allowed_states[(y,x)] = []
                    for action in ACTIONS:
                        if self.is_allowed_move((y,x), action):
                            allowed_states[(y,x)].append(action)
        self.allowed_states = allowed_states

    def update_maze(self, action):
        y, x = self.robot_position # get current position
        self.maze[y, x] = 0 # set the current position to 0
        y += ACTIONS[action][0] # get new position
        x += ACTIONS[action][1] # get new position
        self.robot_position = (y, x) # set new position
        self.maze_visited[y, x] = 2
        self.maze[y, x] = 2 # set new position
        self.steps += 1 # add steps

    def is_game_over(self):
        # check if robot in the final position
        y, x = self.robot_position
        return ((y+1, x+1) == self.maze_size)

    def get_state_and_reward(self):
        return self.robot_position, self.get_reward()

    def get_reward(self):
        # if at end give 0 reward
        # if not at end give -1 reward
        if self.robot_position == (self.maze_size[0]-1, self.maze_size[1]-1):
            return 0
        else:
            return -1

    def st(self):
        st = "-------------------------------------------------------------------\n"
        for row in self.maze_visited:
            for col in row:
                if col == 0:
                    st += "\t" # empty space
                elif col == 1:
                    st += "X\t" # walls
                elif col == 2:
                    st += ".\t" # robot visited
            st += "\n"
        st += "-------------------------------------------------------------------\n"
        return st
src/term.py:
import statistics
import time


class Term:
    def __init__(self, robot, numEpisodes, frameDuration, reportAfterEpisode) -> None:
        self.robot = robot
        self.numEpisodes = numEpisodes
        self.reportAfterEpisode = reportAfterEpisode
        self.frameDuration = frameDuration
        self.frameStartTime = time.time()

    def print_maze_visited(self, maze, episodeNum, moveHistory):
        print(chr(27) + "[2J") # ANSI escape code: clear the terminal
        avgCompletion = 0
        if len(moveHistory) > self.reportAfterEpisode:
            # average number of moves over the most recent episodes
            avgCompletion = statistics.mean(moveHistory[-self.reportAfterEpisode:])
        random_factor_str = "{:.1f}".format(self.robot.random_factor*100) + "%"
        st = f"Episode Number:\t\t{episodeNum}/{self.numEpisodes}\nRandom Factor:\t\t{random_factor_str}\nIts per attempt:\t{avgCompletion}\n"
        st += maze.st()
        print(st)
        # sleep for whatever is left of this frame's time budget
        delta = self.frameDuration - (time.time() - self.frameStartTime)
        #print(delta)
        if delta > 0 and delta < self.frameDuration:
            time.sleep(delta)
        self.frameStartTime = time.time()
and src/main.py:
import time
import numpy as np
from environment import Maze
from agent import Agent
from term import Term
import statistics

frameStartTime = time.time()
frameDuration = 0.05

if __name__ == '__main__':
    num_episodes = 4000
    reportAfterEpisodes = 5
    maze = Maze()
    robot = Agent(maze.maze, alpha=0.1, random_factor=1.0, random_adjust=-0.00027, min_random_factor=0.0)
    term = Term(robot, num_episodes, frameDuration, reportAfterEpisodes)
    moveHistory = []
    for i in range(num_episodes):
        while not maze.is_game_over():
            state, _ = maze.get_state_and_reward() # get the current state
            action = robot.choose_action(state, maze.allowed_states[state]) # choose an action (explore or exploit)
            maze.update_maze(action) # update the maze according to the action
            state, reward = maze.get_state_and_reward() # get the new state and reward
            robot.update_state_history(state, reward) # update the robot memory with state and reward
            if maze.steps > 10000:
                break
        robot.learn() # robot should learn after every episode
        robot.update_random_factor()
        moveHistory.append(maze.steps) # get a history of number of steps taken to plot later
        if i % reportAfterEpisodes == 0:
            term.print_maze_visited(maze, i, moveHistory)
        if i < num_episodes-1:
            maze = Maze() # reinitialize the maze
    term.print_maze_visited(maze, i+1, moveHistory)
Finally, a Dockerfile:
ARG PYTHON_VERSION=3.11.4
FROM python:${PYTHON_VERSION}-slim as base
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
#RUN apt-get update -y
#RUN apt install python3 -y
#RUN pip install --upgrade pip
RUN --mount=type=cache,target=/root/.cache/pip \
    --mount=type=bind,source=requirements.txt,target=requirements.txt \
    pip3 install -r requirements.txt
COPY src/*.py /app/
CMD ["python3", "/app/main.py"]
and docker-run.sh:
docker build . -t ai-rl-docker
docker run --rm -it --name ai-rl ai-rl-docker