job_writer.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. #include "job_writer.h"
  2. #include <yt/cpp/mapreduce/interface/io.h>
  3. #include <util/system/file.h>
  4. namespace NYT {
  5. namespace NDetail {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. TJobWriterStream::TJobWriterStream(int fd)
  8. : TJobWriterStream(Duplicate(fd))
  9. { }
  10. TJobWriterStream::TJobWriterStream(const TFile& file)
  11. : FdFile(file)
  12. , FdOutput(FdFile)
  13. , BufferedOutput(&FdOutput, BufferSize)
  14. { }
  15. TJobWriterStream::~TJobWriterStream()
  16. { }
  17. ////////////////////////////////////////////////////////////////////////////////
  18. } // namespace NDetail
  19. ////////////////////////////////////////////////////////////////////////////////
  20. TJobWriter::TJobWriter(size_t outputTableCount)
  21. {
  22. for (size_t i = 0; i < outputTableCount; ++i) {
  23. Streams_.emplace_back(MakeHolder<NDetail::TJobWriterStream>(int(i * 3 + 1)));
  24. }
  25. }
  26. TJobWriter::TJobWriter(const TVector<TFile>& fileList)
  27. {
  28. for (const auto& f : fileList) {
  29. Streams_.emplace_back(MakeHolder<NDetail::TJobWriterStream>(f));
  30. }
  31. }
  32. size_t TJobWriter::GetStreamCount() const
  33. {
  34. return Streams_.size();
  35. }
  36. IOutputStream* TJobWriter::GetStream(size_t tableIndex) const
  37. {
  38. if (tableIndex >= Streams_.size()) {
  39. ythrow TIOException() <<
  40. "Table index " << tableIndex <<
  41. " is out of range [0, " << Streams_.size() << ")";
  42. }
  43. return &Streams_[tableIndex]->BufferedOutput;
  44. }
  45. void TJobWriter::OnRowFinished(size_t)
  46. { }
  47. size_t TJobWriter::GetBufferMemoryUsage() const
  48. {
  49. return NDetail::TJobWriterStream::BufferSize * GetStreamCount();
  50. }
  51. ////////////////////////////////////////////////////////////////////////////////
  52. THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount)
  53. {
  54. return ::MakeHolder<TJobWriter>(outputTableCount);
  55. }
  56. ////////////////////////////////////////////////////////////////////////////////
  57. TSingleStreamJobWriter::TSingleStreamJobWriter(size_t tableIndex)
  58. : TableIndex_(tableIndex)
  59. , Stream_(MakeHolder<NDetail::TJobWriterStream>(int(tableIndex * 3 + 1)))
  60. { }
  61. size_t TSingleStreamJobWriter::GetStreamCount() const
  62. {
  63. return 1;
  64. }
  65. IOutputStream* TSingleStreamJobWriter::GetStream(size_t tableIndex) const
  66. {
  67. if (tableIndex != TableIndex_) {
  68. ythrow TIOException() <<
  69. "Table index " << tableIndex <<
  70. " does not match this SignleTableJobWriter with index " << TableIndex_;
  71. }
  72. return &Stream_->BufferedOutput;
  73. }
  74. void TSingleStreamJobWriter::OnRowFinished(size_t)
  75. { }
  76. size_t TSingleStreamJobWriter::GetBufferMemoryUsage() const
  77. {
  78. return NDetail::TJobWriterStream::BufferSize * GetStreamCount();
  79. }
  80. ////////////////////////////////////////////////////////////////////////////////
  81. } // namespace NYT