#pragma once

// Block readers: helpers that read individual items out of Arrow array data or
// Arrow scalars as TBlockItem values, estimate their "weight", and serialize
// them into a TOutputBuffer.
//
// NOTE(review): in this view of the file every `<...>` span appears to have been
// stripped (the two system #include targets below, template parameter lists,
// template arguments, cast targets). The code tokens are left exactly as found;
// restore the missing spans from version control before compiling.

#include "block_item.h"
#include "block_io_buffer.h"
#include "dispatch_traits.h"
#include "util.h"

// NOTE(review): the targets of the next two includes are missing in this view.
#include
#include

namespace NYql {
namespace NUdf {

// Abstract interface of a block reader.
//
// A reader can:
//   * produce a TBlockItem for one position of an arrow::ArrayData or for an
//     arrow::Scalar (GetItem / GetScalarItem);
//   * report a weight for a whole array, for a single item, or for a default
//     value (GetDataWeight / GetDefaultValueWeight);
//   * append the serialized form of an item to a TOutputBuffer
//     (SaveItem / SaveScalarItem).
//
// Copying is disabled through private inheritance from TNonCopyable.
class IBlockReader : private TNonCopyable {
public:
    virtual ~IBlockReader() = default;

    // result will reference to Array/Scalar internals and will be valid until next call to GetItem/GetScalarItem
    virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0;
    virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0;

    // Weight of all items of `data`.
    virtual ui64 GetDataWeight(const arrow::ArrayData& data) const = 0;
    // Weight of a single, already extracted item.
    virtual ui64 GetDataWeight(TBlockItem item) const = 0;
    // Weight attributed to a default value of the reader's type.
    virtual ui64 GetDefaultValueWeight() const = 0;

    // Appends the serialized form of item `index` of `data` to `out`.
    virtual void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const = 0;
    // Appends the serialized form of `scalar` to `out`.
    virtual void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const = 0;
};

// Serialization properties of a block item type; filled in by
// UpdateBlockItemSerializeProps() at the end of this file.
struct TBlockItemSerializeProps {
    // NOTE(review): the TMaybe value type is not visible in this view.
    TMaybe MaxSize = 0; // maximum size each block item can occupy in TOutputBuffer
                        // (will be undefined for dynamic object like string)
    bool IsFixed = true; // true if each block item takes fixed size
};

// Common implementation for readers of fixed-size values of type T.
//
// The conversion of a raw T into a TBlockItem is delegated to
// MakeBlockItem(), which is called through a static_cast of `this`
// (presumably to the derived reader type; the cast target and the template
// parameter list are not visible in this view).
//
// Serialized form (see SaveItem / SaveScalarItem): when Nullable, one marker
// char is written first (0 for null and nothing else, 1 for a present value),
// followed by the value via PushNumber.
template class TFixedSizeBlockReaderBase : public IBlockReader {
public:
    // Returns an empty TBlockItem for a null slot (Nullable only), otherwise the
    // item built from the value stored at `index` of buffer 1.
    TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
        if constexpr (Nullable) {
            if (IsNull(data, index)) {
                return {};
            }
        }
        return static_cast(this)->MakeBlockItem(data.GetValues(1)[index]);
    }

    // Scalar counterpart of GetItem(). Returns an empty TBlockItem for an
    // invalid scalar (Nullable only).
    TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
        using namespace arrow::internal;
        if constexpr (Nullable) {
            if (!scalar.is_valid) {
                return {};
            }
        }
        // NOTE(review): the type test below lost its arguments in this view. The
        // first branch copies sizeof(T) bytes out of the scalar's value buffer
        // with memcpy; the second reads the value through the scalar's data()
        // pointer.
        if constexpr(std::is_same_v) {
            auto& fixedScalar = checked_cast(scalar);
            T value;
            memcpy((void*)&value, fixedScalar.value->data(), sizeof(T));
            return static_cast(this)->MakeBlockItem(value);
        } else {
            return static_cast(this)->MakeBlockItem(
                *static_cast(checked_cast(scalar).data())
            );
        }
    }

