Design discussion: hierarchical all_to_all (3-phase decomposition, looking for feedback) #7200
---
As a first step, I'd opt for (a) to minimize implementation effort.
Serializing a
Yes, let's try padding first.
Your suggestion is a correct first step.
---
Thanks @hkaiser. Locking in (a) for Q1, padding for Q3, and the transpose-identity pattern for Q4. Noting the "first step" / "let's try first" framing on Q1 and Q3: I'll record (b)/(c) for Phase 2's communicator and the all_to_allv path as explicit revisit-later follow-ups in the design doc, so they stay visible if benchmarks push against them.

Two architectural questions surfaced while deepening the sketch into a longer doc; posting here since they affect v1 scope and (Q5) is a prerequisite PR.

Q5: Generation namespace across hierarchical collectives sharing a communicator. Each hierarchical collective consumes a fixed stride of internal generations on the underlying communicators: Cleanest fix I can see: extend the factory to suffix each collective's basename with a collective-kind tag (
My read is: suffix approach, standalone PR first. But this touches #7160, so I'd rather not assume.

Q6: Large-payload fallback threshold. The top rep's peak footprint in Phase 2 is roughly

Two options:
Leaning (i) with a conservative default (~4 MiB per sub-matrix), but it's reasonable to defer (i) to a follow-up and ship v1 with (ii) + clear docs. Preference?

Design doc in progress; will share once these are settled so the doc can reflect the decisions rather than re-open them.
---
Q5: I agree that the generations are a problem (which we have ignored for now). Having communicators specific to the collective operation would prevent us from using the same communicator for different operations, so I think this is not a good idea. We could however have a stride of

Q6: Let's cross the bridge when we're there. So let's go with (ii) for now.
---
Design write-up reflecting the decisions in this thread so far. Hierarchical
| src | source group | (src_group.size, my_group.size) real region | fills Col_0[0..2][·] columns |
|---|---|---|---|
| 0 | [0..2] | 3 × 3 | 0..2 |
| 1 | [3..5] | 3 × 3 | 3..5 |
| 2 | [6..8] | 3 × 3 | 6..8 |
| 3 | [9..10] | 2 × 3 | 9..10 |
Both end at 3k, so consecutive user generations chain cleanly: user generation k+1 starts at 3(k+1)-2 = 3k+1, immediately after the prior call's last slot.
9.1 Why no dummy round-trip is needed
The internal gate's `next_generation` (§2.5) accepts any `new_generation >= generation_` and post-increments. The two-phase skip falls out of this:

