123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- #include "retryful_writer_v2.h"
- #include <util/generic/scope.h>
- #include <yt/cpp/mapreduce/client/retry_heavy_write_request.h>
- #include <yt/cpp/mapreduce/client/transaction.h>
- #include <yt/cpp/mapreduce/client/transaction_pinger.h>
- #include <yt/cpp/mapreduce/common/fwd.h>
- #include <yt/cpp/mapreduce/common/helpers.h>
- #include <yt/cpp/mapreduce/common/retry_lib.h>
- #include <yt/cpp/mapreduce/common/wait_proxy.h>
- #include <yt/cpp/mapreduce/http/context.h>
- #include <yt/cpp/mapreduce/http/helpers.h>
- #include <yt/cpp/mapreduce/http/http.h>
- #include <util/system/condvar.h>
- #include <queue>
- namespace NYT::NPrivate {
- ////////////////////////////////////////////////////////////////////////////////
- class TRetryfulWriterV2::TSentBuffer
- {
- public:
- TSentBuffer() = default;
- TSentBuffer(const TSentBuffer& ) = delete;
- std::pair<std::shared_ptr<std::string>, ssize_t> Snapshot() const
- {
- return {Buffer_, Size_};
- }
- void Clear()
- {
- // This method can be called only if no other object is holding snapshot.
- Y_ABORT_IF(Buffer_.use_count() != 1);
- Size_ = 0;
- }
- ssize_t Size() const
- {
- return Size_;
- }
- void Append(const void* data, ssize_t size)
- {
- auto newSize = Size_ + size;
- if (newSize < Capacity_) {
- memcpy(Buffer_->data() + Size_, data, size);
- } else {
- // Closest power of 2 exceeding new size
- auto newCapacity = 1 << (MostSignificantBit(newSize) + 1);
- newCapacity = Max<ssize_t>(64, newCapacity);
- auto newBuffer = std::make_shared<std::string>();
- newBuffer->resize(newCapacity);
- memcpy(newBuffer->data(), Buffer_->data(), Size_);
- memcpy(newBuffer->data() + Size_, data, size);
- Buffer_ = newBuffer;
- Capacity_ = newCapacity;
- }
- Size_ = newSize;
- }
- private:
- std::shared_ptr<std::string> Buffer_ = std::make_shared<std::string>();
- ssize_t Size_ = 0;
- ssize_t Capacity_ = 0;
- };
- ////////////////////////////////////////////////////////////////////////////////
- class TRetryfulWriterV2::TSender
- {
- public:
- TSender(TRichYPath path, THeavyRequestRetrier::TParameters parameters)
- : SenderThread_(
- [this, path=std::move(path), parameters=std::move(parameters)] {
- ThreadMain(std::move(path), parameters);
- })
- {
- SenderThread_.SetCurrentThreadName("retryful-writer-v2-sender");
- SenderThread_.Start();
- }
- ~TSender()
- {
- Abort();
- SenderThread_.Join();
- }
- bool IsRunning() const
- {
- auto g = Guard(Lock_);
- return State_.load() == EState::Running && !Error_;
- }
- void Abort()
- {
- auto g = Guard(Lock_);
- SetFinishedState(EState::Aborted, g);
- }
- void Finish()
- {
- {
- auto g = Guard(Lock_);
- SetFinishedState(EState::Completed, g);
- }
- SenderThread_.Join();
- CheckNoError();
- Y_ABORT_UNLESS(TaskIdQueue_.empty());
- Y_ABORT_UNLESS(TaskMap_.empty());
- }
- // Return future that is complete once upload of this buffer is successfully complete
- std::pair<NThreading::TFuture<void>, int> StartBlock()
- {
- auto g = Guard(Lock_);
- CheckNoError();
- auto taskId = NextTaskId_++;
- const auto& [it, inserted] = TaskMap_.emplace(taskId, TWriteTask{});
- Y_ABORT_IF(!inserted);
- TaskIdQueue_.push(taskId);
- HaveMoreData_.Signal();
- it->second.SendingComplete = NThreading::NewPromise();
- return {it->second.SendingComplete, taskId};
- }
- void UpdateBlock(int taskId, const TSentBuffer& buffer, bool complete)
- {
- auto snapshot = buffer.Snapshot();
- {
- auto g = Guard(Lock_);
- CheckNoError();
- auto it = TaskMap_.find(taskId);
- Y_ABORT_IF(it == TaskMap_.end());
- auto& writeTask = it->second;
- writeTask.Data = std::move(snapshot.first);
- writeTask.Size = snapshot.second;
- writeTask.BufferComplete = complete;
- if (TaskIdQueue_.empty() || TaskIdQueue_.back() != taskId) {
- TaskIdQueue_.push(taskId);
- }
- HaveMoreData_.Signal();
- }
- }
- private:
- enum class EState;
- private:
- void CheckNoError()
- {
- if (Error_) {
- std::rethrow_exception(Error_);
- }
- }
- void SetFinishedState(EState state, TGuard<TMutex>&)
- {
- if (State_ == EState::Running) {
- State_ = state;
- }
- HaveMoreData_.Signal();
- }
- void ThreadMain(TRichYPath path, const THeavyRequestRetrier::TParameters& parameters)
- {
- THolder<THeavyRequestRetrier> retrier;
- auto firstRequestParameters = parameters;
- auto restRequestParameters = parameters;
- {
- TNode firstPath = PathToNode(path);
- firstRequestParameters.Header.MergeParameters(TNode()("path", firstPath), /*overwrite*/ true);
- TNode restPath = PathToNode(TRichYPath(path.Path_).Append(true));
- restRequestParameters.Header.MergeParameters(TNode()("path", restPath), /*overwrite*/ true);
- }
- const auto* currentParameters = &firstRequestParameters;
- while (true) {
- int taskId = 0;
- TWriteTask task;
- {
- auto g = Guard(Lock_);
- while (State_ == EState::Running && TaskIdQueue_.empty()) {
- HaveMoreData_.Wait(Lock_);
- }
- if (
- State_ == EState::Aborted ||
- State_ == EState::Completed && TaskIdQueue_.empty()
- ) {
- break;
- }
- taskId = TaskIdQueue_.front();
- TaskIdQueue_.pop();
- if (auto it = TaskMap_.find(taskId); it != TaskMap_.end()) {
- task = it->second;
- } else {
- Y_ABORT();
- }
- }
- try {
- if (!retrier) {
- retrier = MakeHolder<THeavyRequestRetrier>(*currentParameters);
- }
- retrier->Update([task=task] {
- return MakeHolder<TMemoryInput>(task.Data->data(), task.Size);
- });
- if (task.BufferComplete) {
- retrier->Finish();
- retrier.Reset();
- }
- } catch (const std::exception& ex) {
- task.SendingComplete.SetException(std::current_exception());
- auto g = Guard(Lock_);
- Error_ = std::current_exception();
- return;
- }
- if (task.BufferComplete) {
- retrier.Reset();
- task.SendingComplete.SetValue();
- currentParameters = &restRequestParameters;
- auto g = Guard(Lock_);
- auto erased = TaskMap_.erase(taskId);
- Y_ABORT_UNLESS(erased == 1);
- }
- }
- if (State_ == EState::Completed) {
- auto g = Guard(Lock_);
- Y_ABORT_UNLESS(TaskIdQueue_.empty());
- Y_ABORT_UNLESS(TaskMap_.empty());
- }
- }
- private:
- struct TWriteTask
- {
- NThreading::TPromise<void> SendingComplete;
- std::shared_ptr<std::string> Data = std::make_shared<std::string>();
- ssize_t Size = 0;
- bool BufferComplete = false;
- };
- TMutex Lock_;
- TCondVar HaveMoreData_;
- TThread SenderThread_;
- THashMap<int, TWriteTask> TaskMap_;
- std::queue<int> TaskIdQueue_;
- std::exception_ptr Error_;
- enum class EState {
- Running,
- Completed,
- Aborted,
- };
- std::atomic<EState> State_ = EState::Running;
- int NextTaskId_ = 0;
- };
- ////////////////////////////////////////////////////////////////////////////////
- struct TRetryfulWriterV2::TSendTask
- {
- TSentBuffer Buffer;
- NThreading::TFuture<void> SentFuture = NThreading::MakeFuture();
- int TaskId = 0;
- };
- ////////////////////////////////////////////////////////////////////////////////
- TRetryfulWriterV2::TRetryfulWriterV2(
- IClientRetryPolicyPtr clientRetryPolicy,
- ITransactionPingerPtr transactionPinger,
- const TClientContext& context,
- const TTransactionId& parentId,
- const TString& command,
- const TMaybe<TFormat>& format,
- const TRichYPath& path,
- const TNode& serializedWriterOptions,
- ssize_t bufferSize,
- bool createTransaction)
- : BufferSize_(bufferSize)
- , Current_(MakeHolder<TSendTask>())
- , Previous_(MakeHolder<TSendTask>())
- {
- THttpHeader httpHeader("PUT", command);
- httpHeader.SetInputFormat(format);
- httpHeader.MergeParameters(serializedWriterOptions);
- if (createTransaction) {
- WriteTransaction_ = MakeHolder<TPingableTransaction>(
- clientRetryPolicy,
- context,
- parentId,
- transactionPinger->GetChildTxPinger(),
- TStartTransactionOptions()
- );
- auto append = path.Append_.GetOrElse(false);
- auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE);
- NDetail::NRawClient::Lock(
- clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- WriteTransaction_->GetId(),
- path.Path_,
- lockMode
- );
- }
- THeavyRequestRetrier::TParameters parameters = {
- .ClientRetryPolicy = clientRetryPolicy,
- .TransactionPinger = transactionPinger,
- .Context = context,
- .TransactionId = WriteTransaction_ ? WriteTransaction_->GetId() : parentId,
- .Header = std::move(httpHeader),
- };
- Sender_ = MakeHolder<TSender>(path, parameters);
- DoStartBatch();
- }
- void TRetryfulWriterV2::Abort()
- {
- auto sender = std::move(Sender_);
- auto writeTransaction = std::move(WriteTransaction_);
- if (sender) {
- sender->Abort();
- if (writeTransaction) {
- writeTransaction->Abort();
- }
- }
- }
- size_t TRetryfulWriterV2::GetBufferMemoryUsage() const
- {
- return BufferSize_ * 4;
- }
- void TRetryfulWriterV2::DoFinish()
- {
- auto sender = std::move(Sender_);
- auto writeTransaction = std::move(WriteTransaction_);
- if (sender && sender->IsRunning()) {
- sender->UpdateBlock(Current_->TaskId, Current_->Buffer, true);
- sender->Finish();
- if (writeTransaction) {
- writeTransaction->Commit();
- }
- }
- }
- void TRetryfulWriterV2::DoStartBatch()
- {
- Previous_->SentFuture.Wait();
- std::swap(Previous_, Current_);
- auto&& [future, taskId] = Sender_->StartBlock();
- Current_->SentFuture = future;
- Current_->TaskId = taskId;
- Current_->Buffer.Clear();
- NextSizeToSend_ = SendStep_;
- }
- void TRetryfulWriterV2::DoWrite(const void* buf, size_t len)
- {
- Current_->Buffer.Append(buf, len);
- auto currentSize = Current_->Buffer.Size();
- if (currentSize >= NextSizeToSend_) {
- Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, false);
- NextSizeToSend_ = currentSize + SendStep_;
- }
- }
- void TRetryfulWriterV2::NotifyRowEnd()
- {
- if (Current_->Buffer.Size() >= BufferSize_) {
- Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, true);
- DoStartBatch();
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT::NPrivate
|