TPIE

11a2c2d
tpie::merge_sorter< T, UseProgress, pred_t, store_t > Class Template Reference

Merge sorting consists of three phases. More...

#include <tpie/pipelining/merge_sorter.h>

Inherits tpie::merge_sorter_base.

Public Types

typedef std::shared_ptr< merge_sorter > ptr
 
typedef progress_types< UseProgress > Progress
 

Public Member Functions

 merge_sorter (pred_t pred=pred_t(), store_t store=store_t())
 
void begin ()
 Initiate phase 1: Formation of input runs. More...
 
void push (item_type &&item)
 Push item to merge sorter during phase 1. More...
 
void push (const item_type &item)
 
void end ()
 End phase 1. More...
 
void calc (typename Progress::base &pi)
 Perform phase 2: perform all merges in the merge tree except the last one. More...
 
void evacuate ()
 
void evacuate_before_merging ()
 
void evacuate_before_reporting ()
 
void reinitialize_final_merger ()
 
bool can_pull ()
 In phase 3, return true if there are more items in the final merge phase. More...
 
item_type pull ()
 In phase 3, fetch next item in the final merge phase. More...
 
memory_size_type actual_memory_phase_3 ()
 
void set_parameters (memory_size_type runLength, memory_size_type fanout)
 Enable setting run length and fanout manually (for testing purposes). More...
 
void set_available_files (memory_size_type f)
 Calculate parameters from given amount of files. More...
 
void set_available_files (memory_size_type f1, memory_size_type f2, memory_size_type f3)
 Calculate parameters from given amount of files. More...
 
void set_available_memory (memory_size_type m)
 Calculate parameters from given memory amount. More...
 
void set_available_memory (memory_size_type m1, memory_size_type m2, memory_size_type m3)
 Calculate parameters from given memory amount. More...
 
stream_size_type item_count ()
 
memory_size_type evacuated_memory_usage () const
 
void set_items (stream_size_type n)
 Set upper bound on number of items pushed. More...
 
void set_owner (tpie::pipelining::node *n)
 
void set_phase_1_files (memory_size_type f1)
 
void set_phase_2_files (memory_size_type f2)
 
void set_phase_3_files (memory_size_type f3)
 
void set_phase_1_memory (memory_size_type m1)
 
void set_phase_2_memory (memory_size_type m2)
 
void set_phase_3_memory (memory_size_type m3)
 
bool is_calc_free () const
 
memory_size_type minimum_memory_phase_1 () noexcept
 
memory_size_type minimum_memory_phase_2 () noexcept
 
memory_size_type minimum_memory_phase_3 () noexcept
 
memory_size_type maximum_memory_phase_3 () noexcept
 
memory_size_type phase_1_memory (const sort_parameters &params) noexcept
 
memory_size_type phase_2_memory (const sort_parameters &params) noexcept
 
memory_size_type phase_3_memory (const sort_parameters &params) noexcept
 
memory_size_type calculate_fanout (memory_size_type availableMemory, memory_size_type availableFiles) noexcept
 calculate_parameters helper More...
 

Static Public Attributes

static const memory_size_type defaultFiles = 253
 
static const memory_size_type minimumFilesPhase1 = 1
 
static const memory_size_type maximumFilesPhase1 = 1
 
static const memory_size_type minimumFilesPhase2 = 5
 
static const memory_size_type maximumFilesPhase2 = std::numeric_limits<memory_size_type>::max()
 
static const memory_size_type minimumFilesPhase3 = 5
 
static const memory_size_type maximumFilesPhase3 = std::numeric_limits<memory_size_type>::max()
 

Protected Types

enum  state_type { stNotStarted, stRunFormation, stMerge, stReport }
 

Protected Member Functions

void calculate_parameters ()
 Calculate parameters from given memory amount. More...
 
void check_not_started ()
 

Static Protected Member Functions

static stream_size_type calculate_run_length (stream_size_type initialRunLength, memory_size_type fanout, memory_size_type mergeLevel)
 initialize_merger helper. More...
 

Protected Attributes

const linear_memory_usage m_fanout_memory_usage
 
const memory_size_type m_item_size
 
const memory_size_type m_element_file_stream_memory_usage
 
std::unique_ptr< memory_bucket > m_bucketPtr
 
