123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- #include "progress_merger.h"
- namespace NYT::NYqlPlugin {
- //////////////////////////////////////////////////////////////////////////////
- TNodeProgress::TNodeProgress(const NYql::TOperationProgress& p)
- : TNodeProgressBase(p)
- {}
- void TNodeProgress::Serialize(::NYson::TYsonWriter& writer) const
- {
- writer.OnBeginMap();
- {
- writer.OnKeyedItem("category");
- writer.OnStringScalar(Progress_.Category);
- writer.OnKeyedItem("state");
- writer.OnStringScalar(ToString(Progress_.State));
- writer.OnKeyedItem("remoteId");
- writer.OnStringScalar(Progress_.RemoteId);
- writer.OnKeyedItem("stages");
- writer.OnBeginMap();
- for (size_t index = 0; index < Stages_.size(); index++) {
- writer.OnKeyedItem(ToString(index));
- writer.OnBeginMap();
- {
- writer.OnKeyedItem(Stages_[index].first);
- writer.OnStringScalar(Stages_[index].second.ToString());
- }
- writer.OnEndMap();
- }
- writer.OnEndMap();
- if (Progress_.Counters) {
- writer.OnKeyedItem("completed");
- writer.OnUint64Scalar(Progress_.Counters->Completed);
- writer.OnKeyedItem("running");
- writer.OnUint64Scalar(Progress_.Counters->Running);
- writer.OnKeyedItem("total");
- writer.OnUint64Scalar(Progress_.Counters->Total);
- writer.OnKeyedItem("aborted");
- writer.OnUint64Scalar(Progress_.Counters->Aborted);
- writer.OnKeyedItem("failed");
- writer.OnUint64Scalar(Progress_.Counters->Failed);
- writer.OnKeyedItem("lost");
- writer.OnUint64Scalar(Progress_.Counters->Lost);
- writer.OnKeyedItem("pending");
- writer.OnUint64Scalar(Progress_.Counters->Pending);
- }
- writer.OnKeyedItem("startedAt");
- writer.OnStringScalar(StartedAt_.ToString());
- if (FinishedAt_ != TInstant::Max()) {
- writer.OnKeyedItem("finishedAt");
- writer.OnStringScalar(FinishedAt_.ToString());
- }
- }
- writer.OnEndMap();
- }
- //////////////////////////////////////////////////////////////////////////////
- void TProgressMerger::MergeWith(const NYql::TOperationProgress& progress)
- {
- auto in = NodesMap_.emplace(progress.Id, progress);
- if (!in.second) {
- in.first->second.MergeWith(progress);
- }
- HasChanges_ = true;
- }
- void TProgressMerger::AbortAllUnfinishedNodes()
- {
- for (auto& node: NodesMap_) {
- if (node.second.IsUnfinished()) {
- node.second.Abort();
- HasChanges_ = true;
- }
- }
- }
- TString TProgressMerger::ToYsonString()
- {
- TStringStream yson;
- ::NYson::TYsonWriter writer(&yson);
- writer.OnBeginMap();
- for (auto& node: NodesMap_) {
- writer.OnKeyedItem(ToString(node.first));
- node.second.Serialize(writer);
- }
- writer.OnEndMap();
- HasChanges_ = false;
- return yson.Str();
- }
- bool TProgressMerger::HasChangesSinceLastFlush() const
- {
- return HasChanges_;
- }
- //////////////////////////////////////////////////////////////////////////////
- } // namespace NYT::NYqlPlugin
|