Select Git revision
domainPatch.py
domainPatch.py 48.65 KiB
"""Class definitions for subdomains representing soil patches for LDD-TPR simulation
This file contains class definitions for the class DomainPatch which defines the
subdomains used for the LDD simulation. Each subdomain of this class holds
information about permeability, porosity, the model/physics used on the patch
as well as information about which parts of the boundary are outer boundary of
the global domain and which parts are interfaces to neighboring soil patches of
the same class.
Additionally, a class BoundaryPart is defined, which accepts a List of dolfin
Points vertices, representing the vertices of a 1D polygone. The class contains
the method BoundaryPart.inside(meshpoint,on_boundary) which is used to mark
faces of a given subdomain mesh that lie on the 1D-polygone defined by vertices.
Assumed is the use of python 3.6
"""
import dolfin as df
import mshr
import numpy as np
# import domainPatch as dp
import typing as tp
import ufl as fl
import LDDsimulation
import boundary_and_interface as bi
from termcolor import colored
from solutionFile import SolutionFile
# Interface = tp.NewType('Interface', tp.any)
class DomainPatch(df.SubDomain):
""" DomainPatch Class defines subdomains used in the LDD simulation
The class DomainPatch defines subdomains for the LDD simulation. Each subdomain
of this class holds information about the phyisics involved like viscosity,
porosity, the model/physics that is assumed on the patch
as well as information about which parts of the boundary are outer boundary of
the global domain and which parts are interfaces to neighboring soil patches of
the same class.
The constructor accepts
# Parameters
isRichards #type: bool return true if Richards model is
assumed on patch.
mesh #type df::Mesh & Dolfin mesh object, submesh of the
subdomain.
porosity #type float porosity of the the soil in the patch
assumed to be constant in space.
viscosity #type tp.List[float] viscosity(ies) of the the
fluid phases in the patch viscosity[0] is assumed
to be the viscosity of the wetting phase and if
present (isRichards == False) viscosity[1] is
assumed to be the viscosity of the non-wetting phase.
interfaces: #type tp.List[bi.Interface] List of Interface class
objects from the LDDsimulation
class.
has_interface #type tp.List[int]: list of indices (w.r.t. the global
numbering of interfaces in
LDDsimulation.interfaces) indicating
which interfaces are part of the
subdomain.
L #type tp.List[float] L parameters for the L-scheme. In
case isRichards == False (i.e
two-phase flow is assumed on the
patch) we have two parameters, one
for each equation.
lambda_param #tp.List[float] L parameters for the L-scheme. In
case isRichards == False (i.e
two-phase flow is assumed on the
patch) we have two parameters, one
for each equation.
densities tp.Dict[int, tp.Dict[str, float]] Dictionary for both phases
holding the densities for the gravity
term.
include_gravity bool flag to toggle wether or not to
include the gravity term
gravity_acceleration float gravity acceleration for the gravity
term
## Public Variables
isRichards #type bool set by input, see above
mesh #type df.Mesh set by input, see above
porosity #type float set by input, see above
viscosity #type tp.List[float] set by input, see above
interfaces: #type tp.List[bi.Interface] set by input, see above
has_interface #type tp.List[int] set by input, see above
L #type tp.List[float] set by input, see above
lambda_param #type tp.List[float] set by input, see above
function_space #type tp.Dict[str: df.Function] function space
used for wetting
and nonwetting
pressures. This is
set by
self._init_function_space()
parent_mesh_index #type np.array parent vertex map of the submesh.
boundary_marker #type df.MeshFunction marker function to mark all
boundaries of the subdomain.
The marker value marking
interface i with global index
has_interface[i] will be
has_interface[i] aswell.
## Public Methods
"""
### constructor
def __init__(self, #
subdomain_index: int,#
isRichards: bool,#
mesh: df.Mesh,#
porosity: float,#
viscosity: tp.Dict[str, float],#
outer_boundary_def_points: tp.Dict[str, tp.List[df.Point]],#
# tp.List[Interface] doesn't work, because it hasen't been defined yet.
interfaces: tp.List[tp.Type[bi.Interface]],#
has_interface: tp.List[int],#
L: tp.Dict[str, float],#
lambda_param: tp.Dict[str, float],#
relative_permeability: tp.Dict[str, tp.Callable[...,None]],#
saturation: tp.Callable[..., None],#
timestep_size: float,#
densities: tp.Dict[str, float] = None,
include_gravity: bool = False,#
gravity_acceleration: float = 9.81,
interpolation_degree: int = 10,
tol = None,#
):
# because we declare our own __init__ method for the derived class BoundaryPart,
# we overwrite the __init__-method of the parent class df.SubDomain. However,
# we need the methods from the parent class, so we call the __init__-method
# from df.SubDomain to access it.
df.SubDomain.__init__(self)
if tol:
self.tol = tol
else:
self.tol = df.DOLFIN_EPS
### Class Variables/Objects set by the input to the constructor
self.subdomain_index = subdomain_index
self.isRichards = isRichards
self.mesh = mesh
self.porosity = porosity
self.viscosity = viscosity
self.outer_boundary_def_points = outer_boundary_def_points
self.interface = interfaces
self.has_interface = has_interface
self.densities = densities
self.include_gravity = include_gravity
self.gravity_acceleration = gravity_acceleration
self.L = L
self.lambda_param = lambda_param
self.relative_permeability = relative_permeability
self.saturation = saturation
# timestep size, tau in the paper
self.timestep_size = timestep_size
self.interpolation_degree = interpolation_degree
### Class variables set by methods
# sometimes we need to loop over the phases and the length of the loop is
# determined by the model we are assuming
if self.isRichards:
self.has_phases =['wetting']
else:
self.has_phases =['wetting', 'nonwetting']
# dictionary holding the dof indices corresponding to an interface of
# given interface. see self._calc_dof_indices_of_interfaces()
self._dof_indices_of_interface = dict()
# dictionary holding for each interface in self.has_interface the dof
# indices that are shared with another interface (possibly none). This info is needed for
# the calculation of the gli terms.
self._interface_has_common_dof_indices = dict()
# List of objects of clas outerBoundary initialised by self._init_markers()
# can be NONE if no outer boundary is present.
self.outer_boundary = []
# measures are set by self._init_measures()
self.iteration_number = 0
self.dx = None
self.ds = None
# solution function(s) for the form set by LDDsimulation._init_initial_values()
self.pressure = dict()
# this is being populated by LDDsimulation._eval_sources()
self.source = dict()
# dictionary of FEM function for each phase holding the previous iteration.
self.pressure_prev_iter = dict()
# dictionary of FEM function for each phase holding the pressures of the previous timestep t_n.
self.pressure_prev_timestep = dict()
# test function(s) for the form set by _init_function_space
self.testfunction = None
# Dictionary holding the exact solution expression if we have a case with exact solution.
# Is set by LDDsimualtion._init_exact_solution_expression()
self.pressure_exact = None
# Dictionary of FEM function of self.function_space holding the interface
# dofs of the previous iteration of the neighbouring pressure. This is used
# to assemble the gli terms in the method self.calc_gli_term().
# self.neighbouring_interface_pressure = None
# valiable holding a solver object of type df.KrylovSolver. It is set the
# first time LDDsimulation.LDDsolver is run.
self.linear_solver = None
self._init_function_space()
self._init_dof_maps()
self._init_markers()
self._init_measures()
self._calc_dof_indices_of_interfaces()
self._calc_interface_has_common_dof_indices()
if self.include_gravity:
self.gravity_term = dict()
self._calc_gravity_expressions()
else:
self.gravity_term = None
# Dictionary of Arrays (one for 'wetting' and one for 'nonwetting')
# corresponding to the dofs of an FEM function which is initiated
# by calc_gli_term() holding the dofs of the assembled dt*gli*v*ds terms as
# a sparse vector (here, dt = self.timestep_size). This vector is read by the solver and added to the rhs
# which is given by self.govering_problem().
#
self.gli_assembled = dict()
# Dictionary of FEM function holding the same dofs gli_assembled above
# but as a df.Function. This is needed for writing out the gli terms when
# analyising the convergence of a timestep.
self.gli_function = dict()
for phase in self.has_phases:
self.gli_function.update(
{phase: df.Function(self.function_space[phase])}
)
# END constructor
#### PUBLIC METHODS
def write_pressure_to_interfaces(self, previous_iter: int = False):
""" save the interface values of self.pressure to all neighbouring interfaces
This method is used by the LDDsimulation.LDDsolver method.
# parameters
previous_iter bool Flag to toggle wether to write self.pressure
to interface.pressure_values or to
interface.pressure_values_prev
"""
from_func = self.pressure
for ind in self.has_interface:
for phase in self.has_phases:
self.interface[ind].read_pressure_dofs(from_function = from_func[phase], #
interface_dofs = self._dof_indices_of_interface[ind][phase],#
dof_to_vert_map = self.dof2vertex[phase],#
local_to_parent_vertex_map = self.parent_mesh_index,#
phase = phase,#
subdomain_ind = self.subdomain_index,
previous_iter = previous_iter)
def governing_problem(self, phase: str) -> tp.Dict[str, fl.Form]:
""" return the governing form and right hand side for phase phase as a dictionary.
return the governing form ant right hand side for phase phase (without the gli terms) as a dictionary.
CAUTION: the gli terms are not assembled in this method and need to be
calculated by the solver separately and subtracted from the rhs!
"""
# define measures
dx = self.dx
ds = self.ds
dt = self.timestep_size
porosity = self.porosity
if self.isRichards:
# this assumes that atmospheric pressure has been normalised to 0.
# and since
pc_prev_iter = -self.pressure_prev_iter['wetting']
pc_prev_timestep = -self.pressure_prev_timestep['wetting']
if phase == 'nonwetting':
print("CAUTION: You invoked domainPatch.governing_problem() with\n", #
"Parameter phase = 'nonwetting'. But isRichards = True for",
"current subdomain. \nResetting phase = 'wetting'.\n")
phase = 'wetting'
else:
pw_prev_iter = self.pressure_prev_iter['wetting']
pnw_prev_iter = self.pressure_prev_iter['nonwetting']
pw_prev_timestep = self.pressure_prev_timestep['wetting']
pnw_prev_timestep = self.pressure_prev_timestep['nonwetting']
pc_prev_iter = pnw_prev_iter - pw_prev_iter
pc_prev_timestep = pnw_prev_timestep - pw_prev_timestep
La = self.L[phase]
pa = self.trialfunction[phase]
# this is pw_{i-1} in the paper
pa_prev_iter = self.pressure_prev_iter[phase]
phi_a = self.testfunction[phase]
Lambda_a = self.lambda_param[phase]
ka = self.relative_permeability[phase]
S = self.saturation
mu_a = self.viscosity[phase]
source_a = self.source[phase]
if self.include_gravity:
z_a = self.gravity_term[phase]
# the first part of the form and rhs are the same for both wetting and nonwetting
form1 = (La*pa*phi_a)*dx
rhs1 = (La*pa_prev_iter*phi_a)*dx
# the forms of wetting and nonwetting phase differ by a minus sign
# and by interface_form terms if the neighbour of the current patch
# assumes Richards model
if phase == "wetting":
# gather interface forms
interface_forms = []
# the marker values of the interfacese are global interfaces index + 1
for interface in self.has_interface:
# print(f"subdomain{self.subdomain_index} has interfaces: {interface}\n")
marker_value = self.interface[interface].marker_value
# interface_forms += (dt*Lambda_a*pa*phi_a)*ds(interface)
interface_forms.append((dt*Lambda_a*pa*phi_a)*ds(marker_value))
# form2 is different for wetting and nonwetting
form2 = dt/mu_a*df.dot(ka(S(pc_prev_iter))*df.grad(pa), df.grad(phi_a))*dx
rhs2 = -(porosity*(S(pc_prev_iter) - S(pc_prev_timestep))*phi_a)*dx
if self.include_gravity:
# z_a included the minus sign already, see self._calc_gravity_expressions
rhs_gravity = -dt/mu_a*df.dot(ka(S(pc_prev_iter))*df.grad(z_a), df.grad(phi_a))*dx
elif phase == "nonwetting":
# gather interface forms
interface_forms = []
for interface in self.has_interface:
marker_value = self.interface[interface].marker_value
if self.interface[interface].neighbour_isRichards[self.subdomain_index]:
# if the neighbour of our subdomain (with index self.subdomain_index)
# assumes a Richards model, we assume constant normalised atmospheric pressure
# and no interface term is practically appearing.
# interface_forms += (df.Constant(0)*phi_a)*ds(interface)
# pass
interface_forms.append((df.Constant(0)*phi_a)*ds(marker_value))
else:
# interface_forms += (dt*Lambda_a*pa*phi_a)*ds(interface)
interface_forms.append((dt*Lambda_a*pa*phi_a)*ds(marker_value))
form2 = dt/mu_a*df.dot(ka(1 - S(pc_prev_iter))*df.grad(pa), df.grad(phi_a))*dx
# the non wetting phase has a + before instead of a - before the bracket
rhs2 = (porosity*(S(pc_prev_iter) - S(pc_prev_timestep))*phi_a)*dx
if self.include_gravity:
# z_a included the minus sign already, see self._calc_gravity_expressions
rhs_gravity = -dt/mu_a*df.dot(ka(1 - S(pc_prev_iter))*df.grad(z_a), df.grad(phi_a))*dx
else:
raise RuntimeError('missmatch for input parameter phase.',
'Expected either phase == wetting or phase == nonwetting ')
return None
form = form1 + form2 + sum(interface_forms)
if self.include_gravity:
# z_a included the minus sign already, see self._calc_gravity_expressions
rhs_without_gli = rhs1 + rhs2 + dt*source_a*phi_a*dx + rhs_gravity
else:
rhs_without_gli = rhs1 + rhs2 + dt*source_a*phi_a*dx
form_and_rhs = {#
'form': form,#
'rhs_without_gli' : rhs_without_gli
}
return form_and_rhs
def calc_gl0_term(self, debug: bool = False) -> None: # tp.Dict[str, fl.Form]
"""calculate the gl0 terms for all interfaces of the subdomain.
assemble the gl0 terms for all interfaces of the subdomain and save them
to the dictionries of the interfaces. This method gets called by the LDDsolver
before entering the L-iterations in each time step.
This method assumes that self.pressure_prev_timestep has assigned the values
of self.pressure for all phases.
"""
subdomain = self.subdomain_index
dt = self.timestep_size
Lambda = self.lambda_param
# since we loop over all interfaces in self.has_interface we need a temporary
# vectors (numpy arrays) for each phase that we will use to add all dofs of the gli terms
# into one array.
gli_assembled_tmp = dict()
for phase in self.has_phases:
# print("number of dofs: ", self.pressure_prev_iter['wetting'].vector()[:].size)
gli_assembled_tmp.update(
{phase: np.zeros(self.pressure[phase].vector()[:].size, dtype=float)}
)
for interf_ind in self.has_interface:
interface = self.interface[interf_ind]
# neighbour index
neighbour = interface.neighbour[subdomain]
neighbour_iter_num = interface.current_iteration[neighbour]
ds = self.ds(interface.marker_value)
# needed for the read_gli_dofs() functions
interface_dofs = self._dof_indices_of_interface[interf_ind]
# for each interface, the following is a list of dofs indices belonging to another
# interface. The dofs corresponding to these indices must be multiplied
# by 1/2 to to get an average of the gli terms for dofs belonging to
# two different interfaces.
dofs_in_common_with_other_interfaces = self._interface_has_common_dof_indices[interf_ind]
if debug:
print(f"On subdomain{subdomain}, we have:\n",
f"Interface{interf_ind}, Neighbour index = {neighbour}\n",
f"dofs on interface:\n{interface_dofs['wetting']}\n",
f"dof2global_vertex_map:\n{self.parent_mesh_index[self.dof2vertex['wetting'][interface_dofs['wetting']]]}\n"
f"and dofs common with other interfaces:\n{dofs_in_common_with_other_interfaces['wetting']}")
if self.isRichards:
# assuming that we normalised atmospheric pressure to zero,
# pc = -pw in the Richards case.
pc_prev = -self.pressure_prev_timestep['wetting']
else:
pw_prev = self.pressure_prev_timestep['wetting']
pnw_prev = self.pressure_prev_timestep['nonwetting']
pc_prev = pnw_prev - pw_prev
n = df.FacetNormal(self.mesh)
S = self.saturation
for phase in self.has_phases:
# viscosity
mu_a = self.viscosity[phase]
# relative permeability
ka = self.relative_permeability[phase]
# previous timestep pressure
pa_prev = self.pressure_prev_timestep[phase]
# testfunction
phi_a = self.testfunction[phase]
z_a = self.gravity_term[phase]
if phase == 'nonwetting' and interface.neighbour_isRichards[subdomain]:
# if the neighbour of our subdomain (with index self.subdomain_index)
# assumes a Richards model, we don't have gli terms for the
# nonwetting phase and assemble the terms as zero form. We don't
# need to do anything, since gli_assembled_tmp[phase] has been
# initialised to zero.
pass
# gli_assembled_tmp[phase] += df.assemble(df.Constant(0)*ds).get_local()
else:
if phase == 'wetting':
# the wetting phase is always present and we always need
# to calculate a gl0 term.
# TODO: add intrinsic permeabilty!!!!
# TODO: add gravity term!!!!!
if self.include_gravity:
# z_a included the minus sign already, see self._calc_gravity_expressions
flux = -1/mu_a*ka(S(pc_prev))*df.grad(pa_prev + z_a)
else:
flux = -1/mu_a*ka(S(pc_prev))*df.grad(pa_prev)
else:
# phase == 'nonwetting'
# we need anothre flux in this case
if self.include_gravity:
flux = -1/mu_a*ka(1-S(pc_prev))*df.grad(pa_prev + z_a)
else:
flux = -1/mu_a*ka(1-S(pc_prev))*df.grad(pa_prev)
gl0 = df.dot(flux, n) - Lambda[phase]*pa_prev
gl0_assembled = df.assemble(dt*gl0*phi_a*ds).get_local()
# if dofs_in_common_with_other_interfaces[phase].size == 0
# there are no dofs that lie on another interface and we can
# skip the following step.
if dofs_in_common_with_other_interfaces[phase].size != 0:
if debug:
print(f"the following dofs (phase = {phase}) of interface{interf_ind} are in common with \n",
f" other interfaces: {dofs_in_common_with_other_interfaces[phase]}")
for common_dof in dofs_in_common_with_other_interfaces[phase]:
# from the standpoint of the subdomain we are currently on,
# each dof can belong maximally to two interfaces. On these
# vertices, common to two interfaces, the dofs of the
# corresponding gli terms of both interfaces are averaged
# to give one value for this vertex. Because we achieve this
# by adding the assembled gli terms together we have to
# multiply the dofs that appear in more than one interface
# by 1/2.
gl0_assembled[common_dof] = 0.5*gl0_assembled[common_dof]
# now, add the just assembled and corrected term to the Global
# gli_assembled_tmp vector
gli_assembled_tmp[phase] += gl0_assembled
# writing out glo to the current iteration dictionary of the
# interface takes place after the loop, see below.
### END for ind in self.has_interface:
# save the result of the calculation to the subdomain and to the interface
# dictionaries.
for phase in self.has_phases:
for interf_ind in self.has_interface:
# self._calc_gli_term_on(interface, iteration)
interface = self.interface[interf_ind]
if debug:
print(f"Interface{interf_ind}.gli_term_prev[{subdomain}][{phase}]=\n",interface.gli_term_prev[subdomain][phase])
print(f"before writing to interface[{interf_ind}] the values gli_assembled[{phase}]=\n",gli_assembled_tmp[phase])
# write gli dofs to interface dicts for communiction.
interface.read_gli_dofs(from_array = gli_assembled_tmp[phase],#self.gli_assembled[phase], #
interface_dofs = interface_dofs[phase],#
dof_to_vert_map = self.dof2vertex[phase],#
local_to_parent_vertex_map = self.parent_mesh_index,#
phase = phase,#
subdomain_ind = subdomain,#
previous_iter = True
)
if debug:
print(f"gl0_terms after writing to interface[{interf_ind}] Interface[{interf_ind}].gli_term_prev[{subdomain}][{phase}]=\n",interface.gli_term_prev[subdomain][phase])
### END calc_gl0_term
def calc_gli_term(self, debug: bool = False) -> None: # tp.Dict[str, fl.Form]
"""calculate the gli terms for the iteration number of the subdomain
calculate the gl terms for the iteration number of the subdomain and save them
to self.gli_assembled. The term gets saved as an already assembled sparse
ds form to be added to the rhs by the solver. This is due to the need of
having to communicate dofs over the interface. Additionally, the dofs are
being saved to an interface dictionary exactly as the pressures.
Caution: The array self.gli_assembled needs to be added to the rhs with
a minus sign by the solver!
"""
# this is the number of the current iteration of the subdomain.
iteration = self.iteration_number
subdomain = self.subdomain_index
dt = self.timestep_size
Lambda = self.lambda_param
# since we loop over all interfaces in self.has_interface we need a temporary
# vector (numpy array) that we will use to add all dofs of the gli terms
# into one array.
gli_assembled_tmp = dict()
for phase in self.has_phases:
# print("number of dofs: ", self.pressure_prev_iter['wetting'].vector()[:].size)
gli_assembled_tmp.update(
{phase: np.zeros(self.pressure[phase].vector()[:].size, dtype=float)}
)
for ind in self.has_interface:
interface = self.interface[ind]
# neighbour index
neighbour = interface.neighbour[subdomain]
# update the current_iteration number of subdomain stored in the interface
# to the current iteration number of the subdomain.
interface.current_iteration[subdomain] = iteration
# save the iteration number of the neighbour
neighbour_iter_num = interface.current_iteration[neighbour]
ds = self.ds(interface.marker_value)
# needed for the read_gli_dofs() functions
interface_dofs = self._dof_indices_of_interface[ind]
# for each interface, the following is a list of dofs indices belonging to another
# interface. The dofs corresponding to these indices must be multiplied
# by 1/2 to to get an average of the gli terms for dofs belonging to
# two different interfaces.
dofs_in_common_with_other_interfaces = self._interface_has_common_dof_indices[ind]
if debug:
print(f"On subdomain{subdomain}, we have:\n",
f"Interface{ind}, Neighbour index = {neighbour}\n",
f"dofs on interface:\n{interface_dofs['wetting']}\n",
f"dof2global_vertex_map:\n{self.parent_mesh_index[self.dof2vertex['wetting'][interface_dofs['wetting']]]}\n"
f"and dofs common with other interfaces:\n{dofs_in_common_with_other_interfaces['wetting']}")
for phase in self.has_phases:
# in case phase == 'nonwetting' only proceed if the neighbouring
# subdomain does not assume a Richards model.
if (phase == 'wetting') or (not interface.neighbour_isRichards[subdomain]):
# read gli_prev term from the neighbour.
# to be able to sum the gli terms of all interfaces together,
# we need a dummy vector to hold the dofs read from the interface.
gli_prev_of_neighbour = np.zeros(gli_assembled_tmp[phase].size, dtype = float)
interface.write_gli_dofs(to_array = gli_prev_of_neighbour, #
interface_dofs = interface_dofs[phase],#
dof_to_vert_map = self.dof2vertex[phase],#
local_to_parent_vertex_map = self.parent_mesh_index,#
phase = phase,#
subdomain_ind = neighbour,#
# this is set to True because we need gli_prev
previous_iter = True
)
if debug:
print(f"read gli_prev_iter[{subdomain}]:\n", gli_prev_of_neighbour)
# read previous neighbouring pressure values from interface and write them to
# the function pa_prev. We need to make a distinction first.
if iteration == neighbour_iter_num + 1:
# in this case the neighbour has not yet iterated beyond
# the iteration we are currently in, so
# interface.pressure_values[neighbour] holds the "previous pressure"
# we need to read from the interface to calculate the current gli.
take_previous_pressure_from_interface = False
elif iteration == neighbour_iter_num:
# this is the only possible other case. In this case the
# neighbour has iterated once beyond the iteration we are
# currently in, so interface.pressure_values_prev holds the "previous pressure"
# we need to read from the interface to calculate gli.
take_previous_pressure_from_interface = True
else:
raise RuntimeError("\n!!!!!!!!! Warning !!!!!!!!!!!!!! \n"+\
"this case should not appear\n"+"neighbour_iter_num =" + "{}".format(neighbour_iter_num)\
+ "for neighbour = "+ "{}".format(neighbour)+\
"and iteration = " + "{} on subdomain".format(iteration)+ "{}".format(subdomain)\
+ "You suck! Revisite DomainPatch.calc_gli_term() and LDDsimulation.LDDsolver ASAP.")
pa_prev = df.interpolate(df.Constant(0), self.function_space[phase])
interface.write_pressure_dofs(to_function = pa_prev,#self.neighbouring_interface_pressure[phase], #
interface_dofs = interface_dofs[phase],#
dof_to_vert_map = self.dof2vertex[phase],#
local_to_parent_vertex_map = self.parent_mesh_index,#
phase = phase,#
subdomain_ind = neighbour,#
previous_iter = take_previous_pressure_from_interface)
# use the above to calculate the gli update with
# in the paper p^{i-1}_{3-l}
# pa_prev = self.neighbouring_interface_pressure[phase]
phi_a = self.testfunction[phase]
gli_pressure_part = df.assemble(-2*dt*Lambda[phase]*pa_prev*phi_a*ds)
gli_pressure_part_array = gli_pressure_part.get_local()
gli = gli_pressure_part_array - gli_prev_of_neighbour
# check if there are shared dofs with another interface.
if dofs_in_common_with_other_interfaces[phase].size != 0:
for common_dof in dofs_in_common_with_other_interfaces[phase]:
# see previous case.
gli[common_dof] = 0.5*gli[common_dof]
# now, add the just assembled and corrected term to the Global
# gli_assembled_tmp vector
gli_assembled_tmp[phase] += gli
### END for ind in self.has_interface:
# save the result of the calculation to the subdomain and to the interface
# dictionaries.
for phase in self.has_phases:
self.gli_assembled.update(
{phase: gli_assembled_tmp[phase].copy()}
)
# communicate the newly calculated terms
for ind in self.has_interface:
# self._calc_gli_term_on(interface, iteration)
interface = self.interface[ind]
if debug:
print(f"Interface{ind}.gli_term[{subdomain}][{phase}]=\n",interface.gli_term[subdomain][phase])
print(f"before writing to interface[{ind}] gli_assembled[{phase}]=\n",self.gli_assembled[phase])
# write gli dofs to interface dicts for communiction. We have to
# distinguish different cases.
if iteration == neighbour_iter_num + 1:
# in this case, the neighbour has not done the next iteration
# and will need both pressure and gli_term_prev of the current
# subdomain. Therefor, we save the the calculated gli_terms to
# the gli_term dictionary of the current subdomain, not the
# gli_term_prev dictionary.
interface.read_gli_dofs(from_array = self.gli_assembled[phase], #
interface_dofs = interface_dofs[phase],#
dof_to_vert_map = self.dof2vertex[phase],#
local_to_parent_vertex_map = self.parent_mesh_index,#
phase = phase,#
subdomain_ind = subdomain,#
previous_iter = False
)
else:
# This should be the case iteration == neighbour_iter_num:
# in this case, the neighbour has done the next iteration already
# and will not need both pressure_prev and gli_term_prev of the current
# subdomain. Therefor, we save the the calculated gli_terms to
# the gli_term_prev dictionary of the current subdomain, not the
# gli_term dictionary.
interface.read_gli_dofs(from_array = self.gli_assembled[phase], #
interface_dofs = interface_dofs[phase],#
dof_to_vert_map = self.dof2vertex[phase],#
local_to_parent_vertex_map = self.parent_mesh_index,#
phase = phase,#
subdomain_ind = subdomain,#
previous_iter = True
)
# Additionally, the gli_term of the neighbour is saved to
# gli_term currently and will be overwritten in the next step,
# if we don't save it to gli_term_prev of the neighbour.
# similarly for the pressure values.
if phase == 'wetting' or not interface.neighbour_isRichards[subdomain]:
interface.gli_term_prev[neighbour].update(#
{phase: interface.gli_term[neighbour][phase].copy()}
)
interface.pressure_values_prev[neighbour][phase].update(
{phase: interface.pressure_values[neighbour][phase].copy()}
)
### END calc_gli_term
def null_all_interface_iteration_numbers(self) -> None:
""" reset interface.current_iteration[subdomain] = 0 for all interfaces
of the subdomain.
"""
subdomain = self.subdomain_index
for index in self.has_interface:
self.interface[index].current_iteration[subdomain] = 0
#### PRIVATE METHODS
def _init_function_space(self) -> None:
""" create function space for solution and trial functions
Note that P1 FEM is hard coded here, as it is assumed in other methods
aswell. This method sets the class variable
self.function_space
self.trialfunction
self.pressure
self.testfunction
"""
self.function_space = dict()
self.trialfunction = dict()
self.testfunction = dict()
# self.neighbouring_interface_pressure = dict()
for phase in self.has_phases:
self.function_space.update({phase: df.FunctionSpace(self.mesh, 'P', 1)})
self.trialfunction.update({phase: df.TrialFunction(self.function_space[phase])})
self.testfunction.update({phase: df.TestFunction(self.function_space[phase])})
# self.neighbouring_interface_pressure.update(
# {phase: df.interpolate(df.Constant(0), self.function_space[phase])}
# )
def _init_dof_maps(self) -> None:
""" calculate dof maps and the vertex to parent map.
This method sets the class Variables
self.parent_mesh_index
self.dof2vertex
self.vertex2dof
"""
mesh_data = self.mesh.data()
# local to parent vertex index map
self.parent_mesh_index = mesh_data.array('parent_vertex_indices',0)
# print(f"on subdomain{self.subdomain_index} parent_vertex_indices: {self.parent_mesh_index}")
self.dof2vertex = dict()
self.vertex2dof = dict()
for phase in self.has_phases:
self.dof2vertex.update(#
{phase :df.dof_to_vertex_map(self.function_space[phase])}#
)
# print(f"on subdomain{self.subdomain_index} dof2vertex: {self.dof2vertex[phase]}")
self.vertex2dof.update(#
{phase :df.vertex_to_dof_map(self.function_space[phase])}#
)
# print(f"on subdomain{self.subdomain_index} vertex2dof: {self.vertex2dof[phase]}")
def _init_markers(self) -> None:
""" define boundary markers
This method sets the class Variables
self.interface_marker
self.outer_boundary_marker
"""
self.outer_boundary_marker = df.MeshFunction('size_t', self.mesh, self.mesh.topology().dim()-1)
self.outer_boundary_marker.set_all(0)
self.interface_marker = df.MeshFunction('size_t', self.mesh, self.mesh.topology().dim()-1)
self.interface_marker.set_all(0)
for glob_index in self.has_interface:
# the marker value is set to the same as the global marker value,
# stored in self.interfaces[glob_index].marker_value.
marker_value = self.interface[glob_index].marker_value
# each interface gets marked with the global interface index.
self.interface[glob_index].mark(self.interface_marker, marker_value)
# create outerBoundary objects and mark outer boundaries with 1
for index, boundary_points in self.outer_boundary_def_points.items():
# if our domain patch has no outer boundary, this should be reflected
# by the value None in the dictionary
if boundary_points is None:
#
self.outer_boundary = None
else:
self.outer_boundary.append(#
bi.BoundaryPart(vertices = boundary_points,#
internal = False,
tol= self.tol) #self.mesh.hmin()/100
)
# similar to the interfaces self.outer_boundary_def_points is a
# dictionary enumerating all the boundary parts. The enumeration
# starts at 0, so we set the marker value to index+1
boundary_marker_value = index+1
self.outer_boundary[index].mark(self.outer_boundary_marker, boundary_marker_value)
def _init_measures(self):
""" define measures for the governing form
This method sets the class Variables
self.dx
self.ds
"""
# domain measure
self.dx = df.Measure('dx', domain = self.mesh)
# measure over the interfaces
self.ds = df.Measure('ds', domain = self.mesh, subdomain_data = self.interface_marker)
def _calc_dof_indices_of_interfaces(self):
""" calculate dof indices of each interface """
V = self.function_space
marker = self.interface_marker
for ind in self.has_interface:
# the marker value for the interfaces is always global interface index+1
marker_value = self.interface[ind].marker_value
self._dof_indices_of_interface.update(#
{ind: dict()}
)
for phase in self.has_phases:
self._dof_indices_of_interface[ind].update(#
{phase: self.interface[ind].dofs_on_interface(V[phase], marker, marker_value)}
)
# print(f"subdomain{self.subdomain_index}: dofs on interface{ind}:", self._dof_indices_of_interface[ind][phase])
def _calc_interface_has_common_dof_indices(self):
""" populate dictionary self._interface_has_common_dof_indices
Determin for each interface in self.has_interface the dof indices that
also belong to another interface and store them in the dictionary
self._interface_has_common_dof_indices
This method populates the dictionary
self._interface_has_common_dof_indices
"""
for interf_ind, interf_dofs in self._dof_indices_of_interface.items():
# initialise bool numpy arry for the search.
if self.isRichards:
is_part_of_neighbour_interface_w = np.zeros(interf_dofs['wetting'].size, dtype=bool)
else:
is_part_of_neighbour_interface_w = np.zeros(interf_dofs['wetting'].size, dtype=bool)
is_part_of_neighbour_interface_nw = np.zeros(interf_dofs['nonwetting'].size, dtype=bool)
for other_interf_ind, other_interf_dofs in self._dof_indices_of_interface.items():
if other_interf_ind is not interf_ind:
if self.isRichards:
is_part_of_neighbour_interface_w += np.isin(interf_dofs['wetting'], other_interf_dofs['wetting'], assume_unique=True)
else:
is_part_of_neighbour_interface_w += np.isin(interf_dofs['wetting'], other_interf_dofs['wetting'], assume_unique=True)
is_part_of_neighbour_interface_nw += np.isin(interf_dofs['nonwetting'], other_interf_dofs['nonwetting'], assume_unique=True)
# this carries a numpyarray of the indices which belong to other
# interfaces, too
self._interface_has_common_dof_indices.update({interf_ind: dict()})
if self.isRichards:
self._interface_has_common_dof_indices[interf_ind].update(
{'wetting': interf_dofs['wetting'][is_part_of_neighbour_interface_w]}
)
else:
self._interface_has_common_dof_indices[interf_ind].update(
{'wetting': interf_dofs['wetting'][is_part_of_neighbour_interface_w],
'nonwetting': interf_dofs['nonwetting'][is_part_of_neighbour_interface_nw]}
)
# for phase in self.has_phases:
# print(f"self._interface_has_common_dof_indices[{interf_ind}][{phase}]", self._interface_has_common_dof_indices[interf_ind][phase])
# print(f"python id of self._interface_has_common_dof_indices[{interf_ind}][{phase}]", id(self._interface_has_common_dof_indices[interf_ind][phase]))
def _calc_gravity_expressions(self):
""" calculate the gravity term if it is included"""
g = df.Constant(self.gravity_acceleration) #
for phase in self.has_phases:
rho = df.Constant(self.densities[phase])
self.gravity_term.update(
{phase: df.Expression('-g*rho*x[1]', domain = self.mesh, #
degree = self.interpolation_degree,
g = g, rho = rho)}
)
def write_solution_to_xdmf(self, file: tp.Type[SolutionFile], #
time: float = None,#
write_iter_for_fixed_time: bool = False,#
exact_solution: bool = False,#
):
"""
Weither write the solution of the simulation and corresponding time
or the solution and corresponding iteration at fixed time
(write_iter_for_fixed_time = True) to disk.
This is needed to visualize the solutions.
PARAMETERS
file str File name of file to write to
time float time at which to write the solution
write_iter_for_fixed_time bool flag to toggle wether pairs of
(solution, iteration) for fixed t should be
written instead of (solution, t)
subsequent_errors df.Funtion FEM Function passed by the solver
containing self.pressure - self.pressure_prev_iter
"""
t = time
for phase in self.has_phases:
if not write_iter_for_fixed_time:
self.pressure[phase].rename("pressure_"+"{}".format(phase), "pressure_"+"{}".format(phase))
file.write(self.pressure[phase], t)
# if self.pressure_exact:
# self.pressure_exact[phase].t = t
# tmp_exact_pressure = df.interpolate(self.pressure_exact[phase], self.function_space[phase])
# tmp_exact_pressure.rename("exact_pressure_"+"{}".format(phase), "exactpressure_"+"{}".format(phase))
# file.write(tmp_exact_pressure, t)
else:
i = self.iteration_number
self.pressure[phase].rename("pressure_"+"{}".format(phase), #
"pressure(t="+"{}".format(t)+")_"+"{}".format(phase)
)
file.write(self.pressure[phase], i)
# create function to copy the subsequent errors to
error = df.Function(self.function_space[phase])
error.assign(self.pressure[phase] - self.pressure_prev_iter[phase])
error.rename("pi_"+"{}".format(phase)+"-pim1_"+"{}".format(phase), #
"pressure(t="+"{}".format(t)+")_"+"{}".format(phase)
)
file.write(error, i)
self.gli_function[phase].vector().set_local(self.gli_assembled[phase])
self.gli_function[phase].rename("gli_function_"+"{}".format(phase),#
"all_gli_terms(t="+"{}".format(t)+")_"+"{}".format(phase))
file.write(self.gli_function[phase], i)
# END OF CLASS DomainPatch