mkql_block_trimmer.cpp 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. #include "mkql_block_trimmer.h"
  2. #include <yql/essentials/minikql/arrow/arrow_util.h>
  3. #include <yql/essentials/public/decimal/yql_decimal.h>
  4. #include <yql/essentials/public/udf/arrow/defs.h>
  5. #include <yql/essentials/public/udf/arrow/dispatch_traits.h>
  6. #include <yql/essentials/public/udf/arrow/util.h>
  7. #include <yql/essentials/public/udf/udf_type_inspection.h>
  8. #include <yql/essentials/public/udf/udf_value.h>
  9. #include <yql/essentials/public/udf/udf_value_builder.h>
  10. #include <yql/essentials/utils/yql_panic.h>
  11. #include <arrow/array/data.h>
  12. #include <arrow/datum.h>
  13. namespace NKikimr::NMiniKQL {
  14. class TBlockTrimmerBase : public IBlockTrimmer {
  15. protected:
  16. TBlockTrimmerBase(arrow::MemoryPool* pool)
  17. : Pool_(pool)
  18. {}
  19. TBlockTrimmerBase() = delete;
  20. std::shared_ptr<arrow::Buffer> TrimNullBitmap(const std::shared_ptr<arrow::ArrayData>& array) {
  21. auto& nullBitmapBuffer = array->buffers[0];
  22. std::shared_ptr<arrow::Buffer> result;
  23. auto nullCount = array->GetNullCount();
  24. if (nullCount == array->length) {
  25. result = MakeDenseFalseBitmap(array->length, Pool_);
  26. } else if (nullCount > 0) {
  27. result = MakeDenseBitmapCopy(nullBitmapBuffer->data(), array->length, array->offset, Pool_);
  28. }
  29. return result;
  30. }
  31. protected:
  32. arrow::MemoryPool* Pool_;
  33. };
  34. template<typename TLayout, bool Nullable>
  35. class TFixedSizeBlockTrimmer : public TBlockTrimmerBase {
  36. public:
  37. TFixedSizeBlockTrimmer(arrow::MemoryPool* pool)
  38. : TBlockTrimmerBase(pool)
  39. {}
  40. std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
  41. Y_ENSURE(array->buffers.size() == 2);
  42. Y_ENSURE(array->child_data.empty());
  43. std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
  44. if constexpr (Nullable) {
  45. trimmedNullBitmap = TrimNullBitmap(array);
  46. }
  47. auto origData = array->GetValues<TLayout>(1);
  48. auto dataSize = sizeof(TLayout) * array->length;
  49. auto trimmedDataBuffer = NUdf::AllocateResizableBuffer(dataSize, Pool_);
  50. memcpy(trimmedDataBuffer->mutable_data(), origData, dataSize);
  51. return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedDataBuffer)}, array->GetNullCount());
  52. }
  53. };
  54. template<bool Nullable>
  55. class TResourceBlockTrimmer : public TBlockTrimmerBase {
  56. public:
  57. TResourceBlockTrimmer(arrow::MemoryPool* pool)
  58. : TBlockTrimmerBase(pool)
  59. {}
  60. std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
  61. Y_ENSURE(array->buffers.size() == 2);
  62. Y_ENSURE(array->child_data.empty());
  63. std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
  64. if constexpr (Nullable) {
  65. trimmedNullBitmap = TrimNullBitmap(array);
  66. }
  67. auto origData = array->GetValues<NUdf::TUnboxedValue>(1);
  68. auto dataSize = sizeof(NUdf::TUnboxedValue) * array->length;
  69. auto trimmedBuffer = NUdf::AllocateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize, Pool_);
  70. ARROW_OK(trimmedBuffer->Resize(dataSize));
  71. auto trimmedBufferData = reinterpret_cast<NUdf::TUnboxedValue*>(trimmedBuffer->mutable_data());
  72. for (int64_t i = 0; i < array->length; i++) {
  73. ::new(&trimmedBufferData[i]) NUdf::TUnboxedValue(origData[i]);
  74. }
  75. return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedBuffer)}, array->GetNullCount());
  76. }
  77. };
  78. template<typename TStringType, bool Nullable>
  79. class TStringBlockTrimmer : public TBlockTrimmerBase {
  80. using TOffset = typename TStringType::offset_type;
  81. public:
  82. TStringBlockTrimmer(arrow::MemoryPool* pool)
  83. : TBlockTrimmerBase(pool)
  84. {}
  85. std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
  86. Y_ENSURE(array->buffers.size() == 3);
  87. Y_ENSURE(array->child_data.empty());
  88. std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
  89. if constexpr (Nullable) {
  90. trimmedNullBitmap = TrimNullBitmap(array);
  91. }
  92. auto origOffsetData = array->GetValues<TOffset>(1);
  93. auto origStringData = reinterpret_cast<const char*>(array->buffers[2]->data() + origOffsetData[0]);
  94. auto stringDataSize = origOffsetData[array->length] - origOffsetData[0];
  95. auto trimmedOffsetBuffer = NUdf::AllocateResizableBuffer(sizeof(TOffset) * (array->length + 1), Pool_);
  96. auto trimmedStringBuffer = NUdf::AllocateResizableBuffer(stringDataSize, Pool_);
  97. auto trimmedOffsetBufferData = reinterpret_cast<TOffset*>(trimmedOffsetBuffer->mutable_data());
  98. auto trimmedStringBufferData = reinterpret_cast<char*>(trimmedStringBuffer->mutable_data());
  99. for (int64_t i = 0; i < array->length + 1; i++) {
  100. trimmedOffsetBufferData[i] = origOffsetData[i] - origOffsetData[0];
  101. }
  102. memcpy(trimmedStringBufferData, origStringData, stringDataSize);
  103. return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedOffsetBuffer), std::move(trimmedStringBuffer)}, array->GetNullCount());
  104. }
  105. };
  106. template<bool Nullable>
  107. class TTupleBlockTrimmer : public TBlockTrimmerBase {
  108. public:
  109. TTupleBlockTrimmer(std::vector<IBlockTrimmer::TPtr> children, arrow::MemoryPool* pool)
  110. : TBlockTrimmerBase(pool)
  111. , Children_(std::move(children))
  112. {}
  113. std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
  114. Y_ENSURE(array->buffers.size() == 1);
  115. std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
  116. if constexpr (Nullable) {
  117. trimmedNullBitmap = TrimNullBitmap(array);
  118. }
  119. std::vector<std::shared_ptr<arrow::ArrayData>> trimmedChildren;
  120. Y_ENSURE(array->child_data.size() == Children_.size());
  121. for (size_t i = 0; i < Children_.size(); i++) {
  122. trimmedChildren.push_back(Children_[i]->Trim(array->child_data[i]));
  123. }
  124. return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, std::move(trimmedChildren), array->GetNullCount());
  125. }
  126. protected:
  127. TTupleBlockTrimmer(arrow::MemoryPool* pool)
  128. : TBlockTrimmerBase(pool)
  129. {}
  130. protected:
  131. std::vector<IBlockTrimmer::TPtr> Children_;
  132. };
  133. template<typename TDate, bool Nullable>
  134. class TTzDateBlockTrimmer : public TTupleBlockTrimmer<Nullable> {
  135. using TBase = TTupleBlockTrimmer<Nullable>;
  136. using TDateLayout = typename NUdf::TDataType<TDate>::TLayout;
  137. public:
  138. TTzDateBlockTrimmer(arrow::MemoryPool* pool)
  139. : TBase(pool)
  140. {
  141. this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<TDateLayout, false>>(pool));
  142. this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<ui16, false>>(pool));
  143. }
  144. };
  145. class TExternalOptionalBlockTrimmer : public TBlockTrimmerBase {
  146. public:
  147. TExternalOptionalBlockTrimmer(IBlockTrimmer::TPtr inner, arrow::MemoryPool* pool)
  148. : TBlockTrimmerBase(pool)
  149. , Inner_(std::move(inner))
  150. {}
  151. std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
  152. Y_ENSURE(array->buffers.size() == 1);
  153. Y_ENSURE(array->child_data.size() == 1);
  154. auto trimmedNullBitmap = TrimNullBitmap(array);
  155. auto trimmedInner = Inner_->Trim(array->child_data[0]);
  156. return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, {std::move(trimmedInner)}, array->GetNullCount());
  157. }
  158. private:
  159. IBlockTrimmer::TPtr Inner_;
  160. };
  161. struct TTrimmerTraits {
  162. using TResult = IBlockTrimmer;
  163. template <bool Nullable>
  164. using TTuple = TTupleBlockTrimmer<Nullable>;
  165. template <typename T, bool Nullable>
  166. using TFixedSize = TFixedSizeBlockTrimmer<T, Nullable>;
  167. template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot>
  168. using TStrings = TStringBlockTrimmer<TStringType, Nullable>;
  169. using TExtOptional = TExternalOptionalBlockTrimmer;
  170. template<bool Nullable>
  171. using TResource = TResourceBlockTrimmer<Nullable>;
  172. template<typename TTzDate, bool Nullable>
  173. using TTzDateReader = TTzDateBlockTrimmer<TTzDate, Nullable>;
  174. constexpr static bool PassType = false;
  175. static TResult::TPtr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool* pool) {
  176. Y_UNUSED(pgBuilder);
  177. if (desc.PassByValue) {
  178. return std::make_unique<TFixedSize<ui64, true>>(pool);
  179. } else {
  180. return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>(pool);
  181. }
  182. }
  183. static TResult::TPtr MakeResource(bool isOptional, arrow::MemoryPool* pool) {
  184. if (isOptional) {
  185. return std::make_unique<TResource<true>>(pool);
  186. } else {
  187. return std::make_unique<TResource<false>>(pool);
  188. }
  189. }
  190. template<typename TTzDate>
  191. static TResult::TPtr MakeTzDate(bool isOptional, arrow::MemoryPool* pool) {
  192. if (isOptional) {
  193. return std::make_unique<TTzDateReader<TTzDate, true>>(pool);
  194. } else {
  195. return std::make_unique<TTzDateReader<TTzDate, false>>(pool);
  196. }
  197. }
  198. };
  199. IBlockTrimmer::TPtr MakeBlockTrimmer(const NUdf::ITypeInfoHelper& typeInfoHelper, const NUdf::TType* type, arrow::MemoryPool* pool) {
  200. return DispatchByArrowTraits<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool);
  201. }
  202. }