retryful_writer_v2.cpp 11 KB


  1. #include "retryful_writer_v2.h"
  2. #include <util/generic/scope.h>
  3. #include <yt/cpp/mapreduce/client/retry_heavy_write_request.h>
  4. #include <yt/cpp/mapreduce/client/transaction.h>
  5. #include <yt/cpp/mapreduce/client/transaction_pinger.h>
  6. #include <yt/cpp/mapreduce/common/fwd.h>
  7. #include <yt/cpp/mapreduce/common/helpers.h>
  8. #include <yt/cpp/mapreduce/common/retry_lib.h>
  9. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  10. #include <yt/cpp/mapreduce/http/context.h>
  11. #include <yt/cpp/mapreduce/http/helpers.h>
  12. #include <yt/cpp/mapreduce/http/http.h>
  13. #include <util/system/condvar.h>
  14. #include <queue>
  15. namespace NYT::NPrivate {
  16. ////////////////////////////////////////////////////////////////////////////////
  17. class TRetryfulWriterV2::TSentBuffer
  18. {
  19. public:
  20. TSentBuffer() = default;
  21. TSentBuffer(const TSentBuffer& ) = delete;
  22. std::pair<std::shared_ptr<std::string>, ssize_t> Snapshot() const
  23. {
  24. return {Buffer_, Size_};
  25. }
  26. void Clear()
  27. {
  28. // This method can be called only if no other object is holding snapshot.
  29. Y_ABORT_IF(Buffer_.use_count() != 1);
  30. Size_ = 0;
  31. }
  32. ssize_t Size() const
  33. {
  34. return Size_;
  35. }
  36. void Append(const void* data, ssize_t size)
  37. {
  38. auto newSize = Size_ + size;
  39. if (newSize < Capacity_) {
  40. memcpy(Buffer_->data() + Size_, data, size);
  41. } else {
  42. // Closest power of 2 exceeding new size
  43. auto newCapacity = 1 << (MostSignificantBit(newSize) + 1);
  44. newCapacity = Max<ssize_t>(64, newCapacity);
  45. auto newBuffer = std::make_shared<std::string>();
  46. newBuffer->resize(newCapacity);
  47. memcpy(newBuffer->data(), Buffer_->data(), Size_);
  48. memcpy(newBuffer->data() + Size_, data, size);
  49. Buffer_ = newBuffer;
  50. Capacity_ = newCapacity;
  51. }
  52. Size_ = newSize;
  53. }
  54. private:
  55. std::shared_ptr<std::string> Buffer_ = std::make_shared<std::string>();
  56. ssize_t Size_ = 0;
  57. ssize_t Capacity_ = 0;
  58. };
  59. ////////////////////////////////////////////////////////////////////////////////
  60. class TRetryfulWriterV2::TSender
  61. {
  62. public:
  63. TSender(TRichYPath path, THeavyRequestRetrier::TParameters parameters)
  64. : SenderThread_(
  65. [this, path=std::move(path), parameters=std::move(parameters)] {
  66. ThreadMain(std::move(path), parameters);
  67. })
  68. {
  69. SenderThread_.SetCurrentThreadName("retryful-writer-v2-sender");
  70. SenderThread_.Start();
  71. }
  72. ~TSender()
  73. {
  74. Abort();
  75. SenderThread_.Join();
  76. }
  77. bool IsRunning() const
  78. {
  79. auto g = Guard(Lock_);
  80. return State_.load() == EState::Running && !Error_;
  81. }
  82. void Abort()
  83. {
  84. auto g = Guard(Lock_);
  85. SetFinishedState(EState::Aborted, g);
  86. }
  87. void Finish()
  88. {
  89. {
  90. auto g = Guard(Lock_);
  91. SetFinishedState(EState::Completed, g);
  92. }
  93. SenderThread_.Join();
  94. CheckNoError();
  95. Y_ABORT_UNLESS(TaskIdQueue_.empty());
  96. Y_ABORT_UNLESS(TaskMap_.empty());
  97. }
  98. // Return future that is complete once upload of this buffer is successfully complete
  99. std::pair<NThreading::TFuture<void>, int> StartBlock()
  100. {
  101. auto g = Guard(Lock_);
  102. CheckNoError();
  103. auto taskId = NextTaskId_++;
  104. const auto& [it, inserted] = TaskMap_.emplace(taskId, TWriteTask{});
  105. Y_ABORT_IF(!inserted);
  106. TaskIdQueue_.push(taskId);
  107. HaveMoreData_.Signal();
  108. it->second.SendingComplete = NThreading::NewPromise();
  109. return {it->second.SendingComplete, taskId};
  110. }
  111. void UpdateBlock(int taskId, const TSentBuffer& buffer, bool complete)
  112. {
  113. auto snapshot = buffer.Snapshot();
  114. {
  115. auto g = Guard(Lock_);
  116. CheckNoError();
  117. auto it = TaskMap_.find(taskId);
  118. Y_ABORT_IF(it == TaskMap_.end());
  119. auto& writeTask = it->second;
  120. writeTask.Data = std::move(snapshot.first);
  121. writeTask.Size = snapshot.second;
  122. writeTask.BufferComplete = complete;
  123. if (TaskIdQueue_.empty() || TaskIdQueue_.back() != taskId) {
  124. TaskIdQueue_.push(taskId);
  125. }
  126. HaveMoreData_.Signal();
  127. }
  128. }
  129. private:
  130. enum class EState;
  131. private:
  132. void CheckNoError()
  133. {
  134. if (Error_) {
  135. std::rethrow_exception(Error_);
  136. }
  137. }
  138. void SetFinishedState(EState state, TGuard<TMutex>&)
  139. {
  140. if (State_ == EState::Running) {
  141. State_ = state;
  142. }
  143. HaveMoreData_.Signal();
  144. }
  145. void ThreadMain(TRichYPath path, const THeavyRequestRetrier::TParameters& parameters)
  146. {
  147. THolder<THeavyRequestRetrier> retrier;
  148. auto firstRequestParameters = parameters;
  149. auto restRequestParameters = parameters;
  150. {
  151. TNode firstPath = PathToNode(path);
  152. firstRequestParameters.Header.MergeParameters(TNode()("path", firstPath), /*overwrite*/ true);
  153. TNode restPath = PathToNode(TRichYPath(path.Path_).Append(true));
  154. restRequestParameters.Header.MergeParameters(TNode()("path", restPath), /*overwrite*/ true);
  155. }
  156. const auto* currentParameters = &firstRequestParameters;
  157. while (true) {
  158. int taskId = 0;
  159. TWriteTask task;
  160. {
  161. auto g = Guard(Lock_);
  162. while (State_ == EState::Running && TaskIdQueue_.empty()) {
  163. HaveMoreData_.Wait(Lock_);
  164. }
  165. if (
  166. State_ == EState::Aborted ||
  167. State_ == EState::Completed && TaskIdQueue_.empty()
  168. ) {
  169. break;
  170. }
  171. taskId = TaskIdQueue_.front();
  172. TaskIdQueue_.pop();
  173. if (auto it = TaskMap_.find(taskId); it != TaskMap_.end()) {
  174. task = it->second;
  175. } else {
  176. Y_ABORT();
  177. }
  178. }
  179. try {
  180. if (!retrier) {
  181. retrier = MakeHolder<THeavyRequestRetrier>(*currentParameters);
  182. }
  183. retrier->Update([task=task] {
  184. return MakeHolder<TMemoryInput>(task.Data->data(), task.Size);
  185. });
  186. if (task.BufferComplete) {
  187. retrier->Finish();
  188. retrier.Reset();
  189. }
  190. } catch (const std::exception& ex) {
  191. task.SendingComplete.SetException(std::current_exception());
  192. auto g = Guard(Lock_);
  193. Error_ = std::current_exception();
  194. return;
  195. }
  196. if (task.BufferComplete) {
  197. retrier.Reset();
  198. task.SendingComplete.SetValue();
  199. currentParameters = &restRequestParameters;
  200. auto g = Guard(Lock_);
  201. auto erased = TaskMap_.erase(taskId);
  202. Y_ABORT_UNLESS(erased == 1);
  203. }
  204. }
  205. if (State_ == EState::Completed) {
  206. auto g = Guard(Lock_);
  207. Y_ABORT_UNLESS(TaskIdQueue_.empty());
  208. Y_ABORT_UNLESS(TaskMap_.empty());
  209. }
  210. }
  211. private:
  212. struct TWriteTask
  213. {
  214. NThreading::TPromise<void> SendingComplete;
  215. std::shared_ptr<std::string> Data = std::make_shared<std::string>();
  216. ssize_t Size = 0;
  217. bool BufferComplete = false;
  218. };
  219. TMutex Lock_;
  220. TCondVar HaveMoreData_;
  221. TThread SenderThread_;
  222. THashMap<int, TWriteTask> TaskMap_;
  223. std::queue<int> TaskIdQueue_;
  224. std::exception_ptr Error_;
  225. enum class EState {
  226. Running,
  227. Completed,
  228. Aborted,
  229. };
  230. std::atomic<EState> State_ = EState::Running;
  231. int NextTaskId_ = 0;
  232. };
  233. ////////////////////////////////////////////////////////////////////////////////
  234. struct TRetryfulWriterV2::TSendTask
  235. {
  236. TSentBuffer Buffer;
  237. NThreading::TFuture<void> SentFuture = NThreading::MakeFuture();
  238. int TaskId = 0;
  239. };
  240. ////////////////////////////////////////////////////////////////////////////////
  241. TRetryfulWriterV2::TRetryfulWriterV2(
  242. IClientRetryPolicyPtr clientRetryPolicy,
  243. ITransactionPingerPtr transactionPinger,
  244. const TClientContext& context,
  245. const TTransactionId& parentId,
  246. const TString& command,
  247. const TMaybe<TFormat>& format,
  248. const TRichYPath& path,
  249. const TNode& serializedWriterOptions,
  250. ssize_t bufferSize,
  251. bool createTransaction)
  252. : BufferSize_(bufferSize)
  253. , Current_(MakeHolder<TSendTask>())
  254. , Previous_(MakeHolder<TSendTask>())
  255. {
  256. THttpHeader httpHeader("PUT", command);
  257. httpHeader.SetInputFormat(format);
  258. httpHeader.MergeParameters(serializedWriterOptions);
  259. if (createTransaction) {
  260. WriteTransaction_ = MakeHolder<TPingableTransaction>(
  261. clientRetryPolicy,
  262. context,
  263. parentId,
  264. transactionPinger->GetChildTxPinger(),
  265. TStartTransactionOptions()
  266. );
  267. auto append = path.Append_.GetOrElse(false);
  268. auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE);
  269. NDetail::NRawClient::Lock(
  270. clientRetryPolicy->CreatePolicyForGenericRequest(),
  271. context,
  272. WriteTransaction_->GetId(),
  273. path.Path_,
  274. lockMode
  275. );
  276. }
  277. THeavyRequestRetrier::TParameters parameters = {
  278. .ClientRetryPolicy = clientRetryPolicy,
  279. .TransactionPinger = transactionPinger,
  280. .Context = context,
  281. .TransactionId = WriteTransaction_ ? WriteTransaction_->GetId() : parentId,
  282. .Header = std::move(httpHeader),
  283. };
  284. Sender_ = MakeHolder<TSender>(path, parameters);
  285. DoStartBatch();
  286. }
  287. void TRetryfulWriterV2::Abort()
  288. {
  289. auto sender = std::move(Sender_);
  290. auto writeTransaction = std::move(WriteTransaction_);
  291. if (sender) {
  292. sender->Abort();
  293. if (writeTransaction) {
  294. writeTransaction->Abort();
  295. }
  296. }
  297. }
  298. size_t TRetryfulWriterV2::GetBufferMemoryUsage() const
  299. {
  300. return BufferSize_ * 4;
  301. }
  302. void TRetryfulWriterV2::DoFinish()
  303. {
  304. auto sender = std::move(Sender_);
  305. auto writeTransaction = std::move(WriteTransaction_);
  306. if (sender && sender->IsRunning()) {
  307. sender->UpdateBlock(Current_->TaskId, Current_->Buffer, true);
  308. sender->Finish();
  309. if (writeTransaction) {
  310. writeTransaction->Commit();
  311. }
  312. }
  313. }
  314. void TRetryfulWriterV2::DoStartBatch()
  315. {
  316. Previous_->SentFuture.Wait();
  317. std::swap(Previous_, Current_);
  318. auto&& [future, taskId] = Sender_->StartBlock();
  319. Current_->SentFuture = future;
  320. Current_->TaskId = taskId;
  321. Current_->Buffer.Clear();
  322. NextSizeToSend_ = SendStep_;
  323. }
  324. void TRetryfulWriterV2::DoWrite(const void* buf, size_t len)
  325. {
  326. Current_->Buffer.Append(buf, len);
  327. auto currentSize = Current_->Buffer.Size();
  328. if (currentSize >= NextSizeToSend_) {
  329. Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, false);
  330. NextSizeToSend_ = currentSize + SendStep_;
  331. }
  332. }
  333. void TRetryfulWriterV2::NotifyRowEnd()
  334. {
  335. if (Current_->Buffer.Size() >= BufferSize_) {
  336. Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, true);
  337. DoStartBatch();
  338. }
  339. }
  340. ////////////////////////////////////////////////////////////////////////////////
  341. } // namespace NYT::NPrivate