    // sizeof(T) per item, plus 1 per item when Nullable, times data.length.
    ui64 GetDataWeight(const arrow::ArrayData& data) const final {
        if constexpr (Nullable) {
            return (1 + sizeof(T)) * data.length;
        }
        return sizeof(T) * data.length;
    }

    // Every item weighs the same, so the item itself is not inspected.
    ui64 GetDataWeight(TBlockItem item) const final {
        Y_UNUSED(item);
        return GetDefaultValueWeight();
    }

    // sizeof(T), plus 1 when Nullable.
    ui64 GetDefaultValueWeight() const final {
        if constexpr (Nullable) {
            return 1 + sizeof(T);
        }
        return sizeof(T);
    }

    // Writes the optional null marker and then the value at `index` of buffer 1.
    void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
        if constexpr (Nullable) {
            if (IsNull(data, index)) {
                return out.PushChar(0);
            }
            out.PushChar(1);
        }
        out.PushNumber(data.GetValues(1)[index]);
    }

    // Scalar counterpart of SaveItem(); uses the same two-branch value
    // extraction as GetScalarItem().
    void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
        if constexpr (Nullable) {
            if (!scalar.is_valid) {
                return out.PushChar(0);
            }
            out.PushChar(1);
        }
        if constexpr(std::is_same_v) {
            auto& fixedScalar = arrow::internal::checked_cast(scalar);
            T value;
            memcpy((void*)&value, fixedScalar.value->data(), sizeof(T));
            out.PushNumber(value);
        } else {
            out.PushNumber(*static_cast(arrow::internal::checked_cast(scalar).data()));
        }
    }
};

// Fixed-size reader whose TBlockItem is constructed directly from the value.
template class TFixedSizeBlockReader : public TFixedSizeBlockReaderBase> {
public:
    TBlockItem MakeBlockItem(const T& item) const {
        return TBlockItem(item);
    }
};

// Fixed-size reader for values handed over as TUnboxedValuePod.
template class TResourceBlockReader : public TFixedSizeBlockReaderBase> {
public:
    // Builds the item by copying sizeof(TBlockItem) raw bytes from the pod.
    // NOTE(review): assumes TUnboxedValuePod and TBlockItem have compatible
    // size/layout - confirm.
    TBlockItem MakeBlockItem(const TUnboxedValuePod& pod) const {
        TBlockItem item;
        std::memcpy(item.GetRawPtr(), pod.GetRawPtr(), sizeof(TBlockItem));
        return item;
    }
};

// Reader for variable-length string/binary arrays.
//
// Array layout used here: buffer 1 holds offsets of type
// TStringType::offset_type, buffer 2 holds the character data; item i spans
// [offsets[i], offsets[i + 1]). Debug builds check that the array has exactly
// three buffers.
//
// Serialized form: optional null marker (as in the fixed-size readers)
// followed by the string via PushString.
template class TStringBlockReader final : public IBlockReader {
public:
    using TOffset = typename TStringType::offset_type;

    // Returns a TBlockItem built from a string_view into the array's character
    // buffer (no copy is made here), or an empty item for a null slot.
    TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
        Y_DEBUG_ABORT_UNLESS(data.buffers.size() == 3);
        if constexpr (Nullable) {
            if (IsNull(data, index)) {
                return {};
            }
        }
        const TOffset* offsets = data.GetValues(1);
        // Second argument 0: presumably an absolute offset, so that the character
        // buffer is addressed from its start (offsets already index into it) -
        // TODO confirm against the Arrow ArrayData::GetValues documentation.
        const char* strData = data.GetValues(2, 0);
        std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]);
        return TBlockItem(str);
    }

    // Returns a TBlockItem built from a string_view over the scalar's value
    // buffer, or an empty item for an invalid scalar (Nullable only).
    TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
        if constexpr (Nullable) {
            if (!scalar.is_valid) {
                return {};
            }
        }
        auto buffer = arrow::internal::checked_cast(scalar).value;
        std::string_view str(reinterpret_cast(buffer->data()), buffer->size());
        return TBlockItem(str);
    }

    // data.length (when Nullable) plus the full size of buffer 2 if it is set.
    ui64 GetDataWeight(const arrow::ArrayData& data) const final {
        ui64 size = 0;
        if constexpr (Nullable) {
            size += data.length;
        }
        size += data.buffers[2] ? data.buffers[2]->size() : 0;
        return size;
    }

    // String length, plus 1 when Nullable; an empty (null) item contributes 0
    // characters.
    ui64 GetDataWeight(TBlockItem item) const final {
        if constexpr (Nullable) {
            return 1 + (item ? item.AsStringRef().Size() : 0);
        }
        return item.AsStringRef().Size();
    }

    // 1 when Nullable, otherwise 0.
    ui64 GetDefaultValueWeight() const final {
        if constexpr (Nullable) {
            return 1;
        }
        return 0;
    }

    // Writes the optional null marker and then the string at `index`.
    void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
        Y_DEBUG_ABORT_UNLESS(data.buffers.size() == 3);
        if constexpr (Nullable) {
            if (IsNull(data, index)) {
                return out.PushChar(0);
            }
            out.PushChar(1);
        }
        const TOffset* offsets = data.GetValues(1);
        const char* strData = data.GetValues(2, 0);
        std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]);
        out.PushString(str);
    }

    // Scalar counterpart of SaveItem().
    void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
        if constexpr (Nullable) {
            if (!scalar.is_valid) {
                return out.PushChar(0);
            }
            out.PushChar(1);
        }
        auto buffer = arrow::internal::checked_cast(scalar).value;
        std::string_view str(reinterpret_cast(buffer->data()), buffer->size());
        out.PushString(str);
    }
};

