TPIE

11a2c2d
virtual.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet cino+=(0 :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef __TPIE_PIPELINING_VIRTUAL_H__
25 #define __TPIE_PIPELINING_VIRTUAL_H__
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/pipe_base.h>
29 #include <tpie/pipelining/pipeline.h>
30 #include <tpie/pipelining/factory_helpers.h>
31 #include <tpie/pipelining/helpers.h>
32 
33 namespace tpie::pipelining {
34 
41 public:
42  virtual ~virtual_container() {}
43 };
44 
45 
46 // Predeclare
47 template <typename Input>
49 
50 // Predeclare
51 template <typename Input, typename Output>
53 
54 // Predeclare
55 template <typename Output>
57 
58 // Predeclare
59 template <typename Input>
61 
62 // Predeclare
63 template <typename Input, typename Output>
65 
66 // Predeclare
67 template <typename Output>
69 
70 namespace bits {
71 
76 template <typename T>
78  typedef const T & type;
79 };
80 
81 template <typename T>
82 struct maybe_add_const_ref<const T &> {
83  typedef const T & type;
84 };
85 
86 template <typename T>
87 struct maybe_add_const_ref<const T *> {
88  typedef const T * type;
89 };
90 
91 template <typename T>
92 struct maybe_add_const_ref<T &> {
93  typedef T & type;
94 };
95 
96 template <typename T>
97 struct maybe_add_const_ref<T *> {
98  typedef T * type;
99 };
100 
106 template <typename Input>
107 class virtsrc : public node {
108  typedef typename maybe_add_const_ref<Input>::type input_type;
109 
110 public:
111  virtual const node_token & get_token() = 0; //TODO this need not be virtual
112  virtual void push(input_type v) = 0;
113 };
114 
118 template <typename dest_t, typename T>
119 class virtsrc_impl : public virtsrc<T> {
120 public:
121  typedef T item_type;
122 
123 private:
124  typedef typename maybe_add_const_ref<item_type>::type input_type;
125  dest_t dest;
126 
127 public:
128  virtsrc_impl(dest_t dest)
129  : dest(std::move(dest))
130  {
131  node::add_push_destination(this->dest);
132  this->set_name("Virtual source", PRIORITY_INSIGNIFICANT);
133  this->set_plot_options(node::PLOT_BUFFERED | node::PLOT_SIMPLIFIED_HIDE);
134  }
135 
136  const node_token & get_token() final {
137  return node::get_token();
138  }
139 
140  void push(input_type v) final {
141  dest.push(v);
142  }
143 };
144 
150 template <typename Output>
151 class virtrecv : public node {
152  virtrecv *& m_self;
153  virtsrc<Output> * m_virtdest;
154 
155 public:
156  typedef Output item_type;
157 
158  virtrecv(virtrecv *& self)
159  : m_self(self)
160  , m_virtdest(nullptr)
161  {
162  m_self = this;
163  this->set_name("Virtual destination", PRIORITY_INSIGNIFICANT);
164  this->set_plot_options(node::PLOT_BUFFERED | node::PLOT_SIMPLIFIED_HIDE);
165  }
166 
167  virtrecv(virtrecv && o)
168  : node(std::move(o))
169  , m_self(o.m_self)
170  , m_virtdest(std::move(o.m_virtdest)) {
171  m_self = this;
172  }
173 
174  void begin() final {
175  if (m_virtdest == nullptr) {
176  throw tpie::exception("No virtual destination");
177  }
178  }
179 
180  void push(typename maybe_add_const_ref<Output>::type v) {
181  m_virtdest->push(v);
182  }
183 
184  void set_destination(virtsrc<Output> * dest) {
185  if (m_virtdest != nullptr) {
186  throw tpie::exception("Virtual destination set twice");
187  }
188 
189  m_virtdest = dest;
190  add_push_destination(dest->get_token());
191  }
192 
193  void change_destination(virtsrc<Output> * dest) {
194  m_virtdest = dest;
195  }
196 };
197 
202 template <typename T>
203 class virtpullsrc : public node {
204  using item_type = T;
205 public:
206  virtual const node_token & get_token() = 0;
207  virtual T pull() = 0;
208  virtual bool can_pull() = 0;
209 };
210 
214 template <typename src_t, typename T>
215 class virtpullsrc_impl : public virtpullsrc<T> {
216 private:
217  src_t src;
218 public:
219  virtpullsrc_impl(src_t src) : src(std::move(src)) {
220  node::add_pull_source(this->src);
221  this->set_name("Virtual pull source", PRIORITY_INSIGNIFICANT);
222  this->set_plot_options(node::PLOT_BUFFERED | node::PLOT_SIMPLIFIED_HIDE);
223  }
224 
225  const node_token & get_token() {
226  return node::get_token();
227  }
228 
229  T pull() final {return src.pull();}
230  bool can_pull() final {return src.can_pull();}
231 };
232 
238 template <typename T>
239 class virtpullrecv : public node {
240  virtpullrecv *& m_self;
241  virtpullsrc<T> * m_virtsrc;
242 public:
243  typedef T item_type;
244 
245  virtpullrecv(virtpullrecv *& self)
246  : m_self(self)
247  , m_virtsrc(nullptr)
248  {
249  m_self = this;
250  this->set_name("Virtual pull destitation", PRIORITY_INSIGNIFICANT);
251  this->set_plot_options(node::PLOT_BUFFERED | node::PLOT_SIMPLIFIED_HIDE);
252  }
253 
255  : node(std::move(o))
256  , m_self(o.m_self)
257  , m_virtsrc(std::move(o.m_virtsrc)) {
258  m_self = this;
259  }
260 
261  void begin() final {
262  if (m_virtsrc == nullptr) {
263  throw tpie::exception("No virtual pull source");
264  }
265  }
266 
267  T pull() {return m_virtsrc->pull();}
268  bool can_pull() {return m_virtsrc->can_pull();}
269 
270  void set_source(virtpullsrc<T> * src) {
271  if (m_virtsrc != nullptr) {
272  throw tpie::exception("Virtual source pull set twice");
273  }
274 
275  m_virtsrc = src;
276  add_pull_source(src->get_token());
277  }
278 
279  void change_source(virtpullsrc<T> * src) {
280  m_virtsrc = src;
281  }
282 };
283 
284 
295 class virt_node {
296 public:
297  typedef std::shared_ptr<virt_node> ptr;
298 
299 private:
300  std::unique_ptr<node> m_node;
301  std::unique_ptr<virtual_container> m_container;
302  ptr m_left;
303  ptr m_right;
304 
305 public:
309  static ptr take_own(node * pipe) {
310  virt_node * n = new virt_node();
311  n->m_node.reset(pipe);
312  ptr res(n);
313  return res;
314  }
315 
319  static ptr combine(ptr left, ptr right) {
320  virt_node * n = new virt_node();
321  n->m_left = left;
322  n->m_right = right;
323  ptr res(n);
324  return res;
325  }
326 
332  m_container.reset(ctr);
333  }
334 };
335 
341 template <typename T, typename U, typename Result>
343  static Result go(...) {
345  }
346 };
347 
353 template <typename T, typename Result>
354 struct assert_types_equal_and_return<T, T, Result> {
355  static Result go(Result r) {
356  return r;
357  }
358 };
359 
360 
361 class access {
362 public:
363  template <typename Input>
364  static virtsrc<Input> * get_input(const virtual_chunk_end<Input> &);
365  template <typename Input, typename Output>
366  static virtsrc<Input> * get_input(const virtual_chunk<Input, Output> &);
367  template <typename Input, typename Output>
368  static virtrecv<Output> * get_output(const virtual_chunk<Input, Output> &);
369  template <typename Output>
370  static virtrecv<Output> * get_output(const virtual_chunk_begin<Output> &);
371 
372  template <typename Input>
373  static virtpullrecv<Input> * get_input(const virtual_chunk_pull_end<Input> &);
374  template <typename Input, typename Output>
375  static virtpullrecv<Input> * get_input(const virtual_chunk_pull<Input, Output> &);
376  template <typename Input, typename Output>
377  static virtpullsrc<Output> * get_output(const virtual_chunk_pull<Input, Output> &);
378  template <typename Output>
379  static virtpullsrc<Output> * get_output(const virtual_chunk_pull_begin<Output> &);
380 
381  template <typename T, typename ...TT>
382  static T construct(TT && ... vv) {return T(std::forward<TT>(vv)...);}
383 };
384 
385 
390  // pipeline_base has virtual dtor and shared_ptr to m_nodeMap
391  friend class access;
392 protected:
393  virt_node::ptr m_node;
394 
395  virtual_chunk_base(node_map::ptr nodeMap, virt_node::ptr ptr)
396  : m_node(ptr)
397  {
398  this->m_nodeMap = nodeMap;
399  }
400 
401  virtual_chunk_base(node_map::ptr nodeMap) {
402  this->m_nodeMap = nodeMap;
403  }
404 public:
405  virtual_chunk_base() {}
406 
407  virt_node::ptr get_node() const { return m_node; }
408 
409  void set_container(virtual_container * ctr) {
410  m_node->set_container(ctr);
411  }
412 
413  bool empty() const { return m_node.get() == nullptr; }
414 };
415 
416 } //namespace bits
417 
418 
422 template <typename Input>
424  friend class bits::access;
425  typedef bits::access acc;
426  typedef bits::virtsrc<Input> input_type;
427  input_type * m_input;
428 
429  input_type * get_input() const { return m_input; }
430 
436  template <typename Mid>
437  virtual_chunk_end(const virtual_chunk<Input, Mid> & left,
438  const virtual_chunk_end<Mid> & right);
439 
440 public:
445  : m_input(nullptr)
446  {}
447 
452  template <typename fact_t>
454  *this = std::forward<pipe_end<fact_t>>(pipe);
455  set_container(ctr);
456  }
457 
458 
462  template <typename fact_t>
464  if (this->m_node) {
465  log_error() << "Virtual chunk assigned twice" << std::endl;
466  throw tpie::exception("Virtual chunk assigned twice");
467  }
468 
469  typedef typename fact_t::constructed_type constructed_type;
470  m_input = new bits::virtsrc_impl<constructed_type, Input>(pipe.factory.construct());
471  this->m_node = bits::virt_node::take_own(m_input);
472  this->m_nodeMap = m_input->get_node_map();
473 
474  return *this;
475  }
476 
480  explicit operator bool() const noexcept {
481  return m_input != nullptr;
482  }
483 };
484 
488 template <typename Input, typename Output=Input>
489 class virtual_chunk : public bits::virtual_chunk_base {
490  friend class bits::access;
491  typedef bits::access acc;
492  typedef bits::virtsrc<Input> input_type;
493  typedef bits::virtrecv<Output> output_type;
494  input_type * m_input;
495  output_type * m_output;
496  input_type * get_input() const { return m_input; }
497  output_type * get_output() const { return m_output; }
498 
504  template <typename Mid>
505  virtual_chunk(const virtual_chunk<Input, Mid> & left,
506  const virtual_chunk<Mid, Output> & right)
507  : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
508  {
509  m_input = acc::get_input(left);
510  m_output = acc::get_output(right);
511  }
512 
513 public:
518  : m_input(nullptr)
519  , m_output(nullptr)
520  {}
521 
526  template <typename fact_t>
528  *this = std::forward<pipe_middle<fact_t>>(pipe);
529  set_container(ctr);
530  }
531 
532 
540  const virtual_chunk_begin<Output> & right)
541  : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
542  {
543  m_input = acc::get_input(left);
544  m_output = acc::get_output(right);
545  }
546 
547 
551  template <typename fact_t>
553  if (this->m_node) {
554  log_error() << "Virtual chunk assigned twice" << std::endl;
555  throw tpie::exception("Virtual chunk assigned twice");
556  }
557  typedef typename fact_t::template constructed_type<output_type> constructed_type;
558  output_type temp(m_output);
559  this->m_nodeMap = temp.get_node_map();
560  fact_t f = std::move(pipe.factory);
561  f.set_destination_kind_push();
562  m_input = new bits::virtsrc_impl<constructed_type, Input>(f.construct(std::move(temp)));
563  this->m_node = bits::virt_node::take_own(m_input);
564 
565  return *this;
566  }
567 
571  template <typename NextOutput>
573  if (empty()) {
575  ::go(&dest);
576  }
577  bits::virtsrc<Output> * dst=acc::get_input(dest);
578  if (dest.empty() || !dst) {
580  ::go(this);
581  }
582  m_output->set_destination(dst);
583  return acc::construct<virtual_chunk<Input, NextOutput>>(*this, dest);
584  }
585 
590  if (empty()) {
592  ::go(&dest);
593  }
594  m_output->set_destination(acc::get_input(dest));
595  return acc::construct<virtual_chunk_end<Input>>(*this, dest);
596  }
597 
601  explicit operator bool() const noexcept {
602  return m_input != nullptr;
603  }
604 };
605 
606 template <typename Input>
607 template <typename Mid>
608 virtual_chunk_end<Input>::virtual_chunk_end(const virtual_chunk<Input, Mid> & left,
609  const virtual_chunk_end<Mid> & right)
610  : virtual_chunk_base(left.get_node_map(),
611  bits::virt_node::combine(left.get_node(), right.get_node()))
612 {
613  m_input = acc::get_input(left);
614 }
615 
619 template <typename Output>
620 class virtual_chunk_begin : public bits::virtual_chunk_base {
621  friend class bits::access;
622  typedef bits::access acc;
623  typedef bits::virtrecv<Output> output_type;
624  output_type * m_output;
625  output_type * get_output() const { return m_output; }
626 
632  template <typename Mid>
633  virtual_chunk_begin(const virtual_chunk_begin<Mid> & left,
634  const virtual_chunk<Mid, Output> & right)
635  : virtual_chunk_base(left.get_node_map(),
636  bits::virt_node::combine(left.get_node(), right.get_node()))
637  {
638  m_output = acc::get_output(right);
639  }
640 public:
645  : m_output(nullptr)
646  {}
647 
652  template <typename fact_t>
654  *this = std::forward<pipe_begin<fact_t>>(pipe);
655  set_container(ctr);
656  }
657 
661  template <typename fact_t>
663  if (this->m_node) {
664  log_error() << "Virtual chunk assigned twice" << std::endl;
665  throw tpie::exception("Virtual chunk assigned twice");
666  }
667  typedef typename fact_t::template constructed_type<output_type> constructed_type;
668  output_type temp(m_output);
669  this->m_nodeMap = m_output->get_node_map();
670  fact_t f = std::move(pipe.factory);
671  f.set_destination_kind_push();
672  this->m_node = bits::virt_node::take_own(new constructed_type(f.construct(std::move(temp))));
673  return *this;
674  }
675 
679  template <typename NextOutput>
681  if (empty()) throw virtual_chunk_missing_begin();
682  if (dest.empty()) {
684  ::go(this);
685  }
686  m_output->set_destination(acc::get_input(dest));
687  return acc::construct<virtual_chunk_begin<NextOutput>>(*this, dest);
688  }
689 
693  virtual_chunk_base operator|(virtual_chunk_end<Output> dest) {
694  if (empty()) throw virtual_chunk_missing_begin();
695  if (dest.empty()) throw virtual_chunk_missing_end();
696  m_output->set_destination(acc::get_input(dest));
697  return acc::construct<virtual_chunk_base>(
698  this->m_nodeMap,
699  bits::virt_node::combine(get_node(), dest.get_node()));
700  }
701 
705  explicit operator bool() const noexcept {
706  return m_output != nullptr;
707  }
708 };
709 
713 template <typename Input>
714 class virtual_chunk_pull_end : public bits::virtual_chunk_base {
715  friend class bits::access;
716  typedef bits::access acc;
717  typedef bits::virtpullrecv<Input> input_type;
718  input_type * m_input;
719  input_type * get_input() const { return m_input; }
720 
726  template <typename Mid>
727  virtual_chunk_pull_end(const virtual_chunk_pull<Input, Mid> & left,
728  const virtual_chunk_pull_end<Mid> & right);
729 
730 public:
735  : m_input(nullptr)
736  {}
737 
742  template <typename fact_t>
744  *this = std::forward<pullpipe_end<fact_t>>(pipe);
745  set_container(ctr);
746  }
747 
751  template <typename fact_t>
753  if (this->m_node) {
754  log_error() << "Virtual chunk assigned twice" << std::endl;
755  throw tpie::exception("Virtual chunk assigned twice");
756  }
757  typedef typename fact_t::template constructed_type<input_type> constructed_type;
758  input_type temp(m_input);
759  this->m_nodeMap = m_input->get_node_map();
760  fact_t f = std::move(pipe.factory);
761  f.set_destination_kind_pull();
762  this->m_node = bits::virt_node::take_own(new constructed_type(f.construct(std::move(temp))));
763  return *this;
764  }
765 
769  explicit operator bool() const noexcept {
770  return m_input != nullptr;
771  }
772 };
773 
774 
778 template <typename Input, typename Output=Input>
779 class virtual_chunk_pull : public bits::virtual_chunk_base {
780  friend class bits::access;
781  typedef bits::access acc;
782  typedef bits::virtpullsrc<Output> output_type;
783  typedef bits::virtpullrecv<Input> input_type;
784  input_type * m_input;
785  output_type * m_output;
786  input_type * get_input() const { return m_input; }
787  output_type * get_output() const { return m_output; }
788 
794  template <typename Mid>
795  virtual_chunk_pull(const virtual_chunk_pull<Input, Mid> & left,
796  const virtual_chunk_pull<Mid, Output> & right)
797  : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
798  {
799  m_input = acc::get_input(left);
800  m_output = acc::get_output(right);
801  }
802 public:
807  : m_input(nullptr)
808  , m_output(nullptr)
809  {}
810 
815  template <typename fact_t>
817  *this = std::forward<pullpipe_middle<fact_t>>(pipe);
818  set_container(ctr);
819  }
820 
828  const virtual_chunk_pull_begin<Output> & right)
829  : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node())) {
830  m_input = acc::get_input(left);
831  m_output = acc::get_output(right);
832  }
833 
837  template <typename fact_t>
839  if (this->m_node) {
840  log_error() << "Virtual chunk assigned twice" << std::endl;
841  throw tpie::exception("Virtual chunk assigned twice");
842  }
843  typedef typename fact_t::template constructed_type<input_type> constructed_type;
844  input_type temp(m_input);
845  this->m_nodeMap = temp.get_node_map();
846  fact_t f = std::move(pipe.factory);
847  f.set_destination_kind_push();
848  m_output = new bits::virtpullsrc_impl<constructed_type, Output>(f.construct(std::move(temp)));
849  this->m_node = bits::virt_node::take_own(m_output);
850  return *this;
851  }
852 
856  template <typename NextOutput>
858  if (empty()) {
860  ::go(&dest);
861  }
862  if (dest.empty()) {
864  ::go(this);
865  }
866  acc::get_input(dest)->set_source(get_output());
867  return acc::construct<virtual_chunk_pull<Input, NextOutput>>(*this, dest);
868  }
869 
874  if (empty()) {
876  ::go(&dest);
877  }
878  acc::get_input(dest)->set_source(get_output());
879  return acc::construct<virtual_chunk_pull_end<Input>>(*this, dest);
880  }
881 
885  explicit operator bool() const noexcept {
886  return m_input != nullptr;
887  }
888 };
889 
890 
894 template <typename Output>
895 class virtual_chunk_pull_begin : public bits::virtual_chunk_base {
896  friend class bits::access;
897  typedef bits::access acc;
898  typedef bits::virtpullsrc<Output> output_type;
899  output_type * m_output;
900 
901  output_type * get_output() const { return m_output; }
902 
908  template <typename Mid>
909  virtual_chunk_pull_begin(const virtual_chunk_pull_begin<Mid> & left,
910  const virtual_chunk_pull<Mid, Output> & right)
911  : virtual_chunk_base(left.get_node_map(),
912  bits::virt_node::combine(left.get_node(), right.get_node())) {
913  m_output = acc::get_output(right);
914  }
915 public:
920  : m_output(nullptr)
921  {}
922 
927  template <typename fact_t>
929  *this = std::forward<pullpipe_begin<fact_t>>(pipe);
930  set_container(ctr);
931  }
932 
936  template <typename fact_t>
938  if (this->m_node) {
939  log_error() << "Virtual chunk assigned twice" << std::endl;
940  throw tpie::exception("Virtual chunk assigned twice");
941  }
942 
943  typedef typename fact_t::constructed_type constructed_type;
944  m_output = new bits::virtpullsrc_impl<constructed_type, Output>(pipe.factory.construct());
945  this->m_node = bits::virt_node::take_own(m_output);
946  this->m_nodeMap = m_output->get_node_map();
947 
948  return *this;
949  }
950 
951 
955  virtual_chunk_base operator|(virtual_chunk_pull_end<Output> dest) {
956  if (empty()) throw virtual_chunk_missing_begin();
957  if (dest.empty()) throw virtual_chunk_missing_end();
958  acc::get_input(dest)->set_source(get_output());
959  return acc::construct<virtual_chunk_base>(
960  this->m_nodeMap,
961  bits::virt_node::combine(get_node(), dest.get_node()));
962  }
963 
964 
965 
969  template <typename NextOutput>
971  if (empty()) throw virtual_chunk_missing_begin();
972  if (dest.empty()) {
974  ::go(this);
975  }
976  acc::get_input(dest)->set_source(get_output());
977  return acc::construct<virtual_chunk_pull_begin<NextOutput>>(*this, dest);
978  }
979 
980 
984  explicit operator bool() const noexcept {
985  return m_output != nullptr;
986  }
987 };
988 
989 
990 template <typename Input>
991 template <typename Mid>
992 virtual_chunk_pull_end<Input>::virtual_chunk_pull_end(const virtual_chunk_pull<Input, Mid> & left,
993  const virtual_chunk_pull_end<Mid> & right)
994  : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node())) {
995  m_input = acc::get_input(left);
996 }
997 
998 
999 namespace bits {
1000 
1001 template <typename Input>
1002 virtsrc<Input> * access::get_input(const virtual_chunk_end<Input> & chunk) {
1003  return chunk.get_input();
1004 }
1005 
1006 template <typename Input, typename Output>
1007 virtsrc<Input> * access::get_input(const virtual_chunk<Input, Output> & chunk) {
1008  return chunk.get_input();
1009 }
1010 
1011 template <typename Input, typename Output>
1012 virtrecv<Output> * access::get_output(const virtual_chunk<Input, Output> & chunk) {
1013  return chunk.get_output();
1014 }
1015 
1016 template <typename Output>
1017 virtrecv<Output> * access::get_output(const virtual_chunk_begin<Output> & chunk) {
1018  return chunk.get_output();
1019 }
1020 
1021 
1022 template <typename Input>
1023 virtpullrecv<Input> * access::get_input(const virtual_chunk_pull_end<Input> & chunk) {
1024  return chunk.get_input();
1025 }
1026 
1027 template <typename Input, typename Output>
1028 virtpullrecv<Input> * access::get_input(const virtual_chunk_pull<Input, Output> & chunk) {
1029  return chunk.get_input();
1030 }
1031 
1032 template <typename Input, typename Output>
1033 virtpullsrc<Output> * access::get_output(const virtual_chunk_pull<Input, Output> & chunk) {
1034  return chunk.get_output();
1035 }
1036 
1037 template <typename Output>
1038 virtpullsrc<Output> * access::get_output(const virtual_chunk_pull_begin<Output> & chunk) {
1039  return chunk.get_output();
1040 }
1041 
1042 template <typename dest_t, typename T>
1043 class vfork_node : public node {
1044 public:
1045  typedef T item_type;
1046 
1047  vfork_node(dest_t && dest, virtual_chunk_end<T> out)
1048  : vnode(out.get_node())
1049  , dest2(bits::access::get_input(out))
1050  , dest(std::move(dest)) {
1051  add_push_destination(this->dest);
1052  if (dest2) add_push_destination(*dest2);
1053  }
1054 
1055  void push(T v) {
1056  dest.push(v);
1057  if (dest2) dest2->push(v);
1058  }
1059 
1060 private:
1061  // This counted reference ensures dest2 is not deleted prematurely.
1062  virt_node::ptr vnode;
1063 
1064  virtsrc<T> * dest2;
1065 
1066  dest_t dest;
1067 };
1068 
1069 template <typename T>
1070 class devirtualize_end_node : public node {
1071 public:
1072  typedef T item_type;
1073 
1074  template <typename TT>
1075  devirtualize_end_node(TT out, std::unique_ptr<node> tail=std::unique_ptr<node>())
1076  : vnode(out.get_node())
1077  , tail(std::move(tail))
1078  , dest(bits::access::get_input(out))
1079  {
1080  if (dest) add_push_destination(*dest);
1081  }
1082 
1083  void push(T v) {
1084  if (dest) dest->push(v);
1085  }
1086 
1087 private:
1088  // This counted reference ensures dest is not deleted prematurely.
1089  virt_node::ptr vnode;
1090  std::unique_ptr<node> tail;
1091  virtsrc<T> * dest;
1092 };
1093 
1094 
1095 template <typename dest_t, typename T>
1097 public:
1098  using input_type = typename maybe_add_const_ref<T>::type;
1099 
1100  template <typename TT>
1101  devirtualize_begin_node(dest_t dest, TT input)
1102  : vnode(input.get_node())
1103  , dest(std::move(dest)) {
1104  node::add_push_destination(this->dest);
1105  this->set_name("Virtual source", PRIORITY_INSIGNIFICANT);
1106  this->set_plot_options(node::PLOT_BUFFERED | node::PLOT_SIMPLIFIED_HIDE);
1107  recv = bits::access::get_output(input);
1108  recv->set_destination(this);
1109  }
1110 
1112  : virtsrc<T>(std::move(o))
1113  , vnode(std::move(o.vnode))
1114  , recv(o.recv)
1115  , dest(std::move(o.dest)) {
1116  o.recv = nullptr;
1117  if (recv) {
1118  recv->change_destination(this);
1119  }
1120  }
1121 
1123  assert(recv == nullptr);
1124  virtsrc<T>::operator=(std::move(o));
1125  vnode = std::move(o.vnode);
1126  recv = o.recv;
1127  o.recv = nullptr;
1128  if (recv) {
1129  recv->change_destination(this);
1130  }
1131  }
1132 
1133  const node_token & get_token() final {
1134  return node::get_token();
1135  }
1136 
1137  void push(input_type v) final {
1138  dest.push(v);
1139  }
1140 
1141  virt_node::ptr vnode;
1142  virtrecv<T> * recv = nullptr;
1143  dest_t dest;
1144 };
1145 
1146 template <typename Input, typename Output>
1148 public:
1149  template <typename dest_t>
1151 
1154  devirtualization_factory & operator=(const devirtualization_factory &) = delete;
1155  devirtualization_factory & operator=(devirtualization_factory &&) = default;
1156 
1157  explicit devirtualization_factory(virtual_chunk<Input, Output> chunk) : chunk(chunk) {}
1158 
1159  template <typename dest_t>
1160  bits::devirtualize_end_node<Input> construct(dest_t && dest) {
1161  auto destnode = std::make_unique<bits::devirtualize_begin_node<dest_t, Output>>(std::move(dest), chunk);
1162  this->init_sub_node(*destnode);
1163  bits::devirtualize_end_node<Input> r(chunk, std::move(destnode));
1164  this->init_sub_node(r);
1165  return r;
1166  }
1167 
1169 };
1170 
1171 
1172 template <typename T>
1174 public:
1175  typedef T item_type;
1176 
1177  template <typename TT>
1178  devirtualize_pull_begin_node(TT out, std::unique_ptr<node> tail=std::unique_ptr<node>())
1179  : vnode(out.get_node())
1180  , tail(std::move(tail))
1181  , src(bits::access::get_output(out))
1182  {
1183  if (src) add_pull_source(*src);
1184  }
1185 
1186  bool can_pull() {return src->can_pull();}
1187  item_type pull() {return src->pull();}
1188 private:
1189  // This counted reference ensures src is not deleted prematurely.
1190  virt_node::ptr vnode;
1191  std::unique_ptr<node> tail;
1192  virtpullsrc<T> * src;
1193 };
1194 
1195 template <typename src_t, typename T>
1197 public:
1198  template <typename TT>
1199  devirtualize_pull_end_node(src_t src, TT output)
1200  : vnode(output.get_node())
1201  , src(std::move(src)) {
1202  node::add_pull_source(this->src),
1203  this->set_name("Virtual source", PRIORITY_INSIGNIFICANT);
1204  this->set_plot_options(node::PLOT_BUFFERED | node::PLOT_SIMPLIFIED_HIDE);
1205  input = bits::access::get_input(output);
1206  input->set_source(this);
1207  }
1208 
1210  : virtpullsrc<T>(std::move(o))
1211  , vnode(std::move(o.vnode))
1212  , input(o.input)
1213  , src(std::move(o.src)) {
1214  o.input = nullptr;
1215  if (input) {
1216  input->change_source(this);
1217  }
1218  }
1219 
1221  assert(input == nullptr);
1222  virtpullsrc<T>::operator=(std::move(o));
1223  vnode = std::move(o.vnode);
1224  input = o.input;
1225  o.input = nullptr;
1226  if (input) {
1227  input->change_source(this);
1228  }
1229  }
1230 
1231  const node_token & get_token() final {
1232  return node::get_token();
1233  }
1234 
1235  T pull() final {return src.pull();}
1236  bool can_pull() final {return src.can_pull();}
1237 
1238  virt_node::ptr vnode;
1239  virtpullrecv<T> * input = nullptr;
1240  src_t src;
1241 };
1242 
1243 template <typename Input, typename Output>
1245 public:
1246  template <typename src_t>
1248 
1251  devirtualization_pull_factory & operator=(const devirtualization_pull_factory &) = delete;
1253 
1254  explicit devirtualization_pull_factory(virtual_chunk_pull<Input, Output> chunk) : chunk(chunk) {}
1255 
1256  template <typename src_t>
1257  bits::devirtualize_pull_begin_node<Input> construct(src_t && src) {
1258  auto srcnode = std::make_unique<bits::devirtualize_pull_end_node<src_t, Input>>(std::move(src), chunk);
1259  this->init_sub_node(*srcnode);
1260  bits::devirtualize_pull_begin_node<Output> r(chunk, std::move(srcnode));
1261  this->init_sub_node(r);
1262  return r;
1263  }
1264 
1266 };
1267 
1268 
1269 
1270 } // namespace bits
1271 
1275 template <typename Input>
1277  return {out};
1278 }
1279 
1283 template <typename Output>
1285  return {in};
1286 }
1287 
1291 template <typename Input, typename Output>
1293  return {mid};
1294 }
1295 
1299 template <typename Input>
1301  return {out};
1302 }
1303 
1304 /*
1305  * \brief Convert a virtual_chunk_pull_end for use at the end of a normal none virtual pull pipeline
1306  */
1307 template <typename Input>
1308 pullpipe_end<tfactory<bits::devirtualize_pull_end_node, Args<Input>, virtual_chunk_pull_end<Input>>> devirtualize(const virtual_chunk_pull_end<Input> & in) {
1309  return {in};
1310 }
1311 
1312 /*
1313  * \brief Convert a virtual_chunk_pull for use in the middle of a normal none virtual pull pipeline
1314  */
1315 template <typename Input, typename Output>
1316 pullpipe_middle<bits::devirtualization_pull_factory<Input, Output>> devirtualize(const virtual_chunk_pull<Input, Output> & mid) {
1317  return {mid};
1318 }
1319 
1320 template <typename T>
1321 pipe_middle<tfactory<bits::vfork_node, Args<T>, virtual_chunk_end<T> > > fork_to_virtual(const virtual_chunk_end<T> & out) {
1322  return {out};
1323 }
1324 
1325 template <typename T>
1326 virtual_chunk<T> vfork(const virtual_chunk_end<T> & out) {
1327  if (out.empty()) return virtual_chunk<T>();
1328  return fork_to_virtual(out);
1329 }
1330 
1331 template <typename T>
1332 inline virtual_chunk<T> chunk_if(bool b, virtual_chunk<T> t) {
1333  if (b)
1334  return t;
1335  else
1336  return virtual_chunk<T>();
1337 }
1338 
1339 template <typename T>
1340 virtual_chunk_end<T> chunk_end_if(bool b, virtual_chunk_end<T> t) {
1341  if (b)
1342  return t;
1343  else
1344  return virtual_chunk_end<T>(null_sink<T>());
1345 }
1346 
1347 } // namespace tpie::pipelining
1348 
1349 #endif // __TPIE_PIPELINING_VIRTUAL_H__
tpie::pipelining::virtual_chunk_pull_begin::operator=
virtual_chunk_pull_begin & operator=(pullpipe_begin< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:937
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::factory_base::init_sub_node
void init_sub_node(node &r)
Initialize node constructed in a subclass.
tpie::pipelining::virtual_chunk_pull
Virtual chunk that has input and output.
Definition: virtual.h:64
tpie::pipelining::bits::virtsrc_impl
Concrete implementation of virtsrc.
Definition: virtual.h:119
tpie::pipelining::bits::assert_types_equal_and_return
Helper class that throws an exception on behalf of virtual_chunks that have not been assigned a pipe_...
Definition: virtual.h:342
tpie::pipelining::virtual_chunk
Virtual chunk that has input and output.
Definition: virtual.h:52
tpie::pipelining::virtual_chunk_pull_end::operator=
virtual_chunk_pull_end & operator=(pullpipe_end< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:752
tpie::pipelining::bits::virtrecv::begin
void begin() final
Begin pipeline processing phase.
Definition: virtual.h:174
tpie::pipelining::virtual_chunk::virtual_chunk
virtual_chunk(const virtual_chunk_end< Input > &left, const virtual_chunk_begin< Output > &right)
Construct a virtual chunk from an end and a begin chunk.
Definition: virtual.h:539
tpie::pipelining::virtual_chunk::operator|
virtual_chunk_end< Input > operator|(virtual_chunk_end< Output > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:589
tpie::pipelining::bits::virt_node::set_container
void set_container(virtual_container *ctr)
Set and/or reset the virtual_container assigned to this virtual node.
Definition: virtual.h:331
tpie::pipelining::node::add_pull_source
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
tpie::pipelining::pullpipe_begin
Definition: pipe_base.h:442
tpie::pipelining::pipe_begin
Definition: pipe_base.h:331
tpie::pipelining::virtual_chunk_pull_end::virtual_chunk_pull_end
virtual_chunk_pull_end()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:734
tpie::pipelining::virtual_chunk_pull_end
Virtual chunk that has no input (that is, virtual producer).
Definition: virtual.h:60
tpie::pipelining::virtual_chunk_pull_begin::virtual_chunk_pull_begin
virtual_chunk_pull_begin(pullpipe_begin< fact_t > &&pipe, virtual_container *ctr=nullptr)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:928
tpie::pipelining::pipe_end
Definition: pipe_base.h:212
tpie::pipelining::virtual_chunk_begin::operator|
virtual_chunk_begin< NextOutput > operator|(virtual_chunk< Output, NextOutput > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:680
tpie::pipelining::bits::maybe_add_const_ref
The maybe_add_const_ref helper struct adds const & to a type unless the type is already const,...
Definition: virtual.h:77
tpie::pipelining::bits::devirtualize_end_node
Definition: virtual.h:1070
tpie::pipelining::virtual_chunk::virtual_chunk
virtual_chunk(pipe_middle< fact_t > &&pipe, virtual_container *ctr=nullptr)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:527
tpie::pipelining::bits::devirtualization_factory
Definition: virtual.h:1147
tpie::pipelining::virtual_chunk_end::virtual_chunk_end
virtual_chunk_end(pipe_end< fact_t > &&pipe, virtual_container *ctr=nullptr)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:453
tpie::pipelining::virtual_chunk_pull_end::virtual_chunk_pull_end
virtual_chunk_pull_end(pullpipe_end< fact_t > &&pipe, virtual_container *ctr=nullptr)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:743
tpie::pipelining::bits::devirtualize_pull_end_node
Definition: virtual.h:1196
tpie::pipelining::virtual_chunk::virtual_chunk
virtual_chunk()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:517
tpie::pipelining::bits::virtpullrecv
Virtual node that is injected into the end of a virtual chunk.
Definition: virtual.h:239
tpie::pipelining::virtual_container
Virtual base class for extra data to go with virtual chunks.
Definition: virtual.h:40
tpie::pipelining::virtual_chunk_pull_begin::operator|
virtual_chunk_base operator|(virtual_chunk_pull_end< Output > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:955
tpie::pipelining::bits::virt_node::take_own
static ptr take_own(node *pipe)
Take std::new-ownership of given node.
Definition: virtual.h:309
tpie::pipelining::bits::virtpullsrc
Virtual base node that is injected into the beginning of a virtual chunk.
Definition: virtual.h:203
tpie::log_error
logstream & log_error()
Return logstream for writing error log messages.
Definition: tpie_log.h:148
tpie::pipelining::bits::virtpullrecv::begin
void begin() final
Begin pipeline processing phase.
Definition: virtual.h:261
tpie::pipelining::virtual_chunk::operator=
virtual_chunk & operator=(pipe_middle< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:552
tpie::pipelining::node::get_node_map
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one.
Definition: node.h:250
tpie::pipelining::virtual_chunk_pull::operator|
virtual_chunk_pull< Input, NextOutput > operator|(virtual_chunk_pull< Output, NextOutput > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:857
tpie::pipelining::virtual_chunk_begin::virtual_chunk_begin
virtual_chunk_begin(pipe_begin< fact_t > &&pipe, virtual_container *ctr=nullptr)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:653
tpie::pipelining::node::set_plot_options
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:458
tpie::pipelining::factory_base
Base class of all pipelining factories.
Definition: factory_base.h:73
tpie::pipelining::virtual_chunk_pull_begin
Virtual chunk that has no output (that is, virtual consumer).
Definition: virtual.h:68
tpie::exception
Definition: exception.h:33
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::pipelining::virtual_chunk_pull::operator=
virtual_chunk_pull & operator=(pullpipe_middle< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:838
tpie::pipelining::devirtualize
pipe_end< termfactory< bits::devirtualize_end_node< Input >, virtual_chunk_end< Input > > > devirtualize(const virtual_chunk_end< Input > &out)
Convert a virtual_chunk_end for use at the end of a normal none virtual pipeline.
Definition: virtual.h:1276
tpie::pipelining::virtual_chunk_end
Virtual chunk that has no output (that is, virtual consumer).
Definition: virtual.h:48
tpie::pipelining::bits::virtual_chunk_base
Base class of virtual chunks. Owns a virt_node.
Definition: virtual.h:389
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::pipelining::bits::devirtualization_pull_factory
Definition: virtual.h:1244
tpie::pipelining::pullpipe_end
Definition: pipe_base.h:382
tpie::pipelining::pipe_middle
Definition: pipe_base.h:243
tpie::pipelining::bits::virtrecv
Virtual node that is injected into the end of a virtual chunk.
Definition: virtual.h:151
tpie::pipelining::virtual_chunk_pull_begin::operator|
virtual_chunk_pull_begin< NextOutput > operator|(virtual_chunk_pull< Output, NextOutput > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:970
tpie::pipelining::bits::access
Definition: virtual.h:361
tpie::pipelining::virtual_chunk_pull::virtual_chunk_pull
virtual_chunk_pull()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:806
tpie::pipelining::virtual_chunk_pull::virtual_chunk_pull
virtual_chunk_pull(const virtual_chunk_pull_end< Input > &left, const virtual_chunk_pull_begin< Output > &right)
Construct a virtual chunk pull from an end and a begin chunk.
Definition: virtual.h:827
tpie::pipelining::bits::virtsrc
Virtual base node that is injected into the beginning of a virtual chunk.
Definition: virtual.h:107
tpie::pipelining::virtual_chunk_missing_middle
Definition: exception.h:54
tpie::pipelining::virtual_chunk_missing_end
Definition: exception.h:59
tpie::pipelining::bits::pipeline_base
Definition: pipeline.h:109
tpie::pipelining::pullpipe_middle
Definition: pipe_base.h:404
tpie::pipelining::bits::virtpullsrc_impl
Concrete implementation of virtsrc.
Definition: virtual.h:215
tpie::pipelining::bits::devirtualize_pull_begin_node
Definition: virtual.h:1173
tpie::pipelining::virtual_chunk_end::operator=
virtual_chunk_end & operator=(pipe_end< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:463
tpie::pipelining::virtual_chunk_begin::operator|
virtual_chunk_base operator|(virtual_chunk_end< Output > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:693
tpie::pipelining::input
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining nodes that pushes the contents of the given file stream to the next node in the pipeline.
Definition: file_stream.h:378
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::virtual_chunk_end::virtual_chunk_end
virtual_chunk_end()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:444
tpie::pipelining::virtual_chunk_pull_begin::virtual_chunk_pull_begin
virtual_chunk_pull_begin()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:919
tpie::pipelining::virtual_chunk_missing_begin
Definition: exception.h:49
tpie::pipelining::virtual_chunk_begin::virtual_chunk_begin
virtual_chunk_begin()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:644
tpie::pipelining::bits::virt_node::combine
static ptr combine(ptr left, ptr right)
Aggregate ownership of virt_nodes.
Definition: virtual.h:319
tpie::pipelining::virtual_chunk_begin::operator=
virtual_chunk_begin & operator=(pipe_begin< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:662
tpie::pipelining::bits::virt_node
Ownership of nodes.
Definition: virtual.h:295
tpie::pipelining::bits::vfork_node
Definition: virtual.h:1043
tpie::pipelining::virtual_chunk::operator|
virtual_chunk< Input, NextOutput > operator|(virtual_chunk< Output, NextOutput > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:572
tpie::pipelining::node::get_token
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
Definition: node.h:631
tpie::pipelining::node_token
Definition: tokens.h:292
tpie::pipelining::output
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:430
tpie::pipelining::node::node
node()
Default constructor, using a new node_token.
tpie::pipelining::bits::devirtualize_begin_node
Definition: virtual.h:1096
tpie::pipelining::virtual_chunk_pull::operator|
virtual_chunk_pull_end< Input > operator|(virtual_chunk_pull_end< Output > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:873
tpie::pipelining::virtual_chunk_begin
Virtual chunk that has no input (that is, virtual producer).
Definition: virtual.h:56
tpie::pipelining::virtual_chunk_pull::virtual_chunk_pull
virtual_chunk_pull(pullpipe_middle< fact_t > &&pipe, virtual_container *ctr=nullptr)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:816