// Value packers for MiniKQL: the generic packer (TValuePackerGeneric) and the
// transport packer (TValuePackerTransport), including the Arrow block path.
//
// NOTE(review): in this copy all text that was inside angle brackets appears to
// have been stripped -- the system #include lines below have no targets and
// template parameter/argument lists are missing (e.g. `template void PackData(...)`,
// `static_cast(type)`, `std::pair SkipEmbeddedLength`). Restore them from
// upstream before building; the comments below describe only the visible logic.
#include "mkql_block_impl.h"
#include "mkql_computation_node_pack.h"
#include "mkql_computation_node_pack_impl.h"
#include "mkql_computation_node_holders.h"
#include "presort.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

using NYql::TChunkedBuffer;

namespace NKikimr {
namespace NMiniKQL {

namespace {

using namespace NDetails;

// Appends one arithmetic value to `buffer`. Raw bytes are written when `Fast`
// is set, for 1-byte types and for floating point; otherwise the matching
// PackInt16/PackUInt16/PackInt32/PackUInt32/PackInt64/PackUInt64 helper is used.
template void PackData(T value, TBuf& buffer) {
    static_assert(std::is_arithmetic_v);
    if constexpr (Fast || sizeof(T) == 1 || std::is_floating_point_v) {
        PutRawData(value, buffer);
    } else if constexpr (std::is_same_v) {
        PackInt16(value, buffer);
    } else if constexpr (std::is_same_v) {
        PackUInt16(value, buffer);
    } else if constexpr (std::is_same_v) {
        PackInt32(value, buffer);
    } else if constexpr (std::is_same_v) {
        PackUInt32(value, buffer);
    } else if constexpr (std::is_same_v) {
        PackInt64(value, buffer);
    } else {
        static_assert(std::is_same_v);
        PackUInt64(value, buffer);
    }
}

// Appends `size` raw bytes starting at `data`.
template void PackBlob(const char* data, size_t size, TBuf& buf) {
    buf.Append(data, size);
}

// Reads back one arithmetic value written by PackData with the same `Fast`
// setting: raw bytes for Fast / 1-byte / floating-point types, otherwise the
// matching Unpack* helper.
template T UnpackData(TChunkedInputBuffer& buf) {
    static_assert(std::is_arithmetic_v);
    T res;
    if constexpr (Fast || sizeof(T) == 1 || std::is_floating_point_v) {
        res = GetRawData(buf);
    } else if constexpr (std::is_same_v) {
        res = UnpackInt16(buf);
    } else if constexpr (std::is_same_v) {
        res = UnpackUInt16(buf);
    } else if constexpr (std::is_same_v) {
        res = UnpackInt32(buf);
    } else if constexpr (std::is_same_v) {
        res = UnpackUInt32(buf);
    } else if constexpr (std::is_same_v) {
        res = UnpackInt64(buf);
    } else {
        static_assert(std::is_same_v);
        res = UnpackUInt64(buf);
    }
    return res;
}

// Allocates a string value of exactly `size` bytes and fills it from `buf`.
NUdf::TUnboxedValuePod UnpackString(TChunkedInputBuffer& buf, ui32 size) {
    auto res = MakeStringNotFilled(size, 0);
    NYql::NUdf::TMutableStringRef ref = res.AsStringRef();
    Y_DEBUG_ABORT_UNLESS(size == ref.Size());
    buf.CopyTo(ref.Data(), size);
    return res;
}

// Writes the packed-value header. When `fullLen` > 7 it is written raw;
// otherwise a single byte is written with bit 0 set, the length in bits 1..3
// and 0x10 marking an empty single optional. The optional-usage mask follows
// the header when `useMask` is set.
template void SerializeMeta(TBuf& buf, bool useMask, const NDetails::TOptionalUsageMask& mask,
    ui32 fullLen, bool singleOptional)
{
    if (fullLen > 7) {
        NDetails::PutRawData(fullLen, buf);
        // Long length always signals non-empty optional. So, don't check
        // EProps::SingleOptional here
    } else {
        ui8 length = 1 | (fullLen << 1);
        // Empty root optional always has short length. Embed empty flag
        // into the length
        if (singleOptional && !mask.IsEmptyMask()) {
            length |= 0x10;
        }
        NDetails::PutRawData(length, buf);
    }
    if (useMask) {
        // Prepend optional mask before data
        mask.Serialize(buf);
    }
}

// Append-only writer over a caller-provided buffer of `size` bytes; it does not
// free that memory. Capacity is checked only via Y_DEBUG_ABORT_UNLESS in the
// Append overloads -- Advance() performs no check.
class TFixedSizeBuffer {
public:
    TFixedSizeBuffer(char* buf, size_t size)
        : Data_(buf)
        , Capacity_(size)
    {
    }

    // Current write position.
    inline char* Pos() {
        return Data_ + Size_;
    }

    inline size_t Size() const {
        return Size_;
    }

    // Moves the write position forward without writing (unchecked).
    inline void Advance(size_t len) {
        Size_ += len;
    }

    // Drops the last `len` written bytes.
    inline void EraseBack(size_t len) {
        Y_DEBUG_ABORT_UNLESS(Size_ >= len);
        Size_ -= len;
    }

    inline void Append(const char* data, size_t len) {
        Y_DEBUG_ABORT_UNLESS(Size_ + len <= Capacity_);
        std::memcpy(Data_ + Size_, data, len);
        Size_ += len;
    }

    inline void Append(char c) {
        Y_DEBUG_ABORT_UNLESS(Size_ + 1 <= Capacity_);
        *(Pos()) = c;
        ++Size_;
    }

private:
    char* const Data_;
    size_t Size_ = 0;
    const size_t Capacity_;
};

// Parses the header written by the non-Fast packers and returns
// {payload length, empty-single-optional flag}. Inputs longer than 8 bytes carry
// a long length (the check `length + 4 == totalBufSize` implies a 4-byte field);
// shorter ones carry a single byte with bit 0 set, the length in bits 1..3 and
// 0x10 as the empty flag. Aborts if instantiated with Fast.
template std::pair SkipEmbeddedLength(TChunkedInputBuffer& buf, size_t totalBufSize) {
    if constexpr (Fast) {
        Y_ABORT("Should not be called");
    }
    ui32 length = 0;
    bool emptySingleOptional = false;
    if (totalBufSize > 8) {
        length = GetRawData(buf);
        MKQL_ENSURE(length + 4 == totalBufSize, "Bad packed data. Invalid embedded size");
    } else {
        length = GetRawData(buf);
        MKQL_ENSURE(length & 1, "Bad packed data. Invalid embedded size");
        emptySingleOptional = 0 != (length & 0x10);
        length = (length & 0x0f) >> 1;
        MKQL_ENSURE(length + 1 == totalBufSize, "Bad packed data. Invalid embedded size");
    }
    return {length, emptySingleOptional};
}

// Returns true if `type` contains an Optional or Pg type anywhere inside it,
// recursing through List, Struct, Tuple, Dict, Variant, Tagged, Multi and Block.
// Throws yexception for any other kind.
bool HasOptionalFields(const TType* type) {
    switch (type->GetKind()) {
    case TType::EKind::Void:
    case TType::EKind::Null:
    case TType::EKind::EmptyList:
    case TType::EKind::EmptyDict:
    case TType::EKind::Data:
        return false;

    case TType::EKind::Optional:
        return true;

    case TType::EKind::Pg:
        return true;

    case TType::EKind::List:
        return HasOptionalFields(static_cast(type)->GetItemType());

    case TType::EKind::Struct: {
        auto structType = static_cast(type);
        for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
            if (HasOptionalFields(structType->GetMemberType(index))) {
                return true;
            }
        }
        return false;
    }

    case TType::EKind::Tuple: {
        auto tupleType = static_cast(type);
        for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) {
            if (HasOptionalFields(tupleType->GetElementType(index))) {
                return true;
            }
        }
        return false;
    }

