TPIE

11a2c2d
file_stream.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_FILE_STREAM_H__
21 #define __TPIE_PIPELINING_FILE_STREAM_H__
22 
23 #include <tpie/file_stream.h>
24 
25 #include <tpie/pipelining/node.h>
26 #include <tpie/pipelining/factory_helpers.h>
27 #include <tpie/pipelining/pipe_base.h>
28 #include <tpie/maybe.h>
29 #include <tpie/flags.h>
30 
31 namespace tpie::pipelining {
32 
/// \brief Flags controlling how the stream-backed input nodes treat the
/// file_stream they are given.
enum stream_option {
    /// Seek the stream before reading: to the beginning for forward readers,
    /// to the end for the reverse pull reader.
    STREAM_RESET = 1 << 0,
    /// Close the stream when the node's end() is called.
    STREAM_CLOSE = 1 << 1
};
37 
38 TPIE_DECLARE_OPERATORS_FOR_FLAGS(stream_option)
39 typedef tpie::flags<stream_option> stream_options;
40 
41 namespace bits {
42 
48 template <typename dest_t>
49 class input_t : public node {
50 public:
51  typedef typename push_type<dest_t>::type item_type;
52 
53  input_t(dest_t dest, file_stream<item_type> & fs, stream_options options) : options(options), fs(fs), dest(std::move(dest)) {
54  add_push_destination(this->dest);
55  set_name("Read", PRIORITY_INSIGNIFICANT);
56  set_minimum_memory(fs.memory_usage());
57  }
58 
59  void propagate() override {
60  if (options & STREAM_RESET) fs.seek(0);
61 
62  if (fs.is_open()) {
63  forward("items", fs.size() - fs.offset());
64  set_steps(fs.size() - fs.offset());
65  } else {
66  forward("items", 0);
67  }
68  }
69 
70  void go() override {
71  if (fs.is_open()) {
72  while (fs.can_read()) {
73  dest.push(fs.read());
74  step();
75  }
76  }
77  }
78 
79  void end() override {
80  if (options & STREAM_CLOSE) fs.close();
81  }
82 
83 private:
84  stream_options options;
86  dest_t dest;
87 };
88 
94 template <typename dest_t>
95 class named_input_t : public node {
96 public:
97  typedef typename push_type<dest_t>::type item_type;
98 
99  named_input_t(dest_t dest, std::string path) : dest(std::move(dest)), path(path) {
100  add_push_destination(this->dest);
101  set_name("Read", PRIORITY_INSIGNIFICANT);
103  }
104 
105  void propagate() override {
106  fs.construct();
107  fs->open(path, access_read);
108  forward("items", fs->size());
109  set_steps(fs->size());
110  }
111 
112  void go() override {
113  while (fs->can_read()) {
114  dest.push(fs->read());
115  step();
116  }
117  fs.destruct();
118  }
119 private:
120  dest_t dest;
122  std::string path;
123 };
124 
125 
131 template <typename T>
132 class pull_input_t : public node {
133 public:
134  typedef T item_type;
135 
136  pull_input_t(file_stream<T> & fs, stream_options options) : options(options), fs(fs) {
137  set_name("Read", PRIORITY_INSIGNIFICANT);
138  set_minimum_memory(fs.memory_usage());
139  }
140 
141  void propagate() override {
142  if (options & STREAM_RESET) fs.seek(0);
143  forward("items", fs.size()-fs.offset());
144  set_steps(fs.size()-fs.offset());
145  }
146 
147  T pull() {
148  step();
149  return fs.read();
150  }
151 
152  bool can_pull() {
153  return fs.can_read();
154  }
155 
156  void end() override {
157  if (options & STREAM_CLOSE) fs.close();
158  }
159 
160 private:
161  stream_options options;
162  file_stream<T> & fs;
163 };
164 
170 template <typename T>
171 class pull_reverse_input_t : public node {
172 public:
173  typedef T item_type;
174 
175  pull_reverse_input_t(file_stream<T> & fs, stream_options options) : options(options), fs(fs) {
176  set_name("Read", PRIORITY_INSIGNIFICANT);
177  set_minimum_memory(fs.memory_usage());
178  }
179 
180  void propagate() override {
181  if (options & STREAM_RESET) fs.seek(0, file_stream<T>::end);
182  forward("items", fs.offset());
183  set_steps(fs.offset());
184  }
185 
186  T pull() {
187  step();
188  return fs.read_back();
189  }
190 
191  bool can_pull() {
192  return fs.can_read_back();
193  }
194 
195  void end() override {
196  if (options & STREAM_CLOSE) fs.close();
197  }
198 
199 private:
200  stream_options options;
201  file_stream<T> & fs;
202 };
203 
209 template <typename T>
210 class named_pull_input_t : public node {
211 public:
212  typedef T item_type;
213 
214  named_pull_input_t(std::string path): path(std::move(path)) {
215  set_name("Read", PRIORITY_INSIGNIFICANT);
217  }
218 
219  void propagate() override {
220  fs.construct();
221  fs->open(path, access_read);
222  forward("items", fs->size());
223  set_steps(fs->size());
224  }
225 
226  T pull() {
227  step();
228  return fs->read();
229  }
230 
231  bool can_pull() {
232  return fs->can_read();
233  }
234 
235  void end() override {
236  fs->close();
237  fs.destruct();
238  }
239 private:
241  std::string path;
242 };
243 
244 
250 template <typename T>
251 class output_t : public node {
252 public:
253  typedef T item_type;
254 
255  output_t(file_stream<T> & fs) : fs(fs) {
256  set_name("Write", PRIORITY_INSIGNIFICANT);
257  set_minimum_memory(fs.memory_usage());
258  }
259 
260  void push(const T & item) {
261  fs.write(item);
262  }
263 private:
264  file_stream<T> & fs;
265 };
266 
272 template <typename T>
273 class named_output_t : public node {
274 public:
275  typedef T item_type;
276 
277  named_output_t(const std::string & path): path(path) {
278  set_name("Write", PRIORITY_INSIGNIFICANT);
280  }
281 
282  void begin() override {
283  fs.construct();
284  fs->open(path, access_write);
285  }
286 
287  void push(const T & item) {
288  fs->write(item);
289  }
290 
291  void end() override {
292  fs->close();
293  fs.destruct();
294  }
295 private:
297  std::string path;
298 };
299 
300 
306 template <typename source_t>
307 class pull_output_t : public node {
308 public:
309  typedef typename pull_type<source_t>::type item_type;
310 
311  pull_output_t(source_t source, file_stream<item_type> & fs) : source(std::move(source)), fs(fs) {
312  add_pull_source(this->source);
313  set_name("Write", PRIORITY_INSIGNIFICANT);
314  set_minimum_memory(fs.memory_usage());
315  }
316 
317  void go() override {
318  source.begin();
319  while (source.can_pull()) {
320  fs.write(source.pull());
321  }
322  source.end();
323  }
324 
325 private:
326  source_t source;
328 };
329 
330 template <typename dest_t, typename T>
331 class tee_t: public node {
332 public:
333  typedef T item_type;
334  tee_t(dest_t dest, file_stream<item_type> & fs): fs(fs), dest(std::move(dest)) {
335  set_minimum_memory(fs.memory_usage());
336  }
337 
338  void push(const item_type & i) {
339  fs.write(i);
340  dest.push(i);
341  }
342 private:
344  dest_t dest;
345 };
346 
347 template <typename source_t, typename T>
348 class pull_tee_t {
349 public:
350  typedef T item_type;
351  pull_tee_t(source_t source, file_stream<item_type> & fs): fs(fs), source(std::move(source)) {
352  set_minimum_memory(fs.memory_usage());
353  }
354 
355  bool can_pull() {
356  return source.can_pull();
357  }
358 
359  item_type pull() {
360  item_type i = source.pull();
361  fs.write(i);
362  return i;
363  }
364 private:
366  source_t source;
367 };
368 
369 } // namespace bits
370 
377 template<typename T>
379  stream_options options=stream_options()) {
380  return {fs, options};
381 }
382 
389 
395 template<typename T>
397  file_stream<T> & fs,
398  stream_options options=stream_options()) {
399  return {fs, options};
400 }
401 
408 template<typename T>
410  file_stream<T> & fs,
411  stream_options options=stream_options()) {
412  return {fs, options};
413 }
414 
419 template<typename T>
421  return {std::move(path)};
422 }
423 
424 
429 template <typename T>
431  return {fs};
432 }
433 
438 template <typename T>
439 inline pipe_end<termfactory<bits::named_output_t<T>, std::string> > named_output(std::string path) {
440  return {std::move(path)};
441 }
442 
443 
448 template<typename T>
450  return {fs};
451 }
452 
458 template <typename T>
460  return {fs};
461 }
462 
468 template <typename T>
470  return {fs};
471 }
472 
473 } // namespace tpie::pipelining
474 #endif
tpie::access_write
@ access_write
Open a file for writing only, content is truncated.
Definition: access_type.h:33
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::pull_output
pullpipe_end< factory< bits::pull_output_t, file_stream< T > & > > pull_output(file_stream< T > &fs)
A pull-pipe node that writes the pulled items to a file stream.
Definition: file_stream.h:449
tpie::pipelining::node::add_pull_source
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
tpie::pipelining::pullpipe_begin
Definition: pipe_base.h:442
tpie::pipelining::pipe_begin
Definition: pipe_base.h:331
tpie::file_stream::read
const T & read()
Reads next item from stream if can_read() == true.
Definition: stream.h:425
tpie::pipelining::bits::input_t::end
void end() override
End pipeline processing phase.
Definition: file_stream.h:79
tpie::pipelining::bits::tee_t
Definition: file_stream.h:331
tpie::pipelining::bits::named_input_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: file_stream.h:112
tpie::pipelining::bits::pull_input_t::propagate
void propagate() override
Propagate stream metadata.
Definition: file_stream.h:141
tpie::pipelining::pipe_end
Definition: pipe_base.h:212
tpie::pipelining::bits::pull_output_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: file_stream.h:317
tpie::pipelining::bits::named_pull_input_t
Definition: file_stream.h:210
tpie::file_stream< item_type >
tpie::pipelining::bits::pull_tee_t
Definition: file_stream.h:348
tpie::pipelining::node::set_minimum_memory
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:206
tpie::pipelining::push_type
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:158
tpie::pipelining::pull_type
Definition: node_traits.h:165
maybe.h
tpie::pipelining::bits::named_input_t
Definition: file_stream.h:95
tpie::pipelining::tee
pipe_middle< tfactory< bits::tee_t, Args< typename T::item_type >, T & > > tee(T &fs)
A pipelining node that writes the pushed items to a file stream and then pushes the items to the next node.
Definition: file_stream.h:459
tpie::pipelining::node::forward
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:563
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::pipelining::bits::input_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: file_stream.h:70
tpie::pipelining::bits::named_input_t::propagate
void propagate() override
Propagate stream metadata.
Definition: file_stream.h:105
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::pipelining::bits::named_pull_input_t::end
void end() override
End pipeline processing phase.
Definition: file_stream.h:235
tpie::pipelining::bits::named_output_t::end
void end() override
End pipeline processing phase.
Definition: file_stream.h:291
tpie::pipelining::pull_tee
pullpipe_middle< tfactory< bits::pull_tee_t, Args< typename T::item_type >, T & > > pull_tee(T &fs)
A pull-pipe node that, when pulled from, will pull from its source, write the item to disk and then return the item.
Definition: file_stream.h:469
tpie::pipelining::bits::input_t
Definition: file_stream.h:49
tpie::pipelining::pullpipe_end
Definition: pipe_base.h:382
tpie::pipelining::pipe_middle
Definition: pipe_base.h:243
tpie::pipelining::named_input
pipe_begin< factory< bits::named_input_t, std::string > > named_input
Pipelining node that pushes the contents of the named file stream to the next node in the pipeline.
Definition: file_stream.h:388
tpie::flags< stream_option >
tpie::pipelining::bits::pull_reverse_input_t::end
void end() override
End pipeline processing phase.
Definition: file_stream.h:195
tpie::pipelining::bits::named_output_t
Definition: file_stream.h:273
tpie::pipelining::bits::input_t::propagate
void propagate() override
Propagate stream metadata.
Definition: file_stream.h:59
tpie::pipelining::node::step
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:653
tpie::pipelining::node::set_steps
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
tpie::pipelining::pullpipe_middle
Definition: pipe_base.h:404
tpie::pipelining::bits::pull_output_t
Definition: file_stream.h:307
tpie::pipelining::bits::output_t
Definition: file_stream.h:251
tpie::pipelining::pull_input
pullpipe_begin< termfactory< bits::pull_input_t< T >, file_stream< T > &, stream_options > > pull_input(file_stream< T > &fs, stream_options options=stream_options())
A pipelining pull-node that reads items from the given file_stream.
Definition: file_stream.h:396
tpie::pipelining::bits::pull_input_t
Definition: file_stream.h:132
tpie::pipelining::input
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining node that pushes the contents of the given file stream to the next node in the pipeline.
Definition: file_stream.h:378
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::pull_reverse_input
pullpipe_begin< termfactory< bits::pull_reverse_input_t< T >, file_stream< T > &, stream_options > > pull_reverse_input(file_stream< T > &fs, stream_options options=stream_options())
A pipelining pull-node that reads items in reverse order from the given file_stream.
Definition: file_stream.h:409
tpie::pipelining::bits::pull_input_t::end
void end() override
End pipeline processing phase.
Definition: file_stream.h:156
tpie::pipelining::bits::pull_reverse_input_t
Definition: file_stream.h:171
tpie::pipelining::named_pull_input
pullpipe_begin< termfactory< bits::named_pull_input_t< T >, std::string > > named_pull_input(std::string path)
A pipelining pull-node that reads items from the given file path.
Definition: file_stream.h:420
tpie::pipelining::bits::pull_reverse_input_t::propagate
void propagate() override
Propagate stream metadata.
Definition: file_stream.h:180
tpie::pipelining::named_output
pipe_end< termfactory< bits::named_output_t< T >, std::string > > named_output(std::string path)
A pipelining node that writes the pushed items to a named file stream.
Definition: file_stream.h:439
tpie::pipelining::bits::named_output_t::begin
void begin() override
Begin pipeline processing phase.
Definition: file_stream.h:282
tpie::pipelining::output
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:430
tpie::pipelining::bits::named_pull_input_t::propagate
void propagate() override
Propagate stream metadata.
Definition: file_stream.h:219
tpie::maybe
Definition: maybe.h:34
tpie::access_read
@ access_read
Open a file for reading.
Definition: access_type.h:31