memory_bucket_ref m_bucket
 
array< temp_file > m_runFiles
 
state_type m_state
 
sort_parameters p
 
bool m_parametersSet
 
bits::run_positions m_runPositions
 
stream_size_type m_finishedRuns
 
memory_size_type m_currentRunItemCount
 
bool m_reportInternal
 
memory_size_type m_itemsPulled
 
stream_size_type m_itemCount
 
stream_size_type m_maxItems
 
bool m_evacuated
 
bool m_finalMergeInitialized
 
memory_size_type m_finalMergeLevel
 
memory_size_type m_finalRunCount
 
memory_size_type m_finalMergeSpecialRunNumber
 
tpie::pipelining::node * m_owning_node
 

Detailed Description

template<typename T, bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
class tpie::merge_sorter< T, UseProgress, pred_t, store_t >

Merge sorting consists of three phases.

  1. Sorting and forming runs
  2. Merging runs
  3. Final merge and report

If the number of elements received during phase 1 is less than the length of a single run, we are in "report internal" mode, meaning we do not write anything to disk. This causes phase 2 to be a no-op and phase 3 to be a simple array traversal.

Definition at line 396 of file merge_sorter.h.

Member Function Documentation

◆ begin()

template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::begin ( )
inline

Initiate phase 1: Formation of input runs.

Definition at line 423 of file merge_sorter.h.

423  {
424  tp_assert(m_state == stNotStarted, "Merge sorting already begun");
425  if (!m_parametersSet) calculate_parameters();
426  log_pipe_debug() << "Start forming input runs" << std::endl;
427  m_currentRunItems = array<store_type>(0, allocator<store_type>(m_bucket));
428  m_currentRunItems.resize((size_t)p.runLength);
429  m_runFiles.resize(p.fanout*2);
430  m_currentRunItemCount = 0;
431  m_finishedRuns = 0;
432  m_state = stRunFormation;
433  m_itemCount = 0;
434  }

References tpie::merge_sorter_base::calculate_parameters(), tpie::sort_parameters::fanout, tpie::log_pipe_debug(), tpie::array< T, Allocator >::resize(), tpie::sort_parameters::runLength, and tp_assert.

◆ calc()

template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::calc ( typename Progress::base & pi)
inline

Perform phase 2: perform all merges in the merge tree except the last one.

Definition at line 511 of file merge_sorter.h.

511  {
512  tp_assert(m_state == stMerge, "Wrong phase");
513  if (!m_reportInternal) {
514  prepare_pull(pi);
515  } else {
516  pi.init(1);
517  pi.step();
518  pi.done();
519  }
520  m_state = stReport;
521  }

References tpie::progress_indicator_base::done(), tpie::progress_indicator_base::init(), tpie::progress_indicator_base::step(), and tp_assert.

◆ calculate_fanout()

memory_size_type tpie::merge_sorter_base::calculate_fanout ( memory_size_type  availableMemory,
memory_size_type  availableFiles 
)
noexcept inherited

calculate_parameters helper

◆ calculate_parameters()

void tpie::merge_sorter_base::calculate_parameters ( )
protected inherited

Calculate parameters from given memory amount.

Referenced by tpie::merge_sorter< T, UseProgress, pred_t, store_t >::begin().

◆ calculate_run_length()

static stream_size_type tpie::merge_sorter_base::calculate_run_length ( stream_size_type  initialRunLength,
memory_size_type  fanout,
memory_size_type  mergeLevel 
)
inline static protected inherited

initialize_merger helper.

Definition at line 324 of file merge_sorter.h.

324  {
325  stream_size_type runLength = initialRunLength;
326  for (memory_size_type i = 0; i < mergeLevel; ++i) {
327  runLength *= fanout;
328  }
329  return runLength;
330  }

◆ can_pull()

template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
bool tpie::merge_sorter< T, UseProgress, pred_t, store_t >::can_pull ( )
inline

In phase 3, return true if there are more items in the final merge phase.

Definition at line 705 of file merge_sorter.h.

705  {
706  tp_assert(m_state == stReport, "Wrong phase");
707  if (m_reportInternal) return m_itemsPulled < m_currentRunItemCount;
708  else {
709  if (m_evacuated) reinitialize_final_merger();
710  return m_merger.can_pull();
711  }
712  }

