Source code for ufl.algorithms.domain_analysis
"""Algorithms for building canonical data structure for integrals over subdomains."""
# Copyright (C) 2009-2016 Anders Logg and Martin Sandve Alnæs
#
# This file is part of UFL (https://www.fenicsproject.org)
#
# SPDX-License-Identifier: LGPL-3.0-or-later
import numbers
import typing
from collections import defaultdict
import ufl
from ufl.algorithms.coordinate_derivative_helpers import (
attach_coordinate_derivatives,
strip_coordinate_derivatives,
)
from ufl.algorithms.renumbering import renumber_indices
from ufl.form import Form
from ufl.integral import Integral
from ufl.protocols import id_or_none
from ufl.sorting import cmp_expr, sorted_expr
from ufl.utils.sorting import canonicalize_metadata, sorted_by_key
[docs]
class IntegralData(object):
"""Utility class.
This class has members (domain, integral_type, subdomain_id,
integrals, metadata), where metadata is an empty dictionary that may
be used for associating metadata with each object.
"""
__slots__ = (
"domain",
"integral_type",
"subdomain_id",
"integrals",
"metadata",
"integral_coefficients",
"enabled_coefficients",
)
def __init__(self, domain, integral_type, subdomain_id, integrals, metadata):
"""Initialise."""
if 1 != len(set(itg.ufl_domain() for itg in integrals)):
raise ValueError("Multiple domains mismatch in integral data.")
if not all(integral_type == itg.integral_type() for itg in integrals):
raise ValueError("Integral type mismatch in integral data.")
if not all(subdomain_id == itg.subdomain_id() for itg in integrals):
raise ValueError("Subdomain id mismatch in integral data.")
self.domain = domain
self.integral_type = integral_type
self.subdomain_id = subdomain_id
self.integrals = integrals
# This is populated in preprocess using data not available at
# this stage:
self.integral_coefficients = None
self.enabled_coefficients = None
# TODO: I think we can get rid of this with some refactoring
# in ffc:
self.metadata = metadata
def __lt__(self, other):
"""Check if self is less than other."""
# To preserve behaviour of extract_integral_data:
return (self.integral_type, self.subdomain_id, self.integrals, self.metadata) < (
other.integral_type,
other.subdomain_id,
other.integrals,
other.metadata,
)
def __eq__(self, other):
"""Check for equality."""
# Currently only used for tests:
return (
self.integral_type == other.integral_type
and self.subdomain_id == other.subdomain_id
and self.integrals == other.integrals
and self.metadata == other.metadata
)
def __str__(self):
"""Format as a string."""
s = f"IntegralData over domain({self.integral_type}, {self.subdomain_id})"
s += " with integrals:\n"
s += "\n\n".join(map(str, self.integrals))
s += "\nand metadata:\n{metadata}"
return s
[docs]
class ExprTupleKey(object):
"""Tuple comparison helper."""
__slots__ = ("x",)
def __init__(self, x):
"""Initialise."""
self.x = x
def __lt__(self, other):
"""Check if self is less than other."""
# Comparing expression first
c = cmp_expr(self.x[0], other.x[0])
if c < 0:
return True
elif c > 0:
return False
else:
# Comparing form compiler data
mds = canonicalize_metadata(self.x[1])
mdo = canonicalize_metadata(other.x[1])
return mds < mdo
[docs]
def group_integrals_by_domain_and_type(integrals, domains):
"""Group integrals by domain and type.
Args:
integrals: list of Integral objects
domains: list of AbstractDomain objects from the parent Form
Returns:
Dictionary mapping (domain, integral_type) to list(Integral)
"""
integrals_by_domain_and_type = defaultdict(list)
for itg in integrals:
if itg.ufl_domain() is None:
raise ValueError("Integral has no domain.")
key = (itg.ufl_domain(), itg.integral_type())
# Append integral to list of integrals with shared key
integrals_by_domain_and_type[key].append(itg)
return integrals_by_domain_and_type
[docs]
def integral_subdomain_ids(integral):
"""Get a tuple of integer subdomains or a valid string subdomain from integral."""
did = integral.subdomain_id()
if isinstance(did, numbers.Integral):
return (did,)
elif isinstance(did, tuple):
if not all(isinstance(d, numbers.Integral) for d in did):
raise ValueError("Expecting only integer subdomains in tuple.")
return did
elif did in ("everywhere", "otherwise"):
# TODO: Define list of valid strings somewhere more central
return did
else:
raise ValueError(f"Invalid domain id {did}.")
[docs]
def rearrange_integrals_by_single_subdomains(
integrals: typing.List[Integral], do_append_everywhere_integrals: bool
) -> typing.Dict[int, typing.List[Integral]]:
"""Rearrange integrals over multiple subdomains to single subdomain integrals.
Args:
integrals: List of integrals
do_append_everywhere_integrals: Boolean indicating if integrals
defined on the whole domain should
just be restricted to the set of input subdomain ids.
Returns:
The integrals reconstructed with single subdomain_id
"""
# Split integrals into lists of everywhere and subdomain integrals
everywhere_integrals = []
subdomain_integrals = []
for itg in integrals:
dids = integral_subdomain_ids(itg)
if dids == "otherwise":
raise ValueError("'otherwise' integrals should never occur before preprocessing.")
elif dids == "everywhere":
everywhere_integrals.append(itg)
else:
subdomain_integrals.append((dids, itg))
# Fill single_subdomain_integrals with lists of integrals from
# subdomain_integrals, but split and restricted to single
# subdomain ids
single_subdomain_integrals = defaultdict(list)
for dids, itg in subdomain_integrals:
# Region or single subdomain id
for did in dids:
# Restrict integral to this subdomain!
single_subdomain_integrals[did].append(itg.reconstruct(subdomain_id=did))
# Add everywhere integrals to each single subdomain id integral
# list
otherwise_integrals = []
for ev_itg in everywhere_integrals:
# Restrict everywhere integral to 'otherwise'
otherwise_integrals.append(ev_itg.reconstruct(subdomain_id="otherwise"))
# Restrict everywhere integral to each subdomain
# and append to each integral list
if do_append_everywhere_integrals:
for subdomain_id in sorted(single_subdomain_integrals.keys()):
single_subdomain_integrals[subdomain_id].append(
ev_itg.reconstruct(subdomain_id=subdomain_id)
)
if otherwise_integrals:
single_subdomain_integrals["otherwise"] = otherwise_integrals
return single_subdomain_integrals
[docs]
def accumulate_integrands_with_same_metadata(integrals):
"""Accumulate integrands with the same metedata.
Args:
integrals: a list of integrals
Returns:
A list of the form [(integrand0, metadata0), (integrand1,
metadata1), ...] where integrand0 < integrand1 by the canonical
ufl expression ordering criteria.
"""
# Group integrals by compiler data hash
by_cdid = {}
for itg in integrals:
cd = itg.metadata()
cdid = hash(canonicalize_metadata(cd))
if cdid not in by_cdid:
by_cdid[cdid] = ([], cd)
by_cdid[cdid][0].append(itg)
# Accumulate integrands separately for each compiler data object
# id
for cdid in by_cdid:
integrals, cd = by_cdid[cdid]
# Ensure canonical sorting of more than two integrands
integrands = sorted_expr((itg.integrand() for itg in integrals))
integrands_sum = sum(integrands[1:], integrands[0])
by_cdid[cdid] = (integrands_sum, cd)
# Sort integrands canonically by integrand first then compiler
# data
return sorted(by_cdid.values(), key=ExprTupleKey)
[docs]
def build_integral_data(integrals):
"""Build integral data given a list of integrals.
The integrals you pass in here must have been rearranged and
gathered (removing the "everywhere" subdomain_id). To do this, you
should call group_form_integrals.
Args:
integrals: An iterable of Integral objects.
Returns:
A tuple of IntegralData objects.
"""
itgs = defaultdict(list)
# --- Merge integral data that has the same integrals,
for integral in integrals:
integral_type = integral.integral_type()
ufl_domain = integral.ufl_domain()
subdomain_ids = integral.subdomain_id()
if "everywhere" in subdomain_ids:
raise ValueError(
"'everywhere' not a valid subdomain id. "
"Did you forget to call group_form_integrals?"
)
# Group for integral data (One integral data object for all
# integrals with same domain, itype, (but possibly different metadata).
itgs[(ufl_domain, integral_type, subdomain_ids)].append(integral)
# Build list with canonical ordering, iteration over dicts
# is not deterministic across python versions
def keyfunc(item):
(d, itype, sid), integrals = item
sid_int = tuple(-1 if i == "otherwise" else i for i in sid)
return (d._ufl_sort_key_(), itype, (type(sid).__name__,), sid_int)
integral_datas = []
for (d, itype, sid), integrals in sorted(itgs.items(), key=keyfunc):
integral_datas.append(IntegralData(d, itype, sid, integrals, {}))
return integral_datas
[docs]
def group_form_integrals(form, domains, do_append_everywhere_integrals=True):
"""Group integrals by domain and type, performing canonical simplification.
Args:
form: the Form to group the integrals of.
domains: an iterable of Domains.
do_append_everywhere_integrals: Boolean indicating if integrals
defined on the whole domain should
just be restricted to the set of input subdomain ids.
Returns:
A new Form with gathered integrands.
"""
# Group integrals by domain and type
integrals_by_domain_and_type = group_integrals_by_domain_and_type(form.integrals(), domains)
integrals = []
for domain in domains:
for integral_type in ufl.measure.integral_types():
# Get integrals with this domain and type
ddt_integrals = integrals_by_domain_and_type.get((domain, integral_type))
if ddt_integrals is None:
continue
# Group integrals by subdomain id, after splitting e.g.
# f*dx((1,2)) + g*dx((2,3)) -> f*dx(1) + (f+g)*dx(2) + g*dx(3)
# (note: before this call, 'everywhere' is a valid subdomain_id,
# and after this call, 'otherwise' is a valid subdomain_id)
single_subdomain_integrals = rearrange_integrals_by_single_subdomains(
ddt_integrals, do_append_everywhere_integrals
)
for subdomain_id, ss_integrals in sorted_by_key(single_subdomain_integrals):
# strip the coordinate derivatives from all integrals
# this yields a list of the form [(coordinate derivative, integral), ...]
stripped_integrals_and_coordderivs = strip_coordinate_derivatives(ss_integrals)
# now group the integrals by the coordinate derivative
def calc_hash(cd):
return sum(
sum(tuple_elem._ufl_compute_hash_() for tuple_elem in tuple_)
for tuple_ in cd
)
coordderiv_integrals_dict = {}
for integral, coordderiv in stripped_integrals_and_coordderivs:
coordderivhash = calc_hash(coordderiv)
if coordderivhash in coordderiv_integrals_dict:
coordderiv_integrals_dict[coordderivhash][1].append(integral)
else:
coordderiv_integrals_dict[coordderivhash] = (coordderiv, [integral])
# cd_integrals_dict is now a dict of the form
# { hash: (CoordinateDerivative, [integral, integral, ...]), ... }
# we can now put the integrals back together and then afterwards
# apply the CoordinateDerivative again
for cdhash, samecd_integrals in sorted_by_key(coordderiv_integrals_dict):
# Accumulate integrands of integrals that share the
# same compiler data
integrands_and_cds = accumulate_integrands_with_same_metadata(
samecd_integrals[1]
)
for integrand, metadata in integrands_and_cds:
integral = Integral(
integrand, integral_type, domain, subdomain_id, metadata, None
)
integral = attach_coordinate_derivatives(integral, samecd_integrals[0])
integrals.append(integral)
# Group integrals by common integrand
# u.dx(0)*dx(1) + u.dx(0)*dx(2) -> u.dx(0)*dx((1,2))
# to avoid duplicate kernels generated after geometry lowering
unique_integrals = defaultdict(tuple)
metadata_table = defaultdict(dict)
for integral in integrals:
integral_type = integral.integral_type()
ufl_domain = integral.ufl_domain()
metadata = integral.metadata()
meta_hash = hash(canonicalize_metadata(metadata))
subdomain_id = integral.subdomain_id()
subdomain_data = id_or_none(integral.subdomain_data())
integrand = renumber_indices(integral.integrand())
unique_integrals[(integral_type, ufl_domain, meta_hash, integrand, subdomain_data)] += (
subdomain_id,
)
metadata_table[(integral_type, ufl_domain, meta_hash, integrand, subdomain_data)] = metadata
grouped_integrals = []
for integral_data, subdomain_ids in unique_integrals.items():
(integral_type, ufl_domain, metadata, integrand, subdomain_data) = integral_data
integral = Integral(
integrand,
integral_type,
ufl_domain,
subdomain_ids,
metadata_table[integral_data],
subdomain_data,
)
grouped_integrals.append(integral)
return Form(grouped_integrals)
[docs]
def reconstruct_form_from_integral_data(integral_data):
"""Reconstruct a form from integral data."""
integrals = []
for ida in integral_data:
integrals.extend(ida.integrals)
return Form(integrals)