Source code for pomdp_py.problems.tag.problem

import random
import time

import pomdp_py

from pomdp_py.problems.tag.env.env import *
from pomdp_py.problems.tag.env.visual import *
from pomdp_py.problems.tag.agent.agent import *
from pomdp_py.problems.tag.example_worlds import *


class TagProblem(pomdp_py.POMDP):
    def __init__(
        self,
        init_robot_position,
        init_target_position,
        grid_map,
        pr_stay=0.2,
        small=1,
        big=10,
        prior="uniform",
        belief_type="hist",
        num_particles=6,
    ):
        init_state = TagState(init_robot_position, init_target_position, False)
        # Pass the caller-supplied reward magnitudes through to the models.
        env = TagEnvironment(
            init_state, grid_map, pr_stay=pr_stay, small=small, big=big
        )
        if prior == "uniform":
            prior = {}
        elif prior == "informed":
            prior = {init_target_position: 1.0}
        else:
            raise ValueError("Unrecognized prior type: %s" % prior)

        if belief_type == "particles":
            init_belief = initialize_particles_belief(
                grid_map, init_robot_position, prior=prior, num_particles=num_particles
            )
        else:
            init_belief = initialize_belief(grid_map, init_robot_position, prior=prior)
        agent = TagAgent(init_belief, grid_map, pr_stay=pr_stay, small=small, big=big)
        super().__init__(agent, env, name="TagProblem")
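# A minimal usage sketch (not part of the original module): constructing a
# TagProblem by hand, sampling free cells the same way main() does below.
# The "informed" prior is just one illustrative choice.
#
#   worldstr, _ = world0
#   grid_map = GridMap.from_str(worldstr)
#   free = list(grid_map.free_cells())
#   problem = TagProblem(
#       random.choice(free),   # robot start
#       random.choice(free),   # target start
#       grid_map,
#       prior="informed",      # concentrate the initial belief on the target
#   )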
def solve(
    problem,
    planner_type="pouct",
    max_depth=10,  # planning horizon
    discount_factor=0.99,
    planning_time=1.0,  # amount of time (s) to plan each step
    exploration_const=1000,  # exploration constant
    visualize=True,
    max_time=120,  # maximum amount of time (s) allowed to solve the problem
    max_steps=500,  # maximum number of planning steps the agent can take
):
    if planner_type == "pouct":
        planner = pomdp_py.POUCT(
            max_depth=max_depth,
            discount_factor=discount_factor,
            planning_time=planning_time,
            exploration_const=exploration_const,
            rollout_policy=problem.agent.policy_model,
        )
    else:
        planner = pomdp_py.POMCP(
            max_depth=max_depth,
            discount_factor=discount_factor,
            planning_time=planning_time,
            exploration_const=exploration_const,
            rollout_policy=problem.agent.policy_model,
        )

    if visualize:
        viz = TagViz(problem.env, controllable=False)
        if viz.on_init() is False:
            raise Exception("Environment failed to initialize")
        viz.update(None, None, problem.agent.cur_belief)
        viz.on_render()

    _discount = 1.0
    _time_used = 0
    _total_reward = 0  # total, undiscounted reward
    _total_discounted_reward = 0
    for i in range(max_steps):
        # Plan action
        _start = time.time()
        real_action = planner.plan(problem.agent)
        _time_used += time.time() - _start
        if _time_used > max_time:
            break  # no more time to update

        # Execute action
        reward = problem.env.state_transition(real_action, execute=True)

        # Receive observation
        _start = time.time()
        real_observation = problem.env.provide_observation(
            problem.agent.observation_model, real_action
        )

        # Updates
        problem.agent.clear_history()  # truncate history
        problem.agent.update_history(real_action, real_observation)
        planner.update(problem.agent, real_action, real_observation)
        if planner_type == "pouct":
            belief_update(problem.agent, real_action, real_observation)
        _time_used += time.time() - _start

        # Info and render
        _total_reward += reward
        _total_discounted_reward += reward * _discount
        _discount = _discount * discount_factor
        print("==== Step %d ====" % (i + 1))
        print("Action: %s" % str(real_action))
        print("Observation: %s" % str(real_observation))
        print("Reward: %s" % str(reward))
        print("Reward (Cumulative): %s" % str(_total_reward))
        print("Reward (Discounted): %s" % str(_total_discounted_reward))
        if isinstance(planner, pomdp_py.POUCT):
            print("__num_sims__: %d" % planner.last_num_sims)

        if visualize:
            viz.update(real_action, real_observation, problem.agent.cur_belief)
            viz.on_loop()
            viz.on_render()

        # Termination check
        if problem.env.state.target_found:
            print("Done!")
            break
        if _time_used > max_time:
            print("Maximum time reached.")
            break
        if _discount * 10 < 1e-4:
            print("Discount factor already too small")
            break
    return _total_discounted_reward
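# A hedged sketch of a POMCP run: pomdp_py's POMCP represents the agent's
# belief with particles, so pair planner_type="pomcp" with
# belief_type="particles". num_particles=1000 is an illustrative choice, not
# the constructor's default (6).
#
#   worldstr, _ = world0
#   grid_map = GridMap.from_str(worldstr)
#   free = list(grid_map.free_cells())
#   problem = TagProblem(
#       random.choice(free), random.choice(free), grid_map,
#       belief_type="particles", num_particles=1000,
#   )
#   total_discounted = solve(problem, planner_type="pomcp", visualize=False)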
def main():
    worldstr, robotstr = world0
    grid_map = GridMap.from_str(worldstr)
    # list() so sampling also works if free_cells() returns a set
    # (random.sample no longer accepts sets on Python 3.11+).
    free_cells = list(grid_map.free_cells())
    init_robot_position = random.sample(free_cells, 1)[0]
    init_target_position = random.sample(free_cells, 1)[0]

    problem = TagProblem(
        init_robot_position,
        init_target_position,
        grid_map,
        pr_stay=0.2,
        small=1,
        big=10,
        prior="uniform",
        belief_type="histogram",
    )
    solve(
        problem,
        max_depth=15,
        discount_factor=0.95,
        planning_time=0.8,
        exploration_const=20,
        visualize=True,
        max_time=360,
        max_steps=251,
        planner_type="pouct",
    )
if __name__ == "__main__":
    main()
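# When pomdp_py is installed, this example can be run directly as a module
# (path per the title of this page):
#
#   python -m pomdp_py.problems.tag.problem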