Source code for pomdp_py.problems.multi_object_search.models.observation_model

"""Defines the ObservationModel for the 2D Multi-Object Search domain.

Origin: Multi-Object Search using Object-Oriented POMDPs (ICRA 2019)
(extensions: action space changes, different sensor model, gridworld instead of
topological graph)

Observation: {objid : pose(x,y) or NULL}. The sensor model could vary;
             it could be a fan-shaped model as the original paper, or
             it could be something else. But the resulting observation
             should be a map from object id to observed pose or NULL (not observed).

Observation Model

  The agent can observe its own state, as well as object poses
  that are within its sensor range. We only need to model object
  observation.

"""

import pomdp_py
import math
import random
import numpy as np
from pomdp_py.problems.multi_object_search.domain.state import *
from pomdp_py.problems.multi_object_search.domain.action import *
from pomdp_py.problems.multi_object_search.domain.observation import *


#### Observation Models ####
[docs] class MosObservationModel(pomdp_py.OOObservationModel): """Object-oriented transition model""" def __init__(self, dim, sensor, object_ids, sigma=0.01, epsilon=1): self.sigma = sigma self.epsilon = epsilon observation_models = { objid: ObjectObservationModel( objid, sensor, dim, sigma=sigma, epsilon=epsilon ) for objid in object_ids } pomdp_py.OOObservationModel.__init__(self, observation_models)
[docs] def sample(self, next_state, action, argmax=False, **kwargs): if not isinstance(action, LookAction): return MosOOObservation({}) # return MosOOObservation({objid: ObjectObservationModel.NULL # for objid in next_state.object_states # if objid != next_state.object_states[objid].objclass != "robot"}) factored_observations = super().sample(next_state, action, argmax=argmax) return MosOOObservation.merge(factored_observations, next_state)
[docs] class ObjectObservationModel(pomdp_py.ObservationModel): def __init__(self, objid, sensor, dim, sigma=0, epsilon=1): """ sigma and epsilon are parameters of the observation model (see paper), dim (tuple): a tuple (width, length) for the dimension of the world""" self._objid = objid self._sensor = sensor self._dim = dim self.sigma = sigma self.epsilon = epsilon def _compute_params(self, object_in_sensing_region): if object_in_sensing_region: # Object is in the sensing region alpha = self.epsilon beta = (1.0 - self.epsilon) / 2.0 gamma = (1.0 - self.epsilon) / 2.0 else: # Object is not in the sensing region. alpha = (1.0 - self.epsilon) / 2.0 beta = (1.0 - self.epsilon) / 2.0 gamma = self.epsilon return alpha, beta, gamma
[docs] def probability(self, observation, next_state, action, **kwargs): """ Returns the probability of Pr (observation | next_state, action). Args: observation (ObjectObservation) next_state (State) action (Action) """ if not isinstance(action, LookAction): # No observation should be received if observation.pose == ObjectObservation.NULL: return 1.0 else: return 0.0 if observation.objid != self._objid: raise ValueError("The observation is not about the same object") # The (funny) business of allowing histogram belief update using O(oi|si',sr',a). next_robot_state = kwargs.get("next_robot_state", None) if next_robot_state is not None: assert ( next_robot_state["id"] == self._sensor.robot_id ), "Robot id of observation model mismatch with given state" robot_pose = next_robot_state.pose if isinstance(next_state, ObjectState): assert ( next_state["id"] == self._objid ), "Object id of observation model mismatch with given state" object_pose = next_state.pose else: object_pose = next_state.pose(self._objid) else: robot_pose = next_state.pose(self._sensor.robot_id) object_pose = next_state.pose(self._objid) # Compute the probability zi = observation.pose alpha, beta, gamma = self._compute_params( self._sensor.within_range(robot_pose, object_pose) ) # Requires Python >= 3.6 prob = 0.0 # Event A: # object in sensing region and observation comes from object i if zi == ObjectObservation.NULL: # Even though event A occurred, the observation is NULL. # This has 0.0 probability. prob += 0.0 * alpha else: gaussian = pomdp_py.Gaussian( list(object_pose), [[self.sigma**2, 0], [0, self.sigma**2]] ) prob += gaussian[zi] * alpha # Event B prob += (1.0 / self._sensor.sensing_region_size) * beta # Event C pr_c = 1.0 if zi == ObjectObservation.NULL else 0.0 # indicator zi == NULL prob += pr_c * gamma return prob
[docs] def sample(self, next_state, action, **kwargs): """Returns observation""" if not isinstance(action, LookAction): # Not a look action. So no observation return ObjectObservation(self._objid, ObjectObservation.NULL) robot_pose = next_state.pose(self._sensor.robot_id) object_pose = next_state.pose(self._objid) # Obtain observation according to distribution. alpha, beta, gamma = self._compute_params( self._sensor.within_range(robot_pose, object_pose) ) # Requires Python >= 3.6 event_occured = random.choices( ["A", "B", "C"], weights=[alpha, beta, gamma], k=1 )[0] zi = self._sample_zi(event_occured, next_state) return ObjectObservation(self._objid, zi)
[docs] def argmax(self, next_state, action, **kwargs): # Obtain observation according to distribution. alpha, beta, gamma = self._compute_params( self._sensor.within_range(robot_pose, object_pose) ) event_probs = {"A": alpha, "B": beta, "C": gamma} event_occured = max(event_probs, key=lambda e: event_probs[e]) zi = self._sample_zi(event_occured, next_state, argmax=True) return ObjectObservation(self._objid, zi)
def _sample_zi(self, event, next_state, argmax=False): if event == "A": object_true_pose = next_state.object_pose(self._objid) gaussian = pomdp_py.Gaussian( list(object_true_pose), [[self.sigma**2, 0], [0, self.sigma**2]] ) if not argmax: zi = gaussian.random() else: zi = gaussian.mpe() zi = (int(round(zi[0])), int(round(zi[1]))) elif event == "B": # TODO: FIX. zi should ONLY come from the field of view. # There is currently no easy way to sample from the field of view. width, height = self._dim zi = ( random.randint(0, width), # x axis random.randint(0, height), ) # y axis else: # event == C zi = ObjectObservation.NULL return zi
### Unit test ###
[docs] def unittest(): from ..env.env import ( make_laser_sensor, make_proximity_sensor, equip_sensors, interpret, interpret_robot_id, ) # Test within search region check, # and the observation model probability and # sampling functions. worldmap = """ .......... ....T..... ......x... ..T.r.T... ..x....... ....T..... .......... """ # 0123456789 # 10 x 8 worldstr = equip_sensors(worldmap, {"r": make_laser_sensor(90, (1, 5), 0.5, False)}) env = interpret(worldstr) robot_id = interpret_robot_id("r") robot_pose = env.state.pose(robot_id) # within_range test sensor = env.sensors[robot_id] assert sensor.within_range(robot_pose, (4, 3)) == False assert sensor.within_range(robot_pose, (5, 3)) == True assert sensor.within_range(robot_pose, (6, 3)) == True assert sensor.within_range(robot_pose, (7, 2)) == True assert sensor.within_range(robot_pose, (7, 3)) == True assert sensor.within_range(robot_pose, (4, 3)) == False assert sensor.within_range(robot_pose, (2, 4)) == False assert sensor.within_range(robot_pose, (4, 1)) == False assert sensor.within_range(robot_pose, (4, 5)) == False assert sensor.within_range(robot_pose, (0, 0)) == False print(env.state) # observation model test O0 = ObjectObservationModel( 0, sensor, (env.width, env.length), sigma=0.01, epsilon=1 ) O2 = ObjectObservationModel( 2, sensor, (env.width, env.length), sigma=0.01, epsilon=1 ) O3 = ObjectObservationModel( 3, sensor, (env.width, env.length), sigma=0.01, epsilon=1 ) O5 = ObjectObservationModel( 5, sensor, (env.width, env.length), sigma=0.01, epsilon=1 ) z0 = O0.sample(env.state, Look) assert z0.pose == ObjectObservation.NULL z2 = O2.sample(env.state, Look) assert z2.pose == ObjectObservation.NULL z3 = O3.sample(env.state, Look) assert z3.pose == (6, 3) z5 = O5.sample(env.state, Look) assert z5.pose == ObjectObservation.NULL assert O0.probability(z0, env.state, Look) == 1.0 assert O2.probability(z2, env.state, Look) == 1.0 assert O3.probability(z3, env.state, Look) >= 1.0 assert ( O3.probability(ObjectObservation(3, ObjectObservation.NULL), env.state, Look) == 0.0 ) assert O5.probability(z5, env.state, Look) == 1.0
if __name__ == "__main__": unittest()