yamr_table_writer.cpp 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  #include "yamr_table_writer.h"

  #include <yt/cpp/mapreduce/interface/io.h>

  #include <limits>
  #include <stdexcept>
  3. namespace NYT {
  4. ////////////////////////////////////////////////////////////////////////////////
  5. TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output)
  6. : Output_(std::move(output))
  7. { }
  8. TYaMRTableWriter::~TYaMRTableWriter()
  9. { }
  10. size_t TYaMRTableWriter::GetBufferMemoryUsage() const
  11. {
  12. return Output_->GetBufferMemoryUsage();
  13. }
  14. size_t TYaMRTableWriter::GetTableCount() const
  15. {
  16. return Output_->GetStreamCount();
  17. }
  18. void TYaMRTableWriter::FinishTable(size_t tableIndex) {
  19. Output_->GetStream(tableIndex)->Finish();
  20. }
  21. void TYaMRTableWriter::AddRow(const TYaMRRow& row, size_t tableIndex)
  22. {
  23. auto* stream = Output_->GetStream(tableIndex);
  24. auto writeField = [&stream] (const TStringBuf& field) {
  25. i32 length = static_cast<i32>(field.length());
  26. stream->Write(&length, sizeof(length));
  27. stream->Write(field.data(), field.length());
  28. };
  29. writeField(row.Key);
  30. writeField(row.SubKey);
  31. writeField(row.Value);
  32. Output_->OnRowFinished(tableIndex);
  33. }
  34. void TYaMRTableWriter::AddRow(TYaMRRow&& row, size_t tableIndex) {
  35. TYaMRTableWriter::AddRow(row, tableIndex);
  36. }
  37. void TYaMRTableWriter::Abort()
  38. {
  39. Output_->Abort();
  40. }
  41. ////////////////////////////////////////////////////////////////////////////////
  42. } // namespace NYT