# ecVKOGA
Python implementation of the energy-conserving VKOGA (curl-free approximation).
## Usage:
The implementation is similar to the original VKOGA; see https://github.com/GabrieleSantin/VKOGA.
main_demo.py provides an example of how to use the ecVKOGA class.
To set up the venv, run ./setup.sh (you may need to run 'chmod +x setup.sh' first).
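A minimal usage sketch along the lines of main_demo.py (the two target columns are the partial derivatives of the potential Phi(x, y) = x*y):

```python
import numpy as np
from utils.ecVKOGA import ecVKOGA

# Toy data: samples of the gradient of the potential Phi(x, y) = x * y.
X = np.random.rand(200, 2)
y = np.stack([X[:, 1], X[:, 0]], axis=1)

model = ecVKOGA(kernel_par=1, max_iter=30)
model.fit(X, y)
y_pred = model.predict_tmp(X)  # predicts the forces (gradient), not the potential
```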
## How to cite:
The corresponding paper is still in preparation. If you want to use this code, please get in touch with the author about how to cite it.
main_demo.py
# Introduction to the basic usage of an ecVKOGA model.
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from utils.ecVKOGA import ecVKOGA
# Define a dataset `(X, y)` to run some experiments.
x = np.linspace(0, 1, 31)
y = np.linspace(0, 1, 31)
xv, yv = np.meshgrid(x, y)
f1 = lambda x, y: y
f2 = lambda x, y: x  # f1 and f2 are the partial derivatives of the potential Phi(x, y) = x*y
X = np.hstack([xv.reshape(-1,1), yv.reshape(-1,1)])
y = np.hstack([f1(X[:, [0]], X[:, [1]]), f2(X[:, [0]], X[:, [1]])])
kernel_par = 1
# We split the dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=2)
# We start by defining an ecVKOGA model (default Gaussian kernel, 30 greedy iterations).
model = ecVKOGA(kernel_par=kernel_par, max_iter=30)
# The ecVKOGA model can now be trained on the dataset using the `fit` method:
_ = model.fit(X_train, y_train)
# Retrieve the training history (maximal residual and power function values per iteration)
f_max, p_max = model.train_hist['f'], model.train_hist['p']
# Calculate error on test set
y_test_predict = model.predict_tmp(X_test)
max_error_test = np.max(np.abs(y_test_predict - y_test))
# Plots of the decay of the residual and of the power function
plt.figure(1)
plt.clf()
plt.plot(np.arange(1, len(f_max)+1), f_max)
plt.plot([1, len(f_max)], [max_error_test**2, max_error_test**2])  # train_hist stores squared residuals, hence the square
plt.yscale('log')
plt.title('fmax')
plt.figure(2)
plt.clf()
plt.plot(np.arange(1, len(p_max)+1), p_max)
plt.yscale('log')
plt.title('pmax')
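# Optional sanity check (a sketch, not part of the original demo): since the
# ecVKOGA approximant is curl-free by construction, the cross-derivatives of the
# predicted field should (numerically) coincide. We estimate them with central
# finite differences at a few random points.
h = 1e-5
X_check = np.random.rand(10, 2)
e1, e2 = np.array([[h, 0.0]]), np.array([[0.0, h]])
df2_dx = (model.predict_tmp(X_check + e1)[:, 1] - model.predict_tmp(X_check - e1)[:, 1]) / (2 * h)
df1_dy = (model.predict_tmp(X_check + e2)[:, 0] - model.predict_tmp(X_check - e2)[:, 0]) / (2 * h)
print('max |curl| at check points: {:.2e}'.format(np.max(np.abs(df2_dx - df1_dy))))
plt.show()  # display the figures when the script is run non-interactively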
setup.sh
#!/usr/bin/env bash
set -e
export BASEDIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P)"
cd "${BASEDIR}"
# Create and source virtualenv
if [ -e "${BASEDIR}/venv/bin/activate" ]; then
echo "using existing virtualenv"
else
echo "creating virtualenv ..."
virtualenv --python=python3 venv
fi
source venv/bin/activate
# Upgrade pip and install libraries (dependencies are also installed)
pip install --upgrade pip
pip install numpy scipy matplotlib scikit-learn  # 'sklearn' is a deprecated alias for scikit-learn
# pip freeze > requirements.txt # not required
utils/ecVKOGA.py
#!/usr/bin/env python3
from utils.kernels_derivatives import Gaussian
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
# VKOGA implementation
class ecVKOGA(BaseEstimator):
def __init__(self, kernel=Gaussian(), kernel_par=1,
verbose=True, n_report=10,
greedy_type='p_greedy', reg_par=0, restr_par=0,
tol_f=1e-10, tol_p=1e-10, max_iter=10):
# Set the verbosity on/off
self.verbose = verbose
# Set the frequency of report
self.n_report = n_report
# Set the params defining the method
self.kernel = kernel
self.kernel_par = kernel_par
self.greedy_type = greedy_type
self.reg_par = reg_par
if reg_par > 0:
print('reg_par > 0 is not yet implemented correctly!')
self.restr_par = restr_par
# Set the stopping values
self.max_iter = max_iter
self.tol_f = tol_f
self.tol_p = tol_p
# Set the output dimension
self.q = None
def selection_rule(self, f, p):
if self.restr_par > 0:
p_ = np.max(p)
restr_idx = np.nonzero(p >= self.restr_par * p_)[0]
else:
restr_idx = np.arange(len(p))
f = np.sum(f ** 2, axis=1)
if self.greedy_type == 'f_greedy':
idx = np.argmax(f[restr_idx])
idx = restr_idx[idx]
f_max = np.max(f)
p_max = np.max(p)
elif self.greedy_type == 'fp_greedy': # I'm not sure whether the optimality of fp-greedy still holds
idx = np.argmax(f[restr_idx] / p[restr_idx])
idx = restr_idx[idx]
f_max = np.max(f)
p_max = np.max(p)
elif self.greedy_type == 'p_greedy':
f_max = np.max(f)
idx = np.argmax(p)
idx = restr_idx[idx]
p_max = p[idx]
return idx, f_max, p_max
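# Summary of the greedy criteria above (added note): writing r_i for the squared
# l2-norm of the residual at point i and p_i for its power value, the rules select
#   f_greedy:  argmax_i r_i
#   fp_greedy: argmax_i r_i / p_i
#   p_greedy:  argmax_i p_i
# where f_greedy and fp_greedy are restricted to the indices in restr_idx.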
def fit(self, X, y, weight=None, flag_early_stopping=False, flag_select_origin_first=True):
# weight (of shape 1 x y.shape[1]) allows weighting the dimensions of the target vector
if weight is None:
weight = 1
# flag needed to break out of loop if negative power values occur
flag_break = False
# Check the dataset
X, y = check_X_y(X, y, multi_output=True)
# Initialize the convergence history (cold start)
self.train_hist = {}
self.train_hist['n'] = []
self.train_hist['f'] = []
self.train_hist['p'] = []
# Initialize the residual (i.e. the derivatives of the residual of the potential)
y = np.array(y)
if len(y.shape) == 1:
y = y[:, None]
# Get the data dimension
N, q = y.shape
self.q = q
self.max_iter = min(self.max_iter, N)
self.kernel.set_params(self.kernel_par)
# Check compatibility of restriction
if self.greedy_type == 'p_greedy':
self.restr_par = 0
if self.reg_par != 0:
self.restr_par = 0
self.indI_ = []
notIndI = list(range(N))
# Initialize arrays to store Newton basis and coefficients
Vx = np.zeros((N, self.max_iter * q))
c = np.zeros(self.max_iter * q)
# Initialize a 3D array which stores the derivatives of the Newton basis
der_Vx = np.zeros((q, N, self.max_iter * q))
# Initialize an array to store squared power function values in each direction
# p = np.matlib.repmat(self.kernel.dd_diagonal(X)[:, None] + self.reg_par, 1, q)
p = self.kernel.dd_diagonal(X)
# Initialize Cut matrix (transpose of the C matrix, such that it is lower triangular)
self.Cut_ = np.zeros((self.max_iter * q, self.max_iter * q))
self.print_message('begin')
# Iterative selection of new points
for n_point in range(self.max_iter):
# prepare
self.train_hist['n'].append(n_point + 1)
self.train_hist['f'].append([])
self.train_hist['p'].append([])
# Apply selection rule (we take the l2 norm for the power values)
if n_point == 0 and flag_select_origin_first: # select the origin at the beginning!
idx = 0
self.train_hist['f'][n_point] = np.max(np.sum((y / weight) ** 2, axis=1))
self.train_hist['p'][n_point] = np.max(np.linalg.norm(p[notIndI, :], 2, 1)[:, None])
else: # standard selection for later points
idx, self.train_hist['f'][n_point], self.train_hist['p'][n_point] = \
self.selection_rule(y / weight, np.linalg.norm(p[notIndI, :], 2, 1)[:, None])
# add the current index
self.indI_.append(notIndI[idx])
# check if the tolerances are reached
if self.train_hist['f'][n_point] <= self.tol_f:
n_point = n_point - 1
self.print_message('end')
break
if self.train_hist['p'][n_point] <= self.tol_p:
n_point = n_point - 1
self.print_message('end')
break
# For each selected point, use all q derivative directions
for idx_dim in range(q):
# Compute the next Newton basis element
Vx[notIndI, n_point * q + idx_dim] = \
(self.kernel.deval(X[notIndI, :], X[[self.indI_[n_point]], :], idx_dim)
- Vx[notIndI, 0:n_point * q + idx_dim + 1] @
der_Vx[idx_dim, [self.indI_[n_point]], 0:n_point * q + idx_dim + 1].transpose()).reshape(-1)
# Normalize the Newton basis element (if negative values occur: break)
if p[self.indI_[n_point], idx_dim] < 0:
# n_point = n_point - 5
self.print_message('end')
flag_break = True
break
Vx[notIndI, n_point * q + idx_dim] = \
Vx[notIndI, n_point * q + idx_dim] / np.sqrt(p[self.indI_[n_point], idx_dim])
# Calculate all q derivatives of the previously computed Newton basis element
for idx_dim2 in range(q):
der_Vx[idx_dim2, notIndI, n_point * q + idx_dim] = \
(self.kernel.ddeval(X[notIndI, :], X[[self.indI_[n_point]], :], idx_dim2, idx_dim)
- der_Vx[idx_dim2, notIndI, 0:n_point * q + idx_dim + 1] @
der_Vx[idx_dim, [self.indI_[n_point]], 0:n_point * q + idx_dim + 1].transpose()).reshape(-1)
der_Vx[idx_dim2, notIndI, n_point * q + idx_dim] = \
der_Vx[idx_dim2, notIndI, n_point * q + idx_dim] / \
np.sqrt(p[self.indI_[n_point], idx_dim])
# Calculate next row of Cut and then update it
Cut_new_row = np.ones(n_point * q + idx_dim + 1)  # this sets the last entry to one
Cut_new_row[0:n_point * q + idx_dim] = -der_Vx[idx_dim, self.indI_[n_point], 0:n_point * q + idx_dim] \
@ self.Cut_[:n_point * q + idx_dim:, :n_point * q + idx_dim]
self.Cut_[n_point * q + idx_dim, :n_point * q + idx_dim + 1] = \
Cut_new_row / der_Vx[idx_dim, self.indI_[n_point], n_point * q + idx_dim]
# compute the (n_point*q+idx_dim)-th coefficient
c[n_point * q + idx_dim] = \
y[self.indI_[n_point], idx_dim] / np.sqrt(p[self.indI_[n_point], idx_dim])
# update all q Power functions
p[notIndI, :] = p[notIndI, :] - der_Vx[:, notIndI, n_point * q + idx_dim].transpose() ** 2
# update the residuals (i.e. the derivatives of the residual of the potential)
y[notIndI, :] = y[notIndI, :] - \
der_Vx[:, notIndI, n_point * q + idx_dim].transpose() * c[n_point * q + idx_dim]
if flag_break:
print('Break due to negative power values.')
break
# remove the nth index from the dictionary
notIndI.pop(idx)
# Report some data every now and then
if n_point % self.n_report == 0:
self.print_message('track')
else:
self.print_message('end')
# Early stopping:
if flag_early_stopping:
idx_argmin = np.argmin(np.asarray(self.train_hist['f']))
print('flag_early_stopping removes {} centers.'.format(
n_point + 1 - idx_argmin))
n_point = idx_argmin - 1 # next lines use n_point + 1
# Define coefficients and centers
c = c[:(n_point + 1) * q] # recall we used n_point = n_point - 1 above
self.Cut_ = self.Cut_[0:(n_point + 1) * q, 0:(n_point + 1) * q]
self.indI_ = self.indI_[0:(n_point + 1)]
self.coef_ = self.Cut_.transpose() @ c
self.ctrs_ = X[self.indI_, :]
print('{} / {} centers selected.'.format(n_point + 1, N))
return self
def predict(self, X):
# This one does NOT predict the potential, but the forces (i.e. the derivatives derived from the potential)
# Check if fit has been called
check_is_fitted(self, 'coef_')
# Validate the input
X = check_array(X)
# Evaluate the model by looping over the output dimensions (TODO: vectorize; see predict_tmp)
coeffs = self.coef_.reshape(-1, self.q)
list_to_concatenate = []
# Loop over dimensions to calculate all outputs
for idx_dim in range(self.q):
array_sum = np.zeros((X.shape[0], 1))
# For each center point there are q Newton Riesz representers, thus loop
for idx_dim2 in range(self.q):
array_to_add = self.kernel.ddeval(X, self.ctrs_, idx_dim, idx_dim2) @ coeffs[:, idx_dim2]
array_sum += array_to_add.reshape(-1, 1)
list_to_concatenate.append(array_sum)
# finally horizontal-stack the single dimensions
return np.hstack(list_to_concatenate)
def predict_tmp(self, X):
# Like `predict`, this returns the forces (the derivatives of the potential), not
# the potential itself, but it assembles a single block kernel matrix via
# ddeval_tmp instead of looping over the output dimensions.
# Check if fit has been called
check_is_fitted(self, 'coef_')
# Validate the input
X = check_array(X)
# Calculate all outputs
result = self.kernel.ddeval_tmp(X, self.ctrs_) @ self.coef_.reshape(-1, self.q).flatten(order='F')
return result.reshape(self.q, -1).transpose()
def print_message(self, when):
if self.verbose and when == 'begin':
print('')
print('*' * 30 + ' [VKOGA] ' + '*' * 30)
print('Training model with')
print(' |_ kernel : %s' % self.kernel)
print(' |_ regularization par. : %2.2e' % self.reg_par)
print(' |_ restriction par. : %2.2e' % self.restr_par)
print('')
if self.verbose and when == 'end':
print('Training completed with')
print(' |_ selected points : %8d / %8d' % (self.train_hist['n'][-1], self.max_iter))
print(' |_ train residual : %2.2e / %2.2e' % (self.train_hist['f'][-1], self.tol_f))
print(' |_ train power fun : %2.2e / %2.2e' % (self.train_hist['p'][-1], self.tol_p))
if self.verbose and when == 'track':
print('Training ongoing with')
print(' |_ selected points : %8d / %8d' % (self.train_hist['n'][-1], self.max_iter))
print(' |_ train residual : %2.2e / %2.2e' % (self.train_hist['f'][-1], self.tol_f))
print(' |_ train power fun : %2.2e / %2.2e' % (self.train_hist['p'][-1], self.tol_p))
# %% Utilities to save and load objects via pickle
import pickle
def save_object(obj, filename):
with open(filename, 'wb') as output:
pickle.dump(obj, output, pickle.HIGHEST_PROTOCOL)
def load_object(filename):
with open(filename, 'rb') as file:
obj = pickle.load(file)
return obj
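# Example round trip (a sketch; 'model.pkl' is a hypothetical filename):
#   save_object(model, 'model.pkl')
#   model_restored = load_object('model.pkl')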
utils/kernels_derivatives.py
#!/usr/bin/env python3
from abc import ABC, abstractmethod
from scipy.spatial import distance_matrix
import math
import numpy as np
import matplotlib.pyplot as plt
# Abstract kernel
class Kernel(ABC):
@abstractmethod
def __init__(self):
super().__init__()
@abstractmethod
def eval(self, x, y):
pass
def eval_prod(self, x, y, v, batch_size=100):
# Compute the kernel matrix-vector product K(x, y) @ v in row batches to limit memory use
N = x.shape[0]
num_batches = int(np.ceil(N / batch_size))
mat_vec_prod = np.zeros((N, 1))
for idx in range(num_batches):
idx_begin = idx * batch_size
idx_end = (idx + 1) * batch_size
A = self.eval(x[idx_begin:idx_end, :], y)
mat_vec_prod[idx_begin:idx_end] = A @ v
return mat_vec_prod
@abstractmethod
def diagonal(self, X):
pass
@abstractmethod
def __str__(self):
pass
@abstractmethod
def set_params(self, params):
pass
# Abstract RBF
class RBF(Kernel):
@abstractmethod
def __init__(self):
super(RBF, self).__init__()
def eval(self, x, y):
return self.rbf(self.ep, distance_matrix(np.atleast_2d(x), np.atleast_2d(y)))
def diagonal(self, X):
return np.ones(X.shape[0]) * self.rbf(self.ep, 0)
def __str__(self):
return self.name + ' [gamma = %2.2e]' % self.ep
def set_params(self, par):
self.ep = par
# Implementation of concrete RBFs
class Gaussian(RBF):
def __init__(self, ep=1):
self.ep = ep
self.name = 'gauss'
# self.rbf = lambda ep, r: np.exp(-(ep * r) ** 2)
def rbf(self, ep, r):
return np.exp(-(ep*r)**2)
def dd_diagonal(self, X):
'''Returns the double derivative (j-th derivative w.r.t. the first and
j-th derivative w.r.t. the second argument) along the diagonal, i.e. for
both arguments equal to X. The result should be independent of j.'''
# return 2 * self.ep ** 2 * np.ones(X.shape[0]) * self.rbf(self.ep, 0) # see 28.04.20\A1
return 2 * self.ep ** 2 * self.rbf(self.ep, 0) * np.ones(X.shape)
def deval(self, x, y, j):
'''Returns the derivative of the kernel in the j-th direction w.r.t. the
second argument, evaluated at x and y.'''
return 2 * self.ep ** 2 * (x[:, [j]] - y[:, [j]].transpose()) * self.eval(x, y) # see 28.04.20\A1
def ddeval(self, x, y, j1, j2):
'''Returns the j1-th derivative (w.r.t. the 1st argument) of the j2-th
derivative (w.r.t. the 2nd argument).'''
if j1 == j2: # there should be a numpy way of 'kronecker delta'
first_summand = 2 * self.ep ** 2 * self.eval(x, y)
else:
first_summand = 0
second_summand = -4 * self.ep ** 4 * \
(x[:, [j1]] - y[:, [j1]].transpose()) * \
(x[:, [j2]] - y[:, [j2]].transpose()) * self.eval(x, y)
return first_summand + second_summand
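# Derivation sketch for deval/ddeval (added note): with the Gaussian
# k(x, y) = exp(-ep^2 * ||x - y||^2) one has
#   d/dy_j k(x, y)        = 2 * ep^2 * (x_j - y_j) * k(x, y)
#   d/dx_i d/dy_j k(x, y) = 2 * ep^2 * delta_ij * k(x, y)
#                           - 4 * ep^4 * (x_i - y_i) * (x_j - y_j) * k(x, y)
# and setting x = y in the second line gives the constant 2 * ep^2 used in
# dd_diagonal.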
def ddeval_tmp(self, x, y):
'''This is like ddeval, but it returns a matrix D with the structure
[D_{1, 1}, D_{1, 2}, ..., D_{1, q}]
[D_{2, 1}, D_{2, 2}, ..., D_{2, q}]
D = [ ... ]
[D_{q, 1}, D_{q, 2}, ..., D_{q, q}]
where D_{i, j} = i-th derivative in the first argument of the
j-th derivative in the second argument of the kernel matrix A = K(x, y)
(so D_{i, j} is of the same size of A).
The matrices D_{i, j} are computed with the code of ddeval, but without
calling ddeval to avoid multiple evaluations of the kernel matrix and
using the fact that with
A_ep = 2 * self.ep ** 2 * self.eval(x, y)
we have
first_summand = A_ep * delta_ij
second_summand = -2 ep ** 2 * A_ep * (terms with x and y)
so in the new variables
first_summand + second_summand = A_ep * ((i == j) + x_y_ep)
'''
n_x, q = x.shape
n_y = y.shape[0]
D = np.empty((q * n_x, q * n_y))
A_ep = 2 * self.ep ** 2 * self.eval(x, y)
for i in range(q):
for j in range(q):
x_y_ep = -2 * self.ep ** 2 * \
(x[:, [i]] - y[:, [i]].transpose()) * \
(x[:, [j]] - y[:, [j]].transpose())
D[i * n_x : (i + 1) * n_x, j * n_y : (j + 1) * n_y] = A_ep * ((i == j) + x_y_ep)
return D
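# Consistency sketch (added note): each (i, j) block of D should agree with the
# corresponding ddeval call, e.g. for the (0, 0) block:
#   D = ker.ddeval_tmp(x, y)
#   np.allclose(D[:x.shape[0], :y.shape[0]], ker.ddeval(x, y, 0, 0))  # -> True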
class GaussianTanh(RBF):
def __init__(self, ep=1):
self.ep = ep
self.name = 'gauss_tanh'
self.rbf = lambda ep, r: np.exp(-(ep * np.tanh(r)) ** 2)
class IMQ(RBF):
def __init__(self, ep=1):
self.ep = ep
self.name = 'imq'
self.rbf = lambda ep, r: 1. / np.sqrt(1 + (ep * r) ** 2)
class Matern(RBF):
def __init__(self, ep=1, k=0):
self.ep = ep
if k == 0:
self.name = 'mat0'
self.rbf = lambda ep, r: np.exp(-ep * r)
elif k == 1:
self.name = 'mat1'
self.rbf = lambda ep, r: np.exp(-ep * r) * (1 + ep * r)
elif k == 2:
self.name = 'mat2'
self.rbf = lambda ep, r: np.exp(-ep * r) * (3 + 3 * ep * r + (ep * r) ** 2)
elif k == 3:
self.name = 'mat3'
self.rbf = lambda ep, r: np.exp(-ep * r) * (15 + 15 * ep * r + 6 * (ep * r) ** 2 + (ep * r) ** 3)
else:
self.name = None
self.rbf = None
raise Exception('This Matern kernel is not implemented')
class Wendland(RBF):
def __init__(self, ep=1, k=0, d=1):
self.ep = ep
self.name = 'wen_' + str(d) + '_' + str(k)
l = int(np.floor(d / 2) + k + 1)  # integer smoothness parameter (needed for math.factorial)
if k == 0:
p = lambda r: 1
elif k == 1:
p = lambda r: (l + 1) * r + 1
elif k == 2:
p = lambda r: (l + 3) * (l + 1) * r ** 2 + 3 * (l + 2) * r + 3
elif k == 3:
p = lambda r: (l + 5) * (l + 3) * (l + 1) * r ** 3 + (45 + 6 * l * (l + 6)) * r ** 2 + (
15 * (l + 3)) * r + 15
elif k == 4:
p = lambda r: (l + 7) * (l + 5) * (l + 3) * (l + 1) * r ** 4 + (
5 * (l + 4) * (21 + 2 * l * (8 + l))) * r ** 3 + (45 * (14 + l * (l + 8))) * r ** 2 + (
105 * (l + 4)) * r + 105
else:
raise Exception('This Wendland kernel is not implemented')
c = math.factorial(l + 2 * k) / math.factorial(l)  # np.math is removed in NumPy >= 2.0
e = l + k
self.rbf = lambda ep, r: np.maximum(1 - ep * r, 0) ** e * p(ep * r) / c
# Polynomial kernels
class Polynomial(Kernel):
def __init__(self, a=0, p=1):
self.a = a
self.p = p
def eval(self, x, y):
return (np.atleast_2d(x) @ np.atleast_2d(y).transpose() + self.a) ** self.p
def diagonal(self, X):
return ((np.linalg.norm(X, axis=1) ** 2 + self.a) ** self.p)[:, None]  # k(x, x) = (<x, x> + a)^p
def __str__(self):
return 'polynomial' + ' [a = %2.2e, p = %2.2e]' % (self.a, self.p)
def set_params(self, par):
self.a = par[0]
self.p = par[1]
# A demo usage
def main():
ker = Gaussian()
x = np.linspace(-1, 1, 100)[:, None]
y = np.array([[0.0]])  # np.matrix is deprecated
A = ker.eval(x, y)
fig = plt.figure(1)
fig.clf()
ax = fig.gca()
ax.plot(x, A)
ax.set_title('A kernel: ' + str(ker))
fig.show()
if __name__ == '__main__':
main()