yql_graph_transformer.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. #pragma once
  2. #include <yql/essentials/ast/yql_expr.h>
  3. #include <yql/essentials/utils/yql_panic.h>
  4. #include <yql/essentials/core/issue/yql_issue.h>
  5. #include <library/cpp/threading/future/future.h>
  6. #include <util/generic/hash.h>
  7. #include <util/datetime/base.h>
  8. #include <functional>
  9. namespace NYql {
/// Interface of one transformation step over an expression graph.
///
/// Transform() produces a new root (`output`) from `input` and reports a TStatus.
/// A step that cannot finish synchronously reports TStatus::Async; the caller then
/// obtains a future via GetAsyncFuture() and, once it is ready, finishes the step
/// with ApplyAsyncChanges().
class IGraphTransformer {
public:
    /// Result of a transformer call: a level (Ok/Repeat/Async/Error) plus a "restart" flag.
    /// Both are bitfields overlaid on the 32-bit `Raw` word, so comparing two TStatus
    /// values compares level and restart flag at once (Padding is always zeroed).
    struct TStatus {
// X-macro listing (name, numeric value) for every status level; expanded below via ENUM_VALUE_GEN
// (presumably generating `Name = Value` enumerators — ENUM_VALUE_GEN is defined elsewhere).
#define YQL_GT_STATUS_MAP(xx) \
    xx(Ok, 0) \
    xx(Repeat, 1) \
    xx(Async, 2) \
    xx(Error, 3)

        enum ELevel {
            YQL_GT_STATUS_MAP(ENUM_VALUE_GEN)
        };

        union {
            ui32 Raw;
            struct {
                ui32 Level : 4;       // an ELevel value
                ui32 HasRestart : 1;  // set when the producer requests a restart
                ui32 Padding : 27;    // always 0, so Raw comparisons are well-defined
            };
        };

        // Full comparison: level AND restart flag.
        bool operator== (const TStatus& other) const {
            return Raw == other.Raw;
        }
        bool operator!= (const TStatus& other) const {
            return Raw != other.Raw;
        }
        // Level-only comparison: the restart flag is ignored.
        bool operator== (ELevel other) const {
            return Level == other;
        }
        bool operator!= (ELevel other) const {
            return Level != other;
        }

        // Intentionally implicit so that `return TStatus::Ok;` works in transformers.
        TStatus(ELevel level, bool hasRestart = false)
            : Level(level)
            , HasRestart(hasRestart)
            , Padding(0)
        {}

        /// Merges two statuses: keeps the numerically larger level and ORs the restart flags.
        [[nodiscard]]
        TStatus Combine(TStatus other) const {
            const bool hasRestart = HasRestart || other.HasRestart;
            return TStatus((TStatus::ELevel)Max(Level, other.Level), hasRestart);
        }

        /// Writes the level and, if set, the suffix ", with restart".
        void Out(IOutputStream &out) const {
            out << (TStatus::ELevel)Level;
            if (HasRestart) {
                out << ", with restart";
            }
        }
    };

    /// Counters a transformer may expose through GetStatistics().
    /// TGraphTransformerBase fills them as described per field.
    struct TStatistics {
        TDuration TransformDuration;   // wall time spent inside Transform/GetAsyncFuture/ApplyAsyncChanges
        TDuration WaitDuration;        // time from GetAsyncFuture() to the matching ApplyAsyncChanges()
        i32 NewExprNodes;              // expression nodes created in the context during calls
        i32 NewTypeNodes;              // type nodes created in the context during calls
        i32 NewConstraintNodes;        // constraint nodes created in the context during calls
        ui32 Repeats;                  // number of calls that returned a Repeat level
        ui32 Restarts;                 // number of calls that returned a status with HasRestart set
        // Named per-stage breakdown (presumably filled by composite transformers — not shown here).
        TVector<std::pair<TString, TStatistics>> Stages;

        TStatistics()
            : TransformDuration(TDuration::Zero())
            , WaitDuration(TDuration::Zero())
            , NewExprNodes(0)
            , NewTypeNodes(0)
            , NewConstraintNodes(0)
            , Repeats(0)
            , Restarts(0)
            , Stages() {}

        // Both factories currently return the same default-constructed (all-zero) object.
        static TStatistics NotPresent() { return TStatistics(); }
        static TStatistics Zero() { return TStatistics(); }
    };

    virtual ~IGraphTransformer() {}

    /// Transforms the graph rooted at `input`, storing the resulting root in `output`.
    virtual TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
    /// Future for the pending async work of a step that returned TStatus::Async.
    virtual NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) = 0;
    /// Completes a pending async step once its future is ready, storing the new root in `output`.
    virtual TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
    /// Resets internal state so the transformer can be reused.
    virtual void Rewind() = 0;
    /// Collected statistics; the default implementation reports none.
    virtual TStatistics GetStatistics() const { return TStatistics::NotPresent(); }
};
  86. class TGraphTransformerBase : public IGraphTransformer {
  87. private:
  88. class TTransformScope {
  89. public:
  90. TTransformScope(TStatistics& statistics, const TExprContext* exprCtx)
  91. : Statistics_(statistics)
  92. , ExprCtx_(exprCtx)
  93. , TransformStart_(TInstant::Now())
  94. , ExprNodesSize_(exprCtx ? exprCtx->ExprNodes.size() : 0)
  95. , TypeNodesSize_(exprCtx ? exprCtx->TypeNodes.size() : 0)
  96. , ConstraintNodesSize_(exprCtx ? exprCtx->ConstraintNodes.size() : 0)
  97. {
  98. }
  99. ~TTransformScope() {
  100. Statistics_.TransformDuration += TInstant::Now() - TransformStart_;
  101. if (ExprCtx_) {
  102. Statistics_.NewExprNodes += ExprCtx_->ExprNodes.size() - ExprNodesSize_;
  103. Statistics_.NewTypeNodes += ExprCtx_->TypeNodes.size() - TypeNodesSize_;
  104. Statistics_.NewConstraintNodes += ExprCtx_->ConstraintNodes.size() - ConstraintNodesSize_;
  105. }
  106. }
  107. TStatus HandleStatus(const TStatus& status) {
  108. if (status == TStatus::Repeat) {
  109. Statistics_.Repeats++;
  110. }
  111. if (status.HasRestart) {
  112. Statistics_.Restarts++;
  113. }
  114. return status;
  115. }
  116. private:
  117. TStatistics& Statistics_;
  118. const TExprContext* ExprCtx_;
  119. TInstant TransformStart_;
  120. i64 ExprNodesSize_;
  121. i64 TypeNodesSize_;
  122. i64 ConstraintNodesSize_;
  123. };
  124. public:
  125. TGraphTransformerBase()
  126. : Statistics_(TStatistics::Zero())
  127. , AsyncStart_() {}
  128. TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  129. TTransformScope scope(Statistics_, &ctx);
  130. return scope.HandleStatus(DoTransform(input, output, ctx));
  131. }
  132. NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) final {
  133. TTransformScope scope(Statistics_, nullptr);
  134. AsyncStart_ = TInstant::Now();
  135. return DoGetAsyncFuture(input);
  136. }
  137. TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  138. TTransformScope scope(Statistics_, &ctx);
  139. Statistics_.WaitDuration += TInstant::Now() - AsyncStart_;
  140. return scope.HandleStatus(DoApplyAsyncChanges(input, output, ctx));
  141. }
  142. virtual TStatistics GetStatistics() const override { return Statistics_; }
  143. public:
  144. virtual TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
  145. virtual NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) = 0;
  146. virtual TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
  147. protected:
  148. mutable TStatistics Statistics_;
  149. private:
  150. TInstant AsyncStart_;
  151. };
