// NOTE(review): in this copy of the file the operands of the six #include
// directives below and every template argument list (see e.g. `TVector>`,
// the bare `template` keywords, `static_cast(...)`, `MakeHolder(...)`) appear
// to have been lost. The code tokens are reproduced unchanged; restore the
// missing text from version control before building.
#include "spec.h"
#include
#include
#include
#include
#include
#include

using namespace NYql::NPureCalc;
using namespace NKikimr::NUdf;
using namespace NKikimr::NMiniKQL;

// Shorthand aliases for the stream, item, return and consumer types published
// by the input/output spec traits.
using IArrowIStream = typename TInputSpecTraits::IInputStream;
using InputItemType = typename TInputSpecTraits::TInputItemType;
using OutputItemType = typename TOutputSpecTraits::TOutputItemType;
using PullListReturnType = typename TOutputSpecTraits::TPullListReturnType;
using PullStreamReturnType = typename TOutputSpecTraits::TPullStreamReturnType;
using ConsumerType = typename TInputSpecTraits::TConsumerType;

namespace {

/**
 * Moves a single holder into a freshly created one-element vector.
 */
template
inline TVector> VectorFromHolder(THolder holder) {
    TVector> result;
    result.push_back(std::move(holder));
    return result;
}

/**
 * Wrapper around an IArrowIStream which either owns the wrapped stream or
 * merely refers to it. Fetch() is forwarded to the wrapped stream.
 */
class TArrowIStreamImpl : public IArrowIStream {
private:
    // Stream that Fetch() is forwarded to.
    IArrowIStream* Underlying_;
    // If we own Underlying_, then Owned_ == Underlying_;
    // otherwise Owned_ is nullptr.
    THolder Owned_;

    // Common constructor the public ones delegate to.
    TArrowIStreamImpl(IArrowIStream* underlying, THolder owned)
        : Underlying_(underlying)
        , Owned_(std::move(owned))
    {
    }

public:
    // Owning variant. The raw pointer is read through the delegating
    // initializer first; only afterwards is the holder moved into Owned_.
    TArrowIStreamImpl(THolder stream)
        : TArrowIStreamImpl(stream.Get(), nullptr)
    {
        Owned_ = std::move(stream);
    }

    // Non-owning variant: Owned_ stays null.
    TArrowIStreamImpl(IArrowIStream* stream)
        : TArrowIStreamImpl(stream, nullptr)
    {
    }

    // Returns whatever the wrapped stream's Fetch() returns.
    InputItemType Fetch() {
        return Underlying_->Fetch();
    }
};

/**
 * Converts input Datums to unboxed values.
 */
class TArrowInputConverter {
protected:
    // Holder factory of the worker's graph; used to create the result holders.
    const THolderFactory& Factory_;
    // DatumToMemberIDMap_[i] is the member index (in the raw input type) of
    // the i-th member listed in the schema.
    TVector DatumToMemberIDMap_;
    // Member index of the PurecalcBlockColumnLength member.
    size_t BatchLengthID_;

public:
    /**
     * Builds the schema-position -> member-index mapping for input `index`.
     *
     * Checked with Y_ENSURE: the raw input type is a struct, the schema's
     * first child is "StructType", and every schema member as well as the
     * PurecalcBlockColumnLength member is found in the type.
     */
    explicit TArrowInputConverter(
        const TArrowInputSpec& inputSpec,
        ui32 index,
        IWorker* worker
    )
        : Factory_(worker->GetGraph().GetHolderFactory())
    {
        const NYT::TNode& inputSchema = inputSpec.GetSchema(index);
        // Deduce the schema from the input MKQL type if the spec does not
        // provide one (i.e. its schema node is an entity).
        const NYT::TNode& schema = inputSchema.IsEntity()
            ? worker->MakeInputSchema(index)
            : inputSchema;

        const auto* type = worker->GetRawInputType(index);

        Y_ENSURE(type->IsStruct());
        Y_ENSURE(schema.ChildAsString(0) == "StructType");

        // Child 1 of the schema node is the member list; child 0 of each
        // member is its name.
        const auto& members = schema.ChildAsList(1);
        DatumToMemberIDMap_.resize(members.size());
        for (size_t i = 0; i < DatumToMemberIDMap_.size(); i++) {
            const auto& name = members[i].ChildAsString(0);
            const auto& memberIndex = type->FindMemberIndex(name);
            Y_ENSURE(memberIndex);
            DatumToMemberIDMap_[i] = *memberIndex;
        }
        const auto& batchLengthID = type->FindMemberIndex(PurecalcBlockColumnLength);
        Y_ENSURE(batchLengthID);
        BatchLengthID_ = *batchLengthID;
    }

