progress_merger.cpp 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #include "progress_merger.h"
  2. namespace NYT::NYqlPlugin {
  3. //////////////////////////////////////////////////////////////////////////////
  4. TNodeProgress::TNodeProgress(const NYql::TOperationProgress& p)
  5. : TNodeProgressBase(p)
  6. {}
  7. void TNodeProgress::Serialize(::NYson::TYsonWriter& writer) const
  8. {
  9. writer.OnBeginMap();
  10. {
  11. writer.OnKeyedItem("category");
  12. writer.OnStringScalar(Progress_.Category);
  13. writer.OnKeyedItem("state");
  14. writer.OnStringScalar(ToString(Progress_.State));
  15. writer.OnKeyedItem("remoteId");
  16. writer.OnStringScalar(Progress_.RemoteId);
  17. writer.OnKeyedItem("stages");
  18. writer.OnBeginMap();
  19. for (size_t index = 0; index < Stages_.size(); index++) {
  20. writer.OnKeyedItem(ToString(index));
  21. writer.OnBeginMap();
  22. {
  23. writer.OnKeyedItem(Stages_[index].first);
  24. writer.OnStringScalar(Stages_[index].second.ToString());
  25. }
  26. writer.OnEndMap();
  27. }
  28. writer.OnEndMap();
  29. if (Progress_.Counters) {
  30. writer.OnKeyedItem("completed");
  31. writer.OnUint64Scalar(Progress_.Counters->Completed);
  32. writer.OnKeyedItem("running");
  33. writer.OnUint64Scalar(Progress_.Counters->Running);
  34. writer.OnKeyedItem("total");
  35. writer.OnUint64Scalar(Progress_.Counters->Total);
  36. writer.OnKeyedItem("aborted");
  37. writer.OnUint64Scalar(Progress_.Counters->Aborted);
  38. writer.OnKeyedItem("failed");
  39. writer.OnUint64Scalar(Progress_.Counters->Failed);
  40. writer.OnKeyedItem("lost");
  41. writer.OnUint64Scalar(Progress_.Counters->Lost);
  42. writer.OnKeyedItem("pending");
  43. writer.OnUint64Scalar(Progress_.Counters->Pending);
  44. }
  45. writer.OnKeyedItem("startedAt");
  46. writer.OnStringScalar(StartedAt_.ToString());
  47. if (FinishedAt_ != TInstant::Max()) {
  48. writer.OnKeyedItem("finishedAt");
  49. writer.OnStringScalar(FinishedAt_.ToString());
  50. }
  51. }
  52. writer.OnEndMap();
  53. }
  54. //////////////////////////////////////////////////////////////////////////////
  55. void TProgressMerger::MergeWith(const NYql::TOperationProgress& progress)
  56. {
  57. auto in = NodesMap_.emplace(progress.Id, progress);
  58. if (!in.second) {
  59. in.first->second.MergeWith(progress);
  60. }
  61. HasChanges_ = true;
  62. }
  63. void TProgressMerger::AbortAllUnfinishedNodes()
  64. {
  65. for (auto& node: NodesMap_) {
  66. if (node.second.IsUnfinished()) {
  67. node.second.Abort();
  68. HasChanges_ = true;
  69. }
  70. }
  71. }
  72. TString TProgressMerger::ToYsonString()
  73. {
  74. TStringStream yson;
  75. ::NYson::TYsonWriter writer(&yson);
  76. writer.OnBeginMap();
  77. for (auto& node: NodesMap_) {
  78. writer.OnKeyedItem(ToString(node.first));
  79. node.second.Serialize(writer);
  80. }
  81. writer.OnEndMap();
  82. HasChanges_ = false;
  83. return yson.Str();
  84. }
  85. bool TProgressMerger::HasChangesSinceLastFlush() const
  86. {
  87. return HasChanges_;
  88. }
  89. //////////////////////////////////////////////////////////////////////////////
  90. } // namespace NYT::NYqlPlugin