Source code for pomdp_py.problems.multi_object_search.env.env

"""The Environment"""

import pomdp_py
import copy
from pomdp_py.problems.multi_object_search.models.transition_model import *
from pomdp_py.problems.multi_object_search.models.reward_model import *
from pomdp_py.problems.multi_object_search.models.components.sensor import *
from pomdp_py.problems.multi_object_search.domain.state import *


[docs] class MosEnvironment(pomdp_py.Environment): """""" def __init__(self, dim, init_state, sensors, obstacles=set({})): """ Args: sensors (dict): Map from robot_id to sensor (Sensor); Sensors equipped on robots; Used to determine which objects should be marked as found. obstacles (set): set of object ids that are obstacles; The set difference of all object ids then yields the target object ids.""" self.width, self.length = dim self.sensors = sensors self.obstacles = obstacles transition_model = MosTransitionModel( dim, sensors, set(init_state.object_states.keys()) ) # Target objects, a set of ids, are not robot nor obstacles self.target_objects = { objid for objid in set(init_state.object_states.keys()) - self.obstacles if not isinstance(init_state.object_states[objid], RobotState) } reward_model = GoalRewardModel(self.target_objects) super().__init__(init_state, transition_model, reward_model) @property def robot_ids(self): return set(self.sensors.keys())
[docs] def state_transition(self, action, execute=True, robot_id=None): """state_transition(self, action, execute=True, **kwargs) Overriding parent class function. Simulates a state transition given `action`. If `execute` is set to True, then the resulting state will be the new current state of the environment. Args: action (Action): action that triggers the state transition execute (bool): If True, the resulting state of the transition will become the current state. Returns: float or tuple: reward as a result of `action` and state transition, if `execute` is True (next_state, reward) if `execute` is False. """ assert ( robot_id is not None ), "state transition should happen for a specific robot" next_state = copy.deepcopy(self.state) next_state.object_states[robot_id] = self.transition_model[robot_id].sample( self.state, action ) reward = self.reward_model.sample( self.state, action, next_state, robot_id=robot_id ) if execute: self.apply_transition(next_state) return reward else: return next_state, reward
#### Interpret string as an initial world state ####
[docs] def interpret(worldstr): """ Interprets a problem instance description in `worldstr` and returns the corresponding MosEnvironment. For example: This string .. code-block:: text rx... .x.xT ..... *** r: laser fov=90 min_range=1 max_range=10 describes a 3 by 5 world where x indicates obsticles and T indicates the "target object". T could be replaced by any upper-case letter A-Z which will serve as the object's id. Lower-case letters a-z (except for x) serve as id for robot(s). After the world, the :code:`***` signals description of the sensor for each robot. For example "r laser 90 1 10" means that robot `r` will have a Laser2Dsensor with fov 90, min_range 1.0, and max_range of 10.0. Args: worldstr (str): a string that describes the initial state of the world. Returns: MosEnvironment: the corresponding environment for the world description. """ worldlines = [] sensorlines = [] mode = "world" for line in worldstr.splitlines(): line = line.strip() if len(line) > 0: if line == "***": mode = "sensor" continue if mode == "world": worldlines.append(line) if mode == "sensor": sensorlines.append(line) lines = [line for line in worldlines if len(line) > 0] w, l = len(worldlines[0]), len(worldlines) objects = {} # objid -> object_state(pose) obstacles = set({}) # objid robots = {} # robot_id -> robot_state(pose) sensors = {} # robot_id -> Sensor # Parse world for y, line in enumerate(worldlines): if len(line) != w: raise ValueError( "World size inconsistent.Expected width: %d; Actual Width: %d" % (w, len(line)) ) for x, c in enumerate(line): if c == "x": # obstacle objid = 1000 + len(obstacles) # obstacle id objects[objid] = ObjectState(objid, "obstacle", (x, y)) obstacles.add(objid) elif c.isupper(): # target object objid = len(objects) objects[objid] = ObjectState(objid, "target", (x, y)) elif c.islower(): # robot robot_id = interpret_robot_id(c) robots[robot_id] = RobotState(robot_id, (x, y, 0), (), None) else: assert c == ".", "Unrecognized character %s in worldstr" % c if len(robots) == 0: raise ValueError("No initial robot pose!") if len(objects) == 0: raise ValueError("No object!") # Parse sensors for line in sensorlines: if "," in line: raise ValueError( "Wrong Fromat. SHould not have ','. Separate tokens with space." ) robot_name = line.split(":")[0].strip() robot_id = interpret_robot_id(robot_name) assert robot_id in robots, "Sensor specified for unknown robot %s" % ( robot_name ) sensor_setting = line.split(":")[1].strip() sensor_type = sensor_setting.split(" ")[0].strip() sensor_params = {} for token in sensor_setting.split(" ")[1:]: param_name = token.split("=")[0].strip() param_value = eval(token.split("=")[1].strip()) sensor_params[param_name] = param_value if sensor_type == "laser": sensor = Laser2DSensor(robot_id, **sensor_params) elif sensor_type == "proximity": sensor = ProximitySensor(robot_id, **sensor_params) else: raise ValueError("Unknown sensor type %s" % sensor_type) sensors[robot_id] = sensor return (w, l), robots, objects, obstacles, sensors
[docs] def interpret_robot_id(robot_name): return -ord(robot_name)
#### Utility functions for building the worldstr ####
[docs] def equip_sensors(worldmap, sensors): """ Args: worldmap (str): a string that describes the initial state of the world. sensors (dict) a map from robot character representation (e.g. 'r') to a string that describes its sensor (e.g. 'laser fov=90 min_range=1 max_range=5 angle_increment=5') Returns: str: A string that can be used as input to the `interpret` function """ worldmap += "\n***\n" for robot_char in sensors: worldmap += "%s: %s\n" % (robot_char, sensors[robot_char]) return worldmap
[docs] def make_laser_sensor(fov, dist_range, angle_increment, occlusion): """ Returns string representation of the laser scanner configuration. For example: "laser fov=90 min_range=1 max_range=10" Args: fov (int or float): angle between the start and end beams of one scan (degree). dist_range (tuple): (min_range, max_range) angle_increment (int or float): angular distance between measurements (rad). occlusion (bool): True if consider occlusion Returns: str: String representation of the laser scanner configuration. """ fovstr = "fov=%s" % str(fov) rangestr = "min_range=%s max_range=%s" % (str(dist_range[0]), str(dist_range[1])) angicstr = "angle_increment=%s" % (str(angle_increment)) occstr = "occlusion_enabled=%s" % str(occlusion) return "laser %s %s %s %s" % (fovstr, rangestr, angicstr, occstr)
[docs] def make_proximity_sensor(radius, occlusion): """ Returns string representation of the proximity sensor configuration. For example: "proximity radius=5 occlusion_enabled=False" Args: radius (int or float) occlusion (bool): True if consider occlusion Returns: str: String representation of the proximity sensor configuration. """ radiustr = "radius=%s" % str(radius) occstr = "occlusion_enabled=%s" % str(occlusion) return "proximity %s %s" % (radiustr, occstr)