TPIE

11a2c2d
pipeline.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_PIPELINE_H__
21 #define __TPIE_PIPELINING_PIPELINE_H__
22 
23 #include <tpie/tpie_export.h>
24 #include <tpie/types.h>
25 #include <iostream>
26 #include <tpie/pipelining/tokens.h>
27 #include <tpie/progress_indicator_null.h>
28 #include <tpie/file_manager.h>
29 #include <tpie/memory.h>
30 #include <unordered_set>
31 #include <mutex>
32 
33 namespace tpie::pipelining {
34 namespace bits {
35 
36 
41 class TPIE_EXPORT pipeline_base_base {
42 public:
44  pipeline_base_base(const pipeline_base_base &) = default;
45  pipeline_base_base & operator=(const pipeline_base_base &) = default;
47  pipeline_base_base & operator=(pipeline_base_base &&) = default;
48 
64  void plot(std::ostream & out) {plot_impl(out, false);}
65 
76  void plot_full(std::ostream & out) {plot_impl(out, true);}
77 
81  virtual ~pipeline_base_base() {}
82 
83  void forward_any(std::string key, any_noncopyable value);
84 
85  bool can_fetch(std::string key);
86 
87  any_noncopyable & fetch_any(std::string key);
88 
89  node_map::ptr get_node_map() const {
90  return m_nodeMap;
91  }
92 
93  void output_memory(std::ostream & o) const;
94 
95  size_t uid() const {return m_uid;};
96 protected:
97 
98  node_map::ptr m_nodeMap;
99  size_t m_uid;
100 private:
101  void plot_impl(std::ostream & out, bool full);
102 };
103 
104 
109 class TPIE_EXPORT pipeline_base: public pipeline_base_base {
110 public:
114  void operator()(stream_size_type items, progress_indicator_base & pi,
115  memory_size_type filesAvailable, memory_size_type mem,
116  const char * file, const char * function);
117 
122  void operator()(stream_size_type items, progress_indicator_base & pi,
123  memory_size_type mem,
124  const char * file, const char * function) {
125  operator()(items, pi, get_file_manager().available(), mem, file, function);
126  }
127 
128  double memory() const {
129  return m_memory;
130  }
131 
132  void order_before(pipeline_base & other);
133 protected:
134  double m_memory;
135 };
136 
142 template <typename fact_t>
143 class pipeline_impl : public pipeline_base {
144 public:
145  typedef typename fact_t::constructed_type gen_t;
146 
147  pipeline_impl(fact_t & factory) {
148  this->m_memory = factory.memory();
149 
150  auto n = std::unique_ptr<gen_t>(new gen_t(factory.construct()));
151  this->m_nodeMap = n->get_node_map()->find_authority();
152  this->m_nodeMap->increment_pipeline_ref();
153  this->m_nodeMap->add_owned_node(std::move(n));
154  }
155 
156  pipeline_impl(const pipeline_impl & o) = delete;
157  pipeline_impl & operator=(const pipeline_impl & o) = delete;
158 
159  pipeline_impl(pipeline_impl && o) = default;
160  pipeline_impl & operator=(pipeline_impl && o) = default;
161 
162  ~pipeline_impl() override {
163  if (this->m_nodeMap) {
164  this->m_nodeMap->find_authority()->decrement_pipeline_ref();
165  }
166  }
167 };
168 
169 } // namespace bits
170 
171 
// Set of raw pointers to pipeline_base_base objects; only declared here, the
// definition lives in another translation unit.
// NOTE(review): presumably the pipelines that are currently alive or running
// -- confirm where entries are inserted and erased.
172 TPIE_EXPORT extern std::unordered_set<bits::pipeline_base_base *> current_pipelines;
// Mutex declared alongside the set above.
// NOTE(review): presumably must be held while touching current_pipelines --
// confirm against the users of the set.
173 TPIE_EXPORT extern std::mutex current_pipelines_mutex;
174 
181 class TPIE_EXPORT pipeline {
182 public:
183  pipeline() {}
184  pipeline(pipeline &&) = default;
185  pipeline(const pipeline &) = default;
186  pipeline & operator=(pipeline &&) = default;
187  pipeline & operator=(const pipeline &) = default;
188 
189  template <typename T>
190  pipeline(T from) {
191  *this = std::move(from);
192  }
193 
194  template <typename T>
195  pipeline & operator=(T from) {
196  p.reset(new T(std::move(from)));
197  return *this;
198  }
199 
200  pipeline(const std::shared_ptr<bits::pipeline_base> & p): p(p) {}
201 
202  void operator()() {
204  (*p)(1, pi, get_file_manager().available(), get_memory_manager().available(), nullptr, nullptr);
205  }
206 
207  void operator()(stream_size_type items, progress_indicator_base & pi,
208  const char * file, const char * function) {
209  (*p)(items, pi, get_file_manager().available(), get_memory_manager().available(), file, function);
210  }
211 
212  void operator()(stream_size_type items, progress_indicator_base & pi,
213  memory_size_type mem,
214  const char * file, const char * function) {
215  (*p)(items, pi, get_file_manager().available(), mem, file, function);
216  }
217 
218  void operator()(stream_size_type items, progress_indicator_base & pi,
219  memory_size_type filesAvailable, memory_size_type mem,
220  const char * file, const char * function) {
221  (*p)(items, pi, filesAvailable, mem, file, function);
222  }
223 
224  void plot(std::ostream & os = std::cout) {
225  p->plot(os);
226  }
227 
228  void plot_full(std::ostream & os = std::cout) {
229  p->plot_full(os);
230  }
231 
232  inline double memory() const {
233  return p->memory();
234  }
235 
236  bits::node_map::ptr get_node_map() const {
237  return p->get_node_map();
238  }
239 
240  bool can_fetch(std::string key) {
241  return p->can_fetch(key);
242  }
243 
244  any_noncopyable & fetch_any(std::string key) {
245  return p->fetch_any(key);
246  }
247 
248  template <typename T>
249  T & fetch(std::string key) {
250  any_noncopyable &a = fetch_any(key);
251  return any_cast<T>(a);
252  }
253 
254  void forward_any(std::string key, any_noncopyable value) {
255  p->forward_any(key, std::move(value));
256  }
257 
258  template <typename T>
259  void forward(std::string key, T value) {
260  forward_any(key, any_noncopyable(std::move(value)));
261  }
262 
263  pipeline & then(pipeline & other) {
264  p->order_before(*other.p);
265  return other;
266  }
267 
268  void output_memory(std::ostream & o) const {p->output_memory(o);}
269 private:
270  std::shared_ptr<bits::pipeline_base> p;
271 };
272 
273 } // namespace tpie::pipelining
274 
275 #endif // __TPIE_PIPELINING_PIPELINE_H__
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::progress_indicator_base
The base class for indicating the progress of some task.
Definition: progress_indicator_base.h:62
tpie::pipelining::factory
Definition: factory_helpers.h:35
tpie::file
Central file abstraction.
Definition: file.h:39
tpie::get_memory_manager
TPIE_EXPORT memory_manager & get_memory_manager()
Return a reference to the memory manager.
types.h
tpie::pipelining::factory_base::memory
void memory(double amount) noexcept
Set memory fraction for this node in the pipeline phase.
Definition: factory_base.h:87
tpie::resource_manager::available
size_t available() const noexcept
Return the amount of the resource still available to be assigned.
tpie::pipelining::bits::pipeline_base_base
Definition: pipeline.h:41
tpie::progress_indicator_null
A dummy progress indicator that produces no output.
Definition: progress_indicator_null.h:39
tpie::pipelining::bits::pipeline_base_base::plot_full
void plot_full(std::ostream &out)
Generate a GraphViz plot of the actor graph.
Definition: pipeline.h:76
tpie::pipelining::bits::pipeline_base::operator()
void operator()(stream_size_type items, progress_indicator_base &pi, memory_size_type mem, const char *file, const char *function)
Invoke the pipeline with amount of available files automatically configured.
Definition: pipeline.h:122
progress_indicator_null.h
tpie::pipelining::bits::pipeline_base
Definition: pipeline.h:109
tokens.h
tpie::pipelining::pipeline
Definition: pipeline.h:181
tpie::pipelining::bits::pipeline_impl
Definition: pipeline.h:143
tpie::pipelining::bits::pipeline_base_base::plot
void plot(std::ostream &out)
Generate a GraphViz plot of the pipeline.
Definition: pipeline.h:64
tpie::pipelining::bits::pipeline_base_base::~pipeline_base_base
virtual ~pipeline_base_base()
Virtual dtor.
Definition: pipeline.h:81
tpie::get_file_manager
TPIE_EXPORT file_manager & get_file_manager()
Return a reference to the file manager.
memory.h
tpie::pipelining::any_noncopyable
Definition: container.h:195