Source code for pomdp_py.utils.interfaces.conversion

"""
Provides utilities to convert a POMDP written in pomdp_py
into the .pomdp or .pomdpx file format and write the result to a file.
"""

import subprocess
import os
import pomdp_py
import numpy as np
import xml.etree.ElementTree as ET


def to_pomdp_file(agent, output_path=None, discount_factor=0.95, float_precision=9):
    """
    Pass in an Agent, and use its components to generate a .pomdp file
    at `output_path`. The .pomdp file format is specified at:
    http://www.pomdp.org/code/pomdp-file-spec.html

    Note:

    * It is assumed that the reward is independent of the observation.
    * The states, actions, and observations of the agent must be explicitly
      enumerable.
    * The states, actions, and observations of the agent must be convertible
      to strings that do not contain any blank space.

    Args:
        agent (~pomdp_py.framework.basics.Agent): The agent
        output_path (str): The path of the output file to write in. Optional.
            Default None.
        discount_factor (float): The discount factor
        float_precision (int): Number of decimals for float to str conversion.
            Default 9.

    Returns:
        (list, list, list): The lists of states, actions, and observations,
        ordered in the same way as they appear in the .pomdp file.
    """
    # Preamble
    try:
        all_states = list(agent.all_states)
        all_actions = list(agent.all_actions)
        all_observations = list(agent.all_observations)
    except NotImplementedError:
        raise ValueError(
            "S, A, O must be enumerable for a given agent to convert to .pomdp format"
        )

    content = f"discount: %.{float_precision}f\n" % discount_factor
    content += "values: reward\n"  # We only consider reward, not cost.

    list_of_states = " ".join(str(s) for s in all_states)
    assert len(list_of_states.split(" ")) == len(
        all_states
    ), "states must be convertible to strings without blank spaces"
    content += "states: %s\n" % list_of_states

    list_of_actions = " ".join(str(a) for a in all_actions)
    assert len(list_of_actions.split(" ")) == len(
        all_actions
    ), "actions must be convertible to strings without blank spaces"
    content += "actions: %s\n" % list_of_actions

    list_of_observations = " ".join(str(o) for o in all_observations)
    assert len(list_of_observations.split(" ")) == len(
        all_observations
    ), "observations must be convertible to strings without blank spaces"
    content += "observations: %s\n" % list_of_observations

    # Starting belief state - it needs to be normalized
    total_belief = sum(agent.belief[s] for s in all_states)
    content += "start: %s\n" % (
        " ".join(
            [
                f"%.{float_precision}f" % (agent.belief[s] / total_belief)
                for s in all_states
            ]
        )
    )

    # State transition probabilities - they need to be normalized
    for s in all_states:
        for a in all_actions:
            probs = []
            for s_next in all_states:
                prob = agent.transition_model.probability(s_next, s, a)
                probs.append(prob)
            total_prob = sum(probs)
            for i, s_next in enumerate(all_states):
                prob_norm = probs[i] / total_prob
                content += f"T : %s : %s : %s %.{float_precision}f\n" % (
                    a,
                    s,
                    s_next,
                    prob_norm,
                )

    # Observation probabilities - they need to be normalized
    for s_next in all_states:
        for a in all_actions:
            probs = []
            for o in all_observations:
                prob = agent.observation_model.probability(o, s_next, a)
                probs.append(prob)
            total_prob = sum(probs)
            assert (
                total_prob > 0.0
            ), "No observation is probable under state={} action={}".format(s_next, a)
            for i, o in enumerate(all_observations):
                prob_norm = probs[i] / total_prob
                content += f"O : %s : %s : %s %.{float_precision}f\n" % (
                    a,
                    s_next,
                    o,
                    prob_norm,
                )

    # Immediate rewards
    for s in all_states:
        for a in all_actions:
            for s_next in all_states:
                # Take a single reward sample, which works for deterministic rewards.
                r = agent.reward_model.sample(s, a, s_next)
                content += f"R : %s : %s : %s : * %.{float_precision}f\n" % (
                    a,
                    s,
                    s_next,
                    r,
                )

    if output_path is not None:
        with open(output_path, "w") as f:
            f.write(content)
    return all_states, all_actions, all_observations
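A quick usage sketch (not part of the module source): it assumes `agent` is a pomdp_py Agent, such as the Tiger example, whose states, actions, and observations are enumerable; the output file name is illustrative.

# Usage sketch: write an agent's POMDP to a .pomdp file.
all_states, all_actions, all_observations = to_pomdp_file(
    agent, "problem.pomdp", discount_factor=0.95, float_precision=9
)
# The returned lists preserve the ordering used inside problem.pomdp,
# which is needed later to interpret solver output.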
def to_pomdpx_file(agent, pomdpconvert_path, output_path=None, discount_factor=0.95):
    """
    Converts an agent to a .pomdpx file. This works by first converting the
    agent into a .pomdp file and then using the :code:`pomdpconvert` utility
    program to convert that file to a .pomdpx file.

    Check out :code:`pomdpconvert` at `github://AdaCompNUS/sarsop
    <https://github.com/AdaCompNUS/sarsop>`_. Follow the instructions at
    https://github.com/AdaCompNUS/sarsop to download and build sarsop
    (tested on Ubuntu 18.04, gcc version 7.5.0).

    See the documentation for pomdpx at:
    https://bigbird.comp.nus.edu.sg/pmwiki/farm/appl/index.php?n=Main.PomdpXDocumentation

    Args:
        agent (~pomdp_py.framework.basics.Agent): The agent
        pomdpconvert_path (str): Path to the :code:`pomdpconvert` binary
        output_path (str): The path of the output file to write in. Optional.
            Default None.
        discount_factor (float): The discount factor
    """
    pomdp_path = "./temp-pomdp.pomdp"
    to_pomdp_file(agent, pomdp_path, discount_factor=discount_factor)

    proc = subprocess.Popen([pomdpconvert_path, pomdp_path])
    proc.wait()

    pomdpx_path = pomdp_path + "x"
    assert os.path.exists(pomdpx_path), "POMDPx conversion failed."

    if output_path is not None:
        os.rename(pomdpx_path, output_path)

    # Delete temporary files
    os.remove(pomdp_path)
    if os.path.exists(pomdpx_path):
        os.remove(pomdpx_path)
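A sketch of converting the same agent to .pomdpx, assuming sarsop has been cloned and built locally; the binary path below is an assumption, not something the module provides.

# Usage sketch: convert the agent to a .pomdpx file via pomdpconvert.
to_pomdpx_file(
    agent,
    "/path/to/sarsop/src/pomdpconvert",  # assumed location of the built binary
    output_path="problem.pomdpx",
    discount_factor=0.95,
)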
def parse_pomdp_solve_output(alpha_path, pg_path=None):
    """Parse the output of pomdp-solve, given by an .alpha file and a .pg file.

    Given a path to an .alpha file, read and interpret its contents. The file
    formats are specified at:

    https://www.pomdp.org/code/alpha-file-spec.html
    https://www.pomdp.org/code/pg-file-spec.html

    Note on the policy graph (from the official website): using it first
    requires knowing which of the policy graph states to start in. This can be
    achieved by finding the alpha vector with the maximal dot product with the
    initial starting state.

    Note: Parsing the .alpha file is required. The .pg path is optional
    (this is because I noticed some errors in the .pg file produced).

    Returns:
        alphas: [(alpha_vector, action_number), ...]
        policy_graph: a mapping from node number to (action_number, edges)
    """
    alphas = []  # list of (alpha_vector, action_number) tuples
    with open(alpha_path, "r") as f:
        action_number = None
        alpha_vector = None
        mode = "action"
        for line in f:
            line = line.rstrip()
            if len(line) == 0:
                continue
            if mode == "action":
                action_number = int(line)
                mode = "alpha"
            elif mode == "alpha":
                alpha_vector = tuple(map(float, line.split(" ")))
                alphas.append((alpha_vector, action_number))
                mode = "action"
                action_number = None
                alpha_vector = None

    policy_graph = {}  # a mapping from node number to (action_number, edges)
    if pg_path is None:
        return alphas
    else:
        with open(pg_path, "r") as f:
            for line in f:
                line = line.rstrip()
                if len(line) == 0:
                    continue
                parts = list(map(int, line.split()))  # splits on whitespace
                assert parts[0] not in policy_graph, (
                    "The node id %d already exists. Something is wrong." % parts[0]
                )
                policy_graph[parts[0]] = (parts[1], parts[2:])
        return alphas, policy_graph
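A sketch of parsing pomdp-solve output; the file names are illustrative.

# Usage sketch: parse pomdp-solve output.
alphas = parse_pomdp_solve_output("problem.alpha")
alphas, policy_graph = parse_pomdp_solve_output("problem.alpha", "problem.pg")
# alphas[i] is (alpha_vector, action_number); policy_graph[node_id] is
# (action_number, [next_node_id per observation, in .pomdp file order]).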
class AlphaVectorPolicy(pomdp_py.Planner):
    """
    An offline POMDP policy is specified by a collection of alpha vectors,
    each associated with an action. When planning is needed, the dot product
    of these alpha vectors and the agent's belief vector is computed, and the
    alpha vector leading to the maximum is the 'dominating' alpha vector; we
    return its corresponding action.

    An offline policy can optionally be represented as a policy graph. In this
    case, the agent can plan without actively maintaining a belief because the
    policy graph is a finite state machine that transitions by observations.

    This can be constructed using the .policy file created by sarsop.
    """

    def __init__(self, alphas, states):
        """
        Args:
            alphas (list): A list of (alpha_vector, action) tuples. An
                alpha_vector is a list of floats :code:`[V1, V2, ... VN]`.
            states (list): List of states, ordered as in the .pomdp file
        """
        self.alphas = alphas
        self.states = states
    def plan(self, agent):
        """Returns the action prescribed by this policy for the agent's current belief"""
        b = [agent.belief[s] for s in self.states]
        _, action = max(self.alphas, key=lambda va: np.dot(b, va[0]))
        return action
    def value(self, belief):
        r"""
        Returns the value V(b) under this alpha vector policy.

        :math:`V(b) = \max_{\alpha \in \Gamma} \alpha \cdot b`
        """
        b = [belief[s] for s in self.states]
        alpha_vector, _ = max(self.alphas, key=lambda va: np.dot(b, va[0]))
        return np.dot(b, alpha_vector)
    @classmethod
    def construct(cls, policy_path, states, actions, solver="pomdpsol"):
        """
        Returns an AlphaVectorPolicy constructed from the given policy file.

        Args:
            policy_path (str): Path to the generated .policy file (for sarsop)
                or .alpha file (for pomdp-solve)
            states (list): A list of States, in the same order as in the .pomdp file
            actions (list): A list of Actions, in the same order as in the .pomdp file
            solver (str): "pomdpsol"/"sarsop" for a sarsop .policy file, or
                "pomdp-solve"/"vi" for a pomdp-solve .alpha file. Default "pomdpsol".

        Returns:
            AlphaVectorPolicy: The policy stored in the given policy file.
        """
        if solver == "pomdp-solve" or solver == "vi":
            return cls.construct_from_pomdp_solve(policy_path, states, actions)
        elif solver == "pomdpsol" or solver == "sarsop":
            alphas = []
            root = ET.parse(policy_path).getroot()
            for vector in root.find("AlphaVector").iter("Vector"):
                action = actions[int(vector.attrib["action"])]
                alpha_vector = tuple(map(float, vector.text.split()))
                alphas.append((alpha_vector, action))
            return AlphaVectorPolicy(alphas, states)
    @classmethod
    def construct_from_pomdp_solve(cls, alpha_path, states, actions):
        """Constructs an AlphaVectorPolicy from a pomdp-solve .alpha file."""
        alphas_with_action_numbers = parse_pomdp_solve_output(alpha_path)
        alphas = [
            (alpha_vector, actions[action_number])
            for alpha_vector, action_number in alphas_with_action_numbers
        ]
        return AlphaVectorPolicy(alphas, states)
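A sketch of loading a solved policy and planning with it; `all_states` and `all_actions` are assumed to be the orderings returned by to_pomdp_file, and the file name is illustrative.

# Usage sketch: load a sarsop .policy file and query it.
policy = AlphaVectorPolicy.construct(
    "problem.policy", all_states, all_actions, solver="sarsop"
)
action = policy.plan(agent)        # best action for the agent's current belief
v = policy.value(agent.belief)     # V(b) = max over alpha vectors of alpha . b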
class PGNode:
    """A node on the policy graph"""

    def __init__(self, node_id, alpha_vector, action):
        self.node_id = node_id
        self.alpha_vector = alpha_vector
        self.action = action

    def __eq__(self, other):
        if isinstance(other, PGNode):
            return self.node_id == other.node_id
        return False

    def __hash__(self):
        return hash(self.node_id)

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return "NodeID(%d)::AlphaVector(%s)::Action(%s)\n" % (
            self.node_id,
            str(self.alpha_vector),
            self.action,
        )
class PolicyGraph(pomdp_py.Planner):
    """A PolicyGraph encodes a POMDP plan. It can be constructed from the
    alphas and policy graph format output by Cassandra's pomdp-solve."""

    def __init__(self, nodes, edges, states):
        """
        Initializes a PolicyGraph.

        Args:
            nodes (list): A list of PGNodes
            edges (dict): Mapping from node_id to a dictionary {observation -> node_id}
            states (list): List of states, ordered as in the .pomdp file
        """
        self.nodes = {n.node_id: n for n in nodes}
        self.edges = edges
        self.states = states
        self._current_node = None
    @classmethod
    def construct(cls, alpha_path, pg_path, states, actions, observations):
        """
        See parse_pomdp_solve_output for detailed definitions of alphas and pg.

        Args:
            alpha_path (str): Path to .alpha file
            pg_path (str): Path to .pg file
            states (list): List of states, ordered as in the .pomdp file
            actions (list): List of actions, ordered as in the .pomdp file
            observations (list): List of observations, ordered as in the .pomdp file
        """
        # alphas (list): List of ( [V1, V2, ... VN], A ) tuples
        # pg (dict): {node_id -> (A, edges)}
        alphas, pg = parse_pomdp_solve_output(alpha_path, pg_path)

        nodes = []
        for node_id, (alpha_vector, action_number) in enumerate(alphas):
            node = PGNode(node_id, alpha_vector, actions[action_number])
            nodes.append(node)

        edges = {}
        for node_id in pg:
            assert 0 <= node_id < len(nodes), "Invalid node id in policy graph"
            action_number, o_links = pg[node_id]
            assert (
                actions[action_number] == nodes[node_id].action
            ), "Inconsistent action mapping"

            edges[node_id] = {}
            for o_index, next_node_id in enumerate(o_links):
                observation = observations[o_index]
                edges[node_id][observation] = next_node_id

        return PolicyGraph(nodes, edges, states)
    def plan(self, agent):
        """Returns the action prescribed by the current policy graph node for the agent's belief"""
        if self._current_node is None:
            self._current_node = self._find_node(agent)
        return self._current_node.action
    def _find_node(self, agent):
        """Locate the node in the policy graph corresponding to the agent's
        current belief state."""
        b = [agent.belief[s] for s in self.states]
        nid = max(self.nodes, key=lambda nid: np.dot(b, self.nodes[nid].alpha_vector))
        return self.nodes[nid]
    def update(self, agent, action, observation):
        """
        Updates the planner based on the real action and observation.
        Basically sets the current node pointer based on the incoming
        observation."""
        if self._current_node is None:
            # Find the starting node using the agent's current belief
            self._current_node = self._find_node(agent)
        # Transition the current node following the graph
        self._current_node = self.nodes[
            self.edges[self._current_node.node_id][observation]
        ]
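A sketch of the plan/update loop with a PolicyGraph; the environment interaction (`env_step`) is a hypothetical placeholder, and the lists are assumed to be the orderings returned by to_pomdp_file.

# Usage sketch: drive a policy graph with incoming observations.
pg = PolicyGraph.construct(
    "problem.alpha", "problem.pg", all_states, all_actions, all_observations
)
for _ in range(10):
    action = pg.plan(agent)
    observation = env_step(action)   # hypothetical: execute action, get observation
    pg.update(agent, action, observation)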