file_writer.cpp 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. #include "file_writer.h"
  2. #include <yt/cpp/mapreduce/common/helpers.h>
  3. #include <yt/cpp/mapreduce/interface/finish_or_die.h>
  4. #include <yt/cpp/mapreduce/io/helpers.h>
  5. namespace NYT {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. TFileWriter::TFileWriter(
  8. const TRichYPath& path,
  9. const IRawClientPtr& rawClient,
  10. IClientRetryPolicyPtr clientRetryPolicy,
  11. ITransactionPingerPtr transactionPinger,
  12. const TClientContext& context,
  13. const TTransactionId& transactionId,
  14. const TFileWriterOptions& options)
  15. : AutoFinish_(options.AutoFinish_)
  16. , RetryfulWriter_(
  17. rawClient,
  18. std::move(clientRetryPolicy),
  19. std::move(transactionPinger),
  20. context,
  21. transactionId,
  22. GetWriteFileCommand(context.Config->ApiVersion),
  23. TMaybe<TFormat>(),
  24. path,
  25. options)
  26. { }
  27. TFileWriter::~TFileWriter()
  28. {
  29. NDetail::FinishOrDie(this, AutoFinish_, "TFileWriter");
  30. }
  31. void TFileWriter::DoWrite(const void* buf, size_t len)
  32. {
  33. // If user tunes RetryBlockSize / DesiredChunkSize he expects
  34. // us to send data exactly by RetryBlockSize. So behaviour of the writer is predictable.
  35. //
  36. // We want to avoid situation when size of sent data slightly exceeded DesiredChunkSize
  37. // and server produced one chunk of desired size and one small chunk.
  38. while (len > 0) {
  39. const auto retryBlockRemainingSize = RetryfulWriter_.GetRetryBlockRemainingSize();
  40. Y_ABORT_UNLESS(retryBlockRemainingSize > 0);
  41. const auto firstWriteLen = Min(len, retryBlockRemainingSize);
  42. RetryfulWriter_.Write(buf, firstWriteLen);
  43. RetryfulWriter_.NotifyRowEnd();
  44. len -= firstWriteLen;
  45. buf = static_cast<const char*>(buf) + firstWriteLen;
  46. }
  47. }
  48. void TFileWriter::DoFinish()
  49. {
  50. RetryfulWriter_.Finish();
  51. }
  52. size_t TFileWriter::GetBufferMemoryUsage() const
  53. {
  54. return RetryfulWriter_.GetBufferMemoryUsage();
  55. }
  56. ////////////////////////////////////////////////////////////////////////////////
  57. } // namespace NYT