worker.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613
  1. #include "worker.h"
  2. #include "compile_mkql.h"
  3. #include <yql/essentials/ast/yql_expr.h>
  4. #include <yql/essentials/core/yql_user_data.h>
  5. #include <yql/essentials/core/yql_user_data_storage.h>
  6. #include <yql/essentials/providers/common/comp_nodes/yql_factory.h>
  7. #include <yql/essentials/public/purecalc/common/names.h>
  8. #include <yql/essentials/minikql/mkql_function_registry.h>
  9. #include <yql/essentials/minikql/mkql_node.h>
  10. #include <yql/essentials/minikql/mkql_node_builder.h>
  11. #include <yql/essentials/minikql/mkql_node_cast.h>
  12. #include <yql/essentials/minikql/mkql_node_visitor.h>
  13. #include <yql/essentials/minikql/mkql_node_serialization.h>
  14. #include <yql/essentials/minikql/mkql_program_builder.h>
  15. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  16. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  17. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  18. #include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
  19. #include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
  20. #include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
  21. #include <library/cpp/random_provider/random_provider.h>
  22. #include <library/cpp/time_provider/time_provider.h>
  23. #include <util/stream/file.h>
  24. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  25. #include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h>
  26. using namespace NYql;
  27. using namespace NYql::NPureCalc;
  28. TWorkerGraph::TWorkerGraph(
  29. const TExprNode::TPtr& exprRoot,
  30. TExprContext& exprCtx,
  31. const TString& serializedProgram,
  32. const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
  33. const TUserDataTable& userData,
  34. const TVector<const TStructExprType*>& inputTypes,
  35. const TVector<const TStructExprType*>& originalInputTypes,
  36. const TVector<const TStructExprType*>& rawInputTypes,
  37. const TTypeAnnotationNode* outputType,
  38. const TTypeAnnotationNode* rawOutputType,
  39. const TString& LLVMSettings,
  40. NKikimr::NUdf::ICountersProvider* countersProvider,
  41. ui64 nativeYtTypeFlags,
  42. TMaybe<ui64> deterministicTimeProviderSeed
  43. )
  44. : ScopedAlloc_(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators())
  45. , Env_(ScopedAlloc_)
  46. , FuncRegistry_(funcRegistry)
  47. , RandomProvider_(CreateDefaultRandomProvider())
  48. , TimeProvider_(deterministicTimeProviderSeed ?
  49. CreateDeterministicTimeProvider(*deterministicTimeProviderSeed) :
  50. CreateDefaultTimeProvider())
  51. , LLVMSettings_(LLVMSettings)
  52. , NativeYtTypeFlags_(nativeYtTypeFlags)
  53. {
  54. // Build the root MKQL node
  55. NKikimr::NMiniKQL::TRuntimeNode rootNode;
  56. if (exprRoot) {
  57. rootNode = CompileMkql(exprRoot, exprCtx, FuncRegistry_, Env_, userData);
  58. } else {
  59. rootNode = NKikimr::NMiniKQL::DeserializeRuntimeNode(serializedProgram, Env_);
  60. }
  61. // Prepare container for input nodes
  62. const ui32 inputsCount = inputTypes.size();
  63. YQL_ENSURE(inputTypes.size() == originalInputTypes.size());
  64. SelfNodes_.resize(inputsCount, nullptr);
  65. YQL_ENSURE(SelfNodes_.size() == inputsCount);
  66. // Setup struct types
  67. NKikimr::NMiniKQL::TProgramBuilder pgmBuilder(Env_, FuncRegistry_);
  68. for (ui32 i = 0; i < inputsCount; ++i) {
  69. const auto* type = static_cast<NKikimr::NMiniKQL::TStructType*>(NCommon::BuildType(TPositionHandle(), *inputTypes[i], pgmBuilder));
  70. const auto* originalType = type;
  71. const auto* rawType = static_cast<NKikimr::NMiniKQL::TStructType*>(NCommon::BuildType(TPositionHandle(), *rawInputTypes[i], pgmBuilder));
  72. if (inputTypes[i] != originalInputTypes[i]) {
  73. YQL_ENSURE(inputTypes[i]->GetSize() >= originalInputTypes[i]->GetSize());
  74. originalType = static_cast<NKikimr::NMiniKQL::TStructType*>(NCommon::BuildType(TPositionHandle(), *originalInputTypes[i], pgmBuilder));
  75. }
  76. InputTypes_.push_back(type);
  77. OriginalInputTypes_.push_back(originalType);
  78. RawInputTypes_.push_back(rawType);
  79. }
  80. if (outputType) {
  81. OutputType_ = NCommon::BuildType(TPositionHandle(), *outputType, pgmBuilder);
  82. }
  83. if (rawOutputType) {
  84. RawOutputType_ = NCommon::BuildType(TPositionHandle(), *rawOutputType, pgmBuilder);
  85. }
  86. if (!exprRoot) {
  87. auto outMkqlType = rootNode.GetStaticType();
  88. if (outMkqlType->GetKind() == NKikimr::NMiniKQL::TType::EKind::List) {
  89. outMkqlType = static_cast<NKikimr::NMiniKQL::TListType*>(outMkqlType)->GetItemType();
  90. } else if (outMkqlType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Stream) {
  91. outMkqlType = static_cast<NKikimr::NMiniKQL::TStreamType*>(outMkqlType)->GetItemType();
  92. } else {
  93. ythrow TCompileError("", "") << "unexpected mkql output type " << NKikimr::NMiniKQL::TType::KindAsStr(outMkqlType->GetKind());
  94. }
  95. if (OutputType_) {
  96. if (!OutputType_->IsSameType(*outMkqlType)) {
  97. ythrow TCompileError("", "") << "precompiled program output type doesn't match the output schema";
  98. }
  99. } else {
  100. OutputType_ = outMkqlType;
  101. RawOutputType_ = outMkqlType;
  102. }
  103. }
  104. // Compile computation pattern
  105. const THashSet<NKikimr::NMiniKQL::TInternName> selfCallableNames = {
  106. Env_.InternName(PurecalcInputCallableName),
  107. Env_.InternName(PurecalcBlockInputCallableName)
  108. };
  109. NKikimr::NMiniKQL::TExploringNodeVisitor explorer;
  110. explorer.Walk(rootNode.GetNode(), Env_);
  111. auto compositeNodeFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory(
  112. {NKikimr::NMiniKQL::GetYqlFactory(), NYql::GetPgFactory()}
  113. );
  114. auto nodeFactory = [&](
  115. NKikimr::NMiniKQL::TCallable& callable, const NKikimr::NMiniKQL::TComputationNodeFactoryContext& ctx
  116. ) -> NKikimr::NMiniKQL::IComputationNode* {
  117. if (selfCallableNames.contains(callable.GetType()->GetNameStr())) {
  118. YQL_ENSURE(callable.GetInputsCount() == 1, "Self takes exactly 1 argument");
  119. const auto inputIndex = AS_VALUE(NKikimr::NMiniKQL::TDataLiteral, callable.GetInput(0))->AsValue().Get<ui32>();
  120. YQL_ENSURE(inputIndex < inputsCount, "Self index is out of range");
  121. YQL_ENSURE(!SelfNodes_[inputIndex], "Self can be called at most once with each index");
  122. return SelfNodes_[inputIndex] = new NKikimr::NMiniKQL::TExternalComputationNode(ctx.Mutables);
  123. }
  124. else {
  125. return compositeNodeFactory(callable, ctx);
  126. }
  127. };
  128. NKikimr::NMiniKQL::TComputationPatternOpts computationPatternOpts(
  129. ScopedAlloc_.Ref(),
  130. Env_,
  131. nodeFactory,
  132. &funcRegistry,
  133. NKikimr::NUdf::EValidateMode::None,
  134. NKikimr::NUdf::EValidatePolicy::Exception,
  135. LLVMSettings,
  136. NKikimr::NMiniKQL::EGraphPerProcess::Multi,
  137. nullptr,
  138. countersProvider);
  139. ComputationPattern_ = NKikimr::NMiniKQL::MakeComputationPattern(
  140. explorer,
  141. rootNode,
  142. { rootNode.GetNode() },
  143. computationPatternOpts);
  144. ComputationGraph_ = ComputationPattern_->Clone(
  145. computationPatternOpts.ToComputationOptions(*RandomProvider_, *TimeProvider_));
  146. ComputationGraph_->Prepare();
  147. // Scoped alloc acquires itself on construction. We need to release it before returning control to user.
  148. // Note that scoped alloc releases itself on destruction so it is no problem if the above code throws.
  149. ScopedAlloc_.Release();
  150. }
  151. TWorkerGraph::~TWorkerGraph() {
  152. // Remember, we've released scoped alloc in constructor? Now, we need to acquire it back before destroying.
  153. ScopedAlloc_.Acquire();
  154. }
  155. template <typename TBase>
  156. TWorker<TBase>::TWorker(
  157. TWorkerFactoryPtr factory,
  158. const TExprNode::TPtr& exprRoot,
  159. TExprContext& exprCtx,
  160. const TString& serializedProgram,
  161. const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
  162. const TUserDataTable& userData,
  163. const TVector<const TStructExprType*>& inputTypes,
  164. const TVector<const TStructExprType*>& originalInputTypes,
  165. const TVector<const TStructExprType*>& rawInputTypes,
  166. const TTypeAnnotationNode* outputType,
  167. const TTypeAnnotationNode* rawOutputType,
  168. const TString& LLVMSettings,
  169. NKikimr::NUdf::ICountersProvider* countersProvider,
  170. ui64 nativeYtTypeFlags,
  171. TMaybe<ui64> deterministicTimeProviderSeed
  172. )
  173. : WorkerFactory_(std::move(factory))
  174. , Graph_(exprRoot, exprCtx, serializedProgram, funcRegistry, userData,
  175. inputTypes, originalInputTypes, rawInputTypes, outputType, rawOutputType,
  176. LLVMSettings, countersProvider, nativeYtTypeFlags, deterministicTimeProviderSeed)
  177. {
  178. }
  179. template <typename TBase>
  180. inline ui32 TWorker<TBase>::GetInputsCount() const {
  181. return Graph_.InputTypes_.size();
  182. }
  183. template <typename TBase>
  184. inline const NKikimr::NMiniKQL::TStructType* TWorker<TBase>::GetInputType(ui32 inputIndex, bool original) const {
  185. const auto& container = original ? Graph_.OriginalInputTypes_ : Graph_.InputTypes_;
  186. YQL_ENSURE(inputIndex < container.size(), "invalid input index (" << inputIndex << ") in GetInputType call");
  187. return container[inputIndex];
  188. }
  189. template <typename TBase>
  190. inline const NKikimr::NMiniKQL::TStructType* TWorker<TBase>::GetInputType(bool original) const {
  191. const auto& container = original ? Graph_.OriginalInputTypes_ : Graph_.InputTypes_;
  192. YQL_ENSURE(container.size() == 1, "GetInputType() can be used only for single-input programs");
  193. return container[0];
  194. }
  195. template <typename TBase>
  196. inline const NKikimr::NMiniKQL::TStructType* TWorker<TBase>::GetRawInputType(ui32 inputIndex) const {
  197. const auto& container = Graph_.RawInputTypes_;
  198. YQL_ENSURE(inputIndex < container.size(), "invalid input index (" << inputIndex << ") in GetInputType call");
  199. return container[inputIndex];
  200. }
  201. template <typename TBase>
  202. inline const NKikimr::NMiniKQL::TStructType* TWorker<TBase>::GetRawInputType() const {
  203. const auto& container = Graph_.RawInputTypes_;
  204. YQL_ENSURE(container.size() == 1, "GetInputType() can be used only for single-input programs");
  205. return container[0];
  206. }
  207. template <typename TBase>
  208. inline const NKikimr::NMiniKQL::TType* TWorker<TBase>::GetOutputType() const {
  209. return Graph_.OutputType_;
  210. }
  211. template <typename TBase>
  212. inline const NKikimr::NMiniKQL::TType* TWorker<TBase>::GetRawOutputType() const {
  213. return Graph_.RawOutputType_;
  214. }
  215. template <typename TBase>
  216. NYT::TNode TWorker<TBase>::MakeInputSchema(ui32 inputIndex) const {
  217. auto p = WorkerFactory_.lock();
  218. YQL_ENSURE(p, "Access to destroyed worker factory");
  219. return p->MakeInputSchema(inputIndex);
  220. }
  221. template <typename TBase>
  222. NYT::TNode TWorker<TBase>::MakeInputSchema() const {
  223. auto p = WorkerFactory_.lock();
  224. YQL_ENSURE(p, "Access to destroyed worker factory");
  225. return p->MakeInputSchema();
  226. }
  227. template <typename TBase>
  228. NYT::TNode TWorker<TBase>::MakeOutputSchema() const {
  229. auto p = WorkerFactory_.lock();
  230. YQL_ENSURE(p, "Access to destroyed worker factory");
  231. return p->MakeOutputSchema();
  232. }
  233. template <typename TBase>
  234. NYT::TNode TWorker<TBase>::MakeOutputSchema(ui32) const {
  235. auto p = WorkerFactory_.lock();
  236. YQL_ENSURE(p, "Access to destroyed worker factory");
  237. return p->MakeOutputSchema();
  238. }
  239. template <typename TBase>
  240. NYT::TNode TWorker<TBase>::MakeOutputSchema(TStringBuf) const {
  241. auto p = WorkerFactory_.lock();
  242. YQL_ENSURE(p, "Access to destroyed worker factory");
  243. return p->MakeOutputSchema();
  244. }
  245. template <typename TBase>
  246. NYT::TNode TWorker<TBase>::MakeFullOutputSchema() const {
  247. auto p = WorkerFactory_.lock();
  248. YQL_ENSURE(p, "Access to destroyed worker factory");
  249. return p->MakeFullOutputSchema();
  250. }
  251. template <typename TBase>
  252. inline NKikimr::NMiniKQL::TScopedAlloc& TWorker<TBase>::GetScopedAlloc() {
  253. return Graph_.ScopedAlloc_;
  254. }
  255. template <typename TBase>
  256. inline NKikimr::NMiniKQL::IComputationGraph& TWorker<TBase>::GetGraph() {
  257. return *Graph_.ComputationGraph_;
  258. }
  259. template <typename TBase>
  260. inline const NKikimr::NMiniKQL::IFunctionRegistry&
  261. TWorker<TBase>::GetFunctionRegistry() const {
  262. return Graph_.FuncRegistry_;
  263. }
  264. template <typename TBase>
  265. inline NKikimr::NMiniKQL::TTypeEnvironment&
  266. TWorker<TBase>::GetTypeEnvironment() {
  267. return Graph_.Env_;
  268. }
  269. template <typename TBase>
  270. inline const TString& TWorker<TBase>::GetLLVMSettings() const {
  271. return Graph_.LLVMSettings_;
  272. }
  273. template <typename TBase>
  274. inline ui64 TWorker<TBase>::GetNativeYtTypeFlags() const {
  275. return Graph_.NativeYtTypeFlags_;
  276. }
  277. template <typename TBase>
  278. ITimeProvider* TWorker<TBase>::GetTimeProvider() const {
  279. return Graph_.TimeProvider_.Get();
  280. }
  281. template <typename TBase>
  282. void TWorker<TBase>::Release() {
  283. if (auto p = WorkerFactory_.lock()) {
  284. p->ReturnWorker(this);
  285. } else {
  286. delete this;
  287. }
  288. }
  289. TPullStreamWorker::~TPullStreamWorker() {
  290. auto guard = Guard(GetScopedAlloc());
  291. Output_.Clear();
  292. }
  293. void TPullStreamWorker::SetInput(NKikimr::NUdf::TUnboxedValue&& value, ui32 inputIndex) {
  294. const auto inputsCount = Graph_.SelfNodes_.size();
  295. if (Y_UNLIKELY(inputIndex >= inputsCount)) {
  296. ythrow yexception() << "invalid input index (" << inputIndex << ") in SetInput call";
  297. }
  298. if (HasInput_.size() < inputsCount) {
  299. HasInput_.resize(inputsCount, false);
  300. }
  301. if (Y_UNLIKELY(HasInput_[inputIndex])) {
  302. ythrow yexception() << "input value for #" << inputIndex << " input is already set";
  303. }
  304. auto selfNode = Graph_.SelfNodes_[inputIndex];
  305. if (selfNode) {
  306. YQL_ENSURE(value);
  307. selfNode->SetValue(Graph_.ComputationGraph_->GetContext(), std::move(value));
  308. }
  309. HasInput_[inputIndex] = true;
  310. if (CheckAllInputsSet()) {
  311. Output_ = Graph_.ComputationGraph_->GetValue();
  312. }
  313. }
  314. NKikimr::NUdf::TUnboxedValue& TPullStreamWorker::GetOutput() {
  315. if (Y_UNLIKELY(!CheckAllInputsSet())) {
  316. ythrow yexception() << "some input values have not been set";
  317. }
  318. return Output_;
  319. }
  320. void TPullStreamWorker::Release() {
  321. with_lock(GetScopedAlloc()) {
  322. Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  323. for (auto selfNode: Graph_.SelfNodes_) {
  324. if (selfNode) {
  325. selfNode->SetValue(Graph_.ComputationGraph_->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
  326. }
  327. }
  328. }
  329. HasInput_.clear();
  330. TWorker<IPullStreamWorker>::Release();
  331. }
  332. TPullListWorker::~TPullListWorker() {
  333. auto guard = Guard(GetScopedAlloc());
  334. Output_.Clear();
  335. OutputIterator_.Clear();
  336. }
  337. void TPullListWorker::SetInput(NKikimr::NUdf::TUnboxedValue&& value, ui32 inputIndex) {
  338. const auto inputsCount = Graph_.SelfNodes_.size();
  339. if (Y_UNLIKELY(inputIndex >= inputsCount)) {
  340. ythrow yexception() << "invalid input index (" << inputIndex << ") in SetInput call";
  341. }
  342. if (HasInput_.size() < inputsCount) {
  343. HasInput_.resize(inputsCount, false);
  344. }
  345. if (Y_UNLIKELY(HasInput_[inputIndex])) {
  346. ythrow yexception() << "input value for #" << inputIndex << " input is already set";
  347. }
  348. auto selfNode = Graph_.SelfNodes_[inputIndex];
  349. if (selfNode) {
  350. YQL_ENSURE(value);
  351. selfNode->SetValue(Graph_.ComputationGraph_->GetContext(), std::move(value));
  352. }
  353. HasInput_[inputIndex] = true;
  354. if (CheckAllInputsSet()) {
  355. Output_ = Graph_.ComputationGraph_->GetValue();
  356. ResetOutputIterator();
  357. }
  358. }
  359. NKikimr::NUdf::TUnboxedValue& TPullListWorker::GetOutput() {
  360. if (Y_UNLIKELY(!CheckAllInputsSet())) {
  361. ythrow yexception() << "some input values have not been set";
  362. }
  363. return Output_;
  364. }
  365. NKikimr::NUdf::TUnboxedValue& TPullListWorker::GetOutputIterator() {
  366. if (Y_UNLIKELY(!CheckAllInputsSet())) {
  367. ythrow yexception() << "some input values have not been set";
  368. }
  369. return OutputIterator_;
  370. }
  371. void TPullListWorker::ResetOutputIterator() {
  372. if (Y_UNLIKELY(!CheckAllInputsSet())) {
  373. ythrow yexception() << "some input values have not been set";
  374. }
  375. OutputIterator_ = Output_.GetListIterator();
  376. }
  377. void TPullListWorker::Release() {
  378. with_lock(GetScopedAlloc()) {
  379. Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  380. OutputIterator_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  381. for (auto selfNode: Graph_.SelfNodes_) {
  382. if (selfNode) {
  383. selfNode->SetValue(Graph_.ComputationGraph_->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
  384. }
  385. }
  386. }
  387. HasInput_.clear();
  388. TWorker<IPullListWorker>::Release();
  389. }
  390. namespace {
  391. class TPushStream final: public NKikimr::NMiniKQL::TCustomListValue {
  392. private:
  393. mutable bool HasIterator_ = false;
  394. bool HasValue_ = false;
  395. bool IsFinished_ = false;
  396. NKikimr::NUdf::TUnboxedValue Value_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  397. public:
  398. using TCustomListValue::TCustomListValue;
  399. public:
  400. void SetValue(NKikimr::NUdf::TUnboxedValue&& value) {
  401. Value_ = std::move(value);
  402. HasValue_ = true;
  403. }
  404. void SetFinished() {
  405. IsFinished_ = true;
  406. }
  407. NKikimr::NUdf::TUnboxedValue GetListIterator() const override {
  408. YQL_ENSURE(!HasIterator_, "only one pass over input is supported");
  409. HasIterator_ = true;
  410. return NKikimr::NUdf::TUnboxedValuePod(const_cast<TPushStream*>(this));
  411. }
  412. NKikimr::NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) override {
  413. if (IsFinished_) {
  414. return NKikimr::NUdf::EFetchStatus::Finish;
  415. } else if (!HasValue_) {
  416. return NKikimr::NUdf::EFetchStatus::Yield;
  417. } else {
  418. result = std::move(Value_);
  419. HasValue_ = false;
  420. return NKikimr::NUdf::EFetchStatus::Ok;
  421. }
  422. }
  423. };
  424. }
  425. void TPushStreamWorker::FeedToConsumer() {
  426. auto value = Graph_.ComputationGraph_->GetValue();
  427. for (;;) {
  428. NKikimr::NUdf::TUnboxedValue item;
  429. auto status = value.Fetch(item);
  430. if (status != NKikimr::NUdf::EFetchStatus::Ok) {
  431. break;
  432. }
  433. Consumer_->OnObject(&item);
  434. }
  435. }
  436. NYql::NUdf::IBoxedValue* TPushStreamWorker::GetPushStream() const {
  437. auto& ctx = Graph_.ComputationGraph_->GetContext();
  438. NUdf::TUnboxedValue pushStream = SelfNode_->GetValue(ctx);
  439. if (Y_UNLIKELY(pushStream.IsInvalid())) {
  440. SelfNode_->SetValue(ctx, Graph_.ComputationGraph_->GetHolderFactory().Create<TPushStream>());
  441. pushStream = SelfNode_->GetValue(ctx);
  442. }
  443. return pushStream.AsBoxed().Get();
  444. }
  445. void TPushStreamWorker::SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>> consumer) {
  446. auto guard = Guard(GetScopedAlloc());
  447. const auto inputsCount = Graph_.SelfNodes_.size();
  448. YQL_ENSURE(inputsCount < 2, "push stream mode doesn't support several inputs");
  449. YQL_ENSURE(!Consumer_, "consumer is already set");
  450. Consumer_ = std::move(consumer);
  451. if (inputsCount == 1) {
  452. SelfNode_ = Graph_.SelfNodes_[0];
  453. }
  454. if (SelfNode_) {
  455. SelfNode_->SetValue(
  456. Graph_.ComputationGraph_->GetContext(),
  457. Graph_.ComputationGraph_->GetHolderFactory().Create<TPushStream>());
  458. }
  459. FeedToConsumer();
  460. }
  461. void TPushStreamWorker::Push(NKikimr::NUdf::TUnboxedValue&& value) {
  462. YQL_ENSURE(Consumer_, "consumer is not set");
  463. YQL_ENSURE(!Finished_, "OnFinish has already been sent to the consumer; no new values can be pushed");
  464. if (Y_LIKELY(SelfNode_)) {
  465. static_cast<TPushStream*>(GetPushStream())->SetValue(std::move(value));
  466. }
  467. FeedToConsumer();
  468. }
  469. void TPushStreamWorker::OnFinish() {
  470. YQL_ENSURE(Consumer_, "consumer is not set");
  471. YQL_ENSURE(!Finished_, "already finished");
  472. if (Y_LIKELY(SelfNode_)) {
  473. static_cast<TPushStream*>(GetPushStream())->SetFinished();
  474. }
  475. FeedToConsumer();
  476. Consumer_->OnFinish();
  477. Finished_ = true;
  478. }
  479. void TPushStreamWorker::Release() {
  480. with_lock(GetScopedAlloc()) {
  481. Consumer_.Destroy();
  482. if (SelfNode_) {
  483. SelfNode_->SetValue(Graph_.ComputationGraph_->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
  484. }
  485. SelfNode_ = nullptr;
  486. }
  487. Finished_ = false;
  488. TWorker<IPushStreamWorker>::Release();
  489. }
  490. namespace NYql {
  491. namespace NPureCalc {
  492. template
  493. class TWorker<IPullStreamWorker>;
  494. template
  495. class TWorker<IPullListWorker>;
  496. template
  497. class TWorker<IPushStreamWorker>;
  498. }
  499. }