Source code for problems.tag.env.visual

"""Largely based on MosViz, except this is not an OO-POMDP"""

import pygame
import cv2
import math
import numpy as np
import random
import pomdp_py.utils as util
from pomdp_py.problems.tag.env.env import *
from pomdp_py.problems.tag.domain.observation import *
from pomdp_py.problems.tag.domain.action import *
from pomdp_py.problems.tag.domain.state import *
from pomdp_py.problems.tag.example_worlds import *
from pomdp_py.problems.tag.models.observation_model import *


#### Visualization through pygame ####
[docs] class TagViz: def __init__(self, env, res=30, fps=30, controllable=False, observation_model=None): self._env = env self._res = res self._img = self._make_gridworld_image(res) self._last_observation = None self._last_action = None self._last_belief = None self._observation_model = observation_model self._controllable = controllable self._running = False self._fps = fps self._playtime = 0.0 self._target_color = (200, 0, 50) def _make_gridworld_image(self, r): # Preparing 2d array w, l = self._env.width, self._env.length arr2d = np.full((self._env.width, self._env.length), 0) # free grids # Creating image img = np.full((w * r, l * r, 3), 255, dtype=np.int32) for x in range(w): for y in range(l): if (x, y) not in self._env.grid_map.obstacle_poses: arr2d[x, y] == 0 # free cv2.rectangle( img, (y * r, x * r), (y * r + r, x * r + r), (255, 255, 255), -1 ) else: arr2d[x, y] == 1 # obstacle cv2.rectangle( img, (y * r, x * r), (y * r + r, x * r + r), (40, 31, 3), -1 ) cv2.rectangle( img, (y * r, x * r), (y * r + r, x * r + r), (0, 0, 0), 1, 8 ) return img @property def img_width(self): return self._img.shape[0] @property def img_height(self): return self._img.shape[1] @property def last_observation(self): return self._last_observation
[docs] def update(self, action, observation, belief): """ Update the visualization after there is new real action and observation and updated belief. """ self._last_action = action self._last_observation = observation self._last_belief = belief
[docs] @staticmethod def draw_robot(img, x, y, th, size, color=(255, 12, 12)): radius = int(round(size / 2)) cv2.circle(img, (y + radius, x + radius), radius, color, thickness=6)
# endpoint = (y+radius + int(round(radius*math.sin(th))), # x+radius + int(round(radius*math.cos(th)))) # cv2.line(img, (y+radius,x+radius), endpoint, color, 2)
[docs] @staticmethod def draw_observation(img, z, rx, ry, rth, r, size, color=(12, 12, 255)): assert type(z) == TagObservation, "%s != TagObservation" % (str(type(z))) radius = int(round(r / 2)) if z.target_position is not None: lx, ly = z.target_position cv2.circle( img, (ly * r + radius, lx * r + radius), size, color, thickness=-1 )
# TODO! Deprecated.
[docs] @staticmethod def draw_belief(img, belief, r, size, target_color): """belief (OOBelief)""" radius = int(round(r / 2)) circle_drawn = {} # map from pose to number of times drawn hist = belief.get_histogram() color = target_color last_val = -1 count = 0 for state in reversed(sorted(hist, key=hist.get)): if last_val != -1: color = util.lighter(color, 1 - hist[state] / last_val) if np.mean(np.array(color) / np.array([255, 255, 255])) < 0.999: tx, ty = state.target_position if (tx, ty) not in circle_drawn: circle_drawn[(tx, ty)] = 0 circle_drawn[(tx, ty)] += 1 cv2.circle( img, (ty * r + radius, tx * r + radius), size // circle_drawn[(tx, ty)], color, thickness=-1, ) last_val = hist[state] count += 1 if last_val <= 0: break
# PyGame interface functions
[docs] def on_init(self): """pygame init""" pygame.init() # calls pygame.font.init() # init main screen and background self._display_surf = pygame.display.set_mode( (self.img_width, self.img_height), pygame.HWSURFACE ) self._background = pygame.Surface(self._display_surf.get_size()).convert() self._clock = pygame.time.Clock() # Font self._myfont = pygame.font.SysFont("Comic Sans MS", 30) self._running = True
[docs] def on_event(self, event): if event.type == pygame.QUIT: self._running = False # TODO! DEPRECATED! elif event.type == pygame.KEYDOWN: u = None # control signal according to motion model action = None # control input by user if event.key == pygame.K_LEFT: action = MoveWest2D elif event.key == pygame.K_RIGHT: action = MoveEast2D elif event.key == pygame.K_DOWN: action = MoveSouth2D elif event.key == pygame.K_UP: action = MoveNorth2D elif event.key == pygame.K_SPACE: action = TagAction() if action is None: return if self._controllable: reward = self._env.state_transition(action, execute=True) robot_pose = self._env.state.robot_position z = None if self._observation_model is not None: z = self._observation_model.sample(self._env.state, action) self._last_observation = z print(" state: %s" % str(self._env.state)) print(" action: %s" % str(action.name)) print(" observation: %s" % str(z)) print(" reward: %s" % str(reward)) print( " valid motions: %s" % str( self._env.grid_map.valid_motions(self._env.state.robot_position) ) ) print("------------") if self._env.state.target_found: self._running = False return action
[docs] def on_loop(self): self._playtime += self._clock.tick(self._fps) / 1000.0
[docs] def on_render(self): # self._display_surf.blit(self._background, (0, 0)) self.render_env(self._display_surf) rx, ry = self._env.state.robot_position fps_text = "FPS: {0:.2f}".format(self._clock.get_fps()) last_action = self._last_action last_action_str = "no_action" if last_action is None else str(last_action) pygame.display.set_caption( "%s | Robot(%.2f,%.2f,%.2f) | %s | %s" % (last_action_str, rx, ry, 0, str(self._env.state.target_found), fps_text) ) pygame.display.flip()
[docs] def on_cleanup(self): pygame.quit()
[docs] def on_execute(self): if self.on_init() == False: self._running = False while self._running: for event in pygame.event.get(): self.on_event(event) self.on_loop() self.on_render() self.on_cleanup()
[docs] def render_env(self, display_surf): img = np.copy(self._img) r = self._res # Not radius! It's resolution. # draw target tx, ty = self._env.state.target_position cv2.rectangle( img, (ty * r, tx * r), (ty * r + r, tx * r + r), (255, 165, 0), -1 ) # draw robot rx, ry = self._env.state.robot_position r = self._res # Not radius! # last_observation = self._last_observation.get(robot_id, None) # last_viz_observation = self._last_viz_observation.get(robot_id, None) # last_belief = self._last_belief.get(robot_id, None) if self._last_belief is not None: TagViz.draw_belief(img, self._last_belief, r, r // 3, self._target_color) if self._last_observation is not None: TagViz.draw_observation( img, self._last_observation, rx, ry, 0, r, r // 8, color=(20, 20, 180) ) TagViz.draw_robot(img, rx * r, ry * r, 0, r, color=(200, 12, 150)) pygame.surfarray.blit_array(display_surf, img)
# TODO! DEPRECATED!
[docs] def unittest(): worldmap, robot = world0 env = TagEnvironment.from_str(worldmap) observation_model = TagObservationModel() viz = TagViz(env, controllable=True, observation_model=observation_model) viz.on_execute()
if __name__ == "__main__": unittest()