# Subspace based dimension reduction techniques
from __future__ import division, print_function
import os

# Fall back to a non-interactive backend when no display is available
# (e.g., running headless on a cluster or CI) so pyplot can still be imported.
# The backend must be selected before the first `matplotlib.pyplot` import.
if 'DISPLAY' not in os.environ:
    import matplotlib
    matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import scipy.linalg
from scipy.spatial.distance import cdist
import cvxpy as cp
import cvxopt
from itertools import combinations
from tqdm import tqdm
import time
#from .domains import Domain
from .misc import merge
from .subspace import SubspaceBasedDimensionReduction
from .coord import CoordinateBasedDimensionReduction
from .sample import initial_sample, maximin_coffeehouse
from .geometry import voronoi_vertex_sample
from .geometry import unique_points
from .minimax import minimax
from .function import BaseFunction
from .pgf import PGF
__all__ = ['LipschitzMatrix', 'LipschitzConstant', 'DiagonalLipschitzMatrix']
class LipschitzMatrix(SubspaceBasedDimensionReduction):
    r"""Constructs the subspace-based dimension reduction from the Lipschitz matrix.

    The Lipschitz matrix :math:`\mathbf{L} \in \mathbb{R}^{m \times m}` is a matrix that
    acts analogously to the Lipschitz constant, defining a function class where

    .. math::

        \lbrace f: \mathbb{R}^m\to \mathbb{R}: |f(\mathbf{x}_1) - f(\mathbf{x}_2)| \le \|\mathbf{L}(\mathbf{x}_1 - \mathbf{x}_2)\|_2 \rbrace.

    In general we cannot determine the Lipschitz matrix analytically.
    Instead we seek to estimate it via the lower bound based on samples
    :math:`\lbrace \mathbf{x}_i, f(\mathbf{x}_i)\rbrace_i`
    and/or gradients :math:`\lbrace \nabla f(\mathbf{x}_i)\rbrace_i`.
    Here we do so by solving a semidefinite program for the symmetric
    positive definite matrix :math:`\mathbf{M} \in \mathbb{R}^{m\times m}`:

    .. math::

        \min_{\mathbf{H} \in \mathbb{S}^{m}_+} & \ \text{Trace } \mathbf{H} \\
        \text{such that} & \ |f(\mathbf{x}_i) - f(\mathbf{x}_j)|^2 \le
            (\mathbf{x}_i - \mathbf{x}_j)^\top \mathbf{H} (\mathbf{x}_i - \mathbf{x}_j) \\
        & \ \nabla f(\mathbf{x}_k) \nabla f(\mathbf{x}_k)^\top \preceq \mathbf{H}

    If the parameter :code:`epsilon` is passed, the :math:`\epsilon`-Lipschitz matrix
    is computed instead by solving the semidefinite program

    .. math::

        \min_{\mathbf{H} \in \mathbb{S}^{m}_+} & \ \text{Trace } \mathbf{H} \\
        \text{such that} & \ (|f(\mathbf{x}_i) - f(\mathbf{x}_j)| -\epsilon)^2
            \le (\mathbf{x}_i - \mathbf{x}_j)^\top \mathbf{H} (\mathbf{x}_i - \mathbf{x}_j).

    Note that when computing the :math:`\epsilon`-Lipschitz matrix,
    gradient information cannot be used because gradients no longer constrain
    the Lipschitz matrix.

    Parameters
    ----------
    epsilon: float or None
        Tolerance to be used if find the :math:`\epsilon`-Lipschitz matrix
    method: ['cvxopt', 'param', 'cvxpy']
        Which backend implementation solves the semidefinite program
    L: array-like (m, m), optional
        A known Lipschitz matrix to use instead of fitting one
    **kwargs: dict (optional)
        Additional parameters to pass to cvxpy
    """
    def __init__(self, epsilon = None, method = 'cvxopt', L = None, **kwargs):
        if L is not None:
            # A Lipschitz matrix was supplied directly; no subspace has been
            # computed from it yet, hence _U is cleared.
            self._L = np.atleast_2d(L)
            self._U = None

        # Select the backend that solves the semidefinite program
        assert method in ['cvxopt', 'param', 'cvxpy']
        if method == 'cvxopt':
            self._build_lipschitz_matrix = self._build_lipschitz_matrix_cvxopt
        elif method == 'param':
            self._build_lipschitz_matrix = self._build_lipschitz_matrix_param
        elif method == 'cvxpy':
            self._build_lipschitz_matrix = self._build_lipschitz_matrix_cvxpy

        if epsilon is not None:
            epsilon = float(epsilon)
            # NOTE: zero is explicitly allowed, so the message says "nonnegative"
            assert epsilon >= 0, "Epsilon must be nonnegative"
        self.epsilon = epsilon

        # According to a google groups discussion
        # https://groups.google.com/forum/#!topic/cvxopt/HSc80A5mWzY
        # too small tolerances can yield divide by zero errors;
        # 1e-10 was breaking, using 1e-9 instead.
        # (The previous bare `self.kwargs = kwargs` assignment was dead code:
        # it was unconditionally overwritten by this merge.)
        self.kwargs = merge({'solver': 'CVXOPT', 'abstol': 1e-9, 'reltol': 1e-9,
            'feastol': 1e-9, 'refinement': 1, 'kktsolver': 'robust'}, kwargs)

    def __str__(self):
        if self.epsilon is not None and self.epsilon > 0:
            return "<Lipschitz Matrix epsilon %g>" % (self.epsilon)
        return "<Lipschitz Matrix>"
