// Serializers and deserializers for MiniKQL blocks (arrow::ArrayData) used by the block transport.
//
// Each serializer produces two streams:
//   * metadata: integer values emitted one by one through IBlockSerializer::TMetadataSink
//     (null counts and buffer byte sizes);
//   * payload: raw buffer slices appended to a TChunkedBuffer, mostly without copying.
// The matching deserializer reads the metadata through TMetadataSource and then takes buffers
// from the front of a TChunkedBuffer. Each serializer/deserializer pair below emits and consumes
// both streams in exactly the same order; changing the order in one side breaks the format.
//
// NOTE(review): in this copy of the file the targets of four `#include` directives and every
// `<...>` template argument / parameter list appear to be missing (e.g. `std::shared_ptr` with no
// element type, `template` with no parameter list). Restore them from the canonical source
// before building.
#include "mkql_block_transport.h"
#include "mkql_block_builder.h"
// NOTE(review): include targets missing in this copy.
#include
#include
#include
#include

namespace NKikimr::NMiniKQL {

namespace {

using NYql::TChunkedBuffer;

// Returns a TChunkedBuffer chunk viewing [data, data + size), which must lie inside `owner`;
// the chunk keeps `owner` alive, so no bytes are copied. MKQLArrowUntrack is called on the
// owner's data first.
// NOTE(review): presumably this removes the buffer from MKQL memory accounting once it is handed
// to the transport — confirm against MKQLArrowUntrack.
TChunkedBuffer MakeChunkedBufferAndUntrack(const std::shared_ptr& owner, const char* data, size_t size) {
    MKQLArrowUntrack(owner->data());
    return TChunkedBuffer(TStringBuf{data, size}, owner);
}

// arrow::Buffer that views `span` without copying and holds a reference to `owner`, so the memory
// behind `span` stays alive as long as this buffer does.
class TOwnedArrowBuffer : public arrow::Buffer {
public:
    TOwnedArrowBuffer(TStringBuf span, const std::shared_ptr& owner)
        : arrow::Buffer(reinterpret_cast(span.data()), span.size())
        , Owner_(owner)
    {
    }
private:
    // Only held to extend the lifetime of the viewed memory.
    const std::shared_ptr Owner_;
};

// Returns a zero-length buffer with a null data pointer.
std::shared_ptr MakeEmptyBuffer() {
    return std::make_shared(nullptr, 0);
}

// True if `buf` is already aligned to NYql::NUdf::ArrowMemoryAlignment.
// (The triple "r" in the name is a typo kept because callers use this spelling.)
bool HasArrrowAlignment(const void* buf) {
    return AlignUp(buf, NYql::NUdf::ArrowMemoryAlignment) == buf;
}

// Returns a buffer of `byteLen` zero bytes (an empty buffer when byteLen == 0).
// Requests whose length, rounded up to 64 bytes, fits into MaxBlockSizeInBytes are served from a
// single static all-zero array without allocating: the returned buffer points into read-only
// static storage and must never be written to. Larger requests are allocated from the YQL memory
// pool and zero-filled.
std::shared_ptr MakeZeroBuffer(size_t byteLen) {
    using namespace NYql::NUdf;
    if (!byteLen) {
        return MakeEmptyBuffer();
    }

    constexpr size_t NullWordCount = (MaxBlockSizeInBytes + sizeof(ui64) - 1) / sizeof(ui64);
    // Slack so that, after aligning the array start up to ArrowMemoryAlignment (it is only
    // guaranteed ui64-aligned), at least NullWordCount words remain.
    constexpr size_t ExtraAlignWords = (ArrowMemoryAlignment > sizeof(ui64)) ? (ArrowMemoryAlignment / sizeof(ui64) - 1) : 0;
    static const ui64 nulls[NullWordCount + ExtraAlignWords] = { 0 };

    // Round the requested length up to a multiple of 64 bytes before checking whether the static
    // zero array is large enough.
    size_t capacity = AlignUp(byteLen, size_t(64));
    if (capacity <= NullWordCount * sizeof(ui64)) {
        return std::make_shared(AlignUp(reinterpret_cast(nulls), ArrowMemoryAlignment), byteLen);
    }

    auto result = AllocateResizableBuffer(byteLen, GetYqlMemoryPool());
    ARROW_OK(result->Resize(byteLen));
    std::memset(result->mutable_data(), 0, byteLen);
    return result;
}

// Returns an all-zero validity bitmap with room for at least `bitCount` bits.
std::shared_ptr MakeZeroBitmap(size_t bitCount) {
    // Round the bit count up to a multiple of 64 (an 8-byte boundary), then convert bits to bytes.
    size_t byteCount = AlignUp(bitCount, size_t(64)) >> 3;
    return MakeZeroBuffer(byteCount);
}

// A validity bitmap is serialized only for partially-null arrays; when there are no nulls or
// every element is null, the null count alone describes validity.
bool NeedStoreBitmap(const arrow::ArrayData& data) {
    auto nullCount = data.GetNullCount();
    return nullCount != 0 && nullCount != data.length;
}

// Emits two metadata values: the null count, then the byte size of the bitmap written by
// StoreNulls (0 when no bitmap is written). The bitmap starts at the byte containing bit
// `data.offset`, so it covers `offset % 8` leading bits plus `length` bits, rounded up to bytes.
void StoreNullsSizes(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) {
    metaSink(data.GetNullCount());
    if (!NeedStoreBitmap(data)) {
        metaSink(0);
        return;
    }

    const ui64 desiredOffset = data.offset % 8;
    size_t nullBytes = AlignUp((size_t)data.length + desiredOffset, size_t(8)) >> 3;
    metaSink(nullBytes);
}

// Reads the (null count, bitmap size) pair written by StoreNullsSizes, in the same order.
// Fails if the previous values were never cleared (they are reset by LoadArray()).
void LoadNullsSizes(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe& nullsCount, TMaybe& nullsSize) {
    YQL_ENSURE(!nullsCount.Defined() && !nullsSize.Defined(), "Attempt to load null sizes twice (most likely LoadArray() is not called)");
    nullsCount = metaSource();
    nullsSize = metaSource();
}

// Appends the validity bitmap bytes (same size as reported by StoreNullsSizes) as a zero-copy
// chunk of data.buffers[0]. Does nothing when NeedStoreBitmap() is false.
void StoreNulls(const arrow::ArrayData& data, TChunkedBuffer& dst) {
    if (!NeedStoreBitmap(data)) {
        return;
    }
    const ui64 desiredOffset = data.offset % 8;
    size_t nullBytes = AlignUp((size_t)data.length + desiredOffset, size_t(8)) >> 3;

    YQL_ENSURE(desiredOffset <= (size_t)data.offset);
    YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
    // Start of buffers[0] (absolute offset 0), advanced past the whole bytes that precede the
    // byte holding bit `data.offset`.
    const char* nulls = data.GetValues(0, 0) + (data.offset - desiredOffset) / 8;
    dst.Append(MakeChunkedBufferAndUntrack(data.buffers[0], nulls, nullBytes));
}

// Reads one buffer-size metadata value into `result`; fails if it is still set from a previous
// block.
void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe& result) {
    YQL_ENSURE(!result.Defined(), "Attempt to load buffer size twice (most likely LoadArray() is not called)");
    result = metaSource();
}

// Removes `*size` bytes from the front of `source` and returns them as an Arrow buffer.
// A size of 0 returns an empty buffer without touching `source`. If the first chunk already holds
// all bytes and its data is Arrow-aligned, the result is a zero-copy TOwnedArrowBuffer that keeps
// the chunk's owner alive; otherwise the bytes are copied, possibly across several chunks, into a
// newly allocated buffer.
// NOTE(review): the behavior of source.Front() on an empty TChunkedBuffer is not visible here.
std::shared_ptr LoadBuffer(TChunkedBuffer& source, TMaybe size) {
    using namespace NYql::NUdf;
    YQL_ENSURE(size.Defined(), "Buffer size is not loaded");
    if (!*size) {
        return MakeEmptyBuffer();
    }

    size_t toAppend = *size;
    const TChunkedBuffer::TChunk& front = source.Front();
    if (front.Buf.size() >= toAppend && HasArrrowAlignment(front.Buf.data())) {
        // Fast path: reference the bytes in place.
        TStringBuf data = source.Front().Buf;
        data.Trunc(toAppend);

        auto result = std::make_shared(data, source.Front().Owner);
        source.Erase(toAppend);
        return result;
    }

    // Slow path: data is split across chunks or misaligned, so copy it out.
    auto result = AllocateResizableBuffer(toAppend, NYql::NUdf::GetYqlMemoryPool());
    ARROW_OK(result->Resize((int64_t)toAppend));
    uint8_t* dst = result->mutable_data();
    while (toAppend) {
        // Shadows the outer `front`: the front chunk changes after every Erase().
        const TChunkedBuffer::TChunk& front = source.Front();
        TStringBuf buf = front.Buf;
        YQL_ENSURE(!buf.empty(), "Premature end of buffer");
        size_t chunk = std::min(toAppend, buf.size());
        std::memcpy(dst, buf.data(), chunk);
        dst += chunk;
        toAppend -= chunk;
        source.Erase(chunk);
    }

    return result;
}

// Loads the validity bitmap written by StoreNulls. With a zero null count no bitmap was written
// (bitmapSize must be 0) and a null pointer is returned; otherwise bitmapSize must be non-zero.
std::shared_ptr LoadNullsBitmap(TChunkedBuffer& source, TMaybe nullCount, TMaybe bitmapSize) {
    YQL_ENSURE(nullCount.Defined(), "Bitmap null count is not loaded");
    YQL_ENSURE(bitmapSize.Defined(), "Bitmap size is not loaded");
    if (*nullCount == 0) {
        YQL_ENSURE(!*bitmapSize);
        return {};
    }
    YQL_ENSURE(*bitmapSize);
    return LoadBuffer(source, bitmapSize);
}

// Common driver for all deserializers.
// LoadMetadata() reads the (null count, bitmap size) pair for nullable types, then the
// type-specific metadata (DoLoadMetadata). LoadArray() consumes the matching buffers from the
// chunked source and clears all loaded metadata, so every block needs one LoadMetadata() followed
// by one LoadArray().
class TBlockDeserializerBase : public IBlockDeserializer {
public:
    TBlockDeserializerBase() = default;

