retryful_writer.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #pragma once
  2. #include "transaction.h"
  3. #include "transaction_pinger.h"
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/http/http.h>
  6. #include <yt/cpp/mapreduce/http/retry_request.h>
  7. #include <yt/cpp/mapreduce/interface/common.h>
  8. #include <yt/cpp/mapreduce/interface/io.h>
  9. #include <yt/cpp/mapreduce/interface/raw_client.h>
  10. #include <yt/cpp/mapreduce/io/helpers.h>
  11. #include <library/cpp/threading/blocking_queue/blocking_queue.h>
  12. #include <util/generic/buffer.h>
  13. #include <util/stream/buffer.h>
  14. #include <util/stream/output.h>
  15. #include <util/system/thread.h>
  16. #include <util/system/event.h>
  17. namespace NYT {
  18. ////////////////////////////////////////////////////////////////////////////////
  19. class TRetryfulWriter
  20. : public TRawTableWriter
  21. {
  22. public:
  23. template <class TWriterOptions>
  24. TRetryfulWriter(
  25. const IRawClientPtr& rawClient,
  26. IClientRetryPolicyPtr clientRetryPolicy,
  27. ITransactionPingerPtr transactionPinger,
  28. const TClientContext& context,
  29. const TTransactionId& parentId,
  30. const TString& command,
  31. const TMaybe<TFormat>& format,
  32. const TRichYPath& path,
  33. const TWriterOptions& options)
  34. : RawClient_(rawClient)
  35. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  36. , TransactionPinger_(std::move(transactionPinger))
  37. , Context_(context)
  38. , AutoFinish_(options.AutoFinish_)
  39. , Command_(command)
  40. , Format_(format)
  41. , BufferSize_(GetBufferSize(options.WriterOptions_))
  42. , ParentTransactionId_(parentId)
  43. , WriteTransaction_()
  44. , FilledBuffers_(2)
  45. , EmptyBuffers_(2)
  46. , Buffer_(BufferSize_ * 2)
  47. , Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer"))
  48. {
  49. Parameters_ = FormIORequestParameters(path, options);
  50. auto secondaryPath = path;
  51. secondaryPath.Append_ = true;
  52. secondaryPath.Schema_.Clear();
  53. secondaryPath.CompressionCodec_.Clear();
  54. secondaryPath.ErasureCodec_.Clear();
  55. secondaryPath.OptimizeFor_.Clear();
  56. SecondaryParameters_ = FormIORequestParameters(secondaryPath, options);
  57. if (options.CreateTransaction_) {
  58. WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
  59. auto append = path.Append_.GetOrElse(false);
  60. auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE);
  61. NDetail::RequestWithRetry<void>(
  62. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  63. [this, &path, &lockMode] (TMutationId& mutationId) {
  64. RawClient_->Lock(mutationId, WriteTransaction_->GetId(), path.Path_, lockMode);
  65. });
  66. }
  67. EmptyBuffers_.Push(TBuffer(BufferSize_ * 2));
  68. }
  69. ~TRetryfulWriter() override;
  70. void NotifyRowEnd() override;
  71. void Abort() override;
  72. size_t GetBufferMemoryUsage() const override;
  73. size_t GetRetryBlockRemainingSize() const
  74. {
  75. return (BufferSize_ > Buffer_.size()) ? (BufferSize_ - Buffer_.size()) : 0;
  76. }
  77. protected:
  78. void DoWrite(const void* buf, size_t len) override;
  79. void DoFinish() override;
  80. private:
  81. static size_t GetBufferSize(const TMaybe<TWriterOptions>& writerOptions);
  82. private:
  83. const IRawClientPtr RawClient_;
  84. const IClientRetryPolicyPtr ClientRetryPolicy_;
  85. const ITransactionPingerPtr TransactionPinger_;
  86. const TClientContext Context_;
  87. const bool AutoFinish_;
  88. TString Command_;
  89. TMaybe<TFormat> Format_;
  90. const size_t BufferSize_;
  91. TNode Parameters_;
  92. TNode SecondaryParameters_;
  93. TTransactionId ParentTransactionId_;
  94. TMaybe<TPingableTransaction> WriteTransaction_;
  95. ::NThreading::TBlockingQueue<TBuffer> FilledBuffers_;
  96. ::NThreading::TBlockingQueue<TBuffer> EmptyBuffers_;
  97. TBuffer Buffer_;
  98. TThread Thread_;
  99. bool Started_ = false;
  100. std::exception_ptr Exception_ = nullptr;
  101. enum EWriterState {
  102. Ok,
  103. Completed,
  104. Error,
  105. } WriterState_ = Ok;
  106. private:
  107. void FlushBuffer(bool lastBlock);
  108. void Send(const TBuffer& buffer);
  109. void CheckWriterState();
  110. void SendThread();
  111. static void* SendThread(void* opaque);
  112. };
  113. ////////////////////////////////////////////////////////////////////////////////
  114. }