FileStream.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "Stream.hh"
  19. #include <fstream>
  20. #ifndef _WIN32
  21. #include "fcntl.h"
  22. #include "unistd.h"
  23. #include <cerrno>
  24. #ifndef O_BINARY
  25. #define O_BINARY 0
  26. #endif
  27. #else
  28. #include "Windows.h"
  29. #ifdef min
  30. #undef min
  31. #endif
  32. #endif
  33. using std::istream;
  34. using std::ostream;
  35. using std::unique_ptr;
  36. namespace avro {
  37. namespace {
  // Abstract byte source consumed by BufferCopyInInputStream. Implementations
  // copy raw bytes into a caller-supplied buffer and can reposition
  // themselves without producing data.
  struct BufferCopyIn {
      // Copies up to `capacity` bytes into `dst` and stores the number copied
      // in `copied`. Returns false when no more data can be produced.
      virtual bool read(uint8_t *dst, size_t capacity, size_t &copied) = 0;

      // Moves the source position by `offset` bytes relative to where it is now.
      virtual void seek(size_t offset) = 0;

      virtual ~BufferCopyIn() = default;
  };
  43. struct FileBufferCopyIn : public BufferCopyIn {
  44. #ifdef _WIN32
  45. HANDLE h_;
  46. FileBufferCopyIn(const char *filename) : h_(::CreateFileA(filename, GENERIC_READ, 0, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) {
  47. if (h_ == INVALID_HANDLE_VALUE) {
  48. throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError());
  49. }
  50. }
  51. ~FileBufferCopyIn() {
  52. ::CloseHandle(h_);
  53. }
  54. void seek(size_t len) {
  55. if (::SetFilePointer(h_, len, NULL, FILE_CURRENT) == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR) {
  56. throw Exception(boost::format("Cannot skip file: %1%") % ::GetLastError());
  57. }
  58. }
  59. bool read(uint8_t *b, size_t toRead, size_t &actual) {
  60. DWORD dw = 0;
  61. if (!::ReadFile(h_, b, toRead, &dw, NULL)) {
  62. throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError());
  63. }
  64. actual = static_cast<size_t>(dw);
  65. return actual != 0;
  66. }
  67. #else
  68. const int fd_;
  69. explicit FileBufferCopyIn(const char *filename) : fd_(open(filename, O_RDONLY | O_BINARY)) {
  70. if (fd_ < 0) {
  71. throw Exception(boost::format("Cannot open file: %1%") % ::strerror(errno));
  72. }
  73. }
  74. ~FileBufferCopyIn() override {
  75. ::close(fd_);
  76. }
  77. void seek(size_t len) final {
  78. off_t r = ::lseek(fd_, len, SEEK_CUR);
  79. if (r == static_cast<off_t>(-1)) {
  80. throw Exception(boost::format("Cannot skip file: %1%") % strerror(errno));
  81. }
  82. }
  83. bool read(uint8_t *b, size_t toRead, size_t &actual) final {
  84. int n = ::read(fd_, b, toRead);
  85. if (n > 0) {
  86. actual = n;
  87. return true;
  88. }
  89. return false;
  90. }
  91. #endif
  92. };
  93. struct IStreamBufferCopyIn : public BufferCopyIn {
  94. istream &is_;
  95. explicit IStreamBufferCopyIn(istream &is) : is_(is) {
  96. }
  97. void seek(size_t len) override {
  98. if (!is_.seekg(len, std::ios_base::cur)) {
  99. throw Exception("Cannot skip stream");
  100. }
  101. }
  102. bool read(uint8_t *b, size_t toRead, size_t &actual) override {
  103. is_.read(reinterpret_cast<char *>(b), toRead);
  104. if (is_.bad()) {
  105. return false;
  106. }
  107. actual = static_cast<size_t>(is_.gcount());
  108. return (!is_.eof() || actual != 0);
  109. }
  110. };
  111. struct NonSeekableIStreamBufferCopyIn : public IStreamBufferCopyIn {
  112. explicit NonSeekableIStreamBufferCopyIn(istream &is) : IStreamBufferCopyIn(is) {}
  113. void seek(size_t len) final {
  114. const size_t bufSize = 4096;
  115. uint8_t buf[bufSize];
  116. while (len > 0) {
  117. size_t n = std::min(len, bufSize);
  118. is_.read(reinterpret_cast<char *>(buf), n);
  119. if (is_.bad()) {
  120. throw Exception("Cannot skip stream");
  121. }
  122. auto actual = static_cast<size_t>(is_.gcount());
  123. if (is_.eof() && actual == 0) {
  124. throw Exception("Cannot skip stream");
  125. }
  126. len -= n;
  127. }
  128. }
  129. };
  130. } // namespace
  131. class BufferCopyInInputStream : public SeekableInputStream {
  132. const size_t bufferSize_;
  133. uint8_t *const buffer_;
  134. unique_ptr<BufferCopyIn> in_;
  135. size_t byteCount_;
  136. uint8_t *next_;
  137. size_t available_;
  138. bool next(const uint8_t **data, size_t *size) final {
  139. if (available_ == 0 && !fill()) {
  140. return false;
  141. }
  142. *data = next_;
  143. *size = available_;
  144. next_ += available_;
  145. byteCount_ += available_;
  146. available_ = 0;
  147. return true;
  148. }
  149. void backup(size_t len) final {
  150. next_ -= len;
  151. available_ += len;
  152. byteCount_ -= len;
  153. }
  154. void skip(size_t len) final {
  155. while (len > 0) {
  156. if (available_ == 0) {
  157. in_->seek(len);
  158. byteCount_ += len;
  159. return;
  160. }
  161. size_t n = std::min(available_, len);
  162. available_ -= n;
  163. next_ += n;
  164. len -= n;
  165. byteCount_ += n;
  166. }
  167. }
  168. size_t byteCount() const final { return byteCount_; }
  169. bool fill() {
  170. size_t n = 0;
  171. if (in_->read(buffer_, bufferSize_, n)) {
  172. next_ = buffer_;
  173. available_ = n;
  174. return true;
  175. }
  176. return false;
  177. }
  178. void seek(int64_t position) final {
  179. // BufferCopyIn::seek is relative to byteCount_, whereas position is
  180. // absolute.
  181. in_->seek(position - byteCount_ - available_);
  182. byteCount_ = position;
  183. available_ = 0;
  184. }
  185. public:
  186. BufferCopyInInputStream(unique_ptr<BufferCopyIn> in, size_t bufferSize) : bufferSize_(bufferSize),
  187. buffer_(new uint8_t[bufferSize]),
  188. in_(std::move(in)),
  189. byteCount_(0),
  190. next_(buffer_),
  191. available_(0) {}
  192. ~BufferCopyInInputStream() override {
  193. delete[] buffer_;
  194. }
  195. };
  196. namespace {
  // Abstract byte sink that BufferCopyOutputStream drains its buffer into.
  struct BufferCopyOut {
      // Writes the `count` bytes starting at `src` to the underlying target.
      virtual void write(const uint8_t *src, size_t count) = 0;

      virtual ~BufferCopyOut() = default;
  };
  201. struct FileBufferCopyOut : public BufferCopyOut {
  202. #ifdef _WIN32
  203. HANDLE h_;
  204. FileBufferCopyOut(const char *filename) : h_(::CreateFileA(filename, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) {
  205. if (h_ == INVALID_HANDLE_VALUE) {
  206. throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError());
  207. }
  208. }
  209. ~FileBufferCopyOut() {
  210. ::CloseHandle(h_);
  211. }
  212. void write(const uint8_t *b, size_t len) {
  213. while (len > 0) {
  214. DWORD dw = 0;
  215. if (!::WriteFile(h_, b, len, &dw, NULL)) {
  216. throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError());
  217. }
  218. b += dw;
  219. len -= dw;
  220. }
  221. }
  222. #else
  223. const int fd_;
  224. explicit FileBufferCopyOut(const char *filename) : fd_(::open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0644)) {
  225. if (fd_ < 0) {
  226. throw Exception(boost::format("Cannot open file: %1%") % ::strerror(errno));
  227. }
  228. }
  229. ~FileBufferCopyOut() override {
  230. ::close(fd_);
  231. }
  232. void write(const uint8_t *b, size_t len) final {
  233. if (::write(fd_, b, len) < 0) {
  234. throw Exception(boost::format("Cannot write file: %1%") % ::strerror(errno));
  235. }
  236. }
  237. #endif
  238. };
  239. struct OStreamBufferCopyOut : public BufferCopyOut {
  240. ostream &os_;
  241. explicit OStreamBufferCopyOut(ostream &os) : os_(os) {
  242. }
  243. void write(const uint8_t *b, size_t len) final {
  244. os_.write(reinterpret_cast<const char *>(b), len);
  245. }
  246. };
  247. } // namespace
  248. class BufferCopyOutputStream : public OutputStream {
  249. size_t bufferSize_;
  250. uint8_t *const buffer_;
  251. unique_ptr<BufferCopyOut> out_;
  252. uint8_t *next_;
  253. size_t available_;
  254. size_t byteCount_;
  255. // Invariant: byteCount_ == bytesWritten + bufferSize_ - available_;
  256. bool next(uint8_t **data, size_t *len) final {
  257. if (available_ == 0) {
  258. flush();
  259. }
  260. *data = next_;
  261. *len = available_;
  262. next_ += available_;
  263. byteCount_ += available_;
  264. available_ = 0;
  265. return true;
  266. }
  267. void backup(size_t len) final {
  268. available_ += len;
  269. next_ -= len;
  270. byteCount_ -= len;
  271. }
  272. uint64_t byteCount() const final {
  273. return byteCount_;
  274. }
  275. void flush() final {
  276. out_->write(buffer_, bufferSize_ - available_);
  277. next_ = buffer_;
  278. available_ = bufferSize_;
  279. }
  280. public:
  281. BufferCopyOutputStream(unique_ptr<BufferCopyOut> out, size_t bufferSize) : bufferSize_(bufferSize),
  282. buffer_(new uint8_t[bufferSize]),
  283. out_(std::move(out)),
  284. next_(buffer_),
  285. available_(bufferSize_), byteCount_(0) {}
  286. ~BufferCopyOutputStream() override {
  287. delete[] buffer_;
  288. }
  289. };
  290. unique_ptr<InputStream> fileInputStream(const char *filename,
  291. size_t bufferSize) {
  292. unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename));
  293. return unique_ptr<InputStream>(new BufferCopyInInputStream(std::move(in), bufferSize));
  294. }
  295. unique_ptr<SeekableInputStream> fileSeekableInputStream(const char *filename,
  296. size_t bufferSize) {
  297. unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename));
  298. return unique_ptr<SeekableInputStream>(new BufferCopyInInputStream(std::move(in),
  299. bufferSize));
  300. }
  301. unique_ptr<InputStream> istreamInputStream(istream &is, size_t bufferSize) {
  302. unique_ptr<BufferCopyIn> in(new IStreamBufferCopyIn(is));
  303. return unique_ptr<InputStream>(new BufferCopyInInputStream(std::move(in), bufferSize));
  304. }
  305. unique_ptr<InputStream> nonSeekableIstreamInputStream(
  306. istream &is, size_t bufferSize) {
  307. unique_ptr<BufferCopyIn> in(new NonSeekableIStreamBufferCopyIn(is));
  308. return unique_ptr<InputStream>(new BufferCopyInInputStream(std::move(in), bufferSize));
  309. }
  310. unique_ptr<OutputStream> fileOutputStream(const char *filename,
  311. size_t bufferSize) {
  312. unique_ptr<BufferCopyOut> out(new FileBufferCopyOut(filename));
  313. return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize));
  314. }
  315. unique_ptr<OutputStream> ostreamOutputStream(ostream &os,
  316. size_t bufferSize) {
  317. unique_ptr<BufferCopyOut> out(new OStreamBufferCopyOut(os));
  318. return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize));
  319. }
  320. } // namespace avro