retryful_writer.cpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. #include "retryful_writer.h"
  2. #include "retry_heavy_write_request.h"
  3. #include <yt/cpp/mapreduce/http/requests.h>
  4. #include <yt/cpp/mapreduce/interface/errors.h>
  5. #include <yt/cpp/mapreduce/interface/finish_or_die.h>
  6. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  7. #include <yt/cpp/mapreduce/http_client/raw_client.h>
  8. #include <util/generic/size_literals.h>
  9. namespace NYT {
  10. ////////////////////////////////////////////////////////////////////////////////
  11. TRetryfulWriter::~TRetryfulWriter()
  12. {
  13. NDetail::FinishOrDie(this, AutoFinish_, "TRetryfulWriter");
  14. }
  15. void TRetryfulWriter::CheckWriterState()
  16. {
  17. switch (WriterState_) {
  18. case Ok:
  19. break;
  20. case Completed:
  21. ythrow TApiUsageError() << "Cannot use table writer that is finished";
  22. case Error:
  23. ythrow TApiUsageError() << "Cannot use table writer that finished with error";
  24. }
  25. }
  26. void TRetryfulWriter::NotifyRowEnd()
  27. {
  28. CheckWriterState();
  29. if (Buffer_.Size() >= BufferSize_) {
  30. FlushBuffer(false);
  31. }
  32. }
  33. void TRetryfulWriter::DoWrite(const void* buf, size_t len)
  34. {
  35. CheckWriterState();
  36. while (Buffer_.Size() + len > Buffer_.Capacity()) {
  37. Buffer_.Reserve(Buffer_.Capacity() * 2);
  38. }
  39. Buffer_.Append(static_cast<const char*>(buf), len);
  40. }
  41. void TRetryfulWriter::DoFinish()
  42. {
  43. if (WriterState_ != Ok) {
  44. return;
  45. }
  46. FlushBuffer(true);
  47. if (Started_) {
  48. FilledBuffers_.Stop();
  49. Thread_.Join();
  50. }
  51. if (Exception_) {
  52. WriterState_ = Error;
  53. std::rethrow_exception(Exception_);
  54. }
  55. if (WriteTransaction_) {
  56. WriteTransaction_->Commit();
  57. }
  58. WriterState_ = Completed;
  59. }
  60. void TRetryfulWriter::FlushBuffer(bool lastBlock)
  61. {
  62. if (!Started_) {
  63. if (lastBlock) {
  64. try {
  65. Send(Buffer_);
  66. } catch (...) {
  67. WriterState_ = Error;
  68. throw;
  69. }
  70. return;
  71. } else {
  72. Started_ = true;
  73. Thread_.Start();
  74. }
  75. }
  76. auto emptyBuffer = EmptyBuffers_.Pop();
  77. if (!emptyBuffer) {
  78. WriterState_ = Error;
  79. std::rethrow_exception(Exception_);
  80. }
  81. FilledBuffers_.Push(std::move(Buffer_));
  82. Buffer_ = std::move(emptyBuffer.GetRef());
  83. }
  84. void TRetryfulWriter::Send(const TBuffer& buffer)
  85. {
  86. THttpHeader header("PUT", Command_);
  87. header.SetInputFormat(Format_);
  88. header.MergeParameters(Parameters_);
  89. auto streamMaker = [&buffer] () {
  90. return std::make_unique<TBufferInput>(buffer);
  91. };
  92. auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_);
  93. RetryHeavyWriteRequest(RawClient_, ClientRetryPolicy_, TransactionPinger_, Context_, transactionId, header, streamMaker);
  94. Parameters_ = SecondaryParameters_; // all blocks except the first one are appended
  95. }
  96. void TRetryfulWriter::SendThread()
  97. {
  98. while (auto maybeBuffer = FilledBuffers_.Pop()) {
  99. auto& buffer = maybeBuffer.GetRef();
  100. try {
  101. Send(buffer);
  102. } catch (const std::exception&) {
  103. Exception_ = std::current_exception();
  104. EmptyBuffers_.Stop();
  105. break;
  106. }
  107. buffer.Clear();
  108. EmptyBuffers_.Push(std::move(buffer));
  109. }
  110. }
  111. void* TRetryfulWriter::SendThread(void* opaque)
  112. {
  113. static_cast<TRetryfulWriter*>(opaque)->SendThread();
  114. return nullptr;
  115. }
  116. void TRetryfulWriter::Abort()
  117. {
  118. if (Started_) {
  119. FilledBuffers_.Stop();
  120. Thread_.Join();
  121. }
  122. if (WriteTransaction_) {
  123. WriteTransaction_->Abort();
  124. }
  125. WriterState_ = Completed;
  126. }
  127. size_t TRetryfulWriter::GetBufferMemoryUsage() const
  128. {
  129. return BufferSize_ * 4;
  130. }
  131. size_t TRetryfulWriter::GetBufferSize(const TMaybe<TWriterOptions>& writerOptions)
  132. {
  133. auto retryBlockSize = TMaybe<size_t>();
  134. if (writerOptions) {
  135. if (writerOptions->RetryBlockSize_) {
  136. retryBlockSize = *writerOptions->RetryBlockSize_;
  137. } else if (writerOptions->DesiredChunkSize_) {
  138. retryBlockSize = *writerOptions->DesiredChunkSize_;
  139. }
  140. }
  141. return retryBlockSize.GetOrElse(64_MB);
  142. }
  143. ////////////////////////////////////////////////////////////////////////////////
  144. } // namespace NYT