TPIE

11a2c2d
serialization.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef TPIE_PIPELINING_SERIALIZATION_H
25 #define TPIE_PIPELINING_SERIALIZATION_H
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/pipelining/pair_factory.h>
30 #include <tpie/pipelining/pipe_base.h>
32 
33 namespace tpie::pipelining {
34 namespace serialization_bits {
35 
// Initiator node that reads items from a serialization_reader and pushes
// each one to `dest`.
//
// Progress is reported in bytes rather than items: propagate() announces
// rd->size() steps, and go() advances by the change in rd->offset() after
// every item.
//
// NOTE(review): this listing omits original lines 39 and 51. The `rd`
// member used below is presumably declared on line 39 -- confirm against
// the real header.
36 template <typename dest_t>
37 class input_t : public node {
38  dest_t dest;
40 
41 public:
    // Type of the items accepted by the destination node.
42  typedef typename push_type<dest_t>::type item_type;
43 
    // Takes ownership of `dest` (moved in) and stores the reader pointer.
    // The reader's memory_usage() is declared as this node's minimum memory.
44  input_t(dest_t dest, serialization_reader * rd)
45  : dest(std::move(dest))
46  , rd(rd)
47  {
48  set_name("Serialization reader");
49  add_push_destination(this->dest);
50  set_minimum_memory(rd->memory_usage());
52  }
53 
    // Declares the total number of progress steps as the reader's size().
54  void propagate() override {
55  set_steps(rd->size());
56  }
57 
    // Unserializes and pushes items for as long as the reader can_read().
58  void go() override {
59  item_type x;
    // Reader offset observed after the previously pushed item.
60  stream_size_type bytesRead = 0;
61  while (rd->can_read()) {
62  rd->unserialize(x);
63  dest.push(x);
64 
    // Step by the number of bytes consumed by the item just read.
65  stream_size_type bytesRead2 = rd->offset();
66  step(bytesRead2 - bytesRead);
67  bytesRead = bytesRead2;
68  }
69  }
70 };
71 
// Factory for input_t nodes, parameterized with a serialization_reader pointer.
72 typedef factory<input_t, serialization_reader *> input_factory;
73 
74 
// Terminal node that serializes every pushed item of type T through `wr`.
//
// NOTE(review): this listing omits original lines 77, 82 and 87. The `wr`
// member and the constructor signature are not visible here; `wr` is
// presumably a serialization_writer pointer -- confirm against the real
// header.
75 template <typename T>
76 class output_t : public node {
78 
79 public:
    // Type of the items this node accepts.
80  typedef T item_type;
81 
    // Constructor (signature line missing from the listing): stores `wr` and
    // declares the writer's memory_usage() as this node's minimum memory.
83  : wr(wr)
84  {
85  set_name("Serialization writer");
86  set_minimum_memory(wr->memory_usage());
88  }
89 
    // Serializes a single item to the writer.
90  void push(const T & x) {
91  wr->serialize(x);
92  }
93 };
94 
95 template <typename T>
98 };
99 
100 } // namespace serialization_bits
101 
109 }
110 
// serialization_output: a pipelining node that writes items to a
// serialization_writer.
//
// The factory is constructed from the address of `wr`.
// NOTE(review): the signature line (original line 117) is missing from this
// listing; the symbol index on this page gives it as
// `serialization_output(serialization_writer &wr)`. Since only a pointer to
// `wr` is kept, the writer presumably has to outlive the pipeline -- confirm.
115 template <typename T>
116 pipe_end<typename serialization_bits::output_factory<T>::type>
118  return typename serialization_bits::output_factory<T>::type(&wr);
119 }
120 
121 namespace serialization_bits {
122 
// Input half of the serialization reverser: serializes every pushed item
// into a temporary file that is handed to the matching output node.
//
// The file is shared by forwarding a pointer to `file` under the key
// "__srev_file"; the number of items written is forwarded as "items" in end().
//
// NOTE(review): this listing omits original lines 160 and 162, where `file`
// and `wr` are presumably declared (a tpie::maybe<tpie::temp_file> and a
// serialization_reverse_writer, judging by their use) -- confirm against the
// real header.
123 template <typename T>
124 class reverser_input_t : public node {
125 public:
    // Type of the items this node accepts.
126  typedef T item_type;
127 
    // Builds the node on an existing token. `output` is stored in a member
    // but is not otherwise used in the code visible here.
128  reverser_input_t(const node_token & token,
129  std::shared_ptr<node> output=std::shared_ptr<node>())
130  : node(token), output(output)
131  , wr()
132  , items(0)
133  {
134  this->set_name("Serialization reverse writer");
135  //TODO memory
136  set_minimum_resource_usage(FILES, 1);
137  set_plot_options(PLOT_BUFFERED | PLOT_SIMPLIFIED_HIDE);
138  }
139 
    // Constructs the temporary file and publishes a pointer to it so that
    // the output node can fetch it.
    // NOTE(review): the meaning of the trailing argument `1` to forward() is
    // not visible in this file -- confirm against node.h.
140  void propagate() override {
141  file.construct();
142  forward<tpie::maybe<tpie::temp_file>*>("__srev_file", &file, 1);
143  }
144 
    // Opens the writer on the temporary file's path.
145  void begin() override {
146  wr.open(file->path());
147  }
148 
    // Serializes one item and counts it.
149  void push(const item_type & x) {
150  wr.serialize(x);
151  ++items;
152  }
153 
    // Closes the writer and forwards the number of items written.
154  void end() override {
155  wr.close();
156  forward<stream_size_type>("items", items);
157  }
158 
159 private:
161  std::shared_ptr<node> output;
    // Number of items pushed so far.
163  stream_size_type items;
164 };
165 
// Push-based output half of the serialization reverser: reads the items
// written by reverser_input_t from the shared temporary file and pushes
// them to `dest`.
//
// Depends on the input node (add_dependency), fetches the file pointer
// published under "__srev_file", and reports progress in bytes.
//
// NOTE(review): this listing omits original lines 208 and 209, where `file`
// and `rd` are presumably declared (`rd` looks like a
// serialization_reverse_reader) -- confirm against the real header.
166 template <typename dest_t>
167 class reverser_output_t : public node {
168 public:
    // Type of the items accepted by the destination node.
169  typedef typename push_type<dest_t>::type item_type;
170 
    // Takes ownership of `dest` (moved in) and declares a dependency on the
    // node identified by `input_token`.
171  reverser_output_t(dest_t dest, const node_token & input_token)
172  : dest(std::move(dest))
173  {
174  set_name("Serialization reverse reader");
175  add_dependency(input_token);
176  add_push_destination(this->dest);
177  //TODO memory
178  set_minimum_resource_usage(FILES, 1);
179  set_plot_options(PLOT_BUFFERED);
180  }
181 
    // Fetches the shared file pointer, opens the reader on it and declares
    // rd.size() progress steps.
    // Throws tpie::exception if the fetched file has not been constructed.
182  void propagate() override {
183  file = fetch<tpie::maybe<tpie::temp_file> *>("__srev_file");
184  if (!file->is_constructed())
185  throw tpie::exception("No one created my file");
186  rd.open((*file)->path());
187  this->set_steps(rd.size());
188  }
189 
    // Unserializes and pushes items for as long as the reader can_read().
190  void go() override {
191  item_type x;
    // Reader offset observed after the previously pushed item.
192  stream_size_type bytesRead = 0;
193  while (rd.can_read()) {
194  rd.unserialize(x);
195  dest.push(x);
196 
    // Step by the number of bytes consumed by the item just read.
197  stream_size_type bytesRead2 = rd.offset();
198  step(bytesRead2 - bytesRead);
199  bytesRead = bytesRead2;
200  }
201  }
202 
    // Closes the reader and destructs the shared temporary file object.
203  void end() override {
204  rd.close();
205  file->destruct();
206  }
207 private:
210  dest_t dest;
211 };
212 
// Pull-based output half of the serialization reverser: items written by
// reverser_input_t are handed out one at a time through can_pull()/pull().
//
// Depends on the input node (add_dependency), fetches the file pointer
// published under "__srev_file", and reports progress in bytes.
//
// NOTE(review): this listing omits original lines 254 and 255, where `file`
// and `rd` are presumably declared -- confirm against the real header.
213 template <typename T>
214 class reverser_pull_output_t : public node {
215 public:
    // Type of the items returned by pull().
216  typedef T item_type;
217 
    // Declares a dependency on the node identified by `input_token`.
218  reverser_pull_output_t(const node_token & input_token)
219  {
220  set_name("Serialization reverse reader");
221  add_dependency(input_token);
222  //TODO memory
223  set_minimum_resource_usage(FILES, 1);
224  set_plot_options(PLOT_BUFFERED);
225  }
226 
    // Fetches the shared file pointer, opens the reader on it and declares
    // rd.size() progress steps.
    // Throws tpie::exception if the fetched file has not been constructed.
227  void propagate() override {
228  file = fetch<tpie::maybe<tpie::temp_file> *>("__srev_file");
229  if (!file->is_constructed())
230  throw tpie::exception("No one created my file");
231  rd.open((*file)->path());
232  this->set_steps(rd.size());
233  }
234 
    // Returns whether the reader reports that another item can be read.
235  bool can_pull() {
236  return rd.can_read();
237  }
238 
    // Unserializes and returns the next item, stepping progress by the
    // number of bytes the reader's offset advanced for it.
239  T pull() {
240  item_type x;
241  stream_size_type bytesRead = rd.offset();
242  rd.unserialize(x);
243  stream_size_type bytesRead2 = rd.offset();
244  step(bytesRead2 - bytesRead);
245  return x;
246  }
247 
    // Closes the reader and destructs the shared temporary file object.
248  void end() override {
249  rd.close();
250  file->destruct();
251  }
252 
253 private:
256 };
257 
// Input half of the serialization buffer: serializes every pushed item
// into a temporary file that is handed to the matching output node.
//
// The file is shared by forwarding a pointer to `file` under the key
// "__sbuf_file"; the number of items written is forwarded as "items" in end().
//
// NOTE(review): this listing omits original lines 295 and 296, where `file`
// and `wr` are presumably declared -- confirm against the real header.
258 template <typename T>
259 class buffer_input_t : public node {
260 public:
    // Type of the items this node accepts.
261  typedef T item_type;
262 
    // Builds the node on an existing token. `output` is stored in a member
    // but is not otherwise used in the code visible here.
263  buffer_input_t(const node_token & token,
264  std::shared_ptr<node> output = std::shared_ptr<node>())
265  : node(token)
266  , output(output)
267  , wr()
268  , items(0) {
269  set_name("Serialization buffer writer");
270  //TODO memory
271  set_minimum_resource_usage(FILES, 1);
272  set_plot_options(PLOT_BUFFERED | PLOT_SIMPLIFIED_HIDE);
273  }
274 
    // Constructs the temporary file and publishes a pointer to it so that
    // the output node can fetch it.
    // NOTE(review): the meaning of the trailing argument `1` to forward() is
    // not visible in this file -- confirm against node.h.
275  void propagate() override {
276  file.construct();
277  forward<tpie::maybe<tpie::temp_file>*>("__sbuf_file", &file, 1);
278  }
279 
    // Opens the writer on the temporary file's path.
280  void begin() override {
281  wr.open(file->path());
282  }
283 
    // Serializes one item and counts it.
284  void push(const item_type & x) {
285  wr.serialize(x);
286  ++items;
287  }
288 
    // Closes the writer and forwards the number of items written.
289  void end() override {
290  wr.close();
291  this->forward<stream_size_type>("items", items);
292  }
    // NOTE(review): the members below are under `public:`, whereas the
    // otherwise parallel reverser_input_t keeps them `private:` -- confirm
    // whether the public access is intentional.
293 public:
294  std::shared_ptr<node> output;
    // Number of items pushed so far.
297  stream_size_type items;
298 };
299 
300 
// Push-based output half of the serialization buffer: reads the items
// written by buffer_input_t from the shared temporary file and pushes them
// to `dest`.
//
// Depends on the input node (add_dependency), fetches the file pointer
// published under "__sbuf_file", and reports progress in bytes.
//
// NOTE(review): this listing omits original lines 344 and 345, where `file`
// and `rd` are presumably declared -- confirm against the real header.
301 template <typename dest_t>
302 class buffer_output_t : public node {
303 public:
    // Type of the items accepted by the destination node.
304  typedef typename push_type<dest_t>::type item_type;
305 
    // Takes ownership of `dest` (moved in) and declares a dependency on the
    // node identified by `input_token`.
306  buffer_output_t(dest_t dest, const node_token & input_token)
307  : dest(std::move(dest))
308  {
309  add_dependency(input_token);
310  add_push_destination(this->dest);
311  //TODO MEMORY
312  set_minimum_resource_usage(FILES, 1);
313  set_name("Serialization buffer reader");
314  set_plot_options(PLOT_BUFFERED);
315  }
316 
    // Fetches the shared file pointer, opens the reader on it and declares
    // rd.size() progress steps.
    // Throws tpie::exception if the fetched file has not been constructed.
317  void propagate() override {
318  file = fetch<tpie::maybe<tpie::temp_file> *>("__sbuf_file");
319  if (!file->is_constructed())
320  throw tpie::exception("No one created my file");
321 
322  rd.open((*file)->path());
323  set_steps(rd.size());
324  }
325 
    // Unserializes and pushes items for as long as the reader can_read().
326  void go() override {
327  item_type x;
    // Reader offset observed after the previously pushed item.
328  stream_size_type bytesRead = 0;
329  while (rd.can_read()) {
330  rd.unserialize(x);
331  dest.push(x);
332 
    // Step by the number of bytes consumed by the item just read.
333  stream_size_type bytesRead2 = rd.offset();
334  step(bytesRead2 - bytesRead);
335  bytesRead = bytesRead2;
336  }
337  }
338 
    // Closes the reader and destructs the shared temporary file object.
339  void end() override {
340  rd.close();
341  file->destruct();
342  }
343 private:
346  dest_t dest;
347 };
348 
// Pull-based output half of the serialization buffer: items written by
// buffer_input_t are handed out one at a time through can_pull()/pull().
//
// Depends on the input node (add_dependency), fetches the file pointer
// published under "__sbuf_file", and reports progress in bytes.
//
// NOTE(review): this listing omits original lines 388 and 389, where `file`
// and `rd` are presumably declared -- confirm against the real header.
349 template <typename T>
350 class buffer_pull_output_t: public node {
351 public:
    // Type of the items returned by pull().
352  typedef T item_type;
353 
    // Declares a dependency on the node identified by `input_token`.
    // Unlike the sibling reader nodes, the name is set with
    // PRIORITY_SIGNIFICANT.
354  buffer_pull_output_t(const node_token & input_token) {
355  add_dependency(input_token);
356  set_name("Fetching items", PRIORITY_SIGNIFICANT);
357  //TODO memory
358  set_minimum_resource_usage(FILES, 1);
359  set_plot_options(PLOT_BUFFERED);
360  }
361 
    // Fetches the shared file pointer, opens the reader on it and declares
    // rd.size() progress steps.
    // Throws tpie::exception if the fetched file has not been constructed.
362  void propagate() override {
363  file = fetch<tpie::maybe<tpie::temp_file> *>("__sbuf_file");
364  if (!file->is_constructed())
365  throw tpie::exception("No one created my file");
366  rd.open((*file)->path());
367  set_steps(rd.size());
368  }
369 
    // Returns whether the reader reports that another item can be read.
370  bool can_pull() {
371  return rd.can_read();
372  }
373 
    // Unserializes and returns the next item, stepping progress by the
    // number of bytes the reader's offset advanced for it.
374  T pull() {
375  item_type x;
376  stream_size_type bytesRead = rd.offset();
377  rd.unserialize(x);
378  stream_size_type bytesRead2 = rd.offset();
379  step(bytesRead2 - bytesRead);
380  return x;
381  }
382 
    // Closes the reader and destructs the shared temporary file object.
383  void end() override {
384  rd.close();
385  file->destruct();
386  }
387 private:
390 };
391 
392 
393 } // namespace serialization_bits
394 
395 
// passive_serialization_reverser: a passive serialization reverser stored in
// external memory.
//
// Every node and factory handed out by this object is built from the same
// `input_token`.
//
// NOTE(review): this listing omits the class header (original line 401), the
// typedefs for input_t/output_t/inputfact_t/outputfact_t, the constructor
// and the signatures of input() and output() -- confirm against the real
// header before relying on the comments below.
400 template <typename T>
402 public:
    // Type of the items passed through the reverser.
