// Declarations of the MiniKQL value packers. The code text below is kept verbatim.
//
// NOTE(review): this copy of the header looks damaged by text extraction:
//   - line breaks are collapsed, so each `//` comment swallows the rest of its physical line;
//   - the contents of angle brackets are missing (bare `#include` directives,
//     `template class ...`, `TEnumBitSet;`, `TVector>>`, `TComputationValue` without an argument);
//   - the comment starting `// AddItem()/UnpackBatch() ...` continues on the next physical line
//     ("Will produce List layout") without a `//` marker.
// The missing include targets and template arguments cannot be recovered from this text;
// restore the file from version control before building.
//
// Declared here:
//   NDetails::EPackProps      - enumerators UseOptionalMask (equal to Begin), UseTopLength,
//                               SingleOptional; Begin and End bracket the range.
//   NDetails::TPackProperties - TEnumBitSet alias (its template arguments are not visible here).
//   NDetails::TPackerState    - holds const Properties, the TopStruct cache, DictBuffers,
//                               EncodedDictBuffers, OptionalMaskReserve and OptionalUsageMask.
//                               The constructor moves `properties` into Properties and sets
//                               OptionalMaskReserve to 1 when UseOptionalMask is set, otherwise 0.
//   TValuePackerGeneric       - constructed from (stable, type). Pack() is const and returns a
//                               TStringBuf that, per the in-class comment, is valid only until the
//                               next Pack() call. Unpack() takes a TStringBuf and a THolderFactory.
//                               Buffer_ and State_ are `mutable`; the in-class TODO asks for real
//                               thread safety, so presumably one instance must not be shared
//                               between threads - confirm.
//   TValuePackerTransport     - incremental interface: AddItem(), AddWideItem(), Finish(), Clear(),
//                               UnpackBatch(); single-value interface: Pack()/Unpack(). Both
//                               exchange data as NYql::TChunkedBuffer.
//                               PackedSizeEstimate() returns BlockBuffer_.Size() when IsBlock_ is
//                               set; otherwise Buffer_->Size() + Buffer_->ReservedHeaderSize(),
//                               or 0 when Buffer_ is null.
//                               IsEmpty() is true while ItemCount_ is 0. IsBlock() returns IsBlock_.
//   TValuePacker              - alias of TValuePackerGeneric.
//   TValuePackerBoxed         - derives from both TComputationValue and TValuePacker; constructed
//                               from (memInfo, stable, type).
//
// NOTE(review): the second TValuePackerTransport constructor names its pool parameter `ppol`;
// this looks like a typo for `pool` (the first constructor uses `pool`) - confirm and rename.
// NOTE(review): that constructor's comment says stable packing is not supported, so its `stable`
// argument is presumably ignored or rejected - verify in the implementation file.
// NOTE(review): both constructors accept `arrow::MemoryPool*` defaulting to nullptr while the
// member is `arrow::MemoryPool& ArrowPool_`; presumably a default pool is substituted for
// nullptr in the constructor definition - verify.
#pragma once #include "mkql_computation_node.h" #include "mkql_computation_node_holders.h" #include "mkql_optional_usage_mask.h" #include "mkql_block_transport.h" #include "mkql_block_reader.h" #include #include #include #include #include #include #include #include namespace NKikimr { namespace NMiniKQL { namespace NDetails { enum EPackProps { Begin, UseOptionalMask = Begin, UseTopLength, SingleOptional, End }; using TPackProperties = TEnumBitSet; struct TPackerState { explicit TPackerState(TPackProperties&& properties) : Properties(std::move(properties)) , OptionalMaskReserve(Properties.Test(EPackProps::UseOptionalMask) ? 1 : 0) { } const TPackProperties Properties; TPlainContainerCache TopStruct; TVector>> DictBuffers; TVector>> EncodedDictBuffers; size_t OptionalMaskReserve; NDetails::TOptionalUsageMask OptionalUsageMask; }; } // namespace NDetails template class TValuePackerGeneric { public: using TSelf = TValuePackerGeneric; TValuePackerGeneric(bool stable, const TType *type); // reference is valid till the next call to Pack() TStringBuf Pack(const NUdf::TUnboxedValuePod& value) const; NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const; private: const bool Stable_; const TType* const Type_; // TODO: real thread safety with external state mutable TBuffer Buffer_; mutable NDetails::TPackerState State_; }; template class TValuePackerTransport { public: using TSelf = TValuePackerTransport; explicit TValuePackerTransport(const TType* type, arrow::MemoryPool* pool = nullptr); // for compatibility with TValuePackerGeneric - stable packing is not supported TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* ppol = nullptr); // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. 
Will produce List layout TSelf& AddItem(const NUdf::TUnboxedValuePod& value); TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count); size_t PackedSizeEstimate() const { return IsBlock_ ? BlockBuffer_.Size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0); } bool IsEmpty() const { return !ItemCount_; } bool IsBlock() const { return IsBlock_; } void Clear(); NYql::TChunkedBuffer Finish(); // Pack()/Unpack() will pack/unpack single value of type T NYql::TChunkedBuffer Pack(const NUdf::TUnboxedValuePod& value) const; NUdf::TUnboxedValue Unpack(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory) const; void UnpackBatch(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; private: void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const; void StartPack(); void InitBlocks(); TSelf& AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 count); NYql::TChunkedBuffer FinishBlocks(); void UnpackBatchBlocks(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; const TType* const Type_; ui64 ItemCount_ = 0; TPagedBuffer::TPtr Buffer_; mutable NDetails::TPackerState State_; mutable NDetails::TPackerState IncrementalState_; arrow::MemoryPool& ArrowPool_; bool IsBlock_ = false; bool IsLegacyBlock_ = false; ui32 BlockLenIndex_ = 0; TVector> BlockSerializers_; TVector> BlockReaders_; TVector> ConvertedScalars_; NYql::TChunkedBuffer BlockBuffer_; TVector> BlockDeserializers_; }; using TValuePacker = TValuePackerGeneric; class TValuePackerBoxed : public TComputationValue, public TValuePacker { typedef TComputationValue TBase; public: TValuePackerBoxed(TMemoryUsageInfo* memInfo, bool stable, const TType* type); }; } }