20 #ifndef __TPIE_PIPELINING_SORT_H__
21 #define __TPIE_PIPELINING_SORT_H__
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/pipelining/merge_sorter.h>
28 #include <tpie/file_stream.h>
37 template <
typename T,
typename pred_t,
typename store_t>
40 template <
typename T,
typename pred_t,
typename store_t>
43 template <
typename T,
typename pred_t,
typename store_t>
61 forward(
"items",
static_cast<stream_size_type
>(m_sorter->item_count()));
62 memory_size_type memory_usage = m_sorter->actual_memory_phase_3();
66 m_propagate_called =
true;
76 if (m_propagate_called)
80 m_sorter->set_phase_3_memory(available);
81 else if (type == FILES) {
82 m_sorter->set_phase_3_files(available);
88 , m_propagate_called(false)
93 bool m_propagate_called;
101 template <
typename T,
typename pred_t,
typename store_t>
119 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
125 this->m_sorter->set_owner(
this);
128 bool can_pull()
const {
129 return this->m_sorter->can_pull();
134 return this->m_sorter->pull();
138 this->m_sorter.reset();
147 log_warning() <<
"Passive sorter used without an initiator in the final merge and output phase.\n"
148 <<
"Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
158 template <
typename pred_t,
typename dest_t,
typename store_t>
173 , dest(std::move(dest))
180 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
186 this->m_sorter->set_owner(
this);
190 while (this->m_sorter->can_pull()) {
192 dest.push(std::move(y));
198 this->m_sorter.reset();
210 template <
typename T,
typename pred_t,
typename store_t>
211 class sort_calc_t :
public node {
225 template <
typename dest_t>
227 : dest(new dest_t(std::move(dest)))
229 m_sorter = this->dest->get_sorter();
230 this->dest->add_calc_dependency(this->get_token());
235 :
node(tkn), m_sorter(sorter)
241 set_minimum_resource_usage(FILES, this->m_sorter->minimumFilesPhase2);
242 set_resource_fraction(FILES, 1.0);
243 set_minimum_memory(this->m_sorter->minimum_memory_phase_2());
244 set_name(
"Perform merge heap", PRIORITY_SIGNIFICANT);
245 set_memory_fraction(1.0);
246 set_plot_options(PLOT_BUFFERED | PLOT_SIMPLIFIED_HIDE);
247 m_propagate_called =
false;
250 void propagate()
override {
252 m_propagate_called =
true;
255 void begin()
override {
256 this->m_sorter->set_owner(
this);
259 void end()
override {
260 m_weakSorter = m_sorter;
264 bool is_go_free()
const override {
return m_sorter->is_calc_free();}
267 progress_indicator_base * pi = proxy_progress_indicator();
271 bool can_evacuate()
override {
275 void evacuate()
override {
277 if (sorter) sorter->evacuate_before_reporting();
284 void set_input_node(node &
input) {
285 add_memory_share_dependency(
input);
289 void resource_available_changed(resource_type type, memory_size_type available)
override {
291 if (m_propagate_called)
295 m_sorter->set_phase_2_memory(available);
296 else if (type == FILES) {
297 m_sorter->set_phase_2_files(available);
303 std::weak_ptr<typename sorterptr::element_type> m_weakSorter;
304 bool m_propagate_called;
305 std::shared_ptr<Output> dest;
313 template <
typename T,
typename pred_t,
typename store_t>
314 class sort_input_t :
public node {
325 : m_sorter(dest.get_sorter())
326 , m_propagate_called(false)
327 , dest(std::move(dest))
329 this->dest.set_input_node(*
this);
330 set_name(
"Form input runs", PRIORITY_SIGNIFICANT);
331 set_minimum_resource_usage(FILES, sorter_t::minimumFilesPhase1);
332 set_resource_fraction(FILES, 0.0);
333 set_minimum_memory(m_sorter->minimum_memory_phase_1());
334 set_memory_fraction(1.0);
335 set_plot_options(PLOT_BUFFERED | PLOT_SIMPLIFIED_HIDE);
338 void propagate()
override {
339 if (this->can_fetch(
"items"))
340 m_sorter->set_items(this->fetch<stream_size_type>(
"items"));
341 m_propagate_called =
true;
345 m_sorter->push(std::move(item));
349 m_sorter->push(item);
352 void begin()
override {
354 m_sorter->set_owner(
this);
358 void end()
override {
361 m_weakSorter = m_sorter;
365 bool can_evacuate()
override {
369 void evacuate()
override {
371 if (sorter) sorter->evacuate_before_merging();
375 void resource_available_changed(resource_type type, memory_size_type available)
override {
377 if (m_propagate_called)
381 m_sorter->set_phase_1_memory(available);
382 else if (type == FILES) {
383 m_sorter->set_phase_1_files(available);
388 std::weak_ptr<typename sorterptr::element_type> m_weakSorter;
389 bool m_propagate_called;
390 sort_calc_t<T, pred_t, store_t> dest;
393 template <
typename pred_t,
typename store_t>
396 template <
typename dest_t>
399 template <
typename dest_t>
415 sort_factory(
const pred_t & pred, store_t store): m_pred(pred), m_store(store) {}
428 template <
typename store_t>
439 template <
typename store_t,
typename pred_t>
440 inline pipe_middle<bits::sort_factory<pred_t, store_t> >
450 template <
typename pred_t=std::less<
void>,
typename store_t=default_store>
451 inline pipe_middle<bits::sort_factory<pred_t, store_t> >
457 template <
typename T,
typename pred_t=std::less<T>,
typename store_t=default_store>
465 template <
typename T,
typename pred_t,
typename store_t>
472 typedef typename sorter_t::ptr sorterptr;
476 , m_calc_token(calc_token) {}
479 calc_t calc(std::move(m_sorter), m_calc_token);
494 template <
typename T,
typename pred_t,
typename store_t>
498 typedef typename sorter_t::ptr sorterptr;
503 , m_calc_token(calc_token)
508 res.add_calc_dependency(m_calc_token);
526 template <
typename T,
typename pred_t,
typename store_t>
539 store_t store = store_t())
540 : m_sorterInput(std::make_shared<
sorter_t>(pred, store))
541 , m_sorterOutput(m_sorterInput)
556 tp_assert(m_sorterInput,
"input() called more than once");
558 std::move(m_sorterInput), m_calc_token);
559 return {std::move(ret)};
566 tp_assert(m_sorterOutput,
"output() called more than once");
568 std::move(m_sorterOutput), m_calc_token);
569 return {std::move(ret)};
579 #endif //__TPIE_PIPELINING_SORT_H__
pipelining/factory_base.h Base class of pipelining factories
void init_sub_node(node &r)
Initialize node constructed in a subclass.
void add_memory_share_dependency(const node_token &dest)
Called by implementers to declare a node memory share dependency, that is, a requirement that another...
merge_sorter< item_type, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
merge_sorter< item_type, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
void init_node(node &r)
\Brief Initialize node constructed in a subclass.
input_pipe_t input()
Get the input push node.
Fantastic store strategy.
Pipelined sorter with push input and pull output.
Pipe sorter pull output node.
merge_sorter< T, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
Merge sorting consists of three phases.
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
output_pipe_t output()
Get the output pull node.
Class to deduce the item_type of a node of type T.
T item_type
Type of items sorted.
merge_sorter< item_type, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
void end() override
End pipeline processing phase.
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Base class of all pipelining factories.
void resource_available_changed(resource_type type, memory_size_type available) override
Called by the resource manager to notify the node's available amount of resource has changed.
bits::sort_pull_output_t< item_type, pred_t, store_t > output_t
Type of pipe sorter output.
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
void begin() override
Begin pipeline processing phase.
child_t name(const std::string &n, priority_type p=PRIORITY_USER)
Set name for this node.
#define tp_assert(condition, message)
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
logstream & log_warning()
Return logstream for writing warning log messages.
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Pipe sorter push output node.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
void propagate() override
Propagate stream metadata.
T item_type
Type of items sorted.
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
void set_resource_fraction(resource_type type, double f)
Set the resource priority of this node.
void begin() override
Begin pipeline processing phase.
push_type< dest_t >::type item_type
Type of items sorted.
T item_type
Type of items sorted.
void step(stream_size_type steps=1)
Step the progress indicator.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
virtual void end()
End pipeline processing phase.
void set_memory_fraction(double f)
Set the memory priority of this node.
T item_type
Type of items sorted.
Factory for the passive sorter output node.
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining nodes that pushes the contents of the given file stream to the next node in the pipeline.
sort_output_base< item_type, pred_t, store_t > p_t
Base class.
pipe_middle< bits::sort_factory< pred_t, store_t > > sort(const pred_t &p=std::less< void >(), store_t store=default_store())
A pipelining node that sorts large elements indirectly by using a store and a given predicate.
merge_sorter< item_type, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
pipe_middle< tfactory< bits::item_type_t, Args< T > > > item_type()
Create item type defining identity pipe node.
pipe_middle< bits::sort_factory< std::less< void >, store_t > > store_sort(store_t store=store_t())
A pipelining node that sorts large elements indirectly by using a store and std::less.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
void end() override
End pipeline processing phase.