    case TType::EKind::Dict: {
        auto dictType = static_cast(type);
        return HasOptionalFields(dictType->GetKeyType()) || HasOptionalFields(dictType->GetPayloadType());
    }

    case TType::EKind::Variant: {
        auto variantType = static_cast(type);
        return HasOptionalFields(variantType->GetUnderlyingType());
    }

    case TType::EKind::Tagged: {
        auto taggedType = static_cast(type);
        return HasOptionalFields(taggedType->GetBaseType());
    }

    case TType::EKind::Multi: {
        auto multiType = static_cast(type);
        for (ui32 index = 0; index < multiType->GetElementsCount(); ++index) {
            if (HasOptionalFields(multiType->GetElementType(index))) {
                return true;
            }
        }
        return false;
    }

    case TType::EKind::Block: {
        auto blockType = static_cast(type);
        return HasOptionalFields(blockType->GetItemType());
    }

    default:
        THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
    }
}

// Computes the packing properties for `type`:
//  - UseOptionalMask when the type contains optionals;
//  - when not packing a list (`assumeList` == false) and the top-level type is
//    an Optional whose item has no optionals: SingleOptional, and the mask is dropped;
//  - UseTopLength when the (possibly unwrapped) top-level type is
//    String/Json/Yson/Utf8/JsonDocument, so the header length doubles as the string
//    length. A top-level DyNumber is not in that list and keeps its own length prefix.
TPackProperties ScanTypeProperties(const TType* type, bool assumeList) {
    TPackProperties props;
    if (HasOptionalFields(type)) {
        props.Set(EPackProps::UseOptionalMask);
    }
    if (assumeList) {
        return props;
    }
    if (type->GetKind() == TType::EKind::Optional) {
        type = static_cast(type)->GetItemType();
        if (!HasOptionalFields(type)) {
            props.Set(EPackProps::SingleOptional);
            props.Reset(EPackProps::UseOptionalMask);
        }
    }
    // Here and after the type is unwrapped!!
    if (type->GetKind() == TType::EKind::Data) {
        auto dataType = static_cast(type);
        switch (*dataType->GetDataSlot()) {
        case NUdf::EDataSlot::String:
        case NUdf::EDataSlot::Json:
        case NUdf::EDataSlot::Yson:
        case NUdf::EDataSlot::Utf8:
        case NUdf::EDataSlot::JsonDocument:
            // Reuse entire packed value length for strings
            props.Set(EPackProps::UseTopLength);
            break;
        default:
            break;
        }
    }
    return props;
}

// Recursively decodes one value of `type` from `buf`, mirroring PackImpl.
// In Fast mode optional/Pg presence is an explicit byte-sized flag in the stream;
// otherwise it comes from `s.OptionalUsageMask`. Strings use `topLength` as their
// size when EPackProps::UseTopLength is set. Dicts are rebuilt as Hashed dicts.
// Throws yexception for unsupported kinds.
// NOTE(review): if a data slot were not listed below, the Data case would `break`
// and control would fall off the end of this non-void function (UB) -- confirm
// the slot list stays exhaustive when new slots are added.
template NUdf::TUnboxedValue UnpackFromChunkedBuffer(const TType* type, TChunkedInputBuffer& buf, ui32 topLength,
    const THolderFactory& holderFactory, TPackerState& s)
{
    switch (type->GetKind()) {
    case TType::EKind::Void:
        return NUdf::TUnboxedValuePod::Void();
    case TType::EKind::Null:
        return NUdf::TUnboxedValuePod();
    case TType::EKind::EmptyList:
        return holderFactory.GetEmptyContainerLazy();
    case TType::EKind::EmptyDict:
        return holderFactory.GetEmptyContainerLazy();

    case TType::EKind::Data: {
        auto dataType = static_cast(type);
        switch (*dataType->GetDataSlot()) {
        case NUdf::EDataSlot::Bool:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Int8:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Uint8:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Int16:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Uint16:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Int32:
        case NUdf::EDataSlot::Date32:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Uint32:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Int64:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Uint64:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Float:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Double:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Date:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Datetime:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Timestamp:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        case NUdf::EDataSlot::Interval:
        case NUdf::EDataSlot::Datetime64:
        case NUdf::EDataSlot::Timestamp64:
        case NUdf::EDataSlot::Interval64:
            return NUdf::TUnboxedValuePod(UnpackData(buf));
        // Tz* values: the datetime value followed by its timezone id.
        case NUdf::EDataSlot::TzDate: {
            auto ret = NUdf::TUnboxedValuePod(UnpackData(buf));
            ret.SetTimezoneId(UnpackData(buf));
            return ret;
        }
        case NUdf::EDataSlot::TzDatetime: {
            auto ret = NUdf::TUnboxedValuePod(UnpackData(buf));
            ret.SetTimezoneId(UnpackData(buf));
            return ret;
        }
        case NUdf::EDataSlot::TzTimestamp: {
            auto ret = NUdf::TUnboxedValuePod(UnpackData(buf));
            ret.SetTimezoneId(UnpackData(buf));
            return ret;
        }
        case NUdf::EDataSlot::TzDate32: {
            auto ret = NUdf::TUnboxedValuePod(UnpackData(buf));
            ret.SetTimezoneId(UnpackData(buf));
            return ret;
        }
        case NUdf::EDataSlot::TzDatetime64: {
            auto ret = NUdf::TUnboxedValuePod(UnpackData(buf));
            ret.SetTimezoneId(UnpackData(buf));
            return ret;
        }
        case NUdf::EDataSlot::TzTimestamp64: {
            auto ret = NUdf::TUnboxedValuePod(UnpackData(buf));
            ret.SetTimezoneId(UnpackData(buf));
            return ret;
        }
        case NUdf::EDataSlot::Uuid: {
            // Fixed-size 16-byte blob, no length prefix.
            return UnpackString(buf, 16);
        }
        case NUdf::EDataSlot::Decimal: {
            return NUdf::TUnboxedValuePod(UnpackDecimal(buf));
        }
        case NUdf::EDataSlot::String:
        case NUdf::EDataSlot::Utf8:
        case NUdf::EDataSlot::Yson:
        case NUdf::EDataSlot::Json:
        case NUdf::EDataSlot::JsonDocument:
        case NUdf::EDataSlot::DyNumber: {
            ui32 size = 0;
            if constexpr (Fast) {
                size = NDetails::GetRawData(buf);
            } else {
                if (s.Properties.Test(EPackProps::UseTopLength)) {
                    size = topLength;
                } else {
                    size = NDetails::UnpackUInt32(buf);
                }
            }
            return UnpackString(buf, size);
        }
        }
        break;
    }

    case TType::EKind::Optional: {
        auto optionalType = static_cast(type);
        bool present;
        if constexpr (Fast) {
            present = NDetails::GetRawData(buf);
        } else {
            present = !s.OptionalUsageMask.IsNextEmptyOptional();
        }

        if (present) {
            return UnpackFromChunkedBuffer(optionalType->GetItemType(), buf, topLength, holderFactory, s).Release().MakeOptional();
        } else {
            return NUdf::TUnboxedValuePod();
        }
    }

    case TType::EKind::Pg: {
        auto pgType = static_cast(type);
        bool present;
        if constexpr (Fast) {
            present = NDetails::GetRawData(buf);
        } else {
            present = !s.OptionalUsageMask.IsNextEmptyOptional();
        }
        if (present) {
            return PGUnpackImpl(pgType, buf);
        } else {
            return NUdf::TUnboxedValuePod();
        }
    }

    case TType::EKind::List: {
        auto listType = static_cast(type);
        auto itemType = listType->GetItemType();
        ui64 len;
        if constexpr (Fast) {
            len = NDetails::GetRawData(buf);
        } else {
            len = NDetails::UnpackUInt64(buf);
        }
        if (!len) {
            return holderFactory.GetEmptyContainerLazy();
        }

        // Decode all items first, then move them into a direct array holder.
        TTemporaryUnboxedValueVector tmp;
        for (ui64 i = 0; i < len; ++i) {
            tmp.emplace_back(UnpackFromChunkedBuffer(itemType, buf, topLength, holderFactory, s));
        }

        NUdf::TUnboxedValue *items = nullptr;
        auto list = holderFactory.CreateDirectArrayHolder(len, items);
        for (ui64 i = 0; i < len; ++i) {
            items[i] = std::move(tmp[i]);
        }
        return std::move(list);
    }

    case TType::EKind::Struct: {
        auto structType = static_cast(type);
        NUdf::TUnboxedValue* itemsPtr = nullptr;
        auto res = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), itemsPtr);
        for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
            auto memberType = structType->GetMemberType(index);
            itemsPtr[index] = UnpackFromChunkedBuffer(memberType, buf, topLength, holderFactory, s);
        }
        return std::move(res);
    }

    case TType::EKind::Tuple: {
        auto tupleType = static_cast(type);
        NUdf::TUnboxedValue* itemsPtr = nullptr;
        auto res = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), itemsPtr);
        for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) {
            auto elementType = tupleType->GetElementType(index);
            itemsPtr[index] = UnpackFromChunkedBuffer(elementType, buf, topLength, holderFactory, s);
        }
        return std::move(res);
    }

    case TType::EKind::Dict: {
        auto dictType = static_cast(type);
        auto keyType = dictType->GetKeyType();
        auto payloadType = dictType->GetPayloadType();
        auto dictBuilder = holderFactory.NewDict(dictType, NUdf::TDictFlags::EDictKind::Hashed);

        ui64 len;
        if constexpr (Fast) {
            len = NDetails::GetRawData(buf);
        } else {
            len = NDetails::UnpackUInt64(buf);
        }
        for (ui64 i = 0; i < len; ++i) {
            auto key = UnpackFromChunkedBuffer(keyType, buf, topLength, holderFactory, s);
            auto payload = UnpackFromChunkedBuffer(payloadType, buf, topLength, holderFactory, s);
            dictBuilder->Add(std::move(key), std::move(payload));
        }
        return dictBuilder->Build();
    }

    case TType::EKind::Variant: {
        auto variantType = static_cast(type);
        ui32 variantIndex;
        if constexpr (Fast) {
            variantIndex = NDetails::GetRawData(buf);
        } else {
            variantIndex = NDetails::UnpackUInt32(buf);
        }

        // The index read from the stream is validated against the underlying
        // struct/tuple before it is used to pick the alternative's type.
        TType* innerType = variantType->GetUnderlyingType();
        if (innerType->IsStruct()) {
            MKQL_ENSURE(variantIndex < static_cast(innerType)->GetMembersCount(), "Bad variant index: " << variantIndex);
            innerType = static_cast(innerType)->GetMemberType(variantIndex);
        } else {
            MKQL_ENSURE(innerType->IsTuple(), "Unexpected underlying variant type: " << innerType->GetKindAsStr());
            MKQL_ENSURE(variantIndex < static_cast(innerType)->GetElementsCount(), "Bad variant index: " << variantIndex);
            innerType = static_cast(innerType)->GetElementType(variantIndex);
        }
        return holderFactory.CreateVariantHolder(UnpackFromChunkedBuffer(innerType, buf, topLength, holderFactory, s).Release(), variantIndex);
    }

    case TType::EKind::Tagged: {
        auto taggedType = static_cast(type);
        return UnpackFromChunkedBuffer(taggedType->GetBaseType(), buf, topLength, holderFactory, s);
    }

    default:
        THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
    }
}