// Common implementation for readers of composite values whose parts live in
// child arrays / struct scalar fields.
//
// This base handles the Nullable null check and null marker; everything that
// touches the children is delegated, through a static_cast of `this`
// (presumably to the derived type; cast target not visible in this view), to:
//   GetChildrenItems, GetChildrenScalarItems, GetChildrenDataWeight,
//   GetDataWeightImpl, GetChildrenDefaultDataWeight, SaveChildrenItems,
//   SaveChildrenScalarItems.
template class TTupleBlockReaderBase : public IBlockReader {
public:
    TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
        if constexpr (Nullable) {
            if (IsNull(data, index)) {
                return {};
            }
        }
        return static_cast(this)->GetChildrenItems(data, index);
    }

    TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
        if constexpr (Nullable) {
            if (!scalar.is_valid) {
                return {};
            }
        }
        const auto& structScalar = arrow::internal::checked_cast(scalar);
        return static_cast(this)->GetChildrenScalarItems(structScalar);
    }

    // data.length (when Nullable) plus the weight reported for the children.
    ui64 GetDataWeight(const arrow::ArrayData& data) const final {
        ui64 size = 0;
        if constexpr (Nullable) {
            size += data.length;
        }
        size += static_cast(this)->GetChildrenDataWeight(data);
        return size;
    }

    // Fully delegated: the derived class accounts for nullability itself.
    ui64 GetDataWeight(TBlockItem item) const final {
        return static_cast(this)->GetDataWeightImpl(item);
    }

    // 1 (when Nullable) plus the default weight reported for the children.
    ui64 GetDefaultValueWeight() const final {
        ui64 size = 0;
        if constexpr (Nullable) {
            size = 1;
        }
        size += static_cast(this)->GetChildrenDefaultDataWeight();
        return size;
    }

    // Writes the optional null marker, then lets the derived class write the
    // children.
    void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
        if constexpr (Nullable) {
            if (IsNull(data, index)) {
                return out.PushChar(0);
            }
            out.PushChar(1);
        }
        static_cast(this)->SaveChildrenItems(data, index, out);
    }

    // Scalar counterpart of SaveItem().
    void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
        if constexpr (Nullable) {
            if (!scalar.is_valid) {
                return out.PushChar(0);
            }
            out.PushChar(1);
        }
        const auto& structScalar = arrow::internal::checked_cast(scalar);
        static_cast(this)->SaveChildrenScalarItems(structScalar, out);
    }
};

