job_writer.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. #include "job_writer.h"
  2. #include <yt/cpp/mapreduce/interface/io.h>
  3. #include <util/system/file.h>
  4. namespace NYT {
  5. namespace NDetail {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. TJobWriterStream::TJobWriterStream(int fd)
  8. : TJobWriterStream(Duplicate(fd))
  9. { }
  10. TJobWriterStream::TJobWriterStream(const TFile& file)
  11. : FDFile(file)
  12. , FDOutput(FDFile)
  13. , BufferedOutput(&FDOutput, BufferSize)
  14. { }
  15. ////////////////////////////////////////////////////////////////////////////////
  16. } // namespace NDetail
  17. ////////////////////////////////////////////////////////////////////////////////
  18. TJobWriter::TJobWriter(size_t outputTableCount)
  19. {
  20. for (size_t i = 0; i < outputTableCount; ++i) {
  21. Streams_.emplace_back(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(i * 3 + 1)));
  22. }
  23. }
  24. TJobWriter::TJobWriter(const TVector<TFile>& fileList)
  25. {
  26. for (const auto& f : fileList) {
  27. Streams_.emplace_back(std::make_unique<NDetail::TJobWriterStream>(f));
  28. }
  29. }
  30. size_t TJobWriter::GetStreamCount() const
  31. {
  32. return Streams_.size();
  33. }
  34. IOutputStream* TJobWriter::GetStream(size_t tableIndex) const
  35. {
  36. if (tableIndex >= Streams_.size()) {
  37. ythrow TIOException() <<
  38. "Table index " << tableIndex <<
  39. " is out of range [0, " << Streams_.size() << ")";
  40. }
  41. return &Streams_[tableIndex]->BufferedOutput;
  42. }
  43. void TJobWriter::OnRowFinished(size_t)
  44. { }
  45. size_t TJobWriter::GetBufferMemoryUsage() const
  46. {
  47. return NDetail::TJobWriterStream::BufferSize * GetStreamCount();
  48. }
  49. ////////////////////////////////////////////////////////////////////////////////
  50. THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount)
  51. {
  52. return ::MakeHolder<TJobWriter>(outputTableCount);
  53. }
  54. ////////////////////////////////////////////////////////////////////////////////
  55. TSingleStreamJobWriter::TSingleStreamJobWriter(size_t tableIndex)
  56. : TableIndex_(tableIndex)
  57. , Stream_(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(tableIndex * 3 + 1)))
  58. { }
  59. size_t TSingleStreamJobWriter::GetStreamCount() const
  60. {
  61. return 1;
  62. }
  63. IOutputStream* TSingleStreamJobWriter::GetStream(size_t tableIndex) const
  64. {
  65. if (tableIndex != TableIndex_) {
  66. ythrow TIOException() <<
  67. "Table index " << tableIndex <<
  68. " does not match this SignleTableJobWriter with index " << TableIndex_;
  69. }
  70. return &Stream_->BufferedOutput;
  71. }
  72. void TSingleStreamJobWriter::OnRowFinished(size_t)
  73. { }
  74. size_t TSingleStreamJobWriter::GetBufferMemoryUsage() const
  75. {
  76. return NDetail::TJobWriterStream::BufferSize * GetStreamCount();
  77. }
  78. ////////////////////////////////////////////////////////////////////////////////
  79. } // namespace NYT