    /**
     * Fills `result` with a direct array holder of (number of schema members
     * + 1) slots: batch value i goes to slot DatumToMemberIDMap_[i] and the
     * batch length goes to slot BatchLengthID_, each wrapped as an arrow
     * block.
     *
     * The values of `batch` are moved from. The number of values in `batch`
     * must equal the number of schema members (Y_ENSURE).
     */
    void DoConvert(arrow::compute::ExecBatch* batch, TUnboxedValue& result) {
        size_t nvalues = DatumToMemberIDMap_.size();
        Y_ENSURE(nvalues == static_cast(batch->num_values()));

        TUnboxedValue* datums = nullptr;
        // One extra slot for the batch length.
        result = Factory_.CreateDirectArrayHolder(nvalues + 1, datums);
        for (size_t i = 0; i < nvalues; i++) {
            const ui32 id = DatumToMemberIDMap_[i];
            datums[id] = Factory_.CreateArrowBlock(std::move(batch->values[i]));
        }

        arrow::Datum length(std::make_shared(batch->length));
        datums[BatchLengthID_] = Factory_.CreateArrowBlock(std::move(length));
    }
};

/**
 * Converts unboxed values to output Datums (single-output program case).
 */
class TArrowOutputConverter {
protected:
    // Holder factory of the worker's graph.
    const THolderFactory& Factory_;
    // DatumToMemberIDMap_[i] is the member index (in the raw output type) of
    // the i-th member listed in the schema.
    TVector DatumToMemberIDMap_;
    // Batch object owned by the converter; DoConvert overwrites it and hands
    // out a pointer to it.
    THolder Batch_;
    // Member index of the PurecalcBlockColumnLength member.
    size_t BatchLengthID_;

public:
    /**
     * Allocates the reusable batch and builds the schema-position ->
     * member-index mapping for the worker's output.
     *
     * Checked with Y_ENSURE: the raw output type is a struct, the schema's
     * first child is "StructType", and every schema member as well as the
     * PurecalcBlockColumnLength member is found in the type.
     */
    explicit TArrowOutputConverter(
        const TArrowOutputSpec& outputSpec,
        IWorker* worker
    )
        : Factory_(worker->GetGraph().GetHolderFactory())
    {
        Batch_.Reset(new arrow::compute::ExecBatch);

        const NYT::TNode& outputSchema = outputSpec.GetSchema();
        // Deduce the schema from the output MKQL type if the spec does not
        // provide one (i.e. its schema node is an entity).
        const NYT::TNode& schema = outputSchema.IsEntity()
            ? worker->MakeOutputSchema()
            : outputSchema;

        const auto* type = worker->GetRawOutputType();

        Y_ENSURE(type->IsStruct());
        Y_ENSURE(schema.ChildAsString(0) == "StructType");

        const auto* stype = AS_TYPE(NKikimr::NMiniKQL::TStructType, type);

        // Child 1 of the schema node is the member list; child 0 of each
        // member is its name.
        const auto& members = schema.ChildAsList(1);
        DatumToMemberIDMap_.resize(members.size());
        for (size_t i = 0; i < DatumToMemberIDMap_.size(); i++) {
            const auto& name = members[i].ChildAsString(0);
            const auto& memberIndex = stype->FindMemberIndex(name);
            Y_ENSURE(memberIndex);
            DatumToMemberIDMap_[i] = *memberIndex;
        }
        const auto& batchLengthID = stype->FindMemberIndex(PurecalcBlockColumnLength);
        Y_ENSURE(batchLengthID);
        BatchLengthID_ = *batchLengthID;
    }

