retryful_writer.h 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #pragma once
  2. #include "transaction.h"
  3. #include "transaction_pinger.h"
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/http/http.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/io.h>
  8. #include <yt/cpp/mapreduce/io/helpers.h>
  9. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  10. #include <library/cpp/threading/blocking_queue/blocking_queue.h>
  11. #include <util/stream/output.h>
  12. #include <util/generic/buffer.h>
  13. #include <util/stream/buffer.h>
  14. #include <util/system/thread.h>
  15. #include <util/system/event.h>
  16. namespace NYT {
  17. ////////////////////////////////////////////////////////////////////////////////
  18. class TRetryfulWriter
  19. : public TRawTableWriter
  20. {
  21. public:
  22. template <class TWriterOptions>
  23. TRetryfulWriter(
  24. IClientRetryPolicyPtr clientRetryPolicy,
  25. ITransactionPingerPtr transactionPinger,
  26. const TClientContext& context,
  27. const TTransactionId& parentId,
  28. const TString& command,
  29. const TMaybe<TFormat>& format,
  30. const TRichYPath& path,
  31. const TWriterOptions& options)
  32. : ClientRetryPolicy_(std::move(clientRetryPolicy))
  33. , TransactionPinger_(std::move(transactionPinger))
  34. , Context_(context)
  35. , AutoFinish_(options.AutoFinish_)
  36. , Command_(command)
  37. , Format_(format)
  38. , BufferSize_(GetBufferSize(options.WriterOptions_))
  39. , ParentTransactionId_(parentId)
  40. , WriteTransaction_()
  41. , FilledBuffers_(2)
  42. , EmptyBuffers_(2)
  43. , Buffer_(BufferSize_ * 2)
  44. , Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer"))
  45. {
  46. Parameters_ = FormIORequestParameters(path, options);
  47. auto secondaryPath = path;
  48. secondaryPath.Append_ = true;
  49. secondaryPath.Schema_.Clear();
  50. secondaryPath.CompressionCodec_.Clear();
  51. secondaryPath.ErasureCodec_.Clear();
  52. secondaryPath.OptimizeFor_.Clear();
  53. SecondaryParameters_ = FormIORequestParameters(secondaryPath, options);
  54. if (options.CreateTransaction_) {
  55. WriteTransaction_.ConstructInPlace(ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
  56. auto append = path.Append_.GetOrElse(false);
  57. auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE);
  58. NDetail::NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, WriteTransaction_->GetId(), path.Path_, lockMode);
  59. }
  60. EmptyBuffers_.Push(TBuffer(BufferSize_ * 2));
  61. }
  62. ~TRetryfulWriter() override;
  63. void NotifyRowEnd() override;
  64. void Abort() override;
  65. size_t GetBufferMemoryUsage() const override;
  66. size_t GetRetryBlockRemainingSize() const
  67. {
  68. return (BufferSize_ > Buffer_.size()) ? (BufferSize_ - Buffer_.size()) : 0;
  69. }
  70. protected:
  71. void DoWrite(const void* buf, size_t len) override;
  72. void DoFinish() override;
  73. private:
  74. static size_t GetBufferSize(const TMaybe<TWriterOptions>& writerOptions);
  75. private:
  76. const IClientRetryPolicyPtr ClientRetryPolicy_;
  77. const ITransactionPingerPtr TransactionPinger_;
  78. const TClientContext Context_;
  79. const bool AutoFinish_;
  80. TString Command_;
  81. TMaybe<TFormat> Format_;
  82. const size_t BufferSize_;
  83. TNode Parameters_;
  84. TNode SecondaryParameters_;
  85. TTransactionId ParentTransactionId_;
  86. TMaybe<TPingableTransaction> WriteTransaction_;
  87. ::NThreading::TBlockingQueue<TBuffer> FilledBuffers_;
  88. ::NThreading::TBlockingQueue<TBuffer> EmptyBuffers_;
  89. TBuffer Buffer_;
  90. TThread Thread_;
  91. bool Started_ = false;
  92. std::exception_ptr Exception_ = nullptr;
  93. enum EWriterState {
  94. Ok,
  95. Completed,
  96. Error,
  97. } WriterState_ = Ok;
  98. private:
  99. void FlushBuffer(bool lastBlock);
  100. void Send(const TBuffer& buffer);
  101. void CheckWriterState();
  102. void SendThread();
  103. static void* SendThread(void* opaque);
  104. };
  105. ////////////////////////////////////////////////////////////////////////////////
  106. }