# Source code for pyforce.offline.pod

# Offline Phase: Proper Orthogonal Decomposition
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 09 September 2024
# Latest Doc  Update: 24 May 2024

import numpy as np
import scipy
import warnings

from dolfinx.fem import (Function, FunctionSpace)
from sklearn.utils.extmath import randomized_svd

from pyforce.tools.backends import norms, LoopProgress
from pyforce.tools.functions_list import *

class POD():
    r"""
    A class to perform the POD on a list of snapshots :math:`u(\mathbf{x};\,\boldsymbol{\mu})` dependent on some parameter :math:`\boldsymbol{\mu}\in\mathcal{P}\subset\mathbb{R}^p`.
    This class is used for `FunctionsList`, the POD modes are obtained from the eigendecomposition of the correlation matrix :math:`C\in\mathbb{R}^{N_s\times N_s}`

    .. math::
        C_{ij} = \left(u(\cdot;\,\boldsymbol{\mu}_i),\,u(\cdot;\,\boldsymbol{\mu}_j)\right)_{L^2}\qquad i,j = 1, \dots, N_s

    .. math::
        C \boldsymbol{\eta_n} = \lambda_n \boldsymbol{\eta_n}\qquad\qquad\qquad n = 1, \dots, N_s

    The eigenvalues :math:`\lambda_n` and eigenvectors :math:`\boldsymbol{\eta_n}` are immediately computed.

    Parameters
    ----------
    train_snap : FunctionsList
        List of snapshots onto which the POD is performed.
    name : str
        Name of the field.
    use_scipy : boolean, optional (Default = False)
        If `True`, `scipy.linalg.eigh` is used for the eigendecomposition,
        restricting the returned spectrum to non-negative eigenvalues.
    verbose : boolean, optional (Default = False)
        If `True`, print of the progress is enabled.
    """
    def __init__(self, train_snap: FunctionsList, name: str, use_scipy = False, verbose = False) -> None:

        self.Ns = len(train_snap)
        self.V = train_snap.fun_space
        self.norm = norms(self.V)
        self.name = name

        # Generate the correlation matrix using the inner product in L^2.
        # C is symmetric: only the upper triangle is integrated, the lower
        # triangle is mirrored from it.
        corrMatrix = np.zeros((self.Ns, self.Ns))
        if verbose:
            progressBar = LoopProgress(msg = "Computing " + self.name + ' correlation matrix', final = self.Ns)

        for ii in range(self.Ns):
            for jj in range(self.Ns):
                if jj >= ii:
                    corrMatrix[ii, jj] = self.norm.L2innerProd(train_snap(ii), train_snap(jj))
                else:
                    corrMatrix[ii, jj] = corrMatrix[jj, ii]
            if verbose:
                progressBar.update(1, percentage = False)

        # Solving the symmetric eigenvalue problem.
        # NOTE(review): C is positive semi-definite in exact arithmetic, but
        # `np.linalg.eigh` may return slightly negative eigenvalues due to
        # round-off; `mode` then produces NaNs through the sqrt. Use
        # `use_scipy=True` (spectrum clipped to [0, inf)) or `DiscretePOD`
        # when that happens.
        if use_scipy:
            eigenvalues, eigenvectors = scipy.linalg.eigh(corrMatrix, subset_by_value=[0, np.inf])
        else:
            eigenvalues, eigenvectors = np.linalg.eigh(corrMatrix)

        # Sorting the eigenvalue/eigenvector pairs in descending order
        sorted_indexes = np.argsort(eigenvalues * (-1.))
        eigenvalues  = eigenvalues[sorted_indexes]
        eigenvectors = eigenvectors[:, sorted_indexes]

        # Store the eigenvalue/eigenvector pairs
        self.eigenvalues  = eigenvalues
        self.eigenvectors = eigenvectors

    def GramSchmidt(self, fun: Function) -> np.ndarray:
        r"""
        Perform a step of the Gram-Schmidt process on POD modes :math:`\{\psi_k\}_{k=1}^r` adding `fun` :math:`=f` to enforce the orthonormality in :math:`L^2`

        .. math::
            \psi_{r+1} = f - \sum_{k=1}^r \frac{(f, \psi_k)_{L^2}}{(\psi_k, \psi_k)_{L^2}}\psi_k

        Parameters
        ----------
        fun : Function
            Function to add to the POD basis.

        Returns
        -------
        normalised_fun : Function
            Orthonormalised function :math:`\psi_{r+1}` with respect to the POD basis :math:`\{\psi_k\}_{k=1}^r`.
        """
        n_modes = len(self.PODmodes)

        # Projection of `fun` onto each of the already-computed modes.
        # (The previous implementation looped over range(n_modes + 1) with a
        # `jj < n_modes` guard, leaving one dead iteration per call.)
        _rescaling = [self.norm.L2innerProd(fun, self.PODmodes(jj))
                      / self.norm.L2innerProd(self.PODmodes(jj), self.PODmodes(jj))
                      * self.PODmodes(jj)
                      for jj in range(n_modes)]

        # With no modes yet, sum([]) == 0 and `fun` is simply normalised.
        normalised_fun = fun - sum(_rescaling)
        return normalised_fun / self.norm.L2norm(normalised_fun)

    def mode(self, train_snap: FunctionsList, r: int) -> Function:
        r"""
        Computes the `r`-th POD mode, according to the following formula

        .. math::
            \psi_{r} (\mathbf{x})= \frac{1}{\lambda_r}\sum_{i=1}^{N_s} \eta_{r, i}\,u(\mathbf{x};\,\boldsymbol{\mu}_i)

        Parameters
        ----------
        train_snap : FunctionsList
            List of snapshots onto which the POD is performed.
        r : int
            Integer input indicating the mode to define.
        """
        # Linear combination of the snapshots weighted by the r-th eigenvector,
        # rescaled by 1 / sqrt(lambda_r) so the mode has unit L^2 norm.
        return train_snap.lin_combine(self.eigenvectors[:, r] / np.sqrt(self.eigenvalues[r]))

    def compute_basis(self, train_snap: FunctionsList, maxBasis: int, normalise = False) -> None:
        r"""
        Computes the POD modes.

        To enforce the orthonormality in :math:`L^2`, the Gram-Schmidt procedure can be used: if the number of modes is high, the numerical error in the eigendecomposition may be too large and the orthonormality is lost.

        Parameters
        ----------
        train_snap : FunctionsList
            List of snapshots onto which the POD is performed.
        maxBasis : int
            Integer input indicating the number of modes to define.
        normalise : boolean, optional (Default = False)
            If True, the Gram-Schmidt procedure is used to normalise the POD modes.
        """
        self.PODmodes = FunctionsList(self.V)
        for rankII in range(maxBasis):
            if normalise:
                self.PODmodes.append(self.GramSchmidt(self.mode(train_snap, rankII)))
            else:
                self.PODmodes.append(self.mode(train_snap, rankII))

    def projection(self, u: Function, N: int) -> np.ndarray:
        r"""
        The reduced coefficients :math:`\{\alpha_k\}_{k=1}^N` of `u` using `N` modes :math:`\{\psi_k\}_{k=1}^N` are computed using projection in :math:`L_2`, i.e.

        .. math::
            \alpha_k(\boldsymbol{\mu}) = (u(\cdot;\,\boldsymbol{\mu}), \,\psi_k)_{L^2}\qquad k = 1, \dots, N

        Parameters
        ----------
        u : Function
            Function object to project onto the reduced space of dimension `N`.
        N : int
            Dimension of the reduced space, modes to be used.

        Returns
        -------
        coeff : np.ndarray
            Modal POD coefficients of `u`, :math:`\{\alpha_k\}_{k=1}^N`.
        """
        # The coefficients are computed using projection in L^2
        coeff = np.zeros((N,))
        for ii in range(N):
            coeff[ii] = self.norm.L2innerProd(u, self.PODmodes(ii))
        return coeff

    def train_error(self, train_snap: FunctionsList, maxBasis: int, verbose : bool = False) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        r"""
        The maximum absolute :math:`E_N` and relative :math:`\varepsilon_N` error on the train set is computed, by projecting it onto the reduced space in :math:`L^2`-sense

        .. math::
            E_N = \max\limits_{\boldsymbol{\mu}\in\Xi_{\text{train}}} \left\| u(\mathbf{x};\,\boldsymbol{\mu}) - \sum_{n=1}^N \alpha_n(\boldsymbol{\mu})\cdot \psi_n(\mathbf{x})\right\|_{L^2}

        .. math::
            \varepsilon_N = \max\limits_{\boldsymbol{\mu}\in\Xi_{\text{train}}} \frac{\left\| u(\mathbf{x};\,\boldsymbol{\mu}) - \sum_{n=1}^N \alpha_n(\boldsymbol{\mu})\cdot \psi_n(\mathbf{x})\right\|_{L^2}}{\left\| u(\mathbf{x};\,\boldsymbol{\mu})\right\|_{L^2}}

        Parameters
        ----------
        train_snap : FunctionsList
            List of snapshots onto which the train error of the POD basis is performed.
        maxBasis : int
            Integer input indicating the maximum number of modes to use.
        verbose : boolean, optional (Default = False)
            If `True`, print of the progress is enabled.

        Returns
        -------
        maxAbsErr : np.ndarray
            Maximum absolute errors as a function of the dimension of the reduced space.
        maxRelErr : np.ndarray
            Maximum relative errors as a function of the dimension of the reduced space.
        coeff_matrix : np.ndarray
            Matrix of the modal coefficients, obtained by projection in :math:`L^2`.
        """
        absErr = np.zeros((self.Ns, maxBasis))
        relErr = np.zeros_like(absErr)
        coeff_matrix = np.zeros_like(absErr)

        if verbose:
            progressBar = LoopProgress(msg = "Computing train error " + self.name, final = self.Ns)

        resid = Function(self.V).copy()
        for mu in range(self.Ns):
            # Project once with the full basis; truncated reconstructions
            # reuse the first M+1 coefficients.
            coeff = self.projection(train_snap.map(mu), maxBasis)
            # The snapshot norm is independent of the basis size:
            # hoisted out of the M loop (each L2norm call is an assembly).
            snap_norm = self.norm.L2norm(train_snap(mu))
            for M in range(maxBasis):
                # building residual field
                resid.x.array[:] = train_snap(mu) - self.PODmodes.lin_combine(coeff[:M+1])
                absErr[mu, M] = self.norm.L2norm(resid)
                relErr[mu, M] = absErr[mu, M] / snap_norm
            coeff_matrix[mu, :] = coeff[:]
            if verbose:
                progressBar.update(1, percentage = False)

        return absErr.max(axis = 0), relErr.max(axis = 0), coeff_matrix

    def test_error(self, test_snap: FunctionsList, maxBasis: int, verbose : bool = False):
        r"""
        The average absolute :math:`E_N` and relative :math:`\varepsilon_N` error on the test set is computed, by projecting it onto the reduced space in :math:`L^2`-sense

        .. math::
            E_N = \left\langle \left\| u(\mathbf{x};\,\boldsymbol{\mu}) - \sum_{n=1}^N \alpha_n(\boldsymbol{\mu})\cdot \psi_n(\mathbf{x})\right\|_{L^2} \right\rangle_{\boldsymbol{\mu}\in\Xi_{\text{test}}}

        .. math::
            \varepsilon_N =\left\langle \frac{\left\| u(\mathbf{x};\,\boldsymbol{\mu}) - \sum_{n=1}^N \alpha_n(\boldsymbol{\mu})\cdot \psi_n(\mathbf{x})\right\|_{L^2}}{\left\| u(\mathbf{x};\,\boldsymbol{\mu})\right\|_{L^2}} \right\rangle_{\boldsymbol{\mu}\in\Xi_{\text{test}}}

        Parameters
        ----------
        test_snap : FunctionsList
            List of snapshots onto which the test error of the POD basis is performed.
        maxBasis : int
            Integer input indicating the maximum number of modes to use.
        verbose : boolean, optional (Default = False)
            If `True`, print of the progress is enabled.

        Returns
        -------
        meanAbsErr : np.ndarray
            Average absolute errors as a function of the dimension of the reduced space.
        meanRelErr : np.ndarray
            Average relative errors as a function of the dimension of the reduced space.
        coeff_matrix : np.ndarray
            Matrix of the modal coefficients, obtained by projection in :math:`L^2`.
        """
        Ns_test = len(test_snap)
        absErr = np.zeros((Ns_test, maxBasis))
        relErr = np.zeros_like(absErr)
        coeff_matrix = np.zeros_like(absErr)

        if verbose:
            progressBar = LoopProgress(msg = "Computing POD test error (projection) - " + self.name, final = Ns_test)

        resid = Function(self.V).copy()
        for mu in range(Ns_test):
            # Project once with the full basis; truncated reconstructions
            # reuse the first M+1 coefficients.
            coeff = self.projection(test_snap.map(mu), maxBasis)
            # Snapshot norm is loop-invariant with respect to M.
            snap_norm = self.norm.L2norm(test_snap(mu))
            for M in range(maxBasis):
                # building residual field
                resid.x.array[:] = test_snap(mu) - self.PODmodes.lin_combine(coeff[:M+1])
                absErr[mu, M] = self.norm.L2norm(resid)
                relErr[mu, M] = absErr[mu, M] / snap_norm
            coeff_matrix[mu, :] = coeff[:]
            if verbose:
                progressBar.update(1, percentage = False)

        meanAbsErr = absErr.mean(axis = 0)
        meanRelErr = relErr.mean(axis = 0)
        return meanAbsErr, meanRelErr, coeff_matrix