    // Overridden by nested deserializers to also propagate child field types.
    virtual void SetArrowType(const std::shared_ptr& type) {
        ArrowType_ = type;
    }

    void LoadMetadata(const TMetadataSource& metaSource) final {
        if (IsNullable()) {
            LoadNullsSizes(metaSource, NullsCount_, NullsSize_);
        }
        DoLoadMetadata(metaSource);
    }

    virtual std::shared_ptr LoadArray(TChunkedBuffer& src, ui64 blockLen, ui64 offset) final {
        YQL_ENSURE(blockLen > 0, "Should be handled earlier");
        std::shared_ptr nulls;
        i64 nullsCount = 0;
        if (IsNullable()) {
            YQL_ENSURE(NullsCount_.Defined() && NullsSize_.Defined(), "Nulls metadata should be loaded");
            if (*NullsCount_ != 0) {
                if (*NullsSize_ == 0) {
                    // Non-zero null count without a bitmap means every element is null; the
                    // serializers write no buffers in that case, so synthesize an all-null array
                    // instead of reading from `src`.
                    auto result = MakeDefaultValue(blockLen, offset);
                    ResetMetadata();
                    return result;
                }
                nulls = LoadNullsBitmap(src, NullsCount_, NullsSize_);
                nullsCount = *NullsCount_;
            }
        }
        auto result = DoLoadArray(src, nulls, nullsCount, blockLen, offset);
        ResetMetadata();
        return result;
    }

