Source code for pomdp_py.problems.tag.agent.agent

import pomdp_py
import copy
import random
from pomdp_py.problems.tag.domain.observation import *
from pomdp_py.problems.tag.domain.action import *
from pomdp_py.problems.tag.domain.state import *
from pomdp_py.problems.tag.models.observation_model import *
from pomdp_py.problems.tag.models.transition_model import *
from pomdp_py.problems.tag.models.reward_model import *
from pomdp_py.problems.tag.models.policy_model import *
from pomdp_py.problems.tag.models.components.motion_policy import *
from pomdp_py.problems.tag.models.components.grid_map import *


## initialize belief
def initialize_belief(grid_map, init_robot_position, prior={}):
    """Initialize belief.

    Args:
        grid_map (GridMap): Holds information of the map occupancy
        prior (dict): A map from (x,y)->[0,1]. If empty, the belief
            will be uniform.
    """
    hist = {}  # state -> prob
    total_prob = 0.0
    # States suggested by the prior
    for target_position in prior:
        state = TagState(init_robot_position, target_position, False)
        hist[state] = prior[target_position]
        total_prob += hist[state]

    # Remaining free cells
    for x in range(grid_map.width):
        for y in range(grid_map.length):
            if (x, y) in grid_map.obstacle_poses:
                # Skip obstacles
                continue
            state = TagState(init_robot_position, (x, y), False)
            if len(prior) > 0:
                if (x, y) not in prior:
                    # Tiny probability for positions not covered by the prior;
                    # positions in the prior were already counted above.
                    hist[state] = 1e-9
                    total_prob += hist[state]
            else:
                hist[state] = 1.0
                total_prob += hist[state]

    # Normalize
    for state in hist:
        hist[state] /= total_prob

    hist_belief = pomdp_py.Histogram(hist)
    return hist_belief
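# Usage sketch (illustrative, not part of the original module). It assumes a
# GridMap can be constructed as GridMap(width, length, obstacle_poses); adapt
# if the actual constructor differs. The grid size, obstacle, and robot
# position below are arbitrary example values.
def _example_histogram_belief():
    grid_map = GridMap(5, 5, {(2, 2)})  # assumed constructor: 5x5 grid, one obstacle
    init_robot_position = (0, 0)
    # Uniform belief over all free cells
    uniform_belief = initialize_belief(grid_map, init_robot_position)
    # Belief concentrated on the target being at (4, 4)
    biased_belief = initialize_belief(
        grid_map, init_robot_position, prior={(4, 4): 1.0}
    )
    return uniform_belief, biased_belief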
def initialize_particles_belief(
    grid_map, init_robot_position, num_particles=100, prior={}
):
    """Initialize belief.

    Args:
        grid_map (GridMap): Holds information of the map occupancy
        prior (dict): A map from (x,y)->[0,1]. If empty, the belief
            will be uniform.
    """
    particles = []
    if len(prior) > 0:
        # Prior knowledge provided; allocate particles proportionally to it.
        prior_sum = sum(prior[pose] for pose in prior)
        for pose in prior:
            state = TagState(init_robot_position, pose, False)
            amount_to_add = int(round((prior[pose] / prior_sum) * num_particles))
            for _ in range(amount_to_add):
                particles.append(state)
    else:
        # No prior; sample target positions uniformly over free cells.
        while len(particles) < num_particles:
            target_position = (
                random.randint(0, grid_map.width - 1),
                random.randint(0, grid_map.length - 1),
            )
            if target_position in grid_map.obstacle_poses:
                # Skip obstacles
                continue
            state = TagState(init_robot_position, target_position, False)
            particles.append(state)
    return pomdp_py.Particles(particles)
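# Usage sketch (illustrative, not part of the original module), mirroring the
# histogram example above but producing a particle belief, which is what
# particle-based planners such as POMCP expect. The GridMap construction is
# again an assumption.
def _example_particles_belief():
    grid_map = GridMap(5, 5, {(2, 2)})  # assumed constructor
    particles_belief = initialize_particles_belief(
        grid_map, (0, 0), num_particles=200
    )
    return particles_belief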
## belief update
def belief_update(agent, real_action, real_observation):
    # Update agent belief
    current_mpe_state = agent.cur_belief.mpe()
    next_robot_position = agent.transition_model.sample(
        current_mpe_state, real_action
    ).robot_position

    next_state_space = set({})
    for state in agent.cur_belief:
        next_state = copy.deepcopy(state)
        next_state.robot_position = next_robot_position
        next_state_space.add(next_state)

    new_belief = pomdp_py.update_histogram_belief(
        agent.cur_belief,
        real_action,
        real_observation,
        agent.observation_model,
        agent.transition_model,
        next_state_space=next_state_space,
    )
    agent.set_belief(new_belief)
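# Usage sketch (illustrative, not part of the original module). It shows where
# belief_update fits in a plan-act-observe loop; `env` stands for a pomdp_py
# Environment set up for the Tag problem and `planner` for a planner with a
# plan()/update() interface such as pomdp_py.POUCT. These wiring details are
# assumptions, not prescribed by this module.
def _example_plan_act_update(agent, env, planner):
    real_action = planner.plan(agent)
    reward = env.state_transition(real_action, execute=True)
    real_observation = agent.observation_model.sample(env.state, real_action)
    belief_update(agent, real_action, real_observation)  # replaces agent.cur_belief
    planner.update(agent, real_action, real_observation)
    return reward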
class TagAgent(pomdp_py.Agent):
    def __init__(self, init_belief, grid_map, pr_stay=0.2, small=1, big=10):
        self._grid_map = grid_map
        target_motion_policy = TagTargetMotionPolicy(grid_map, pr_stay)
        transition_model = TagTransitionModel(grid_map, target_motion_policy)
        reward_model = TagRewardModel(small=small, big=big)
        observation_model = TagObservationModel()
        policy_model = TagPolicyModel(grid_map=grid_map)
        super().__init__(
            init_belief,
            policy_model,
            transition_model=transition_model,
            observation_model=observation_model,
            reward_model=reward_model,
        )

    def clear_history(self):
        """Custom function; clear history"""
        self._history = None
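# Usage sketch (illustrative, not part of the original module). It wires a
# TagAgent to a POUCT planner; the GridMap construction and the planner
# hyperparameters are assumptions, not values prescribed by the Tag problem.
def _example_tag_agent():
    grid_map = GridMap(5, 5, {(2, 2)})  # assumed constructor
    init_robot_position = (0, 0)
    init_belief = initialize_belief(grid_map, init_robot_position)
    agent = TagAgent(init_belief, grid_map, pr_stay=0.2, small=1, big=10)
    planner = pomdp_py.POUCT(
        max_depth=10,
        discount_factor=0.95,
        num_sims=500,
        exploration_const=10,
        rollout_policy=agent.policy_model,
    )
    action = planner.plan(agent)  # best action under the current belief
    return agent, action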