client_writer.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. #include "client_writer.h"
  2. #include "retryful_writer.h"
  3. #include "retryful_writer_v2.h"
  4. #include "retryless_writer.h"
  5. #include <yt/cpp/mapreduce/common/fwd.h>
  6. #include <yt/cpp/mapreduce/common/helpers.h>
  7. #include <yt/cpp/mapreduce/interface/io.h>
  8. namespace NYT {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TClientWriter::TClientWriter(
  11. const TRichYPath& path,
  12. const IRawClientPtr& rawClient,
  13. IClientRetryPolicyPtr clientRetryPolicy,
  14. ITransactionPingerPtr transactionPinger,
  15. const TClientContext& context,
  16. const TTransactionId& transactionId,
  17. const TMaybe<TFormat>& format,
  18. const TTableWriterOptions& options)
  19. : BufferSize_(options.BufferSize_)
  20. , AutoFinish_(options.AutoFinish_)
  21. {
  22. if (options.SingleHttpRequest_) {
  23. RawWriter_.Reset(new TRetrylessWriter(
  24. context,
  25. transactionId,
  26. GetWriteTableCommand(context.Config->ApiVersion),
  27. format,
  28. path,
  29. BufferSize_,
  30. options));
  31. } else {
  32. bool useV2Writer = context.Config->TableWriterVersion == ETableWriterVersion::V2;
  33. if (useV2Writer) {
  34. auto serializedWriterOptions = FormIORequestParameters(options);
  35. RawWriter_ = MakeIntrusive<NPrivate::TRetryfulWriterV2>(
  36. rawClient,
  37. std::move(clientRetryPolicy),
  38. std::move(transactionPinger),
  39. context,
  40. transactionId,
  41. GetWriteTableCommand(context.Config->ApiVersion),
  42. format,
  43. path,
  44. serializedWriterOptions,
  45. static_cast<ssize_t>(options.BufferSize_),
  46. options.CreateTransaction_);
  47. } else {
  48. RawWriter_.Reset(new TRetryfulWriter(
  49. rawClient,
  50. std::move(clientRetryPolicy),
  51. std::move(transactionPinger),
  52. context,
  53. transactionId,
  54. GetWriteTableCommand(context.Config->ApiVersion),
  55. format,
  56. path,
  57. options));
  58. }
  59. }
  60. }
  61. TClientWriter::~TClientWriter()
  62. {
  63. NDetail::FinishOrDie(this, AutoFinish_, "TClientWriter");
  64. }
  65. void TClientWriter::Finish()
  66. {
  67. RawWriter_->Finish();
  68. }
  69. size_t TClientWriter::GetStreamCount() const
  70. {
  71. return 1;
  72. }
  73. IOutputStream* TClientWriter::GetStream(size_t tableIndex) const
  74. {
  75. Y_UNUSED(tableIndex);
  76. return RawWriter_.Get();
  77. }
  78. void TClientWriter::OnRowFinished(size_t)
  79. {
  80. RawWriter_->NotifyRowEnd();
  81. }
  82. void TClientWriter::Abort()
  83. {
  84. RawWriter_->Abort();
  85. }
  86. size_t TClientWriter::GetBufferMemoryUsage() const
  87. {
  88. return RawWriter_->GetBufferMemoryUsage();
  89. }
  90. ////////////////////////////////////////////////////////////////////////////////
  91. } // namespace NYT