    // Clears this deserializer's (and, via DoResetMetadata, its children's) loaded metadata.
    void ResetMetadata() {
        NullsCount_ = NullsSize_ = {};
        DoResetMetadata();
    }

    // Builds a `blockLen`-element array without reading any input. For nullable types every
    // element is null (all-zero bitmap); the data buffers come from DoMakeDefaultValue.
    std::shared_ptr MakeDefaultValue(ui64 blockLen, ui64 offset) const {
        std::shared_ptr nulls;
        i64 nullsCount = 0;
        if (IsNullable()) {
            nulls = MakeZeroBitmap(blockLen + offset);
            nullsCount = blockLen;
        }
        return DoMakeDefaultValue(nulls, nullsCount, blockLen, offset);
    }

protected:
    virtual void DoLoadMetadata(const TMetadataSource& metaSource) = 0;
    virtual void DoResetMetadata() = 0;
    virtual bool IsNullable() const = 0;
    virtual std::shared_ptr DoMakeDefaultValue(const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const = 0;
    virtual std::shared_ptr DoLoadArray(TChunkedBuffer& src, const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0;

    std::shared_ptr ArrowType_;
    TMaybe NullsCount_;
    TMaybe NullsSize_;
};

// Serializer for fixed-width values.
// NOTE(review): template parameter list missing in this copy; the body uses `ObjectSize` (bytes
// per element) and `Nullable`.
// Metadata: [null count, bitmap size] when Nullable, then the value-data byte size.
// Payload: the optional bitmap, then the value bytes.
template
class TFixedSizeBlockSerializer final : public IBlockSerializer {
public:
    TFixedSizeBlockSerializer() = default;
    size_t ArrayMetadataCount() const final {
        return Nullable ? 3 : 1;
    }

    void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
        if constexpr (Nullable) {
            StoreNullsSizes(data, metaSink);
            if (data.GetNullCount() == data.length) {
                // All null: no value bytes are written.
                metaSink(0);
                return;
            }
        }
        const ui64 desiredOffset = data.offset % 8;
        size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
        metaSink(dataBytes);
    }

    void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
        if constexpr (Nullable) {
            StoreNulls(data, dst);
            if (data.GetNullCount() == data.length) {
                return;
            }
        }

        // Values are written starting at element `offset - offset % 8`, the same element at
        // which the bitmap starts.
        // NOTE(review): presumably so the reader can use `offset % 8` as the array offset for
        // both bitmap and values — confirm with the caller of LoadArray().
        const ui64 desiredOffset = data.offset % 8;
        const char* buf = reinterpret_cast(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize;
        size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
        dst.Append(MakeChunkedBufferAndUntrack(data.buffers[1], buf, dataBytes));
    }
};

// Deserializer matching TFixedSizeBlockSerializer.
// NOTE(review): template parameter list missing in this copy; the body uses `ObjectSize` and
// `Nullable`.
template
class TFixedSizeBlockDeserializer final : public TBlockDeserializerBase {
public:
    TFixedSizeBlockDeserializer() = default;
private:
    void DoLoadMetadata(const TMetadataSource& metaSource) final {
        LoadBufferSize(metaSource, DataSize_);
    }

