Browse Source

Block trimmer and its usage in BlockMapJoinCore
commit_hash:568373541db82f01bd26ce36651f8dbb92a007e1

ziganshinmr 2 months ago
parent
commit
f3443af4eb

+ 2 - 0
yql/essentials/minikql/arrow/arrow_util.h

@@ -22,6 +22,8 @@ std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* it
 
 using NYql::NUdf::AllocateBitmapWithReserve;
 using NYql::NUdf::MakeDenseBitmap;
+using NYql::NUdf::MakeDenseBitmapCopy;
+using NYql::NUdf::MakeDenseFalseBitmap;
 
 inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) {
     return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length };

+ 34 - 20
yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp

@@ -3,6 +3,7 @@
 #include <yql/essentials/minikql/computation/mkql_block_builder.h>
 #include <yql/essentials/minikql/computation/mkql_block_impl.h>
 #include <yql/essentials/minikql/computation/mkql_block_reader.h>
+#include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
 #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
 #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
 #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
@@ -329,12 +330,12 @@ public:
     public:
         TIterator() = default;
 
-        TIterator(TBlockIndex* blockIndex)
+        TIterator(const TBlockIndex* blockIndex) // Builds an EMPTY-type iterator that only records the index it belongs to.
             : Type_(EIteratorType::EMPTY)
             , BlockIndex_(blockIndex)
         {}
 
-        TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+        TIterator(const TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
             : Type_(EIteratorType::INPLACE)
             , BlockIndex_(blockIndex)
             , Entry_(entry)
@@ -342,7 +343,7 @@ public:
             , ItemsToLookup_(std::move(itemsToLookup))
         {}
 
-        TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+        TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
             : Type_(EIteratorType::LIST)
             , BlockIndex_(blockIndex)
             , Node_(node)
@@ -432,7 +433,7 @@ public:
 
     private:
         EIteratorType Type_;
-        TBlockIndex* BlockIndex_ = nullptr;
+        const TBlockIndex* BlockIndex_ = nullptr;
 
         union {
             TIndexNode* Node_;
@@ -451,7 +452,8 @@ public:
         const TVector<TType*>& itemTypes,
         const TVector<ui32>& keyColumns,
         NUdf::TUnboxedValue stream,
-        bool any
+        bool any,
+        arrow::MemoryPool* pool
     )
         : TBase(memInfo)
         , InputsDescr_(ToValueDescr(itemTypes))
@@ -466,6 +468,7 @@ public:
             Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
             Hashers_.push_back(helper.MakeHasher(blockItemType));
             Comparators_.push_back(helper.MakeComparator(blockItemType));
+            Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
         }
     }
 
@@ -484,7 +487,12 @@ public:
             for (size_t i = 0; i < Inputs_.size() - 1; i++) {
                 auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
                 ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
-                block.push_back(std::move(datum));
+                if (datum.is_scalar()) {
+                    block.push_back(datum);
+                } else {
+                    MKQL_ENSURE(datum.is_array(), "Expecting array");
+                    block.push_back(Trimmers_[i]->Trim(datum.array()));
+                }
             }
             Data_.push_back(std::move(block));
         }
@@ -565,7 +573,7 @@ public:
                 return;
             }
 
-            auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
+            auto value = static_cast<const TIndexMapValue*>(Index_.GetPayload(iter));
             if (value->IsInplace()) {
                 iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
             } else {
@@ -574,23 +582,20 @@ public:
         });
     }
 
-    TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) {
+    TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) const { // Reads one item of stored data: `entry` selects the block and the row, `columnIdx` the column.
         Y_ENSURE(entry.BlockOffset < Data_.size());
         Y_ENSURE(columnIdx < Inputs_.size() - 1); // the last input is not addressable as a column (presumably the block-length slot - confirm)
-
-        auto& datum = Data_[entry.BlockOffset][columnIdx];
-        MKQL_ENSURE(datum.is_array(), "Expecting array");
-        return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset);
+        return GetItemFromBlock(Data_[entry.BlockOffset], columnIdx, entry.ItemOffset);
     }
 
