Skip to content

Commit 9f02f2a

Browse files
committed
Apply partitioner optimizations to all code paths
Signed-off-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>
1 parent feed1d5 commit 9f02f2a

2 files changed

Lines changed: 130 additions & 15 deletions

File tree

libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp

Lines changed: 129 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <hpx/modules/execution.hpp>
1818
#include <hpx/modules/execution_base.hpp>
1919
#include <hpx/modules/iterator_support.hpp>
20+
#include <hpx/modules/pack_traversal.hpp>
2021
#include <hpx/modules/type_support.hpp>
2122
#include <hpx/parallel/util/detail/chunk_size.hpp>
2223
#include <hpx/parallel/util/detail/handle_local_exceptions.hpp>
@@ -69,9 +70,52 @@ namespace hpx::parallel::util::detail {
6970
auto&& shape =
7071
detail::get_bulk_iteration_shape(policy, it_or_r, count, cores);
7172

72-
return execution::bulk_async_execute(policy.executor(),
73-
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
74-
HPX_MOVE(shape));
73+
using executor_type = decltype(policy.executor());
74+
75+
// We attempt to perform some optimizations in case of non-task
76+
// execution.
77+
if constexpr (!hpx::is_async_execution_policy_v<ExPolicy> &&
78+
!hpx::execution_policy_has_scheduler_executor_v<ExPolicy>)
79+
{
80+
// Switch to sequential execution for one-core, one-chunk case
81+
// if the executor supports it.
82+
if constexpr (hpx::traits::is_one_way_executor_v<executor_type>)
83+
{
84+
if (cores == 1 && std::size(shape) == 1)
85+
{
86+
return execution::sync_execute(policy.executor(),
87+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
88+
*std::begin(HPX_MOVE(shape)));
89+
}
90+
}
91+
92+
if constexpr (hpx::traits::is_bulk_one_way_executor_v<
93+
executor_type>)
94+
{
95+
return execution::bulk_sync_execute(policy.executor(),
96+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
97+
HPX_MOVE(shape));
98+
}
99+
100+
// Fall back if the given executor doesn't support any of the above
101+
// optimizations.
102+
auto&& items = execution::bulk_async_execute(policy.executor(),
103+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
104+
HPX_MOVE(shape));
105+
if (hpx::wait_all_nothrow(items))
106+
{
107+
using handle_local_exceptions =
108+
detail::handle_local_exceptions<ExPolicy>;
109+
handle_local_exceptions::call(items);
110+
}
111+
return hpx::unwrap(items);
112+
}
113+
else
114+
{
115+
return execution::bulk_async_execute(policy.executor(),
116+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
117+
HPX_MOVE(shape));
118+
}
75119
}
76120
else
77121
{
@@ -122,9 +166,52 @@ namespace hpx::parallel::util::detail {
122166
auto&& shape = detail::get_bulk_iteration_shape_idx(
123167
policy, first, count, cores, stride);
124168

125-
return execution::bulk_async_execute(policy.executor(),
126-
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
127-
HPX_MOVE(shape));
169+
using executor_type = decltype(policy.executor());
170+
171+
// We attempt to perform some optimizations in case of non-task
172+
// execution.
173+
if constexpr (!hpx::is_async_execution_policy_v<ExPolicy> &&
174+
!hpx::execution_policy_has_scheduler_executor_v<ExPolicy>)
175+
{
176+
// Switch to sequential execution for one-core, one-chunk case
177+
// if the executor supports it.
178+
if constexpr (hpx::traits::is_one_way_executor_v<executor_type>)
179+
{
180+
if (cores == 1 && std::size(shape) == 1)
181+
{
182+
return execution::sync_execute(policy.executor(),
183+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
184+
*std::begin(HPX_MOVE(shape)));
185+
}
186+
}
187+
188+
if constexpr (hpx::traits::is_bulk_one_way_executor_v<
189+
executor_type>)
190+
{
191+
return execution::bulk_sync_execute(policy.executor(),
192+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
193+
HPX_MOVE(shape));
194+
}
195+
196+
// Fall back if the given executor doesn't support any of the above
197+
// optimizations.
198+
auto&& items = execution::bulk_async_execute(policy.executor(),
199+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
200+
HPX_MOVE(shape));
201+
if (hpx::wait_all_nothrow(items))
202+
{
203+
using handle_local_exceptions =
204+
detail::handle_local_exceptions<ExPolicy>;
205+
handle_local_exceptions::call(items);
206+
}
207+
return hpx::unwrap(items);
208+
}
209+
else
210+
{
211+
return execution::bulk_async_execute(policy.executor(),
212+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
213+
HPX_MOVE(shape));
214+
}
128215
}
129216
else
130217
{
@@ -207,13 +294,26 @@ namespace hpx::parallel::util::detail {
207294

208295
try
209296
{
210-
auto&& items =
297+
if constexpr (std::is_void_v<decltype(detail::partition<Result>(
298+
policy, first, count, f1))>)
299+
{
211300
detail::partition<Result>(HPX_FORWARD(ExPolicy_, policy),
212301
first, count, HPX_FORWARD(F1, f1));
213302

214-
scoped_params.mark_end_of_scheduling();
303+
scoped_params.mark_end_of_scheduling();
215304

216-
return reduce(HPX_MOVE(items), HPX_FORWARD(F2, f2));
305+
return HPX_INVOKE(f2);
306+
}
307+
else
308+
{
309+
auto&& items = detail::partition<Result>(
310+
HPX_FORWARD(ExPolicy_, policy), first, count,
311+
HPX_FORWARD(F1, f1));
312+
313+
scoped_params.mark_end_of_scheduling();
314+
315+
return reduce(HPX_MOVE(items), HPX_FORWARD(F2, f2));
316+
}
217317
}
218318
catch (...)
219319
{
@@ -236,13 +336,28 @@ namespace hpx::parallel::util::detail {
236336

237337
try
238338
{
239-
auto&& items = detail::partition_with_index<Result>(
240-
HPX_FORWARD(ExPolicy_, policy), first, count, stride,
241-
HPX_FORWARD(F1, f1));
339+
if constexpr (std::is_void_v<
340+
decltype(detail::partition_with_index<Result>(
341+
policy, first, count, stride, f1))>)
342+
{
343+
detail::partition_with_index<Result>(
344+
HPX_FORWARD(ExPolicy_, policy), first, count, stride,
345+
HPX_FORWARD(F1, f1));
242346

243-
scoped_params.mark_end_of_scheduling();
347+
scoped_params.mark_end_of_scheduling();
244348

245-
return reduce(HPX_MOVE(items), HPX_FORWARD(F2, f2));
349+
return HPX_INVOKE(HPX_FORWARD(F2, f2));
350+
}
351+
else
352+
{
353+
auto&& items = detail::partition_with_index<Result>(
354+
HPX_FORWARD(ExPolicy_, policy), first, count, stride,
355+
HPX_FORWARD(F1, f1));
356+
357+
scoped_params.mark_end_of_scheduling();
358+
359+
return reduce(HPX_MOVE(items), HPX_FORWARD(F2, f2));
360+
}
246361
}
247362
catch (...)
248363
{

libs/core/algorithms/include/hpx/parallel/util/partitioner_with_cleanup.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ namespace hpx::parallel::util {
6666

6767
try
6868
{
69-
const bool has_scheduler_executor =
69+
constexpr bool has_scheduler_executor =
7070
hpx::execution_policy_has_scheduler_executor_v<
7171
ExPolicy_>;
7272

0 commit comments

Comments (0)