123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- #include "yql_graph_transformer.h"
- #include <yql/essentials/ast/yql_expr.h>
- #include <yql/essentials/utils/yql_panic.h>
- #include <yql/essentials/public/issue/yql_issue_manager.h>
- namespace NYql {
- namespace {
- class TSharedTransformerProxy : public IGraphTransformer {
- public:
- TSharedTransformerProxy(const std::shared_ptr<IGraphTransformer>& inner)
- : Inner_(inner)
- {}
- TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
- return Inner_->Transform(input, output, ctx);
- }
- NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) {
- return Inner_->GetAsyncFuture(input);
- }
- TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
- return Inner_->ApplyAsyncChanges(input, output, ctx);
- }
- void Rewind() final {
- return Inner_->Rewind();
- }
- TStatistics GetStatistics() const final {
- return Inner_->GetStatistics();
- }
- private:
- const std::shared_ptr<IGraphTransformer> Inner_;
- };
- class TCompositeGraphTransformer : public TGraphTransformerBase {
- public:
- TCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes, bool doCheckArguments)
- : Stages_(stages)
- , UseIssueScopes_(useIssueScopes)
- , DoCheckArguments_(doCheckArguments)
- {
- if (UseIssueScopes_) {
- for (const auto& stage : Stages_) {
- YQL_ENSURE(!stage.Name.empty());
- }
- }
- }
- void Rewind() override {
- for (auto& stage : Stages_) {
- stage.GetTransformer().Rewind();
- }
- Index_ = 0;
- CheckArgumentsCount_ = 0;
- }
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
- //#define TRACE_NODES
- #ifdef TRACE_NODES
- static ui64 TransformsCount = 0;
- ++TransformsCount;
- if ((TransformsCount % 100) == 0) {
- Cout << "\r#transforms: " << TransformsCount << ", #nodes: " << ctx.NextUniqueId;
- }
- #endif
- if (Index_ >= Stages_.size()) {
- return TStatus::Ok;
- }
- auto status = WithScope(ctx, [&]() {
- return Stages_[Index_].GetTransformer().Transform(input, output, ctx);
- });
- #ifndef NDEBUG
- if (DoCheckArguments_ && output && output != input) {
- try {
- CheckArguments(*output);
- ++CheckArgumentsCount_;
- } catch (yexception& e) {
- e << "at CheckArguments() pass #" << CheckArgumentsCount_
- << ", stage '" << Stages_[Index_].Name << "'";
- throw;
- }
- }
- #else
- Y_UNUSED(DoCheckArguments_);
- Y_UNUSED(CheckArgumentsCount_);
- #endif
- status = HandleStatus(status);
- return status;
- }
- NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) override {
- YQL_ENSURE(Index_ < Stages_.size());
- return Stages_[Index_].GetTransformer().GetAsyncFuture(input);
- }
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
- YQL_ENSURE(Index_ < Stages_.size());
- auto status = WithScope(ctx, [&]() {
- return Stages_[Index_].GetTransformer().ApplyAsyncChanges(input, output, ctx);
- });
- status = HandleStatus(status);
- return status;
- }
- TStatistics GetStatistics() const final {
- if (Statistics_.Stages.empty()) {
- Statistics_.Stages.resize(Stages_.size());
- }
- YQL_ENSURE(Stages_.size() == Statistics_.Stages.size());
- for (size_t i = 0; i < Stages_.size(); ++i) {
- auto& stagePair = Statistics_.Stages[i];
- stagePair.first = Stages_[i].Name;
- stagePair.second = Stages_[i].GetTransformer().GetStatistics();
- }
- return Statistics_;
- }
- private:
- virtual TStatus HandleStatus(TStatus status) {
- if (status.Level == IGraphTransformer::TStatus::Error) {
- return status;
- }
- if (status.HasRestart) {
- // ignore Async status in this case
- Index_ = 0;
- status = IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
- } else if (status.Level == IGraphTransformer::TStatus::Ok) {
- status = IGraphTransformer::TStatus::Repeat;
- ++Index_;
- }
- return status;
- }
- template <typename TFunc>
- TStatus WithScope(TExprContext& ctx, TFunc func) {
- if (UseIssueScopes_) {
- TIssueScopeGuard guard(ctx.IssueManager, [&]() {
- const auto scopeIssueCode = Stages_[Index_].IssueCode;
- const auto scopeIssueMessage = Stages_[Index_].IssueMessage;
- auto issue = MakeIntrusive<TIssue>(TPosition(), scopeIssueMessage ? scopeIssueMessage : IssueCodeToString(scopeIssueCode));
- issue->SetCode(scopeIssueCode, GetSeverity(scopeIssueCode));
- return issue;
- });
- return func();
- } else {
- return func();
- }
- }
- protected:
- TVector<TTransformStage> Stages_;
- const bool UseIssueScopes_;
- const bool DoCheckArguments_;
- size_t Index_ = 0;
- ui64 CheckArgumentsCount_ = 0;
- };
- void AddTooManyTransformationsError(TPositionHandle pos, const TStringBuf& where, TExprContext& ctx) {
- ctx.AddError(TIssue(ctx.GetPosition(pos),
- TStringBuilder() << "YQL: Internal core error! " << where << " takes too much iterations: "
- << ctx.RepeatTransformLimit
- << ". You may set RepeatTransformLimit as flags for config provider."));
- }
- }
- TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes) {
- return new TCompositeGraphTransformer(stages, useIssueScopes, /* doCheckArguments = */ true);
- }
- TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformerWithNoArgChecks(const TVector<TTransformStage>& stages, bool useIssueScopes) {
- return new TCompositeGraphTransformer(stages, useIssueScopes, /* doCheckArguments = */ false);
- }
- namespace {
- class TChoiceGraphTransformer : public TCompositeGraphTransformer {
- public:
- TChoiceGraphTransformer(
- const std::function<bool(const TExprNode::TPtr& input, TExprContext& ctx)>& condition,
- const TTransformStage& left,
- const TTransformStage& right)
- : TCompositeGraphTransformer(
- {WrapCondition(condition), left, right},
- /* useIssueScopes = */ false,
- /* doCheckArgumentstrue = */ true)
- { }
- private:
- void Rewind() override {
- Condition_.Clear();
- TCompositeGraphTransformer::Rewind();
- }
- TStatus HandleStatus(TStatus status) override {
- if (status.Level == IGraphTransformer::TStatus::Error) {
- return status;
- }
- if (status.HasRestart) {
- // ignore Async status in this case
- Index_ = 0;
- status = IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
- } else if (status.Level == IGraphTransformer::TStatus::Ok) {
- status = IGraphTransformer::TStatus::Repeat;
- YQL_ENSURE(!Condition_.Empty(), "Condition must be set");
- if (Index_ == 0 && *Condition_) {
- Index_ = 1; // left
- } else if (Index_ == 0) {
- Index_ = 2; // right
- } else {
- Index_ = 3; // end
- }
- }
- return status;
- }
- TTransformStage WrapCondition(const std::function<bool(const TExprNode::TPtr& input, TExprContext& ctx)>& condition)
- {
- auto transformer = CreateFunctorTransformer([this, condition](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- output = input;
- if (Condition_.Empty()) {
- Condition_ = condition(input, ctx);
- }
- return TStatus::Ok;
- });
- return TTransformStage(transformer, "Condition", TIssuesIds::DEFAULT_ERROR);
- }
- TMaybe<bool> Condition_;
- };
- } // namespace
- TAutoPtr<IGraphTransformer> MakeSharedTransformerProxy(const std::shared_ptr<IGraphTransformer>& inner) {
- return new TSharedTransformerProxy(inner);
- }
- TAutoPtr<IGraphTransformer> CreateChoiceGraphTransformer(
- const std::function<bool(const TExprNode::TPtr& input, TExprContext& ctx)>& condition,
- const TTransformStage& left, const TTransformStage& right)
- {
- return new TChoiceGraphTransformer(condition, left, right);
- }
- IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx) {
- try {
- for (; ctx.RepeatTransformCounter < ctx.RepeatTransformLimit; ++ctx.RepeatTransformCounter) {
- TExprNode::TPtr newRoot;
- auto status = transformer.Transform(root, newRoot, ctx);
- if (newRoot) {
- root = newRoot;
- }
- switch (status.Level) {
- case IGraphTransformer::TStatus::Ok:
- case IGraphTransformer::TStatus::Error:
- return status;
- case IGraphTransformer::TStatus::Repeat:
- continue;
- case IGraphTransformer::TStatus::Async:
- break;
- default:
- YQL_ENSURE(false, "Unknown status");
- }
- auto future = transformer.GetAsyncFuture(*root);
- future.Wait();
- HandleFutureException(future);
- status = transformer.ApplyAsyncChanges(root, newRoot, ctx);
- if (newRoot) {
- root = newRoot;
- }
- switch (status.Level) {
- case IGraphTransformer::TStatus::Ok:
- case IGraphTransformer::TStatus::Error:
- return status;
- case IGraphTransformer::TStatus::Repeat:
- break;
- case IGraphTransformer::TStatus::Async:
- YQL_ENSURE(false, "Async status is forbidden for ApplyAsyncChanges");
- break;
- default:
- YQL_ENSURE(false, "Unknown status");
- }
- }
- AddTooManyTransformationsError(root->Pos(), "SyncTransform", ctx);
- }
- catch (const std::exception& e) {
- ctx.AddError(ExceptionToIssue(e));
- }
- return IGraphTransformer::TStatus::Error;
- }
- IGraphTransformer::TStatus AsyncTransformStepImpl(IGraphTransformer& transformer, TExprNode::TPtr& root,
- TExprContext& ctx, bool applyAsyncChanges, bool breakOnRestart,
- const TStringBuf& name)
- {
- try {
- if (applyAsyncChanges) {
- TExprNode::TPtr newRoot;
- auto status = transformer.ApplyAsyncChanges(root, newRoot, ctx);
- if (newRoot) {
- root = newRoot;
- }
- switch (status.Level) {
- case IGraphTransformer::TStatus::Ok:
- case IGraphTransformer::TStatus::Error:
- break;
- case IGraphTransformer::TStatus::Repeat:
- if (breakOnRestart && status.HasRestart) {
- return status;
- }
- return AsyncTransformStepImpl(transformer, root, ctx, false /* no async changes */, breakOnRestart, name);
- case IGraphTransformer::TStatus::Async:
- YQL_ENSURE(false, "Async status is forbidden for ApplyAsyncChanges");
- break;
- default:
- YQL_ENSURE(false, "Unknown status");
- break;
- }
- return status;
- }
- for (; ctx.RepeatTransformCounter < ctx.RepeatTransformLimit; ++ctx.RepeatTransformCounter) {
- TExprNode::TPtr newRoot;
- auto status = transformer.Transform(root, newRoot, ctx);
- if (newRoot) {
- root = newRoot;
- }
- switch (status.Level) {
- case IGraphTransformer::TStatus::Ok:
- case IGraphTransformer::TStatus::Error:
- return status;
- case IGraphTransformer::TStatus::Repeat:
- if (breakOnRestart && status.HasRestart) {
- return status;
- }
- // if (currentTime - startTime >= threshold) return NThreading::MakeFuture(IGraphTransformer::TStatus::Yield);
- continue;
- case IGraphTransformer::TStatus::Async:
- break;
- default:
- YQL_ENSURE(false, "Unknown status");
- }
- break;
- }
- if (ctx.RepeatTransformCounter >= ctx.RepeatTransformLimit) {
- AddTooManyTransformationsError(root->Pos(), name, ctx);
- return IGraphTransformer::TStatus::Error;
- }
- }
- catch (const std::exception& e) {
- ctx.AddError(ExceptionToIssue(e));
- return IGraphTransformer::TStatus::Error;
- }
- return IGraphTransformer::TStatus::Async;
- }
- IGraphTransformer::TStatus InstantTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool breakOnRestart) {
- IGraphTransformer::TStatus status = AsyncTransformStepImpl(transformer, root, ctx, false, breakOnRestart, "InstantTransform");
- if (status.Level == IGraphTransformer::TStatus::Async) {
- ctx.AddError(TIssue(ctx.GetPosition(root->Pos()), "Instant transform can not be delayed"));
- return IGraphTransformer::TStatus::Error;
- }
- return status;
- }
- IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
- TExprContext& ctx, bool applyAsyncChanges)
- {
- return AsyncTransformStepImpl(transformer, root, ctx, applyAsyncChanges, false, "AsyncTransformStep");
- }
- NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx,
- bool applyAsyncChanges) {
- IGraphTransformer::TStatus status = AsyncTransformStepImpl(transformer, root, ctx, applyAsyncChanges, false, "AsyncTransform");
- if (status.Level != IGraphTransformer::TStatus::Async) {
- return NThreading::MakeFuture(status);
- }
- return transformer.GetAsyncFuture(*root).Apply(
- [] (const NThreading::TFuture<void>&) mutable -> NThreading::TFuture<IGraphTransformer::TStatus> {
- return NThreading::MakeFuture(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Async));
- });
- }
- void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges,
- std::function<void(const IGraphTransformer::TStatus&)> asyncCallback) {
- NThreading::TFuture<IGraphTransformer::TStatus> status = AsyncTransform(transformer, root, ctx, applyAsyncChanges);
- status.Subscribe(
- [asyncCallback](const NThreading::TFuture<IGraphTransformer::TStatus>& status) mutable -> void {
- HandleFutureException(status);
- asyncCallback(status.GetValue());
- });
- }
- }
- template<>
- void Out<NYql::IGraphTransformer::TStatus::ELevel>(class IOutputStream &o, NYql::IGraphTransformer::TStatus::ELevel x) {
- #define YQL_GT_STATUS_MAP_TO_STRING_IMPL(name, ...) \
- case NYql::IGraphTransformer::TStatus::name: \
- o << #name; \
- return;
- switch (x) {
- YQL_GT_STATUS_MAP(YQL_GT_STATUS_MAP_TO_STRING_IMPL)
- default:
- o << static_cast<int>(x);
- return;
- }
- }
|