    /**
     * Rebuilds the converter's batch from the elements of `value` and returns
     * a pointer to it.
     *
     * The returned pointer refers to the converter-owned Batch_, so its
     * contents are replaced by the next DoConvert call.
     *
     * Checked with Y_ENSURE: the length element is a scalar datum, and every
     * non-scalar datum has exactly that length.
     */
    OutputItemType DoConvert(TUnboxedValue value) {
        OutputItemType batch = Batch_.Get();

        size_t nvalues = DatumToMemberIDMap_.size();

        // Read the batch length from the dedicated length element.
        const auto& sizeValue = value.GetElement(BatchLengthID_);
        const auto& sizeDatum = TArrowBlock::From(sizeValue).GetDatum();
        Y_ENSURE(sizeDatum.is_scalar());
        const auto& sizeScalar = sizeDatum.scalar();
        const auto& sizeData = arrow::internal::checked_cast(*sizeScalar);
        const int64_t length = sizeData.value;

        // Collect the datums in schema order.
        TVector datums(nvalues);
        for (size_t i = 0; i < nvalues; i++) {
            const ui32 id = DatumToMemberIDMap_[i];
            const auto& datumValue = value.GetElement(id);
            const auto& datum = TArrowBlock::From(datumValue).GetDatum();
            datums[i] = datum;
            // Scalars are exempt from the length check below.
            if (datum.is_scalar()) {
                continue;
            }
            Y_ENSURE(datum.length() == length);
        }

        *batch = arrow::compute::ExecBatch(std::move(datums), length);
        return batch;
    }
};

/**
 * List (or, better, stream) of unboxed values.
 * Used as an input value in pull workers.
 */
class TArrowListValue final: public TCustomListValue {
private:
    // Set by the first GetListIterator() call; a second call fails.
    mutable bool HasIterator_ = false;
    // Stream the batches are fetched from.
    THolder Underlying_;
    IWorker* Worker_;
    TArrowInputConverter Converter_;
    // Own reference to the worker's scoped alloc; see the destructor.
    TScopedAlloc& ScopedAlloc_;

public:
    TArrowListValue(
        TMemoryUsageInfo* memInfo,
        const TArrowInputSpec& inputSpec,
        ui32 index,
        THolder underlying,
        IWorker* worker
    )
        : TCustomListValue(memInfo)
        , Underlying_(std::move(underlying))
        , Worker_(worker)
        , Converter_(inputSpec, index, Worker_)
        , ScopedAlloc_(Worker_->GetScopedAlloc())
    {
    }

    ~TArrowListValue() override {
        {
            // This list value is stored in the worker's computation graph and
            // destroyed upon the computation graph's destruction. This brings
            // us to an interesting situation: scoped alloc is acquired, worker
            // and computation graph are half-way destroyed, and now it's our
            // turn to die. The problem is, the underlying stream may own
            // another worker. This happens when chaining programs. Now, to
            // destroy that worker correctly, we need to release our scoped
            // alloc (because that worker has its own computation graph and
            // scoped alloc).
            // By the way, note that we shouldn't interact with the worker here
            // because worker is in the middle of its own destruction. So we're
            // using our own reference to the scoped alloc. That reference is
            // alive because scoped alloc is destroyed after computation graph.
            auto unguard = Unguard(ScopedAlloc_);
            Underlying_.Destroy();
        }
    }

    // Returns a value wrapping this very object; only one call is allowed
    // (enforced with YQL_ENSURE).
    TUnboxedValue GetListIterator() const override {
        YQL_ENSURE(!HasIterator_, "Only one pass over input is supported");
        HasIterator_ = true;
        return TUnboxedValuePod(const_cast(this));
    }

    // Fetches the next batch and converts it into `result`.
    // Returns false when the underlying stream returns a null batch.
    bool Next(TUnboxedValue& result) override {
        arrow::compute::ExecBatch* batch;
        {
            // The underlying stream is called with the scoped alloc unguarded.
            auto unguard = Unguard(ScopedAlloc_);
            batch = Underlying_->Fetch();
        }

        if (!batch) {
            return false;
        }

        Converter_.DoConvert(batch, result);
        return true;
    }

