20 #ifndef TPIE_PIPELINING_JOIN_H
21 #define TPIE_PIPELINING_JOIN_H
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_helpers.h>
25 #include <tpie/pipelining/pipe_base.h>
49 virtual void push(
const T & v) = 0;
55 template <
typename dest_t>
60 , the_source(the_source)
61 , dest(std::move(dest))
63 this->
set_name(
"Join source", PRIORITY_INSIGNIFICANT);
70 if (*the_source != NULL && *the_source !=
this) {
74 throw exception(
"Attempted to set join source a second time");
79 void push(
const T & v)
override {
84 source_base ** the_source;
88 pipe_begin<factory<source_impl, node_token, source_base **> > source() {
89 return factory<source_impl, node_token, source_base **>(source_token, &the_source);
97 : the_source(the_source)
99 set_name(
"Join sink", PRIORITY_INSIGNIFICANT);
104 the_source_cache = *the_source;
107 void push(
const T & v) {
108 the_source_cache->push(v);
112 source_base * the_source_cache;
113 source_base ** the_source;
116 pipe_end<termfactory<sink_impl, node_token, source_base **> > sink() {
117 return termfactory<sink_impl, node_token, source_base **>(source_token, &the_source);
120 join() : the_source(NULL) {}
122 source_base * the_source;
123 node_token source_token;
128 #endif // TPIE_PIPELINING_JOIN_H