-    void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) {
+    void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { // Fills row[i] with the item of column ioMap[i] for the given entry.
         Y_ENSURE(row.size() == ioMap.size()); // the caller must pre-size `row`; it is not resized here
         for (size_t i = 0; i < row.size(); i++) {
             row[i] = GetItem(entry, ioMap[i]);
         }
     }
 
-    bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+    bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
         Y_ENSURE(keyItems.size() == KeyColumns_.size());
         for (size_t i = 0; i < KeyColumns_.size(); i++) {
             auto indexItem = GetItem(entry, KeyColumns_[i]);
@@ -607,10 +612,7 @@ private:
         ui64 keyHash = 0;
         keyItems.clear();
         for (ui32 keyColumn : KeyColumns_) {
-            auto& datum = block[keyColumn];
-            MKQL_ENSURE(datum.is_array(), "Expecting array");
-
-            auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset);
+            auto item = GetItemFromBlock(block, keyColumn, offset);
             if (!item) {
                 keyItems.clear();
                 return 0;
@@ -623,11 +625,21 @@ private:
         return keyHash;
     }
 
+    TBlockItem GetItemFromBlock(const std::vector<arrow::Datum>& block, ui32 columnIdx, size_t offset) const { // Reads row `offset` of column `columnIdx` from `block`; accepts scalar and array columns.
+        const auto& datum = block[columnIdx];
+        if (datum.is_scalar()) { // scalar column: the item does not depend on the row, so `offset` is not used
+            return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
+        } else {
+            MKQL_ENSURE(datum.is_array(), "Expecting array"); // any datum kind other than scalar or array is rejected
+            return Readers_[columnIdx]->GetItem(*datum.array(), offset);
+        }
+    }
+
     TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) { // Appends a node built from (entry, currentHead) to IndexNodes_ and returns its address.
         return &IndexNodes_.emplace_back(entry, currentHead);
     }
 
-    bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+    bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
         if (chain->IsInplace()) {
             return IsKeyEquals(chain->GetEntry(), keyItems);
         } else {
@@ -650,6 +662,7 @@ private:
     TVector<std::unique_ptr<IBlockReader>> Readers_;
     TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
     TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
+    TVector<IBlockTrimmer::TPtr> Trimmers_;
 
     std::vector<std::vector<arrow::Datum>> Data_;
 
@@ -705,7 +718,8 @@ public:
             RightItemTypes_,
             RightKeyColumns_,
             std::move(RightStream_->GetValue(ctx)),
-            RightAny
+            RightAny,
+            &ctx.ArrowMemoryPool
         );
 
         return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,

+ 64 - 7
yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp

@@ -96,19 +96,19 @@ TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind,
 NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup,
     TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
     TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny,
-    EJoinKind joinKind, size_t blockSize
+    EJoinKind joinKind, size_t blockSize, bool scalar
 ) {
     TProgramBuilder& pb = *setup.PgmBuilder;
 
     Y_ENSURE(leftType->IsList(), "Left node has to be list");
     const auto leftItemType = AS_TYPE(TListType, leftType)->GetItemType();
     Y_ENSURE(leftItemType->IsTuple(), "List item has to be tuple");
-    TType* leftBlockType = MakeBlockTupleType(pb, leftItemType);
+    TType* leftBlockType = MakeBlockTupleType(pb, leftItemType, scalar);
 
     Y_ENSURE(rightType->IsList(), "Right node has to be list");
     const auto rightItemType = AS_TYPE(TListType, rightType)->GetItemType();
     Y_ENSURE(rightItemType->IsTuple(), "Right item has to be tuple");
-    TType* rightBlockType = MakeBlockTupleType(pb, rightItemType);
+    TType* rightBlockType = MakeBlockTupleType(pb, rightItemType, scalar);
 
     TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType));
     TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType));
@@ -122,8 +122,18 @@ NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup,
     const auto graph = setup.BuildGraph(joinNode, {leftList.GetNode(), rightList.GetNode()});
 
     auto& ctx = graph->GetContext();
-    graph->GetEntryPoint(0, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue)));
-    graph->GetEntryPoint(1, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue)));
+
+    NUdf::TUnboxedValuePod leftBlockListValue, rightBlockListValue;
+    if (scalar) {
+        leftBlockListValue = MakeUint64ScalarBlock(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue));
+        rightBlockListValue = MakeUint64ScalarBlock(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue));
+    } else {
+        leftBlockListValue = ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue));
+        rightBlockListValue = ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue));
+    }
+
+    graph->GetEntryPoint(0, true)->SetValue(ctx, leftBlockListValue);
+    graph->GetEntryPoint(1, true)->SetValue(ctx, rightBlockListValue);
     return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue());
 }
 
@@ -131,14 +141,15 @@ void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind,
     TType* expectedType, const NUdf::TUnboxedValue& expected,
     TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns,
     TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns,
-    const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, bool rightAny = false
+    const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {},
+    bool rightAny = false, bool scalar = false
 ) {
     const size_t testSize = leftListValue.GetListLength();
     for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) {
         const auto got = DoTestBlockJoin(setup,
             leftType, std::move(leftListValue), leftKeyColumns, leftKeyDrops,
             rightType, std::move(rightListValue), rightKeyColumns, rightKeyDrops, rightAny,
-            joinKind, blockSize
+            joinKind, blockSize, scalar
         );
         CompareResults(expectedType, expected, got);
     }
@@ -685,6 +696,52 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestBasic) {
         );
     }
 
+    Y_UNIT_TEST(TestScalar) { // Inner join in which every input column is supplied as a scalar block.
+        TSetup<false> setup(GetNodeFactory());
+        const size_t testSize = 1 << 7;
+        // Every column holds a single repeated value, so each row is identical to the first one.
+        // 1. Make input for the "left" stream.
+        TVector<ui64> leftKeyInit(testSize, 1);
+        TVector<ui64> leftSubkeyInit(testSize, 2);
+        TVector<ui64> leftValueInit(testSize, 3);
+
+        // 2. Make input for the "right" stream.
+        TVector<ui64> rightKeyInit(testSize, 1);
+        TVector<ui64> rightValueInit(testSize, 2);
+
+        // 3. Make "expected" data.
+        TMultiMap<ui64, ui64> rightMap;
+        for (size_t i = 0; i < testSize; i++) {
+            rightMap.insert({rightKeyInit[i], rightValueInit[i]});
+        }
+        TVector<ui64> expectedKey;
+        TVector<ui64> expectedSubkey;
+        TVector<ui64> expectedValue;
+        TVector<ui64> expectedRightValue;
+        for (size_t i = 0; i < testSize; i++) { // all keys are equal, so each left row pairs with every right row
+            const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]);
+            for (auto it = begin; it != end; it++) {
+                expectedKey.push_back(leftKeyInit[i]);
+                expectedSubkey.push_back(leftSubkeyInit[i]);
+                expectedValue.push_back(leftValueInit[i]);
+                expectedRightValue.push_back(it->second);
+            }
+        }
+
+        auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+            leftKeyInit, leftSubkeyInit, leftValueInit);
+        auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+            rightKeyInit, rightValueInit);
+        auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+            expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
+        RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+                         leftType, std::move(leftList), {0},
+                         rightType, std::move(rightList), {0},
+                         {}, {0}, false, true // leftKeyDrops, rightKeyDrops, rightAny, scalar
+        );
+    }
+
 } // Y_UNIT_TEST_SUITE
 
 Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestOptional) {

+ 33 - 2
yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp

@@ -137,7 +137,7 @@ IComputationNode* WrapWideStreamDethrottler(TCallable& callable, const TComputat
 
 }
 
-TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) {
+TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar) {
     const auto itemTypes = AS_TYPE(TTupleType, tupleType)->GetElements();
     const auto ui64Type = pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id);
     const auto blockLenType = pgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