[docs] def fit(self, X = None, fX = None, grads = None):
r""" Estimate the Lipschitz matrix from data
Parameters
----------
X : array-like (N, m), optional
Input coordinates for function samples
fX: array-like (N,), optional
Values of the function at X[i]
grads: array-like (N,m), optional
Gradients of the function evaluated anywhere
"""
self._init_dim(X = X, grads = grads)
if X is not None and fX is not None:
N = len(X)
assert len(fX) == N, "Dimension of input and output does not match"
X = np.atleast_2d(np.array(X))
self._dimension = X.shape[1]
fX = np.array(fX).reshape(X.shape[0])
elif X is None and fX is None:
X = np.zeros((0,len(self)))
fX = np.zeros((0,))
else:
raise AssertionError("X and fX must both be specified simultaneously or not specified")
if grads is not None:
grads = np.array(grads).reshape(-1,len(self))
else:
grads = np.zeros((0, len(self)))
# Scale the output units for numerical stability
try:
scale1 = np.max(fX) - np.min(fX)
except ValueError:
scale1 = 1.
try:
scale2 = np.max([np.linalg.norm(grad) for grad in grads])
except ValueError:
scale2 = 1.
scale = max(scale1, scale2)
if self.epsilon is not None:
epsilon = self.epsilon/scale
else:
epsilon = 0.
self._fit(X, fX, grads, epsilon, scale)
    def _fit(self, X, fX, grads, epsilon, scale):
        # Solve the SDP for H using the data rescaled to unit output range
        H = self._build_lipschitz_matrix(X, fX/scale, grads/scale, epsilon)

        # Compute the important directions
        #self._U, _, _ = np.linalg.svd(self._H)
        ew, U = scipy.linalg.eigh(H)
        # Because eigenvalues are in ascending order, the subspace basis needs to be flipped
        U = U[:,::-1]
        ew = ew[::-1]

        # Fix the signs for the subspace directions
        U = self._fix_subspace_signs(U, X, fX/scale, grads/scale)
        self._U = U

        # Force to be SPD: clamp numerically negative eigenvalues to zero
        # and undo the output rescaling (H scales quadratically)
        self._H = scale**2 * U.dot(np.diag(np.maximum(ew,0)).dot(U.T))

        # Compute the Lipschitz matrix (a symmetric matrix square root of H)
        #self._L = scipy.linalg.cholesky(self.H[::-1][:,::-1], lower = False)[::-1][:,::-1]
        self._L = scale * U.dot(np.diag(np.sqrt(np.maximum(ew, 0))).dot(U.T))
    @property
    def X(self):
        # Stored input samples; presumably set by the base class during fit — TODO confirm
        return self._X

    @property
    def fX(self):
        # Stored function values; presumably set by the base class during fit — TODO confirm
        return self._fX

    @property
    def grads(self):
        # Stored gradient samples; presumably set by the base class during fit — TODO confirm
        return self._grads

    @property
    def U(self):
        # Subspace basis (eigenvectors of H in descending eigenvalue order);
        # returned as a copy so callers cannot mutate the cached basis
        return np.copy(self._U)

    @property
    def H(self):
        r""" The symmetric positive definite solution to the semidefinite program
        """
        return self._H

    @property
    def L(self):
        r""" The Lipschitz matrix estimate based on samples
        """
        return self._L