// Decodes a complete packed value. Fast mode decodes the body directly.
// Otherwise it reads the header, loads the optional mask when UseOptionalMask is
// set, returns an empty value for an empty single optional, and decodes a
// top-level struct into `s.TopStruct`. In both modes the whole buffer must be
// consumed, otherwise MKQL_ENSURE fails.
template NUdf::TUnboxedValue DoUnpack(const TType* type, TChunkedInputBuffer& buf, size_t totalBufSize, const THolderFactory& holderFactory, TPackerState& s) {
    if constexpr (Fast) {
        NUdf::TUnboxedValue res;
        res = UnpackFromChunkedBuffer(type, buf, 0, holderFactory, s);
        MKQL_ENSURE(buf.IsEmpty(), "Bad packed data - partial data read");
        return res;
    }

    auto pair = SkipEmbeddedLength(buf, totalBufSize);
    ui32 length = pair.first;
    bool emptySingleOptional = pair.second;

    if (s.Properties.Test(EPackProps::UseOptionalMask)) {
        s.OptionalUsageMask.Reset(buf);
    }

    NUdf::TUnboxedValue res;
    if (s.Properties.Test(EPackProps::SingleOptional) && emptySingleOptional) {
        res = NUdf::TUnboxedValuePod();
    } else if (type->IsStruct()) {
        auto structType = static_cast(type);
        NUdf::TUnboxedValue* items = nullptr;
        res = s.TopStruct.NewArray(holderFactory, structType->GetMembersCount(), items);
        for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
            auto memberType = structType->GetMemberType(index);
            *items++ = UnpackFromChunkedBuffer(memberType, buf, length, holderFactory, s);
        }
    } else {
        res = UnpackFromChunkedBuffer(type, buf, length, holderFactory, s);
    }

    MKQL_ENSURE(buf.IsEmpty(), "Bad packed data - partial data read");
    return res;
}

