123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #ifndef avro_BufferReader_hh__
- #define avro_BufferReader_hh__
- #include "Buffer.hh"
- #include <type_traits>
- #ifdef min
- #undef min
- #endif
- /**
- * \file BufferReader.hh
- *
- * \brief Helper class for reading bytes from buffer in a streaming manner,
- * without the overhead of istreams.
- *
- **/
- namespace avro {
- /**
- * Helper class for reading bytes from buffer without worrying about
- * chunk boundaries. May read from an InputBuffer or OutputBuffer.
- *
- **/
- class AVRO_DECL BufferReader : private boost::noncopyable {
- public:
- typedef detail::data_type data_type;
- typedef detail::size_type size_type;
- private:
- size_type chunkRemaining() const {
- return iter_->dataSize() - chunkPos_;
- }
- void incrementChunk(size_type howMuch) {
- bytesRemaining_ -= howMuch;
- chunkPos_ += howMuch;
- if (chunkPos_ == iter_->dataSize()) {
- chunkPos_ = 0;
- ++iter_;
- }
- }
- void rewind() {
- iter_ = bufferImpl_->beginRead();
- bytesRemaining_ = bytes_;
- chunkPos_ = 0;
- }
- const data_type *addr() const {
- return iter_->tellReadPos() + chunkPos_;
- }
- public:
- explicit BufferReader(const InputBuffer &buf) : bufferImpl_(buf.pimpl_),
- iter_(bufferImpl_->beginRead()),
- bytes_(bufferImpl_->size()),
- bytesRemaining_(bytes_),
- chunkPos_(0) {}
- explicit BufferReader(const OutputBuffer &buf) : bufferImpl_(buf.pimpl_),
- iter_(bufferImpl_->beginRead()),
- bytes_(bufferImpl_->size()),
- bytesRemaining_(bytes_),
- chunkPos_(0) {}
- /**
- * How many bytes are still not read from this buffer.
- **/
- size_type bytesRemaining() const {
- return bytesRemaining_;
- }
- /**
- * Read a block of data from the front of the buffer.
- **/
- size_type bytesRead() const {
- return bytes_ - bytesRemaining_;
- }
- /**
- * Read a block of data from the buffer.
- **/
- size_type read(data_type *data, size_type size) {
- if (size > bytesRemaining_) {
- size = bytesRemaining_;
- }
- size_type sizeToRead = size;
- while (sizeToRead) {
- const size_type toRead = std::min(sizeToRead, chunkRemaining());
- memcpy(data, addr(), toRead);
- sizeToRead -= toRead;
- data += toRead;
- incrementChunk(toRead);
- }
- return size;
- }
- /**
- * Read a block of data from the buffer.
- **/
- bool read(std::string &str, size_type size) {
- if (size > bytesRemaining_) {
- return false;
- }
- if (size <= chunkRemaining()) {
- fastStringRead(str, size);
- } else {
- slowStringRead(str, size);
- }
- return true;
- }
- /**
- * Read a single value from the buffer. The value must be a "fundamental"
- * type, e.g. int, float, etc. (otherwise use the other writeTo tests).
- *
- **/
- template<typename T>
- bool read(T &val) {
- return read(val, std::is_fundamental<T>());
- }
- /**
- * Skips a block of data from the buffer.
- **/
- bool skip(size_type bytes) {
- bool skipped = false;
- if (bytes <= bytesRemaining_) {
- doSkip(bytes);
- skipped = true;
- }
- return skipped;
- }
- /**
- * Seek to a position in the buffer.
- **/
- bool seek(size_type pos) {
- if (pos > bytes_) {
- return false;
- }
- size_type toSkip = pos;
- size_type curPos = bytesRead();
- // if the seek position is ahead, we can use skip to get there
- if (pos >= curPos) {
- toSkip -= curPos;
- }
- // if the seek position is ahead of the start of the chunk we can back up to
- // start of the chunk
- else if (pos >= (curPos - chunkPos_)) {
- curPos -= chunkPos_;
- bytesRemaining_ += chunkPos_;
- chunkPos_ = 0;
- toSkip -= curPos;
- } else {
- rewind();
- }
- doSkip(toSkip);
- return true;
- }
- bool peek(char &val) {
- bool ret = (bytesRemaining_ > 0);
- if (ret) {
- val = *(addr());
- }
- return ret;
- }
- InputBuffer copyData(size_type bytes) {
- if (bytes > bytesRemaining_) {
- // force no copy
- bytes = 0;
- }
- detail::BufferImpl::SharedPtr newImpl(new detail::BufferImpl);
- if (bytes) {
- bufferImpl_->copyData(*newImpl, iter_, chunkPos_, bytes);
- doSkip(bytes);
- }
- return InputBuffer(newImpl);
- }
- private:
- void doSkip(size_type sizeToSkip) {
- while (sizeToSkip) {
- const size_type toSkip = std::min(sizeToSkip, chunkRemaining());
- sizeToSkip -= toSkip;
- incrementChunk(toSkip);
- }
- }
- template<typename T>
- bool read(T &val, const std::true_type &) {
- if (sizeof(T) > bytesRemaining_) {
- return false;
- }
- if (sizeof(T) <= chunkRemaining()) {
- val = *(reinterpret_cast<const T *>(addr()));
- incrementChunk(sizeof(T));
- } else {
- read(reinterpret_cast<data_type *>(&val), sizeof(T));
- }
- return true;
- }
- /// An uninstantiable function, that is if boost::is_fundamental check fails
- template<typename T>
- bool read(T &val, const std::false_type &) {
- static_assert(sizeof(T) == 0, "Not a valid type to read");
- return false;
- }
- void fastStringRead(std::string &str, size_type sizeToCopy) {
- str.assign(addr(), sizeToCopy);
- incrementChunk(sizeToCopy);
- }
- void slowStringRead(std::string &str, size_type sizeToCopy) {
- str.clear();
- str.reserve(sizeToCopy);
- while (sizeToCopy) {
- const size_type toCopy = std::min(sizeToCopy, chunkRemaining());
- str.append(addr(), toCopy);
- sizeToCopy -= toCopy;
- incrementChunk(toCopy);
- }
- }
- detail::BufferImpl::ConstSharedPtr bufferImpl_;
- detail::BufferImpl::ChunkList::const_iterator iter_;
- size_type bytes_;
- size_type bytesRemaining_;
- size_type chunkPos_;
- };
- } // namespace avro
- #endif
|