client_writer.cpp 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. #include "client_writer.h"
  2. #include "retryful_writer.h"
  3. #include "retryless_writer.h"
  4. #include "retryful_writer_v2.h"
  5. #include <yt/cpp/mapreduce/interface/io.h>
  6. #include <yt/cpp/mapreduce/common/fwd.h>
  7. #include <yt/cpp/mapreduce/common/helpers.h>
  8. namespace NYT {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TClientWriter::TClientWriter(
  11. const TRichYPath& path,
  12. IClientRetryPolicyPtr clientRetryPolicy,
  13. ITransactionPingerPtr transactionPinger,
  14. const TClientContext& context,
  15. const TTransactionId& transactionId,
  16. const TMaybe<TFormat>& format,
  17. const TTableWriterOptions& options)
  18. : BufferSize_(options.BufferSize_)
  19. {
  20. if (options.SingleHttpRequest_) {
  21. RawWriter_.Reset(new TRetrylessWriter(
  22. context,
  23. transactionId,
  24. GetWriteTableCommand(context.Config->ApiVersion),
  25. format,
  26. path,
  27. BufferSize_,
  28. options));
  29. } else {
  30. bool useV2Writer = context.Config->TableWriterVersion == ETableWriterVersion::V2;
  31. if (useV2Writer) {
  32. auto serializedWriterOptions = FormIORequestParameters(options);
  33. RawWriter_ = MakeIntrusive<NPrivate::TRetryfulWriterV2>(
  34. std::move(clientRetryPolicy),
  35. std::move(transactionPinger),
  36. context,
  37. transactionId,
  38. GetWriteTableCommand(context.Config->ApiVersion),
  39. format,
  40. path,
  41. serializedWriterOptions,
  42. static_cast<ssize_t>(options.BufferSize_),
  43. options.CreateTransaction_);
  44. } else {
  45. RawWriter_.Reset(new TRetryfulWriter(
  46. std::move(clientRetryPolicy),
  47. std::move(transactionPinger),
  48. context,
  49. transactionId,
  50. GetWriteTableCommand(context.Config->ApiVersion),
  51. format,
  52. path,
  53. options));
  54. }
  55. }
  56. }
  57. size_t TClientWriter::GetStreamCount() const
  58. {
  59. return 1;
  60. }
  61. IOutputStream* TClientWriter::GetStream(size_t tableIndex) const
  62. {
  63. Y_UNUSED(tableIndex);
  64. return RawWriter_.Get();
  65. }
  66. void TClientWriter::OnRowFinished(size_t)
  67. {
  68. RawWriter_->NotifyRowEnd();
  69. }
  70. void TClientWriter::Abort()
  71. {
  72. RawWriter_->Abort();
  73. }
  74. size_t TClientWriter::GetBufferMemoryUsage() const
  75. {
  76. return RawWriter_->GetBufferMemoryUsage();
  77. }
  78. ////////////////////////////////////////////////////////////////////////////////
  79. } // namespace NYT