123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- #include "chunked_output_stream.h"
- #include <util/system/sanitizers.h>
- namespace NYT {
- ////////////////////////////////////////////////////////////////////////////////
- TChunkedOutputStream::TChunkedOutputStream(
- TRefCountedTypeCookie tagCookie,
- size_t initialReserveSize,
- size_t maxReserveSize)
- : MaxReserveSize_(RoundUpToPage(maxReserveSize))
- , CurrentReserveSize_(RoundUpToPage(initialReserveSize))
- , CurrentChunk_(tagCookie, /*size*/ 0)
- {
- YT_VERIFY(MaxReserveSize_ > 0);
- if (CurrentReserveSize_ > MaxReserveSize_) {
- CurrentReserveSize_ = MaxReserveSize_;
- }
- }
- std::vector<TSharedRef> TChunkedOutputStream::Finish()
- {
- FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
- YT_ASSERT(CurrentChunk_.IsEmpty());
- FinishedSize_ = 0;
- for (const auto& chunk : FinishedChunks_) {
- NSan::CheckMemIsInitialized(chunk.Begin(), chunk.Size());
- }
- return std::move(FinishedChunks_);
- }
- size_t TChunkedOutputStream::GetSize() const
- {
- return FinishedSize_ + CurrentChunk_.Size();
- }
- size_t TChunkedOutputStream::GetCapacity() const
- {
- return FinishedSize_ + CurrentChunk_.Capacity();
- }
- void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired)
- {
- YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity());
- FinishedSize_ += CurrentChunk_.Size();
- FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
- CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
- CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_));
- }
- void TChunkedOutputStream::DoWrite(const void* buffer, size_t length)
- {
- if (CurrentChunk_.Capacity() == 0) {
- CurrentChunk_.Reserve(CurrentReserveSize_);
- }
- auto spaceAvailable = std::min(length, CurrentChunk_.Capacity() - CurrentChunk_.Size());
- CurrentChunk_.Append(buffer, spaceAvailable);
- auto spaceRequired = length - spaceAvailable;
- if (spaceRequired > 0) {
- ReserveNewChunk(spaceRequired);
- CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired);
- }
- }
- size_t TChunkedOutputStream::DoNext(void** ptr)
- {
- if (CurrentChunk_.Size() == CurrentChunk_.Capacity()) {
- if (CurrentChunk_.Capacity() == 0) {
- CurrentChunk_.Reserve(CurrentReserveSize_);
- } else {
- ReserveNewChunk(0);
- }
- }
- auto spaceAvailable = CurrentChunk_.Capacity() - CurrentChunk_.Size();
- YT_ASSERT(spaceAvailable > 0);
- *ptr = CurrentChunk_.End();
- CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false);
- return spaceAvailable;
- }
- void TChunkedOutputStream::DoUndo(size_t len)
- {
- YT_VERIFY(CurrentChunk_.Size() >= len);
- CurrentChunk_.Resize(CurrentChunk_.Size() - len);
- }
- char* TChunkedOutputStream::Preallocate(size_t size)
- {
- size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size();
- if (available < size) {
- FinishedSize_ += CurrentChunk_.Size();
- FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
- CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
- CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_));
- }
- return CurrentChunk_.End();
- }
- void TChunkedOutputStream::Advance(size_t size)
- {
- YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity());
- CurrentChunk_.Resize(CurrentChunk_.Size() + size, false);
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT
|