TPIE

11a2c2d
join.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_PIPELINING_JOIN_H
21 #define TPIE_PIPELINING_JOIN_H
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_helpers.h>
25 #include <tpie/pipelining/pipe_base.h>
26 
27 namespace tpie::pipelining {
28 
37 template <typename T>
38 class join {
39 public:
40  class source_base : public node {
41  public:
42  source_base(node_token token)
43  : node(token)
44  {
45  }
46 
47  source_base(source_base &&) = default;
48 
49  virtual void push(const T & v) = 0;
50 
51  protected:
52  ~source_base() {}
53  };
54 
55  template <typename dest_t>
56  class source_impl : public source_base {
57  public:
58  source_impl(dest_t dest, node_token token, source_base ** the_source)
59  : source_base(token)
60  , the_source(the_source)
61  , dest(std::move(dest))
62  {
63  this->set_name("Join source", PRIORITY_INSIGNIFICANT);
64  this->add_push_destination(this->dest);
65  }
66 
67  source_impl(source_impl &&) = default;
68 
69  void prepare() override {
70  if (*the_source != NULL && *the_source != this) {
71  // If join.source() is used twice, the second construction of node()
72  // should fail since the node_token is already used.
73  // Thus, this exception should never be thrown.
74  throw exception("Attempted to set join source a second time");
75  }
76  *the_source = this;
77  };
78 
79  void push(const T & v) override {
80  dest.push(v);
81  }
82 
83  private:
84  source_base ** the_source;
85  dest_t dest;
86  };
87 
88  pipe_begin<factory<source_impl, node_token, source_base **> > source() {
89  return factory<source_impl, node_token, source_base **>(source_token, &the_source);
90  }
91 
92  class sink_impl : public node {
93  public:
94  typedef T item_type;
95 
96  sink_impl(node_token source_token, source_base ** the_source)
97  : the_source(the_source)
98  {
99  set_name("Join sink", PRIORITY_INSIGNIFICANT);
100  add_push_destination(source_token);
101  }
102 
103  void begin() override {
104  the_source_cache = *the_source;
105  }
106 
107  void push(const T & v) {
108  the_source_cache->push(v);
109  }
110 
111  private:
112  source_base * the_source_cache;
113  source_base ** the_source;
114  };
115 
116  pipe_end<termfactory<sink_impl, node_token, source_base **> > sink() {
117  return termfactory<sink_impl, node_token, source_base **>(source_token, &the_source);
118  }
119 
120  join() : the_source(NULL) {}
121 private:
122  source_base * the_source;
123  node_token source_token;
124 };
125 
126 } // namespace tpie::pipelining
127 
128 #endif // TPIE_PIPELINING_JOIN_H
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::join::source_impl
Definition: join.h:56
tpie::pipelining::join
Joins multiple push streams into one.
Definition: join.h:38
tpie::exception
Definition: exception.h:33
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::pipelining::join::sink_impl
Definition: join.h:92
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::pipelining::join::source_impl::prepare
void prepare() override
Called before memory assignment but after depending phases have executed and ended.
Definition: join.h:69
tpie::pipelining::join::source_base
Definition: join.h:40
tpie::pipelining::join::sink_impl::begin
void begin() override
Begin pipeline processing phase.
Definition: join.h:103
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::node_token
Definition: tokens.h:292
tpie::pipelining::node::node
node()
Default constructor, using a new node_token.