file_writer.cpp 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. #include "file_writer.h"
  2. #include <yt/cpp/mapreduce/io/helpers.h>
  3. #include <yt/cpp/mapreduce/interface/finish_or_die.h>
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. namespace NYT {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. TFileWriter::TFileWriter(
  8. const TRichYPath& path,
  9. IClientRetryPolicyPtr clientRetryPolicy,
  10. ITransactionPingerPtr transactionPinger,
  11. const TClientContext& context,
  12. const TTransactionId& transactionId,
  13. const TFileWriterOptions& options)
  14. : AutoFinish_(options.AutoFinish_)
  15. , RetryfulWriter_(
  16. std::move(clientRetryPolicy),
  17. std::move(transactionPinger),
  18. context,
  19. transactionId,
  20. GetWriteFileCommand(context.Config->ApiVersion),
  21. TMaybe<TFormat>(),
  22. path,
  23. options)
  24. { }
  25. TFileWriter::~TFileWriter()
  26. {
  27. NDetail::FinishOrDie(this, AutoFinish_, "TFileWriter");
  28. }
  29. void TFileWriter::DoWrite(const void* buf, size_t len)
  30. {
  31. // If user tunes RetryBlockSize / DesiredChunkSize he expects
  32. // us to send data exactly by RetryBlockSize. So behaviour of the writer is predictable.
  33. //
  34. // We want to avoid situation when size of sent data slightly exceeded DesiredChunkSize
  35. // and server produced one chunk of desired size and one small chunk.
  36. while (len > 0) {
  37. const auto retryBlockRemainingSize = RetryfulWriter_.GetRetryBlockRemainingSize();
  38. Y_ABORT_UNLESS(retryBlockRemainingSize > 0);
  39. const auto firstWriteLen = Min(len, retryBlockRemainingSize);
  40. RetryfulWriter_.Write(buf, firstWriteLen);
  41. RetryfulWriter_.NotifyRowEnd();
  42. len -= firstWriteLen;
  43. buf = static_cast<const char*>(buf) + firstWriteLen;
  44. }
  45. }
  46. void TFileWriter::DoFinish()
  47. {
  48. RetryfulWriter_.Finish();
  49. }
  50. size_t TFileWriter::GetBufferMemoryUsage() const
  51. {
  52. return RetryfulWriter_.GetBufferMemoryUsage();
  53. }
  54. ////////////////////////////////////////////////////////////////////////////////
  55. } // namespace NYT