client_writer.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. #include "client_writer.h"
  2. #include "retryful_writer.h"
  3. #include "retryless_writer.h"
  4. #include "retryful_writer_v2.h"
  5. #include <yt/cpp/mapreduce/interface/io.h>
  6. #include <yt/cpp/mapreduce/common/fwd.h>
  7. #include <yt/cpp/mapreduce/common/helpers.h>
  8. namespace NYT {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TClientWriter::TClientWriter(
  11. const TRichYPath& path,
  12. IClientRetryPolicyPtr clientRetryPolicy,
  13. ITransactionPingerPtr transactionPinger,
  14. const TClientContext& context,
  15. const TTransactionId& transactionId,
  16. const TMaybe<TFormat>& format,
  17. const TTableWriterOptions& options)
  18. : BufferSize_(options.BufferSize_)
  19. , AutoFinish_(options.AutoFinish_)
  20. {
  21. if (options.SingleHttpRequest_) {
  22. RawWriter_.Reset(new TRetrylessWriter(
  23. context,
  24. transactionId,
  25. GetWriteTableCommand(context.Config->ApiVersion),
  26. format,
  27. path,
  28. BufferSize_,
  29. options));
  30. } else {
  31. bool useV2Writer = context.Config->TableWriterVersion == ETableWriterVersion::V2;
  32. if (useV2Writer) {
  33. auto serializedWriterOptions = FormIORequestParameters(options);
  34. RawWriter_ = MakeIntrusive<NPrivate::TRetryfulWriterV2>(
  35. std::move(clientRetryPolicy),
  36. std::move(transactionPinger),
  37. context,
  38. transactionId,
  39. GetWriteTableCommand(context.Config->ApiVersion),
  40. format,
  41. path,
  42. serializedWriterOptions,
  43. static_cast<ssize_t>(options.BufferSize_),
  44. options.CreateTransaction_);
  45. } else {
  46. RawWriter_.Reset(new TRetryfulWriter(
  47. std::move(clientRetryPolicy),
  48. std::move(transactionPinger),
  49. context,
  50. transactionId,
  51. GetWriteTableCommand(context.Config->ApiVersion),
  52. format,
  53. path,
  54. options));
  55. }
  56. }
  57. }
  58. TClientWriter::~TClientWriter()
  59. {
  60. NDetail::FinishOrDie(this, AutoFinish_, "TClientWriter");
  61. }
  62. void TClientWriter::Finish()
  63. {
  64. RawWriter_->Finish();
  65. }
  66. size_t TClientWriter::GetStreamCount() const
  67. {
  68. return 1;
  69. }
  70. IOutputStream* TClientWriter::GetStream(size_t tableIndex) const
  71. {
  72. Y_UNUSED(tableIndex);
  73. return RawWriter_.Get();
  74. }
  75. void TClientWriter::OnRowFinished(size_t)
  76. {
  77. RawWriter_->NotifyRowEnd();
  78. }
  79. void TClientWriter::Abort()
  80. {
  81. RawWriter_->Abort();
  82. }
  83. size_t TClientWriter::GetBufferMemoryUsage() const
  84. {
  85. return RawWriter_->GetBufferMemoryUsage();
  86. }
  87. ////////////////////////////////////////////////////////////////////////////////
  88. } // namespace NYT