TPIE

11a2c2d
split.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2016, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_PIPELINING_SPLIT_H
21 #define TPIE_PIPELINING_SPLIT_H
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_helpers.h>
25 #include <tpie/pipelining/pipe_base.h>
26 
27 namespace tpie::pipelining {
28 
38 template <typename T>
39 class split {
40 public:
41  class source_base : public node {
42  public:
43  source_base() = default;
44  source_base(source_base &&) = default;
45 
46  virtual void push(const T & v) = 0;
47 
48  protected:
49  ~source_base() {}
50  };
51 
52  template <typename dest_t>
53  class source_impl : public source_base {
54  public:
55  source_impl(dest_t dest, node_token sink_token, std::vector<source_base *> & the_sources)
56  : the_sources(the_sources)
57  , dest(std::move(dest))
58  {
59  this->set_name("Split source", PRIORITY_INSIGNIFICANT);
60  this->add_push_destination(this->dest);
61 
62  this->get_node_map()->union_set(sink_token.get_map());
63  bits::node_map::ptr m = this->get_node_map()->find_authority();
64  m->add_relation(sink_token.id(), this->get_token().id(), bits::pushes);
65  }
66 
67  source_impl(source_impl &&) = default;
68 
69  void prepare() override {
70  the_sources.push_back(this);
71  };
72 
73  void push(const T & v) override {
74  dest.push(v);
75  }
76 
77  private:
78  std::vector<source_base *> & the_sources;
79  dest_t dest;
80  };
81 
82  pipe_begin<factory<source_impl, node_token, std::vector<source_base *> &> > source() {
83  return {sink_token, the_sources};
84  }
85 
86  class sink_impl : public node {
87  public:
88  typedef T item_type;
89 
90  sink_impl(node_token sink_token, std::vector<source_base *> & the_sources)
91  : node(sink_token), the_sources(the_sources)
92  {
93  set_name("Join sink", PRIORITY_INSIGNIFICANT);
94  }
95 
96  void push(const T & v) {
97  for (auto & source : the_sources)
98  source->push(v);
99  }
100 
101  private:
102  std::vector<source_base *> & the_sources;
103  };
104 
106  return {sink_token, the_sources};
107  }
108 
109 private:
110  std::vector<source_base *> the_sources;
111  node_token sink_token;
112 };
113 
114 } // namespace tpie::pipelining
115 
116 #endif // TPIE_PIPELINING_SPLIT_H
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::pipe_end
Definition: pipe_base.h:212
tpie::pipelining::node::get_node_map
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one.
Definition: node.h:250
tpie::pipelining::split
Split one push stream into multiple.
Definition: split.h:39
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::pipelining::split::source_impl
Definition: split.h:53
tpie::pipelining::split::sink_impl
Definition: split.h:86
tpie::pipelining::split::source_base
Definition: split.h:41
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::split::source_impl::prepare
void prepare() override
Called before memory assignment but after depending phases have executed and ended.
Definition: split.h:69
tpie::pipelining::node_token
Definition: tokens.h:292
tpie::pipelining::node::node
node()
Default constructor, using a new node_token.