retryful_writer.cpp 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. #include "retryful_writer.h"
  2. #include "retry_heavy_write_request.h"
  3. #include <yt/cpp/mapreduce/http/requests.h>
  4. #include <yt/cpp/mapreduce/interface/errors.h>
  5. #include <yt/cpp/mapreduce/interface/finish_or_die.h>
  6. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  7. #include <util/generic/size_literals.h>
  8. namespace NYT {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TRetryfulWriter::~TRetryfulWriter()
  11. {
  12. NDetail::FinishOrDie(this, "TRetryfulWriter");
  13. }
  14. void TRetryfulWriter::CheckWriterState()
  15. {
  16. switch (WriterState_) {
  17. case Ok:
  18. break;
  19. case Completed:
  20. ythrow TApiUsageError() << "Cannot use table writer that is finished";
  21. case Error:
  22. ythrow TApiUsageError() << "Cannot use table writer that finished with error";
  23. }
  24. }
  25. void TRetryfulWriter::NotifyRowEnd()
  26. {
  27. CheckWriterState();
  28. if (Buffer_.Size() >= BufferSize_) {
  29. FlushBuffer(false);
  30. }
  31. }
  32. void TRetryfulWriter::DoWrite(const void* buf, size_t len)
  33. {
  34. CheckWriterState();
  35. while (Buffer_.Size() + len > Buffer_.Capacity()) {
  36. Buffer_.Reserve(Buffer_.Capacity() * 2);
  37. }
  38. Buffer_.Append(static_cast<const char*>(buf), len);
  39. }
  40. void TRetryfulWriter::DoFinish()
  41. {
  42. if (WriterState_ != Ok) {
  43. return;
  44. }
  45. FlushBuffer(true);
  46. if (Started_) {
  47. FilledBuffers_.Stop();
  48. Thread_.Join();
  49. }
  50. if (Exception_) {
  51. WriterState_ = Error;
  52. std::rethrow_exception(Exception_);
  53. }
  54. if (WriteTransaction_) {
  55. WriteTransaction_->Commit();
  56. }
  57. WriterState_ = Completed;
  58. }
  59. void TRetryfulWriter::FlushBuffer(bool lastBlock)
  60. {
  61. if (!Started_) {
  62. if (lastBlock) {
  63. try {
  64. Send(Buffer_);
  65. } catch (...) {
  66. WriterState_ = Error;
  67. throw;
  68. }
  69. return;
  70. } else {
  71. Started_ = true;
  72. Thread_.Start();
  73. }
  74. }
  75. auto emptyBuffer = EmptyBuffers_.Pop();
  76. if (!emptyBuffer) {
  77. WriterState_ = Error;
  78. std::rethrow_exception(Exception_);
  79. }
  80. FilledBuffers_.Push(std::move(Buffer_));
  81. Buffer_ = std::move(emptyBuffer.GetRef());
  82. }
  83. void TRetryfulWriter::Send(const TBuffer& buffer)
  84. {
  85. THttpHeader header("PUT", Command_);
  86. header.SetInputFormat(Format_);
  87. header.MergeParameters(Parameters_);
  88. auto streamMaker = [&buffer] () {
  89. return MakeHolder<TBufferInput>(buffer);
  90. };
  91. auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_);
  92. RetryHeavyWriteRequest(ClientRetryPolicy_, TransactionPinger_, Context_, transactionId, header, streamMaker);
  93. Parameters_ = SecondaryParameters_; // all blocks except the first one are appended
  94. }
  95. void TRetryfulWriter::SendThread()
  96. {
  97. while (auto maybeBuffer = FilledBuffers_.Pop()) {
  98. auto& buffer = maybeBuffer.GetRef();
  99. try {
  100. Send(buffer);
  101. } catch (const std::exception&) {
  102. Exception_ = std::current_exception();
  103. EmptyBuffers_.Stop();
  104. break;
  105. }
  106. buffer.Clear();
  107. EmptyBuffers_.Push(std::move(buffer));
  108. }
  109. }
  110. void* TRetryfulWriter::SendThread(void* opaque)
  111. {
  112. static_cast<TRetryfulWriter*>(opaque)->SendThread();
  113. return nullptr;
  114. }
  115. void TRetryfulWriter::Abort()
  116. {
  117. if (Started_) {
  118. FilledBuffers_.Stop();
  119. Thread_.Join();
  120. }
  121. if (WriteTransaction_) {
  122. WriteTransaction_->Abort();
  123. }
  124. WriterState_ = Completed;
  125. }
  126. size_t TRetryfulWriter::GetBufferMemoryUsage() const
  127. {
  128. return BufferSize_ * 4;
  129. }
  130. size_t TRetryfulWriter::GetBufferSize(const TMaybe<TWriterOptions>& writerOptions)
  131. {
  132. auto retryBlockSize = TMaybe<size_t>();
  133. if (writerOptions) {
  134. if (writerOptions->RetryBlockSize_) {
  135. retryBlockSize = *writerOptions->RetryBlockSize_;
  136. } else if (writerOptions->DesiredChunkSize_) {
  137. retryBlockSize = *writerOptions->DesiredChunkSize_;
  138. }
  139. }
  140. return retryBlockSize.GetOrElse(64_MB);
  141. }
  142. ////////////////////////////////////////////////////////////////////////////////
  143. } // namespace NYT