Skip to content

Commit 18497d0

Browse files

Commit message: "add ifdef stdexec"
Committed — 1 parent (de6ae81), commit 18497d0

6 files changed

Lines changed: 65 additions & 47 deletions

File tree

libs/core/algorithms/tests/performance/foreach_report.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ int hpx_main(hpx::program_options::variables_map& vm)
8282
[&]() { measure_parallel_foreach(data_representation, exec); });
8383
}
8484

85+
#if defined(HPX_HAVE_STDEXEC)
8586
{
8687
hpx::execution::experimental::scheduler_executor<
8788
hpx::execution::experimental::parallel_scheduler>
@@ -90,6 +91,7 @@ int hpx_main(hpx::program_options::variables_map& vm)
9091
test_count,
9192
[&]() { measure_parallel_foreach(data_representation, exec); });
9293
}
94+
#endif
9395

9496
{
9597
hpx::execution::parallel_executor exec;

libs/core/executors/include/hpx/executors/parallel_scheduler.hpp

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <hpx/threading_base/detail/get_default_pool.hpp>
1515
#include <exception>
1616
#include <memory>
17+
#include <type_traits>
1718

1819
namespace hpx::execution::experimental {
1920

@@ -59,17 +60,22 @@ namespace hpx::execution::experimental {
5960
// completion scheduler (completes_on pattern)
6061
auto par_sched = [&]() {
6162
if constexpr (hpx::is_invocable_v<
62-
hpx::execution::experimental::get_completion_scheduler_t<
63-
hpx::execution::experimental::set_value_t>,
64-
decltype(hpx::execution::experimental::get_env(child))>)
63+
hpx::execution::experimental::
64+
get_completion_scheduler_t<hpx::
65+
execution::experimental::
66+
set_value_t>,
67+
decltype(hpx::execution::experimental::
68+
get_env(child))>)
6569
{
66-
return hpx::execution::experimental::get_completion_scheduler<
67-
hpx::execution::experimental::set_value_t>(
68-
hpx::execution::experimental::get_env(child));
70+
return hpx::execution::experimental::
71+
get_completion_scheduler<
72+
hpx::execution::experimental::set_value_t>(
73+
hpx::execution::experimental::get_env(child));
6974
}
7075
else
7176
{
72-
return hpx::execution::experimental::get_parallel_scheduler();
77+
return hpx::execution::experimental::
78+
get_parallel_scheduler();
7379
}
7480
}();
7581

@@ -205,8 +211,7 @@ namespace hpx::execution::experimental {
205211
operation_state& operator=(operation_state&&) = default;
206212
operation_state& operator=(operation_state const&) = delete;
207213

208-
friend void tag_invoke(
209-
start_t, operation_state& os) noexcept
214+
friend void tag_invoke(start_t, operation_state& os) noexcept
210215
{
211216
#if defined(HPX_HAVE_STDEXEC)
212217
// P2079R10 4.1: if stop_token is stopped, complete
@@ -347,7 +352,7 @@ namespace hpx::execution::experimental {
347352

348353
private:
349354
thread_pool_policy_scheduler<hpx::launch> scheduler_;
350-
// Cached PU mask computed once, reused for every bulk_chunked call.
355+
// Cached PU mask - computed once, reused for every bulk_chunked call.
351356
hpx::threads::mask_type pu_mask_;
352357
};
353358

@@ -360,11 +365,12 @@ namespace hpx::execution::experimental {
360365
// P2079R10 get_parallel_scheduler function
361366
inline parallel_scheduler get_parallel_scheduler()
362367
{
363-
static const parallel_scheduler default_sched = []() {
368+
static parallel_scheduler const default_sched = []() {
364369
auto pool = detail::get_default_parallel_pool();
365370
if (!pool)
366371
{
367-
std::terminate(); // As per P2079R10, terminate if backend is unavailable
372+
std::
373+
terminate(); // As per P2079R10, terminate if backend is unavailable
368374
}
369375
return parallel_scheduler(thread_pool_policy_scheduler<hpx::launch>(
370376
pool, hpx::launch::async));

libs/core/executors/include/hpx/executors/scheduler_executor.hpp

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,19 @@ namespace hpx::execution::experimental {
4242
};
4343

4444
template <typename Policy>
45-
struct has_thread_pool_backend<
46-
thread_pool_policy_scheduler<Policy>> : std::true_type
45+
struct has_thread_pool_backend<thread_pool_policy_scheduler<Policy>>
46+
: std::true_type
4747
{
4848
};
4949

5050
// Helper to extract thread pool parameters from a scheduler
5151
template <typename Scheduler>
52-
struct thread_pool_params; // primary: not defined
52+
struct thread_pool_params; // primary: not defined
5353

5454
template <typename Policy>
5555
struct thread_pool_params<thread_pool_policy_scheduler<Policy>>
5656
{
57-
static auto* pool(
58-
thread_pool_policy_scheduler<Policy> const& sched)
57+
static auto* pool(thread_pool_policy_scheduler<Policy> const& sched)
5958
{
6059
return sched.get_thread_pool();
6160
}
@@ -79,8 +78,8 @@ namespace hpx::execution::experimental {
7978
static auto pu_mask(
8079
thread_pool_policy_scheduler<Policy> const& sched)
8180
{
82-
return hpx::execution::experimental::
83-
get_processing_units_mask(sched);
81+
return hpx::execution::experimental::get_processing_units_mask(
82+
sched);
8483
}
8584
};
8685
} // namespace detail
@@ -254,8 +253,8 @@ namespace hpx::execution::experimental {
254253

255254
return hpx::parallel::execution::detail::
256255
index_queue_bulk_async_execute(pool, first_core,
257-
num_cores, policy, HPX_FORWARD(F, f), shape,
258-
mask, HPX_FORWARD(Ts, ts)...);
256+
num_cores, policy, HPX_FORWARD(F, f), shape, mask,
257+
HPX_FORWARD(Ts, ts)...);
259258
}
260259
else if constexpr (requires {
261260
exec.sched_.get_underlying_scheduler();
@@ -287,7 +286,8 @@ namespace hpx::execution::experimental {
287286
size_type const n = hpx::util::size(shape);
288287
return make_future(bulk(schedule(exec.sched_), par, n,
289288
[shape, f = HPX_FORWARD(F, f),
290-
... args = HPX_FORWARD(Ts, ts)](size_type i) mutable {
289+
... args = HPX_FORWARD(Ts, ts)](
290+
size_type i) mutable {
291291
auto it = hpx::util::begin(shape);
292292
std::advance(it, i);
293293
HPX_INVOKE(f, *it, args...);
@@ -300,7 +300,8 @@ namespace hpx::execution::experimental {
300300
size_type const n = hpx::util::size(shape);
301301
return make_future(bulk(schedule(exec.sched_), par, n,
302302
[shape, f = HPX_FORWARD(F, f),
303-
... args = HPX_FORWARD(Ts, ts)](size_type i) mutable {
303+
... args = HPX_FORWARD(Ts, ts)](
304+
size_type i) mutable {
304305
auto it = hpx::util::begin(shape);
305306
std::advance(it, i);
306307
HPX_INVOKE(f, *it, args...);
@@ -501,8 +502,9 @@ namespace hpx::execution::experimental {
501502

502503
hpx::parallel::execution::detail::
503504
index_queue_bulk_sync_execute(pool, first_core,
504-
num_cores, policy, HPX_FORWARD(decltype(f), f),
505-
shape, mask, HPX_FORWARD(decltype(ts), ts)...);
505+
num_cores, policy,
506+
HPX_FORWARD(decltype(f), f), shape, mask,
507+
HPX_FORWARD(decltype(ts), ts)...);
506508
},
507509
HPX_FORWARD(Future, predecessor));
508510
}
@@ -538,8 +540,7 @@ namespace hpx::execution::experimental {
538540
index_queue_bulk_sync_execute(pool,
539541
first_core, num_cores, policy,
540542
HPX_FORWARD(decltype(f), f), shape,
541-
mask,
542-
HPX_FORWARD(decltype(ts), ts)...);
543+
mask, HPX_FORWARD(decltype(ts), ts)...);
543544
},
544545
HPX_FORWARD(Future, predecessor));
545546
}
@@ -557,28 +558,27 @@ namespace hpx::execution::experimental {
557558
size_type i, auto&... receiver_args) mutable {
558559
auto it = hpx::util::begin(shape);
559560
std::advance(it, i);
560-
HPX_INVOKE(
561-
f, *it, args..., receiver_args...);
561+
HPX_INVOKE(f, *it, args..., receiver_args...);
562562
});
563563
return make_future(HPX_MOVE(loop));
564564
}
565565
}
566566
else
567567
{
568568
// Fallback: sender pipeline
569-
auto pre_req = when_all(
570-
keep_future(HPX_FORWARD(Future, predecessor)));
569+
auto pre_req =
570+
when_all(keep_future(HPX_FORWARD(Future, predecessor)));
571571
using size_type = decltype(hpx::util::size(shape));
572572
size_type const n = hpx::util::size(shape);
573-
auto loop = bulk(
574-
transfer(HPX_MOVE(pre_req), exec.sched_), par, n,
575-
[shape, f = HPX_FORWARD(F, f),
576-
... args = HPX_FORWARD(Ts, ts)](
577-
size_type i, auto&... receiver_args) mutable {
578-
auto it = hpx::util::begin(shape);
579-
std::advance(it, i);
580-
HPX_INVOKE(f, *it, args..., receiver_args...);
581-
});
573+
auto loop =
574+
bulk(transfer(HPX_MOVE(pre_req), exec.sched_), par, n,
575+
[shape, f = HPX_FORWARD(F, f),
576+
... args = HPX_FORWARD(Ts, ts)](
577+
size_type i, auto&... receiver_args) mutable {
578+
auto it = hpx::util::begin(shape);
579+
std::advance(it, i);
580+
HPX_INVOKE(f, *it, args..., receiver_args...);
581+
});
582582
return make_future(HPX_MOVE(loop));
583583
}
584584
#else

libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ namespace hpx::execution::experimental::detail {
8787
{
8888
if (num_threads == 0)
8989
return static_cast<std::uint32_t>(n);
90-
// ceiling division: ceil(n / num_threads) one chunk per worker thread
90+
// ceiling division: ceil(n / num_threads) -> one chunk per worker thread
9191
return static_cast<std::uint32_t>(
9292
(n + static_cast<std::size_t>(num_threads) - 1) / num_threads);
9393
}
@@ -184,7 +184,8 @@ namespace hpx::execution::experimental::detail {
184184
auto const i_begin =
185185
static_cast<std::size_t>(index) * op_state->chunk_size;
186186
auto const i_end =
187-
(std::min) (i_begin + op_state->chunk_size, static_cast<std::size_t>(op_state->size));
187+
(std::min) (i_begin + op_state->chunk_size,
188+
static_cast<std::size_t>(op_state->size));
188189

189190
if constexpr (OperationState::is_chunked)
190191
{
@@ -683,8 +684,8 @@ namespace hpx::execution::experimental::detail {
683684
// Handle the queue for the local thread.
684685
if (main_thread_ok)
685686
{
686-
do_work_local(task_function<OperationState>{this->op_state,
687-
local_worker_thread});
687+
do_work_local(task_function<OperationState>{
688+
this->op_state, local_worker_thread});
688689
}
689690
}
690691

libs/core/executors/tests/unit/parallel_scheduler.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,22 @@
1111

1212
#include <atomic>
1313
#include <chrono>
14+
#include <cstddef>
1415
#include <exception>
1516
#include <optional>
1617
#include <set>
1718
#include <stdexcept>
19+
#include <string>
1820
#include <thread>
1921
#include <type_traits>
22+
#include <utility>
2023
#include <vector>
2124

2225
namespace ex = hpx::execution::experimental;
2326

2427
#if defined(HPX_HAVE_STDEXEC)
2528
// Include stdexec async_scope for stop token testing
2629
#include <exec/async_scope.hpp>
27-
#endif
2830

2931
int hpx_main(int, char*[])
3032
{
@@ -394,7 +396,6 @@ int hpx_main(int, char*[])
394396
}
395397
}
396398

397-
#if defined(HPX_HAVE_STDEXEC)
398399
// Stop token support test (P2079R10 requirement)
399400
{
400401
ex::parallel_scheduler sched = ex::get_parallel_scheduler();
@@ -503,10 +504,16 @@ int hpx_main(int, char*[])
503504
// Sequential execution should use only 1 thread
504505
HPX_TEST_EQ(thread_ids.size(), std::size_t(1));
505506
}
506-
#endif
507507

508508
return hpx::local::finalize();
509509
}
510+
#else
511+
int hpx_main(int, char*[])
512+
{
513+
// parallel_scheduler requires HPX_HAVE_STDEXEC
514+
return hpx::local::finalize();
515+
}
516+
#endif
510517

511518
int main(int argc, char* argv[])
512519
{

tests/performance/local/stream.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,7 @@ int hpx_main(hpx::program_options::variables_map& vm)
603603
timing = run_benchmark<>(warmup_iterations, iterations, vector_size,
604604
std::move(alloc), std::move(policy));
605605
}
606+
#if defined(HPX_HAVE_STDEXEC)
606607
else if (executor == 6)
607608
{
608609
// parallel_scheduler natively.
@@ -621,6 +622,7 @@ int hpx_main(hpx::program_options::variables_map& vm)
621622
timing = run_benchmark<>(warmup_iterations, iterations, vector_size,
622623
std::move(alloc), std::move(policy));
623624
}
625+
#endif
624626
else
625627
{
626628
HPX_THROW_EXCEPTION(hpx::error::commandline_option_error,

0 commit comments

Comments (0)