TPIE

11a2c2d
serialization_sort.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_PIPELINING_SERIALIZATION_SORT_H
21 #define TPIE_PIPELINING_SERIALIZATION_SORT_H
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/serialization_sorter.h>
27 
28 namespace tpie::pipelining {
29 
30 namespace serialization_bits {
31 
32 template <typename T, typename pred_t>
34 public:
35  typedef T item_type;
36  typedef pred_t pred_type;
38  typedef std::shared_ptr<sorter_t> sorterptr;
39 };
40 
41 template <typename Traits>
43 
44 template <typename Traits>
46 
47 template <typename Traits>
48 class sort_output_base : public node {
49  typedef typename Traits::pred_type pred_type;
50 public:
52  typedef typename Traits::item_type item_type;
54  typedef typename Traits::sorter_t sorter_t;
56  typedef typename Traits::sorterptr sorterptr;
57 
58  sorterptr get_sorter() const {
59  return m_sorter;
60  }
61 
62  void set_calc_node(node & calc) {
64  }
65 
66  void propagate() override {
67  set_steps(m_sorter->item_count());
68  forward("items", static_cast<stream_size_type>(m_sorter->item_count()));
69  memory_size_type memory_usage = m_sorter->actual_memory_phase_3();
70  set_minimum_memory(memory_usage);
71  set_maximum_memory(memory_usage);
73  m_propagate_called = true;
74  }
75 
76  void begin() override {
77  m_sorter->set_owner(this);
78  }
79 
80  void end() override {
81  m_sorter->set_owner(nullptr);
82  m_sorter.reset();
83  }
84 
85  void add_calc_dependency(node_token tkn) {
87  }
88 
89 protected:
90  void set_available_memory(memory_size_type availableMemory) override {
91  if (!m_propagate_called)
92  m_sorter->set_phase_3_memory(availableMemory);
93  }
94 
95  sort_output_base(pred_type pred)
96  : m_sorter(new sorter_t(sizeof(item_type), pred))
97  , m_propagate_called(false)
98  {
99  }
100 
101  sort_output_base(sorterptr p)
102  : m_sorter(p)
103  , m_propagate_called(false)
104  {
105  }
106 
107  sorterptr m_sorter;
108  bool m_propagate_called;
109 };
110 
114 template <typename Traits>
115 class sort_pull_output_t : public sort_output_base<Traits> {
116 public:
117  typedef typename Traits::item_type item_type;
118  typedef typename Traits::pred_type pred_type;
119  typedef typename Traits::sorter_t sorter_t;
120  typedef typename Traits::sorterptr sorterptr;
121 
123  : sort_output_base<Traits>(sorter)
124  {
125  this->set_minimum_memory(sorter_t::minimum_memory_phase_3());
126  this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT);
127  this->set_memory_fraction(1.0);
128  }
129 
130  bool can_pull() const {
131  return this->m_sorter->can_pull();
132  }
133 
134  item_type pull() {
135  this->step();
136  return this->m_sorter->pull();
137  }
138 
139  // Despite this go() implementation, a sort_pull_output_t CANNOT be used as
140  // an initiator node. Normally, it is a type error to have a phase without
141  // an initiator, but with a passive_sorter you can circumvent this
142  // mechanism. Thus we customize the error message printed (but throw the
143  // same type of exception.)
144  void go() override {
145  log_warning() << "Passive sorter used without an initiator in the final merge and output phase.\n"
146  << "Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
147  throw not_initiator_node();
148  }
149 };
150 
154 template <typename Traits, typename dest_t>
155 class sort_output_t : public sort_output_base<Traits> {
156  typedef typename Traits::pred_type pred_type;
157 public:
158  typedef typename Traits::item_type item_type;
160  typedef typename Traits::sorter_t sorter_t;
161  typedef typename Traits::sorterptr sorterptr;
162 
163  sort_output_t(dest_t dest, pred_type pred)
164  : p_t(pred)
165  , dest(std::move(dest))
166  {
167  this->add_push_destination(dest);
168  this->set_minimum_memory(sorter_t::minimum_memory_phase_3());
169  this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT);
170  this->set_memory_fraction(1.0);
171  }
172 
173  void go() override {
174  while (this->m_sorter->can_pull()) {
175  dest.push(this->m_sorter->pull());
176  this->step();
177  }
178  }
179 private:
180  dest_t dest;
181 };
182 
188 template <typename Traits>
189 class sort_calc_t : public node {
190 public:
191  typedef typename Traits::item_type item_type;
192  typedef typename Traits::sorter_t sorter_t;
193  typedef typename Traits::sorterptr sorterptr;
194 
195  typedef sort_output_base<Traits> Output;
196 
197  sort_calc_t(sort_calc_t && other) = default;
198 
199  template <typename dest_t>
200  sort_calc_t(dest_t dest)
201  : dest(new dest_t(std::move(dest)))
202  {
203  m_sorter = this->dest->get_sorter();
204  this->dest->add_calc_dependency(this->get_token());
205  init();
206  }
207 
208  sort_calc_t(sorterptr sorter, node_token tkn)
209  : node(tkn), m_sorter(sorter)
210  {
211  init();
212  }
213 
214  void init() {
215  set_minimum_memory(sorter_t::minimum_memory_phase_2());
216  set_name("Perform merge heap", PRIORITY_SIGNIFICANT);
217  set_memory_fraction(1.0);
218  m_propagate_called = false;
219  }
220 
221  void propagate() override {
222  set_steps(1000);
223  m_propagate_called = true;
224  }
225 
226  void begin() override {
227  m_sorter->set_owner(this);
228  }
229 
230  bool is_go_free() const override {return m_sorter->is_merge_runs_free();}
231 
232  void go() override {
233  progress_indicator_base * pi = proxy_progress_indicator();
234  log_debug() << "TODO: Progress information during merging." << std::endl;
235  m_sorter->merge_runs();
236  pi->init(1);
237  pi->step();
238  pi->done();
239  }
240 
241  void end() override {
242  m_weak_sorter = m_sorter;
243  m_sorter.reset();
244  }
245 
246  bool can_evacuate() override {
247  return true;
248  }
249 
250  void evacuate() override {
251  auto p = m_weak_sorter.lock();
252  if (p) p->evacuate();
253  }
254 
255  sorterptr get_sorter() const {
256  return m_sorter;
257  }
258 
259  void set_input_node(node & input) {
260  add_memory_share_dependency(input);
261  }
262 
263 protected:
264  void set_available_memory(memory_size_type availableMemory) override {
265  if (!m_propagate_called)
266  m_sorter->set_phase_2_memory(availableMemory);
267  }
268 
269 private:
270  sorterptr m_sorter;
271  std::weak_ptr<typename sorterptr::element_type> m_weak_sorter;
272  bool m_propagate_called;
273  std::shared_ptr<Output> dest;
274 };
275 
281 template <typename Traits>
282 class sort_input_t : public node {
283  typedef typename Traits::pred_type pred_type;
284 public:
285  typedef typename Traits::item_type item_type;
286  typedef typename Traits::sorter_t sorter_t;
287  typedef typename Traits::sorterptr sorterptr;
288 
289  sort_input_t(sort_calc_t<Traits> dest)
290  : m_sorter(dest.get_sorter())
291  , dest(std::move(dest))
292  {
293  this->dest.set_input_node(*this);
294  set_minimum_memory(sorter_t::minimum_memory_phase_1());
295  set_name("Form input runs", PRIORITY_SIGNIFICANT);
296  set_memory_fraction(1.0);
297  m_propagate_called = false;
298  }
299 
300  void propagate() override {
301  m_propagate_called = true;
302  }
303 
304  void begin() override {
305  m_sorter->set_owner(this);
306  m_sorter->begin();
307  }
308 
309  void push(const item_type & item) {
310  m_sorter->push(item);
311  }
312 
313  void end() override {
314  m_sorter->end();
315  m_weak_sorter = m_sorter;
316  m_sorter.reset();
317  }
318 
319  bool can_evacuate() override {
320  return true;
321  }
322 
323  void evacuate() override {
324  auto p = m_weak_sorter.lock();
325  if (p) p->evacuate();
326  }
327 
328 protected:
329  void set_available_memory(memory_size_type availableMemory) override {
330  if (!m_propagate_called)
331  m_sorter->set_phase_1_memory(availableMemory);
332  }
333 
334 private:
335  sorterptr m_sorter;
336  std::weak_ptr<typename sorterptr::element_type> m_weak_sorter;
337  sort_calc_t<Traits> dest;
338  bool m_propagate_called;
339 };
340 
344 template <typename pred_t>
345 class sort_factory : public factory_base {
346 public:
347  template <typename dest_t>
349 
350  template <typename dest_t>
351  constructed_type<dest_t> construct(dest_t dest) {
352  using item_type = typename push_type<dest_t>::type;
353  using Traits = sorter_traits<item_type, pred_t>;
354 
355  sort_output_t<Traits, dest_t> output(std::move(dest), m_pred);
356  this->init_sub_node(output);
357  sort_calc_t<Traits> calc(std::move(output));
358  this->init_sub_node(calc);
359  sort_input_t<Traits> input(std::move(calc));
360  this->init_sub_node(input);
361 
362  return input;
363  }
364 
365  sort_factory(const pred_t & p) : m_pred(p) {}
366 
367  pred_t m_pred;
368 };
369 
370 
371 } // namespace serialization_bits
372 
373 
377 template <typename pred_t=std::less<void>>
379 serialization_sort(const pred_t & p=std::less<void>()) {
381  return pipe_middle<fact>(fact(p)).name("Sort");
382 }
383 
384 template <typename T, typename pred_t=std::less<T> >
386 
387 namespace serialization_bits {
388 
392 template <typename Traits>
394 public:
396  typedef sort_calc_t<Traits> calc_t;
398  typedef input_t constructed_type;
399  typedef typename Traits::sorter_t sorter_t;
400  typedef typename Traits::sorterptr sorterptr;
401 
402  passive_sorter_factory_input(sorterptr sorter, node_token calc_token)
403  : m_sorter(sorter)
404  , m_calc_token(calc_token) {}
405 
406  constructed_type construct() {
407  calc_t calc(std::move(m_sorter), m_calc_token);
408  this->init_node(calc);
409  input_t input(std::move(calc));
410  this->init_node(input);
411  return input;
412  }
413 
414 private:
415  sorterptr m_sorter;
416  node_token m_calc_token;
417 };
418 
422 template <typename Traits>
424 public:
426  typedef typename Traits::sorterptr sorterptr;
427  typedef output_t constructed_type;
428 
429  passive_sorter_factory_output(sorterptr sorter, node_token calc_token)
430  : m_sorter(sorter)
431  , m_calc_token(calc_token)
432  {}
433 
434  constructed_type construct() {
435  constructed_type res(std::move(m_sorter));
436  res.add_calc_dependency(m_calc_token);
437  init_node(res);
438  return res;
439  }
440 
441 private:
442  sorterptr m_sorter;
443  node_token m_calc_token;
444 };
445 
446 } // namespace serialization_bits
447 
456 template <typename T, typename pred_t>
459 public:
461  typedef T item_type;
463  typedef typename Traits::sorter_t sorter_t;
465  typedef typename Traits::sorterptr sorterptr;
468 
471 
472  serialization_passive_sorter(pred_t pred = pred_t())
473  : m_sorter_input(new sorter_t(sizeof(T), pred))
474  , m_sorter_output(m_sorter_input)
475  {
476  }
477 
479  serialization_passive_sorter & operator=(const serialization_passive_sorter &) = delete;
482 
487  assert(m_sorter_input);
488  return input_pipe_t(std::move(m_sorter_input), m_calc_token);
489  }
490 
495  assert(m_sorter_output);
496  return output_pipe_t(std::move(m_sorter_output), m_calc_token);
497  }
498 
499 private:
500  sorterptr m_sorter_input;
501  sorterptr m_sorter_output;
502  node_token m_calc_token;
503 };
504 
505 } // namespace tpie::pipelining
506 
507 #endif // TPIE_PIPELINING_SERIALIZATION_SORT_H
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::factory_base::init_sub_node
void init_sub_node(node &r)
Initialize node constructed in a subclass.
tpie::pipelining::node::add_memory_share_dependency
void add_memory_share_dependency(const node_token &dest)
Called by implementers to declare a node memory share dependency, that is, a requirement that another...
tpie::pipelining::factory_base::init_node
void init_node(node &r)
Initialize node constructed in a subclass.
tpie::pipelining::pullpipe_begin
Definition: pipe_base.h:442
tpie::pipelining::serialization_bits::sort_output_base::begin
void begin() override
Begin pipeline processing phase.
Definition: serialization_sort.h:76
tpie::pipelining::not_initiator_node
Definition: exception.h:34
tpie::serialization_sorter
Definition: serialization_sorter.h:501
tpie::pipelining::serialization_bits::sort_output_base
Definition: serialization_sort.h:48
tpie::pipelining::pipe_end
Definition: pipe_base.h:212
tpie::pipelining::serialization_passive_sorter::output
output_pipe_t output()
Get the output pull node.
Definition: serialization_sort.h:494
tpie::pipelining::serialization_passive_sorter
Pipelined sorter with push input and pull output.
Definition: serialization_sort.h:385
tpie::pipelining::serialization_bits::sort_pull_output_t
Pipe sorter pull output node.
Definition: serialization_sort.h:115
tpie::pipelining::serialization_bits::sort_output_base::end
void end() override
End pipeline processing phase.
Definition: serialization_sort.h:80
tpie::pipelining::node::set_minimum_memory
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:206
tpie::pipelining::serialization_bits::sort_output_base::propagate
void propagate() override
Propagate stream metadata.
Definition: serialization_sort.h:66
tpie::pipelining::push_type
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:158
tpie::pipelining::serialization_bits::sort_output_base::set_available_memory
void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
Definition: serialization_sort.h:90
tpie::pipelining::factory_base
Base class of all pipelining factories.
Definition: factory_base.h:73
tpie::pipelining::serialization_bits::sort_output_t
Pipe sorter push output node.
Definition: serialization_sort.h:155
tpie::pipelining::node::forward
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:563
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::pipelining::bits::pipe_base::name
child_t name(const std::string &n, priority_type p=PRIORITY_USER)
Set name for this node.
Definition: pipe_base.h:90
tpie::pipelining::serialization_passive_sorter::input
input_pipe_t input()
Get the input push node.
Definition: serialization_sort.h:486
tpie::pipelining::serialization_bits::sort_output_base::sorterptr
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
Definition: serialization_sort.h:56
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::pipelining::serialization_bits::sort_factory
Sort factory using the given predicate as comparator.
Definition: serialization_sort.h:345
tpie::log_warning
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:158
tpie::pipelining::node::set_maximum_memory
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
Definition: node.h:216
tpie::pipelining::serialization_bits::sort_input_t
Pipe sorter input node.
Definition: serialization_sort.h:45
tpie::pipelining::pipe_middle
Definition: pipe_base.h:243
tpie::log_debug
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:168
tpie::pipelining::serialization_passive_sorter::sorterptr
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
Definition: serialization_sort.h:465
tpie::pipelining::serialization_bits::passive_sorter_factory_output
Factory for the passive sorter output node.
Definition: serialization_sort.h:423
tpie::pipelining::serialization_passive_sorter::output_t
serialization_bits::sort_pull_output_t< Traits > output_t
Type of pipe sorter output.
Definition: serialization_sort.h:467
tpie::pipelining::node::step
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:653
tpie::pipelining::node::set_steps
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
tpie::pipelining::serialization_passive_sorter::sorter_t
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
Definition: serialization_sort.h:463
tpie::pipelining::node::set_memory_fraction
void set_memory_fraction(double f)
Set the memory priority of this node.
Definition: node.h:224
tpie::pipelining::serialization_bits::sort_output_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: serialization_sort.h:173
tpie::pipelining::serialization_passive_sorter::item_type
T item_type
Type of items sorted.
Definition: serialization_sort.h:461
tpie::pipelining::input
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining nodes that pushes the contents of the given file stream to the next node in the pipeline.
Definition: file_stream.h:378
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::serialization_bits::sort_calc_t
Pipe sorter middle node.
Definition: serialization_sort.h:42
tpie::pipelining::serialization_bits::sort_output_base::sorter_t
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
Definition: serialization_sort.h:54
tpie::pipelining::serialization_sort
pipe_middle< serialization_bits::sort_factory< pred_t > > serialization_sort(const pred_t &p=std::less< void >())
Pipelining sorter using the given predicate.
Definition: serialization_sort.h:379
tpie::pipelining::serialization_bits::sorter_traits
Definition: serialization_sort.h:33
tpie::pipelining::serialization_bits::sort_output_base::item_type
Traits::item_type item_type
Type of items sorted.
Definition: serialization_sort.h:52
tpie::pipelining::node_token
Definition: tokens.h:292
tpie::pipelining::serialization_bits::passive_sorter_factory_input
Factory for the passive sorter input node.
Definition: serialization_sort.h:393
tpie::pipelining::item_type
pipe_middle< tfactory< bits::item_type_t, Args< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
tpie::pipelining::output
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:430
tpie::pipelining::serialization_bits::sort_pull_output_t::go
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: serialization_sort.h:144