/// One named stage for the composite/choice transformer factories: a transformer plus the
/// issue code and message associated with it.
struct TTransformStage {
    TString Name;
    EYqlIssueCode IssueCode;
    TString IssueMessage;

    /// Owning form: stores `transformer` in Transformer_ and caches its raw pointer.
    /// NOTE(review): assuming util's TAutoPtr follows std::auto_ptr-style transfer-on-copy
    /// semantics, initializing Transformer_ from a const reference (and copying a
    /// TTransformStage, e.g. inside a TVector) may move ownership out of the source —
    /// verify before using the source pointer or a copied-from stage afterwards.
    TTransformStage(const TAutoPtr<IGraphTransformer>& transformer, const TString& name, EYqlIssueCode issueCode, const TString& issueMessage = {})
        : Name(name)
        , IssueCode(issueCode)
        , IssueMessage(issueMessage)
        , RawTransformer_(transformer.Get())
        , Transformer_(transformer)
    {}

    /// Non-owning form: only the address is kept (Transformer_ stays empty), so the caller
    /// must keep `transformer` alive for as long as this stage is used.
    TTransformStage(IGraphTransformer& transformer, const TString& name, EYqlIssueCode issueCode, const TString& issueMessage = {})
        : Name(name)
        , IssueCode(issueCode)
        , IssueMessage(issueMessage)
        , RawTransformer_(&transformer)
    {}

