TPIE

11a2c2d
std_glue.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_STD_GLUE_H__
21 #define __TPIE_PIPELINING_STD_GLUE_H__
22 
23 #include <vector>
24 
25 #include <tpie/pipelining/node.h>
26 #include <tpie/pipelining/pipe_base.h>
27 #include <tpie/pipelining/factory_helpers.h>
28 
29 namespace tpie::pipelining {
30 namespace bits {
31 
32 template <typename dest_t, typename T, typename A>
33 class input_vector_t : public node {
34 public:
35  typedef T item_type;
36 
37  input_vector_t(dest_t dest, const std::vector<T, A> & input) : dest(std::move(dest)), input(input) {
38  add_push_destination(this->dest);
39  }
40 
41  input_vector_t(dest_t dest, std::vector<T, A> && input) = delete;
42 
43  void propagate() override {
44  forward("items", static_cast<stream_size_type>(input.size()));
45  set_steps(input.size());
46  }
47 
48  void go() override {
49  for (auto & i: input) {
50  dest.push(i);
51  step();
52  }
53  }
54 private:
55  dest_t dest;
56  const std::vector<T, A> & input;
57 };
58 
59 template <typename T, typename A>
60 class pull_input_vector_t : public node {
61 public:
62  typedef T item_type;
63 
64  pull_input_vector_t(const std::vector<T, A> & input) : input(input) {}
65 
66  pull_input_vector_t(std::vector<T, A> && input) = delete;
67 
68  void propagate() override {
69  forward("items", static_cast<stream_size_type>(input.size()));
70  }
71 
72  void begin() override {idx=0;};
73  bool can_pull() const {return idx < input.size();}
74  const T & peek() const {return input[idx];}
75  const T & pull() {return input[idx++];}
76 
77 private:
78  size_t idx;
79  const std::vector<T, A> & input;
80 };
81 
82 
83 template <typename T, typename A>
84 class output_vector_t : public node {
85 public:
86  typedef T item_type;
87 
88  output_vector_t(std::vector<T, A> & output) : output(output) {}
89 
90  output_vector_t(std::vector<T, A> && output) = delete;
91 
92  void push(const T & item) {
93  output.push_back(item);
94  }
95 private:
96  std::vector<item_type, A> & output;
97 };
98 
99 
100 template <typename dest_t, typename F>
101 class lambda_t: public node {
102 public:
103  typedef typename F::argument_type item_type;
104 
105  lambda_t(dest_t dest, const F & f): f(f), dest(std::move(dest)) {
106  }
107 
108  void push(const item_type & item) {
109  dest.push(f(item));
110  }
111 private:
112  F f;
113  dest_t dest;
114 };
115 
116 template <typename dest_t, typename F>
117 class exclude_lambda_t: public node {
118 public:
119  typedef typename F::argument_type item_type;
120 
121  exclude_lambda_t(dest_t dest, const F & f): f(f), dest(std::move(dest)) {
122  }
123 
124  void push(const item_type & item) {
125  typename F::result_type t=f(item);
126  if (t.second) dest.push(t.first);
127  }
128 private:
129  F f;
130  dest_t dest;
131 };
132 
133 
134 } // namespace bits
135 
141 template<typename T, typename A>
142 inline pipe_begin<tfactory<bits::input_vector_t, Args<T, A>, const std::vector<T, A> &> > input_vector(const std::vector<T, A> & input) {
143  return {input};
144 }
145 template<typename T, typename A>
146 pipe_begin<tfactory<bits::input_vector_t, Args<T, A>, const std::vector<T, A> &> > input_vector(std::vector<T, A> && input) = delete;
147 
153 template<typename T, typename A>
154 inline pullpipe_begin<termfactory<bits::pull_input_vector_t<T, A>, const std::vector<T, A> &> > pull_input_vector(const std::vector<T, A> & input) {
155  return {input};
156 }
157 template<typename T, typename A>
158 pullpipe_begin<termfactory<bits::pull_input_vector_t<T, A>, const std::vector<T, A> &> > pull_input_vector(std::vector<T, A> && input) = delete;
159 
164 template <typename T, typename A>
165 inline pipe_end<termfactory<bits::output_vector_t<T, A>, std::vector<T, A> &> > output_vector(std::vector<T, A> & output) {
166  return {output};
167 }
168 template <typename T, typename A>
169 pipe_end<termfactory<bits::output_vector_t<T, A>, std::vector<T, A> &> > output_vector(std::vector<T, A> && output) = delete;
170 
177 template <typename F>
179  return {f};
180 }
181 
191 template <typename F>
193  return {f};
194 }
195 
196 } // namespace tpie:: pipelining
197 
198 #endif //__TPIE_PIPELINING_STD_GLUE_H__
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::bits::pull_input_vector_t::propagate
void propagate() override
Propagate stream metadata.
Definition: std_glue.h:68
tpie::pipelining::bits::input_vector_t::propagate
void propagate() override
Propagate stream metadata.
Definition: std_glue.h:43
tpie::pipelining::pull_input_vector
pullpipe_begin< termfactory< bits::pull_input_vector_t< T, A >, const std::vector< T, A > & > > pull_input_vector(const std::vector< T, A > &input)
Pipelining nodes that pushes the contents of the given vector to the next node in the pipeline.
Definition: std_glue.h:154
tpie::pipelining::pullpipe_begin
Definition: pipe_base.h:442
tpie::pipelining::pipe_begin
Definition: pipe_base.h:331
tpie::pipelining::output_vector
pipe_end< termfactory< bits::output_vector_t< T, A >, std::vector< T, A > & > > output_vector(std::vector< T, A > &output)
Pipelining node that pushes items to the given vector.
Definition: std_glue.h:165
tpie::pipelining::pipe_end
Definition: pipe_base.h:212
tpie::pipelining::lambda
pipe_middle< tfactory< bits::lambda_t, Args< F >, F > > lambda(const F &f)
Pipelining nodes that applies to given functor to items in the stream.
Definition: std_glue.h:178
tpie::pipelining::bits::lambda_t
Definition: std_glue.h:101
tpie::pipelining::bits::exclude_lambda_t
Definition: std_glue.h:117
tpie::pipelining::exclude_lambda
pipe_middle< tfactory< bits::exclude_lambda_t, Args< F >, F > > exclude_lambda(const F &f)
Pipelining nodes that applies to given functor to items in the stream.
Definition: std_glue.h:192
tpie::pipelining::bits::pull_input_vector_t
Definition: std_glue.h:60
tpie::pipelining::node::forward
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:563
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::pipelining::bits::output_vector_t
Definition: std_glue.h:84
tpie::pipelining::pipe_middle
Definition: pipe_base.h:243
tpie::pipelining::node::step
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:653
tpie::pipelining::node::set_steps
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
tpie::pipelining::input_vector
pipe_begin< tfactory< bits::input_vector_t, Args< T, A >, const std::vector< T, A > & > > input_vector(const std::vector< T, A > &input)
Pipelining nodes that pushes the contents of the given vector to the next node in the pipeline.
Definition: std_glue.h:142
tpie::pipelining::input
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining nodes that pushes the contents of the given file stream to the next node in the pipeline.
Definition: file_stream.h:378
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::bits::pull_input_vector_t::begin
void begin() override
Begin pipeline processing phase.
Definition: std_glue.h:72
tpie::pipelining::bits::input_vector_t
Definition: std_glue.h:33
tpie::pipelining::output
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:430
tpie::pipelining::bits::input_vector_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: std_glue.h:48