24 #ifndef __TPIE_PIPELINING_VIRTUAL_H__
25 #define __TPIE_PIPELINING_VIRTUAL_H__
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/pipe_base.h>
29 #include <tpie/pipelining/pipeline.h>
30 #include <tpie/pipelining/factory_helpers.h>
31 #include <tpie/pipelining/helpers.h>
47 template <
typename Input>
51 template <
typename Input,
typename Output>
55 template <
typename Output>
59 template <
typename Input>
63 template <
typename Input,
typename Output>
67 template <
typename Output>
78 typedef const T & type;
83 typedef const T & type;
88 typedef const T * type;
106 template <
typename Input>
112 virtual void push(input_type v) = 0;
118 template <
typename dest_t,
typename T>
129 : dest(std::move(dest))
132 this->
set_name(
"Virtual source", PRIORITY_INSIGNIFICANT);
140 void push(input_type v)
final {
150 template <
typename Output>
156 typedef Output item_type;
160 , m_virtdest(
nullptr)
163 this->
set_name(
"Virtual destination", PRIORITY_INSIGNIFICANT);
170 , m_virtdest(std::move(o.m_virtdest)) {
175 if (m_virtdest ==
nullptr) {
184 void set_destination(virtsrc<Output> * dest) {
185 if (m_virtdest !=
nullptr) {
193 void change_destination(virtsrc<Output> * dest) {
202 template <
typename T>
207 virtual T pull() = 0;
208 virtual bool can_pull() = 0;
214 template <
typename src_t,
typename T>
221 this->
set_name(
"Virtual pull source", PRIORITY_INSIGNIFICANT);
229 T pull()
final {
return src.pull();}
230 bool can_pull()
final {
return src.can_pull();}
238 template <
typename T>
250 this->
set_name(
"Virtual pull destitation", PRIORITY_INSIGNIFICANT);
257 , m_virtsrc(std::move(o.m_virtsrc)) {
262 if (m_virtsrc ==
nullptr) {
267 T pull() {
return m_virtsrc->pull();}
268 bool can_pull() {
return m_virtsrc->can_pull();}
270 void set_source(virtpullsrc<T> * src) {
271 if (m_virtsrc !=
nullptr) {
279 void change_source(virtpullsrc<T> * src) {
297 typedef std::shared_ptr<virt_node> ptr;
300 std::unique_ptr<node> m_node;
301 std::unique_ptr<virtual_container> m_container;
311 n->m_node.reset(pipe);
332 m_container.reset(ctr);
341 template <
typename T,
typename U,
typename Result>
343 static Result go(...) {
353 template <
typename T,
typename Result>
355 static Result go(Result r) {
363 template <
typename Input>
365 template <
typename Input,
typename Output>
367 template <
typename Input,
typename Output>
369 template <
typename Output>
372 template <
typename Input>
374 template <
typename Input,
typename Output>
376 template <
typename Input,
typename Output>
378 template <
typename Output>
381 template <
typename T,
typename ...TT>
382 static T construct(TT && ... vv) {
return T(std::forward<TT>(vv)...);}
393 virt_node::ptr m_node;
398 this->m_nodeMap = nodeMap;
402 this->m_nodeMap = nodeMap;
407 virt_node::ptr get_node()
const {
return m_node; }
410 m_node->set_container(ctr);
413 bool empty()
const {
return m_node.get() ==
nullptr; }
422 template <
typename Input>
427 input_type * m_input;
429 input_type * get_input()
const {
return m_input; }
436 template <
typename M
id>
438 const virtual_chunk_end<Mid> & right);
452 template <
typename fact_t>
454 *
this = std::forward<pipe_end<fact_t>>(pipe);
462 template <
typename fact_t>
465 log_error() <<
"Virtual chunk assigned twice" << std::endl;
469 typedef typename fact_t::constructed_type constructed_type;
472 this->m_nodeMap = m_input->get_node_map();
480 explicit operator bool() const noexcept {
481 return m_input !=
nullptr;
488 template <
typename Input,
typename Output=Input>
489 class virtual_chunk :
public bits::virtual_chunk_base {
490 friend class bits::access;
491 typedef bits::access acc;
492 typedef bits::virtsrc<Input> input_type;
493 typedef bits::virtrecv<Output> output_type;
494 input_type * m_input;
495 output_type * m_output;
496 input_type * get_input()
const {
return m_input; }
497 output_type * get_output()
const {
return m_output; }
504 template <
typename M
id>
506 const virtual_chunk<Mid, Output> & right)
507 : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
509 m_input = acc::get_input(left);
510 m_output = acc::get_output(right);
526 template <
typename fact_t>
528 *
this = std::forward<pipe_middle<fact_t>>(pipe);
541 : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
543 m_input = acc::get_input(left);
544 m_output = acc::get_output(right);
551 template <
typename fact_t>
554 log_error() <<
"Virtual chunk assigned twice" << std::endl;
557 typedef typename fact_t::template constructed_type<output_type> constructed_type;
560 fact_t f = std::move(pipe.factory);
561 f.set_destination_kind_push();
571 template <
typename NextOutput>
578 if (dest.empty() || !dst) {
582 m_output->set_destination(dst);
583 return acc::construct<virtual_chunk<Input, NextOutput>>(*
this, dest);
594 m_output->set_destination(acc::get_input(dest));
595 return acc::construct<virtual_chunk_end<Input>>(*
this, dest);
601 explicit operator bool() const noexcept {
602 return m_input !=
nullptr;
606 template <
typename Input>
607 template <
typename M
id>
609 const virtual_chunk_end<Mid> & right)
610 : virtual_chunk_base(left.get_node_map(),
611 bits::virt_node::combine(left.get_node(), right.get_node()))
613 m_input = acc::get_input(left);
619 template <
typename Output>
620 class virtual_chunk_begin :
public bits::virtual_chunk_base {
621 friend class bits::access;
622 typedef bits::access acc;
623 typedef bits::virtrecv<Output> output_type;
624 output_type * m_output;
625 output_type * get_output()
const {
return m_output; }
632 template <
typename M
id>
634 const virtual_chunk<Mid, Output> & right)
635 : virtual_chunk_base(left.get_node_map(),
636 bits::virt_node::combine(left.get_node(), right.get_node()))
638 m_output = acc::get_output(right);
652 template <
typename fact_t>
654 *
this = std::forward<pipe_begin<fact_t>>(pipe);
661 template <
typename fact_t>
664 log_error() <<
"Virtual chunk assigned twice" << std::endl;
667 typedef typename fact_t::template constructed_type<output_type> constructed_type;
669 this->m_nodeMap = m_output->get_node_map();
670 fact_t f = std::move(pipe.factory);
671 f.set_destination_kind_push();
679 template <
typename NextOutput>
686 m_output->set_destination(acc::get_input(dest));
687 return acc::construct<virtual_chunk_begin<NextOutput>>(*
this, dest);
696 m_output->set_destination(acc::get_input(dest));
697 return acc::construct<virtual_chunk_base>(
705 explicit operator bool() const noexcept {
706 return m_output !=
nullptr;
713 template <
typename Input>
714 class virtual_chunk_pull_end :
public bits::virtual_chunk_base {
715 friend class bits::access;
716 typedef bits::access acc;
717 typedef bits::virtpullrecv<Input> input_type;
718 input_type * m_input;
719 input_type * get_input()
const {
return m_input; }
726 template <
typename M
id>
728 const virtual_chunk_pull_end<Mid> & right);
742 template <
typename fact_t>
744 *
this = std::forward<pullpipe_end<fact_t>>(pipe);
751 template <
typename fact_t>
754 log_error() <<
"Virtual chunk assigned twice" << std::endl;
757 typedef typename fact_t::template constructed_type<input_type> constructed_type;
759 this->m_nodeMap = m_input->get_node_map();
760 fact_t f = std::move(pipe.factory);
761 f.set_destination_kind_pull();
769 explicit operator bool() const noexcept {
770 return m_input !=
nullptr;
778 template <
typename Input,
typename Output=Input>
779 class virtual_chunk_pull :
public bits::virtual_chunk_base {
780 friend class bits::access;
781 typedef bits::access acc;
782 typedef bits::virtpullsrc<Output> output_type;
783 typedef bits::virtpullrecv<Input> input_type;
784 input_type * m_input;
785 output_type * m_output;
786 input_type * get_input()
const {
return m_input; }
787 output_type * get_output()
const {
return m_output; }
794 template <
typename M
id>
796 const virtual_chunk_pull<Mid, Output> & right)
797 : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
799 m_input = acc::get_input(left);
800 m_output = acc::get_output(right);
815 template <
typename fact_t>
817 *
this = std::forward<pullpipe_middle<fact_t>>(pipe);
829 : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node())) {
830 m_input = acc::get_input(left);
831 m_output = acc::get_output(right);
837 template <
typename fact_t>
840 log_error() <<
"Virtual chunk assigned twice" << std::endl;
843 typedef typename fact_t::template constructed_type<input_type> constructed_type;
846 fact_t f = std::move(pipe.factory);
847 f.set_destination_kind_push();
856 template <
typename NextOutput>
866 acc::get_input(dest)->set_source(get_output());
867 return acc::construct<virtual_chunk_pull<Input, NextOutput>>(*
this, dest);
878 acc::get_input(dest)->set_source(get_output());
879 return acc::construct<virtual_chunk_pull_end<Input>>(*
this, dest);
885 explicit operator bool() const noexcept {
886 return m_input !=
nullptr;
894 template <
typename Output>
895 class virtual_chunk_pull_begin :
public bits::virtual_chunk_base {
896 friend class bits::access;
897 typedef bits::access acc;
898 typedef bits::virtpullsrc<Output> output_type;
899 output_type * m_output;
901 output_type * get_output()
const {
return m_output; }
908 template <
typename M
id>
910 const virtual_chunk_pull<Mid, Output> & right)
911 : virtual_chunk_base(left.get_node_map(),
912 bits::virt_node::combine(left.get_node(), right.get_node())) {
913 m_output = acc::get_output(right);
927 template <
typename fact_t>
929 *
this = std::forward<pullpipe_begin<fact_t>>(pipe);
936 template <
typename fact_t>
939 log_error() <<
"Virtual chunk assigned twice" << std::endl;
943 typedef typename fact_t::constructed_type constructed_type;
946 this->m_nodeMap = m_output->get_node_map();
958 acc::get_input(dest)->set_source(get_output());
959 return acc::construct<virtual_chunk_base>(
969 template <
typename NextOutput>
976 acc::get_input(dest)->set_source(get_output());
977 return acc::construct<virtual_chunk_pull_begin<NextOutput>>(*
this, dest);
984 explicit operator bool() const noexcept {
985 return m_output !=
nullptr;
990 template <
typename Input>
991 template <
typename M
id>
993 const virtual_chunk_pull_end<Mid> & right)
994 : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node())) {
995 m_input = acc::get_input(left);
1001 template <
typename Input>
1002 virtsrc<Input> * access::get_input(
const virtual_chunk_end<Input> & chunk) {
1003 return chunk.get_input();
1006 template <
typename Input,
typename Output>
1007 virtsrc<Input> * access::get_input(
const virtual_chunk<Input, Output> & chunk) {
1008 return chunk.get_input();
1011 template <
typename Input,
typename Output>
1012 virtrecv<Output> * access::get_output(
const virtual_chunk<Input, Output> & chunk) {
1013 return chunk.get_output();
1016 template <
typename Output>
1017 virtrecv<Output> * access::get_output(
const virtual_chunk_begin<Output> & chunk) {
1018 return chunk.get_output();
1022 template <
typename Input>
1023 virtpullrecv<Input> * access::get_input(
const virtual_chunk_pull_end<Input> & chunk) {
1024 return chunk.get_input();
1027 template <
typename Input,
typename Output>
1028 virtpullrecv<Input> * access::get_input(
const virtual_chunk_pull<Input, Output> & chunk) {
1029 return chunk.get_input();
1032 template <
typename Input,
typename Output>
1033 virtpullsrc<Output> * access::get_output(
const virtual_chunk_pull<Input, Output> & chunk) {
1034 return chunk.get_output();
1037 template <
typename Output>
1038 virtpullsrc<Output> * access::get_output(
const virtual_chunk_pull_begin<Output> & chunk) {
1039 return chunk.get_output();
1042 template <
typename dest_t,
typename T>
1045 typedef T item_type;
1048 : vnode(out.get_node())
1049 , dest2(bits::access::get_input(out))
1050 , dest(std::move(dest)) {
1057 if (dest2) dest2->push(v);
1062 virt_node::ptr vnode;
1069 template <
typename T>
1072 typedef T item_type;
1074 template <
typename TT>
1076 : vnode(out.get_node())
1077 , tail(std::move(tail))
1078 , dest(bits::access::get_input(out))
1084 if (dest) dest->push(v);
1089 virt_node::ptr vnode;
1090 std::unique_ptr<node> tail;
1095 template <
typename dest_t,
typename T>
1100 template <
typename TT>
1102 : vnode(
input.get_node())
1103 , dest(std::move(dest)) {
1105 this->
set_name(
"Virtual source", PRIORITY_INSIGNIFICANT);
1107 recv = bits::access::get_output(
input);
1108 recv->set_destination(
this);
1113 , vnode(std::move(o.vnode))
1115 , dest(std::move(o.dest)) {
1118 recv->change_destination(
this);
1123 assert(recv ==
nullptr);
1125 vnode = std::move(o.vnode);
1129 recv->change_destination(
this);
1137 void push(input_type v)
final {
1141 virt_node::ptr vnode;
1146 template <
typename Input,
typename Output>
1149 template <
typename dest_t>
1159 template <
typename dest_t>
1161 auto destnode = std::make_unique<bits::devirtualize_begin_node<dest_t, Output>>(std::move(dest), chunk);
1172 template <
typename T>
1175 typedef T item_type;
1177 template <
typename TT>
1179 : vnode(out.get_node())
1180 , tail(std::move(tail))
1181 , src(bits::access::get_output(out))
1186 bool can_pull() {
return src->can_pull();}
1187 item_type pull() {
return src->pull();}
1190 virt_node::ptr vnode;
1191 std::unique_ptr<node> tail;
1195 template <
typename src_t,
typename T>
1198 template <
typename TT>
1200 : vnode(
output.get_node())
1201 , src(std::move(src)) {
1203 this->
set_name(
"Virtual source", PRIORITY_INSIGNIFICANT);
1205 input = bits::access::get_input(
output);
1206 input->set_source(
this);
1211 , vnode(std::move(o.vnode))
1213 , src(std::move(o.src)) {
1216 input->change_source(
this);
1221 assert(input ==
nullptr);
1223 vnode = std::move(o.vnode);
1227 input->change_source(
this);
1235 T pull()
final {
return src.pull();}
1236 bool can_pull()
final {
return src.can_pull();}
1238 virt_node::ptr vnode;
1243 template <
typename Input,
typename Output>
1246 template <
typename src_t>
1256 template <
typename src_t>
1258 auto srcnode = std::make_unique<bits::devirtualize_pull_end_node<src_t, Input>>(std::move(src), chunk);
1275 template <
typename Input>
1283 template <
typename Output>
1291 template <
typename Input,
typename Output>
1299 template <
typename Input>
1307 template <
typename Input>
1308 pullpipe_end<tfactory<bits::devirtualize_pull_end_node, Args<Input>, virtual_chunk_pull_end<Input>>>
devirtualize(
const virtual_chunk_pull_end<Input> & in) {
1315 template <
typename Input,
typename Output>
1316 pullpipe_middle<bits::devirtualization_pull_factory<Input, Output>>
devirtualize(
const virtual_chunk_pull<Input, Output> & mid) {
1320 template <
typename T>
1321 pipe_middle<tfactory<bits::vfork_node, Args<T>, virtual_chunk_end<T> > > fork_to_virtual(
const virtual_chunk_end<T> & out) {
1325 template <
typename T>
1326 virtual_chunk<T> vfork(
const virtual_chunk_end<T> & out) {
1327 if (out.empty())
return virtual_chunk<T>();
1328 return fork_to_virtual(out);
1331 template <
typename T>
1332 inline virtual_chunk<T> chunk_if(
bool b, virtual_chunk<T> t) {
1336 return virtual_chunk<T>();
1339 template <
typename T>
1340 virtual_chunk_end<T> chunk_end_if(
bool b, virtual_chunk_end<T> t) {
1344 return virtual_chunk_end<T>(null_sink<T>());
1349 #endif // __TPIE_PIPELINING_VIRTUAL_H__