Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions libs/full/collectives/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ set(collectives_headers
hpx/collectives/broadcast_direct.hpp
hpx/collectives/channel_communicator.hpp
hpx/collectives/create_communicator.hpp
hpx/collectives/detail/barrier_node.hpp
hpx/collectives/detail/channel_communicator.hpp
hpx/collectives/detail/communicator.hpp
hpx/collectives/detail/latch.hpp
Expand All @@ -34,7 +33,6 @@ set(collectives_headers
hpx/collectives/reduce_direct.hpp
hpx/collectives/scatter.hpp
hpx/collectives/spmd_block.hpp
hpx/collectives/detail/barrier_node.hpp
hpx/collectives/detail/latch.hpp
)

Expand All @@ -61,7 +59,6 @@ set(collectives_sources
broadcast.cpp
channel_communicator.cpp
create_communicator.cpp
detail/barrier_node.cpp
detail/channel_communicator_server.cpp
exclusive_scan.cpp
gather.cpp
Expand Down
236 changes: 224 additions & 12 deletions libs/full/collectives/include/hpx/collectives/barrier.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2016 Thomas Heller
// Copyright (c) 2026 The STE||AR-Group
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -90,31 +91,223 @@
#else

#include <hpx/config.hpp>
#include <hpx/collectives/detail/barrier_node.hpp>

#if !defined(HPX_COMPUTE_DEVICE_CODE)

#include <hpx/assert.hpp>
#include <hpx/async_distributed/async.hpp>
#include <hpx/collectives/argument_types.hpp>
#include <hpx/collectives/create_communicator.hpp>
#include <hpx/modules/async_base.hpp>
#include <hpx/modules/components_base.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/futures.hpp>
#include <hpx/modules/memory.hpp>

#include <array>
#include <atomic>
#include <cstddef>
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include <hpx/config/warnings_prefix.hpp>

namespace hpx::distributed {
////////////////////////////////////////////////////////////////////////////////
namespace hpx::traits {

namespace communication {

struct barrier_tag;

namespace detail {
template <>
struct communicator_data<barrier_tag>
{
HPX_EXPORT static char const* name() noexcept;
};
} // namespace communication

struct barrier_node;
///////////////////////////////////////////////////////////////////////////
// support for barrier (no-payload collective)
template <typename Communicator>
struct communication_operation<Communicator, communication::barrier_tag>
{
template <typename Result>
static Result get(Communicator& communicator, std::size_t which,
std::size_t generation)
{
return communicator.template handle_data<void>(
communication::communicator_data<
communication::barrier_tag>::name(),
which, generation, nullptr, nullptr);
}
};
} // namespace hpx::traits

////////////////////////////////////////////////////////////////////////////////
namespace hpx::collectives {

// Flat barrier: synchronize all sites associated with a single communicator.
inline hpx::future<void> barrier(communicator fid,
this_site_arg this_site = this_site_arg(),
generation_arg const generation = generation_arg())
{
if (this_site.is_default())
{
this_site = agas::get_locality_id();
}
if (generation.is_default())
{
return hpx::make_exceptional_future<void>(HPX_GET_EXCEPTION(
hpx::error::bad_parameter, "hpx::collectives::barrier",
"the generation number shouldn't be zero"));
}

// Handle operation right away if there is only one site.
if ([[maybe_unused]] auto const [num_sites, _] = fid.get_info();

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable _ is not used.
Comment thread
hkaiser marked this conversation as resolved.
Dismissed
num_sites == 1)
{
return hpx::make_ready_future();
}

auto barrier_data = [this_site, generation](
communicator&& c) -> hpx::future<void> {
using action_type =
detail::communicator_server::communication_get_direct_action<
traits::communication::barrier_tag, hpx::future<void>>;

hpx::future<void> result =
hpx::async(action_type(), c, this_site, generation);

if (!result.is_ready())
{
// make sure id is kept alive as long as the returned future
traits::detail::get_shared_state(result)->set_on_completed(
[client = HPX_MOVE(c)] { HPX_UNUSED(client); });
}

return result;
};

return fid.then(hpx::launch::sync, HPX_MOVE(barrier_data));
}

class HPX_EXPORT barrier
inline hpx::future<void> barrier(communicator fid,
generation_arg const generation,
this_site_arg const this_site = this_site_arg())
{
return barrier(HPX_MOVE(fid), this_site, generation);
}

inline hpx::future<void> barrier(char const* basename,
num_sites_arg const num_sites = num_sites_arg(),
this_site_arg const this_site = this_site_arg(),
generation_arg const generation = generation_arg(),
root_site_arg const root_site = root_site_arg())
{
return barrier(create_communicator(basename, num_sites, this_site,
generation, root_site),
this_site);
}

inline void barrier(hpx::launch::sync_policy, communicator fid,
this_site_arg const this_site = this_site_arg(),
generation_arg const generation = generation_arg())
{
barrier(HPX_MOVE(fid), this_site, generation).get();
}

inline void barrier(hpx::launch::sync_policy, communicator fid,
generation_arg const generation,
this_site_arg const this_site = this_site_arg())
{
barrier(HPX_MOVE(fid), this_site, generation).get();
}

inline void barrier(hpx::launch::sync_policy, char const* basename,
num_sites_arg const num_sites = num_sites_arg(),
this_site_arg const this_site = this_site_arg(),
generation_arg const generation = generation_arg(),
root_site_arg const root_site = root_site_arg())
{
barrier(create_communicator(
basename, num_sites, this_site, generation, root_site),
this_site)
.get();
}

////////////////////////////////////////////////////////////////////////////
// Hierarchical barrier: reduce-phase + broadcast-phase no-op gates
// Uses the 2k-1 / 2k generation mapping: user generation k maps to
// internal generation 2k-1 (reduce phase) and 2k (broadcast phase). This
// allows the same sub-communicators to be used for both phases without
// generation collisions.
inline hpx::future<void> barrier(
hierarchical_communicator const& communicators,
this_site_arg this_site = this_site_arg(),
generation_arg const generation = generation_arg(),
root_site_arg /*root_site*/ = root_site_arg())
{
typedef detail::barrier_node wrapped_type;
typedef components::managed_component<wrapped_type> wrapping_type;
if (generation.is_default())
{
return hpx::make_exceptional_future<void>(
HPX_GET_EXCEPTION(hpx::error::bad_parameter,
"hpx::collectives::barrier (hierarchical)",
"hierarchical barrier requires an explicit generation "
"number for the 2k-1/2k internal mapping"));
}

if (this_site.is_default())
{
this_site = agas::get_locality_id();
}

if (communicators.size() == 0)
{
return hpx::make_ready_future();
}

generation_arg const reduce_gen(2 * generation - 1);
generation_arg const broadcast_gen(2 * generation);

// Reduce phase: walk sub-communicators from deepest (end of vector) to
// shallowest (start). Each sub-barrier releases only after all sites
// within that sub-communicator have checked in, which propagates
// arrival up the tree.
for (std::size_t i = communicators.size(); i > 0; --i)
{
barrier(
communicators.get(i - 1), communicators.site(i - 1), reduce_gen)
.get();
}
Comment thread
hkaiser marked this conversation as resolved.

// Broadcast phase: walk sub-communicators from shallowest to deepest.
// Returning the final future lets the caller chain on completion.

for (std::size_t i = 0; i + 1 < communicators.size(); ++i)
{
barrier(communicators.get(i), communicators.site(i), broadcast_gen)
.get();
}

return barrier(
communicators.back(), communicators.last_site(), broadcast_gen);
}

inline void barrier(hpx::launch::sync_policy,
hierarchical_communicator const& communicators,
this_site_arg const this_site = this_site_arg(),
generation_arg const generation = generation_arg(),
root_site_arg const root_site = root_site_arg())
{
barrier(communicators, this_site, generation, root_site).get();
}
} // namespace hpx::collectives

////////////////////////////////////////////////////////////////////////////////
namespace hpx::distributed {

class HPX_EXPORT barrier
{
public:
explicit barrier(std::string const& base_name);

Expand All @@ -141,18 +334,37 @@
void detach();

// Get the instance of the global barrier
static std::array<barrier, 2>& get_global_barrier();
static std::array<barrier, 2> create_global_barrier();
static barrier& get_global_barrier();
static barrier create_global_barrier();

static void synchronize();

private:
enum class force_flat_tag
{
tag
};

barrier();

hpx::intrusive_ptr<wrapping_type> node_;
barrier(std::string const& base_name, std::size_t num, std::size_t rank,
force_flat_tag);

void create_communicator(bool force_flat);

std::string base_name_;
std::size_t num_ = 0;
std::size_t rank_ = 0;
std::size_t cut_off_ = 0;
mutable std::atomic<std::size_t> generation_{0};

std::variant<hpx::collectives::communicator,
hpx::collectives::hierarchical_communicator>
comm_;
};
} // namespace hpx::distributed

#include <hpx/config/warnings_suffix.hpp>

#endif // !HPX_COMPUTE_DEVICE_CODE
#endif // DOXYGEN
Loading
Loading