References tp_assert.

Referenced by tpie::merge_sorter< T, UseProgress, pred_t, store_t >::pull().

◆ end()

template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::end ( )
inline

End phase 1.

Definition at line 464 of file merge_sorter.h.

464  {
465  tp_assert(m_state == stRunFormation, "Wrong phase");
466  sort_current_run();
467 
468  if (m_itemCount == 0) {
469  tp_assert(m_currentRunItemCount == 0, "m_itemCount == 0, but m_currentRunItemCount != 0");
470  m_reportInternal = true;
471  m_itemsPulled = 0;
472  m_currentRunItems.resize(0);
473  log_debug() << "Got no items. Internal reporting mode." << std::endl;
474  } else if (m_finishedRuns == 0 && m_currentRunItems.size() <= p.internalReportThreshold) {
475  // Our current buffer fits within the memory requirements of phase 2.
476  m_reportInternal = true;
477  m_itemsPulled = 0;
478  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode." << std::endl;
479 
480  } else if (m_finishedRuns == 0
481  && m_currentRunItemCount <= p.internalReportThreshold
482  && array<store_type>::memory_usage(m_currentRunItemCount) <= get_memory_manager().available()) {
483  // Our current buffer does not fit within the memory requirements
484  // of phase 2, but we have enough temporary memory to copy and
485  // resize the buffer.
486 
487  array<store_type> currentRun(m_currentRunItemCount);
488  for (size_t i=0; i < m_currentRunItemCount; ++i)
489  currentRun[i] = std::move(m_currentRunItems[i]);
490  m_currentRunItems.swap(currentRun);
491 
492  m_reportInternal = true;
493  m_itemsPulled = 0;
494  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode "
495  << "after resizing item buffer." << std::endl;
496 
497  } else {
498  m_reportInternal = false;
499  empty_current_run();
500  m_currentRunItems.resize(0);
501  log_debug() << "Got " << m_finishedRuns << " runs. External reporting mode." << std::endl;
502  }
503  m_state = stMerge;
504  }

References tpie::get_memory_manager(), tpie::sort_parameters::internalReportThreshold, tpie::log_debug(), tpie::array< T, Allocator >::resize(), tpie::array< T, Allocator >::size(), tpie::array< T, Allocator >::swap(), and tp_assert.

◆ pull()

template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
item_type tpie::merge_sorter< T, UseProgress, pred_t, store_t >::pull ( )
inline

In phase 3, fetch next item in the final merge phase.

Definition at line 717 of file merge_sorter.h.

717  {
718  tp_assert(m_state == stReport, "Wrong phase");
719  if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
720  store_type el = std::move(m_currentRunItems[m_itemsPulled++]);
721  if (!can_pull()) m_currentRunItems.resize(0);
722  return m_store.store_to_outer(std::move(el));
723  } else {
724  if (m_evacuated) reinitialize_final_merger();
725  m_runPositions.close();
726  return m_store.store_to_outer(m_merger.pull());
727  }
728  }

References tpie::merge_sorter< T, UseProgress, pred_t, store_t >::can_pull(), tpie::bits::run_positions::close(), tpie::array< T, Allocator >::resize(), and tp_assert.

◆ push()

template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::push ( item_type &&  item)
inline

Push item to merge sorter during phase 1.

Definition at line 439 of file merge_sorter.h.

439  {
440  tp_assert(m_state == stRunFormation, "Wrong phase");
441  if (m_currentRunItemCount >= p.runLength) {
442  sort_current_run();
443  empty_current_run();
444  }
445  m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(std::move(item));
446  ++m_currentRunItemCount;
447  ++m_itemCount;
448  }

References tpie::sort_parameters::runLength, and tp_assert.

◆ set_available_files() [1/2]

void tpie::merge_sorter_base::set_available_files ( memory_size_type  f)
inline inherited

Calculate parameters from given amount of files.

Parameters
f: Files available for phases 1, 2 and 3

Definition at line 163 of file merge_sorter.h.

163  {
164  p.filesPhase1 = p.filesPhase2 = p.filesPhase3 = f;
165  check_not_started();
166  }

◆ set_available_files() [2/2]

