20 #ifndef TPIE_COMPRESSED_BUFFER_H
21 #define TPIE_COMPRESSED_BUFFER_H
90 memory_size_type m_size;
92 stream_size_type m_readOffset;
93 memory_size_type m_blockSize;
100 , m_readOffset(1111111111111111111ull)
101 , m_blockSize(std::numeric_limits<memory_size_type>::max())
104 std::fill(m_storage.
begin(), m_storage.
end(), 0);
119 tp_assert(!(m_state != from),
"compressor_buffer: invalid state transition");
131 tp_assert(
false,
"is_busy: compressor_buffer in invalid state");
139 return m_storage.
get();
145 const char *
get()
const {
146 return m_storage.
get();
152 memory_size_type
size()
const {
160 return m_storage.
size();
184 m_readOffset = 1111111111111111111ull;
185 m_blockSize = std::numeric_limits<memory_size_type>::max();
188 memory_size_type get_block_size() {
return m_blockSize; }
189 stream_size_type get_read_offset() {
return m_readOffset; }
190 void set_block_size(memory_size_type s) { m_blockSize = s; }
191 void set_read_offset(stream_size_type s) { m_readOffset = s; }
216 typedef std::shared_ptr<compressor_buffer> buffer_t;
221 buffer_t allocate_own_buffer();
222 void release_own_buffer(buffer_t &);
224 bool can_take_shared_buffer();
225 buffer_t take_shared_buffer();
226 void release_shared_buffer(buffer_t &);
249 typedef std::shared_ptr<compressor_buffer> buffer_t;
251 const static memory_size_type OWN_BUFFERS = 1;
254 : m_blockSize(blockSize)
261 log_debug() <<
"ERROR: ~stream_buffers: not empty!" << std::endl;
265 static memory_size_type memory_usage(memory_size_type blockSize) {
270 if (!(m_ownBuffers < OWN_BUFFERS || can_take_shared_buffer())) {
272 buffermapit target = m_buffers.find(blockNumber);
273 if (target != m_buffers.end())
return target->second;
278 buffermapit i = m_buffers.begin();
279 while (i != m_buffers.end() && !i->second.unique()) ++i;
280 if (i == m_buffers.end()) {
281 compressor().wait_for_request_done(lock);
291 m_buffers.insert(std::make_pair(blockNumber, b));
296 std::pair<buffermapit, bool> res
297 = m_buffers.insert(std::make_pair(blockNumber, buffer_t()));
298 buffermapit & target = res.first;
299 bool & inserted = res.second;
300 if (!inserted)
return target->second;
312 buffermapit i = m_buffers.begin();
313 while (i != m_buffers.end() && !i->second.unique()) ++i;
315 if (i == m_buffers.end()) {
317 if (m_ownBuffers < OWN_BUFFERS) {
318 target->second = allocate_own_buffer();
319 }
else if (can_take_shared_buffer()) {
320 target->second = take_shared_buffer();
324 tp_assert(
false,
"get_buffer: Could not get a new buffer "
325 "contrary to previous checks");
329 target->second.swap(i->second);
334 buffer_t result = target->second;
342 return m_buffers.empty();
346 buffermapit i = m_buffers.begin();
347 while (i != m_buffers.end()) {
349 if (j->second.get() == 0) {
353 throw exception(
"stream_buffers: j->second.get() == 0");
354 }
else if (j->second.unique()) {
355 if (shared_buffers() > 0) {
356 release_shared_buffer(j->second);
358 release_own_buffer(j->second);
366 memory_size_type own_buffers() {
370 memory_size_type shared_buffers() {
371 return m_buffers.size() - m_ownBuffers;
374 void release_shared_buffer(buffer_t & b) {
378 void release_own_buffer(buffer_t & b) {
383 bool can_take_shared_buffer() {
387 buffer_t take_shared_buffer() {
391 buffer_t allocate_own_buffer() {
397 return the_compressor_thread();
400 memory_size_type block_size()
const {
404 memory_size_type m_blockSize;
406 typedef std::map<stream_size_type, buffer_t> buffermap_t;
407 typedef buffermap_t::iterator buffermapit;
408 buffermap_t m_buffers;
411 memory_size_type m_ownBuffers;
416 #endif // TPIE_COMPRESSED_BUFFER_H