retryful_writer.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. #pragma once
  2. #include "transaction.h"
  3. #include "transaction_pinger.h"
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/common/retry_request.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/io.h>
  8. #include <yt/cpp/mapreduce/interface/raw_client.h>
  9. #include <yt/cpp/mapreduce/io/helpers.h>
  10. #include <library/cpp/threading/blocking_queue/blocking_queue.h>
  11. #include <util/generic/buffer.h>
  12. #include <util/stream/buffer.h>
  13. #include <util/stream/output.h>
  14. #include <util/system/thread.h>
  15. #include <util/system/event.h>
  16. namespace NYT {
  17. ////////////////////////////////////////////////////////////////////////////////
  18. class TRetryfulWriter
  19. : public TRawTableWriter
  20. {
  21. public:
  22. template <class TWriterOptions>
  23. TRetryfulWriter(
  24. const IRawClientPtr& rawClient,
  25. IClientRetryPolicyPtr clientRetryPolicy,
  26. ITransactionPingerPtr transactionPinger,
  27. const TClientContext& context,
  28. const TTransactionId& parentId,
  29. const TString& command,
  30. const TMaybe<TFormat>& format,
  31. const TRichYPath& path,
  32. const TWriterOptions& options)
  33. : RawClient_(rawClient)
  34. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  35. , TransactionPinger_(std::move(transactionPinger))
  36. , Context_(context)
  37. , AutoFinish_(options.AutoFinish_)
  38. , Command_(command)
  39. , Format_(format)
  40. , BufferSize_(GetBufferSize(options.WriterOptions_))
  41. , ParentTransactionId_(parentId)
  42. , WriteTransaction_()
  43. , FilledBuffers_(2)
  44. , EmptyBuffers_(2)
  45. , Buffer_(BufferSize_ * 2)
  46. , Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer"))
  47. {
  48. Parameters_ = FormIORequestParameters(path, options);
  49. auto secondaryPath = path;
  50. secondaryPath.Append_ = true;
  51. secondaryPath.Schema_.Clear();
  52. secondaryPath.CompressionCodec_.Clear();
  53. secondaryPath.ErasureCodec_.Clear();
  54. secondaryPath.OptimizeFor_.Clear();
  55. SecondaryParameters_ = FormIORequestParameters(secondaryPath, options);
  56. if (options.CreateTransaction_) {
  57. WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
  58. auto append = path.Append_.GetOrElse(false);
  59. auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE);
  60. NDetail::RequestWithRetry<void>(
  61. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  62. [this, &path, &lockMode] (TMutationId& mutationId) {
  63. RawClient_->Lock(mutationId, WriteTransaction_->GetId(), path.Path_, lockMode);
  64. });
  65. }
  66. EmptyBuffers_.Push(TBuffer(BufferSize_ * 2));
  67. }
  68. ~TRetryfulWriter() override;
  69. void NotifyRowEnd() override;
  70. void Abort() override;
  71. size_t GetBufferMemoryUsage() const override;
  72. size_t GetRetryBlockRemainingSize() const
  73. {
  74. return (BufferSize_ > Buffer_.size()) ? (BufferSize_ - Buffer_.size()) : 0;
  75. }
  76. protected:
  77. void DoWrite(const void* buf, size_t len) override;
  78. void DoFinish() override;
  79. private:
  80. static size_t GetBufferSize(const TMaybe<TWriterOptions>& writerOptions);
  81. private:
  82. const IRawClientPtr RawClient_;
  83. const IClientRetryPolicyPtr ClientRetryPolicy_;
  84. const ITransactionPingerPtr TransactionPinger_;
  85. const TClientContext Context_;
  86. const bool AutoFinish_;
  87. TString Command_;
  88. TMaybe<TFormat> Format_;
  89. const size_t BufferSize_;
  90. TNode Parameters_;
  91. TNode SecondaryParameters_;
  92. TTransactionId ParentTransactionId_;
  93. TMaybe<TPingableTransaction> WriteTransaction_;
  94. ::NThreading::TBlockingQueue<TBuffer> FilledBuffers_;
  95. ::NThreading::TBlockingQueue<TBuffer> EmptyBuffers_;
  96. TBuffer Buffer_;
  97. TThread Thread_;
  98. bool Started_ = false;
  99. std::exception_ptr Exception_ = nullptr;
  100. enum EWriterState {
  101. Ok,
  102. Completed,
  103. Error,
  104. } WriterState_ = Ok;
  105. private:
  106. void FlushBuffer(bool lastBlock);
  107. void Send(const TBuffer& buffer);
  108. void CheckWriterState();
  109. void SendThread();
  110. static void* SendThread(void* opaque);
  111. };
  112. ////////////////////////////////////////////////////////////////////////////////
  113. }