TPIE

11a2c2d
stream.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_COMPRESSED_STREAM_H
21 #define TPIE_COMPRESSED_STREAM_H
22 
26 
27 #include <tpie/tpie_export.h>
28 #include <tpie/array.h>
29 #include <tpie/tpie_assert.h>
30 #include <tpie/tempname.h>
31 #include <tpie/file_base_crtp.h>
32 #include <tpie/file_stream_base.h>
34 #include <tpie/stream_writable.h>
35 
36 namespace tpie {
37 
38 struct open {
39  enum type {
41  read_only = 00000001,
44  write_only = 00000002,
47  access_normal = 00000004,
50  access_random = 00000010,
53  compression_normal = 00000020,
57  compression_all = 00000040,
58 
59  defaults = 0
60  };
61 
62  friend inline open::type operator|(open::type a, open::type b)
63  { return (open::type) ((int) a | (int) b); }
64  friend inline open::type operator&(open::type a, open::type b)
65  { return (open::type) ((int) a & (int) b); }
66  friend inline open::type operator^(open::type a, open::type b)
67  { return (open::type) ((int) a ^ (int) b); }
68  friend inline open::type operator~(open::type a)
69  { return (open::type) ~(int) a; }
70 };
71 
72 
73 class compressed_stream_base_p;
74 
79 class TPIE_EXPORT compressed_stream_base {
80 public:
81  friend class compressed_stream_base_p;
82 
83  static const file_stream_base::offset_type beginning = file_stream_base::beginning;
84  static const file_stream_base::offset_type end = file_stream_base::end;
85  static const file_stream_base::offset_type current = file_stream_base::current;
86 
87  typedef file_stream_base::offset_type offset_type;
88 protected:
89  struct seek_state {
90  enum type {
91  none,
92  beginning,
93  end,
94  position
95  };
96  };
97 
98  compressed_stream_base(memory_size_type itemSize,
99  double blockFactor);
100 
101  // Non-virtual, protected destructor
103 
104 
109  void cache_read_writes();
110 
111  void peak_unlikely();
112 
113  void read_back_unlikely();
114 
115  void write_unlikely(const char * item);
116 
117  static memory_size_type memory_usage(double blockFactor=1.0) noexcept;
118 
119 public:
120  bool is_readable() const noexcept;
121 
122  bool is_writable() const noexcept;
123 
124  static memory_size_type block_size(double blockFactor) noexcept;
125 
126  static double calculate_block_factor(memory_size_type blockSize) noexcept;
127 
128  static memory_size_type block_memory_usage(double blockFactor) noexcept;
129 
130  memory_size_type block_items() const;
131 
132  memory_size_type block_size() const;
133 
134  template <typename TT>
135  void read_user_data(TT & data) {
136  if (sizeof(TT) != user_data_size())
137  throw stream_exception("Wrong user data size");
138  read_user_data(reinterpret_cast<void *>(&data), sizeof(TT));
139  }
140 
141  memory_size_type read_user_data(void * data, memory_size_type count);
142 
143  template <typename TT>
144  void write_user_data(const TT & data) {
145  if (sizeof(TT) > max_user_data_size())
146  throw stream_exception("Wrong user data size");
147  write_user_data(reinterpret_cast<const void *>(&data), sizeof(TT));
148  }
149 
150  void write_user_data(const void * data, memory_size_type count);
151 
152  memory_size_type user_data_size() const;
153 
154  memory_size_type max_user_data_size() const;
155 
156  const std::string & path() const;
157 
168  void open(const std::string & path,
169  access_type accessType,
170  memory_size_type userDataSize = 0,
171  cache_hint cacheHint=access_sequential,
172  compression_flags compressionFlags=compression_none);
173 
177  void open(memory_size_type userDataSize,
178  cache_hint cacheHint=access_sequential,
179  compression_flags compressionFlags=compression_none);
180 
184  void open(temp_file & file,
185  access_type accessType,
186  memory_size_type userDataSize = 0,
187  cache_hint cacheHint=access_sequential,
188  compression_flags compressionFlags=compression_none);
189 
193  void open(const std::string & path, compression_flags compressionFlags);
194 
198  void open(compression_flags compressionFlags);
199 
203  void open(temp_file & file, compression_flags compressionFlags);
204 
245  void open(const std::string & path, open::type openFlags=open::defaults, memory_size_type userDataSize=0);
246 
253  void open(open::type openFlags=open::defaults, memory_size_type userDataSize=0);
254 
262  void open(temp_file & file, open::type openFlags=open::defaults, memory_size_type userDataSize=0);
263 
264  void close();
265 
266  bool is_open() const noexcept;
267 
268  stream_size_type offset() const;
269 
273  void describe(std::ostream & out);
274 
278  std::string describe();
279 
280 
285  void seek(stream_offset_type offset, offset_type whence=beginning);
286 
293  void truncate(stream_size_type offset);
294 
298  void truncate(const stream_position & pos);
299 
312  stream_position get_position();
313 
318  void set_position(const stream_position & pos);
319 
320  stream_size_type size() const { return m_size; }
321 
322  stream_size_type file_size() const { return size(); }
323 
324 
328  bool can_read() {
329  if (m_cachedReads > 0) return true;
330  return offset() < size();
331  }
332 
336  bool can_read_back() {
337  return offset() > 0;
338  }
339 protected:
341  memory_size_type m_cachedReads;
343  memory_size_type m_cachedWrites;
344 
346  stream_size_type m_size;
349  seek_state::type m_seekState;
350 
364  stream_size_type m_offset;
365 
368 
370  char * m_bufferEnd;
371 
373  char * m_nextItem;
374 
375  compressed_stream_base_p * m_p;
376 };
377 
378 
379 
401 template <typename T>
402 class file_stream : public compressed_stream_base {
403  static_assert(is_stream_writable<T>::value, "file_stream item type must be trivially copyable");
404 public:
405  typedef T item_type;
406 
407  file_stream(double blockFactor=1.0)
408  : compressed_stream_base(sizeof(T), blockFactor) {}
409 
410  static memory_size_type memory_usage(double blockFactor=1.0) noexcept {
411  // m_buffer is included in m_buffers memory usage
412  return sizeof(file_stream) + compressed_stream_base::memory_usage(blockFactor);
413  }
414 
425  const T & read() {
426  if (m_cachedReads == 0) {
427  peak_unlikely();
428  const T & res = *reinterpret_cast<const T*>(m_nextItem);
429  ++m_offset;
430  m_nextItem += sizeof(T);
431  cache_read_writes();
432  return res;
433  }
434  --m_cachedReads;
435  ++m_offset;
436  const T & res = *reinterpret_cast<const T*>(m_nextItem);
437  m_nextItem += sizeof(T);
438  return res;
439  }
440 
451  const T & peek() {
452  if (m_cachedReads == 0) peak_unlikely();
453  return *reinterpret_cast<const T*>(m_nextItem);
454  }
455 
456  void skip() {
457  read();
458  }
459 
460  void skip_back() {
461  read_back();
462  }
463 
470  template <typename IT>
471  void read(IT const a, IT const b) {
472  for (IT i = a; i != b; ++i) *i = read();
473  }
474 
475  const T & read_back() {
476  if (m_seekState != seek_state::none || m_nextItem == m_bufferBegin) read_back_unlikely();
477  ++m_cachedReads;
478  --m_offset;
479  m_nextItem -= sizeof(T);
480  return *reinterpret_cast<const T *>(m_nextItem);
481  }
482 
483  void write(const T & item) {
484  if (m_cachedWrites == 0) {
485  write_unlikely(reinterpret_cast<const char*>(&item));
486  return;
487  }
488  memcpy(m_nextItem, &item, sizeof(T));
489  m_nextItem += sizeof(T);
490  ++m_size;
491  ++m_offset;
492  --m_cachedWrites;
493  return;
494  }
495 
496  template <typename IT>
497  void write(IT const a, IT const b) {
498  for (IT i = a; i != b; ++i) write(*i);
499  }
500 };
501 
502 } // namespace tpie
503 
504 #endif // TPIE_COMPRESSED_STREAM_H
tpie::compressed_stream_base::m_cachedWrites
memory_size_type m_cachedWrites
Number of cheap, unchecked writes we can do next.
Definition: stream.h:343
tpie::stream_crtp< file_stream_base >::offset_type
offset_type
Type describing how we should interpret the offset supplied to seek.
Definition: stream_crtp.h:39
tpie::open::access_random
@ access_random
Random access is intended.
Definition: stream.h:50
tpie::compressed_stream_base::m_nextItem
char * m_nextItem
Next item in buffer to read/write.
Definition: stream.h:373
tpie_assert.h
file_stream_base.h
tpie::compressed_stream_base::m_bufferBegin
char * m_bufferBegin
Only when m_buffer.get() != 0: First item in writable buffer.
Definition: stream.h:367
tpie::file_stream::read
const T & read()
Reads next item from stream if can_read() == true.
Definition: stream.h:425
tpie::open::type
type
Definition: stream.h:39
tpie::is_stream_writable
Definition: stream_writable.h:59
tpie::open
Definition: stream.h:38
tempname.h
tpie::open::compression_all
@ compression_all
Compress all blocks according to the preferred compression scheme which can be set using tpie::the_co...
Definition: stream.h:57
tpie::file_stream
Compressed stream.
Definition: predeclare.h:49
tpie::file_stream::peek
const T & peek()
Peeks next item from stream if can_read() == true.
Definition: stream.h:451
array.h
tpie::compressed_stream_base::m_seekState
seek_state::type m_seekState
Buffer manager for this entire stream.
Definition: stream.h:349
tpie::access_type
access_type
Type describing how we wish to access a file.
Definition: access_type.h:29
tpie::compression_flags
compression_flags
Possible values for the compressionFlags parameter to stream::open.
Definition: scheme.h:33
tpie::compressed_stream_base::m_bufferEnd
char * m_bufferEnd
Only when m_buffer.get() != 0: End of writable buffer.
Definition: stream.h:370
tpie::open::compression_normal
@ compression_normal
Compress some blocks according to available resources (time, memory).
Definition: stream.h:53
tpie::compressed_stream_base::seek_state
Definition: stream.h:89
tpie::cache_hint
cache_hint
Definition: cache_hint.h:28
tpie::open::access_normal
@ access_normal
Neither sequential access nor random access is intended.
Definition: stream.h:47
tpie::compressed_stream_base::can_read_back
bool can_read_back()
Check if the next call to read_back() will succeed or not.
Definition: stream.h:336
tpie::stream_exception
Definition: exception.h:45
tpie::compressed_stream_base
Base class containing the implementation details that are independent of the item type.
Definition: stream.h:79
tpie::access_sequential
@ access_sequential
Sequential access is intended.
Definition: cache_hint.h:36
file_base_crtp.h
tpie::compressed_stream_base::m_offset
stream_size_type m_offset
Offset of next item to read/write, relative to beginning of stream.
Definition: stream.h:364
tpie::compression_none
@ compression_none
No written blocks should be compressed.
Definition: scheme.h:37
tpie::open::write_only
@ write_only
Open a file for writing only.
Definition: stream.h:44
tpie::compressed_stream_base::can_read
bool can_read()
Check if the next call to read() will succeed or not.
Definition: stream.h:328
tpie::compressed_stream_base::m_cachedReads
memory_size_type m_cachedReads
Number of cheap, unchecked reads we can do next.
Definition: stream.h:341
stream_position.h
tpie::compressed_stream_base::m_size
stream_size_type m_size
Number of logical items in the stream.
Definition: stream.h:346
tpie::file_stream::read
void read(IT const a, IT const b)
Precondition: is_open().
Definition: stream.h:471
tpie
Definition: access_type.h:26
tpie::open::read_only
@ read_only
Open a file for reading only.
Definition: stream.h:41