TPIE

11a2c2d
merger.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_MERGER_H__
21 #define __TPIE_PIPELINING_MERGER_H__
22 
24 #include <tpie/compressed/stream.h>
25 #include <tpie/file_stream.h>
26 #include <tpie/tpie_assert.h>
27 #include <tpie/pipelining/store.h>
28 namespace tpie {
29 
30 template <typename specific_store_t, typename pred_t>
31 class merger {
32 private:
33  typedef typename specific_store_t::store_type store_type;
34  typedef typename specific_store_t::element_type element_type;
35 
37 public:
38  merger(pred_t pred, specific_store_t store,
40  : pq(0, predwrap(store_pred_t(pred)), bucket)
41  , in(bucket)
42  , itemsRead(bucket)
43  , m_store(store) {
44  }
45 
46  bool can_pull() {
47  return !pq.empty();
48  }
49 
50  store_type pull() {
51  tp_assert(can_pull(), "pull() while !can_pull()");
52  store_type el = std::move(pq.top().first);
53  size_t i = pq.top().second;
54  if (in[i].can_read() && itemsRead[i] < runLength) {
55  pq.pop_and_push(
56  std::make_pair(m_store.element_to_store(in[i].read()), i));
57  ++itemsRead[i];
58  } else {
59  pq.pop();
60  }
61  if (!can_pull()) {
62  reset();
63  }
64  return el;
65  }
66 
67  void reset() {
68  in.resize(0);
69  pq.resize(0);
70  itemsRead.resize(0);
71  }
72 
73  // Initialize merger with given sorted input runs. Each file stream is
74  // assumed to have a stream offset pointing to the first item in the run,
75  // and runLength items are read from each stream (unless end of stream
76  // occurs earlier).
77  // Precondition: !can_pull()
78  void reset(array<file_stream<element_type> > & inputs, stream_size_type runLength) {
79  this->runLength = runLength;
80  tp_assert(pq.empty(), "Reset before we are done");
81  in.swap(inputs);
82  pq.resize(in.size());
83  for (size_t i = 0; i < in.size(); ++i) {
84  pq.unsafe_push(
85  std::make_pair(
86  m_store.element_to_store(in[i].read()), i));
87  }
88  pq.make_safe();
89  itemsRead.resize(in.size(), 1);
90  }
91 
92  // Compute memory usage as a function of the fanout
93  static constexpr linear_memory_usage memory_usage() noexcept {
94  return
95  linear_memory_usage(-sizeof(file_stream<element_type>) //in filestreams,
96  + file_stream<element_type>::memory_usage(), //in filestreams
97  sizeof(merger)
98  - sizeof(internal_priority_queue<std::pair<store_type, size_t>, predwrap>) //pq
99  - sizeof(array<file_stream<element_type> >) //in
100  - sizeof(array<size_t>)) // itemsRead
101  + array<size_t>::memory_usage() //itemsRead
103  + array<file_stream<element_type> >::memory_usage(); //in
104  }
105 
106 
107  static constexpr memory_size_type memory_usage(memory_size_type fanout) noexcept {
108  return memory_usage()(fanout);
109  }
110 
111  class predwrap {
112  public:
113  typedef std::pair<store_type, size_t> item_type;
114  typedef item_type first_argument_type;
115  typedef item_type second_argument_type;
116  typedef bool result_type;
117 
118  predwrap(store_pred_t pred)
119  : pred(pred)
120  {
121  }
122 
123  inline bool operator()(const item_type & lhs, const item_type & rhs) {
124  return pred(lhs.first, rhs.first);
125  }
126 
127  private:
128  store_pred_t pred;
129  };
130 
131 private:
134  array<stream_size_type> itemsRead;
135  stream_size_type runLength;
136  specific_store_t m_store;
137 };
138 
139 } // namespace tpie
140 
141 #endif // __TPIE_PIPELINING_MERGER_H__
tpie_assert.h
tpie::internal_priority_queue
Standard binary internal heap.
Definition: internal_priority_queue.h:37
tpie::linear_memory_usage
Definition: util.h:62
stream.h
tpie::array
A generic array with a fixed size.
Definition: array.h:149
tpie::bits::store_pred
Definition: store.h:30
tpie::file_stream< element_type >
tpie::memory_bucket_ref
Class storring a reference to a memory bucket.
Definition: memory.h:334
tp_assert
#define tp_assert(condition, message)
Definition: tpie_assert.h:65
internal_priority_queue.h
Simple heap based priority queue implementation.
tpie::merger::predwrap
Definition: merger.h:111
tpie::array::resize
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:490
tpie::merger
Definition: merger.h:31
tpie
Definition: access_type.h:26