    bool IsNullable() const final {
        return Nullable;
    }

    // Zero-filled values for blockLen + offset elements.
    std::shared_ptr DoMakeDefaultValue(const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
        auto data = MakeZeroBuffer((blockLen + offset) * ObjectSize);
        return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, offset);
    }

    std::shared_ptr DoLoadArray(TChunkedBuffer& src, const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
        auto data = LoadBuffer(src, DataSize_);
        return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, offset);
    }

    void DoResetMetadata() final {
        DataSize_ = {};
    }

    TMaybe DataSize_;
};

// Serializer for variable-length string/binary values.
// NOTE(review): template parameter list missing in this copy; the body uses `TStringType` (which
// provides `offset_type`) and `Nullable`.
// Metadata: [null count, bitmap size] when Nullable, then offsets byte size and data byte size.
// Payload: optional bitmap, offsets (length + 1 entries starting at element
// `offset - offset % 8`), then the whole data buffer.
template
class TStringBlockSerializer final : public IBlockSerializer {
    using TOffset = typename TStringType::offset_type;
public:
    TStringBlockSerializer() = default;
private:
    size_t ArrayMetadataCount() const final {
        return Nullable ? 4 : 2;
    }

    void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
        if constexpr (Nullable) {
            StoreNullsSizes(data, metaSink);
            if (data.GetNullCount() == data.length) {
                // All null: neither offsets nor data are written.
                metaSink(0);
                metaSink(0);
                return;
            }
        }

