Source code for problems.multi_object_search.domain.observation

"""
Defines the Observation for the 2D Multi-Object Search domain;

Origin: Multi-Object Search using Object-Oriented POMDPs (ICRA 2019)
(extensions: action space changes, different sensor model, gridworld instead of topological graph)

Observation:

    :code:`{objid : pose(x,y) or NULL}`.
    The sensor model could vary;
    it could be a fan-shaped model as the original paper, or
    it could be something else. But the resulting observation
    should be a map from object id to observed pose or NULL (not observed).
"""

import pomdp_py


###### Observation ######
[docs] class ObjectObservation(pomdp_py.Observation): """The xy pose of the object is observed; or NULL if not observed""" NULL = None def __init__(self, objid, pose): self.objid = objid if type(pose) == tuple and len(pose) == 2 or pose == ObjectObservation.NULL: self.pose = pose else: raise ValueError("Invalid observation %s for object" % (str(pose), objid)) def __hash__(self): return hash((self.objid, self.pose)) def __eq__(self, other): if not isinstance(other, ObjectObservation): return False else: return self.objid == other.objid and self.pose == other.pose
[docs] class MosOOObservation(pomdp_py.OOObservation): """Observation for Mos that can be factored by objects; thus this is an OOObservation.""" def __init__(self, objposes): """ objposes (dict): map from objid to 2d pose or NULL (not ObjectObservation!). """ self._hashcode = hash(frozenset(objposes.items())) self.objposes = objposes
[docs] def for_obj(self, objid): if objid in self.objposes: return ObjectObservation(objid, self.objposes[objid]) else: return ObjectObservation(objid, ObjectObservation.NULL)
def __hash__(self): return self._hashcode def __eq__(self, other): if not isinstance(other, MosOOObservation): return False else: return self.objposes == other.objposes def __str__(self): return "MosOOObservation(%s)" % str(self.objposes) def __repr__(self): return str(self)
[docs] def factor(self, next_state, *params, **kwargs): """Factor this OO-observation by objects""" return { objid: ObjectObservation(objid, self.objposes[objid]) for objid in next_state.object_states if objid != next_state.robot_id }
[docs] @classmethod def merge(cls, object_observations, next_state, *params, **kwargs): """Merge `object_observations` into a single OOObservation object; object_observation (dict): Maps from objid to ObjectObservation""" return MosOOObservation( { objid: object_observations[objid].pose for objid in object_observations if objid != next_state.object_states[objid].objclass != "robot" } )