    /// The stage's transformer, whichever constructor was used.
    IGraphTransformer& GetTransformer() const
    {
        return *RawTransformer_;
    }

private:
    IGraphTransformer* const RawTransformer_;        // always set; used for access
    const TAutoPtr<IGraphTransformer> Transformer_;  // set only by the owning constructor
};
/// Builds a single transformer out of `stages`.
/// NOTE(review): presumably the stages run in order and `useIssueScopes` controls whether
/// issues raised by a stage are wrapped using its IssueCode/IssueMessage — confirm in the implementation.
TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes);
/// Variant of CreateCompositeGraphTransformer; judging by the name it skips argument checks — TODO confirm.
TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformerWithNoArgChecks(const TVector<TTransformStage>& stages, bool useIssueScopes);
/// Builds a transformer that selects between `left` and `right` using `condition`
/// (presumably `left` when the condition returns true — confirm in the implementation).
TAutoPtr<IGraphTransformer> CreateChoiceGraphTransformer(
    const std::function<bool(const TExprNode::TPtr& input, TExprContext& ctx)>& condition,
    const TTransformStage& left,
    const TTransformStage& right);

// Drivers that run `transformer` over the graph rooted at `root`. `root` is taken by non-const
// reference (presumably replaced with the transformed root). The summaries below are inferred
// from names and signatures only — NOTE(review): confirm against the implementation file.

// Runs the transformer to completion synchronously (presumably waiting on any async futures).
IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx);
// Runs without waiting on async work (presumably); `breakOnRestart` presumably stops at a
// status with HasRestart set.
IGraphTransformer::TStatus InstantTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool breakOnRestart = false);
// Future-returning driver; `applyAsyncChanges` presumably requests an ApplyAsyncChanges()
// call before transforming further.
NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges);
// Callback form of the driver above; `asyncCallback` presumably receives the final status.
void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges,
    std::function<void(const IGraphTransformer::TStatus&)> asyncCallback);
