job_writer.cpp 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. #include "job_writer.h"
  2. #include <yt/cpp/mapreduce/interface/helpers.h>
  3. #include <yt/cpp/mapreduce/interface/io.h>
  4. #include <util/system/file.h>
  5. namespace NYT {
  6. namespace NDetail {
  7. ////////////////////////////////////////////////////////////////////////////////
  8. TJobWriterStream::TJobWriterStream(int fd)
  9. : TJobWriterStream(Duplicate(fd))
  10. { }
  11. TJobWriterStream::TJobWriterStream(const TFile& file)
  12. : FDFile(file)
  13. , FDOutput(FDFile)
  14. , BufferedOutput(&FDOutput, BufferSize)
  15. { }
  16. ////////////////////////////////////////////////////////////////////////////////
  17. } // namespace NDetail
  18. ////////////////////////////////////////////////////////////////////////////////
  19. TJobWriter::TJobWriter(size_t outputTableCount)
  20. {
  21. int firstOutputTableFD = GetJobFirstOutputTableFD();
  22. for (size_t i = 0; i < outputTableCount; ++i) {
  23. int fd = static_cast<int>(i * 3 + firstOutputTableFD);
  24. Streams_.emplace_back(std::make_unique<NDetail::TJobWriterStream>(fd));
  25. }
  26. }
  27. TJobWriter::TJobWriter(const TVector<TFile>& fileList)
  28. {
  29. for (const auto& f : fileList) {
  30. Streams_.emplace_back(std::make_unique<NDetail::TJobWriterStream>(f));
  31. }
  32. }
  33. size_t TJobWriter::GetStreamCount() const
  34. {
  35. return Streams_.size();
  36. }
  37. IOutputStream* TJobWriter::GetStream(size_t tableIndex) const
  38. {
  39. if (tableIndex >= Streams_.size()) {
  40. ythrow TIOException() <<
  41. "Table index " << tableIndex <<
  42. " is out of range [0, " << Streams_.size() << ")";
  43. }
  44. return &Streams_[tableIndex]->BufferedOutput;
  45. }
  46. void TJobWriter::OnRowFinished(size_t)
  47. { }
  48. size_t TJobWriter::GetBufferMemoryUsage() const
  49. {
  50. return NDetail::TJobWriterStream::BufferSize * GetStreamCount();
  51. }
  52. ////////////////////////////////////////////////////////////////////////////////
  53. THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount)
  54. {
  55. return ::MakeHolder<TJobWriter>(outputTableCount);
  56. }
  57. ////////////////////////////////////////////////////////////////////////////////
  58. TSingleStreamJobWriter::TSingleStreamJobWriter(size_t tableIndex)
  59. : TableIndex_(tableIndex)
  60. , Stream_(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(tableIndex * 3 + GetJobFirstOutputTableFD())))
  61. { }
  62. size_t TSingleStreamJobWriter::GetStreamCount() const
  63. {
  64. return 1;
  65. }
  66. IOutputStream* TSingleStreamJobWriter::GetStream(size_t tableIndex) const
  67. {
  68. if (tableIndex != TableIndex_) {
  69. ythrow TIOException() <<
  70. "Table index " << tableIndex <<
  71. " does not match this SignleTableJobWriter with index " << TableIndex_;
  72. }
  73. return &Stream_->BufferedOutput;
  74. }
  75. void TSingleStreamJobWriter::OnRowFinished(size_t)
  76. { }
  77. size_t TSingleStreamJobWriter::GetBufferMemoryUsage() const
  78. {
  79. return NDetail::TJobWriterStream::BufferSize * GetStreamCount();
  80. }
  81. ////////////////////////////////////////////////////////////////////////////////
  82. } // namespace NYT