20 #ifndef __TPIE_PIPELINING_MERGE_SORTER_H__
21 #define __TPIE_PIPELINING_MERGE_SORTER_H__
23 #include <tpie/tpie_export.h>
25 #include <tpie/pipelining/sort_parameters.h>
26 #include <tpie/pipelining/merger.h>
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/exception.h>
/// Static memory footprint of a run_positions object (presumably in bytes —
/// confirm against the definition).
70 static memory_size_type memory_usage() noexcept;
/// Not called anywhere in the visible excerpt. Presumably marks the merge
/// level currently being written as the final one, merged with `fanout`
/// inputs — TODO(review): confirm against the definition.
100 void final_level(memory_size_type fanout);
/// Record the stream position at which run `runNumber` of merge level
/// `mergeLevel` begins. merge_sorter::open_run_file_write calls this right
/// after seeking the run's temp file to its end, so a new run starts where
/// the existing contents of that file end.
105 void set_position(memory_size_type mergeLevel, memory_size_type runNumber,
stream_position pos);
/// Return the start position for (mergeLevel, runNumber), presumably the one
/// stored earlier by set_position. merge_sorter::open_run_file_read seeks the
/// run file to this position before reading the run.
110 stream_position get_position(memory_size_type mergeLevel, memory_size_type runNumber);
// NOTE(review): these members presumably belong to bits::run_positions; their
// exact meaning is not visible in this excerpt — confirm before relying on
// the notes below.
121 memory_size_type m_levels; // presumably the number of merge levels tracked
123 memory_size_type m_runs[2]; // two counters; presumably one per level parity, mirroring run_file_index's (mergeLevel % 2) split
131 bool m_finalExtraSet; // presumably whether an extra final-level position has been recorded
142 memory_size_type item_size,
143 memory_size_type element_file_stream_memory_usage);
/// Default file budget. The origin of the value 253 is not explained in this
/// excerpt — TODO(review): document why this number was chosen.
145 static const memory_size_type defaultFiles = 253;
/// Phase 1 (run formation; see phase_1_memory, which budgets the run buffer)
/// is pinned to exactly one file: minimum == maximum == 1.
146 static const memory_size_type minimumFilesPhase1 = 1;
147 static const memory_size_type maximumFilesPhase1 = 1;
/// Phase 2 (intermediate merges; see phase_2_memory, sized by p.fanout)
/// needs at least 5 files and has no practical upper bound.
148 static const memory_size_type minimumFilesPhase2 = 5;
149 static const memory_size_type maximumFilesPhase2 = std::numeric_limits<memory_size_type>::max();
/// Phase 3 (final merge; see phase_3_memory, sized by p.finalFanout) needs at
/// least 5 files and has no practical upper bound.
150 static const memory_size_type minimumFilesPhase3 = 5;
151 static const memory_size_type maximumFilesPhase3 = std::numeric_limits<memory_size_type>::max();
/// Explicitly set the run length and merge fanout. The definition is not
/// visible here; presumably this fills p.runLength / p.fanout directly and
/// bypasses calculate_parameters (m_parametersSet may track that) — confirm.
157 void set_parameters(memory_size_type runLength, memory_size_type fanout);
164 p.filesPhase1 = p.filesPhase2 = p.filesPhase3 = f;
186 p.memoryPhase1 = p.memoryPhase2 = p.memoryPhase3 = m;
203 stream_size_type item_count() {
208 memory_size_type evacuated_memory_usage()
const {
209 return 2*p.fanout*
sizeof(temp_file);
/// Declare the number of items to be sorted. The definition is not visible;
/// presumably it stores an upper bound (m_maxItems) used when sizing runs —
/// TODO(review): confirm.
221 void set_items(stream_size_type n);
225 void set_phase_1_files(memory_size_type f1) {
230 void set_phase_2_files(memory_size_type f2) {
235 void set_phase_3_files(memory_size_type f3) {
240 void set_phase_1_memory(memory_size_type m1) {
245 void set_phase_2_memory(memory_size_type m2) {
250 void set_phase_3_memory(memory_size_type m3) {
255 bool is_calc_free()
const {
256 tp_assert(m_state == stMerge,
"Wrong phase");
257 return m_reportInternal || m_finishedRuns <= p.fanout;
261 memory_size_type minimum_memory_phase_1() noexcept {
269 sort_parameters tmp_p((sort_parameters()));
271 tmp_p.fanout = calculate_fanout(std::numeric_limits<memory_size_type>::max(), 0);
272 return phase_1_memory(tmp_p);
275 memory_size_type minimum_memory_phase_2() noexcept {
276 return m_fanout_memory_usage(calculate_fanout(0, 0));
279 memory_size_type minimum_memory_phase_3() noexcept {
280 return m_fanout_memory_usage(calculate_fanout(0, 0));
283 memory_size_type maximum_memory_phase_3() noexcept {
284 return std::numeric_limits<memory_size_type>::max();
287 memory_size_type phase_1_memory(
const sort_parameters & params) noexcept {
288 return params.runLength * m_item_size
290 + m_element_file_stream_memory_usage
291 + 2*params.fanout*
sizeof(temp_file);
294 memory_size_type phase_2_memory(
const sort_parameters & params) noexcept {
295 return m_fanout_memory_usage(params.fanout);
298 memory_size_type phase_3_memory(
const sort_parameters & params) noexcept {
299 return m_fanout_memory_usage(params.finalFanout);
/// Compute a merge fanout for the given memory and file budgets.
/// minimum_memory_phase_2/3 call it with (0, 0), and minimum_memory_phase_1
/// calls it with (max memory, 0 files). It presumably clamps to the phase
/// minimums when a budget is too small — confirm against the definition.
305 memory_size_type calculate_fanout(memory_size_type availableMemory, memory_size_type availableFiles) noexcept;
/// Derive the sort parameters stored in `p` (run length, fanout, final
/// fanout, ...). The definition is not visible; presumably it uses the
/// configured per-phase memory and file limits — confirm.
312 void calculate_parameters();
315 void check_not_started() {
316 if (m_state != stNotStarted) {
317 throw tpie::exception(
"Can't change parameters after merge sorting has started");
324 static stream_size_type
calculate_run_length(stream_size_type initialRunLength, memory_size_type fanout, memory_size_type mergeLevel) {
325 stream_size_type runLength = initialRunLength;
326 for (memory_size_type i = 0; i < mergeLevel; ++i) {
// Linear memory model: m_fanout_memory_usage(f) yields the memory needed to
// merge f runs (used by minimum_memory_phase_2/3 and phase_2/3_memory).
339 const linear_memory_usage m_fanout_memory_usage;
// Size of one buffered item (multiplied by the run length) and the memory of
// one element file stream; both are summed in phase_1_memory.
340 const memory_size_type m_item_size, m_element_file_stream_memory_usage;
// Owned memory bucket (if any), and the bucket reference handed to m_merger
// and m_currentRunItems at construction.
342 std::unique_ptr<memory_bucket> m_bucketPtr;
343 memory_bucket_ref m_bucket;
// Temp files holding runs. Resized to 2 * p.fanout when run formation starts
// and addressed through run_file_index (two banks, selected by level parity).
345 array<temp_file> m_runFiles;
// Not used in the visible excerpt; presumably records that the parameters in
// `p` have been set — confirm.
350 bool m_parametersSet;
// Start position of each run inside its temp file; written by
// open_run_file_write and read back by open_run_file_read.
352 bits::run_positions m_runPositions;
// Number of runs written so far; empty_current_run writes the buffer as run
// number m_finishedRuns of merge level 0.
357 stream_size_type m_finishedRuns;
// Number of items currently held in the run buffer (m_currentRunItems).
361 memory_size_type m_currentRunItemCount;
// True when items are reported straight from the in-memory run buffer rather
// than through the merger; cleared when evacuate() spills the buffer to disk.
363 bool m_reportInternal;
// Number of items already returned by pull() from the in-memory buffer.
367 memory_size_type m_itemsPulled;
// Total item count; checked against 0 when input ends. The increment is
// presumably in push() but is not visible in this excerpt.
369 stream_size_type m_itemCount;
// Not used in the visible excerpt; presumably the bound given to set_items —
// confirm.
371 stream_size_type m_maxItems;
// Final-merge state, saved by initialize_final_merger so that
// reinitialize_final_merger can rebuild the merger (e.g. after evacuation).
374 bool m_finalMergeInitialized;
375 memory_size_type m_finalMergeLevel;
376 memory_size_type m_finalRunCount;
// Run number (at level m_finalMergeLevel + 1) of the extra run produced when
// the final level held more runs than p.finalFanout. Set to
// std::numeric_limits<memory_size_type>::max() when there is no such run.
377 memory_size_type m_finalMergeSpecialRunNumber;
395 template <
typename T,
bool UseProgress,
typename pred_t = std::less<T>,
typename store_t=default_store>
/// Element type that the store policy associates with T.
398 typedef typename store_t::template element_type<T>::type TT;
/// Store policy specialised for TT; provides the conversions used below
/// (outer_to_store, store_to_element, store_to_outer).
399 typedef typename store_t::template specific<TT> specific_store_t;
/// Type seen by users of the sorter (converted via store_to_outer).
400 typedef typename specific_store_t::outer_type outer_type;
/// Type held in the in-memory run buffer (m_currentRunItems).
401 typedef typename specific_store_t::store_type store_type;
/// Type written to and read from the run files (file_stream<element_type>).
402 typedef typename specific_store_t::element_type element_type;
/// Type accepted by push().
403 typedef outer_type item_type;
/// Per-item size reported by the specialised store policy.
404 static const size_t item_size = specific_store_t::item_size;
/// Shared-ownership handle to a merge_sorter.
407 typedef std::shared_ptr<merge_sorter> ptr;
410 merge_sorter(pred_t pred = pred_t(), store_t store = store_t())
412 , m_store(store.template get_specific<element_type>())
413 , m_merger(pred, m_store, m_bucket)
414 , m_currentRunItems(m_bucket)
424 tp_assert(m_state == stNotStarted,
"Merge sorting already begun");
429 m_runFiles.resize(p.
fanout*2);
430 m_currentRunItemCount = 0;
432 m_state = stRunFormation;
440 tp_assert(m_state == stRunFormation,
"Wrong phase");
441 if (m_currentRunItemCount >= p.
runLength) {
445 m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(std::move(item));
446 ++m_currentRunItemCount;
450 void push(
const item_type & item) {
451 tp_assert(m_state == stRunFormation,
"Wrong phase");
452 if (m_currentRunItemCount >= p.
runLength) {
456 m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(item);
457 ++m_currentRunItemCount;
465 tp_assert(m_state == stRunFormation,
"Wrong phase");
468 if (m_itemCount == 0) {
469 tp_assert(m_currentRunItemCount == 0,
"m_itemCount == 0, but m_currentRunItemCount != 0");
470 m_reportInternal =
true;
472 m_currentRunItems.
resize(0);
473 log_debug() <<
"Got no items. Internal reporting mode." << std::endl;
476 m_reportInternal =
true;
478 log_debug() <<
"Got " << m_currentRunItemCount <<
" items. Internal reporting mode." << std::endl;
480 }
else if (m_finishedRuns == 0
488 for (
size_t i=0; i < m_currentRunItemCount; ++i)
489 currentRun[i] = std::move(m_currentRunItems[i]);
490 m_currentRunItems.
swap(currentRun);
492 m_reportInternal =
true;
494 log_debug() <<
"Got " << m_currentRunItemCount <<
" items. Internal reporting mode "
495 <<
"after resizing item buffer." << std::endl;
498 m_reportInternal =
false;
500 m_currentRunItems.
resize(0);
501 log_debug() <<
"Got " << m_finishedRuns <<
" runs. External reporting mode." << std::endl;
512 tp_assert(m_state == stMerge,
"Wrong phase");
513 if (!m_reportInternal) {
524 tp_assert(m_state == stMerge || m_state == stReport,
"Wrong phase");
525 if (m_reportInternal) {
526 log_pipe_debug() <<
"Evacuate merge_sorter (" <<
this <<
") in internal reporting mode" << std::endl;
527 m_reportInternal =
false;
528 memory_size_type runCount = (m_currentRunItemCount > 0) ? 1 : 0;
530 m_currentRunItems.
resize(0);
531 initialize_final_merger(0, runCount);
532 }
else if (m_state == stMerge) {
533 log_pipe_debug() <<
"Evacuate merge_sorter (" <<
this <<
") before merge in external reporting mode (noop)" << std::endl;
537 log_pipe_debug() <<
"Evacuate merge_sorter (" <<
this <<
") before reporting in external reporting mode" << std::endl;
543 void evacuate_before_merging() {
544 if (m_state == stMerge) evacuate();
547 void evacuate_before_reporting() {
548 if (m_state == stReport && (!m_reportInternal || m_itemsPulled == 0)) evacuate();
556 void sort_current_run() {
558 bits::store_pred<pred_t, specific_store_t>(pred));
562 void empty_current_run() {
563 if (m_finishedRuns < 10)
564 log_pipe_debug() <<
"Write " << m_currentRunItemCount <<
" items to run file " << m_finishedRuns << std::endl;
565 else if (m_finishedRuns == 10)
567 file_stream<element_type> fs;
568 open_run_file_write(fs, 0, m_finishedRuns);
569 for (memory_size_type i = 0; i < m_currentRunItemCount; ++i)
570 fs.write(m_store.store_to_element(std::move(m_currentRunItems[i])));
571 m_currentRunItemCount = 0;
579 void initialize_merger(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount) {
584 array<file_stream<element_type> > in(runCount);
585 for (memory_size_type i = 0; i < runCount; ++i) {
586 open_run_file_read(in[i], mergeLevel, runNumber+i);
590 m_merger.reset(in, runLength);
596 void initialize_final_merger(memory_size_type finalMergeLevel, memory_size_type runCount) {
597 if (m_finalMergeInitialized) {
598 reinitialize_final_merger();
602 m_finalMergeInitialized =
true;
603 m_finalMergeLevel = finalMergeLevel;
604 m_finalRunCount = runCount;
608 log_pipe_debug() <<
"Run count in final level (" << runCount <<
") is greater than the final fanout (" << p.
finalFanout <<
")\n";
611 memory_size_type n = runCount-i;
612 log_pipe_debug() <<
"Merge " << n <<
" runs starting from #" << i << std::endl;
613 dummy_progress_indicator pi;
614 m_finalMergeSpecialRunNumber = merge_runs(finalMergeLevel, i, n, pi);
616 log_pipe_debug() <<
"Run count in final level (" << runCount <<
") is less or equal to the final fanout (" << p.
finalFanout <<
")" << std::endl;
617 m_finalMergeSpecialRunNumber = std::numeric_limits<memory_size_type>::max();
619 reinitialize_final_merger();
623 void reinitialize_final_merger() {
624 tp_assert(m_finalMergeInitialized,
"reinitialize_final_merger while !m_finalMergeInitialized");
626 if (m_finalMergeSpecialRunNumber != std::numeric_limits<memory_size_type>::max()) {
627 array<file_stream<element_type> > in(p.
finalFanout);
628 for (memory_size_type i = 0; i < p.
finalFanout-1; ++i) {
629 open_run_file_read(in[i], m_finalMergeLevel, i);
630 log_pipe_debug() <<
"Run " << i <<
" is at offset " << in[i].offset() <<
" and has size " << in[i].size() << std::endl;
632 open_run_file_read(in[p.
finalFanout-1], m_finalMergeLevel+1, m_finalMergeSpecialRunNumber);
636 m_merger.reset(in, runLength);
638 initialize_merger(m_finalMergeLevel, 0, m_finalRunCount);
649 template <
typename ProgressIndicator>
650 memory_size_type merge_runs(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount, ProgressIndicator & pi) {
651 initialize_merger(mergeLevel, runNumber, runCount);
652 file_stream<element_type> out;
653 memory_size_type nextRunNumber = runNumber/p.
fanout;
654 open_run_file_write(out, mergeLevel+1, nextRunNumber);
655 while (m_merger.can_pull()) {
657 out.write(m_store.store_to_element(m_merger.pull()));
659 return nextRunNumber;
665 void prepare_pull(
typename Progress::base & pi) {
669 int treeHeight=
static_cast<int>(ceil(log(
static_cast<float>(m_finishedRuns)) /
670 log(
static_cast<float>(p.
fanout))));
671 pi.init(item_count()*treeHeight);
673 memory_size_type mergeLevel = 0;
674 memory_size_type runCount = m_finishedRuns;
675 while (runCount > p.
fanout) {
676 log_pipe_debug() <<
"Merge " << runCount <<
" runs in merge level " << mergeLevel <<
'\n';
678 memory_size_type newRunCount = 0;
679 for (memory_size_type i = 0; i < runCount; i += p.
fanout) {
680 memory_size_type n = std::min(runCount-i, p.
fanout);
682 if (newRunCount < 10)
683 log_pipe_debug() <<
"Merge " << n <<
" runs starting from #" << i << std::endl;
684 else if (newRunCount == 10)
687 merge_runs(mergeLevel, i, n, pi);
691 runCount = newRunCount;
693 log_pipe_debug() <<
"Final merge level " << mergeLevel <<
" has " << runCount <<
" runs" << std::endl;
694 initialize_final_merger(mergeLevel, runCount);
706 tp_assert(m_state == stReport,
"Wrong phase");
707 if (m_reportInternal)
return m_itemsPulled < m_currentRunItemCount;
709 if (m_evacuated) reinitialize_final_merger();
710 return m_merger.can_pull();
718 tp_assert(m_state == stReport,
"Wrong phase");
719 if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
720 store_type el = std::move(m_currentRunItems[m_itemsPulled++]);
722 return m_store.store_to_outer(std::move(el));
724 if (m_evacuated) reinitialize_final_merger();
725 m_runPositions.
close();
726 return m_store.store_to_outer(m_merger.pull());
731 memory_size_type actual_memory_phase_3() {
732 tp_assert(m_state == stReport,
"Wrong phase");
733 if (m_reportInternal)
734 return m_runFiles.memory_usage(m_runFiles.size())
737 return fanout_memory_usage(m_finalRunCount);
745 static constexpr linear_memory_usage fanout_memory_usage() noexcept {
746 return merger<specific_store_t, pred_t>::memory_usage()
748 + file_stream<element_type>::memory_usage()
749 + 2*
sizeof(temp_file);
752 static constexpr memory_size_type fanout_memory_usage(memory_size_type fanout) noexcept {
753 return fanout_memory_usage()(fanout);
762 memory_size_type run_file_index(memory_size_type mergeLevel, memory_size_type runNumber) {
766 return (mergeLevel % 2)*p.
fanout + (runNumber % p.
fanout);
772 void open_run_file_write(file_stream<element_type> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
775 memory_size_type idx = run_file_index(mergeLevel, runNumber);
776 if (runNumber < p.
fanout) m_runFiles[idx].free();
778 fs.seek(0, file_stream_base::end);
779 m_runPositions.
set_position(mergeLevel, runNumber, fs.get_position());
785 void open_run_file_read(file_stream<element_type> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
788 memory_size_type idx = run_file_index(mergeLevel, runNumber);
790 fs.set_position(m_runPositions.
get_position(mergeLevel, runNumber));
// Store policy instance. It converts between outer, store and element forms
// (outer_to_store in push, store_to_element when writing runs,
// store_to_outer in pull).
793 specific_store_t m_store;
// Merger over run files: reset() with a set of input streams, then drained
// with can_pull()/pull().
794 merger<specific_store_t, pred_t> m_merger;
// In-memory buffer for the run being formed. In internal reporting mode,
// pull() also returns items from here.
797 array<store_type> m_currentRunItems;
804 #endif // __TPIE_PIPELINING_MERGE_SORTER_H__