403  typedef T item_type;
406 private:
411 public:
413 
    // Returns an input node constructed directly on the shared token.
414  input_t raw_input() {
415  return input_t(input_token);
416  }
417 
    // Returns an output node constructed directly on the shared token.
418  output_t raw_output() {
419  return output_t(input_token);
420  }
421 
    // Body of input() (signature line missing from the listing): returns a
    // termfactory for the input nodes.
426  return inputfact_t(input_token);
427  }
428 
    // Body of output() (signature line missing from the listing): returns a
    // termfactory for the output nodes.
433  return outputfact_t(input_token);
434  }
435 private:
    // Token shared by the input and output nodes created from this object.
436  node_token input_token;
437 
440 };
441 
442 
// passive_serialization_buffer: serialization stream buffer.
//
// Every node and factory handed out by this object is built from the same
// `input_token`.
//
// NOTE(review): this listing omits the class header (original line 448), the
// typedefs for input_t/output_t/inputfact_t/outputfact_t/inputpipe_t/
// outputpipe_t and the constructor -- confirm against the real header before
// relying on the comments below.
447 template <typename T>
449 public:
    // Type of the items passed through the buffer.
450  typedef T item_type;
453 private:
458 
459 public:
461 
    // Returns an input node constructed directly on the shared token.
