Skip to content

Commit 2e0a0f9

Browse files
executors: fix performance regressions and optimize parallel_executor
1 parent d7c9fe1 commit 2e0a0f9

4 files changed

Lines changed: 64 additions & 35 deletions

File tree

libs/core/executors/include/hpx/executors/fork_join_executor.hpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,8 +1270,7 @@ namespace hpx::execution::experimental {
12701270
/// \brief Construct a fork_join_executor.
12711271
///
12721272
/// \param priority The priority of the worker threads.
1273-
/// \param stacksize The stacksize of the worker threads. Must not be
1274-
/// nostack.
1273+
/// \param stacksize The stacksize of the worker threads.
12751274
/// \param sched The loop schedule of the parallel regions.
12761275
/// \param yield_delay The time after which the executor yields to other
12771276
/// work if it has not received any new work for execution.
@@ -1291,8 +1290,7 @@ namespace hpx::execution::experimental {
12911290
///
12921291
/// \param pu_mask The PU-mask to use for placing the created threads
12931292
/// \param priority The priority of the worker threads.
1294-
/// \param stacksize The stacksize of the worker threads. Must not be
1295-
/// nostack.
1293+
/// \param stacksize The stacksize of the worker threads.
12961294
/// \param sched The loop schedule of the parallel regions.
12971295
/// \param yield_delay The time after which the executor yields to other
12981296
/// work if it has not received any new work for execution.
@@ -1347,6 +1345,23 @@ namespace hpx::execution::experimental {
13471345
return exec.shared_data_->pu_mask_;
13481346
}
13491347

1348+
friend std::size_t tag_invoke(
1349+
hpx::execution::experimental::get_first_core_t,
1350+
fork_join_executor const& exec) noexcept
1351+
{
1352+
return shared_data::get_first_core(exec.shared_data_->pu_mask_);
1353+
}
1354+
1355+
template <typename Parameters>
1356+
friend std::size_t tag_invoke(
1357+
hpx::execution::experimental::processing_units_count_t,
1358+
Parameters&&, fork_join_executor const& exec,
1359+
hpx::chrono::steady_duration const& = hpx::chrono::null_duration,
1360+
std::size_t = 0) noexcept
1361+
{
1362+
return exec.shared_data_->num_threads_;
1363+
}
1364+
13501365
/// \cond NOINTERNAL
13511366
enum class init_mode : std::uint8_t
13521367
{

libs/core/executors/include/hpx/executors/parallel_executor.hpp

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ namespace hpx::execution {
161161
, policy_(rhs.policy_)
162162
, first_core_(rhs.first_core_)
163163
, num_cores_(rhs.num_cores_)
164-
, mask_(nullptr) // force recomputing cached pu mask
165164
#if defined(HPX_HAVE_THREAD_DESCRIPTION)
166165
, annotation_(rhs.annotation_)
167166
#endif
@@ -178,7 +177,6 @@ namespace hpx::execution {
178177
policy_ = rhs.policy_;
179178
first_core_ = rhs.first_core_;
180179
num_cores_ = rhs.num_cores_;
181-
mask_ = nullptr; // force recomputing cached pu mask
182180

183181
#if defined(HPX_HAVE_THREAD_DESCRIPTION)
184182
annotation_ = rhs.annotation_;
@@ -187,10 +185,7 @@ namespace hpx::execution {
187185
return *this;
188186
}
189187

190-
constexpr ~parallel_policy_executor_base()
191-
{
192-
delete mask_;
193-
}
188+
constexpr ~parallel_policy_executor_base() = default;
194189

195190
// backwards compatibility support, will be removed in the future
196191
template <typename Parameters>
@@ -345,11 +340,6 @@ namespace hpx::execution {
345340

346341
hpx::threads::mask_type pu_mask() const
347342
{
348-
//if (mask_ != nullptr && hpx::threads::any(*mask_))
349-
//{
350-
// return *mask_;
351-
//}
352-
353343
auto const num_threads = get_num_cores();
354344
auto const available_threads = static_cast<std::uint32_t>(
355345
pool()->get_active_os_thread_count());
@@ -376,13 +366,6 @@ namespace hpx::execution {
376366
}
377367
}
378368

379-
// `mask_` is conceptually mutable, however in order to constexpr
380-
// construct this object we need to use a `const_cast` to cache
381-
// the mask.
382-
383-
//*const_cast<hpx::threads::mask_type**>(&mask_) =
384-
// new hpx::threads::mask_type(mask);
385-
386369
return mask;
387370
}
388371

@@ -397,7 +380,6 @@ namespace hpx::execution {
397380
Policy policy_;
398381
std::size_t first_core_ = 0;
399382
std::size_t num_cores_ = 0;
400-
hpx::threads::mask_type* mask_ = nullptr;
401383
#if defined(HPX_HAVE_THREAD_DESCRIPTION)
402384
char const* annotation_ = nullptr;
403385
#endif

libs/core/executors/tests/unit/fork_join_executor.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ void test_processing_mask(ExecutorArgs&&... args)
4949

5050
auto const cores_mask = hpx::execution::experimental::get_cores_mask(exec);
5151
HPX_TEST(cores_mask == expected_mask);
52+
53+
auto const expected_pu_count = hpx::threads::count(pus_mask);
54+
HPX_TEST_EQ(expected_pu_count, hpx::threads::count(expected_mask));
55+
56+
auto const pu_count =
57+
hpx::execution::experimental::processing_units_count(exec);
58+
HPX_TEST_EQ(pu_count, expected_pu_count);
59+
60+
auto const first_core = hpx::execution::experimental::get_first_core(exec);
61+
HPX_TEST_EQ(first_core, hpx::threads::find_first(expected_mask));
5262
}
5363

5464
///////////////////////////////////////////////////////////////////////////////

libs/core/executors/tests/unit/parallel_executor.cpp

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -187,21 +187,43 @@ void test_processing_mask()
187187
{
188188
hpx::execution::parallel_executor exec;
189189

190+
auto const pool = hpx::threads::detail::get_self_or_default_pool();
191+
auto const expected_mask =
192+
pool->get_used_processing_units(pool->get_os_thread_count(), false);
193+
194+
// 1. Initial access returns the expected processing units mask
195+
auto const mask1 =
196+
hpx::execution::experimental::get_processing_units_mask(exec);
197+
HPX_TEST(mask1 == expected_mask);
198+
199+
// 2. Repeated access returns the same mask
200+
auto const mask2 =
201+
hpx::execution::experimental::get_processing_units_mask(exec);
202+
HPX_TEST(mask2 == expected_mask);
203+
204+
// 3. Copy construction preserves the observable mask behavior
205+
hpx::execution::parallel_executor exec_copy = exec;
206+
auto const mask3 =
207+
hpx::execution::experimental::get_processing_units_mask(exec_copy);
208+
HPX_TEST(mask3 == expected_mask);
209+
210+
// 4. Copy assignment preserves the observable mask behavior
211+
hpx::execution::parallel_executor exec_assign;
212+
// Exercise the assignee before assignment to verify repeated use remains correct
213+
hpx::execution::experimental::get_processing_units_mask(exec_assign);
214+
215+
// Assign from another executor and verify the resulting mask is still correct
216+
exec_assign = exec;
217+
auto const mask4 =
218+
hpx::execution::experimental::get_processing_units_mask(exec_assign);
219+
HPX_TEST(mask4 == expected_mask);
220+
221+
// 5. Cores mask test
190222
{
191-
auto const pool = hpx::threads::detail::get_self_or_default_pool();
192-
auto const expected_mask =
193-
pool->get_used_processing_units(pool->get_os_thread_count(), false);
194-
auto const mask =
195-
hpx::execution::experimental::get_processing_units_mask(exec);
196-
HPX_TEST(mask == expected_mask);
197-
}
198-
199-
{
200-
auto const pool = hpx::threads::detail::get_self_or_default_pool();
201-
auto const expected_mask =
223+
auto const expected_cores =
202224
pool->get_used_processing_units(pool->get_os_thread_count(), true);
203225
auto const mask = hpx::execution::experimental::get_cores_mask(exec);
204-
HPX_TEST(mask == expected_mask);
226+
HPX_TEST(mask == expected_cores);
205227
}
206228
}
207229

0 commit comments

Comments
 (0)