Skip to content

Commit e4fd72f

Browse files
Merge branch 'master' into fix/minmax-type-deduction
2 parents 007c7b3 + d7c9fe1 commit e4fd72f

10 files changed

Lines changed: 707 additions & 629 deletions

File tree

libs/full/collectives/CMakeLists.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ set(collectives_headers
2121
hpx/collectives/broadcast_direct.hpp
2222
hpx/collectives/channel_communicator.hpp
2323
hpx/collectives/create_communicator.hpp
24-
hpx/collectives/detail/barrier_node.hpp
2524
hpx/collectives/detail/channel_communicator.hpp
2625
hpx/collectives/detail/communicator.hpp
2726
hpx/collectives/detail/latch.hpp
@@ -34,7 +33,6 @@ set(collectives_headers
3433
hpx/collectives/reduce_direct.hpp
3534
hpx/collectives/scatter.hpp
3635
hpx/collectives/spmd_block.hpp
37-
hpx/collectives/detail/barrier_node.hpp
3836
hpx/collectives/detail/latch.hpp
3937
)
4038

@@ -61,7 +59,6 @@ set(collectives_sources
6159
broadcast.cpp
6260
channel_communicator.cpp
6361
create_communicator.cpp
64-
detail/barrier_node.cpp
6562
detail/channel_communicator_server.cpp
6663
exclusive_scan.cpp
6764
gather.cpp

libs/full/collectives/include/hpx/collectives/barrier.hpp

Lines changed: 224 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Copyright (c) 2016 Thomas Heller
2+
// Copyright (c) 2026 The STE||AR-Group
23
//
34
// SPDX-License-Identifier: BSL-1.0
45
// Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -90,31 +91,223 @@ namespace hpx { namespace distributed {
9091
#else
9192

9293
#include <hpx/config.hpp>
93-
#include <hpx/collectives/detail/barrier_node.hpp>
94+
95+
#if !defined(HPX_COMPUTE_DEVICE_CODE)
96+
97+
#include <hpx/assert.hpp>
98+
#include <hpx/async_distributed/async.hpp>
99+
#include <hpx/collectives/argument_types.hpp>
100+
#include <hpx/collectives/create_communicator.hpp>
94101
#include <hpx/modules/async_base.hpp>
95102
#include <hpx/modules/components_base.hpp>
103+
#include <hpx/modules/errors.hpp>
96104
#include <hpx/modules/futures.hpp>
97-
#include <hpx/modules/memory.hpp>
98105

99-
#include <array>
106+
#include <atomic>
100107
#include <cstddef>
101108
#include <string>
109+
#include <utility>
110+
#include <variant>
102111
#include <vector>
103112

104113
#include <hpx/config/warnings_prefix.hpp>
105114

106-
namespace hpx::distributed {
115+
////////////////////////////////////////////////////////////////////////////////
116+
namespace hpx::traits {
117+
118+
namespace communication {
119+
120+
struct barrier_tag;
107121

108-
namespace detail {
122+
template <>
123+
struct communicator_data<barrier_tag>
124+
{
125+
HPX_EXPORT static char const* name() noexcept;
126+
};
127+
} // namespace communication
109128

110-
struct barrier_node;
129+
///////////////////////////////////////////////////////////////////////////
130+
// support for barrier (no-payload collective)
131+
template <typename Communicator>
132+
struct communication_operation<Communicator, communication::barrier_tag>
133+
{
134+
template <typename Result>
135+
static Result get(Communicator& communicator, std::size_t which,
136+
std::size_t generation)
137+
{
138+
return communicator.template handle_data<void>(
139+
communication::communicator_data<
140+
communication::barrier_tag>::name(),
141+
which, generation, nullptr, nullptr);
142+
}
143+
};
144+
} // namespace hpx::traits
145+
146+
////////////////////////////////////////////////////////////////////////////////
147+
namespace hpx::collectives {
148+
149+
// Flat barrier: synchronize all sites associated with a single communicator.
150+
inline hpx::future<void> barrier(communicator fid,
151+
this_site_arg this_site = this_site_arg(),
152+
generation_arg const generation = generation_arg())
153+
{
154+
if (this_site.is_default())
155+
{
156+
this_site = agas::get_locality_id();
157+
}
158+
if (generation.is_default())
159+
{
160+
return hpx::make_exceptional_future<void>(HPX_GET_EXCEPTION(
161+
hpx::error::bad_parameter, "hpx::collectives::barrier",
162+
"the generation number shouldn't be zero"));
163+
}
164+
165+
// Handle operation right away if there is only one site.
166+
if ([[maybe_unused]] auto const [num_sites, _] = fid.get_info();
167+
num_sites == 1)
168+
{
169+
return hpx::make_ready_future();
170+
}
171+
172+
auto barrier_data = [this_site, generation](
173+
communicator&& c) -> hpx::future<void> {
174+
using action_type =
175+
detail::communicator_server::communication_get_direct_action<
176+
traits::communication::barrier_tag, hpx::future<void>>;
177+
178+
hpx::future<void> result =
179+
hpx::async(action_type(), c, this_site, generation);
180+
181+
if (!result.is_ready())
182+
{
183+
// make sure id is kept alive as long as the returned future
184+
traits::detail::get_shared_state(result)->set_on_completed(
185+
[client = HPX_MOVE(c)] { HPX_UNUSED(client); });
186+
}
187+
188+
return result;
189+
};
190+
191+
return fid.then(hpx::launch::sync, HPX_MOVE(barrier_data));
111192
}
112193

113-
class HPX_EXPORT barrier
194+
inline hpx::future<void> barrier(communicator fid,
195+
generation_arg const generation,
196+
this_site_arg const this_site = this_site_arg())
197+
{
198+
return barrier(HPX_MOVE(fid), this_site, generation);
199+
}
200+
201+
inline hpx::future<void> barrier(char const* basename,
202+
num_sites_arg const num_sites = num_sites_arg(),
203+
this_site_arg const this_site = this_site_arg(),
204+
generation_arg const generation = generation_arg(),
205+
root_site_arg const root_site = root_site_arg())
206+
{
207+
return barrier(create_communicator(basename, num_sites, this_site,
208+
generation, root_site),
209+
this_site);
210+
}
211+
212+
inline void barrier(hpx::launch::sync_policy, communicator fid,
213+
this_site_arg const this_site = this_site_arg(),
214+
generation_arg const generation = generation_arg())
215+
{
216+
barrier(HPX_MOVE(fid), this_site, generation).get();
217+
}
218+
219+
inline void barrier(hpx::launch::sync_policy, communicator fid,
220+
generation_arg const generation,
221+
this_site_arg const this_site = this_site_arg())
222+
{
223+
barrier(HPX_MOVE(fid), this_site, generation).get();
224+
}
225+
226+
inline void barrier(hpx::launch::sync_policy, char const* basename,
227+
num_sites_arg const num_sites = num_sites_arg(),
228+
this_site_arg const this_site = this_site_arg(),
229+
generation_arg const generation = generation_arg(),
230+
root_site_arg const root_site = root_site_arg())
231+
{
232+
barrier(create_communicator(
233+
basename, num_sites, this_site, generation, root_site),
234+
this_site)
235+
.get();
236+
}
237+
238+
////////////////////////////////////////////////////////////////////////////
239+
// Hierarchical barrier: reduce-phase + broadcast-phase no-op gates
240+
// Uses the 2k-1 / 2k generation mapping: user generation k maps to
241+
// internal generation 2k-1 (reduce phase) and 2k (broadcast phase). This
242+
// allows the same sub-communicators to be used for both phases without
243+
// generation collisions.
244+
inline hpx::future<void> barrier(
245+
hierarchical_communicator const& communicators,
246+
this_site_arg this_site = this_site_arg(),
247+
generation_arg const generation = generation_arg(),
248+
root_site_arg /*root_site*/ = root_site_arg())
114249
{
115-
typedef detail::barrier_node wrapped_type;
116-
typedef components::managed_component<wrapped_type> wrapping_type;
250+
if (generation.is_default())
251+
{
252+
return hpx::make_exceptional_future<void>(
253+
HPX_GET_EXCEPTION(hpx::error::bad_parameter,
254+
"hpx::collectives::barrier (hierarchical)",
255+
"hierarchical barrier requires an explicit generation "
256+
"number for the 2k-1/2k internal mapping"));
257+
}
258+
259+
if (this_site.is_default())
260+
{
261+
this_site = agas::get_locality_id();
262+
}
263+
264+
if (communicators.size() == 0)
265+
{
266+
return hpx::make_ready_future();
267+
}
268+
269+
generation_arg const reduce_gen(2 * generation - 1);
270+
generation_arg const broadcast_gen(2 * generation);
271+
272+
// Reduce phase: walk sub-communicators from deepest (end of vector) to
273+
// shallowest (start). Each sub-barrier releases only after all sites
274+
// within that sub-communicator have checked in, which propagates
275+
// arrival up the tree.
276+
for (std::size_t i = communicators.size(); i > 0; --i)
277+
{
278+
barrier(
279+
communicators.get(i - 1), communicators.site(i - 1), reduce_gen)
280+
.get();
281+
}
282+
283+
// Broadcast phase: walk sub-communicators from shallowest to deepest.
284+
// Returning the final future lets the caller chain on completion.
285+
286+
for (std::size_t i = 0; i + 1 < communicators.size(); ++i)
287+
{
288+
barrier(communicators.get(i), communicators.site(i), broadcast_gen)
289+
.get();
290+
}
291+
292+
return barrier(
293+
communicators.back(), communicators.last_site(), broadcast_gen);
294+
}
117295

296+
inline void barrier(hpx::launch::sync_policy,
297+
hierarchical_communicator const& communicators,
298+
this_site_arg const this_site = this_site_arg(),
299+
generation_arg const generation = generation_arg(),
300+
root_site_arg const root_site = root_site_arg())
301+
{
302+
barrier(communicators, this_site, generation, root_site).get();
303+
}
304+
} // namespace hpx::collectives
305+
306+
////////////////////////////////////////////////////////////////////////////////
307+
namespace hpx::distributed {
308+
309+
class HPX_EXPORT barrier
310+
{
118311
public:
119312
explicit barrier(std::string const& base_name);
120313

@@ -141,18 +334,37 @@ namespace hpx::distributed {
141334
void detach();
142335

143336
// Get the instance of the global barrier
144-
static std::array<barrier, 2>& get_global_barrier();
145-
static std::array<barrier, 2> create_global_barrier();
337+
static barrier& get_global_barrier();
338+
static barrier create_global_barrier();
146339

147340
static void synchronize();
148341

149342
private:
343+
enum class force_flat_tag
344+
{
345+
tag
346+
};
347+
150348
barrier();
151349

152-
hpx::intrusive_ptr<wrapping_type> node_;
350+
barrier(std::string const& base_name, std::size_t num, std::size_t rank,
351+
force_flat_tag);
352+
353+
void create_communicator(bool force_flat);
354+
355+
std::string base_name_;
356+
std::size_t num_ = 0;
357+
std::size_t rank_ = 0;
358+
std::size_t cut_off_ = 0;
359+
mutable std::atomic<std::size_t> generation_{0};
360+
361+
std::variant<hpx::collectives::communicator,
362+
hpx::collectives::hierarchical_communicator>
363+
comm_;
153364
};
154365
} // namespace hpx::distributed
155366

156367
#include <hpx/config/warnings_suffix.hpp>
157368

369+
#endif // !HPX_COMPUTE_DEVICE_CODE
158370
#endif // DOXYGEN

0 commit comments

Comments
 (0)