// Decodes a batch written item-by-item (AddItem/AddWideItem) into `result`.
// The layout is: (non-Fast only) header + optional mask, then the item count,
// then the items. A Multi `type` produces wide rows; any other type produces one
// value per item. The whole buffer must be consumed.
template void DoUnpackBatch(const TType* type, TChunkedInputBuffer& buf, size_t totalSize, const THolderFactory& holderFactory, TPackerState& s, TUnboxedValueBatch& result) {
    ui64 len;
    ui32 topLength;
    const TType* itemType = type;
    if constexpr (!Fast) {
        auto pair = SkipEmbeddedLength(buf, totalSize);
        topLength = pair.first;
        bool emptySingleOptional = pair.second;

        if (s.Properties.Test(EPackProps::UseOptionalMask)) {
            s.OptionalUsageMask.Reset(buf);
        }
        MKQL_ENSURE(!s.Properties.Test(EPackProps::SingleOptional) || !emptySingleOptional, "Unexpected header settings");
        len = NDetails::UnpackUInt64(buf);
    } else {
        topLength = 0;
        len = NDetails::GetRawData(buf);
    }

    if (type->IsMulti()) {
        auto multiType = static_cast(type);
        const ui32 width = multiType->GetElementsCount();
        Y_DEBUG_ABORT_UNLESS(result.IsWide());
        Y_DEBUG_ABORT_UNLESS(result.Width() == width);
        for (ui64 i = 0; i < len; ++i) {
            result.PushRow([&](ui32 j) {
                return UnpackFromChunkedBuffer(multiType->GetElementType(j), buf, topLength, holderFactory, s);
            });
        }
    } else {
        Y_DEBUG_ABORT_UNLESS(!result.IsWide());
        for (ui64 i = 0; i < len; ++i) {
            result.emplace_back(UnpackFromChunkedBuffer(itemType, buf, topLength, holderFactory, s));
        }
    }
    MKQL_ENSURE(buf.IsEmpty(), "Bad packed data - partial data read");
}

