DOLFINx 0.10.0.0
DOLFINx C++ interface
Scatterer.h
1// Copyright (C) 2022 Igor Baratta and Garth N. Wells
2//
3// This file is part of DOLFINx (https://www.fenicsproject.org)
4//
5// SPDX-License-Identifier: LGPL-3.0-or-later
6
7#pragma once
8
9#include "IndexMap.h"
10#include "MPI.h"
11#include "sort.h"
12#include <algorithm>
13#include <functional>
14#include <memory>
15#include <mpi.h>
16#include <numeric>
17#include <span>
18#include <type_traits>
19#include <vector>
20
21namespace dolfinx::common
22{
29template <class Allocator = std::allocator<std::int32_t>>
30class Scatterer
31{
32public:
34 using allocator_type = Allocator;
35
37 enum class type
38 {
39 neighbor, // use MPI neighborhood collectives
40 p2p // use MPI Isend/Irecv for communication
41 };
42
49 Scatterer(const IndexMap& map, int bs, const Allocator& alloc = Allocator())
50 : _bs(bs), _remote_inds(0, alloc), _local_inds(0, alloc),
51 _src(map.src().begin(), map.src().end()),
52 _dest(map.dest().begin(), map.dest().end())
53 {
54 if (dolfinx::MPI::size(map.comm()) == 1)
55 return;
56
57 // Check that src and dest ranks are unique and sorted
58 assert(std::ranges::is_sorted(_src));
59 assert(std::ranges::is_sorted(_dest));
60
61 // Create communicators with directed edges:
62 // (0) owner -> ghost,
63 // (1) ghost -> owner
64 MPI_Comm comm0;
65 MPI_Dist_graph_create_adjacent(
66 map.comm(), _src.size(), _src.data(), MPI_UNWEIGHTED, _dest.size(),
67 _dest.data(), MPI_UNWEIGHTED, MPI_INFO_NULL, false, &comm0);
68 _comm0 = dolfinx::MPI::Comm(comm0, false);
69
70 MPI_Comm comm1;
71 MPI_Dist_graph_create_adjacent(
72 map.comm(), _dest.size(), _dest.data(), MPI_UNWEIGHTED, _src.size(),
73 _src.data(), MPI_UNWEIGHTED, MPI_INFO_NULL, false, &comm1);
74 _comm1 = dolfinx::MPI::Comm(comm1, false);
75
76 // Build permutation array that sorts ghost indices by owning rank
77 std::span owners = map.owners();
78 std::vector<std::int32_t> perm(owners.size());
79 std::iota(perm.begin(), perm.end(), 0);
80 dolfinx::radix_sort(perm, [&owners](auto index) { return owners[index]; });
81
82 // Sort (i) ghost indices and (ii) ghost index owners by rank
83 // (using perm array)
84 std::span ghosts = map.ghosts();
85 std::vector<int> owners_sorted(owners.size());
86 std::vector<std::int64_t> ghosts_sorted(owners.size());
87 std::ranges::transform(perm, owners_sorted.begin(),
88 [&owners](auto idx) { return owners[idx]; });
89 std::ranges::transform(perm, ghosts_sorted.begin(),
90 [&ghosts](auto idx) { return ghosts[idx]; });
91
92 // For data associated with ghost indices, packed by owning
93 // (neighbourhood) rank, compute sizes and displacements. I.e.,
94 // when sending ghost index data from this rank to the owning
95 // ranks, disp[i] is the first entry in the buffer sent to
96 // neighbourhood rank i, and disp[i + 1] - disp[i] is the number
97 // of values sent to rank i.
98 _sizes_remote.resize(_src.size(), 0);
99 _displs_remote.resize(_src.size() + 1, 0);
100 std::vector<std::int32_t>::iterator begin = owners_sorted.begin();
101 for (std::size_t i = 0; i < _src.size(); i++)
102 {
103 auto upper = std::upper_bound(begin, owners_sorted.end(), _src[i]);
104 int num_ind = std::distance(begin, upper);
105 _displs_remote[i + 1] = _displs_remote[i] + num_ind;
106 _sizes_remote[i] = num_ind;
107 begin = upper;
108 }
109
110 // For data associated with owned indices that are ghosted by
111 // other ranks, compute the size and displacement arrays. When
112 // sending data associated with ghost indices to the owner, these
113 // size and displacement arrays are for the receive buffer.
114
115 // Compute sizes and displacements of local data (how many local
116 // elements to be sent/received grouped by neighbors)
117 _sizes_local.resize(_dest.size());
118 _displs_local.resize(_sizes_local.size() + 1);
119 _sizes_remote.reserve(1); // reserve so that the .data() pointers passed
120 _sizes_local.reserve(1);  // to MPI below are not null for empty vectors
121 MPI_Neighbor_alltoall(_sizes_remote.data(), 1, MPI_INT32_T,
122 _sizes_local.data(), 1, MPI_INT32_T, _comm1.comm());
123 std::partial_sum(_sizes_local.begin(), _sizes_local.end(),
124 std::next(_displs_local.begin()));
125
126 assert((std::int32_t)ghosts_sorted.size() == _displs_remote.back());
127 assert((std::int32_t)ghosts_sorted.size() == _displs_remote.back());
128
129 // Send ghost global indices to owning rank, and receive owned
130 // indices that are ghosts on other ranks
131 std::vector<std::int64_t> recv_buffer(_displs_local.back(), 0);
132 MPI_Neighbor_alltoallv(ghosts_sorted.data(), _sizes_remote.data(),
133 _displs_remote.data(), MPI_INT64_T,
134 recv_buffer.data(), _sizes_local.data(),
135 _displs_local.data(), MPI_INT64_T, _comm1.comm());
136
137 const std::array<std::int64_t, 2> range = map.local_range();
138#ifndef NDEBUG
139 // Check that all received indices are within the owned range
140 std::ranges::for_each(recv_buffer, [range](auto idx)
141 { assert(idx >= range[0] and idx < range[1]); });
142#endif
143
144 // Scale sizes and displacements by block size
145 {
146 auto rescale = [](auto& x, int bs)
147 {
148 std::ranges::transform(x, x.begin(), [bs](auto e) { return e *= bs; });
149 };
150 rescale(_sizes_local, bs);
151 rescale(_displs_local, bs);
152 rescale(_sizes_remote, bs);
153 rescale(_displs_remote, bs);
154 }
155
156 // Expand local indices using the block size and convert them from
157 // global to local numbering
158 _local_inds = std::vector<std::int32_t, allocator_type>(
159 recv_buffer.size() * _bs, alloc);
160 std::int64_t offset = range[0] * _bs;
161 for (std::size_t i = 0; i < recv_buffer.size(); i++)
162 for (int j = 0; j < _bs; j++)
163 _local_inds[i * _bs + j] = (recv_buffer[i] * _bs + j) - offset;
164
165 // Expand remote indices using block size
166 _remote_inds
167 = std::vector<std::int32_t, allocator_type>(perm.size() * _bs, alloc);
168 for (std::size_t i = 0; i < perm.size(); i++)
169 for (int j = 0; j < _bs; j++)
170 _remote_inds[i * _bs + j] = perm[i] * _bs + j;
171 }
172
193 template <typename T>
194 void scatter_fwd_begin(std::span<const T> send_buffer,
195 std::span<T> recv_buffer,
196 std::span<MPI_Request> requests,
197 Scatterer::type type = type::neighbor) const
198 {
199 // Return early if there are no incoming or outgoing edges
200 if (_sizes_local.empty() and _sizes_remote.empty())
201 return;
202
203 switch (type)
204 {
205 case type::neighbor:
206 {
207 assert(requests.size() == std::size_t(1));
208 MPI_Ineighbor_alltoallv(send_buffer.data(), _sizes_local.data(),
209 _displs_local.data(), dolfinx::MPI::mpi_t<T>,
210 recv_buffer.data(), _sizes_remote.data(),
211 _displs_remote.data(), dolfinx::MPI::mpi_t<T>,
212 _comm0.comm(), requests.data());
213 break;
214 }
215 case type::p2p:
216 {
217 assert(requests.size() == _dest.size() + _src.size());
218 for (std::size_t i = 0; i < _src.size(); i++)
219 {
220 MPI_Irecv(recv_buffer.data() + _displs_remote[i], _sizes_remote[i],
221 dolfinx::MPI::mpi_t<T>, _src[i], MPI_ANY_TAG, _comm0.comm(),
222 &requests[i]);
223 }
224
225 for (std::size_t i = 0; i < _dest.size(); i++)
226 {
227 MPI_Isend(send_buffer.data() + _displs_local[i], _sizes_local[i],
228 dolfinx::MPI::mpi_t<T>, _dest[i], 0, _comm0.comm(),
229 &requests[i + _src.size()]);
230 }
231 break;
232 }
233 default:
234 throw std::runtime_error("Scatterer::type not recognized");
235 }
236 }
237
246 void scatter_fwd_end(std::span<MPI_Request> requests) const
247 {
248 // Return early if there are no incoming or outgoing edges
249 if (_sizes_local.empty() and _sizes_remote.empty())
250 return;
251
252 // Wait for communication to complete
253 MPI_Waitall(requests.size(), requests.data(), MPI_STATUSES_IGNORE);
254 }
255
276 template <typename T, typename F>
277 requires std::is_invocable_v<F, std::span<const T>,
278 std::span<const std::int32_t>, std::span<T>>
279 void scatter_fwd_begin(std::span<const T> local_data,
280 std::span<T> local_buffer, std::span<T> remote_buffer,
281 F pack_fn, std::span<MPI_Request> requests,
282 Scatterer::type type = type::neighbor) const
283 {
284 assert(local_buffer.size() == _local_inds.size());
285 assert(remote_buffer.size() == _remote_inds.size());
286 pack_fn(local_data, _local_inds, local_buffer);
287 scatter_fwd_begin(std::span<const T>(local_buffer), remote_buffer, requests,
288 type);
289 }
290
310 template <typename T, typename F>
311 requires std::is_invocable_v<F, std::span<const T>,
312 std::span<const std::int32_t>, std::span<T>,
313 std::function<T(T, T)>>
314 void scatter_fwd_end(std::span<const T> remote_buffer,
315 std::span<T> remote_data, F unpack_fn,
316 std::span<MPI_Request> requests) const
317 {
318 assert(remote_buffer.size() == _remote_inds.size());
319 assert(remote_data.size() == _remote_inds.size());
320 scatter_fwd_end(requests);
321 unpack_fn(remote_buffer, _remote_inds, remote_data,
322 [](T /*a*/, T b) { return b; });
323 }
324
336 template <typename T>
337 void scatter_fwd(std::span<const T> local_data,
338 std::span<T> remote_data) const
339 {
340 std::vector<MPI_Request> requests(1, MPI_REQUEST_NULL);
341 std::vector<T> local_buffer(local_buffer_size(), 0);
342 std::vector<T> remote_buffer(remote_buffer_size(), 0);
343 auto pack_fn = [](auto&& in, auto&& idx, auto&& out)
344 {
345 for (std::size_t i = 0; i < idx.size(); ++i)
346 out[i] = in[idx[i]];
347 };
348 scatter_fwd_begin(local_data, std::span<T>(local_buffer),
349 std::span<T>(remote_buffer), pack_fn,
350 std::span<MPI_Request>(requests));
351
352 auto unpack_fn = [](auto&& in, auto&& idx, auto&& out, auto op)
353 {
354 for (std::size_t i = 0; i < idx.size(); ++i)
355 out[idx[i]] = op(out[idx[i]], in[i]);
356 };
357
358 scatter_fwd_end(std::span<const T>(remote_buffer), remote_data, unpack_fn,
359 std::span<MPI_Request>(requests));
360 }
361
388 template <typename T>
389 void scatter_rev_begin(std::span<const T> send_buffer,
390 std::span<T> recv_buffer,
391 std::span<MPI_Request> requests,
392 Scatterer::type type = type::neighbor) const
393 {
394 // Return early if there are no incoming or outgoing edges
395 if (_sizes_local.empty() and _sizes_remote.empty())
396 return;
397
398 // Send and receive data
399
400 switch (type)
401 {
402 case type::neighbor:
403 {
404 assert(requests.size() == 1);
405 MPI_Ineighbor_alltoallv(
406 send_buffer.data(), _sizes_remote.data(), _displs_remote.data(),
407 MPI::mpi_t<T>, recv_buffer.data(), _sizes_local.data(),
408 _displs_local.data(), MPI::mpi_t<T>, _comm1.comm(), &requests[0]);
409 break;
410 }
411 case type::p2p:
412 {
413 assert(requests.size() == _dest.size() + _src.size());
414 // Start non-blocking receives from ranks that ghost indices owned by this rank.
415 for (std::size_t i = 0; i < _dest.size(); i++)
416 {
417 MPI_Irecv(recv_buffer.data() + _displs_local[i], _sizes_local[i],
418 dolfinx::MPI::mpi_t<T>, _dest[i], MPI_ANY_TAG, _comm0.comm(),
419 &requests[i]);
420 }
421
422 // Start non-blocking sends of ghost data from this rank to the
423 // ranks that own the corresponding indices.
424 for (std::size_t i = 0; i < _src.size(); i++)
425 {
426 MPI_Isend(send_buffer.data() + _displs_remote[i], _sizes_remote[i],
427 dolfinx::MPI::mpi_t<T>, _src[i], 0, _comm0.comm(),
428 &requests[i + _dest.size()]);
429 }
430 break;
431 }
432 default:
433 throw std::runtime_error("Scatterer::type not recognized");
434 }
435 }
436
445 void scatter_rev_end(std::span<MPI_Request> request) const
446 {
447 // Return early if there are no incoming or outgoing edges
448 if (_sizes_local.empty() and _sizes_remote.empty())
449 return;
450
451 // Wait for communication to complete
452 MPI_Waitall(request.size(), request.data(), MPI_STATUSES_IGNORE);
453 }
454
479 template <typename T, typename F>
480 requires std::is_invocable_v<F, std::span<const T>,
481 std::span<const std::int32_t>, std::span<T>>
482 void scatter_rev_begin(std::span<const T> remote_data,
483 std::span<T> remote_buffer, std::span<T> local_buffer,
484 F pack_fn, std::span<MPI_Request> request,
485 Scatterer::type type = type::neighbor) const
486 {
487 assert(local_buffer.size() == _local_inds.size());
488 assert(remote_buffer.size() == _remote_inds.size());
489 pack_fn(remote_data, _remote_inds, remote_buffer);
490 scatter_rev_begin(std::span<const T>(remote_buffer), local_buffer, request,
491 type);
492 }
493
514 template <typename T, typename F, typename BinaryOp>
515 requires std::is_invocable_v<F, std::span<const T>,
516 std::span<const std::int32_t>, std::span<T>,
517 BinaryOp>
518 and std::is_invocable_r_v<T, BinaryOp, T, T>
519 void scatter_rev_end(std::span<const T> local_buffer, std::span<T> local_data,
520 F unpack_fn, BinaryOp op, std::span<MPI_Request> request)
521 {
522 assert(local_buffer.size() == _local_inds.size());
523 if (!_local_inds.empty())
524 {
525 assert(*std::ranges::max_element(_local_inds)
526 < std::int32_t(local_data.size()));
527 }
528 scatter_rev_end(request);
529 unpack_fn(local_buffer, _local_inds, local_data, op);
530 }
531
534 template <typename T, typename BinaryOp>
535 void scatter_rev(std::span<T> local_data, std::span<const T> remote_data,
536 BinaryOp op)
537 {
538 std::vector<T> local_buffer(local_buffer_size(), 0);
539 std::vector<T> remote_buffer(remote_buffer_size(), 0);
540 auto pack_fn = [](auto&& in, auto&& idx, auto&& out)
541 {
542 for (std::size_t i = 0; i < idx.size(); ++i)
543 out[i] = in[idx[i]];
544 };
545 auto unpack_fn = [](auto&& in, auto&& idx, auto&& out, auto op)
546 {
547 for (std::size_t i = 0; i < idx.size(); ++i)
548 out[idx[i]] = op(out[idx[i]], in[i]);
549 };
550 std::vector<MPI_Request> request(1, MPI_REQUEST_NULL);
551 scatter_rev_begin(remote_data, std::span<T>(remote_buffer),
552 std::span<T>(local_buffer), pack_fn,
553 std::span<MPI_Request>(request));
554 scatter_rev_end(std::span<const T>(local_buffer), local_data, unpack_fn, op,
555 std::span<MPI_Request>(request));
556 }
557
561 std::int32_t local_buffer_size() const noexcept { return _local_inds.size(); }
562
566 std::int32_t remote_buffer_size() const noexcept
567 {
568 return _remote_inds.size();
569 }
570
574 const std::vector<std::int32_t>& local_indices() const noexcept
575 {
576 return _local_inds;
577 }
578
581 const std::vector<std::int32_t>& remote_indices() const noexcept
582 {
583 return _remote_inds;
584 }
585
589 int bs() const noexcept { return _bs; }
590
593 std::vector<MPI_Request> create_request_vector(Scatterer::type type
594 = type::neighbor)
595 {
596 std::vector<MPI_Request> requests;
597 switch (type)
598 {
599 case type::neighbor:
600 requests = {MPI_REQUEST_NULL};
601 break;
602 case type::p2p:
603 requests.resize(_dest.size() + _src.size(), MPI_REQUEST_NULL);
604 break;
605 default:
606 throw std::runtime_error("Scatterer::type not recognized");
607 }
608 return requests;
609 }
610
611private:
612 // Block size
613 int _bs;
614
615 // Communicator where the source ranks own the indices in the caller's
616 // halo, and the destination ranks 'ghost' indices owned by the
617 // caller. I.e.,
618 // - in-edges (src) are from ranks that own my ghosts
619 // - out-edges (dest) go to ranks that 'ghost' my owned indices
620 dolfinx::MPI::Comm _comm0{MPI_COMM_NULL};
621
622 // Communicator where the source ranks have ghost indices that are
623 // owned by the caller, and the destination ranks are the owners of
624 // indices in the caller's halo region. I.e.,
625 // - in-edges (src) are from ranks that 'ghost' my owned indices
626 // - out-edges (dest) are to the owning ranks of my ghost indices
627 dolfinx::MPI::Comm _comm1{MPI_COMM_NULL};
628
629 // Permutation indices used to pack and unpack ghost data (remote)
630 std::vector<std::int32_t, allocator_type> _remote_inds;
631
632 // Number of remote indices (ghosts) for each neighbor process
633 std::vector<int> _sizes_remote;
634
635 // Displacements of remote data for mpi scatter and gather
636 std::vector<int> _displs_remote;
637
638 // Permutation indices used to pack and unpack local shared data
639 // (owned indices that are shared with other processes). Indices are
640 // grouped by neighbor process.
641 std::vector<std::int32_t, allocator_type> _local_inds;
642
643 // Number of local shared indices per neighbor process
644 std::vector<int> _sizes_local;
645
646 // Displacements of local data for mpi scatter and gather
647 std::vector<int> _displs_local;
648
649 // Set of ranks that own ghosts
650 // FIXME: Should we store the index map instead?
651 std::vector<int> _src;
652
653 // Set of ranks that ghost indices owned by this rank
654 // FIXME: Should we store the index map instead?
655 std::vector<int> _dest;
656};
657} // namespace dolfinx::common
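Example (not part of the header): a minimal sketch of the forward (owner to ghost) scatter path. It assumes the installed include paths dolfinx/common/IndexMap.h and dolfinx/common/Scatterer.h and the IndexMap members size_local() and num_ghosts(); the function name, block size and data values are illustrative only, and only the Scatterer constructor and scatter_fwd come from the listing above.

#include <dolfinx/common/IndexMap.h>
#include <dolfinx/common/Scatterer.h>
#include <span>
#include <vector>

// Sketch: push owned values to the ranks that ghost them (forward scatter)
void example_forward_scatter(const dolfinx::common::IndexMap& map)
{
  const int bs = 1; // one value per index
  dolfinx::common::Scatterer<> sc(map, bs);

  // Values for owned indices on this rank, and space for the ghost values
  // received from the owning ranks
  std::vector<double> owned(bs * map.size_local(), 1.0);
  std::vector<double> ghost(bs * map.num_ghosts(), 0.0);

  // Pack, communicate and unpack in one blocking call
  sc.scatter_fwd(std::span<const double>(owned), std::span<double>(ghost));
}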
Duplicates an MPI communicator and manages the lifetime of the duplicated communicator.
Definition MPI.h:43
Definition IndexMap.h:94
std::span< const int > owners() const
The ranks that own each ghost index.
Definition IndexMap.h:218
std::array< std::int64_t, 2 > local_range() const noexcept
Range of indices (global) owned by this process.
Definition IndexMap.cpp:926
std::span< const std::int64_t > ghosts() const noexcept
Definition IndexMap.cpp:940
MPI_Comm comm() const
Return the MPI communicator that the map is defined on.
Definition IndexMap.cpp:1005
std::vector< MPI_Request > create_request_vector(Scatterer::type type=type::neighbor)
Create a vector of MPI_Requests for a given Scatterer::type.
Definition Scatterer.h:593
Allocator allocator_type
The allocator type.
Definition Scatterer.h:34
void scatter_fwd_end(std::span< MPI_Request > requests) const
Complete a non-blocking send from the local owner to process ranks that have the index as a ghost.
Definition Scatterer.h:246
void scatter_fwd(std::span< const T > local_data, std::span< T > remote_data) const
Scatter data associated with owned indices to ghosting ranks.
Definition Scatterer.h:337
Scatterer(const IndexMap &map, int bs, const Allocator &alloc=Allocator())
Create a scatterer.
Definition Scatterer.h:49
void scatter_rev_end(std::span< const T > local_buffer, std::span< T > local_data, F unpack_fn, BinaryOp op, std::span< MPI_Request > request)
End the reverse scatter communication, and unpack the received local buffer into local data.
Definition Scatterer.h:519
std::int32_t remote_buffer_size() const noexcept
Buffer size for remote data (ghosts) used in forward and reverse communication.
Definition Scatterer.h:566
std::int32_t local_buffer_size() const noexcept
Size of buffer for local data (owned and shared) used in forward and reverse communication.
Definition Scatterer.h:561
void scatter_fwd_end(std::span< const T > remote_buffer, std::span< T > remote_data, F unpack_fn, std::span< MPI_Request > requests) const
Complete a non-blocking send from the local owner to process ranks that have the index as a ghost,...
Definition Scatterer.h:314
void scatter_rev_begin(std::span< const T > remote_data, std::span< T > remote_buffer, std::span< T > local_buffer, F pack_fn, std::span< MPI_Request > request, Scatterer::type type=type::neighbor) const
Scatter data associated with ghost indices to owning ranks.
Definition Scatterer.h:482
type
Types of MPI communication pattern used by the Scatterer.
Definition Scatterer.h:38
void scatter_rev_end(std::span< MPI_Request > request) const
End the reverse scatter communication.
Definition Scatterer.h:445
void scatter_fwd_begin(std::span< const T > send_buffer, std::span< T > recv_buffer, std::span< MPI_Request > requests, Scatterer::type type=type::neighbor) const
Start a non-blocking send of owned data to ranks that ghost the data.
Definition Scatterer.h:194
const std::vector< std::int32_t > & local_indices() const noexcept
Definition Scatterer.h:574
const std::vector< std::int32_t > & remote_indices() const noexcept
Definition Scatterer.h:581
void scatter_rev(std::span< T > local_data, std::span< const T > remote_data, BinaryOp op)
Scatter data associated with ghost indices to ranks that own the indices.
Definition Scatterer.h:535
int bs() const noexcept
The number of values (block size) to send per index in the common::IndexMap used to create the scatterer.
Definition Scatterer.h:589
void scatter_fwd_begin(std::span< const T > local_data, std::span< T > local_buffer, std::span< T > remote_buffer, F pack_fn, std::span< MPI_Request > requests, Scatterer::type type=type::neighbor) const
Scatter data associated with owned indices to ghosting ranks.
Definition Scatterer.h:279
void scatter_rev_begin(std::span< const T > send_buffer, std::span< T > recv_buffer, std::span< MPI_Request > requests, Scatterer::type type=type::neighbor) const
Start a non-blocking send of ghost data to ranks that own the data.
Definition Scatterer.h:389
MPI_Datatype mpi_t
Retrieves the MPI data type associated with the provided type.
Definition MPI.h:281
int size(MPI_Comm comm)
Definition MPI.cpp:72
Miscellaneous classes, functions and types.
Definition dolfinx_common.h:8
constexpr __radix_sort radix_sort
Radix sort.
Definition sort.h:122
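To complement the forward path, a hedged sketch of the reverse (ghost to owner) scatter, accumulating ghost contributions into the owned entries with std::plus. As before, the include paths, the IndexMap members size_local() and num_ghosts(), and the function name are assumptions for illustration; only scatter_rev and its reduction-operator argument come from the header.

#include <dolfinx/common/IndexMap.h>
#include <dolfinx/common/Scatterer.h>
#include <functional>
#include <span>
#include <vector>

// Sketch: add ghost contributions back into the owned entries (reverse scatter)
void example_reverse_scatter(const dolfinx::common::IndexMap& map)
{
  dolfinx::common::Scatterer<> sc(map, 1);

  std::vector<double> owned(map.size_local(), 0.0); // accumulation target
  std::vector<double> ghost(map.num_ghosts(), 2.0); // local ghost contributions

  // Each owned entry receives the sum of the matching ghost values held on
  // other ranks (std::plus selects accumulation rather than overwrite)
  sc.scatter_rev(std::span<double>(owned), std::span<const double>(ghost),
                 std::plus<double>());
}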