mkql_computation_node_impl.h 35 KB


  1. #pragma once
  2. #include "mkql_computation_node.h"
  3. #include <yql/essentials/minikql/mkql_alloc.h>
  4. #include <yql/essentials/public/udf/udf_value.h>
  5. #include <util/system/type_name.h>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. enum class EDictType {
  9. Sorted,
  10. Hashed
  11. };
  12. enum class EContainerOptMode {
  13. NoOpt,
  14. OptNoAdd,
  15. OptAdd
  16. };
  17. template <class IComputationNodeInterface>
  18. class TRefCountedComputationNode : public IComputationNodeInterface {
  19. private:
  20. void Ref() final;
  21. void UnRef() final;
  22. ui32 RefCount() const final;
  23. private:
  24. ui32 Refs_ = 0;
  25. };
  26. class TUnboxedImmutableComputationNode: public TRefCountedComputationNode<IComputationNode>
  27. {
  28. public:
  29. TUnboxedImmutableComputationNode(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& value);
  30. ~TUnboxedImmutableComputationNode();
  31. private:
  32. void InitNode(TComputationContext&) const override {}
  33. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final;
  34. const IComputationNode* GetSource() const final;
  35. IComputationNode* AddDependence(const IComputationNode*) final;
  36. void RegisterDependencies() const final;
  37. ui32 GetIndex() const final;
  38. void CollectDependentIndexes(const IComputationNode* owner, TIndexesMap&) const final;
  39. ui32 GetDependencyWeight() const final;
  40. ui32 GetDependencesCount() const final;
  41. bool IsTemporaryValue() const final;
  42. void PrepareStageOne() final;
  43. void PrepareStageTwo() final;
  44. TString DebugString() const final;
  45. EValueRepresentation GetRepresentation() const final;
  46. TMemoryUsageInfo *const MemInfo;
  47. protected:
  48. const NUdf::TUnboxedValue UnboxedValue;
  49. const EValueRepresentation RepresentationKind;
  50. };
  51. class TStatefulComputationNodeBase {
  52. protected:
  53. TStatefulComputationNodeBase(ui32 valueIndex, EValueRepresentation kind);
  54. ~TStatefulComputationNodeBase();
  55. void AddDependenceImpl(const IComputationNode* node);
  56. void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
  57. IComputationNode::TIndexesMap& dependencies, bool stateless) const;
  58. TConstComputationNodePtrVector Dependencies;
  59. const ui32 ValueIndex;
  60. const EValueRepresentation RepresentationKind;
  61. };
  62. template <class IComputationNodeInterface, bool SerializableState = false>
  63. class TStatefulComputationNode: public TRefCountedComputationNode<IComputationNodeInterface>, protected TStatefulComputationNodeBase
  64. {
  65. protected:
  66. TStatefulComputationNode(TComputationMutables& mutables, EValueRepresentation kind);
  67. protected:
  68. void InitNode(TComputationContext&) const override;
  69. ui32 GetIndex() const final;
  70. IComputationNode* AddDependence(const IComputationNode* node) final;
  71. EValueRepresentation GetRepresentation() const override;
  72. NUdf::TUnboxedValue& ValueRef(TComputationContext& compCtx) const {
  73. return compCtx.MutableValues[ValueIndex];
  74. }
  75. private:
  76. ui32 GetDependencesCount() const final;
  77. };
  78. class TExternalComputationNode: public TStatefulComputationNode<IComputationExternalNode>
  79. {
  80. public:
  81. TExternalComputationNode(TComputationMutables& mutables, EValueRepresentation kind = EValueRepresentation::Any);
  82. protected:
  83. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override;
  84. NUdf::TUnboxedValue& RefValue(TComputationContext& compCtx) const override;
  85. void SetValue(TComputationContext& compCtx, NUdf::TUnboxedValue&& value) const override;
  86. TString DebugString() const final;
  87. private:
  88. ui32 GetDependencyWeight() const final;
  89. void RegisterDependencies() const final;
  90. void SetOwner(const IComputationNode* owner) final;
  91. void PrepareStageOne() final;
  92. void PrepareStageTwo() final;
  93. void CollectDependentIndexes(const IComputationNode* owner, TIndexesMap& dependencies) const final;
  94. bool IsTemporaryValue() const final;
  95. const IComputationNode* Owner = nullptr;
  96. void SetGetter(TGetter&& getter) final;
  97. void InvalidateValue(TComputationContext& compCtx) const final;
  98. const IComputationNode* GetSource() const final;
  99. protected:
  100. std::vector<std::pair<ui32, EValueRepresentation>> InvalidationSet;
  101. TGetter Getter;
  102. };
  103. class TStatefulSourceComputationNodeBase {
  104. protected:
  105. TStatefulSourceComputationNodeBase();
  106. ~TStatefulSourceComputationNodeBase();
  107. void PrepareStageOneImpl(const TConstComputationNodePtrVector& dependencies);
  108. void AddSource(IComputationNode* source) const;
  109. mutable std::unordered_set<const IComputationNode*> Sources; // TODO: remove const and mutable.
  110. std::optional<bool> Stateless;
  111. };
  112. template <typename TDerived, bool SerializableState = false>
  113. class TStatefulSourceComputationNode: public TStatefulComputationNode<IComputationNode, SerializableState>,
  114. protected TStatefulSourceComputationNodeBase
  115. {
  116. using TStatefulComputationNode = TStatefulComputationNode<IComputationNode, SerializableState>;
  117. private:
  118. bool IsTemporaryValue() const final {
  119. return *Stateless;
  120. }
  121. ui32 GetDependencyWeight() const final {
  122. return Sources.size();
  123. }
  124. void PrepareStageOne() final {
  125. PrepareStageOneImpl(this->Dependencies);
  126. }
  127. void PrepareStageTwo() final {}
  128. void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
  129. this->CollectDependentIndexesImpl(this, owner, dependencies, *Stateless);
  130. }
  131. const IComputationNode* GetSource() const final { return this; }
  132. protected:
  133. TStatefulSourceComputationNode(TComputationMutables& mutables, EValueRepresentation kind = EValueRepresentation::Any)
  134. : TStatefulComputationNode(mutables, kind)
  135. {}
  136. void DependsOn(IComputationNode* node) const {
  137. if (node) {
  138. if (const auto source = node->AddDependence(this)) {
  139. AddSource(source);
  140. }
  141. }
  142. }
  143. void Own(IComputationExternalNode* node) const {
  144. if (node) {
  145. node->SetOwner(this);
  146. }
  147. }
  148. TString DebugString() const override {
  149. return TypeName<TDerived>();
  150. }
  151. };
  152. template <typename TDerived>
  153. class TMutableComputationNode: public TStatefulSourceComputationNode<TDerived> {
  154. protected:
  155. using TStatefulSourceComputationNode<TDerived>::TStatefulSourceComputationNode;
  156. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override {
  157. if (*this->Stateless)
  158. return static_cast<const TDerived*>(this)->DoCalculate(compCtx);
  159. NUdf::TUnboxedValue& valueRef = this->ValueRef(compCtx);
  160. if (valueRef.IsInvalid()) {
  161. valueRef = static_cast<const TDerived*>(this)->DoCalculate(compCtx);
  162. }
  163. return valueRef;
  164. }
  165. };
  166. template <typename TDerived, typename IFlowInterface>
  167. class TFlowSourceBaseComputationNode: public TStatefulComputationNode<IFlowInterface>
  168. {
  169. using TBase = TStatefulComputationNode<IFlowInterface>;
  170. protected:
  171. TFlowSourceBaseComputationNode(TComputationMutables& mutables, EValueRepresentation stateKind)
  172. : TBase(mutables, stateKind)
  173. {}
  174. TString DebugString() const override {
  175. return TypeName<TDerived>();
  176. }
  177. void DependsOn(IComputationNode* node) const {
  178. if (node) {
  179. if (const auto source = node->AddDependence(this)) {
  180. Sources.emplace(source);
  181. }
  182. }
  183. }
  184. void Own(IComputationExternalNode* node) const {
  185. if (node) {
  186. node->SetOwner(this);
  187. }
  188. }
  189. private:
  190. bool IsTemporaryValue() const final {
  191. return true;
  192. }
  193. ui32 GetDependencyWeight() const final {
  194. return this->Dependencies.size() + Sources.size();
  195. }
  196. void CollectDependentIndexes(const IComputationNode* owner, IComputationExternalNode::TIndexesMap& dependencies) const final {
  197. this->CollectDependentIndexesImpl(this, owner, dependencies, false);
  198. }
  199. void PrepareStageOne() final {}
  200. void PrepareStageTwo() final {}
  201. const IComputationNode* GetSource() const final { return this; }
  202. mutable std::unordered_set<const IComputationNode*> Sources; // TODO: remove const and mutable.
  203. };
  204. template <typename TDerived>
  205. class TFlowSourceComputationNode: public TFlowSourceBaseComputationNode<TDerived, IComputationNode>
  206. {
  207. using TBase = TFlowSourceBaseComputationNode<TDerived, IComputationNode>;
  208. protected:
  209. TFlowSourceComputationNode(TComputationMutables& mutables, EValueRepresentation kind, EValueRepresentation stateKind)
  210. : TBase(mutables, stateKind), RepresentationKind(kind)
  211. {}
  212. private:
  213. EValueRepresentation GetRepresentation() const final {
  214. return RepresentationKind;
  215. }
  216. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final {
  217. return static_cast<const TDerived*>(this)->DoCalculate(this->ValueRef(compCtx), compCtx);
  218. }
  219. private:
  220. const EValueRepresentation RepresentationKind;
  221. };
  222. template <typename TDerived>
  223. class TWideFlowSourceComputationNode: public TFlowSourceBaseComputationNode<TDerived, IComputationWideFlowNode>
  224. {
  225. using TBase = TFlowSourceBaseComputationNode<TDerived, IComputationWideFlowNode>;
  226. protected:
  227. TWideFlowSourceComputationNode(TComputationMutables& mutables, EValueRepresentation stateKind)
  228. : TBase(mutables, stateKind)
  229. {}
  230. private:
  231. EValueRepresentation GetRepresentation() const final {
  232. THROW yexception() << "Failed to get representation kind.";
  233. }
  234. NUdf::TUnboxedValue GetValue(TComputationContext&) const final {
  235. THROW yexception() << "Failed to get value from wide flow node.";
  236. }
  237. EFetchResult FetchValues(TComputationContext& compCtx, NUdf::TUnboxedValue*const* values) const final {
  238. return static_cast<const TDerived*>(this)->DoCalculate(this->ValueRef(compCtx), compCtx, values);
  239. }
  240. };
  241. template <typename TDerived, typename IFlowInterface>
  242. class TFlowBaseComputationNode: public TRefCountedComputationNode<IFlowInterface>
  243. {
  244. protected:
  245. TFlowBaseComputationNode(const IComputationNode* source) : Source(source) {}
  246. void InitNode(TComputationContext&) const override {}
  247. TString DebugString() const override {
  248. return TypeName<TDerived>();
  249. }
  250. IComputationNode* FlowDependsOn(IComputationNode* node) const {
  251. if (node) {
  252. if (const auto source = node->AddDependence(this); dynamic_cast<IComputationExternalNode*>(source) || dynamic_cast<IComputationWideFlowProxyNode*>(source))
  253. return const_cast<IComputationNode*>(static_cast<const IComputationNode*>(this)); // TODO: remove const in RegisterDependencies.
  254. else
  255. return source;
  256. }
  257. return nullptr;
  258. }
  259. IComputationNode* FlowDependsOnBoth(IComputationNode* one, IComputationNode* two) const {
  260. const auto flowOne = FlowDependsOn(one);
  261. const auto flowTwo = FlowDependsOn(two);
  262. if (flowOne && flowTwo) {
  263. if (flowOne == flowTwo)
  264. return flowOne;
  265. const auto flow = const_cast<IComputationNode*>(static_cast<const IComputationNode*>(this));
  266. DependsOn(flow, flowOne);
  267. DependsOn(flow, flowTwo);
  268. return flow;
  269. } else if (flowOne) {
  270. return flowOne;
  271. } else if (flowTwo) {
  272. return flowTwo;
  273. }
  274. return nullptr;
  275. }
  276. IComputationNode* FlowDependsOnAll(const std::vector<IFlowInterface*, TMKQLAllocator<IFlowInterface*>>& sources) const {
  277. std::unordered_set<IComputationNode*> flows(sources.size());
  278. for (const auto& source : sources)
  279. if (const auto flow = FlowDependsOn(source))
  280. flows.emplace(flow);
  281. if (flows.size() > 1U) {
  282. const auto flow = const_cast<IComputationNode*>(static_cast<const IComputationNode*>(this));
  283. std::for_each(flows.cbegin(), flows.cend(), std::bind(&TFlowBaseComputationNode::DependsOn, flow, std::placeholders::_1));
  284. return flow;
  285. }
  286. return flows.empty() ? nullptr : *flows.cbegin();
  287. }
  288. static void DependsOn(IComputationNode* source, IComputationNode* node) {
  289. if (node && source && node != source) {
  290. node->AddDependence(source);
  291. }
  292. }
  293. static void Own(IComputationNode* source, IComputationExternalNode* node) {
  294. if (node && source && node != source) {
  295. node->SetOwner(source);
  296. }
  297. }
  298. static void OwnProxy(IComputationNode* source, IComputationWideFlowProxyNode* node) {
  299. if (node && source && node != source) {
  300. node->SetOwner(source);
  301. }
  302. }
  303. private:
  304. ui32 GetDependencyWeight() const final { return 42U; }
  305. ui32 GetDependencesCount() const final {
  306. return Dependence ? 1U : 0U;
  307. }
  308. IComputationNode* AddDependence(const IComputationNode* node) final {
  309. if (!Dependence) {
  310. Dependence = node;
  311. }
  312. return this;
  313. }
  314. bool IsTemporaryValue() const final {
  315. return true;
  316. }
  317. void PrepareStageOne() final {}
  318. void PrepareStageTwo() final {}
  319. const IComputationNode* GetSource() const final {
  320. if (Source && Source != this)
  321. if (const auto s = Source->GetSource())
  322. return s;
  323. return this;
  324. }
  325. protected:
  326. const IComputationNode *const Source;
  327. const IComputationNode *Dependence = nullptr;
  328. };
  329. template <typename TDerived>
  330. class TBaseFlowBaseComputationNode: public TFlowBaseComputationNode<TDerived, IComputationNode>
  331. {
  332. protected:
  333. TBaseFlowBaseComputationNode(const IComputationNode* source, EValueRepresentation kind)
  334. : TFlowBaseComputationNode<TDerived, IComputationNode>(source), RepresentationKind(kind)
  335. {}
  336. private:
  337. EValueRepresentation GetRepresentation() const final {
  338. return RepresentationKind;
  339. }
  340. const EValueRepresentation RepresentationKind;
  341. };
  342. class TStatelessFlowComputationNodeBase {
  343. protected:
  344. ui32 GetIndexImpl() const;
  345. void CollectDependentIndexesImpl(const IComputationNode* self,
  346. const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies,
  347. const IComputationNode* dependence) const;
  348. };
  349. template <typename TDerived>
  350. class TStatelessFlowComputationNode: public TBaseFlowBaseComputationNode<TDerived>, protected TStatelessFlowComputationNodeBase
  351. {
  352. protected:
  353. TStatelessFlowComputationNode(const IComputationNode* source, EValueRepresentation kind)
  354. : TBaseFlowBaseComputationNode<TDerived>(source, kind)
  355. {}
  356. TStatelessFlowComputationNode(TComputationMutables&, const IComputationNode* source, EValueRepresentation kind)
  357. : TBaseFlowBaseComputationNode<TDerived>(source, kind)
  358. {}
  359. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override {
  360. return static_cast<const TDerived*>(this)->DoCalculate(compCtx);
  361. }
  362. private:
  363. ui32 GetIndex() const final {
  364. return GetIndexImpl();
  365. }
  366. void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
  367. CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence);
  368. }
  369. };
  370. class TStatefulFlowComputationNodeBase {
  371. protected:
  372. TStatefulFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind);
  373. void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
  374. IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
  375. const ui32 StateIndex;
  376. const EValueRepresentation StateKind;
  377. };
  378. template <typename TDerived, bool SerializableState = false>
  379. class TStatefulFlowComputationNode: public TBaseFlowBaseComputationNode<TDerived>, protected TStatefulFlowComputationNodeBase
  380. {
  381. protected:
  382. TStatefulFlowComputationNode(TComputationMutables& mutables, const IComputationNode* source, EValueRepresentation kind, EValueRepresentation stateKind = EValueRepresentation::Any)
  383. : TBaseFlowBaseComputationNode<TDerived>(source, kind), TStatefulFlowComputationNodeBase(mutables.CurValueIndex++, stateKind)
  384. {
  385. if constexpr (SerializableState) {
  386. mutables.SerializableValues.push_back(StateIndex);
  387. }
  388. }
  389. NUdf::TUnboxedValue& RefState(TComputationContext& compCtx) const {
  390. return compCtx.MutableValues[GetIndex()];
  391. }
  392. private:
  393. ui32 GetIndex() const final {
  394. return StateIndex;
  395. }
  396. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final {
  397. return static_cast<const TDerived*>(this)->DoCalculate(compCtx.MutableValues[StateIndex], compCtx);
  398. }
  399. void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
  400. CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence);
  401. }
  402. };
  403. const IComputationNode* GetCommonSource(const IComputationNode* first, const IComputationNode* second, const IComputationNode* common);
  404. class TPairStateFlowComputationNodeBase {
  405. protected:
  406. TPairStateFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind);
  407. void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
  408. IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
  409. const ui32 StateIndex;
  410. const EValueRepresentation FirstKind, SecondKind;
  411. };
  412. template <typename TDerived>
  413. class TPairStateFlowComputationNode: public TBaseFlowBaseComputationNode<TDerived>, protected TPairStateFlowComputationNodeBase
  414. {
  415. protected:
  416. TPairStateFlowComputationNode(TComputationMutables& mutables, const IComputationNode* source, EValueRepresentation kind, EValueRepresentation firstKind = EValueRepresentation::Any, EValueRepresentation secondKind = EValueRepresentation::Any)
  417. : TBaseFlowBaseComputationNode<TDerived>(source, kind), TPairStateFlowComputationNodeBase(mutables.CurValueIndex++, firstKind, secondKind)
  418. {
  419. ++mutables.CurValueIndex;
  420. }
  421. private:
  422. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final {
  423. return static_cast<const TDerived*>(this)->DoCalculate(compCtx.MutableValues[StateIndex], compCtx.MutableValues[StateIndex + 1U], compCtx);
  424. }
  425. ui32 GetIndex() const final {
  426. return StateIndex;
  427. }
  428. void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
  429. CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence);
  430. }
  431. };
  432. class TWideFlowProxyComputationNode: public TRefCountedComputationNode<IComputationWideFlowProxyNode>
  433. {
  434. public:
  435. TWideFlowProxyComputationNode() = default;
  436. protected:
  437. TString DebugString() const final;
  438. private:
  439. void InitNode(TComputationContext&) const override {}
  440. EValueRepresentation GetRepresentation() const final;
  441. NUdf::TUnboxedValue GetValue(TComputationContext&) const final;
  442. ui32 GetIndex() const final;
  443. ui32 GetDependencyWeight() const final;
  444. ui32 GetDependencesCount() const final;
  445. const IComputationNode* GetSource() const final;
  446. IComputationNode* AddDependence(const IComputationNode* node) final;
  447. bool IsTemporaryValue() const final;
  448. void RegisterDependencies() const final;
  449. void PrepareStageOne() final;
  450. void PrepareStageTwo() final;
  451. void SetOwner(const IComputationNode* owner) final;
  452. void CollectDependentIndexes(const IComputationNode*, TIndexesMap&) const final;
  453. void InvalidateValue(TComputationContext& ctx) const final;
  454. void SetFetcher(TFetcher&& fetcher) final;
  455. EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* values) const final;
  456. protected:
  457. const IComputationNode* Dependence = nullptr;
  458. const IComputationNode* Owner = nullptr;
  459. std::vector<std::pair<ui32, EValueRepresentation>> InvalidationSet;
  460. TFetcher Fetcher;
  461. };
  462. class TWideFlowBaseComputationNodeBase {
  463. protected:
  464. EValueRepresentation GetRepresentationImpl() const;
  465. NUdf::TUnboxedValue GetValueImpl(TComputationContext&) const;
  466. };
  467. template <typename TDerived>
  468. class TWideFlowBaseComputationNode: public TFlowBaseComputationNode<TDerived, IComputationWideFlowNode>,
  469. protected TWideFlowBaseComputationNodeBase
  470. {
  471. protected:
  472. TWideFlowBaseComputationNode(const IComputationNode* source)
  473. : TFlowBaseComputationNode<TDerived, IComputationWideFlowNode>(source)
  474. {}
  475. private:
  476. EValueRepresentation GetRepresentation() const final {
  477. return GetRepresentationImpl();
  478. }
  479. NUdf::TUnboxedValue GetValue(TComputationContext& ctx) const {
  480. return GetValueImpl(ctx);
  481. }
  482. };
  483. class TStatelessWideFlowComputationNodeBase {
  484. protected:
  485. ui32 GetIndexImpl() const;
  486. void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
  487. IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
  488. };
  489. template <typename TDerived>
  490. class TStatelessWideFlowComputationNode: public TWideFlowBaseComputationNode<TDerived>, protected TStatelessWideFlowComputationNodeBase
  491. {
  492. protected:
  493. TStatelessWideFlowComputationNode(const IComputationNode* source)
  494. :TWideFlowBaseComputationNode<TDerived>(source)
  495. {}
  496. private:
  497. EFetchResult FetchValues(TComputationContext& compCtx, NUdf::TUnboxedValue*const* values) const final {
  498. return static_cast<const TDerived*>(this)->DoCalculate(compCtx, values);
  499. }
  500. ui32 GetIndex() const final {
  501. return GetIndexImpl();
  502. }
  503. void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
  504. CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence);
  505. }
  506. };
  507. class TStatefulWideFlowComputationNodeBase {
  508. protected:
  509. TStatefulWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation stateKind);
  510. void CollectDependentIndexesImpl(const IComputationNode* self,
  511. const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
  512. const ui32 StateIndex;
  513. const EValueRepresentation StateKind;
  514. };
  515. template <typename TDerived, bool SerializableState = false>
  516. class TStatefulWideFlowComputationNode: public TWideFlowBaseComputationNode<TDerived>, protected TStatefulWideFlowComputationNodeBase
  517. {
  518. protected:
  519. TStatefulWideFlowComputationNode(TComputationMutables& mutables, const IComputationNode* source, EValueRepresentation stateKind)
  520. : TWideFlowBaseComputationNode<TDerived>(source), TStatefulWideFlowComputationNodeBase(mutables.CurValueIndex++, stateKind)
  521. {
  522. if constexpr (SerializableState) {
  523. mutables.SerializableValues.push_back(StateIndex);
  524. }
  525. }
  526. NUdf::TUnboxedValue& RefState(TComputationContext& compCtx) const {
  527. return compCtx.MutableValues[GetIndex()];
  528. }
  529. private:
  530. EFetchResult FetchValues(TComputationContext& compCtx, NUdf::TUnboxedValue*const* values) const final {
  531. return static_cast<const TDerived*>(this)->DoCalculate(compCtx.MutableValues[StateIndex], compCtx, values);
  532. }
  533. ui32 GetIndex() const final {
  534. return StateIndex;
  535. }
  536. void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
  537. CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence);
  538. }
  539. };
  540. class TPairStateWideFlowComputationNodeBase {
  541. protected:
  542. TPairStateWideFlowComputationNodeBase(ui32 stateIndex, EValueRepresentation firstKind, EValueRepresentation secondKind);
  543. void CollectDependentIndexesImpl(const IComputationNode* self, const IComputationNode* owner,
  544. IComputationNode::TIndexesMap& dependencies, const IComputationNode* dependence) const;
  545. const ui32 StateIndex;
  546. const EValueRepresentation FirstKind, SecondKind;
  547. };
  548. template <typename TDerived>
  549. class TPairStateWideFlowComputationNode: public TWideFlowBaseComputationNode<TDerived>, protected TPairStateWideFlowComputationNodeBase
  550. {
  551. protected:
  552. TPairStateWideFlowComputationNode(TComputationMutables& mutables, const IComputationNode* source, EValueRepresentation firstKind, EValueRepresentation secondKind)
  553. : TWideFlowBaseComputationNode<TDerived>(source), TPairStateWideFlowComputationNodeBase(mutables.CurValueIndex++, firstKind, secondKind)
  554. {
  555. ++mutables.CurValueIndex;
  556. }
  557. private:
  558. EFetchResult FetchValues(TComputationContext& compCtx, NUdf::TUnboxedValue*const* values) const final {
  559. return static_cast<const TDerived*>(this)->DoCalculate(compCtx.MutableValues[StateIndex], compCtx.MutableValues[StateIndex + 1U], compCtx, values);
  560. }
  561. ui32 GetIndex() const final {
  562. return StateIndex;
  563. }
  564. void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
  565. CollectDependentIndexesImpl(this, owner, dependencies, this->Dependence);
  566. }
  567. };
  568. class TDecoratorComputationNodeBase {
  569. protected:
  570. TDecoratorComputationNodeBase(IComputationNode* node, EValueRepresentation kind);
  571. ui32 GetIndexImpl() const;
  572. TString DebugStringImpl(const TString& typeName) const;
  573. IComputationNode *const Node;
  574. const EValueRepresentation Kind;
  575. };
  576. template <typename TDerived>
  577. class TDecoratorComputationNode: public TRefCountedComputationNode<IComputationNode>, protected TDecoratorComputationNodeBase
  578. {
  579. private:
  580. void InitNode(TComputationContext&) const final {}
  581. const IComputationNode* GetSource() const final { return Node; }
  582. IComputationNode* AddDependence(const IComputationNode* node) final {
  583. return Node->AddDependence(node);
  584. }
  585. EValueRepresentation GetRepresentation() const final { return Kind; }
  586. bool IsTemporaryValue() const final { return true; }
  587. void PrepareStageOne() final {}
  588. void PrepareStageTwo() final {}
  589. void RegisterDependencies() const final { Node->AddDependence(this); }
  590. void CollectDependentIndexes(const IComputationNode*, TIndexesMap&) const final {}
  591. ui32 GetDependencyWeight() const final { return 0U; }
  592. ui32 GetDependencesCount() const final {
  593. return Node->GetDependencesCount();
  594. }
  595. ui32 GetIndex() const final {
  596. return GetIndexImpl();
  597. }
  598. NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const final {
  599. return static_cast<const TDerived*>(this)->DoCalculate(compCtx, Node->GetValue(compCtx));
  600. }
  601. protected:
  602. TDecoratorComputationNode(IComputationNode* node, EValueRepresentation kind)
  603. : TDecoratorComputationNodeBase(node, kind)
  604. {}
  605. TDecoratorComputationNode(IComputationNode* node)
  606. : TDecoratorComputationNodeBase(node, node->GetRepresentation())
  607. {}
  608. TString DebugString() const override {
  609. return DebugStringImpl(TypeName<TDerived>());
  610. }
  611. };
  612. class TBinaryComputationNodeBase {
  613. protected:
  614. TBinaryComputationNodeBase(IComputationNode* left, IComputationNode* right, EValueRepresentation kind);
  615. ui32 GetIndexImpl() const;
  616. TString DebugStringImpl(const TString& typeName) const;
  617. IComputationNode *const Left;
  618. IComputationNode *const Right;
  619. const EValueRepresentation Kind;
  620. };
  621. template <typename TDerived>
  622. class TBinaryComputationNode: public TRefCountedComputationNode<IComputationNode>, protected TBinaryComputationNodeBase
  623. {
  624. private:
  625. NUdf::TUnboxedValue GetValue(TComputationContext& ctx) const final {
  626. return static_cast<const TDerived*>(this)->DoCalculate(ctx);
  627. }
  628. const IComputationNode* GetSource() const final {
  629. return GetCommonSource(Left, Right, this);
  630. }
  631. void InitNode(TComputationContext&) const final {}
  632. protected:
  633. TString DebugString() const override {
  634. return DebugStringImpl(TypeName<TDerived>());
  635. }
  636. IComputationNode* AddDependence(const IComputationNode* node) final {
  637. const auto l = Left->AddDependence(node);
  638. const auto r = Right->AddDependence(node);
  639. if (!l) return r;
  640. if (!r) return l;
  641. return this;
  642. }
  643. EValueRepresentation GetRepresentation() const final {
  644. return Kind;
  645. }
  646. bool IsTemporaryValue() const final { return true; }
  647. void PrepareStageOne() final {}
  648. void PrepareStageTwo() final {}
  649. void RegisterDependencies() const final {
  650. Left->AddDependence(this);
  651. Right->AddDependence(this);
  652. }
  653. void CollectDependentIndexes(const IComputationNode*, TIndexesMap&) const final {}
  654. ui32 GetDependencyWeight() const final { return 0U; }
  655. ui32 GetDependencesCount() const final {
  656. return Left->GetDependencesCount() + Right->GetDependencesCount();
  657. }
  658. ui32 GetIndex() const final {
  659. return GetIndexImpl();
  660. }
  661. TBinaryComputationNode(IComputationNode* left, IComputationNode* right, const EValueRepresentation kind)
  662. : TBinaryComputationNodeBase(left, right, kind)
  663. {
  664. }
  665. };
  666. [[noreturn]]
  667. void ThrowNotSupportedImplForClass(const TString& className, const char *func);
  668. class TComputationValueBaseNotSupportedStub: public NYql::NUdf::IBoxedValue
  669. {
  670. private:
  671. using TBase = NYql::NUdf::IBoxedValue;
  672. public:
  673. template <typename... Args>
  674. TComputationValueBaseNotSupportedStub(Args&&... args)
  675. : TBase(std::forward<Args>(args)...)
  676. {
  677. }
  678. ~TComputationValueBaseNotSupportedStub() {
  679. }
  680. private:
  681. bool HasFastListLength() const override;
  682. ui64 GetListLength() const override;
  683. ui64 GetEstimatedListLength() const override;
  684. bool HasListItems() const override;
  685. const NUdf::TOpaqueListRepresentation* GetListRepresentation() const override;
  686. NUdf::IBoxedValuePtr ReverseListImpl(const NUdf::IValueBuilder& builder) const override;
  687. NUdf::IBoxedValuePtr SkipListImpl(const NUdf::IValueBuilder& builder, ui64 count) const override;
  688. NUdf::IBoxedValuePtr TakeListImpl(const NUdf::IValueBuilder& builder, ui64 count) const override;
  689. NUdf::IBoxedValuePtr ToIndexDictImpl(const NUdf::IValueBuilder& builder) const override;
  690. ui64 GetDictLength() const override;
  691. bool HasDictItems() const override;
  692. NUdf::TStringRef GetResourceTag() const override;
  693. void* GetResource() override;
  694. void Apply(NUdf::IApplyContext& applyCtx) const override;
  695. NUdf::TUnboxedValue GetListIterator() const override ;
  696. NUdf::TUnboxedValue GetDictIterator() const override;
  697. NUdf::TUnboxedValue GetKeysIterator() const override;
  698. NUdf::TUnboxedValue GetPayloadsIterator() const override;
  699. bool Contains(const NUdf::TUnboxedValuePod& key) const override;
  700. NUdf::TUnboxedValue Lookup(const NUdf::TUnboxedValuePod& key) const override;
  701. NUdf::TUnboxedValue GetElement(ui32 index) const override ;
  702. const NUdf::TUnboxedValue* GetElements() const override;
  703. NUdf::TUnboxedValue Run(
  704. const NUdf::IValueBuilder* valueBuilder,
  705. const NUdf::TUnboxedValuePod* args) const override;
  706. bool Skip() override;
  707. bool Next(NUdf::TUnboxedValue&) override;
  708. bool NextPair(NUdf::TUnboxedValue&, NUdf::TUnboxedValue&) override;
  709. ui32 GetVariantIndex() const override;
  710. NUdf::TUnboxedValue GetVariantItem() const override;
  711. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override;
  712. ui32 GetTraverseCount() const override;
  713. NUdf::TUnboxedValue GetTraverseItem(ui32 index) const override;
  714. NUdf::TUnboxedValue Save() const override;
  715. void Load(const NUdf::TStringRef& state) override;
  716. bool Load2(const NUdf::TUnboxedValue& state) override;
  717. void Push(const NUdf::TUnboxedValuePod& value) override;
  718. bool IsSortedDict() const override;
  719. void Unused1() override;
  720. void Unused2() override;
  721. void Unused3() override;
  722. void Unused4() override;
  723. void Unused5() override;
  724. void Unused6() override;
  725. NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) override;
  726. protected:
  727. [[noreturn]] virtual void ThrowNotSupported(const char* func) const = 0;
  728. };
  729. template <typename TDerived>
  730. class TComputationValueBase: public TComputationValueBaseNotSupportedStub
  731. {
  732. private:
  733. using TBase = TComputationValueBaseNotSupportedStub;
  734. public:
  735. template <typename... Args>
  736. TComputationValueBase(Args&&... args)
  737. : TBase(std::forward<Args>(args)...)
  738. {
  739. }
  740. ~TComputationValueBase() {
  741. }
  742. TString DebugString() const {
  743. return TypeName<TDerived>();
  744. }
  745. protected:
  746. [[noreturn]] void ThrowNotSupported(const char* func) const final {
  747. ThrowNotSupportedImplForClass(TypeName(*this), func);
  748. }
  749. };
  750. template <typename TDerived, EMemorySubPool MemoryPool>
  751. class TComputationValueImpl: public TComputationValueBase<TDerived>,
  752. public TWithMiniKQLAlloc<MemoryPool> {
  753. private:
  754. using TBase = TComputationValueBase<TDerived>;
  755. protected:
  756. inline TMemoryUsageInfo* GetMemInfo() const {
  757. #ifndef NDEBUG
  758. return static_cast<TMemoryUsageInfo*>(M_.MemInfo);
  759. #else
  760. return nullptr;
  761. #endif
  762. }
  763. using TWithMiniKQLAlloc<MemoryPool>::AllocWithSize;
  764. using TWithMiniKQLAlloc<MemoryPool>::FreeWithSize;
  765. public:
  766. template <typename... Args>
  767. TComputationValueImpl(TMemoryUsageInfo* memInfo, Args&&... args)
  768. : TBase(std::forward<Args>(args)...) {
  769. #ifndef NDEBUG
  770. M_.MemInfo = memInfo;
  771. MKQL_MEM_TAKE(memInfo, this, sizeof(TDerived), __MKQL_LOCATION__);
  772. #else
  773. Y_UNUSED(memInfo);
  774. #endif
  775. }
  776. ~TComputationValueImpl() {
  777. #ifndef NDEBUG
  778. MKQL_MEM_RETURN(GetMemInfo(), this, sizeof(TDerived));
  779. #endif
  780. }
  781. private:
  782. #ifndef NDEBUG
  783. struct {
  784. void* MemInfo; // used for tracking memory usage during execution
  785. } M_;
  786. #endif
  787. };
  788. template <typename TDerived>
  789. class TTemporaryComputationValue: public TComputationValueImpl<TDerived, EMemorySubPool::Temporary> {
  790. private:
  791. using TBase = TComputationValueImpl<TDerived, EMemorySubPool::Temporary>;
  792. public:
  793. using TBase::TBase;
  794. };
  795. template <typename TDerived>
  796. class TComputationValue: public TComputationValueImpl<TDerived, EMemorySubPool::Default> {
  797. private:
  798. using TBase = TComputationValueImpl<TDerived, EMemorySubPool::Default>;
  799. public:
  800. using TBase::TBase;
  801. };
  802. template<bool IsStream>
  803. struct TThresher;
  804. template<>
  805. struct TThresher<true> {
  806. template<class Handler>
  807. static void DoForEachItem(const NUdf::TUnboxedValuePod& stream, const Handler& handler) {
  808. for (NUdf::TUnboxedValue item;; handler(std::move(item))) {
  809. const auto status = stream.Fetch(item);
  810. MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Unexpected stream status!");
  811. if (status == NUdf::EFetchStatus::Finish)
  812. break;
  813. }
  814. }
  815. };
  816. template<>
  817. struct TThresher<false> {
  818. template<class Handler>
  819. static void DoForEachItem(const NUdf::TUnboxedValuePod& list, const Handler& handler) {
  820. if (auto ptr = list.GetElements()) {
  821. if (auto size = list.GetListLength()) do {
  822. handler(NUdf::TUnboxedValue(*ptr++));
  823. } while (--size);
  824. } else if (const auto& iter = list.GetListIterator()) {
  825. for (NUdf::TUnboxedValue item; iter.Next(item); handler(std::move(item)))
  826. continue;
  827. }
  828. }
  829. };
  830. IComputationNode* LocateNode(const TNodeLocator& nodeLocator, TCallable& callable, ui32 index, bool pop = false);
  831. IComputationNode* LocateNode(const TNodeLocator& nodeLocator, TNode& node, bool pop = false);
  832. IComputationExternalNode* LocateExternalNode(const TNodeLocator& nodeLocator, TCallable& callable, ui32 index, bool pop = true);
  833. using TPasstroughtMap = std::vector<std::optional<size_t>>;
  834. template<class TContainerOne, class TContainerTwo>
  835. TPasstroughtMap GetPasstroughtMap(const TContainerOne& from, const TContainerTwo& to);
  836. template<class TContainerOne, class TContainerTwo>
  837. TPasstroughtMap GetPasstroughtMapOneToOne(const TContainerOne& from, const TContainerTwo& to);
  838. std::optional<size_t> IsPasstrought(const IComputationNode* root, const TComputationExternalNodePtrVector& args);
  839. TPasstroughtMap MergePasstroughtMaps(const TPasstroughtMap& lhs, const TPasstroughtMap& rhs);
  840. void ApplyChanges(const NUdf::TUnboxedValue& value, NUdf::IApplyContext& applyCtx);
  841. void CleanupCurrentContext();
  842. }
  843. }