"""RockSample(n,k) problem
Origin: Heuristic Search Value Iteration for POMDPs (UAI 2004)
Description:
State space:
Position {(1,1),(1,2),...(n,n)}
:math:`\\times` RockType_1 :math:`\\times` RockType_2, ..., :math:`\\times` RockType_k
where RockType_i = {Good, Bad}
:math:`\\times` TerminalState
(basically, the positions of rocks are known to the robot,
but not represented explicitly in the state space. Check_i
will smartly check the rock i at its location.)
Action space:
North, South, East, West, Sample, Check_1, ..., Check_k
The first four moves the agent deterministically
Sample: samples the rock at agent's current location
Check_i: receives a noisy observation about RockType_i
(noise determined by eta (:math:`\eta`). eta=1 -> perfect sensor; eta=0 -> uniform)
Observation: observes the property of rock i when taking Check_i. The
observation may be noisy, depending on an efficiency parameter which
decreases exponentially as the distance increases between the rover and
rock i. 'half_efficiency_dist' influences this parameter (larger, more robust)
Reward: +10 for Sample a good rock. -10 for Sampling a bad rock.
Move to exit area +10. Other actions have no cost or reward.
Initial belief: every rock has equal probability of being Good or Bad.
"""
import pomdp_py
import random
import math
import numpy as np
import sys
import copy
EPSILON = 1e-9
[docs]
def euclidean_dist(p1, p2):
return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)
[docs]
class RockType:
GOOD = "good"
BAD = "bad"
[docs]
@staticmethod
def invert(rocktype):
if rocktype == "good":
return "bad"
else:
return "good"
# return 1 - rocktype
[docs]
@staticmethod
def random(p=0.5):
if random.uniform(0, 1) >= p:
return RockType.GOOD
else:
return RockType.BAD
[docs]
class State(pomdp_py.State):
def __init__(self, position, rocktypes, terminal=False):
"""
position (tuple): (x,y) position of the rover on the grid.
rocktypes: tuple of size k. Each is either Good or Bad.
terminal (bool): The robot is at the terminal state.
(It is so true that the agent's state doesn't need to involve the map!)
x axis is horizontal. y axis is vertical.
"""
self.position = position
if type(rocktypes) != tuple:
rocktypes = tuple(rocktypes)
self.rocktypes = rocktypes
self.terminal = terminal
def __hash__(self):
return hash((self.position, self.rocktypes, self.terminal))
def __eq__(self, other):
if isinstance(other, State):
return (
self.position == other.position
and self.rocktypes == other.rocktypes
and self.terminal == other.terminal
)
else:
return False
def __str__(self):
return self.__repr__()
def __repr__(self):
return "State(%s | %s | %s)" % (
str(self.position),
str(self.rocktypes),
str(self.terminal),
)
[docs]
class Action(pomdp_py.Action):
def __init__(self, name):
self.name = name
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
if isinstance(other, Action):
return self.name == other.name
elif type(other) == str:
return self.name == other
def __str__(self):
return self.name
def __repr__(self):
return "Action(%s)" % self.name
[docs]
class MoveAction(Action):
EAST = (1, 0) # x is horizontal; x+ is right. y is vertical; y+ is up.
WEST = (-1, 0)
NORTH = (0, -1)
SOUTH = (0, 1)
def __init__(self, motion, name):
if motion not in {
MoveAction.EAST,
MoveAction.WEST,
MoveAction.NORTH,
MoveAction.SOUTH,
}:
raise ValueError("Invalid move motion %s" % motion)
self.motion = motion
super().__init__("move-%s" % str(name))
MoveEast = MoveAction(MoveAction.EAST, "EAST")
MoveWest = MoveAction(MoveAction.WEST, "WEST")
MoveNorth = MoveAction(MoveAction.NORTH, "NORTH")
MoveSouth = MoveAction(MoveAction.SOUTH, "SOUTH")
[docs]
class SampleAction(Action):
def __init__(self):
super().__init__("sample")
[docs]
class CheckAction(Action):
def __init__(self, rock_id):
self.rock_id = rock_id
super().__init__("check-%d" % self.rock_id)
[docs]
class Observation(pomdp_py.Observation):
def __init__(self, quality):
self.quality = quality
def __hash__(self):
return hash(self.quality)
def __eq__(self, other):
if isinstance(other, Observation):
return self.quality == other.quality
elif type(other) == str:
return self.quality == other
def __str__(self):
return str(self.quality)
def __repr__(self):
return "Observation(%s)" % str(self.quality)
[docs]
class RSTransitionModel(pomdp_py.TransitionModel):
"""The model is deterministic"""
def __init__(self, n, rock_locs, in_exit_area):
"""
rock_locs: a map from (x,y) location to rock_id
in_exit_area: a function (x,y) -> Bool that returns True if (x,y) is in exit area
"""
self._n = n
self._rock_locs = rock_locs
self._in_exit_area = in_exit_area
def _move_or_exit(self, position, action):
expected = (position[0] + action.motion[0], position[1] + action.motion[1])
if self._in_exit_area(expected):
return expected, True
else:
return (
max(0, min(position[0] + action.motion[0], self._n - 1)),
max(0, min(position[1] + action.motion[1], self._n - 1)),
), False
[docs]
def probability(self, next_state, state, action, normalized=False, **kwargs):
if next_state != self.sample(state, action):
return EPSILON
else:
return 1.0 - EPSILON
[docs]
def sample(self, state, action):
next_position = tuple(state.position)
rocktypes = tuple(state.rocktypes)
next_rocktypes = rocktypes
next_terminal = state.terminal
if state.terminal:
next_terminal = True # already terminated. So no state transition happens
else:
if isinstance(action, MoveAction):
next_position, exiting = self._move_or_exit(state.position, action)
if exiting:
next_terminal = True
elif isinstance(action, SampleAction):
if next_position in self._rock_locs:
rock_id = self._rock_locs[next_position]
_rocktypes = list(rocktypes)
_rocktypes[rock_id] = RockType.BAD
next_rocktypes = tuple(_rocktypes)
return State(next_position, next_rocktypes, next_terminal)
[docs]
def argmax(self, state, action):
"""Returns the most likely next state"""
return self.sample(state, action)
[docs]
class RSObservationModel(pomdp_py.ObservationModel):
def __init__(self, rock_locs, half_efficiency_dist=20):
self._half_efficiency_dist = half_efficiency_dist
self._rocks = {rock_locs[pos]: pos for pos in rock_locs}
[docs]
def probability(self, observation, next_state, action):
if isinstance(action, CheckAction):
# compute efficiency
rock_pos = self._rocks[action.rock_id]
dist = euclidean_dist(rock_pos, next_state.position)
eta = (1 + pow(2, -dist / self._half_efficiency_dist)) * 0.5
# compute probability
actual_rocktype = next_state.rocktypes[action.rock_id]
if actual_rocktype == observation:
return eta
else:
return 1.0 - eta
else:
if observation.quality is None:
return 1.0 - EPSILON # expected to receive no observation
else:
return EPSILON
[docs]
def sample(self, next_state, action, argmax=False):
if not next_state.terminal and isinstance(action, CheckAction):
# compute efficiency
rock_pos = self._rocks[action.rock_id]
dist = euclidean_dist(rock_pos, next_state.position)
eta = (1 + pow(2, -dist / self._half_efficiency_dist)) * 0.5
if argmax:
keep = eta > 0.5
else:
keep = eta > random.uniform(0, 1)
actual_rocktype = next_state.rocktypes[action.rock_id]
if not keep:
observed_rocktype = RockType.invert(actual_rocktype)
return Observation(observed_rocktype)
else:
return Observation(actual_rocktype)
else:
# Terminated or not a check action. So no observation.
return Observation(None)
return self._probs[next_state][action][observation]
[docs]
def argmax(self, next_state, action):
"""Returns the most likely observation"""
return self.sample(next_state, action, argmax=True)
[docs]
class RSRewardModel(pomdp_py.RewardModel):
def __init__(self, rock_locs, in_exit_area):
self._rock_locs = rock_locs
self._in_exit_area = in_exit_area
[docs]
def sample(self, state, action, next_state, normalized=False, **kwargs):
# deterministic
if state.terminal:
return 0 # terminated. No reward
if isinstance(action, SampleAction):
# need to check the rocktype in `state` because it has turned bad in `next_state`
if state.position in self._rock_locs:
if state.rocktypes[self._rock_locs[state.position]] == RockType.GOOD:
return 10
else:
# No rock or bad rock
return -10
else:
return 0 # problem didn't specify penalty for sampling empty space.
elif isinstance(action, MoveAction):
if self._in_exit_area(next_state.position):
return 10
return 0
[docs]
def argmax(self, state, action, next_state, normalized=False, **kwargs):
raise NotImplementedError
[docs]
def probability(
self, reward, state, action, next_state, normalized=False, **kwargs
):
raise NotImplementedError
[docs]
class RSPolicyModel(pomdp_py.RolloutPolicy):
"""Simple policy model according to problem description."""
def __init__(self, n, k):
check_actions = set({CheckAction(rock_id) for rock_id in range(k)})
self._move_actions = {MoveEast, MoveWest, MoveNorth, MoveSouth}
self._other_actions = {SampleAction()} | check_actions
self._all_actions = self._move_actions | self._other_actions
self._n = n
[docs]
def sample(self, state, normalized=False, **kwargs):
return random.sample(self.get_all_actions(state=state), 1)[0]
[docs]
def probability(self, action, state, normalized=False, **kwargs):
raise NotImplementedError
[docs]
def argmax(self, state, normalized=False, **kwargs):
"""Returns the most likely reward"""
raise NotImplementedError
[docs]
def get_all_actions(self, **kwargs):
state = kwargs.get("state", None)
if state is None:
return list(self._all_actions)
else:
motions = set(self._all_actions)
rover_x, rover_y = state.position
if rover_x == 0:
motions.remove(MoveWest)
if rover_y == 0:
motions.remove(MoveNorth)
if rover_y == self._n - 1:
motions.remove(MoveSouth)
return list(motions | self._other_actions)
[docs]
def rollout(self, state, history=None):
return random.sample(self.get_all_actions(state=state), 1)[0]
[docs]
class RockSampleProblem(pomdp_py.POMDP):
[docs]
@staticmethod
def random_free_location(n, not_free_locs):
"""returns a random (x,y) location in nxn grid that is free."""
while True:
loc = (random.randint(0, n - 1), random.randint(0, n - 1))
if loc not in not_free_locs:
return loc
[docs]
def in_exit_area(self, pos):
return pos[0] == self._n
[docs]
@staticmethod
def generate_instance(n, k):
"""Returns init_state and rock locations for an instance of RockSample(n,k)"""
rover_position = (0, random.randint(0, n - 1))
rock_locs = {} # map from rock location to rock id
for i in range(k):
loc = RockSampleProblem.random_free_location(
n, set(rock_locs.keys()) | set({rover_position})
)
rock_locs[loc] = i
# random rocktypes
rocktypes = tuple(RockType.random() for i in range(k))
# Ground truth state
init_state = State(rover_position, rocktypes, False)
return init_state, rock_locs
[docs]
def print_state(self):
string = "\n______ID______\n"
rover_position = self.env.state.position
rocktypes = self.env.state.rocktypes
# Rock id map
for y in range(self._n):
for x in range(self._n + 1):
char = "."
if x == self._n:
char = ">"
if (x, y) in self._rock_locs:
char = str(self._rock_locs[(x, y)])
if (x, y) == rover_position:
char = "R"
string += char
string += "\n"
string += "_____G/B_____\n"
# Good/bad map
for y in range(self._n):
for x in range(self._n + 1):
char = "."
if x == self._n:
char = ">"
if (x, y) in self._rock_locs:
if rocktypes[self._rock_locs[(x, y)]] == RockType.GOOD:
char = "$"
else:
char = "x"
if (x, y) == rover_position:
char = "R"
string += char
string += "\n"
print(string)
def __init__(
self, n, k, init_state, rock_locs, init_belief, half_efficiency_dist=20
):
self._n, self._k = n, k
agent = pomdp_py.Agent(
init_belief,
RSPolicyModel(n, k),
RSTransitionModel(n, rock_locs, self.in_exit_area),
RSObservationModel(rock_locs, half_efficiency_dist=half_efficiency_dist),
RSRewardModel(rock_locs, self.in_exit_area),
)
env = pomdp_py.Environment(
init_state,
RSTransitionModel(n, rock_locs, self.in_exit_area),
RSRewardModel(rock_locs, self.in_exit_area),
)
self._rock_locs = rock_locs
super().__init__(agent, env, name="RockSampleProblem")
[docs]
def test_planner(rocksample, planner, nsteps=3, discount=0.95):
gamma = 1.0
total_reward = 0
total_discounted_reward = 0
for i in range(nsteps):
print("==== Step %d ====" % (i + 1))
action = planner.plan(rocksample.agent)
# pomdp_py.visual.visualize_pouct_search_tree(rocksample.agent.tree,
# max_depth=5, anonymize=False)
true_state = copy.deepcopy(rocksample.env.state)
env_reward = rocksample.env.state_transition(action, execute=True)
true_next_state = copy.deepcopy(rocksample.env.state)
real_observation = rocksample.env.provide_observation(
rocksample.agent.observation_model, action
)
rocksample.agent.update_history(action, real_observation)
planner.update(rocksample.agent, action, real_observation)
total_reward += env_reward
total_discounted_reward += env_reward * gamma
gamma *= discount
print("True state: %s" % true_state)
print("Action: %s" % str(action))
print("Observation: %s" % str(real_observation))
print("Reward: %s" % str(env_reward))
print("Reward (Cumulative): %s" % str(total_reward))
print("Reward (Cumulative Discounted): %s" % str(total_discounted_reward))
if isinstance(planner, pomdp_py.POUCT):
print("__num_sims__: %d" % planner.last_num_sims)
print("__plan_time__: %.5f" % planner.last_planning_time)
if isinstance(planner, pomdp_py.PORollout):
print("__best_reward__: %d" % planner.last_best_reward)
print("World:")
rocksample.print_state()
if rocksample.in_exit_area(rocksample.env.state.position):
break
return total_reward, total_discounted_reward
[docs]
def init_particles_belief(k, num_particles, init_state, belief="uniform"):
num_particles = 200
particles = []
for _ in range(num_particles):
if belief == "uniform":
rocktypes = []
for i in range(k):
rocktypes.append(RockType.random())
rocktypes = tuple(rocktypes)
elif belief == "groundtruth":
rocktypes = copy.deepcopy(init_state.rocktypes)
particles.append(State(init_state.position, rocktypes, False))
init_belief = pomdp_py.Particles(particles)
return init_belief
[docs]
def minimal_instance(**kwargs):
# A particular instance for debugging purpose
n, k = 2, 2
rover_position = (0, 0)
rock_locs = {} # map from rock location to rock id
rock_locs[(0, 1)] = 0
rock_locs[(1, 1)] = 1
rocktypes = ("good", "good")
# Ground truth state
init_state = State(rover_position, rocktypes, False)
belief = "uniform"
init_belief = init_particles_belief(k, 200, init_state, belief=belief)
rocksample = RockSampleProblem(n, k, init_state, rock_locs, init_belief, **kwargs)
return rocksample
[docs]
def create_instance(n, k, **kwargs):
init_state, rock_locs = RockSampleProblem.generate_instance(n, k)
belief = "uniform"
# init belief (uniform), represented in particles;
# We don't factor the state here; We are also not doing any action prior.
init_belief = init_particles_belief(k, 200, init_state, belief=belief)
rocksample = RockSampleProblem(n, k, init_state, rock_locs, init_belief, **kwargs)
return rocksample
[docs]
def main():
rocksample = debug_instance() # create_instance(7, 8)
rocksample.print_state()
print("*** Testing POMCP ***")
pomcp = pomdp_py.POMCP(
max_depth=30,
discount_factor=0.95,
num_sims=10000,
exploration_const=5,
rollout_policy=rocksample.agent.policy_model,
num_visits_init=1,
)
tt, ttd = test_planner(rocksample, pomcp, nsteps=100, discount=0.95)
if __name__ == "__main__":
main()