TPIE

11a2c2d
buffer.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef __TPIE_PIPELINING_BUFFER_H__
25 #define __TPIE_PIPELINING_BUFFER_H__
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/pipelining/pipe_base.h>
30 #include <tpie/file_stream.h>
31 #include <tpie/maybe.h>
32 #include <memory>
33 
34 namespace tpie::pipelining {
35 
36 namespace bits {
37 
// Pull-side node of a buffer: hands items back out of the file_stream that an
// upstream node published under the key "queue".
// Lifecycle as visible here: propagate() binds and rewinds the stream,
// can_pull()/pull() read from it, end() destroys it.
38 template <typename T>
39 class buffer_pull_output_t: public node {
// Holder of the stream as fetched in propagate(); used by end() to destroy it.
40  tpie::maybe<file_stream<T> > * m_queue_ptr;
// Direct pointer to the stream inside *m_queue_ptr; assigned in propagate().
// No visible constructor statement initialises either pointer.
41  file_stream<T> * m_queue;
42 public:
43  typedef T item_type;
44 
// Declares a dependency on the node identified by input_token, then sets the
// display name and plot options.
// NOTE(review): the embedded listing numbers jump from 47 to 50 below, so two
// lines of this constructor are absent from this extract — confirm against
// the original header before editing.
45  buffer_pull_output_t(const node_token & input_token) {
46  add_dependency(input_token);
47  set_name("Fetching items", PRIORITY_SIGNIFICANT);
50  set_plot_options(PLOT_BUFFERED);
51  }
52 
// Fetches the "queue" pointer, rewinds the stream to position 0, forwards the
// item count under the key "items" and registers that count as the number of
// progress steps.
53  void propagate() override {
54  m_queue_ptr = fetch<tpie::maybe<file_stream<T> > *>("queue");
// Unwrap pointer -> maybe -> contained file_stream, keep its address.
55  m_queue = &**m_queue_ptr;
56  m_queue->seek(0);
57  forward("items", m_queue->size());
58  set_steps(m_queue->size());
59  }
60 
// Returns whether the stream reports that another item can be read.
// Dereferences m_queue, which is only assigned in propagate().
61  bool can_pull() const {
62  return m_queue->can_read();
63  }
64 
// Advances the progress indicator by one step and returns the next item read
// from the stream.
65  T pull() {
66  step();
67  return m_queue->read();
68  }
69 
// Destroys the stream held by the fetched maybe<>. m_queue pointed at that
// object, so it must not be dereferenced after this call.
70  virtual void end() override {
71  (*m_queue_ptr).destruct();
72  }
73 };
74 
78 
// Push-side node of a buffer: writes every pushed item to a file_stream that
// it owns, then publishes a pointer to that stream under the key "queue"
// when the phase ends.
79 template <typename T>
80 class buffer_input_t: public node {
81 public:
82  typedef T item_type;
83 
// Constructs the node from an existing token and optionally stores a shared
// pointer to another node (defaults to an empty pointer).
// NOTE(review): the embedded listing numbers jump from 88 to 91 below, so two
// lines of this constructor are absent from this extract — confirm against
// the original header before editing.
84  buffer_input_t(const node_token & token, std::shared_ptr<node> output=std::shared_ptr<node>())
85  : node(token)
86  , m_output(output)
87  {
88  set_name("Storing items", PRIORITY_INSIGNIFICANT);
91  set_plot_options(PLOT_BUFFERED | PLOT_SIMPLIFIED_HIDE);
92  }
93 
// Constructs the stream inside m_queue and opens it for sequential access
// with normal compression.
// NOTE(review): the first argument (0 cast to memory_size_type) presumably
// selects the open() overload for an anonymous temporary stream with no user
// data — confirm against file_stream::open.
94  void begin() override {
95  m_queue.construct();
96  m_queue->open(static_cast<memory_size_type>(0), access_sequential, compression_normal);
97  }
98 
// Appends one item to the stream.
99  void push(const T & item) {
100  m_queue->write(item);
101  }
102 
// Publishes the address of m_queue under the key "queue". The address refers
// to a member of this object, so whoever fetches it depends on this node
// still being alive.
// NOTE(review): the third argument (k = 1) presumably limits how far the
// value is forwarded — confirm against node::forward.
103  void end() override {
104  forward("queue", &m_queue, 1);
105  }
106 
107 private:
// Stream storage; constructed in begin(). This class never destructs it
// explicitly — the fetching nodes in this file call destruct() through the
// forwarded pointer in their end().
108  tpie::maybe< file_stream<T> > m_queue;
// Stored by the constructor; not otherwise used in the visible code.
109  std::shared_ptr<node> m_output;
110 };
111 
// Push-style replay node of a buffer: in go() it reads every item from the
// file_stream published under the key "queue" and pushes it to dest.
115 template <typename dest_t>
116 class buffer_output_t: public node {
117 public:
118  typedef typename push_type<dest_t>::type item_type;
119 
// Takes ownership of dest, declares a dependency on the node identified by
// input_token, registers dest as push destination, and declares a minimum of
// one FILES resource plus name and plot options.
// NOTE(review): the embedded listing numbers jump from 124 to 126 below, so
// one line of this constructor is absent from this extract — confirm against
// the original header before editing.
120  buffer_output_t(dest_t dest, const node_token & input_token)
121  : dest(std::move(dest))
122  {
123  add_dependency(input_token);
// this->dest is required: the parameter of the same name shadows the member
// and has already been moved from.
124  add_push_destination(this->dest);
126  set_minimum_resource_usage(FILES, 1);
127  set_name("Buffer", PRIORITY_INSIGNIFICANT);
128  set_plot_options(PLOT_BUFFERED);
129  }
130 
131 
// Fetches the "queue" pointer, forwards the item count under the key "items"
// and registers that count as the number of progress steps. The stream is
// not rewound here; go() does that.
132  void propagate() override {
133  m_queue_ptr = fetch<tpie::maybe<file_stream<item_type> > *>("queue");
// Unwrap pointer -> maybe -> contained file_stream, keep its address.
134  m_queue = &**m_queue_ptr;
135  forward("items", m_queue->size());
136  set_steps(m_queue->size());
137  }
138 
// Rewinds the stream to position 0 and pushes each item to dest, advancing
// the progress indicator once per item.
139  void go() override {
140  m_queue->seek(0);
141  while (m_queue->can_read()) {
142  dest.push(m_queue->read());
143  step();
144  }
145  }
146 
// Destroys the stream held by the fetched maybe<>. m_queue pointed at that
// object, so it must not be dereferenced after this call.
147  void end() override {
148  m_queue_ptr->destruct();
149  }
150 private:
// Holder of the stream as fetched in propagate().
151  tpie::maybe<file_stream<item_type> > * m_queue_ptr;
// Direct pointer to the stream inside *m_queue_ptr; assigned in propagate().
152  file_stream<item_type> * m_queue;
// Destination that receives the replayed items.
153  dest_t dest;
154 };
155 
156 } // namespace bits
157 
// passive_buffer: hands out an input end and an output end that all share the
// single node_token stored in input_token.
// NOTE(review): this extract is incomplete. The embedded listing numbers skip
// 163, 166-167, 169-170, 172-173 and 197, so the class head and the
// declarations of input_t, output_t, inputfact_t, outputfact_t, inputpipe_t
// and outputpipe_t are not visible here. Restore them from the original
// header before relying on this block.
162 template <typename T>
164 public:
165  typedef T item_type;
168 private:
171 public:
174 
175 
// Default constructor; input_token is default-constructed.
176  passive_buffer() {}
177 
// Returns an input_t built directly from the shared token.
178  input_t raw_input() {
179  return input_t(input_token);
180  }
181 
// Returns an output_t built directly from the shared token.
182  output_t raw_output() {
183  return output_t(input_token);
184  }
185 
// Returns an inputpipe_t obtained from an inputfact_t that carries the shared
// token.
186  inputpipe_t input() {
187  return inputfact_t(input_token);
188  }
189 
// Returns an outputpipe_t obtained from an outputfact_t that carries the
// shared token.
190  outputpipe_t output() {
191  return outputfact_t(input_token);
192  }
193 
194 private:
// Token passed to every input/output object created above.
195  node_token input_token;
196 
// Declared in the private section with no definition visible here —
// presumably to disable copy assignment; confirm against the original header.
198  passive_buffer & operator=(const passive_buffer &);
199 };
200 
206 
207 } // namespace tpie::pipelining
208 
209 #endif // __TPIE_PIPELINING_BUFFER_H__
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::bits::buffer_output_t::propagate
void propagate() override
Propagate stream metadata.
Definition: buffer.h:132
tpie::pipelining::bits::buffer_output_t
Output node for buffer.
Definition: buffer.h:116
tpie::pipelining::passive_buffer
Plain old file_stream buffer.
Definition: buffer.h:163
tpie::pipelining::pullpipe_begin
Definition: pipe_base.h:442
tpie::file_stream::read
const T & read()
Reads next item from stream if can_read() == true.
Definition: stream.h:425
tpie::pipelining::bits::buffer_output_t::end
void end() override
End pipeline processing phase.
Definition: buffer.h:147
tpie::pipelining::pipe_end
Definition: pipe_base.h:212
tpie::file_stream
Compressed stream.
Definition: predeclare.h:49
tpie::pipelining::bits::buffer_input_t::end
void end() override
End pipeline processing phase.
Definition: buffer.h:103
tpie::pipelining::node::set_minimum_memory
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:206
tpie::pipelining::push_type
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:158
maybe.h
tpie::pipelining::node::set_plot_options
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:458
tpie::pipelining::node::forward
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:563
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::compression_normal
@ compression_normal
Compress some blocks according to available resources (time, memory).
Definition: scheme.h:40
tpie::pipelining::buffer
pipe_middle< split_factory< bits::buffer_input_t, node, bits::buffer_output_t > > buffer
The buffer node inserts a phase boundary into the pipeline by writing items to disk.
Definition: buffer.h:205
tpie::pipelining::bits::buffer_pull_output_t
Definition: buffer.h:39
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::pipelining::pipe_middle
Definition: pipe_base.h:243
tpie::pipelining::termfactory
Definition: factory_helpers.h:78
tpie::pipelining::node::set_minimum_resource_usage
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
tpie::pipelining::node::step
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:653
tpie::pipelining::node::set_steps
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
tpie::access_sequential
@ access_sequential
Sequential access is intended.
Definition: cache_hint.h:36
tpie::pipelining::node::add_dependency
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
tpie::pipelining::bits::buffer_input_t::begin
void begin() override
Begin pipeline processing phase.
Definition: buffer.h:94
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::maybe::destruct
void destruct()
Invokes the destructor on the contained object.
Definition: maybe.h:136
tpie::pipelining::bits::buffer_pull_output_t::propagate
void propagate() override
Propagate stream metadata.
Definition: buffer.h:53
tpie::pipelining::node_token
Definition: tokens.h:292
tpie::pipelining::output
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:430
tpie::pipelining::bits::buffer_input_t
Input node for buffer.
Definition: buffer.h:80
tpie::pipelining::node::node
node()
Default constructor, using a new node_token.
tpie::pipelining::bits::buffer_output_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: buffer.h:139
tpie::maybe
Definition: maybe.h:34
tpie::pipelining::bits::buffer_pull_output_t::end
virtual void end() override
End pipeline processing phase.
Definition: buffer.h:70