worker.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. #include "worker.h"
  2. #include "compile_mkql.h"
  3. #include <yql/essentials/ast/yql_expr.h>
  4. #include <yql/essentials/core/yql_user_data.h>
  5. #include <yql/essentials/core/yql_user_data_storage.h>
  6. #include <yql/essentials/providers/common/comp_nodes/yql_factory.h>
  7. #include <yql/essentials/public/purecalc/common/names.h>
  8. #include <yql/essentials/minikql/mkql_function_registry.h>
  9. #include <yql/essentials/minikql/mkql_node.h>
  10. #include <yql/essentials/minikql/mkql_node_builder.h>
  11. #include <yql/essentials/minikql/mkql_node_cast.h>
  12. #include <yql/essentials/minikql/mkql_node_visitor.h>
  13. #include <yql/essentials/minikql/mkql_node_serialization.h>
  14. #include <yql/essentials/minikql/mkql_program_builder.h>
  15. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  16. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  17. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  18. #include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
  19. #include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
  20. #include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
  21. #include <library/cpp/random_provider/random_provider.h>
  22. #include <library/cpp/time_provider/time_provider.h>
  23. #include <util/stream/file.h>
  24. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  25. #include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h>
  26. using namespace NYql;
  27. using namespace NYql::NPureCalc;
  28. TWorkerGraph::TWorkerGraph(
  29. const TExprNode::TPtr& exprRoot,
  30. TExprContext& exprCtx,
  31. const TString& serializedProgram,
  32. const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
  33. const TUserDataTable& userData,
  34. const TVector<const TStructExprType*>& inputTypes,
  35. const TVector<const TStructExprType*>& originalInputTypes,
  36. const TVector<const TStructExprType*>& rawInputTypes,
  37. const TTypeAnnotationNode* outputType,
  38. const TTypeAnnotationNode* rawOutputType,
  39. const TString& LLVMSettings,
  40. NKikimr::NUdf::ICountersProvider* countersProvider,
  41. ui64 nativeYtTypeFlags,
  42. TMaybe<ui64> deterministicTimeProviderSeed
  43. )
  44. : ScopedAlloc_(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators())
  45. , Env_(ScopedAlloc_)
  46. , FuncRegistry_(funcRegistry)
  47. , RandomProvider_(CreateDefaultRandomProvider())
  48. , TimeProvider_(deterministicTimeProviderSeed ?
  49. CreateDeterministicTimeProvider(*deterministicTimeProviderSeed) :
  50. CreateDefaultTimeProvider())
  51. , LLVMSettings_(LLVMSettings)
  52. , NativeYtTypeFlags_(nativeYtTypeFlags)
  53. {
  54. // Build the root MKQL node
  55. NCommon::TMemoizedTypesMap typeMemoization;
  56. NKikimr::NMiniKQL::TRuntimeNode rootNode;
  57. if (exprRoot) {
  58. rootNode = CompileMkql(exprRoot, exprCtx, FuncRegistry_, Env_, userData, &typeMemoization);
  59. } else {
  60. rootNode = NKikimr::NMiniKQL::DeserializeRuntimeNode(serializedProgram, Env_);
  61. }
  62. // Prepare container for input nodes
  63. const ui32 inputsCount = inputTypes.size();
  64. YQL_ENSURE(inputTypes.size() == originalInputTypes.size());
  65. SelfNodes_.resize(inputsCount, nullptr);
  66. YQL_ENSURE(SelfNodes_.size() == inputsCount);
  67. // Setup struct types
  68. NKikimr::NMiniKQL::TProgramBuilder pgmBuilder(Env_, FuncRegistry_);
  69. for (ui32 i = 0; i < inputsCount; ++i) {
  70. const auto* type = static_cast<NKikimr::NMiniKQL::TStructType*>(NCommon::BuildType(TPositionHandle(), *inputTypes[i], pgmBuilder, typeMemoization));
  71. const auto* originalType = type;
  72. const auto* rawType = static_cast<NKikimr::NMiniKQL::TStructType*>(NCommon::BuildType(TPositionHandle(), *rawInputTypes[i], pgmBuilder, typeMemoization));
  73. if (inputTypes[i] != originalInputTypes[i]) {
  74. YQL_ENSURE(inputTypes[i]->GetSize() >= originalInputTypes[i]->GetSize());
  75. originalType = static_cast<NKikimr::NMiniKQL::TStructType*>(NCommon::BuildType(TPositionHandle(), *originalInputTypes[i], pgmBuilder, typeMemoization));
  76. }
  77. InputTypes_.push_back(type);
  78. OriginalInputTypes_.push_back(originalType);
  79. RawInputTypes_.push_back(rawType);
  80. }
  81. if (outputType) {
  82. OutputType_ = NCommon::BuildType(TPositionHandle(), *outputType, pgmBuilder, typeMemoization);
  83. }
  84. if (rawOutputType) {
  85. RawOutputType_ = NCommon::BuildType(TPositionHandle(), *rawOutputType, pgmBuilder, typeMemoization);
  86. }
  87. if (!exprRoot) {
  88. auto outMkqlType = rootNode.GetStaticType();
  89. if (outMkqlType->GetKind() == NKikimr::NMiniKQL::TType::EKind::List) {
  90. outMkqlType = static_cast<NKikimr::NMiniKQL::TListType*>(outMkqlType)->GetItemType();
  91. } else if (outMkqlType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Stream) {
  92. outMkqlType = static_cast<NKikimr::NMiniKQL::TStreamType*>(outMkqlType)->GetItemType();
  93. } else {
  94. ythrow TCompileError("", "") << "unexpected mkql output type " << NKikimr::NMiniKQL::TType::KindAsStr(outMkqlType->GetKind());
  95. }
  96. if (OutputType_) {
  97. if (!OutputType_->IsSameType(*outMkqlType)) {
  98. ythrow TCompileError("", "") << "precompiled program output type doesn't match the output schema";
  99. }
  100. } else {
  101. OutputType_ = outMkqlType;
  102. RawOutputType_ = outMkqlType;
  103. }
  104. }
  105. // Compile computation pattern
  106. const THashSet<NKikimr::NMiniKQL::TInternName> selfCallableNames = {
  107. Env_.InternName(PurecalcInputCallableName),
  108. Env_.InternName(PurecalcBlockInputCallableName)
  109. };
  110. NKikimr::NMiniKQL::TExploringNodeVisitor explorer;
  111. explorer.Walk(rootNode.GetNode(), Env_);
  112. auto compositeNodeFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory(
  113. {NKikimr::NMiniKQL::GetYqlFactory(), NYql::GetPgFactory()}
  114. );
  115. auto nodeFactory = [&](
  116. NKikimr::NMiniKQL::TCallable& callable, const NKikimr::NMiniKQL::TComputationNodeFactoryContext& ctx
  117. ) -> NKikimr::NMiniKQL::IComputationNode* {
  118. if (selfCallableNames.contains(callable.GetType()->GetNameStr())) {
  119. YQL_ENSURE(callable.GetInputsCount() == 1, "Self takes exactly 1 argument");
  120. const auto inputIndex = AS_VALUE(NKikimr::NMiniKQL::TDataLiteral, callable.GetInput(0))->AsValue().Get<ui32>();
  121. YQL_ENSURE(inputIndex < inputsCount, "Self index is out of range");
  122. YQL_ENSURE(!SelfNodes_[inputIndex], "Self can be called at most once with each index");
  123. return SelfNodes_[inputIndex] = new NKikimr::NMiniKQL::TExternalComputationNode(ctx.Mutables);
  124. }
  125. else {
  126. return compositeNodeFactory(callable, ctx);
  127. }
  128. };
  129. NKikimr::NMiniKQL::TComputationPatternOpts computationPatternOpts(
  130. ScopedAlloc_.Ref(),
  131. Env_,
  132. nodeFactory,
  133. &funcRegistry,
  134. NKikimr::NUdf::EValidateMode::None,
  135. NKikimr::NUdf::EValidatePolicy::Exception,
  136. LLVMSettings,
  137. NKikimr::NMiniKQL::EGraphPerProcess::Multi,
  138. nullptr,
  139. countersProvider);
  140. ComputationPattern_ = NKikimr::NMiniKQL::MakeComputationPattern(
  141. explorer,
  142. rootNode,
  143. { rootNode.GetNode() },
  144. computationPatternOpts);
  145. ComputationGraph_ = ComputationPattern_->Clone(
  146. computationPatternOpts.ToComputationOptions(*RandomProvider_, *TimeProvider_));
  147. ComputationGraph_->Prepare();
  148. // Scoped alloc acquires itself on construction. We need to release it before returning control to user.
  149. // Note that scoped alloc releases itself on destruction so it is no problem if the above code throws.
  150. ScopedAlloc_.Release();
  151. }
  152. TWorkerGraph::~TWorkerGraph() {
  153. // Remember, we've released scoped alloc in constructor? Now, we need to acquire it back before destroying.
  154. ScopedAlloc_.Acquire();
  155. }
  156. template <typename TBase>
  157. TWorker<TBase>::TWorker(
  158. TWorkerFactoryPtr factory,
  159. const TExprNode::TPtr& exprRoot,
  160. TExprContext& exprCtx,
  161. const TString& serializedProgram,
  162. const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
  163. const TUserDataTable& userData,
  164. const TVector<const TStructExprType*>& inputTypes,
  165. const TVector<const TStructExprType*>& originalInputTypes,
  166. const TVector<const TStructExprType*>& rawInputTypes,
  167. const TTypeAnnotationNode* outputType,
  168. const TTypeAnnotationNode* rawOutputType,
  169. const TString& LLVMSettings,
  170. NKikimr::NUdf::ICountersProvider* countersProvider,
  171. ui64 nativeYtTypeFlags,
  172. TMaybe<ui64> deterministicTimeProviderSeed
  173. )
  174. : WorkerFactory_(std::move(factory))
  175. , Graph_(exprRoot, exprCtx, serializedProgram, funcRegistry, userData,
  176. inputTypes, originalInputTypes, rawInputTypes, outputType, rawOutputType,
  177. LLVMSettings, countersProvider, nativeYtTypeFlags, deterministicTimeProviderSeed)
  178. {
  179. }
  180. template <typename TBase>
  181. inline ui32 TWorker<TBase>::GetInputsCount() const {
  182. return Graph_.InputTypes_.size();
  183. }
  184. template <typename TBase>
  185. inline const NKikimr::NMiniKQL::TStructType* TWorker<TBase>::GetInputType(ui32 inputIndex, bool original) const {
  186. const auto& container = original ? Graph_.OriginalInputTypes_ : Graph_.InputTypes_;
  187. YQL_ENSURE(inputIndex < container.size(), "invalid input index (" << inputIndex << ") in GetInputType call");
  188. return container[inputIndex];
  189. }
  190. template <typename TBase>
  191. inline const NKikimr::NMiniKQL::TStructType* TWorker<TBase>::GetInputType(bool original) const {
  192. const auto& container = original ? Graph_.OriginalInputTypes_ : Graph_.InputTypes_;
  193. YQL_ENSURE(container.size() == 1, "GetInputType() can be used only for single-input programs");
  194. return container[0];
  195. }
  196. template <typename TBase>
  197. inline const NKikimr::NMiniKQL::TStructType* TWorker<TBase>::GetRawInputType(ui32 inputIndex) const {
  198. const auto& container = Graph_.RawInputTypes_;
  199. YQL_ENSURE(inputIndex < container.size(), "invalid input index (" << inputIndex << ") in GetInputType call");
  200. return container[inputIndex];
  201. }
  202. template <typename TBase>
  203. inline const NKikimr::NMiniKQL::TStructType* TWorker<TBase>::GetRawInputType() const {
  204. const auto& container = Graph_.RawInputTypes_;
  205. YQL_ENSURE(container.size() == 1, "GetInputType() can be used only for single-input programs");
  206. return container[0];
  207. }
  208. template <typename TBase>
  209. inline const NKikimr::NMiniKQL::TType* TWorker<TBase>::GetOutputType() const {
  210. return Graph_.OutputType_;
  211. }
  212. template <typename TBase>
  213. inline const NKikimr::NMiniKQL::TType* TWorker<TBase>::GetRawOutputType() const {
  214. return Graph_.RawOutputType_;
  215. }
  216. template <typename TBase>
  217. NYT::TNode TWorker<TBase>::MakeInputSchema(ui32 inputIndex) const {
  218. auto p = WorkerFactory_.lock();
  219. YQL_ENSURE(p, "Access to destroyed worker factory");
  220. return p->MakeInputSchema(inputIndex);
  221. }
  222. template <typename TBase>
  223. NYT::TNode TWorker<TBase>::MakeInputSchema() const {
  224. auto p = WorkerFactory_.lock();
  225. YQL_ENSURE(p, "Access to destroyed worker factory");
  226. return p->MakeInputSchema();
  227. }
  228. template <typename TBase>
  229. NYT::TNode TWorker<TBase>::MakeOutputSchema() const {
  230. auto p = WorkerFactory_.lock();
  231. YQL_ENSURE(p, "Access to destroyed worker factory");
  232. return p->MakeOutputSchema();
  233. }
  234. template <typename TBase>
  235. NYT::TNode TWorker<TBase>::MakeOutputSchema(ui32) const {
  236. auto p = WorkerFactory_.lock();
  237. YQL_ENSURE(p, "Access to destroyed worker factory");
  238. return p->MakeOutputSchema();
  239. }
  240. template <typename TBase>
  241. NYT::TNode TWorker<TBase>::MakeOutputSchema(TStringBuf) const {
  242. auto p = WorkerFactory_.lock();
  243. YQL_ENSURE(p, "Access to destroyed worker factory");
  244. return p->MakeOutputSchema();
  245. }
  246. template <typename TBase>
  247. NYT::TNode TWorker<TBase>::MakeFullOutputSchema() const {
  248. auto p = WorkerFactory_.lock();
  249. YQL_ENSURE(p, "Access to destroyed worker factory");
  250. return p->MakeFullOutputSchema();
  251. }
  252. template <typename TBase>
  253. inline NKikimr::NMiniKQL::TScopedAlloc& TWorker<TBase>::GetScopedAlloc() {
  254. return Graph_.ScopedAlloc_;
  255. }
  256. template <typename TBase>
  257. inline NKikimr::NMiniKQL::IComputationGraph& TWorker<TBase>::GetGraph() {
  258. return *Graph_.ComputationGraph_;
  259. }
  260. template <typename TBase>
  261. inline const NKikimr::NMiniKQL::IFunctionRegistry&
  262. TWorker<TBase>::GetFunctionRegistry() const {
  263. return Graph_.FuncRegistry_;
  264. }
  265. template <typename TBase>
  266. inline NKikimr::NMiniKQL::TTypeEnvironment&
  267. TWorker<TBase>::GetTypeEnvironment() {
  268. return Graph_.Env_;
  269. }
  270. template <typename TBase>
  271. inline const TString& TWorker<TBase>::GetLLVMSettings() const {
  272. return Graph_.LLVMSettings_;
  273. }
  274. template <typename TBase>
  275. inline ui64 TWorker<TBase>::GetNativeYtTypeFlags() const {
  276. return Graph_.NativeYtTypeFlags_;
  277. }
  278. template <typename TBase>
  279. ITimeProvider* TWorker<TBase>::GetTimeProvider() const {
  280. return Graph_.TimeProvider_.Get();
  281. }
  282. template <typename TBase>
  283. void TWorker<TBase>::Release() {
  284. if (auto p = WorkerFactory_.lock()) {
  285. p->ReturnWorker(this);
  286. } else {
  287. delete this;
  288. }
  289. }
  290. template <typename TBase>
  291. void TWorker<TBase>::Invalidate() {
  292. auto& ctx = Graph_.ComputationGraph_->GetContext();
  293. for (const auto* selfNode : Graph_.SelfNodes_) {
  294. if (selfNode) {
  295. selfNode->InvalidateValue(ctx);
  296. }
  297. }
  298. Graph_.ComputationGraph_->InvalidateCaches();
  299. }
  300. TPullStreamWorker::~TPullStreamWorker() {
  301. auto guard = Guard(GetScopedAlloc());
  302. Output_.Clear();
  303. }
  304. void TPullStreamWorker::SetInput(NKikimr::NUdf::TUnboxedValue&& value, ui32 inputIndex) {
  305. const auto inputsCount = Graph_.SelfNodes_.size();
  306. if (Y_UNLIKELY(inputIndex >= inputsCount)) {
  307. ythrow yexception() << "invalid input index (" << inputIndex << ") in SetInput call";
  308. }
  309. if (HasInput_.size() < inputsCount) {
  310. HasInput_.resize(inputsCount, false);
  311. }
  312. if (Y_UNLIKELY(HasInput_[inputIndex])) {
  313. ythrow yexception() << "input value for #" << inputIndex << " input is already set";
  314. }
  315. auto selfNode = Graph_.SelfNodes_[inputIndex];
  316. if (selfNode) {
  317. YQL_ENSURE(value);
  318. selfNode->SetValue(Graph_.ComputationGraph_->GetContext(), std::move(value));
  319. }
  320. HasInput_[inputIndex] = true;
  321. if (CheckAllInputsSet()) {
  322. Output_ = Graph_.ComputationGraph_->GetValue();
  323. }
  324. }
  325. NKikimr::NUdf::TUnboxedValue& TPullStreamWorker::GetOutput() {
  326. if (Y_UNLIKELY(!CheckAllInputsSet())) {
  327. ythrow yexception() << "some input values have not been set";
  328. }
  329. return Output_;
  330. }
  331. void TPullStreamWorker::Release() {
  332. with_lock(GetScopedAlloc()) {
  333. Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  334. for (auto selfNode: Graph_.SelfNodes_) {
  335. if (selfNode) {
  336. selfNode->SetValue(Graph_.ComputationGraph_->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
  337. }
  338. }
  339. }
  340. HasInput_.clear();
  341. TWorker<IPullStreamWorker>::Release();
  342. }
  343. TPullListWorker::~TPullListWorker() {
  344. auto guard = Guard(GetScopedAlloc());
  345. Output_.Clear();
  346. OutputIterator_.Clear();
  347. }
  348. void TPullListWorker::SetInput(NKikimr::NUdf::TUnboxedValue&& value, ui32 inputIndex) {
  349. const auto inputsCount = Graph_.SelfNodes_.size();
  350. if (Y_UNLIKELY(inputIndex >= inputsCount)) {
  351. ythrow yexception() << "invalid input index (" << inputIndex << ") in SetInput call";
  352. }
  353. if (HasInput_.size() < inputsCount) {
  354. HasInput_.resize(inputsCount, false);
  355. }
  356. if (Y_UNLIKELY(HasInput_[inputIndex])) {
  357. ythrow yexception() << "input value for #" << inputIndex << " input is already set";
  358. }
  359. auto selfNode = Graph_.SelfNodes_[inputIndex];
  360. if (selfNode) {
  361. YQL_ENSURE(value);
  362. selfNode->SetValue(Graph_.ComputationGraph_->GetContext(), std::move(value));
  363. }
  364. HasInput_[inputIndex] = true;
  365. if (CheckAllInputsSet()) {
  366. Output_ = Graph_.ComputationGraph_->GetValue();
  367. ResetOutputIterator();
  368. }
  369. }
  370. NKikimr::NUdf::TUnboxedValue& TPullListWorker::GetOutput() {
  371. if (Y_UNLIKELY(!CheckAllInputsSet())) {
  372. ythrow yexception() << "some input values have not been set";
  373. }
  374. return Output_;
  375. }
  376. NKikimr::NUdf::TUnboxedValue& TPullListWorker::GetOutputIterator() {
  377. if (Y_UNLIKELY(!CheckAllInputsSet())) {
  378. ythrow yexception() << "some input values have not been set";
  379. }
  380. return OutputIterator_;
  381. }
  382. void TPullListWorker::ResetOutputIterator() {
  383. if (Y_UNLIKELY(!CheckAllInputsSet())) {
  384. ythrow yexception() << "some input values have not been set";
  385. }
  386. OutputIterator_ = Output_.GetListIterator();
  387. }
  388. void TPullListWorker::Release() {
  389. with_lock(GetScopedAlloc()) {
  390. Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  391. OutputIterator_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  392. for (auto selfNode: Graph_.SelfNodes_) {
  393. if (selfNode) {
  394. selfNode->SetValue(Graph_.ComputationGraph_->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
  395. }
  396. }
  397. }
  398. HasInput_.clear();
  399. TWorker<IPullListWorker>::Release();
  400. }
  401. namespace {
  402. class TPushStream final: public NKikimr::NMiniKQL::TCustomListValue {
  403. private:
  404. mutable bool HasIterator_ = false;
  405. bool HasValue_ = false;
  406. bool IsFinished_ = false;
  407. NKikimr::NUdf::TUnboxedValue Value_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  408. public:
  409. using TCustomListValue::TCustomListValue;
  410. public:
  411. void SetValue(NKikimr::NUdf::TUnboxedValue&& value) {
  412. Value_ = std::move(value);
  413. HasValue_ = true;
  414. }
  415. void SetFinished() {
  416. IsFinished_ = true;
  417. }
  418. NKikimr::NUdf::TUnboxedValue GetListIterator() const override {
  419. YQL_ENSURE(!HasIterator_, "only one pass over input is supported");
  420. HasIterator_ = true;
  421. return NKikimr::NUdf::TUnboxedValuePod(const_cast<TPushStream*>(this));
  422. }
  423. NKikimr::NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) override {
  424. if (IsFinished_) {
  425. return NKikimr::NUdf::EFetchStatus::Finish;
  426. } else if (!HasValue_) {
  427. return NKikimr::NUdf::EFetchStatus::Yield;
  428. } else {
  429. result = std::move(Value_);
  430. HasValue_ = false;
  431. return NKikimr::NUdf::EFetchStatus::Ok;
  432. }
  433. }
  434. };
  435. }
  436. void TPushStreamWorker::FeedToConsumer() {
  437. auto value = Graph_.ComputationGraph_->GetValue();
  438. for (;;) {
  439. NKikimr::NUdf::TUnboxedValue item;
  440. auto status = value.Fetch(item);
  441. if (status != NKikimr::NUdf::EFetchStatus::Ok) {
  442. break;
  443. }
  444. Consumer_->OnObject(&item);
  445. }
  446. }
  447. NYql::NUdf::IBoxedValue* TPushStreamWorker::GetPushStream() const {
  448. auto& ctx = Graph_.ComputationGraph_->GetContext();
  449. NUdf::TUnboxedValue pushStream = SelfNode_->GetValue(ctx);
  450. if (Y_UNLIKELY(pushStream.IsInvalid())) {
  451. SelfNode_->SetValue(ctx, Graph_.ComputationGraph_->GetHolderFactory().Create<TPushStream>());
  452. pushStream = SelfNode_->GetValue(ctx);
  453. }
  454. return pushStream.AsBoxed().Get();
  455. }
  456. void TPushStreamWorker::SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>> consumer) {
  457. auto guard = Guard(GetScopedAlloc());
  458. const auto inputsCount = Graph_.SelfNodes_.size();
  459. YQL_ENSURE(inputsCount < 2, "push stream mode doesn't support several inputs");
  460. YQL_ENSURE(!Consumer_, "consumer is already set");
  461. Consumer_ = std::move(consumer);
  462. if (inputsCount == 1) {
  463. SelfNode_ = Graph_.SelfNodes_[0];
  464. }
  465. if (SelfNode_) {
  466. SelfNode_->SetValue(
  467. Graph_.ComputationGraph_->GetContext(),
  468. Graph_.ComputationGraph_->GetHolderFactory().Create<TPushStream>());
  469. }
  470. FeedToConsumer();
  471. }
  472. void TPushStreamWorker::Push(NKikimr::NUdf::TUnboxedValue&& value) {
  473. YQL_ENSURE(Consumer_, "consumer is not set");
  474. YQL_ENSURE(!Finished_, "OnFinish has already been sent to the consumer; no new values can be pushed");
  475. if (Y_LIKELY(SelfNode_)) {
  476. static_cast<TPushStream*>(GetPushStream())->SetValue(std::move(value));
  477. }
  478. FeedToConsumer();
  479. }
  480. void TPushStreamWorker::OnFinish() {
  481. YQL_ENSURE(Consumer_, "consumer is not set");
  482. YQL_ENSURE(!Finished_, "already finished");
  483. if (Y_LIKELY(SelfNode_)) {
  484. static_cast<TPushStream*>(GetPushStream())->SetFinished();
  485. }
  486. FeedToConsumer();
  487. Consumer_->OnFinish();
  488. Finished_ = true;
  489. }
  490. void TPushStreamWorker::Release() {
  491. with_lock(GetScopedAlloc()) {
  492. Consumer_.Destroy();
  493. if (SelfNode_) {
  494. SelfNode_->SetValue(Graph_.ComputationGraph_->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
  495. }
  496. SelfNode_ = nullptr;
  497. }
  498. Finished_ = false;
  499. TWorker<IPushStreamWorker>::Release();
  500. }
  501. namespace NYql {
  502. namespace NPureCalc {
  503. template
  504. class TWorker<IPullStreamWorker>;
  505. template
  506. class TWorker<IPullListWorker>;
  507. template
  508. class TWorker<IPushStreamWorker>;
  509. }
  510. }