node_table_writer.cpp 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. #include "node_table_writer.h"
  2. #include <yt/cpp/mapreduce/common/node_visitor.h>
  3. #include <yt/cpp/mapreduce/interface/io.h>
  4. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  5. #include <library/cpp/yson/writer.h>
  6. namespace NYT {
  7. ////////////////////////////////////////////////////////////////////////////////
  8. TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFormat format)
  9. : Output_(std::move(output))
  10. {
  11. for (size_t i = 0; i < Output_->GetStreamCount(); ++i) {
  12. Writers_.push_back(
  13. MakeHolder<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment));
  14. }
  15. }
  16. TNodeTableWriter::~TNodeTableWriter()
  17. { }
  18. size_t TNodeTableWriter::GetBufferMemoryUsage() const
  19. {
  20. return Output_->GetBufferMemoryUsage();
  21. }
  22. size_t TNodeTableWriter::GetTableCount() const
  23. {
  24. return Output_->GetStreamCount();
  25. }
  26. void TNodeTableWriter::FinishTable(size_t tableIndex) {
  27. Output_->GetStream(tableIndex)->Finish();
  28. }
  29. void TNodeTableWriter::AddRow(const TNode& row, size_t tableIndex)
  30. {
  31. if (row.HasAttributes()) {
  32. ythrow TIOException() << "Row cannot have attributes";
  33. }
  34. static const TNode emptyMap = TNode::CreateMap();
  35. const TNode* outRow = &emptyMap;
  36. if (row.GetType() != TNode::Undefined) {
  37. if (!row.IsMap()) {
  38. ythrow TIOException() << "Row should be a map node";
  39. } else {
  40. outRow = &row;
  41. }
  42. }
  43. auto* writer = Writers_[tableIndex].Get();
  44. writer->OnListItem();
  45. TNodeVisitor visitor(writer);
  46. visitor.Visit(*outRow);
  47. Output_->OnRowFinished(tableIndex);
  48. }
  49. void TNodeTableWriter::AddRow(TNode&& row, size_t tableIndex) {
  50. AddRow(row, tableIndex);
  51. }
  52. void TNodeTableWriter::Abort()
  53. {
  54. Output_->Abort();
  55. }
  56. ////////////////////////////////////////////////////////////////////////////////
  57. } // namespace NYT