|
- #include "yql_execution.h"
- #include "yql_expr_optimize.h"
- #include "yql_opt_proposed_by_data.h"
- #include <yql/essentials/utils/log/log.h>
- #include <yql/essentials/utils/yql_panic.h>
- #include <util/string/builder.h>
- #include <util/string/join.h>
- #include <util/system/env.h>
- #include <util/generic/queue.h>
- namespace NYql {
- namespace {
// Debug switch. When enabled, the rewrite map (NewNodes) is kept after each
// replacement pass and the graph is scanned for nodes that were already rewritten.
constexpr bool RewriteSanityCheck = false;
- class TExecutionTransformer : public TGraphTransformerBase {
- public:
- struct TState : public TThrRefBase {
- TAdaptiveLock Lock;
- struct TItem : public TIntrusiveListItem<TItem> {
- TExprNode* Node = nullptr;
- IDataProvider* DataProvider = nullptr;
- NThreading::TFuture<void> Future;
- };
- using TQueueType = TIntrusiveListWithAutoDelete<TState::TItem, TDelete>;
- TQueueType Completed;
- TQueueType Inflight;
- NThreading::TPromise<void> Promise;
- bool HasResult = false;
- };
- using TStatePtr = TIntrusivePtr<TState>;
- TExecutionTransformer(TTypeAnnotationContext& types,
- TOperationProgressWriter writer,
- bool withFinalize)
- : Types(types)
- , Writer(writer)
- , WithFinalize(withFinalize)
- , DeterministicMode(GetEnv("YQL_DETERMINISTIC_MODE"))
- {
- Rewind();
- }
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- if (FinalizingTransformer) {
- YQL_CLOG(INFO, CoreExecution) << "FinalizingTransformer, root #" << input->UniqueId();
- auto status = FinalizingTransformer->Transform(input, output, ctx);
- YQL_CLOG(INFO, CoreExecution) << "FinalizingTransformer done, output #" << output->UniqueId() << ", status: " << status;
- return status;
- }
- YQL_CLOG(INFO, CoreExecution) << "Begin, root #" << input->UniqueId();
- output = input;
- if (RewriteSanityCheck) {
- VisitExpr(input, [&](const TExprNode::TPtr& localInput) {
- if (NewNodes.cend() != NewNodes.find(localInput.Get())) {
- Cerr << "found old node: #" << localInput->UniqueId() << "\n" << input->Dump();
- YQL_ENSURE(false);
- }
- return true;
- });
- }
- auto status = CollectUnusedNodes(*input, ctx);
- YQL_CLOG(INFO, CoreExecution) << "Collect unused nodes for root #" << input->UniqueId() << ", status: " << status;
- if (status != TStatus::Ok) {
- return status;
- }
- status = status.Combine(ExecuteNode(input, output, ctx, 0));
- for (auto node: FreshPendingNodes) {
- if (TExprNode::EState::ExecutionPending == node->GetState()) {
- node->SetState(TExprNode::EState::ConstrComplete);
- }
- }
- FreshPendingNodes.clear();
- if (!ReplaceNewNodes(output, ctx)) {
- return TStatus::Error;
- }
- YQL_CLOG(INFO, CoreExecution) << "Finish, output #" << output->UniqueId() << ", status: " << status;
- if (status != TStatus::Ok || !WithFinalize) {
- return status;
- }
- YQL_CLOG(INFO, CoreExecution) << "Creating finalizing transformer, output #" << output->UniqueId();
- FinalizingTransformer = CreateCompositeFinalizingTransformer(Types);
- return FinalizingTransformer->Transform(input, output, ctx);
- }
- NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
- return FinalizingTransformer ?
- FinalizingTransformer->GetAsyncFuture(input) :
- State->Promise.GetFuture();
- }
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- if (FinalizingTransformer) {
- return FinalizingTransformer->ApplyAsyncChanges(input, output, ctx);
- }
- output = input;
- TStatus combinedStatus = TStatus::Ok;
- TState::TQueueType completed;
- auto newPromise = NThreading::NewPromise();
- {
- TGuard<TAdaptiveLock> guard(State->Lock);
- completed.Swap(State->Completed);
- State->Promise.Swap(newPromise);
- State->HasResult = false;
- }
- for (auto& item : completed) {
- auto collectedIt = CollectingNodes.find(item.Node);
- if (collectedIt != CollectingNodes.end()) {
- YQL_CLOG(INFO, CoreExecution) << "Completed async cleanup for node #" << item.Node->UniqueId();
- TExprNode::TPtr callableOutput;
- auto status = item.DataProvider->GetTrackableNodeProcessor().GetCleanupTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx);
- combinedStatus = combinedStatus.Combine(status);
- CollectingNodes.erase(collectedIt);
- continue;
- }
- YQL_CLOG(INFO, CoreExecution) << "Completed async execution for node #" << item.Node->UniqueId();
- auto asyncIt = AsyncNodes.find(item.Node);
- YQL_ENSURE(asyncIt != AsyncNodes.end());
- TExprNode::TPtr callableOutput;
- auto status = item.DataProvider->GetCallableExecutionTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx);
- Y_ABORT_UNLESS(callableOutput);
- YQL_ENSURE(status != TStatus::Async);
- combinedStatus = combinedStatus.Combine(status);
- if (status.Level == TStatus::Error) {
- item.Node->SetState(TExprNode::EState::Error);
- } else if (status.Level == TStatus::Repeat) {
- if (callableOutput != item.Node) {
- YQL_CLOG(INFO, CoreExecution) << "Rewrite node #" << item.Node->UniqueId() << " to #" << callableOutput->UniqueId()
- << " in ApplyAsyncChanges()";
- NewNodes[item.Node] = callableOutput;
- combinedStatus = combinedStatus.Combine(TStatus(TStatus::Repeat, true));
- FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput);
- }
- }
- if (callableOutput == item.Node) {
- YQL_CLOG(INFO, CoreExecution) << "State is " << item.Node->GetState()
- << " after apply async changes for node #" << item.Node->UniqueId();
- }
- if (item.Node->GetState() == TExprNode::EState::ExecutionComplete ||
- item.Node->GetState() == TExprNode::EState::Error)
- {
- FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput);
- }
- AsyncNodes.erase(asyncIt);
- }
- if (!ReplaceNewNodes(output, ctx)) {
- return TStatus::Error;
- }
- if (!completed.Empty() && combinedStatus.Level == TStatus::Ok) {
- combinedStatus = TStatus::Repeat;
- }
- return combinedStatus;
- }
- bool ReplaceNewNodes(TExprNode::TPtr& output, TExprContext& ctx) {
- if (!NewNodes.empty()) {
- TOptimizeExprSettings settings(&Types);
- settings.VisitChanges = true;
- settings.VisitStarted = true;
- auto replaceStatus = OptimizeExpr(output, output, [&](const TExprNode::TPtr& input, TExprContext& ctx) -> TExprNode::TPtr {
- Y_UNUSED(ctx);
- const auto replace = NewNodes.find(input.Get());
- if (NewNodes.cend() != replace) {
- return replace->second;
- }
- return input;
- }, ctx, settings);
- if (!RewriteSanityCheck) {
- NewNodes.clear();
- }
- if (replaceStatus.Level == TStatus::Error) {
- return false;
- }
- }
- if (RewriteSanityCheck) {
- VisitExpr(output, [&](const TExprNode::TPtr& localInput) {
- if (NewNodes.cend() != NewNodes.find(localInput.Get())) {
- Cerr << "found old node: #" << localInput->UniqueId() << "\n" << output->Dump();
- YQL_ENSURE(false);
- }
- return true;
- });
- }
- return true;
- }
- void Rewind() override {
- State = MakeIntrusive<TState>();
- State->Promise = NThreading::NewPromise();
- State->HasResult = false;
- NewNodes.clear();
- FinalizingTransformer.Reset();
- TrackableNodes.clear();
- CollectingNodes.clear();
- ProvidersCache.clear();
- AsyncNodes.clear();
- }
- TStatus ExecuteNode(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
- output = node;
- bool changed = false;
- const auto knownNode = NewNodes.find(node.Get());
- if (NewNodes.cend() != knownNode) {
- output = knownNode->second;
- changed = true;
- }
- switch (output->GetState()) {
- case TExprNode::EState::Initial:
- case TExprNode::EState::TypeInProgress:
- case TExprNode::EState::TypePending:
- case TExprNode::EState::TypeComplete:
- case TExprNode::EState::ConstrInProgress:
- case TExprNode::EState::ConstrPending:
- return TStatus(TStatus::Repeat, true);
- case TExprNode::EState::ExecutionInProgress:
- return TStatus::Async;
- case TExprNode::EState::ExecutionPending:
- return ExecuteChildren(output, output, ctx, depth + 1);
- case TExprNode::EState::ConstrComplete:
- case TExprNode::EState::ExecutionRequired:
- break;
- case TExprNode::EState::ExecutionComplete:
- YQL_ENSURE(output->HasResult());
- OnNodeExecutionComplete(output, ctx);
- return changed ? TStatus(TStatus::Repeat, true) : TStatus(TStatus::Ok);
- case TExprNode::EState::Error:
- return TStatus::Error;
- default:
- YQL_ENSURE(false, "Unknown state");
- }
- switch (output->Type()) {
- case TExprNode::Atom:
- case TExprNode::Argument:
- case TExprNode::Arguments:
- case TExprNode::Lambda:
- ctx.AddError(TIssue(ctx.GetPosition(output->Pos()), TStringBuilder() << "Failed to execute node with type: " << output->Type()));
- output->SetState(TExprNode::EState::Error);
- return TStatus::Error;
- case TExprNode::List:
- case TExprNode::Callable:
- {
- auto prevOutput = output;
- auto status = output->Type() == TExprNode::Callable
- ? ExecuteCallable(output, output, ctx, depth)
- : ExecuteList(output, ctx);
- if (status.Level == TStatus::Error) {
- output->SetState(TExprNode::EState::Error);
- } else if (status.Level == TStatus::Ok) {
- output->SetState(TExprNode::EState::ExecutionComplete);
- OnNodeExecutionComplete(output, ctx);
- YQL_ENSURE(output->HasResult());
- } else if (status.Level == TStatus::Repeat) {
- if (!status.HasRestart) {
- output->SetState(TExprNode::EState::ExecutionPending);
- status = ExecuteChildren(output, output, ctx, depth + 1);
- if (TExprNode::EState::ExecutionPending == output->GetState()) {
- FreshPendingNodes.push_back(output.Get());
- }
- if (status.Level != TStatus::Repeat) {
- return status;
- }
- }
- if (output != prevOutput) {
- YQL_CLOG(INFO, CoreExecution) << "Rewrite node #" << node->UniqueId() << " to #" << output->UniqueId();
- NewNodes[node.Get()] = output;
- }
- return TStatus(TStatus::Repeat, output != prevOutput);
- } else if (status.Level == TStatus::Async) {
- output->SetState(TExprNode::EState::ExecutionInProgress);
- }
- return status;
- }
- case TExprNode::World:
- output->SetState(TExprNode::EState::ExecutionComplete);
- return TStatus::Ok;
- default:
- YQL_ENSURE(false, "Unknown type");
- }
- }
- TStatus ExecuteChildren(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
- TStatus combinedStatus = TStatus::Ok;
- TExprNode::TListType newChildren;
- bool newNode = false;
- for (auto& child : node->Children()) {
- auto newChild = child;
- if (child->GetState() == TExprNode::EState::ExecutionRequired) {
- auto childStatus = ExecuteNode(child, newChild, ctx, depth);
- if (childStatus.Level == TStatus::Error)
- return childStatus;
- combinedStatus = combinedStatus.Combine(childStatus);
- } else if (child->GetState() == TExprNode::EState::ExecutionInProgress) {
- combinedStatus = combinedStatus.Combine(TStatus::Async);
- } else if (child->GetState() == TExprNode::EState::ExecutionPending) {
- combinedStatus = combinedStatus.Combine(TStatus::Repeat);
- }
- newChildren.push_back(newChild);
- if (newChild != child) {
- newNode = true;
- }
- }
- if (combinedStatus.Level == TStatus::Ok) {
- Y_DEBUG_ABORT_UNLESS(!newNode);
- node->SetState(TExprNode::EState::ConstrComplete);
- return ExecuteNode(node, output, ctx, depth - 1);
- } else {
- if (combinedStatus.Level == TStatus::Error) {
- node->SetState(TExprNode::EState::Error);
- }
- if (newNode) {
- output = ctx.ChangeChildren(*node, std::move(newChildren));
- }
- return combinedStatus;
- }
- }
- TStatus ExecuteList(const TExprNode::TPtr& node, TExprContext& ctx) {
- IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok;
- for (ui32 i = 0; i < node->ChildrenSize(); ++i) {
- combinedStatus = combinedStatus.Combine(RequireChild(*node, i));
- }
- if (combinedStatus.Level != IGraphTransformer::TStatus::Ok) {
- return combinedStatus;
- }
- node->SetResult(ctx.NewWorld(node->Pos()));
- return TStatus::Ok;
- }
- IDataProvider* GetDataProvider(const TExprNode& node) const {
- IDataProvider* dataProvider = nullptr;
- for (const auto& x : Types.DataSources) {
- if (x->CanExecute(node)) {
- dataProvider = x.Get();
- break;
- }
- }
- if (!dataProvider) {
- for (const auto& x : Types.DataSinks) {
- if (x->CanExecute(node)) {
- dataProvider = x.Get();
- }
- }
- }
- return dataProvider;
- }
- TStatus ExecuteCallable(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
- YQL_CLOG(TRACE, CoreExecution) << '{' << depth << "}, callable #"
- << node->UniqueId() << " <" << node->Content() << '>';
- if (node->Content() == CommitName) {
- auto requireStatus = RequireChild(*node, 0);
- if (requireStatus.Level != TStatus::Ok) {
- return requireStatus;
- }
- auto category = node->Child(1)->Child(0)->Content();
- auto datasink = Types.DataSinkMap.FindPtr(category);
- YQL_ENSURE(datasink);
- output = node;
- auto status = (*datasink)->GetCallableExecutionTransformer().Transform(node, output, ctx);
- if (status.Level == TStatus::Async) {
- Y_DEBUG_ABORT_UNLESS(output == node);
- StartNode(category, *node);
- AddCallable(node, (*datasink).Get(), ctx);
- } else {
- if (output->GetState() == TExprNode::EState::ExecutionComplete ||
- output->GetState() == TExprNode::EState::Error)
- {
- StartNode(category, *node);
- FinishNode(category, *node, *output);
- }
- }
- return status;
- }
- if (node->Content() == SyncName) {
- return ExecuteList(node, ctx);
- }
- if (node->Content() == LeftName) {
- auto requireStatus = RequireChild(*node, 0);
- if (requireStatus.Level != TStatus::Ok) {
- return requireStatus;
- }
- node->SetResult(ctx.NewWorld(node->Pos()));
- return TStatus::Ok;
- }
- if (node->Content() == RightName) {
- auto requireStatus = RequireChild(*node, 0);
- if (requireStatus.Level != TStatus::Ok) {
- return requireStatus;
- }
- node->SetResult(ctx.NewWorld(node->Pos()));
- return TStatus::Ok;
- }
- IDataProvider* dataProvider = GetDataProvider(*node);
- if (!dataProvider) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute callable with name: " << node->Content()));
- return TStatus::Error;
- }
- output = node;
- TStatus status = dataProvider->GetCallableExecutionTransformer().Transform(node, output, ctx);
- if (status.Level == TStatus::Async) {
- Y_DEBUG_ABORT_UNLESS(output == node);
- StartNode(dataProvider->GetName(), *node);
- AddCallable(node, dataProvider, ctx);
- } else {
- if (output->GetState() == TExprNode::EState::ExecutionComplete ||
- output->GetState() == TExprNode::EState::Error)
- {
- StartNode(dataProvider->GetName(), *node);
- FinishNode(dataProvider->GetName(), *node, *output);
- }
- }
- return status;
- }
- void AddCallable(const TExprNode::TPtr& node, IDataProvider* dataProvider, TExprContext& ctx) {
- Y_UNUSED(ctx);
- YQL_CLOG(INFO, CoreExecution) << "Register async execution for node #" << node->UniqueId();
- auto future = dataProvider->GetCallableExecutionTransformer().GetAsyncFuture(*node);
- AsyncNodes[node.Get()] = node;
- SubscribeAsyncFuture(node, dataProvider, future);
- }
- static void ProcessFutureResultQueue(TStatePtr state) {
- NThreading::TPromise<void> promiseToSet;
- bool hasResult = false;
- TGuard<TAdaptiveLock> guard(state->Lock);
- while (!state->Inflight.Empty()) {
- auto* first = state->Inflight.Front();
- if (first->Future.HasValue()) {
- state->Inflight.PopFront();
- state->Completed.PushBack(first);
- hasResult = true;
- } else {
- break;
- }
- }
- guard.Release();
- if (hasResult && !state->HasResult) {
- state->HasResult = true;
- promiseToSet = state->Promise;
- }
- if (promiseToSet.Initialized()) {
- promiseToSet.SetValue();
- }
- }
- static void ProcessAsyncFutureResult(TStatePtr state, TAutoPtr<TState::TItem> item) {
- NThreading::TPromise<void> promiseToSet;
- {
- TGuard<TAdaptiveLock> guard(state->Lock);
- state->Completed.PushBack(item.Release());
- if (!state->HasResult) {
- state->HasResult = true;
- promiseToSet = state->Promise;
- }
- }
- if (promiseToSet.Initialized()) {
- promiseToSet.SetValue();
- }
- }
- void SubscribeAsyncFuture(const TExprNode::TPtr& node, IDataProvider* dataProvider, const NThreading::TFuture<void>& future)
- {
- auto state = State;
- if (DeterministicMode) {
- TAutoPtr<TState::TItem> item = new TState::TItem;
- item->Node = node.Get(); item->DataProvider = dataProvider; item->Future = future;
- TGuard<TAdaptiveLock> guard(state->Lock);
- state->Inflight.PushBack(item.Release());
- }
- if (DeterministicMode) {
- future.Subscribe([state](const NThreading::TFuture<void>& future) {
- HandleFutureException(future);
- ProcessFutureResultQueue(state);
- });
- } else {
- future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture<void>& future) {
- HandleFutureException(future);
- TAutoPtr<TState::TItem> item = new TState::TItem;
- item->Node = node; item->DataProvider = dataProvider;
- ProcessAsyncFutureResult(state, item.Release());
- });
- }
- }
- void StartNode(TStringBuf category, const TExprNode& node) {
- auto publicId = Types.TranslateOperationId(node.UniqueId());
- if (publicId) {
- auto x = Progresses.insert({ *publicId,
- TOperationProgress(TString(category), *publicId, TOperationProgress::EState::Started) });
- if (x.second) {
- Writer(x.first->second);
- }
- }
- }
- void FinishNode(TStringBuf category, const TExprNode& node, const TExprNode& newNode) {
- auto publicId = Types.TranslateOperationId(node.UniqueId());
- if (publicId) {
- if (newNode.UniqueId() != node.UniqueId()) {
- Types.NodeToOperationId[newNode.UniqueId()] = *publicId;
- }
- auto progIt = Progresses.find(*publicId);
- YQL_ENSURE(progIt != Progresses.end());
- auto newState = (node.GetState() == TExprNode::EState::ExecutionComplete)
- ? TOperationProgress::EState::Finished
- : TOperationProgress::EState::Failed;
- if (progIt->second.State != newState) {
- TString stage = progIt->second.Stage.first;
- progIt->second = TOperationProgress(TString(category), *publicId, newState, stage);
- Writer(progIt->second);
- }
- }
- }
- void OnNodeExecutionComplete(const TExprNode::TPtr& node, TExprContext& ctx) {
- auto nodeId = node->UniqueId();
- YQL_CLOG(INFO, CoreExecution) << "Node #" << nodeId << "<" << node->Content() << "> finished execution";
- auto dataProvider = GetDataProvider(*node);
- if (!dataProvider) {
- return;
- }
- TVector<ITrackableNodeProcessor::TExprNodeAndId> createdNodes;
- dataProvider->GetTrackableNodeProcessor().GetCreatedNodes(*node, createdNodes, ctx);
- TVector<TString> ids;
- for (const auto& c : createdNodes) {
- auto& info = TrackableNodes[c.Id];
- info.Provider = dataProvider;
- info.Node = c.Node;
- ids.push_back(c.Id);
- }
- YQL_CLOG(INFO, CoreExecution) << "Node #" << nodeId << "<" << node->Content() << "> created " << ids.size()
- << " trackable nodes: " << JoinSeq(", ", ids);
- }
- TStatus CollectUnusedNodes(const TExprNode& root, TExprContext& ctx) {
- if (TrackableNodes.empty()) {
- return TStatus::Ok;
- }
- YQL_CLOG(TRACE, CoreExecution) << "Collecting unused nodes on root #" << root.UniqueId();
- THashSet<ui64> visited;
- THashSet<TString> usedIds;
- VisitExpr(root, [&](const TExprNode& node) {
- if (node.GetState() == TExprNode::EState::ExecutionComplete) {
- return false;
- }
- auto nodeId = node.UniqueId();
- visited.insert(nodeId);
- TIntrusivePtr<IDataProvider> dataProvider;
- auto providerIt = ProvidersCache.find(nodeId);
- if (providerIt != ProvidersCache.end()) {
- YQL_ENSURE(providerIt->second);
- dataProvider = providerIt->second;
- } else {
- dataProvider = GetDataProvider(node);
- if (dataProvider) {
- ProvidersCache[nodeId] = dataProvider;
- }
- }
- if (dataProvider) {
- TVector<TString> usedNodes;
- dataProvider->GetTrackableNodeProcessor().GetUsedNodes(node, usedNodes);
- usedIds.insert(usedNodes.begin(), usedNodes.end());
- }
- return true;
- });
- for (auto i = ProvidersCache.begin(); i != ProvidersCache.end();) {
- if (visited.count(i->first) == 0) {
- ProvidersCache.erase(i++);
- } else {
- ++i;
- }
- }
- THashMap<TIntrusivePtr<IDataProvider>, TExprNode::TListType> toCollect;
- for (auto i = TrackableNodes.begin(); i != TrackableNodes.end();) {
- TString id = i->first;
- TTrackableNodeInfo info = i->second;
- if (!usedIds.contains(id)) {
- YQL_ENSURE(info.Node);
- YQL_ENSURE(info.Provider);
- YQL_CLOG(INFO, CoreExecution) << "Marking node " << id << " for collection";
- toCollect[info.Provider].push_back(info.Node);
- TrackableNodes.erase(i++);
- } else {
- ++i;
- }
- }
- TStatus collectStatus = TStatus::Ok;
- for (auto& i : toCollect) {
- const auto& provider = i.first;
- YQL_ENSURE(!i.second.empty());
- auto pos = i.second.front()->Pos();
- TExprNode::TPtr collectNode = ctx.NewList(pos, std::move(i.second));
- TExprNode::TPtr output;
- TStatus status = provider->GetTrackableNodeProcessor().GetCleanupTransformer().Transform(collectNode, output, ctx);
- YQL_ENSURE(status != TStatus::Repeat);
- collectStatus = collectStatus.Combine(status);
- if (status == TStatus::Error) {
- break;
- }
- if (status == TStatus::Async) {
- CollectingNodes[collectNode.Get()] = collectNode;
- auto future = provider->GetTrackableNodeProcessor().GetCleanupTransformer().GetAsyncFuture(*collectNode);
- SubscribeAsyncFuture(collectNode, provider.Get(), future);
- }
- }
- return collectStatus;
- }
- private:
- TTypeAnnotationContext& Types;
- TOperationProgressWriter Writer;
- const bool WithFinalize;
- TStatePtr State;
- TNodeOnNodeOwnedMap NewNodes;
- TAutoPtr<IGraphTransformer> FinalizingTransformer;
- THashMap<ui32, TOperationProgress> Progresses;
- struct TTrackableNodeInfo
- {
- TIntrusivePtr<IDataProvider> Provider;
- TExprNode::TPtr Node;
- };
- THashMap<TString, TTrackableNodeInfo> TrackableNodes;
- TNodeOnNodeOwnedMap CollectingNodes;
- THashMap<ui64, TIntrusivePtr<IDataProvider>> ProvidersCache;
- TExprNode::TListType FreshPendingNodes;
- bool DeterministicMode;
- TNodeOnNodeOwnedMap AsyncNodes;
- };
- IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited);
- IGraphTransformer::TStatus ValidateList(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited) {
- IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok;
- for (ui32 i = 0; i < node->ChildrenSize(); ++i) {
- combinedStatus = combinedStatus.Combine(ValidateExecution(node->ChildPtr(i), ctx, types, visited));
- }
- return combinedStatus;
- }
- IGraphTransformer::TStatus ValidateCallable(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited) {
- using TStatus = IGraphTransformer::TStatus;
- if (node->Content() == CommitName) {
- auto datasink = types.DataSinkMap.FindPtr(node->Child(1)->Child(0)->Content());
- if (!datasink) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown datasink: " << node->Child(1)->Child(0)->Content()));
- return TStatus::Error;
- }
- return ValidateExecution(node->ChildPtr(0), ctx, types, visited);
- }
- if (node->Content() == SyncName) {
- return ValidateList(node, ctx, types, visited);
- }
- if (node->Content() == LeftName) {
- return ValidateExecution(node->ChildPtr(0), ctx, types, visited);
- }
- if (node->Content() == RightName) {
- return ValidateExecution(node->ChildPtr(0), ctx, types, visited);
- }
- IDataProvider* dataProvider = nullptr;
- for (auto& x : types.DataSources) {
- if (x->CanExecute(*node)) {
- dataProvider = x.Get();
- break;
- }
- }
- if (!dataProvider) {
- for (auto& x : types.DataSinks) {
- if (x->CanExecute(*node)) {
- dataProvider = x.Get();
- break;
- }
- }
- }
- if (!dataProvider) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute callable with name: " << node->Content()
- << ", you possibly used cross provider/cluster operations or pulled not materialized result in refselect mode"));
- return TStatus::Error;
- }
- if (node->ChildrenSize() < 2) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Executable callable should have at least 2 children"));
- return TStatus::Error;
- }
- if (!dataProvider->ValidateExecution(*node, ctx)) {
- return TStatus::Error;
- }
- TExprNode::TListType childrenToCheck;
- dataProvider->GetRequiredChildren(*node, childrenToCheck);
- IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok;
- for (ui32 i = 0; i < childrenToCheck.size(); ++i) {
- combinedStatus = combinedStatus.Combine(ValidateExecution(childrenToCheck[i], ctx, types, visited));
- }
- return combinedStatus;
- }
- IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx,
- const TTypeAnnotationContext& types, TNodeSet& visited) {
- using TStatus = IGraphTransformer::TStatus;
- if (node->GetState() == TExprNode::EState::ExecutionComplete) {
- return TStatus::Ok;
- }
- if (!node->GetTypeAnn()) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Node has no type annotation"));
- return TStatus::Error;
- }
- TStatus status = TStatus::Ok;
- switch (node->Type()) {
- case TExprNode::Atom:
- case TExprNode::Argument:
- case TExprNode::Arguments:
- case TExprNode::Lambda:
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute node with type: " << node->Type()));
- return TStatus::Error;
- case TExprNode::List:
- return ValidateList(node, ctx, types, visited);
- case TExprNode::Callable:
- if (visited.cend() != visited.find(node.Get())) {
- return TStatus::Ok;
- }
- status = ValidateCallable(node, ctx, types, visited);
- if (status.Level == TStatus::Ok) {
- visited.insert(node.Get());
- }
- break;
- case TExprNode::World:
- break;
- default:
- YQL_ENSURE(false, "Unknown type");
- }
- return status;
- }
- }
- TAutoPtr<IGraphTransformer> CreateExecutionTransformer(
- TTypeAnnotationContext& types,
- TOperationProgressWriter writer,
- bool withFinalize) {
- return new TExecutionTransformer(types, writer, withFinalize);
- }
- TAutoPtr<IGraphTransformer> CreateCheckExecutionTransformer(const TTypeAnnotationContext& types, bool checkWorld) {
- return CreateFunctorTransformer([&types, checkWorld](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx)
- -> IGraphTransformer::TStatus {
- output = input;
- if (checkWorld) {
- TNodeSet visited;
- auto status = ValidateExecution(input, ctx, types, visited);
- if (status.Level != IGraphTransformer::TStatus::Ok) {
- return status;
- }
- }
- TParentsMap parentsMap;
- THashSet<TExprNode*> overWinNodes;
- GatherParents(*input, parentsMap);
- bool hasErrors = false;
- THashSet<TIssue> added;
- auto funcCheckExecution = [&](const THashSet<TStringBuf>& notAllowList, bool collectCalcOverWindow, const TExprNode::TPtr& node) {
- if (node->IsCallable("ErrorType")) {
- hasErrors = true;
- const auto err = node->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TErrorExprType>()->GetError();
- if (added.insert(err).second) {
- ctx.AddError(err);
- }
- } else if (node->IsCallable("TablePath") || node->IsCallable("TableRecord")) {
- auto issue = TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << node->Content() << " will be empty");
- SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_FREE_TABLE_PATH_RECORD, issue);
- if (!ctx.AddWarning(issue)) {
- hasErrors = true;
- }
- } else if (node->IsCallable("UnsafeTimestampCast")) {
- auto issue = TIssue(ctx.GetPosition(node->Pos()), "Unsafe conversion integral value to Timestamp, consider using date types");
- SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_CAST_INTEGRAL_TO_TIMESTAMP_UNSAFE, issue);
- if (!ctx.AddWarning(issue)) {
- hasErrors = true;
- }
- } else if (collectCalcOverWindow && node->IsCallable({"CalcOverWindow", "CalcOverSessionWindow", "CalcOverWindowGroup"})) {
- overWinNodes.emplace(node.Get());
- return false;
- } else if (node->IsCallable(notAllowList)) {
- hasErrors = true;
- const auto err = TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Can't execute " << node->Content());
- if (added.insert(err).second) {
- ctx.AddError(err);
- }
- } else if (node->Type() != TExprNode::Lambda &&
- (node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream || node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow)) {
- auto parentsIt = parentsMap.find(node.Get());
- if (parentsIt != parentsMap.end()) {
- ui32 usageCount = 0;
- for (auto& x : parentsIt->second) {
- if (x->IsCallable("DependsOn")) {
- continue;
- }
- for (auto& y : x->Children()) {
- if (y.Get() == node.Get()) {
- ++usageCount;
- }
- }
- }
- if (usageCount > 1 && !node->GetConstraint<TEmptyConstraintNode>()) {
- hasErrors = true;
- const auto err = TIssue(ctx.GetPosition(node->Pos()), "Multiple stream clients");
- if (added.insert(err).second) {
- ctx.AddError(err);
- }
- }
- }
- }
- return true;
- };
- static const THashSet<TStringBuf> noExecutionList = {"InstanceOf", "Lag", "Lead", "RowNumber", "Rank", "DenseRank", "PercentRank", "CumeDist", "NTile"};
- static const THashSet<TStringBuf> noExecutionListForCalcOverWindow = {"InstanceOf"};
- VisitExpr(input, [funcCheckExecution](const TExprNode::TPtr& node) {
- bool collectCalcOverWindow = true;
- return funcCheckExecution(noExecutionList, collectCalcOverWindow, node);
- });
- for (auto overWin: overWinNodes) {
- VisitExpr(overWin, [funcCheckExecution](const TExprNode::TPtr& node) {
- bool collectCalcOverWindow = false;
- return funcCheckExecution(noExecutionListForCalcOverWindow, collectCalcOverWindow, node);
- });
- }
- return hasErrors ? IGraphTransformer::TStatus::Error : IGraphTransformer::TStatus::Ok;
- });
- };
- IGraphTransformer::TStatus RequireChild(const TExprNode& node, ui32 index) {
- switch (node.Child(index)->GetState()) {
- case TExprNode::EState::Error:
- case TExprNode::EState::ExecutionComplete:
- return IGraphTransformer::TStatus::Ok;
- case TExprNode::EState::ExecutionInProgress:
- case TExprNode::EState::ExecutionPending:
- return IGraphTransformer::TStatus::Repeat;
- default:
- break;
- }
- node.Child(index)->SetState(TExprNode::EState::ExecutionRequired);
- return IGraphTransformer::TStatus::Repeat;
- }
- }
- template<>
- void Out<NYql::TOperationProgress::EState>(class IOutputStream &o, NYql::TOperationProgress::EState x) {
- #define YQL_OPERATION_PROGRESS_STATE_MAP_TO_STRING_IMPL(name, ...) \
- case NYql::TOperationProgress::EState::name: \
- o << #name; \
- return;
- switch (x) {
- YQL_OPERATION_PROGRESS_STATE_MAP(YQL_OPERATION_PROGRESS_STATE_MAP_TO_STRING_IMPL)
- default:
- o << static_cast<int>(x);
- return;
- }
- }
|