// Recursively encodes `value` of `type` into `buffer`; UnpackFromChunkedBuffer is
// the inverse. Non-Fast mode records optional/Pg presence in `s.OptionalUsageMask`
// instead of the stream, and omits string lengths when UseTopLength is set.
// When `Stable` is set, float/double bits are canonicalized with
// NYql::CanonizeFpBits and dicts that are not already sorted are written in key
// order (presumably so equal values always produce identical bytes).
// Throws yexception for unsupported kinds.
template void PackImpl(const TType* type, TBuf& buffer, const NUdf::TUnboxedValuePod& value, TPackerState& s) {
    switch (type->GetKind()) {
    case TType::EKind::Void:
        break;
    case TType::EKind::Null:
        break;
    case TType::EKind::EmptyList:
        break;
    case TType::EKind::EmptyDict:
        break;

    case TType::EKind::Data: {
        auto dataType = static_cast(type);
        switch (*dataType->GetDataSlot()) {
        case NUdf::EDataSlot::Bool:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Int8:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Uint8:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Int16:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Uint16:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Int32:
        case NUdf::EDataSlot::Date32:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Uint32:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Int64:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Uint64:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Float: {
            float x = value.Get();
            if constexpr (Stable) {
                NYql::CanonizeFpBits(&x);
            }
            PackData(x, buffer);
            break;
        }
        case NUdf::EDataSlot::Double: {
            double x = value.Get();
            if constexpr (Stable) {
                NYql::CanonizeFpBits(&x);
            }
            PackData(x, buffer);
            break;
        }
        case NUdf::EDataSlot::Date:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Datetime:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Timestamp:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Interval:
        case NUdf::EDataSlot::Datetime64:
        case NUdf::EDataSlot::Timestamp64:
        case NUdf::EDataSlot::Interval64:
            PackData(value.Get(), buffer);
            break;
        case NUdf::EDataSlot::Uuid: {
            // Written as a raw blob without a length prefix.
            auto ref = value.AsStringRef();
            PackBlob(ref.Data(), ref.Size(), buffer);
            break;
        }
        // Tz* values: the datetime value followed by its timezone id.
        case NUdf::EDataSlot::TzDate: {
            PackData(value.Get(), buffer);
            PackData(value.GetTimezoneId(), buffer);
            break;
        }
        case NUdf::EDataSlot::TzDatetime: {
            PackData(value.Get(), buffer);
            PackData(value.GetTimezoneId(), buffer);
            break;
        }
        case NUdf::EDataSlot::TzTimestamp: {
            PackData(value.Get(), buffer);
            PackData(value.GetTimezoneId(), buffer);
            break;
        }
        case NUdf::EDataSlot::TzDate32: {
            PackData(value.Get(), buffer);
            PackData(value.GetTimezoneId(), buffer);
            break;
        }
        case NUdf::EDataSlot::TzDatetime64: {
            PackData(value.Get(), buffer);
            PackData(value.GetTimezoneId(), buffer);
            break;
        }
        case NUdf::EDataSlot::TzTimestamp64: {
            PackData(value.Get(), buffer);
            PackData(value.GetTimezoneId(), buffer);
            break;
        }
        case NUdf::EDataSlot::Decimal: {
            PackDecimal(value.GetInt128(), buffer);
            break;
        }
        case NUdf::EDataSlot::String:
        case NUdf::EDataSlot::Utf8:
        case NUdf::EDataSlot::Yson:
        case NUdf::EDataSlot::Json:
        case NUdf::EDataSlot::JsonDocument:
        case NUdf::EDataSlot::DyNumber: {
            auto stringRef = value.AsStringRef();
            if constexpr (Fast) {
                static_assert(std::is_same_v);
                PackData(stringRef.Size(), buffer);
            } else {
                // With UseTopLength the header length already gives the string size.
                if (!s.Properties.Test(EPackProps::UseTopLength)) {
                    PackData(stringRef.Size(), buffer);
                }
            }
            PackBlob(stringRef.Data(), stringRef.Size(), buffer);
        }
        }
        break;
    }

    case TType::EKind::Optional: {
        auto optionalType = static_cast(type);
        if constexpr (Fast) {
            PackData(ui8(bool(value)), buffer);
        } else {
            s.OptionalUsageMask.SetNextEmptyOptional(!value);
        }
        if (value) {
            PackImpl(optionalType->GetItemType(), buffer, value.GetOptionalValue(), s);
        }
        break;
    }

    case TType::EKind::Pg: {
        auto pgType = static_cast(type);
        if constexpr (Fast) {
            PackData(ui8(bool(value)), buffer);
        } else {
            s.OptionalUsageMask.SetNextEmptyOptional(!value);
        }
        if (value) {
            PGPackImpl(Stable, pgType, value, buffer);
        }
        break;
    }

    case TType::EKind::List: {
        auto listType = static_cast(type);
        auto itemType = listType->GetItemType();
        if (value.HasFastListLength()) {
            ui64 len = value.GetListLength();
            PackData(len, buffer);
            TThresher::DoForEachItem(value,
                [&](const NYql::NUdf::TUnboxedValuePod& item) { PackImpl(itemType, buffer, item, s); });
        } else {
            const auto iter = value.GetListIterator();
            if constexpr (Fast) {
                // Reserve room for the count, pack items, then patch the count in place.
                // NOTE(review): `dst` is a raw pointer into `buffer`; this is only safe if
                // TBuf never relocates already-written bytes while the items are packed.
                // Confirm for every TBuf this is instantiated with.
                ui64 count = 0;
                buffer.Advance(sizeof(count));
                char* dst = buffer.Pos() - sizeof(count);
                for (NUdf::TUnboxedValue item; iter.Next(item);) {
                    PackImpl(itemType, buffer, item, s);
                    ++count;
                }
                std::memcpy(dst, &count, sizeof(count));
            } else {
                // The count precedes the items, so materialize the list first.
                TUnboxedValueVector items;
                for (NUdf::TUnboxedValue item; iter.Next(item);) {
                    items.emplace_back(std::move(item));
                }
                PackData(ui64(items.size()), buffer);
                for (const auto& item : items) {
                    PackImpl(itemType, buffer, item, s);
                }
            }
        }
        break;
    }

    case TType::EKind::Struct: {
        auto structType = static_cast(type);
        for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
            auto memberType = structType->GetMemberType(index);
            PackImpl(memberType, buffer, value.GetElement(index), s);
        }
        break;
    }

    case TType::EKind::Tuple: {
        auto tupleType = static_cast(type);
        for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) {
            auto elementType = tupleType->GetElementType(index);
            PackImpl(elementType, buffer, value.GetElement(index), s);
        }
        break;
    }

    case TType::EKind::Dict: {
        auto dictType = static_cast(type);
        auto keyType = dictType->GetKeyType();
        auto payloadType = dictType->GetPayloadType();

        ui64 length = value.GetDictLength();
        PackData(length, buffer);
        const auto iter = value.GetDictIterator();
        if constexpr (Fast) {
            for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
                PackImpl(keyType, buffer, key, s);
                PackImpl(payloadType, buffer, payload, s);
            }
        } else {
            if (Stable && !value.IsSortedDict()) {
                // no key duplicates here
                TKeyTypes types;
                bool isTuple;
                bool encoded;
                bool useIHash;
                GetDictionaryKeyTypes(keyType, types, isTuple, encoded, useIHash);
                if (encoded) {
                    // Sort by the presort-encoded key; scratch vectors are recycled via
                    // s.EncodedDictBuffers.
                    TGenericPresortEncoder packer(keyType);
                    typename decltype(s.EncodedDictBuffers)::value_type dictBuffer;
                    if (!s.EncodedDictBuffers.empty()) {
                        dictBuffer = std::move(s.EncodedDictBuffers.back());
                        s.EncodedDictBuffers.pop_back();
                        dictBuffer.clear();
                    }
                    dictBuffer.reserve(length);
                    for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
                        NUdf::TUnboxedValue encodedKey = MakeString(packer.Encode(key, false));
                        dictBuffer.emplace_back(std::move(encodedKey), std::move(key), std::move(payload));
                    }

                    Sort(dictBuffer.begin(), dictBuffer.end(),
                        [&](const auto &left, const auto &right) {
                            return CompareKeys(std::get<0>(left), std::get<0>(right), types, isTuple) < 0;
                        });

                    for (const auto& x : dictBuffer) {
                        PackImpl(keyType, buffer, std::get<1>(x), s);
                        PackImpl(payloadType, buffer, std::get<2>(x), s);
                    }
                    dictBuffer.clear();
                    s.EncodedDictBuffers.push_back(std::move(dictBuffer));
                } else {
                    // Sort (key, payload) pairs directly; scratch vectors are recycled via
                    // s.DictBuffers.
                    typename decltype(s.DictBuffers)::value_type dictBuffer;
                    if (!s.DictBuffers.empty()) {
                        dictBuffer = std::move(s.DictBuffers.back());
                        s.DictBuffers.pop_back();
                        dictBuffer.clear();
                    }
                    dictBuffer.reserve(length);
                    for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
                        dictBuffer.emplace_back(std::move(key), std::move(payload));
                    }

                    NUdf::ICompare::TPtr cmp = useIHash ? MakeCompareImpl(keyType) : nullptr;
                    Sort(dictBuffer.begin(), dictBuffer.end(), TKeyPayloadPairLess(types, isTuple, cmp.Get()));
                    for (const auto& p: dictBuffer) {
                        PackImpl(keyType, buffer, p.first, s);
                        PackImpl(payloadType, buffer, p.second, s);
                    }
                    dictBuffer.clear();
                    s.DictBuffers.push_back(std::move(dictBuffer));
                }
            } else {
                for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
                    PackImpl(keyType, buffer, key, s);
                    PackImpl(payloadType, buffer, payload, s);
                }
            }
        }
        break;
    }

    case TType::EKind::Variant: {
        auto variantType = static_cast(type);
        ui32 variantIndex = value.GetVariantIndex();
        TType* innerType = variantType->GetUnderlyingType();
        if (innerType->IsStruct()) {
            innerType = static_cast(innerType)->GetMemberType(variantIndex);
        } else {
            MKQL_ENSURE(innerType->IsTuple(), "Unexpected underlying variant type: " << innerType->GetKindAsStr());
            innerType = static_cast(innerType)->GetElementType(variantIndex);
        }
        PackData(variantIndex, buffer);
        PackImpl(innerType, buffer, value.GetVariantItem(), s);
        break;
    }

    case TType::EKind::Tagged: {
        auto taggedType = static_cast(type);
        PackImpl(taggedType->GetBaseType(), buffer, value, s);
        break;
    }

    default:
        THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
    }
}

// True if `array` and, recursively, all of its child_data have offset == expectedOffset.
bool HasOffset(const arrow::ArrayData& array, i64 expectedOffset) {
    return array.offset == expectedOffset &&
        AllOf(array.child_data, [&](const auto& child) { return HasOffset(*child, expectedOffset); });
}