462  input_t raw_input() {
463  return input_t(input_token);
464  }
465 
    // Returns an output node constructed directly on the shared token.
466  output_t raw_output() {
467  return output_t(input_token);
468  }
469 
    // Returns a pipe built from the input factory on the shared token.
470  inputpipe_t input() {
471  return inputfact_t(input_token);
472  }
473 
    // Returns a pipe built from the output factory on the shared token.
474  outputpipe_t output() {
475  return outputfact_t(input_token);
476  }
477 
478 private:
    // Token shared by the input and output nodes created from this object.
479  node_token input_token;
480 
483 };
484 
485 
491 
497 
498 } // namespace tpie::pipelining
499 
500 #endif // TPIE_PIPELINING_SERIALIZATION_H
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::serialization_bits::output_factory
Definition: serialization.h:96
tpie::pipelining::serialization_bits::buffer_pull_output_t
Definition: serialization.h:350
tpie::pipelining::factory
Definition: factory_helpers.h:35
tpie::file
Central file abstraction.
Definition: file.h:39
tpie::pipelining::serialization_bits::buffer_output_t
Definition: serialization.h:302
tpie::pipelining::pullpipe_begin
Definition: pipe_base.h:442
tpie::pipelining::serialization_bits::reverser_output_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: serialization.h:190
tpie::pipelining::serialization_bits::buffer_output_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: serialization.h:326
tpie::pipelining::pipe_begin
Definition: pipe_base.h:331
tpie::pipelining::serialization_bits::reverser_pull_output_t::propagate
void propagate() override
Propagate stream metadata.
Definition: serialization.h:227
tpie::pipelining::serialization_bits::input_t::propagate
void propagate() override
Propagate stream metadata.
Definition: serialization.h:54
tpie::pipelining::serialization_bits::buffer_pull_output_t::propagate
void propagate() override
Propagate stream metadata.
Definition: serialization.h:362
tpie::pipelining::serialization_bits::buffer_input_t::propagate
void propagate() override
Propagate stream metadata.
Definition: serialization.h:275
tpie::pipelining::pipe_end
Definition: pipe_base.h:212
tpie::serialization_reverse_writer
Definition: serialization_stream.h:157
tpie::pipelining::serialization_bits::reverser_input_t::begin
void begin() override
Begin pipeline processing phase.
Definition: serialization.h:145
tpie::pipelining::serialization_bits::output_t
Definition: serialization.h:76
tpie::pipelining::passive_serialization_reverser::output
outputpipe_t output()
Returns a termfactory for the output nodes.
Definition: serialization.h:432
serialization_stream.h
tpie::pipelining::passive_serialization_reverser::input
inputpipe_t input()
Returns a termfactory for the input nodes.
Definition: serialization.h:425
tpie::bits::serialization_reader_base::unserialize
void unserialize(T &v)
Unserialize an item from the stream (the item type must support unserialization).
Definition: serialization_stream.h:321
tpie::pipelining::serialization_bits::buffer_input_t::begin
void begin() override
Begin pipeline processing phase.
Definition: serialization.h:280
tpie::pipelining::node::set_minimum_memory
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:206
tpie::pipelining::push_type
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:158
tpie::serialization_reverse_reader
Definition: serialization_stream.h:385
tpie::pipelining::serialization_bits::buffer_input_t::end
void end() override
End pipeline processing phase.
Definition: serialization.h:289
tpie::pipelining::node::set_plot_options
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:458
tpie::exception
Definition: exception.h:33
tpie::pipelining::serialization_bits::reverser_input_t::end
void end() override
End pipeline processing phase.
Definition: serialization.h:154
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::pipelining::serialization_bits::input_t
Definition: serialization.h:37
tpie::pipelining::serialization_bits::reverser_pull_output_t::end
void end() override
End pipeline processing phase.
Definition: serialization.h:248
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::serialization_writer::serialize
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
Definition: serialization_stream.h:136
tpie::serialization_reader::offset
stream_size_type offset()
Number of bytes read, not including the header.
tpie::pipelining::serialization_bits::reverser_pull_output_t
Definition: serialization.h:214
tpie::pipelining::pipe_middle
Definition: pipe_base.h:243
tpie::pipelining::serialization_bits::buffer_pull_output_t::end
void end() override
End pipeline processing phase.
Definition: serialization.h:383
tpie::pipelining::passive_serialization_buffer
Serialization stream buffer.
Definition: serialization.h:448
tpie::pipelining::serialization_output
pipe_end< typename serialization_bits::output_factory< T >::type > serialization_output(serialization_writer &wr)
A pipelining node that writes items to a serialization_writer.
Definition: serialization.h:117
tpie::pipelining::serialization_bits::reverser_output_t::end
void end() override
End pipeline processing phase.
Definition: serialization.h:203
tpie::pipelining::serialization_bits::reverser_output_t
Definition: serialization.h:167
tpie::pipelining::termfactory
Definition: factory_helpers.h:78
tpie::file_base_crtp::path
const std::string & path() const
The path of the file opened or the empty string.
Definition: file_base_crtp.h:195
tpie::pipelining::node::set_minimum_resource_usage
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
tpie::pipelining::serialization_bits::reverser_input_t
Definition: serialization.h:124
tpie::serialization_reverse_writer::serialize
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
Definition: serialization_stream.h:230
tpie::pipelining::serialization_bits::buffer_input_t
Definition: serialization.h:259
tpie::pipelining::node::step
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:653
tpie::pipelining::node::set_steps
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
tpie::pipelining::serialization_bits::reverser_input_t::propagate
void propagate() override
Propagate stream metadata.
Definition: serialization.h:140
tpie::serialization_reader
Definition: serialization_stream.h:358
tpie::pipelining::node::add_dependency
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end() called before the begin() of this node.
tpie::pipelining::serialization_bits::buffer_output_t::end
void end() override
End pipeline processing phase.
Definition: serialization.h:339
tpie::pipelining::serialization_reverser
pipe_middle< split_factory< serialization_bits::reverser_input_t, node, serialization_bits::reverser_output_t > > serialization_reverser
A pipelining node that reverses serializable items and creates a phase boundary.
Definition: serialization.h:490
tpie::pipelining::serialization_bits::reverser_output_t::propagate
void propagate() override
Propagate stream metadata.
Definition: serialization.h:182
tpie::pipelining::passive_serialization_reverser
A passive serialization reverser stored in external memory.
Definition: serialization.h:401
tpie::bits::serialization_reader_base::size
stream_size_type size()
Size of file in bytes, not including the header.
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::serialization_input
pipe_begin< serialization_bits::input_factory > serialization_input(serialization_reader &rd)
A pipelining node that reads items from a serialization_reader.
Definition: serialization.h:107
tpie::pipelining::serialization_bits::buffer_output_t::propagate
void propagate() override
Propagate stream metadata.
Definition: serialization.h:317
tpie::serialization_writer
Definition: serialization_stream.h:84
tpie::serialization_reverse_reader::offset
stream_size_type offset()
Number of bytes read, not including the header.
tpie::pipelining::node_token
Definition: tokens.h:292
tpie::pipelining::item_type
pipe_middle< tfactory< bits::item_type_t, Args< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
tpie::pipelining::node::node
node()
Default constructor, using a new node_token.
tpie::pipelining::serialization_bits::input_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: serialization.h:58
tpie::pipelining::serialization_buffer
pipe_middle< split_factory< serialization_bits::buffer_input_t, node, serialization_bits::buffer_output_t > > serialization_buffer
A pipelining node that acts as a buffer for serializable items and creates a phase boundary.
Definition: serialization.h:496
tpie::maybe< tpie::temp_file >