        const ui64 desiredOffset = data.offset % 8;
        size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
        metaSink(offsetsSize);
        metaSink(data.buffers[2]->size());
    }

    void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
        if constexpr (Nullable) {
            StoreNulls(data, dst);
            if (data.GetNullCount() == data.length) {
                return;
            }
        }

        // GetValues(1) already points at element `data.offset`; step back to the start of its
        // 8-element group.
        const ui64 desiredOffset = data.offset % 8;
        const char* offsets = reinterpret_cast(data.GetValues(1) - desiredOffset);
        size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
        dst.Append(MakeChunkedBufferAndUntrack(data.buffers[1], offsets, offsetsSize));

        // The offsets are not rebased, so the entire data buffer is sent even for a sliced array.
        const char* mainData = reinterpret_cast(data.buffers[2]->data());
        size_t mainSize = data.buffers[2]->size();
        dst.Append(MakeChunkedBufferAndUntrack(data.buffers[2], mainData, mainSize));
    }
};

// Deserializer matching TStringBlockSerializer.
// NOTE(review): template parameter list missing in this copy; the body uses `TStringType` and
// `Nullable`.
template
class TStringBlockDeserializer final : public TBlockDeserializerBase {
    using TOffset = typename TStringType::offset_type;
public:
    TStringBlockDeserializer() = default;
private:
    void DoLoadMetadata(const TMetadataSource& metaSource) final {
        LoadBufferSize(metaSource, OffsetsSize_);
        LoadBufferSize(metaSource, DataSize_);
    }

    // All-zero offsets (every element is empty) and an empty data buffer.
    std::shared_ptr DoMakeDefaultValue(const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
        auto offsets = MakeZeroBuffer((blockLen + 1 + offset) * sizeof(TOffset));
        auto data = MakeEmptyBuffer();
        return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, offset);
    }

    std::shared_ptr DoLoadArray(TChunkedBuffer& src, const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
        auto offsets = LoadBuffer(src, OffsetsSize_);
        auto data = LoadBuffer(src, DataSize_);
        return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, offset);
    }

    bool IsNullable() const final {
        return Nullable;
    }

    void DoResetMetadata() final {
        OffsetsSize_ = DataSize_ = {};
    }

    TMaybe OffsetsSize_;
    TMaybe DataSize_;
};

// Serializer for an array that has its own validity bitmap and wraps exactly one child array
// (child_data[0]).
// Metadata: [null count, bitmap size], then the inner serializer's metadata. For an all-null
// array the inner metadata slots are filled with zeros and the inner buffers are not written.
class TExtOptionalBlockSerializer final : public IBlockSerializer {
public:
    explicit TExtOptionalBlockSerializer(std::unique_ptr&& inner)
        : Inner_(std::move(inner))
    {
    }
private:
    size_t ArrayMetadataCount() const final {
        return 2 + Inner_->ArrayMetadataCount();
    }

