spec.cpp 18 KB


  1. #include "spec.h"
  2. #include <yql/essentials/public/purecalc/common/names.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  4. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  5. #include <yql/essentials/minikql/mkql_node_cast.h>
  6. #include <yql/essentials/public/udf/arrow/udf_arrow_helpers.h>
  7. #include <yql/essentials/utils/yql_panic.h>
  8. using namespace NYql::NPureCalc;
  9. using namespace NKikimr::NUdf;
  10. using namespace NKikimr::NMiniKQL;
  11. using IArrowIStream = typename TInputSpecTraits<TArrowInputSpec>::IInputStream;
  12. using InputItemType = typename TInputSpecTraits<TArrowInputSpec>::TInputItemType;
  13. using OutputItemType = typename TOutputSpecTraits<TArrowOutputSpec>::TOutputItemType;
  14. using PullListReturnType = typename TOutputSpecTraits<TArrowOutputSpec>::TPullListReturnType;
  15. using PullStreamReturnType = typename TOutputSpecTraits<TArrowOutputSpec>::TPullStreamReturnType;
  16. using ConsumerType = typename TInputSpecTraits<TArrowInputSpec>::TConsumerType;
  17. namespace {
  18. template <typename T>
  19. inline TVector<THolder<T>> VectorFromHolder(THolder<T> holder) {
  20. TVector<THolder<T>> result;
  21. result.push_back(std::move(holder));
  22. return result;
  23. }
  24. class TArrowIStreamImpl : public IArrowIStream {
  25. private:
  26. IArrowIStream* Underlying_;
  27. // If we own Underlying_, than Owned_ == Underlying_;
  28. // otherwise Owned_ is nullptr.
  29. THolder<IArrowIStream> Owned_;
  30. TArrowIStreamImpl(IArrowIStream* underlying, THolder<IArrowIStream> owned)
  31. : Underlying_(underlying)
  32. , Owned_(std::move(owned))
  33. {
  34. }
  35. public:
  36. TArrowIStreamImpl(THolder<IArrowIStream> stream)
  37. : TArrowIStreamImpl(stream.Get(), nullptr)
  38. {
  39. Owned_ = std::move(stream);
  40. }
  41. TArrowIStreamImpl(IArrowIStream* stream)
  42. : TArrowIStreamImpl(stream, nullptr)
  43. {
  44. }
  45. InputItemType Fetch() {
  46. return Underlying_->Fetch();
  47. }
  48. };
  49. /**
  50. * Converts input Datums to unboxed values.
  51. */
  52. class TArrowInputConverter {
  53. protected:
  54. const THolderFactory& Factory_;
  55. TVector<ui32> DatumToMemberIDMap_;
  56. size_t BatchLengthID_;
  57. public:
  58. explicit TArrowInputConverter(
  59. const TArrowInputSpec& inputSpec,
  60. ui32 index,
  61. IWorker* worker
  62. )
  63. : Factory_(worker->GetGraph().GetHolderFactory())
  64. {
  65. const NYT::TNode& inputSchema = inputSpec.GetSchema(index);
  66. // Deduce the schema from the input MKQL type, if no is
  67. // provided by <inputSpec>.
  68. const NYT::TNode& schema = inputSchema.IsEntity()
  69. ? worker->MakeInputSchema(index)
  70. : inputSchema;
  71. const auto* type = worker->GetRawInputType(index);
  72. Y_ENSURE(type->IsStruct());
  73. Y_ENSURE(schema.ChildAsString(0) == "StructType");
  74. const auto& members = schema.ChildAsList(1);
  75. DatumToMemberIDMap_.resize(members.size());
  76. for (size_t i = 0; i < DatumToMemberIDMap_.size(); i++) {
  77. const auto& name = members[i].ChildAsString(0);
  78. const auto& memberIndex = type->FindMemberIndex(name);
  79. Y_ENSURE(memberIndex);
  80. DatumToMemberIDMap_[i] = *memberIndex;
  81. }
  82. const auto& batchLengthID = type->FindMemberIndex(PurecalcBlockColumnLength);
  83. Y_ENSURE(batchLengthID);
  84. BatchLengthID_ = *batchLengthID;
  85. }
  86. void DoConvert(arrow::compute::ExecBatch* batch, TUnboxedValue& result) {
  87. size_t nvalues = DatumToMemberIDMap_.size();
  88. Y_ENSURE(nvalues == static_cast<size_t>(batch->num_values()));
  89. TUnboxedValue* datums = nullptr;
  90. result = Factory_.CreateDirectArrayHolder(nvalues + 1, datums);
  91. for (size_t i = 0; i < nvalues; i++) {
  92. const ui32 id = DatumToMemberIDMap_[i];
  93. datums[id] = Factory_.CreateArrowBlock(std::move(batch->values[i]));
  94. }
  95. arrow::Datum length(std::make_shared<arrow::UInt64Scalar>(batch->length));
  96. datums[BatchLengthID_] = Factory_.CreateArrowBlock(std::move(length));
  97. }
  98. };
  99. /**
  100. * Converts unboxed values to output Datums (single-output program case).
  101. */
  102. class TArrowOutputConverter {
  103. protected:
  104. const THolderFactory& Factory_;
  105. TVector<ui32> DatumToMemberIDMap_;
  106. THolder<arrow::compute::ExecBatch> Batch_;
  107. size_t BatchLengthID_;
  108. public:
  109. explicit TArrowOutputConverter(
  110. const TArrowOutputSpec& outputSpec,
  111. IWorker* worker
  112. )
  113. : Factory_(worker->GetGraph().GetHolderFactory())
  114. {
  115. Batch_.Reset(new arrow::compute::ExecBatch);
  116. const NYT::TNode& outputSchema = outputSpec.GetSchema();
  117. // Deduce the schema from the output MKQL type, if no is
  118. // provided by <outputSpec>.
  119. const NYT::TNode& schema = outputSchema.IsEntity()
  120. ? worker->MakeOutputSchema()
  121. : outputSchema;
  122. const auto* type = worker->GetRawOutputType();
  123. Y_ENSURE(type->IsStruct());
  124. Y_ENSURE(schema.ChildAsString(0) == "StructType");
  125. const auto* stype = AS_TYPE(NKikimr::NMiniKQL::TStructType, type);
  126. const auto& members = schema.ChildAsList(1);
  127. DatumToMemberIDMap_.resize(members.size());
  128. for (size_t i = 0; i < DatumToMemberIDMap_.size(); i++) {
  129. const auto& name = members[i].ChildAsString(0);
  130. const auto& memberIndex = stype->FindMemberIndex(name);
  131. Y_ENSURE(memberIndex);
  132. DatumToMemberIDMap_[i] = *memberIndex;
  133. }
  134. const auto& batchLengthID = stype->FindMemberIndex(PurecalcBlockColumnLength);
  135. Y_ENSURE(batchLengthID);
  136. BatchLengthID_ = *batchLengthID;
  137. }
  138. OutputItemType DoConvert(TUnboxedValue value) {
  139. OutputItemType batch = Batch_.Get();
  140. size_t nvalues = DatumToMemberIDMap_.size();
  141. const auto& sizeValue = value.GetElement(BatchLengthID_);
  142. const auto& sizeDatum = TArrowBlock::From(sizeValue).GetDatum();
  143. Y_ENSURE(sizeDatum.is_scalar());
  144. const auto& sizeScalar = sizeDatum.scalar();
  145. const auto& sizeData = arrow::internal::checked_cast<const arrow::UInt64Scalar&>(*sizeScalar);
  146. const int64_t length = sizeData.value;
  147. TVector<arrow::Datum> datums(nvalues);
  148. for (size_t i = 0; i < nvalues; i++) {
  149. const ui32 id = DatumToMemberIDMap_[i];
  150. const auto& datumValue = value.GetElement(id);
  151. const auto& datum = TArrowBlock::From(datumValue).GetDatum();
  152. datums[i] = datum;
  153. if (datum.is_scalar()) {
  154. continue;
  155. }
  156. Y_ENSURE(datum.length() == length);
  157. }
  158. *batch = arrow::compute::ExecBatch(std::move(datums), length);
  159. return batch;
  160. }
  161. };
  162. /**
  163. * List (or, better, stream) of unboxed values.
  164. * Used as an input value in pull workers.
  165. */
  166. class TArrowListValue final: public TCustomListValue {
  167. private:
  168. mutable bool HasIterator_ = false;
  169. THolder<IArrowIStream> Underlying_;
  170. IWorker* Worker_;
  171. TArrowInputConverter Converter_;
  172. TScopedAlloc& ScopedAlloc_;
  173. public:
  174. TArrowListValue(
  175. TMemoryUsageInfo* memInfo,
  176. const TArrowInputSpec& inputSpec,
  177. ui32 index,
  178. THolder<IArrowIStream> underlying,
  179. IWorker* worker
  180. )
  181. : TCustomListValue(memInfo)
  182. , Underlying_(std::move(underlying))
  183. , Worker_(worker)
  184. , Converter_(inputSpec, index, Worker_)
  185. , ScopedAlloc_(Worker_->GetScopedAlloc())
  186. {
  187. }
  188. ~TArrowListValue() override {
  189. {
  190. // This list value stored in the worker's computation graph and
  191. // destroyed upon the computation graph's destruction. This brings
  192. // us to an interesting situation: scoped alloc is acquired, worker
  193. // and computation graph are half-way destroyed, and now it's our
  194. // turn to die. The problem is, the underlying stream may own
  195. // another worker. This happens when chaining programs. Now, to
  196. // destroy that worker correctly, we need to release our scoped
  197. // alloc (because that worker has its own computation graph and
  198. // scoped alloc).
  199. // By the way, note that we shouldn't interact with the worker here
  200. // because worker is in the middle of its own destruction. So we're
  201. // using our own reference to the scoped alloc. That reference is
  202. // alive because scoped alloc destroyed after computation graph.
  203. auto unguard = Unguard(ScopedAlloc_);
  204. Underlying_.Destroy();
  205. }
  206. }
  207. TUnboxedValue GetListIterator() const override {
  208. YQL_ENSURE(!HasIterator_, "Only one pass over input is supported");
  209. HasIterator_ = true;
  210. return TUnboxedValuePod(const_cast<TArrowListValue*>(this));
  211. }
  212. bool Next(TUnboxedValue& result) override {
  213. arrow::compute::ExecBatch* batch;
  214. {
  215. auto unguard = Unguard(ScopedAlloc_);
  216. batch = Underlying_->Fetch();
  217. }
  218. if (!batch) {
  219. return false;
  220. }
  221. Converter_.DoConvert(batch, result);
  222. return true;
  223. }
  224. EFetchStatus Fetch(TUnboxedValue& result) override {
  225. if (Next(result)) {
  226. return EFetchStatus::Ok;
  227. } else {
  228. return EFetchStatus::Finish;
  229. }
  230. }
  231. };
  232. /**
  233. * Arrow input stream for unboxed value lists.
  234. */
  235. class TArrowListImpl final: public IStream<OutputItemType> {
  236. protected:
  237. TWorkerHolder<IPullListWorker> WorkerHolder_;
  238. TArrowOutputConverter Converter_;
  239. public:
  240. explicit TArrowListImpl(
  241. const TArrowOutputSpec& outputSpec,
  242. TWorkerHolder<IPullListWorker> worker
  243. )
  244. : WorkerHolder_(std::move(worker))
  245. , Converter_(outputSpec, WorkerHolder_.Get())
  246. {
  247. }
  248. OutputItemType Fetch() override {
  249. TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());
  250. with_lock(WorkerHolder_->GetScopedAlloc()) {
  251. TUnboxedValue value;
  252. if (!WorkerHolder_->GetOutputIterator().Next(value)) {
  253. return TOutputSpecTraits<TArrowOutputSpec>::StreamSentinel;
  254. }
  255. return Converter_.DoConvert(value);
  256. }
  257. }
  258. };
  259. /**
  260. * Arrow input stream for unboxed value streams.
  261. */
  262. class TArrowStreamImpl final: public IStream<OutputItemType> {
  263. protected:
  264. TWorkerHolder<IPullStreamWorker> WorkerHolder_;
  265. TArrowOutputConverter Converter_;
  266. public:
  267. explicit TArrowStreamImpl(const TArrowOutputSpec& outputSpec, TWorkerHolder<IPullStreamWorker> worker)
  268. : WorkerHolder_(std::move(worker))
  269. , Converter_(outputSpec, WorkerHolder_.Get())
  270. {
  271. }
  272. OutputItemType Fetch() override {
  273. TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());
  274. with_lock(WorkerHolder_->GetScopedAlloc()) {
  275. TUnboxedValue value;
  276. auto status = WorkerHolder_->GetOutput().Fetch(value);
  277. YQL_ENSURE(status != EFetchStatus::Yield, "Yield is not supported in pull mode");
  278. if (status == EFetchStatus::Finish) {
  279. return TOutputSpecTraits<TArrowOutputSpec>::StreamSentinel;
  280. }
  281. return Converter_.DoConvert(value);
  282. }
  283. }
  284. };
  285. /**
  286. * Consumer which converts Datums to unboxed values and relays them to the
  287. * worker. Used as a return value of the push processor's Process function.
  288. */
  289. class TArrowConsumerImpl final: public IConsumer<arrow::compute::ExecBatch*> {
  290. private:
  291. TWorkerHolder<IPushStreamWorker> WorkerHolder_;
  292. TArrowInputConverter Converter_;
  293. public:
  294. explicit TArrowConsumerImpl(
  295. const TArrowInputSpec& inputSpec,
  296. TWorkerHolder<IPushStreamWorker> worker
  297. )
  298. : TArrowConsumerImpl(inputSpec, 0, std::move(worker))
  299. {
  300. }
  301. explicit TArrowConsumerImpl(
  302. const TArrowInputSpec& inputSpec,
  303. ui32 index,
  304. TWorkerHolder<IPushStreamWorker> worker
  305. )
  306. : WorkerHolder_(std::move(worker))
  307. , Converter_(inputSpec, index, WorkerHolder_.Get())
  308. {
  309. }
  310. void OnObject(arrow::compute::ExecBatch* batch) override {
  311. TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());
  312. with_lock(WorkerHolder_->GetScopedAlloc()) {
  313. TUnboxedValue result;
  314. Converter_.DoConvert(batch, result);
  315. WorkerHolder_->Push(std::move(result));
  316. }
  317. }
  318. void OnFinish() override {
  319. TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());
  320. with_lock(WorkerHolder_->GetScopedAlloc()) {
  321. WorkerHolder_->OnFinish();
  322. }
  323. }
  324. };
  325. /**
  326. * Push relay used to convert generated unboxed value to a Datum and push it to
  327. * the user's consumer.
  328. */
  329. class TArrowPushRelayImpl: public IConsumer<const TUnboxedValue*> {
  330. private:
  331. THolder<IConsumer<OutputItemType>> Underlying_;
  332. IWorker* Worker_;
  333. TArrowOutputConverter Converter_;
  334. public:
  335. TArrowPushRelayImpl(
  336. const TArrowOutputSpec& outputSpec,
  337. IPushStreamWorker* worker,
  338. THolder<IConsumer<OutputItemType>> underlying
  339. )
  340. : Underlying_(std::move(underlying))
  341. , Worker_(worker)
  342. , Converter_(outputSpec, Worker_)
  343. {
  344. }
  345. // XXX: If you've read a comment in the TArrowListValue's destructor, you
  346. // may be wondering why don't we do the same trick here. Well, that's
  347. // because in push mode, consumer is destroyed before acquiring scoped alloc
  348. // and destroying computation graph.
  349. void OnObject(const TUnboxedValue* value) override {
  350. OutputItemType message = Converter_.DoConvert(*value);
  351. auto unguard = Unguard(Worker_->GetScopedAlloc());
  352. Underlying_->OnObject(message);
  353. }
  354. void OnFinish() override {
  355. auto unguard = Unguard(Worker_->GetScopedAlloc());
  356. Underlying_->OnFinish();
  357. }
  358. };
  359. template <typename TWorker>
  360. void PrepareWorkerImpl(const TArrowInputSpec& inputSpec, TWorker* worker,
  361. TVector<THolder<TArrowIStreamImpl>>&& streams
  362. ) {
  363. YQL_ENSURE(worker->GetInputsCount() == streams.size(),
  364. "number of input streams should match number of inputs provided by spec");
  365. with_lock(worker->GetScopedAlloc()) {
  366. auto& holderFactory = worker->GetGraph().GetHolderFactory();
  367. for (ui32 i = 0; i < streams.size(); i++) {
  368. auto input = holderFactory.template Create<TArrowListValue>(
  369. inputSpec, i, std::move(streams[i]), worker);
  370. worker->SetInput(std::move(input), i);
  371. }
  372. }
  373. }
  374. } // namespace
  375. TArrowInputSpec::TArrowInputSpec(const TVector<NYT::TNode>& schemas)
  376. : Schemas_(schemas)
  377. {
  378. }
  379. const TVector<NYT::TNode>& TArrowInputSpec::GetSchemas() const {
  380. return Schemas_;
  381. }
  382. const NYT::TNode& TArrowInputSpec::GetSchema(ui32 index) const {
  383. return Schemas_[index];
  384. }
  385. void TInputSpecTraits<TArrowInputSpec>::PreparePullListWorker(
  386. const TArrowInputSpec& inputSpec, IPullListWorker* worker,
  387. IArrowIStream* stream
  388. ) {
  389. TInputSpecTraits<TArrowInputSpec>::PreparePullListWorker(
  390. inputSpec, worker, TVector<IArrowIStream*>({stream}));
  391. }
  392. void TInputSpecTraits<TArrowInputSpec>::PreparePullListWorker(
  393. const TArrowInputSpec& inputSpec, IPullListWorker* worker,
  394. const TVector<IArrowIStream*>& streams
  395. ) {
  396. TVector<THolder<TArrowIStreamImpl>> wrappers;
  397. for (ui32 i = 0; i < streams.size(); i++) {
  398. wrappers.push_back(MakeHolder<TArrowIStreamImpl>(streams[i]));
  399. }
  400. PrepareWorkerImpl(inputSpec, worker, std::move(wrappers));
  401. }
  402. void TInputSpecTraits<TArrowInputSpec>::PreparePullListWorker(
  403. const TArrowInputSpec& inputSpec, IPullListWorker* worker,
  404. THolder<IArrowIStream> stream
  405. ) {
  406. TInputSpecTraits<TArrowInputSpec>::PreparePullListWorker(inputSpec, worker,
  407. VectorFromHolder<IArrowIStream>(std::move(stream)));
  408. }
  409. void TInputSpecTraits<TArrowInputSpec>::PreparePullListWorker(
  410. const TArrowInputSpec& inputSpec, IPullListWorker* worker,
  411. TVector<THolder<IArrowIStream>>&& streams
  412. ) {
  413. TVector<THolder<TArrowIStreamImpl>> wrappers;
  414. for (ui32 i = 0; i < streams.size(); i++) {
  415. wrappers.push_back(MakeHolder<TArrowIStreamImpl>(std::move(streams[i])));
  416. }
  417. PrepareWorkerImpl(inputSpec, worker, std::move(wrappers));
  418. }
  419. void TInputSpecTraits<TArrowInputSpec>::PreparePullStreamWorker(
  420. const TArrowInputSpec& inputSpec, IPullStreamWorker* worker,
  421. IArrowIStream* stream
  422. ) {
  423. TInputSpecTraits<TArrowInputSpec>::PreparePullStreamWorker(
  424. inputSpec, worker, TVector<IArrowIStream*>({stream}));
  425. }
  426. void TInputSpecTraits<TArrowInputSpec>::PreparePullStreamWorker(
  427. const TArrowInputSpec& inputSpec, IPullStreamWorker* worker,
  428. const TVector<IArrowIStream*>& streams
  429. ) {
  430. TVector<THolder<TArrowIStreamImpl>> wrappers;
  431. for (ui32 i = 0; i < streams.size(); i++) {
  432. wrappers.push_back(MakeHolder<TArrowIStreamImpl>(streams[i]));
  433. }
  434. PrepareWorkerImpl(inputSpec, worker, std::move(wrappers));
  435. }
  436. void TInputSpecTraits<TArrowInputSpec>::PreparePullStreamWorker(
  437. const TArrowInputSpec& inputSpec, IPullStreamWorker* worker,
  438. THolder<IArrowIStream> stream
  439. ) {
  440. TInputSpecTraits<TArrowInputSpec>::PreparePullStreamWorker(
  441. inputSpec, worker, VectorFromHolder<IArrowIStream>(std::move(stream)));
  442. }
  443. void TInputSpecTraits<TArrowInputSpec>::PreparePullStreamWorker(
  444. const TArrowInputSpec& inputSpec, IPullStreamWorker* worker,
  445. TVector<THolder<IArrowIStream>>&& streams
  446. ) {
  447. TVector<THolder<TArrowIStreamImpl>> wrappers;
  448. for (ui32 i = 0; i < streams.size(); i++) {
  449. wrappers.push_back(MakeHolder<TArrowIStreamImpl>(std::move(streams[i])));
  450. }
  451. PrepareWorkerImpl(inputSpec, worker, std::move(wrappers));
  452. }
  453. ConsumerType TInputSpecTraits<TArrowInputSpec>::MakeConsumer(
  454. const TArrowInputSpec& inputSpec, TWorkerHolder<IPushStreamWorker> worker
  455. ) {
  456. return MakeHolder<TArrowConsumerImpl>(inputSpec, std::move(worker));
  457. }
  458. TArrowOutputSpec::TArrowOutputSpec(const NYT::TNode& schema)
  459. : Schema_(schema)
  460. {
  461. }
  462. const NYT::TNode& TArrowOutputSpec::GetSchema() const {
  463. return Schema_;
  464. }
  465. PullListReturnType TOutputSpecTraits<TArrowOutputSpec>::ConvertPullListWorkerToOutputType(
  466. const TArrowOutputSpec& outputSpec, TWorkerHolder<IPullListWorker> worker
  467. ) {
  468. return MakeHolder<TArrowListImpl>(outputSpec, std::move(worker));
  469. }
  470. PullStreamReturnType TOutputSpecTraits<TArrowOutputSpec>::ConvertPullStreamWorkerToOutputType(
  471. const TArrowOutputSpec& outputSpec, TWorkerHolder<IPullStreamWorker> worker
  472. ) {
  473. return MakeHolder<TArrowStreamImpl>(outputSpec, std::move(worker));
  474. }
  475. void TOutputSpecTraits<TArrowOutputSpec>::SetConsumerToWorker(
  476. const TArrowOutputSpec& outputSpec, IPushStreamWorker* worker,
  477. THolder<IConsumer<TOutputItemType>> consumer
  478. ) {
  479. worker->SetConsumer(MakeHolder<TArrowPushRelayImpl>(outputSpec, worker, std::move(consumer)));
  480. }