// Tuple reader: owns one child reader per tuple element and forwards every
// operation to child i using data.child_data[i] / structScalar.value[i].
template class TTupleBlockReader final : public TTupleBlockReaderBase> {
public:
    // Takes ownership of the child readers; Items gets one slot per child.
    TTupleBlockReader(TVector>&& children)
        : Children(std::move(children))
        , Items(Children.size())
    {}

    // Fills the Items scratch vector from the children and returns a TBlockItem
    // built from a pointer to it. The returned item therefore refers to this
    // reader's Items storage, which is overwritten by the next call.
    TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) {
        for (ui32 i = 0; i < Children.size(); ++i) {
            Items[i] = Children[i]->GetItem(*data.child_data[i], index);
        }
        return TBlockItem(Items.data());
    }

    // Scalar counterpart of GetChildrenItems(); same Items-backed result.
    TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) {
        for (ui32 i = 0; i < Children.size(); ++i) {
            Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]);
        }
        return TBlockItem(Items.data());
    }

    // Weight of one tuple item.
    //   Nullable, empty item  -> GetDefaultValueWeight()
    //   Nullable, present     -> 1 + sum of child weights over the elements of
    //                            item.GetOptionalValue()
    //   not Nullable          -> sum of child weights over item.GetElements()
    size_t GetDataWeightImpl(const TBlockItem& item) const {
        const TBlockItem* items = nullptr;
        ui64 size = 0;
        if constexpr (Nullable) {
            if (!item) {
                return this->GetDefaultValueWeight();
            }
            size = 1;
            items = item.GetOptionalValue().GetElements();
        } else {
            items = item.GetElements();
        }
        for (ui32 i = 0; i < Children.size(); ++i) {
            size += Children[i]->GetDataWeight(items[i]);
        }
        return size;
    }

    // Sum of the children's array weights.
    size_t GetChildrenDataWeight(const arrow::ArrayData& data) const {
        size_t size = 0;
        for (ui32 i = 0; i < Children.size(); ++i) {
            size += Children[i]->GetDataWeight(*data.child_data[i]);
        }
        return size;
    }

    // Sum of the children's default value weights.
    size_t GetChildrenDefaultDataWeight() const {
        size_t size = 0;
        for (ui32 i = 0; i < Children.size(); ++i) {
            size += Children[i]->GetDefaultValueWeight();
        }
        return size;
    }

    // Serializes the children in order.
    void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const {
        for (ui32 i = 0; i < Children.size(); ++i) {
            Children[i]->SaveItem(*data.child_data[i], index, out);
        }
    }

    // Scalar counterpart of SaveChildrenItems().
    void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
        for (ui32 i = 0; i < Children.size(); ++i) {
            Children[i]->SaveScalarItem(*structScalar.value[i], out);
        }
    }

private:
    const TVector> Children; // one reader per tuple element
    TVector Items;           // scratch storage backing the items returned by GetChildren*Items
};

// Reader for timezone-aware dates stored as two children: child 0 is read by
// DateReader_, child 1 by TimezoneReader_. The resulting TBlockItem is the
// date item with the timezone id attached via SetTimezoneId().
template class TTzDateBlockReader final : public TTupleBlockReaderBase> {
public:
    TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) {
        Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
        TBlockItem item {DateReader_.GetItem(*data.child_data[0], index)};
        item.SetTimezoneId(TimezoneReader_.GetItem(*data.child_data[1], index).Get());
        return item;
    }

    TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) {
        Y_DEBUG_ABORT_UNLESS(structScalar.value.size() == 2);
        TBlockItem item {DateReader_.GetScalarItem(*structScalar.value[0])};
        item.SetTimezoneId(TimezoneReader_.GetScalarItem(*structScalar.value[1]).Get());
        return item;
    }

    // Sum of the array weights of the date child and the timezone child.
    size_t GetChildrenDataWeight(const arrow::ArrayData& data) const {
        Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
        size_t size = 0;
        size += DateReader_.GetDataWeight(*data.child_data[0]);
        size += TimezoneReader_.GetDataWeight(*data.child_data[1]);
        return size;
    }

    // Every item weighs the same, so the item itself is not inspected.
    size_t GetDataWeightImpl(const TBlockItem& item) const {
        Y_UNUSED(item);
        return GetChildrenDefaultDataWeight();
    }

    // 1 (when Nullable) plus the default weights of both sub-readers.
    // NOTE(review): TTupleBlockReaderBase::GetDefaultValueWeight() already adds 1
    // when Nullable before calling this function, so the null marker looks
    // counted twice on that path (but once via GetDataWeightImpl) - confirm
    // whether this is intended.
    size_t GetChildrenDefaultDataWeight() const {
        ui64 size = 0;
        if constexpr (Nullable) {
            size = 1;
        }
        size += DateReader_.GetDefaultValueWeight();
        size += TimezoneReader_.GetDefaultValueWeight();
        return size;
    }

    // Serializes the date value followed by the timezone value.
    void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const {
        DateReader_.SaveItem(*data.child_data[0], index, out);
        TimezoneReader_.SaveItem(*data.child_data[1], index, out);
    }

    // Scalar counterpart of SaveChildrenItems().
    void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
        DateReader_.SaveScalarItem(*structScalar.value[0], out);
        TimezoneReader_.SaveScalarItem(*structScalar.value[1], out);
    }

