yql_graph_transformer.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. #include "yql_graph_transformer.h"
  2. #include <yql/essentials/ast/yql_expr.h>
  3. #include <yql/essentials/utils/yql_panic.h>
  4. #include <yql/essentials/public/issue/yql_issue_manager.h>
  5. namespace NYql {
  6. namespace {
  7. class TSharedTransformerProxy : public IGraphTransformer {
  8. public:
  9. TSharedTransformerProxy(const std::shared_ptr<IGraphTransformer>& inner)
  10. : Inner_(inner)
  11. {}
  12. TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
  13. return Inner_->Transform(input, output, ctx);
  14. }
  15. NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) {
  16. return Inner_->GetAsyncFuture(input);
  17. }
  18. TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
  19. return Inner_->ApplyAsyncChanges(input, output, ctx);
  20. }
  21. void Rewind() final {
  22. return Inner_->Rewind();
  23. }
  24. TStatistics GetStatistics() const final {
  25. return Inner_->GetStatistics();
  26. }
  27. private:
  28. const std::shared_ptr<IGraphTransformer> Inner_;
  29. };
  30. class TCompositeGraphTransformer : public TGraphTransformerBase {
  31. public:
  32. TCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes, bool doCheckArguments)
  33. : Stages_(stages)
  34. , UseIssueScopes_(useIssueScopes)
  35. , DoCheckArguments_(doCheckArguments)
  36. {
  37. if (UseIssueScopes_) {
  38. for (const auto& stage : Stages_) {
  39. YQL_ENSURE(!stage.Name.empty());
  40. }
  41. }
  42. }
  43. void Rewind() override {
  44. for (auto& stage : Stages_) {
  45. stage.GetTransformer().Rewind();
  46. }
  47. Index_ = 0;
  48. CheckArgumentsCount_ = 0;
  49. }
  50. TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
  51. //#define TRACE_NODES
  52. #ifdef TRACE_NODES
  53. static ui64 TransformsCount = 0;
  54. ++TransformsCount;
  55. if ((TransformsCount % 100) == 0) {
  56. Cout << "\r#transforms: " << TransformsCount << ", #nodes: " << ctx.NextUniqueId;
  57. }
  58. #endif
  59. if (Index_ >= Stages_.size()) {
  60. return TStatus::Ok;
  61. }
  62. auto status = WithScope(ctx, [&]() {
  63. return Stages_[Index_].GetTransformer().Transform(input, output, ctx);
  64. });
  65. #ifndef NDEBUG
  66. if (DoCheckArguments_ && output && output != input) {
  67. try {
  68. CheckArguments(*output);
  69. ++CheckArgumentsCount_;
  70. } catch (yexception& e) {
  71. e << "at CheckArguments() pass #" << CheckArgumentsCount_
  72. << ", stage '" << Stages_[Index_].Name << "'";
  73. throw;
  74. }
  75. }
  76. #else
  77. Y_UNUSED(DoCheckArguments_);
  78. Y_UNUSED(CheckArgumentsCount_);
  79. #endif
  80. status = HandleStatus(status);
  81. return status;
  82. }
  83. NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) override {
  84. YQL_ENSURE(Index_ < Stages_.size());
  85. return Stages_[Index_].GetTransformer().GetAsyncFuture(input);
  86. }
  87. TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
  88. YQL_ENSURE(Index_ < Stages_.size());
  89. auto status = WithScope(ctx, [&]() {
  90. return Stages_[Index_].GetTransformer().ApplyAsyncChanges(input, output, ctx);
  91. });
  92. status = HandleStatus(status);
  93. return status;
  94. }
  95. TStatistics GetStatistics() const final {
  96. if (Statistics_.Stages.empty()) {
  97. Statistics_.Stages.resize(Stages_.size());
  98. }
  99. YQL_ENSURE(Stages_.size() == Statistics_.Stages.size());
  100. for (size_t i = 0; i < Stages_.size(); ++i) {
  101. auto& stagePair = Statistics_.Stages[i];
  102. stagePair.first = Stages_[i].Name;
  103. stagePair.second = Stages_[i].GetTransformer().GetStatistics();
  104. }
  105. return Statistics_;
  106. }
  107. private:
  108. virtual TStatus HandleStatus(TStatus status) {
  109. if (status.Level == IGraphTransformer::TStatus::Error) {
  110. return status;
  111. }
  112. if (status.HasRestart) {
  113. // ignore Async status in this case
  114. Index_ = 0;
  115. status = IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
  116. } else if (status.Level == IGraphTransformer::TStatus::Ok) {
  117. status = IGraphTransformer::TStatus::Repeat;
  118. ++Index_;
  119. }
  120. return status;
  121. }
  122. template <typename TFunc>
  123. TStatus WithScope(TExprContext& ctx, TFunc func) {
  124. if (UseIssueScopes_) {
  125. TIssueScopeGuard guard(ctx.IssueManager, [&]() {
  126. const auto scopeIssueCode = Stages_[Index_].IssueCode;
  127. const auto scopeIssueMessage = Stages_[Index_].IssueMessage;
  128. auto issue = MakeIntrusive<TIssue>(TPosition(), scopeIssueMessage ? scopeIssueMessage : IssueCodeToString(scopeIssueCode));
  129. issue->SetCode(scopeIssueCode, GetSeverity(scopeIssueCode));
  130. return issue;
  131. });
  132. return func();
  133. } else {
  134. return func();
  135. }
  136. }
  137. protected:
  138. TVector<TTransformStage> Stages_;
  139. const bool UseIssueScopes_;
  140. const bool DoCheckArguments_;
  141. size_t Index_ = 0;
  142. ui64 CheckArgumentsCount_ = 0;
  143. };
  144. void AddTooManyTransformationsError(TPositionHandle pos, const TStringBuf& where, TExprContext& ctx) {
  145. ctx.AddError(TIssue(ctx.GetPosition(pos),
  146. TStringBuilder() << "YQL: Internal core error! " << where << " takes too much iterations: "
  147. << ctx.RepeatTransformLimit
  148. << ". You may set RepeatTransformLimit as flags for config provider."));
  149. }
  150. }
  151. TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes) {
  152. return new TCompositeGraphTransformer(stages, useIssueScopes, /* doCheckArguments = */ true);
  153. }
  154. TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformerWithNoArgChecks(const TVector<TTransformStage>& stages, bool useIssueScopes) {
  155. return new TCompositeGraphTransformer(stages, useIssueScopes, /* doCheckArguments = */ false);
  156. }
  157. namespace {
  158. class TChoiceGraphTransformer : public TCompositeGraphTransformer {
  159. public:
  160. TChoiceGraphTransformer(
  161. const std::function<bool(const TExprNode::TPtr& input, TExprContext& ctx)>& condition,
  162. const TTransformStage& left,
  163. const TTransformStage& right)
  164. : TCompositeGraphTransformer(
  165. {WrapCondition(condition), left, right},
  166. /* useIssueScopes = */ false,
  167. /* doCheckArgumentstrue = */ true)
  168. { }
  169. private:
  170. void Rewind() override {
  171. Condition_.Clear();
  172. TCompositeGraphTransformer::Rewind();
  173. }
  174. TStatus HandleStatus(TStatus status) override {
  175. if (status.Level == IGraphTransformer::TStatus::Error) {
  176. return status;
  177. }
  178. if (status.HasRestart) {
  179. // ignore Async status in this case
  180. Index_ = 0;
  181. status = IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
  182. } else if (status.Level == IGraphTransformer::TStatus::Ok) {
  183. status = IGraphTransformer::TStatus::Repeat;
  184. YQL_ENSURE(!Condition_.Empty(), "Condition must be set");
  185. if (Index_ == 0 && *Condition_) {
  186. Index_ = 1; // left
  187. } else if (Index_ == 0) {
  188. Index_ = 2; // right
  189. } else {
  190. Index_ = 3; // end
  191. }
  192. }
  193. return status;
  194. }
  195. TTransformStage WrapCondition(const std::function<bool(const TExprNode::TPtr& input, TExprContext& ctx)>& condition)
  196. {
  197. auto transformer = CreateFunctorTransformer([this, condition](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  198. output = input;
  199. if (Condition_.Empty()) {
  200. Condition_ = condition(input, ctx);
  201. }
  202. return TStatus::Ok;
  203. });
  204. return TTransformStage(transformer, "Condition", TIssuesIds::DEFAULT_ERROR);
  205. }
  206. TMaybe<bool> Condition_;
  207. };
  208. } // namespace
  209. TAutoPtr<IGraphTransformer> MakeSharedTransformerProxy(const std::shared_ptr<IGraphTransformer>& inner) {
  210. return new TSharedTransformerProxy(inner);
  211. }
  212. TAutoPtr<IGraphTransformer> CreateChoiceGraphTransformer(
  213. const std::function<bool(const TExprNode::TPtr& input, TExprContext& ctx)>& condition,
  214. const TTransformStage& left, const TTransformStage& right)
  215. {
  216. return new TChoiceGraphTransformer(condition, left, right);
  217. }
  218. IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx) {
  219. try {
  220. for (; ctx.RepeatTransformCounter < ctx.RepeatTransformLimit; ++ctx.RepeatTransformCounter) {
  221. TExprNode::TPtr newRoot;
  222. auto status = transformer.Transform(root, newRoot, ctx);
  223. if (newRoot) {
  224. root = newRoot;
  225. }
  226. switch (status.Level) {
  227. case IGraphTransformer::TStatus::Ok:
  228. case IGraphTransformer::TStatus::Error:
  229. return status;
  230. case IGraphTransformer::TStatus::Repeat:
  231. continue;
  232. case IGraphTransformer::TStatus::Async:
  233. break;
  234. default:
  235. YQL_ENSURE(false, "Unknown status");
  236. }
  237. auto future = transformer.GetAsyncFuture(*root);
  238. future.Wait();
  239. HandleFutureException(future);
  240. status = transformer.ApplyAsyncChanges(root, newRoot, ctx);
  241. if (newRoot) {
  242. root = newRoot;
  243. }
  244. switch (status.Level) {
  245. case IGraphTransformer::TStatus::Ok:
  246. case IGraphTransformer::TStatus::Error:
  247. return status;
  248. case IGraphTransformer::TStatus::Repeat:
  249. break;
  250. case IGraphTransformer::TStatus::Async:
  251. YQL_ENSURE(false, "Async status is forbidden for ApplyAsyncChanges");
  252. break;
  253. default:
  254. YQL_ENSURE(false, "Unknown status");
  255. }
  256. }
  257. AddTooManyTransformationsError(root->Pos(), "SyncTransform", ctx);
  258. }
  259. catch (const std::exception& e) {
  260. ctx.AddError(ExceptionToIssue(e));
  261. }
  262. return IGraphTransformer::TStatus::Error;
  263. }
  264. IGraphTransformer::TStatus AsyncTransformStepImpl(IGraphTransformer& transformer, TExprNode::TPtr& root,
  265. TExprContext& ctx, bool applyAsyncChanges, bool breakOnRestart,
  266. const TStringBuf& name)
  267. {
  268. try {
  269. if (applyAsyncChanges) {
  270. TExprNode::TPtr newRoot;
  271. auto status = transformer.ApplyAsyncChanges(root, newRoot, ctx);
  272. if (newRoot) {
  273. root = newRoot;
  274. }
  275. switch (status.Level) {
  276. case IGraphTransformer::TStatus::Ok:
  277. case IGraphTransformer::TStatus::Error:
  278. break;
  279. case IGraphTransformer::TStatus::Repeat:
  280. if (breakOnRestart && status.HasRestart) {
  281. return status;
  282. }
  283. return AsyncTransformStepImpl(transformer, root, ctx, false /* no async changes */, breakOnRestart, name);
  284. case IGraphTransformer::TStatus::Async:
  285. YQL_ENSURE(false, "Async status is forbidden for ApplyAsyncChanges");
  286. break;
  287. default:
  288. YQL_ENSURE(false, "Unknown status");
  289. break;
  290. }
  291. return status;
  292. }
  293. for (; ctx.RepeatTransformCounter < ctx.RepeatTransformLimit; ++ctx.RepeatTransformCounter) {
  294. TExprNode::TPtr newRoot;
  295. auto status = transformer.Transform(root, newRoot, ctx);
  296. if (newRoot) {
  297. root = newRoot;
  298. }
  299. switch (status.Level) {
  300. case IGraphTransformer::TStatus::Ok:
  301. case IGraphTransformer::TStatus::Error:
  302. return status;
  303. case IGraphTransformer::TStatus::Repeat:
  304. if (breakOnRestart && status.HasRestart) {
  305. return status;
  306. }
  307. // if (currentTime - startTime >= threshold) return NThreading::MakeFuture(IGraphTransformer::TStatus::Yield);
  308. continue;
  309. case IGraphTransformer::TStatus::Async:
  310. break;
  311. default:
  312. YQL_ENSURE(false, "Unknown status");
  313. }
  314. break;
  315. }
  316. if (ctx.RepeatTransformCounter >= ctx.RepeatTransformLimit) {
  317. AddTooManyTransformationsError(root->Pos(), name, ctx);
  318. return IGraphTransformer::TStatus::Error;
  319. }
  320. }
  321. catch (const std::exception& e) {
  322. ctx.AddError(ExceptionToIssue(e));
  323. return IGraphTransformer::TStatus::Error;
  324. }
  325. return IGraphTransformer::TStatus::Async;
  326. }
  327. IGraphTransformer::TStatus InstantTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool breakOnRestart) {
  328. IGraphTransformer::TStatus status = AsyncTransformStepImpl(transformer, root, ctx, false, breakOnRestart, "InstantTransform");
  329. if (status.Level == IGraphTransformer::TStatus::Async) {
  330. ctx.AddError(TIssue(ctx.GetPosition(root->Pos()), "Instant transform can not be delayed"));
  331. return IGraphTransformer::TStatus::Error;
  332. }
  333. return status;
  334. }
  335. IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
  336. TExprContext& ctx, bool applyAsyncChanges)
  337. {
  338. return AsyncTransformStepImpl(transformer, root, ctx, applyAsyncChanges, false, "AsyncTransformStep");
  339. }
  340. NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx,
  341. bool applyAsyncChanges) {
  342. IGraphTransformer::TStatus status = AsyncTransformStepImpl(transformer, root, ctx, applyAsyncChanges, false, "AsyncTransform");
  343. if (status.Level != IGraphTransformer::TStatus::Async) {
  344. return NThreading::MakeFuture(status);
  345. }
  346. return transformer.GetAsyncFuture(*root).Apply(
  347. [] (const NThreading::TFuture<void>&) mutable -> NThreading::TFuture<IGraphTransformer::TStatus> {
  348. return NThreading::MakeFuture(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Async));
  349. });
  350. }
  351. void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges,
  352. std::function<void(const IGraphTransformer::TStatus&)> asyncCallback) {
  353. NThreading::TFuture<IGraphTransformer::TStatus> status = AsyncTransform(transformer, root, ctx, applyAsyncChanges);
  354. status.Subscribe(
  355. [asyncCallback](const NThreading::TFuture<IGraphTransformer::TStatus>& status) mutable -> void {
  356. HandleFutureException(status);
  357. asyncCallback(status.GetValue());
  358. });
  359. }
  360. }
  361. template<>
  362. void Out<NYql::IGraphTransformer::TStatus::ELevel>(class IOutputStream &o, NYql::IGraphTransformer::TStatus::ELevel x) {
  363. #define YQL_GT_STATUS_MAP_TO_STRING_IMPL(name, ...) \
  364. case NYql::IGraphTransformer::TStatus::name: \
  365. o << #name; \
  366. return;
  367. switch (x) {
  368. YQL_GT_STATUS_MAP(YQL_GT_STATUS_MAP_TO_STRING_IMPL)
  369. default:
  370. o << static_cast<int>(x);
  371. return;
  372. }
  373. }