Tube Bundle#
This notebook implements the ISF algorithm to simulate inviscid fluids, governed by the incompressible Euler equations. The mesh is generated using the gmsh library for Python and the ISF differential problems are solved using FEniCSx, a Finite Element library.
[1]:
import numpy as np
from dolfinx.io.gmshio import model_to_mesh
from mpi4py import MPI
from dolfinx.io import XDMFFile
import gmsh
from petsc4py import PETSc
mesh_comm = MPI.COMM_WORLD
model_rank = 0
mesh_factor = 0.035
# Initialize the gmsh module
gmsh.initialize()
# Load the .geo file
gmsh.merge('mesh/tube_bundle2D.geo')
gmsh.model.geo.synchronize()
# Set algorithm (adaptive = 1, Frontal-Delaunay = 6)
gmsh.option.setNumber("Mesh.Algorithm", 6)
gmsh.option.setNumber("Mesh.MeshSizeFactor", mesh_factor)
gdim = 2
# Linear Finite Element
gmsh.model.mesh.generate(gdim)
gmsh.model.mesh.optimize("Netgen")
Info : Reading 'mesh/tube_bundle2D.geo'...
Info : Done reading 'mesh/tube_bundle2D.geo'
Info : Meshing 1D...
Info : [ 0%] Meshing curve 1 (Line)
Info : [ 10%] Meshing curve 2 (Line)
Info : [ 20%] Meshing curve 3 (Line)
Info : [ 20%] Meshing curve 4 (Line)
Info : [ 30%] Meshing curve 5 (Line)
Info : [ 30%] Meshing curve 6 (Line)
Info : [ 40%] Meshing curve 7 (Line)
Info : [ 40%] Meshing curve 8 (Line)
Info : [ 50%] Meshing curve 9 (Line)
Info : [ 50%] Meshing curve 10 (Line)
Info : [ 60%] Meshing curve 11 (Circle)
Info : [ 70%] Meshing curve 12 (Circle)
Info : [ 70%] Meshing curve 13 (Circle)
Info : [ 80%] Meshing curve 14 (Circle)
Info : [ 80%] Meshing curve 15 (Circle)
Info : [ 90%] Meshing curve 16 (Circle)
Info : [ 90%] Meshing curve 17 (Circle)
Info : [100%] Meshing curve 18 (Circle)
Info : Done meshing 1D (Wall 0.014566s, CPU 0.09841s)
Info : Meshing 2D...
Info : Meshing surface 1 (Plane, Frontal-Delaunay)
Info : Done meshing 2D (Wall 0.169252s, CPU 0.509922s)
Info : 4154 nodes 8320 elements
Info : Optimizing mesh (Netgen)...
Info : Done optimizing mesh (Wall 1.58802e-06s, CPU 0s)
Loading the mesh
[2]:
# Import into dolfinx
domain, _, ft = model_to_mesh(gmsh.model, MPI.COMM_WORLD, model_rank, gdim = gdim )
ft.name = "Facet markers"
boundary_markers = dict()
boundary_markers['inlet'] = 10
boundary_markers['adiab_walls'] = 20
boundary_markers['coldest_walls'] = 30
boundary_markers['hottest_walls'] = 40
boundary_markers['hot_centre'] = 50
boundary_markers['cold_centre'] = 60
boundary_markers['outlet'] = 70
domain_marker = 100
# Finalize the gmsh module
gmsh.finalize()
The ISF algorithm is based on the analogy between hydrodynamics and quantum mechinacs: in fact, it can be shown thta the Euler equations
are equivalent to a 2-component wave function \(\Psi=[\psi_1,\psi_2]^T\in\mathbb{C}^2\), solution to Schrodinger equation
and constrained to
The solution algorithm is divided into three main steps, at each time iteration:
Advection: through the free-particle Schrodinger
Pressure projection: Poisson problem to have a divergence free velocity
Phase shift
[3]:
import tqdm
import os
import matplotlib.pyplot as plt
from matplotlib import cm
The steps of the algorithm are imported from the src
folder.
[4]:
import sys
sys.path.append('../../.')
from src.prediction import schrodinger
from src.ISFvelocity import computeVelocity
from src.projection import poisson, phaseShift
from src.scalar_adv_diff import adv_diffusion
Time Loop#
An inlet velocity \(\mathbf{u}_{in} = [0.5, 0]^T\) is used and the total simulation time is \(T\), using a time step \(\Delta t = 10^{-2}\).
[5]:
# Inlet velocity
u_in = np.array([.5, 0., 0.])
# Time step
dt = 1e-2
# Equivalent viscosity
hbar = 0.01
# Assigning physical paramemeters
phys_parameters = {'uin': u_in,
'hbar': hbar}
gravity_dict = {'gravity': np.array([0, -9.81, 0]), 'beta': 0.4, 'Tref': 0}
# Degree of the polynomial for FEM - the Neumann BC at the outlet for step 2 either requires a finer mesh or degree_psi = 2
degree_psi = 1
### Assembling forms #####
print('Assembling Schrodinger predict')
step1 = schrodinger(domain, ft, boundary_marks=boundary_markers, degree_psi = degree_psi, c = np.array([.5 + 0 * 1j, .5 + 0 * 1j]))
step1.assemble(phys_parameters, dt=dt, gravity_dict=gravity_dict)
print('Assembling Schrodinger velocity')
ISF_vel = computeVelocity(domain, degree_psi=degree_psi)
ISF_vel.assemble(hbar)
print('Assembling Poisson')
step2 = poisson(domain, ft, boundary_marks=boundary_markers)
bc_poisson_values = {'uin': u_in, 'fully_Neumann': False, 'is_dirichlet': True}
step2.assemble(bc_poisson_values, gauss = True)
print('Assembling Schrodinger phase-shift')
step3 = phaseShift(domain, degree_psi = degree_psi)
step3.assemble(hbar)
##### Store solution #####
store_results = False
save_tilde = False
if store_results:
path = 'Results_Beta_'+str(gravity_dict['beta'])+'/'
if not os.path.exists(path):
os.system("mkdir "+path)
psi1_xdmf = XDMFFile(domain.comm, path+"psi1.xdmf", "w")
psi2_xdmf = XDMFFile(domain.comm, path+"psi2.xdmf", "w")
p_xdmf = XDMFFile(domain.comm, path+"p.xdmf", "w")
u_xdmf = XDMFFile(domain.comm, path+"U.xdmf", "w")
psi1_xdmf.write_mesh(domain)
psi2_xdmf.write_mesh(domain)
p_xdmf.write_mesh(domain)
u_xdmf.write_mesh(domain)
if save_tilde:
psi1_Tilde_xdmf = XDMFFile(domain.comm, path+"psi1_tilde.xdmf", "w")
psi2_Tilde_xdmf = XDMFFile(domain.comm, path+"psi2_tilde.xdmf", "w")
psi1_Tilde_xdmf.write_mesh(domain)
psi2_Tilde_xdmf.write_mesh(domain)
else:
path = None
Assembling Schrodinger predict
Assembling Schrodinger velocity
Assembling Poisson
Assembling Schrodinger phase-shift
The advection diffusion equation reads \begin{equation} \frac{\partial T}{\partial t} + \mathbf{u}\cdot \nabla T - \alpha \Delta T =0 \end{equation}
[6]:
temperature_bound_marks = {
'dirichlet' : dict(),
'neumann' : dict(),
'robin' : None
}
# Define class for passive scalar transport
_names = {'dirichlet': ['inlet', 'hot_centre', 'cold_centre'],
'neumann': ['hottest_walls', 'coldest_walls']}
for key in list(_names.keys()):
for name in _names[key]:
temperature_bound_marks[key][name] = boundary_markers[name]
particles = adv_diffusion(domain, ft, temperature_bound_marks)
# Define boundaries values
assigned_temperatures = [1, 4, 0.5]
bc_values = {'dirichlet': dict(), 'neumann': dict()}
for ii in range(len(assigned_temperatures)):
bc_values['dirichlet'][_names['dirichlet'][ii]] = PETSc.ScalarType(assigned_temperatures[ii])
assigned_heat_flux = [-0.05, 0.03]
for ii in range(len(assigned_heat_flux)):
bc_values['neumann'][_names['neumann'][ii]] = PETSc.ScalarType(assigned_heat_flux[ii])
# Assembling the differential problem
particles.assemble(bc_values, dt = dt, alpha = 5e-4, T_0=gravity_dict['Tref'])
# Create file for storage
if store_results:
T_xdmf = XDMFFile(domain.comm, path+"T.xdmf", "w")
T_xdmf.write_mesh(domain)
Let us solve the transient.
[7]:
from dolfinx import common
# Time parameters
T = 10.
num_steps = int(T/dt)
# Sampling rate to store the data
LL = 10
### Make contour plots
make_plots = True
if make_plots:
transient_data = {
'time': list(),
'psi1': list(),
'psi2': list(),
'p': list(),
'U': list(),
'T': list()
}
steps = ["1a", "1b", "1c", "2 ", "3a", "3b", "4"]
t_steps = list()
progress = tqdm.tqdm(desc="Advancing in time", total=num_steps)
for nn in range(num_steps):
## Step 1a: advection
with common.Timer("~Step 1a"):
if np.isclose(gravity_dict['beta'], 0):
psi1_adv, psi2_adv = step1.advect()
else:
psi1_adv, psi2_adv = step1.advect(temperature=particles.Told)
## Step 1b: normalisation
with common.Timer("~Step 1b"):
PsiTilde_1 = psi1_adv.copy()
PsiTilde_2 = psi2_adv.copy()
norm = np.sqrt(np.conj(psi1_adv.x.array[:]) * psi1_adv.x.array[:] + np.conj(psi2_adv.x.array[:]) * psi2_adv.x.array[:])
PsiTilde_1.x.array[:] = psi1_adv.x.array[:] / norm.real
PsiTilde_2.x.array[:] = psi2_adv.x.array[:] / norm.real
## Step 1c: compute non-solenoidal velocity
with common.Timer("~Step 1c"):
uTilde = ISF_vel.solve(PsiTilde_1, PsiTilde_2)
## Step 2: pressure projection
with common.Timer("~Step 2 "):
pressure = step2.solve(uTilde)
## Step 3a: phase shift
with common.Timer("~Step 3a"):
Psi_1, Psi_2 = step3.solve(PsiTilde_1, PsiTilde_2, pressure)
## Step 3b: compute new velocity
with common.Timer("~Step 3b"):
u_new = ISF_vel.solve(Psi_1, Psi_2)
## Step 4: Temperature advection-diffusion
with common.Timer("~Step 4"):
T_new = particles.solve(u_new)
if np.isclose((nn+1) % LL, 0):
if store_results:
pressure.name = "p"
Psi_1.name = "psi_1"
Psi_2.name = "psi_2"
u_new.name = "U"
T_new.name = "T"
p_xdmf.write_function(pressure, (nn+1) * dt)
psi1_xdmf.write_function(Psi_1, (nn+1) * dt)
psi2_xdmf.write_function(Psi_2, (nn+1) * dt)
u_xdmf.write_function(u_new, (nn+1) * dt)
T_xdmf.write_function(T_new, (nn+1) * dt)
if save_tilde:
psi1_Tilde_xdmf.write_function(PsiTilde_1, (nn+1) * dt)
psi2_Tilde_xdmf.write_function(PsiTilde_2, (nn+1) * dt)
if make_plots:
transient_data['time'].append((nn+1) * dt)
transient_data['psi1'].append(Psi_1.vector.array[:].copy())
transient_data['psi2'].append(Psi_2.vector.array[:].copy())
transient_data['p'].append(pressure.vector.array[:].copy())
transient_data['T'].append(T_new.vector.array[:].copy())
transient_data['U'].append(u_new.vector.array[:].copy())
# Update Old
with Psi_1.vector.localForm() as loc_, step1.psi1_old.vector.localForm() as loc_n:
loc_.copy(loc_n)
with Psi_2.vector.localForm() as loc_, step1.psi2_old.vector.localForm() as loc_n:
loc_.copy(loc_n)
t_steps.append(list())
for step in steps:
t_steps[nn].append(mesh_comm.gather(common.timing("~Step "+step), root=0))
progress.update(1)
if store_results:
p_xdmf.close()
psi1_xdmf.close()
psi2_xdmf.close()
u_xdmf.close()
T_xdmf.close()
if save_tilde:
psi1_Tilde_xdmf.close()
psi2_Tilde_xdmf.close()
del progress
Advancing in time: 100%|██████████| 1000/1000 [02:46<00:00, 6.02it/s]
In the end, let us print the time required by each step during the simulation
[8]:
if mesh_comm.rank == 0:
print("Time-step breakdown")
step_arr = np.asarray(t_steps)
time_per_run = step_arr[:, :, 0, 1] / step_arr[:, :, 0, 0]
for jj, step in enumerate(steps):
print(f"Step "+step+f": Min time: {np.min(time_per_run[:, jj]):.3e}, Max time: {np.max(time_per_run[:, jj]):.3e}")
Time-step breakdown
Step 1a: Min time: 3.575e-02, Max time: 5.005e-02
Step 1b: Min time: 1.960e-04, Max time: 3.131e-04
Step 1c: Min time: 5.891e-03, Max time: 7.291e-03
Step 2 : Min time: 1.014e-01, Max time: 1.097e-01
Step 3a: Min time: 7.297e-03, Max time: 8.815e-03
Step 3b: Min time: 5.744e-03, Max time: 6.171e-03
Step 4: Min time: 5.374e-03, Max time: 5.981e-03
Post Processing#
In this section, if the data have been saved some contour plots are produced using pyvista.
[9]:
import vtk
import pyvista as pv
import dolfinx
pv.start_xvfb()
def vector_grids(fun: dolfinx.fem.Function, mag_plot: bool, varname='u'):
topology, cells, geometry = dolfinx.plot.vtk_mesh(fun.function_space)
grid = pv.UnstructuredGrid(topology, cells, geometry)
values = np.zeros((geometry.shape[0], 3))
values[:, :len(fun)] = np.real(fun.x.array.reshape(geometry.shape[0], len(fun)))
grid[varname] = values
if mag_plot:
warped = grid.warp_by_vector(varname, factor=0.0)
else:
warped = grid.glyph(varname, factor=0.075, tolerance=0.015)
return warped, values
def grids(fun: dolfinx.fem.Function, real = True, varname='u'):
topology, cell_types, geometry = dolfinx.plot.vtk_mesh(fun.function_space)
u_grid = pv.UnstructuredGrid(topology, cell_types, geometry)
if real:
u_grid.point_data[varname] = fun.x.array[:].real
else:
u_grid.point_data[varname] = fun.x.array[:].imag
u_grid.set_active_scalars(varname)
return u_grid
Wave Functions#
A function is defined to plot the time instances of the real and imaginary part of the wave function \(\Psi = [\psi_1, \psi_2]\)
[10]:
def plot_wave_functions( snaps: dict, fun_spaces: dict,
vars_to_plot: list[str], tex_vars: list[str],
time_to_plot: int, title : str,
clims : list, filename: str = None, colormaps : list = None,
):
nrows = 2
ncols = len(vars_to_plot)
assert(len(tex_vars) == ncols)
assert(len(clims) == ncols)
if colormaps is None:
_colormaps = [cm.jet] * len(vars_to_plot)
else:
assert(nrows == len(colormaps))
_colormaps = colormaps
resolution = [750 * ncols, 400 * nrows]
plotter = pv.Plotter(shape=(nrows, ncols), off_screen=True, border=False, window_size=resolution)
lab_fontsize = 30
title_fontsize = 40
zoom = 1.75
dict_cb = dict( width = 0.75, height = 0.2,
title_font_size=title_fontsize,
label_font_size=lab_fontsize,
n_labels=4,
color = 'k',
position_x=0.125, position_y=0.75,
shadow=False)
for col in range(ncols):
fun = dolfinx.fem.Function(fun_spaces[vars_to_plot[col]])
fun.x.array[:] = snaps[vars_to_plot[col]][time_to_plot]
is_real = [True, False]
tex_real = ['\Re', '\Im']
for row in range(nrows):
plotter.subplot(row,col)
warped_ = grids(fun, real=is_real[row], varname=vars_to_plot[col])
dict_cb['title'] = '$'+tex_real[row]+'\{'+tex_vars[col]+'\}$'
plotter.add_mesh(warped_, clim=clims[col], cmap=_colormaps[col], show_edges=False, scalar_bar_args=dict_cb)
plotter.view_xy()
plotter.camera.zoom(zoom)
plotter.set_background('white', top='white')
plotter.subplot(nrows-1, 0)
plotter.add_text(title, color= 'k', position=[50, 50], font_size=30)
## Save figure
if filename is not None:
plotter.screenshot(filename+'.png', transparent_background = True, window_size=resolution)
plotter.close()
else:
plotter.show()
im = plotter.image
plotter.close()
return im
For some time steps the images are generated
[11]:
from IPython.display import clear_output
fun_spaces = {
'psi1': step3.V.sub(0).collapse()[0],
'psi2': step3.V.sub(1).collapse()[0]
}
vars_to_plot = ['psi1', 'psi2']
tex_vars_to_plot = [r'\psi_1', r'\psi_2']
clims = [
[-1,1],
[-1,1]
]
colormaps = [
cm.plasma,
cm.hot
]
times_to_plot = np.arange(0, len(transient_data['time']), 2, dtype=int)
frames = list()
for time_to_plot in times_to_plot:
title = 'Time = {:.2f}'.format(transient_data['time'][time_to_plot])
a = plot_wave_functions(transient_data, fun_spaces, vars_to_plot, tex_vars_to_plot, time_to_plot, title, clims, colormaps = colormaps)
frames.append(a)
clear_output()
Let us create a gif or a video and let us display it
[16]:
# from io import BytesIO
# from IPython.display import Image as IPImage, display
# from PIL import Image
# # Convert each frame array to an image
# pil_images = [Image.fromarray(frame) for frame in frames]
# # Save the frames as a GIF in memory
# gif_buffer = BytesIO()
# pil_images[0].save(gif_buffer, format='GIF', save_all=True, append_images=pil_images[1:], duration=200, loop=0)
# gif_buffer.seek(0) # Move the cursor to the start of the buffer
# # Display the GIF in the notebook
# display(IPImage(data=gif_buffer.read()))
from io import BytesIO
import imageio
from IPython.display import Video, HTML
import base64
# Create an in-memory buffer to store the video
video_buffer = BytesIO()
# Write frames to the video buffer using imageio-ffmpeg
writer = imageio.get_writer(video_buffer, format='mp4', mode='I', fps=10, codec='libx264')
for frame in frames:
writer.append_data(frame)
writer.close()
# Ensure the cursor is at the start of the buffer
video_buffer.seek(0)
# Convert the video buffer to a base64 string
video_data = base64.b64encode(video_buffer.read()).decode('ascii')
# Display the video using HTML with autoplay and loop
video_html = f'''
<video controls autoplay loop>
<source src="data:video/mp4;base64,{video_data}" type="video/mp4">
</video>
'''
display(HTML(video_html))
Fluid Dynamics Variables#
A function is defined to plot the time instances of the velocity \(\mathbf{u}\) and the temperature \(T\)
[13]:
def plot_fluid_vars( snaps: dict, fun_spaces: dict,
vars_to_plot: list[str], tex_vars: list[str],
time_to_plot: int, title : str,
clims : list,
filename: str = None, colormaps : list = None, mag_plot = True,
):
nrows = 1
ncols = len(vars_to_plot)
assert(len(tex_vars) == ncols)
assert(len(clims) == ncols)
if colormaps is None:
_colormaps = [cm.jet] * len(vars_to_plot)
else:
assert(ncols == len(colormaps))
_colormaps = colormaps
resolution = [750 * ncols, 400 * nrows]
plotter = pv.Plotter(shape=(nrows, ncols), off_screen=True, border=False, window_size=resolution)
lab_fontsize = 30
title_fontsize = 40
zoom = 1.75
dict_cb = dict( width = 0.75, height = 0.2,
title_font_size=title_fontsize,
label_font_size=lab_fontsize,
n_labels=4,
color = 'k',
position_x=0.125, position_y=0.75,
shadow=False)
for col in range(ncols):
fun = dolfinx.fem.Function(fun_spaces[vars_to_plot[col]])
fun.x.array[:] = snaps[vars_to_plot[col]][time_to_plot]
plotter.subplot(0,col)
if fun_spaces[vars_to_plot[col]].num_sub_spaces > 0:
warped_, _ = vector_grids(fun, mag_plot=mag_plot, varname=vars_to_plot[col])
else:
warped_ = grids(fun)
dict_cb['title'] = '$'+tex_vars[col]+'$'
if fun_spaces[vars_to_plot[col]].num_sub_spaces > 0 and mag_plot == False:
warped2, _ = vector_grids(fun, mag_plot=True, varname=vars_to_plot[col])
plotter.add_mesh(warped2, clim=clims[col], cmap=_colormaps[col], show_edges=False, scalar_bar_args=dict_cb)
plotter.add_mesh(warped_, clim=clims[col], color='black', show_edges=False, scalar_bar_args=dict_cb)
else:
plotter.add_mesh(warped_, clim=clims[col], cmap=_colormaps[col], show_edges=False, scalar_bar_args=dict_cb)
plotter.view_xy()
plotter.camera.zoom(zoom)
plotter.set_background('white', top='white')
plotter.subplot(nrows-1, 0)
plotter.add_text(title, color= 'k', position=[50, 50], font_size=30)
## Save figure
if filename is not None:
plotter.screenshot(filename+'.png', transparent_background = True, window_size=resolution)
plotter.close()
else:
plotter.show()
im = plotter.image
plotter.close()
return im
Let us plot the fluid dynamics variables
[14]:
from IPython.display import clear_output
fun_spaces = {
'U': particles.W,
'T': particles.Q
}
vars_to_plot = ['U', 'T']
tex_vars_to_plot = [r'\mathbf{u}', r'T']
clims = [
[0, 0.5],
[-0.5, 4]
]
colormaps = [
cm.Spectral_r, cm.turbo
]
times_to_plot = np.arange(0, len(transient_data['time']), 1, dtype=int)
frames_th = list()
for time_to_plot in times_to_plot:
title = 'Time = {:.2f}'.format(transient_data['time'][time_to_plot])
a = plot_fluid_vars(transient_data, fun_spaces, vars_to_plot, tex_vars_to_plot, time_to_plot, title, clims, colormaps = colormaps, mag_plot=True)
frames_th.append(a)
clear_output()
Let us create a gif or a video and let us display it
[15]:
# # Convert each frame array to an image
# pil_images = [Image.fromarray(frame) for frame in frames_th]
# # Save the frames as a GIF in memory
# gif_buffer = BytesIO()
# pil_images[0].save(gif_buffer, format='GIF', save_all=True, append_images=pil_images[1:], duration=200, loop=0)
# gif_buffer.seek(0) # Move the cursor to the start of the buffer
# # Display the GIF in the notebook
# display(IPImage(data=gif_buffer.read()))
# Create an in-memory buffer to store the video
video_buffer = BytesIO()
# Write frames to the video buffer using imageio-ffmpeg
writer = imageio.get_writer(video_buffer, format='mp4', mode='I', fps=10, codec='libx264')
for frame in frames_th:
writer.append_data(frame)
writer.close()
# Ensure the cursor is at the start of the buffer
video_buffer.seek(0)
# Convert the video buffer to a base64 string
video_data = base64.b64encode(video_buffer.read()).decode('ascii')
# Display the video using HTML with autoplay and loop
video_html = f'''
<video controls autoplay loop>
<source src="data:video/mp4;base64,{video_data}" type="video/mp4">
</video>
'''
display(HTML(video_html))