// True if `blockType` is a Scalar-shaped block whose item is the Uint64 data type.
bool IsUi64Scalar(const TBlockType* blockType) {
    if (blockType->GetShape() != TBlockType::EShape::Scalar) {
        return false;
    }

    if (!blockType->GetItemType()->IsData()) {
        return false;
    }

    return static_cast(blockType->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Uint64;
}

// Detects the legacy struct block layout: a struct whose members are all Block
// types and which has a "_yql_block_length" member that is a Uint64 scalar.
// On success `blockLengthIndex` is that member's index and `items` holds every
// member's block type. On failure `blockLengthIndex` is left at Max() (as
// assigned at the top); `items` may be partially filled.
// NOTE(review): the loop-local `type` shadows the `type` parameter.
bool IsLegacyStructBlock(const TType* type, ui32& blockLengthIndex, TVector& items) {
    items.clear();
    blockLengthIndex = Max();
    if (!type->IsStruct()) {
        return false;
    }
    const TStructType* structType = static_cast(type);
    static const TStringBuf blockLenColumnName = "_yql_block_length";
    auto index = structType->FindMemberIndex(blockLenColumnName);
    if (!index) {
        return false;
    }

    for (ui32 i = 0; i < structType->GetMembersCount(); i++) {
        auto type = structType->GetMemberType(i);
        if (!type->IsBlock()) {
            return false;
        }

        const TBlockType* blockType = static_cast(type);
        items.push_back(blockType);
        if (i == *index && !IsUi64Scalar(blockType)) {
            return false;
        }
    }

    blockLengthIndex = *index;
    return true;
}

// Detects the wide (Multi) block layout: a non-empty Multi type whose elements
// are all Block types and whose last element is a Uint64 scalar (the block length).
// On success `blockLengthIndex` is width - 1 and `items` holds every element's block type.
// NOTE(review): the loop-local `type` shadows the `type` parameter.
bool IsMultiBlock(const TType* type, ui32& blockLengthIndex, TVector& items) {
    items.clear();
    blockLengthIndex = Max();
    if (!type->IsMulti()) {
        return false;
    }
    const TMultiType* multiType = static_cast(type);
    ui32 width = multiType->GetElementsCount();
    if (!width) {
        return false;
    }

    for (ui32 i = 0; i < width; i++) {
        auto type = multiType->GetElementType(i);
        if (!type->IsBlock()) {
            return false;
        }

        const TBlockType* blockType = static_cast(type);
        items.push_back(blockType);
        if (i == width - 1 && !IsUi64Scalar(blockType)) {
            return false;
        }
    }

    blockLengthIndex = width - 1;
    return true;
}

} // namespace

// Builds a generic packer for `type`; stable mode cannot be combined with Fast.
template TValuePackerGeneric::TValuePackerGeneric(bool stable, const TType* type)
    : Stable_(stable)
    , Type_(type)
    , State_(ScanTypeProperties(Type_, false))
{
    MKQL_ENSURE(!Fast || !Stable_, "Stable mode is not supported");
}

// Decodes a value produced by Pack(); `buf` must contain exactly one packed value.
template NUdf::TUnboxedValue TValuePackerGeneric::Unpack(TStringBuf buf, const THolderFactory& holderFactory) const {
    TChunkedInputBuffer chunked(buf);
    return DoUnpack(Type_, chunked, buf.size(), holderFactory, State_);
}

// Packs `value` into the internal Buffer_ and returns a view into it; the next
// Pack() call reuses Buffer_. Fast mode writes only the body. Otherwise the body
// is written after a reserve of sizeof(ui32) + OptionalMaskReserve bytes. The
// optional mask is then written directly before the body and the length header
// directly before the mask. The unused leading bytes (`delta`) are excluded from
// the returned view.
// NOTE(review): both branches of each `if (Stable_)` below look identical in this
// copy -- presumably they differed only in (stripped) template arguments.
template TStringBuf TValuePackerGeneric::Pack(const NUdf::TUnboxedValuePod& value) const {
    auto& s = State_;
    if constexpr (Fast) {
        Buffer_.Proceed(0);
        if (Stable_) {
            PackImpl(Type_, Buffer_, value, s);
        } else {
            PackImpl(Type_, Buffer_, value, s);
        }
        return TStringBuf(Buffer_.data(), Buffer_.size());
    }

    s.OptionalUsageMask.Reset();
    const size_t lengthReserve = sizeof(ui32);
    Buffer_.Proceed(lengthReserve + s.OptionalMaskReserve);

    if (Stable_) {
        PackImpl(Type_, Buffer_, value, s);
    } else {
        PackImpl(Type_, Buffer_, value, s);
    }

    size_t delta = 0;
    size_t len = Buffer_.Size();

    if (s.Properties.Test(EPackProps::UseOptionalMask)) {
        // Prepend optional mask
        const size_t actualOptionalMaskSize = s.OptionalUsageMask.CalcSerializedSize();

        if (actualOptionalMaskSize > s.OptionalMaskReserve) {
            // The mask outgrew the reserve: shift everything right and remember the
            // larger reserve for later calls.
            TBuffer buf(Buffer_.Size() + actualOptionalMaskSize - s.OptionalMaskReserve);
            buf.Proceed(actualOptionalMaskSize - s.OptionalMaskReserve);
            buf.Append(Buffer_.Data(), Buffer_.Size());
            Buffer_.Swap(buf);
            s.OptionalMaskReserve = actualOptionalMaskSize;
            len = Buffer_.Size();
        }

        delta = s.OptionalMaskReserve - actualOptionalMaskSize;
        Buffer_.Proceed(lengthReserve + delta);
        s.OptionalUsageMask.Serialize(Buffer_);
    }

    // Prepend length
    if (len - delta - lengthReserve > 7) {
        const ui32 length = len - delta - lengthReserve;
        Buffer_.Proceed(delta);
        Buffer_.Append((const char*)&length, sizeof(length));
        // Long length always signals non-empty optional. So, don't check EProps::SingleOptional here
    } else {
        ui8 length = 1 | ((len - delta - lengthReserve) << 1);
        // Empty root optional always has short length. Embed empty flag into the length
        if (s.Properties.Test(EPackProps::SingleOptional) && !s.OptionalUsageMask.IsEmptyMask()) {
            length |= 0x10;
        }
        // The short header is 1 byte instead of sizeof(ui32); skip the 3 unused bytes.
        delta += 3;
        Buffer_.Proceed(delta);
        Buffer_.Append((const char*)&length, sizeof(length));
    }

    return TStringBuf(Buffer_.Data() + delta, len - delta);
}

// Transport packer

// Builds a transport packer; `stable` must be false. A null `pool` selects
// NYql::NUdf::GetYqlMemoryPool().
template TValuePackerTransport::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool)
    : Type_(type)
    , State_(ScanTypeProperties(Type_, false))
    , IncrementalState_(ScanTypeProperties(Type_, true))
    , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
{
    MKQL_ENSURE(!stable, "Stable packing is not supported");
    InitBlocks();
}

// Builds a transport packer; a null `pool` selects NYql::NUdf::GetYqlMemoryPool().
template TValuePackerTransport::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool)
    : Type_(type)
    , State_(ScanTypeProperties(Type_, false))
    , IncrementalState_(ScanTypeProperties(Type_, true))
    , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
{
    InitBlocks();
}

// If Type_ is a legacy struct block or a wide (Multi) block, marks the packer as
// a block packer and creates a serializer/deserializer for every non-length
// column, plus a block reader for scalar-shaped columns.
template void TValuePackerTransport::InitBlocks() {
    TVector items;
    if (IsLegacyStructBlock(Type_, BlockLenIndex_, items)) {
        IsLegacyBlock_ = true;
    } else if (!IsMultiBlock(Type_, BlockLenIndex_, items)) {
        return;
    }

    IsBlock_ = true;
    ConvertedScalars_.resize(items.size());
    BlockReaders_.resize(items.size());
    BlockSerializers_.resize(items.size());
    BlockDeserializers_.resize(items.size());
    for (ui32 i = 0; i < items.size(); ++i) {
        if (i != BlockLenIndex_) {
            const TBlockType* itemType = items[i];
            BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType());
            BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType());
            if (itemType->GetShape() == TBlockType::EShape::Scalar) {
                BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType());
            }
        }
    }
}

