DOLFINx 0.8.0
DOLFINx C++ interface
Loading...
Searching...
No Matches
Scatterer.h
1// Copyright (C) 2022 Igor Baratta and Garth N. Wells
2//
3// This file is part of DOLFINx (https://www.fenicsproject.org)
4//
5// SPDX-License-Identifier: LGPL-3.0-or-later
6
7#pragma once
8
9#include "IndexMap.h"
10#include "MPI.h"
11#include "sort.h"
12#include <algorithm>
13#include <functional>
14#include <memory>
15#include <mpi.h>
16#include <numeric>
17#include <span>
18#include <type_traits>
19#include <vector>
20
21using namespace dolfinx;
22
23namespace dolfinx::common
24{
31template <class Allocator = std::allocator<std::int32_t>>
33{
34public:
36 using allocator_type = Allocator;
37
39 enum class type
40 {
41 neighbor, // use MPI neighborhood collectives
42 p2p // use MPI Isend/Irecv for communication
43 };
44
51 Scatterer(const IndexMap& map, int bs, const Allocator& alloc = Allocator())
52 : _bs(bs), _remote_inds(0, alloc), _local_inds(0, alloc),
53 _src(map.src().begin(), map.src().end()),
54 _dest(map.dest().begin(), map.dest().end())
55 {
56 if (dolfinx::MPI::size(map.comm()) == 1)
57 return;
58
59 // Check that src and dest ranks are unique and sorted
60 assert(std::is_sorted(_src.begin(), _src.end()));
61 assert(std::is_sorted(_dest.begin(), _dest.end()));
62
63 // Create communicators with directed edges:
64 // (0) owner -> ghost,
65 // (1) ghost -> owner
66 MPI_Comm comm0;
67 MPI_Dist_graph_create_adjacent(
68 map.comm(), _src.size(), _src.data(), MPI_UNWEIGHTED, _dest.size(),
69 _dest.data(), MPI_UNWEIGHTED, MPI_INFO_NULL, false, &comm0);
70 _comm0 = dolfinx::MPI::Comm(comm0, false);
71
72 MPI_Comm comm1;
73 MPI_Dist_graph_create_adjacent(
74 map.comm(), _dest.size(), _dest.data(), MPI_UNWEIGHTED, _src.size(),
75 _src.data(), MPI_UNWEIGHTED, MPI_INFO_NULL, false, &comm1);
76 _comm1 = dolfinx::MPI::Comm(comm1, false);
77
78 // Build permutation array that sorts ghost indices by owning rank
79 std::span owners = map.owners();
80 std::vector<std::int32_t> perm(owners.size());
81 std::iota(perm.begin(), perm.end(), 0);
82 dolfinx::argsort_radix<std::int32_t>(owners, perm);
83
84 // Sort (i) ghost indices and (ii) ghost index owners by rank
85 // (using perm array)
86 std::span ghosts = map.ghosts();
87 std::vector<int> owners_sorted(owners.size());
88 std::vector<std::int64_t> ghosts_sorted(owners.size());
89 std::transform(perm.begin(), perm.end(), owners_sorted.begin(),
90 [&owners](auto idx) { return owners[idx]; });
91 std::transform(perm.begin(), perm.end(), ghosts_sorted.begin(),
92 [&ghosts](auto idx) { return ghosts[idx]; });
93
94 // For data associated with ghost indices, packed by owning
95 // (neighbourhood) rank, compute sizes and displacements. I.e.,
96 // when sending ghost index data from this rank to the owning
97 // ranks, disp[i] is the first entry in the buffer sent to
98 // neighbourhood rank i, and disp[i + 1] - disp[i] is the number
99 // of values sent to rank i.
100 _sizes_remote.resize(_src.size(), 0);
101 _displs_remote.resize(_src.size() + 1, 0);
102 std::vector<std::int32_t>::iterator begin = owners_sorted.begin();
103 for (std::size_t i = 0; i < _src.size(); i++)
104 {
105 auto upper = std::upper_bound(begin, owners_sorted.end(), _src[i]);
106 int num_ind = std::distance(begin, upper);
107 _displs_remote[i + 1] = _displs_remote[i] + num_ind;
108 _sizes_remote[i] = num_ind;
109 begin = upper;
110 }
111
112 // For data associated with owned indices that are ghosted by
113 // other ranks, compute the size and displacement arrays. When
114 // sending data associated with ghost indices to the owner, these
115 // size and displacement arrays are for the receive buffer.
116
117 // Compute sizes and displacements of local data (how many local
118 // elements to be sent/received grouped by neighbors)
119 _sizes_local.resize(_dest.size());
120 _displs_local.resize(_sizes_local.size() + 1);
121 _sizes_remote.reserve(1);
122 _sizes_local.reserve(1);
123 MPI_Neighbor_alltoall(_sizes_remote.data(), 1, MPI_INT32_T,
124 _sizes_local.data(), 1, MPI_INT32_T, _comm1.comm());
125 std::partial_sum(_sizes_local.begin(), _sizes_local.end(),
126 std::next(_displs_local.begin()));
127
128 assert((std::int32_t)ghosts_sorted.size() == _displs_remote.back());
129 assert((std::int32_t)ghosts_sorted.size() == _displs_remote.back());
130
131 // Send ghost global indices to owning rank, and receive owned
132 // indices that are ghosts on other ranks
133 std::vector<std::int64_t> recv_buffer(_displs_local.back(), 0);
134 MPI_Neighbor_alltoallv(ghosts_sorted.data(), _sizes_remote.data(),
135 _displs_remote.data(), MPI_INT64_T,
136 recv_buffer.data(), _sizes_local.data(),
137 _displs_local.data(), MPI_INT64_T, _comm1.comm());
138
139 const std::array<std::int64_t, 2> range = map.local_range();
140#ifndef NDEBUG
141 // Check that all received indice are within the owned range
142 std::for_each(recv_buffer.begin(), recv_buffer.end(), [range](auto idx)
143 { assert(idx >= range[0] and idx < range[1]); });
144#endif
145
146 // Scale sizes and displacements by block size
147 {
148 auto rescale = [](auto& x, int bs)
149 {
150 std::transform(x.begin(), x.end(), x.begin(),
151 [bs](auto e) { return e *= bs; });
152 };
153 rescale(_sizes_local, bs);
154 rescale(_displs_local, bs);
155 rescale(_sizes_remote, bs);
156 rescale(_displs_remote, bs);
157 }
158
159 // Expand local indices using block size and convert it from
160 // global to local numbering
161 _local_inds = std::vector<std::int32_t, allocator_type>(
162 recv_buffer.size() * _bs, alloc);
163 std::int64_t offset = range[0] * _bs;
164 for (std::size_t i = 0; i < recv_buffer.size(); i++)
165 for (int j = 0; j < _bs; j++)
166 _local_inds[i * _bs + j] = (recv_buffer[i] * _bs + j) - offset;
167
168 // Expand remote indices using block size
169 _remote_inds
170 = std::vector<std::int32_t, allocator_type>(perm.size() * _bs, alloc);
171 for (std::size_t i = 0; i < perm.size(); i++)
172 for (int j = 0; j < _bs; j++)
173 _remote_inds[i * _bs + j] = perm[i] * _bs + j;
174 }
175
196 template <typename T>
197 void scatter_fwd_begin(std::span<const T> send_buffer,
198 std::span<T> recv_buffer,
199 std::span<MPI_Request> requests,
200 Scatterer::type type = type::neighbor) const
201 {
202 // Return early if there are no incoming or outgoing edges
203 if (_sizes_local.empty() and _sizes_remote.empty())
204 return;
205
206 switch (type)
207 {
208 case type::neighbor:
209 {
210 assert(requests.size() == std::size_t(1));
211 MPI_Ineighbor_alltoallv(
212 send_buffer.data(), _sizes_local.data(), _displs_local.data(),
213 dolfinx::MPI::mpi_type<T>(), recv_buffer.data(), _sizes_remote.data(),
214 _displs_remote.data(), dolfinx::MPI::mpi_type<T>(), _comm0.comm(),
215 requests.data());
216 break;
217 }
218 case type::p2p:
219 {
220 assert(requests.size() == _dest.size() + _src.size());
221 for (std::size_t i = 0; i < _src.size(); i++)
222 {
223 MPI_Irecv(recv_buffer.data() + _displs_remote[i], _sizes_remote[i],
224 dolfinx::MPI::mpi_type<T>(), _src[i], MPI_ANY_TAG,
225 _comm0.comm(), &requests[i]);
226 }
227
228 for (std::size_t i = 0; i < _dest.size(); i++)
229 {
230 MPI_Isend(send_buffer.data() + _displs_local[i], _sizes_local[i],
231 dolfinx::MPI::mpi_type<T>(), _dest[i], 0, _comm0.comm(),
232 &requests[i + _src.size()]);
233 }
234 break;
235 }
236 default:
237 throw std::runtime_error("Scatter::type not recognized");
238 }
239 }
240
249 void scatter_fwd_end(std::span<MPI_Request> requests) const
250 {
251 // Return early if there are no incoming or outgoing edges
252 if (_sizes_local.empty() and _sizes_remote.empty())
253 return;
254
255 // Wait for communication to complete
256 MPI_Waitall(requests.size(), requests.data(), MPI_STATUS_IGNORE);
257 }
258
279 template <typename T, typename F>
280 requires std::is_invocable_v<F, std::span<const T>,
281 std::span<const std::int32_t>, std::span<T>>
282 void scatter_fwd_begin(std::span<const T> local_data,
283 std::span<T> local_buffer, std::span<T> remote_buffer,
284 F pack_fn, std::span<MPI_Request> requests,
285 Scatterer::type type = type::neighbor) const
286 {
287 assert(local_buffer.size() == _local_inds.size());
288 assert(remote_buffer.size() == _remote_inds.size());
289 pack_fn(local_data, _local_inds, local_buffer);
290 scatter_fwd_begin(std::span<const T>(local_buffer), remote_buffer, requests,
291 type);
292 }
293
313 template <typename T, typename F>
314 requires std::is_invocable_v<F, std::span<const T>,
315 std::span<const std::int32_t>, std::span<T>,
316 std::function<T(T, T)>>
317 void scatter_fwd_end(std::span<const T> remote_buffer,
318 std::span<T> remote_data, F unpack_fn,
319 std::span<MPI_Request> requests) const
320 {
321 assert(remote_buffer.size() == _remote_inds.size());
322 assert(remote_data.size() == _remote_inds.size());
323 scatter_fwd_end(requests);
324 unpack_fn(remote_buffer, _remote_inds, remote_data,
325 [](T /*a*/, T b) { return b; });
326 }
327
339 template <typename T>
340 void scatter_fwd(std::span<const T> local_data,
341 std::span<T> remote_data) const
342 {
343 std::vector<MPI_Request> requests(1, MPI_REQUEST_NULL);
344 std::vector<T> local_buffer(local_buffer_size(), 0);
345 std::vector<T> remote_buffer(remote_buffer_size(), 0);
346 auto pack_fn = [](auto&& in, auto&& idx, auto&& out)
347 {
348 for (std::size_t i = 0; i < idx.size(); ++i)
349 out[i] = in[idx[i]];
350 };
351 scatter_fwd_begin(local_data, std::span<T>(local_buffer),
352 std::span<T>(remote_buffer), pack_fn,
353 std::span<MPI_Request>(requests));
354
355 auto unpack_fn = [](auto&& in, auto&& idx, auto&& out, auto op)
356 {
357 for (std::size_t i = 0; i < idx.size(); ++i)
358 out[idx[i]] = op(out[idx[i]], in[i]);
359 };
360
361 scatter_fwd_end(std::span<const T>(remote_buffer), remote_data, unpack_fn,
362 std::span<MPI_Request>(requests));
363 }
364
391 template <typename T>
392 void scatter_rev_begin(std::span<const T> send_buffer,
393 std::span<T> recv_buffer,
394 std::span<MPI_Request> requests,
395 Scatterer::type type = type::neighbor) const
396 {
397 // Return early if there are no incoming or outgoing edges
398 if (_sizes_local.empty() and _sizes_remote.empty())
399 return;
400
401 // // Send and receive data
402
403 switch (type)
404 {
405 case type::neighbor:
406 {
407 assert(requests.size() == 1);
408 MPI_Ineighbor_alltoallv(send_buffer.data(), _sizes_remote.data(),
409 _displs_remote.data(), MPI::mpi_type<T>(),
410 recv_buffer.data(), _sizes_local.data(),
411 _displs_local.data(), MPI::mpi_type<T>(),
412 _comm1.comm(), &requests[0]);
413 break;
414 }
415 case type::p2p:
416 {
417 assert(requests.size() == _dest.size() + _src.size());
418 // Start non-blocking send from this process to ghost owners.
419 for (std::size_t i = 0; i < _dest.size(); i++)
420 {
421 MPI_Irecv(recv_buffer.data() + _displs_local[i], _sizes_local[i],
422 dolfinx::MPI::mpi_type<T>(), _dest[i], MPI_ANY_TAG,
423 _comm0.comm(), &requests[i]);
424 }
425
426 // Start non-blocking receive from neighbor process for which an owned
427 // index is a ghost.
428 for (std::size_t i = 0; i < _src.size(); i++)
429 {
430 MPI_Isend(send_buffer.data() + _displs_remote[i], _sizes_remote[i],
431 dolfinx::MPI::mpi_type<T>(), _src[i], 0, _comm0.comm(),
432 &requests[i + _dest.size()]);
433 }
434 break;
435 }
436 default:
437 throw std::runtime_error("Scatter::type not recognized");
438 }
439 }
440
449 void scatter_rev_end(std::span<MPI_Request> request) const
450 {
451 // Return early if there are no incoming or outgoing edges
452 if (_sizes_local.empty() and _sizes_remote.empty())
453 return;
454
455 // Wait for communication to complete
456 MPI_Waitall(request.size(), request.data(), MPI_STATUS_IGNORE);
457 }
458
483 template <typename T, typename F>
484 requires std::is_invocable_v<F, std::span<const T>,
485 std::span<const std::int32_t>, std::span<T>>
486 void scatter_rev_begin(std::span<const T> remote_data,
487 std::span<T> remote_buffer, std::span<T> local_buffer,
488 F pack_fn, std::span<MPI_Request> request,
489 Scatterer::type type = type::neighbor) const
490 {
491 assert(local_buffer.size() == _local_inds.size());
492 assert(remote_buffer.size() == _remote_inds.size());
493 pack_fn(remote_data, _remote_inds, remote_buffer);
494 scatter_rev_begin(std::span<const T>(remote_buffer), local_buffer, request,
495 type);
496 }
497
517 template <typename T, typename F, typename BinaryOp>
518 requires std::is_invocable_v<F, std::span<const T>,
519 std::span<const std::int32_t>, std::span<T>,
520 BinaryOp>
521 and std::is_invocable_r_v<T, BinaryOp, T, T>
522 void scatter_rev_end(std::span<const T> local_buffer, std::span<T> local_data,
523 F unpack_fn, BinaryOp op, std::span<MPI_Request> request)
524 {
525 assert(local_buffer.size() == _local_inds.size());
526 if (!_local_inds.empty())
527 {
528 assert(*std::max_element(_local_inds.begin(), _local_inds.end())
529 < std::int32_t(local_data.size()));
530 }
531 scatter_rev_end(request);
532 unpack_fn(local_buffer, _local_inds, local_data, op);
533 }
534
537 template <typename T, typename BinaryOp>
538 void scatter_rev(std::span<T> local_data, std::span<const T> remote_data,
539 BinaryOp op)
540 {
541 std::vector<T> local_buffer(local_buffer_size(), 0);
542 std::vector<T> remote_buffer(remote_buffer_size(), 0);
543 auto pack_fn = [](auto&& in, auto&& idx, auto&& out)
544 {
545 for (std::size_t i = 0; i < idx.size(); ++i)
546 out[i] = in[idx[i]];
547 };
548 auto unpack_fn = [](auto&& in, auto&& idx, auto&& out, auto op)
549 {
550 for (std::size_t i = 0; i < idx.size(); ++i)
551 out[idx[i]] = op(out[idx[i]], in[i]);
552 };
553 std::vector<MPI_Request> request(1, MPI_REQUEST_NULL);
554 scatter_rev_begin(remote_data, std::span<T>(remote_buffer),
555 std::span<T>(local_buffer), pack_fn,
556 std::span<MPI_Request>(request));
557 scatter_rev_end(std::span<const T>(local_buffer), local_data, unpack_fn, op,
558 std::span<MPI_Request>(request));
559 }
560
564 std::int32_t local_buffer_size() const noexcept { return _local_inds.size(); }
565
569 std::int32_t remote_buffer_size() const noexcept
570 {
571 return _remote_inds.size();
572 }
573
577 const std::vector<std::int32_t>& local_indices() const noexcept
578 {
579 return _local_inds;
580 }
581
584 const std::vector<std::int32_t>& remote_indices() const noexcept
585 {
586 return _remote_inds;
587 }
588
592 int bs() const noexcept { return _bs; }
593
597 = type::neighbor)
598 {
599 std::vector<MPI_Request> requests;
600 switch (type)
601 {
602 case type::neighbor:
603 requests = {MPI_REQUEST_NULL};
604 break;
605 case type::p2p:
606 requests.resize(_dest.size() + _src.size(), MPI_REQUEST_NULL);
607 break;
608 default:
609 throw std::runtime_error("Scatter::type not recognized");
610 }
611 return requests;
612 }
613
614private:
615 // Block size
616 int _bs;
617
618 // Communicator where the source ranks own the indices in the callers
619 // halo, and the destination ranks 'ghost' indices owned by the
620 // caller. I.e.,
621 // - in-edges (src) are from ranks that own my ghosts
622 // - out-edges (dest) go to ranks that 'ghost' my owned indices
623 dolfinx::MPI::Comm _comm0{MPI_COMM_NULL};
624
625 // Communicator where the source ranks have ghost indices that are
626 // owned by the caller, and the destination ranks are the owners of
627 // indices in the callers halo region. I.e.,
628 // - in-edges (src) are from ranks that 'ghost' my owned indices
629 // - out-edges (dest) are to the owning ranks of my ghost indices
630 dolfinx::MPI::Comm _comm1{MPI_COMM_NULL};
631
632 // Permutation indices used to pack and unpack ghost data (remote)
633 std::vector<std::int32_t, allocator_type> _remote_inds;
634
635 // Number of remote indices (ghosts) for each neighbor process
636 std::vector<int> _sizes_remote;
637
638 // Displacements of remote data for mpi scatter and gather
639 std::vector<int> _displs_remote;
640
641 // Permutation indices used to pack and unpack local shared data
642 // (owned indices that are shared with other processes). Indices are
643 // grouped by neighbor process.
644 std::vector<std::int32_t, allocator_type> _local_inds;
645
646 // Number of local shared indices per neighbor process
647 std::vector<int> _sizes_local;
648
649 // Displacements of local data for mpi scatter and gather
650 std::vector<int> _displs_local;
651
652 // Set of ranks that own ghosts
653 // FIXME: Should we store the index map instead?
654 std::vector<int> _src;
655
656 // Set of ranks ghost owned indices
657 // FIXME: Should we store the index map instead?
658 std::vector<int> _dest;
659};
660} // namespace dolfinx::common
A duplicate MPI communicator and manage lifetime of the communicator.
Definition MPI.h:43
Definition IndexMap.h:94
std::span< const int > owners() const
The ranks that own each ghost index.
Definition IndexMap.h:205
std::array< std::int64_t, 2 > local_range() const noexcept
Range of indices (global) owned by this process.
Definition IndexMap.cpp:861
std::span< const std::int64_t > ghosts() const noexcept
Definition IndexMap.cpp:875
MPI_Comm comm() const
Return the MPI communicator that the map is defined on.
Definition IndexMap.cpp:940
A Scatterer supports the MPI scattering and gathering of data that is associated with a common::Index...
Definition Scatterer.h:33
std::vector< MPI_Request > create_request_vector(Scatterer::type type=type::neighbor)
Create a vector of MPI_Requests for a given Scatterer::type.
Definition Scatterer.h:596
Allocator allocator_type
The allocator type.
Definition Scatterer.h:36
void scatter_fwd_end(std::span< MPI_Request > requests) const
Complete a non-blocking send from the local owner to process ranks that have the index as a ghost.
Definition Scatterer.h:249
void scatter_fwd(std::span< const T > local_data, std::span< T > remote_data) const
Scatter data associated with owned indices to ghosting ranks.
Definition Scatterer.h:340
Scatterer(const IndexMap &map, int bs, const Allocator &alloc=Allocator())
Create a scatterer.
Definition Scatterer.h:51
void scatter_rev_end(std::span< const T > local_buffer, std::span< T > local_data, F unpack_fn, BinaryOp op, std::span< MPI_Request > request)
End the reverse scatter communication, and unpack the received local buffer into local data.
Definition Scatterer.h:522
std::int32_t remote_buffer_size() const noexcept
Buffer size for remote data (ghosts) used in forward and reverse communication.
Definition Scatterer.h:569
std::int32_t local_buffer_size() const noexcept
Size of buffer for local data (owned and shared) used in forward and reverse communication.
Definition Scatterer.h:564
void scatter_fwd_end(std::span< const T > remote_buffer, std::span< T > remote_data, F unpack_fn, std::span< MPI_Request > requests) const
Complete a non-blocking send from the local owner to process ranks that have the index as a ghost,...
Definition Scatterer.h:317
void scatter_rev_begin(std::span< const T > remote_data, std::span< T > remote_buffer, std::span< T > local_buffer, F pack_fn, std::span< MPI_Request > request, Scatterer::type type=type::neighbor) const
Scatter data associated with ghost indices to owning ranks.
Definition Scatterer.h:486
type
Types of MPI communication pattern used by the Scatterer.
Definition Scatterer.h:40
void scatter_rev_end(std::span< MPI_Request > request) const
End the reverse scatter communication.
Definition Scatterer.h:449
void scatter_fwd_begin(std::span< const T > send_buffer, std::span< T > recv_buffer, std::span< MPI_Request > requests, Scatterer::type type=type::neighbor) const
Start a non-blocking send of owned data to ranks that ghost the data.
Definition Scatterer.h:197
const std::vector< std::int32_t > & local_indices() const noexcept
Definition Scatterer.h:577
const std::vector< std::int32_t > & remote_indices() const noexcept
Definition Scatterer.h:584
void scatter_rev(std::span< T > local_data, std::span< const T > remote_data, BinaryOp op)
Scatter data associated with ghost indices to ranks that own the indices.
Definition Scatterer.h:538
int bs() const noexcept
The number values (block size) to send per index in the common::IndexMap use to create the scatterer.
Definition Scatterer.h:592
void scatter_fwd_begin(std::span< const T > local_data, std::span< T > local_buffer, std::span< T > remote_buffer, F pack_fn, std::span< MPI_Request > requests, Scatterer::type type=type::neighbor) const
Scatter data associated with owned indices to ghosting ranks.
Definition Scatterer.h:282
void scatter_rev_begin(std::span< const T > send_buffer, std::span< T > recv_buffer, std::span< MPI_Request > requests, Scatterer::type type=type::neighbor) const
Start a non-blocking send of ghost data to ranks that own the data.
Definition Scatterer.h:392
int size(MPI_Comm comm)
Definition MPI.cpp:72
Miscellaneous classes, functions and types.
Definition dolfinx_common.h:8
Top-level namespace.
Definition defines.h:12