123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515 |
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #ifndef avro_BufferDetail_hh__
- #define avro_BufferDetail_hh__
#include <boost/function.hpp>
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/static_assert.hpp>
#include <boost/utility.hpp>
#ifdef HAVE_BOOST_ASIO
#include <boost/asio/buffer.hpp>
#endif
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <deque>
#include <exception>
#include <iterator>
#include <type_traits>
#include <utility>
- /**
- * \file BufferDetail.hh
- *
- * \brief The implementation details for the Buffer class.
- *
- **/
- namespace avro {
- namespace detail {
- typedef char data_type;
- typedef size_t size_type;
- #ifdef HAVE_BOOST_ASIO
- typedef boost::asio::const_buffer ConstAsioBuffer;
- typedef boost::asio::mutable_buffer MutableAsioBuffer;
- #endif
- /// The size in bytes for blocks backing buffer chunks.
- const size_type kMinBlockSize = 4096;
- const size_type kMaxBlockSize = 16384;
- const size_type kDefaultBlockSize = kMinBlockSize;
- typedef boost::function<void(void)> free_func;
- /**
- * Simple class to hold a functor that executes on delete
- **/
- class CallOnDestroy {
- public:
- explicit CallOnDestroy(free_func func) : func_(std::move(func)) {}
- ~CallOnDestroy() {
- if (func_) {
- func_();
- }
- }
- private:
- free_func func_;
- };
- /**
- * \brief A chunk is the building block for buffers.
- *
- * A chunk is backed by a memory block, and internally it maintains information
- * about which area of the block it may use, and the portion of this area that
- * contains valid data. More than one chunk may share the same underlying
- * block, but the areas should never overlap. Chunk holds a shared pointer to
- * an array of bytes so that shared blocks are reference counted.
- *
- * When a chunk is copied, the copy shares the same underlying buffer, but the
- * copy receives its own copies of the start/cursor/end pointers, so each copy
- * can be manipulated independently. This allows different buffers to share
- * the same non-overlapping parts of a chunk, or even overlapping parts of a
- * chunk if the situation arises.
- *
- **/
- class Chunk {
- public:
- /// Default constructor, allocates a new underlying block for this chunk.
- explicit Chunk(size_type size) : underlyingBlock_(new data_type[size]),
- readPos_(underlyingBlock_.get()),
- writePos_(readPos_),
- endPos_(readPos_ + size) {}
- /// Foreign buffer constructor, uses the supplied data for this chunk, and
- /// only for reading.
- Chunk(const data_type *data, size_type size, const free_func &func) : callOnDestroy_(new CallOnDestroy(func)),
- readPos_(const_cast<data_type *>(data)),
- writePos_(readPos_ + size),
- endPos_(writePos_) {}
- private:
- // reference counted object will call a functor when it's destroyed
- boost::shared_ptr<CallOnDestroy> callOnDestroy_;
- public:
- /// Remove readable bytes from the front of the chunk by advancing the
- /// chunk start position.
- void truncateFront(size_type howMuch) {
- readPos_ += howMuch;
- assert(readPos_ <= writePos_);
- }
- /// Remove readable bytes from the back of the chunk by moving the
- /// chunk cursor position.
- void truncateBack(size_type howMuch) {
- writePos_ -= howMuch;
- assert(readPos_ <= writePos_);
- }
- /// Tell the position the next byte may be written to.
- data_type *tellWritePos() const {
- return writePos_;
- }
- /// Tell the position of the first byte containing valid data.
- const data_type *tellReadPos() const {
- return readPos_;
- }
- /// After a write operation, increment the write position.
- void incrementCursor(size_type howMuch) {
- writePos_ += howMuch;
- assert(writePos_ <= endPos_);
- }
- /// Tell how many bytes of data were written to this chunk.
- size_type dataSize() const {
- return (writePos_ - readPos_);
- }
- /// Tell how many bytes this chunk has available to write to.
- size_type freeSize() const {
- return (endPos_ - writePos_);
- }
- /// Tell how many bytes of data this chunk can hold (used and free).
- size_type capacity() const {
- return (endPos_ - readPos_);
- }
- private:
- friend bool operator==(const Chunk &lhs, const Chunk &rhs);
- friend bool operator!=(const Chunk &lhs, const Chunk &rhs);
- // more than one buffer can share an underlying block, so use SharedPtr
- boost::shared_array<data_type> underlyingBlock_;
- data_type *readPos_; ///< The first readable byte in the block
- data_type *writePos_; ///< The end of written data and start of free space
- data_type *endPos_; ///< Marks the end of the usable block area
- };
- /**
- * Compare underlying buffers and return true if they are equal
- **/
- inline bool operator==(const Chunk &lhs, const Chunk &rhs) {
- return lhs.underlyingBlock_ == rhs.underlyingBlock_;
- }
- /**
- * Compare underlying buffers and return true if they are unequal
- **/
- inline bool operator!=(const Chunk &lhs, const Chunk &rhs) {
- return lhs.underlyingBlock_ != rhs.underlyingBlock_;
- }
- /**
- * \brief Implementation details for Buffer class
- *
- * Internally, BufferImpl keeps two lists of chunks, one list consists entirely of
- * chunks containing data, and one list which contains chunks with free space.
- *
- *
- */
- class BufferImpl : boost::noncopyable {
- /// Add a new chunk to the list of chunks for this buffer, growing the
- /// buffer by the default block size.
- void allocChunkChecked(size_type size = kDefaultBlockSize) {
- writeChunks_.push_back(Chunk(size));
- freeSpace_ += writeChunks_.back().freeSize();
- }
- /// Add a new chunk to the list of chunks for this buffer, growing the
- /// buffer by the requested size, but within the range of a minimum and
- /// maximum.
- void allocChunk(size_type size) {
- if (size < kMinBlockSize) {
- size = kMinBlockSize;
- } else if (size > kMaxBlockSize) {
- size = kMaxBlockSize;
- }
- allocChunkChecked(size);
- }
- /// Update the state of the chunks after a write operation. This function
- /// ensures the chunk states are consistent with the write.
- void postWrite(size_type size) {
- // precondition to this function is that the writeChunk_.front()
- // contains the data that was just written, so make sure writeChunks_
- // is not empty:
- assert(size <= freeSpace_ && !writeChunks_.empty());
- // This is probably the one tricky part of BufferImpl. The data that
- // was written now exists in writeChunks_.front(). Now we must make
- // sure that same data exists in readChunks_.back().
- //
- // There are two cases:
- //
- // 1. readChunks_.last() and writeChunk_.front() refer to the same
- // underlying block, in which case they both just need their cursor
- // updated to reflect the new state.
- //
- // 2. readChunk_.last() is not the same block as writeChunks_.front(),
- // in which case it should be, since the writeChunk.front() contains
- // the next bit of data that will be appended to readChunks_, and
- // therefore needs to be copied there so we can proceed with updating
- // their state.
- //
- // if readChunks_ is not the same as writeChunks_.front(), make a copy
- // of it there
- if (readChunks_.empty() || (readChunks_.back() != writeChunks_.front())) {
- const Chunk &curChunk = writeChunks_.front();
- readChunks_.push_back(curChunk);
- // Any data that existed in the write chunk previously doesn't
- // belong to this buffer (otherwise it would have already been
- // added to the readChunk_ list). Here, adjust the start of the
- // readChunk to begin after any data already existing in curChunk
- readChunks_.back().truncateFront(curChunk.dataSize());
- }
- assert(readChunks_.back().freeSize() == writeChunks_.front().freeSize());
- // update the states of both readChunks_ and writeChunks_ to indicate that they are
- // holding the new data
- readChunks_.back().incrementCursor(size);
- writeChunks_.front().incrementCursor(size);
- size_ += size;
- freeSpace_ -= size;
- // if there is no more free space in writeChunks_, the next write cannot use
- // it, so dispose of it now
- if (writeChunks_.front().freeSize() == 0) {
- writeChunks_.pop_front();
- }
- }
- public:
- typedef std::deque<Chunk> ChunkList;
- typedef boost::shared_ptr<BufferImpl> SharedPtr;
- typedef boost::shared_ptr<const BufferImpl> ConstSharedPtr;
- /// Default constructor, creates a buffer without any chunks
- BufferImpl() : freeSpace_(0),
- size_(0) {}
- /// Copy constructor, gets a copy of all the chunks with data.
- BufferImpl(const BufferImpl &src) : readChunks_(src.readChunks_),
- freeSpace_(0),
- size_(src.size_) {}
- /// Amount of data held in this buffer.
- size_type size() const {
- return size_;
- }
- /// Capacity that may be written before the buffer must allocate more memory.
- size_type freeSpace() const {
- return freeSpace_;
- }
- /// Add enough free chunks to make the reservation size available.
- /// Actual amount may be more (rounded up to next chunk).
- void reserveFreeSpace(size_type reserveSize) {
- while (freeSpace_ < reserveSize) {
- allocChunk(reserveSize - freeSpace_);
- }
- }
- /// Return the chunk avro's begin iterator for reading.
- ChunkList::const_iterator beginRead() const {
- return readChunks_.begin();
- }
- /// Return the chunk avro's end iterator for reading.
- ChunkList::const_iterator endRead() const {
- return readChunks_.end();
- }
- /// Return the chunk avro's begin iterator for writing.
- ChunkList::const_iterator beginWrite() const {
- return writeChunks_.begin();
- }
- /// Return the chunk avro's end iterator for writing.
- ChunkList::const_iterator endWrite() const {
- return writeChunks_.end();
- }
- /// Write a single value to buffer, add a new chunk if necessary.
- template<typename T>
- void writeTo(T val, const std::true_type &) {
- if (freeSpace_ && (sizeof(T) <= writeChunks_.front().freeSize())) {
- // fast path, there's enough room in the writeable chunk to just
- // straight out copy it
- *(reinterpret_cast<T *>(writeChunks_.front().tellWritePos())) = val;
- postWrite(sizeof(T));
- } else {
- // need to fixup chunks first, so use the regular memcpy
- // writeTo method
- writeTo(reinterpret_cast<data_type *>(&val), sizeof(T));
- }
- }
- /// An uninstantiable function, this is if boost::is_fundamental check fails,
- /// and will compile-time assert.
- template<typename T>
- void writeTo(T /*val*/, const std::false_type &) {
- BOOST_STATIC_ASSERT(sizeof(T) == 0);
- }
- /// Write a block of data to the buffer, adding new chunks if necessary.
- size_type writeTo(const data_type *data, size_type size) {
- size_type bytesLeft = size;
- while (bytesLeft) {
- if (freeSpace_ == 0) {
- allocChunkChecked();
- }
- Chunk &chunk = writeChunks_.front();
- size_type toCopy = std::min<size_type>(chunk.freeSize(), bytesLeft);
- assert(toCopy);
- memcpy(chunk.tellWritePos(), data, toCopy);
- postWrite(toCopy);
- data += toCopy;
- bytesLeft -= toCopy;
- }
- return size;
- }
- /// Update internal status of chunks after data is written using iterator.
- size_type wroteTo(size_type size) {
- assert(size <= freeSpace_);
- size_type bytesLeft = size;
- while (bytesLeft) {
- Chunk &chunk = writeChunks_.front();
- size_type wrote = std::min<size_type>(chunk.freeSize(), bytesLeft);
- assert(wrote);
- postWrite(wrote);
- bytesLeft -= wrote;
- }
- return size;
- }
- /// Append the chunks that have data in src to this buffer
- void append(const BufferImpl &src) {
- std::copy(src.readChunks_.begin(), src.readChunks_.end(), std::back_inserter(readChunks_));
- size_ += src.size_;
- }
- /// Remove all the chunks that contain data from this buffer.
- void discardData() {
- readChunks_.clear();
- size_ = 0;
- }
- /// Remove the specified amount of data from the chunks, starting at the front.
- void discardData(size_type bytes) {
- assert(bytes && bytes <= size_);
- size_type bytesToDiscard = bytes;
- while (bytesToDiscard) {
- size_t currentSize = readChunks_.front().dataSize();
- // see if entire chunk is discarded
- if (currentSize <= bytesToDiscard) {
- readChunks_.pop_front();
- bytesToDiscard -= currentSize;
- } else {
- readChunks_.front().truncateFront(bytesToDiscard);
- bytesToDiscard = 0;
- }
- }
- size_ -= bytes;
- }
- /// Remove the specified amount of data from the chunks, moving the
- /// data to dest's chunks
- void extractData(BufferImpl &dest, size_type bytes) {
- assert(bytes && bytes <= size_);
- size_type bytesToExtract = bytes;
- while (bytesToExtract) {
- size_t currentSize = readChunks_.front().dataSize();
- dest.readChunks_.push_back(readChunks_.front());
- // see if entire chunk was extracted
- if (currentSize <= bytesToExtract) {
- readChunks_.pop_front();
- bytesToExtract -= currentSize;
- } else {
- readChunks_.front().truncateFront(bytesToExtract);
- size_t excess = currentSize - bytesToExtract;
- dest.readChunks_.back().truncateBack(excess);
- bytesToExtract = 0;
- }
- }
- size_ -= bytes;
- dest.size_ += bytes;
- }
- /// Move data from this to the destination, leaving this buffer without data
- void extractData(BufferImpl &dest) {
- assert(dest.readChunks_.empty());
- dest.readChunks_.swap(readChunks_);
- dest.size_ = size_;
- size_ = 0;
- }
- /// Copy data to a different buffer by copying the chunks. It's
- /// a bit like extract, but without modifying the source buffer.
- static void copyData(BufferImpl &dest,
- ChunkList::const_iterator iter,
- size_type offset,
- size_type bytes) {
- // now we are positioned to start the copying, copy as many
- // chunks as we need, the first chunk may have a non-zero offset
- // if the data to copy is not at the start of the chunk
- size_type copied = 0;
- while (copied < bytes) {
- dest.readChunks_.push_back(*iter);
- // offset only applies in the first chunk,
- // all subsequent chunks are copied from the start
- dest.readChunks_.back().truncateFront(offset);
- offset = 0;
- copied += dest.readChunks_.back().dataSize();
- ++iter;
- }
- // if the last chunk copied has more bytes than we need, truncate it
- size_type excess = copied - bytes;
- dest.readChunks_.back().truncateBack(excess);
- dest.size_ += bytes;
- }
- /// The number of chunks containing data. Used for debugging.
- int numDataChunks() const {
- return readChunks_.size();
- }
- /// The number of chunks containing free space (note that an entire chunk
- /// may not be free). Used for debugging.
- int numFreeChunks() const {
- return writeChunks_.size();
- }
- /// Add unmanaged data to the buffer. The buffer will not automatically
- /// free the data, but it will call the supplied function when the data is
- /// no longer referenced by the buffer (or copies of the buffer).
- void appendForeignData(const data_type *data, size_type size, const free_func &func) {
- readChunks_.push_back(Chunk(data, size, func));
- size_ += size;
- }
- BufferImpl &operator=(const BufferImpl &src) = delete;
- private:
- ChunkList readChunks_; ///< chunks of this buffer containing data
- ChunkList writeChunks_; ///< chunks of this buffer containing free space
- size_type freeSpace_; ///< capacity of buffer before allocation required
- size_type size_; ///< amount of data in buffer
- };
- } // namespace detail
- } // namespace avro
- #endif
|