TPIE

11a2c2d
subpipeline.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2016, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 #ifndef __TPIE_PIPELINING_SUBPIPELINE_H__
20 #define __TPIE_PIPELINING_SUBPIPELINE_H__
21 
22 #include <tpie/tpie_export.h>
23 #include <tpie/pipelining/pipeline.h>
24 #include <tpie/pipelining/runtime.h>
26 
27 namespace tpie::pipelining {
28 namespace bits {
29 
30 class TPIE_EXPORT subpipeline_base: public pipeline_base_base {
31 public:
32  void begin(stream_size_type items, progress_indicator_base & pi,
33  memory_size_type filesAvailable, memory_size_type mem,
34  const char * file, const char * function);
35  void begin(stream_size_type items, progress_indicator_base & pi,
36  memory_size_type mem,
37  const char * file, const char * function) {
38  begin(items, pi, get_file_manager().available(), mem, file, function);
39  }
40  void end();
42 protected:
43  node * frontNode;
44 private:
45  gocontext_ptr gc;
46  std::unique_ptr<runtime> rt;
47 };
48 
49 template <typename item_type>
52 
53  virtual void push(const item_type &) = 0;
54 };
55 
56 template <typename item_type, typename fact_t>
57 struct subpipeline_impl: public subpipeline_virt<item_type> {
58  typename fact_t::constructed_type front;
59  subpipeline_impl(fact_t fact): front(fact.construct()) {
60  this->m_nodeMap = front.get_node_map();
61  this->frontNode = &front;
62  }
63 
64  subpipeline_impl(const subpipeline_impl &) = delete;
65  subpipeline_impl & operator=(const subpipeline_impl &) = delete;
67  subpipeline_impl & operator=(subpipeline_impl &&) = delete;
68 
69  void push(const item_type & item) override {
70  front.push(item);
71  };
72 
73 };
74 
75 template <typename item_type>
76 struct subpipeline_virt_impl: public subpipeline_virt<item_type> {
77  virtsrc<item_type> * front;
79 
81  : front(access::get_input(chunk))
82  , chunk(std::move(chunk)) {
83  this->m_nodeMap = front->get_node_map();
84  this->frontNode = front;
85  }
86 
88  subpipeline_virt_impl & operator=(const subpipeline_virt_impl &) = delete;
90  subpipeline_virt_impl & operator=(subpipeline_virt_impl &&) = delete;
91 
92  void push(const item_type & item) override {
93  front->push(item);
94  };
95 };
96 
97 } //namespace bits
98 
104 template <typename item_type>
105 struct subpipeline {
106  subpipeline() {}
107  subpipeline(subpipeline &&) = default;
108  subpipeline(const subpipeline &) = default;
109  subpipeline & operator=(subpipeline &&) = default;
110  subpipeline & operator=(const subpipeline &) = default;
111  subpipeline(const std::shared_ptr<bits::subpipeline_virt<item_type>> & p): p(p) {}
112 
113  template <typename T>
114  subpipeline(T from) {
115  *this = std::move(from);
116  }
117 
118  template <typename T>
119  subpipeline & operator=(T from) {
120  p.reset(new bits::subpipeline_impl<item_type, T>(std::move(from)));
121  return *this;
122  }
123 
124  subpipeline & operator=(virtual_chunk_end<item_type> from) {
125  p.reset(new bits::subpipeline_virt_impl<item_type>(std::move(from)));
126  return *this;
127  }
128 
129  void clear() {p.reset();}
130 
131  void push(const item_type & item) {p->push(item);}
132 
133  void begin(size_t filesAvailable, size_t memory) {
134  p->begin(1, p->pi, filesAvailable, memory, nullptr, nullptr);
135  }
136 
137  void begin(size_t memory) {
138  begin(get_file_manager().available(), memory);
139  }
140 
141  void begin(stream_size_type items, progress_indicator_base & pi,
142  memory_size_type filesAvailable, memory_size_type mem,
143  const char * file, const char * function) {
144  p->begin(items, pi, filesAvailable, mem, file, function);
145  }
146 
147  void begin(stream_size_type items, progress_indicator_base & pi,
148  memory_size_type mem,
149  const char * file, const char * function) {
150  begin(items, pi, get_file_manager().available(), mem, file, function);
151  }
152 
153  void end() {p->end();}
154 
155  void plot(std::ostream & os = std::cout) {
156  p->plot(os);
157  }
158 
159  void plot_full(std::ostream & os = std::cout) {
160  p->plot_full(os);
161  }
162 
163  bits::node_map::ptr get_node_map() const {
164  return p->get_node_map();
165  }
166 
167  bool can_fetch(std::string key) {
168  return p->can_fetch(key);
169  }
170 
171  any_noncopyable & fetch_any(std::string key) {
172  return p->fetch_any(key);
173  }
174 
175  template <typename T>
176  T & fetch(std::string key) {
177  any_noncopyable &a = fetch_any(key);
178  return any_cast<T>(a);
179  }
180 
181  void forward_any(std::string key, any_noncopyable value) {
182  return p->forward_any(key, std::move(value));
183  }
184 
185  template <typename T>
186  void forward(std::string key, T value) {
187  forward_any(key, any_noncopyable(value));
188  }
189 
190  void output_memory(std::ostream & o) const {p->output_memory(o);}
191 private:
192  std::shared_ptr<bits::subpipeline_virt<item_type>> p;
193 };
194 
195 } //namespace tpie::pipelining
196 
197 #endif //__TPIE_PIPELINING_SUBPIPELINE_H__
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::progress_indicator_base
The base class for indicating the progress of some task.
Definition: progress_indicator_base.h:62
tpie::file
Central file abstraction.
Definition: file.h:39
tpie::pipelining::bits::subpipeline_virt_impl
Definition: subpipeline.h:76
tpie::pipelining::bits::pipeline_base_base
Definition: pipeline.h:41
tpie::pipelining::subpipeline
Definition: subpipeline.h:105
tpie::pipelining::bits::subpipeline_impl
Definition: subpipeline.h:57
tpie::pipelining::node::get_node_map
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one.
Definition: node.h:250
tpie::progress_indicator_null
a dummy progress indicator that produces no output
Definition: progress_indicator_null.h:39
tpie::pipelining::bits::subpipeline_virt
Definition: subpipeline.h:50
tpie::pipelining::virtual_chunk_end< item_type >
tpie::pipelining::bits::subpipeline_base
Definition: subpipeline.h:30
tpie::pipelining::bits::virtsrc< item_type >
virtual.h
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::item_type
pipe_middle< tfactory< bits::item_type_t, Args< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
tpie::get_file_manager
TPIE_EXPORT file_manager & get_file_manager()
Return a reference to the file manager.
tpie::pipelining::any_noncopyable
Definition: container.h:195