- Two-phase phase 1 at `3k-2`: gate accepts (`3k-2 >= prev_last + 1 = 3(k-1) + 1 = 3k-2` for the second call onward; trivially true for the first call). Gate advances to `3k-1`.
- Two-phase phase 2 at `3k`: gate accepts (`3k >= 3k-1`). Gate advances to `3k+1`.
- The slot `3k-1` is consumed implicitly by the `>=` accept: no dummy collective, no extra wire round-trip. Per @hkaiser in #7200: "instead of adding another round trip simply send the required increment with the last operation on a communicator."
For three-phase all_to_all the same mechanism is invoked with no skip: phases use 3k-2, 3k-1, 3k consecutively, the gate post-increments at each step, and the final state is 3k+1. Both shapes leave the gate at the same state, so they can be freely interleaved on the same communicator across user generations.
9.2 What the user sees
The user generation contract is unchanged: each call on a shared hierarchical_communicator must use a strictly greater generation than the previous call. Stride-three is an internal invariant; it is invisible at the API boundary.
What the user gains is predictability across operation kinds. A sequence
```
all_gather (gen=1) -> internal slots 1, _, 3  (last = 3)
all_to_all (gen=2) -> internal slots 4, 5, 6  (last = 6)
all_reduce (gen=3) -> internal slots 7, _, 9  (last = 9)
```
never collides on the underlying communicators' generation namespaces, regardless of how two-phase and three-phase calls are interleaved.
9.3 What this is not
Stride-three does not relax the user-visible monotonicity rule. Two distinct calls on the same hierarchical_communicator with the same user generation are still an error; the prohibition is independent of stride.
9.4 Prerequisite PR scope (§14, step 0)
A separate PR before all_to_all:
- `all_reduce.hpp:459-460`: change `2 * generation - 1, 2 * generation` to `3 * generation - 2, 3 * generation`.
- `all_gather.hpp:420-421`: same.
- `all_reduce.hpp:451` and `all_gather.hpp:412`: change the `bad_parameter` strings from "the 2k/2k+1 internal mapping" to "the 3k-2/3k internal mapping" (the existing strings already describe the wrong arithmetic; a pre-existing slip, fixed in passing).
- A regression test mixing `all_reduce` and `all_gather` calls at consecutive user generations on the same `hierarchical_communicator`, asserting all calls succeed. The cross-collective regression that includes `all_to_all` is added when the all_to_all PR lands.
No changes to `next_generation`, `handle_data`, or any other internal communicator API.
10. Flat fallback
At entry to hierarchical all_to_all:
```cpp
if (communicators.size() == 1)
{
    auto [c, site] = communicators[0];
    return all_to_all(c, HPX_MOVE(local_result), site, generation);
}
```
This handles two cases uniformly:
- The factory took the explicit flat-fallback path from PR #7193 (`num_sites < threshold`).
- The tree construction reached the terminal flat case immediately because `num_sites <= arity`.
Flat all_to_all's validation and exception behavior (size mismatch, `generation == 0`) are inherited unchanged.
This mirrors the existing `scatter_from` and `scatter_to` hierarchical `size() == 1` short-circuits at `scatter.hpp:547` and `scatter.hpp:713`.
11. Memory behavior
Total logical data volume is unavoidable: every site contributes N values and receives N values, irrespective of the algorithm.
The pressure point is a top representative. In a straightforward v1 implementation a rep transiently holds up to four buffers of order g × N elements (with g ≈ N / arity for balanced trees): Row_k, outgoing padded blocks, received padded blocks, and Col_k. Aggressive use of HPX_MOVE keeps two of these alive at a time in steady state; peak usage stays in O(g · N · sizeof(T)).
Per-rep peak (rough upper bound, balanced case):
peak_bytes ≈ (2/arity + 1/arity²) · N² · m
where m = sizeof(payload-element). For N = 256, arity = 4, m = 1 MiB this is approximately 36 GiB at a single rep, which is unusable on most nodes.
v1 disposition (closes #7200 Q6): no second fallback threshold in v1. Document the budget; users size arity to keep per-rep peak within node memory. @hkaiser: "Let's cross the bridge when we're there."
Future work that addresses this — listed here for visibility, not for v1 — includes ragged blocks instead of padded (eliminates the g_max² - g_src · g_dst waste per block), streaming Phase 2 → Phase 3 to avoid materializing the full Col_k, a payload-byte-keyed second fallback threshold, an all_to_allv based on channel_communicator, and multi-leader variants for large groups. None are v1.
12. Tests
Tests live alongside the existing hierarchical collective tests and follow the same launch-num_sites-tasks-from-locality-0 pattern (compare the existing flat all_to_all test at concurrent_collectives.cpp:189-200).
12.1 Transpose identity
Site i contributes In[i][j] = encode(i, j) for an injective deterministic encode such as
```cpp
auto encode = [](std::size_t i, std::size_t j, std::size_t C) {
    return i * C + j;
};
```
with C > max_num_sites to keep the encoding injective. After the collective, site i checks Out[i][j] == encode(j, i) for all j. This catches every transpose error.
12.2 Local multi-site coverage
```
arity 2: N = 2, 3, 4, 5, 6, 7, 8, 9, 11, 15
arity 4: N = 5, 6, 7, 9, 10, 11, 13, 15
```
`flat_fallback_threshold_arg(0)` forces the tree path except in the terminal-flat case where `num_sites <= arity` (§2.2); a high threshold (e.g. 1024) forces the explicit flat-fallback path. The `N = 11, arity = 4` case is the worked example in §7.4 and exercises the most non-trivial padding/reconstruction path.
12.3 Distributed coverage
The existing CMake LOCALITIES 2 pattern is sufficient for API surface, fallback path, and basic distributed correctness, but it does not exercise a real top-level hierarchy: at N=2, arity=2 the recursion's terminal condition (§2.2) fires immediately (right - left = 1 < arity = 2), producing a hierarchical_communicator of size() == 1 and dispatching through the §10 fallback regardless of the flat_fallback_threshold value.
Real hierarchical multi-level behavior is therefore covered by the local multi-site tests in §12.2 (which run num_sites HPX tasks from a single locality) and, optionally, by a higher-locality distributed target if CI cost permits — LOCALITIES 4 with arity 2 is enough to produce a two-level tree.
12.4 Bad-input path
Mirror flat all_to_all: if any site's contribution is not of length num_sites, the operation completes exceptionally with bad_parameter. Same for generation == 0 and for the default-generation case (§4).
12.5 Generation regression — cross-collective stride-three
On one hierarchical_communicator:
```
all_gather (gen=1)
all_to_all (gen=2)
all_reduce (gen=3)
all_to_all (gen=4)
```
All calls must succeed.
The failure mode this guards against is mixing operation strides on a shared communicator. Under the old two-phase mapping (2k-1, 2k), all_gather(gen=1) would consume internal slots 1, 2, leaving the gate expecting at least 3 for the next call. A stride-three all_to_all(gen=2) would consume slots 4, 5, 6, leaving the gate expecting at least 7. A subsequent old-mapped all_reduce(gen=3) would attempt to start at slot 2*3 - 1 = 5, which is now behind the gate (5 < 7) — next_generation's >= check rejects, the call fails. The stride-three prerequisite eliminates this by making all_reduce(gen=3) start at slot 3*3 - 2 = 7, which the gate accepts.
This regression is the test that justifies the §9 design. The two-collective version (without all_to_all) lives in the prerequisite PR (§14, step 0); the four-call version above is added when all_to_all lands.
12.6 Helper unit tests
`detail::get_top_level_groups` and `detail::classify_site` get a small unit test against a hardcoded partition table, covering at minimum:

- balanced cases: `(N=8, arity=2)`, `(N=16, arity=4)`;
- non-balanced cases: `(N=11, arity=4)` (the §7.4 example, sizes `3, 3, 3, 2`), `(N=5, arity=4)` (sizes `2, 1, 1, 1`: three top reps with singleton top-level groups).
These tests live in the prerequisite PR (§14, step 1).
13. Benchmarks
Benchmarking comes after correctness. The first extension follows the existing benchmark_collectives_test style:
- flat vs hierarchical `all_to_all`;
- arity 2, 4 (and 8 if cluster geometry permits);
- fallback threshold `0` for forced tree mode and the default for normal behavior;
- payload sizes from scalar to large vectors;
- threads-per-locality sweep, matching the previous fallback-benchmark discussion.
Per-phase timing is useful but should not block the correctness PR; if added, it stays in the benchmark harness, not the collective.
14. Implementation sequence
Each step is a standalone PR.
Step 0 — prerequisite: stride-three arithmetic. Update `all_reduce.hpp` and `all_gather.hpp` from `(2k-1, 2k)` to `(3k-2, 3k)`. Fix the `bad_parameter` strings. Add the two-collective generation regression test. No internal communicator API change.
Step 1 — prerequisite: shared top-level partition helper. Introduce `detail::get_top_level_groups` and `detail::classify_site`. Use `get_top_level_groups` in the top frame of `recursively_fill_communicators` without changing the produced communicator names, group boundaries, or site ranks — this PR is a refactor, not a behavior change. Add §12.6's unit tests.
Step 2 — all_to_all fallback and validation. Hierarchical entry point with the §10 `size() == 1` shortcut, malformed-size validation, and `generation == 0` rejection. Add §12.4's tests.
Step 3 — `subtree_gather_at_top_rep` helper (Phase 1). Plus a unit test that exercises only Phase 1 against a known input.
Step 4 — Phase 2 representative exchange. Padded block builder, flat `all_to_all<vector<T>>` over the top communicator, and `Col_k` reconstruction using §5's helpers.
Step 5 — `subtree_scatter_at_top_rep` helper (Phase 3). Plus a unit test that exercises only Phase 3 against a known input.
Step 6 — End-to-end transpose tests. §12.1 + §12.2 + §12.3 + the four-call version of §12.5.
Step 7 — Benchmark wiring. Per §13.
Steps 0 and 1 are PR-able immediately and depend on nothing in this design. Steps 2–6 land as one feature PR or as a small chain.
15. Out of scope for v1
- Separate communicator families per collective operation (rejected in #7200; would prevent shared communicator identity).
- Second fallback threshold keyed on payload bytes (deferred per #7200; ship v1 with a documented memory budget instead).
- Custom `all_to_allv` over `channel_communicator`.
- Ragged (non-padded) blocks. Two benefits, both deferred to follow-up: (a) eliminates the `g_max² - g_src · g_dst` waste per block, and (b) lifts the v1 default-constructibility constraint on `T` (§7.1), since blocks would carry only real elements. This is the natural near-term Phase-2.5 follow-up.
- Streaming Phase 2 → Phase 3 to avoid materializing the full `Col_k`.
- GPU-aware payload handling.
- Phase overlap / pipelining.
- Multi-leader representative schemes.
- Performance-driven changes to the default `flat_fallback_threshold` value.
These are valid follow-ups; including any one in v1 would force a too-large initial PR.
16. Pitfalls
- `communicators.get(0)` is the top-of-tree communicator only on top reps. On any non-top-rep it is the highest subtree-internal communicator. Implementations that branch on it must compute `is_top_rep` from `(num_sites, arity, this_site)` via §5's helper, not from `communicators[0]` alone.
- Public hierarchical `gather_here` on a top rep ends with a cross-top gather; Phase 1 on top reps must use `subtree_gather_at_top_rep`. Public `gather_there` on non-top-reps is fine as-is.
- Public hierarchical `scatter_to` on a top rep starts with a cross-top scatter; Phase 3 on top reps must use `subtree_scatter_at_top_rep`. Public `scatter_from` on non-top-reps is fine as-is.
- Padding is not a serialization requirement. `all_to_all<vector<T>>` carries variable-length inner vectors fine; padding exists purely for v1's receiver-side reconstruction simplicity.
- Stride-three needs no internal communicator API change. `next_generation`'s `>=` accept plus post-increment carries the implicit skip; do not introduce a dummy collective for the unused middle slot.
- Reusing the same user generation for two distinct calls on the same `hierarchical_communicator` is still an error after the stride-three change. Stride-three makes the internal generation footprint per call uniform; it does not relax the user contract.
- Receiver-side `Col_k` reconstruction in Phase 2 needs the source group's actual size to crop padding correctly. Get it from `detail::get_top_level_groups(num_sites, arity)[src]`; do not assume balanced groups.
- Top-rep detection comes from the partition helper, not from the communicator path. Use `detail::classify_site(get_top_level_groups(...), this_site).is_representative` to decide whether to dispatch to the subtree gather/scatter helpers. Inferring top-rep-ness from `communicators.size()` or from positional checks on `communicators[0]` is fragile: a top rep with a singleton top-level group has `communicators.size() == 2` (top comm + 1-site terminal), not 1; the only `size() == 1` case is the §10 short-circuit, which fires before any top-rep dispatch.
- Do not duplicate the partitioning rule in two places. Step 1 of §14 introduces `detail::get_top_level_groups` precisely to keep the factory and the helper in lockstep.
17. Summary
```
if hierarchical communicator has one underlying communicator:
    delegate to flat all_to_all
    (covers the explicit fallback path AND the case num_sites <= arity)
else:
    Phase 1 (gen 3k-2): each top-level subtree gathers to its representative
        top reps:     subtree_gather_at_top_rep helper (loop, no cross-top)
        non-top-reps: public hierarchical gather_there (works as-is)
    Phase 2 (gen 3k-1): top reps run flat all_to_all<vector<T>> on the top
        communicator with padded |G_k| x |G_dst| blocks; receivers crop
        padding using get_top_level_groups
    Phase 3 (gen 3k): each top rep scatters Col_k down its subtree
        top reps:     subtree_scatter_at_top_rep helper (start at comm[1])
        non-top-reps: public hierarchical scatter_from (works as-is)
```
Correctness first. The two prerequisite PRs (stride-three arithmetic, shared partition helper) are small, mechanical, and reviewable in isolation; they unlock the all_to_all PR sequence without coupling.
References
- Strack, Zeil, Kaiser, Pflüger. Hierarchical Collective Operations for the Asynchronous Many-Task Runtime HPX. 16th International Parallel Tools Workshop (IPTW), 2025.
- Träff, Rougier. MPI Collectives and Datatypes for Hierarchical All-to-all Communication. EuroMPI 2014.
- Chochia, Solt, Hursey. Applying On-Node Aggregation Methods to MPI Alltoall Collectives: Matrix Block Aggregation Algorithm. EuroMPI/USA 2022.
- Bienz, Olson, Gropp. Node-Aware Improvements to Allreduce. ExaMPI Workshop, SC 2019. arXiv:1910.09650.
- HPX PR #7160: hierarchical all_reduce and all_gather via reduce+broadcast composition (merged).
- HPX PR #7193: flat-collective fallback to hierarchical_communicator for small site counts (in review).
- HPX Discussion #7200: design discussion for hierarchical all_to_all (architectural Q&A for this design).
---
Following PR #7160 (hierarchical all_reduce + all_gather, merged) and PR #7193 (flat fallback for the hierarchical_communicator factory, in review), I want to start a design discussion on the third collective in the project: hierarchical all_to_all. Posting now to get feedback on the architectural shape before writing code.
This is a sketch. The goal is to validate the overall decomposition and surface the implementation choices I cannot make alone.
Would appreciate input from @hkaiser and @constracktor on the questions below.
Why all_to_all is structurally different from #7160
For all_reduce and all_gather, the result is the SAME at every site (reduced scalar / gathered vector). That's why "compute at root, then broadcast" works as a 2-phase composition over existing primitives.
all_to_all does not have this property. Site i needs the i-th column of the global N x N input matrix. The output is DIFFERENT at every site, so no broadcast trick. We have to move N^2 worth of data across the cluster, no matter what.
What hierarchical can buy us is not less data, but FEWER MESSAGES. A flat all_to_all over N sites issues N*(N-1) distinct messages globally. Hierarchical can collapse this to O(N log N) larger messages by aggregating intra-group, exchanging inter-group, and redistributing intra-group. This matches the standard "node-aware" pattern in the MPI literature (Träff and Rougier 2014; Bienz et al.; Chochia et al. "matrix block aggregation"). For HPX, the equivalent of "node" is "subtree of the existing hierarchical_communicator at the top level of the tree."
Proposed 3-phase decomposition
For N sites with arity a, the existing hierarchical_communicator factory builds a tree where the top level has `a` representatives, each rooting a subtree G_k of size ~N/a.

Phase 1 composes with the existing hierarchical gather walk. Phase 3 mirrors it with the existing hierarchical scatter walk. Phase 2 is the only genuinely new piece; it operates on the top-level communicator (size = arity) using a flat all_to_all there with `a`-sized payload-per-destination.

Generation mapping mirrors the 2k-1 / 2k pattern from #7160:
Complexity sketch
Let m = bytes per element, g = N/a = subtree size (assume balanced).
The classical hierarchical tradeoff: fewer messages help when latency dominates (small payloads) but introduce bandwidth concentration at top reps that can hurt for very large payloads. A flat fallback below a configurable site-count threshold (matching #7193) is the obvious safety net for small N.
Proposed signature
Mirrors the existing all_reduce hierarchical signature:
The fallback threshold is consumed at communicator creation, so it does not appear on the collective itself, matching how the existing hierarchical overloads handle it.
Architectural questions
Q1: Phase 2's communicator
Three options for the inter-rep exchange:
(a) Use the existing top-of-tree communicator (`communicators.get(0)` at a rep) with a flat `all_to_all<vector<T>>(...)`.
(b) Construct a dedicated inter-group communicator at factory time and store it alongside the tree levels.
(c) Use `channel_communicator` for explicit point-to-point routing.

My instinct is (a): it reuses the existing flat all_to_all, needs no new communicator type, and the top group is small enough that flat is cheap there. Is there a reason to prefer (b) or (c)?
Q2: Data layout for the inter-rep payload
For Phase 2 each rep sends `a` distinct sub-matrices (one per destination subtree). The natural representation is `std::vector<std::vector<T>>` outer-keyed by destination, which means instantiating `all_to_all<std::vector<T>>(...)` at the inter-rep step.

Does the existing all_to_all serialization handle nested vectors cleanly, or is there a known sharp edge analogous to the `vector<bool>` proxy issue we already special-case? For very large payloads, would a scatter-gather "schedule" be worth introducing instead of nested vectors, to avoid the extra copy? My instinct is to defer the second question until benchmark data motivates it.

Q3: Non-balanced subtrees
When N is not a multiple of arity, top-level subtrees have slightly different sizes (`recursive_fill` handles this with `division_steps + remainder`). Phases 1 and 3 inherit this naturally, but Phase 2 sees variable-size sub-matrices: rep k sends `|G_k| * |G_dst|` elements to rep dst, which is NOT symmetric across (k, dst) pairs.

The flat all_to_all API requires equal-size contributions, so we cannot directly call flat all_to_all in Phase 2 with variable-size sub-matrices. Options:

I'd like to ship the padding option in v1 and revisit if benchmarks show the waste is non-trivial. Confirm or push back?
Q4: Test pattern
Proposed correctness check: site i contributes `In[i][j] = encode(i, j)` for a deterministic `encode`, then checks `Out[i][j] == encode(j, i)`. This catches every transpose error. Is there a stronger pattern the project uses that I should mirror?

Out of scope for v1
These are noted for follow-up but trying to land them in v1 risks not landing v1.
Plan
Approximate timeline:
If the decomposition above has a structural flaw I'm missing, the sooner we catch it, the better. Looking forward to feedback.
References