TPIE

11a2c2d
helpers.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_HELPERS_H__
21 #define __TPIE_PIPELINING_HELPERS_H__
22 
23 #include <iostream>
24 #include <tpie/pipelining/node.h>
25 #include <tpie/pipelining/pipe_base.h>
26 #include <tpie/pipelining/factory_helpers.h>
27 #include <tpie/memory.h>
28 #include <tpie/tpie_assert.h>
29 
30 namespace tpie::pipelining {
31 namespace bits {
32 
33 template <typename dest_t>
34 class ostream_logger_t : public node {
35 public:
36  typedef typename push_type<dest_t>::type item_type;
37 
38  ostream_logger_t(dest_t dest, std::ostream & log) : dest(std::move(dest)), log(log), begun(false), ended(false) {
39  add_push_destination(this->dest);
40  set_name("Log", PRIORITY_INSIGNIFICANT);
41  }
42  void begin() override {
43  node::begin();
44  begun = true;
45  }
46  void end() override {
47  node::end();
48  ended = true;
49  }
50  void push(const item_type & item) {
51  if (!begun) {
52  log << "WARNING: push() called before begin(). Calling begin on rest of pipeline." << std::endl;
53  begin();
54  }
55  if (ended) {
56  log << "WARNING: push() called after end()." << std::endl;
57  ended = false;
58  }
59  log << "pushing " << item << std::endl;
60  dest.push(item);
61  }
62 private:
63  dest_t dest;
64  std::ostream & log;
65  bool begun;
66  bool ended;
67 };
68 
69 template <typename source_t>
70 class pull_peek_t : public node {
71 public:
72  typedef typename pull_type<source_t>::type item_type;
73 
74  pull_peek_t(source_t source) : source(std::move(source)) {
75  add_pull_source(this->source);
76  set_plot_options(PLOT_SIMPLIFIED_HIDE);
77  }
78 
79  void begin() override {
80  could_pull = source.can_pull();
81  if (could_pull) item=source.pull();
82  }
83 
84  item_type pull() {
85  item_type i = item;
86  could_pull = source.can_pull();
87  if (could_pull) item=source.pull();
88  return i;
89  }
90 
91  const item_type & peek() const {
92  return item;
93  }
94 
95  bool can_pull() const {
96  return could_pull;
97  }
98 
99 private:
100  item_type item;
101  bool could_pull;
102  source_t source;
103 };
104 
105 template <typename T>
106 class dummydest_t : public node {
107 public:
108  dummydest_t() : buffer(std::make_shared<T>()) {}
109 
110  typedef T item_type;
111  std::shared_ptr<T> buffer;
112  void push(const T & el) {
113  *buffer = el;
114  }
115  T pull() {
116  return *buffer;
117  }
118 };
119 
120 template <typename dest_t, typename fact2_t>
121 class fork_t: public node {
122 public:
123  typedef typename fact2_t::constructed_type dest2_t;
124  typedef typename push_type<dest_t>::type item_type;
125 
126  fork_t(dest_t dest, fact2_t fact2) : dest(std::move(dest)), dest2(fact2.construct()) {
127  add_push_destination(this->dest);
128  add_push_destination(dest2);
129  }
130 
131  void push(const item_type & item) {
132  dest.push(item);
133  dest2.push(item);
134  }
135 
136 private:
137  dest_t dest;
138  dest2_t dest2;
139 };
140 
141 
142 template <typename source_t, typename dest_fact_t>
143 class pull_fork_t: public node {
144 public:
145  typedef typename pull_type<source_t>::type item_type;
146 
147  pull_fork_t(source_t source, dest_fact_t dest_fact)
148  : dest(dest_fact.construct())
149  , source(std::move(source)) {
150  add_pull_source(this->source);
151  add_push_destination(dest);
152  }
153 
154  bool can_pull() {return source.can_pull();}
155 
156  item_type pull() {
157  item_type i=source.pull();
158  dest.push(i);
159  return i;
160  }
161 
162 private:
163  typename dest_fact_t::constructed_type dest;
164  source_t source;
165 };
166 
167 
168 template <typename T>
169 class null_sink_t: public node {
170 public:
171  typedef T item_type;
172 
173  void push(const T &) {}
174 };
175 
176 
177 template <typename T>
178 class zero_source_t: public node {
179 public:
180  typedef T item_type;
181  void propagate() override { forward("items", stream_size_type(0)); };
182 
183  T pull() {tpie_unreachable();}
184  bool can_pull() {return false;}
185 };
186 
187 
188 template <typename IT>
190  IT i;
191  IT till;
192 public:
193  typedef typename IT::value_type item_type;
194  pull_input_iterator_t(IT from, IT to)
195  : i(from)
196  , till(to)
197  {
198  }
199 
200  bool can_pull() {
201  return i != till;
202  }
203 
204  item_type pull() {
205  return *i++;
206  }
207 };
208 
209 template <typename dest_t, typename IT>
211  IT i;
212  IT till;
213  dest_t dest;
214 public:
215  push_input_iterator_t(dest_t dest, IT from, IT to)
216  : i(from)
217  , till(to)
218  , dest(std::move(dest)) {
219  add_push_destination(dest);
220  }
221 
222  void go() override {
223  while (i != till) {
224  dest.push(*i);
225  ++i;
226  }
227  }
228 };
229 
230 template <typename Iterator, typename Item = void>
232 
233 template <typename Iterator>
234 class push_output_iterator_t<Iterator, void> : public node {
235  Iterator i;
236 public:
237  typedef typename Iterator::value_type item_type;
238  push_output_iterator_t(Iterator to)
239  : i(to)
240  {
241  }
242 
243  void push(const item_type & item) {
244  *i = item;
245  ++i;
246  }
247 };
248 
249 template <typename Iterator, typename Item>
250 class push_output_iterator_t : public node {
251  Iterator i;
252 public:
253  typedef Item item_type;
254  push_output_iterator_t(Iterator to)
255  : i(to)
256  {
257  }
258 
259  void push(const item_type & item) {
260  *i = item;
261  ++i;
262  }
263 };
264 
265 template <typename dest_t, typename IT>
267  IT i;
268  dest_t dest;
269 public:
270  pull_output_iterator_t(dest_t dest, IT to)
271  : i(to)
272  , dest(std::move(dest)) {
273  add_pull_source(dest);
274  }
275 
276  void go() override {
277  while (dest.can_pull()) {
278  *i = dest.pull();
279  ++i;
280  }
281  }
282 };
283 
284 template <typename dest_t, typename F>
285 class preparer_t: public node {
286 private:
287  F functor;
288  dest_t dest;
289 public:
290  typedef typename push_type<dest_t>::type item_type;
291  preparer_t(dest_t dest, const F & functor): functor(functor), dest(std::move(dest)) {
292  add_push_destination(this->dest);
293  }
294 
295  void prepare() override {
296  functor(*static_cast<node*>(this));
297  }
298 
299  void push(const item_type & item) {dest.push(item);}
300 };
301 
302 template <typename dest_t, typename F>
303 class propagater_t: public node {
304 private:
305  F functor;
306  dest_t dest;
307 public:
308  typedef typename push_type<dest_t>::type item_type;
309  propagater_t(dest_t dest, const F & functor): functor(functor), dest(std::move(dest)) {
310  add_push_destination(this->dest);
311  }
312 
313  void propagate() override {
314  functor(*static_cast<node*>(this));
315  }
316 
317  void push(const item_type & item) {dest.push(item);}
318 };
319 
320 template <typename dest_t, typename src_fact_t>
321 struct zip_t: public node {
322  typedef typename src_fact_t::constructed_type src_t;
323  typedef typename push_type<dest_t>::type::first_type item_type;
324 
325  zip_t(dest_t dest, src_fact_t src_fact)
326  : src(src_fact.construct()), dest(std::move(dest)) {
327  add_push_destination(this->dest);
328  add_pull_source(src);
329  }
330 
331  void push(const item_type & item) {
332  tp_assert(src.can_pull(), "We should be able to pull");
333  dest.push(std::make_pair(item, src.pull()));
334  }
335 private:
336  src_t src;
337  dest_t dest;
338 };
339 
340 template <typename dest1_t, typename fact2_t>
341 struct unzip_t: public node {
342  typedef typename fact2_t::constructed_type dest2_t;
343  typedef typename push_type<dest1_t>::type first_type;
344  typedef typename push_type<dest2_t>::type second_type;
345  typedef std::pair<first_type, second_type> item_type;
346 
347  unzip_t(dest1_t dest1, fact2_t fact2) : dest1(std::move(dest1)), dest2(fact2.construct()) {
348  add_push_destination(this->dest1);
349  add_push_destination(dest2);
350  }
351 
352  void push(const item_type & item) {
353  dest2.push(item.second);
354  dest1.push(item.first);
355  }
356 private:
357  dest1_t dest1;
358  dest2_t dest2;
359 };
360 
361 template <typename dest_t, typename T>
362 class item_type_t: public node {
363 private:
364  dest_t dest;
365 public:
366  typedef T item_type;
367  item_type_t(dest_t dest): dest(std::move(dest)) {}
368  void push(const item_type & item) {dest.push(item);}
369 };
370 
371 
372 template <typename dest_t, typename fact_t>
373 class pull_source_t: public node {
374 public:
375  typedef typename fact_t::constructed_type source_t;
376  typedef typename push_type<dest_t>::type item_type;
377 
378  pull_source_t(dest_t dest, fact_t fact)
379  : dest(std::move(dest)), src(fact.construct()) {
380  add_push_destination(dest);
381  add_pull_source(src);
382  }
383 
384  void propagate() override {
385  size_t size = fetch<stream_size_type>("items");
386  set_steps(size);
387  }
388 
389  void go() override {
390  while (src.can_pull()) {
391  dest.push(src.pull());
392  step();
393  }
394  }
395 
396 private:
397  dest_t dest;
398  source_t src;
399 };
400 
401 template <typename dest_t, typename equal_t>
402 class unique_t : public node {
403 public:
404  typedef typename push_type<dest_t>::type item_type;
405 
406  unique_t(dest_t dest, equal_t equal)
407  : equal(equal), dest(std::move(dest)) {}
408 
409  void begin() override {
410  first = true;
411  }
412 
413  void push(const item_type & item) {
414  if (!first && equal(prev, item))
415  return;
416  first = false;
417  dest.push(item);
418  prev = item;
419  }
420 
421 private:
422  bool first;
423  equal_t equal;
424  item_type prev;
425  dest_t dest;
426 };
427 
428 template <typename src_t, typename equal_t>
429 class pull_unique_t : public node {
430 public:
431  typedef typename pull_type<src_t>::type item_type;
432 
433  pull_unique_t(src_t src, equal_t equal)
434  : equal(equal), src(std::move(src)) {}
435 
436  void begin() override {
437  has_item = src.can_pull();
438  if (!has_item) return;
439  item = src.pull();
440  }
441 
442  bool can_pull() {
443  return has_item;
444  }
445 
446  item_type over() {
447  has_item = false;
448  return item;
449  }
450 
451  item_type pull() {
452  if (!src.can_pull()) return over();
453 
454  item_type next;
455  do next = src.pull();
456  while (src.can_pull() && equal(item, next));
457 
458  if (equal(item, next)) return over();
459  std::swap(next, item);
460  return next;
461  }
462 
463 private:
464  bool has_item;
465  equal_t equal;
466  item_type item;
467  src_t src;
468 };
469 
470 } // namespace bits
471 
476 inline pipe_middle<factory<bits::ostream_logger_t, std::ostream &> >
478  return {std::cout};
479 }
480 
485 
486 
493 template <typename fact_t>
496  return {std::move(to.factory)};
497 }
498 
505 template <typename dest_fact_t>
506 pullpipe_middle<tfactory<bits::pull_fork_t, Args<dest_fact_t>, dest_fact_t> >
507 pull_fork(dest_fact_t dest_fact) {
508  return {std::move(dest_fact)};
509 }
510 
517 template <typename fact_t>
518 pipe_middle<tfactory<bits::unzip_t, Args<fact_t>, fact_t> >
520  return {std::move(to.factory)};
521 }
522 
530 template <typename fact_t>
531 pipe_middle<tfactory<bits::zip_t, Args<fact_t>, fact_t> >
533  return {std::move(from.factory)};
534 }
535 
541 template <typename T>
542 inline pipe_end<termfactory<bits::null_sink_t<T>>>
543 null_sink() {return {};}
544 
550 template <typename T>
551 inline pullpipe_begin<termfactory<bits::zero_source_t<T>>>
552 zero_source() {return {}; }
553 
554 
555 template <template <typename dest_t> class Fact, typename... T>
556 pipe_begin<factory<Fact, T...> > make_pipe_begin(T... t) {
557  return {t...};
558 }
559 
560 template <template <typename dest_t> class Fact, typename... T>
561 pipe_middle<factory<Fact, T...> > make_pipe_middle(T... t) {
562  return {t...};
563 }
564 
565 template <typename Fact, typename... T>
566 pipe_end<termfactory<Fact, T...> > make_pipe_end(T ... t) {
567  return {t...};
568 }
569 
575 template <typename IT>
577  return {begin, end};
578 }
579 
586 template <typename IT>
588  return {begin, end};
589 }
590 
597 template <typename IT>
599  return {to};
600 }
601 
609 template <typename Item, typename IT>
611  return {to};
612 }
613 
620 template <typename IT>
622  return {to};
623 }
624 
631 template <typename F>
633  return {functor};
634 }
635 
642 template <typename F>
644  return {functor};
645 }
646 
653 template <typename T>
655  return {};
656 }
657 
663 template <typename fact_t>
664 pipe_begin<tfactory<bits::pull_source_t, Args<fact_t>, fact_t> >
666  return {std::move(from.factory)};
667 }
668 
675 template <typename equal_t>
677  return {equal};
678 }
679 
686 template <typename equal_t>
688  return {equal};
689 }
690 
691 } //namespace tpie::pipelining
692 #endif //__TPIE_PIPELINING_HELPERS_H__
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::bits::pull_peek_t
Definition: helpers.h:70
tpie::pipelining::node::begin
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:298
tpie::pipelining::bits::ostream_logger_t::end
void end() override
End pipeline processing phase.
Definition: helpers.h:46
tpie_assert.h
tpie::pipelining::pull_unique
pullpipe_middle< tfactory< bits::pull_unique_t, Args< equal_t >, equal_t > > pull_unique(equal_t equal)
Pull version of unique.
Definition: helpers.h:687
tpie::pipelining::bits::fork_t
Definition: helpers.h:121
tpie::pipelining::push_output_iterator
pipe_end< termfactory< bits::push_output_iterator_t< IT >, IT > > push_output_iterator(IT to)
A node that writes its given items to the destination pointed to by the given iterator.
Definition: helpers.h:598
tpie::pipelining::bits::ostream_logger_t::begin
void begin() override
Begin pipeline processing phase.
Definition: helpers.h:42
tpie::pipelining::bits::pull_output_iterator_t
Definition: helpers.h:266
tpie::pipelining::node::add_pull_source
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
tpie::pipelining::pullpipe_begin
Definition: pipe_base.h:442
tpie::pipelining::fork
pipe_middle< tfactory< bits::fork_t, Args< fact_t >, fact_t > > fork(pipe_end< fact_t > to)
Create a fork pipe node.
Definition: helpers.h:495
tpie::pipelining::pipe_begin
Definition: pipe_base.h:331
tpie::pipelining::bits::pull_source_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: helpers.h:389
tpie::pipelining::unique
pipe_middle< tfactory< bits::unique_t, Args< equal_t >, equal_t > > unique(equal_t equal)
Filter consecutive duplicates out.
Definition: helpers.h:676
tpie::pipelining::pipe_end
Definition: pipe_base.h:212
tpie::pipelining::pull_fork
pullpipe_middle< tfactory< bits::pull_fork_t, Args< dest_fact_t >, dest_fact_t > > pull_fork(dest_fact_t dest_fact)
Create a pulling fork pipe node.
Definition: helpers.h:507
tpie::pipelining::bits::pull_source_t::propagate
void propagate() override
Propagate stream metadata.
Definition: helpers.h:384
tpie::pipelining::pull_input_iterator
pullpipe_begin< termfactory< bits::pull_input_iterator_t< IT >, IT, IT > > pull_input_iterator(IT begin, IT end)
A pull-pipe that returns items in the range given by two iterators.
Definition: helpers.h:576
tpie::pipelining::bits::pull_fork_t
Definition: helpers.h:143
tpie::pipelining::bits::pull_input_iterator_t
Definition: helpers.h:189
tpie::pipelining::zero_source
pullpipe_begin< termfactory< bits::zero_source_t< T > > > zero_source()
Create a dummy pull begin pipe node.
Definition: helpers.h:552
tpie::pipelining::pull_source
pipe_begin< tfactory< bits::pull_source_t, Args< fact_t >, fact_t > > pull_source(pullpipe_begin< fact_t > from)
A node that pulls items from source and pushes them into dest.
Definition: helpers.h:665
tpie::pipelining::propagater
pipe_middle< tfactory< bits::propagater_t, Args< F >, F > > propagater(const F &functor)
Create propagate callback identity pipe node.
Definition: helpers.h:643
tpie::pipelining::bits::zero_source_t
Definition: helpers.h:178
tpie::pipelining::bits::unzip_t
Definition: helpers.h:341
tpie::pipelining::push_type
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:158
tpie::pipelining::pull_type
Definition: node_traits.h:165
tpie::pipelining::bits::propagater_t::propagate
void propagate() override
Propagate stream metadata.
Definition: helpers.h:313
tpie::pipelining::node::set_plot_options
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:458
tpie::pipelining::bits::push_input_iterator_t
Definition: helpers.h:210
tpie::pipelining::node::forward
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:563
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tp_assert
#define tp_assert(condition, message)
Definition: tpie_assert.h:65
tpie::pipelining::bits::zip_t
Definition: helpers.h:321
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::pipelining::bits::preparer_t
Definition: helpers.h:285
tpie::pipelining::pullpipe_end
Definition: pipe_base.h:382
tpie::pipelining::pipe_middle
Definition: pipe_base.h:243
tpie::pipelining::preparer
pipe_middle< tfactory< bits::preparer_t, Args< F >, F > > preparer(const F &functor)
Create preparer callback identity pipe node.
Definition: helpers.h:632
tpie::pipelining::pull_peek
pullpipe_middle< factory< bits::pull_peek_t > > pull_peek
A node that allows peeking at the next item in the pipeline.
Definition: helpers.h:484
tpie::pipelining::bits::zero_source_t::propagate
void propagate() override
Propagate stream metadata.
Definition: helpers.h:181
tpie::pipelining::bits::preparer_t::prepare
void prepare() override
Called before memory assignment but after depending phases have executed and ended.
Definition: helpers.h:295
tpie::pipelining::bits::ostream_logger_t
Definition: helpers.h:34
tpie::pipelining::bits::pull_peek_t::begin
void begin() override
Begin pipeline processing phase.
Definition: helpers.h:79
tpie::pipelining::unzip
pipe_middle< tfactory< bits::unzip_t, Args< fact_t >, fact_t > > unzip(pipe_end< fact_t > to)
Create unzip pipe node.
Definition: helpers.h:519
tpie::pipelining::bits::unique_t
Definition: helpers.h:402
tpie::pipelining::bits::pull_unique_t::begin
void begin() override
Begin pipeline processing phase.
Definition: helpers.h:436
tpie::pipelining::null_sink
pipe_end< termfactory< bits::null_sink_t< T > > > null_sink()
Create a dummy end pipe node.
Definition: helpers.h:543
tpie::pipelining::bits::push_input_iterator_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: helpers.h:222
tpie::pipelining::node::step
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:653
tpie::pipelining::node::set_steps
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
tpie::pipelining::bits::dummydest_t
Definition: helpers.h:106
tpie::pipelining::cout_logger
pipe_middle< factory< bits::ostream_logger_t, std::ostream & > > cout_logger()
A pipelining node that writes items to standard out and then pushes them to the next node.
Definition: helpers.h:477
tpie::pipelining::pullpipe_middle
Definition: pipe_base.h:404
tpie::pipelining::bits::null_sink_t
Definition: helpers.h:169
tpie::pipelining::node::end
virtual void end()
End pipeline processing phase.
Definition: node.h:328
tpie::pipelining::pull_output_iterator
pullpipe_end< tfactory< bits::pull_output_iterator_t, Args< IT >, IT > > pull_output_iterator(IT to)
A pull-pipe node that writes its given items to the destination pointed to by the given iterator.
Definition: helpers.h:621
tpie::pipelining::typed_push_output_iterator
pipe_end< termfactory< bits::push_output_iterator_t< IT, Item >, IT > > typed_push_output_iterator(IT to)
A node that writes its given items to the destination pointed to by the given iterator.
Definition: helpers.h:610
tpie::pipelining::bits::push_output_iterator_t
Definition: helpers.h:231
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::bits::unique_t::begin
void begin() override
Begin pipeline processing phase.
Definition: helpers.h:409
tpie::pipelining::zip
pipe_middle< tfactory< bits::zip_t, Args< fact_t >, fact_t > > zip(pullpipe_begin< fact_t > from)
Create a zip pipe node.
Definition: helpers.h:532
tpie::pipelining::bits::propagater_t
Definition: helpers.h:303
tpie::pipelining::push_input_iterator
pipe_begin< tfactory< bits::push_input_iterator_t, Args< IT >, IT, IT > > push_input_iterator(IT begin, IT end)
A pipelining node that pushes the items in the range given by two iterators.
Definition: helpers.h:587
tpie::pipelining::bits::pull_unique_t
Definition: helpers.h:429
tpie::pipelining::item_type
pipe_middle< tfactory< bits::item_type_t, Args< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
tpie::pipelining::bits::pull_output_iterator_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: helpers.h:276
memory.h
tpie::pipelining::bits::pull_source_t
Definition: helpers.h:373
tpie::pipelining::bits::item_type_t
Definition: helpers.h:362