private:
    // NOTE(review): template arguments of both member types are partly missing
    // in this view.
    TFixedSizeBlockReader::TLayout, /* Nullable */false> DateReader_;
    TFixedSizeBlockReader TimezoneReader_;
};

// Reader that adds one optional level on top of another reader.
//
// Nullness is decided by this array / scalar itself; the payload is read by
// the inner reader from the first child (child_data.front() or the first
// field of the scalar) and wrapped with MakeOptional().
class TExternalOptionalBlockReader final : public IBlockReader {
public:
    // Takes ownership of the inner reader.
    TExternalOptionalBlockReader(std::unique_ptr&& inner)
        : Inner(std::move(inner))
    {}

    TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
        if (IsNull(data, index)) {
            return {};
        }
        return Inner->GetItem(*data.child_data.front(), index).MakeOptional();
    }

    TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
        if (!scalar.is_valid) {
            return {};
        }
        const auto& structScalar = arrow::internal::checked_cast(scalar);
        return Inner->GetScalarItem(*structScalar.value.front()).MakeOptional();
    }

    // data.length plus the inner reader's weight of the first child.
    ui64 GetDataWeight(const arrow::ArrayData& data) const final {
        return data.length + Inner->GetDataWeight(*data.child_data.front());
    }

    // Empty item -> default weight; otherwise 1 plus the inner weight of the
    // unwrapped value.
    ui64 GetDataWeight(TBlockItem item) const final {
        if (!item) {
            return GetDefaultValueWeight();
        }
        return 1 + Inner->GetDataWeight(item.GetOptionalValue());
    }

    // 1 plus the inner reader's default weight.
    ui64 GetDefaultValueWeight() const final {
        return 1 + Inner->GetDefaultValueWeight();
    }

    // Writes marker 0 for null, or marker 1 followed by the inner item.
    void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
        if (IsNull(data, index)) {
            return out.PushChar(0);
        }
        out.PushChar(1);
        Inner->SaveItem(*data.child_data.front(), index, out);
    }

    // Scalar counterpart of SaveItem().
    void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
        if (!scalar.is_valid) {
            return out.PushChar(0);
        }
        out.PushChar(1);
        const auto& structScalar = arrow::internal::checked_cast(scalar);
        Inner->SaveScalarItem(*structScalar.value.front(), out);
    }

private:
    const std::unique_ptr Inner; // reader for the wrapped (non-optional-level) value
};

// Traits that map each category of type to the reader class defined above.
// Passed to DispatchByArrowTraits() by MakeBlockReader() (presumably as its
// template argument, which is not visible in this view).
struct TReaderTraits {
    using TResult = IBlockReader;
    template using TTuple = TTupleBlockReader;
    template using TFixedSize = TFixedSizeBlockReader;
    template using TStrings = TStringBlockReader;
    using TExtOptional = TExternalOptionalBlockReader;
    template using TResource = TResourceBlockReader;
    template using TTzDateReader = TTzDateBlockReader;

    constexpr static bool PassType = false;