@@ -145,7 +145,7 @@ TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) {
     TVector<TType*> blockItemTypes;
     std::transform(itemTypes.cbegin(), itemTypes.cend(), std::back_inserter(blockItemTypes),
         [&](const auto& itemType) {
-            return pgmBuilder.NewBlockType(itemType, TBlockType::EShape::Many);
+            return pgmBuilder.NewBlockType(itemType, scalar ? TBlockType::EShape::Scalar : TBlockType::EShape::Many);
         });
     // XXX: Mind the last block length column.
     blockItemTypes.push_back(blockLenType);
@@ -204,6 +204,37 @@ NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
     return holderFactory.CreateDirectListHolder(std::move(listValues));
 }
 
+NUdf::TUnboxedValuePod MakeUint64ScalarBlock(TComputationContext& ctx, size_t blockSize,
+    const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
+) {
+    // Creates a block of scalar values using the first element of the given list
+    // The result is a list of tuples: one scalar datum per entry of `types`, followed by a block-length item.
+    for (auto type : types) {
+        // Because IScalarBuilder has no implementations
+        Y_ENSURE(AS_TYPE(TDataType, type)->GetDataSlot() == NYql::NUdf::EDataSlot::Uint64);
+    }
+    // From `values` only the row count and the first row are used below.
+    const auto& holderFactory = ctx.HolderFactory;
+    const size_t width = types.size();
+    const size_t rowsCount = values.GetListLength();
+    // Fetch the first row once; an empty input list fails this check.
+    NUdf::TUnboxedValue row;
+    Y_ENSURE(values.GetListIterator().Next(row));
+    TDefaultListRepresentation listValues;
+    for (size_t rowOffset = 0; rowOffset < rowsCount; rowOffset += blockSize) { // one tuple per blockSize input rows; NOTE(review): blockSize == 0 would never advance
+        NUdf::TUnboxedValue* items = nullptr;
+        const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items);
+        for (size_t i = 0; i < width; i++) {
+            const NUdf::TUnboxedValuePod& item = row.GetElement(i);
+            items[i] = holderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(item.Get<ui64>())));
+        }
+        items[width] = MakeBlockCount(holderFactory, std::min(blockSize, rowsCount - rowOffset)); // the final chunk may be shorter than blockSize
+        listValues = listValues.Append(std::move(tuple));
+    }
+
+    return holderFactory.CreateDirectListHolder(std::move(listValues));
+}
+
 NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
     const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
 ) {

+ 3 - 1
yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h

@@ -9,10 +9,12 @@ inline bool IsOptionalOrNull(const TType* type) {
     return type->IsOptional() || type->IsNull() || type->IsPg();
 }
 
-TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType);
+TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar);
 
 NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
     const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
