TPIE

11a2c2d
serialized_store.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2014 The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef _TPIE_BTREE_SERIALIZED_STORE_H_
21 #define _TPIE_BTREE_SERIALIZED_STORE_H_
22 
#include <tpie/portability.h>
#include <tpie/btree/base.h>
#include <tpie/tpie_assert.h>
#include <tpie/serialization2.h>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>
29 
30 #ifdef TPIE_HAS_LZ4
31 #include <lz4.h>
32 #endif
33 #ifdef TPIE_HAS_ZSTD
34 #include <zstd.h>
35 #endif
36 #ifdef TPIE_HAS_SNAPPY
37 #include <snappy.h>
38 #endif
39 
40 namespace tpie {
41 namespace bbits {
42 
51 template <typename T,
52  typename A,
53  std::size_t a_,
54  std::size_t b_,
55  std::size_t bs_
56  >
57 class serialized_store {
58 public:
62  typedef T value_type;
63 
67  typedef A augment_type;
68 
69 
70  typedef size_t size_type;
71 
72  typedef uint64_t off_t;
73 
74 
75  serialized_store(const serialized_store & o) = delete;
76  serialized_store & operator=(const serialized_store & o) = delete;
77  serialized_store(serialized_store && o) = default;
78 
79  serialized_store & operator=(serialized_store && o) {
80  this->~serialized_store();
81  new (this) serialized_store(o);
82  return this;
83  }
84 
85 private:
86  struct internal_content {
87  off_t offset;
88  A augment;
89  };
90  static_assert(std::is_trivially_copyable<internal_content>::value, "must be trivially copyable.");
91 
92  static constexpr size_t block_size() {return bs_?bs_:24*1024;}
93  static constexpr size_t min_internal_size() {return 1;}
94  static constexpr size_t max_internal_size() {return a_ ? a_ : (block_size() - sizeof(off_t) - sizeof(size_t)) / sizeof(internal_content) ; }
95  static constexpr size_t min_leaf_size() {return 1;}
96  static constexpr size_t max_leaf_size() {return b_ ? b_ : (block_size() - sizeof(off_t) - sizeof(size_t)) / sizeof(T);}
97 
98  class serilization_buffer {
99  public:
100  serilization_buffer() = default;
101  serilization_buffer(size_t n) : m_buffer(n) {}
102 
103  void read(char * buf, size_t size) {
104  assert(m_index + size <= m_buffer.size());
105  memcpy(buf, m_buffer.data() + m_index, size);
106  m_index += size;
107  }
108 
109  void write(const char * buf, size_t size) {
110  if (m_index + size > m_buffer.size()) {
111  m_buffer.resize(m_index + size);
112  }
113  memcpy(m_buffer.data() + m_index, buf, size);
114  m_index += size;
115  }
116 
117  size_t size() {
118  return m_buffer.size();
119  }
120 
121  char * data() {
122  return m_buffer.data();
123  }
124 
125  private:
126  std::vector<char> m_buffer;
127  size_t m_index = 0;
128  };
129 
130  template <typename S, typename N>
131  void serialize(S & s, const N & i) const {
132  using tpie::serialize;
133 
134  auto compression_type = m_flags & btree_flags::compression_mask;
135 
136  if (compression_type == btree_flags::compress_none) {
137  serialize(s, i.count);
138  serialize(s, i.values, i.values + i.count);
139  return;
140  }
141 
142  serilization_buffer uncompressed_buffer(sizeof(i.count) + sizeof(*i.values) * i.count);
143  serialize(uncompressed_buffer, i.count);
144  serialize(uncompressed_buffer, i.values, i.values + i.count);
145 
146  int32_t uncompressed_size = (int32_t)uncompressed_buffer.size();
147 
148  std::vector<char> compressed_buffer;
149  int32_t compressed_size;
150 
151  switch (compression_type) {
152 #ifdef TPIE_HAS_LZ4
153  case btree_flags::compress_lz4: {
154  auto max_compressed_size = LZ4_compressBound(uncompressed_size);
155  compressed_buffer.resize((size_t)max_compressed_size);
156 
157  compressed_size = LZ4_compress_default(uncompressed_buffer.data(), compressed_buffer.data(),
158  uncompressed_size, max_compressed_size);
159  if (compressed_size == 0)
160  throw io_exception("B-tree compression failed");
161 
162  break;
163  }
164 #endif
165 #ifdef TPIE_HAS_ZSTD
166  case compress_zstd: {
167  auto max_compressed_size = ZSTD_compressBound((size_t)uncompressed_size);
168  compressed_buffer.resize(max_compressed_size);
169 
170  int level = (int)((uint64_t)(m_flags & btree_flags::compression_level_mask) >> 8);
171  if (level == 0) level = 5;
172 
173  size_t r = ZSTD_compress(compressed_buffer.data(), max_compressed_size, uncompressed_buffer.data(), uncompressed_size, level);
174  if (ZSTD_isError(r))
175  throw io_exception("B-tree compression failed");
176 
177  compressed_size = (int32_t)r;
178  break;
179  }
180 #endif
181 #ifdef TPIE_HAS_SNAPPY
182  case compress_snappy: {
183  auto max_compressed_size = snappy::MaxCompressedLength((size_t) uncompressed_size);
184  compressed_buffer.resize(max_compressed_size);
185 
186  size_t _compressed_size;
187  snappy::RawCompress(uncompressed_buffer.data(), uncompressed_size,
188  compressed_buffer.data(), &_compressed_size);
189 
190  compressed_size = (int32_t) _compressed_size;
191  break;
192  }
193 #endif
194  default:
195  throw exception("Unknown compression, this code shouldn't be reachable");
196  }
197 
198  s.write(reinterpret_cast<char *>(&uncompressed_size), sizeof(uncompressed_size));
199  s.write(reinterpret_cast<char *>(&compressed_size), sizeof(compressed_size));
200  s.write(compressed_buffer.data(), compressed_size);
201  }
202 
203  template <typename D, typename N>
204  void unserialize(D & d, N & i) const {
205  using tpie::unserialize;
206 
207  auto compression_type = m_flags & btree_flags::compression_mask;
208 
209  if (compression_type == btree_flags::compress_none) {
210  unserialize(d, i.count);
211  unserialize(d, i.values, i.values + i.count);
212  return;
213  }
214 
215  int32_t uncompressed_size, compressed_size;
216  d.read(reinterpret_cast<char *>(&uncompressed_size), sizeof(uncompressed_size));
217  d.read(reinterpret_cast<char *>(&compressed_size), sizeof(compressed_size));
218 
219  std::vector<char> compressed_buffer((size_t)compressed_size);
220  d.read(compressed_buffer.data(), compressed_size);
221 
222  serilization_buffer uncompressed_buffer((size_t)uncompressed_size);
223 
224  switch (m_flags & btree_flags::compression_mask) {
225 #ifdef TPIE_HAS_LZ4
226  case compress_lz4: {
227  int r = LZ4_decompress_fast(compressed_buffer.data(), uncompressed_buffer.data(), uncompressed_size);
228  if (r != compressed_size)
229  throw io_exception("B-tree decompression failed");
230  break;
231  }
232 #endif
233 #ifdef TPIE_HAS_ZSTD
234  case compress_zstd: {
235  size_t r = ZSTD_decompress(uncompressed_buffer.data(), (size_t)uncompressed_size, compressed_buffer.data(), (size_t)compressed_size);
236  if (ZSTD_isError(r) || (int32_t)r != uncompressed_size)
237  throw io_exception("B-tree decompression failed");
238  break;
239  }
240 #endif
241 #ifdef TPIE_HAS_SNAPPY
242  case compress_snappy: {
243  bool ok = snappy::RawUncompress(compressed_buffer.data(), (size_t)compressed_size, uncompressed_buffer.data());
244  if (!ok)
245  throw io_exception("B-tree decompression failed");
246  break;
247  }
248 #endif
249  default:
250  throw exception("Unknown compression, this code shouldn't be reachable");
251  }
252 
253  unserialize(uncompressed_buffer, i.count);
254  unserialize(uncompressed_buffer, i.values, i.values + i.count);
255  }
256 
257  struct internal {
258  off_t my_offset; //NOTE not serialized
259  size_t count;
260  internal_content values[max_internal_size()];
261  };
262 
263  struct leaf {
264  off_t my_offset; //NOTE not serialized
265  size_t count;
266  T values[max_leaf_size()];
267  };
268 
269  class leaf_type {
270  std::shared_ptr<leaf> ptr;
271 
272  public:
273  leaf_type() = default;
274  leaf_type(const leaf_type &) = default;
275  leaf_type(leaf_type &&) = default;
276 
277  leaf_type & operator=(const leaf_type &) = default;
278  leaf_type & operator=(leaf_type &&) = default;
279 
280  leaf_type(off_t offset) {
281  ptr = std::make_shared<leaf>();
282  ptr->my_offset = offset;
283  }
284 
285  leaf & operator*() const noexcept {
286  return *ptr;
287  }
288 
289  leaf * operator->() const noexcept {
290  return ptr.operator->();
291  }
292 
293  void reset() noexcept {
294  ptr.reset();
295  }
296 
297  explicit operator bool() const noexcept {
298  return bool(ptr);
299  }
300 
301  bool operator==(const leaf_type & o) const noexcept {
302  return ptr->my_offset == o->my_offset;
303  }
304 
305  bool operator!=(const leaf_type & o) const noexcept {
306  return !(this == o);
307  }
308  };
309 
310 
311  struct header_v0 {
312  /*
313  * Version 0: initial
314  * Version 1: added flags
315  */
316  static constexpr uint64_t good_magic = 0x8bbd51bfe5e3d477, current_version = 1;
317  uint64_t magic;
318  uint64_t version; // 0
319  off_t root; // offset of root
320  size_t height; // tree height (internal and leaf levels)
321  size_t size; // number of items (from btree)
322  off_t metadata_offset;
323  off_t metadata_size;
324  };
325 
326  struct header : header_v0 {
327  btree_flags flags;
328  };
329 
330  typedef std::shared_ptr<internal> internal_type;
331 
332  void set_flags(btree_flags flags) {
333  m_flags = flags;
334  switch (flags & btree_flags::compression_mask) {
335  case btree_flags::compress_lz4:
336 #ifndef TPIE_HAS_LZ4
337  throw exception("Can't use a LZ4 compressed B-tree without LZ4 installed");
338 #endif
339  break;
340  case btree_flags::compress_zstd:
341 #ifndef TPIE_HAS_ZSTD
342  throw exception("Can't use a ZSTD compressed B-tree without ZSTD installed");
343 #endif
344  break;
345  case btree_flags::compress_snappy:
346 #ifndef TPIE_HAS_SNAPPY
347  throw exception("Can't use a snappy compressed B-tree without snappy installed");
348 #endif
349  break;
350  case btree_flags::compress_none:
351  break;
352  default:
353  throw exception("Unknown compression");
354  }
355  }
356 
362  explicit serialized_store(const std::string & path, btree_flags flags=btree_flags::defaults):
363  m_height(0), m_size(0), metadata_offset(0), metadata_size(0), path(path) {
364  f.reset(new std::fstream());
365  header h;
366  if ((flags & btree_flags::read) == 0) {
367  if ((flags & btree_flags::write) == 0)
368  throw invalid_file_exception("Either read or write must be supplied to serialized store");
369  f->open(path, std::ios_base::out | std::ios_base::trunc | std::ios_base::binary);
370  if (!f->is_open())
371  throw invalid_file_exception("Open failed");
372  memset(&h, 0, sizeof(h));
373  h.flags = flags;
374  set_flags(flags);
375  f->write(reinterpret_cast<char *>(&h), sizeof(h));
376  } else {
377  f->open(path, std::ios_base::in | std::ios_base::binary);
378  if (!f->is_open())
379  throw invalid_file_exception("Open failed");
380  f->read(reinterpret_cast<char *>(&h), sizeof(header_v0));
381  if (!*f)
382  throw invalid_file_exception("Unable to read header");
383 
384  if (h.magic != header::good_magic)
385  throw invalid_file_exception("Bad magic");
386 
387  if (h.version == 0) {
388  h.flags = btree_flags::defaults_v0;
389  } else if (h.version == 1) {
390  f->read(reinterpret_cast<char *>(&h.flags), sizeof(h.flags));
391  } else {
392  throw invalid_file_exception("Bad version");
393  }
394 
395  m_height = h.height;
396  m_size = h.size;
397  metadata_offset = h.metadata_offset;
398  metadata_size = h.metadata_size;
399 
400  set_flags(h.flags);
401 
402  if (m_height == 1) {
403  root_leaf = leaf_type(h.root);
404  f->seekg(h.root);
405  unserialize(*f, *root_leaf);
406  } else if (m_height > 1) {
407  root_internal = std::make_shared<internal>();
408  root_internal->my_offset = h.root;
409  f->seekg(h.root);
410  unserialize(*f, *root_internal);
411  }
412  }
413  }
414 
415 
416  void move(internal_type src, size_t src_i,
417  internal_type dst, size_t dst_i) {
418  dst->values[dst_i] = src->values[src_i];
419  }
420 
421  void move(leaf_type src, size_t src_i,
422  leaf_type dst, size_t dst_i) {
423  dst->values[dst_i] = src->values[src_i];
424  }
425 
426  void set(leaf_type dst, size_t dst_i, T c) {
427  assert(dst == current_leaf);
428  dst->values[dst_i] = c;
429  }
430 
431  void set(internal_type node, size_t i, internal_type c) {
432  assert(node == current_internal);
433  node->values[i].offset = c->my_offset;
434  }
435 
436  void set(internal_type node, size_t i, leaf_type c) {
437  assert(node == current_internal);
438  node->values[i].offset = c->my_offset;
439  }
440 
441  const T & get(leaf_type l, size_t i) const {
442  return l->values[i];
443  }
444 
445  size_t count(internal_type node) const {
446  return node->count;
447  }
448 
449  size_t count(leaf_type node) const {
450  return node->count;
451  }
452 
453  void set_count(internal_type node, size_t i) {
454  node->count = i;
455  }
456 
457  void set_count(leaf_type node, size_t i) {
458  node->count = i;
459  }
460 
461  leaf_type create_leaf() {
462  assert(!current_internal && !current_leaf);
463  current_leaf = leaf_type((off_t)f->tellp());
464  return current_leaf;
465  }
466  leaf_type create(leaf_type) {return create_leaf();}
467  internal_type create_internal() {
468  assert(!current_internal && !current_leaf);
469  current_internal = std::make_shared<internal>();
470  current_internal->my_offset = (stream_size_type)f->tellp();
471  return current_internal;
472  }
473  internal_type create(internal_type) {return create_internal();}
474 
475  void set_root(internal_type node) {root_internal = node;}
476  void set_root(leaf_type node) {root_leaf = node;}
477 
478  internal_type get_root_internal() const {
479  return root_internal;
480  }
481 
482  leaf_type get_root_leaf() const {
483  return root_leaf;
484  }
485 
486  internal_type get_child_internal(internal_type node, size_t i) const {
487  internal_type child = std::make_shared<internal>();
488  assert(i < node->count);
489  child->my_offset = node->values[i].offset;
490  f->seekg(child->my_offset);
491  unserialize(*f, *child);
492  return child;
493  }
494 
495  leaf_type get_child_leaf(internal_type node, size_t i) const {
496  leaf_type child = leaf_type(node->values[i].offset);
497  assert(i < node->count);
498  f->seekg(child->my_offset);
499  unserialize(*f, *child);
500  return child;
501  }
502 
503  size_t index(off_t my_offset, internal_type node) const {
504  for (size_t i=0; i < node->count; ++i)
505  if (node->values[i].offset == my_offset) return i;
506  tp_assert(false, "Not found");
507  tpie_unreachable();
508  }
509 
510  size_t index(leaf_type l, internal_type node) const {
511  return index(l->my_offset, node);
512  }
513 
514  size_t index(internal_type i, internal_type node) const {
515  return index(i->my_offset, node);
516  }
517 
518  void set_augment(leaf_type l, internal_type p, augment_type ag) {
519  size_t idx = index(l->my_offset, p);
520  p->values[idx].augment = ag;
521  }
522 
523  void set_augment(internal_type i, internal_type p, augment_type ag) {
524  size_t idx = index(i->my_offset, p);
525  p->values[idx].augment = ag;
526  }
527 
528  const augment_type & augment(internal_type p, size_t i) const {
529  return p->values[i].augment;
530  }
531 
532  size_t height() const throw() {
533  return m_height;
534  }
535 
536  void set_height(size_t height) throw() {
537  m_height = height;
538  }
539 
540  size_t size() const throw() {
541  return m_size;
542  }
543 
544  void set_size(size_t size) throw() {
545  m_size = size;
546  }
547 
548  void flush() {
549  if (current_internal) {
550  assert(!current_leaf);
551  assert((stream_size_type)f->tellp() == current_internal->my_offset);
552  serialize(*f, *current_internal);
553  current_internal.reset();
554  }
555  if (current_leaf) {
556  assert((stream_size_type)f->tellp() == current_leaf->my_offset);
557  serialize(*f, *current_leaf);
558  current_leaf.reset();
559  }
560  }
561 
562  void finalize_build() {
563  // Should call flush() first.
564  assert(!current_internal && !current_leaf);
565 
566  header h;
567  h.magic = header::good_magic;
568  h.version = header::current_version;
569  h.root = 0;
570  if (root_internal) {
571  h.root = root_internal->my_offset;
572  } else if (root_leaf) {
573  h.root = root_leaf->my_offset;
574  } else {
575  assert(m_size == 0);
576  }
577  h.height = m_height;
578  h.size = m_size;
579  h.metadata_offset = metadata_offset;
580  h.metadata_size = metadata_size;
581  h.flags = m_flags;
582  f->seekp(0);
583  f->write(reinterpret_cast<char *>(&h), sizeof(h));
584  f->close();
585 
586  f->open(path, std::ios_base::in | std::ios_base::binary);
587  if (!f->is_open())
588  throw invalid_file_exception("Open failed");
589  }
590 
591  void set_metadata(const std::string & data) {
592  assert(!current_internal && !current_leaf);
593  assert(f->is_open());
594  metadata_offset = (stream_size_type)f->tellp();
595  metadata_size = data.size();
596  f->write(data.c_str(), data.size());
597  }
598 
599  std::string get_metadata() {
600  assert(f->is_open());
601  if (metadata_offset == 0 || metadata_size == 0)
602  return {};
603  std::string data(metadata_size, '\0');
604  f->read(&data[0], metadata_size);
605  return data;
606  }
607 
608  size_t m_height;
609  size_t m_size;
610  off_t metadata_offset, metadata_size;
611  btree_flags m_flags;
612 
613  std::string path;
614  std::unique_ptr<std::fstream> f;
615  internal_type current_internal, root_internal;
616  leaf_type current_leaf, root_leaf;
617 
618  template <typename>
619  friend class ::tpie::btree_node;
620 
621  template <typename>
622  friend class ::tpie::btree_iterator;
623 
624  template <typename, typename>
625  friend class bbits::tree;
626 
627  template <typename, typename>
628  friend class bbits::tree_state;
629 
630  template<typename, typename>
631  friend class bbits::builder;
632 
633  template <typename, bool>
634  friend struct bbits::block_size_getter;
635 };
636 
637 } //namespace bbits
638 } //namespace tpie
639 #endif /*_TPIE_BTREE_SERIALIZED_STORE_H_*/
tpie_assert.h
portability.h
tpie::bbits::serialized_store
Serializing store.
Definition: base.h:260
tpie::unserialize
void unserialize(S &src, foo &v)
Sample tpie::unserialize prototype.
serialization2.h
tpie::serialize
void serialize(D &dst, const foo &v)
Sample tpie::serialize prototype.
tp_assert
#define tp_assert(condition, message)
Definition: tpie_assert.h:65
tpie::bbits::serialized_store::value_type
T value_type
Type of value of items stored.
Definition: serialized_store.h:62
tpie::bbits::serialized_store::augment_type
A augment_type
Type of augmentation stored.
Definition: serialized_store.h:67
tpie
Definition: access_type.h:26