// Decodes a single value produced by Pack(); not valid for block packers.
template NUdf::TUnboxedValue TValuePackerTransport::Unpack(TChunkedBuffer&& buf, const THolderFactory& holderFactory) const {
    MKQL_ENSURE(!IsBlock_, "Unpack() should not be used for blocks");
    const size_t totalSize = buf.Size();
    TChunkedInputBuffer chunked(std::move(buf));
    return DoUnpack(Type_, chunked, totalSize, holderFactory, State_);
}

// Decodes a batch produced by AddItem()/AddWideItem() into `result`, dispatching
// to UnpackBatchBlocks for block packers.
template void TValuePackerTransport::UnpackBatch(TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
    if (IsBlock_) {
        return UnpackBatchBlocks(std::move(buf), holderFactory, result);
    }
    const size_t totalSize = buf.Size();
    TChunkedInputBuffer chunked(std::move(buf));
    DoUnpackBatch(Type_, chunked, totalSize, holderFactory, IncrementalState_, result);
}

// Packs a single value into a fresh paged buffer. Must not be mixed with
// AddItem() and is not valid for block packers. In non-Fast mode a header area of
// sizeof(ui32) + OptionalMaskReserve is reserved; BuildMeta is defined elsewhere
// and presumably fills that header in.
template TChunkedBuffer TValuePackerTransport::Pack(const NUdf::TUnboxedValuePod& value) const {
    MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls");
    MKQL_ENSURE(!IsBlock_, "Pack() should not be used for blocks");
    TPagedBuffer::TPtr result = std::make_shared();
    if constexpr (Fast) {
        PackImpl(Type_, *result, value, State_);
    } else {
        State_.OptionalUsageMask.Reset();
        result->ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve);
        PackImpl(Type_, *result, value, State_);
        BuildMeta(result, false);
    }
    return TPagedBuffer::AsChunkedBuffer(result);
}

// Starts a new incremental batch: allocates Buffer_ and reserves header space
// (the item count in Fast mode; length + mask + item count otherwise).
// NOTE(review): the mask being reset belongs to IncrementalState_, but the
// reserve is sized from State_.OptionalMaskReserve -- confirm this is intended.
template void TValuePackerTransport::StartPack() {
    Buffer_ = std::make_shared();
    if constexpr (Fast) {
        // reserve place for list item count
        Buffer_->ReserveHeader(sizeof(ItemCount_));
    } else {
        IncrementalState_.OptionalUsageMask.Reset();
        Buffer_->ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE);
    }
}

// Appends one (non-wide) item to the current batch, starting a batch on the first
// item. For a legacy struct block, the struct's elements are forwarded as one
// wide block row.
template TValuePackerTransport& TValuePackerTransport::AddItem(const NUdf::TUnboxedValuePod& value) {
    Y_DEBUG_ABORT_UNLESS(!Type_->IsMulti());
    if (IsLegacyBlock_) {
        static_assert(sizeof(NUdf::TUnboxedValuePod) == sizeof(NUdf::TUnboxedValue));
        const NUdf::TUnboxedValuePod* values = static_cast(value.GetElements());
        return AddWideItemBlocks(values, BlockSerializers_.size());
    }

    const TType* itemType = Type_;
    if (!ItemCount_) {
        StartPack();
    }

    PackImpl(itemType, *Buffer_, value, IncrementalState_);
    ++ItemCount_;
    return *this;
}

// Appends one wide row of `width` values (Type_ must be Multi), starting a batch
// on the first row; block packers go through AddWideItemBlocks.
template TValuePackerTransport& TValuePackerTransport::AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 width) {
    Y_DEBUG_ABORT_UNLESS(Type_->IsMulti());
    Y_DEBUG_ABORT_UNLESS(static_cast(Type_)->GetElementsCount() == width);
    if (IsBlock_) {
        return AddWideItemBlocks(values, width);
    }

    const TMultiType* itemType = static_cast(Type_);
    if (!ItemCount_) {
        StartPack();
    }

    for (ui32 i = 0; i < width; ++i) {
        PackImpl(itemType->GetElementType(i), *Buffer_, values[i], IncrementalState_);
    }
    ++ItemCount_;
    return *this;
}

// Serializes one block row into BlockBuffer_. A separate metadata buffer is
// written first: the block length, and for a non-empty block also the feature
// flags (1 = "scalars are present"), one offset-remainder byte per non-length
// column, the metadata word count and the metadata words. The column data then
// follows via StoreArray. A block length of 0 writes only the length.
// NOTE(review): a scalar column is converted to a 1-element array once and the
// result is cached in ConvertedScalars_[i], so later rows reuse that conversion
// regardless of their own scalar value. Confirm scalar column values are
// constant for the packer's lifetime.
template TValuePackerTransport& TValuePackerTransport::AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 width) {
    MKQL_ENSURE(width == BlockSerializers_.size(), "Invalid width");
    const ui64 len = TArrowBlock::From(values[BlockLenIndex_]).GetDatum().scalar_as().value;

    auto metadataBuffer = std::make_shared();

    ui32 totalMetadataCount = 0;
    for (size_t i = 0; i < width; ++i) {
        if (i != BlockLenIndex_) {
            MKQL_ENSURE(BlockSerializers_[i], "Invalid serializer");
            totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount();
        }
    }

    // calculate approximate metadata size
    const size_t metadataReservedSize =
        MAX_PACKED64_SIZE +                     // block len
        MAX_PACKED64_SIZE +                     // feature flags
        (width - 1) +                           // 1-byte offsets
        MAX_PACKED32_SIZE +                     // metadata words count
        MAX_PACKED64_SIZE * totalMetadataCount; // metadata words
    metadataBuffer->Reserve(len ? metadataReservedSize : MAX_PACKED64_SIZE);

    // save block length
    PackData(len, *metadataBuffer);

    if (!len) {
        // only block len should be serialized in this case
        BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer);
        ++ItemCount_;
        return *this;
    }

    // save feature flags
    // 1 = "scalars are present"
    const ui64 metadataFlags = 1 << 0;
    PackData(metadataFlags, *metadataBuffer);

    TVector> arrays(width);
    // save remainder (offset % 8) of the original offset for each column - it is needed to properly handle offset in bitmaps
    for (size_t i = 0; i < width; ++i) {
        if (i == BlockLenIndex_) {
            continue;
        }

        arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum();
        ui8 reminder = 0;
        if (datum.is_array()) {
            i64 offset = datum.array()->offset;
            MKQL_ENSURE(offset >= 0, "Negative offset");
            // all offsets should be equal
            MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data");
            reminder = offset % 8;
            arrays[i] = datum.array();
        } else {
            MKQL_ENSURE(datum.is_scalar(), "Expecting array or scalar");
            if (!ConvertedScalars_[i]) {
                const TType* itemType = IsLegacyBlock_ ? static_cast(Type_)->GetMemberType(i) :
                                                         static_cast(Type_)->GetElementType(i);
                datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast(itemType)->GetItemType(), ArrowPool_);
                MKQL_ENSURE(HasOffset(*datum.array(), 0), "Expected zero array offset after scalar is converted to array");
                ConvertedScalars_[i] = datum.array();
            }
            arrays[i] = ConvertedScalars_[i];
        }
        PackData(reminder, *metadataBuffer);
    }

    // save count of metadata words
    PackData(totalMetadataCount, *metadataBuffer);

    // save metadata itself
    ui32 savedMetadata = 0;
    for (size_t i = 0; i < width; ++i) {
        if (i != BlockLenIndex_) {
            BlockSerializers_[i]->StoreMetadata(*arrays[i], [&](ui64 meta) {
                PackData(meta, *metadataBuffer);
                ++savedMetadata;
            });
        }
    }

    MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error");

    BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer);
    // save buffers
    for (size_t i = 0; i < width; ++i) {
        if (i != BlockLenIndex_) {
            BlockSerializers_[i]->StoreArray(*arrays[i], BlockBuffer_);
        }
    }
    ++ItemCount_;
    return *this;
}

