BufferReader.hh 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef avro_BufferReader_hh__
  19. #define avro_BufferReader_hh__
  20. #include "Buffer.hh"
  21. #include <type_traits>
  22. #ifdef min
  23. #undef min
  24. #endif
  25. /**
  26. * \file BufferReader.hh
  27. *
  28. * \brief Helper class for reading bytes from buffer in a streaming manner,
  29. * without the overhead of istreams.
  30. *
  31. **/
  32. namespace avro {
  33. /**
  34. * Helper class for reading bytes from buffer without worrying about
  35. * chunk boundaries. May read from an InputBuffer or OutputBuffer.
  36. *
  37. **/
  38. class AVRO_DECL BufferReader : private boost::noncopyable {
  39. public:
  40. typedef detail::data_type data_type;
  41. typedef detail::size_type size_type;
  42. private:
  43. size_type chunkRemaining() const {
  44. return iter_->dataSize() - chunkPos_;
  45. }
  46. void incrementChunk(size_type howMuch) {
  47. bytesRemaining_ -= howMuch;
  48. chunkPos_ += howMuch;
  49. if (chunkPos_ == iter_->dataSize()) {
  50. chunkPos_ = 0;
  51. ++iter_;
  52. }
  53. }
  54. void rewind() {
  55. iter_ = bufferImpl_->beginRead();
  56. bytesRemaining_ = bytes_;
  57. chunkPos_ = 0;
  58. }
  59. const data_type *addr() const {
  60. return iter_->tellReadPos() + chunkPos_;
  61. }
  62. public:
  63. explicit BufferReader(const InputBuffer &buf) : bufferImpl_(buf.pimpl_),
  64. iter_(bufferImpl_->beginRead()),
  65. bytes_(bufferImpl_->size()),
  66. bytesRemaining_(bytes_),
  67. chunkPos_(0) {}
  68. explicit BufferReader(const OutputBuffer &buf) : bufferImpl_(buf.pimpl_),
  69. iter_(bufferImpl_->beginRead()),
  70. bytes_(bufferImpl_->size()),
  71. bytesRemaining_(bytes_),
  72. chunkPos_(0) {}
  73. /**
  74. * How many bytes are still not read from this buffer.
  75. **/
  76. size_type bytesRemaining() const {
  77. return bytesRemaining_;
  78. }
  79. /**
  80. * Read a block of data from the front of the buffer.
  81. **/
  82. size_type bytesRead() const {
  83. return bytes_ - bytesRemaining_;
  84. }
  85. /**
  86. * Read a block of data from the buffer.
  87. **/
  88. size_type read(data_type *data, size_type size) {
  89. if (size > bytesRemaining_) {
  90. size = bytesRemaining_;
  91. }
  92. size_type sizeToRead = size;
  93. while (sizeToRead) {
  94. const size_type toRead = std::min(sizeToRead, chunkRemaining());
  95. memcpy(data, addr(), toRead);
  96. sizeToRead -= toRead;
  97. data += toRead;
  98. incrementChunk(toRead);
  99. }
  100. return size;
  101. }
  102. /**
  103. * Read a block of data from the buffer.
  104. **/
  105. bool read(std::string &str, size_type size) {
  106. if (size > bytesRemaining_) {
  107. return false;
  108. }
  109. if (size <= chunkRemaining()) {
  110. fastStringRead(str, size);
  111. } else {
  112. slowStringRead(str, size);
  113. }
  114. return true;
  115. }
  116. /**
  117. * Read a single value from the buffer. The value must be a "fundamental"
  118. * type, e.g. int, float, etc. (otherwise use the other writeTo tests).
  119. *
  120. **/
  121. template<typename T>
  122. bool read(T &val) {
  123. return read(val, std::is_fundamental<T>());
  124. }
  125. /**
  126. * Skips a block of data from the buffer.
  127. **/
  128. bool skip(size_type bytes) {
  129. bool skipped = false;
  130. if (bytes <= bytesRemaining_) {
  131. doSkip(bytes);
  132. skipped = true;
  133. }
  134. return skipped;
  135. }
  136. /**
  137. * Seek to a position in the buffer.
  138. **/
  139. bool seek(size_type pos) {
  140. if (pos > bytes_) {
  141. return false;
  142. }
  143. size_type toSkip = pos;
  144. size_type curPos = bytesRead();
  145. // if the seek position is ahead, we can use skip to get there
  146. if (pos >= curPos) {
  147. toSkip -= curPos;
  148. }
  149. // if the seek position is ahead of the start of the chunk we can back up to
  150. // start of the chunk
  151. else if (pos >= (curPos - chunkPos_)) {
  152. curPos -= chunkPos_;
  153. bytesRemaining_ += chunkPos_;
  154. chunkPos_ = 0;
  155. toSkip -= curPos;
  156. } else {
  157. rewind();
  158. }
  159. doSkip(toSkip);
  160. return true;
  161. }
  162. bool peek(char &val) {
  163. bool ret = (bytesRemaining_ > 0);
  164. if (ret) {
  165. val = *(addr());
  166. }
  167. return ret;
  168. }
  169. InputBuffer copyData(size_type bytes) {
  170. if (bytes > bytesRemaining_) {
  171. // force no copy
  172. bytes = 0;
  173. }
  174. detail::BufferImpl::SharedPtr newImpl(new detail::BufferImpl);
  175. if (bytes) {
  176. bufferImpl_->copyData(*newImpl, iter_, chunkPos_, bytes);
  177. doSkip(bytes);
  178. }
  179. return InputBuffer(newImpl);
  180. }
  181. private:
  182. void doSkip(size_type sizeToSkip) {
  183. while (sizeToSkip) {
  184. const size_type toSkip = std::min(sizeToSkip, chunkRemaining());
  185. sizeToSkip -= toSkip;
  186. incrementChunk(toSkip);
  187. }
  188. }
  189. template<typename T>
  190. bool read(T &val, const std::true_type &) {
  191. if (sizeof(T) > bytesRemaining_) {
  192. return false;
  193. }
  194. if (sizeof(T) <= chunkRemaining()) {
  195. val = *(reinterpret_cast<const T *>(addr()));
  196. incrementChunk(sizeof(T));
  197. } else {
  198. read(reinterpret_cast<data_type *>(&val), sizeof(T));
  199. }
  200. return true;
  201. }
  202. /// An uninstantiable function, that is if boost::is_fundamental check fails
  203. template<typename T>
  204. bool read(T &val, const std::false_type &) {
  205. static_assert(sizeof(T) == 0, "Not a valid type to read");
  206. return false;
  207. }
  208. void fastStringRead(std::string &str, size_type sizeToCopy) {
  209. str.assign(addr(), sizeToCopy);
  210. incrementChunk(sizeToCopy);
  211. }
  212. void slowStringRead(std::string &str, size_type sizeToCopy) {
  213. str.clear();
  214. str.reserve(sizeToCopy);
  215. while (sizeToCopy) {
  216. const size_type toCopy = std::min(sizeToCopy, chunkRemaining());
  217. str.append(addr(), toCopy);
  218. sizeToCopy -= toCopy;
  219. incrementChunk(toCopy);
  220. }
  221. }
  222. detail::BufferImpl::ConstSharedPtr bufferImpl_;
  223. detail::BufferImpl::ChunkList::const_iterator iter_;
  224. size_type bytes_;
  225. size_type bytesRemaining_;
  226. size_type chunkPos_;
  227. };
  228. } // namespace avro
  229. #endif