TPIE

11a2c2d
merge_sorter.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_MERGE_SORTER_H__
21 #define __TPIE_PIPELINING_MERGE_SORTER_H__
22 
23 #include <tpie/tpie_export.h>
24 #include <tpie/compressed/stream.h>
25 #include <tpie/pipelining/sort_parameters.h>
26 #include <tpie/pipelining/merger.h>
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/exception.h>
29 #include <tpie/dummy_progress.h>
30 #include <tpie/array_view.h>
31 #include <tpie/parallel_sort.h>
32 
33 namespace tpie {
34 
35 namespace bits {
36 
62 class TPIE_EXPORT run_positions { // Maintains the stream positions where sorted runs start; member functions are defined out of line.
63 public:
64  run_positions();
65  ~run_positions();
66 
70  static memory_size_type memory_usage() noexcept; // Memory usage when open and not evacuated.
71 
75  void open(); // NOTE(review): presumably switches to the open state; not documented in this listing.
76 
80  void close(); // Switch from any state to the closed state.
81 
85  void evacuate(); // Switch from any state to the corresponding evacuated state.
86 
90  void unevacuate(); // Switch from any state to the corresponding non-evacuated state.
91 
95  void next_level(); // Go to the next level in the merge heap.
96 
100  void final_level(memory_size_type fanout); // Mark the current level as the final level in the merge heap.
101 
105  void set_position(memory_size_type mergeLevel, memory_size_type runNumber, stream_position pos); // Store the start position of run (mergeLevel, runNumber).
106 
110  stream_position get_position(memory_size_type mergeLevel, memory_size_type runNumber); // Fetch a stored stream position.
111 
112 private:
114  bool m_open; // state flag: open vs closed
116  bool m_evacuated; // state flag: evacuated vs resident
118  bool m_final; // state flag: final level reached
119 
121  memory_size_type m_levels;
122 
123  memory_size_type m_runs[2]; // NOTE(review): two slots each, presumably alternating by merge-level parity — confirm in the .cpp
124  temp_file m_positionsFile[2];
125  stream_position m_positionsPosition[2];
126  file_stream<stream_position> m_positions[2];
127 
129  array<stream_position> m_finalPositions;
131  bool m_finalExtraSet;
133  stream_position m_finalExtra;
134 };
135 
136 } // namespace bits
137 
138 class TPIE_EXPORT merge_sorter_base {
139 public:
141  linear_memory_usage fanout_memory_usage, // NOTE(review): the opening `merge_sorter_base(` line (140) is missing from this listing. Memory model as a function of fanout.
142  memory_size_type item_size, // size of one buffered item (merge_sorter passes specific_store_t::item_size)
143  memory_size_type element_file_stream_memory_usage); // memory of one element file_stream (merge_sorter passes file_stream<element_type>::memory_usage())
144 
145  static const memory_size_type defaultFiles = 253; // Default number of files available, when not using set_available_files
146  static const memory_size_type minimumFilesPhase1 = 1; // Per-phase file-budget bounds; phase 1 (run formation) is pinned to exactly one file.
147  static const memory_size_type maximumFilesPhase1 = 1;
148  static const memory_size_type minimumFilesPhase2 = 5;
149  static const memory_size_type maximumFilesPhase2 = std::numeric_limits<memory_size_type>::max(); // no upper bound
150  static const memory_size_type minimumFilesPhase3 = 5;
151  static const memory_size_type maximumFilesPhase3 = std::numeric_limits<memory_size_type>::max(); // no upper bound
152 
157  void set_parameters(memory_size_type runLength, memory_size_type fanout); // Set run length and fanout explicitly. NOTE(review): presumably also sets m_parametersSet so begin() skips calculate_parameters() — confirm.
158 
163  void set_available_files(memory_size_type f) { // Give every phase a budget of f files; parameters are derived from it later (calculate_parameters()).
164  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
165  p.filesPhase1 = p.filesPhase2 = p.filesPhase3 = f;
166  }
167 
174  void set_available_files(memory_size_type f1, memory_size_type f2, memory_size_type f3) { // Per-phase file budgets (run formation, intermediate merges, final merge).
175  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
176  p.filesPhase1 = f1;
177  p.filesPhase2 = f2;
178  p.filesPhase3 = f3;
179  }
180 
185  void set_available_memory(memory_size_type m) { // Give every phase the same memory budget m; parameters are derived from it later.
186  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
187  p.memoryPhase1 = p.memoryPhase2 = p.memoryPhase3 = m;
188  }
189 
196  void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3) { // Per-phase memory budgets (run formation, intermediate merges, final merge).
197  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
198  p.memoryPhase1 = m1;
199  p.memoryPhase2 = m2;
200  p.memoryPhase3 = m3;
201  }
202 
203  stream_size_type item_count() const { // Number of items pushed since begin(); const so it is usable on const sorters.
204  return m_itemCount;
205  }
206 
207 
208  memory_size_type evacuated_memory_usage() const { // Memory charged while evacuated: the 2*fanout run-file handles (m_runFiles).
209  return sizeof(temp_file) * 2 * p.fanout;
210  }
211 
212 
221  void set_items(stream_size_type n); // NOTE(review): presumably records an expected/maximum item count (cf. m_maxItems) — confirm in the .cpp.
222 
223  void set_owner(tpie::pipelining::node * n); // Register the pipelining node that owns this sorter (presumably stored in m_owning_node).
224 
225  void set_phase_1_files(memory_size_type f1) { // File budget for phase 1 (run formation).
226  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
227  p.filesPhase1 = f1;
228  }
229 
230  void set_phase_2_files(memory_size_type f2) { // File budget for phase 2 (intermediate merges).
231  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
232  p.filesPhase2 = f2;
233  }
234 
235  void set_phase_3_files(memory_size_type f3) { // File budget for phase 3 (final merge).
236  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
237  p.filesPhase3 = f3;
238  }
239 
240  void set_phase_1_memory(memory_size_type m1) { // Memory budget for phase 1 (run formation).
241  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
242  p.memoryPhase1 = m1;
243  }
244 
245  void set_phase_2_memory(memory_size_type m2) { // Memory budget for phase 2 (intermediate merges).
246  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
247  p.memoryPhase2 = m2;
248  }
249 
250  void set_phase_3_memory(memory_size_type m3) { // Memory budget for phase 3 (final merge).
251  check_not_started(); // Validate before mutating, so a rejected call leaves p unchanged.
252  p.memoryPhase3 = m3;
253  }
254 
255  bool is_calc_free() const { // True when prepare_pull's merge loop has nothing to do: internal reporting, or at most fanout runs.
256  tp_assert(m_state == stMerge, "Wrong phase");
257  return m_reportInternal ? true : (m_finishedRuns <= p.fanout);
258  }
259 
260 
261  memory_size_type minimum_memory_phase_1() noexcept {
262  // Run formation needs, at the very least, room for a single item plus
263  // 2*fanout temp_file handles.
264  // The fanout is not derived from the phase-1 budget, so we plan for the
265  // largest fanout calculate_fanout() yields (unbounded memory), accepting
266  // that we may overshoot. Any overshoot simply becomes a run length
267  // longer than 1, which is probably what the caller wants anyway.
268  sort_parameters minimal((sort_parameters()));
269  minimal.fanout = calculate_fanout(std::numeric_limits<memory_size_type>::max(), 0);
270  minimal.runLength = 1;
271  const memory_size_type required = phase_1_memory(minimal);
272  return required;
273  }
274 
275  memory_size_type minimum_memory_phase_2() noexcept { // Merge memory at the fanout calculate_fanout() picks with no memory/files (presumably its minimum).
276  return m_fanout_memory_usage(calculate_fanout(/*availableMemory=*/0, /*availableFiles=*/0));
277  }
278 
279  memory_size_type minimum_memory_phase_3() noexcept { // The final merge has the same lower bound as the intermediate merges.
280  return minimum_memory_phase_2();
281  }
282 
283  memory_size_type maximum_memory_phase_3() noexcept { // No upper bound on the memory phase 3 may use.
284  return (std::numeric_limits<memory_size_type>::max)();
285  }
286 
287  memory_size_type phase_1_memory(const sort_parameters & params) noexcept { // Run-formation memory for params. NOTE(review): original line 289 is missing from this listing; the cross-reference list mentions bits::run_positions::memory_usage(), presumably that term — confirm.
288  return params.runLength * m_item_size // the run buffer
290  + m_element_file_stream_memory_usage // one file_stream used to write a run
291  + 2*params.fanout*sizeof(temp_file); // the 2*fanout run-file handles (m_runFiles)
292  }
293 
294  memory_size_type phase_2_memory(const sort_parameters & params) noexcept { // Intermediate merges: memory of a params.fanout-way merge.
295  return m_fanout_memory_usage(params.fanout);
296  }
297 
298  memory_size_type phase_3_memory(const sort_parameters & params) noexcept { // Final merge: memory of a params.finalFanout-way merge.
299  return m_fanout_memory_usage(params.finalFanout);
300  }
301 
305  memory_size_type calculate_fanout(memory_size_type availableMemory, memory_size_type availableFiles) noexcept; // Choose a merge fanout for the given memory and file budgets (defined out of line).
306 
307 protected:
308 
312  void calculate_parameters(); // Derive p from the per-phase memory/file budgets; begin() calls this unless m_parametersSet.
313 
314  // Checks if we should still be able to change parameters
315  void check_not_started() {
316  // Parameters are mutable only until begin() moves m_state out of stNotStarted.
317  if (m_state == stNotStarted) return;
318  throw tpie::exception("Can't change parameters after merge sorting has started");
319  }
320 
324  static stream_size_type calculate_run_length(stream_size_type initialRunLength, memory_size_type fanout, memory_size_type mergeLevel) {
325  // Each merge level multiplies the run length by the fanout, so the result
326  // is initialRunLength * fanout^mergeLevel, computed by repeated multiplication.
327  stream_size_type length = initialRunLength;
328  for (memory_size_type level = mergeLevel; level != 0; --level) length *= fanout;
329  return length;
330  }
331 
332  enum state_type { // Sorter lifecycle, advanced by begin(), end() and calc().
333  stNotStarted, // before begin(); parameters may still be changed
334  stRunFormation, // between begin() and end(): push() is accepted
335  stMerge, // after end(), before calc(): merging still pending
336  stReport // after calc(): can_pull()/pull() are accepted
337  };
338 
339  const linear_memory_usage m_fanout_memory_usage; // merge memory as a function of fanout (phases 2 and 3)
340  const memory_size_type m_item_size, m_element_file_stream_memory_usage; // size of one buffered item; memory of one element file_stream
341 
342  std::unique_ptr<memory_bucket> m_bucketPtr;
343  memory_bucket_ref m_bucket; // bucket handed to the run buffer's allocator and to the merger
344 
345  array<temp_file> m_runFiles; // 2*fanout run-file slots, indexed by run_file_index()
346 
347  state_type m_state;
348 
349  sort_parameters p; // active sort parameters (run length, fanouts, per-phase budgets)
350  bool m_parametersSet; // when false, begin() calls calculate_parameters()
351 
352  bits::run_positions m_runPositions; // start positions of the runs stored in m_runFiles
353 
354  // Number of runs already written to disk.
355  // Kept as a stream_size_type so that even 32-bit systems can count more than
356  // 2^32 finished runs; the merge code narrows it to memory_size_type when merging.
357  stream_size_type m_finishedRuns;
358 
359  // Number of items in current run buffer.
360  // Used to index into m_currentRunItems, so memory_size_type.
361  memory_size_type m_currentRunItemCount;
362 
363  bool m_reportInternal; // true: report directly from the in-memory buffer; false: from the final merger
364 
365  // When doing internal reporting: the number of items already reported
366  // Used in comparison with m_currentRunItemCount
367  memory_size_type m_itemsPulled;
368 
369  stream_size_type m_itemCount; // items pushed since begin()
370 
371  stream_size_type m_maxItems; // NOTE(review): not read in this listing; presumably set by set_items()
372 
373  bool m_evacuated; // final merger was torn down by evacuate(); rebuilt on the next can_pull()/pull()
374  bool m_finalMergeInitialized; // initialize_final_merger() has run at least once
375  memory_size_type m_finalMergeLevel; // merge level holding the final merge's input runs
376  memory_size_type m_finalRunCount; // number of runs at that level
377  memory_size_type m_finalMergeSpecialRunNumber; // run number of the pre-merged overflow run, or max() if there is none
378 
379  tpie::pipelining::node * m_owning_node; // raw pointer; presumably set by set_owner()
380 };
381 
382 
395 template <typename T, bool UseProgress, typename pred_t = std::less<T>, typename store_t=default_store> // NOTE(review): line 396, the class head (presumably `class merge_sorter : public merge_sorter_base {`), is missing from this listing.
397 private:
398  typedef typename store_t::template element_type<T>::type TT; // element type the store uses for T
399  typedef typename store_t::template specific<TT> specific_store_t;
400  typedef typename specific_store_t::outer_type outer_type; //Should be the same as T
401  typedef typename specific_store_t::store_type store_type; // in-memory representation held in the run buffer
402  typedef typename specific_store_t::element_type element_type; //Should be the same as TT
403  typedef outer_type item_type; // type accepted by push() and returned by pull()
404  static const size_t item_size = specific_store_t::item_size;
405 public:
406 
407  typedef std::shared_ptr<merge_sorter> ptr; // NOTE(review): `Progress`, used by calc()/prepare_pull(), is declared on line 408, missing here — presumably `typedef progress_types<UseProgress> Progress;`.
409 
409 
410  merge_sorter(pred_t pred = pred_t(), store_t store = store_t()) // Build a sorter from a comparator and a store policy.
411  : merge_sorter_base(fanout_memory_usage(), specific_store_t::item_size, file_stream<element_type>::memory_usage())
412  , m_store(store.template get_specific<element_type>())
413  , m_merger(pred, m_store, m_bucket) // m_store is declared before m_merger, so it is already initialized here
414  , m_currentRunItems(m_bucket)
415  , pred(pred) // the parameter shadows the member; this copies the argument into the member
416  {}
417 
418 
419 public:
423  void begin() { // Phase 1 start: allocate the run buffer and the run-file slots.
424  tp_assert(m_state == stNotStarted, "Merge sorting already begun");
425  if (!m_parametersSet) calculate_parameters(); // derive p unless it was set explicitly
426  log_pipe_debug() << "Start forming input runs" << std::endl;
427  m_currentRunItems = array<store_type>(0, allocator<store_type>(m_bucket)); // buffer allocated through m_bucket
428  m_currentRunItems.resize(static_cast<size_t>(p.runLength));
429  m_runFiles.resize(p.fanout*2); // two merge levels' worth of slots (see run_file_index)
430  m_itemCount = 0;
431  m_finishedRuns = 0;
432  m_currentRunItemCount = 0;
433  m_state = stRunFormation;
434  }
435 
439  void push(item_type && item) { // Phase 1: move one item into the current run buffer.
440  tp_assert(m_state == stRunFormation, "Wrong phase");
441  if (!(m_currentRunItemCount < p.runLength)) { // buffer full: flush it as a sorted run first
442  sort_current_run();
443  empty_current_run(); // resets m_currentRunItemCount to 0
444  }
445  m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(std::move(item));
446  m_currentRunItemCount += 1;
447  m_itemCount += 1;
448  }
449 
450  void push(const item_type & item) { // Phase 1: copy one item into the current run buffer.
451  tp_assert(m_state == stRunFormation, "Wrong phase");
452  if (!(m_currentRunItemCount < p.runLength)) { // buffer full: flush it as a sorted run first
453  sort_current_run();
454  empty_current_run(); // resets m_currentRunItemCount to 0
455  }
456  m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(item);
457  m_currentRunItemCount += 1;
458  m_itemCount += 1;
459  }
460 
464  void end() { // End phase 1: sort the last run, then choose internal (in-memory) or external reporting.
465  tp_assert(m_state == stRunFormation, "Wrong phase");
466  sort_current_run(); // sorts only the first m_currentRunItemCount buffered items
467 
468  if (m_itemCount == 0) {
469  tp_assert(m_currentRunItemCount == 0, "m_itemCount == 0, but m_currentRunItemCount != 0");
470  m_reportInternal = true;
471  m_itemsPulled = 0;
472  m_currentRunItems.resize(0); // nothing to report; release the buffer
473  log_debug() << "Got no items. Internal reporting mode." << std::endl;
474  } else if (m_finishedRuns == 0 && m_currentRunItems.size() <= p.internalReportThreshold) { // size() is the full buffer (runLength since begin()), not the item count
475  // Our current buffer fits within the memory requirements of phase 2.
476  m_reportInternal = true;
477  m_itemsPulled = 0;
478  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode." << std::endl;
479 
480  } else if (m_finishedRuns == 0
481  && m_currentRunItemCount <= p.internalReportThreshold
482  && array<store_type>::memory_usage(m_currentRunItemCount) <= get_memory_manager().available()) {
483  // Our current buffer does not fit within the memory requirements
484  // of phase 2, but we have enough temporary memory to copy and
485  // resize the buffer.
486 
487  array<store_type> currentRun(m_currentRunItemCount); // exact-size copy; NOTE(review): unlike begin(), not built with the m_bucket allocator
488  for (size_t i=0; i < m_currentRunItemCount; ++i)
489  currentRun[i] = std::move(m_currentRunItems[i]);
490  m_currentRunItems.swap(currentRun); // the old full-size buffer now lives in currentRun and is released with it
491 
492  m_reportInternal = true;
493  m_itemsPulled = 0;
494  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode "
495  << "after resizing item buffer." << std::endl;
496 
497  } else {
498  m_reportInternal = false;
499  empty_current_run(); // write the last (partial) run to disk
500  m_currentRunItems.resize(0);
501  log_debug() << "Got " << m_finishedRuns << " runs. External reporting mode." << std::endl;
502  }
503  m_state = stMerge;
504  }
505 
506 
511  void calc(typename Progress::base & pi) { // Phase 2: run every merge except the final one.
512  tp_assert(m_state == stMerge, "Wrong phase");
513  if (m_reportInternal) { // everything is already in memory; report one trivial step
514  pi.init(1);
515  pi.step();
516  pi.done();
517  } else {
518  prepare_pull(pi);
519  }
520  m_state = stReport;
521  }
522 
523  void evacuate() { // Release merge/report memory between phases; buffered data is spilled to disk where needed.
524  tp_assert(m_state == stMerge || m_state == stReport, "Wrong phase");
525  if (m_reportInternal) { // NOTE(review): spills every buffered item, including already-pulled ones; evacuate_before_reporting() only calls this when m_itemsPulled == 0
526  log_pipe_debug() << "Evacuate merge_sorter (" << this << ") in internal reporting mode" << std::endl;
527  m_reportInternal = false; // from here on, report through the final merger
528  memory_size_type runCount = (m_currentRunItemCount > 0) ? 1 : 0;
529  empty_current_run(); // write the in-memory items as a single run on disk
530  m_currentRunItems.resize(0);
531  initialize_final_merger(0, runCount); // then fall through and tear the merger down again below
532  } else if (m_state == stMerge) {
533  log_pipe_debug() << "Evacuate merge_sorter (" << this << ") before merge in external reporting mode (noop)" << std::endl; // NOTE(review): message says noop, but run positions are still evacuated
534  m_runPositions.evacuate();
535  return;
536  }
537  log_pipe_debug() << "Evacuate merge_sorter (" << this << ") before reporting in external reporting mode" << std::endl;
538  m_merger.reset(); // drop merger state; can_pull()/pull() rebuild it when m_evacuated is set
539  m_evacuated = true;
540  m_runPositions.evacuate();
541  }
542 
543  void evacuate_before_merging() { // Evacuate only while waiting for calc() (state stMerge).
544  if (m_state == stMerge) { evacuate(); }
545  }
546 
547  void evacuate_before_reporting() { // After calc(): evacuate unless internal reporting has already handed out items.
548  if (m_state == stReport && !(m_reportInternal && m_itemsPulled != 0)) evacuate();
549  }
550 
551 private:
553  // Phase 1 helpers.
555 
556  void sort_current_run() { // Sort the filled prefix [0, m_currentRunItemCount) of the run buffer.
557  auto first = m_currentRunItems.begin();
558  parallel_sort(first, first + m_currentRunItemCount, bits::store_pred<pred_t, specific_store_t>(pred));
559  }
560 
561  // Write the buffered items as run #m_finishedRuns of merge level 0. Postcondition: m_currentRunItemCount == 0.
562  void empty_current_run() {
563  if (m_finishedRuns <= 10) { // log the first ten runs, then a single ellipsis
564  if (m_finishedRuns == 10) log_pipe_debug() << "..." << std::endl;
565  else log_pipe_debug() << "Write " << m_currentRunItemCount << " items to run file " << m_finishedRuns << std::endl;
566  }
567  file_stream<element_type> runFile;
568  open_run_file_write(runFile, 0, m_finishedRuns);
569  for (memory_size_type idx = 0; idx != m_currentRunItemCount; ++idx)
570  runFile.write(m_store.store_to_element(std::move(m_currentRunItems[idx])));
571  m_currentRunItemCount = 0;
572  ++m_finishedRuns;
573  }
574 
579  void initialize_merger(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount) {
580  // Prepare m_merger to merge runs [runNumber, runNumber+runCount) of mergeLevel.
581  // runCount is a memory_size_type because that many file_streams are open at once.
582  array<file_stream<element_type> > inputs(runCount);
583  memory_size_type k = 0;
584  while (k < runCount) {
585  open_run_file_read(inputs[k], mergeLevel, runNumber + k); // positioned at the run's first item
586  ++k;
587  }
588  const stream_size_type lengthAtLevel = calculate_run_length(p.runLength, p.fanout, mergeLevel);
589  // The merger takes the positioned streams plus this level's run length.
590  m_merger.reset(inputs, lengthAtLevel);
591  }
592 
596  void initialize_final_merger(memory_size_type finalMergeLevel, memory_size_type runCount) { // Set up the final merge; the first call records its layout, later calls only rebuild it.
597  if (m_finalMergeInitialized) {
598  reinitialize_final_merger();
599  return;
600  }
601 
602  m_finalMergeInitialized = true;
603  m_finalMergeLevel = finalMergeLevel;
604  m_finalRunCount = runCount;
605  m_runPositions.next_level();
606  m_runPositions.final_level(p.fanout);
607  if (runCount > p.finalFanout) { // too many runs for one final merge: pre-merge the tail into a single "special" run
608  log_pipe_debug() << "Run count in final level (" << runCount << ") is greater than the final fanout (" << p.finalFanout << ")\n";
609 
610  memory_size_type i = p.finalFanout-1; // runs [0, finalFanout-1) are kept as they are...
611  memory_size_type n = runCount-i; // ...and the remaining n runs are merged into one
612  log_pipe_debug() << "Merge " << n << " runs starting from #" << i << std::endl;
613  dummy_progress_indicator pi; // this extra merge is not reported to the caller's progress indicator
614  m_finalMergeSpecialRunNumber = merge_runs(finalMergeLevel, i, n, pi); // output is written at level finalMergeLevel+1
615  } else {
616  log_pipe_debug() << "Run count in final level (" << runCount << ") is less or equal to the final fanout (" << p.finalFanout << ")" << std::endl;
617  m_finalMergeSpecialRunNumber = std::numeric_limits<memory_size_type>::max(); // sentinel: no special run
618  }
619  reinitialize_final_merger();
620  }
621 
622 public:
623  void reinitialize_final_merger() { // (Re)build m_merger for the final merge from the layout saved by initialize_final_merger().
624  tp_assert(m_finalMergeInitialized, "reinitialize_final_merger while !m_finalMergeInitialized");
625  m_runPositions.unevacuate();
626  if (m_finalMergeSpecialRunNumber != std::numeric_limits<memory_size_type>::max()) { // a pre-merged special run exists
627  array<file_stream<element_type> > in(p.finalFanout);
628  for (memory_size_type i = 0; i < p.finalFanout-1; ++i) { // the first finalFanout-1 runs of the final level
629  open_run_file_read(in[i], m_finalMergeLevel, i);
630  log_pipe_debug() << "Run " << i << " is at offset " << in[i].offset() << " and has size " << in[i].size() << std::endl;
631  }
632  open_run_file_read(in[p.finalFanout-1], m_finalMergeLevel+1, m_finalMergeSpecialRunNumber); // plus the special run, one level up
633  log_debug() << "Special large run is at offset " << in[p.finalFanout-1].offset() << " and has size " << in[p.finalFanout-1].size() << std::endl;
634  stream_size_type runLength = calculate_run_length(p.runLength, p.fanout, m_finalMergeLevel+1); // NOTE(review): one run length (that of level+1) is passed for all inputs
635  log_pipe_debug() << "Run length " << runLength << std::endl;
636  m_merger.reset(in, runLength);
637  } else {
638  initialize_merger(m_finalMergeLevel, 0, m_finalRunCount); // plain case: merge every run of the final level
639  }
640  m_evacuated = false;
641  }
642 
643 private:
649  template <typename ProgressIndicator>
650  memory_size_type merge_runs(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount, ProgressIndicator & pi) {
651  initialize_merger(mergeLevel, runNumber, runCount); // merges runs [runNumber, runNumber+runCount) of mergeLevel
652  const memory_size_type target = runNumber/p.fanout; // run number of the merged output in level mergeLevel+1
653  file_stream<element_type> out;
654  open_run_file_write(out, mergeLevel+1, target);
655  while (m_merger.can_pull()) { // one progress step per item written
656  pi.step();
657  out.write(m_store.store_to_element(m_merger.pull()));
658  }
659  return target;
660  }
661 
665  void prepare_pull(typename Progress::base & pi) { // Phase 2 work: merge level by level until at most fanout runs remain, then set up the final merge.
666  m_runPositions.unevacuate();
667 
668  // Count exactly the passes the loop below makes; each pass steps pi once per item (see merge_runs).
669  stream_size_type passes = 0;
670  for (memory_size_type runs = m_finishedRuns; runs > p.fanout; runs = (runs + p.fanout - 1) / p.fanout) ++passes;
671  pi.init(item_count() * std::max<stream_size_type>(passes, 1)); // integer count (no float log); range stays non-empty when no pass is needed
672 
673  memory_size_type mergeLevel = 0;
674  memory_size_type runCount = m_finishedRuns;
675  while (runCount > p.fanout) {
676  log_pipe_debug() << "Merge " << runCount << " runs in merge level " << mergeLevel << '\n';
677  m_runPositions.next_level();
678  memory_size_type newRunCount = 0;
679  for (memory_size_type i = 0; i < runCount; i += p.fanout) { // ceil(runCount / fanout) merges per level
680  memory_size_type n = std::min(runCount-i, p.fanout);
681 
682  if (newRunCount < 10)
683  log_pipe_debug() << "Merge " << n << " runs starting from #" << i << std::endl;
684  else if (newRunCount == 10)
685  log_pipe_debug() << "..." << std::endl;
686 
687  merge_runs(mergeLevel, i, n, pi);
688  ++newRunCount;
689  }
690  ++mergeLevel;
691  runCount = newRunCount;
692  }
693  log_pipe_debug() << "Final merge level " << mergeLevel << " has " << runCount << " runs" << std::endl;
694  initialize_final_merger(mergeLevel, runCount);
695 
696  m_state = stReport;
697  pi.done();
698  }
699 
700 public:
705  bool can_pull() { // Phase 3: are there more items to pull?
706  tp_assert(m_state == stReport, "Wrong phase");
707  if (m_reportInternal)
708  return m_itemsPulled < m_currentRunItemCount; // internal: items left in the buffer
709  // External: lazily rebuild the merger if evacuate() tore it down.
710  if (m_evacuated) reinitialize_final_merger();
711  return m_merger.can_pull();
712  }
713 
717  item_type pull() { // Phase 3: return the next item in sorted order.
718  tp_assert(m_state == stReport, "Wrong phase");
719  if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
720  store_type el = std::move(m_currentRunItems[m_itemsPulled++]);
721  if (!can_pull()) m_currentRunItems.resize(0); // free the buffer as soon as the last item is taken
722  return m_store.store_to_outer(std::move(el));
723  } else { // NOTE(review): an internal-mode pull past the end also lands here and pulls from m_merger
724  if (m_evacuated) reinitialize_final_merger(); // rebuild the merger torn down by evacuate()
725  m_runPositions.close(); // NOTE(review): runs on every external pull (presumably idempotent); reinitialize_final_merger() reads positions if evacuated later — confirm
726  return m_store.store_to_outer(m_merger.pull());
727  }
728  }
729 
730 
731  memory_size_type actual_memory_phase_3() { // Memory actually held during phase 3 in the current reporting mode.
732  tp_assert(m_state == stReport, "Wrong phase");
733  if (!m_reportInternal) // external: charged as a merger over the final runs
734  return fanout_memory_usage(m_finalRunCount);
735  // Internal: the run-file slots plus the in-memory item buffer.
736  return m_runFiles.memory_usage(m_runFiles.size())
737  + m_currentRunItems.memory_usage(m_currentRunItems.size());
738  }
739 
740 private:
741 
745  static constexpr linear_memory_usage fanout_memory_usage() noexcept { // Merge memory as a linear function of the fanout. NOTE(review): line 747 is missing from this listing; the cross-reference list mentions bits::run_positions::memory_usage(), presumably that term — confirm.
746  return merger<specific_store_t, pred_t>::memory_usage()
748  + file_stream<element_type>::memory_usage() // output stream
749  + 2*sizeof(temp_file); // merge_sorter::m_runFiles
750  }
751 
752  static constexpr memory_size_type fanout_memory_usage(memory_size_type fanout) noexcept { // Evaluate the linear merge-memory model at a concrete fanout.
753  return fanout_memory_usage()(fanout);
754  }
755 
756  private:
762  memory_size_type run_file_index(memory_size_type mergeLevel, memory_size_type runNumber) {
763  // runNumber is a memory_size_type since it is used as an index into m_runFiles.
764  // Even levels use slots [0, fanout), odd levels [fanout, 2*fanout); run numbers wrap modulo fanout.
765  const memory_size_type levelOffset = (mergeLevel % 2) * p.fanout;
766  return levelOffset + runNumber % p.fanout;
767  }
768 
772  void open_run_file_write(file_stream<element_type> & fs, memory_size_type mergeLevel, memory_size_type runNumber) { // Open run (mergeLevel, runNumber) for appending and record where it starts.
773  // see run_file_index comment about runNumber
774 
775  memory_size_type idx = run_file_index(mergeLevel, runNumber);
776  if (runNumber < p.fanout) m_runFiles[idx].free(); // first run in this slot at this level: the slot last held data from mergeLevel-2, presumably discarded by free()
777  fs.open(m_runFiles[idx], access_read_write, 0, access_sequential, compression_normal);
778  fs.seek(0, file_stream_base::end); // later runs sharing the slot are appended after earlier ones
779  m_runPositions.set_position(mergeLevel, runNumber, fs.get_position()); // remember where this run begins
780  }
781 
785  void open_run_file_read(file_stream<element_type> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
786  // Open the run's slot read-only and seek to where that run starts (runNumber indexes m_runFiles via run_file_index).
787  temp_file & source = m_runFiles[run_file_index(mergeLevel, runNumber)];
788  fs.open(source, access_read, 0, access_sequential, compression_normal);
789  const stream_position start = m_runPositions.get_position(mergeLevel, runNumber);
790  fs.set_position(start);
791  }
792 
793  specific_store_t m_store; // store policy specialised for element_type
794  merger<specific_store_t, pred_t> m_merger; // merger used for intermediate and final merges
795 
796  // Current run buffer: size 0 before begin(), runLength after begin(); end() may shrink it to the item count or to 0.
797  array<store_type> m_currentRunItems;
798 
799  pred_t pred; // item comparator (wrapped in bits::store_pred when sorting runs)
800 };
801 
802 } // namespace tpie
803 
804 #endif // __TPIE_PIPELINING_MERGE_SORTER_H__
array_view.h
tpie::progress_indicator_base
The base class for indicating the progress of some task.
Definition: progress_indicator_base.h:62
tpie::merge_sorter::begin
void begin()
Initiate phase 1: Formation of input runs.
Definition: merge_sorter.h:423
tpie::get_memory_manager
TPIE_EXPORT memory_manager & get_memory_manager()
Return a reference to the memory manager.
tpie::bits::run_positions::memory_usage
static memory_size_type memory_usage() noexcept
Memory usage when open and not evacuated.
tpie::linear_memory_usage
Definition: util.h:62
tpie::bits::run_positions::unevacuate
void unevacuate()
Switch from any state to the corresponding non-evacuated state.
tpie::parallel_sort
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
Definition: parallel_sort.h:294
stream.h
tpie::open
Definition: stream.h:38
tpie::array
A generic array with a fixed size.
Definition: array.h:149
tpie::merge_sorter
Merge sorting consists of three phases.
Definition: merge_sorter.h:396
tpie::merge_sorter::end
void end()
End phase 1.
Definition: merge_sorter.h:464
tpie::array::begin
iterator begin()
Return an iterator to the beginning of the array.
Definition: array.h:312
tpie::merge_sorter::pull
item_type pull()
In phase 3, fetch next item in the final merge phase.
Definition: merge_sorter.h:717
tpie::file_stream
Compressed stream.
Definition: predeclare.h:49
tpie::sort_parameters::fanout
memory_size_type fanout
Fanout of merge tree during phase 2.
Definition: sort_parameters.h:51
tpie::merge_sorter_base::set_available_memory
void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3)
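Set the memory available to each of the three phases; the sort parameters are derived from these budgets when sorting begins, unless they were set explicitly.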
Definition: merge_sorter.h:196
tpie::stream_position
POD object indicating the position of an item in a stream.
Definition: stream_position.h:72
tpie::merge_sorter_base::set_available_files
void set_available_files(memory_size_type f)
Set the number of files available to all phases; the sort parameters are derived from it when sorting begins, unless they were set explicitly.
Definition: merge_sorter.h:163
tpie::merge_sorter::calc
void calc(typename Progress::base &pi)
Perform phase 2: Performing all merges in the merge tree except the last one.
Definition: merge_sorter.h:511
tpie::sort_parameters::finalFanout
memory_size_type finalFanout
Fanout of merge tree during phase 3.
Definition: sort_parameters.h:53
parallel_sort.h
tpie::exception
Definition: exception.h:33
tpie::merge_sorter_base::set_available_files
void set_available_files(memory_size_type f1, memory_size_type f2, memory_size_type f3)
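Set the number of files available to each of the three phases; the sort parameters are derived from these budgets when sorting begins, unless they were set explicitly.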
Definition: merge_sorter.h:174
tpie::compression_normal
@ compression_normal
Compress some blocks according to available resources (time, memory).
Definition: scheme.h:40
tpie::merge_sorter_base::set_available_memory
void set_available_memory(memory_size_type m)
Set the memory available to all phases; the sort parameters are derived from it when sorting begins, unless they were set explicitly.
Definition: merge_sorter.h:185
tp_assert
#define tp_assert(condition, message)
Definition: tpie_assert.h:65
tpie::log_pipe_debug
logstream & log_pipe_debug()
Return logstream for writing pipe_debug log messages.
Definition: tpie_log.h:178
tpie::bits::run_positions::final_level
void final_level(memory_size_type fanout)
Set this to be the final level in the merge heap - see class docstring.
tpie::merge_sorter::push
void push(item_type &&item)
Push item to merge sorter during phase 1.
Definition: merge_sorter.h:439
tpie::progress_indicator_base::step
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Definition: progress_indicator_base.h:91
tpie::bits::run_positions
Class to maintain the positions where sorted runs start.
Definition: merge_sorter.h:62
tpie::log_debug
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:168
tpie::bits::run_positions::set_position
void set_position(memory_size_type mergeLevel, memory_size_type runNumber, stream_position pos)
Store a stream position - see class docstring.
tpie::linear_memory_base::memory_usage
static constexpr memory_size_type memory_usage(memory_size_type size) noexcept
Return the number of bytes required to create a data structure supporting a given number of elements.
Definition: util.h:106
tpie::merge_sorter_base
Definition: merge_sorter.h:138
tpie::array::swap
void swap(array &other)
Swap two arrays.
Definition: array.h:504
tpie::sort_parameters::internalReportThreshold
memory_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
Definition: sort_parameters.h:49
tpie::progress_indicator_base::init
virtual void init(stream_size_type range=0)
Initialize progress indicator.
Definition: progress_indicator_base.h:121
tpie::access_sequential
@ access_sequential
Sequential access is intended.
Definition: cache_hint.h:36
dummy_progress.h
tpie::array::resize
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:490
tpie::sort_parameters::runLength
memory_size_type runLength
Run length, subject to memory restrictions during phase 2.
Definition: sort_parameters.h:46
tpie::bits::run_positions::close
void close()
Switch from any state to closed state.
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::bits::run_positions::next_level
void next_level()
Go to next level in the merge heap - see class docstring.
tpie::bits::run_positions::get_position
stream_position get_position(memory_size_type mergeLevel, memory_size_type runNumber)
Fetch a stream position - see class docstring.
tpie::merge_sorter::can_pull
bool can_pull()
In phase 3, return true if there are more items in the final merge phase.
Definition: merge_sorter.h:705
tpie::temp_file
Class representing a reference to a temporary file.
Definition: tempname.h:201
tpie::access_read_write
@ access_read_write
Open a file for reading or writing.
Definition: access_type.h:35
tpie::progress_indicator_base::done
virtual void done()
Advance the indicator to the end.
Definition: progress_indicator_base.h:130
tpie::progress_types
For applications where you wish to disable progress indicators via a template parameter,...
Definition: dummy_progress.h:127
tpie::merge_sorter_base::calculate_parameters
void calculate_parameters()
Calculate parameters from given memory amount.
tpie::bits::run_positions::evacuate
void evacuate()
Switch from any state to the corresponding evacuated state.
tpie::merge_sorter_base::calculate_run_length
static stream_size_type calculate_run_length(stream_size_type initialRunLength, memory_size_type fanout, memory_size_type mergeLevel)
Compute the run length at a given merge level (initialRunLength * fanout^mergeLevel); helper for initialize_merger.
Definition: merge_sorter.h:324
tpie::allocator< store_type >
tpie
Definition: access_type.h:26
tpie::array::size
size_type size() const
Return the size of the array.
Definition: array.h:531
tpie::access_read
@ access_read
Open a file for reading.
Definition: access_type.h:31