20 #ifndef TPIE_PIPELINING_SERIALIZATION_SORT_H
21 #define TPIE_PIPELINING_SERIALIZATION_SORT_H
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/serialization_sorter.h>
30 namespace serialization_bits {
32 template <
typename T,
typename pred_t>
36 typedef pred_t pred_type;
38 typedef std::shared_ptr<sorter_t> sorterptr;
41 template <
typename Traits>
44 template <
typename Traits>
47 template <
typename Traits>
49 typedef typename Traits::pred_type pred_type;
62 void set_calc_node(
node & calc) {
68 forward(
"items",
static_cast<stream_size_type
>(m_sorter->item_count()));
69 memory_size_type memory_usage = m_sorter->actual_memory_phase_3();
73 m_propagate_called =
true;
77 m_sorter->set_owner(
this);
81 m_sorter->set_owner(
nullptr);
91 if (!m_propagate_called)
92 m_sorter->set_phase_3_memory(availableMemory);
97 , m_propagate_called(false)
103 , m_propagate_called(false)
108 bool m_propagate_called;
114 template <
typename Traits>
117 typedef typename Traits::item_type
item_type;
118 typedef typename Traits::pred_type pred_type;
119 typedef typename Traits::sorter_t
sorter_t;
120 typedef typename Traits::sorterptr
sorterptr;
126 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
130 bool can_pull()
const {
131 return this->m_sorter->can_pull();
136 return this->m_sorter->pull();
145 log_warning() <<
"Passive sorter used without an initiator in the final merge and output phase.\n"
146 <<
"Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
154 template <
typename Traits,
typename dest_t>
156 typedef typename Traits::pred_type pred_type;
158 typedef typename Traits::item_type
item_type;
160 typedef typename Traits::sorter_t
sorter_t;
161 typedef typename Traits::sorterptr
sorterptr;
165 , dest(std::move(dest))
169 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
174 while (this->m_sorter->can_pull()) {
175 dest.push(this->m_sorter->pull());
188 template <
typename Traits>
189 class sort_calc_t :
public node {
191 typedef typename Traits::item_type item_type;
192 typedef typename Traits::sorter_t sorter_t;
193 typedef typename Traits::sorterptr sorterptr;
195 typedef sort_output_base<Traits> Output;
197 sort_calc_t(sort_calc_t && other) =
default;
199 template <
typename dest_t>
200 sort_calc_t(dest_t dest)
201 : dest(new dest_t(std::move(dest)))
203 m_sorter = this->dest->get_sorter();
204 this->dest->add_calc_dependency(this->get_token());
208 sort_calc_t(sorterptr sorter, node_token tkn)
209 : node(tkn), m_sorter(sorter)
215 set_minimum_memory(sorter_t::minimum_memory_phase_2());
216 set_name(
"Perform merge heap", PRIORITY_SIGNIFICANT);
217 set_memory_fraction(1.0);
218 m_propagate_called =
false;
221 void propagate()
override {
223 m_propagate_called =
true;
226 void begin()
override {
227 m_sorter->set_owner(
this);
230 bool is_go_free()
const override {
return m_sorter->is_merge_runs_free();}
233 progress_indicator_base * pi = proxy_progress_indicator();
234 log_debug() <<
"TODO: Progress information during merging." << std::endl;
235 m_sorter->merge_runs();
241 void end()
override {
242 m_weak_sorter = m_sorter;
246 bool can_evacuate()
override {
250 void evacuate()
override {
251 auto p = m_weak_sorter.lock();
252 if (p) p->evacuate();
255 sorterptr get_sorter()
const {
259 void set_input_node(node &
input) {
260 add_memory_share_dependency(
input);
264 void set_available_memory(memory_size_type availableMemory)
override {
265 if (!m_propagate_called)
266 m_sorter->set_phase_2_memory(availableMemory);
271 std::weak_ptr<typename sorterptr::element_type> m_weak_sorter;
272 bool m_propagate_called;
273 std::shared_ptr<Output> dest;
281 template <
typename Traits>
282 class sort_input_t :
public node {
283 typedef typename Traits::pred_type pred_type;
285 typedef typename Traits::item_type
item_type;
286 typedef typename Traits::sorter_t sorter_t;
287 typedef typename Traits::sorterptr sorterptr;
289 sort_input_t(sort_calc_t<Traits> dest)
290 : m_sorter(dest.get_sorter())
291 , dest(std::move(dest))
293 this->dest.set_input_node(*
this);
294 set_minimum_memory(sorter_t::minimum_memory_phase_1());
295 set_name(
"Form input runs", PRIORITY_SIGNIFICANT);
296 set_memory_fraction(1.0);
297 m_propagate_called =
false;
300 void propagate()
override {
301 m_propagate_called =
true;
304 void begin()
override {
305 m_sorter->set_owner(
this);
309 void push(
const item_type & item) {
310 m_sorter->push(item);
313 void end()
override {
315 m_weak_sorter = m_sorter;
319 bool can_evacuate()
override {
323 void evacuate()
override {
324 auto p = m_weak_sorter.lock();
325 if (p) p->evacuate();
329 void set_available_memory(memory_size_type availableMemory)
override {
330 if (!m_propagate_called)
331 m_sorter->set_phase_1_memory(availableMemory);
336 std::weak_ptr<typename sorterptr::element_type> m_weak_sorter;
337 sort_calc_t<Traits> dest;
338 bool m_propagate_called;
344 template <
typename pred_t>
347 template <
typename dest_t>
350 template <
typename dest_t>
377 template <
typename pred_t=std::less<
void>>
384 template <
typename T,
typename pred_t=std::less<T> >
387 namespace serialization_bits {
392 template <
typename Traits>
399 typedef typename Traits::sorter_t sorter_t;
400 typedef typename Traits::sorterptr sorterptr;
404 , m_calc_token(calc_token) {}
407 calc_t calc(std::move(m_sorter), m_calc_token);
422 template <
typename Traits>
426 typedef typename Traits::sorterptr sorterptr;
431 , m_calc_token(calc_token)
436 res.add_calc_dependency(m_calc_token);
456 template <
typename T,
typename pred_t>
473 : m_sorter_input(new
sorter_t(sizeof(T), pred))
474 , m_sorter_output(m_sorter_input)
487 assert(m_sorter_input);
488 return input_pipe_t(std::move(m_sorter_input), m_calc_token);
495 assert(m_sorter_output);
496 return output_pipe_t(std::move(m_sorter_output), m_calc_token);
507 #endif // TPIE_PIPELINING_SERIALIZATION_SORT_H