Source code for pomdp_py.algorithms.bsp.blqr

"""Implementation of B-LQR algorithm described in "Belief space planning
assuming maximum likelihood observations" :cite:`platt2010belief`

import pomdp_py
import numpy as np
from scipy import optimize

[docs] class BLQR(pomdp_py.Planner): def __init__( self, func_sysd, func_obs, jac_sysd, jac_obs, jac_sysd_u, noise_obs, noise_sysd, Qlarge, L, Q, R, planning_horizon=15, ): """To initialize the planenr of B-LQR, one needs to supply parameters that describe the underlying linear gaussian system, and also matrices that describe the cost function. Note that math symbols (e.g. xt, ut) are vectors or matrices represented as np.arrays. The B-LQR cost function (Eq.14 in the paper): :math:`J(b_{t:T},u_{t:T}) = \bar{m}_T^TQ_{large}\bar{m}_T+\bar{s}_T^T\Lambda\bar{s}_T + \sum_{k=t}^{T-1}\bar{m}_k^TQ\bar{m}_k+\bar{u}_t^TR\bar{u}_t` Args: func_sysd (function): f: (xt, ut) -> xt+1 func_obs (function): g: (xt) -> zt jac_sysd (function): dfdx: (mt, ut) -> At jac_obs (function): dgdx: (mt) -> Ct jac_sysd_u (function): dfdu (mt, ut) -> Bt noise_obs (function): (xt) -> pomdp_py.Gaussian. Potentially state-dependent observation noise noise_sysd (function): (xt) -> pomdp_py.Gaussian. Potentially state-dependent system dynamics noise Qlarge (np.array) matrix of shape :math:`d \times d` where :math:`d` is the dimensionality of the state vector. L (np.array) matrix of shape :math:`d \times d` where :math:`d` is the dimensionality of the state vector. (Same as :math:`\Lambda` in the equation) Q (np.array) matrix of shape :math:`d \times d` where :math:`d` is the dimensionality of the state vector. R (np.array) matrix of shape :math:`c \times c` where :math:`c` is the dimensionality of the control vector. planning_horizon (int): This is the :math:`T`, the planning horizon. """ self._func_sysd = func_sysd self._func_obs = func_obs self._jac_sysd = jac_sysd self._jac_obs = jac_obs self._jac_sysd = jac_sysd_u self._noise_obs = noise_obs self._noise_sysd = noise_sysd self._Qlarge = Qlarge self._L = L # Lambda self._Q = Q self._R = R self._planning_horizon = planning_horizon self._dim_state = self._Q.shape[0] self._dim_control = self._R.shape[0]
[docs] def ekf_update_mlo(self, bt, ut): """ Performs the ekf belief update assuming maximum likelihood observation. This follows equations 12, 13 in the paper. It's the function :math:`F`. Args: bt (tuple): a belief point bt = (mt, Cov_t) representing the belief state. where mt is np.array of shape (d,) and Cov_t is np.array of shape (d,d). The cost function equation needs to turn Cov_t into a long vector consist of the columns of Cov_t stiched together. ut (np.array): control vector noise_t (pomdp_py.Gaussian): The Gaussian noise with "possibly state-dependent" covariance matrix Wt. noise_sysd (np.array): A noise term (e.g. Gaussian noise) added to the system dynamics (:math:`v` in Eq.12). This array should have the sam dimensionality as mt. noise_obs_cov (np.array): The covariance matrix of the Gaussian noise of the observation function. This corresponds to Wt in equation 13. """ # TODO: FIX mt, Cov_t = bt At = self._jac_sysd(mt, ut) Ct = self._jac_obs(mt) # based on maximum likelihood observation Wt = self._noise_obs(mt).cov Gat =, Cov_t), At.transpose()) # Gamma_t = At*Cov_t*At^T CGC_W_inv = np.linalg.inv(, Gat), Ct.transpose()) + Wt) Cov_next = Gat -, Ct.transpose()), CGC_W_inv), Ct), Gat ) m_next = self._func_sysd(mt, ut) + self._noise_sysd(mt).random() return (m_next, Cov_next)
def _b_u_seg(self, x, i): """Returns the elements of x that corresponds to the belief and controls, as a tuple (m_i, Covi, u_i)""" d = self._dim_state c = self._dim_control start = i * (d + (d * d) + c) end = (i + 1) * (d + (d * d) + c) m_i_end = i * (d + (d * d) + c) + d Covi_end = i * (d + (d * d) + c) + d + d * d u_end = i * (d + (d * d) + c) + d + d * d + c return x[start:m_i_end], x[m_i_end:Covi_end], x[Covi_end:u_end] def _control_max_constraint(self, x, i, max_val): _, _, u_i = self._b_u_seg(x, i) return max_val - u_i def _control_min_constraint(self, x, i, min_val): _, _, u_i = self._b_u_seg(x, i) return u_i - min_val def _mean_final_constraint(self, x, m_des, num_segments): m_k, _, _ = self._b_u_seg(x, num_segments - 1) return m_k - m_des def _mean_start_constraint(self, x, m_0): m_k, _, _ = self._b_u_seg(x, 0) return m_k - m_0 def _belief_constraint(self, x, i, num_segments): m_i, s_i, u_i = self._b_u_seg(x, i) Cov_i = s_i.reshape(self._dim_state, self._dim_state).transpose() m_ip1, Cov_ip1 = self.integrate_belief_segment((m_i, Cov_i), u_i, num_segments) s_ip1 = Cov_ip1.transpose().reshape( -1, ) b_i_vec = np.append(m_i, s_i) b_ip1_vec = np.append(m_ip1, s_ip1) return b_i_vec - b_ip1_vec
[docs] def integrate_belief_segment(self, b_i, u_i, num_segments): """This is to represent equation 18. phi(b_i', u_i') = F(b_i', u_i') + sum F(b_{t+1}, u_i) - F(b_t, u_i) t in seg This essentially returns b_{i+1}' """ m_i, Cov_i = b_i b_seg = [self.ekf_update_mlo((m_i, Cov_i), u_i)] for t in range(1, self._planning_horizon // num_segments): b_seg.append(self.ekf_update_mlo(b_seg[t - 1], u_i)) sum_mean = b_seg[0][0] sum_Cov = b_seg[0][1] for t in range(len(b_seg)): if t + 1 < len(b_seg) - 1: sum_mean += b_seg[t + 1][0] - b_seg[t][0] sum_Cov += b_seg[t + 1][1] - b_seg[t][1] return sum_mean, sum_Cov
def _opt_cost_func_seg(self, x, b_des, u_des, num_segments): """This will be used as part of the objective function for scipy optimizer. Therefore we only take in one input vector, x. We require the structure of x to be: [ m1 | Cov 1 | u1 ] ... [ m_i | Cov i | u_i ] Use the _b_u_seg function to obtain the individual parts. """ bu_traj = [] for i in range(num_segments): m_i, s_i, u_i = self._b_u_seg(x, i) Cov_i = s_i.reshape(self._dim_state, self._dim_state).transpose() bu_traj.append(((m_i, Cov_i), u_i)) return self.segmented_cost_function(bu_traj, b_des, u_des, num_segments)
[docs] def segmented_cost_function(self, bu_traj, b_des, u_des, num_segments): """The cost function in eq 17. Args: b_des (tuple): The desired belief (mT, CovT). This is needed to compute the cost function. u_des (list): A list of desired controls at the beginning of each segment. If this information is not available, pass in an empty list. """ if len(u_des) > 0 and len(u_des) != num_segments: raise ValueError( "The list of desired controls, if provided, must have one control" "per segment" ) b_T, u_T = bu_traj[-1] m_T, Cov_T = b_T s_T = Cov_T.transpose().reshape( -1, ) sLs =, self._L), s_T) Summation = 0 for i in range(num_segments): b_i, u_i = bu_traj[i] m_i, _ = b_i m_i_diff = m_i - b_des[0] if len(u_des) > 0: u_i_des = u_des[i] else: u_i_des = np.zeros(self._dim_control) u_i_diff = u_i - u_i_des Summation +=, self._Q), m_i_diff ) +, self._R), u_i_diff) return sLs + Summation
[docs] def create_plan( self, b_0, b_des, u_init, num_segments=5, control_bounds=None, opt_options={}, opt_callback=None, ): """Solves the SQP problem using direct transcription, and produce belief points and controls at segments. Reference:""" # build initial guess from initial belief and the given trajectory. x_0 = [] b_i = b_0 for i in range(num_segments): m_i, Cov_i = b_i s_i = Cov_i.transpose().reshape( -1, ) u_i = u_init[i] x_0.extend(m_i) x_0.extend(s_i) x_0.extend(u_i) b_i = self.integrate_belief_segment(b_i, u_i, num_segments) # constraints constraints = [] ## initial belief constraint cons_start = { "type": "eq", "fun": self._mean_start_constraint, "args": [b_0[0]], } constraints.append(cons_start) ## belief dynamics constraints and control bounds for i in range(num_segments): if i + 1 < num_segments - 1: cons_belief = { "type": "eq", "fun": self._belief_constraint, "args": [i, num_segments], } constraints.append(cons_belief) # control bounds if control_bounds is not None: cons_control_min = { "type": "ineq", "fun": self._control_min_constraint, "args": [i, control_bounds[0]], } cons_control_max = { "type": "ineq", "fun": self._control_max_constraint, "args": [i, control_bounds[1]], } constraints.append(cons_control_min) constraints.append(cons_control_max) # final belief constraint cons_final = { "type": "eq", "fun": self._mean_final_constraint, "args": [b_des[0], num_segments], } constraints.append(cons_final) opt_res = optimize.minimize( self._opt_cost_func_seg, x_0, method="SLSQP", args=(b_des, u_init, num_segments), constraints=constraints, options=opt_options, callback=opt_callback, ) return opt_res
[docs] def interpret_sqp_plan(self, opt_res, num_segments): x_res = opt_res.x plan = [] for i in range(num_segments): m_i, s_i, u_i = self._b_u_seg(x_res, i) Cov_i = s_i.reshape(self._dim_state, self._dim_state).transpose() plan.append((m_i, Cov_i, u_i)) return plan