-
-
Notifications
You must be signed in to change notification settings - Fork 543
Expand file tree
/
Copy pathsender_diamond.cpp
More file actions
98 lines (80 loc) · 2.98 KB
/
sender_diamond.cpp
File metadata and controls
98 lines (80 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Copyright (c) 2026 Kashy Namboothiri
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
// Note: This example is excluded from MSVC builds with C++ modules enabled
// due to a compiler ICE. See CMakeLists.txt for the build guard.
// Demonstrates a diamond dependency pattern using HPX sender/receiver
// primitives. A single sender (A) is shared between two independent continuations
// (B and C) using split. Their results are then joined by when_all and merged
// in a final step (D).
// Dependency structure:
//
//          A          (produce initial value)
//         / \         split dependencies
//        B   C        (independent transforms, dispatched to thread pool)
//         \ /         merge dependencies
//          D          (merge)
#include <hpx/assert.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <iostream>
#include <utility>
namespace ex = hpx::execution::experimental;
namespace tt = hpx::this_thread::experimental;
// Builds the diamond-shaped sender graph A -> {B, C} -> D, blocks until it
// has completed, prints the final value, and then finalizes the runtime.
int hpx_main()
{
    // A: emit the seed value 10 and log it.
    auto source = ex::then(ex::just(10), [](int seed) {
        std::cout << "A: produced " << seed << "\n";
        return seed;
    });

    // Wrapping A in split lets both B and C connect to the same sender.
    auto shared_source = ex::split(std::move(source));

    auto pool = ex::thread_pool_scheduler{};

    // -Wdeprecated-declarations is switched off only around the two branch
    // definitions below (presumably because of ex::transfer -- confirm), so
    // that builds treating warnings as errors still succeed while the code
    // itself stays as it is.
#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#elif defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif

    // B: receives A's value via the scheduler and doubles it.
    auto branch_b =
        shared_source | ex::transfer(pool) | ex::then([](int value) {
            int doubled = value * 2;
            std::cout << "B: " << value << " * 2 = " << doubled << "\n";
            return doubled;
        });

    // C: receives A's value via the scheduler and triples it.
    auto branch_c =
        shared_source | ex::transfer(pool) | ex::then([](int value) {
            int tripled = value * 3;
            std::cout << "C: " << value << " * 3 = " << tripled << "\n";
            return tripled;
        });

#if defined(__clang__)
#pragma clang diagnostic pop
#elif defined(__GNUC__)
#pragma GCC diagnostic pop
#endif

    // D: wait for both branches, then add their results together.
    auto merged = ex::when_all(std::move(branch_b), std::move(branch_c)) |
        ex::then([](int lhs, int rhs) {
            int sum = lhs + rhs;
            std::cout << "D: " << lhs << " + " << rhs << " = " << sum << "\n";
            return sum;
        });

    // Block until the whole graph has finished and pull out its single value.
    auto outcome = tt::sync_wait(std::move(merged));
    // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
    auto result = hpx::get<0>(*outcome);

    std::cout << "Final Result: " << result << "\n";

    // Expected values: A=10, B=20, C=30, D=50
    HPX_ASSERT(result == 50);

    return hpx::local::finalize();
}
// Program entry point: passes hpx_main and the command-line arguments to
// hpx::local::init and returns the value that call yields.
int main(int argc, char* argv[])
{
    int const exit_code = hpx::local::init(hpx_main, argc, argv);
    return exit_code;
}