    // Creates the reader for a Pg type, choosing between two reader kinds by
    // desc.PassByValue. `pgBuilder` is unused.
    // NOTE(review): the concrete reader types in both make_unique calls are
    // missing in this view.
    static std::unique_ptr MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) {
        Y_UNUSED(pgBuilder);
        if (desc.PassByValue) {
            return std::make_unique>();
        } else {
            return std::make_unique>();
        }
    }

    // Creates the resource reader variant selected by `isOptional`.
    static std::unique_ptr MakeResource(bool isOptional) {
        if (isOptional) {
            return std::make_unique>();
        } else {
            return std::make_unique>();
        }
    }

    // Creates the timezone-date reader variant selected by `isOptional`.
    template static std::unique_ptr MakeTzDate(bool isOptional) {
        if (isOptional) {
            return std::make_unique>();
        } else {
            return std::make_unique>();
        }
    }
};

// Builds a reader for `type` by forwarding to DispatchByArrowTraits() with a
// null third argument.
inline std::unique_ptr MakeBlockReader(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
    return DispatchByArrowTraits(typeInfoHelper, type, nullptr);
}

// Accumulates into `props` the serialization properties of `type`.
//
// Does nothing if props.MaxSize is already undefined. Otherwise:
//   * every optional level adds 1 to MaxSize and clears IsFixed;
//   * struct and tuple types recurse into each member / element;
//   * data types add 16 (decimal), FixedSize + sizeof(TTimezoneId) (tz dates)
//     or FixedSize (other), while string types reset MaxSize to undefined
//     and clear IsFixed;
//   * Pg types add 1 + 8 when passed by value, otherwise reset MaxSize to
//     undefined and clear IsFixed;
//   * any other type fails Y_ENSURE with "Unsupported type".
inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper, const TType* type, TBlockItemSerializeProps& props) {
    // An undefined MaxSize cannot become defined again, so there is nothing to
    // accumulate.
    if (!props.MaxSize.Defined()) {
        return;
    }

    // Peel off optional levels. Each adds one marker to the maximum size;
    // presumably IsFixed is cleared because a null is saved as the marker alone
    // (see the SaveItem implementations above).
    for (;;) {
        TOptionalTypeInspector typeOpt(typeInfoHelper, type);
        if (!typeOpt) {
            break;
        }
        props.MaxSize = *props.MaxSize + 1;
        props.IsFixed = false;
        type = typeOpt.GetItemType();
    }

    TStructTypeInspector typeStruct(typeInfoHelper, type);
    if (typeStruct) {
        for (ui32 i = 0; i < typeStruct.GetMembersCount(); ++i) {
            UpdateBlockItemSerializeProps(typeInfoHelper, typeStruct.GetMemberType(i), props);
        }
        return;
    }

    TTupleTypeInspector typeTuple(typeInfoHelper, type);
    if (typeTuple) {
        for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
            UpdateBlockItemSerializeProps(typeInfoHelper, typeTuple.GetElementType(i), props);
        }
        return;
    }

    TDataTypeInspector typeData(typeInfoHelper, type);
    if (typeData) {
        auto typeId = typeData.GetTypeId();
        auto slot = GetDataSlot(typeId);
        auto& dataTypeInfo = GetDataTypeInfo(slot);
        if (dataTypeInfo.Features & DecimalType) {
            *props.MaxSize += 16;
        } else if (dataTypeInfo.Features & StringType) {
            props.MaxSize = {};
            props.IsFixed = false;
        } else if (dataTypeInfo.Features & TzDateType) {
            *props.MaxSize += dataTypeInfo.FixedSize + sizeof(TTimezoneId);
        } else {
            *props.MaxSize += dataTypeInfo.FixedSize;
        }
        return;
    }

    TPgTypeInspector typePg(typeInfoHelper, type);
    if (typePg) {
        // NOTE(review): `desc` is dereferenced without a null check - assumes
        // FindPgTypeDescription always finds a description here; confirm.
        auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
        if (desc->PassByValue) {
            // presumably 1 null marker + 8 value bytes, matching the reader built
            // by TReaderTraits::MakePg - TODO confirm
            *props.MaxSize += 1 + 8;
        } else {
            props.MaxSize = {};
            props.IsFixed = false;
        }
        return;
    }

    Y_ENSURE(false, "Unsupported type");
}

} // namespace NUdf
} // namespace NYql