# Schrodinger Advection Step
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 24 November 2023
# Latest Doc Update: 24 November 2023
from math import isclose
import numpy as np
import dolfinx
from dolfinx import fem
from dolfinx.fem import petsc, Function, FunctionSpace, dirichletbc, locate_dofs_topological, form, assemble_scalar
import ufl
from ufl import grad, div, nabla_grad, inner, dot, real, FacetNormal
from petsc4py import PETSc
[docs]
class PlaneWave():
"""
A class to generate a plane wave for the wave function, defined as
.. math::
\psi(\mathbf{x}, t) = c \cdot e^{ i\,(\mathbf{k} \cdot \mathbf{x} - \omega \, t) }
in which :math:`c\in\mathbb{C}` is a normalisation constant, :math:`\mathbf{k}` is the wave vector and :math:`\omega` is the frequency, defined as
.. math::
\mathbf{k} = h^{-1}\cdot \mathbf{u} \qquad \qquad \omega = (\mathbf{u}\cdot \mathbf{u}) / (2\hbar)
The plane wave is used to assign a fixed velocity field to a certain region.
Parameters
----------
t : float
Initial Time.
kVec : np.ndarray
Wave vector.
omega : float
Frequency of the wave.
hbar : float
Value of :math:`\hbar`.
c : complex
Normalisation constant.
"""
def __init__(self, t: float, kVec: np.ndarray, omega: float, hbar: float, c: complex):
self.t = t
self.hbar = hbar
self.kVec = kVec
self.omega = omega
self.c = c
def __call__(self, x):
"""
Defines the class as callable, this is required to interpolate this into a `dolfinx.fem.Function` object.
"""
# Definition of the spatial contribution of the plane wave
spatial = sum([self.kVec[ii] * x[ii] for ii in range(len(self.kVec))])
return self.c * np.exp( 1j * ( spatial - self.omega * self.t))
[docs]
class schrodinger():
"""
This class implements the prediction step of the Incompressible Schrodinger Flow, in particular the free-particle Schrodinger equation is solved.
Parameters
----------
domain : dolfinx.mesh.Mesh
Mesh imported.
ft : dolfinx.cpp.mesh.MeshTags_int32
Face tags of the boundaries.
boundary_marks : dict
Dictionary with the markers for the boundaries.
degree_psi : int (Default = 1)
Degree of the polynomials used for the definition of the functional space.
c : np.ndarray (Default = :math:`[1, 0.01]`)
Constant, in the `__init__` is normalised such that :math:`\sqrt{c_1^*c_1+c_2^*c_2}`.
"""
def __init__(self, domain: dolfinx.mesh.Mesh, ft: dolfinx.cpp.mesh.MeshTags_int32, boundary_marks: dict,
degree_psi: int = 1, c: np.ndarray = np.array([1.0 + 0 * 1j, 0.01 + 0 * 1j])):
# Domain
self.domain = domain
self.ft = ft
self.gdim = self.domain.geometry.dim
self.fdim = self.gdim - 1
# Definition of the integral differential (both in and on the domain)
metadata = {"quadrature_degree": 4}
self.dx = ufl.Measure("dx", domain=self.domain, metadata=metadata)
self.ds = ufl.Measure("ds", domain=self.domain, subdomain_data=self.ft, metadata=metadata)
# Assign markers: only inlet is needed, on the other boundaries homogeneous Neumann is imposed
self.inl_marker = boundary_marks['inlet']
# Functional Spaces for the wave function
self.P2 = ufl.FiniteElement("Lagrange", self.domain.ufl_cell(), degree_psi)
self.mixEl = ufl.MixedElement(self.P2, self.P2)
self.V = FunctionSpace(self.domain, self.mixEl)
# Functional Spaces for the temperature - buoyancy effects
self.Qn = FunctionSpace(self.domain, ("Lagrange", 1))
# Definition of the trial and test functions for the wave function
(self.psi1, self.psi2) = ufl.TrialFunctions(self.V)
(self.v1, self.v2) = ufl.TestFunctions(self.V)
# Initialisation
self.t = 0.
self.c1 = c[0]
self.c2 = c[1]
constantNorm = np.sqrt(np.conj(self.c1) * self.c1 + np.conj(self.c2) * self.c2)
self.c1 /= constantNorm
self.c2 /= constantNorm
self.psi1_old = Function(self.V.sub(0).collapse()[0]).copy()
self.psi2_old = Function(self.V.sub(1).collapse()[0]).copy()
self.psi1_old.interpolate(lambda x: self.c1 + x[0] * 0.0 + x[1] * 0.0)
self.psi2_old.interpolate(lambda x: self.c2 + x[0] * 0.0 + x[1] * 0.0)
# self.psi1_old.x.set(self.c1)
# self.psi2_old.x.set(self.c2)
[docs]
def updateInlet(self):
"""
This function updates the inlet condition of the plane wave.
"""
self.psi1_in.interpolate(self.wave1)
self.psi2_in.interpolate(self.wave2)
[docs]
def assemble(self, phys_parameters: dict, dt: float = 1e-2, direct : bool = False,
gravity_dict: dict = {'gravity': np.array([0, 0, 0]), 'beta': 0, 'Tref': 0} ):
"""
This function assign the boundary condition (if inlet is present), assembles the forms of the weak formulations and creates the linear structures for the solution (matrix, rhs and solver).
Parameters
----------
phys_parameters : dict
Dictionary with the physical parameters.
dt : float (Default: 1e-2)
Time step.
direct : boolean (Default: False)
Boolean variable to identify if a direct solver must be used.
gravity_dict : dict (Default: None)
Dictionary with the gravity parameters.
"""
# Assign the time step
self.dt = dt
# Assign the hbar value
self.hbar = phys_parameters['hbar']
# Assign the inlet BC, if required
if self.inl_marker is not None:
# Defining wave vector and frequency
self.kVec = phys_parameters['uin'] / self.hbar
self.omega = np.dot(phys_parameters['uin'], phys_parameters['uin']) / ( 2 * self.hbar)
# Defining inlet waves
self.psi1_in = Function(self.V.sub(0).collapse()[0]).copy()
self.psi2_in = Function(self.V.sub(1).collapse()[0]).copy()
self.wave1 = PlaneWave(self.t, self.kVec, self.omega, self.hbar, self.c1)
self.wave2 = PlaneWave(self.t, self.kVec, self.omega, self.hbar, self.c2)
# Updating value of the inlet waves
self.updateInlet()
# Define the inlet boundary conditions
self.bc_in1 = dirichletbc(self.psi1_in, locate_dofs_topological((self.V.sub(0), self.V.sub(0).collapse()[0]), self.fdim, self.ft.find(self.inl_marker)), self.V.sub(0))
self.bc_in2 = dirichletbc(self.psi2_in, locate_dofs_topological((self.V.sub(1), self.V.sub(1).collapse()[0]), self.fdim, self.ft.find(self.inl_marker)), self.V.sub(1))
self.bcs = [self.bc_in1, self.bc_in2]
else:
# If there is no inlet the whole boundary has homogeneous Neumann imposed.
self.bcs = []
# Define the left-hand side of the form
self.lhs = 1. / self.dt * inner(self.psi1, self.v1) * self.dx
self.lhs += 1. / self.dt * inner(self.psi2, self.v2) * self.dx
self.lhs += 1j * self.hbar / 2. * inner(grad(self.psi1), grad(self.v1)) * self.dx
self.lhs += 1j * self.hbar / 2. * inner(grad(self.psi2), grad(self.v2)) * self.dx
## If the gravitational field is considered an additional potential term is added
if sum(abs(gravity_dict['gravity'])) > 0:
# Defining gravity functions
self.g1 = Function(self.V.sub(0).collapse()[0]).copy()
self.g2 = Function(self.V.sub(1).collapse()[0]).copy()
self.g1.interpolate(lambda xx: +(xx[0] * gravity_dict['gravity'][0] + xx[1] * gravity_dict['gravity'][1] + xx[2] * gravity_dict['gravity'][2]))
self.g2.interpolate(lambda xx: -(xx[0] * gravity_dict['gravity'][0] + xx[1] * gravity_dict['gravity'][1] + xx[2] * gravity_dict['gravity'][2]))
# Defining the temperature functions
self.Tref = Function(self.Qn).copy()
self.Told = Function(self.Qn).copy()
self.Tref.interpolate(lambda x: gravity_dict['Tref'] + x[0] * 0.0 + x[1] * 0.0)
self.Told.interpolate(lambda x: gravity_dict['Tref'] + x[0] * 0.0 + x[1] * 0.0)
self.beta = PETSc.ScalarType(gravity_dict['beta'])
# Defining the buoyancy term and add to the lhs
self._g1 = self.g1 * ( 1 - self.beta * (self.Told - self.Tref) )
self._g2 = self.g2 * ( 1 - self.beta * (self.Told - self.Tref) )
self.lhs += inner(self._g1 * self.psi1, self.v1) * self.dx
self.lhs += inner(self._g2 * self.psi2, self.v2) * self.dx
# Forming the lhs
self.a = form( self.lhs )
# Defining the right-hand side of the form
self.L = form( 1. / self.dt * inner(self.psi1_old, self.v1) * self.dx +
1. / self.dt * inner(self.psi2_old, self.v2) * self.dx)
# Creating matrix and vector
self.A = petsc.create_matrix(self.a)
petsc.assemble_matrix(self.A, self.a, self.bcs)
self.A.assemble()
self.b = fem.petsc.create_vector(self.L)
# Define solver
self.solver = PETSc.KSP().create(self.domain.comm)
self.solver.setOperators(self.A)
if direct:
self.solver.setType(PETSc.KSP.Type.PREONLY)
self.solver.getPC().setType(PETSc.PC.Type.LU)
else:
self.solver.setType(PETSc.KSP.Type.BCGS)
self.solver.getPC().setType(PETSc.PC.Type.JACOBI)
# Define solution structures
self.solution = Function(self.V).copy()
# self.psi1_norm, self.psi2_norm = self.solution.sub(0).collapse(), self.solution.sub(1).collapse()
[docs]
def advect(self, temperature: dolfinx.fem.Function = None):
"""
This function advances in time by solving the linear problem at each time step.
Parameters
----------
temperature : dolfinx.fem.Function (Default = None)
Temperature at time :math:`t_n`, if None there is no update.
"""
self.t += self.dt
# Updating BC
if self.inl_marker is not None:
self.wave1.t = self.t
self.wave2.t = self.t
self.updateInlet()
# Updating Temperature - If not None
if temperature is not None:
self.Told.x.array[:] = temperature.x.array
self.A.zeroEntries()
fem.petsc.assemble_matrix(self.A, self.a, self.bcs)
self.A.assemble()
# Assembling right-hand side vector
with self.b.localForm() as loc:
loc.set(0)
fem.petsc.assemble_vector(self.b, self.L)
fem.petsc.apply_lifting(self.b, [self.a], [self.bcs])
self.b.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
fem.petsc.set_bc(self.b, self.bcs)
# Solving the linear problem and returning solution
self.solver.solve(self.b, self.solution.vector)
self.solution.x.scatter_forward()
return self.solution.sub(0).collapse(), self.solution.sub(1).collapse()
##########################################################################################################################################
################### Additional routines ################
##########################################################################################################################################
# def extractInlet(self, wave_fun, in_size):
# Ny = 50
# y_grid = np.linspace(0., in_size, Ny)
# points = np.zeros((3, Ny))
# points[0, :] = 0.
# points[1, :] = y_grid
# bb_tree = dolfinx.geometry.BoundingBoxTree(self.domain, self.domain.topology.dim)
# cells = []
# points_on_proc = []
# cell_candidates = dolfinx.geometry.compute_collisions(bb_tree, points.T)
# colliding_cells = dolfinx.geometry.compute_colliding_cells(self.domain, cell_candidates, points.T)
# for i, point in enumerate(points.T):
# if len(colliding_cells.links(i))>0:
# points_on_proc.append(point)
# cells.append(colliding_cells.links(i)[0])
# xPlot = np.array(points_on_proc, dtype=np.float64)
# in_wave = wave_fun.eval(xPlot, cells).flatten()
# return in_wave[int(Ny/2)]
# class normalisation():
# """_summary_
# """
# def __init__(self, domain, ft, boundary_marks : dict, degree_psi = 2):
# # Domain
# self.domain = domain
# self.ft = ft
# self.gdim = self.domain.geometry.dim
# self.fdim = self.gdim - 1
# metadata = {"quadrature_degree": 4}
# self.dx = ufl.Measure("dx", domain=self.domain, metadata=metadata)
# self.ds = ufl.Measure("ds", domain=self.domain, subdomain_data=self.ft, metadata=metadata)
# # Assign markers
# self.inl_marker = boundary_marks['inlet']
# self.out_marker = boundary_marks['outlet']
# # Functional Spaces
# self.P2 = ufl.FiniteElement("Lagrange", self.domain.ufl_cell(), degree_psi)
# self.mixEl = ufl.MixedElement(self.P2, self.P2)
# self.V = FunctionSpace(self.domain, self.mixEl)
# (self.psi1, self.psi2) = ufl.TrialFunctions(self.V)
# (self.v1, self.v2) = ufl.TestFunctions(self.V)
# def assemble(self):
# self.psiTilde = Function(self.V)
# (self.psiTilde_1, self.psiTilde_2) = (self.psiTilde.sub(0).collapse(), self.psiTilde.sub(1).collapse())
# self.a = form( inner(self.psi1, self.v1) * self.dx + inner(self.psi2, self.v2) * self.dx )
# self.L = form( inner(self.psiTilde_1 * ufl.algebra.Power( inner(self.psiTilde_1, self.psiTilde_1) +
# inner(self.psiTilde_2, self.psiTilde_2), -0.5),
# self.v1) * self.dx +
# inner(self.psiTilde_2 * ufl.algebra.Power( inner(self.psiTilde_1, self.psiTilde_1) +
# inner(self.psiTilde_2, self.psiTilde_2), -0.5),
# self.v2) * self.dx )
# self.A = fem.petsc.assemble_matrix(self.a)
# self.A.assemble()
# self.b = fem.petsc.create_vector(self.L)
# self.solver = PETSc.KSP().create(self.domain.comm)
# self.solver.setOperators(self.A)
# self.solver.setType(PETSc.KSP.Type.CG)
# self.solver.getPC().setType(PETSc.PC.Type.SOR)
# # self.solver.setType(PETSc.KSP.Type.PREONLY)
# # self.solver.getPC().setType(PETSc.PC.Type.LU)
# def solve(self, psiTilde_1, psiTilde_2):
# with psiTilde_1.vector.localForm() as loc_, self.psiTilde_1.vector.localForm() as loc_n:
# loc_.copy(loc_n)
# with psiTilde_2.vector.localForm() as loc_, self.psiTilde_2.vector.localForm() as loc_n:
# loc_.copy(loc_n)
# with self.b.localForm() as loc:
# loc.set(0)
# fem.petsc.assemble_vector(self.b, self.L)
# self.b.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE)
# solution = Function(self.V).copy()
# self.solver.solve(self.b, solution.vector)
# return (solution.sub(0).collapse(), solution.sub(1).collapse())