// A single driver step (presumably returns as soon as an Async status is reached).
IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
    TExprContext& ctx, bool applyAsyncChanges);
  190. template <typename T>
  191. void HandleFutureException(const NThreading::TFuture<T>& future) {
  192. if (future.HasException()) {
  193. try {
  194. future.TryRethrow();
  195. } catch (...) {
  196. throw yexception() << "Unexpected future exception: " << CurrentExceptionMessage();
  197. }
  198. }
  199. }
  200. class TSyncTransformerBase : public TGraphTransformerBase {
  201. public:
  202. NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
  203. Y_UNUSED(input);
  204. YQL_ENSURE(false, "Not supported");
  205. }
  206. TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  207. Y_UNUSED(input);
  208. Y_UNUSED(output);
  209. Y_UNUSED(ctx);
  210. YQL_ENSURE(false, "Not supported");
  211. }
  212. };
  213. class TNullTransformer final: public TSyncTransformerBase {
  214. public:
  215. TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  216. output = input;
  217. Y_UNUSED(ctx);
  218. return IGraphTransformer::TStatus::Ok;
  219. }
  220. void Rewind() final {
  221. }
  222. };
  223. template <typename TFunctor>
  224. class TFunctorTransformer: public TSyncTransformerBase {
  225. public:
  226. TFunctorTransformer(TFunctor functor)
  227. : Functor_(std::move(functor)) {}
  228. TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
  229. TStatus status = Functor_(input, output, ctx);
  230. YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Async);
  231. return status;
  232. }
  233. void Rewind() override {
  234. }
  235. private:
  236. TFunctor Functor_;
  237. };
  238. template <typename TFunctor>
  239. class TSinglePassFunctorTransformer final: public TFunctorTransformer<TFunctor> {
  240. using TBase = TFunctorTransformer<TFunctor>;
  241. public:
  242. TSinglePassFunctorTransformer(TFunctor functor)
  243. : TFunctorTransformer<TFunctor>(std::move(functor))
  244. {}
  245. IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  246. if (Pass_) {
  247. output = input;
  248. return IGraphTransformer::TStatus::Ok;
  249. }
  250. IGraphTransformer::TStatus status = TBase::DoTransform(input, output, ctx);
  251. if (IGraphTransformer::TStatus::Ok == status.Level) {
  252. Pass_ = true;
  253. }
  254. return status;
  255. }
  256. void Rewind() final {
  257. Pass_ = false;
  258. }
  259. private:
  260. bool Pass_ = false;
  261. };
  262. template <typename TFunctor>
  263. THolder<IGraphTransformer> CreateFunctorTransformer(TFunctor functor) {
  264. return MakeHolder<TFunctorTransformer<TFunctor>>(std::move(functor));
  265. }
  266. template <typename TFunctor>
  267. THolder<IGraphTransformer> CreateSinglePassFunctorTransformer(TFunctor functor) {
  268. return MakeHolder<TSinglePassFunctorTransformer<TFunctor>>(std::move(functor));
  269. }
  270. typedef std::function<IGraphTransformer::TStatus(const TExprNode::TPtr&, TExprNode::TPtr&, TExprContext&)> TAsyncTransformCallback;
  271. typedef NThreading::TFuture<TAsyncTransformCallback> TAsyncTransformCallbackFuture;
  272. template <typename TDerived>
  273. class TAsyncCallbackTransformer : public TGraphTransformerBase {
  274. public:
  275. // CallbackTransform should return std::pair<TStatus, TAsyncTransformCallbackFuture>
  276. TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  277. auto pair = static_cast<TDerived*>(this)->CallbackTransform(input, output, ctx);
  278. if (pair.first == TStatus::Async) {
  279. YQL_ENSURE(Callbacks_.emplace(input.Get(), pair.second).second);
  280. }
  281. return pair.first;
  282. }
  283. NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
  284. const auto it = Callbacks_.find(&input);
  285. YQL_ENSURE(it != Callbacks_.cend());
  286. return it->second.IgnoreResult();
  287. }
  288. TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  289. const auto it = Callbacks_.find(input.Get());
  290. YQL_ENSURE(it != Callbacks_.cend());
  291. auto& future = it->second;
  292. YQL_ENSURE(future.HasValue());
  293. const auto status = future.GetValue()(input, output, ctx);
  294. Callbacks_.erase(it);
  295. return status;
  296. }
  297. void Rewind() override {
  298. Callbacks_.clear();
  299. }
  300. private:
  301. TNodeMap<TAsyncTransformCallbackFuture> Callbacks_;
  302. };