// Decodes block rows written by AddWideItemBlocks, reading the per-block metadata
// in the same order it was written.
// NOTE(review): `buf` is moved into `chunked` at the top of each iteration, and the
// `len == 0` path `continue`s straight back to `buf.Empty()`. Confirm that any data
// remaining in `chunked` is handed back to `buf` on that path; otherwise blocks
// following an empty block would be dropped.
template void TValuePackerTransport::UnpackBatchBlocks(TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
    while (!buf.Empty()) {
        TChunkedInputBuffer chunked(std::move(buf));

        // unpack block length
        const ui64 len = UnpackData(chunked);
        if (len == 0) {
            continue;
        }

        // unpack flags
        const ui64 metadataFlags = UnpackData(chunked);
        MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags");

        // unpack array offsets
        const ui32 width = BlockDeserializers_.size();
        MKQL_ENSURE(width > 0, "Invalid width");
        TVector offsets(width);
        for (ui32 i = 0; i < width; ++i) {
            if (BlockDeserializers_[i]) {
                offsets[i] = UnpackData(chunked);
                MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value");
            }
        }

        // unpack metadata
        ui32 metaCount = UnpackData(chunked);
        for (ui32 i = 0; i < width; ++i) {
            if (BlockDeserializers_[i]) {
                BlockDeserializers_[i]->LoadMetadata([&]() -> ui64 {
                    MKQL_ENSURE(metaCount > 0,
"No more metadata available"); --metaCount; return UnpackData(chunked); }); } } MKQL_ENSURE(metaCount == 0, "Partial buffers read"); TChunkedBuffer ropeTail = chunked.ReleaseRope(); // unpack buffers auto producer = [&](ui32 i) { MKQL_ENSURE(i < width, "Unexpected row index"); if (i != BlockLenIndex_) { MKQL_ENSURE(BlockDeserializers_[i], "Missing deserializer"); const bool isScalar = BlockReaders_[i] != nullptr; auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]); if (isScalar) { TBlockItem item = BlockReaders_[i]->GetItem(*array, 0); const TType* itemType = IsLegacyBlock_ ? static_cast(Type_)->GetMemberType(i) : static_cast(Type_)->GetElementType(i); return holderFactory.CreateArrowBlock(ConvertScalar(static_cast(itemType)->GetItemType(), item, ArrowPool_)); } return holderFactory.CreateArrowBlock(array); } return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared(len))); }; if (IsLegacyBlock_) { NYql::NUdf::TUnboxedValue* valueItems; auto structValue = holderFactory.CreateDirectArrayHolder(width, valueItems); for (ui32 i = 0; i < width; ++i) { valueItems[i] = producer(i); } result.emplace_back(std::move(structValue)); } else { result.PushRow(producer); } buf = std::move(ropeTail); } } template void TValuePackerTransport::Clear() { Buffer_.reset(); BlockBuffer_.Clear(); ItemCount_ = 0; } template TChunkedBuffer TValuePackerTransport::Finish() { if (IsBlock_) { return FinishBlocks(); } if (!ItemCount_) { StartPack(); } if constexpr (Fast) { char* dst = Buffer_->Header(sizeof(ItemCount_)); Y_DEBUG_ABORT_UNLESS(dst); std::memcpy(dst, &ItemCount_, sizeof(ItemCount_)); } else { BuildMeta(Buffer_, true); } TPagedBuffer::TPtr result = std::move(Buffer_); Clear(); return TPagedBuffer::AsChunkedBuffer(result); } template TChunkedBuffer TValuePackerTransport::FinishBlocks() { TChunkedBuffer result = std::move(BlockBuffer_); Clear(); return result; } template void TValuePackerTransport::BuildMeta(TPagedBuffer::TPtr& buffer, 
bool addItemCount) const { const size_t itemCountSize = addItemCount ? GetPack64Length(ItemCount_) : 0; const size_t packedSize = buffer->Size() + itemCountSize; auto& s = addItemCount ? IncrementalState_ : State_; const bool useMask = s.Properties.Test(EPackProps::UseOptionalMask); const size_t maskSize = useMask ? s.OptionalUsageMask.CalcSerializedSize() : 0; const size_t fullLen = maskSize + packedSize; MKQL_ENSURE(fullLen <= Max(), "Packed obbject size exceeds 4G"); size_t metaSize = (fullLen > 7 ? sizeof(ui32) : sizeof(ui8)) + maskSize; if (char* header = buffer->Header(metaSize + itemCountSize)) { TFixedSizeBuffer buf(header, metaSize + itemCountSize); SerializeMeta(buf, useMask, s.OptionalUsageMask, fullLen, s.Properties.Test(EPackProps::SingleOptional)); if (addItemCount) { if constexpr (Fast) { PackData(ItemCount_, buf); } else { // PackData() can not be used here - it may overwrite some bytes past the end of header char tmp[MAX_PACKED64_SIZE]; size_t actualItemCountSize = Pack64(ItemCount_, tmp); std::memcpy(buf.Pos(), tmp, actualItemCountSize); buf.Advance(actualItemCountSize); } } MKQL_ENSURE(buf.Size() == metaSize + itemCountSize, "Partial header write"); } else { s.OptionalMaskReserve = maskSize; TPagedBuffer::TPtr resultBuffer = std::make_shared(); SerializeMeta(*resultBuffer, useMask, s.OptionalUsageMask, fullLen, s.Properties.Test(EPackProps::SingleOptional)); if (addItemCount) { PackData(ItemCount_, *resultBuffer); } buffer->ForEachPage([&resultBuffer](const char* data, size_t len) { resultBuffer->Append(data, len); }); buffer = std::move(resultBuffer); } } template class TValuePackerGeneric; template class TValuePackerGeneric; template class TValuePackerTransport; template class TValuePackerTransport; TValuePackerBoxed::TValuePackerBoxed(TMemoryUsageInfo* memInfo, bool stable, const TType* type) : TBase(memInfo) , TValuePacker(stable, type) {} } // NMiniKQL } // NKikimr