BufferDetail.hh
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
#ifndef avro_BufferDetail_hh__
#define avro_BufferDetail_hh__

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <deque>
#include <exception>
#include <type_traits>
#include <utility>

#include <boost/function.hpp>
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/static_assert.hpp>
#include <boost/utility.hpp>

#ifdef HAVE_BOOST_ASIO
#include <boost/asio/buffer.hpp>
#endif
  32. /**
  33. * \file BufferDetail.hh
  34. *
  35. * \brief The implementation details for the Buffer class.
  36. *
  37. **/
  38. namespace avro {
  39. namespace detail {
// Byte and size types used throughout the buffer implementation.
typedef char data_type;
typedef size_t size_type;
#ifdef HAVE_BOOST_ASIO
// Aliases for boost::asio scatter/gather buffer types; only available when
// the build enables asio support.
typedef boost::asio::const_buffer ConstAsioBuffer;
typedef boost::asio::mutable_buffer MutableAsioBuffer;
#endif
/// The size in bytes for blocks backing buffer chunks.
const size_type kMinBlockSize = 4096;
const size_type kMaxBlockSize = 16384;
const size_type kDefaultBlockSize = kMinBlockSize;
// Callback type invoked when a foreign (caller-owned) memory region handed
// to the buffer is no longer referenced and may be released by the caller.
typedef boost::function<void(void)> free_func;
  51. /**
  52. * Simple class to hold a functor that executes on delete
  53. **/
  54. class CallOnDestroy {
  55. public:
  56. explicit CallOnDestroy(free_func func) : func_(std::move(func)) {}
  57. ~CallOnDestroy() {
  58. if (func_) {
  59. func_();
  60. }
  61. }
  62. private:
  63. free_func func_;
  64. };
  65. /**
  66. * \brief A chunk is the building block for buffers.
  67. *
  68. * A chunk is backed by a memory block, and internally it maintains information
  69. * about which area of the block it may use, and the portion of this area that
  70. * contains valid data. More than one chunk may share the same underlying
  71. * block, but the areas should never overlap. Chunk holds a shared pointer to
  72. * an array of bytes so that shared blocks are reference counted.
  73. *
  74. * When a chunk is copied, the copy shares the same underlying buffer, but the
  75. * copy receives its own copies of the start/cursor/end pointers, so each copy
  76. * can be manipulated independently. This allows different buffers to share
  77. * the same non-overlapping parts of a chunk, or even overlapping parts of a
  78. * chunk if the situation arises.
  79. *
  80. **/
  81. class Chunk {
  82. public:
  83. /// Default constructor, allocates a new underlying block for this chunk.
  84. explicit Chunk(size_type size) : underlyingBlock_(new data_type[size]),
  85. readPos_(underlyingBlock_.get()),
  86. writePos_(readPos_),
  87. endPos_(readPos_ + size) {}
  88. /// Foreign buffer constructor, uses the supplied data for this chunk, and
  89. /// only for reading.
  90. Chunk(const data_type *data, size_type size, const free_func &func) : callOnDestroy_(new CallOnDestroy(func)),
  91. readPos_(const_cast<data_type *>(data)),
  92. writePos_(readPos_ + size),
  93. endPos_(writePos_) {}
  94. private:
  95. // reference counted object will call a functor when it's destroyed
  96. boost::shared_ptr<CallOnDestroy> callOnDestroy_;
  97. public:
  98. /// Remove readable bytes from the front of the chunk by advancing the
  99. /// chunk start position.
  100. void truncateFront(size_type howMuch) {
  101. readPos_ += howMuch;
  102. assert(readPos_ <= writePos_);
  103. }
  104. /// Remove readable bytes from the back of the chunk by moving the
  105. /// chunk cursor position.
  106. void truncateBack(size_type howMuch) {
  107. writePos_ -= howMuch;
  108. assert(readPos_ <= writePos_);
  109. }
  110. /// Tell the position the next byte may be written to.
  111. data_type *tellWritePos() const {
  112. return writePos_;
  113. }
  114. /// Tell the position of the first byte containing valid data.
  115. const data_type *tellReadPos() const {
  116. return readPos_;
  117. }
  118. /// After a write operation, increment the write position.
  119. void incrementCursor(size_type howMuch) {
  120. writePos_ += howMuch;
  121. assert(writePos_ <= endPos_);
  122. }
  123. /// Tell how many bytes of data were written to this chunk.
  124. size_type dataSize() const {
  125. return (writePos_ - readPos_);
  126. }
  127. /// Tell how many bytes this chunk has available to write to.
  128. size_type freeSize() const {
  129. return (endPos_ - writePos_);
  130. }
  131. /// Tell how many bytes of data this chunk can hold (used and free).
  132. size_type capacity() const {
  133. return (endPos_ - readPos_);
  134. }
  135. private:
  136. friend bool operator==(const Chunk &lhs, const Chunk &rhs);
  137. friend bool operator!=(const Chunk &lhs, const Chunk &rhs);
  138. // more than one buffer can share an underlying block, so use SharedPtr
  139. boost::shared_array<data_type> underlyingBlock_;
  140. data_type *readPos_; ///< The first readable byte in the block
  141. data_type *writePos_; ///< The end of written data and start of free space
  142. data_type *endPos_; ///< Marks the end of the usable block area
  143. };
  144. /**
  145. * Compare underlying buffers and return true if they are equal
  146. **/
  147. inline bool operator==(const Chunk &lhs, const Chunk &rhs) {
  148. return lhs.underlyingBlock_ == rhs.underlyingBlock_;
  149. }
  150. /**
  151. * Compare underlying buffers and return true if they are unequal
  152. **/
  153. inline bool operator!=(const Chunk &lhs, const Chunk &rhs) {
  154. return lhs.underlyingBlock_ != rhs.underlyingBlock_;
  155. }
  156. /**
  157. * \brief Implementation details for Buffer class
  158. *
  159. * Internally, BufferImpl keeps two lists of chunks, one list consists entirely of
  160. * chunks containing data, and one list which contains chunks with free space.
  161. *
  162. *
  163. */
  164. class BufferImpl : boost::noncopyable {
  165. /// Add a new chunk to the list of chunks for this buffer, growing the
  166. /// buffer by the default block size.
  167. void allocChunkChecked(size_type size = kDefaultBlockSize) {
  168. writeChunks_.push_back(Chunk(size));
  169. freeSpace_ += writeChunks_.back().freeSize();
  170. }
  171. /// Add a new chunk to the list of chunks for this buffer, growing the
  172. /// buffer by the requested size, but within the range of a minimum and
  173. /// maximum.
  174. void allocChunk(size_type size) {
  175. if (size < kMinBlockSize) {
  176. size = kMinBlockSize;
  177. } else if (size > kMaxBlockSize) {
  178. size = kMaxBlockSize;
  179. }
  180. allocChunkChecked(size);
  181. }
  182. /// Update the state of the chunks after a write operation. This function
  183. /// ensures the chunk states are consistent with the write.
  184. void postWrite(size_type size) {
  185. // precondition to this function is that the writeChunk_.front()
  186. // contains the data that was just written, so make sure writeChunks_
  187. // is not empty:
  188. assert(size <= freeSpace_ && !writeChunks_.empty());
  189. // This is probably the one tricky part of BufferImpl. The data that
  190. // was written now exists in writeChunks_.front(). Now we must make
  191. // sure that same data exists in readChunks_.back().
  192. //
  193. // There are two cases:
  194. //
  195. // 1. readChunks_.last() and writeChunk_.front() refer to the same
  196. // underlying block, in which case they both just need their cursor
  197. // updated to reflect the new state.
  198. //
  199. // 2. readChunk_.last() is not the same block as writeChunks_.front(),
  200. // in which case it should be, since the writeChunk.front() contains
  201. // the next bit of data that will be appended to readChunks_, and
  202. // therefore needs to be copied there so we can proceed with updating
  203. // their state.
  204. //
  205. // if readChunks_ is not the same as writeChunks_.front(), make a copy
  206. // of it there
  207. if (readChunks_.empty() || (readChunks_.back() != writeChunks_.front())) {
  208. const Chunk &curChunk = writeChunks_.front();
  209. readChunks_.push_back(curChunk);
  210. // Any data that existed in the write chunk previously doesn't
  211. // belong to this buffer (otherwise it would have already been
  212. // added to the readChunk_ list). Here, adjust the start of the
  213. // readChunk to begin after any data already existing in curChunk
  214. readChunks_.back().truncateFront(curChunk.dataSize());
  215. }
  216. assert(readChunks_.back().freeSize() == writeChunks_.front().freeSize());
  217. // update the states of both readChunks_ and writeChunks_ to indicate that they are
  218. // holding the new data
  219. readChunks_.back().incrementCursor(size);
  220. writeChunks_.front().incrementCursor(size);
  221. size_ += size;
  222. freeSpace_ -= size;
  223. // if there is no more free space in writeChunks_, the next write cannot use
  224. // it, so dispose of it now
  225. if (writeChunks_.front().freeSize() == 0) {
  226. writeChunks_.pop_front();
  227. }
  228. }
  229. public:
  230. typedef std::deque<Chunk> ChunkList;
  231. typedef boost::shared_ptr<BufferImpl> SharedPtr;
  232. typedef boost::shared_ptr<const BufferImpl> ConstSharedPtr;
  233. /// Default constructor, creates a buffer without any chunks
  234. BufferImpl() : freeSpace_(0),
  235. size_(0) {}
  236. /// Copy constructor, gets a copy of all the chunks with data.
  237. BufferImpl(const BufferImpl &src) : readChunks_(src.readChunks_),
  238. freeSpace_(0),
  239. size_(src.size_) {}
  240. /// Amount of data held in this buffer.
  241. size_type size() const {
  242. return size_;
  243. }
  244. /// Capacity that may be written before the buffer must allocate more memory.
  245. size_type freeSpace() const {
  246. return freeSpace_;
  247. }
  248. /// Add enough free chunks to make the reservation size available.
  249. /// Actual amount may be more (rounded up to next chunk).
  250. void reserveFreeSpace(size_type reserveSize) {
  251. while (freeSpace_ < reserveSize) {
  252. allocChunk(reserveSize - freeSpace_);
  253. }
  254. }
  255. /// Return the chunk avro's begin iterator for reading.
  256. ChunkList::const_iterator beginRead() const {
  257. return readChunks_.begin();
  258. }
  259. /// Return the chunk avro's end iterator for reading.
  260. ChunkList::const_iterator endRead() const {
  261. return readChunks_.end();
  262. }
  263. /// Return the chunk avro's begin iterator for writing.
  264. ChunkList::const_iterator beginWrite() const {
  265. return writeChunks_.begin();
  266. }
  267. /// Return the chunk avro's end iterator for writing.
  268. ChunkList::const_iterator endWrite() const {
  269. return writeChunks_.end();
  270. }
  271. /// Write a single value to buffer, add a new chunk if necessary.
  272. template<typename T>
  273. void writeTo(T val, const std::true_type &) {
  274. if (freeSpace_ && (sizeof(T) <= writeChunks_.front().freeSize())) {
  275. // fast path, there's enough room in the writeable chunk to just
  276. // straight out copy it
  277. *(reinterpret_cast<T *>(writeChunks_.front().tellWritePos())) = val;
  278. postWrite(sizeof(T));
  279. } else {
  280. // need to fixup chunks first, so use the regular memcpy
  281. // writeTo method
  282. writeTo(reinterpret_cast<data_type *>(&val), sizeof(T));
  283. }
  284. }
  285. /// An uninstantiable function, this is if boost::is_fundamental check fails,
  286. /// and will compile-time assert.
  287. template<typename T>
  288. void writeTo(T /*val*/, const std::false_type &) {
  289. BOOST_STATIC_ASSERT(sizeof(T) == 0);
  290. }
  291. /// Write a block of data to the buffer, adding new chunks if necessary.
  292. size_type writeTo(const data_type *data, size_type size) {
  293. size_type bytesLeft = size;
  294. while (bytesLeft) {
  295. if (freeSpace_ == 0) {
  296. allocChunkChecked();
  297. }
  298. Chunk &chunk = writeChunks_.front();
  299. size_type toCopy = std::min<size_type>(chunk.freeSize(), bytesLeft);
  300. assert(toCopy);
  301. memcpy(chunk.tellWritePos(), data, toCopy);
  302. postWrite(toCopy);
  303. data += toCopy;
  304. bytesLeft -= toCopy;
  305. }
  306. return size;
  307. }
  308. /// Update internal status of chunks after data is written using iterator.
  309. size_type wroteTo(size_type size) {
  310. assert(size <= freeSpace_);
  311. size_type bytesLeft = size;
  312. while (bytesLeft) {
  313. Chunk &chunk = writeChunks_.front();
  314. size_type wrote = std::min<size_type>(chunk.freeSize(), bytesLeft);
  315. assert(wrote);
  316. postWrite(wrote);
  317. bytesLeft -= wrote;
  318. }
  319. return size;
  320. }
  321. /// Append the chunks that have data in src to this buffer
  322. void append(const BufferImpl &src) {
  323. std::copy(src.readChunks_.begin(), src.readChunks_.end(), std::back_inserter(readChunks_));
  324. size_ += src.size_;
  325. }
  326. /// Remove all the chunks that contain data from this buffer.
  327. void discardData() {
  328. readChunks_.clear();
  329. size_ = 0;
  330. }
  331. /// Remove the specified amount of data from the chunks, starting at the front.
  332. void discardData(size_type bytes) {
  333. assert(bytes && bytes <= size_);
  334. size_type bytesToDiscard = bytes;
  335. while (bytesToDiscard) {
  336. size_t currentSize = readChunks_.front().dataSize();
  337. // see if entire chunk is discarded
  338. if (currentSize <= bytesToDiscard) {
  339. readChunks_.pop_front();
  340. bytesToDiscard -= currentSize;
  341. } else {
  342. readChunks_.front().truncateFront(bytesToDiscard);
  343. bytesToDiscard = 0;
  344. }
  345. }
  346. size_ -= bytes;
  347. }
  348. /// Remove the specified amount of data from the chunks, moving the
  349. /// data to dest's chunks
  350. void extractData(BufferImpl &dest, size_type bytes) {
  351. assert(bytes && bytes <= size_);
  352. size_type bytesToExtract = bytes;
  353. while (bytesToExtract) {
  354. size_t currentSize = readChunks_.front().dataSize();
  355. dest.readChunks_.push_back(readChunks_.front());
  356. // see if entire chunk was extracted
  357. if (currentSize <= bytesToExtract) {
  358. readChunks_.pop_front();
  359. bytesToExtract -= currentSize;
  360. } else {
  361. readChunks_.front().truncateFront(bytesToExtract);
  362. size_t excess = currentSize - bytesToExtract;
  363. dest.readChunks_.back().truncateBack(excess);
  364. bytesToExtract = 0;
  365. }
  366. }
  367. size_ -= bytes;
  368. dest.size_ += bytes;
  369. }
  370. /// Move data from this to the destination, leaving this buffer without data
  371. void extractData(BufferImpl &dest) {
  372. assert(dest.readChunks_.empty());
  373. dest.readChunks_.swap(readChunks_);
  374. dest.size_ = size_;
  375. size_ = 0;
  376. }
  377. /// Copy data to a different buffer by copying the chunks. It's
  378. /// a bit like extract, but without modifying the source buffer.
  379. static void copyData(BufferImpl &dest,
  380. ChunkList::const_iterator iter,
  381. size_type offset,
  382. size_type bytes) {
  383. // now we are positioned to start the copying, copy as many
  384. // chunks as we need, the first chunk may have a non-zero offset
  385. // if the data to copy is not at the start of the chunk
  386. size_type copied = 0;
  387. while (copied < bytes) {
  388. dest.readChunks_.push_back(*iter);
  389. // offset only applies in the first chunk,
  390. // all subsequent chunks are copied from the start
  391. dest.readChunks_.back().truncateFront(offset);
  392. offset = 0;
  393. copied += dest.readChunks_.back().dataSize();
  394. ++iter;
  395. }
  396. // if the last chunk copied has more bytes than we need, truncate it
  397. size_type excess = copied - bytes;
  398. dest.readChunks_.back().truncateBack(excess);
  399. dest.size_ += bytes;
  400. }
  401. /// The number of chunks containing data. Used for debugging.
  402. int numDataChunks() const {
  403. return readChunks_.size();
  404. }
  405. /// The number of chunks containing free space (note that an entire chunk
  406. /// may not be free). Used for debugging.
  407. int numFreeChunks() const {
  408. return writeChunks_.size();
  409. }
  410. /// Add unmanaged data to the buffer. The buffer will not automatically
  411. /// free the data, but it will call the supplied function when the data is
  412. /// no longer referenced by the buffer (or copies of the buffer).
  413. void appendForeignData(const data_type *data, size_type size, const free_func &func) {
  414. readChunks_.push_back(Chunk(data, size, func));
  415. size_ += size;
  416. }
  417. BufferImpl &operator=(const BufferImpl &src) = delete;
  418. private:
  419. ChunkList readChunks_; ///< chunks of this buffer containing data
  420. ChunkList writeChunks_; ///< chunks of this buffer containing free space
  421. size_type freeSpace_; ///< capacity of buffer before allocation required
  422. size_type size_; ///< amount of data in buffer
  423. };
  424. } // namespace detail
  425. } // namespace avro
  426. #endif