+NUdf::TUnboxedValuePod MakeUint64ScalarBlock(TComputationContext& ctx, size_t blockSize,
+    const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
 NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
     const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
 

+ 251 - 0
yql/essentials/minikql/computation/mkql_block_trimmer.cpp

@@ -0,0 +1,251 @@
+#include "mkql_block_trimmer.h"
+
+#include <yql/essentials/minikql/arrow/arrow_util.h>
+#include <yql/essentials/public/decimal/yql_decimal.h>
+#include <yql/essentials/public/udf/arrow/block_reader.h>
+#include <yql/essentials/public/udf/arrow/defs.h>
+#include <yql/essentials/public/udf/arrow/util.h>
+#include <yql/essentials/public/udf/udf_type_inspection.h>
+#include <yql/essentials/public/udf/udf_value.h>
+#include <yql/essentials/public/udf/udf_value_builder.h>
+#include <yql/essentials/utils/yql_panic.h>
+
+#include <arrow/array/data.h>
+#include <arrow/datum.h>
+
+namespace NKikimr::NMiniKQL {
+
+class TBlockTrimmerBase : public IBlockTrimmer { // Common base of the trimmers: keeps the target memory pool and the shared null-bitmap helper.
+protected:
+    TBlockTrimmerBase(arrow::MemoryPool* pool)
+        : Pool_(pool)
+    {}
+
+    TBlockTrimmerBase() = delete;
+    // Builds a new bitmap for `array` using Pool_; returns an empty pointer when the array reports zero nulls.
+    std::shared_ptr<arrow::Buffer> TrimNullBitmap(const std::shared_ptr<arrow::ArrayData>& array) {
+        auto& nullBitmapBuffer = array->buffers[0];
+
+        std::shared_ptr<arrow::Buffer> result;
+        auto nullCount = array->GetNullCount();
+        if (nullCount == array->length) { // every slot is null (also taken when length is 0): the source bitmap is not read
+            result = MakeDenseFalseBitmap(array->length, Pool_);
+        } else if (nullCount > 0) { // some nulls: dense copy built from the source bits, given the array's length and offset
+            result = MakeDenseBitmapCopy(nullBitmapBuffer->data(), array->length, array->offset, Pool_);
+        }
+
+        return result;
+    }
+
+protected:
+    arrow::MemoryPool* Pool_;
+};
+
+template<typename TLayout, bool Nullable>
+class TFixedSizeBlockTrimmer : public TBlockTrimmerBase { // Copies a two-buffer array (bitmap + TLayout values) into newly allocated buffers.
+public:
+    TFixedSizeBlockTrimmer(arrow::MemoryPool* pool)
+        : TBlockTrimmerBase(pool)
+    {}
+
+    std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+        Y_ENSURE(array->buffers.size() == 2);
+        Y_ENSURE(array->child_data.empty());
+        // The bitmap slot stays empty unless the column type is nullable.
+        std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
+        if constexpr (Nullable) {
+            trimmedNullBitmap = TrimNullBitmap(array);
+        }
+        // Exactly `length` values are copied; GetValues is expected to honor array->offset (Arrow ArrayData API) - confirm.
+        auto origData = array->GetValues<TLayout>(1);
+        auto dataSize = sizeof(TLayout) * array->length;
+        // NOTE(review): TResourceBlockTrimmer calls Resize() after allocating; here the buffer is written without it - confirm its reported size.
+        auto trimmedDataBuffer = NUdf::AllocateResizableBuffer(dataSize, Pool_);
+        memcpy(trimmedDataBuffer->mutable_data(), origData, dataSize);
+        // No offset is passed to Make; the null count is carried over from the source array.
+        return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedDataBuffer)}, array->GetNullCount());
+    }
+};
+
+template<bool Nullable>
+class TResourceBlockTrimmer : public TBlockTrimmerBase { // Copies an array whose values buffer stores NUdf::TUnboxedValue objects.
+public:
+    TResourceBlockTrimmer(arrow::MemoryPool* pool)
+        : TBlockTrimmerBase(pool)
+    {}
+
+    std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+        Y_ENSURE(array->buffers.size() == 2);
+        Y_ENSURE(array->child_data.empty());
+        // The bitmap slot stays empty unless the column type is nullable.
+        std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
+        if constexpr (Nullable) {
+            trimmedNullBitmap = TrimNullBitmap(array);
+        }
+
+        auto origData = array->GetValues<NUdf::TUnboxedValue>(1);
+        auto dataSize = sizeof(NUdf::TUnboxedValue) * array->length;
+        // A managed buffer type is requested instead of the default one (presumably so the buffer owns the stored values - confirm).
+        auto trimmedBuffer = NUdf::AllocateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize, Pool_);
+        ARROW_OK(trimmedBuffer->Resize(dataSize));
+        auto trimmedBufferData = reinterpret_cast<NUdf::TUnboxedValue*>(trimmedBuffer->mutable_data());
+        // Values are copy-constructed in place, one by one, rather than byte-copied.
+        for (int64_t i = 0; i < array->length; i++) {
+            ::new(&trimmedBufferData[i]) NUdf::TUnboxedValue(origData[i]);
+        }
+
+        return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedBuffer)}, array->GetNullCount());
+    }
+};
+
+template<typename TStringType, bool Nullable>
+class TStringBlockTrimmer : public TBlockTrimmerBase { // Copies a three-buffer array (bitmap, offsets, bytes) and rebases the offsets to start at zero.
+    using TOffset = typename TStringType::offset_type;
+
+public:
+    TStringBlockTrimmer(arrow::MemoryPool* pool)
+        : TBlockTrimmerBase(pool)
+    {}
+
+    std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+        Y_ENSURE(array->buffers.size() == 3);
+        Y_ENSURE(array->child_data.empty());
+        // The bitmap slot stays empty unless the column type is nullable.
+        std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
+        if constexpr (Nullable) {
+            trimmedNullBitmap = TrimNullBitmap(array);
+        }
+        // The first offset marks where this array's bytes begin in buffer 2; the offset at index `length` marks where they end.
+        auto origOffsetData = array->GetValues<TOffset>(1);
+        auto origStringData = reinterpret_cast<const char*>(array->buffers[2]->data() + origOffsetData[0]);
+        auto stringDataSize = origOffsetData[array->length] - origOffsetData[0];
+        // NOTE(review): neither buffer is Resize()d after allocation, unlike in TResourceBlockTrimmer - confirm the reported sizes.
+        auto trimmedOffsetBuffer = NUdf::AllocateResizableBuffer(sizeof(TOffset) * (array->length + 1), Pool_);
+        auto trimmedStringBuffer = NUdf::AllocateResizableBuffer(stringDataSize, Pool_);
+
+        auto trimmedOffsetBufferData = reinterpret_cast<TOffset*>(trimmedOffsetBuffer->mutable_data());
+        auto trimmedStringBufferData = reinterpret_cast<char*>(trimmedStringBuffer->mutable_data());
+        // length + 1 offsets are written, each shifted so that the first one becomes 0.
+        for (int64_t i = 0; i < array->length + 1; i++) {
+            trimmedOffsetBufferData[i] = origOffsetData[i] - origOffsetData[0];
+        }
+        memcpy(trimmedStringBufferData, origStringData, stringDataSize);
+
+        return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedOffsetBuffer), std::move(trimmedStringBuffer)}, array->GetNullCount());
+    }
+};
+
+template<bool Nullable>
+class TTupleBlockTrimmer : public TBlockTrimmerBase { // Trims an array with one bitmap buffer and N children, delegating each child to its own trimmer.
+public:
+    TTupleBlockTrimmer(std::vector<IBlockTrimmer::TPtr> children, arrow::MemoryPool* pool)
+        : TBlockTrimmerBase(pool)
+        , Children_(std::move(children))
+    {}
+
+    std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+        Y_ENSURE(array->buffers.size() == 1);
+
+        std::shared_ptr<arrow::Buffer> trimmedNullBitmap;
+        if constexpr (Nullable) {
+            trimmedNullBitmap = TrimNullBitmap(array);
+        }
+        // NOTE(review): each child is trimmed by its own offset/length; the parent's offset is not applied to children - confirm this matches the block layout convention.
+        std::vector<std::shared_ptr<arrow::ArrayData>> trimmedChildren;
+        Y_ENSURE(array->child_data.size() == Children_.size());
+        for (size_t i = 0; i < Children_.size(); i++) {
+            trimmedChildren.push_back(Children_[i]->Trim(array->child_data[i]));
+        }
+
+        return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, std::move(trimmedChildren), array->GetNullCount());
+    }
+
+protected:
+    TTupleBlockTrimmer(arrow::MemoryPool* pool) // For subclasses that fill Children_ themselves (TTzDateBlockTrimmer does so).
+        : TBlockTrimmerBase(pool)
+    {}
+
+protected:
+    std::vector<IBlockTrimmer::TPtr> Children_;
+};
+
+template<typename TDate, bool Nullable>
+class TTzDateBlockTrimmer : public TTupleBlockTrimmer<Nullable> { // Tuple trimmer preconfigured with two fixed-size, non-nullable children.
+    using TBase = TTupleBlockTrimmer<Nullable>;
+    using TDateLayout = typename NUdf::TDataType<TDate>::TLayout;
+
+public:
+    TTzDateBlockTrimmer(arrow::MemoryPool* pool)
+        : TBase(pool)
+    {
+        this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<TDateLayout, false>>(pool)); // child 0: values in TDate's layout type
+        this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<ui16, false>>(pool)); // child 1: a ui16 column (presumably the timezone id - confirm)
+    }
+};
+
+class TExternalOptionalBlockTrimmer : public TBlockTrimmerBase { // Trims an array made of one bitmap buffer wrapping a single child array.
+public:
+    TExternalOptionalBlockTrimmer(IBlockTrimmer::TPtr inner, arrow::MemoryPool* pool)
+        : TBlockTrimmerBase(pool)
+        , Inner_(std::move(inner))
+    {}
+
+    std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+        Y_ENSURE(array->buffers.size() == 1);
+        Y_ENSURE(array->child_data.size() == 1);
+        // The outer bitmap always goes through TrimNullBitmap (there is no Nullable switch); the child goes through the inner trimmer.
+        auto trimmedNullBitmap = TrimNullBitmap(array);
+        auto trimmedInner = Inner_->Trim(array->child_data[0]);
+
+        return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, {std::move(trimmedInner)}, array->GetNullCount());
+    }
+
+private:
+    IBlockTrimmer::TPtr Inner_;
+};
+
+struct TTrimmerTraits { // Traits type used as the template argument of MakeBlockReaderImpl in MakeBlockTrimmer; maps type categories to trimmer classes.
+    using TResult = IBlockTrimmer;
+    template <bool Nullable>
+    using TTuple = TTupleBlockTrimmer<Nullable>;
+    template <typename T, bool Nullable>
+    using TFixedSize = TFixedSizeBlockTrimmer<T, Nullable>;
+    template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot>
+    using TStrings = TStringBlockTrimmer<TStringType, Nullable>; // the data slot parameter is accepted but does not affect the chosen trimmer
+    using TExtOptional = TExternalOptionalBlockTrimmer;
+    template<bool Nullable>
+    using TResource = TResourceBlockTrimmer<Nullable>;
+    template<typename TTzDate, bool Nullable>
+    using TTzDateReader = TTzDateBlockTrimmer<TTzDate, Nullable>; // the alias name is presumably dictated by MakeBlockReaderImpl - confirm
+    // pgBuilder is not used: by-value types get a nullable ui64 trimmer, all others a nullable binary-string trimmer.
+    static TResult::TPtr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool* pool) {
+        if (desc.PassByValue) {
+            return std::make_unique<TFixedSize<ui64, true>>(pool);
+        } else {
+            return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>(pool);
+        }
+    }
+    // Turns the runtime isOptional flag into the compile-time Nullable parameter.
+    static TResult::TPtr MakeResource(bool isOptional, arrow::MemoryPool* pool) {
+        if (isOptional) {
+            return std::make_unique<TResource<true>>(pool);
+        } else {
+            return std::make_unique<TResource<false>>(pool);
+        }
+    }
+    // Same runtime-to-compile-time dispatch for timezone-aware date types.
+    template<typename TTzDate>
+    static TResult::TPtr MakeTzDate(bool isOptional, arrow::MemoryPool* pool) {
+        if (isOptional) {
+            return std::make_unique<TTzDateReader<TTzDate, true>>(pool);
+        } else {
+            return std::make_unique<TTzDateReader<TTzDate, false>>(pool);
+        }
+    }
+};
+
+IBlockTrimmer::TPtr MakeBlockTrimmer(const NUdf::ITypeInfoHelper& typeInfoHelper, const NUdf::TType* type, arrow::MemoryPool* pool) { // Factory: builds the trimmer for `type` through MakeBlockReaderImpl with TTrimmerTraits.
+    return MakeBlockReaderImpl<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool); // third argument is passed as nullptr (presumably the pg builder - confirm)
+}
+
+}

