Skip to content

Commit ad9f249

Browse files
committed
Remove 'flush' stage
Includes: - minor cleanups - renaming product_store::id() to product_store::index()
1 parent 1e2b080 commit ad9f249

26 files changed

+300
-414
lines changed

phlex/core/CMakeLists.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ cet_make_library(
2222
glue.cpp
2323
input_arguments.cpp
2424
message.cpp
25-
message_sender.cpp
2625
node_catalog.cpp
2726
multiplexer.cpp
2827
products_consumer.cpp
@@ -61,7 +60,6 @@ install(
6160
graph_proxy.hpp
6261
input_arguments.hpp
6362
message.hpp
64-
message_sender.hpp
6563
multiplexer.hpp
6664
node_catalog.hpp
6765
product_query.hpp

phlex/core/declared_fold.hpp

Lines changed: 29 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
#include "oneapi/tbb/concurrent_unordered_map.h"
2121
#include "oneapi/tbb/flow_graph.h"
22-
#include "spdlog/spdlog.h"
2322

2423
#include <atomic>
2524
#include <cassert>
@@ -43,7 +42,7 @@ namespace phlex::experimental {
4342

4443
virtual tbb::flow::sender<message>& sender() = 0;
4544
virtual tbb::flow::sender<message>& to_output() = 0;
46-
virtual tbb::flow::receiver<message>& flush_port() = 0;
45+
virtual tbb::flow::receiver<flush_message>& flush_port() = 0;
4746
virtual product_specifications const& output() const = 0;
4847
virtual std::size_t product_count() const = 0;
4948
};
@@ -77,40 +76,36 @@ namespace phlex::experimental {
7776
initializer_{std::move(initializer)},
7877
output_{to_product_specifications(full_name(), std::move(output), make_type_ids<R>())},
7978
partition_{std::move(partition)},
80-
flush_receiver_{
81-
g,
82-
tbb::flow::unlimited,
83-
[this](message const& msg) -> tbb::flow::continue_msg {
84-
auto const& [store, original_message_id] = std::tie(msg.store, msg.original_id);
85-
if (store->index()->layer_name() != partition_) {
86-
return {};
87-
}
88-
89-
counter_for(store->index()->hash()).set_flush_value(store, original_message_id);
90-
emit_and_evict_if_done(store->index());
91-
return {};
92-
}},
79+
flush_receiver_{g,
80+
tbb::flow::unlimited,
81+
[this](flush_message const& msg) -> tbb::flow::continue_msg {
82+
auto const& [index, counts, original_message_id] = msg;
83+
if (index->layer_name() != partition_) {
84+
return {};
85+
}
86+
87+
counter_for(index->hash()).set_flush_value(counts, original_message_id);
88+
emit_and_evict_if_done(index);
89+
return {};
90+
}},
9391
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
9492
fold_{
9593
g, concurrency, [this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto&) {
9694
// N.B. The assumption is that a fold will *never* need to cache
9795
// the product store it creates. Any flush messages *do not* need
9896
// to be propagated to downstream nodes.
9997
auto const& msg = most_derived(messages);
100-
auto const& store = msg.store;
101-
102-
assert(not store->is_flush());
98+
auto const& index = msg.store->index();
10399

104-
if (not store->index()->parent(partition_)) {
100+
auto fold_index = index->parent(partition_);
101+
if (not fold_index) {
105102
return;
106103
}
107104

108-
auto const& fold_index = store->index()->parent(partition_);
109-
assert(fold_index);
110-
auto const& id_hash_for_counter = fold_index->hash();
105+
auto const& index_hash_for_counter = fold_index->hash();
111106

112107
call(ft, messages, std::make_index_sequence<N>{});
113-
counter_for(id_hash_for_counter).increment(store->index()->layer_hash());
108+
counter_for(index_hash_for_counter).increment(index->layer_hash());
114109

115110
emit_and_evict_if_done(fold_index);
116111
}}
@@ -123,7 +118,7 @@ namespace phlex::experimental {
123118
{
124119
if (auto counter = done_with(fold_index->hash())) {
125120
auto parent = std::make_shared<product_store>(fold_index, this->full_name());
126-
commit_(*parent);
121+
commit_(parent);
127122
++product_count_;
128123
output_port<0>(fold_).try_put({parent, counter->original_message_id()});
129124
}
@@ -136,21 +131,21 @@ namespace phlex::experimental {
136131

137132
std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }
138133

139-
tbb::flow::receiver<message>& flush_port() override { return flush_receiver_; }
134+
tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
140135
tbb::flow::sender<message>& sender() override { return output_port<0ull>(fold_); }
141136
tbb::flow::sender<message>& to_output() override { return sender(); }
142137
product_specifications const& output() const override { return output_; }
143138

144139
template <std::size_t... Is>
145140
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
146141
{
147-
auto const& parent_id = *most_derived(messages).store->index()->parent(partition_);
142+
auto const parent_index = most_derived(messages).store->index()->parent(partition_);
148143
// FIXME: Not the safest approach!
149-
auto it = results_.find(parent_id);
144+
auto it = results_.find(parent_index->hash());
150145
if (it == results_.end()) {
151146
it =
152147
results_
153-
.insert({parent_id,
148+
.insert({parent_index->hash(),
154149
initialized_object(std::move(initializer_),
155150
std::make_index_sequence<std::tuple_size_v<InitTuple>>{})})
156151
.first;
@@ -169,13 +164,13 @@ namespace phlex::experimental {
169164
new R{std::forward<std::tuple_element_t<Is, InitTuple>>(std::get<Is>(tuple))...}};
170165
}
171166

172-
void commit_(product_store& store)
167+
auto commit_(product_store_ptr& store)
173168
{
174-
auto& result = results_.at(*store.index());
169+
auto& result = results_.at(store->index()->hash());
175170
if constexpr (requires { send(*result); }) {
176-
store.add_product(output()[0].name(), send(*result));
171+
store->add_product(output()[0].name(), send(*result));
177172
} else {
178-
store.add_product(output()[0].name(), std::move(*result));
173+
store->add_product(output()[0].name(), std::move(result));
179174
}
180175
// Reclaim some memory; it would be better to erase the entire entry from the map,
181176
// but that is not thread-safe.
@@ -186,10 +181,10 @@ namespace phlex::experimental {
186181
input_retriever_types<input_parameter_types> input_{input_arguments<input_parameter_types>()};
187182
product_specifications output_;
188183
std::string partition_;
189-
tbb::flow::function_node<message> flush_receiver_;
184+
tbb::flow::function_node<flush_message> flush_receiver_;
190185
join_or_none_t<N> join_;
191186
tbb::flow::multifunction_node<messages_t<N>, messages_t<1>> fold_;
192-
tbb::concurrent_unordered_map<data_cell_index, std::unique_ptr<R>> results_;
187+
tbb::concurrent_unordered_map<data_cell_index::hash_type, std::unique_ptr<R>> results_;
193188
std::atomic<std::size_t> calls_;
194189
std::atomic<std::size_t> product_count_;
195190
};

phlex/core/declared_observer.hpp

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace phlex::experimental {
3737
product_queries input_products);
3838
virtual ~declared_observer();
3939

40-
virtual tbb::flow::receiver<message>& flush_port() = 0;
40+
virtual tbb::flow::receiver<flush_message>& flush_port() = 0;
4141

4242
protected:
4343
using hashes_t = tbb::concurrent_hash_map<data_cell_index::hash_type, bool>;
@@ -70,10 +70,11 @@ namespace phlex::experimental {
7070
declared_observer{std::move(name), std::move(predicates), std::move(input_products)},
7171
flush_receiver_{g,
7272
tbb::flow::unlimited,
73-
[this](message const& msg) -> tbb::flow::continue_msg {
74-
receive_flush(msg);
75-
if (done_with(msg.store)) {
76-
cached_hashes_.erase(msg.store->index()->hash());
73+
[this](flush_message const& msg) -> tbb::flow::continue_msg {
74+
auto const hash = msg.index->hash();
75+
receive_flush(hash);
76+
if (done_with(hash)) {
77+
cached_hashes_.erase(hash);
7778
}
7879
return {};
7980
}},
@@ -83,18 +84,17 @@ namespace phlex::experimental {
8384
[this, ft = alg.release_algorithm()](
8485
messages_t<N> const& messages) -> oneapi::tbb::flow::continue_msg {
8586
auto const& msg = most_derived(messages);
86-
auto const& [store, message_id] = std::tie(msg.store, msg.id);
87-
88-
assert(not store->is_flush());
87+
auto const& store = msg.store;
88+
auto const hash = store->index()->hash();
8989

9090
if (accessor a; needs_new(store, a)) {
9191
call(ft, messages, std::make_index_sequence<N>{});
9292
a->second = true;
93-
flag_for(store->index()->hash()).mark_as_processed();
93+
flag_for(hash).mark_as_processed();
9494
}
9595

96-
if (done_with(store)) {
97-
cached_hashes_.erase(store->index()->hash());
96+
if (done_with(hash)) {
97+
cached_hashes_.erase(hash);
9898
}
9999
return {};
100100
}}
@@ -112,14 +112,15 @@ namespace phlex::experimental {
112112

113113
std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }
114114

115-
tbb::flow::receiver<message>& flush_port() override { return flush_receiver_; }
115+
tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
116116

117117
bool needs_new(product_store_const_ptr const& store, accessor& a)
118118
{
119-
if (cached_hashes_.count(store->index()->hash()) > 0ull) {
119+
auto const hash = store->index()->hash();
120+
if (cached_hashes_.count(hash) > 0ull) {
120121
return false;
121122
}
122-
return cached_hashes_.insert(a, store->index()->hash());
123+
return cached_hashes_.insert(a, hash);
123124
}
124125

125126
template <std::size_t... Is>
@@ -132,7 +133,7 @@ namespace phlex::experimental {
132133
std::size_t num_calls() const final { return calls_.load(); }
133134

134135
input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
135-
tbb::flow::function_node<message> flush_receiver_;
136+
tbb::flow::function_node<flush_message> flush_receiver_;
136137
join_or_none_t<N> join_;
137138
tbb::flow::function_node<messages_t<N>> observer_;
138139
hashes_t cached_hashes_;

phlex/core/declared_output.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ namespace phlex::experimental {
1010
detail::output_function_t&& ft) :
1111
consumer{std::move(name), std::move(predicates)},
1212
node_{g, concurrency, [f = std::move(ft)](message const& msg) -> tbb::flow::continue_msg {
13-
if (not msg.store->is_flush()) {
14-
f(*msg.store);
15-
}
13+
f(*msg.store);
1614
return {};
1715
}}
1816
{

phlex/core/declared_predicate.hpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace phlex::experimental {
4040
product_queries input_products);
4141
virtual ~declared_predicate();
4242

43-
virtual tbb::flow::receiver<message>& flush_port() = 0;
43+
virtual tbb::flow::receiver<flush_message>& flush_port() = 0;
4444
virtual tbb::flow::sender<predicate_result>& sender() = 0;
4545

4646
protected:
@@ -75,10 +75,11 @@ namespace phlex::experimental {
7575
declared_predicate{std::move(name), std::move(predicates), std::move(input_products)},
7676
flush_receiver_{g,
7777
tbb::flow::unlimited,
78-
[this](message const& msg) -> tbb::flow::continue_msg {
79-
receive_flush(msg);
80-
if (done_with(msg.store)) {
81-
results_.erase(msg.store->index()->hash());
78+
[this](flush_message const& msg) -> tbb::flow::continue_msg {
79+
auto const hash = msg.index->hash();
80+
receive_flush(hash);
81+
if (done_with(hash)) {
82+
results_.erase(hash);
8283
}
8384
return {};
8485
}},
@@ -90,19 +91,18 @@ namespace phlex::experimental {
9091
auto const& msg = most_derived(messages);
9192
auto const& [store, message_id] = std::tie(msg.store, msg.id);
9293

93-
assert(not store->is_flush());
94-
9594
predicate_result result{};
96-
if (const_accessor a; results_.find(a, store->index()->hash())) {
95+
auto const hash = store->index()->hash();
96+
if (const_accessor a; results_.find(a, hash)) {
9797
result = {message_id, a->second.result};
98-
} else if (accessor a; results_.insert(a, store->index()->hash())) {
98+
} else if (accessor a; results_.insert(a, hash)) {
9999
bool const rc = call(ft, messages, std::make_index_sequence<N>{});
100100
result = a->second = {message_id, rc};
101-
flag_for(store->index()->hash()).mark_as_processed();
101+
flag_for(hash).mark_as_processed();
102102
}
103103

104-
if (done_with(store)) {
105-
results_.erase(store->index()->hash());
104+
if (done_with(hash)) {
105+
results_.erase(hash);
106106
}
107107
return result;
108108
}}
@@ -120,7 +120,7 @@ namespace phlex::experimental {
120120

121121
std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }
122122

123-
tbb::flow::receiver<message>& flush_port() override { return flush_receiver_; }
123+
tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
124124
tbb::flow::sender<predicate_result>& sender() override { return predicate_; }
125125

126126
template <std::size_t... Is>
@@ -133,7 +133,7 @@ namespace phlex::experimental {
133133
std::size_t num_calls() const final { return calls_.load(); }
134134

135135
input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
136-
tbb::flow::function_node<message> flush_receiver_;
136+
tbb::flow::function_node<flush_message> flush_receiver_;
137137
join_or_none_t<N> join_;
138138
tbb::flow::function_node<messages_t<N>, predicate_result> predicate_;
139139
results_t results_;

0 commit comments

Comments
 (0)