retryful_writer.h 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #pragma once
  2. #include "transaction.h"
  3. #include "transaction_pinger.h"
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/http/http.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/io.h>
  8. #include <yt/cpp/mapreduce/io/helpers.h>
  9. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  10. #include <library/cpp/threading/blocking_queue/blocking_queue.h>
  11. #include <util/stream/output.h>
  12. #include <util/generic/buffer.h>
  13. #include <util/stream/buffer.h>
  14. #include <util/system/thread.h>
  15. #include <util/system/event.h>
  16. #include <atomic>
  17. namespace NYT {
  18. ////////////////////////////////////////////////////////////////////////////////
  19. class TRetryfulWriter
  20. : public TRawTableWriter
  21. {
  22. public:
  23. template <class TWriterOptions>
  24. TRetryfulWriter(
  25. IClientRetryPolicyPtr clientRetryPolicy,
  26. ITransactionPingerPtr transactionPinger,
  27. const TClientContext& context,
  28. const TTransactionId& parentId,
  29. const TString& command,
  30. const TMaybe<TFormat>& format,
  31. const TRichYPath& path,
  32. const TWriterOptions& options)
  33. : ClientRetryPolicy_(std::move(clientRetryPolicy))
  34. , TransactionPinger_(std::move(transactionPinger))
  35. , Context_(context)
  36. , Command_(command)
  37. , Format_(format)
  38. , BufferSize_(GetBufferSize(options.WriterOptions_))
  39. , ParentTransactionId_(parentId)
  40. , WriteTransaction_()
  41. , FilledBuffers_(2)
  42. , EmptyBuffers_(2)
  43. , Buffer_(BufferSize_ * 2)
  44. , Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer"))
  45. {
  46. Parameters_ = FormIORequestParameters(path, options);
  47. auto secondaryPath = path;
  48. secondaryPath.Append_ = true;
  49. secondaryPath.Schema_.Clear();
  50. secondaryPath.CompressionCodec_.Clear();
  51. secondaryPath.ErasureCodec_.Clear();
  52. secondaryPath.OptimizeFor_.Clear();
  53. SecondaryParameters_ = FormIORequestParameters(secondaryPath, options);
  54. if (options.CreateTransaction_) {
  55. WriteTransaction_.ConstructInPlace(ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
  56. auto append = path.Append_.GetOrElse(false);
  57. auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE);
  58. NDetail::NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, WriteTransaction_->GetId(), path.Path_, lockMode);
  59. }
  60. EmptyBuffers_.Push(TBuffer(BufferSize_ * 2));
  61. }
  62. ~TRetryfulWriter() override;
  63. void NotifyRowEnd() override;
  64. void Abort() override;
  65. size_t GetBufferMemoryUsage() const override;
  66. size_t GetRetryBlockRemainingSize() const
  67. {
  68. return (BufferSize_ > Buffer_.size()) ? (BufferSize_ - Buffer_.size()) : 0;
  69. }
  70. protected:
  71. void DoWrite(const void* buf, size_t len) override;
  72. void DoFinish() override;
  73. private:
  74. static size_t GetBufferSize(const TMaybe<TWriterOptions>& writerOptions);
  75. private:
  76. const IClientRetryPolicyPtr ClientRetryPolicy_;
  77. const ITransactionPingerPtr TransactionPinger_;
  78. const TClientContext Context_;
  79. TString Command_;
  80. TMaybe<TFormat> Format_;
  81. const size_t BufferSize_;
  82. TNode Parameters_;
  83. TNode SecondaryParameters_;
  84. TTransactionId ParentTransactionId_;
  85. TMaybe<TPingableTransaction> WriteTransaction_;
  86. ::NThreading::TBlockingQueue<TBuffer> FilledBuffers_;
  87. ::NThreading::TBlockingQueue<TBuffer> EmptyBuffers_;
  88. TBuffer Buffer_;
  89. TThread Thread_;
  90. bool Started_ = false;
  91. std::exception_ptr Exception_ = nullptr;
  92. enum EWriterState {
  93. Ok,
  94. Completed,
  95. Error,
  96. } WriterState_ = Ok;
  97. private:
  98. void FlushBuffer(bool lastBlock);
  99. void Send(const TBuffer& buffer);
  100. void CheckWriterState();
  101. void SendThread();
  102. static void* SendThread(void* opaque);
  103. };
  104. ////////////////////////////////////////////////////////////////////////////////
  105. }