    // Stream-style variant of Next(): Ok when a value was produced,
    // Finish otherwise.
    EFetchStatus Fetch(TUnboxedValue& result) override {
        if (Next(result)) {
            return EFetchStatus::Ok;
        } else {
            return EFetchStatus::Finish;
        }
    }
};

/**
 * Arrow input stream for unboxed value lists.
 *
 * Each Fetch() pulls the next value from the worker's output iterator and
 * converts it with TArrowOutputConverter.
 */
class TArrowListImpl final: public IStream {
protected:
    TWorkerHolder WorkerHolder_;
    TArrowOutputConverter Converter_;

public:
    explicit TArrowListImpl(
        const TArrowOutputSpec& outputSpec,
        TWorkerHolder worker
    )
        : WorkerHolder_(std::move(worker))
        , Converter_(outputSpec, WorkerHolder_.Get())
    {
    }

    // Returns the converted next item, or the traits' StreamSentinel when the
    // output iterator has no more values.
    OutputItemType Fetch() override {
        TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());

        with_lock(WorkerHolder_->GetScopedAlloc()) {
            TUnboxedValue value;

            if (!WorkerHolder_->GetOutputIterator().Next(value)) {
                return TOutputSpecTraits::StreamSentinel;
            }

            return Converter_.DoConvert(value);
        }
    }
};

/**
 * Arrow input stream for unboxed value streams.
 *
 * Each Fetch() fetches the next value from the worker's output and converts it
 * with TArrowOutputConverter.
 */
class TArrowStreamImpl final: public IStream {
protected:
    TWorkerHolder WorkerHolder_;
    TArrowOutputConverter Converter_;

public:
    explicit TArrowStreamImpl(const TArrowOutputSpec& outputSpec, TWorkerHolder worker)
        : WorkerHolder_(std::move(worker))
        , Converter_(outputSpec, WorkerHolder_.Get())
    {
    }

    // Returns the converted next item, or the traits' StreamSentinel on
    // Finish. A Yield status is rejected with YQL_ENSURE.
    OutputItemType Fetch() override {
        TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());

        with_lock(WorkerHolder_->GetScopedAlloc()) {
            TUnboxedValue value;

            auto status = WorkerHolder_->GetOutput().Fetch(value);
            YQL_ENSURE(status != EFetchStatus::Yield, "Yield is not supported in pull mode");

            if (status == EFetchStatus::Finish) {
                return TOutputSpecTraits::StreamSentinel;
            }

            return Converter_.DoConvert(value);
        }
    }
};

/**
 * Consumer which converts Datums to unboxed values and relays them to the
 * worker. Used as a return value of the push processor's Process function.
 */
class TArrowConsumerImpl final: public IConsumer {
private:
    TWorkerHolder WorkerHolder_;
    TArrowInputConverter Converter_;

public:
    // Convenience overload: delegates with input index 0.
    explicit TArrowConsumerImpl(
        const TArrowInputSpec& inputSpec,
        TWorkerHolder worker
    )
        : TArrowConsumerImpl(inputSpec, 0, std::move(worker))
    {
    }

    explicit TArrowConsumerImpl(
        const TArrowInputSpec& inputSpec,
        ui32 index,
        TWorkerHolder worker
    )
        : WorkerHolder_(std::move(worker))
        , Converter_(inputSpec, index, WorkerHolder_.Get())
    {
    }

    // Converts `batch` to an unboxed value and pushes it to the worker.
    // The values of `batch` are moved from by the converter.
    void OnObject(arrow::compute::ExecBatch* batch) override {
        TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());

        with_lock(WorkerHolder_->GetScopedAlloc()) {
            TUnboxedValue result;
            Converter_.DoConvert(batch, result);
            WorkerHolder_->Push(std::move(result));
        }
    }

    // Forwards end-of-input to the worker.
    void OnFinish() override {
        TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());

        with_lock(WorkerHolder_->GetScopedAlloc()) {
            WorkerHolder_->OnFinish();
        }
    }
};

