TPIE

11a2c2d
buffer.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_COMPRESSED_BUFFER_H
21 #define TPIE_COMPRESSED_BUFFER_H
22 
26 
27 #include <tpie/array.h>
28 #include <tpie/tpie_assert.h>
29 #include <tpie/compressed/thread.h>
30 #include <map>
31 #include <memory>
32 
33 namespace tpie {
34 
39 
44 
66  enum type {
79  };
80 };
81 
86 private:
87  typedef array<char> storage_t;
88 
89  storage_t m_storage;
90  memory_size_type m_size;
92  stream_size_type m_readOffset;
93  memory_size_type m_blockSize;
94 
95 public:
96  compressor_buffer(memory_size_type capacity)
97  : m_storage(capacity)
98  , m_size(0)
100  , m_readOffset(1111111111111111111ull)
101  , m_blockSize(std::numeric_limits<memory_size_type>::max())
102  {
103  #ifndef NDEBUG
104  std::fill(m_storage.begin(), m_storage.end(), 0);
105  #endif
106  }
107 
108  compressor_buffer_state::type get_state() const {
109  return m_state;
110  }
111 
112  void set_state(compressor_buffer_state::type to) {
113  m_state = to;
114  }
115 
116  void transition_state(compressor_buffer_state::type from,
118  {
119  tp_assert(!(m_state != from), "compressor_buffer: invalid state transition");
120  unused(from);
121  set_state(to);
122  }
123 
124  bool is_busy() {
125  switch (m_state) {
126  case compressor_buffer_state::dirty: return false;
127  case compressor_buffer_state::writing: return true;
128  case compressor_buffer_state::reading: return true;
129  case compressor_buffer_state::clean: return false;
130  }
131  tp_assert(false, "is_busy: compressor_buffer in invalid state");
132  return false; // suppress compiler warning
133  }
134 
138  char * get() {
139  return m_storage.get();
140  }
141 
145  const char * get() const {
146  return m_storage.get();
147  }
148 
152  memory_size_type size() const {
153  return m_size;
154  }
155 
159  memory_size_type capacity() const {
160  return m_storage.size();
161  }
162 
166  void set_size(memory_size_type size) {
167  m_size = size;
168  }
169 
173  void set_capacity(memory_size_type capacity) {
174  m_storage.resize(capacity);
175  m_size = 0;
176  }
177 
181  void reset() {
183  m_size = 0;
184  m_readOffset = 1111111111111111111ull;
185  m_blockSize = std::numeric_limits<memory_size_type>::max();
186  }
187 
188  memory_size_type get_block_size() { return m_blockSize; }
189  stream_size_type get_read_offset() { return m_readOffset; }
190  void set_block_size(memory_size_type s) { m_blockSize = s; }
191  void set_read_offset(stream_size_type s) { m_readOffset = s; }
192 };
193 
215 public:
216  typedef std::shared_ptr<compressor_buffer> buffer_t;
217 
220 
221  buffer_t allocate_own_buffer();
222  void release_own_buffer(buffer_t &);
223 
224  bool can_take_shared_buffer();
225  buffer_t take_shared_buffer();
226  void release_shared_buffer(buffer_t &);
227 
228 private:
229  class impl;
230  impl * pimpl;
231 };
232 
240 
248 public:
249  typedef std::shared_ptr<compressor_buffer> buffer_t;
250 
251  const static memory_size_type OWN_BUFFERS = 1;
252 
253  stream_buffers(memory_size_type blockSize)
254  : m_blockSize(blockSize)
255  , m_ownBuffers(0)
256  {
257  }
258 
259  ~stream_buffers() {
260  if (!empty()) {
261  log_debug() << "ERROR: ~stream_buffers: not empty!" << std::endl;
262  }
263  }
264 
265  static memory_size_type memory_usage(memory_size_type blockSize) {
266  return blockSize;
267  }
268 
269  buffer_t get_buffer(compressor_thread_lock & lock, stream_size_type blockNumber) {
270  if (!(m_ownBuffers < OWN_BUFFERS || can_take_shared_buffer())) {
271  // First, search for the buffer in the map.
272  buffermapit target = m_buffers.find(blockNumber);
273  if (target != m_buffers.end()) return target->second;
274 
275  // If not found, wait for a free buffer to become available.
276  buffer_t b;
277  while (true) {
278  buffermapit i = m_buffers.begin();
279  while (i != m_buffers.end() && !i->second.unique()) ++i;
280  if (i == m_buffers.end()) {
281  compressor().wait_for_request_done(lock);
282  continue;
283  } else {
284  b.swap(i->second);
285  m_buffers.erase(i);
286  break;
287  }
288  }
289 
290  b->reset();
291  m_buffers.insert(std::make_pair(blockNumber, b));
292  clean();
293  return b;
294  } else {
295  // First, search for the buffer in the map.
296  std::pair<buffermapit, bool> res
297  = m_buffers.insert(std::make_pair(blockNumber, buffer_t()));
298  buffermapit & target = res.first;
299  bool & inserted = res.second;
300  if (!inserted) return target->second;
301 
302  // If not found, find a free buffer and place it in target->second.
303 
304  // We have now placed an empty shared_ptr in m_buffers
305  // (an "insertion point"), and nobody is allowed to call clean()
306  // on us before we insert something in that point.
307 
308  // target->second is the only buffer in the map with use_count() == 0.
309  // If a buffer in the map has use_count() == 1 (that is, unique() == true),
310  // that means only our map (and nobody else) refers to the buffer,
311  // so it is free to be reused.
312  buffermapit i = m_buffers.begin();
313  while (i != m_buffers.end() && !i->second.unique()) ++i;
314 
315  if (i == m_buffers.end()) {
316  // No free found: allocate new buffer.
317  if (m_ownBuffers < OWN_BUFFERS) {
318  target->second = allocate_own_buffer();
319  } else if (can_take_shared_buffer()) {
320  target->second = take_shared_buffer();
321  } else {
322  // This is a contradition of the very first check
323  // in the beginning of the method.
324  tp_assert(false, "get_buffer: Could not get a new buffer "
325  "contrary to previous checks");
326  }
327  } else {
328  // Free found: reuse buffer.
329  target->second.swap(i->second);
330  m_buffers.erase(i);
331  }
332 
333  // Bump use count before cleaning.
334  buffer_t result = target->second;
335  clean();
336  result->reset();
337  return result;
338  }
339  }
340 
341  bool empty() const {
342  return m_buffers.empty();
343  }
344 
345  void clean() {
346  buffermapit i = m_buffers.begin();
347  while (i != m_buffers.end()) {
348  buffermapit j = i++;
349  if (j->second.get() == 0) {
350  // This item in the map represents an insertion point in get_buffer,
351  // but in that case, get_buffer has the compressor lock,
352  // and it shouldn't wait before inserting something
353  throw exception("stream_buffers: j->second.get() == 0");
354  } else if (j->second.unique()) {
355  if (shared_buffers() > 0) {
356  release_shared_buffer(j->second);
357  } else {
358  release_own_buffer(j->second);
359  }
360  m_buffers.erase(j);
361  }
362  }
363  }
364 
365 private:
366  memory_size_type own_buffers() {
367  return m_ownBuffers;
368  }
369 
370  memory_size_type shared_buffers() {
371  return m_buffers.size() - m_ownBuffers;
372  }
373 
374  void release_shared_buffer(buffer_t & b) {
375  the_stream_buffer_pool().release_shared_buffer(b);
376  }
377 
378  void release_own_buffer(buffer_t & b) {
379  --m_ownBuffers;
380  the_stream_buffer_pool().release_own_buffer(b);
381  }
382 
383  bool can_take_shared_buffer() {
384  return the_stream_buffer_pool().can_take_shared_buffer();
385  }
386 
387  buffer_t take_shared_buffer() {
388  return the_stream_buffer_pool().take_shared_buffer();
389  }
390 
391  buffer_t allocate_own_buffer() {
392  ++m_ownBuffers;
393  return the_stream_buffer_pool().allocate_own_buffer();
394  }
395 
396  compressor_thread & compressor() {
397  return the_compressor_thread();
398  }
399 
400  memory_size_type block_size() const {
401  return m_blockSize;
402  }
403 
404  memory_size_type m_blockSize;
405 
406  typedef std::map<stream_size_type, buffer_t> buffermap_t;
407  typedef buffermap_t::iterator buffermapit;
408  buffermap_t m_buffers;
409 
411  memory_size_type m_ownBuffers;
412 };
413 
414 } // namespace tpie
415 
416 #endif // TPIE_COMPRESSED_BUFFER_H
tpie::compressor_buffer::reset
void reset()
Return buffer to a newly constructed state.
Definition: buffer.h:181
tpie_assert.h
tpie::the_stream_buffer_pool
stream_buffer_pool & the_stream_buffer_pool()
Get the stream buffer pool singleton.
tpie::compressor_buffer_state::clean
@ clean
The buffer is equal to the contents on the disk.
Definition: buffer.h:78
tpie::compressor_buffer::get
char * get()
Get pointer to buffer storage.
Definition: buffer.h:138
tpie::stream_buffers
Buffer manager for a single stream.
Definition: buffer.h:247
tpie::compressor_buffer::get
const char * get() const
Get pointer to buffer storage.
Definition: buffer.h:145
tpie::compressor_buffer::capacity
memory_size_type capacity() const
Get maximal byte size of buffer.
Definition: buffer.h:159
tpie::array< char >
tpie::array::begin
iterator begin()
Return an iterator to the beginning of the array.
Definition: array.h:312
tpie::compressor_buffer_state::writing
@ writing
The buffer will soon be written to disk.
Definition: buffer.h:72
array.h
tpie::compressor_buffer_state::reading
@ reading
The buffer will soon change to reflect the contents on the disk.
Definition: buffer.h:75
tpie::compressor_thread_lock
Definition: thread.h:67
tpie::compressor_buffer_state::dirty
@ dirty
The buffer is different from the contents on the disk.
Definition: buffer.h:69
tpie::compressor_buffer::size
memory_size_type size() const
Get number of bytes used to store items.
Definition: buffer.h:152
tpie::compressor_buffer
A buffer for elements belonging to a specific stream block.
Definition: buffer.h:85
tpie::exception
Definition: exception.h:33
tpie::stream_buffer_pool
Pool of shared buffers.
Definition: buffer.h:214
tp_assert
#define tp_assert(condition, message)
Definition: tpie_assert.h:65
tpie::unused
void unused(const T &x)
Declare that a variable is unused on purpose.
Definition: util.h:42
tpie::array::get
T * get()
Return a raw pointer to the array content.
Definition: array.h:536
tpie::log_debug
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:168
tpie::compressor_buffer::set_capacity
void set_capacity(memory_size_type capacity)
Resize internal buffer, clearing all elements.
Definition: buffer.h:173
tpie::compressor_buffer_state
The different states of a compressor buffer.
Definition: buffer.h:65
tpie::array::end
iterator end()
Return an iterator to the end of the array.
Definition: array.h:326
tpie::compressor_thread
Definition: thread.h:39
tpie::compressor_buffer::set_size
void set_size(memory_size_type size)
Set number of bytes used to store items.
Definition: buffer.h:166
tpie::finish_stream_buffer_pool
void finish_stream_buffer_pool()
Used by tpie::finish to free stream buffer pool.
tpie::array::resize
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:490
tpie::init_stream_buffer_pool
void init_stream_buffer_pool()
Used by tpie::init to initialize stream buffer pool.
thread.h
tpie::compressor_buffer_state::type
type
Definition: buffer.h:66
tpie
Definition: access_type.h:26
tpie::array::size
size_type size() const
Return the size of the array.
Definition: array.h:531