Select Git revision
LDDsimulation.py
LDDsimulation.py 59.33 KiB
"""
LDD simulation class
"""
import os
import dolfin as df
import numpy as np
import typing as tp
import mshr
import domainPatch as dp
import helpers as hlp
import pandas as pd
import sys
import copy
# import errno
import h5py
from termcolor import colored
class LDDsimulation(object):
""" Main Class of LDD simulation
LDDsimulation is the main class for the LDD simulation. It contains methods for
setting up and running the LDD simulation for different meshes and domains.
"""
def __init__(self, dimension: int = 2,#
tol: float = None):
hlp.print_once("\n ###############################################\n",
"############# Begin Simulation ################\n",
"###############################################\n")
hlp.print_once("Init LDD simulation...")
# parameter as set in NSAC_simulation by Lukas Ostrowski
hlp.print_once("Set internal fenics parameter...")
# calculation exactness for the whole simulation.
if tol:
self.tol = tol
else:
self.tol = df.DOLFIN_EPS
# dimension of the simulation. This is by default 2 as I don't intend
# to implement 3D.
self.dim = dimension
#Parameters
# df.parameters["allow_extrapolation"] = True
# df.parameters["refinement_algorithm"] = "plaza_with_parent_facets"
df.parameters["form_compiler"]["quadrature_degree"] = 12
# interpolation degree, for source terms, intitial and boundary conditions.
self.interpolation_degree = 8
# # To be able to run DG in parallel
# df.parameters["ghost_mode"] = "shared_facet"
# df.parameters["ghost_mode"] = "none"
# Form compiler options
df.parameters["form_compiler"]["optimize"] = True
df.parameters["form_compiler"]["cpp_optimize"] = True
# df.parameters['std_out_all_processes'] = False
df.set_log_level(df.LogLevel.INFO)
# df.set_log_level(df.LogLevel.INFO)
# CRITICAL = 50, // errors that may lead to data corruption and suchlike
# ERROR = 40, // things that go boom
# WARNING = 30, // things that may go boom later
# INFO = 20, // information of general interest
# PROGRESS = 16, // what's happening (broadly)
# TRACE = 13, // what's happening (in detail)
# DBG = 10 // sundry
hlp.print_once("Set default parameter...\n")
self.restart_file = None
# # write checkpoints
# self.write_checkpoints = True
# # dont project solution to CG for visualization
# self.project_to_cg = False
# # no sponge zones
# self.sponge = False
# set up mpi for internal parallelisation.
self._mpi_rank = df.MPI.rank(df.MPI.comm_world)
self._mpi_communicator = df.MPI.comm_world
## Public variables.
# directory to write files output to.
self.output_dir = None
# list of subdomain meshes. Is populated by self.init_meshes_and_markers()
self.mesh_subdomain = []
# global marker for subdomains and interfaces. Is set by self.init_meshes_and_markers()
self.domain_marker = None
# List of interfaces of class Interface. This is populated by self.init_interfaces
self.interface = []
# The global interface marker. Gets initialised by self.init_interfaces().
self.interface_marker = None
# List of list of indices indicating subdomains adjacent to interfaces.
# self.adjacent_subdomains[i] gives the two indices of subdomains which
# are share the interface self.interface[i]. This gets populated by
# self.init_interfaces()
self.adjacent_subdomains = None
## Private variables
# maximal number of L-iterations that the LDD solver uses.
self._max_iter_num = 1000
# TODO rewrite this with regard to the mesh sizes
# self.calc_tol = self.tol
# list of timesteps that get analyed. Gets initiated by self._init_analyse_timesteps
self.analyse_timesteps = None
# The number of subdomains are counted by self.init_meshes_and_markers()
self._number_of_subdomains = 0
# variable to check if self.set_parameters() has been called
self._parameters_set = False
# flag holds true if self.initialise() has been executed.
self._initialised = False
# dictionary of objects of class DomainPatch initialised by self._init_subdomains()
self.subdomain = dict()
# dictionary to hold the input Expressions for the external boundary. These are
# turned into df.Expressions objects by the method update_DirichletBC_dictionary()
# this is mostly in place when examples with exact solutions are calculated.
self.dirichletBC_expression_strings = None
# dictionary to hold the df.Expressions for the external boundary created
# from the above self.dirichletBC_expression_strings. These are
# turned into df.dirichletBC objects by the method update_DirichletBC_dictionary()
self.dirichletBC_dfExpression = dict()
# dictionary to store the df.dirichletBC objects in. These are used by the
# solver to set the boundary conditions
self.outerBC = dict()
# Dictionary that holdes the exact solution. Gets set by self.set_parameters()
# but can remain None if we don't consider a case with an exact solution.
self.exact_solution = None
# Dictionary of df. Expressions holding for each subdomain and phase the
# dolfin expressions generated of self.exact_solution by self._init_initial_values()
self.exact_solution_expr = dict()
# dictionary holding the source expressions for each subdomain. These are
# saved to the respective subdomains by the self._init_subdomains() method.
self.sources = None#
# dictionary holding expressions for the initial conditions for each subdomain.
# These are interpolated and stored to each respective subdomain by
# self._init_initial_values()
self.initial_conditions = None
# dictionary of xdmf files for writing the solution for each
# subdomain. This gets populated by self._init_solution_files()
self.solution_file = None
########## SOLVER DEFINITION AND PARAMETERS ########
# print("\nLinear Algebra Backends:")
# df.list_linear_algebra_backends()
# # print("\nLinear Solver Methods:")
# df.list_linear_solver_methods()
# # print("\nPeconditioners for Krylov Solvers:")
# df.list_krylov_solver_preconditioners()
### Define the linear solver to be used.
self.solver = 'bicgstab' #'gmres'#'bicgstab' # biconjugate gradient stabilized method
self.preconditioner = 'jacobi'#'hypre_amg' #'ilu'#'hypre_amg' # incomplete LU factorization
# dictionary of solver parametrs. This is passed to self._init_subdomains,
# where for each subdomain a sovler object of type self.solver is created
# with these parameters.
self.solver_parameters = {
'nonzero_initial_guess': True,
'absolute_tolerance': 1E-12,
'relative_tolerance': 1E-10,
'maximum_iterations': 1000,
'report': False
}
self.debug_solver = False
# Dictionaries needed by the LDD Solver
# dictionary holding bool variables for each subdomain indicating wether
# minimal error has been achieved, i.e. the calculation can be considered
# done or not.
self._calc_isdone_for_subdomain = dict()
# dictionary holding for each subdomain a dictionary with bool variables
# for each phase indicating wether minimal error has been achieved, i.e.
# the calculation can be considered done or not. This dictionary is necessary,
# because it can happen, that one phase needs severly less iterations than
# the other to achieve the error criterion.
self._calc_isdone_for_phase = dict()
def set_parameters(self,#
output_dir: str,#
subdomain_def_points: tp.List[tp.List[df.Point]],#
# this is actually an array of bools
isRichards: tp.Dict[int, bool],#
interface_def_points: tp.List[tp.List[df.Point]],#
outer_boundary_def_points: tp.Dict[int, tp.Dict[int, tp.List[df.Point]]],#
adjacent_subdomains: tp.List[np.ndarray],#
mesh_resolution: float,#
viscosity: tp.Dict[int, tp.List[float]],#
porosity: tp.Dict[int, tp.Dict[str, float]],#
L: tp.Dict[int, tp.Dict[str, float]],#
lambda_param: tp.Dict[int, tp.Dict[str, float]],#
relative_permeability: tp.Dict[int, tp.Dict[str, tp.Callable[...,None]] ],#
saturation: tp.Dict[int, tp.Callable[...,None]],#
starttime: float,#
timestep_size: tp.Dict[int, float],#
number_of_timesteps: int,#
number_of_timesteps_to_analyse: int,#
sources: tp.Dict[int, tp.Dict[str, str]],#
initial_conditions: tp.Dict[int, tp.Dict[str, str]],#
dirichletBC_expression_strings: tp.Dict[int, tp.Dict[str, str]],#
write2file: tp.Dict[str, bool],#
exact_solution: tp.Dict[int, tp.Dict[str, str]] = None, #
)-> None:
""" set parameters of an instance of class LDDsimulation"""
self.output_dir = output_dir
self.subdomain_def_points = subdomain_def_points
self.isRichards = isRichards
self.mesh_resolution = mesh_resolution
self.interface_def_points = interface_def_points
self.outer_boundary_def_points = outer_boundary_def_points
self.adjacent_subdomains = adjacent_subdomains
self.viscosity = viscosity
self.porosity = porosity
self.L = L
self.lambda_param = lambda_param
self.relative_permeability = relative_permeability
self.saturation = saturation
# simulation start time and time variable
self.starttime = starttime
self.t = starttime
# total number of timesteps.
self.number_of_timesteps = number_of_timesteps
self.number_of_timesteps_to_analyse = number_of_timesteps_to_analyse
self.timestep_size = timestep_size
self.sources = sources
self.initial_conditions = initial_conditions
self.dirichletBC_expression_strings = dirichletBC_expression_strings
self.exact_solution = exact_solution
self.write2file = write2file
self._parameters_set = True
def initialise(self) -> None:
""" initialise LDD simulation
Public method to call all the init methods of the LDD simulation.
"""
if not self._parameters_set:
raise RuntimeError('Parameters note set!\nYou forgott to run self.set_parameters(**kwds)')
self._add_parameters_to_output_dirname()
self._init_analyse_timesteps()
df.info(colored("initialise meshes and marker functions ...\n", "orange"))
self._init_meshes_and_markers()
df.info(colored("initialise interfaces ...\n", "orange"))
self._init_interfaces()
df.info(colored("initialise subdomains ...\n", "orange"))
self._init_subdomains()
df.info(colored("creating xdmf files for each subdomain ...\n", "orange"))
self._init_solution_files()
# initial values get initialised here to be able to call the solver at different
# timesteps without having to call self.run().
df.info(colored("create initial values ...", "orange"))
self._init_initial_values()
df.info(colored("create source Expressions ...", "orange"))
self._init_sources()
# initialise exact solution expression dictionary if we have an exact
# solution case.
self._init_exact_solution_expression()
if self.exact_solution:
df.info(colored("writing exact solution for all time steps to xdmf files ...", "orange"))
self.write_exact_solution_to_xdmf()
self._initialised = True
def run(self, analyse_solver_at_timesteps: tp.List[float] = None):
""" run the simulation
This method implements the time stepping, calls the acutal solvers and writes
solutions to files. It is the main funtion of the simulation class. If you
want to understand this code, start reading here.
PARAMETERS
analyse_solver_at_timesteps #type tp.List[float] List of timesteps at which
to produce output to
analyse the solver at
given timestep. This
tells the function at
which timesteps to
toggle the analyse_timestep
flag of self.LDDsolver.
"""
timer = df.Timer("Simulation time")
# check if simulation has been initialised. If the user calls self.initialise()
# from his/her script, then the time for initialisation is not counted in
# the simulation time.
if not self._initialised:
print("Simulation has not been initialised by the user. \n",
"Initialising simulation now ... ")
self.initialise()
if analyse_solver_at_timesteps is None:
analyse_solver_at_timesteps = self.analyse_timesteps
df.info(colored("Start time stepping","green"))
# write initial values to file.
self.timestep_num = int(0)
# note that at this point of the code self.t = self.starttime as set in
# the beginning.
while self.timestep_num < self.number_of_timesteps:
print(f"entering timestep ",
"{}".format(self.timestep_num),
"at time t = "+ "{number:.{digits}f}".format(number = self.t, digits = 4))
# check if the solver should beanalised or not.
if analyse_solver_at_timesteps is not None and np.isin(self.timestep_num, analyse_solver_at_timesteps, assume_unique=True):
print("analyising timestep ...")
self.LDDsolver(debug = self.debug_solver, analyse_timestep = True)
else:
self.LDDsolver(debug = self.debug_solver, analyse_timestep = False)
self.t += self.timestep_size
self.timestep_num += 1
# write solutions to files.
for subdom_ind, subdomain in self.subdomain.items():
subdomain.write_solution_to_xdmf(#
file = self.solution_file[subdom_ind], #
time = self.t,#
write_iter_for_fixed_time = False,
)
# if we have an exact solution, write out the |u -uh|_L2 to see the
# absolute error.
if subdomain.pressure_exact is not None:
relative_L2_errornorm = dict()
for phase in subdomain.has_phases:
pa_exact = subdomain.pressure_exact[phase]
pa_exact.t = self.t
norm_pa_exact = df.norm(pa_exact, norm_type='L2', mesh=subdomain.mesh)
error_calculated = df.errornorm(pa_exact, subdomain.pressure[phase], 'L2', degree_rise=6)
if norm_pa_exact > self.tol:
relative_L2_errornorm.update(
{phase: error_calculated/norm_pa_exact}
)
else:
relative_L2_errornorm.update(
{phase: error_calculated}
)
errornorm_filename = self.output_dir+self.output_filename_parameter_part[subdom_ind]+\
"_L2_errornorms_over_time" +".csv"
self.write_errornorms_to_csv(
filename = errornorm_filename, #
subdomain_index = subdom_ind,
errors = relative_L2_errornorm,
)
df.info("Finished")
df.info("Elapsed time:"+str(timer.elapsed()[0]))
timer.stop()
# r_cons.close()
# energy.close()
df.list_timings(df.TimingClear.keep, [df.TimingType.wall, df.TimingType.system])
df.dump_timings_to_xml(self.output_dir+"timings.xml",df.TimingClear.keep)
def LDDsolver(self, time: float = None, #
debug: bool = False, #
analyse_timestep: bool = False):
""" calulate the solution on all patches for the next timestep
The heart of the simulation. The function LDDsolver implements the LDD
iteration, that is solves the problem given the initial data at timestep
time.
"""
# in case no time is given, use the time of the class.
if time is None:
time = self.t
error_criterion = 1E-8#min(1E-9, subdomain.mesh.hmax()**2*subdomain.timestep_size)
if analyse_timestep:
# dictionary holding filename strings for each subdomain to store
# iterations in, used to analyse the behaviour of the iterates within
# the current time step.
solution_over_iteration_within_timestep = dict()
subsequent_errors_over_iteration = dict()
# before the iteration gets started all iteration numbers must be nulled
# and the self._calc_isdone_for_subdomain dictionary must be set to False
for ind, subdomain in self.subdomain.items():
# create xdmf files for writing out iterations if analyse_timestep is
# given.
if analyse_timestep:
filename = self.output_dir+self.output_filename_parameter_part[ind]\
+ "solution_iterations_at_t"\
+ "{number:.{digits}f}".format(number=time, digits=2) +".xdmf"
solution_over_iteration_within_timestep.update( #
{ind: SolutionFile(self._mpi_communicator, filename)}
)
subsequent_errors_over_iteration.update( {ind: dict()} )
# # we need a dictionries for each phase.
# for phase in subdomain.has_phases:
# subsequent_errors_over_iteration[ind].update( {phase: dict()} )
#
self._calc_isdone_for_subdomain.update({ind: False})
# reset all interface[has_interface].current_iteration[ind] = 0, for
# index has_interface in subdomain.has_interface
subdomain.null_all_interface_iteration_numbers()
# update the time of the source terms before starting the iteration.
# The sources and sinks are the same throughout the iteration.
# self._eval_sources(time = time,
# subdomain_index = ind)
self.update_DirichletBC_dictionary(time = time, #
subdomain_index = ind,
)
# # this is the beginning of the solver, so iteration must be set
# # to 0.
subdomain.iteration_number = 0
for phase in subdomain.has_phases:
# evaluate the sources to the current time.
subdomain.source[phase].t = time
# set the solution of the previous timestep as new prev_timestep pressure
subdomain.pressure_prev_timestep[phase].assign(
subdomain.pressure[phase]
)
# All isdone flags need to be zero before we start iterating
self._calc_isdone_for_phase[ind].update({phase: False})
### actual iteration starts here
# gobal stopping criterion for the iteration.
iteration = 0
all_subdomains_are_done = False
while iteration <= self._max_iter_num and not all_subdomains_are_done:
# we need to loop over the subdomains and solve an L-scheme type step
# on each subdomain. here it should be possible to parallelise in
# the future.
for sd_index, subdomain in self.subdomain.items():
# check if the calculation on subdomain has already be marked as
# finished.
if not self._calc_isdone_for_subdomain[sd_index]:
# set subdomain iteration to current iteration
subdomain.iteration_number = iteration
if debug:
print(f"On subdomain{sd_index}: Entering iteration {iteration}\n")
print(f"subdomain{sd_index}.isRichards = {subdomain.isRichards}.")
print(f"subdomain{sd_index}.has_phases = {subdomain.has_phases}.")
# solve the problem on subdomain
self.Lsolver_step(subdomain_index = sd_index,#
debug = debug,
)
if iteration > 0:
subsequent_iter_error = dict()
for phase in subdomain.has_phases:
error = df.Function(subdomain.function_space[phase])
error.assign(subdomain.pressure[phase] - subdomain.pressure_prev_iter[phase])
error_calculated = df.norm(error, 'L2')
subsequent_iter_error.update(
{phase: error_calculated}
)
# set flag to stop solving for that phase if error_criterion is met.
if subsequent_iter_error[phase] < error_criterion:
self._calc_isdone_for_phase[sd_index].update({phase: True})
if debug:
print(f"the subsequent error in iteration {iteration} for phase {phase} is {subsequent_iter_error[phase]}")
if analyse_timestep:
# save the calculated L2 errors to csv file for plotting
# with pgfplots.
print(f"t = {self.t} the subsequent error in iteration {iteration} is {subsequent_iter_error[phase]}")
subsequent_error_filename = self.output_dir\
+self.output_filename_parameter_part[sd_index]\
+"subsequent_iteration_errors" +"_at_time"+\
"{number:.{digits}f}".format(number=time, digits=2) +".csv"
self.write_subsequent_errors_to_csv(
filename = subsequent_error_filename, #
subdomain_index = sd_index,
errors = subsequent_iter_error,
time = time)
# this saves the actual difference function between subsequent
# iterations to analyse then in paraview later.
subsequent_errors_over_iteration[sd_index].update(
{ phase: error })
# both phases should be done before we mark the subdomain as
# finished.
print("max(subsequent_iter_error.values()): ", max(subsequent_iter_error.values()))
total_subsequent_iter_error = max(subsequent_iter_error.values())
# check if error criterion has been met
if debug:
print(f" total_subsequent_error in iter {iteration} = {total_subsequent_iter_error }\n")
print(f"the maximum distance between any to points in a cell mesh.hmax() on subdomain{sd_index} is h={subdomain.mesh.hmax()}")
print(f"the minimal distance between any to points in a cell mesh.hmin() on subdomain{sd_index} is h={subdomain.mesh.hmin()}")
print(f"the error criterion is tol = {error_criterion}")
# wetting_done = self._calc_isdone_for_phase[sd_index]['wetting']
# nonwetting_done = self._calc_isdone_for_phase[sd_index]['nonwetting']
if total_subsequent_iter_error < error_criterion:
if debug:
print(f"subdomain{sd_index} reachd error tol = {error_criterion} in iteration {iteration}.")
self._calc_isdone_for_subdomain[sd_index] = True
# prepare next iteration
# write the newly calculated solution to the inteface dictionaries
# for communication
subdomain.write_pressure_to_interfaces()
if analyse_timestep:
subdomain.write_solution_to_xdmf(#
file = solution_over_iteration_within_timestep[sd_index], #
time = time,#
write_iter_for_fixed_time = True,
)#subsequent_errors = subsequent_errors_over_iteration[sd_index])
for phase in subdomain.has_phases:
subdomain.pressure_prev_iter[phase].assign(
subdomain.pressure[phase]
)
else:
# one subdomain is done. Check if all subdomains are done to
# stop the while loop if needed.
# For this, set
all_subdomains_are_done = True
# then check if all domains are done. If not, reset
# all_subdomains_are_done to False.
for subdomain, isdone in self._calc_isdone_for_subdomain.items():
if not isdone:
all_subdomains_are_done = False
#TODO check if maybe I still need to do
# subdomain.iteration_number += 1
# or adapt the calc_gli_term method to reflect that one subdomain
# could be done earlier than others.
# you wouldn't be so stupid as to write an infinite loop, would you?
iteration += 1
# end iteration while loop.
def Lsolver_step(self, subdomain_index: int,#
debug: bool = False) -> None:
""" L-scheme solver iteration step for an object of class subdomain
Lsolver_step implements L-scheme solver iteration step for an subdomain
with index subdomain_index, i.e. for the model form (already in L-scheme
form) stored in the domain patch subdomain.
# parameters
subdomain_index: int subdomain index that defines
which subdomain to perform
the iteration on.
iteration: int iteration number
debug: bool flag to toggle weather or not
to write out each iteration.
"""
subdomain = self.subdomain[subdomain_index]
calc_isdone_for_phase = self._calc_isdone_for_phase[subdomain_index]
iteration = subdomain.iteration_number
# calculate the gli terms on the right hand side and store
subdomain.calc_gli_term()
for phase in subdomain.has_phases:
if not calc_isdone_for_phase[phase]:
# extract L-scheme form and rhs (without gli term) from subdomain.
governing_problem = subdomain.governing_problem(phase = phase)
form = governing_problem['form']
rhs_without_gli = governing_problem['rhs_without_gli']
# assemble the form and rhs
form_assembled = df.assemble(form)
rhs_without_gli_assembled = df.assemble(rhs_without_gli)
# access the assembled gli term for the rhs.
gli_term_assembled = subdomain.gli_assembled[phase]
# subdomain.calc_gli_term() asslembles gli but on the left hand side
# so gli_term_assembled needs to actually be subtracted from the rhs.
if debug and subdomain.mesh.num_cells() < 36:
print("\nSystem before applying outer boundary conditions:")
print(f"phase = {phase}: rhs_without_gli:\n", rhs_without_gli_assembled.get_local())
rhs_without_gli_assembled.set_local(rhs_without_gli_assembled.get_local() - gli_term_assembled)
rhs_assembled = rhs_without_gli_assembled
if debug and subdomain.mesh.num_cells() < 36:
# print(f"phase = {phase}: form_assembled:\n", form_assembled.array())
# print(f"phase = {phase}: rhs_without_gli:\n", rhs_without_gli_assembled.get_local())
print(f"phase = {phase}: gli_term:\n", gli_term_assembled)
print(f"phase = {phase}: rhs_assembled:\n", rhs_assembled.get_local())
# apply outer Dirichlet boundary conditions if present.
if subdomain.outer_boundary is not None:
# apply boundary dirichlet conditions for all outer boundaries.
for index, boundary in enumerate(subdomain.outer_boundary):
self.outerBC[subdomain_index][index][phase].apply(form_assembled, rhs_assembled)
if debug and subdomain.mesh.num_cells() < 36:
print("\nSystem after applying outer boundary conditions:")
# print(f"phase = {phase}: form_assembled:\n", form_assembled.array())
print(f"phase = {phase}: rhs_assembled:\n", rhs_assembled.get_local())
# # set previous iteration as initial guess for the linear solver
# pi_prev = subdomain.pressure_prev_iter[phase].vector().get_local()
# subdomain.pressure[phase].vector().set_local(pi_prev)
if debug and subdomain.mesh.num_cells() < 36:
print("\npressure before solver:\n", subdomain.pressure[phase].vector().get_local())
linear_solver = subdomain.linear_solver
if linear_solver is None:
df.solve(form_assembled, subdomain.pressure[phase].vector(), rhs_assembled, self.solver, self.preconditioner)
else:
linear_solver.solve(form_assembled, subdomain.pressure[phase].vector(), rhs_assembled)
if debug and subdomain.mesh.num_cells() < 36:
print("\npressure after solver:\n", subdomain.pressure[phase].vector().get_local())
else:
print(f"calculation for phase {phase} is already done in timestep = {self.timestep_num}, t = {self.t}. skipping ..." )
pass
def _init_solution_files(self):
""" set up solution files for saving the solution of the LDD scheme for
each subdomain.
"""
# set up solution file names for each subdomain.
self.output_filename_parameter_part = dict()
self.solution_file = dict()
for subdom_ind, subdomain in self.subdomain.items():
tau = subdomain.timestep_size
L = subdomain.L
h = subdomain.mesh.hmax()
if subdomain.isRichards:
Lam = subdomain.lambda_param['wetting']
name1 ="subdomain" + "{}".format(subdom_ind)
# only show three digits of the mesh size.
name2 = "_dt" + "{}".format(tau)+"_hmax" + "{number:.{digits}f}".format(number=h, digits=3)
name3 = "_lambda" + "{}".format(Lam) + "_Lw" + "{}".format(L["wetting"])
filename_param_part = name1 + name2 + name3
filename = self.output_dir + filename_param_part + "_solution" +".xdmf"
else:
Lam_w = subdomain.lambda_param['wetting']
Lam_nw = subdomain.lambda_param['nonwetting']
filename_param_part = "subdomain" + "{}".format(subdom_ind)\
+"_dt" + "{}".format(tau)\
+"_hmax" + "{number:.{digits}f}".format(number=h, digits=3)\
+"_lambda_w" + "{}".format(Lam_w)\
+"_lambda_nw" + "{}".format(Lam_nw)\
+"_Lw" + "{}".format(L["wetting"])\
+"_Lnw" + "{}".format(L["nonwetting"])
filename = self.output_dir + filename_param_part + "_solution" +".xdmf"
# create solution files and populate dictionary.
self.output_filename_parameter_part.update(
{subdom_ind: filename_param_part}
)
self.solution_file.update( #
{subdom_ind: SolutionFile(self._mpi_communicator, filename)}
)
def write_exact_solution_to_xdmf(self):
"""
evaluate the exact solution of the simulation (in case there is one) at
all timesteps and write it to the solution file.
"""
t = copy.copy(self.starttime)
for timestep in range(0,self.number_of_timesteps+1):
for subdom_ind, subdomain in self.subdomain.items():
file = self.solution_file[subdom_ind]
for phase in subdomain.has_phases:
pa_exact = subdomain.pressure_exact[phase]
pa_exact.t = t
tmp_exact_pressure = df.interpolate(pa_exact, subdomain.function_space[phase])
tmp_exact_pressure.rename("exact_pressure_"+"{}".format(phase), "exactpressure_"+"{}".format(phase))
file.write(tmp_exact_pressure, t)
t += self.timestep_size
def write_subsequent_errors_to_csv(self,#
filename: str, #
subdomain_index: int,#
errors: tp.Dict[str, float],#
time: float = None) -> None:
""" write subsequent errors to csv file for plotting with pgfplots"""
# in case no time is given, use the time of the class.
subdomain = self.subdomain[subdomain_index]
if time is None:
time = self.t
subdomain = self.subdomain[subdomain_index]
iteration = subdomain.iteration_number
if subdomain.isRichards:
self.errors = pd.DataFrame({ "iteration": iteration,
#"th. initial mass": df.assemble(rho*df.dx), \
#"energy": self.total_energy(), \
"wetting": errors["wetting"]}, index=[iteration])
else:
self.errors = pd.DataFrame({"iteration":iteration, #\
#"th. initial mass": df.assemble(rho*df.dx), \
#"energy": self.total_energy(), \
"wetting": errors["wetting"],
"nonwetting": errors["nonwetting"]}, index=[iteration])
# if self._rank == 0:
# check if file exists
if os.path.isfile(filename) == True:
with open(filename, 'a') as f:
self.errors.to_csv(f, header=False, sep='\t', encoding='utf-8', index=False)
else:
self.errors.to_csv(filename, sep='\t', encoding='utf-8', index=False)
def write_errornorms_to_csv(self,#
filename: str, #
subdomain_index: int,#
errors: tp.Dict[str, float],#
) -> None:
""" write subsequent errors to csv file for plotting with pgfplots"""
# in case no time is given, use the time of the class.
subdomain = self.subdomain[subdomain_index]
time = self.t
timestep = self.timestep_num
subdomain = self.subdomain[subdomain_index]
# iteration = subdomain.iteration_number
if subdomain.isRichards:
self.errors = pd.DataFrame({ "time": self.t,
"timestep": self.timestep_num,
#"th. initial mass": df.assemble(rho*df.dx), \
#"energy": self.total_energy(), \
"wetting": errors["wetting"]}, index=[timestep])
else:
self.errors = pd.DataFrame({ "time": self.t,
"timestep": self.timestep_num,
"wetting": errors["wetting"],
"nonwetting": errors["nonwetting"]}, index=[timestep])
# if self._rank == 0:
# check if file exists
if os.path.isfile(filename) == True:
with open(filename, 'a') as f:
self.errors.to_csv(f, header=False, sep='\t', encoding='utf-8', index=False)
else:
self.errors.to_csv(filename, sep='\t', encoding='utf-8', index=False)
## Private methods
def _init_meshes_and_markers(self, subdomain_def_points: tp.List[tp.List[df.Point]] = None,#
mesh_resolution: float = None, #
) -> None:
""" Generate meshes and markerfunctions for sudomains.
This method generate meshes and markerfunctions for sudomains and
sets the internal lists of meshes used by the LDDsimulation class.
The following lists and variables are set:
self.mesh_subdomain: List of subdomain meshes. self.mesh_subdomain[0]
is the whole simulation domain, self.mesh_subdomain[i]
is the submesh of subdomain i.
self.domain_marker: Global marker function on self.mesh_subdomain[0]
marking the subdomains by their respective number.
self._number_of_subdomains Internal variable holding the total number of
actual subdomains (global domain not counted).
The method is intended to be used by the self.initialise() method to set
up the simulation but also accepts needed input parameters explicitly for
greater debugging felxibility.
# Parameters
subdomain_def_points #type: tp.List[tp.List[df.Point]] The sub-
domains are passed as list of lists of dolfin
points forming the polygonal boundary of the
subdomains. subdomain_def_points[0] is
to contain the boundary polygon of the whole
domain omega, whereas subdomain_def_points[i]
is supposed to contain the boundary polygone
of subdomain i.
mesh_resolution #type float positive number used by the
mshr mesh generator to influence how coarse
the mesh is. The higher the number, the finer
the mesh.
"""
if not subdomain_def_points:
subdomain_def_points = self.subdomain_def_points
if not mesh_resolution:
mesh_resolution = self.mesh_resolution
### generate global mesh and subdomain meshes. Also count number of subdomains.
# define the global simulation domain polygon first.
domain = mshr.Polygon(subdomain_def_points[0])
for num, subdomain_vertices in enumerate(subdomain_def_points[1:], 1):
subdomain = mshr.Polygon(subdomain_vertices)
domain.set_subdomain(num, subdomain)
# count number of subdomains for further for loops
self._number_of_subdomains = num
# create global mesh. Submeshes still need to be extracted.
mesh = mshr.generate_mesh(domain, mesh_resolution)
if self.write2file['meshes_and_markers']:
filepath = self.output_dir+"global_mesh.pvd"
df.File(filepath) << mesh
filepath = self.output_dir+"global_mesh.xml.gz"
df.File(filepath) << mesh
self.mesh_subdomain.append(mesh)
# define domainmarker on global mesh and set marker values to domain
# number as assigned in the above for loop.
self.domain_marker = df.MeshFunction('size_t', mesh, mesh.topology().dim(), mesh.domains())
# extract subdomain meshes
for num in range(1, self._number_of_subdomains+1):
# if MPI.size(mpi_comm_world()) == 1:
self.mesh_subdomain.append(df.SubMesh(mesh, self.domain_marker, num))
# else:
# self.mesh_subdomain.append(create_submesh(mesh, self.domain_marker, num))
if self.write2file['meshes_and_markers']:
filepath = self.output_dir+"domain_marker.pvd"
df.File(filepath) << self.domain_marker
def _init_interfaces(self, interface_def_points: tp.List[tp.List[df.Point]] = None,#
adjacent_subdomains: tp.List[np.ndarray] = None) -> None:
""" generate interfaces and interface markerfunctions for suddomains and
set the internal lists used by the LDDsimulation class.
This initialises a list of interfaces and global interface marker functions
used by the LDDsimulation class.
The following lists and variables are set:
self.interface_marker global interface marker
self.interface list containing interface objectso of class
Interface.
The method is intended to be used internally by the self.initialise()
method to set up the simulation but also accepts needed input parameters
explicitly for greater debugging felxibility. In case input parameters
are provided, they override the ones set by self.initialise().
# Parameters
interface_def_points #type: tp.List[tp.List[df.Point]] list of
lists of dolfin points containing the internal
interfaces dividing the domain omega into subdomains.
the indices of this list introduce a numbering
of the interfaces which will determin the list
of neighbours attached to each subdomain.
adjacent_subdomains #type: tp.List[tp.array[int]] List of
list of indices such that adjacent_subdomains[i]
gives the indices of the subdomains that share
the interface defined by the dolfin points in
interface_def_points
"""
if not interface_def_points:
interface_def_points = self.interface_def_points
else:
# overwrite what has been set by init()
self.adjacent_subdomains = adjacent_subdomains
if not adjacent_subdomains:
adjacent_subdomains = self.adjacent_subdomains
else:
# overwrite what has been set by init()
self.adjacent_subdomains = adjacent_subdomains
mesh = self.mesh_subdomain[0]
# define global interface marker. This is a mesh function on edges of the mesh.
self.interface_marker = df.MeshFunction('size_t', mesh, mesh.topology().dim()-1)
self.interface_marker.set_all(0)
for num, vertices in enumerate(interface_def_points):
self.interface.append(dp.Interface(vertices=vertices, #
tol=self.tol,#mesh.hmin()/100,
internal=True,#
adjacent_subdomains = adjacent_subdomains[num],#
isRichards = self.isRichards,#
global_index = num,
)
)#
# marker value gets internally set to global_index+1.
# The reason is that we want to mark outer boundaries with the value 0.
self.interface[num].mark(self.interface_marker, self.interface[num].marker_value)
# print("Global interface vertices:")
self.interface[num].init_interface_values(self.interface_marker, self.interface[num].marker_value)
if self.write2file['meshes_and_markers']:
filepath = self.output_dir+"interface_marker_global.pvd"
df.File(filepath) << self.interface_marker
def _init_subdomains(self) -> None:
""" generate dictionary of opjects of class DomainPatch.
This method generates a dictionary of opjects of class DomainPatch, containing
everything that is needed for the LDDsolver. Additionally a linear solver
object is created.
"""
# The list is_richards has the same length as the nuber of subdomains.
# Therefor it is used in the subdomain initialisation loop.
for subdom_num, isR in self.isRichards.items():
interface_list = self._get_interfaces(subdom_num)
# print(f"has_interface list for subdomain {subdom_num}:", interface_list)
self.subdomain.update(#
{subdom_num : dp.DomainPatch(#
subdomain_index = subdom_num,#
isRichards = isR,#
mesh = self.mesh_subdomain[subdom_num],#
viscosity = self.viscosity[subdom_num],#
porosity = self.porosity[subdom_num],#
outer_boundary_def_points = self.outer_boundary_def_points[subdom_num],#
interfaces = self.interface,#
has_interface = interface_list,#
L = self.L[subdom_num],#
lambda_param = self.lambda_param[subdom_num],#
relative_permeability = self.relative_permeability[subdom_num],#
saturation = self.saturation[subdom_num],#
timestep_size = self.timestep_size,#
tol = self.tol)
}
)
# setup the linear solvers and set the solver parameters.
self.subdomain[subdom_num].linear_solver = df.KrylovSolver(self.solver, self.preconditioner)
# we use the previous iteration as an initial guess for the linear solver.
solver_param = self.subdomain[subdom_num].linear_solver.parameters
solver_param['nonzero_initial_guess'] = self.solver_parameters['nonzero_initial_guess']
solver_param['absolute_tolerance'] = self.solver_parameters['absolute_tolerance']
solver_param['relative_tolerance'] = self.solver_parameters['relative_tolerance']
solver_param['maximum_iterations'] = self.solver_parameters['maximum_iterations']
solver_param['report'] = self.solver_parameters['report']
# ## print out set solver parameters
# for parameter, value in self.linear_solver.parameters.items():
# print(f"parameter: {parameter} = {value}")
# df.info(solver_param, True)
# setup self._calc_isdone_for_phase dictionary.
self._calc_isdone_for_phase.update({subdom_num: dict()})
def _get_interfaces(self, subdomain_index: int, #
adjacent_subdomains: tp.List[np.ndarray] = None) -> np.ndarray:
""" determin all (global) indices of interfaces belonging to subdomain
with index subdomain_index.
This method determins all (global) indices of interfaces belonging to
subdomain with index subdomain_index from adjacent_subdomains.
"""
if adjacent_subdomains is None:
adjacent_subdomains = self.adjacent_subdomains
else:
# overwrite what has been set by init()
self.adjacent_subdomains = adjacent_subdomains
interface_of_subdomain =[]
for global_interface_ind, adj_doms in enumerate(adjacent_subdomains):
# interface [i,j] and [j,i] are not counted seperately in adjacent_subdomains
# therefor we don't care wether subdomain_index == adj_doms[0] or
# subdomain_index == adj_doms[1]. if subdomain_index is one of either,
# the interface belongs to the subdomain with this index.
if np.isin(subdomain_index, adj_doms, assume_unique=True):
interface_of_subdomain.append(global_interface_ind)
return interface_of_subdomain
def _init_initial_values(self, interpolation_degree: int = None):
""" set initial values
This method populates the subdomain dictionaries as well as the interface
dictionaries with initial values needed for the LDDsolver iterations.
Additionally, the initial value gets written to the solution file of each
subdomain.
"""
if interpolation_degree is None:
interpolation_degree = self.interpolation_degree
for num, subdomain in self.subdomain.items():
V = subdomain.function_space
# p0 contains both pw_0 and pnw_0 for subdomain[num]
p0 = self.initial_conditions[num]
for phase in subdomain.has_phases:
# note that the default interpolation degree is 2
pa0 = df.Expression(p0[phase], domain = subdomain.mesh, #
degree = interpolation_degree,#
t=self.starttime)
pa0_interpolated = df.interpolate(pa0, V[phase])
# alocate memory
subdomain.pressure.update(
{phase: df.Function(V[phase])}
)
subdomain.pressure_prev_iter.update(
{phase: df.Function(V[phase])}
)
subdomain.pressure_prev_timestep.update(
{phase: df.Function(V[phase])}
)
subdomain.pressure[phase].assign(pa0_interpolated)
subdomain.pressure_prev_iter[phase].assign(pa0_interpolated)
subdomain.pressure_prev_timestep[phase].assign(pa0_interpolated)
# populate interface dictionaries with pressure values.
subdomain.write_pressure_to_interfaces()
subdomain.write_solution_to_xdmf(#
file = self.solution_file[num], #
time = self.starttime,#
write_iter_for_fixed_time = False,
)
def update_DirichletBC_dictionary(self, subdomain_index: int,#
interpolation_degree: int = None, #
time: float = 0,#
):
""" update time of the dirichlet boundary object for subdomain with
index subdomain_index.
In case we don't have a case with an exact solution, we should have 0
expressions in self.dirichletBC_expression_strings, so nothing should happen.
"""
if interpolation_degree is None:
interpolation_degree = self.interpolation_degree
num = subdomain_index
subdomain = self.subdomain[num]
# we only need to set Dirichlet boundary conditions if we have an outer
# boundary.
if subdomain.outer_boundary is not None:
V = subdomain.function_space
# loop over all outer boundary parts. This is a bit of a brainfuck:
# subdomain.outer_boundary gives a dictionary of the outer boundaries of the subdomain.
# Since a domain patch can have several disjoint outer boundary parts, the expressions
# need to get an enumaration index which starts at 0. So subdomain.outer_boundary[j] is
# the dictionary of outer dirichlet conditions of the subdomain and boundary part j for
# the wetting and the nonwetting phase, so subdomain.outer_boundary[j]['wetting'] and
# subdomain.outer_boundary[j]['nonwetting'] return
# the actual expression needed for the dirichlet condition for both phases if present.
boundary_marker = subdomain.outer_boundary_marker
for index, boundary in enumerate(subdomain.outer_boundary):
if np.abs(time-self.starttime) < self.tol:
# Here the dictionary has to be created first because t = 0.
# create a dictionary of dictionaries for each subdomain
self.outerBC.update({num: dict()})
self.dirichletBC_dfExpression.update({num: dict()})
# create a dictionary of dictionaries for each outer boundary
# part of the subdomain.
self.outerBC[num].update({index: dict()})
self.dirichletBC_dfExpression[num].update({index: dict()})
# this contains the Expression strings input by the user.
pDC = self.dirichletBC_expression_strings[num][index]
# as with the interfaces, since numbering of the outer boundary
# parts of each subdomain starts at 0 and subdomain.outer_boundary_marker
# gets set to 0 on all facets of the mesh, we need to up the value for
# an actual outer boundary part by 1.
boundary_marker_value = index+1
for phase in subdomain.has_phases:
# note that the default interpolation degree is 2
if np.abs(time - self.starttime) < self.tol :
# time = 0 so the Dolfin Expression has to be created first.
pDCa = df.Expression(pDC[phase], domain = subdomain.mesh, #
degree = interpolation_degree, #
t=self.starttime)
self.dirichletBC_dfExpression[num][index].update({phase: pDCa})
else:
pDCa = self.dirichletBC_dfExpression[num][index][phase]
pDCa.t = time
self.outerBC[num][index].update(
{phase: df.DirichletBC(V[phase], pDCa, boundary_marker, boundary_marker_value)}
)
else:
# case subdomain.outer_boundary is None
print("subdomain is an inner subdomain and has no outer boundary.\n",
"no Dirichlet boundary has been set.")
def _init_exact_solution_expression(self):
""" initialise dolfin expressions for exact solution and populate
self.exact_solution_expr dictionary. This is used to calculate errornorms
in the case of an exact solution.
"""
if self.exact_solution is not None:
for sd_ind, subdomain in self.subdomain.items():
subdomain.pressure_exact = dict()
for phase in subdomain.has_phases:
print(f"initialising exact solutions for subdomain{subdomain.subdomain_index} and phase {phase}")
pa_exact = self.exact_solution[sd_ind][phase]
subdomain.pressure_exact.update(
{phase: df.Expression(pa_exact, domain = subdomain.mesh, #
degree=self.interpolation_degree, #
t=self.starttime) }
)
else:
pass
def _init_sources(self, interpolation_degree: int = None):
""" create source expressions at self.starttime and populate subdomain
source dictionaries.
"""
if interpolation_degree is None:
interpolation_degree = self.interpolation_degree
for subdomain_index, subdomain in self.subdomain.items():
for phase in subdomain.has_phases:
# here t = 0 and we have to initialise the sources.
f = self.sources[subdomain_index][phase]
subdomain.source.update(
{phase: df.Expression(f, domain = subdomain.mesh,#
degree = interpolation_degree, #
t = self.starttime)}
)
def _add_parameters_to_output_dirname(self):
""" function to create a new output directory for given set of input
parameters and adjust self.output_dir accordingly
"""
dirname = "mesh_res" + "{}_".format(self.mesh_resolution)\
+ "dt" + "{}".format(self.timestep_size) + "/"
self.output_dir += dirname
if not os.path.exists(self.output_dir):
os.mkdir(self.output_dir)
print(f"\nwill save output of this simulation to path {self.output_dir}")
def _init_analyse_timesteps(self):
""" determin timesteps to anayise
by setting
"""
# We want to write out csv files containing subsequent L2 errors of iterations for
# number_of_timesteps_to_analyse many timesteps. We want to do this with
# analyse_timesteps = np.linspace(starttime, number_of_timesteps, number_of_timesteps_to_analyse)
# see below. Therefor, we need to check if (number_of_timesteps_to_analyse-1) is
# a divisor of number_of_timesteps,
if self.number_of_timesteps_to_analyse == 0:
self.analyse_timesteps = None
elif self.number_of_timesteps_to_analyse == 1:
self.analyse_timesteps = [0]
else:
if not self.number_of_timesteps%(self.number_of_timesteps_to_analyse-1) == 0:
# if number_of_timesteps%(number_of_timesteps_to_analyse-1) is not 0, we
# round the division error to the nearest integer and redefine number_of_timesteps
print(f"since number_of_timesteps/number_of_timesteps_to_analyse = {self.number_of_timesteps/self.number_of_timesteps_to_analyse}",
"is not an integer,\n")
new_analysing_interval = int(round(self.number_of_timesteps/(self.number_of_timesteps_to_analyse-1)))
self.number_of_timesteps = new_analysing_interval*(self.number_of_timesteps_to_analyse)
print(f"I'm resetting number_of_timesteps = {self.number_of_timesteps}\n")
self.analyse_timesteps = np.linspace(0, self.number_of_timesteps, self.number_of_timesteps_to_analyse, dtype=int)
print(f"the following timesteps will be analysed: {self.analyse_timesteps}")
self.endtime = self.number_of_timesteps*self.timestep_size
# time_interval = [starttime, Tmax]
print(f"The simulation interval is [{self.starttime}, {self.endtime}]")
# https://github.com/geo-fluid-dynamics/phaseflow-fenics
# adapted from Lucas Ostrowski
class SolutionFile(df.XDMFFile):
"""
This class extends `df.XDMFFile` with some minor changes for convenience.
"""
def __init__(self, mpicomm, filepath):
df.XDMFFile.__init__(self, mpicomm, filepath)
self.parameters["functions_share_mesh"] = True
self.parameters["flush_output"] = True
self.path = filepath # Mimic the file path attribute from a `file` returned by `open`