/**
 * Push relay used to convert generated unboxed value to a Datum and push it to
 * the user's consumer.
 */
class TArrowPushRelayImpl: public IConsumer {
private:
    // The user's consumer that converted items are relayed to.
    THolder> Underlying_;
    IWorker* Worker_;
    TArrowOutputConverter Converter_;

public:
    TArrowPushRelayImpl(
        const TArrowOutputSpec& outputSpec,
        IPushStreamWorker* worker,
        THolder> underlying
    )
        : Underlying_(std::move(underlying))
        , Worker_(worker)
        , Converter_(outputSpec, Worker_)
    {
    }

    // XXX: If you've read a comment in the TArrowListValue's destructor, you
    // may be wondering why don't we do the same trick here. Well, that's
    // because in push mode, consumer is destroyed before acquiring scoped alloc
    // and destroying computation graph.

    // The value is converted before the scoped alloc is unguarded; the user's
    // consumer is called afterwards.
    void OnObject(const TUnboxedValue* value) override {
        OutputItemType message = Converter_.DoConvert(*value);
        auto unguard = Unguard(Worker_->GetScopedAlloc());
        Underlying_->OnObject(message);
    }

    // Relays end-of-output to the user's consumer with the scoped alloc
    // unguarded.
    void OnFinish() override {
        auto unguard = Unguard(Worker_->GetScopedAlloc());
        Underlying_->OnFinish();
    }
};

/**
 * Installs one input per stream on `worker`.
 *
 * Checks (YQL_ENSURE) that the worker's input count equals streams.size();
 * then, inside with_lock on the worker's scoped alloc, creates an object via
 * the graph's holder factory for every stream and sets it as input i.
 * Each element of `streams` is moved from.
 */
template
void PrepareWorkerImpl(const TArrowInputSpec& inputSpec, TWorker* worker,
    TVector>&& streams
) {
    YQL_ENSURE(worker->GetInputsCount() == streams.size(),
        "number of input streams should match number of inputs provided by spec");

    with_lock(worker->GetScopedAlloc()) {
        auto& holderFactory = worker->GetGraph().GetHolderFactory();
        for (ui32 i = 0; i < streams.size(); i++) {
            auto input = holderFactory.template Create(
                inputSpec, i, std::move(streams[i]), worker);
            worker->SetInput(std::move(input), i);
        }
    }
}

} // namespace

// Stores a copy of the per-input schemas.
TArrowInputSpec::TArrowInputSpec(const TVector& schemas)
    : Schemas_(schemas)
{
}

// Returns all input schemas.
const TVector& TArrowInputSpec::GetSchemas() const {
    return Schemas_;
}

// Returns the schema of input `index`; no explicit range check is done here.
const NYT::TNode& TArrowInputSpec::GetSchema(ui32 index) const {
    return Schemas_[index];
}

// Single raw-pointer stream: forwards to the vector-of-pointers overload.
void TInputSpecTraits::PreparePullListWorker(
    const TArrowInputSpec& inputSpec, IPullListWorker* worker,
    IArrowIStream* stream
) {
    TInputSpecTraits::PreparePullListWorker(
        inputSpec, worker, TVector({stream}));
}

// Raw-pointer streams: each pointer is wrapped with MakeHolder and the
// wrappers are handed to PrepareWorkerImpl.
void TInputSpecTraits::PreparePullListWorker(
    const TArrowInputSpec& inputSpec, IPullListWorker* worker,
    const TVector& streams
) {
    TVector> wrappers;
    for (ui32 i = 0; i < streams.size(); i++) {
        wrappers.push_back(MakeHolder(streams[i]));
    }
    PrepareWorkerImpl(inputSpec, worker, std::move(wrappers));
}

// Single holder: forwards to the vector-of-holders overload.
void TInputSpecTraits::PreparePullListWorker(
    const TArrowInputSpec& inputSpec, IPullListWorker* worker,
    THolder stream
) {
    TInputSpecTraits::PreparePullListWorker(inputSpec, worker,
        VectorFromHolder(std::move(stream)));
}

