mkql_computation_node_pack.h 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. #pragma once
  2. #include "mkql_computation_node.h"
  3. #include "mkql_computation_node_holders.h"
  4. #include "mkql_optional_usage_mask.h"
  5. #include "mkql_block_transport.h"
  6. #include "mkql_block_reader.h"
  7. #include <yql/essentials/minikql/mkql_buffer.h>
  8. #include <yql/essentials/public/udf/udf_value.h>
  9. #include <library/cpp/enumbitset/enumbitset.h>
  10. #include <yql/essentials/utils/chunked_buffer.h>
  11. #include <util/stream/output.h>
  12. #include <util/generic/buffer.h>
  13. #include <util/generic/strbuf.h>
  14. #include <utility>
  15. namespace NKikimr {
  16. namespace NMiniKQL {
  17. namespace NDetails {
  // Packing property flags. Begin aliases the first flag and End is one past the
  // last one; both are used as the bounds of the TPackProperties bit set.
  enum EPackProps {
      Begin = 0,
      UseOptionalMask = Begin, // when set, TPackerState reserves room for the optional mask
      UseTopLength,
      SingleOptional,
      End // sentinel, not a flag
  };
  25. using TPackProperties = TEnumBitSet<EPackProps, EPackProps::Begin, EPackProps::End>;
  26. struct TPackerState {
  27. explicit TPackerState(TPackProperties&& properties)
  28. : Properties(std::move(properties))
  29. , OptionalMaskReserve(Properties.Test(EPackProps::UseOptionalMask) ? 1 : 0)
  30. {
  31. }
  32. const TPackProperties Properties;
  33. TPlainContainerCache TopStruct;
  34. TVector<TVector<std::pair<NUdf::TUnboxedValue, NUdf::TUnboxedValue>>> DictBuffers;
  35. TVector<TVector<std::tuple<NUdf::TUnboxedValue, NUdf::TUnboxedValue, NUdf::TUnboxedValue>>> EncodedDictBuffers;
  36. size_t OptionalMaskReserve;
  37. NDetails::TOptionalUsageMask OptionalUsageMask;
  38. };
  39. } // namespace NDetails
  40. template<bool Fast>
  41. class TValuePackerGeneric {
  42. public:
  43. using TSelf = TValuePackerGeneric<Fast>;
  44. TValuePackerGeneric(bool stable, const TType *type);
  45. // reference is valid till the next call to Pack()
  46. TStringBuf Pack(const NUdf::TUnboxedValuePod& value) const;
  47. NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const;
  48. private:
  49. const bool Stable_;
  50. const TType* const Type_;
  51. // TODO: real thread safety with external state
  52. mutable TBuffer Buffer_;
  53. mutable NDetails::TPackerState State_;
  54. };
  55. template<bool Fast>
  56. class TValuePackerTransport {
  57. public:
  58. using TSelf = TValuePackerTransport<Fast>;
  59. explicit TValuePackerTransport(const TType* type, arrow::MemoryPool* pool = nullptr);
  60. // for compatibility with TValuePackerGeneric - stable packing is not supported
  61. TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* ppol = nullptr);
  62. // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout
  63. TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
  64. TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count);
  65. size_t PackedSizeEstimate() const {
  66. return IsBlock_ ? BlockBuffer_.Size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0);
  67. }
  68. bool IsEmpty() const {
  69. return !ItemCount_;
  70. }
  71. bool IsBlock() const {
  72. return IsBlock_;
  73. }
  74. void Clear();
  75. NYql::TChunkedBuffer Finish();
  76. // Pack()/Unpack() will pack/unpack single value of type T
  77. NYql::TChunkedBuffer Pack(const NUdf::TUnboxedValuePod& value) const;
  78. NUdf::TUnboxedValue Unpack(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory) const;
  79. void UnpackBatch(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
  80. private:
  81. void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const;
  82. void StartPack();
  83. void InitBlocks();
  84. TSelf& AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 count);
  85. NYql::TChunkedBuffer FinishBlocks();
  86. void UnpackBatchBlocks(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
  87. const TType* const Type_;
  88. ui64 ItemCount_ = 0;
  89. TPagedBuffer::TPtr Buffer_;
  90. mutable NDetails::TPackerState State_;
  91. mutable NDetails::TPackerState IncrementalState_;
  92. arrow::MemoryPool& ArrowPool_;
  93. bool IsBlock_ = false;
  94. bool IsLegacyBlock_ = false;
  95. ui32 BlockLenIndex_ = 0;
  96. TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_;
  97. TVector<std::unique_ptr<IBlockReader>> BlockReaders_;
  98. TVector<std::shared_ptr<arrow::ArrayData>> ConvertedScalars_;
  99. NYql::TChunkedBuffer BlockBuffer_;
  100. TVector<std::unique_ptr<IBlockDeserializer>> BlockDeserializers_;
  101. };
  102. using TValuePacker = TValuePackerGeneric<false>;
  103. class TValuePackerBoxed : public TComputationValue<TValuePackerBoxed>, public TValuePacker {
  104. typedef TComputationValue<TValuePackerBoxed> TBase;
  105. public:
  106. TValuePackerBoxed(TMemoryUsageInfo* memInfo, bool stable, const TType* type);
  107. };
  108. }
  109. }