Source code for problems.multi_object_search.models.transition_model

"""Defines the TransitionModel for the 2D Multi-Object Search domain.

Origin: Multi-Object Search using Object-Oriented POMDPs (ICRA 2019)
(extensions: action space changes, different sensor model, gridworld instead of
topological graph)

Description: Multi-Object Search in a 2D grid world.

Transition: deterministic
"""

import pomdp_py
import copy
from pomdp_py.problems.multi_object_search.domain.state import *
from pomdp_py.problems.multi_object_search.domain.observation import *
from pomdp_py.problems.multi_object_search.domain.action import *


####### Transition Model #######
[docs] class MosTransitionModel(pomdp_py.OOTransitionModel): """Object-oriented transition model; The transition model supports the multi-robot case, where each robot is equipped with a sensor; The multi-robot transition model should be used by the Environment, but not necessarily by each robot for planning. """ def __init__(self, dim, sensors, object_ids, epsilon=1e-9): """ sensors (dict): robot_id -> Sensor for_env (bool): True if this is a robot transition model used by the Environment. see RobotTransitionModel for details. """ self._sensors = sensors transition_models = { objid: StaticObjectTransitionModel(objid, epsilon=epsilon) for objid in object_ids if objid not in sensors } for robot_id in sensors: transition_models[robot_id] = RobotTransitionModel( sensors[robot_id], dim, epsilon=epsilon ) super().__init__(transition_models)
[docs] def sample(self, state, action, **kwargs): oostate = pomdp_py.OOTransitionModel.sample(self, state, action, **kwargs) return MosOOState(oostate.object_states)
[docs] def argmax(self, state, action, normalized=False, **kwargs): oostate = pomdp_py.OOTransitionModel.argmax(self, state, action, **kwargs) return MosOOState(oostate.object_states)
[docs] class StaticObjectTransitionModel(pomdp_py.TransitionModel): """This model assumes the object is static.""" def __init__(self, objid, epsilon=1e-9): self._objid = objid self._epsilon = epsilon
[docs] def probability(self, next_object_state, state, action): if next_object_state != state.object_states[next_object_state["id"]]: return self._epsilon else: return 1.0 - self._epsilon
[docs] def sample(self, state, action): """Returns next_object_state""" return self.argmax(state, action)
[docs] def argmax(self, state, action): """Returns the most likely next object_state""" return copy.deepcopy(state.object_states[self._objid])
[docs] class RobotTransitionModel(pomdp_py.TransitionModel): """We assume that the robot control is perfect and transitions are deterministic.""" def __init__(self, sensor, dim, epsilon=1e-9): """ dim (tuple): a tuple (width, length) for the dimension of the world """ # this is used to determine objects found for FindAction self._sensor = sensor self._robot_id = sensor.robot_id self._dim = dim self._epsilon = epsilon
[docs] @classmethod def if_move_by(cls, robot_id, state, action, dim, check_collision=True): """Defines the dynamics of robot motion; dim (tuple): the width, length of the search world.""" if not isinstance(action, MotionAction): raise ValueError("Cannot move robot with %s action" % str(type(action))) robot_pose = state.pose(robot_id) rx, ry, rth = robot_pose if action.scheme == MotionAction.SCHEME_XYTH: dx, dy, th = action.motion rx += dx ry += dy rth = th elif action.scheme == MotionAction.SCHEME_VW: # odometry motion model forward, angle = action.motion rth += angle # angle (radian) rx = int(round(rx + forward * math.cos(rth))) ry = int(round(ry + forward * math.sin(rth))) rth = rth % (2 * math.pi) if valid_pose( (rx, ry, rth), dim[0], dim[1], state=state, check_collision=check_collision, pose_objid=robot_id, ): return (rx, ry, rth) else: return robot_pose # no change because change results in invalid pose
[docs] def probability(self, next_robot_state, state, action): if next_robot_state != self.argmax(state, action): return self._epsilon else: return 1.0 - self._epsilon
[docs] def argmax(self, state, action): """Returns the most likely next robot_state""" if isinstance(state, RobotState): robot_state = state else: robot_state = state.object_states[self._robot_id] next_robot_state = copy.deepcopy(robot_state) # camera direction is only not None when looking next_robot_state["camera_direction"] = None if isinstance(action, MotionAction): # motion action next_robot_state["pose"] = RobotTransitionModel.if_move_by( self._robot_id, state, action, self._dim ) elif isinstance(action, LookAction): if hasattr(action, "motion") and action.motion is not None: # rotate the robot next_robot_state["pose"] = self._if_move_by( self._robot_id, state, action, self._dim ) next_robot_state["camera_direction"] = action.name elif isinstance(action, FindAction): robot_pose = state.pose(self._robot_id) z = self._sensor.observe(robot_pose, state) # Update "objects_found" set for target objects observed_target_objects = { objid for objid in z.objposes if ( state.object_states[objid].objclass == "target" and z.objposes[objid] != ObjectObservation.NULL ) } next_robot_state["objects_found"] = tuple( set(next_robot_state["objects_found"]) | set(observed_target_objects) ) return next_robot_state
[docs] def sample(self, state, action): """Returns next_robot_state""" return self.argmax(state, action)
# Utility functions
[docs] def valid_pose(pose, width, length, state=None, check_collision=True, pose_objid=None): """ Returns True if the given `pose` (x,y) is a valid pose; If `check_collision` is True, then the pose is only valid if it is not overlapping with any object pose in the environment state. """ x, y = pose[:2] # Check collision with obstacles if check_collision and state is not None: object_poses = state.object_poses for objid in object_poses: if state.object_states[objid].objclass.startswith("obstacle"): if objid == pose_objid: continue if (x, y) == object_poses[objid]: return False return in_boundary(pose, width, length)
[docs] def in_boundary(pose, width, length): # Check if in boundary x, y = pose[:2] if x >= 0 and x < width: if y >= 0 and y < length: if len(pose) == 3: th = pose[2] # radian if th < 0 or th > 2 * math.pi: return False return True return False