Source code for ufl.algorithms.apply_restrictions
"""Apply restrictions.
This module contains the apply_restrictions algorithm which propagates
restrictions in a form towards the terminals.
"""
# Copyright (C) 2008-2016 Martin Sandve Alnæs
#
# This file is part of UFL (https://www.fenicsproject.org)
#
# SPDX-License-Identifier: LGPL-3.0-or-later
from ufl.algorithms.map_integrands import map_integrand_dags
from ufl.classes import Restricted
from ufl.corealg.map_dag import map_expr_dag
from ufl.corealg.multifunction import MultiFunction
from ufl.domain import extract_unique_domain
from ufl.measure import integral_type_to_measure_name
from ufl.sobolevspace import H1
[docs]class RestrictionPropagator(MultiFunction):
"""Restriction propagator."""
def __init__(self, side=None):
"""Initialise."""
MultiFunction.__init__(self)
self.current_restriction = side
self.default_restriction = "+"
# Caches for propagating the restriction with map_expr_dag
self.vcaches = {"+": {}, "-": {}}
self.rcaches = {"+": {}, "-": {}}
if self.current_restriction is None:
self._rp = {"+": RestrictionPropagator("+"), "-": RestrictionPropagator("-")}
[docs] def restricted(self, o):
"""When hitting a restricted quantity, visit child with a separate restriction algorithm."""
# Assure that we have only two levels here, inside or outside
# the Restricted node
if self.current_restriction is not None:
raise ValueError("Cannot restrict an expression twice.")
# Configure a propagator for this side and apply to subtree
side = o.side()
return map_expr_dag(
self._rp[side], o.ufl_operands[0], vcache=self.vcaches[side], rcache=self.rcaches[side]
)
# --- Reusable rules
def _ignore_restriction(self, o):
"""Ignore current restriction.
Quantity is independent of side also from a computational point
of view.
"""
return o
def _require_restriction(self, o):
"""Restrict a discontinuous quantity to current side, require a side to be set."""
if self.current_restriction is None:
raise ValueError(f"Discontinuous type {o._ufl_class_.__name__} must be restricted.")
return o(self.current_restriction)
def _default_restricted(self, o):
"""Restrict a continuous quantity to default side if no current restriction is set."""
r = self.current_restriction
if r is None:
r = self.default_restriction
return o(r)
def _opposite(self, o):
"""Restrict a quantity to default side.
If the current restriction is different swap the sign, require a side to be set.
"""
if self.current_restriction is None:
raise ValueError(f"Discontinuous type {o._ufl_class_.__name__} must be restricted.")
elif self.current_restriction == self.default_restriction:
return o(self.default_restriction)
else:
return -o(self.default_restriction)
def _missing_rule(self, o):
"""Raise an error."""
raise ValueError(f"Missing rule for {o._ufl_class_.__name__}")
# --- Rules for operators
# Default: Operators should reconstruct only if subtrees are not touched
operator = MultiFunction.reuse_if_untouched
# Assuming apply_derivatives has been called,
# propagating Grad inside the Restricted nodes.
# Considering all grads to be discontinuous, may
# want something else for facet functions in future.
grad = _require_restriction
[docs] def variable(self, o, op, label):
"""Strip variable."""
return op
[docs] def reference_value(self, o):
"""Reference value of something follows same restriction rule as the underlying object."""
(f,) = o.ufl_operands
assert f._ufl_is_terminal_
g = self(f)
if isinstance(g, Restricted):
side = g.side()
return o(side)
else:
return o
# --- Rules for terminals
# Require handlers to be specified for all terminals
terminal = _missing_rule
multi_index = _ignore_restriction
label = _ignore_restriction
# Default: Literals should ignore restriction
constant_value = _ignore_restriction
constant = _ignore_restriction
# Even arguments with continuous elements such as Lagrange must be
# restricted to associate with the right part of the element
# matrix
argument = _require_restriction
# Defaults for geometric quantities
geometric_cell_quantity = _require_restriction
geometric_facet_quantity = _require_restriction
# Only a few geometric quantities are independent on the restriction:
facet_coordinate = _ignore_restriction
quadrature_weight = _ignore_restriction
# Assuming homogeoneous mesh
reference_cell_volume = _ignore_restriction
reference_facet_volume = _ignore_restriction
[docs] def coefficient(self, o):
"""Restrict a coefficient.
Allow coefficients to be unrestricted (apply default if so) if
the values are fully continuous across the facet.
"""
if o.ufl_element() in H1:
# If the coefficient _value_ is _fully_ continuous
# It must still be computed from one of the sides, we just don't care which
return self._default_restricted(o)
else:
return self._require_restriction(o)
[docs] def facet_normal(self, o):
"""Restrict a facet_normal."""
D = extract_unique_domain(o)
e = D.ufl_coordinate_element()
gd = D.geometric_dimension()
td = D.topological_dimension()
if e.embedded_superdegree <= 1 and e in H1 and gd == td:
# For meshes with a continuous linear non-manifold
# coordinate field, the facet normal from side - points in
# the opposite direction of the one from side +. We must
# still require a side to be chosen by the user but
# rewrite n- -> n+. This is an optimization, possibly
# premature, however it's more difficult to do at a later
# stage.
return self._opposite(o)
else:
# For other meshes, we require a side to be
# chosen by the user and respect that
return self._require_restriction(o)
[docs]def apply_restrictions(expression):
"""Propagate restriction nodes to wrap differential terminals directly."""
integral_types = [
k for k in integral_type_to_measure_name.keys() if k.startswith("interior_facet")
]
rules = RestrictionPropagator()
return map_integrand_dags(rules, expression, only_integral_type=integral_types)
[docs]class DefaultRestrictionApplier(MultiFunction):
"""Default restriction applier."""
def __init__(self, side=None):
"""Initialise."""
MultiFunction.__init__(self)
self.current_restriction = side
self.default_restriction = "+"
if self.current_restriction is None:
self._rp = {"+": DefaultRestrictionApplier("+"), "-": DefaultRestrictionApplier("-")}
[docs] def terminal(self, o):
"""Apply to terminal."""
# Most terminals are unchanged
return o
# Default: Operators should reconstruct only if subtrees are not touched
operator = MultiFunction.reuse_if_untouched
[docs] def restricted(self, o):
"""Apply to restricted."""
# Don't restrict twice
return o
[docs] def derivative(self, o):
"""Apply to derivative."""
# I don't think it's safe to just apply default restriction
# to the argument of any derivative, i.e. grad(cg1_function)
# is not continuous across cells even if cg1_function is.
return o
def _default_restricted(self, o):
"""Restrict a continuous quantity to default side if no current restriction is set."""
r = self.current_restriction
if r is None:
r = self.default_restriction
return o(r)
# These are the same from either side but to compute them
# cell (or facet) data from one side must be selected:
spatial_coordinate = _default_restricted
# Depends on cell only to get to the facet:
facet_jacobian = _default_restricted
facet_jacobian_determinant = _default_restricted
facet_jacobian_inverse = _default_restricted
# facet_tangents = _default_restricted
# facet_midpoint = _default_restricted
facet_area = _default_restricted
# facet_diameter = _default_restricted
min_facet_edge_length = _default_restricted
max_facet_edge_length = _default_restricted
facet_origin = _default_restricted # FIXME: Is this valid for quads?
[docs]def apply_default_restrictions(expression):
"""Some terminals can be restricted from either side.
This applies a default restriction to such terminals if unrestricted.
"""
integral_types = [
k for k in integral_type_to_measure_name.keys() if k.startswith("interior_facet")
]
rules = DefaultRestrictionApplier()
return map_integrand_dags(rules, expression, only_integral_type=integral_types)