123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "Stream.hh"
- #include <fstream>
- #ifndef _WIN32
- #include "fcntl.h"
- #include "unistd.h"
- #include <cerrno>
- #ifndef O_BINARY
- #define O_BINARY 0
- #endif
- #else
- #include "Windows.h"
- #ifdef min
- #undef min
- #endif
- #endif
- using std::istream;
- using std::ostream;
- using std::unique_ptr;
- namespace avro {
- namespace {
/**
 * Abstract source of raw bytes.
 *
 * Implementations copy bytes into a caller-supplied buffer and can move
 * their read position relative to where it currently is.
 */
class BufferCopyIn {
public:
    /** Virtual so implementations can be destroyed through this interface. */
    virtual ~BufferCopyIn() = default;

    /** Moves the read position by \p len bytes from the current position. */
    virtual void seek(size_t len) = 0;

    /**
     * Copies at most \p toRead bytes into \p b.
     *
     * \param b destination buffer.
     * \param toRead maximum number of bytes to copy.
     * \param actual receives the number of bytes actually copied.
     * \return whether the call produced data.
     */
    virtual bool read(uint8_t *b, size_t toRead, size_t &actual) = 0;
};
- struct FileBufferCopyIn : public BufferCopyIn {
- #ifdef _WIN32
- HANDLE h_;
- FileBufferCopyIn(const char *filename) : h_(::CreateFileA(filename, GENERIC_READ, 0, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) {
- if (h_ == INVALID_HANDLE_VALUE) {
- throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError());
- }
- }
- ~FileBufferCopyIn() {
- ::CloseHandle(h_);
- }
- void seek(size_t len) {
- if (::SetFilePointer(h_, len, NULL, FILE_CURRENT) == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR) {
- throw Exception(boost::format("Cannot skip file: %1%") % ::GetLastError());
- }
- }
- bool read(uint8_t *b, size_t toRead, size_t &actual) {
- DWORD dw = 0;
- if (!::ReadFile(h_, b, toRead, &dw, NULL)) {
- throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError());
- }
- actual = static_cast<size_t>(dw);
- return actual != 0;
- }
- #else
- const int fd_;
- explicit FileBufferCopyIn(const char *filename) : fd_(open(filename, O_RDONLY | O_BINARY)) {
- if (fd_ < 0) {
- throw Exception(boost::format("Cannot open file: %1%") % ::strerror(errno));
- }
- }
- ~FileBufferCopyIn() override {
- ::close(fd_);
- }
- void seek(size_t len) final {
- off_t r = ::lseek(fd_, len, SEEK_CUR);
- if (r == static_cast<off_t>(-1)) {
- throw Exception(boost::format("Cannot skip file: %1%") % strerror(errno));
- }
- }
- bool read(uint8_t *b, size_t toRead, size_t &actual) final {
- int n = ::read(fd_, b, toRead);
- if (n > 0) {
- actual = n;
- return true;
- }
- return false;
- }
- #endif
- };
- struct IStreamBufferCopyIn : public BufferCopyIn {
- istream &is_;
- explicit IStreamBufferCopyIn(istream &is) : is_(is) {
- }
- void seek(size_t len) override {
- if (!is_.seekg(len, std::ios_base::cur)) {
- throw Exception("Cannot skip stream");
- }
- }
- bool read(uint8_t *b, size_t toRead, size_t &actual) override {
- is_.read(reinterpret_cast<char *>(b), toRead);
- if (is_.bad()) {
- return false;
- }
- actual = static_cast<size_t>(is_.gcount());
- return (!is_.eof() || actual != 0);
- }
- };
- struct NonSeekableIStreamBufferCopyIn : public IStreamBufferCopyIn {
- explicit NonSeekableIStreamBufferCopyIn(istream &is) : IStreamBufferCopyIn(is) {}
- void seek(size_t len) final {
- const size_t bufSize = 4096;
- uint8_t buf[bufSize];
- while (len > 0) {
- size_t n = std::min(len, bufSize);
- is_.read(reinterpret_cast<char *>(buf), n);
- if (is_.bad()) {
- throw Exception("Cannot skip stream");
- }
- auto actual = static_cast<size_t>(is_.gcount());
- if (is_.eof() && actual == 0) {
- throw Exception("Cannot skip stream");
- }
- len -= n;
- }
- }
- };
- } // namespace
- class BufferCopyInInputStream : public SeekableInputStream {
- const size_t bufferSize_;
- uint8_t *const buffer_;
- unique_ptr<BufferCopyIn> in_;
- size_t byteCount_;
- uint8_t *next_;
- size_t available_;
- bool next(const uint8_t **data, size_t *size) final {
- if (available_ == 0 && !fill()) {
- return false;
- }
- *data = next_;
- *size = available_;
- next_ += available_;
- byteCount_ += available_;
- available_ = 0;
- return true;
- }
- void backup(size_t len) final {
- next_ -= len;
- available_ += len;
- byteCount_ -= len;
- }
- void skip(size_t len) final {
- while (len > 0) {
- if (available_ == 0) {
- in_->seek(len);
- byteCount_ += len;
- return;
- }
- size_t n = std::min(available_, len);
- available_ -= n;
- next_ += n;
- len -= n;
- byteCount_ += n;
- }
- }
- size_t byteCount() const final { return byteCount_; }
- bool fill() {
- size_t n = 0;
- if (in_->read(buffer_, bufferSize_, n)) {
- next_ = buffer_;
- available_ = n;
- return true;
- }
- return false;
- }
- void seek(int64_t position) final {
- // BufferCopyIn::seek is relative to byteCount_, whereas position is
- // absolute.
- in_->seek(position - byteCount_ - available_);
- byteCount_ = position;
- available_ = 0;
- }
- public:
- BufferCopyInInputStream(unique_ptr<BufferCopyIn> in, size_t bufferSize) : bufferSize_(bufferSize),
- buffer_(new uint8_t[bufferSize]),
- in_(std::move(in)),
- byteCount_(0),
- next_(buffer_),
- available_(0) {}
- ~BufferCopyInInputStream() override {
- delete[] buffer_;
- }
- };
- namespace {
/**
 * Abstract destination for raw bytes.
 */
class BufferCopyOut {
public:
    /** Virtual so implementations can be destroyed through this interface. */
    virtual ~BufferCopyOut() = default;

    /** Writes the \p len bytes starting at \p b to the destination. */
    virtual void write(const uint8_t *b, size_t len) = 0;
};
- struct FileBufferCopyOut : public BufferCopyOut {
- #ifdef _WIN32
- HANDLE h_;
- FileBufferCopyOut(const char *filename) : h_(::CreateFileA(filename, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) {
- if (h_ == INVALID_HANDLE_VALUE) {
- throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError());
- }
- }
- ~FileBufferCopyOut() {
- ::CloseHandle(h_);
- }
- void write(const uint8_t *b, size_t len) {
- while (len > 0) {
- DWORD dw = 0;
- if (!::WriteFile(h_, b, len, &dw, NULL)) {
- throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError());
- }
- b += dw;
- len -= dw;
- }
- }
- #else
- const int fd_;
- explicit FileBufferCopyOut(const char *filename) : fd_(::open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0644)) {
- if (fd_ < 0) {
- throw Exception(boost::format("Cannot open file: %1%") % ::strerror(errno));
- }
- }
- ~FileBufferCopyOut() override {
- ::close(fd_);
- }
- void write(const uint8_t *b, size_t len) final {
- if (::write(fd_, b, len) < 0) {
- throw Exception(boost::format("Cannot write file: %1%") % ::strerror(errno));
- }
- }
- #endif
- };
- struct OStreamBufferCopyOut : public BufferCopyOut {
- ostream &os_;
- explicit OStreamBufferCopyOut(ostream &os) : os_(os) {
- }
- void write(const uint8_t *b, size_t len) final {
- os_.write(reinterpret_cast<const char *>(b), len);
- }
- };
- } // namespace
- class BufferCopyOutputStream : public OutputStream {
- size_t bufferSize_;
- uint8_t *const buffer_;
- unique_ptr<BufferCopyOut> out_;
- uint8_t *next_;
- size_t available_;
- size_t byteCount_;
- // Invariant: byteCount_ == bytesWritten + bufferSize_ - available_;
- bool next(uint8_t **data, size_t *len) final {
- if (available_ == 0) {
- flush();
- }
- *data = next_;
- *len = available_;
- next_ += available_;
- byteCount_ += available_;
- available_ = 0;
- return true;
- }
- void backup(size_t len) final {
- available_ += len;
- next_ -= len;
- byteCount_ -= len;
- }
- uint64_t byteCount() const final {
- return byteCount_;
- }
- void flush() final {
- out_->write(buffer_, bufferSize_ - available_);
- next_ = buffer_;
- available_ = bufferSize_;
- }
- public:
- BufferCopyOutputStream(unique_ptr<BufferCopyOut> out, size_t bufferSize) : bufferSize_(bufferSize),
- buffer_(new uint8_t[bufferSize]),
- out_(std::move(out)),
- next_(buffer_),
- available_(bufferSize_), byteCount_(0) {}
- ~BufferCopyOutputStream() override {
- delete[] buffer_;
- }
- };
- unique_ptr<InputStream> fileInputStream(const char *filename,
- size_t bufferSize) {
- unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename));
- return unique_ptr<InputStream>(new BufferCopyInInputStream(std::move(in), bufferSize));
- }
- unique_ptr<SeekableInputStream> fileSeekableInputStream(const char *filename,
- size_t bufferSize) {
- unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename));
- return unique_ptr<SeekableInputStream>(new BufferCopyInInputStream(std::move(in),
- bufferSize));
- }
- unique_ptr<InputStream> istreamInputStream(istream &is, size_t bufferSize) {
- unique_ptr<BufferCopyIn> in(new IStreamBufferCopyIn(is));
- return unique_ptr<InputStream>(new BufferCopyInInputStream(std::move(in), bufferSize));
- }
- unique_ptr<InputStream> nonSeekableIstreamInputStream(
- istream &is, size_t bufferSize) {
- unique_ptr<BufferCopyIn> in(new NonSeekableIStreamBufferCopyIn(is));
- return unique_ptr<InputStream>(new BufferCopyInInputStream(std::move(in), bufferSize));
- }
- unique_ptr<OutputStream> fileOutputStream(const char *filename,
- size_t bufferSize) {
- unique_ptr<BufferCopyOut> out(new FileBufferCopyOut(filename));
- return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize));
- }
- unique_ptr<OutputStream> ostreamOutputStream(ostream &os,
- size_t bufferSize) {
- unique_ptr<BufferCopyOut> out(new OStreamBufferCopyOut(os));
- return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize));
- }
- } // namespace avro
|