/// Adapts a future into the (status, callback-future) pair returned by
/// TAsyncCallbackTransformer::CallbackTransform. The returned status is always Async.
///
/// When `future` completes, it yields a callback that:
///  - sets `output = input`;
///  - installs a TIssueScopeGuard on ctx.IssueManager whose issue is positioned at `input` and
///    carries `message`, or "Execution of node: <node content>" when `message` is empty;
///  - reports the result's issues: always if AlwaysRaiseIssues, otherwise only on failure;
///  - on failure (!res.Success()), marks `input` as Error and returns TStatus::Error;
///  - on success, returns callback(res, input, output, ctx).
///
/// The future's value type must provide Success() and ReportIssues(IssueManager), since both are called here.
template <bool AlwaysRaiseIssues = true, typename TFuture, typename TCallback>
std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture>
WrapFutureCallback(const TFuture& future, const TCallback& callback, const TString& message = "") {
    return std::make_pair(IGraphTransformer::TStatus::Async, future.Apply(
        [callback, message](const TFuture& completedFuture) {
            // `callback` and `message` are captured by value so they outlive this call.
            return TAsyncTransformCallback([completedFuture, callback, message](const TExprNode::TPtr& input,
                TExprNode::TPtr& output, TExprContext& ctx)
            {
                output = input;
                const auto& res = completedFuture.GetValue();
                TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
                    return MakeIntrusive<TIssue>(
                        ctx.GetPosition(input->Pos()),
                        message.empty()
                            ? TStringBuilder() << "Execution of node: " << input->Content()
                            : message);
                });
                if constexpr (AlwaysRaiseIssues)
                    res.ReportIssues(ctx.IssueManager);

                if (!res.Success()) {
                    // Without AlwaysRaiseIssues the issues are surfaced only on failure.
                    if constexpr (!AlwaysRaiseIssues)
                        res.ReportIssues(ctx.IssueManager);
                    input->SetState(TExprNode::EState::Error);
                    return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
                }
                else {
                    return callback(res, input, output, ctx);
                }
            });
        }));
}
  334. template <typename TFuture, typename TResultExtractor>
  335. std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture>
  336. WrapFuture(const TFuture& future, const TResultExtractor& extractor, const TString& message = "") {
  337. return WrapFutureCallback(future, [extractor](const NThreading::TFutureType<TFuture>& res, const TExprNode::TPtr& input, TExprNode::TPtr& /*output*/, TExprContext& ctx) {
  338. input->SetState(TExprNode::EState::ExecutionComplete);
  339. input->SetResult(extractor(res, input, ctx));
  340. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Ok);
  341. }, message);
  342. }
  343. template <typename TFuture, typename TResultExtractor>
  344. std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture>
  345. WrapModifyFuture(const TFuture& future, const TResultExtractor& extractor, const TString& message = "") {
  346. return WrapFutureCallback(future, [extractor](const NThreading::TFutureType<TFuture>& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  347. TExprNode::TPtr resultNode = extractor(res, input, output, ctx);
  348. input->SetState(TExprNode::EState::ExecutionComplete);
  349. output->SetResult(std::move(resultNode));
  350. if (input != output) {
  351. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
  352. }
  353. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Ok);
  354. }, message);
  355. }
  356. inline std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> SyncStatus(IGraphTransformer::TStatus status) {
  357. return std::make_pair(status, TAsyncTransformCallbackFuture());
  358. }
  359. inline std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> SyncError() {
  360. return SyncStatus(IGraphTransformer::TStatus::Error);
  361. }
  362. inline std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> SyncOk() {
  363. return SyncStatus(IGraphTransformer::TStatus::Ok);
  364. }
  365. inline std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> SyncRepeat() {
  366. return SyncStatus(IGraphTransformer::TStatus::Repeat);
  367. }
  368. typedef std::unordered_map<TExprNode::TPtr, ui64, TExprNode::TPtrHash> TSyncMap;
  369. }
  370. template<>
  371. inline void Out<NYql::IGraphTransformer::TStatus>(
  372. IOutputStream &out, const NYql::IGraphTransformer::TStatus& status)
  373. {
  374. status.Out(out);
  375. }