def _build_lipschitz_matrix_cvxpy(self, X, fX, grads, epsilon):
# Constrain H to symmetric positive semidefinite (PSD)
H = cp.Variable( (len(self), len(self)), PSD = True)
constraints = []
# Sample constraint
for i in range(len(X)):
for j in range(i+1, len(X)):
lhs = max(np.abs(fX[i] - fX[j]) - epsilon,0)**2
y = X[i] - X[j]
# y.T M y
#rhs = H.__matmul__(y).__rmatmul__(y.T)
rhs = cp.quad_form(y, H)
if lhs > 0:
constraints.append(lhs <= rhs)
# gradient constraints
for g in grads:
constraints.append( np.outer(g,g) << H)
problem = cp.Problem(cp.Minimize(cp.trace(H)), constraints)
problem.solve(**self.kwargs)
print("Inside Lipschitz")
print(np.array(H.value))
return np.array(H.value).reshape(len(self),len(self))
    def _build_lipschitz_matrix_param(self, X, fX, grads, epsilon):
        r""" Use an explicit parameterization
        """
        # Build the basis of symmetric rank-one matrices E_i so that
        # H = sum_i alpha_i * E_i parameterizes a symmetric m x m matrix
        Es = []
        I = np.eye(len(self))
        for i in range(len(self)):
            ei = I[:,i]
            # Diagonal basis element e_i e_i^T
            Es.append(np.outer(ei,ei))
            for j in range(i+1,len(self)):
                ej = I[:,j]
                # Off-diagonal basis element 0.5 (e_i + e_j)(e_i + e_j)^T
                Es.append(0.5*np.outer(ei+ej,ei+ej))

        alpha = cp.Variable(len(Es))
        H = cp.sum([alpha_i*Ei for alpha_i, Ei in zip(alpha, Es)])
        # H must be positive semidefinite
        constraints = [H >> 0]

        # Construct gradient constraints
        for grad in grads:
            constraints.append( H >> np.outer(grad, grad))

        # Construct linear inequality constraints for samples:
        # row r encodes (x_i - x_j)^T H (x_i - x_j) >= (max(|fX_i - fX_j| - eps, 0))^2
        A = np.zeros( (len(X)*(len(X)-1)//2, len(Es)) )
        b = np.zeros(A.shape[0])
        row = 0
        for i in range(len(X)):
            for j in range(i+1,len(X)):
                p = X[i] - X[j]
                A[row, :] = [p.dot(E.dot(p)) for E in Es]
                b[row] = (max(np.abs(fX[i] - fX[j]) - epsilon, 0))**2
                row += 1

        # Remove inactive rows (constraints with zero right-hand side are trivial)
        A = A[b!= 0]
        b = b[b!= 0]
        if A.shape[0] > 0:
            constraints.append( b <= alpha.__rmatmul__(A) )

        # Each basis matrix E_i has unit trace, so sum(alpha) == trace(H)
        problem = cp.Problem(cp.Minimize(cp.sum(alpha)), constraints)
        problem.solve(**self.kwargs)

        alpha = np.array(alpha.value)
        # Reassemble H from the optimal coefficients
        H = np.sum([ alpha_i * Ei for alpha_i, Ei in zip(alpha, Es)], axis = 0)
        return H
    def _build_lipschitz_matrix_cvxopt(self, X, fX, grads, epsilon, structure = 'full'):
        r""" Directly accessing cvxopt rather than going through CVXPY results in noticable speed improvements

        Parameters
        ----------
        structure: ['full', 'diag']
            Whether H is parameterized as a dense symmetric matrix or a
            diagonal one (used by DiagonalLipschitzMatrix).
        """
        # Build the basis so that H = sum_i alpha_i * E_i
        Es = []
        I = np.eye(len(self))
        if structure == 'full':
            for i in range(len(self)):
                ei = I[:,i]
                Es.append(np.outer(ei,ei))
                for j in range(i+1,len(self)):
                    ej = I[:,j]
                    Es.append(0.5*np.outer(ei+ej,ei+ej))
        elif structure == 'diag':
            # Diagonal-only parameterization
            for i in range(len(self)):
                ei = I[:,i]
                Es.append(np.outer(ei,ei))
        else:
            raise NotImplementedError

        # Stack the basis into a 3-tensor for the vectorized contraction below
        Eten = np.array(Es)

        # Constraint matrices for CVXOPT
        # The format is
        # sum_i x_i * G[i].reshape(square matrix) <= h.reshape(square matrix)
        Gs = []
        hs = []

        # Construct linear inequality constraints for samples
        for i in range(len(X)):
            for j in range(i+1,len(X)):
                # Pairs with |fX_i - fX_j| <= epsilon impose no constraint
                if np.abs(fX[i] - fX[j]) > epsilon:
                    p = X[i] - X[j]
                    # Normalizing here seems to reduce the normalization once inside CVXOPT
                    p_norm = np.linalg.norm(p)
                    # Vectorize to improve performance
                    #G = [-p.dot(E.dot(p)) for E in Es]
                    G = -np.tensordot(np.tensordot(Eten, p/p_norm, axes = (2,0)), p/p_norm, axes = (1,0))
                    Gs.append(cvxopt.matrix(G).T)
                    # Right-hand side is scaled by the same 1/p_norm^2 factor
                    hs.append(cvxopt.matrix( [[ -(max(np.abs(fX[i] - fX[j]) - epsilon,0))**2/p_norm**2]]))

        # Add constraint to enforce H is positive-semidefinite
        # Flatten in Fortran---column major order
        G = cvxopt.matrix(np.vstack([E.flatten('F') for E in Es]).T)
        Gs.append(-G)
        hs.append(cvxopt.matrix(np.zeros((len(self),len(self)))))

        # Build gradient constraints: grad grad^T <= H, expressed as -H <= -grad grad^T
        for grad in grads:
            Gs.append(-G)
            gg = -np.outer(grad, grad)
            hs.append(cvxopt.matrix(gg))

        # Setup objective: trace(H) = sum_i alpha_i * trace(E_i)
        c = cvxopt.matrix(np.array([ np.trace(E) for E in Es]))

        if 'verbose' in self.kwargs:
            cvxopt.solvers.options['show_progress'] = self.kwargs['verbose']
        else:
            cvxopt.solvers.options['show_progress'] = False

        # For some reason, using the 'kktsolver' option fails
        for name in ['abstol', 'reltol', 'feastol', 'refinement']:
            if name in self.kwargs:
                cvxopt.solvers.options[name] = self.kwargs[name]

        sol = cvxopt.solvers.sdp(c, Gs = Gs, hs = hs)
        alpha = sol['x']
        # Reassemble H from the optimal coefficients
        H = np.sum([ alpha_i * Ei for alpha_i, Ei in zip(alpha, Es)], axis = 0)
        return H
[docs] def uncertainty(self, X, fX, Xtest):
r""" Compute range of possible values at test points.
Given pairs of inputs :math:`\widehat{\mathbf{x}}_i`
and outputs :math:`y_i := f(\widehat{\mathbf{x}}_i)`
as well as the Lipschitz matrix :math:`\mathbf{L}`, compute the uncertainty interval
associated with the provided points :math:`\mathbf{x}_i`; namely,
.. math::
\mathcal{U}(\mathbf{x}) =
\left[
\max_{j=1,\ldots,M} y_j - \|\mathbf{L} (\mathbf{x} - \widehat{\mathbf{x}}_i)\|_2, \
\min_{j=1,\ldots,M} y_j + \|\mathbf{L} (\mathbf{x} - \widehat{\mathbf{x}}_i)\|_2
\right].
Parameters
----------
X: array-like (M, m)
Array of input coordinates
fX: array-like (M,)
Array of function values
Xtest: array-like (N, m)
Array of places at which to determine uncertainty
Returns
-------
lb: array-like (N,)
Lower bounds
ub: array-like (N,)
Upper bounds
"""
X = np.atleast_2d(np.array(X))
Xtest = np.atleast_2d(np.array(Xtest))
lb = -np.inf*np.ones(Xtest.shape[0])
ub = np.inf*np.ones(Xtest.shape[0])
fX = fX.flatten()
LX = self.L.dot(X.T).T
LXtest = self.L.dot(Xtest.T).T
# TODO: Replace with proper chunking to help vectorize this operation
for Lx, fx in zip(LX, fX):
dist = cdist(Lx.reshape(1,-1), LXtest).flatten()
lb = np.maximum(lb, fx - dist)
ub = np.minimum(ub, fx + dist)
if self.epsilon is None:
return lb, ub
else:
return lb - self.epsilon, ub + self.epsilon
[docs] def uncertainty_domain(self, X, fX, domain, Nsamp = int(1e2), verbose = False, progress = False, tqdm_kwargs = {}, **kwargs):
r""" Compute the uncertainty associated with a set inside the domain
This estimates the uncertainty associated with a subset of the domain,
essentially finding points :math:`\mathbf{x} \in \mathcal{S}`
corresponding to extrema of the uncertainty interval
.. math::
\mathcal{U}(\mathcal{S} ) =
\left[
\min_{\mathbf{x} \in \mathcal{S}} \max_{j=1,\ldots,M} y_j - \|\mathbf{L} (\mathbf{x} - \widehat{\mathbf{x}}_i)\|_2, \
\max_{\mathbf{x} \in \mathcal{S}} \min_{j=1,\ldots,M} y_j + \|\mathbf{L} (\mathbf{x} - \widehat{\mathbf{x}}_i)\|_2
\right].
Parameters
----------
X: array-like (M, m)
Array of input coordinates
fX: array-like (M,)
Array of function values
domain: Domain
Domain on which to find exterma of limits
Nsamp: int, optional
Number of Voronoi vertices to sample to find exterma
verbose: bool (default: False)
If True, print the iteration history for the optimization program for each initalization
progress: bool (default: False)
If True, show a progress bar for trying different initializations
tqdm_kwargs: dict, optional
Additional arguments to pass to tqdm for progress plotting
Returns
-------
lb: float
Lower bound
ub: float
Upper bound
"""
lower_bound = LowerBound(self.L, X, fX)
upper_bound = UpperBound(self.L, X, fX)
fX = fX.flatten()
# Force these to be far apart
#X0 = initial_sample(domain, self.L, Nsamp = Nsamp)
X0 = maximin_coffeehouse(domain, Nsamp, L = self.L, N0 = 50)
#X0 = voronoi_vertex_sample(domain, X, X0, L = self.L, randomize = False)
# Remove duplicates
I = unique_points(X0)
X0 = X0[I]
# Iterate through all candidates
if progress:
iterator = tqdm(X0, desc = 'bounds_domain', total = len(X0), dynamic_ncols = True, **tqdm_kwargs)
else:
iterator = X0
lbs = []
ubs = []
for x0 in iterator:
# Lower bound
x = minimax(lower_bound, x0, domain = domain, verbose = verbose, trust_region = False)
lbs.append(np.max(lower_bound(x)))
# Upper bound
x = minimax(upper_bound, x0, domain = domain, verbose = verbose, trust_region = False)
ubs.append(np.min(-upper_bound(x)))
#print(lbs[-1], ubs[-1])
return float(np.min(lbs)), float(np.max(ubs))
# # Attempts at making a parallel interface
# else:
# lbs = []
# ubs = []
# for x0 in X0:
# x = dask.delayed(minimax)(lower_bound, x0, domain = domain, trust_region = False)
# lb = dask.delayed(lower_bound)(x)
# lbs.append(lb)
#
# x = dask.delayed(minimax)(upper_bound, x0, domain = domain, trust_region = False)
# ub = dask.delayed(upper_bound)(x)
# ubs.append(ub)
#
# lb = dask.delayed(np.max)(lbs)
# ub = dask.delayed(np.max)(ubs)
#
# with ProgressBar():
# lb = lb.compute()
# ub = -1.*ub.compute()
# return lb, ub
# if client is None:
# client = Client(processes = False)
# lb_res = []
# ub_res = []
# for x0 in X0:
# res = client.submit(minimax, lower_bound, x0, domain = domain, trust_region = False)
# lb_res.append(res)
# res = client.submit(minimax, upper_bound, x0, domain = domain, trust_region = False)
# ub_res.append(res)
#
# if progress:
# with tqdm(range(len(X0)), desc = 'bounds_domain', total = 2*len(X0),
# dynamic_ncols = True, **tqdm_kwargs) as it:
# while True:
# it.n = np.sum([res.done() for res in lb_res + ub_res])
# it.refresh()
# if it.n == 2*len(X0):
# break
# time.sleep(0.1)
#
# lbs = np.array([np.max(lower_bound(res.result())) for res in lb_res])
# ubs = np.array([np.min(-upper_bound(res.result())) for res in ub_res])
#
[docs] def shadow_uncertainty(self, domain, X, fX, ax = None, ngrid = 50, dim = 1, U = None, pgfname = None,
plot_kwargs = {}, progress = False, **kwargs):
r""" Plot the uncertainty intervals given data
"""
X = np.atleast_2d(np.array(X))
fX = np.array(fX)
assert dim == 1, "Higher dimensions not yet implemented"
if U is None:
u = self.U[:,0].flatten()
else:
u = U.flatten()
# Find extent in the active direction
c1 = domain.corner(u)
e1 = u.dot(c1)
c2 = domain.corner(-u)
e2 = u.dot(c2)
# Flip so order is increasing
if e1 > e2:
e1, e2 = e2, e1
c1, c2 = c2, c1
# Points to estimate uncertainty at
ys = np.linspace(e1, e2, ngrid)
lbs = np.zeros(ngrid)
ubs = np.zeros(ngrid)
# We treat the endpoints separately because there is only one point
lbs[0], ubs[0] = self.uncertainty(X, fX, c1)
lbs[-1], ubs[-1] = self.uncertainty(X, fX, c2)
# Now for the points on the interior
iterator = range(1, ngrid - 1)
tqdm_kwargs = {}
if progress:
iterator = tqdm(iterator, "shadow uncertainty", position = 0, dynamic_ncols = True)
if progress > 1:
tqdm_kwargs = {'position':1}
for i in iterator:
dom_eq = domain.add_constraints(A_eq = u.reshape(1,-1), b_eq = ys[i].reshape(1))
lbs[i], ubs[i] = self.uncertainty_domain(X, fX, dom_eq, progress = progress - 1, tqdm_kwargs = tqdm_kwargs, **kwargs)
if ax is not None:
ax.fill_between(ys, lbs, ubs, **plot_kwargs)
if pgfname is not None:
pgf = PGF()
pgf.add('y', ys)
pgf.add('lb', lbs)
pgf.add('ub', ubs)
pgf.write(pgfname)
class DiagonalLipschitzMatrix(CoordinateBasedDimensionReduction, LipschitzMatrix):
    r""" Constructs a diagonal Lipschitz matrix

    Much like the standard Lipschitz matrix class :meth:`psdr.LipschitzMatrix`,
    this class computes a Lipschitz matrix except with the constraint the matrix is diagonal.
    """
    def __init__(self, L = None, **kwargs):
        if L is not None:
            # Keep only the diagonal of any user-supplied matrix
            self._L = np.diag(np.diag(np.atleast_2d(L)))
        LipschitzMatrix.__init__(self, **kwargs)

    def _fit(self, X, fX, grads, epsilon, scale):
        # Solve the SDP restricted to diagonal matrices
        H = self._build_lipschitz_matrix(X, fX/scale, grads/scale, epsilon, structure = 'diag')

        # Clamp numerically negative diagonal entries to zero
        diag_H = np.maximum(np.diag(H), 0)
        H = np.diag(diag_H)

        # Unlike the matrix case, we do not rotate coordinates
        self._H = scale**2 * H
        # The diagonal square root is the Lipschitz matrix; undo the rescaling
        self._L = scale * np.diag(np.sqrt(diag_H))
        # Per-coordinate importance scores are the diagonal of L
        self._score = np.diag(self._L).copy()
class LipschitzConstant(LipschitzMatrix):
    r""" Computes the scalar Lipschitz constant

    The scalar analog of the Lipschitz matrix: the largest observed ratio
    (|f(x_i) - f(x_j)| - epsilon) / ||x_i - x_j||_2 over sample pairs,
    and the largest observed gradient norm.
    """
    def __init__(self, L = None, **kwargs):
        if L is not None:
            self._L = float(L)
            self._dimension = 1
        LipschitzMatrix.__init__(self, **kwargs)

    def _fit(self, X, fX, grads, epsilon, scale):
        L = 0
        # Pairwise sample-based lower bounds require at least two samples;
        # the previous `len(X) > 0` test crashed on a single sample
        # (np.max of an empty list raises ValueError).
        if len(X) > 1:
            # Skip duplicated points to avoid division by zero
            ratios = [
                (abs(fX[i] - fX[j]) - epsilon) / np.linalg.norm(X[i] - X[j])
                for i, j in combinations(range(len(X)), 2)
                if np.linalg.norm(X[i] - X[j]) > 0
            ]
            if ratios:
                L = max(L, np.max(ratios))
        if len(grads) > 0:
            L_grad = np.max([np.linalg.norm(grad) for grad in grads])
            L = max(L, L_grad)
        self._L = L

    @property
    def H(self):
        # Scalar constant expressed as an isotropic matrix
        return self._L**2 * np.eye(len(self))

    @property
    def L(self):
        return self._L * np.eye(len(self))

    @property
    def U(self):
        # No preferred directions for a scalar Lipschitz constant
        return np.eye(len(self))
# Helper functions for determining bounds
class LowerBound(BaseFunction):
    r""" Pointwise lower bound implied by the Lipschitz matrix.

    Evaluates x -> f(x_i) - ||L (x - x_i)||_2 for every stored sample x_i.
    """
    def __init__(self, L, X, fX):
        self.L = L
        # Pre-map the sample locations through the Lipschitz matrix
        self.Y = np.dot(self.L, X.T).T
        self.fX = fX.reshape(len(X))

    def eval(self, x):
        y = np.dot(self.L, x)
        norms = cdist(y.reshape(1,-1), self.Y, 'euclidean').flatten()
        return self.fX - norms

    def grad(self, x):
        y = np.dot(self.L, x)
        norms = cdist(y.reshape(1,-1), self.Y, 'euclidean').flatten()
        G = np.zeros([self.Y.shape[0], x.shape[0]])
        # Where the distance vanishes the norm is not differentiable; leave zero
        nonzero = norms > 0
        G[nonzero, :] = -((y - self.Y[nonzero]).T / norms[nonzero]).T
        # Chain rule through y = L x
        return self.L.T.dot(G.T).T
class UpperBound(BaseFunction):
    r""" Negated pointwise upper bound implied by the Lipschitz matrix.

    Evaluates x -> -(f(x_i) + ||L (x - x_i)||_2) for every stored sample x_i;
    the negation lets the same minimax driver maximize the upper bound.
    """
    def __init__(self, L, X, fX):
        self.L = L
        # Pre-map the sample locations through the Lipschitz matrix
        self.Y = np.dot(self.L, X.T).T
        self.fX = fX.reshape(len(X))

    def eval(self, x):
        y = np.dot(self.L, x)
        norms = cdist(y.reshape(1,-1), self.Y, 'euclidean').flatten()
        return -(self.fX + norms)

    def grad(self, x):
        y = np.dot(self.L, x)
        norms = cdist(y.reshape(1,-1), self.Y, 'euclidean').flatten()
        G = np.zeros([self.Y.shape[0], x.shape[0]])
        # Where the distance vanishes the norm is not differentiable; leave zero
        nonzero = norms > 0
        # d/dx of -||L x - L x_i|| matches the lower bound's derivative
        G[nonzero, :] = -((y - self.Y[nonzero]).T / norms[nonzero]).T
        # Chain rule through y = L x
        return self.L.T.dot(G.T).T