# discrete POD - the SVD of the full snapshot matrix is used
class DiscretePOD():
    r"""
    A class to perform the POD on a list of snapshots :math:`u(\mathbf{x};\,\boldsymbol{\mu})` dependent on some parameter :math:`\boldsymbol{\mu}`.
    This class is used for several kind of inputs (`FunctionsList`, vectors, images, matrices...).
    The snapshots are represented by a matrix :math:`\mathbb{S}\in\mathbb{R}^{\mathcal{N}_h\times N_s}`, such that

    .. math::
        \mathbb{S}_{ij} = u(\mathbf{x}_i;\,\boldsymbol{\mu}_j)

    in which the dependence on :math:`\mathbf{x}_i` can be a true spatial dependence or the dofs of a matrix/image.
    The basis functions are computed using the `svd`, i.e.

    .. math::
        \mathbb{U}, \Sigma, \mathbb{V}^\dagger = \text{svd}(\mathbb{S})

    The basis functions are orthogonal in :math:`l_2` sense, hence the matrix containing the modes is orthogonal.

    Parameters
    ----------
    train_snap : FunctionsMatrix or FunctionsList
        List of snapshots onto which the POD is performed.
    name : str
        Name of the field.
    Nmax : int, optional (default=None)
        If `None` the full matrices are stored, else only the first `Nmax`.
    random : bool, optional (default = False)
        If True and if `Nmax` is provided, the randomised SVD is used.
    """
    def __init__(self, train_snap: FunctionsList, name: str, Nmax = None, random = False) -> None:

        self.Ns = len(train_snap)
        self.Nh = len(train_snap(0))
        self.name = name
        self.modes = FunctionsList(dofs = train_snap.fun_shape)

        # Performing SVD of the snapshot matrix: randomised (truncated) SVD
        # only when a target rank Nmax is available, full (thin) SVD otherwise.
        if random and Nmax is not None:
            U, Sigma, V_T = randomized_svd(train_snap.return_matrix(), n_components=Nmax, n_iter='auto')
        else:
            U, Sigma, V_T = np.linalg.svd(train_snap.return_matrix(), full_matrices=False)

        # Singular values should be non-negative by construction; a negative
        # one signals something went wrong upstream.
        if np.any(Sigma < 0):
            warnings.warn("Check singular values: some of them are negative!")

        if Nmax is None:
            self.Nmax = len(Sigma)
        else:
            self.Nmax = Nmax

        # Truncated decomposition: sing_vals is (Nmax,), Vh_train is (Nmax, Ns)
        self.sing_vals = Sigma[:self.Nmax]
        self.Vh_train = V_T[:self.Nmax, :]

        # Computing POD basis: the modes are the first Nmax left singular vectors
        self.U = U
        for rankII in range(self.Nmax):
            self.modes.append(U[:, rankII])  # shape (Nh, Nmax)

    def projection(self, snap: np.ndarray, N: int = None):
        r"""
        The reduced coefficients :math:`\mathbf{V}^\dagger_\star\in\mathbb{R}^{N\times N_\star}` of `snap`:math:`=\mathbb{S}_\star\in\mathbb{R}^{\mathcal{N}_h\times N_\star}` using `N` modes :math:`\mathbb{U}\in\mathbb{R}^{\mathcal{N}_h\times N}` are computed using projection in :math:`l_2`, i.e.

        .. math::
            \mathbf{V}^\dagger_\star = \Sigma^{-1}\mathbb{U}^T\mathbb{S}_\star

        Parameters
        ----------
        snap : np.ndarray
            Matrix object to project onto the reduced space of dimension `N`. Must be :math:`(\mathcal{N}_h, N_\star)`.
        N : int, optional (default = None)
            Dimension of the reduced space, modes to be used. If `None` all the modes are used.

        Returns
        -------
        coeff : np.ndarray
            Modal POD coefficients of `snap`, :math:`\mathbf{V}^\dagger_\star`.
        """
        if N is None:
            N = self.Nmax

        if isinstance(snap, FunctionsList):
            _snap = snap.return_matrix()
        else:
            _snap = snap

        # U^T S, then divide row i by sigma_i. The element-wise division
        # replaces the original inv(diag(sigma)) @ (...) product: exact same
        # result in O(N) instead of O(N^3) for the inverse plus an extra
        # matmul. The double transpose keeps 1D inputs working as well.
        Vh_star = (np.dot(self.modes.return_matrix().T[:N], _snap).T / self.sing_vals[:N]).T
        return Vh_star

    def reconstruct(self, Vh_star: np.ndarray):
        r"""
        The reduced coefficients :math:`\mathbf{V}^\dagger_\star\in\mathbb{R}^{N\times N_\star}` are used to decode into the Full Order space :math:`\mathbb{R}^{\mathcal{N}_h}` using `N` modes :math:`\mathbb{U}\in\mathbb{R}^{\mathcal{N}_h\times N}`.

        .. math::
            \mathbb{S}_\star = \mathbb{U}\Sigma\mathbf{V}^\dagger_\star

        Parameters
        ----------
        Vh_star : np.ndarray
            Matrix object containing the POD coefficients. Must be :math:`(N, N_\star)`.

        Returns
        -------
        snaps : np.ndarray
            Reconstructed field returned as an element of :math:`\mathbf{R}^{\mathcal{N_h}\times N_\star}`.
        """
        # The basis dimension is inferred from the coefficients
        N = len(Vh_star)
        assert( N <= self.Nmax )

        # U[:, :N] * sigma scales each mode (column) by its singular value
        return np.dot(self.modes.return_matrix()[:, :N] * self.sing_vals[:N], Vh_star)

    def train_error(self, train_snap: FunctionsList, maxBasis: int, verbose = False):
        r"""
        The maximum absolute :math:`E_N` and relative :math:`\varepsilon_N` error on the train set is computed, by projecting it onto the reduced space in :math:`l^2`-sense

        .. math::
            E_N = \max\limits_{\boldsymbol{\mu}\in\Xi_{\text{train}}} \left\| u(\mathbf{x};\,\boldsymbol{\mu}) - \sum_{n=1}^N \alpha_n(\boldsymbol{\mu})\cdot \psi_n(\mathbf{x})\right\|_{2}

        .. math::
            \varepsilon_N = \max\limits_{\boldsymbol{\mu}\in\Xi_{\text{train}}} \frac{\left\| u(\mathbf{x};\,\boldsymbol{\mu}) - \sum_{n=1}^N \alpha_n(\boldsymbol{\mu})\cdot \psi_n(\mathbf{x})\right\|_{2}}{\left\| u(\mathbf{x};\,\boldsymbol{\mu})\right\|_{2}}

        Parameters
        ----------
        train_snap : FunctionsMatrix or FunctionsList
            List of snapshots to project and compute errors
        maxBasis : int
            Integer input indicating the maximum number of modes to use.
        verbose : boolean, optional (Default = False)
            If `True`, print of the progress is enabled.

        Returns
        -------
        maxAbsErr : np.ndarray
            Maximum absolute errors as a function of the dimension of the reduced space.
        maxRelErr : np.ndarray
            Maximum relative errors as a function of the dimension of the reduced space.
        """
        assert(maxBasis <= self.Nmax)

        absErr = np.zeros((self.Ns, maxBasis))
        relErr = np.zeros_like(absErr)

        if verbose:
            progressBar = LoopProgress(msg="Computing train error - "+self.name, final = self.Ns)

        for mu in range(self.Ns):
            # The snapshot norm does not depend on the rank
            norm = np.linalg.norm(train_snap(mu))
            for rank in range(maxBasis):
                # Reconstruct from the stored training coefficients truncated at `rank`
                recon = self.reconstruct(self.Vh_train[:rank+1, mu])
                absErr[mu, rank] = np.linalg.norm(recon - train_snap(mu))
                relErr[mu, rank] = absErr[mu, rank] / norm
            if verbose:
                progressBar.update(1, percentage=False)

        return absErr.max(axis = 0), relErr.max(axis = 0)