    void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
        StoreNullsSizes(data, metaSink);
        if (data.GetNullCount() == data.length) {
            // Keep the metadata count fixed: emit one zero per inner slot.
            auto innerCount = Inner_->ArrayMetadataCount();
            for (size_t i = 0; i < innerCount; ++i) {
                metaSink(0);
            }
        } else {
            Inner_->StoreMetadata(*data.child_data[0], metaSink);
        }
    }

    void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
        StoreNulls(data, dst);
        if (data.GetNullCount() != data.length) {
            Inner_->StoreArray(*data.child_data[0], dst);
        }
    }

    const std::unique_ptr Inner_;
};

// Deserializer matching TExtOptionalBlockSerializer; always nullable. The inner deserializer
// produces the single child array.
class TExtOptionalBlockDeserializer final : public TBlockDeserializerBase {
public:
    explicit TExtOptionalBlockDeserializer(std::unique_ptr&& inner)
        : Inner_(std::move(inner))
    {
    }
private:
    void DoLoadMetadata(const TMetadataSource& metaSource) final {
        Inner_->LoadMetadata(metaSource);
    }

    std::shared_ptr DoMakeDefaultValue(const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
        return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, offset);
    }

    std::shared_ptr DoLoadArray(TChunkedBuffer& src, const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
        return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, offset);
    }

    bool IsNullable() const final {
        return true;
    }

    void DoResetMetadata() final {
        Inner_->ResetMetadata();
    }

    // Expects exactly one field and forwards its type to the inner deserializer.
    void SetArrowType(const std::shared_ptr& type) final {
        ArrowType_ = type;
        YQL_ENSURE(type->fields().size() == 1);
        Inner_->SetArrowType(type->fields().front()->type());
    }

    const std::unique_ptr Inner_;
};

// Shared logic for tuple-like serializers.
// NOTE(review): template parameter list and static_cast target type are missing in this copy.
// The body uses `Nullable` and casts `this` to a derived type that must provide
// GetChildrenMetaCount(), StoreChildrenMetadata() and StoreChildrenArrays() — this looks like a
// CRTP base; confirm against the canonical source.
// Metadata: [null count, bitmap size] when Nullable, then all children's metadata (zeros when
// the array is entirely null). Payload: optional bitmap, then children's buffers (skipped when
// entirely null).
// Unlike the fixed-size/string serializers, the all-null check here also runs when !Nullable.
template
class TTupleBlockSerializerBase : public IBlockSerializer {
    size_t ArrayMetadataCount() const final {
        size_t result = static_cast(this)->GetChildrenMetaCount();
        if constexpr (Nullable) {
            result += 2;
        }
        return result;
    }

    void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
        if constexpr (Nullable) {
            StoreNullsSizes(data, metaSink);
        }
        if (data.GetNullCount() == data.length) {
            auto childCount = static_cast(this)->GetChildrenMetaCount();
            for (size_t i = 0; i < childCount; ++i) {
                metaSink(0);
            }
        } else {
            static_cast(this)->StoreChildrenMetadata(data.child_data, metaSink);
        }
    }

    void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
        if constexpr (Nullable) {
            StoreNulls(data, dst);
        }
        if (data.GetNullCount() != data.length) {
            static_cast(this)->StoreChildrenArrays(data.child_data, dst);
        }
    }
};

// Tuple serializer: child serializer i handles child_data[i]; children are processed in order.
// NOTE(review): template parameter list missing in this copy; the body uses `Nullable`.
template
class TTupleBlockSerializer final : public TTupleBlockSerializerBase> {
public:
    TTupleBlockSerializer(TVector>&& children)
        : Children_(std::move(children))
    {}

    size_t GetChildrenMetaCount() const {
        size_t result = 0;
        for (const auto& child : Children_) {
            result += child->ArrayMetadataCount();
        }
        return result;
    }

    void StoreChildrenMetadata(const std::vector>& child_data, const IBlockSerializer::TMetadataSink& metaSink) const {
        for (size_t i = 0; i < Children_.size(); ++i) {
            Children_[i]->StoreMetadata(*child_data[i], metaSink);
        }
    }