+ 21 - 0
yql/essentials/minikql/computation/mkql_block_trimmer.h

@@ -0,0 +1,21 @@
+#pragma once
+
+#include <util/generic/noncopyable.h>
+#include <yql/essentials/public/udf/udf_types.h>
+
+#include <arrow/type.h>
+
+namespace NKikimr::NMiniKQL {
+
+class IBlockTrimmer : private TNonCopyable { // Non-copyable interface of an object that produces a trimmed version of an Arrow array.
+public:
+    using TPtr = std::unique_ptr<IBlockTrimmer>;
+
+    virtual ~IBlockTrimmer() = default;
+    // Takes an array and returns an ArrayData for its trimmed form; the input is received by const reference.
+    virtual std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) = 0;
+};
+
+IBlockTrimmer::TPtr MakeBlockTrimmer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, arrow::MemoryPool* pool);
+
+}

+ 373 - 0
yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp

@@ -0,0 +1,373 @@
+#include "mkql_block_trimmer.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <yql/essentials/public/udf/arrow/block_builder.h>
+#include <yql/essentials/public/udf/arrow/memory_pool.h>
+#include <yql/essentials/minikql/mkql_type_builder.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+
+using namespace NYql::NUdf;
+using namespace NKikimr;
+
+// Shared fixture for the trimmer tests: bundles the objects needed to build MiniKQL types
+// (function registry, allocator, type environment, program builder) and the arrow memory pool.
+struct TBlockTrimmerTestData {
+    TBlockTrimmerTestData()
+        : FunctionRegistry(NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry()))
+        , Alloc(__LOCATION__)
+        , Env(Alloc)
+        , PgmBuilder(Env, *FunctionRegistry)
+        , MemInfo("Memory")
+        , ArrowPool(GetYqlMemoryPool())
+    {
+    }
+
+    // Members are initialized in declaration order; Env is built from Alloc and
+    // PgmBuilder from Env and FunctionRegistry, so keep this order.
+    TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry;
+    NMiniKQL::TScopedAlloc Alloc;
+    NMiniKQL::TTypeEnvironment Env;
+    NMiniKQL::TProgramBuilder PgmBuilder;
+    NMiniKQL::TMemoryUsageInfo MemInfo;
+    // Pool passed to the array builders and trimmers created by the tests.
+    arrow::MemoryPool* const ArrowPool;
+};
+
+Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
+    Y_UNIT_TEST(TestFixedSize) {
+        // Round-trips a non-optional Int64 column through the trimmer, one slice at a time.
+        TBlockTrimmerTestData data;
+
+        const auto int64Type = data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, false);
+
+        size_t blockLen = NMiniKQL::CalcBlockLen(NMiniKQL::CalcMaxBlockItemSize(int64Type));
+        Y_ENSURE(blockLen > 8);
+
+        constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64);
+        constexpr auto sliceSize = 1024;
+        static_assert(testSize % sliceSize == 0);
+
+        auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), int64Type, *data.ArrowPool, blockLen, nullptr);
+        auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), int64Type);
+        auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), int64Type, data.ArrowPool);
+
+        // Fill the block with the sequence 0, 1, 2, ...
+        size_t value = 0;
+        while (value < testSize) {
+            builder->Add(TBlockItem(value));
+            ++value;
+        }
+        auto datum = builder->Build(true);
+        Y_ENSURE(datum.is_array());
+        auto array = datum.array();
+
+        // Chop() is called once per slice; every slice is trimmed and compared item by item with the untrimmed one.
+        for (size_t slicesLeft = testSize / sliceSize; slicesLeft > 0; --slicesLeft) {
+            const auto slice = Chop(array, sliceSize);
+            const auto trimmedSlice = trimmer->Trim(slice);
+
+            for (size_t pos = 0; pos < sliceSize; ++pos) {
+                TBlockItem lhs = reader->GetItem(*slice, pos);
+                TBlockItem rhs = reader->GetItem(*trimmedSlice, pos);
+                UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim");
+            }
+        }
+    }
+
+    Y_UNIT_TEST(TestString) {
+        // Round-trips a non-optional String column that alternates empty and non-empty values.
+        TBlockTrimmerTestData data;
+
+        const auto stringType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::String, false);
+
+        size_t blockLen = NMiniKQL::CalcBlockLen(NMiniKQL::CalcMaxBlockItemSize(stringType));
+        Y_ENSURE(blockLen > 8);
+
+        // To fit all strings into single block
+        constexpr auto testSize = 512;
+        constexpr auto sliceSize = 128;
+        static_assert(testSize % sliceSize == 0);
+
+        auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), stringType, *data.ArrowPool, blockLen, nullptr);
+        auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), stringType);
+        auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), stringType, data.ArrowPool);
+
+        // Odd positions get the first i + 1 bytes of the growing buffer, even positions get an empty string.
+        std::string testString(testSize, '\0');
+        for (size_t i = 0; i < testSize; ++i) {
+            testString[i] = static_cast<char>(i);
+            const bool nonEmpty = (i % 2) != 0;
+            builder->Add(nonEmpty ? TBlockItem(TStringRef(testString.data(), i + 1)) : TBlockItem(TStringRef()));
+        }
+        auto datum = builder->Build(true);
+        Y_ENSURE(datum.is_array());
+        auto array = datum.array();
+
+        // Every slice is trimmed and compared item by item with the untrimmed one.
+        for (size_t slicesLeft = testSize / sliceSize; slicesLeft > 0; --slicesLeft) {
+            const auto slice = Chop(array, sliceSize);
+            const auto trimmedSlice = trimmer->Trim(slice);
+
+            for (size_t pos = 0; pos < sliceSize; ++pos) {
+                TBlockItem lhs = reader->GetItem(*slice, pos);
+                TBlockItem rhs = reader->GetItem(*trimmedSlice, pos);
+                UNIT_ASSERT_VALUES_EQUAL_C(lhs.AsStringRef(), rhs.AsStringRef(), "Expected the same data after trim");
+            }
+        }
+    }
+
+    Y_UNIT_TEST(TestOptional) {
+        // Round-trips an optional Int64 column in which every odd position is empty.
+        TBlockTrimmerTestData data;
+
+        const auto optionalInt64Type = data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true);
+
+        size_t blockLen = NMiniKQL::CalcBlockLen(NMiniKQL::CalcMaxBlockItemSize(optionalInt64Type));
+        Y_ENSURE(blockLen > 8);
+
+        constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64);
+        constexpr auto sliceSize = 1024;
+        static_assert(testSize % sliceSize == 0);
+
+        auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), optionalInt64Type, *data.ArrowPool, blockLen, nullptr);
+        auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), optionalInt64Type);
+        auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), optionalInt64Type, data.ArrowPool);
+
+        // Even positions carry their own index, odd positions are empty.
+        for (size_t i = 0; i < testSize; ++i) {
+            const bool isEmpty = (i % 2) != 0;
+            builder->Add(isEmpty ? TBlockItem() : TBlockItem(i));
+        }
+        auto datum = builder->Build(true);
+        Y_ENSURE(datum.is_array());
+        auto array = datum.array();
+
+        // Every slice is trimmed and compared item by item with the untrimmed one.
+        for (size_t slicesLeft = testSize / sliceSize; slicesLeft > 0; --slicesLeft) {
+            const auto slice = Chop(array, sliceSize);
+            const auto trimmedSlice = trimmer->Trim(slice);
+
+            for (size_t pos = 0; pos < sliceSize; ++pos) {
+                TBlockItem lhs = reader->GetItem(*slice, pos);
+                TBlockItem rhs = reader->GetItem(*trimmedSlice, pos);
+                UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs), bool(rhs), "Expected the same optionality after trim");
+
+                // Payloads are compared only when the original item is present.
+                if (!lhs) {
+                    continue;
+                }
+                UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim");
+            }
+        }
+    }
+
+    Y_UNIT_TEST(TestExternalOptional) {
+        // Round-trips an Optional<Optional<Int64>> column covering all three states:
+        // value present, outer optional empty, inner optional empty.
+        TBlockTrimmerTestData data;
+
+        const auto doubleOptInt64Type = data.PgmBuilder.NewOptionalType(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true));
+
+        size_t blockLen = NMiniKQL::CalcBlockLen(NMiniKQL::CalcMaxBlockItemSize(doubleOptInt64Type));
+        Y_ENSURE(blockLen > 8);
+
+        constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64);
+        constexpr auto sliceSize = 1024;
+        static_assert(testSize % sliceSize == 0);
+
+        auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type, *data.ArrowPool, blockLen, nullptr);
+        auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type);
+        auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type, data.ArrowPool);
+
+        for (size_t i = 0; i < testSize; ++i) {
+            switch (i % 4) {
+                case 1:
+                case 3:
+                    // Odd index: value wrapped into the outer optional.
+                    builder->Add(TBlockItem(i).MakeOptional());
+                    break;
+                case 2:
+                    // Empty item without the optional wrapper.
+                    builder->Add(TBlockItem());
+                    break;
+                default:
+                    // Multiple of four: empty item wrapped into the outer optional.
+                    builder->Add(TBlockItem().MakeOptional());
+                    break;
+            }
+        }
+        auto datum = builder->Build(true);
+        Y_ENSURE(datum.is_array());
+        auto array = datum.array();
+
+        // Every slice is trimmed and compared item by item with the untrimmed one.
+        for (size_t slicesLeft = testSize / sliceSize; slicesLeft > 0; --slicesLeft) {
+            const auto slice = Chop(array, sliceSize);
+            const auto trimmedSlice = trimmer->Trim(slice);
+
+            for (size_t pos = 0; pos < sliceSize; ++pos) {
+                TBlockItem lhs = reader->GetItem(*slice, pos);
+                TBlockItem rhs = reader->GetItem(*trimmedSlice, pos);
+
+                // Unwrap both optional levels in lockstep, stopping at the first empty one.
+                for (size_t level = 0; level < 2; ++level) {
+                    UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs), bool(rhs), "Expected the same optionality after trim");
+                    if (!lhs) {
+                        break;
+                    }
+
+                    lhs = lhs.GetOptionalValue();
+                    rhs = rhs.GetOptionalValue();
+                }
+
+                if (lhs) {
+                    UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim");
+                }
+            }
+        }
+    }
+
+    Y_UNIT_TEST(TestTuple) {
+        // Round-trips a Tuple<Int64, String, Optional<Int64>> column through the trimmer
+        // and compares every tuple element of every slice with the untrimmed data.
+        TBlockTrimmerTestData data;
+
+        std::vector<NMiniKQL::TType*> types;
+        types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64));
+        types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::String));
+        types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true));
+        const auto tupleType = data.PgmBuilder.NewTupleType(types);
+
+        size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(tupleType);
+        size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+        Y_ENSURE(blockLen > 8);
+
+        // To fit all strings into single block
+        constexpr auto testSize = 512;
+        constexpr auto sliceSize = 128;
+        static_assert(testSize % sliceSize == 0);
+
+        auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tupleType, *data.ArrowPool, blockLen, nullptr);
+        auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), tupleType);
+        auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), tupleType, data.ArrowPool);
+
+        std::string testString;
+        testString.resize(testSize);
+
+        // Element storage for all tuples: one pre-sized buffer with tupleWidth items per tuple.
+        // It is never resized, so pointers into it stay valid until the end of the test, and it is
+        // released automatically on every exit path (no manual new[]/delete[] bookkeeping).
+        constexpr size_t tupleWidth = 3;
+        std::vector<TBlockItem> tupleStorage(testSize * tupleWidth);
+        for (size_t i = 0; i < testSize; i++) {
+            testString[i] = static_cast<char>(i);
+
+            TBlockItem* tupleItems = tupleStorage.data() + i * tupleWidth;
+            tupleItems[0] = TBlockItem(i);
+            tupleItems[1] = TBlockItem(TStringRef(testString.data(), i + 1));
+            tupleItems[2] = i % 2 ? TBlockItem(i) : TBlockItem();
+
+            builder->Add(TBlockItem(tupleItems));
+        }
+        auto datum = builder->Build(true);
+        Y_ENSURE(datum.is_array());
+        auto array = datum.array();
+
+        for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+            auto slice = Chop(array, sliceSize);
+            auto trimmedSlice = trimmer->Trim(slice);
+
+            for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+                TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+                TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+
+                UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(0).Get<i64>(), rhs.GetElement(0).Get<i64>(), "Expected the same data after trim");
+                UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(1).AsStringRef(), rhs.GetElement(1).AsStringRef(), "Expected the same data after trim");
+                UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs.GetElement(2)), bool(rhs.GetElement(2)), "Expected the same optionality after trim");
+                // The optional payload is compared only when the original element is present.
+                if (bool(lhs.GetElement(2))) {
+                    UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(2).Get<i64>(), rhs.GetElement(2).Get<i64>(), "Expected the same data after trim");
+                }
+            }
+        }
+    }
+
+    Y_UNIT_TEST(TestTzDate) {
+        // Round-trips a non-optional TzDatetime column and checks both the datetime value and the timezone id.
+        TBlockTrimmerTestData data;
+        using TDtLayout = TDataType<TTzDatetime>::TLayout;
+
+        const auto tzDatetimeType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::TzDatetime, false);
+
+        size_t blockLen = NMiniKQL::CalcBlockLen(NMiniKQL::CalcMaxBlockItemSize(tzDatetimeType));
+        Y_ENSURE(blockLen > 8);
+
+        constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / (sizeof(TDtLayout) + sizeof(ui16));
+        constexpr auto sliceSize = 1024;
+        static_assert(testSize % sliceSize == 0);
+
+        auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDatetimeType, *data.ArrowPool, blockLen, nullptr);
+        auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), tzDatetimeType);
+        auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), tzDatetimeType, data.ArrowPool);
+
+        // Item i stores i as the datetime value and i * 2 as the timezone id.
+        size_t i = 0;
+        while (i < testSize) {
+            TBlockItem dt = TBlockItem(i);
+            dt.SetTimezoneId(i * 2);
+            builder->Add(dt);
+            ++i;
+        }
+        auto datum = builder->Build(true);
+        Y_ENSURE(datum.is_array());
+        auto array = datum.array();
+
+        // Every slice is trimmed and compared item by item with the untrimmed one.
+        for (size_t slicesLeft = testSize / sliceSize; slicesLeft > 0; --slicesLeft) {
+            const auto slice = Chop(array, sliceSize);
+            const auto trimmedSlice = trimmer->Trim(slice);
+
+            for (size_t pos = 0; pos < sliceSize; ++pos) {
+                TBlockItem lhs = reader->GetItem(*slice, pos);
+                TBlockItem rhs = reader->GetItem(*trimmedSlice, pos);
+                UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<TDtLayout>(), rhs.Get<TDtLayout>(), "Expected the same data after trim");
+                UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetTimezoneId(), rhs.GetTimezoneId(), "Expected the same data after trim");
+            }
+        }
+    }
+
+    // Tag of the test resource type: passed to NewResourceType() and used as the tag
+    // template argument of TBoxedResource in TestResource.
+    extern const char ResourceName[] = "Resource.Name";
+    Y_UNIT_TEST(TestResource) {
+        // Round-trips a column of boxed resources through the trimmer and then checks that, once all
+        // arrays are released, the destructor has run testSize times in total.
+        TBlockTrimmerTestData data;
+
+        const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName);
+
+        size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(resourceType);
+        size_t blockLen = NMiniKQL::CalcBlockLen(itemSize);
+        Y_ENSURE(blockLen > 8);
+
+        constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(TUnboxedValue);
+        constexpr auto sliceSize = 1024;
+        static_assert(testSize % sliceSize == 0);
+
+        auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), resourceType, *data.ArrowPool, blockLen, nullptr);
+        auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), resourceType);
+        auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), resourceType, data.ArrowPool);
+
+        // Resource payload that bumps a shared counter on destruction.
+        struct TWithDtor {
+            int Payload;
+            std::shared_ptr<int> DestructorCallsCnt;
+            TWithDtor(int payload, std::shared_ptr<int> destructorCallsCnt):
+                Payload(payload), DestructorCallsCnt(std::move(destructorCallsCnt)) {
+            }
+            ~TWithDtor() {
+                *DestructorCallsCnt = *DestructorCallsCnt + 1;
+            }
+        };
+        using TTestResource = TBoxedResource<TWithDtor, ResourceName>;
+
+        auto destructorCallsCnt = std::make_shared<int>(0);
+        {
+            // Inner scope: the datum, the array and all slices go out of scope before the counter is checked.
+            for (size_t i = 0; i < testSize; i++) {
+                builder->Add(TUnboxedValuePod(new TTestResource(i, destructorCallsCnt)));
+            }
+            auto datum = builder->Build(true);
+            Y_ENSURE(datum.is_array());
+            auto array = datum.array();
+
+            for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
+                auto slice = Chop(array, sliceSize);
+                auto trimmedSlice = trimmer->Trim(slice);
+
+                for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
+                    TBlockItem lhs = reader->GetItem(*slice, elemIdx);
+                    TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx);
+
+                    auto lhsResource = reinterpret_cast<TTestResource*>(lhs.GetBoxed().Get());
+                    auto rhsResource = reinterpret_cast<TTestResource*>(rhs.GetBoxed().Get());
+                    UNIT_ASSERT_VALUES_EQUAL_C(lhsResource->Get()->Payload, rhsResource->Get()->Payload, "Expected the same data after trim");
+                }
+            }
+        }
+
+        // testSize resources were created, so testSize destructor calls are expected in total.
+        UNIT_ASSERT_VALUES_EQUAL_C(*destructorCallsCnt, testSize, "Expected exactly one destructor call per resource");
+    }
+}

+ 1 - 0
yql/essentials/minikql/computation/ut/ya.make.inc

@@ -12,6 +12,7 @@ ENDIF()
 SRCDIR(yql/essentials/minikql/computation)
 
 SRCS(
+    mkql_block_trimmer_ut.cpp
     mkql_computation_node_holders_ut.cpp
     mkql_computation_node_pack_ut.cpp
     mkql_computation_node_list_ut.cpp

+ 2 - 0
yql/essentials/minikql/computation/ya.make

@@ -5,6 +5,7 @@ SRCS(
     mkql_block_impl.cpp
     mkql_block_reader.cpp
     mkql_block_transport.cpp
+    mkql_block_trimmer.cpp
     mkql_computation_node.cpp
     mkql_computation_node_holders.cpp
     mkql_computation_node_impl.cpp
@@ -22,6 +23,7 @@ PEERDIR(
     yql/essentials/public/types
     yql/essentials/parser/pg_wrapper/interface
     yql/essentials/public/udf
+    yql/essentials/public/udf/arrow
     yql/essentials/minikql/arrow
 )
 

Some files were not shown because too many files changed in this diff