# Synthetic Online Phase: Failing Sensors with drifted measures
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 7 June 2025
# Latest Doc Update: 7 June 2025
import numpy as np
import numpy.linalg as la
from dolfinx.fem import Function
from pyforce.tools.backends import norms, LoopProgress
from pyforce.tools.functions_list import FunctionsList
[docs]
def remove_lin_combine(fun_list: FunctionsList, vec: np.ndarray, M: int, sensI_drifted: int):
r"""
This auxiliary function is used to perform a linear combination of basis functions without `sensI_drifted` (i.e., :math:`j`).
.. math::
u = \sum_{i=1,i\neq j}^{M} \alpha_i \cdot \psi_i
Parameters
----------
fun_list : FunctionsList
List of basis functions
vec : np.ndarray
Iterable containing the coefficients :math:`\boldsymbol{\alpha}\in\mathbb{R}^{M-1}` of the linear combination.
M : int
Maximum number of basis functions without removal.
sensI_drifted : int
Index of the drifted sensor and hence drifted coefficient :math:`\alpha_j` .
Returns
-------
combination : np.ndarray
`np.ndarray` object storing the result of the linear combination
"""
kk = 0
for ii in range(M):
combination = np.zeros(fun_list.fun_shape,)
for ii in range(len(vec)):
if ii != sensI_drifted:
combination += vec[kk] * fun_list(ii)
kk += 1
return combination
[docs]
def compute_measure(snaps: FunctionsList, sensors: FunctionsList, noise_value: float = None, M = None) -> np.ndarray:
r"""
Computes the measurement matrix from the `snaps` input, using `sensors` given as input to which synthetic random noise is added.
If the dimension `M` is not given, the whole set of magic sensors is used.
.. math::
y_m = v_m(u) +\epsilon_m \qquad \qquad m = 1, \dots, M
If the dimension :math:`M` is not given, the whole set of magic sensors is used.
Parameters
----------
snap : FunctionsList
FunctionsList from which measurements are to be extracted
sensors : FunctionsList
FunctionsList containing the sensors
noise_value : float, optional (Default = None)
Standard deviation of the noise, modelled as a normal :math:`\mathcal{N}(0, \sigma^2)`
M : int, optional (default = None)
Maximum number of sensor to use (if None is set to the number of magic functions/sensors)
Returns
----------
measure : np.ndarray
Measurement vector :math:`\mathbf{y}\in\mathbb{R}^{M\times N_s}`
"""
# Check on the input M, maximum number of sensors to use
if M is None:
M = len(sensors)
elif M > len(sensors):
print('The maximum number of measures must not be higher than '+str(len(sensors))+' --> set equal to '+str(len(sensors)))
M = len(sensors)
Ns = len(snaps)
norm_ = norms(snaps.fun_space)
measure = np.zeros((M, Ns))
for mm in range(M):
for mu in range(len(snaps)):
measure[mm, mu] = norm_.L2innerProd(snaps(mu), sensors(mm))
# Adding synthetic noise
if noise_value is not None:
measure += np.random.normal(0, noise_value, measure.shape)
return measure
# TR-GEIM online with drifted synthetic measures
[docs]
class TRGEIM():
r"""
This class can be used to perform the online phase of the TR-GEIM algorihtm for synthetic measures :math:`\mathbf{y}\in\mathbb{R}^M` obtained as evaluations of the magic sensors on the snapshot :math:`u(\mathbf{x};\,\boldsymbol{\mu})` as
.. math::
y_m = v_m(u(\boldsymbol{\mu})) + \epsilon_m + \delta_{m} \qquad \qquad m = 1, \dots, M
in which :math:`\epsilon_m\sim \mathcal{N}(0, \sigma^2)` is random noise and :math:`\delta_m \sim \mathcal{N}(\kappa, \rho^2)` acting on some measurements. This term is referred to as drift.
Two approaches are implemented:
- **Unregularised case**: how the drifted measurement affect the reconstruction?
- **Remove measure case**: the failed measure (once at a time) is removed as well as the correspondent magic function
Parameters
----------
magic_fun : FunctionsList
List of magic functions computed during the offline phase.
magic_sen : FunctionsList
List of magic sensors computed during the offline phase.
mean_beta : np.ndarray
Mean values :math:`\langle\beta_m\rangle` of the training reduced coefficients
std_beta : np.ndarray
Standard deviations :math:`\sigma_{\beta_m}` of the training reduced coefficients
name : str
Name of the snapshots (e.g., temperature T)
"""
def __init__(self, magic_fun: FunctionsList, magic_sen: FunctionsList, mean_beta: np.ndarray, std_beta: np.ndarray, name: str) -> None:
self.mf = magic_fun
self.ms = magic_sen
self.V = magic_fun.fun_space
self.name = name
# Defining the norm class to make scalar products and norms
self.norms = norms(self.V)
# Computing the matrix B
assert (len(self.ms) == len(self.mf)), "Number of magic sensors must be equal to number of magic functions"
self.Mmax = len(self.ms)
self.B = np.zeros((self.Mmax, self.Mmax))
for nn in range(self.Mmax):
for mm in range(self.Mmax):
self.B[nn, mm] = self.norms.L2innerProd(self.ms(nn), self.mf(mm))
# Compute matrix T for the regularisation
assert (mean_beta.shape[0] == self.Mmax), "The size of the mean values of beta must be equal to the number of magic sensors/functions"
assert (mean_beta.shape == std_beta.shape), "The size of the mean values of beta must be equal to the size of the standard deviations"
self.mean_beta = mean_beta
self.T = np.diag(1 / std_beta)
[docs]
def compute_measure(self, snap: Function, noise_value: float = None, M = None) -> np.ndarray:
r"""
Computes the measurement vector from the `snap` input, using the magic sensors stored to which synthetic random noise is added.
If the dimension `M` is not given, the whole set of magic sensors is used.
.. math::
y_m = v_m(u) +\epsilon_m \qquad \qquad m = 1, \dots, M
If the dimension :math:`M` is not given, the whole set of magic sensors is used.
Parameters
----------
snap : Function
Function from which measurements are to be extracted
noise_value : float, optional (Default = None)
Standard deviation of the noise, modelled as a normal :math:`\mathcal{N}(0, \sigma^2)`
M : int, optional (default = None)
Maximum number of sensor to use (if None is set to the number of magic functions/sensors)
Returns
----------
measure : np.ndarray
Measurement vector :math:`\mathbf{y}\in\mathbb{R}^M`
"""
# Check on the input M, maximum number of sensors to use
if M is None:
M = self.Mmax
elif M > self.Mmax:
print('The maximum number of measures must not be higher than '+str(self.Mmax)+' --> set equal to '+str(self.Mmax))
M = self.Mmax
measure = np.zeros((M,))
for mm in range(M):
measure[mm] = self.norms.L2innerProd(snap, self.ms(mm))
# Adding synthetic noise
if noise_value is not None:
measure += np.random.normal(0, noise_value, len(measure))
return measure
[docs]
def drift_test_err(self, snaps: FunctionsList, M: int, noise_value: float, reg_param: float,
kappa: float, rho: float, idx_failed: list[int],
num_rep_exp: int = 30, mu_failure: int = 0, verbose = False):
r"""
The TR-GEIM algorithm is used to reconstruct the `snaps` (`FunctionsList`), by solving the TR-GEIM linear system
.. math::
\left(\mathbb{B}^T\mathbb{B}+\lambda \mathbb{T}^T\mathbb{T}\right)\boldsymbol{\beta} = \mathbb{B}^T\mathbf{y}+\lambda \mathbb{T}^T\mathbb{T} \langle{\boldsymbol{\beta}}\rangle
then the inteprolant and residual are computed and returned per each element of the list.
Parameters
----------
snaps : FunctionsList
Function to reconstruction
M : int
Number of sensor to use
noise_value : float
Standard deviation of the noise, modelled as a normal :math:`\mathcal{N}(0, \sigma^2)`
reg_param : float
Regularising parameter :math:`\lambda`
kappa : float
Mean value :math:`\kappa` of the drift
rho : float
Standard deviation :math:`\rho` of the drift
idx_failed : list[int]
List of integers with the failed sensors
num_rep_exp : int, optional (default = 30)
Number of repeated experiments.
mu_failure : int, optional (default = 0)
Index from which failure starts, typically time.
verbose : boolean, optional (default = False)
If true, output is printed.
Returns
----------
errors : np.ndarray
Errors per each element in `snaps` (first column: absolute, second column: relative), measure in :math:`||\cdot ||_{L^2}` and averaged for the numerical experiments.
interps : FunctionsList
Interpolant Field :math:`\mathcal{I}_M[u]` of TR-GEIM.
resids : FunctionsList
Residual Field :math:`r_M[u]`,
"""
if M > self.Mmax:
print('The maximum number of measures must not be higher than '+str(self.Mmax)+' --> set equal to '+str(self.Mmax))
M = self.Mmax
assert(max(idx_failed) < M)
Ns = len(snaps)
# Computing clean measures and norms of the snapshots
clean_measures = np.zeros((Ns, M))
snap_norms = np.zeros((Ns,))
for mu in range(Ns):
clean_measures[mu] = self.compute_measure(snaps(mu), M = M)
snap_norms[mu] = self.norms.L2norm(snaps(mu))
errors = np.zeros((num_rep_exp, Ns, 2))
interps = FunctionsList(self.V)
resids = FunctionsList(self.V)
if verbose:
progressBar = LoopProgress(msg = "Computing TR-GEIM drifted (synthetic) - " + self.name, final = num_rep_exp )
# Assembling matrix for TR-GEIM
sys_matrix = (self.B[:M, :M].T @ self.B[:M, :M]) + reg_param * (self.T[:M, :M].T @ self.T[:M, :M])
for kk in range(num_rep_exp):
for mu in range(Ns):
# Compute measurement vector with noise
# y = self.compute_measure(snaps(mu), noise_value, M)
y = clean_measures[mu].reshape(M,) + np.random.normal(0, noise_value, M).reshape(M,)
# Adding drift
if mu >= mu_failure:
y[idx_failed] += np.random.normal(kappa, rho, len(idx_failed))
# Creating vector
rhs = np.dot(self.B[:M, :M].T, y) + reg_param * np.dot((self.T[:M, :M].T @ self.T[:M, :M]), self.mean_beta[:M])
# Solving the linear system
coeff = la.solve(sys_matrix, rhs)
# Compute the interpolant and residual
interpolant = self.mf.lin_combine(coeff)
residual = np.abs(snaps(mu) - interpolant)
errors[kk, mu, 0] = self.norms.L2norm(residual)
errors[kk, mu, 1] = errors[kk, mu, 0] / snap_norms[mu]
# Storing interpolant and residual
if kk + 1 == num_rep_exp:
interps.append(interpolant)
resids.append(residual)
if verbose:
progressBar.update(1, percentage = False)
return errors, interps, resids
[docs]
def pure_remove_test_err(self, snaps: FunctionsList, M: int, noise_value: float, reg_param: float,
idx_failed: list[int], mu_failure: int = 0, verbose = False):
r"""
The TR-GEIM algorithm is used to reconstruct the `snaps` (`FunctionsList`), by solving the modified TR-GEIM linear system: in particular, `idx_failed` measure is removed thus from :math:`\mathbf{y}` the `idx_failed` row is deleted, from :math:`\mathbb{T}` and :math:`\mathbb{B}` their `idx_failed` row and col are deleted.
The interpolant is then defined as the sum over the obtained coefficients from the modified TR-GEIM linear system, without `idx_failed`.
Parameters
----------
snaps : FunctionsList
Function to reconstruction
M : int
Number of sensor to use
noise_value : float
Standard deviation of the noise, modelled as a normal :math:`\mathcal{N}(0, \sigma^2)`
reg_param : float
Regularising parameter :math:`\lambda`
idx_failed : list
List of integers with the failed sensor
mu_failure : int, optional (default = 0)
Index from which failure starts, typically time.
verbose : boolean, optional (default = False)
If true, output is printed.
Returns
----------
errors : np.ndarray
Errors per each element in `snaps` (first column: absolute, second column: relative), measure in :math:`||\cdot ||_{L^2}`.
interps : FunctionsList
Interpolant Field :math:`\mathcal{I}_M[u]` of TR-GEIM.
resids : FunctionsList
Residual Field :math:`r_M[u]`,
"""
if M > self.Mmax:
print('The maximum number of measures must not be higher than '+str(self.Mmax)+' --> set equal to '+str(self.Mmax))
M = self.Mmax
assert(max(idx_failed) < M)
Ns = len(snaps)
# Computing clean measures and norms of the snapshots
clean_measures = np.zeros((Ns, M))
snap_norms = np.zeros((Ns,))
for mu in range(Ns):
clean_measures[mu] = self.compute_measure(snaps(mu), M = M)
snap_norms[mu] = self.norms.L2norm(snaps(mu))
errors = np.zeros((Ns, 2))
interps = FunctionsList(self.V)
resids = FunctionsList(self.V)
if verbose:
progressBar = LoopProgress(msg = "Computing TR-GEIM remove (synthetic) - " + self.name, final = Ns )
# Assembling matrix for TR-GEIM
sys_matrix = (self.B[:M, :M].T @ self.B[:M, :M]) + reg_param * (self.T[:M, :M].T @ self.T[:M, :M])
for mu in range(Ns):
# Compute measurement vector with noise
y = clean_measures[mu].reshape(M,) + np.random.normal(0, noise_value, M).reshape(M,)
rhs = np.dot(self.B[:M, :M].T, y) + reg_param * np.dot((self.T[:M, :M].T @ self.T[:M, :M]), self.mean_beta[:M])
# If failure the measurement and the matrix are modified
if mu >= mu_failure:
_rhs = np.delete(rhs, idx_failed)
_sys_matrix = np.delete(np.delete(sys_matrix, idx_failed, axis=1), idx_failed, axis=0)
else:
_rhs = rhs
_sys_matrix = sys_matrix
# Solving the linear system
coeff = la.solve(_sys_matrix, _rhs)
# Compute the interpolant and residual
if mu >= mu_failure:
interpolant = remove_lin_combine(self.mf, coeff, M, sensI_drifted=idx_failed)
else:
interpolant = self.mf.lin_combine(coeff)
residual = np.abs(snaps(mu) - interpolant)
errors[mu, 0] = self.norms.L2norm(residual)
errors[mu, 1] = errors[mu, 0] / snap_norms[mu]
# Storing interpolant and residual
interps.append(interpolant)
resids.append(residual)
if verbose:
progressBar.update(1, percentage = False)
return errors, interps, resids
[docs]
def gpr_measure_test_err(self, snaps: FunctionsList, M: int, noise_value: float, reg_param: float,
ext_sens: FunctionsList, surrogate_model: list, idx_failed: list[int], mu_failure: int = 0, verbose = False):
r"""
The TR-GEIM algorithm is used to reconstruct the `snaps` (`FunctionsList`), by solving the TR-GEIM linear system with `idx_failed` measure.
In order to retrieve information on the "failed measure", a surrogate model (e.g., GPR) has been trained to learn the map from non-failed external measures and the one related to `idx_failed`.
The interpolant is then defined in the standard way.
Parameters
----------
snaps : FunctionsList
Functions to reconstruction
M : int
Number of sensor to use
noise_value : float
Standard deviation of the noise, modelled as a normal :math:`\mathcal{N}(0, \sigma^2)`
reg_param : float
Regularising parameter :math:`\lambda`
ext_sens : FunctionsList
Basis sensors adopted to compute the external measures, input of `surrogate_model`
surrogate_model : list
List of all the trained surrogate models
idx_failed : list[int]
List of integers with the failed sensors
mu_failure : int, optional (default = 0)
Index from which failure starts, typically time.
verbose : boolean, optional (default = False)
If true, output is printed.
Returns
----------
errors : np.ndarray
Errors per each element in `snaps` (first column: absolute, second column: relative), measure in :math:`||\cdot ||_{L^2}`.
interps : FunctionsList
Interpolant Field :math:`\mathcal{I}_M[u]` of TR-GEIM.
resids : FunctionsList
Residual Field :math:`r_M[u]`,
"""
if M > self.Mmax:
print('The maximum number of measures must not be higher than '+str(self.Mmax)+' --> set equal to '+str(self.Mmax))
M = self.Mmax
assert(max(idx_failed) < M)
Ns = len(snaps)
# Computing clean measures and norms of the snapshots
clean_measures = np.zeros((Ns, M))
snap_norms = np.zeros((Ns,))
for mu in range(Ns):
clean_measures[mu] = self.compute_measure(snaps(mu), M = M)
snap_norms[mu] = self.norms.L2norm(snaps(mu))
ext_measures = compute_measure(snaps, ext_sens, noise_value=noise_value).T
errors = np.zeros((Ns, 2))
interps = FunctionsList(self.V)
resids = FunctionsList(self.V)
if verbose:
progressBar = LoopProgress(msg = "Computing TR-GEIM ml-remove (synthetic) - " + self.name, final = Ns )
# Assembling matrix for TR-GEIM
sys_matrix = (self.B[:M, :M].T @ self.B[:M, :M]) + reg_param * (self.T[:M, :M].T @ self.T[:M, :M])
for mu in range(Ns):
# Compute measurement vector with noise
y = clean_measures[mu].reshape(M,) + np.random.normal(0, noise_value, M).reshape(M,)
# If failure the measurement and the matrix are modified
if mu >= mu_failure:
for ii, _idx in enumerate(idx_failed):
y[_idx] = surrogate_model[ii].predict(ext_measures[mu].reshape(1,-1))[0].flatten()
rhs = np.dot(self.B[:M, :M].T, y) + reg_param * np.dot((self.T[:M, :M].T @ self.T[:M, :M]), self.mean_beta[:M])
# Solving the linear system
coeff = la.solve(sys_matrix, rhs)
# Compute the interpolant and residual
interpolant = self.mf.lin_combine(coeff)
residual = np.abs(snaps(mu) - interpolant)
errors[mu, 0] = self.norms.L2norm(residual)
errors[mu, 1] = errors[mu, 0] / snap_norms[mu]
# Storing interpolant and residual
interps.append(interpolant)
resids.append(residual)
if verbose:
progressBar.update(1, percentage = False)
return errors, interps, resids
[docs]
class PBDW():
r"""
This class can be used to perform the online phase of the PBDW formulation for synthetic measures :math:`\mathbf{y}\in\mathbb{R}^M` obtained as evaluations of the magic sensors on the snapshot :math:`u(\mathbf{x};\,\boldsymbol{\mu})` as
.. math::
y_m = v_m(u(\boldsymbol{\mu})) + \epsilon_m + \delta_{m} \qquad \qquad m = 1, \dots, M
in which :math:`\epsilon_,\sim \mathcal{N}(0, \sigma^2)` is random noise and :math:`\delta_m \sim \mathcal{N}(\kappa, \rho^2)` acting on some measurements. This term is referred to as drift.
Parameters
----------
basis_functions : FunctionsList
List of functions spanning the reduced space
basis_sensors : FunctionsList
List of sensors representation spanning the update space
name : str
Name of the snapshots (e.g., temperature T)
is_H1: boolean, optional (Default= False)
Boolean indicating whether to use scalar product in :math:`\mathcal{H}^1` or :math:`L^2`.
"""
def __init__(self, basis_functions: FunctionsList, basis_sensors: FunctionsList, name: str, is_H1: bool = False) -> None:
# store basis function and basis sensors
self.basis_functions = FunctionsList(basis_functions.fun_space)
self.basis_sensors = FunctionsList(basis_sensors.fun_space)
self.basis_functions._list = basis_functions.copy()
self.basis_sensors._list = basis_sensors.copy()
self.V = basis_functions.fun_space
# Defining the norm class to make scalar products and norms
self.is_H1 = is_H1
self.norms = norms(self.V, is_H1=is_H1)
self.name = name
N = len(basis_functions)
M = len(basis_sensors)
# A_{ii,jj} = (basis_sensors[ii], basis_sensors[jj])
self.A = np.zeros((M,M))
for ii in range(M):
for jj in range(M):
if jj>=ii:
if self.is_H1:
self.A[ii,jj] = self.norms.H1innerProd(basis_sensors(ii), basis_sensors(jj), semi = False)
else:
self.A[ii, jj] = self.norms.L2innerProd(basis_sensors(ii), basis_sensors(jj))
else:
self.A[ii,jj] = self.A[jj, ii]
# K_{ii,jj} = (basis_sensors[ii], basis_functions[jj])
self.K = np.zeros((M, N))
for ii in range(M):
for jj in range(N):
if self.is_H1:
self.K[ii,jj] = self.norms.H1innerProd(basis_sensors(ii), basis_functions(jj), semi = False)
else:
self.K[ii, jj] = self.norms.L2innerProd(basis_sensors(ii), basis_functions(jj))
self.Nmax = N
self.Mmax = M
[docs]
def drift_test_err(self, snaps: FunctionsList, N: int, M: int,
noise_value : float, reg_param : np.ndarray,
kappa: float, rho: float, idx_failed: list[int],
num_rep_exp: int = 30, mu_failure: int = 0, verbose = False):
r"""
The PBDW online algorithm is used to reconstruct the `snaps` (`FunctionsList`), by solving the PBDW linear system
.. math::
\left[
\begin{array}{ccc}
\xi \cdot M \cdot \mathbb{I} + \mathbb{A} & & \mathbb{K} \\ & & \\
\mathbb{K}^T & & 0
\end{array}
\right] \cdot
\left[
\begin{array}{c}
\boldsymbol{\alpha} \\ \\ \boldsymbol{\theta}
\end{array}
\right] =
\left[
\begin{array}{c}
\mathbf{y} \\ \\ \mathbf{0}
\end{array}
\right]
then the inteprolant and residual are computed and returned per each element of the list.
Parameters
----------
snaps : FunctionsList
Function to reconstruction
N : int
Dimension of the reduced space
M : int
Number of sensor to use
noise_value : float
Standard deviation of the noise, modelled as a normal :math:`\mathcal{N}(0, \sigma^2)`
reg_param : np.ndarray
Regularising parameter :math:`\lambda`
kappa : float
Mean value :math:`\kappa` of the drift
rho : float
Standard deviation :math:`\rho` of the drift
idx_failed : list[int]
List of integers with the failed sensors
num_rep_exp : int, optional (default = 30)
Number of repeated experiments.
mu_failure : int, optional (default = 0)
Index from which failure starts, typically time.
verbose : boolean, optional (default = False)
If true, output is printed.
Returns
----------
errors : np.ndarray
Errors per each element in `snaps` (first column: absolute, second column: relative), measure in :math:`||\cdot ||_{L^2}` and averaged for the numerical experiments.
interps : FunctionsList
Interpolant Field :math:`\mathcal{I}_M[u]` of TR-GEIM.
resids : FunctionsList
Residual Field :math:`r_M[u]`,
"""
if M > self.Mmax:
print('The maximum number of measures must not be higher than '+str(self.Mmax)+' --> set equal to '+str(self.Mmax))
M = self.Mmax
if N > self.Nmax:
print('The maximum number of basis functions must not be higher than '+str(self.Nmax)+' --> set equal to '+str(self.Nmax))
N = self.Nmax
assert(max(idx_failed) < M)
Ns = len(snaps)
# Computing clean measures and norms of the snapshots
clean_measures = np.zeros((Ns, M))
snap_norms = np.zeros((Ns,))
for mu in range(Ns):
for mm in range(M):
if self.is_H1:
clean_measures[mu, mm] = self.norms.H1innerProd(snaps(mu), self.basis_sensors(mm), semi=False)
else:
clean_measures[mu, mm] = self.norms.L2innerProd(snaps(mu), self.basis_sensors(mm))
snap_norms[mu] = self.norms.L2norm(snaps(mu))
errors = np.zeros((num_rep_exp, Ns, 2))
interps = FunctionsList(self.V)
resids = FunctionsList(self.V)
if verbose:
progressBar = LoopProgress(msg = "Computing PBDW drifted (synthetic) - " + self.name, final = num_rep_exp )
# Assembling PBDW matrix
assert(len(reg_param) == M)
sys_matr1 = np.hstack([reg_param * (M * np.eye(M, M)) + self.A[:M, :M], self.K[:M, :N]])
sys_matr2 = np.hstack([self.K[:M, :N].T, np.zeros((N, N))])
sys_matr = np.vstack([sys_matr1, sys_matr2])
for kk in range(num_rep_exp):
for mu in range(Ns):
# Compute measurement vector with noise
y = clean_measures[mu].reshape(M,) + np.random.normal(0, noise_value, M).reshape(M,)
# Adding drift
if mu >= mu_failure:
y[idx_failed] += np.random.normal(kappa, rho, len(idx_failed))
# Creating rhs
rhs = np.hstack([y, np.zeros((N,))]).flatten()
# Solving the linear system
coeff = la.solve(sys_matr, rhs)
# Compute the interpolant and residual
interpolant = self.basis_sensors.lin_combine(coeff[:M]) + self.basis_functions.lin_combine(coeff[M:])
residual = np.abs(snaps(mu) - interpolant)
errors[kk, mu, 0] = self.norms.L2norm(residual)
errors[kk, mu, 1] = errors[kk, mu, 0] / snap_norms[mu]
# Storing interpolant and residual
if kk + 1 == num_rep_exp:
interps.append(interpolant)
resids.append(residual)
if verbose:
progressBar.update(1, percentage = False)
return errors, interps, resids
[docs]
def pure_remove_test_err(self, snaps: FunctionsList, N: int, M: int,
noise_value : float, reg_param : np.ndarray,
idx_failed: list[int], mu_failure: int = 0, verbose = False):
r"""
The PBDW online algorithm is used to reconstruct the `snaps` (`FunctionsList`), by solving the modified PBDW linear system: in particular, `idx_failed` measure is removed thus from :math:`\mathbf{y}` the `idx_failed` row is deleted, from :math:`\mathbb{A}` its `idx_failed` row and col are deleted and from :math:`\mathbb{K}` its `idx_failed` row is deleted.
The interpolant is then defined as the sum over the obtained coefficients from the modified PBDW linear system, without `idx_failed`.
Parameters
----------
snaps : FunctionsList
Function to reconstruction
N : int
Dimension of the reduced space
M : int
Number of sensor to use
noise_value : float
Standard deviation of the noise, modelled as a normal :math:`\mathcal{N}(0, \sigma^2)`
reg_param : np.ndarray
Regularising parameter :math:`\lambda`
idx_failed : list[int]
List of integers with the failed sensors
mu_failure : int, optional (default = 0)
Index from which failure starts, typically time.
verbose : boolean, optional (default = False)
If true, output is printed.
Returns
----------
errors : np.ndarray
Errors per each element in `snaps` (first column: absolute, second column: relative), measure in :math:`||\cdot ||_{L^2}`.
interps : FunctionsList
Interpolant Field :math:`\mathcal{I}_M[u]` of TR-GEIM.
resids : FunctionsList
Residual Field :math:`r_M[u]`,
"""
if M > self.Mmax:
print('The maximum number of measures must not be higher than '+str(self.Mmax)+' --> set equal to '+str(self.Mmax))
M = self.Mmax
if N > self.Nmax:
print('The maximum number of basis functions must not be higher than '+str(self.Nmax)+' --> set equal to '+str(self.Nmax))
N = self.Nmax
assert(max(idx_failed) < M)
Ns = len(snaps)
# Computing clean measures and norms of the snapshots
clean_measures = np.zeros((Ns, M))
snap_norms = np.zeros((Ns,))
for mu in range(Ns):
for mm in range(M):
if self.is_H1:
clean_measures[mu, mm] = self.norms.H1innerProd(snaps(mu), self.basis_sensors(mm), semi=False)
else:
clean_measures[mu, mm] = self.norms.L2innerProd(snaps(mu), self.basis_sensors(mm))
snap_norms[mu] = self.norms.L2norm(snaps(mu))
errors = np.zeros((Ns, 2))
interps = FunctionsList(self.V)
resids = FunctionsList(self.V)
if verbose:
progressBar = LoopProgress(msg = "Computing PBDW remove (synthetic) - " + self.name, final = Ns )
# Assembling PBDW matrix
assert(len(reg_param) == M)
sys_matr1 = np.hstack([reg_param * (M * np.eye(M, M)) + self.A[:M, :M], self.K[:M, :N]])
sys_matr2 = np.hstack([self.K[:M, :N].T, np.zeros((N, N))])
sys_matr = np.vstack([sys_matr1, sys_matr2])
for mu in range(Ns):
# Compute measurement vector with noise
y = clean_measures[mu].reshape(M,) + np.random.normal(0, noise_value, M).reshape(M,)
# Creating rhs
rhs = np.hstack([y, np.zeros((N,))]).flatten()
# If failure the measurement and the matrix are modified
if mu >= mu_failure:
_rhs = np.delete(rhs, idx_failed)
_sys_matrix = np.delete(np.delete(sys_matr, idx_failed, axis=1), idx_failed, axis=0)
else:
_rhs = rhs
_sys_matrix = sys_matr
# Solving the linear system
coeff = la.solve(_sys_matrix, _rhs)
# Compute the interpolant and residual
if mu >= mu_failure:
interpolant = remove_lin_combine(self.basis_sensors, coeff[:M-1], M, sensI_drifted=idx_failed) + self.basis_functions.lin_combine(coeff[M-1:])
else:
interpolant = self.basis_sensors.lin_combine(coeff[:M]) + self.basis_functions.lin_combine(coeff[M:])
residual = np.abs(snaps(mu) - interpolant)
errors[mu, 0] = self.norms.L2norm(residual)
errors[mu, 1] = errors[mu, 0] / snap_norms[mu]
# Storing interpolant and residual
interps.append(interpolant)
resids.append(residual)
if verbose:
progressBar.update(1, percentage = False)
return errors, interps, resids
[docs]
def gpr_measure_test_err(self, snaps: FunctionsList, N: int, M: int,
noise_value : float, reg_param : np.ndarray,
ext_sens: FunctionsList, surrogate_model: list, idx_failed: list[int], mu_failure: int = 0, verbose = False):
r"""
The PBDW online algorithm is used to reconstruct the `snaps` (`FunctionsList`), by solving the PBDW linear systemwith `idx_failed` measure.
In order to retrieve information on the "failed measure", a surrogate model (e.g., GPR) has been trained to learn the map from non-failed external measures and the one related to `idx_failed`.
The interpolant is then defined in the standard way.
Parameters
----------
snaps : FunctionsList
Function to reconstruction
N : int
Dimension of the reduced space
M : int
Number of sensor to use
noise_value : float
Standard deviation of the noise, modelled as a normal :math:`\mathcal{N}(0, \sigma^2)`
reg_param : np.ndarray
Regularising parameter :math:`\lambda`
ext_sens : FunctionsList
Basis sensors adopted to compute the external measures, input of `surrogate_model`
surrogate_model : list
List of all the trained surrogate models
idx_failed : list[int]
List of integers with the failed sensors
mu_failure : int, optional (default = 0)
Index from which failure starts, typically time.
verbose : boolean, optional (default = False)
If true, output is printed.
Returns
----------
errors : np.ndarray
Errors per each element in `snaps` (first column: absolute, second column: relative), measure in :math:`||\cdot ||_{L^2}`.
interps : FunctionsList
Interpolant Field :math:`\mathcal{I}_M[u]` of TR-GEIM.
resids : FunctionsList
Residual Field :math:`r_M[u]`,
"""
if M > self.Mmax:
print('The maximum number of measures must not be higher than '+str(self.Mmax)+' --> set equal to '+str(self.Mmax))
M = self.Mmax
if N > self.Nmax:
print('The maximum number of basis functions must not be higher than '+str(self.Nmax)+' --> set equal to '+str(self.Nmax))
N = self.Nmax
assert(max(idx_failed) < M)
Ns = len(snaps)
# Computing clean measures and norms of the snapshots
clean_measures = np.zeros((Ns, M))
snap_norms = np.zeros((Ns,))
for mu in range(Ns):
for mm in range(M):
if self.is_H1:
clean_measures[mu, mm] = self.norms.H1innerProd(snaps(mu), self.basis_sensors(mm), semi=False)
else:
clean_measures[mu, mm] = self.norms.L2innerProd(snaps(mu), self.basis_sensors(mm))
snap_norms[mu] = self.norms.L2norm(snaps(mu))
ext_measures = compute_measure(snaps, ext_sens, noise_value=noise_value).T
errors = np.zeros((Ns, 2))
interps = FunctionsList(self.V)
resids = FunctionsList(self.V)
if verbose:
progressBar = LoopProgress(msg = "Computing PBDW remove (synthetic) - " + self.name, final = Ns )
# Assembling PBDW matrix
sys_matr1 = np.hstack([reg_param * (M * np.eye(M, M)) + self.A[:M, :M], self.K[:M, :N]])
sys_matr2 = np.hstack([self.K[:M, :N].T, np.zeros((N, N))])
sys_matr = np.vstack([sys_matr1, sys_matr2])
for mu in range(Ns):
# Compute measurement vector with noise
y = clean_measures[mu].reshape(M,) + np.random.normal(0, noise_value, M).reshape(M,)
if mu >= mu_failure:
for ii, _idx in enumerate(idx_failed):
y[_idx] = surrogate_model[ii].predict(ext_measures[mu].reshape(1,-1))[0].flatten()
# Creating rhs
rhs = np.hstack([y, np.zeros((N,))]).flatten()
# Solving the linear system
coeff = la.solve(sys_matr, rhs)
# Compute the interpolant and residual
interpolant = self.basis_sensors.lin_combine(coeff[:M]) + self.basis_functions.lin_combine(coeff[M:])
residual = np.abs(snaps(mu) - interpolant)
errors[mu, 0] = self.norms.L2norm(residual)
errors[mu, 1] = errors[mu, 0] / snap_norms[mu]
# Storing interpolant and residual
interps.append(interpolant)
resids.append(residual)
if verbose:
progressBar.update(1, percentage = False)
return errors, interps, resids