// retryful_writer_v2.cpp (12 KB)
  1. #include "retryful_writer_v2.h"
  2. #include <util/generic/scope.h>
  3. #include <yt/cpp/mapreduce/client/retry_heavy_write_request.h>
  4. #include <yt/cpp/mapreduce/client/transaction.h>
  5. #include <yt/cpp/mapreduce/client/transaction_pinger.h>
  6. #include <yt/cpp/mapreduce/common/fwd.h>
  7. #include <yt/cpp/mapreduce/common/helpers.h>
  8. #include <yt/cpp/mapreduce/common/retry_lib.h>
  9. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  10. #include <yt/cpp/mapreduce/http/context.h>
  11. #include <yt/cpp/mapreduce/http/helpers.h>
  12. #include <yt/cpp/mapreduce/http/http.h>
  13. #include <yt/cpp/mapreduce/interface/raw_client.h>
  14. #include <util/system/condvar.h>
  15. #include <queue>
  16. namespace NYT::NPrivate {
  17. ////////////////////////////////////////////////////////////////////////////////
  18. class TRetryfulWriterV2::TSentBuffer
  19. {
  20. public:
  21. TSentBuffer() = default;
  22. TSentBuffer(const TSentBuffer& ) = delete;
  23. std::pair<std::shared_ptr<std::string>, ssize_t> Snapshot() const
  24. {
  25. return {Buffer_, Size_};
  26. }
  27. void Clear()
  28. {
  29. // This method can be called only if no other object is holding snapshot.
  30. Y_ABORT_IF(Buffer_.use_count() != 1);
  31. Size_ = 0;
  32. }
  33. ssize_t Size() const
  34. {
  35. return Size_;
  36. }
  37. void Append(const void* data, ssize_t size)
  38. {
  39. auto newSize = Size_ + size;
  40. if (newSize < Capacity_) {
  41. memcpy(Buffer_->data() + Size_, data, size);
  42. } else {
  43. // Closest power of 2 exceeding new size
  44. auto newCapacity = 1 << (MostSignificantBit(newSize) + 1);
  45. newCapacity = Max<ssize_t>(64, newCapacity);
  46. auto newBuffer = std::make_shared<std::string>();
  47. newBuffer->resize(newCapacity);
  48. memcpy(newBuffer->data(), Buffer_->data(), Size_);
  49. memcpy(newBuffer->data() + Size_, data, size);
  50. Buffer_ = newBuffer;
  51. Capacity_ = newCapacity;
  52. }
  53. Size_ = newSize;
  54. }
  55. private:
  56. std::shared_ptr<std::string> Buffer_ = std::make_shared<std::string>();
  57. ssize_t Size_ = 0;
  58. ssize_t Capacity_ = 0;
  59. };
  60. ////////////////////////////////////////////////////////////////////////////////
  61. class TRetryfulWriterV2::TSender
  62. {
  63. public:
  64. TSender(TRichYPath path, THeavyRequestRetrier::TParameters parameters)
  65. : SenderThread_(
  66. [this, path=std::move(path), parameters=std::move(parameters)] {
  67. ThreadMain(std::move(path), parameters);
  68. })
  69. {
  70. SenderThread_.SetCurrentThreadName("retryful-writer-v2-sender");
  71. SenderThread_.Start();
  72. }
  73. ~TSender()
  74. {
  75. Abort();
  76. SenderThread_.Join();
  77. }
  78. bool IsRunning() const
  79. {
  80. auto g = Guard(Lock_);
  81. return State_.load() == EState::Running && !Error_;
  82. }
  83. void Abort()
  84. {
  85. auto g = Guard(Lock_);
  86. SetFinishedState(EState::Aborted, g);
  87. }
  88. void Finish()
  89. {
  90. {
  91. auto g = Guard(Lock_);
  92. SetFinishedState(EState::Completed, g);
  93. }
  94. SenderThread_.Join();
  95. CheckNoError();
  96. Y_ABORT_UNLESS(TaskIdQueue_.empty());
  97. Y_ABORT_UNLESS(TaskMap_.empty());
  98. }
  99. // Return future that is complete once upload of this buffer is successfully complete
  100. std::pair<NThreading::TFuture<void>, int> StartBlock()
  101. {
  102. auto g = Guard(Lock_);
  103. CheckNoError();
  104. auto taskId = NextTaskId_++;
  105. const auto& [it, inserted] = TaskMap_.emplace(taskId, TWriteTask{});
  106. Y_ABORT_IF(!inserted);
  107. TaskIdQueue_.push(taskId);
  108. HaveMoreData_.Signal();
  109. it->second.SendingComplete = NThreading::NewPromise();
  110. return {it->second.SendingComplete, taskId};
  111. }
  112. void UpdateBlock(int taskId, const TSentBuffer& buffer, bool complete)
  113. {
  114. auto snapshot = buffer.Snapshot();
  115. {
  116. auto g = Guard(Lock_);
  117. CheckNoError();
  118. auto it = TaskMap_.find(taskId);
  119. Y_ABORT_IF(it == TaskMap_.end());
  120. auto& writeTask = it->second;
  121. writeTask.Data = std::move(snapshot.first);
  122. writeTask.Size = snapshot.second;
  123. writeTask.BufferComplete = complete;
  124. if (TaskIdQueue_.empty() || TaskIdQueue_.back() != taskId) {
  125. TaskIdQueue_.push(taskId);
  126. }
  127. HaveMoreData_.Signal();
  128. }
  129. }
  130. private:
  131. enum class EState;
  132. private:
  133. void CheckNoError()
  134. {
  135. if (Error_) {
  136. std::rethrow_exception(Error_);
  137. }
  138. }
  139. void SetFinishedState(EState state, TGuard<TMutex>&)
  140. {
  141. if (State_ == EState::Running) {
  142. State_ = state;
  143. }
  144. HaveMoreData_.Signal();
  145. }
  146. void ThreadMain(TRichYPath path, const THeavyRequestRetrier::TParameters& parameters)
  147. {
  148. std::unique_ptr<THeavyRequestRetrier> retrier;
  149. auto firstRequestParameters = parameters;
  150. auto restRequestParameters = parameters;
  151. {
  152. TNode firstPath = PathToNode(path);
  153. firstRequestParameters.Header.MergeParameters(TNode()("path", firstPath), /*overwrite*/ true);
  154. TNode restPath = PathToNode(TRichYPath(path.Path_).Append(true));
  155. restRequestParameters.Header.MergeParameters(TNode()("path", restPath), /*overwrite*/ true);
  156. }
  157. const auto* currentParameters = &firstRequestParameters;
  158. while (true) {
  159. int taskId = 0;
  160. TWriteTask task;
  161. {
  162. auto g = Guard(Lock_);
  163. while (State_ == EState::Running && TaskIdQueue_.empty()) {
  164. HaveMoreData_.Wait(Lock_);
  165. }
  166. if (
  167. State_ == EState::Aborted ||
  168. State_ == EState::Completed && TaskIdQueue_.empty()
  169. ) {
  170. break;
  171. }
  172. taskId = TaskIdQueue_.front();
  173. TaskIdQueue_.pop();
  174. if (auto it = TaskMap_.find(taskId); it != TaskMap_.end()) {
  175. task = it->second;
  176. } else {
  177. Y_ABORT();
  178. }
  179. }
  180. try {
  181. if (!retrier) {
  182. retrier = std::make_unique<THeavyRequestRetrier>(*currentParameters);
  183. }
  184. retrier->Update([task=task] {
  185. return std::make_unique<TMemoryInput>(task.Data->data(), task.Size);
  186. });
  187. if (task.BufferComplete) {
  188. retrier->Finish();
  189. retrier.reset();
  190. }
  191. } catch (const std::exception& ex) {
  192. task.SendingComplete.SetException(std::current_exception());
  193. auto g = Guard(Lock_);
  194. Error_ = std::current_exception();
  195. return;
  196. }
  197. if (task.BufferComplete) {
  198. retrier.reset();
  199. task.SendingComplete.SetValue();
  200. currentParameters = &restRequestParameters;
  201. auto g = Guard(Lock_);
  202. auto erased = TaskMap_.erase(taskId);
  203. Y_ABORT_UNLESS(erased == 1);
  204. }
  205. }
  206. if (State_ == EState::Completed) {
  207. auto g = Guard(Lock_);
  208. Y_ABORT_UNLESS(TaskIdQueue_.empty());
  209. Y_ABORT_UNLESS(TaskMap_.empty());
  210. }
  211. }
  212. private:
  213. struct TWriteTask
  214. {
  215. NThreading::TPromise<void> SendingComplete;
  216. std::shared_ptr<std::string> Data = std::make_shared<std::string>();
  217. ssize_t Size = 0;
  218. bool BufferComplete = false;
  219. };
  220. TMutex Lock_;
  221. TCondVar HaveMoreData_;
  222. TThread SenderThread_;
  223. THashMap<int, TWriteTask> TaskMap_;
  224. std::queue<int> TaskIdQueue_;
  225. std::exception_ptr Error_;
  226. enum class EState {
  227. Running,
  228. Completed,
  229. Aborted,
  230. };
  231. std::atomic<EState> State_ = EState::Running;
  232. int NextTaskId_ = 0;
  233. };
  234. ////////////////////////////////////////////////////////////////////////////////
  235. struct TRetryfulWriterV2::TSendTask
  236. {
  237. TSentBuffer Buffer;
  238. NThreading::TFuture<void> SentFuture = NThreading::MakeFuture();
  239. int TaskId = 0;
  240. };
  241. ////////////////////////////////////////////////////////////////////////////////
  242. TRetryfulWriterV2::TRetryfulWriterV2(
  243. const IRawClientPtr& rawClient,
  244. IClientRetryPolicyPtr clientRetryPolicy,
  245. ITransactionPingerPtr transactionPinger,
  246. const TClientContext& context,
  247. const TTransactionId& parentId,
  248. const TString& command,
  249. const TMaybe<TFormat>& format,
  250. const TRichYPath& path,
  251. const TNode& serializedWriterOptions,
  252. ssize_t bufferSize,
  253. bool createTransaction)
  254. : BufferSize_(bufferSize)
  255. , Current_(std::make_unique<TSendTask>())
  256. , Previous_(std::make_unique<TSendTask>())
  257. {
  258. THttpHeader httpHeader("PUT", command);
  259. httpHeader.SetInputFormat(format);
  260. httpHeader.MergeParameters(serializedWriterOptions);
  261. if (createTransaction) {
  262. WriteTransaction_ = std::make_unique<TPingableTransaction>(
  263. rawClient,
  264. clientRetryPolicy,
  265. context,
  266. parentId,
  267. transactionPinger->GetChildTxPinger(),
  268. TStartTransactionOptions()
  269. );
  270. auto append = path.Append_.GetOrElse(false);
  271. auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE);
  272. NDetail::RequestWithRetry<void>(
  273. clientRetryPolicy->CreatePolicyForGenericRequest(),
  274. [this, &rawClient, &path, &lockMode] (TMutationId& mutationId) {
  275. rawClient->Lock(mutationId, WriteTransaction_->GetId(), path.Path_, lockMode);
  276. });
  277. }
  278. THeavyRequestRetrier::TParameters parameters = {
  279. .RawClientPtr = rawClient,
  280. .ClientRetryPolicy = clientRetryPolicy,
  281. .TransactionPinger = transactionPinger,
  282. .Context = context,
  283. .TransactionId = WriteTransaction_ ? WriteTransaction_->GetId() : parentId,
  284. .Header = std::move(httpHeader),
  285. };
  286. Sender_ = std::make_unique<TSender>(path, parameters);
  287. DoStartBatch();
  288. }
  289. void TRetryfulWriterV2::Abort()
  290. {
  291. auto sender = std::move(Sender_);
  292. auto writeTransaction = std::move(WriteTransaction_);
  293. if (sender) {
  294. sender->Abort();
  295. if (writeTransaction) {
  296. writeTransaction->Abort();
  297. }
  298. }
  299. }
  300. size_t TRetryfulWriterV2::GetBufferMemoryUsage() const
  301. {
  302. return BufferSize_ * 4;
  303. }
  304. void TRetryfulWriterV2::DoFinish()
  305. {
  306. auto sender = std::move(Sender_);
  307. auto writeTransaction = std::move(WriteTransaction_);
  308. if (sender && sender->IsRunning()) {
  309. sender->UpdateBlock(Current_->TaskId, Current_->Buffer, true);
  310. sender->Finish();
  311. if (writeTransaction) {
  312. writeTransaction->Commit();
  313. }
  314. }
  315. }
  316. void TRetryfulWriterV2::DoStartBatch()
  317. {
  318. Previous_->SentFuture.Wait();
  319. std::swap(Previous_, Current_);
  320. auto&& [future, taskId] = Sender_->StartBlock();
  321. Current_->SentFuture = future;
  322. Current_->TaskId = taskId;
  323. Current_->Buffer.Clear();
  324. NextSizeToSend_ = SendStep_;
  325. }
  326. void TRetryfulWriterV2::DoWrite(const void* buf, size_t len)
  327. {
  328. Current_->Buffer.Append(buf, len);
  329. auto currentSize = Current_->Buffer.Size();
  330. if (currentSize >= NextSizeToSend_) {
  331. Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, false);
  332. NextSizeToSend_ = currentSize + SendStep_;
  333. }
  334. }
  335. void TRetryfulWriterV2::NotifyRowEnd()
  336. {
  337. if (Current_->Buffer.Size() >= BufferSize_) {
  338. Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, true);
  339. DoStartBatch();
  340. }
  341. }
  342. ////////////////////////////////////////////////////////////////////////////////
  343. } // namespace NYT::NPrivate