void tpie::merge_sorter_base::set_available_files ( memory_size_type  f1,
memory_size_type  f2,
memory_size_type  f3 
)
inline inherited

Calculate parameters from given amount of files.

Parameters
f1: Files available for phase 1
f2: Files available for phase 2
f3: Files available for phase 3

Definition at line 174 of file merge_sorter.h.

174  {
175  p.filesPhase1 = f1;
176  p.filesPhase2 = f2;
177  p.filesPhase3 = f3;
178  check_not_started();
179  }

◆ set_available_memory() [1/2]

void tpie::merge_sorter_base::set_available_memory ( memory_size_type  m)
inline inherited

Calculate parameters from given memory amount.

Parameters
m: Memory available for phases 1, 2 and 3

Definition at line 185 of file merge_sorter.h.

185  {
186  p.memoryPhase1 = p.memoryPhase2 = p.memoryPhase3 = m;
187  check_not_started();
188  }

◆ set_available_memory() [2/2]

void tpie::merge_sorter_base::set_available_memory ( memory_size_type  m1,
memory_size_type  m2,
memory_size_type  m3 
)
inline inherited

Calculate parameters from given memory amount.

Parameters
m1: Memory available for phase 1
m2: Memory available for phase 2
m3: Memory available for phase 3

Definition at line 196 of file merge_sorter.h.

196  {
197  p.memoryPhase1 = m1;
198  p.memoryPhase2 = m2;
199  p.memoryPhase3 = m3;
200  check_not_started();
201  }

◆ set_items()

void tpie::merge_sorter_base::set_items ( stream_size_type  n)
inherited

Set upper bound on number of items pushed.

If the number of items to push is less than the length of a single run, this method decreases the run length to that number of items. This may make it easier for the sorter to go into internal reporting mode.

◆ set_parameters()

void tpie::merge_sorter_base::set_parameters ( memory_size_type  runLength,
memory_size_type  fanout 
)
inherited

Enable setting run length and fanout manually (for testing purposes).


The documentation for this class was generated from the following file:
tpie/pipelining/merge_sorter.h
tpie::sort_parameters::memoryPhase1
memory_size_type memoryPhase1
memory available while forming sorted runs.
Definition: sort_parameters.h:32
tpie::sort_parameters::filesPhase1
memory_size_type filesPhase1
files available while forming sorted runs.
Definition: sort_parameters.h:30
tpie::get_memory_manager
TPIE_EXPORT memory_manager & get_memory_manager()
Return a reference to the memory manager.
tpie::sort_parameters::fanout
memory_size_type fanout
Fanout of merge tree during phase 2.
Definition: sort_parameters.h:51
tpie::sort_parameters::memoryPhase2
memory_size_type memoryPhase2
Memory available while merging runs.
Definition: sort_parameters.h:36
tp_assert
#define tp_assert(condition, message)
Definition: tpie_assert.h:65
tpie::log_pipe_debug
logstream & log_pipe_debug()
Return logstream for writing pipe_debug log messages.
Definition: tpie_log.h:178
tpie::log_debug
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:168
tpie::array::swap
void swap(array &other)
Swap two arrays.
Definition: array.h:504
tpie::sort_parameters::internalReportThreshold
memory_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
Definition: sort_parameters.h:49
tpie::array::resize
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:490
tpie::sort_parameters::runLength
memory_size_type runLength
Run length, subject to memory restrictions during phase 2.
Definition: sort_parameters.h:46
tpie::bits::run_positions::close
void close()
Switch from any state to closed state.
tpie::sort_parameters::memoryPhase3
memory_size_type memoryPhase3
Memory available during output phase.
Definition: sort_parameters.h:40
tpie::merge_sorter::can_pull
bool can_pull()
In phase 3, return true if there are more items in the final merge phase.
Definition: merge_sorter.h:705
tpie::sort_parameters::filesPhase3
memory_size_type filesPhase3
files available during output phase.
Definition: sort_parameters.h:38
tpie::sort_parameters::filesPhase2
memory_size_type filesPhase2
files available while merging runs.
Definition: sort_parameters.h:34
tpie::merge_sorter_base::calculate_parameters
void calculate_parameters()
Calculate parameters from given memory amount.
tpie::array::size
size_type size() const
Return the size of the array.
Definition: array.h:531