Select Git revision
domainPatch.py
LDDsimulation.py 86.77 KiB
"""LDD simulation class.
Main class of the LDD simulation. The heart and soul of the code.
"""
import os
import dolfin as df
import numpy as np
import numpy.linalg as la
import typing as tp
import mshr
import domainPatch as dp
import boundary_and_interface as bi
import helpers as hlp
import pandas as pd
import sys
import copy
import multiprocessing as mp
# import errno
import h5py
from termcolor import colored
from solutionFile import SolutionFile
class LDDsimulation(object):
"""Main Class of LDD simulation.
LDDsimulation is the main class for the LDD simulation. It contains methods for
setting up and running the LDD simulation for different meshes and domains.
"""
def __init__(self, dimension: int = 2,#
debug: bool = False,#
tol: float = 1E-14,#
LDDsolver_tol: float = 1E-8,
# maximal number of L-iterations that the LDD solver uses.
max_iter_num: int = 1000,
FEM_Lagrange_degree: int = 1,
mesh_study: bool = False):
hlp.print_once("\n ###############################################\n",
"############# Begin Simulation ################\n",
"###############################################\n")
hlp.print_once("Init LDD simulation...")
# parameter as set in NSAC_simulation by Lukas Ostrowski
hlp.print_once("Set internal fenics parameter...")
# calculation exactness for the whole simulation.
if tol:
self.tol = tol
else:
self.tol = df.DOLFIN_EPS
# dimension of the simulation. This is by default 2 as I don't intend
# to implement 3D.
self.dim = dimension
self.FEM_Lagrange_degree = FEM_Lagrange_degree
#Parameters
# df.parameters["allow_extrapolation"] = True
# df.parameters["refinement_algorithm"] = "plaza_with_parent_facets"
df.parameters["form_compiler"]["quadrature_degree"] = 14
# interpolation degree, for source terms, intitial and boundary conditions.
self.interpolation_degree = 6
# # To be able to run DG in parallel
# df.parameters["ghost_mode"] = "shared_facet"
# df.parameters["ghost_mode"] = "none"
df.parameters["mesh_partitioner"] = "ParMETIS"
# Form compiler options
df.parameters["form_compiler"]["optimize"] = True
df.parameters["form_compiler"]["cpp_optimize"] = True
# df.parameters['std_out_all_processes'] = False
# df.set_log_level(df.LogLevel.WARNING)
df.set_log_level(df.LogLevel.INFO)
# df.set_log_level(df.LogLevel.DEBUG)
# CRITICAL = 50, // errors that may lead to data corruption and suchlike
# ERROR = 40, // things that go boom
# WARNING = 30, // things that may go boom later
# INFO = 20, // information of general interest
# PROGRESS = 16, // what's happening (broadly)
# TRACE = 13, // what's happening (in detail)
# DEBUG = 10, // sundry
hlp.print_once("Set default parameter...\n")
self.restart_file = None
# # write checkpoints
# self.write_checkpoints = True
# # dont project solution to CG for visualization
# self.project_to_cg = False
# # no sponge zones
# self.sponge = False
# set up mpi for internal parallelisation.
self._mpi_rank = df.MPI.rank(df.MPI.comm_world)
self._mpi_communicator = df.MPI.comm_world
## Public variables.
# maximal number of L-iterations that the LDD solver uses.
self._max_iter_num = max_iter_num
# directory to write files output to.
self.output_dir = None
# toggle wether or not we are doing a mesh study. influences the naming
# of the output directory
self.mesh_study = mesh_study
# list of subdomain meshes. Is populated by self.init_meshes_and_markers()
self.mesh_subdomain = []
# global marker for subdomains and interfaces. Is set by self.init_meshes_and_markers()
self.domain_marker = None
# List of interfaces of class Interface. This is populated by self.init_interfaces
self.interface = None
# The global interface marker. Gets initialised by self.init_interfaces().
self.interface_marker = None
# List of list of indices indicating subdomains adjacent to interfaces.
# self.adjacent_subdomains[i] gives the two indices of subdomains which
# are share the interface self.interface[i]. This gets populated by
# self.init_interfaces()
self.adjacent_subdomains = None
## Private variables
# TODO rewrite this with regard to the mesh sizes
# self.calc_tol = self.tol
# list of timesteps that get analyed. Gets initiated by self._init_analyse_timesteps
self.analyse_timesteps = None
# variable to check if self.set_parameters() has been called
self._parameters_set = False
# flag holds true if self.initialise() has been executed.
self._initialised = False
# dictionary of objects of class DomainPatch initialised by self._init_subdomains()
self.subdomain = dict()
# dictionary to hold the input Expression strings for the external boundary. These are
# turned into df.Expressions objects by the method update_DirichletBC_dictionary()
# this is mostly in place when examples with exact solutions are calculated.
self.dirichletBC_expression_strings = None
# dictionary to hold the df.Expressions for the external boundary created
# from the above self.dirichletBC_expression_strings. These are
# turned into df.dirichletBC objects by the method update_DirichletBC_dictionary()
# The dictionary is created by self._init_DirichletBC_dictionary
self.dirichletBC_dfExpression = None
# dictionary to store the df.dirichletBC objects in. These are used by the
# solver to set the boundary conditions. Gets initiated by self._init_DirichletBC_dictionary
self.outerBC = None
# Dictionary that holdes the exact solution. Gets set by self.set_parameters()
# but can remain None if we don't consider a case with an exact solution.
self.exact_solution = None
# Dictionary of df. Expressions holding for each subdomain and phase the
# dolfin expressions generated of self.exact_solution by self._init_initial_values()
self.exact_solution_expr = dict()
# dictionary holding the source expressions for each subdomain. These are
# saved to the respective subdomains by the self._init_subdomains() method.
self.sources = None#
# dictionary holding expressions for the initial conditions for each subdomain.
# These are interpolated and stored to each respective subdomain by
# self._init_initial_values()
self.initial_conditions = None
# dictionary of xdmf files for writing the solution for each
# subdomain. This gets populated by self._init_solution_files()
self.solution_file = None
########## SOLVER DEFINITION AND PARAMETERS ########
# print("\nLinear Algebra Backends:")
# df.list_linear_algebra_backends()
# # # print("\nLinear Solver Methods:")
# df.list_linear_solver_methods()
# print("\n")
# df.list_krylov_solver_methods()
# print("\n")
# # # print("\nPeconditioners for Krylov Solvers:")
# df.list_krylov_solver_preconditioners()
# df.info(df.parameters, True)
self.solver_type_is_Krylov = True
### Define the linear solver to be used.
if self.solver_type_is_Krylov:
self.solver = 'gmres' #'bicgstab'#'superlu' #'gmres'#'bicgstab' # biconjugate gradient stabilized method
self.preconditioner = 'ilu'#'jacobi' #'hypre_amg' 'default' #'ilu'#'hypre_amg' # incomplete LU factorization
# dictionary of solver parametrs. This is passed to self._init_subdomains,
# where for each subdomain a sovler object of type self.solver is created
# with these parameters.
self.solver_parameters = {
'nonzero_initial_guess': True,
'absolute_tolerance': 1E-14,
'relative_tolerance': 1E-11,
'maximum_iterations': 1000,
'report': False,
'monitor_convergence': False
}
else:
self.solver = 'superlu' #'gmres'#'bicgstab' # biconjugate gradient stabilized method
self.preconditioner = 'default' #'hypre_amg' 'default' #'ilu'#'hypre_amg' # incomplete LU factorization
# dictionary of solver parametrs. This is passed to self._init_subdomains,
# where for each subdomain a sovler object of type self.solver is created
# with these parameters.
self.solver_parameters = {
'report' : True,
'symmetric': False,
'verbose': True
}
self.debug_solver = debug
self.LDDsolver_tol = LDDsolver_tol
# Dictionaries needed by the LDD Solver
# dictionary holding bool variables for each subdomain indicating wether
# minimal error has been achieved, i.e. the calculation can be considered
# done or not.
def set_parameters(
self,#
use_case: str,
output_dir: str,#
subdomain_def_points: tp.List[tp.List[df.Point]],#
# this is actually an array of bools
isRichards: tp.Dict[int, bool],#
interface_def_points: tp.List[tp.List[df.Point]],#
outer_boundary_def_points: tp.Dict[int, tp.Dict[int, tp.List[df.Point]]],#
adjacent_subdomains: tp.List[np.ndarray],#
mesh_resolution: float,#
viscosity: tp.Dict[int, tp.List[float]],#
porosity: tp.Dict[int, tp.Dict[str, float]],#
L: tp.Dict[int, tp.Dict[str, float]],#
lambda_param: tp.Dict[int, tp.Dict[str, float]],#
relative_permeability: tp.Dict[int, tp.Dict[str, tp.Callable[...,None]] ],
intrinsic_permeability: tp.Dict[int, float],
saturation: tp.Dict[int, tp.Callable[...,None]],#
starttime: float,#
timestep_size: tp.Dict[int, float],#
number_of_timesteps: int,#
number_of_timesteps_to_analyse: int,#
sources: tp.Dict[int, tp.Dict[str, str]],#
initial_conditions: tp.Dict[int, tp.Dict[str, str]],#
dirichletBC_expression_strings: tp.Dict[int, tp.Dict[str, str]],#
write2file: tp.Dict[str, bool],#
exact_solution: tp.Dict[int, tp.Dict[str, str]] = None, #
densities: tp.Dict[int, tp.Dict[str, float]] = None,#
include_gravity: bool = False,#
gravity_acceleration: float = 9.81,
starttime_timestep_number_shift: int = 0,
plot_timestep_every: int = 1,
)-> None:
""" set parameters of an instance of class LDDsimulation"""
self.use_case = use_case
if include_gravity:
self.use_case = self.use_case + "-with-gravity"
if self.mesh_study:
self.use_case = "mesh study (meshres"+ "{}".format(mesh_resolution) + ") for case: " + self.use_case
self.output_dir = output_dir
self.subdomain_def_points = subdomain_def_points
self.isRichards = isRichards
self.mesh_resolution = mesh_resolution
self.interface_def_points = interface_def_points
self.outer_boundary_def_points = outer_boundary_def_points
self.adjacent_subdomains = adjacent_subdomains
self.viscosity = viscosity
self.porosity = porosity
self.L = L
self.lambda_param = lambda_param
self.relative_permeability = relative_permeability
self.intrinsic_permeability = intrinsic_permeability
self.saturation = saturation
# simulation start time and time variable
self.starttime = starttime
# optional argument specifying what number the initial timestep should
# have. Default is 0, but if a simulation needs to be restarted from
# a later timestep, it is convenient for merging data to shift the
# the numbering of the timestep by the sepcified amount.
self.starttime_timestep_number_shift = starttime_timestep_number_shift
# total number of timesteps.
self.number_of_timesteps = number_of_timesteps
self.number_of_timesteps_to_analyse = number_of_timesteps_to_analyse
# define array of timesteps numbers for which to plot stuff. This
# does not yet contain the shift self.starttime_timestep_number_shift,
# because the nonshifted array is needed to calculate
# self.timesteps_to_plot below.
self.timestep_numbers_to_plot = np.arange(
0,
self.number_of_timesteps,
plot_timestep_every
)
# we are now missing the last timestep.
self.timestep_numbers_to_plot = np.append(
self.timestep_numbers_to_plot,
self.number_of_timesteps
)
hlp.print_once("The following timesteps will be used for plotting:")
# print(self.timestep_numbers_to_plot)
self.timestep_size = timestep_size
# stores the timesteps tn that will be used to write out the solution.
# these are the timesteps corresponding to the timestep numbers in
# self.timestep_numbers_to_plot
self.timesteps_to_plot = self.starttime \
+ self.timestep_size*self.timestep_numbers_to_plot
# now the numbering of the timesteps to plot can safely be shifted by
# the optional argument self.starttime_timestep_number_shift
self.timestep_numbers_to_plot = self.timestep_numbers_to_plot \
+ self.starttime_timestep_number_shift
self.sources = sources
self.initial_conditions = initial_conditions
self.dirichletBC_expression_strings = dirichletBC_expression_strings
self.exact_solution = exact_solution
self.write2file = write2file
# flag to toggle wether or not to include the gravity term in the calculation.
self.include_gravity = include_gravity
self.gravity_acceleration = gravity_acceleration
self.densities = densities
if self.include_gravity and self.densities is None:
raise RuntimeError("include_gravity = True but no densities were given.")
self._parameters_set = True
def initialise(self) -> None:
""" initialise LDD simulation
Public method to call all the init methods of the LDD simulation.
"""
if not self._parameters_set:
raise RuntimeError('Parameters note set!\nYou forgott to run self.set_parameters(**kwds)')
# The number of subdomains are counted by self.init_meshes_and_markers()
self._number_of_subdomains = 0
self._init_analyse_timesteps()
self._add_parameters_to_output_dirname()
df.info(colored("initialise meshes and marker functions ...\n", "yellow"))
self._init_meshes_and_markers()
df.info(colored("initialise interfaces ...\n", "yellow"))
self._init_interfaces()
df.info(colored("initialise subdomains ...\n", "yellow"))
self._init_subdomains()
df.info(colored("creating xdmf files for each subdomain ...\n", "yellow"))
self._init_solution_files()
# initial values get initialised here to be able to call the solver at different
# timesteps without having to call self.run().
df.info(colored("create initial values ...", "yellow"))
self._init_initial_values()
df.info(colored("create source Expressions ...", "yellow"))
self._init_sources()
# initialise exact solution expression dictionary if we have an exact
# solution case.
self._init_exact_solution_expression()
self._init_DirichletBC_dictionary()
self._init_simulation_output()
if self.write2file['solutions']:
df.info(colored("writing exact solutions and/or source terms for all time steps to xdmf files ...", "yellow"))
self.write_exact_solution_to_xdmf()
self._initialised = True
def run(self,
analyse_solver_at_timesteps: tp.List[float] = None,
analyse_condition: bool = False):
""" run the simulation
This method implements the time stepping, calls the acutal solvers and writes
solutions to files. It is the main funtion of the simulation class. If you
want to understand this code, start reading here.
PARAMETERS
analyse_solver_at_timesteps #type tp.List[float] List of timesteps at which
to produce output to
analyse the solver at
given timestep. This
tells the function at
which timesteps to
toggle the analyse_timestep
flag of self.LDDsolver.
"""
timer = df.Timer("Simulation time")
# check if simulation has been initialised. If the user calls self.initialise()
# from his/her script, then the time for initialisation is not counted in
# the simulation time.
if not self._initialised:
print("Simulation has not been initialised by the user. \n",
"Initialising simulation now ... ")
self.initialise()
if analyse_solver_at_timesteps is None:
analyse_solver_at_timesteps = self.analyse_timesteps
df.info(colored("Start time stepping","green"))
# write initial values to file.
self.timestep_num = int(1) + self.starttime_timestep_number_shift
self.t = self.starttime
# note that at this point of the code self.t = self.starttime as set in
# the beginning.
self.max_timestep_num = self.number_of_timesteps\
+ self.starttime_timestep_number_shift
while self.timestep_num <= self.max_timestep_num:
print("LDD simulation use case: {}".format(self.use_case))
print(f"entering timestep ",
"{}".format(self.timestep_num),
"at time t = "+ "{number:.{digits}f}".format(number = self.t, digits = 6))
# self.t is now set to the new target time. self.t will be used
# to update the dirichlet and source terms so that we actually solve
# an iplicit euler.
self.t += self.timestep_size
# check if the solver should be analised or not.
if np.isin(self.timestep_num, analyse_solver_at_timesteps, assume_unique=True):
print("analyising timestep ...")
self.LDDsolver(debug=self.debug_solver,
analyse_timestep=True,
analyse_condition=analyse_condition)
else:
self.LDDsolver(debug=self.debug_solver,
analyse_timestep=False,
analyse_condition=False)
# calculate spacetime errornorms for self.output.
if self.exact_solution is not None:
space_errornorm = self._spacetime_errornorm_calc_step()
else:
space_errornorm = None
# write solutions to files. It is paramount, that self.timestep_num
# be only updated after calling self.write_data_to_disc()
self._write_data_to_disc(space_errornorm)
self.timestep_num += 1
# the L2 space_time errornorm needs to be square rooted.
for subdom_ind in self.isRichards.keys():
subdomain = self.subdomain[subdom_ind]
for phase in subdomain.has_phases:
self.output[subdom_ind]['errornorm'][phase]['L2'] = np.sqrt(self.output[subdom_ind]['errornorm'][phase]['L2'])
print("LDD simulation use case: {}".format(self.use_case))
df.info("Finished calculation")
df.info("Elapsed time:"+str(timer.elapsed()[0]))
timer.stop()
# r_cons.close()
# energy.close()
df.list_timings(df.TimingClear.keep, [df.TimingType.wall, df.TimingType.system])
# timing_table = df.timings(df.TimingClear.keep, [df.TimingType.wall, df.TimingType.user, df.TimingType.system])
# print(timing_table.__get_value__())
# with open(self.output_dir+"timings.txt", "w") as timefile:
# timefile.write(df.timings)
df.dump_timings_to_xml(self.output_dir+"timings.xml", df.TimingClear.keep)
# df.info(colored("start post processing calculations ...\n", "yellow"))
# self.post_processing()
# df.info(colored("finished post processing calculations \nAll right. I'm Done.", "green"))
for subdomain_index, subdomain_output in self.output.items():
print(f"Errornorms on subdomain{subdomain_index}")
for phase, different_errornorms in subdomain_output['errornorm'].items():
print(f"phase: {phase}")
for errortype, error in different_errornorms.items():
print(f"{errortype}: {error}\n")
return self.output
def LDDsolver(self,
debug: bool = False,
analyse_timestep: bool = False,
analyse_condition: bool = False):
""" calulate the solution on all patches for the next timestep
The heart of the simulation. The function LDDsolver implements the LDD
iteration, that solves the problem given the initial data at timestep
time.
PARAMATERS
---------------------
debug bool, optional debug flag used to toggle the amount of
output being displayed.
analyse_timestep bool, optional, flag to toggle wether or not to run
routines designed to output data within one
timestep iteration.
"""
time = self.t
# numpy array holding for each subdomain the maximum of the relative
# error of subsequent iterations of both phases on the subdomain.
# max(subsequent_error) is used as a measure if the solver is done or
# not.
max_subsequent_error = np.zeros(
self._number_of_subdomains, dtype=float
)
# in case analyse_timestep is None,
# solution_over_iteration_within_timestep is None
solution_over_iteration_within_timestep = self.prepare_LDDsolver(
analyse_timestep=analyse_timestep
)
# actual iteration starts here
iteration = 0
# gobal stopping criterion for the iteration.
all_subdomains_are_done = False
while iteration < self._max_iter_num and not all_subdomains_are_done:
iteration += 1
# we need to loop over the subdomains and solve an L-scheme type
# step on each subdomain.
# Here it should be possible to parallelise in
# the future.
for sd_index, subdomain in self.subdomain.items():
# set subdomain iteration to current iteration
max_subsequent_error[sd_index - 1] = self.run_Lsolver_step(
iteration=iteration,
subdomain_index=sd_index,
sol_over_iter_file=solution_over_iteration_within_timestep,
analyse_timestep=analyse_timestep,
analyse_condition=analyse_condition,
debug=debug
)
# end loop over subdomains
# stopping criterion for the solver.
total_subsequent_error = np.amax(max_subsequent_error)
if debug:
print(f"the error criterion is tol = {self.LDDsolver_tol}")
debugstring = f"time = {time} (at timestep " + \
f"{self.timestep_num}):: max subsequent error of both" + \
f"subdomains in iteration {iteration} is: "
print(debugstring, total_subsequent_error)
if total_subsequent_error < self.LDDsolver_tol:
# if debug:
total_error_string = "all phases on all subdomains reached" + \
f"a subsequent error = {total_subsequent_error} < " + \
f"tol = {self.LDDsolver_tol} after " + \
f"{iteration} iterations."
print(total_error_string)
all_subdomains_are_done = True
# end iteration while loop.
def run_Lsolver_step(
self,
iteration: int,
subdomain_index: int,
# processQueue,
sol_over_iter_file: tp.Dict[int, tp.Type[SolutionFile]],
debug: bool = False,
analyse_timestep: bool = False,
analyse_condition: bool = False
):
"""Encapsulate Lsolver_step.
This encapsulates a part of code originally part of the above
self.LDDsolver to make it work with multiprocessing.
"""
time = self.t
subdomain = self.subdomain[subdomain_index]
subdomain.iteration_number = iteration
sd_index = subdomain_index
# solve the problem on subdomain
condition_nums = self.Lsolver_step(
subdomain_index=sd_index,
debug=debug,
analyse_condition=analyse_condition
)
# bypass subsequent error calculation for the first iteration, becauses
# it is zero anyways.
# if iteration >= 1:
subsequent_iter_error = dict()
if debug:
print("LDD simulation use case: {}".format(self.use_case))
for phase in subdomain.has_phases:
error = df.Function(subdomain.function_space["pressure"][phase])
error.assign(
subdomain.pressure[phase] - subdomain.pressure_prev_iter[phase]
)
error_calculated = df.norm(error, 'L2')
subsequent_iter_error.update(
{phase: error_calculated}
)
if debug:
debugstring = "time = {} (at timestep".format(time) +\
"{}): subsequent ".format(self.timestep_num) +\
"error on subdomain {} ".format(sd_index) +\
"for {} phase in ".format(phase) +\
"iteration {} = ".format(iteration)
print(debugstring, subsequent_iter_error[phase])
if analyse_timestep:
# save the calculated L2 errors to csv file for plotting
# with pgfplots
if self.write2file['subsequent_errors']:
subsequent_error_filename = self.output_dir\
+ self.output_filename_parameter_part[sd_index]\
+ "subsequent_iteration_errors" + "_at_timestep"\
+ "{number}".format(number=self.timestep_num) + ".csv"
# "{number:.{digits}f}".format(number=time, digits=4)
self.write_subsequent_errors_to_csv(
filename=subsequent_error_filename,
subdomain_index=sd_index,
errors=subsequent_iter_error
)
if analyse_condition and self.write2file['condition_numbers']:
# save the calculated condition numbers of the assembled
# matrices a separate file for monitoring
condition_number_filename = self.output_dir\
+ self.output_filename_parameter_part[sd_index]\
+ "condition_numbers" + "_at_timestep" \
+ "{number}".format(number=self.timestep_num) + ".csv"
self.write_condition_numbers_to_csv(
filename=condition_number_filename,
subdomain_index=sd_index,
condition_number=condition_nums
)
# prepare next iteration
# write the newly calculated solution to the interface dictionaries
# of the subdomain for communication.
subdomain.write_pressure_to_interfaces()
if analyse_timestep and self.write2file['L_iterations_per_timestep']:
subdomain.write_solution_to_xdmf(
file=sol_over_iter_file[sd_index],
# time has been set to the target timestep by the solver.
# the timeste pat wich we are currently is one timestep_size
# less than that.
time=time-self.timestep_size,
write_iter_for_fixed_time=True,
)
for phase in subdomain.has_phases:
subdomain.pressure_prev_iter[phase].assign(
subdomain.pressure[phase]
)
# calculate the maximum over phase of subsequent errors for the
# stopping criterion.
# max_subsequent_error[sd_index-1] = max(subsequent_iter_error.values())
# processQueue.put(max(subsequent_iter_error.values()))
return max(subsequent_iter_error.values())
def prepare_LDDsolver(
self,
time: float = None,
debug: bool = False,
analyse_timestep: bool = False) -> tp.Dict[int, tp.Type[SolutionFile]]:
"""Initialise LDD iteration for current time time.
This method runs for each subdomain self.prepare_subdomain
PARAMETERS
------------------------------------
time float, optional time at which to run the solver. This is
internally set to self.t if no time is given.
This variable is there for development
purposes.
debug bool, optional debug flag used to toggle the amount of
output being displayed.
analyse_timestep bool, optional, flag to toggle wether or not to run
routines designed to output data within one
timestep iteration.
OUTPUT
------------------------------------
analysing_solution_file_dict tp.Dict[ind, SolutionFile] dictionary
containing the solution file for analysing the
timestep if analyse_timestep == True, else None.
"""
# in case no time is given, use the time of the class.
if time is None:
# remember: this has been set to the target timestep by the solver.
# the timestep we are calculating the solution for.
time = self.t
if analyse_timestep:
# dictionary holding filename strings for each subdomain to store
# iterations in, used to analyse the behaviour of the iterates
# within the current time step.
solution_over_iteration_within_timestep = dict()
else:
solution_over_iteration_within_timestep = None
# before the iteration gets started all iteration numbers must be
# nulled
for subdomain_index in self.subdomain.keys():
# prepare_subdomain = mp.Process(
# target=self.prepare_subdomain,
# args=(
# subdomain_index,
# solution_over_iteration_within_timestep,
# debug,
# analyse_timestep,
# )
# )
# prepare_subdomain.start()
# prepare_subdomain.join()
self.prepare_subdomain(
subdomain_index=subdomain_index,
sol_over_iter_file=solution_over_iteration_within_timestep,
debug=debug,
analyse_timestep=analyse_timestep,
)
return solution_over_iteration_within_timestep
def prepare_subdomain(
self,
subdomain_index: int,
sol_over_iter_file,
debug: bool = False,
analyse_timestep: bool = False) -> tp.Dict[int, tp.Type[SolutionFile]]:
"""Initialise LDD iteration for current time time and subdomain.
This method executes a few prelininary steps on the subdomain
with index subdomain_index before actually starting
the L-iteration. Namely,
- create solution files for analysing time step if
analyse_timestep == True
- Dirichlet boundary condition terms are evaluated to the current time
- all subdomain iteration numbers on interfaces are set to 0.
- all subdomain iteration numbers subdomain.iteration_number are set
to 0.
- source terms are evaluated to the current time.
- the calculated solution pressure from the previous timestep
is assigned to pressure_prev for each subdomain.
- gl0 terms are calculated.
- write pressure to previous iteration dictionaries on all interfaces.
PARAMETERS
------------------------------------
subdomain_index int Index of the subdomain that is prepared by
the method.
sol_over_iter_file tp.Dict[None], eiter None or empty dictionary to
store the file object for saving iterations
over time.
debug bool, optional debug flag used to toggle the amount of
output being displayed.
analyse_timestep bool, optional, flag to toggle wether or not to run
routines designed to output data within one
timestep iteration.
OUTPUT
------------------------------------
analysing_solution_file_dict tp.Dict[ind, SolutionFile] dictionary
containing the solution file for analysing
the timestep if analyse_timestep == True,
else None.
"""
# the timestep we are calculating the solution for.
time = self.t
subdomain = self.subdomain[subdomain_index]
solution_over_iteration_within_timestep = sol_over_iter_file
# create xdmf files for writing out iterations if analyse_timestep
# is given.
if analyse_timestep:
filename = self.output_dir + \
self.output_filename_parameter_part[subdomain_index] \
+ "solution_iterations_at_timestep" \
+ "{number}".format(number=self.timestep_num) + ".xdmf"
solution_over_iteration_within_timestep.update(
{subdomain_index: SolutionFile(
self._mpi_communicator, filename
)}
)
# reset all interface[has_interface].current_iteration[ind] = 0,
# for index has_interface in subdomain.has_interface
if self.interface_def_points is not None:
subdomain.null_all_interface_iteration_numbers()
if subdomain.outer_boundary is not None:
self.update_DirichletBC_dictionary(time=time,
subdomain_index=subdomain_index,
)
# this is the beginning of the solver, so iteration must be set
# to 0.
subdomain.iteration_number = 0
for phase in subdomain.has_phases:
# evaluate the sources to the current time.
subdomain.source[phase].t = time
# set the solution of the previous timestep as new prev_timestep
# pressure
if debug:
debugstr1 = f"subdomain{subdomain.subdomain_index}." + \
f"pressure_prev_timestep[{phase}].vector()[:]" + " =\n" + \
f"{subdomain.pressure_prev_timestep[phase].vector()[:]}"
print(debugstr1)
debugstr2 = f"subdomain{subdomain.subdomain_index}." + \
f"pressure[{phase}].vector()[:]" + " =\n" + \
f"{subdomain.pressure[phase].vector()[:]}"
print(debugstr2)
subdomain.pressure_prev_timestep[phase].assign(
subdomain.pressure[phase]
)
if debug:
debugstr3 = f"id(subdomain{subdomain.subdomain_index}." + \
f"pressure[{phase}])"
debugstr4 = " =\n" + f"{id(subdomain.pressure[phase])}"
print(colored(debugstr3, "red"), debugstr4)
debugstr5 = f"subdomain{subdomain.subdomain_index}." + \
f"pressure_prev_timestep[{phase}].vector()[:]" + " =\n" + \
f"{subdomain.pressure_prev_timestep[phase].vector()[:]}"
print(debugstr5)
# we need to update the previous pressure dictionaries of all
# interfaces of the subdomain with the current solution, so that the
# subdomain.calc_gli_term method works properly.
# The current pressure dictionaries should already be populated with
# the current solution, sinces the solver writes the pressure
# to interfaces after each iteration.
if self.interface_def_points is not None:
subdomain.write_pressure_to_interfaces()
subdomain.write_pressure_to_interfaces(previous_iter=True)
subdomain.calc_gl0_term()
def Lsolver_step(self, subdomain_index: int,
analyse_condition: bool = False,
debug: bool = False) -> tp.Dict[str, float]:
"""L-scheme solver iteration step for an object of class subdomain.
Lsolver_step implements L-scheme solver iteration step for an subdomain
with index subdomain_index, i.e. for the model form (already in
L-scheme form) stored in the domain patch subdomain.
# parameters
subdomain_index: int subdomain index that defines
which subdomain to perform
the iteration on.
iteration: int iteration number
debug: bool flag to toggle weather or not
to write out each iteration.
"""
subdomain = self.subdomain[subdomain_index]
condition_number = None
if analyse_condition:
condition_number = dict()
for phase in subdomain.has_phases:
# extract L-scheme form and rhs (without gli term) from subdomain.
governing_problem = subdomain.governing_problem(phase=phase)
form = governing_problem['form']
rhs = governing_problem['rhs']
# assemble the form and rhs
form_assembled = df.assemble(form)
rhs_assembled = df.assemble(rhs)
if analyse_condition:
condition_number.update(
{phase: la.cond(form_assembled.array())}
)
# apply outer Dirichlet boundary conditions if present.
if subdomain.outer_boundary is not None:
# apply boundary dirichlet conditions for all outer boundaries.
for index, boundary in enumerate(subdomain.outer_boundary):
self.outerBC[subdomain_index][index][phase].apply(
form_assembled, rhs_assembled
)
linear_solver = subdomain.linear_solver
if linear_solver is None:
df.solve(form_assembled,
subdomain.pressure[phase].vector(),
rhs_assembled, self.solver,
self.preconditioner
)
else:
linear_solver.solve(form_assembled,
subdomain.pressure[phase].vector(),
rhs_assembled
)
return condition_number
def _spacetime_errornorm_calc_step(self)-> tp.Dict[int, tp.Dict[str, float]]:
"""one step in integrating the spacetime errornorm
OUTPUT
error_out tp.Dict[int, tp.Dict[str, float]] dictionary containing
the L2-errornorms in space for the timestep.
this gets passed on to self.write_data_to_disc()
"""
error_out = dict()
for subdom_ind in self.isRichards.keys():
error_out.update({subdom_ind: dict()})
subdomain = self.subdomain[subdom_ind]
# if we have an exact solution, write out several errors and
# errornorms to be used later in paraview or latex plots.
if subdomain.pressure_exact is not None:
pressure_exact = dict()
for phase in subdomain.has_phases:
pa_exact = subdomain.pressure_exact[phase]
pa_exact.t = self.t
error_calculated = df.errornorm(pa_exact, subdomain.pressure[phase], 'L2', degree_rise=4)
self.output[subdom_ind]['errornorm'][phase]['L1'] += self.timestep_size*error_calculated
self.output[subdom_ind]['errornorm'][phase]['L2'] += self.timestep_size*error_calculated**2
# print(f"Linf error on subdomain {subdom_ind} and phase {phase} before checking: {self.output[subdom_ind]['errornorm'][phase]['Linf']}")
if error_calculated > self.output[subdom_ind]['errornorm'][phase]['Linf']:
self.output[subdom_ind]['errornorm'][phase].update(
{'Linf': error_calculated}
)
# print(f"Linf error on subdomain {subdom_ind} and phase {phase} after checking: {self.output[subdom_ind]['errornorm'][phase]['Linf']}")
# if we are at a timestep at which to write shit out,
# calculate the relative errornorm
if np.isin(self.timestep_num, self.timestep_numbers_to_plot, assume_unique=True):
norm_pa_exact = df.norm(pa_exact, norm_type='L2', mesh=subdomain.mesh)
if norm_pa_exact > self.tol:
error_out[subdom_ind].update(
{phase: error_calculated/norm_pa_exact}
)
else:
error_out[subdom_ind].update(
{phase: error_calculated}
)
else:
error_out[subdom_ind].update({phase: None})
else:
# error_out.update({subdom_ind: None})
error_out.update({subdom_ind: None})
return error_out
def _write_data_to_disc(self, space_errornorm: tp.Dict[int, tp.Dict[str, float]]):
"""function that takes care of writing out different data to disc after
a timestep done by the self.run() function. What is written out at which
timesteps, is determined by by self.write2file aswell as
self.timestep_numbers_to_plot.
INPUT
space_errornorm tp.Dict[int, tp.Dict[str, float]]) dictionary of
L2 space errornorms calculated by
self._spacetime_errornorm_calc_step()
"""
# we only write data of timesteps to disc, that are stored in
# self.timestep_numbers_to_plot.
if np.isin(self.timestep_num, self.timestep_numbers_to_plot, assume_unique=True):
# We need to determin the corresponding
# timestep value in self.timesteps_to_plot to write data out at the
# same values for timestep values tn. This is necessary, because we
# used self.timesteps_to_plot to plot the exact solutions.
# Counting up the time via self.t += self.timestep_size in the timesteping
# loop, yields to slightly other values for t due to rounding errors. sigh.
plot_index = np.argwhere(self.timestep_numbers_to_plot==self.timestep_num)[0][0]
plot_t = self.timesteps_to_plot[plot_index]
for subdom_ind in self.isRichards.keys():
subdomain = self.subdomain[subdom_ind]
# write out solution, if self.write2file['solutions'] flag says so.
if self.write2file['solutions']:
subdomain.write_solution_to_xdmf(#
file = self.solution_file[subdom_ind], #
time = plot_t,#
write_iter_for_fixed_time = False,
)
# if we have an exact solution, write out several errors and
# errornorms to be used later in paraview or latex plots.
if subdomain.pressure_exact is not None:
# write out the absolute differences for convenience in paraview
if self.write2file['absolute_differences']:
pressure_exact = dict()
for phase in subdomain.has_phases:
pa_exact = subdomain.pressure_exact[phase]
pa_exact.t = plot_t
pressure_exact.update(
{phase: df.interpolate(pa_exact, subdomain.function_space["pressure"][phase])}
)
absolute_difference = df.Function(subdomain.function_space["pressure"][phase])
pressure_difference = pressure_exact[phase].vector()[:] - subdomain.pressure[phase].vector()[:]
abs_diff_tmp = np.fabs(pressure_difference)
absolute_difference.vector()[:] = abs_diff_tmp
if phase == "wetting":
data_set_name="abs_diff_w"
else:
data_set_name="abs_diff_nw"
self.write_function_to_xdmf(#
function=absolute_difference,
file=self.solution_file[subdom_ind], #
time=plot_t,#
data_set_name=data_set_name
)
if self.write2file['space_errornorms']:
relative_L2_errornorm = space_errornorm[subdom_ind]
errornorm_filename = self.output_dir+self.output_filename_parameter_part[subdom_ind]+\
"_L2_errornorms_over_time" +".csv"
self.write_errornorms_to_csv(
filename = errornorm_filename, #
subdomain_index = subdom_ind,
errors = relative_L2_errornorm,
time=plot_t,
timestep_num=self.timestep_num
)
else:
pass
pass
else:
# in this case we are at a timestep that is not bound to be written
# to disc.
pass
def _init_solution_files(self):
""" set up solution files for saving the solution of the LDD scheme for
each subdomain.
"""
# set up solution file names for each subdomain.
self.output_filename_parameter_part = dict()
self.solution_file = dict()
self.postprocessed_solution_file = dict()
for subdom_ind, subdomain in self.subdomain.items():
tau = subdomain.timestep_size
L = subdomain.L
h = subdomain.mesh_size
if subdomain.isRichards:
name1 ="subdomain" + "{}".format(subdom_ind)
# only show three digits of the mesh size.
name2 = "_dt" + "{}".format(tau)\
+"_hmax" + "{number:.{digits}f}".format(number=h, digits=3)\
+ "_gravity_" + "{}".format(self.include_gravity)
name3 = "_Lw" + "{}".format(L["wetting"])
filename_param_part = name1 + name2 + name3
else:
filename_param_part = "subdomain" + "{}".format(subdom_ind)\
+"_dt" + "{}".format(tau)\
+"_hmax" + "{number:.{digits}f}".format(number=h, digits=3)\
+ "_gravity_" + "{}".format(self.include_gravity)\
+"_Lw" + "{}".format(L["wetting"])\
+"_Lnw" + "{}".format(L["nonwetting"]
)
filename = self.output_dir + filename_param_part + "_solution" +".xdmf"
post_filename = self.output_dir + filename_param_part + "_solution_post" +".xdmf"
# create solution files and populate dictionary.
self.output_filename_parameter_part.update(
{subdom_ind: filename_param_part}
)
self.solution_file.update( #
{subdom_ind: SolutionFile(self._mpi_communicator, filename)}
)
# self.postprocessed_solution_file.update(
# {subdom_ind: SolutionFile(self._mpi_communicator, post_filename)}
# )
def write_exact_solution_to_xdmf(self):
"""
evaluate the exact solution of the simulation (in case there is one) at
all timesteps and write it to the solution file.
In addition write calculate and write out the saturation as well as the
source terms in order to montior the examples
"""
# print(f" solution will be printed at times t = {self.timesteps_to_plot}\n")
for subdom_ind, subdomain in self.subdomain.items():
file = self.solution_file[subdom_ind]
for timestep in self.timesteps_to_plot:
source = dict()
if self.exact_solution:
exact_pressure = dict()
S = self.saturation[subdom_ind]
saturation_w = df.Function(
subdomain.function_space["pressure"]['wetting']
)
saturation_nw = df.Function(
subdomain.function_space["pressure"]['wetting']
)
for phase in subdomain.has_phases:
f_expr = subdomain.source[phase]
f_expr.t = timestep
source.update(
{phase: df.project(
f_expr, subdomain.function_space["pressure"][phase]
)
}
)
source[phase].rename(
"source_"+"{}".format(phase),
"source_"+"{}".format(phase)
)
file.write(source[phase], timestep)
if self.exact_solution:
pa_exact = subdomain.pressure_exact[phase]
pa_exact.t = timestep
exact_pressure.update(
{phase: df.interpolate(
pa_exact,
subdomain.function_space["pressure"][phase]
)}
)
exact_pressure[phase].rename(
"exact_pressure_"+"{}".format(phase),
"exact_pressure_"+"{}".format(phase)
)
file.write(exact_pressure[phase], timestep)
if self.exact_solution:
exact_capillary_pressure = df.Function(
subdomain.function_space["pressure"]['wetting']
)
if subdomain.isRichards:
exact_capillary_pressure.assign(
-exact_pressure['wetting']
)
else:
pc_temp = \
exact_pressure["nonwetting"].vector().get_local()\
- exact_pressure["wetting"].vector().get_local()
exact_capillary_pressure.vector().set_local(pc_temp)
exact_capillary_pressure.rename("pc_exact", "pc_exact")
file.write(exact_capillary_pressure, timestep)
saturation_w = df.project(
S(exact_capillary_pressure),
subdomain.function_space["pressure"]["wetting"]
)
# saturation_w.assign(Sat_w)
saturation_w.rename("Sw_exact", "Sw_exact")
file.write(saturation_w, timestep)
# S_nw = 1-S(exact_capillary_pressure).vector().get_local()
saturation_nw = df.project(
1-S(exact_capillary_pressure),
subdomain.function_space["pressure"]["wetting"]
)
# saturation_nw.assign(S_nw)
saturation_nw.rename("Snw_exact", "Snw_exact")
file.write(saturation_nw, timestep)
def write_subsequent_errors_to_csv(self,#
filename: str, #
subdomain_index: int,#
errors: tp.Dict[str, float],#
) -> None:
""" write subsequent errors to csv file for plotting with pgfplots"""
subdomain = self.subdomain[subdomain_index]
iteration = subdomain.iteration_number
if subdomain.isRichards:
self.errors = pd.DataFrame({ "iteration": iteration,
#"th. initial mass": df.assemble(rho*df.dx), \
#"energy": self.total_energy(), \
"wetting": errors["wetting"]}, index=[iteration])
else:
self.errors = pd.DataFrame({"iteration":iteration, #\
#"th. initial mass": df.assemble(rho*df.dx), \
#"energy": self.total_energy(), \
"wetting": errors["wetting"],
"nonwetting": errors["nonwetting"]}, index=[iteration])
# if self._rank == 0:
# check if file exists
if os.path.isfile(filename) == True:
with open(filename, 'a') as f:
self.errors.to_csv(f, header=False, sep='\t', encoding='utf-8', index=False)
else:
self.errors.to_csv(filename, sep='\t', encoding='utf-8', index=False)
def write_condition_numbers_to_csv(self,#
filename: str, #
subdomain_index: int,#
condition_number: tp.Dict[str, float],#
) -> None:
""" write subsequent errors to csv file for plotting with pgfplots"""
subdomain = self.subdomain[subdomain_index]
iteration = subdomain.iteration_number
if subdomain.isRichards:
condition = pd.DataFrame({ "iteration": iteration,
#"th. initial mass": df.assemble(rho*df.dx), \
#"energy": self.total_energy(), \
"wetting": condition_number["wetting"]}, index=[iteration])
else:
condition = pd.DataFrame({"iteration":iteration, #\
#"th. initial mass": df.assemble(rho*df.dx), \
#"energy": self.total_energy(), \
"wetting": condition_number["wetting"],
"nonwetting": condition_number["nonwetting"]}, index=[iteration])
# if self._rank == 0:
# check if file exists
if os.path.isfile(filename) == True:
with open(filename, 'a') as f:
condition.to_csv(f, header=False, sep='\t', encoding='utf-8', index=False)
else:
condition.to_csv(filename, sep='\t', encoding='utf-8', index=False)
def write_errornorms_to_csv(self,#
filename: str, #
subdomain_index: int,#
errors: tp.Dict[str, float],#
time: float,
timestep_num: int
) -> None:
""" write subsequent errors to csv file for plotting with pgfplots"""
subdomain = self.subdomain[subdomain_index]
if subdomain.isRichards:
self.errors = pd.DataFrame({ "time": time,
"timestep": timestep_num,
#"th. initial mass": df.assemble(rho*df.dx), \
#"energy": self.total_energy(), \
"wetting": errors["wetting"]}, index=[timestep_num])
else:
self.errors = pd.DataFrame({ "time": time,
"timestep": timestep_num,
"wetting": errors["wetting"],
"nonwetting": errors["nonwetting"]}, index=[timestep_num])
# if self._rank == 0:
# check if file exists
if os.path.isfile(filename) == True:
with open(filename, 'a') as f:
self.errors.to_csv(f, header=False, sep='\t', encoding='utf-8', index=False)
else:
self.errors.to_csv(filename, sep='\t', encoding='utf-8', index=False)
def write_function_to_xdmf(self,
function: df.Function,
file: tp.Type[SolutionFile], #
data_set_name: str,
time: float = None,#
):
"""
Write function to disk file file. It is expected, that file is a df.XDMFFile
PARAMETERS
function df.Function dolfin function containing the
information to write to the file.
file str File name of df.XDMFFile to write to
data_set_name str name of the data set
time float time at which to write the solution.
"""
function.rename("{}".format(data_set_name), "{}".format(data_set_name))
file.write(function, time)
## Private methods
def _init_meshes_and_markers(self, subdomain_def_points: tp.List[tp.List[df.Point]] = None,#
mesh_resolution: float = None, #
) -> None:
""" Generate meshes and markerfunctions for sudomains.
This method generate meshes and markerfunctions for sudomains and
sets the internal lists of meshes used by the LDDsimulation class.
The following lists and variables are set:
self.mesh_subdomain: List of subdomain meshes. self.mesh_subdomain[0]
is the whole simulation domain, self.mesh_subdomain[i]
is the submesh of subdomain i.
self.domain_marker: Global marker function on self.mesh_subdomain[0]
marking the subdomains by their respective number.
self._number_of_subdomains Internal variable holding the total number of
actual subdomains (global domain not counted).
The method is intended to be used by the self.initialise() method to set
up the simulation but also accepts needed input parameters explicitly for
greater debugging felxibility.
# Parameters
subdomain_def_points #type: tp.List[tp.List[df.Point]] The sub-
domains are passed as list of lists of dolfin
points forming the polygonal boundary of the
subdomains. subdomain_def_points[0] is
to contain the boundary polygon of the whole
domain omega, whereas subdomain_def_points[i]
is supposed to contain the boundary polygone
of subdomain i.
mesh_resolution #type float positive number used by the
mshr mesh generator to influence how coarse
the mesh is. The higher the number, the finer
the mesh.
"""
if not subdomain_def_points:
subdomain_def_points = self.subdomain_def_points
if not mesh_resolution:
mesh_resolution = self.mesh_resolution
### generate global mesh and subdomain meshes. Also count number of subdomains.
# define the global simulation domain polygon first.
domain = mshr.Polygon(subdomain_def_points[0])
if self.interface_def_points is None:
# in this case we don't have any interface and no domain decomposi-
# tion going on.
self._number_of_subdomains = 1
else:
for num, subdomain_vertices in enumerate(subdomain_def_points[1:], 1):
subdomain = mshr.Polygon(subdomain_vertices)
domain.set_subdomain(num, subdomain)
# count number of subdomains for further for loops
self._number_of_subdomains = num
# create global mesh. Submeshes still need to be extracted.
mesh = mshr.generate_mesh(domain, mesh_resolution)
if self.write2file['meshes_and_markers']:
filepath = self.output_dir+f"global_mesh_meshrez{mesh_resolution}.pvd"
df.File(filepath) << mesh
filepath = self.output_dir+f"global_mesh_meshrez{mesh_resolution}.xml.gz"
df.File(filepath) << mesh
self.mesh_subdomain.append(mesh)
# define domainmarker on global mesh and set marker values to domain
# number as assigned in the above for loop.
self.domain_marker = df.MeshFunction('size_t', mesh, mesh.topology().dim(), mesh.domains())
if self.interface_def_points is None:
self.domain_marker.set_all(0)
else:
# extract subdomain meshes
for num in range(1, self._number_of_subdomains+1):
# if MPI.size(mpi_comm_world()) == 1:
self.mesh_subdomain.append(df.SubMesh(mesh, self.domain_marker, num))
# else:
# self.mesh_subdomain.append(create_submesh(mesh, self.domain_marker, num))
if self.write2file['meshes_and_markers']:
filepath = self.output_dir+f"domain_marker_meshrez{mesh_resolution}.pvd"
df.File(filepath) << self.domain_marker
def _init_interfaces(self, interface_def_points: tp.List[tp.List[df.Point]] = None,#
adjacent_subdomains: tp.List[np.ndarray] = None) -> None:
""" generate interface objects and interface marker functions for suddomains
used by the LDDsimulation class.
This initialises a list of interfaces and global interface marker functions
used by the LDDsimulation class.
The following lists and variables are set:
self.interface_marker global interface marker
self.interface list containing interface objectso of class
Interface.
The method is intended to be used internally by the self.initialise()
method to set up the simulation but also accepts needed input parameters
explicitly for greater debugging felxibility. In case input parameters
are provided, they override the ones set by self.initialise().
# Parameters
interface_def_points #type: tp.List[tp.List[df.Point]] list of
lists of dolfin points containing the internal
interfaces dividing the domain omega into subdomains.
the indices of this list introduce a numbering
of the interfaces which will determin the list
of neighbours attached to each subdomain.
adjacent_subdomains #type: tp.List[tp.array[int]] List of
list of indices such that adjacent_subdomains[i]
gives the indices of the subdomains that share
the interface defined by the dolfin points in
interface_def_points
"""
if interface_def_points is None:
interface_def_points = self.interface_def_points
else:
# overwrite what has been set by init()
self.adjacent_subdomains = adjacent_subdomains
if not adjacent_subdomains:
adjacent_subdomains = self.adjacent_subdomains
else:
# overwrite what has been set by init()
self.adjacent_subdomains = adjacent_subdomains
mesh = self.mesh_subdomain[0]
# define global interface marker. This is a mesh function on edges of the mesh.
self.interface_marker = df.MeshFunction('size_t', mesh, mesh.topology().dim()-1)
self.interface_marker.set_all(0)
# if self.interface_def_points is None, i.e. we have no interface we do
# nothing, since self.init() defines self.interface = None
if self.interface_def_points is not None:
self.interface = []
for glob_interface_num, vertices in enumerate(interface_def_points):
self.interface.append(
bi.Interface(
vertices=vertices, #
tol=self.tol,#mesh.hmin()/100,
internal=True,#
adjacent_subdomains = adjacent_subdomains[glob_interface_num],#
isRichards = self.isRichards,#
global_index = glob_interface_num,
lambda_param = self.lambda_param[glob_interface_num]
)
)#
# marker value gets internally set to global_index+1.
# The reason is that we want to mark outer boundaries with the value 0.
marker_value = self.interface[glob_interface_num].marker_value
self.interface[glob_interface_num].mark(self.interface_marker, marker_value)
self.interface[glob_interface_num].init_facet_dictionaries(self.interface_marker, marker_value, subdomain_index=0)
# print(self.interface[glob_interface_num])
if self.write2file['meshes_and_markers']:
filepath = self.output_dir+f"interface_marker_global_meshrez{self.mesh_resolution}.pvd"
df.File(filepath) << self.interface_marker
def _init_subdomains(self) -> None:
""" generate dictionary of opjects of class DomainPatch.
This method generates a dictionary of opjects of class DomainPatch, containing
everything that is needed for the LDDsolver. Additionally a linear solver
object is created.
"""
# The list is_richards has the same length as the nuber of subdomains.
# Therefor it is used in the subdomain initialisation loop.
for subdom_num, isR in self.isRichards.items():
if self.interface_def_points is None:
interface_list = None
else:
interface_list = self._get_interfaces(subdom_num)
# print(f"has_interface list for subdomain {subdom_num}:", interface_list)
if self.densities is None:
subdom_densities = None
else:
subdom_densities = self.densities[subdom_num]
self.subdomain.update(
{subdom_num : dp.DomainPatch(
subdomain_index=subdom_num,
isRichards=isR,
mesh=self.mesh_subdomain[subdom_num],
viscosity=self.viscosity[subdom_num],
porosity=self.porosity[subdom_num],
outer_boundary_def_points=self.outer_boundary_def_points[subdom_num],
interfaces=self.interface,
has_interface=interface_list,
densities=subdom_densities,
include_gravity=self.include_gravity,
gravity_acceleration=self.gravity_acceleration,
L=self.L[subdom_num],
relative_permeability=self.relative_permeability[subdom_num],
intrinsic_permeability=self.intrinsic_permeability[subdom_num],
saturation=self.saturation[subdom_num],
timestep_size=self.timestep_size,
interpolation_degree=self.interpolation_degree,
tol=self.tol,
degree=self.FEM_Lagrange_degree)
}
)
# setup the linear solvers and set the solver parameters.
# df.LinearSolver(self.solver)
if self.solver_type_is_Krylov:
self.subdomain[subdom_num].linear_solver = df.KrylovSolver(self.solver, self.preconditioner)
else:
self.subdomain[subdom_num].linear_solver = df.LUSolver()
# print("solver method:")
# for parameter, value in self.subdomain[subdom_num].linear_solver.parameters.items():
# print(f"\'{parameter}\': {value}")
# assign solver parameters set at the beginning of the LDDsimulation class.
solver_param = self.subdomain[subdom_num].linear_solver.parameters
# df.info(solver_param, True)
for parameter_name in self.solver_parameters.keys():
solver_param[parameter_name] = self.solver_parameters[parameter_name]
# # ## print out set solver parameters
# for parameter, value in self.subdomain[subdom_num].linear_solver.parameters.items():
# print(f"parameter: {parameter} = {value}")
if self.write2file['meshes_and_markers']:
filepath = self.output_dir+f"interface_marker_subdomain{subdom_num}_meshrez{self.mesh_resolution}.pvd"
df.File(filepath) << self.subdomain[subdom_num].interface_marker
filepath = self.output_dir+f"outer_boundary_marker_subdomain{subdom_num}_meshrez{self.mesh_resolution}.pvd"
df.File(filepath) << self.subdomain[subdom_num].outer_boundary_marker
# print(self.subdomain[subdom_num])
def _get_interfaces(self, subdomain_index: int, #
adjacent_subdomains: tp.List[np.ndarray] = None) -> np.ndarray:
""" determin all (global) indices of interfaces belonging to subdomain
with index subdomain_index.
This method determins all (global) indices of interfaces belonging to
subdomain with index subdomain_index from adjacent_subdomains.
"""
if adjacent_subdomains is None:
adjacent_subdomains = self.adjacent_subdomains
else:
# overwrite what has been set by init()
self.adjacent_subdomains = adjacent_subdomains
interface_of_subdomain =[]
for global_interface_ind, adj_doms in enumerate(adjacent_subdomains):
# interface [i,j] and [j,i] are not counted seperately in adjacent_subdomains
# therefor we don't care wether subdomain_index == adj_doms[0] or
# subdomain_index == adj_doms[1]. if subdomain_index is one of either,
# the interface belongs to the subdomain with this index.
if np.isin(subdomain_index, adj_doms, assume_unique=True):
interface_of_subdomain.append(global_interface_ind)
return interface_of_subdomain
def _init_initial_values(self, interpolation_degree: int = None):
""" set initial values
This method populates the subdomain dictionaries as well as the interface
dictionaries with initial values needed for the LDDsolver iterations.
Additionally, the initial value gets written to the solution file of each
subdomain.
"""
if interpolation_degree is None:
interpolation_degree = self.interpolation_degree
for num, subdomain in self.subdomain.items():
V = subdomain.function_space["pressure"]
# p0 contains both pw_0 and pnw_0 for subdomain[num]
p0 = self.initial_conditions[num]
for phase in subdomain.has_phases:
# note that the default interpolation degree is interpolation_degree
pa0 = df.Expression(p0[phase], domain = subdomain.mesh, #
degree = interpolation_degree,#
t=self.starttime)
pa0_interpolated = df.interpolate(pa0, V[phase])
# alocate memory
subdomain.pressure.update(
{phase: df.Function(V[phase])}
)
subdomain.pressure_prev_iter.update(
{phase: df.Function(V[phase])}
)
subdomain.pressure_prev_timestep.update(
{phase: df.Function(V[phase])}
)
subdomain.pressure[phase].assign(pa0_interpolated)
subdomain.pressure_prev_iter[phase].assign(pa0_interpolated)
subdomain.pressure_prev_timestep[phase].assign(pa0_interpolated)
# write zeros to the abs_diff variables
if self.write2file['absolute_differences']:
absolute_difference = df.Function(subdomain.function_space["pressure"][phase])
# pressure_difference = pressure_exact[phase].vector()[:] - subdomain.pressure[phase].vector()[:]
# abs_diff_tmp = np.fabs(pressure_difference)
# absolute_difference.vector()[:] = abs_diff_tmp
if phase == "wetting":
data_set_name="abs_diff_w"
else:
data_set_name="abs_diff_nw"
self.write_function_to_xdmf(#
function=absolute_difference,
file=self.solution_file[num], #
time=self.starttime,#
data_set_name=data_set_name
)
if self.write2file['solutions']:
subdomain.write_solution_to_xdmf(#
file = self.solution_file[num], #
time = self.starttime,#
write_iter_for_fixed_time = False,
)
def _init_DirichletBC_dictionary(self, interpolation_degree: int = None):
"""initialise dictionary that holds the dolfin.DirichletBC objects
for each outer boundary of each subdomain.
We create two dictionaries, self.outerBC and self.dirichletBC_dfExpression.
The second one holds the dolfin expressions that is used to create the
dirichlet boundary object of class dolfin.dirichletBC as well as update
the time of the expression. The second one holds the dolfin.dirichletBC
boundary object itself.
"""
if interpolation_degree is None:
interpolation_degree = self.interpolation_degree
# we only need to set Dirichlet boundary conditions if we have an outer
# boundary.
self.outerBC = dict()
self.dirichletBC_dfExpression = dict()
for subdom_index, subdomain in self.subdomain.items():
if subdomain.outer_boundary is None:
self.outerBC.update({subdom_index: None})
self.dirichletBC_dfExpression.update({subdom_index: None})
else:
self.outerBC.update({subdom_index: dict()})
self.dirichletBC_dfExpression.update({subdom_index: dict()})
V = subdomain.function_space["pressure"]
boundary_marker = subdomain.outer_boundary_marker
# loop over all outer boundary parts. This is a bit of a brainfuck:
# subdomain.outer_boundary gives a dictionary of the outer boundaries of the subdomain.
# Since a domain patch can have several disjoint outer boundary parts, the expressions
# need to get an enumaration index which starts at 0. So subdomain.outer_boundary[j] is
# the dictionary of outer dirichlet conditions of the subdomain and boundary part j for
# the wetting and the nonwetting phase, so subdomain.outer_boundary[j]['wetting'] and
# subdomain.outer_boundary[j]['nonwetting'] return
# the actual expression needed for the dirichlet condition for both phases if present.
for boundary_index, boundary in enumerate(subdomain.outer_boundary):
# create a dictionary of dictionaries for each outer boundary
# part of the subdomain.
self.outerBC[subdom_index].update({boundary_index: dict()})
self.dirichletBC_dfExpression[subdom_index].update({boundary_index: dict()})
# this contains the Expression strings input by the user.
pDC = self.dirichletBC_expression_strings[subdom_index][boundary_index]
boundary_marker_value = boundary.marker_value
for phase in subdomain.has_phases:
pDCa = df.Expression(pDC[phase], domain = subdomain.mesh, #
degree = interpolation_degree, #
t=self.starttime)
self.dirichletBC_dfExpression[subdom_index][boundary_index].update({phase: pDCa})
self.outerBC[subdom_index][boundary_index].update(
{phase: df.DirichletBC(V[phase], pDCa, boundary_marker, boundary_marker_value)}
)
def update_DirichletBC_dictionary(self, subdomain_index: int,#
interpolation_degree: int = None, #
time: float = None,#
):
""" update time of the dirichlet boundary object for subdomain with
index subdomain_index.
In case we don't have a case with an exact solution, we should have 0
expressions in self.dirichletBC_expression_strings, so nothing should happen.
"""
if interpolation_degree is None:
interpolation_degree = self.interpolation_degree
if time is None:
time = self.t
subdomain = self.subdomain[subdomain_index]
# in case time == self.starttime, the expression has already been
# assembled by self._init_DirichletBC_dictionary.
if np.fabs(time - self.starttime) < self.tol:
print("warning: update_DirichletBC_dictionary tried to update initial time")
pass
else:
V = subdomain.function_space["pressure"]
boundary_marker = subdomain.outer_boundary_marker
for boundary_index, boundary in enumerate(subdomain.outer_boundary):
boundary_marker_value = boundary.marker_value
for phase in subdomain.has_phases:
pDCa = self.dirichletBC_dfExpression[subdomain_index][boundary_index][phase]
pDCa.t = time
self.outerBC[subdomain_index][boundary_index].update(
{phase: df.DirichletBC(V[phase], pDCa, boundary_marker, boundary_marker_value)}
)
def post_processing(self):
"""post processing of the simulation.
calculate
- pc_exact and pc_num
- absolute differences to exact solution if present.
- relative errors to exact solution if present
"""
for subdom_ind, subdomain in self.subdomain.items():
self._mesh = df.Mesh()
solution_file = self.solution_file[subdom_ind]
post_solution_file = self.postprocessed_solution_file[subdom_ind]
pressure = dict()
# the internal time series counter in df.XDMFFile starts at 0 and
# refers to the first saved value. This corresponds to the initial
# values. We then calculated self.number_of_timesteps many timesteps.
t = self.starttime
for internal_timestep in range(self.number_of_timesteps+1):
for phase in subdomain.has_phases:
pressure.update(
{phase: df.Function(subdomain.function_space["pressure"][phase])}
)
phase_pressure_string = "pressure_"+"{}".format(phase)
solution_file.read_checkpoint(
u=pressure[phase],
name=phase_pressure_string,
counter=internal_timestep
)
pressure[phase].rename("pressure_"+"{}".format(phase), "pressure_"+"{}".format(phase))
post_solution_file.write(pressure[phase], t)
t += self.timestep_size
print(f"read pressure of {phase}phase:")
print(pressure[phase].vector()[:])
solution_file.close()
# # if we have an exact solution, write out the |u -uh|_L2 to see the
# # absolute error.
# if subdomain.pressure_exact is not None:
# relative_L2_errornorm = dict()
# for phase in subdomain.has_phases:
# pa_exact = subdomain.pressure_exact[phase]
# pa_exact.t = self.t
# norm_pa_exact = df.norm(pa_exact, norm_type='L2', mesh=subdomain.mesh)
# error_calculated = df.errornorm(pa_exact, subdomain.pressure[phase], 'L2', degree_rise=6)
# if norm_pa_exact > self.tol:
# relative_L2_errornorm.update(
# {phase: error_calculated/norm_pa_exact}
# )
# else:
# relative_L2_errornorm.update(
# {phase: error_calculated}
# )
# errornorm_filename = self.output_dir+self.output_filename_parameter_part[subdom_ind]+\
# "_L2_errornorms_over_time" +".csv"
# self.write_errornorms_to_csv(
# filename = errornorm_filename, #
# subdomain_index = subdom_ind,
# errors = relative_L2_errornorm,
# )
def _init_exact_solution_expression(self):
""" initialise dolfin expressions for exact solution and populate
self.exact_solution_expr dictionary. This is used to calculate errornorms
in the case of an exact solution.
"""
if self.exact_solution is not None:
for sd_ind, subdomain in self.subdomain.items():
subdomain.pressure_exact = dict()
for phase in subdomain.has_phases:
print(f"initialising exact solutions for subdomain{subdomain.subdomain_index} and phase {phase}")
pa_exact = self.exact_solution[sd_ind][phase]
subdomain.pressure_exact.update(
{phase: df.Expression(pa_exact, domain = subdomain.mesh, #
degree=self.interpolation_degree, #
t=self.starttime) }
)
else:
pass
def _init_sources(self, interpolation_degree: int = None):
""" create source expressions at self.starttime and populate subdomain
source dictionaries.
"""
if interpolation_degree is None:
interpolation_degree = self.interpolation_degree
for subdomain_index, subdomain in self.subdomain.items():
for phase in subdomain.has_phases:
# here t = 0 and we have to initialise the sources.
f = self.sources[subdomain_index][phase]
subdomain.source.update(
{phase: df.Expression(f, domain = subdomain.mesh,#
degree = interpolation_degree, #
t = self.starttime)}
)
def _add_parameters_to_output_dirname(self):
""" function to create a new output directory for given set of input
parameters and adjust self.output_dir accordingly
"""
gravity_string = ""
if self.include_gravity:
gravity_string = "_with_gravity"
# if we want to investigate the EOC and mesh dependence, we do not want
# to put all data files with different grids in different directories.
if self.mesh_study:
#"{number:.{digits}f}".format(number=h, digits=3)
dirname = "dt" + "{}".format(self.timestep_size)+ \
"mesh_study_on-[{start:.{digits}f},{end:.{digits}f}]".format(start=self.starttime,end=self.endtime,digits=4) + gravity_string + "/"
else:
dirname = "mesh_res" + "{}_".format(self.mesh_resolution)\
+ "dt" + "{}".format(self.timestep_size)\
+ "solver_tol{}".format(self.LDDsolver_tol)\
+ "_t0-{}".format(self.starttime)\
+ gravity_string + "/"
self.output_dir += dirname
if not os.path.exists(self.output_dir):
os.mkdir(self.output_dir)
print(f"\nwill save output of this simulation to path {self.output_dir}")
def _init_analyse_timesteps(self):
""" determin timesteps to anayise """
if self.number_of_timesteps_to_analyse == 0:
self.analyse_timesteps = None
else:
self.analyse_timesteps = np.linspace(
1 + self.starttime_timestep_number_shift,
self.number_of_timesteps + self.starttime_timestep_number_shift,
self.number_of_timesteps_to_analyse,
dtype=int
)
self.endtime = self.starttime \
+ self.number_of_timesteps*self.timestep_size
# time_interval = [starttime, Tmax]
print(f"The simulation interval is [{self.starttime}, {self.endtime}]\n")
print(f"the following timesteps will be analysed: {self.analyse_timesteps}\n")
def _init_simulation_output(self):
"""set up dictionary for simulation output"""
self.output = dict()
for subdom_ind in self.isRichards.keys():
subdomain = self.subdomain[subdom_ind]
self.output.update({subdom_ind: dict()})
self.output[subdom_ind].update({'mesh_size': subdomain.mesh_size})
self.output[subdom_ind].update({'errornorm': dict()})
for phase in subdomain.has_phases:
self.output[subdom_ind]['errornorm'].update({phase: dict()})
self.output[subdom_ind]['errornorm'][phase].update(
{# keys mean the norm in time. We use L2 in space
'Linf': 0.0,
'L1': 0.0,
'L2': 0.0 }
)