    void StoreChildrenArrays(const std::vector>& child_data, TChunkedBuffer& dst) const {
        for (size_t i = 0; i < Children_.size(); ++i) {
            Children_[i]->StoreArray(*child_data[i], dst);
        }
    }

private:
    const TVector> Children_;
};

// Serializer for timezone-aware dates, stored as a two-child tuple: child 0 is the date value
// (layout TDateLayout), child 1 presumably the timezone component — both non-nullable
// fixed-size serializers.
// NOTE(review): template parameter/argument lists missing in this copy.
template
class TTzDateBlockSerializer final : public TTupleBlockSerializerBase> {
public:
    TTzDateBlockSerializer() = default;

    size_t GetChildrenMetaCount() const {
        return DateSerialiser_.ArrayMetadataCount() + TzSerialiser_.ArrayMetadataCount();
    }

    void StoreChildrenMetadata(const std::vector>& child_data, const IBlockSerializer::TMetadataSink& metaSink) const {
        DateSerialiser_.StoreMetadata(*child_data[0], metaSink);
        TzSerialiser_.StoreMetadata(*child_data[1], metaSink);
    }

    void StoreChildrenArrays(const std::vector>& child_data, TChunkedBuffer& dst) const {
        DateSerialiser_.StoreArray(*child_data[0], dst);
        TzSerialiser_.StoreArray(*child_data[1], dst);
    }

private:
    using TDateLayout = typename NUdf::TDataType::TLayout;

    TFixedSizeBlockSerializer DateSerialiser_;
    TFixedSizeBlockSerializer TzSerialiser_;
};

// Deserializer matching TTupleBlockSerializer: children load metadata and arrays in order.
// NOTE(review): template parameter list missing in this copy; the body uses `Nullable`.
template
class TTupleBlockDeserializer final : public TBlockDeserializerBase {
public:
    explicit TTupleBlockDeserializer(TVector>&& children)
        : Children_(std::move(children))
    {
    }
private:
    void DoLoadMetadata(const TMetadataSource& metaSource) final {
        for (auto& child : Children_) {
            child->LoadMetadata(metaSource);
        }
    }

    std::shared_ptr DoMakeDefaultValue(const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
        std::vector> childData;
        for (auto& child : Children_) {
            childData.emplace_back(child->MakeDefaultValue(blockLen, offset));
        }
        return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
    }

    std::shared_ptr DoLoadArray(TChunkedBuffer& src, const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
        std::vector> childData;
        for (auto& child : Children_) {
            childData.emplace_back(child->LoadArray(src, blockLen, offset));
        }
        return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
    }

    void DoResetMetadata() final {
        for (auto& child : Children_) {
            child->ResetMetadata();
        }
    }

    bool IsNullable() const final {
        return Nullable;
    }

    // Expects one field per child and forwards field i's type to child i.
    void SetArrowType(const std::shared_ptr& type) final {
        ArrowType_ = type;
        YQL_ENSURE(type->fields().size() == Children_.size());
        for (size_t i = 0; i < Children_.size(); ++i) {
            Children_[i]->SetArrowType(type->field(i)->type());
        }
    }

    const TVector> Children_;
};

// Deserializer matching TTzDateBlockSerializer: date child first, then the timezone child.
// NOTE(review): template parameter/argument lists missing in this copy.
template
class TTzDateBlockDeserializer final : public TBlockDeserializerBase {
public:
    TTzDateBlockDeserializer() = default;
private:
    void DoLoadMetadata(const TMetadataSource& metaSource) final {
        DateDeserialiser_.LoadMetadata(metaSource);
        TzDeserialiser_.LoadMetadata(metaSource);
    }

    std::shared_ptr DoMakeDefaultValue(const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
        std::vector> childData;
        childData.emplace_back(DateDeserialiser_.MakeDefaultValue(blockLen, offset));
        childData.emplace_back(TzDeserialiser_.MakeDefaultValue(blockLen, offset));
        return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
    }

