Source code for pomdp_py.problems.load_unload.load_unload

"""The load unload problem. An agent is placed on a one dimensional grid world
and is tasked with loading itself up on the right side of the world and
unloading on the left. The agent can observe whether or not it is in the load or
unload block but can not tell its exact location of whether it is loaded or
unloaded. Therefore the agent must maintain belief about it's location and load
status.

States are defined by the location of the agent and whether or not it is loaded
Actions: "move-left", "move-right"
Rewards:
    +100 for moving into the unload block while loaded
    -1 otherwise

"""

import pomdp_py
import random
import numpy as np
import sys
import copy
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation


EPSILON = 1e-3
LOAD_LOCATION = 10
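# A worked example of the dynamics described in the module docstring, assuming
# the LOAD_LOCATION = 10 defined above: an unloaded agent starting at x = 5 can
# move right five times to x = 10, where it becomes loaded, then move left ten
# times back to x = 0. Fourteen of the fifteen moves earn -1 and the final move
# from x = 1 into the unload cell while loaded earns +100, so the undiscounted
# return of this trajectory is 86.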


class LUState(pomdp_py.State):
    def __init__(self, x, loaded):
        if type(x) != int or x < 0:
            raise ValueError(
                "Invalid state: {}\n".format((x, loaded))
                + "x must be a non-negative integer"
            )
        if type(loaded) != bool:
            raise ValueError(
                "Invalid state: {}\n".format((x, loaded)) + "loaded must be a boolean"
            )
        if x == 0 and loaded == True:
            raise ValueError("Agent cannot be loaded in the 0th position")
        if x == LOAD_LOCATION and loaded == False:
            raise ValueError("Agent cannot be unloaded in the last position")
        self.x = x
        self.loaded = loaded

    def __hash__(self):
        return hash((self.x, self.loaded))

    def __eq__(self, other):
        if isinstance(other, LUState):
            return self.x == other.x and self.loaded == other.loaded
        elif type(other) == tuple:
            return self.x == other[0] and self.loaded == other[1]
        return False

    def __str__(self):
        return str((self.x, self.loaded))

    def __repr__(self):
        return "State({})".format(self)


class LUAction(pomdp_py.Action):
    def __init__(self, name):
        if name not in ["move-left", "move-right"]:
            raise ValueError("Invalid action: %s" % name)
        self.name = name

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        if isinstance(other, LUAction):
            return self.name == other.name
        elif type(other) == str:
            return self.name == other
        return False

    def __str__(self):
        return self.name

    def __repr__(self):
        return "Action(%s)" % self.name


class LUObservation(pomdp_py.Observation):
    def __init__(self, obs):
        if obs not in ["load", "unload", "middle"]:
            raise ValueError(
                "Invalid observation: {}\n".format(obs)
                + 'Observation must be one of "load", "unload", or "middle"'
            )
        self.name = obs

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        if isinstance(other, LUObservation):
            return self.name == other.name
        elif type(other) == str:
            return self.name == other
        return False

    def __str__(self):
        return str(self.name)

    def __repr__(self):
        return "Observation(%s)" % str(self.name)

# Observation Model
class LUObservationModel(pomdp_py.ObservationModel):
    """This problem is small enough for the probabilities to be directly given
    externally"""

    def probability(self, observation, next_state, action, normalized=False, **kwargs):
        if observation != self.sample(next_state, action):
            # return EPSILON to avoid degradation of particles
            return EPSILON
        else:
            return 1 - EPSILON

    def sample(self, next_state, action, normalized=False, **kwargs):
        if next_state.x == 0:
            return LUObservation("unload")
        elif next_state.x == LOAD_LOCATION:
            return LUObservation("load")
        else:
            return LUObservation("middle")

    def argmax(self, next_state, action, normalized=False, **kwargs):
        """Returns the most likely observation"""
        return self.sample(next_state, action)

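# For illustration: the observation model above is deterministic up to
# EPSILON-smoothing. With EPSILON = 1e-3, probability() assigns
# 1 - EPSILON = 0.999 to the observation returned by sample() ("unload" at
# x = 0, "load" at x = LOAD_LOCATION, "middle" elsewhere) and EPSILON to any
# other observation, so particles are never assigned zero weight outright.
# >>> LUObservationModel().sample(LUState(0, False), LUAction("move-left"))
# Observation(unload)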
# Transition Model
class LUTransitionModel(pomdp_py.TransitionModel):
    """This problem is small enough for the probabilities to be directly given
    externally"""

    def probability(self, next_state, state, action, normalized=False, **kwargs):
        if next_state != self.sample(state, action):
            return EPSILON
        else:
            return 1 - EPSILON

    def sample(self, state, action, normalized=False, **kwargs):
        if (state.x == LOAD_LOCATION and action == "move-right") or (
            state.x == 0 and action == "move-left"
        ):
            # trying to make an invalid move; stay in the same place
            return state
        if action == "move-right":
            # make sure we're always loaded in the far right cell
            if state.x == LOAD_LOCATION - 1:
                return LUState(state.x + 1, True)
            return LUState(state.x + 1, state.loaded)
        if action == "move-left":
            # make sure we're always unloaded in the first cell
            if state.x == 1:
                return LUState(state.x - 1, False)
            return LUState(state.x - 1, state.loaded)

    def argmax(self, state, action, normalized=False, **kwargs):
        """Returns the most likely next state"""
        return self.sample(state, action)

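# Likewise, the transition model is deterministic up to EPSILON-smoothing. A
# couple of sample transitions (with LOAD_LOCATION = 10):
# >>> LUTransitionModel().sample(LUState(9, False), LUAction("move-right"))
# State((10, True))
# >>> LUTransitionModel().sample(LUState(1, True), LUAction("move-left"))
# State((0, False))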
# Reward Model
class LURewardModel(pomdp_py.RewardModel):
    def probability(
        self, reward, state, action, next_state, normalized=False, **kwargs
    ):
        if reward == self.sample(state, action, next_state):
            return 1.0
        else:
            return 0.0

    def sample(self, state, action, next_state, normalized=False, **kwargs):
        # give +100 for unloading (moving left from x = 1 while loaded),
        # otherwise give -1
        if action == "move-left" and state.loaded == True and state.x == 1:
            return 100
        else:
            return -1

    def argmax(self, state, action, next_state, normalized=False, **kwargs):
        """Returns the most likely reward"""
        return self.sample(state, action, next_state)

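# For example, sample(LUState(1, True), LUAction("move-left"), LUState(0, False))
# returns 100 (the agent unloads); every other (state, action) pair returns -1.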
# Policy Model
class LUPolicyModel(pomdp_py.RandomRollout):
    """This is an extremely dumb policy model; it exists only to stay
    consistent with the framework."""

    def __init__(self):
        self._all_actions = {LUAction("move-right"), LUAction("move-left")}

    def probability(self, action, state, normalized=False, **kwargs):
        raise NotImplementedError  # Never used

    def sample(self, state, normalized=False, **kwargs):
        return random.choice(list(self.get_all_actions()))

    def argmax(self, state, normalized=False, **kwargs):
        """Returns the most likely action"""
        raise NotImplementedError

    def get_all_actions(self, **kwargs):
        return self._all_actions

class LoadUnloadProblem(pomdp_py.POMDP):
    def __init__(self, init_state, init_belief):
        """init_belief is a Distribution."""
        agent = pomdp_py.Agent(
            init_belief,
            LUPolicyModel(),
            LUTransitionModel(),
            LUObservationModel(),
            LURewardModel(),
        )
        env = pomdp_py.Environment(init_state, LUTransitionModel(), LURewardModel())
        super().__init__(agent, env, name="LoadUnloadProblem")

def generate_random_state():
    # Flip a coin to determine if we are loaded; cast to built-in bool/int so
    # LUState's strict type checks pass
    loaded = bool(np.random.rand() > 0.5)
    location = int(np.random.randint(0, LOAD_LOCATION + 1))
    if location == 0:
        loaded = False
    if location == LOAD_LOCATION:
        loaded = True
    return LUState(location, loaded)

def generate_init_belief(num_particles):
    particles = []
    for _ in range(num_particles):
        particles.append(generate_random_state())
    return pomdp_py.Particles(particles)

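# Note: pomdp_py.Particles represents the belief as a bag of sampled states,
# which is the belief representation that POMCP (used in main() below) expects
# and updates during planner.update().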
def test_planner(load_unload_problem, planner, nsteps=3, discount=0.95):
    gamma = 1.0
    total_reward = 0
    total_discounted_reward = 0
    fig = plt.figure()
    plt.title("Load/Unload problem (Red = empty, Blue = full)")
    plt.xlabel("Position")
    ax = fig.add_subplot(111)
    ax.set_xlim(-1, LOAD_LOCATION + 1)
    ax.set_ylim(0, 2)
    x, y = [], []
    (scat,) = ax.plot(x, y, marker="x", markersize=20, ls=" ", color="black")

    def update(t):
        nonlocal gamma, total_reward, total_discounted_reward
        print("==== Step %d ====" % (t + 1))
        action = planner.plan(load_unload_problem.agent)

        env_reward = load_unload_problem.env.state_transition(action, execute=True)
        true_state = copy.deepcopy(load_unload_problem.env.state)

        real_observation = load_unload_problem.env.provide_observation(
            load_unload_problem.agent.observation_model, action
        )
        load_unload_problem.agent.update_history(action, real_observation)
        planner.update(load_unload_problem.agent, action, real_observation)
        total_reward += env_reward
        total_discounted_reward += env_reward * gamma
        gamma *= discount
        print("True state: %s" % true_state)
        print("Action: %s" % str(action))
        print("Observation: %s" % str(real_observation))
        print("Reward: %s" % str(env_reward))
        print("Reward (Cumulative): %s" % str(total_reward))
        print("Reward (Cumulative Discounted): %s" % str(total_discounted_reward))
        print("Belief: %s" % str(load_unload_problem.agent.sample_belief()))
        if isinstance(planner, pomdp_py.POUCT):
            print("__num_sims__: %d" % planner.last_num_sims)
            print("__plan_time__: %.5f" % planner.last_planning_time)
        if isinstance(planner, pomdp_py.PORollout):
            print("__best_reward__: %d" % planner.last_best_reward)
        new_x, new_y = [true_state.x], [1]
        scat.set_data(new_x, new_y)
        scat.set_color("b" if true_state.loaded else "r")
        return (scat,)

    ani = FuncAnimation(fig, update, frames=nsteps, interval=500)
    plt.show()

def main():
    init_state = generate_random_state()
    init_belief = generate_init_belief(num_particles=100)
    load_unload_problem = LoadUnloadProblem(init_state, init_belief)

    print("** Testing POMCP **")
    pomcp = pomdp_py.POMCP(
        max_depth=100,
        discount_factor=0.95,
        num_sims=100,
        exploration_const=110,
        rollout_policy=load_unload_problem.agent.policy_model,
    )
    test_planner(load_unload_problem, pomcp, nsteps=100)


if __name__ == "__main__":
    main()