123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446 |
- #pragma once
- #include "util.h"
- #include "bit_util.h"
- #include "block_io_buffer.h"
- #include "block_item.h"
- #include "dispatch_traits.h"
- #include <yql/essentials/public/udf/udf_value.h>
- #include <yql/essentials/public/udf/udf_value_builder.h>
- #include <yql/essentials/public/udf/udf_type_inspection.h>
- #include <arrow/datum.h>
- #include <arrow/c/bridge.h>
- #include <deque>
- namespace NYql {
- namespace NUdf {
- class IArrayBuilder {
- public:
- struct TArrayDataItem {
- const arrow::ArrayData* Data = nullptr;
- ui64 StartOffset;
- };
- virtual ~IArrayBuilder() = default;
- virtual size_t MaxLength() const = 0;
- virtual void Add(NUdf::TUnboxedValuePod value) = 0;
- virtual void Add(TBlockItem value) = 0;
- virtual void Add(TBlockItem value, size_t count) = 0;
- virtual void Add(TInputBuffer& input) = 0;
- virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0;
- virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) = 0;
- virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) = 0;
- virtual arrow::Datum Build(bool finish) = 0;
- };
- inline const IArrayBuilder::TArrayDataItem* LookupArrayDataItem(const IArrayBuilder::TArrayDataItem* arrays, size_t arrayCount, ui64& idx) {
- IArrayBuilder::TArrayDataItem lookup{ nullptr, idx };
- auto it = std::lower_bound(arrays, arrays + arrayCount, lookup, [](const auto& left, const auto& right) {
- return left.StartOffset < right.StartOffset;
- });
- if (it == arrays + arrayCount || it->StartOffset > idx) {
- --it;
- }
- Y_DEBUG_ABORT_UNLESS(it->StartOffset <= idx);
- idx -= it->StartOffset;
- return it;
- }
- class IScalarBuilder {
- public:
- virtual ~IScalarBuilder() = default;
- virtual arrow::Datum Build(TBlockItem value) const = 0;
- virtual arrow::Datum Build(NUdf::TUnboxedValuePod value) const = 0;
- };
- inline std::shared_ptr<arrow::DataType> GetArrowType(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
- auto arrowTypeHandle = typeInfoHelper.MakeArrowType(type);
- Y_ENSURE(arrowTypeHandle);
- ArrowSchema s;
- arrowTypeHandle->Export(&s);
- return ARROW_RESULT(arrow::ImportType(&s));
- }
- class TArrayBuilderBase : public IArrayBuilder {
- using Self = TArrayBuilderBase;
- public:
- using Ptr = std::unique_ptr<TArrayBuilderBase>;
- struct TBlockArrayTree {
- using Ptr = std::shared_ptr<TBlockArrayTree>;
- std::deque<std::shared_ptr<arrow::ArrayData>> Payload;
- std::vector<TBlockArrayTree::Ptr> Children;
- };
- struct TParams {
- size_t* TotalAllocated = nullptr;
- TMaybe<ui8> MinFillPercentage; // if an internal buffer size is smaller than % of capacity, then shrink the buffer.
- };
- TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params)
- : ArrowType(std::move(arrowType))
- , Pool(&pool)
- , MaxLen(maxLen)
- , MaxBlockSizeInBytes(typeInfoHelper.GetMaxBlockBytes())
- , MinFillPercentage(params.MinFillPercentage)
- , TotalAllocated_(params.TotalAllocated)
- {
- Y_ABORT_UNLESS(ArrowType);
- Y_ABORT_UNLESS(maxLen > 0);
- }
- TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params)
- : TArrayBuilderBase(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen, params)
- {
- }
- size_t MaxLength() const final {
- return MaxLen;
- }
- void Add(NUdf::TUnboxedValuePod value) final {
- Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
- DoAdd(value);
- CurrLen++;
- }
- void Add(TBlockItem value) final {
- Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
- DoAdd(value);
- CurrLen++;
- }
- void Add(TBlockItem value, size_t count) final {
- Y_DEBUG_ABORT_UNLESS(CurrLen + count <= MaxLen);
- DoAdd(value, count);
- CurrLen += count;
- }
- void Add(TInputBuffer& input) final {
- Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
- DoAdd(input);
- CurrLen++;
- }
- void AddDefault() {
- Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
- DoAddDefault();
- CurrLen++;
- }
- inline void AddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
- TArrayDataItem item = { &array, 0 };
- Self::AddMany(&item, 1, beginIndex, count);
- }
- inline void AddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
- TArrayDataItem item = { &array, 0 };
- Self::AddMany(&item, 1, indexes, count);
- }
- void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final {
- Y_ABORT_UNLESS(size_t(array.length) == bitmapSize);
- Y_ABORT_UNLESS(popCount <= bitmapSize);
- Y_ABORT_UNLESS(CurrLen + popCount <= MaxLen);
- if (popCount) {
- DoAddMany(array, sparseBitmap, popCount);
- }
- CurrLen += popCount;
- }
- void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) final {
- Y_ABORT_UNLESS(arrays);
- Y_ABORT_UNLESS(arrayCount > 0);
- if (arrayCount == 1) {
- Y_ABORT_UNLESS(arrays->Data);
- DoAddMany(*arrays->Data, beginIndex, count);
- } else {
- ui64 idx = beginIndex;
- auto item = LookupArrayDataItem(arrays, arrayCount, idx);
- size_t avail = item->Data->length;
- size_t toAdd = count;
- Y_ABORT_UNLESS(idx <= avail);
- while (toAdd) {
- size_t adding = std::min(avail, toAdd);
- DoAddMany(*item->Data, idx, adding);
- avail -= adding;
- toAdd -= adding;
- if (!avail && toAdd) {
- ++item;
- Y_ABORT_UNLESS(item < arrays + arrayCount);
- avail = item->Data->length;
- idx = 0;
- }
- }
- }
- CurrLen += count;
- }
- void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) final {
- Y_ABORT_UNLESS(arrays);
- Y_ABORT_UNLESS(arrayCount > 0);
- Y_ABORT_UNLESS(indexes);
- Y_ABORT_UNLESS(CurrLen + count <= MaxLen);
- if (arrayCount == 1) {
- Y_ABORT_UNLESS(arrays->Data);
- DoAddMany(*arrays->Data, indexes, count);
- CurrLen += count;
- } else {
- const IArrayBuilder::TArrayDataItem* currData = nullptr;
- TVector<ui64> currDataIndexes;
- for (size_t i = 0; i < count; ++i) {
- ui64 idx = indexes[i];
- const IArrayBuilder::TArrayDataItem* data = LookupArrayDataItem(arrays, arrayCount, idx);
- if (!currData) {
- currData = data;
- }
- if (data != currData) {
- DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size());
- CurrLen += currDataIndexes.size();
- currDataIndexes.clear();
- currData = data;
- }
- currDataIndexes.push_back(idx);
- }
- if (!currDataIndexes.empty()) {
- DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size());
- CurrLen += currDataIndexes.size();
- }
- }
- }
- arrow::Datum Build(bool finish) final {
- auto tree = BuildTree(finish);
- TVector<std::shared_ptr<arrow::ArrayData>> chunks;
- while (size_t size = CalcSliceSize(*tree)) {
- chunks.push_back(Slice(*tree, size));
- }
- return MakeArray(chunks);
- }
- TBlockArrayTree::Ptr BuildTree(bool finish) {
- auto result = DoBuildTree(finish);
- CurrLen = 0;
- return result;
- }
- protected:
- virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0;
- virtual void DoAdd(TBlockItem value) = 0;
- virtual void DoAdd(TBlockItem value, size_t count) {
- for (size_t i = 0; i < count; ++i) {
- DoAdd(value);
- }
- }
- virtual void DoAdd(TInputBuffer& input) = 0;
- virtual void DoAddDefault() = 0;
- virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0;
- virtual void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) = 0;
- virtual void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) = 0;
- virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0;
- // returns the newly allocated size in bytes
- virtual size_t DoReserve() = 0;
- private:
- static size_t CalcSliceSize(const TBlockArrayTree& tree) {
- if (tree.Payload.empty()) {
- return 0;
- }
- if (!tree.Children.empty()) {
- Y_ABORT_UNLESS(tree.Payload.size() == 1);
- size_t result = std::numeric_limits<size_t>::max();
- for (auto& child : tree.Children) {
- size_t childSize = CalcSliceSize(*child);
- result = std::min(result, childSize);
- }
- Y_ABORT_UNLESS(result <= size_t(tree.Payload.front()->length));
- return result;
- }
- int64_t result = tree.Payload.front()->length;
- Y_ABORT_UNLESS(result > 0);
- return static_cast<size_t>(result);
- }
- static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) {
- Y_ABORT_UNLESS(size > 0);
- Y_ABORT_UNLESS(!tree.Payload.empty());
- auto& main = tree.Payload.front();
- std::shared_ptr<arrow::ArrayData> sliced;
- if (size == size_t(main->length)) {
- sliced = main;
- tree.Payload.pop_front();
- } else {
- Y_ABORT_UNLESS(size < size_t(main->length));
- sliced = Chop(main, size);
- }
- if (!tree.Children.empty()) {
- std::vector<std::shared_ptr<arrow::ArrayData>> children;
- for (auto& child : tree.Children) {
- children.push_back(Slice(*child, size));
- }
- sliced->child_data = std::move(children);
- if (tree.Payload.empty()) {
- tree.Children.clear();
- }
- }
- return sliced;
- }
- protected:
- size_t GetCurrLen() const {
- return CurrLen;
- }
- void SetCurrLen(size_t len) {
- Y_ABORT_UNLESS(len <= MaxLen);
- CurrLen = len;
- }
- void Reserve() {
- auto allocated = DoReserve();
- if (TotalAllocated_) {
- *TotalAllocated_ += allocated;
- }
- }
- void AddExtraAllocated(size_t bytes) {
- if (TotalAllocated_) {
- *TotalAllocated_ += bytes;
- }
- }
- const std::shared_ptr<arrow::DataType> ArrowType;
- arrow::MemoryPool* const Pool;
- const size_t MaxLen;
- const size_t MaxBlockSizeInBytes;
- const TMaybe<ui8> MinFillPercentage;
- private:
- size_t CurrLen = 0;
- size_t* TotalAllocated_ = nullptr;
- };
- template<typename TLayout, bool Nullable, typename TDerived>
- class TFixedSizeArrayBuilderBase : public TArrayBuilderBase {
- public:
- TFixedSizeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params)
- : TArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
- {
- Reserve();
- }
- TFixedSizeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params)
- : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
- {
- Reserve();
- }
- void UnsafeReserve(size_t length) {
- SetCurrLen(length);
- }
- TLayout* MutableData() {
- return DataPtr;
- }
- ui8* MutableValidMask() {
- return NullPtr;
- }
- void DoAdd(NUdf::TUnboxedValuePod value) final {
- if constexpr (Nullable) {
- if (!value) {
- DoAddNull();
- return;
- }
- NullPtr[GetCurrLen()] = 1;
- }
- static_cast<TDerived*>(this)->DoAddNotNull(value);
- }
- void DoAdd(TBlockItem value) final {
- if constexpr (Nullable) {
- if (!value) {
- DoAddNull();
- return;
- }
- NullPtr[GetCurrLen()] = 1;
- }
- static_cast<TDerived*>(this)->DoAddNotNull(value);
- }
- void DoAddNull() {
- if constexpr (Nullable) {
- NullPtr[GetCurrLen()] = 0;
- PlaceItem(TLayout{});
- }
- }
- void DoAdd(TBlockItem value, size_t count) final {
- if constexpr (Nullable) {
- if (!value) {
- std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 0);
- std::fill(DataPtr + GetCurrLen(), DataPtr + GetCurrLen() + count, TLayout{});
- return;
- }
- std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 1);
- }
- static_cast<TDerived*>(this)->DoAddNotNull(value, count);
- }
- void DoAdd(TInputBuffer &input) final {
- if constexpr (Nullable) {
- if (!input.PopChar()) {
- DoAddNull();
- return;
- }
- }
- static_cast<TDerived*>(this)->DoAddNotNull(input);
- }
- void DoAddDefault() final {
- if constexpr (Nullable) {
- NullPtr[GetCurrLen()] = 1;
- }
- PlaceItem(TLayout{});
- }
- void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
- Y_ABORT_UNLESS(array.buffers.size() > 1);
- if constexpr (Nullable) {
- if (array.buffers.front()) {
- ui8* dstBitmap = NullPtr + GetCurrLen();
- CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
- } else {
- ui8* dstBitmap = NullPtr + GetCurrLen();
- std::fill_n(dstBitmap, popCount, 1);
- }
- }
- const TLayout* src = array.GetValues<TLayout>(1);
- TLayout* dst = DataPtr + GetCurrLen();
- CompressArray(src, sparseBitmap, dst, array.length);
- }
- void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
- Y_ABORT_UNLESS(array.buffers.size() > 1);
- if constexpr (Nullable) {
- for (size_t i = beginIndex; i < beginIndex + count; ++i) {
- NullPtr[GetCurrLen() + i - beginIndex] = !IsNull(array, i);
- }
- }
- const TLayout* values = array.GetValues<TLayout>(1);
- for (size_t i = beginIndex; i < beginIndex + count; ++i) {
- ::new(DataPtr + GetCurrLen() + i - beginIndex) TLayout(values[i]);
- }
- }
- void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
- Y_ABORT_UNLESS(array.buffers.size() > 1);
- if constexpr (Nullable) {
- for (size_t i = 0; i < count; ++i) {
- NullPtr[GetCurrLen() + i] = !IsNull(array, indexes[i]);
- }
- }
- const TLayout* values = array.GetValues<TLayout>(1);
- for (size_t i = 0; i < count; ++i) {
- ::new(DataPtr + GetCurrLen() + i) TLayout(values[indexes[i]]);
- }
- }
- TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- const size_t len = GetCurrLen();
- std::shared_ptr<arrow::Buffer> nulls;
- if constexpr (Nullable) {
- NullBuilder->UnsafeAdvance(len);
- nulls = NullBuilder->Finish();
- nulls = MakeDenseBitmap(nulls->data(), len, Pool);
- }
- DataBuilder->UnsafeAdvance(len);
- std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
- TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
- result->Payload.push_back(arrow::ArrayData::Make(ArrowType, len, {nulls, data}));
- NullBuilder.reset();
- DataBuilder.reset();
- if (!finish) {
- Reserve();
- }
- return result;
- }
- protected:
- void PlaceItem(TLayout&& value) {
- ::new(DataPtr + GetCurrLen()) TLayout(std::move(value));
- }
- TLayout* DataPtr = nullptr;
- private:
- size_t DoReserve() final {
- DataBuilder = std::make_unique<TTypedBufferBuilder<TLayout>>(Pool, MinFillPercentage);
- DataBuilder->Reserve(MaxLen + 1);
- DataPtr = DataBuilder->MutableData();
- auto result = DataBuilder->Capacity();
- if constexpr (Nullable) {
- NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
- NullBuilder->Reserve(MaxLen + 1);
- NullPtr = NullBuilder->MutableData();
- result += NullBuilder->Capacity();
- }
- return result;
- }
- std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
- std::unique_ptr<TTypedBufferBuilder<TLayout>> DataBuilder;
- ui8* NullPtr = nullptr;
- };
- template<typename TLayout, bool Nullable>
- class TFixedSizeArrayBuilder final: public TFixedSizeArrayBuilderBase<TLayout, Nullable, TFixedSizeArrayBuilder<TLayout, Nullable>> {
- using TSelf = TFixedSizeArrayBuilder<TLayout, Nullable>;
- using TBase = TFixedSizeArrayBuilderBase<TLayout, Nullable, TSelf>;
- using TParams = TArrayBuilderBase::TParams;
- public:
- TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
- {}
- TFixedSizeArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TBase(typeInfoHelper, type, pool, maxLen, params)
- {}
- void DoAddNotNull(TUnboxedValuePod value) {
- this->PlaceItem(value.Get<TLayout>());
- }
- void DoAddNotNull(TBlockItem value) {
- this->PlaceItem(value.Get<TLayout>());
- }
- void DoAddNotNull(TInputBuffer& input) {
- this->DoAdd(TBlockItem(input.PopNumber<TLayout>()));
- }
- void DoAddNotNull(TBlockItem value, size_t count) {
- std::fill(this->DataPtr + this->GetCurrLen(), this->DataPtr + this->GetCurrLen() + count, value.Get<TLayout>());
- }
- };
- template<bool Nullable>
- class TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable> final: public TFixedSizeArrayBuilderBase<NYql::NDecimal::TInt128, Nullable, TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>> {
- using TSelf = TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>;
- using TBase = TFixedSizeArrayBuilderBase<NYql::NDecimal::TInt128, Nullable, TSelf>;
- using TParams = TArrayBuilderBase::TParams;
- public:
- TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
- {}
- TFixedSizeArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TBase(typeInfoHelper, type, pool, maxLen, params)
- {}
- void DoAddNotNull(TUnboxedValuePod value) {
- this->PlaceItem(value.GetInt128());
- }
- void DoAddNotNull(TBlockItem value) {
- this->PlaceItem(value.GetInt128());
- }
- void DoAddNotNull(TInputBuffer& input) {
- this->DoAdd(TBlockItem(input.PopNumber<NYql::NDecimal::TInt128>()));
- }
- void DoAddNotNull(TBlockItem value, size_t count) {
- std::fill(this->DataPtr + this->GetCurrLen(), this->DataPtr + this->GetCurrLen() + count, value.GetInt128());
- }
- };
- template<bool Nullable>
- class TResourceArrayBuilder final: public TFixedSizeArrayBuilderBase<TUnboxedValue, Nullable, TResourceArrayBuilder<Nullable>> {
- using TBase = TFixedSizeArrayBuilderBase<TUnboxedValue, Nullable, TResourceArrayBuilder<Nullable>>;
- using TParams = TArrayBuilderBase::TParams;
- public:
- TResourceArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
- {}
- TResourceArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TBase(typeInfoHelper, type, pool, maxLen, params)
- {}
- void DoAddNotNull(TUnboxedValuePod value) {
- this->PlaceItem(TUnboxedValue(value));
- }
- TUnboxedValue FromBlockItem(TBlockItem item) {
- TUnboxedValue val;
- std::memcpy(val.GetRawPtr(), item.GetRawPtr(), sizeof(val));
- val.Ref();
- return val;
- }
- void DoAddNotNull(TBlockItem item) {
- this->PlaceItem(FromBlockItem(item));
- }
- void DoAddNotNull(TInputBuffer& input) {
- this->DoAdd(input.PopNumber<TUnboxedValuePod>());
- }
- void DoAddNotNull(TBlockItem item, size_t count) {
- for (size_t i = 0; i < count; ++i) {
- ::new(this->DataPtr + this->GetCurrLen() + i) TUnboxedValue(FromBlockItem(item));
- }
- }
- };
- template<typename TStringType, bool Nullable, EPgStringType PgString = EPgStringType::None>
- class TStringArrayBuilder final : public TArrayBuilderBase {
- using TOffset = typename TStringType::offset_type;
- public:
- TStringArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
- {
- Reserve();
- }
- TStringArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
- {
- Reserve();
- }
- void SetPgBuilder(const NUdf::IPgBuilder* pgBuilder, i32 typeLen) {
- Y_ENSURE(PgString != EPgStringType::None);
- PgBuilder = pgBuilder;
- TypeLen = typeLen;
- }
- void DoAdd(NUdf::TUnboxedValuePod value) final {
- if constexpr (Nullable) {
- if (!value) {
- return DoAdd(TBlockItem{});
- }
- }
- if constexpr (PgString == EPgStringType::CString) {
- static_assert(Nullable);
- auto buf = PgBuilder->AsCStringBuffer(value);
- AddPgItem(buf);
- } else if constexpr (PgString == EPgStringType::Text) {
- static_assert(Nullable);
- auto buf = PgBuilder->AsTextBuffer(value);
- AddPgItem(buf);
- } else if constexpr (PgString == EPgStringType::Fixed) {
- static_assert(Nullable);
- auto buf = PgBuilder->AsFixedStringBuffer(value, TypeLen);
- AddPgItem(buf);
- } else {
- DoAdd(TBlockItem(value.AsStringRef()));
- }
- }
- template <bool AddCStringZero = false, ui32 AddVarHdr = 0>
- ui8* AddPgItem(TStringRef buf) {
- auto alignedSize = AlignUp(buf.Size() + sizeof(void*) + AddVarHdr + (AddCStringZero ? 1 : 0), sizeof(void*));
- auto ptr = AddNoFill(alignedSize);
- *(void**)ptr = nullptr;
- if (alignedSize > sizeof(void*)) {
- // clear padding too
- *(void**)(ptr + alignedSize - sizeof(void*)) = nullptr;
- }
- std::memcpy(ptr + sizeof(void*) + AddVarHdr, buf.Data(), buf.Size());
- if constexpr (AddCStringZero) {
- ptr[sizeof(void*) + buf.Size()] = 0;
- }
- return ptr;
- }
- ui8* AddNoFill(size_t size) {
- size_t currentLen = DataBuilder->Length();
- // empty string can always be appended
- if (size > 0 && currentLen + size > MaxBlockSizeInBytes) {
- if (currentLen) {
- FlushChunk(false);
- }
- if (size > MaxBlockSizeInBytes) {
- ReserveForLargeString(size);
- }
- }
- AppendCurrentOffset();
- auto ret = DataBuilder->End();
- DataBuilder->UnsafeAdvance(size);
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(1);
- }
- return ret;
- }
- void DoAdd(TBlockItem value) final {
- if constexpr (Nullable) {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- AppendCurrentOffset();
- return;
- }
- }
- const std::string_view str = value.AsStringRef();
- auto ptr = AddNoFill(str.size());
- std::memcpy(ptr, str.data(), str.size());
- }
- void DoAdd(TInputBuffer& input) final {
- if constexpr (Nullable) {
- if (!input.PopChar()) {
- return DoAdd(TBlockItem{});
- }
- }
- auto str = input.PopString();
- TStringRef ref(str.data(), str.size());
- DoAdd(TBlockItem(ref));
- }
- void DoAddDefault() final {
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(1);
- }
- AppendCurrentOffset();
- }
- void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
- Y_UNUSED(popCount);
- Y_ABORT_UNLESS(array.buffers.size() > 2);
- Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
- const ui8* srcNulls = array.GetValues<ui8>(0, 0);
- const TOffset* srcOffset = array.GetValues<TOffset>(1);
- const ui8* srcData = array.GetValues<ui8>(2, 0);
- const ui8* chunkStart = srcData;
- const ui8* chunkEnd = chunkStart;
- size_t dataLen = DataBuilder->Length();
- ui8* dstNulls = Nullable ? NullBuilder->End() : nullptr;
- TOffset* dstOffset = OffsetsBuilder->End();
- size_t countAdded = 0;
- for (size_t i = 0; i < size_t(array.length); i++) {
- if (!sparseBitmap[i]) {
- continue;
- }
- const ui8* begin = srcData + srcOffset[i];
- const ui8* end = srcData + srcOffset[i + 1];
- const size_t strSize = end - begin;
- size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
- for (;;) {
- // try to append ith string
- if (strSize <= availBytes) {
- if (begin == chunkEnd) {
- chunkEnd = end;
- } else {
- DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
- chunkStart = begin;
- chunkEnd = end;
- }
- size_t nullOffset = i + array.offset;
- if constexpr (Nullable) {
- *dstNulls++ = srcNulls ? ((srcNulls[nullOffset >> 3] >> (nullOffset & 7)) & 1) : 1u;
- }
- *dstOffset++ = dataLen;
- dataLen += strSize;
- ++countAdded;
- break;
- }
- if (dataLen) {
- if (chunkStart != chunkEnd) {
- DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
- chunkStart = chunkEnd = srcData;
- }
- Y_ABORT_UNLESS(dataLen == DataBuilder->Length());
- OffsetsBuilder->UnsafeAdvance(countAdded);
- if constexpr (Nullable) {
- NullBuilder->UnsafeAdvance(countAdded);
- }
- FlushChunk(false);
- dataLen = 0;
- countAdded = 0;
- if constexpr (Nullable) {
- dstNulls = NullBuilder->End();
- }
- dstOffset = OffsetsBuilder->End();
- } else {
- ReserveForLargeString(strSize);
- availBytes = strSize;
- }
- }
- }
- if (chunkStart != chunkEnd) {
- DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
- }
- Y_ABORT_UNLESS(dataLen == DataBuilder->Length());
- OffsetsBuilder->UnsafeAdvance(countAdded);
- if constexpr (Nullable) {
- NullBuilder->UnsafeAdvance(countAdded);
- }
- }
- void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
- Y_ABORT_UNLESS(array.buffers.size() > 2);
- Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
- size_t dataLen = DataBuilder->Length();
- const TOffset* offsets = array.GetValues<TOffset>(1);
- const ui8* srcData = array.GetValues<ui8>(2, 0);
- const ui8* chunkStart = srcData + offsets[beginIndex];
- const ui8* chunkEnd = chunkStart;
- for (size_t i = beginIndex; i < beginIndex + count; ++i) {
- const ui8* begin = srcData + offsets[i];
- const ui8* end = srcData + offsets[i + 1];
- const size_t strSize = end - begin;
- size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
- for (;;) {
- if (strSize <= availBytes) {
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(!IsNull(array, i));
- }
- OffsetsBuilder->UnsafeAppend(TOffset(dataLen));
- chunkEnd = end;
- dataLen += strSize;
- break;
- }
- if (dataLen) {
- DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
- chunkStart = begin;
- chunkEnd = end;
- FlushChunk(false);
- dataLen = 0;
- } else {
- ReserveForLargeString(strSize);
- availBytes = strSize;
- }
- }
- }
- if (chunkStart != chunkEnd) {
- DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
- }
- }
- void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
- Y_ABORT_UNLESS(array.buffers.size() > 2);
- Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
- size_t dataLen = DataBuilder->Length();
- const TOffset* offsets = array.GetValues<TOffset>(1);
- const char* strData = array.GetValues<char>(2, 0);
- for (size_t i = 0; i < count; ++i) {
- ui64 idx = indexes[i];
- std::string_view str(strData + offsets[idx], offsets[idx + 1] - offsets[idx]);
- size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
- for (;;) {
- if (str.size() <= availBytes) {
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(!IsNull(array, idx));
- }
- OffsetsBuilder->UnsafeAppend(TOffset(dataLen));
- DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size());
- dataLen += str.size();
- break;
- }
- if (dataLen) {
- FlushChunk(false);
- dataLen = 0;
- } else {
- ReserveForLargeString(str.size());
- availBytes = str.size();
- }
- }
- }
- }
- TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- FlushChunk(finish);
- TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
- result->Payload = std::move(Chunks);
- Chunks.clear();
- return result;
- }
- private:
- size_t DoReserve() final {
- OffsetsBuilder = std::make_unique<TTypedBufferBuilder<TOffset>>(Pool, MinFillPercentage);
- OffsetsBuilder->Reserve(MaxLen + 1);
- DataBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
- DataBuilder->Reserve(MaxBlockSizeInBytes);
- auto result = OffsetsBuilder->Capacity() + DataBuilder->Capacity();
- if constexpr (Nullable) {
- NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
- NullBuilder->Reserve(MaxLen + 1);
- result += NullBuilder->Capacity();
- }
- return result;
- }
- void ReserveForLargeString(size_t strSize) {
- size_t before = DataBuilder->Capacity();
- DataBuilder->Reserve(strSize);
- size_t after = DataBuilder->Capacity();
- Y_ENSURE(before <= after);
- AddExtraAllocated(after - before);
- }
- void AppendCurrentOffset() {
- OffsetsBuilder->UnsafeAppend(DataBuilder->Length());
- }
- void FlushChunk(bool finish) {
- const auto length = OffsetsBuilder->Length();
- Y_ABORT_UNLESS(length > 0);
- AppendCurrentOffset();
- std::shared_ptr<arrow::Buffer> nullBitmap;
- if constexpr (Nullable) {
- nullBitmap = NullBuilder->Finish();
- nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
- }
- std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder->Finish();
- std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
- Chunks.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap, offsets, data }));
- if (!finish) {
- Reserve();
- }
- }
- std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
- std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder;
- std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder;
- std::deque<std::shared_ptr<arrow::ArrayData>> Chunks;
- const IPgBuilder* PgBuilder = nullptr;
- i32 TypeLen = 0;
- };
- template<bool Nullable, typename TDerived>
- class TTupleArrayBuilderBase : public TArrayBuilderBase {
- public:
- TTupleArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
- {
- Reserve();
- }
- void DoAdd(NUdf::TUnboxedValuePod value) final {
- if constexpr (Nullable) {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- static_cast<TDerived*>(this)->AddToChildrenDefault();
- return;
- }
- NullBuilder->UnsafeAppend(1);
- }
- static_cast<TDerived*>(this)->AddToChildren(value);
- }
- void DoAdd(TBlockItem value) final {
- if constexpr (Nullable) {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- static_cast<TDerived*>(this)->AddToChildrenDefault();
- return;
- }
- NullBuilder->UnsafeAppend(1);
- }
- static_cast<TDerived*>(this)->AddToChildren(value);
- }
- void DoAdd(TInputBuffer& input) final {
- if constexpr (Nullable) {
- if (!input.PopChar()) {
- NullBuilder->UnsafeAppend(0);
- static_cast<TDerived*>(this)->AddToChildrenDefault();
- return;
- }
- NullBuilder->UnsafeAppend(1);
- }
- static_cast<TDerived*>(this)->AddToChildren(input);
- }
- void DoAddDefault() final {
- if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(1);
- }
- static_cast<TDerived*>(this)->AddToChildrenDefault();
- }
- void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
- Y_ABORT_UNLESS(!array.buffers.empty());
- if constexpr (Nullable) {
- if (array.buffers.front()) {
- ui8* dstBitmap = NullBuilder->End();
- CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
- NullBuilder->UnsafeAdvance(popCount);
- } else {
- NullBuilder->UnsafeAppend(popCount, 1);
- }
- }
- static_cast<TDerived*>(this)->AddManyToChildren(array, sparseBitmap, popCount);
- }
- void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
- Y_ABORT_UNLESS(!array.buffers.empty());
- if constexpr (Nullable) {
- for (ui64 i = beginIndex; i < beginIndex + count; ++i) {
- NullBuilder->UnsafeAppend(!IsNull(array, i));
- }
- }
- static_cast<TDerived*>(this)->AddManyToChildren(array, beginIndex, count);
- }
- void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
- Y_ABORT_UNLESS(!array.buffers.empty());
- if constexpr (Nullable) {
- for (size_t i = 0; i < count; ++i) {
- NullBuilder->UnsafeAppend(!IsNull(array, indexes[i]));
- }
- }
- static_cast<TDerived*>(this)->AddManyToChildren(array, indexes, count);
- }
- TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
- std::shared_ptr<arrow::Buffer> nullBitmap;
- const size_t length = GetCurrLen();
- if constexpr (Nullable) {
- Y_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
- nullBitmap = NullBuilder->Finish();
- nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
- }
- Y_ABORT_UNLESS(length);
- result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap }));
- static_cast<TDerived*>(this)->BuildChildrenTree(finish, result->Children);
- if (!finish) {
- Reserve();
- }
- return result;
- }
- private:
- size_t DoReserve() final {
- if constexpr (Nullable) {
- NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
- NullBuilder->Reserve(MaxLen + 1);
- return NullBuilder->Capacity();
- }
- return 0;
- }
- private:
- std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
- };
- template<bool Nullable>
- class TTupleArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>> {
- using TBase = TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>>;
- using TParams = TArrayBuilderBase::TParams;
- public:
- TTupleArrayBuilder(TVector<TArrayBuilderBase::Ptr>&& children, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool,
- size_t maxLen, const TParams& params = {})
- : TBase(typeInfoHelper, type, pool, maxLen, params)
- , Children_(std::move(children))
- {
- }
- void AddToChildrenDefault() {
- for (ui32 i = 0; i < Children_.size(); ++i) {
- Children_[i]->AddDefault();
- }
- }
- void AddToChildren(NUdf::TUnboxedValuePod value) {
- auto elements = value.GetElements();
- if (elements) {
- for (ui32 i = 0; i < Children_.size(); ++i) {
- Children_[i]->Add(elements[i]);
- }
- } else {
- for (ui32 i = 0; i < Children_.size(); ++i) {
- auto element = value.GetElement(i);
- Children_[i]->Add(element);
- }
- }
- }
- void AddToChildren(TBlockItem value) {
- auto elements = value.AsTuple();
- for (ui32 i = 0; i < Children_.size(); ++i) {
- Children_[i]->Add(elements[i]);
- }
- }
- void AddToChildren(TInputBuffer& input) {
- for (ui32 i = 0; i < Children_.size(); ++i) {
- Children_[i]->Add(input);
- }
- }
- void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) {
- Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
- for (size_t i = 0; i < Children_.size(); ++i) {
- Children_[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length);
- }
- }
- void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
- Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
- for (size_t i = 0; i < Children_.size(); ++i) {
- Children_[i]->AddMany(*array.child_data[i], beginIndex, count);
- }
- }
- void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
- Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
- for (size_t i = 0; i < Children_.size(); ++i) {
- Children_[i]->AddMany(*array.child_data[i], indexes, count);
- }
- }
- void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) {
- resultChildren.reserve(Children_.size());
- for (ui32 i = 0; i < Children_.size(); ++i) {
- resultChildren.emplace_back(Children_[i]->BuildTree(finish));
- }
- }
- private:
- TVector<std::unique_ptr<TArrayBuilderBase>> Children_;
- };
- template<typename TDate, bool Nullable>
- class TTzDateArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>> {
- using TBase = TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>>;
- using TParams = TArrayBuilderBase::TParams;
- using TDateLayout = typename TDataType<TDate>::TLayout;
- static constexpr auto DataSlot = TDataType<TDate>::Slot;
- public:
- TTzDateArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
- : TBase(typeInfoHelper, type, pool, maxLen, params)
- , DateBuilder_(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen, params)
- , TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen, params)
- {
- }
- void AddToChildrenDefault() {
- DateBuilder_.AddDefault();
- TimezoneBuilder_.AddDefault();
- }
- void AddToChildren(NUdf::TUnboxedValuePod value) {
- DateBuilder_.Add(value);
- TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId()));
- }
- void AddToChildren(TBlockItem value) {
- DateBuilder_.Add(value);
- TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId()));
- }
- void AddToChildren(TInputBuffer& input) {
- DateBuilder_.Add(input);
- TimezoneBuilder_.Add(input);
- }
- void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) {
- Y_ABORT_UNLESS(array.child_data.size() == 2);
- DateBuilder_.AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
- TimezoneBuilder_.AddMany(*array.child_data[1], popCount, sparseBitmap, array.length);
- }
- void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
- Y_ABORT_UNLESS(array.child_data.size() == 2);
- DateBuilder_.AddMany(*array.child_data[0], beginIndex, count);
- TimezoneBuilder_.AddMany(*array.child_data[1], beginIndex, count);
- }
- void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
- Y_ABORT_UNLESS(array.child_data.size() == 2);
- DateBuilder_.AddMany(*array.child_data[0], indexes, count);
- TimezoneBuilder_.AddMany(*array.child_data[1], indexes, count);
- }
- void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) {
- resultChildren.emplace_back(DateBuilder_.BuildTree(finish));
- resultChildren.emplace_back(TimezoneBuilder_.BuildTree(finish));
- }
- private:
- TFixedSizeArrayBuilder<TDateLayout, false> DateBuilder_;
- TFixedSizeArrayBuilder<ui16, false> TimezoneBuilder_;
- };
- class TExternalOptionalArrayBuilder final : public TArrayBuilderBase {
- public:
- TExternalOptionalArrayBuilder(std::unique_ptr<TArrayBuilderBase>&& inner, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool,
- size_t maxLen, const TParams& params = {})
- : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
- , Inner(std::move(inner))
- {
- Reserve();
- }
- void DoAdd(NUdf::TUnboxedValuePod value) final {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- Inner->AddDefault();
- return;
- }
- NullBuilder->UnsafeAppend(1);
- Inner->Add(value.GetOptionalValue());
- }
- void DoAdd(TBlockItem value) final {
- if (!value) {
- NullBuilder->UnsafeAppend(0);
- Inner->AddDefault();
- return;
- }
- NullBuilder->UnsafeAppend(1);
- Inner->Add(value.GetOptionalValue());
- }
- void DoAdd(TInputBuffer& input) final {
- if (!input.PopChar()) {
- NullBuilder->UnsafeAppend(0);
- Inner->AddDefault();
- return;
- }
- NullBuilder->UnsafeAppend(1);
- Inner->Add(input);
- }
- void DoAddDefault() final {
- NullBuilder->UnsafeAppend(1);
- Inner->AddDefault();
- }
- void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
- Y_ABORT_UNLESS(!array.buffers.empty());
- Y_ABORT_UNLESS(array.child_data.size() == 1);
- if (array.buffers.front()) {
- ui8* dstBitmap = NullBuilder->End();
- CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
- NullBuilder->UnsafeAdvance(popCount);
- } else {
- NullBuilder->UnsafeAppend(popCount, 1);
- }
- Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
- }
- void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
- Y_ABORT_UNLESS(!array.buffers.empty());
- Y_ABORT_UNLESS(array.child_data.size() == 1);
- for (ui64 i = beginIndex; i < beginIndex + count; ++i) {
- NullBuilder->UnsafeAppend(!IsNull(array, i));
- }
- Inner->AddMany(*array.child_data[0], beginIndex, count);
- }
- void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
- Y_ABORT_UNLESS(!array.buffers.empty());
- Y_ABORT_UNLESS(array.child_data.size() == 1);
- for (size_t i = 0; i < count; ++i) {
- NullBuilder->UnsafeAppend(!IsNull(array, indexes[i]));
- }
- Inner->AddMany(*array.child_data[0], indexes, count);
- }
- TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
- std::shared_ptr<arrow::Buffer> nullBitmap;
- const size_t length = GetCurrLen();
- Y_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
- nullBitmap = NullBuilder->Finish();
- nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
- Y_ABORT_UNLESS(length);
- result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap }));
- result->Children.emplace_back(Inner->BuildTree(finish));
- if (!finish) {
- Reserve();
- }
- return result;
- }
- private:
- size_t DoReserve() final {
- NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
- NullBuilder->Reserve(MaxLen + 1);
- return NullBuilder->Capacity();
- }
- private:
- std::unique_ptr<TArrayBuilderBase> Inner;
- std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
- };
- using TArrayBuilderParams = TArrayBuilderBase::TParams;
- struct TBuilderTraits {
- using TResult = TArrayBuilderBase;
- template <bool Nullable>
- using TTuple = TTupleArrayBuilder<Nullable>;
- template <typename T, bool Nullable>
- using TFixedSize = TFixedSizeArrayBuilder<T, Nullable>;
- template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal>
- using TStrings = TStringArrayBuilder<TStringType, Nullable>;
- using TExtOptional = TExternalOptionalArrayBuilder;
- template<bool Nullable>
- using TResource = TResourceArrayBuilder<Nullable>;
- template<typename TTzDate, bool Nullable>
- using TTzDateReader = TTzDateArrayBuilder<TTzDate, Nullable>;
- constexpr static bool PassType = true;
- static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
- if (desc.PassByValue) {
- return std::make_unique<TFixedSize<ui64, true>>(type, typeInfoHelper, pool, maxLen, params);
- } else {
- if (desc.Typelen == -1) {
- auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Text>>(type, typeInfoHelper, pool, maxLen, params);
- ret->SetPgBuilder(pgBuilder, desc.Typelen);
- return ret;
- } else if (desc.Typelen == -2) {
- auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::CString>>(type, typeInfoHelper, pool, maxLen, params);
- ret->SetPgBuilder(pgBuilder, desc.Typelen);
- return ret;
- } else {
- auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Fixed>>(type, typeInfoHelper, pool, maxLen, params);
- ret->SetPgBuilder(pgBuilder, desc.Typelen);
- return ret;
- }
- }
- }
- static std::unique_ptr<TResult> MakeResource(bool isOptional, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
- if (isOptional) {
- return std::make_unique<TResource<true>>(type, typeInfoHelper, pool, maxLen, params);
- } else {
- return std::make_unique<TResource<false>>(type, typeInfoHelper, pool, maxLen, params);
- }
- }
- template<typename TTzDate>
- static std::unique_ptr<TResult> MakeTzDate(bool isOptional, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
- if (isOptional) {
- return std::make_unique<TTzDateReader<TTzDate, true>>(type, typeInfoHelper, pool, maxLen, params);
- } else {
- return std::make_unique<TTzDateReader<TTzDate, false>>(type, typeInfoHelper, pool, maxLen, params);
- }
- }
- };
- inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
- const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxBlockLength, const IPgBuilder* pgBuilder)
- {
- return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, TArrayBuilderParams {});
- }
- inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
- const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated)
- {
- return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, TArrayBuilderParams {.TotalAllocated = totalAllocated});
- }
- inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
- const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params)
- {
- return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, params);
- }
- inline std::unique_ptr<IScalarBuilder> MakeScalarBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
- Y_UNUSED(typeInfoHelper);
- Y_UNUSED(type);
- Y_ENSURE(false);
- return nullptr;
- }
- } // namespace NUdf
- } // namespace NYql
|