    std::shared_ptr DoLoadArray(TChunkedBuffer& src, const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
        std::vector> childData;
        childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset));
        childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset));
        return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
    }

    void DoResetMetadata() final {
        DateDeserialiser_.ResetMetadata();
        TzDeserialiser_.ResetMetadata();
    }

    bool IsNullable() const final {
        return Nullable;
    }

    // Expects exactly two fields: the date field and the timezone field.
    void SetArrowType(const std::shared_ptr& type) final {
        YQL_ENSURE(type->fields().size() == 2);
        ArrowType_ = type;
        DateDeserialiser_.SetArrowType(type->field(0)->type());
        TzDeserialiser_.SetArrowType(type->field(1)->type());
    }

    using TDateLayout = typename NUdf::TDataType::TLayout;

    TFixedSizeBlockDeserializer DateDeserialiser_;
    TFixedSizeBlockDeserializer TzDeserialiser_;
};

// Trait bundle passed to NYql::NUdf::DispatchByArrowTraits (see MakeBlockSerializer) to choose a
// serializer class for a type. Block resources are not supported and throw.
// NOTE(review): template parameter/argument lists are missing in this copy, so which serializer
// MakePg builds for pass-by-value vs. other Pg types cannot be read here.
struct TSerializerTraits {
    using TResult = IBlockSerializer;
    template
    using TTuple = TTupleBlockSerializer;
    template
    using TFixedSize = TFixedSizeBlockSerializer;
    template
    using TStrings = TStringBlockSerializer;
    using TExtOptional = TExtOptionalBlockSerializer;
    template
    using TTzDate = TTzDateBlockSerializer;

    constexpr static bool PassType = false;

    static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
        Y_UNUSED(pgBuilder);
        if (desc.PassByValue) {
            return std::make_unique>();
        }
        return std::make_unique>();
    }

    static std::unique_ptr MakeResource(bool isOptional) {
        Y_UNUSED(isOptional);
        ythrow yexception() << "Serializer not implemented for block resources";
    }

    template
    static std::unique_ptr MakeTzDate(bool isOptional) {
        if (isOptional) {
            return std::make_unique>();
        }
        else {
            return std::make_unique>();
        }
    }
};

// Deserializer counterpart of TSerializerTraits; must map every type to the deserializer that
// matches the serializer chosen there.
struct TDeserializerTraits {
    using TResult = TBlockDeserializerBase;
    template
    using TTuple = TTupleBlockDeserializer;
    template
    using TFixedSize = TFixedSizeBlockDeserializer;
    template
    using TStrings = TStringBlockDeserializer;
    using TExtOptional = TExtOptionalBlockDeserializer;
    template
    using TTzDate = TTzDateBlockDeserializer;

    constexpr static bool PassType = false;

    static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
        Y_UNUSED(pgBuilder);
        if (desc.PassByValue) {
            return std::make_unique>();
        }
        return std::make_unique>();
    }

    static std::unique_ptr MakeResource(bool isOptional) {
        Y_UNUSED(isOptional);
        ythrow yexception() << "Deserializer not implemented for block resources";
    }

    template
    static std::unique_ptr MakeTzDate(bool isOptional) {
        if (isOptional) {
            return std::make_unique>();
        }
        else {
            return std::make_unique>();
        }
    }
};

} // namespace

// Builds a block serializer for `type` via DispatchByArrowTraits with TSerializerTraits.
std::unique_ptr MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
    return NYql::NUdf::DispatchByArrowTraits(typeInfoHelper, type, nullptr);
}

// Builds a block deserializer for `type` and gives it the Arrow type from GetArrowType; the
// deserializers use that type (ArrowType_) to construct arrow::ArrayData.
std::unique_ptr MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
    std::unique_ptr result = NYql::NUdf::DispatchByArrowTraits(typeInfoHelper, type, nullptr);
    result->SetArrowType(NYql::NUdf::GetArrowType(typeInfoHelper, type));
    return std::move(result);
}

} // namespace NKikimr::NMiniKQL