Skip to content

Commit eb799a8

Browse files
committed
Switch execution of a parallel algorithm to synchronous execution
... if instructed to run on one core using one chunk - flyby: added missing trait specialization to parallel_executor - flyby: removed superfluous trait specialization for restricted_policy_executor - flyby: made return values from sync_execute and bulk_sync_execute consistent for scheduler_executor Signed-off-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>
1 parent 7190b67 commit eb799a8

13 files changed

Lines changed: 176 additions & 120 deletions

File tree

cmake/templates/std_headers.hpp.in

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@
9696
# include "lct.h"
9797
#endif
9898

99+
#if defined(HPX_HAVE_MODULE_ASYNC_CUDA)
100+
# if defined(HPX_HAVE_CUDA) && defined(HPX_HAVE_GPUBLAS)
101+
# include <cublas_api.h>
102+
# include <cublas_v2.h>
103+
# elif defined(HPX_HAVE_HIP) && defined(HPX_HAVE_GPUBLAS)
104+
# include <hipblas.h>
105+
# endif
106+
#endif
107+
99108
#if defined(HPX_HAVE_MODULE_THRUST)
100109
# include <hpx/thrust/thrust_headers.hpp>
101110
#endif

libs/core/algorithms/include/hpx/parallel/algorithms/merge.hpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -657,13 +657,25 @@ namespace hpx::parallel {
657657
hpx::tuple<Iter1, std::size_t, Iter2, std::size_t, std::size_t>;
658658

659659
auto reshape = [len1, first2, last2, comp, proj1, proj2](
660-
auto&& shape) {
660+
auto&& shape, std::size_t cores) {
661661
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
662662
static hpx::util::itt::event notify_event("reshape");
663663
hpx::util::itt::mark_event e(notify_event);
664664
#endif
665+
666+
auto shape_size = std::size(shape);
667+
if (shape_size == 1 && cores == 1)
668+
{
669+
// special case: one core, one chunk - the partitioner will
670+
// switch to sequential execution in this case
671+
auto [it1, size1, _] = *std::begin(shape);
672+
auto size2 = detail::distance(first2, last2);
673+
return std::vector<merge_region>(
674+
1, merge_region{it1, size1, first2, size2, 0});
675+
}
676+
665677
std::vector<merge_region> reshaped;
666-
reshaped.reserve(2 * std::size(shape));
678+
reshaped.reserve(2 * shape_size);
667679

668680
Iter2 it2 = first2;
669681
std::size_t dest_start = 0;
@@ -784,7 +796,7 @@ namespace hpx::parallel {
784796
auto reshape = get_reshape_chunks<Iter1>(len1, first2, end2,
785797
comp, proj1, proj2, lower_bound_helper{});
786798

787-
return util::foreach_partitioner<ExPolicy>::call(
799+
return util::foreach_partitioner<std::decay_t<ExPolicy>>::call(
788800
HPX_FORWARD(ExPolicy, policy), first1, len1, HPX_MOVE(f1),
789801
HPX_MOVE(f2), HPX_MOVE(reshape));
790802
}
@@ -797,7 +809,7 @@ namespace hpx::parallel {
797809
auto reshape = get_reshape_chunks<Iter2>(
798810
len2, first1, end1, comp, proj2, proj1, upper_bound_helper{});
799811

800-
return util::foreach_partitioner<ExPolicy>::call(
812+
return util::foreach_partitioner<std::decay_t<ExPolicy>>::call(
801813
HPX_FORWARD(ExPolicy, policy), first2, len2, HPX_MOVE(f1),
802814
HPX_MOVE(f2), HPX_MOVE(reshape));
803815
}

libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -129,18 +129,18 @@ namespace hpx::parallel::util::detail {
129129
typename Stride = std::size_t>
130130
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
131131
get_bulk_iteration_shape(ExPolicy& policy, IterOrR& it_or_r,
132-
std::size_t& count, Stride s = Stride(1))
132+
std::size_t& count, std::size_t& cores, Stride s = Stride(1))
133133
{
134134
if (count == 0)
135135
{
136+
cores = 1;
136137
auto it = chunk_size_iterator(it_or_r, 1);
137138
return hpx::util::iterator_range(it, it);
138139
}
139140

140-
std::size_t const cores =
141-
hpx::execution::experimental::processing_units_count(
142-
policy.parameters(), policy.executor(),
143-
hpx::chrono::null_duration, count);
141+
cores = hpx::execution::experimental::processing_units_count(
142+
policy.parameters(), policy.executor(), hpx::chrono::null_duration,
143+
count);
144144

145145
std::size_t max_chunks =
146146
hpx::execution::experimental::maximal_number_of_chunks(
@@ -183,10 +183,12 @@ namespace hpx::parallel::util::detail {
183183
typename IterOrR, typename Stride = std::size_t>
184184
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
185185
get_bulk_iteration_shape(ExPolicy& policy, std::vector<Future>& workitems,
186-
F1&& f1, IterOrR& it_or_r, std::size_t& count, Stride s = Stride(1))
186+
F1&& f1, IterOrR& it_or_r, std::size_t& count, std::size_t& cores,
187+
Stride s = Stride(1))
187188
{
188189
if (count == 0)
189190
{
191+
cores = 1;
190192
auto it = chunk_size_iterator(it_or_r, 1);
191193
return hpx::util::iterator_range(it, it);
192194
}
@@ -219,10 +221,8 @@ namespace hpx::parallel::util::detail {
219221
hpx::execution::experimental::measure_iteration(
220222
policy.parameters(), policy.executor(), test_function, count);
221223

222-
std::size_t const cores =
223-
hpx::execution::experimental::processing_units_count(
224-
policy.parameters(), policy.executor(), iteration_duration,
225-
count);
224+
cores = hpx::execution::experimental::processing_units_count(
225+
policy.parameters(), policy.executor(), iteration_duration, count);
226226

227227
std::size_t max_chunks =
228228
hpx::execution::experimental::maximal_number_of_chunks(
@@ -264,20 +264,20 @@ namespace hpx::parallel::util::detail {
264264
typename Stride = std::size_t>
265265
std::vector<hpx::tuple<IterOrR, std::size_t>>
266266
get_bulk_iteration_shape_variable(ExPolicy& policy, IterOrR& it_or_r,
267-
std::size_t& count, Stride s = Stride(1))
267+
std::size_t& count, std::size_t& cores, Stride s = Stride(1))
268268
{
269269
using tuple_type = hpx::tuple<IterOrR, std::size_t>;
270270
std::vector<tuple_type> shape;
271271

272272
if (count == 0)
273273
{
274+
cores = 1;
274275
return shape;
275276
}
276277

277-
std::size_t const cores =
278-
hpx::execution::experimental::processing_units_count(
279-
policy.parameters(), policy.executor(),
280-
hpx::chrono::null_duration, count);
278+
cores = hpx::execution::experimental::processing_units_count(
279+
policy.parameters(), policy.executor(), hpx::chrono::null_duration,
280+
count);
281281

282282
std::size_t max_chunks =
283283
hpx::execution::experimental::maximal_number_of_chunks(
@@ -338,20 +338,20 @@ namespace hpx::parallel::util::detail {
338338
typename FwdIter, typename Stride = std::size_t>
339339
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy& policy,
340340
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
341-
std::size_t& count, Stride s = Stride(1))
341+
std::size_t& count, std::size_t& cores, Stride s = Stride(1))
342342
{
343343
return get_bulk_iteration_shape(
344-
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
344+
policy, workitems, HPX_FORWARD(F1, f1), begin, count, cores, s);
345345
}
346346

347347
HPX_CXX_EXPORT template <typename ExPolicy, typename Future, typename F1,
348348
typename FwdIter, typename Stride = std::size_t>
349349
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy& policy,
350350
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
351-
std::size_t& count, Stride s = Stride(1))
351+
std::size_t& count, std::size_t& cores, Stride s = Stride(1))
352352
{
353353
return get_bulk_iteration_shape_variable(
354-
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
354+
policy, workitems, HPX_FORWARD(F1, f1), begin, count, cores, s);
355355
}
356356

357357
///////////////////////////////////////////////////////////////////////////
@@ -384,21 +384,21 @@ namespace hpx::parallel::util::detail {
384384
hpx::util::iterator_range<
385385
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
386386
get_bulk_iteration_shape_idx(ExPolicy& policy, FwdIter begin,
387-
std::size_t count, Stride s = Stride(1))
387+
std::size_t count, std::size_t& cores, Stride s = Stride(1))
388388
{
389389
using iterator =
390390
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;
391391

392392
if (count == 0)
393393
{
394+
cores = 1;
394395
auto it = iterator(begin, 1);
395396
return hpx::util::iterator_range(it, it);
396397
}
397398

398-
std::size_t const cores =
399-
hpx::execution::experimental::processing_units_count(
400-
policy.parameters(), policy.executor(),
401-
hpx::chrono::null_duration, count);
399+
cores = hpx::execution::experimental::processing_units_count(
400+
policy.parameters(), policy.executor(), hpx::chrono::null_duration,
401+
count);
402402

403403
std::size_t max_chunks =
404404
hpx::execution::experimental::maximal_number_of_chunks(
@@ -447,13 +447,14 @@ namespace hpx::parallel::util::detail {
447447
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
448448
get_bulk_iteration_shape_idx(ExPolicy& policy,
449449
std::vector<Future>& workitems, F1&& f1, FwdIter begin,
450-
std::size_t count, Stride s = Stride(1))
450+
std::size_t count, std::size_t& cores, Stride s = Stride(1))
451451
{
452452
using iterator =
453453
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;
454454

455455
if (count == 0)
456456
{
457+
cores = 1;
457458
auto it = iterator(begin, 1);
458459
return hpx::util::iterator_range(it, it);
459460
}
@@ -488,10 +489,8 @@ namespace hpx::parallel::util::detail {
488489
hpx::execution::experimental::measure_iteration(
489490
policy.parameters(), policy.executor(), test_function, count);
490491

491-
std::size_t const cores =
492-
hpx::execution::experimental::processing_units_count(
493-
policy.parameters(), policy.executor(), iteration_duration,
494-
count);
492+
cores = hpx::execution::experimental::processing_units_count(
493+
policy.parameters(), policy.executor(), iteration_duration, count);
495494

496495
std::size_t max_chunks =
497496
hpx::execution::experimental::maximal_number_of_chunks(
@@ -536,20 +535,20 @@ namespace hpx::parallel::util::detail {
536535
typename Stride = std::size_t>
537536
std::vector<hpx::tuple<FwdIter, std::size_t, std::size_t>>
538537
get_bulk_iteration_shape_idx_variable(ExPolicy& policy, FwdIter first,
539-
std::size_t count, Stride s = Stride(1))
538+
std::size_t count, std::size_t& cores, Stride s = Stride(1))
540539
{
541540
using tuple_type = hpx::tuple<FwdIter, std::size_t, std::size_t>;
542541
std::vector<tuple_type> shape;
543542

544543
if (count == 0)
545544
{
545+
cores = 1;
546546
return shape;
547547
}
548548

549-
std::size_t const cores =
550-
hpx::execution::experimental::processing_units_count(
551-
policy.parameters(), policy.executor(),
552-
hpx::chrono::null_duration, count);
549+
cores = hpx::execution::experimental::processing_units_count(
550+
policy.parameters(), policy.executor(), hpx::chrono::null_duration,
551+
count);
553552

554553
std::size_t max_chunks =
555554
hpx::execution::experimental::maximal_number_of_chunks(

libs/core/algorithms/include/hpx/parallel/util/detail/partitioner_iteration.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ namespace hpx::parallel::util::detail {
4848
template <typename Archive>
4949
void serialize(Archive& ar, unsigned)
5050
{
51-
// clang-format off
5251
ar & f_;
53-
// clang-format on
5452
}
5553
};
5654
} // namespace hpx::parallel::util::detail

libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <hpx/modules/async_combinators.hpp>
1616
#include <hpx/modules/execution.hpp>
1717
#include <hpx/modules/execution_base.hpp>
18+
#include <hpx/modules/pack_traversal.hpp>
1819
#include <hpx/parallel/util/detail/chunk_size.hpp>
1920
#include <hpx/parallel/util/detail/handle_local_exceptions.hpp>
2021
#include <hpx/parallel/util/detail/partitioner_iteration.hpp>
@@ -48,42 +49,78 @@ namespace hpx::parallel::util::detail {
4849
hpx::execution::experimental::extract_invokes_testing_function_v<
4950
parameters_type>;
5051

52+
std::size_t cores = 1;
5153
if constexpr (has_variable_chunk_size)
5254
{
5355
static_assert(!invokes_testing_function,
5456
"parameters object should not expose both, "
5557
"has_variable_chunk_size and invokes_testing_function");
5658

5759
auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
58-
policy, first, count);
60+
policy, first, count, cores);
5961

6062
return execution::bulk_async_execute(policy.executor(),
6163
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
6264
reshape(HPX_MOVE(shape)));
6365
}
6466
else if constexpr (!invokes_testing_function)
6567
{
66-
auto&& shape =
67-
detail::get_bulk_iteration_shape_idx(policy, first, count);
68+
auto&& shape = detail::get_bulk_iteration_shape_idx(
69+
policy, first, count, cores);
70+
71+
using executor_type = decltype(policy.executor());
72+
auto&& reshaped = reshape(HPX_MOVE(shape), cores);
6873

69-
if constexpr (hpx::is_async_execution_policy_v<ExPolicy>)
74+
// We attempt to perform some optimizations in case of non-task
75+
// execution.
76+
if constexpr (!hpx::is_async_execution_policy_v<ExPolicy> &&
77+
!hpx::execution_policy_has_scheduler_executor_v<ExPolicy>)
7078
{
71-
return execution::bulk_async_execute(policy.executor(),
79+
// Switch to sequential execution for one-core, one-chunk case
80+
// if the executor supports it.
81+
if constexpr (hpx::traits::is_one_way_executor_v<executor_type>)
82+
{
83+
if (cores == 1 && std::size(reshaped) == 1)
84+
{
85+
return execution::sync_execute(policy.executor(),
86+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
87+
*std::begin(HPX_MOVE(reshaped)));
88+
}
89+
}
90+
91+
if constexpr (hpx::traits::is_bulk_one_way_executor_v<
92+
executor_type>)
93+
{
94+
return execution::bulk_sync_execute(policy.executor(),
95+
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
96+
HPX_MOVE(reshaped));
97+
}
98+
99+
// Fall back if given executor doesn't support any of the above
100+
// optimizations.
101+
auto&& items = execution::bulk_async_execute(policy.executor(),
72102
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
73-
reshape(HPX_MOVE(shape)));
103+
HPX_MOVE(reshaped));
104+
if (hpx::wait_all_nothrow(items))
105+
{
106+
using handle_local_exceptions =
107+
detail::handle_local_exceptions<ExPolicy>;
108+
handle_local_exceptions::call(items);
109+
}
110+
return hpx::unwrap(items);
74111
}
75112
else
76113
{
77-
return execution::bulk_sync_execute(policy.executor(),
114+
return execution::bulk_async_execute(policy.executor(),
78115
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
79-
reshape(HPX_MOVE(shape)));
116+
HPX_MOVE(reshaped));
80117
}
81118
}
82119
else
83120
{
84121
std::vector<hpx::future<Result>> inititems;
85122
auto&& shape = detail::get_bulk_iteration_shape_idx(
86-
policy, inititems, f, first, count);
123+
policy, inititems, f, first, count, cores);
87124

88125
auto&& workitems = execution::bulk_async_execute(policy.executor(),
89126
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
@@ -96,6 +133,17 @@ namespace hpx::parallel::util::detail {
96133
///////////////////////////////////////////////////////////////////////
97134
// The static partitioner simply spawns one chunk of iterations for
98135
// each available core.
136+
// Default `ReShape` argument of the partitioner `call` functions: a no-op
// reshape that hands its first argument (the shape) back unchanged.
HPX_CXX_EXPORT struct default_reshape
137+
{
138+
using is_transparent = std::true_type;
139+
140+
template <typename T, typename... Ts>
141+
// Forwards `t` and ignores every further argument, so the object can be
// invoked both as `reshape(shape)` and as `reshape(shape, cores)`.
HPX_HOST_DEVICE constexpr T&& operator()(T&& t, Ts&&...) const noexcept
142+
{
143+
return HPX_FORWARD(T, t);
144+
}
145+
};
146+
99147
HPX_CXX_EXPORT template <typename ExPolicy, typename Result>
100148
struct foreach_static_partitioner
101149
{
@@ -106,7 +154,7 @@ namespace hpx::parallel::util::detail {
106154
detail::handle_local_exceptions<ExPolicy>;
107155

108156
template <typename ExPolicy_, typename FwdIter, typename F1,
109-
typename F2, typename ReShape = hpx::identity>
157+
typename F2, typename ReShape = default_reshape>
110158
static auto call(ExPolicy_&& policy, FwdIter first, std::size_t count,
111159
F1&& f1, F2&& f2, ReShape&& reshape = ReShape{})
112160
{
@@ -213,7 +261,7 @@ namespace hpx::parallel::util::detail {
213261
detail::handle_local_exceptions<ExPolicy>;
214262

215263
template <typename ExPolicy_, typename FwdIter, typename F1,
216-
typename F2, typename ReShape = hpx::identity>
264+
typename F2, typename ReShape = default_reshape>
217265
static decltype(auto) call(ExPolicy_&& policy, FwdIter first,
218266
std::size_t count, F1&& f1, F2&& f2, ReShape&& reshape = ReShape{})
219267
{

0 commit comments

Comments
 (0)