// Holders: each holder is moved into a MakeHolder wrapper and the wrappers are
// handed to PrepareWorkerImpl.
void TInputSpecTraits::PreparePullListWorker(
    const TArrowInputSpec& inputSpec, IPullListWorker* worker,
    TVector>&& streams
) {
    TVector> wrappers;
    for (ui32 i = 0; i < streams.size(); i++) {
        wrappers.push_back(MakeHolder(std::move(streams[i])));
    }
    PrepareWorkerImpl(inputSpec, worker, std::move(wrappers));
}

// Single raw-pointer stream: forwards to the vector-of-pointers overload.
void TInputSpecTraits::PreparePullStreamWorker(
    const TArrowInputSpec& inputSpec, IPullStreamWorker* worker,
    IArrowIStream* stream
) {
    TInputSpecTraits::PreparePullStreamWorker(
        inputSpec, worker, TVector({stream}));
}

// Raw-pointer streams: each pointer is wrapped with MakeHolder and the
// wrappers are handed to PrepareWorkerImpl.
void TInputSpecTraits::PreparePullStreamWorker(
    const TArrowInputSpec& inputSpec, IPullStreamWorker* worker,
    const TVector& streams
) {
    TVector> wrappers;
    for (ui32 i = 0; i < streams.size(); i++) {
        wrappers.push_back(MakeHolder(streams[i]));
    }
    PrepareWorkerImpl(inputSpec, worker, std::move(wrappers));
}

// Single holder: forwards to the vector-of-holders overload.
void TInputSpecTraits::PreparePullStreamWorker(
    const TArrowInputSpec& inputSpec, IPullStreamWorker* worker,
    THolder stream
) {
    TInputSpecTraits::PreparePullStreamWorker(
        inputSpec, worker, VectorFromHolder(std::move(stream)));
}

// Holders: each holder is moved into a MakeHolder wrapper and the wrappers are
// handed to PrepareWorkerImpl.
void TInputSpecTraits::PreparePullStreamWorker(
    const TArrowInputSpec& inputSpec, IPullStreamWorker* worker,
    TVector>&& streams
) {
    TVector> wrappers;
    for (ui32 i = 0; i < streams.size(); i++) {
        wrappers.push_back(MakeHolder(std::move(streams[i])));
    }
    PrepareWorkerImpl(inputSpec, worker, std::move(wrappers));
}

// Creates the push-mode consumer from the input spec and the worker holder.
ConsumerType TInputSpecTraits::MakeConsumer(
    const TArrowInputSpec& inputSpec, TWorkerHolder worker
) {
    return MakeHolder(inputSpec, std::move(worker));
}

// Stores a copy of the output schema.
TArrowOutputSpec::TArrowOutputSpec(const NYT::TNode& schema)
    : Schema_(schema)
{
}

// Returns the output schema.
const NYT::TNode& TArrowOutputSpec::GetSchema() const {
    return Schema_;
}

// Wraps a pull-list worker into the object returned to the caller; the worker
// holder is moved into it.
PullListReturnType TOutputSpecTraits::ConvertPullListWorkerToOutputType(
    const TArrowOutputSpec& outputSpec, TWorkerHolder worker
) {
    return MakeHolder(outputSpec, std::move(worker));
}

// Wraps a pull-stream worker into the object returned to the caller; the
// worker holder is moved into it.
PullStreamReturnType TOutputSpecTraits::ConvertPullStreamWorkerToOutputType(
    const TArrowOutputSpec& outputSpec, TWorkerHolder worker
) {
    return MakeHolder(outputSpec, std::move(worker));
}

// Installs on `worker` a consumer built from the output spec, the worker and
// the user's consumer (which is moved into it).
void TOutputSpecTraits::SetConsumerToWorker(
    const TArrowOutputSpec& outputSpec, IPushStreamWorker* worker,
    THolder> consumer
) {
    worker->SetConsumer(MakeHolder(outputSpec, worker, std::move(consumer)));
}