Stream.cc 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  #include "Stream.hh"

  #include <algorithm>
  #include <iterator>
  #include <memory>
  #include <vector>
  20. namespace avro {
  21. using std::vector;
  22. class MemoryInputStream : public InputStream {
  23. const std::vector<uint8_t *> &data_;
  24. const size_t chunkSize_;
  25. const size_t size_;
  26. const size_t available_;
  27. size_t cur_;
  28. size_t curLen_;
  29. size_t maxLen() {
  30. size_t n = (cur_ == (size_ - 1)) ? available_ : chunkSize_;
  31. if (n == curLen_) {
  32. if (cur_ == (size_ - 1)) {
  33. return 0;
  34. }
  35. ++cur_;
  36. n = (cur_ == (size_ - 1)) ? available_ : chunkSize_;
  37. curLen_ = 0;
  38. }
  39. return n;
  40. }
  41. public:
  42. MemoryInputStream(const std::vector<uint8_t *> &b,
  43. size_t chunkSize, size_t available) : data_(b), chunkSize_(chunkSize), size_(b.size()),
  44. available_(available), cur_(0), curLen_(0) {}
  45. bool next(const uint8_t **data, size_t *len) final {
  46. if (size_t n = maxLen()) {
  47. *data = data_[cur_] + curLen_;
  48. *len = n - curLen_;
  49. curLen_ = n;
  50. return true;
  51. }
  52. return false;
  53. }
  54. void backup(size_t len) final {
  55. curLen_ -= len;
  56. }
  57. void skip(size_t len) final {
  58. while (len > 0) {
  59. if (size_t n = maxLen()) {
  60. if ((curLen_ + len) < n) {
  61. n = curLen_ + len;
  62. }
  63. len -= n - curLen_;
  64. curLen_ = n;
  65. } else {
  66. break;
  67. }
  68. }
  69. }
  70. size_t byteCount() const final {
  71. return cur_ * chunkSize_ + curLen_;
  72. }
  73. };
  74. class MemoryInputStream2 : public InputStream {
  75. const uint8_t *const data_;
  76. const size_t size_;
  77. size_t curLen_;
  78. public:
  79. MemoryInputStream2(const uint8_t *data, size_t len)
  80. : data_(data), size_(len), curLen_(0) {}
  81. bool next(const uint8_t **data, size_t *len) final {
  82. if (curLen_ == size_) {
  83. return false;
  84. }
  85. *data = &data_[curLen_];
  86. *len = size_ - curLen_;
  87. curLen_ = size_;
  88. return true;
  89. }
  90. void backup(size_t len) final {
  91. curLen_ -= len;
  92. }
  93. void skip(size_t len) final {
  94. if (len > (size_ - curLen_)) {
  95. len = size_ - curLen_;
  96. }
  97. curLen_ += len;
  98. }
  99. size_t byteCount() const final {
  100. return curLen_;
  101. }
  102. };
  103. class MemoryOutputStream final : public OutputStream {
  104. public:
  105. const size_t chunkSize_;
  106. std::vector<uint8_t *> data_;
  107. size_t available_;
  108. size_t byteCount_;
  109. explicit MemoryOutputStream(size_t chunkSize) : chunkSize_(chunkSize),
  110. available_(0), byteCount_(0) {}
  111. ~MemoryOutputStream() final {
  112. for (std::vector<uint8_t *>::const_iterator it = data_.begin();
  113. it != data_.end(); ++it) {
  114. delete[] *it;
  115. }
  116. }
  117. bool next(uint8_t **data, size_t *len) final {
  118. if (available_ == 0) {
  119. data_.push_back(new uint8_t[chunkSize_]);
  120. available_ = chunkSize_;
  121. }
  122. *data = &data_.back()[chunkSize_ - available_];
  123. *len = available_;
  124. byteCount_ += available_;
  125. available_ = 0;
  126. return true;
  127. }
  128. void backup(size_t len) final {
  129. available_ += len;
  130. byteCount_ -= len;
  131. }
  132. uint64_t byteCount() const final {
  133. return byteCount_;
  134. }
  135. void flush() final {}
  136. };
  137. std::unique_ptr<OutputStream> memoryOutputStream(size_t chunkSize) {
  138. return std::unique_ptr<OutputStream>(new MemoryOutputStream(chunkSize));
  139. }
  140. std::unique_ptr<InputStream> memoryInputStream(const uint8_t *data, size_t len) {
  141. return std::unique_ptr<InputStream>(new MemoryInputStream2(data, len));
  142. }
  143. std::unique_ptr<InputStream> memoryInputStream(const OutputStream &source) {
  144. const auto &mos =
  145. dynamic_cast<const MemoryOutputStream &>(source);
  146. return (mos.data_.empty()) ? std::unique_ptr<InputStream>(new MemoryInputStream2(nullptr, 0)) : std::unique_ptr<InputStream>(new MemoryInputStream(mos.data_, mos.chunkSize_, (mos.chunkSize_ - mos.available_)));
  147. }
  148. std::shared_ptr<std::vector<uint8_t>> snapshot(const OutputStream &source) {
  149. const auto &mos =
  150. dynamic_cast<const MemoryOutputStream &>(source);
  151. std::shared_ptr<std::vector<uint8_t>> result(new std::vector<uint8_t>());
  152. size_t c = mos.byteCount_;
  153. result->reserve(mos.byteCount_);
  154. for (auto it = mos.data_.begin();
  155. it != mos.data_.end(); ++it) {
  156. size_t n = std::min(c, mos.chunkSize_);
  157. std::copy(*it, *it + n, std::back_inserter(*result));
  158. c -= n;
  159. }
  160. return result;
  161. }
  162. } // namespace avro