Browse Source

YQL-19299: Support Top-Level No-Optional Yson Entity

YQL-19299: Support Top-Level No-Optional Yson Entity
commit_hash:add10e09899db3798814dc8759fa851f6c3ed2a3
mrlolthe1st 2 months ago
parent
commit
3474e0dbe5

+ 187 - 294
yt/yql/providers/yt/codec/yt_arrow_converter.cpp

@@ -1,4 +1,5 @@
 #include "yt_arrow_converter.h"
 #include "yt_arrow_converter.h"
+#include "yt_arrow_converter_details.h"
 
 
 #include <yql/essentials/public/udf/arrow/defs.h>
 #include <yql/essentials/public/udf/arrow/defs.h>
 #include <yql/essentials/public/udf/arrow/block_builder.h>
 #include <yql/essentials/public/udf/arrow/block_builder.h>
@@ -20,166 +21,156 @@
 
 
 namespace NYql {
 namespace NYql {
 
 
-template<typename T>
-struct TypeHelper {
-    using Type = T;
+using namespace NKikimr::NMiniKQL;
+namespace {
+struct TYtColumnConverterSettings {
+    TYtColumnConverterSettings(TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative);
+    TType* Type;
+    const NUdf::IPgBuilder* PgBuilder;
+    arrow::MemoryPool& Pool;
+    const bool IsNative;
+    const bool IsTopOptional;
+    const bool IsTopLevelYson;
+    std::shared_ptr<arrow::DataType> ArrowType;
+    std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder;
 };
 };
+}
+
+std::string_view GetNotNullString(auto& data, i64 idx) {
+    i32 len;
+    auto ptr = reinterpret_cast<const char*>(data.GetValue(idx, &len));
+    return std::string_view(ptr, len);
+}
+
+using YTDictIndexType = ui32;
 
 
 #define GEN_TYPE(type)\
 #define GEN_TYPE(type)\
     NumericConverterImpl<arrow::type ## Type>
     NumericConverterImpl<arrow::type ## Type>
 
 
-#define GEN_TYPE_STR(type)\
-    StringConverterImpl<arrow::type ## Type>
+#define GEN_TYPE_STR(type, isTopLevelYson)\
+    StringConverterImpl<arrow::type ## Type, isTopLevelYson>
 
 
+template<typename T>
+Y_FORCE_INLINE void AddNumber(NUdf::IArrayBuilder* builder, T&& value) {
+    if constexpr (std::is_same_v<T, bool>) {
+        builder->Add(NUdf::TBlockItem((ui8)value));
+#if defined(_darwin_) && defined(_64_)
+    } else if constexpr (std::is_same_v<T, unsigned long long>) {
+        builder->Add(NUdf::TBlockItem((ui64)value));
+    } else if constexpr (std::is_same_v<T, long long>) {
+        builder->Add(NUdf::TBlockItem((i64)value));
+#endif
+    } else {
+        builder->Add(NUdf::TBlockItem(value));
+    }
+}
+
+// Unpack dictionary with right type
 template<typename T>
 template<typename T>
 arrow::Datum NumericConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr<arrow::ArrayData> block) {
 arrow::Datum NumericConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr<arrow::ArrayData> block) {
     arrow::DictionaryArray dict(block);
     arrow::DictionaryArray dict(block);
     typename ::arrow::TypeTraits<T>::ArrayType val(dict.dictionary()->data());
     typename ::arrow::TypeTraits<T>::ArrayType val(dict.dictionary()->data());
-    auto data = dict.indices()->data()->GetValues<ui32>(1);
+    auto data = dict.indices()->data()->GetValues<YTDictIndexType>(1);
     if (dict.null_count()) {
     if (dict.null_count()) {
         for (i64 i = 0; i < block->length; ++i) {
         for (i64 i = 0; i < block->length; ++i) {
             if (dict.IsNull(i)) {
             if (dict.IsNull(i)) {
                 builder->Add(NUdf::TBlockItem{});
                 builder->Add(NUdf::TBlockItem{});
             } else {
             } else {
-                if constexpr (std::is_same_v<decltype(val.Value(data[i])), bool>) {
-                    builder->Add(NUdf::TBlockItem((ui8)val.Value(data[i])));
-#if defined(_darwin_) && defined(_64_)
-                } else if constexpr (std::is_same_v<decltype(val.Value(data[i])), unsigned long long>) {
-                    builder->Add(NUdf::TBlockItem((ui64)val.Value(data[i])));
-                } else if constexpr (std::is_same_v<decltype(val.Value(data[i])), long long>) {
-                    builder->Add(NUdf::TBlockItem((i64)val.Value(data[i])));
-#endif
-                } else {
-                    builder->Add(NUdf::TBlockItem(val.Value(data[i])));
-                }
+                AddNumber(builder, val.Value(data[i]));
             }
             }
         }
         }
     } else {
     } else {
         for (i64 i = 0; i < block->length; ++i) {
         for (i64 i = 0; i < block->length; ++i) {
-            if constexpr (std::is_same_v<decltype(val.Value(data[i])), bool>) {
-                builder->Add(NUdf::TBlockItem((ui8)val.Value(data[i])));
-#if defined(_darwin_) && defined(_64_)
-            } else if constexpr (std::is_same_v<decltype(val.Value(data[i])), unsigned long long>) {
-                builder->Add(NUdf::TBlockItem((ui64)val.Value(data[i])));
-            } else if constexpr (std::is_same_v<decltype(val.Value(data[i])), long long>) {
-                builder->Add(NUdf::TBlockItem((i64)val.Value(data[i])));
-#endif
-            } else {
-                builder->Add(NUdf::TBlockItem(val.Value(data[i])));
-            }
+            AddNumber(builder, val.Value(data[i]));
         }
         }
     }
     }
     return builder->Build(false);
     return builder->Build(false);
 }
 }
 
 
-template<typename T>
+// There is no support of the non-optional Yson in YT now.
+// Because of it, isTopLevelYson must indicate whether a type we want is the top-level yson or no.
+// If it is, just put entity symbol string ("#") instead of empty TBlockItem
+template<typename T, bool IsTopLevelYson>
 arrow::Datum StringConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr<arrow::ArrayData> block) {
 arrow::Datum StringConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr<arrow::ArrayData> block) {
     arrow::DictionaryArray dict(block);
     arrow::DictionaryArray dict(block);
     typename ::arrow::TypeTraits<T>::ArrayType val(dict.dictionary()->data());
     typename ::arrow::TypeTraits<T>::ArrayType val(dict.dictionary()->data());
-    auto data = dict.indices()->data()->GetValues<ui32>(1);
+    auto data = dict.indices()->data()->GetValues<YTDictIndexType>(1);
     if (dict.null_count()) {
     if (dict.null_count()) {
         for (i64 i = 0; i < block->length; ++i) {
         for (i64 i = 0; i < block->length; ++i) {
             if (dict.IsNull(i)) {
             if (dict.IsNull(i)) {
-                builder->Add(NUdf::TBlockItem{});
+                if constexpr(IsTopLevelYson) {
+                    builder->Add(NUdf::TBlockItem(std::string_view("#")));
+                } else {
+                    builder->Add(NUdf::TBlockItem{});
+                }
             } else {
             } else {
-                i32 len;
-                auto ptr = reinterpret_cast<const char*>(val.GetValue(data[i], &len));
-                builder->Add(NUdf::TBlockItem(std::string_view(ptr, len)));
+                builder->Add(NUdf::TBlockItem(GetNotNullString(val, data[i])));
             }
             }
         }
         }
     } else {
     } else {
         for (i64 i = 0; i < block->length; ++i) {
         for (i64 i = 0; i < block->length; ++i) {
-                i32 len;
-                auto ptr = reinterpret_cast<const char*>(val.GetValue(data[i], &len));
-                builder->Add(NUdf::TBlockItem(std::string_view(ptr, len)));
+            builder->Add(NUdf::TBlockItem(GetNotNullString(val, data[i])));
         }
         }
     }
     }
     return builder->Build(false);
     return builder->Build(false);
 }
 }
 
 
-using namespace NKikimr::NMiniKQL;
-using namespace NYson::NDetail;
-
-class TYsonReaderDetails {
+template<bool IsDictionary, bool IsTopLevelYson>
+class TPrimitiveColumnConverter {
 public:
 public:
-    TYsonReaderDetails(const std::string_view& s) : Data_(s.data()), Available_(s.size()) {}
-
-    constexpr char Next() {
-        YQL_ENSURE(Available_-- > 0);
-        return *(++Data_);
-    }
-
-    constexpr char Current() {
-        return *Data_;
-    }
-
-    template<typename T>
-    constexpr T ReadVarSlow() {
-        T shift = 0;
-        T value = Current() & 0x7f;
-        for (;;) {
-            shift += 7;
-            value |= T(Next() & 0x7f) << shift;
-            if (!(Current() & 0x80)) {
-                break;
-            }
+    TPrimitiveColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) {
+        if constexpr (IsDictionary) {
+            switch (Settings_.ArrowType->id()) {
+            case arrow::Type::BOOL:     PrimitiveConverterImpl_ = GEN_TYPE(Boolean); break;
+            case arrow::Type::INT8:     PrimitiveConverterImpl_ = GEN_TYPE(Int8); break;
+            case arrow::Type::UINT8:    PrimitiveConverterImpl_ = GEN_TYPE(UInt8); break;
+            case arrow::Type::INT16:    PrimitiveConverterImpl_ = GEN_TYPE(Int16); break;
+            case arrow::Type::UINT16:   PrimitiveConverterImpl_ = GEN_TYPE(UInt16); break;
+            case arrow::Type::INT32:    PrimitiveConverterImpl_ = GEN_TYPE(Int32); break;
+            case arrow::Type::UINT32:   PrimitiveConverterImpl_ = GEN_TYPE(UInt32); break;
+            case arrow::Type::INT64:    PrimitiveConverterImpl_ = GEN_TYPE(Int64); break;
+            case arrow::Type::UINT64:   PrimitiveConverterImpl_ = GEN_TYPE(UInt64); break;
+            case arrow::Type::DOUBLE:   PrimitiveConverterImpl_ = GEN_TYPE(Double); break;
+            case arrow::Type::FLOAT:    PrimitiveConverterImpl_ = GEN_TYPE(Float); break;
+            case arrow::Type::STRING:   PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary, IsTopLevelYson); break; // all strings from yt are in binary format
+            case arrow::Type::BINARY:   PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary, IsTopLevelYson); break;
+            default:
+                return; // will check in runtime
+            };
         }
         }
-        Next();
-        return value;
     }
     }
 
 
-    ui32 ReadVarUI32() {
-        char prev = Current();
-        if (Y_LIKELY(!(prev & 0x80))) {
-            Next();
-            return prev;
+    arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) {
+        if constexpr (IsDictionary) {
+            return PrimitiveConverterImpl_(Settings_.Builder.get(), block);
         }
         }
-
-        return ReadVarSlow<ui32>();
-    }
-
-    ui64 ReadVarUI64() {
-        char prev = Current();
-        if (Y_LIKELY(!(prev & 0x80))) {
-            Next();
-            return prev;
+        if constexpr (IsTopLevelYson) {
+            auto builder = Settings_.Builder.get();
+            arrow::BinaryArray binary(block);
+            if (binary.null_count()) {
+                for (int64_t i = 0; i < binary.length(); ++i) {
+                    if (binary.IsNull(i)) {
+                        builder->Add(NUdf::TBlockItem(std::string_view("#")));
+                    } else {
+                        builder->Add(NUdf::TBlockItem(GetNotNullString(binary, i)));
+                    }
+                }
+            } else {
+                for (int64_t i = 0; i < binary.length(); ++i) {
+                    builder->Add(NUdf::TBlockItem(GetNotNullString(binary, i)));
+                }
+            }
+            return builder->Build(false);
         }
         }
-
-        return ReadVarSlow<ui64>();
-    }
-
-    i32 ReadVarI32() {
-        return NYson::ZigZagDecode32(ReadVarUI32());
-    }
-
-    i64 ReadVarI64() {
-        return NYson::ZigZagDecode64(ReadVarUI64());
-    }
-
-    double NextDouble() {
-        double val = *reinterpret_cast<const double*>(Data_);
-        Data_ += sizeof(double);
-        return val;
-    }
-    
-    void Skip(i32 cnt) {
-        Data_ += cnt;
-    }
-
-    const char* Data() {
-        return Data_;
-    }
-    
-    size_t Available() const {
-        return Available_;
+        return block;
     }
     }
 private:
 private:
-    const char* Data_;
-    size_t Available_;
+    TYtColumnConverterSettings& Settings_;
+    arrow::Datum (*PrimitiveConverterImpl_)(NUdf::IArrayBuilder*, std::shared_ptr<arrow::ArrayData>);
 };
 };
 
 
 namespace {
 namespace {
-void SkipYson(TYsonReaderDetails& buf) {
+void SkipYson(TYsonBuffer& buf) {
     switch (buf.Current()) {
     switch (buf.Current()) {
     case BeginListSymbol: {
     case BeginListSymbol: {
         buf.Next();
         buf.Next();
@@ -223,6 +214,7 @@ void SkipYson(TYsonReaderDetails& buf) {
         buf.Next();
         buf.Next();
         Y_UNUSED(buf.ReadVarI64());
         Y_UNUSED(buf.ReadVarI64());
         break;
         break;
+    case EntitySymbol:
     case TrueMarker:
     case TrueMarker:
     case FalseMarker:
     case FalseMarker:
         buf.Next();
         buf.Next();
@@ -235,68 +227,30 @@ void SkipYson(TYsonReaderDetails& buf) {
         YQL_ENSURE(false, "Unexpected char: " + std::string{buf.Current()});
         YQL_ENSURE(false, "Unexpected char: " + std::string{buf.Current()});
     }
     }
 }
 }
+};
 
 
-NUdf::TBlockItem ReadYson(TYsonReaderDetails& buf) {
+NUdf::TBlockItem ReadYson(TYsonBuffer& buf) {
     const char* beg = buf.Data();
     const char* beg = buf.Data();
     SkipYson(buf);
     SkipYson(buf);
     return NUdf::TBlockItem(std::string_view(beg, buf.Data() - beg));
     return NUdf::TBlockItem(std::string_view(beg, buf.Data() - beg));
 }
 }
-};
-
-class IYsonBlockReader {
-public:
-    virtual NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) = 0;
-    virtual ~IYsonBlockReader() = default;
-};
-
-template<bool Native>
-class IYsonBlockReaderWithNativeFlag : public IYsonBlockReader {
-public:
-    virtual NUdf::TBlockItem GetNotNull(TYsonReaderDetails&) = 0;
-    NUdf::TBlockItem GetNullableItem(TYsonReaderDetails& buf) {
-        char prev = buf.Current();
-        if constexpr (Native) {
-            if (prev == EntitySymbol) {
-                buf.Next();
-                return NUdf::TBlockItem();
-            }
-            return GetNotNull(buf).MakeOptional();
-        }
-        buf.Next();
-        if (prev == EntitySymbol) {
-            return NUdf::TBlockItem();
-        }
-        YQL_ENSURE(prev == BeginListSymbol);
-        if (buf.Current() == EndListSymbol) {
-            buf.Next();
-            return NUdf::TBlockItem();
-        }
-        auto result = GetNotNull(buf);
-        if (buf.Current() == ListItemSeparatorSymbol) {
-            buf.Next();
-        }
-        YQL_ENSURE(buf.Current() == EndListSymbol);
-        buf.Next();
-        return result.MakeOptional();
-    }
-private:
-};
 
 
 template<bool Nullable, bool Native>
 template<bool Nullable, bool Native>
-class TYsonTupleBlockReader final : public IYsonBlockReaderWithNativeFlag<Native> {
+class TTupleYsonReader final : public IYsonComplexTypeReader<Native> {
 public:
 public:
-    TYsonTupleBlockReader(TVector<std::unique_ptr<IYsonBlockReader>>&& children)
+    using TIReaderPtr = std::unique_ptr<IYsonComplexTypeReader<Native>>;
+    TTupleYsonReader(TVector<TIReaderPtr>&& children)
         : Children_(std::move(children))
         : Children_(std::move(children))
         , Items_(Children_.size())
         , Items_(Children_.size())
     {}
     {}
 
 
-    NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final {
+    NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
         if constexpr (Nullable) {
         if constexpr (Nullable) {
             return this->GetNullableItem(buf);
             return this->GetNullableItem(buf);
         }
         }
         return GetNotNull(buf);
         return GetNotNull(buf);
     }
     }
-    NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final {
+    NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
         YQL_ENSURE(buf.Current() == BeginListSymbol);
         YQL_ENSURE(buf.Current() == BeginListSymbol);
         buf.Next();
         buf.Next();
         for (ui32 i = 0; i < Children_.size(); ++i) {
         for (ui32 i = 0; i < Children_.size(); ++i) {
@@ -310,21 +264,21 @@ public:
         return NUdf::TBlockItem(Items_.data());
         return NUdf::TBlockItem(Items_.data());
     }
     }
 private:
 private:
-    const TVector<std::unique_ptr<IYsonBlockReader>> Children_;
+    const TVector<TIReaderPtr> Children_;
     TVector<NUdf::TBlockItem> Items_;
     TVector<NUdf::TBlockItem> Items_;
 };
 };
 
 
 template<typename T, bool Nullable, NKikimr::NUdf::EDataSlot OriginalT, bool Native>
 template<typename T, bool Nullable, NKikimr::NUdf::EDataSlot OriginalT, bool Native>
-class TYsonStringBlockReader final : public IYsonBlockReaderWithNativeFlag<Native> {
+class TStringYsonReader final : public IYsonComplexTypeReader<Native> {
 public:
 public:
-    NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final {
+    NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
         if constexpr (Nullable) {
         if constexpr (Nullable) {
             return this->GetNullableItem(buf);
             return this->GetNullableItem(buf);
         }
         }
         return GetNotNull(buf);
         return GetNotNull(buf);
     }
     }
 
 
-    NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final {
+    NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
         if constexpr (NUdf::EDataSlot::Yson != OriginalT) {
         if constexpr (NUdf::EDataSlot::Yson != OriginalT) {
             YQL_ENSURE(buf.Current() == StringMarker);
             YQL_ENSURE(buf.Current() == StringMarker);
             buf.Next();
             buf.Next();
@@ -339,16 +293,16 @@ public:
 };
 };
 
 
 template<typename T, bool Nullable, bool Native>
 template<typename T, bool Nullable, bool Native>
-class TYsonTzDateBlockReader final : public IYsonBlockReaderWithNativeFlag<Native> {
+class TTzDateYsonReader final : public IYsonComplexTypeReader<Native> {
 public:
 public:
-    NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final {
+    NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
         if constexpr (Nullable) {
         if constexpr (Nullable) {
             return this->GetNullableItem(buf);
             return this->GetNullableItem(buf);
         }
         }
         return GetNotNull(buf);
         return GetNotNull(buf);
     }
     }
 
 
-    NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final {
+    NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
         using TLayout = typename NUdf::TDataType<T>::TLayout;
         using TLayout = typename NUdf::TDataType<T>::TLayout;
         size_t length = sizeof(TLayout) + sizeof(NUdf::TTimezoneId);
         size_t length = sizeof(TLayout) + sizeof(NUdf::TTimezoneId);
         Y_ASSERT(buf.Available() == length);
         Y_ASSERT(buf.Available() == length);
@@ -379,30 +333,17 @@ public:
     }
     }
 };
 };
 
 
-namespace {
-struct TYtColumnConverterSettings {
-    TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative);
-    NKikimr::NMiniKQL::TType* Type;
-    const NUdf::IPgBuilder* PgBuilder;
-    arrow::MemoryPool& Pool;
-    const bool IsNative;
-    const bool IsTopOptional;
-    std::shared_ptr<arrow::DataType> ArrowType;
-    std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder;
-};
-}
-
 template<typename T, bool Nullable, bool Native>
 template<typename T, bool Nullable, bool Native>
-class TYsonFixedSizeBlockReader final : public IYsonBlockReaderWithNativeFlag<Native> {
+class TFixedSizeYsonReader final : public IYsonComplexTypeReader<Native> {
 public:
 public:
-    NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final {
+    NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
         if constexpr (Nullable) {
         if constexpr (Nullable) {
             return this->GetNullableItem(buf);
             return this->GetNullableItem(buf);
         }
         }
         return GetNotNull(buf);
         return GetNotNull(buf);
     }
     }
 
 
-    NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final {
+    NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
         if constexpr (std::is_same_v<T, bool>) {
         if constexpr (std::is_same_v<T, bool>) {
             YQL_ENSURE(buf.Current() == FalseMarker || buf.Current() == TrueMarker);
             YQL_ENSURE(buf.Current() == FalseMarker || buf.Current() == TrueMarker);
             bool res = buf.Current() == TrueMarker;
             bool res = buf.Current() == TrueMarker;
@@ -430,7 +371,7 @@ public:
             }
             }
         } else if constexpr (std::is_floating_point_v<T>) {
         } else if constexpr (std::is_floating_point_v<T>) {
             YQL_ENSURE(buf.Current() == DoubleMarker);
             YQL_ENSURE(buf.Current() == DoubleMarker);
-            buf.Next();                
+            buf.Next();
             return NUdf::TBlockItem(T(buf.NextDouble()));
             return NUdf::TBlockItem(T(buf.NextDouble()));
         } else {
         } else {
             static_assert(std::is_floating_point_v<T>);
             static_assert(std::is_floating_point_v<T>);
@@ -439,13 +380,14 @@ public:
 };
 };
 
 
 template<bool Native>
 template<bool Native>
-class TYsonExternalOptBlockReader final : public IYsonBlockReaderWithNativeFlag<Native> {
+class TExternalOptYsonReader final : public IYsonComplexTypeReader<Native> {
 public:
 public:
-    TYsonExternalOptBlockReader(std::unique_ptr<IYsonBlockReader>&& inner)
-        : Inner_(std::move(inner))
+    using TIReaderPtr = std::unique_ptr<IYsonComplexTypeReader<Native>>;
+    TExternalOptYsonReader(TIReaderPtr&& inner)
+        : Underlying_(std::move(inner))
     {}
     {}
 
 
-    NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) final {
+    NUdf::TBlockItem GetItem(TYsonBuffer& buf) final {
         char prev = buf.Current();
         char prev = buf.Current();
         buf.Next();
         buf.Next();
         if (prev == EntitySymbol) {
         if (prev == EntitySymbol) {
@@ -458,7 +400,7 @@ public:
                 return NUdf::TBlockItem();
                 return NUdf::TBlockItem();
             }
             }
         }
         }
-        auto result = Inner_->GetItem(buf);
+        auto result = Underlying_->GetItem(buf);
         if (buf.Current() == ListItemSeparatorSymbol) {
         if (buf.Current() == ListItemSeparatorSymbol) {
             buf.Next();
             buf.Next();
         }
         }
@@ -467,183 +409,128 @@ public:
         return result.MakeOptional();
         return result.MakeOptional();
     }
     }
 
 
-    NUdf::TBlockItem GetNotNull(TYsonReaderDetails&) override final {
-        YQL_ENSURE(false, "Can't be called");
+    NUdf::TBlockItem GetNotNull(TYsonBuffer&) override final {
+        Y_ABORT("Can't be called");
     }
     }
 private:
 private:
-    std::unique_ptr<IYsonBlockReader> Inner_;
+    TIReaderPtr Underlying_;
 };
 };
 
 
 template<bool Native>
 template<bool Native>
-struct TYsonBlockReaderTraits {
-    using TResult = IYsonBlockReader;
+struct TComplexTypeYsonReaderTraits {
+    using TResult = IYsonComplexTypeReader<Native>;
     template <bool Nullable>
     template <bool Nullable>
-    using TTuple = TYsonTupleBlockReader<Nullable, Native>;
+    using TTuple = TTupleYsonReader<Nullable, Native>;
     // TODO: Implement reader for decimals
     // TODO: Implement reader for decimals
     template <typename T, bool Nullable, typename = std::enable_if_t<!std::is_same_v<T, NYql::NDecimal::TInt128> && (std::is_integral_v<T> || std::is_floating_point_v<T>)>>
     template <typename T, bool Nullable, typename = std::enable_if_t<!std::is_same_v<T, NYql::NDecimal::TInt128> && (std::is_integral_v<T> || std::is_floating_point_v<T>)>>
-    using TFixedSize = TYsonFixedSizeBlockReader<T, Nullable, Native>;
+    using TFixedSize = TFixedSizeYsonReader<T, Nullable, Native>;
     template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot OriginalT>
     template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot OriginalT>
-    using TStrings = TYsonStringBlockReader<TStringType, Nullable, OriginalT, Native>;
-    using TExtOptional = TYsonExternalOptBlockReader<Native>;
+    using TStrings = TStringYsonReader<TStringType, Nullable, OriginalT, Native>;
+    using TExtOptional = TExternalOptYsonReader<Native>;
 
 
     static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
     static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
-        Y_UNUSED(pgBuilder);
-        if (desc.PassByValue) {
-            return std::make_unique<TFixedSize<ui64, true>>();
-        } else {
-            return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>();
-        }
+        ythrow yexception() << "Complex type Yson reader not implemented for block resources";
     }
     }
 
 
-    static std::unique_ptr<TResult> MakeResource(bool isOptional) {
-        Y_UNUSED(isOptional);
-        ythrow yexception() << "Yson reader not implemented for block resources";
-    }   
+    static std::unique_ptr<TResult> MakeResource(bool) {
+        ythrow yexception() << "Complex type Yson reader not implemented for block resources";
+    }
 
 
     template<typename TTzDate>
     template<typename TTzDate>
     static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
     static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
-        Y_UNUSED(isOptional);
         if (isOptional) {
         if (isOptional) {
-            using TTzDateReader = TYsonTzDateBlockReader<TTzDate, true, Native>;
+            using TTzDateReader = TTzDateYsonReader<TTzDate, true, Native>;
             return std::make_unique<TTzDateReader>();
             return std::make_unique<TTzDateReader>();
         } else {
         } else {
-            using TTzDateReader = TYsonTzDateBlockReader<TTzDate, false, Native>;
+            using TTzDateReader = TTzDateYsonReader<TTzDate, false, Native>;
             return std::make_unique<TTzDateReader>();
             return std::make_unique<TTzDateReader>();
         }
         }
-    }   
+    }
 };
 };
 
 
-template<bool IsDictionary>
-class TPrimitiveColumnConverter {
-public:
-    TPrimitiveColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) {
-        if constexpr (IsDictionary) {
-            switch (Settings_.ArrowType->id()) {
-            case arrow::Type::BOOL:     PrimitiveConverterImpl_ = GEN_TYPE(Boolean); break;
-            case arrow::Type::INT8:     PrimitiveConverterImpl_ = GEN_TYPE(Int8); break;
-            case arrow::Type::UINT8:    PrimitiveConverterImpl_ = GEN_TYPE(UInt8); break;
-            case arrow::Type::INT16:    PrimitiveConverterImpl_ = GEN_TYPE(Int16); break;
-            case arrow::Type::UINT16:   PrimitiveConverterImpl_ = GEN_TYPE(UInt16); break;
-            case arrow::Type::INT32:    PrimitiveConverterImpl_ = GEN_TYPE(Int32); break;
-            case arrow::Type::UINT32:   PrimitiveConverterImpl_ = GEN_TYPE(UInt32); break;
-            case arrow::Type::INT64:    PrimitiveConverterImpl_ = GEN_TYPE(Int64); break;
-            case arrow::Type::UINT64:   PrimitiveConverterImpl_ = GEN_TYPE(UInt64); break;
-            case arrow::Type::DOUBLE:   PrimitiveConverterImpl_ = GEN_TYPE(Double); break;
-            case arrow::Type::FLOAT:    PrimitiveConverterImpl_ = GEN_TYPE(Float); break;
-            case arrow::Type::STRING:   PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary); break; // all strings from yt is in binary format
-            case arrow::Type::BINARY:   PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary); break;
-            default:
-                return; // will check in runtime
-            };
-        }
-    }
-    arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) {
-        if constexpr (IsDictionary) {
-            return PrimitiveConverterImpl_(Settings_.Builder.get(), block);
-        }
-        return block;
+template<bool Native, bool IsTopOptional>
+Y_FORCE_INLINE void AddFromYson(auto& reader, auto& builder, std::string_view yson) {
+    TYsonBuffer inp(yson);
+    auto res = reader->GetItem(inp);
+    if constexpr (!Native && IsTopOptional) {
+        res = res.MakeOptional();
     }
     }
-private:
-    TYtColumnConverterSettings& Settings_;
-    arrow::Datum (*PrimitiveConverterImpl_)(NUdf::IArrayBuilder*, std::shared_ptr<arrow::ArrayData>);
-};
+    builder->Add(std::move(res));
+}
 
 
 template<bool Native, bool IsTopOptional, bool IsDictionary>
 template<bool Native, bool IsTopOptional, bool IsDictionary>
-class TYtYsonColumnConverter {
+class TComplexTypeYsonColumnConverter {
 public:
 public:
-    TYtYsonColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) {
-        Reader_ = NUdf::MakeBlockReaderImpl<TYsonBlockReaderTraits<Native>>(TTypeInfoHelper(), settings.Type, settings.PgBuilder);
+    TComplexTypeYsonColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) {
+        Reader_ = NUdf::MakeBlockReaderImpl<TComplexTypeYsonReaderTraits<Native>>(TTypeInfoHelper(), settings.Type, settings.PgBuilder);
     }
     }
 
 
     arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) {
     arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) {
+        auto& builder = Settings_.Builder;
         if constexpr(!IsDictionary) {
         if constexpr(!IsDictionary) {
             arrow::BinaryArray binary(block);
             arrow::BinaryArray binary(block);
             if (block->GetNullCount()) {
             if (block->GetNullCount()) {
                 for (i64 i = 0; i < block->length; ++i) {
                 for (i64 i = 0; i < block->length; ++i) {
                     if (binary.IsNull(i)) {
                     if (binary.IsNull(i)) {
-                        Settings_.Builder->Add(NUdf::TBlockItem{});
+                        builder->Add(NUdf::TBlockItem{});
                     } else {
                     } else {
-                        i32 len;
-                        auto ptr = reinterpret_cast<const char*>(binary.GetValue(i, &len));
-                        TYsonReaderDetails inp(std::string_view(ptr, len));
-                        auto res = Reader_->GetItem(inp);
-                        if constexpr (!Native && IsTopOptional) {
-                            res = res.MakeOptional();
-                        }
-                        Settings_.Builder->Add(std::move(res));
+                        AddFromYson<Native, IsTopOptional>(Reader_, builder, GetNotNullString(binary, i));
                     }
                     }
                 }
                 }
             } else {
             } else {
                 for (i64 i = 0; i < block->length; ++i) {
                 for (i64 i = 0; i < block->length; ++i) {
-                    i32 len;
-                    auto ptr = reinterpret_cast<const char*>(binary.GetValue(i, &len));
-                    TYsonReaderDetails inp(std::string_view(ptr, len));
-                    auto res = Reader_->GetItem(inp);
-                    if constexpr (!Native && IsTopOptional) {
-                        res = res.MakeOptional();
-                    }
-                    Settings_.Builder->Add(std::move(res));
+                    AddFromYson<Native, IsTopOptional>(Reader_, builder, GetNotNullString(binary, i));
                 }
                 }
             }
             }
-            return Settings_.Builder->Build(false);
+            return builder->Build(false);
         }
         }
         arrow::DictionaryArray dict(block);
         arrow::DictionaryArray dict(block);
         arrow::BinaryArray binary(block->dictionary);
         arrow::BinaryArray binary(block->dictionary);
-        auto data = dict.indices()->data()->GetValues<ui32>(1);
+        auto data = dict.indices()->data()->GetValues<YTDictIndexType>(1);
         if (dict.null_count()) {
         if (dict.null_count()) {
             for (i64 i = 0; i < block->length; ++i) {
             for (i64 i = 0; i < block->length; ++i) {
                 if (dict.IsNull(i)) {
                 if (dict.IsNull(i)) {
                     Settings_.Builder->Add(NUdf::TBlockItem{});
                     Settings_.Builder->Add(NUdf::TBlockItem{});
                 } else {
                 } else {
-                    i32 len;
-                    auto ptr = reinterpret_cast<const char*>(binary.GetValue(data[i], &len));
-                    TYsonReaderDetails inp(std::string_view(ptr, len));
-                    auto res = Reader_->GetItem(inp);
-                    if constexpr (!Native && IsTopOptional) {
-                        res = res.MakeOptional();
-                    }
-                    Settings_.Builder->Add(std::move(res));
+                    AddFromYson<Native, IsTopOptional>(Reader_, builder, GetNotNullString(binary, data[i]));
                 }
                 }
             }
             }
         } else {
         } else {
             for (i64 i = 0; i < block->length; ++i) {
             for (i64 i = 0; i < block->length; ++i) {
-                i32 len;
-                auto ptr = reinterpret_cast<const char*>(binary.GetValue(data[i], &len));
-                TYsonReaderDetails inp(std::string_view(ptr, len));
-                auto res = Reader_->GetItem(inp);
-                if constexpr (!Native && IsTopOptional) {
-                    res = res.MakeOptional();
-                }
-                Settings_.Builder->Add(std::move(res));
+                AddFromYson<Native, IsTopOptional>(Reader_, builder, GetNotNullString(binary, data[i]));
             }
             }
         }
         }
         return Settings_.Builder->Build(false);
         return Settings_.Builder->Build(false);
     }
     }
 
 
 private:
 private:
-    std::shared_ptr<typename TYsonBlockReaderTraits<Native>::TResult> Reader_;
+    std::shared_ptr<typename TComplexTypeYsonReaderTraits<Native>::TResult> Reader_;
     TYtColumnConverterSettings& Settings_;
     TYtColumnConverterSettings& Settings_;
 };
 };
 
 
 template<bool Native, bool IsTopOptional>
 template<bool Native, bool IsTopOptional>
 class TYtColumnConverter final : public IYtColumnConverter {
 class TYtColumnConverter final : public IYtColumnConverter {
 public:
 public:
-    TYtColumnConverter(TYtColumnConverterSettings&& settings) 
+    TYtColumnConverter(TYtColumnConverterSettings&& settings)
         : Settings_(std::move(settings))
         : Settings_(std::move(settings))
         , DictYsonConverter_(Settings_)
         , DictYsonConverter_(Settings_)
         , YsonConverter_(Settings_)
         , YsonConverter_(Settings_)
         , DictPrimitiveConverter_(Settings_)
         , DictPrimitiveConverter_(Settings_)
+        , TopLevelYsonDictConverter_(Settings_)
+        , TopLevelYsonConverter_(Settings_)
     {
     {
         auto type = Settings_.Type;
         auto type = Settings_.Type;
-        IsJson_ = type->IsData() && AS_TYPE(TDataType, type)->GetDataSlot() == NUdf::EDataSlot::Json 
-            || (Native && type->IsOptional() && AS_TYPE(TOptionalType, type)->GetItemType()->IsData() 
+        IsJson_ = type->IsData() && AS_TYPE(TDataType, type)->GetDataSlot() == NUdf::EDataSlot::Json
+            || (Native && type->IsOptional() && AS_TYPE(TOptionalType, type)->GetItemType()->IsData()
             && AS_TYPE(TDataType, AS_TYPE(TOptionalType, type)->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Json);
             && AS_TYPE(TDataType, AS_TYPE(TOptionalType, type)->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Json);
     }
     }
 
 
     arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) override {
     arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) override {
         if (arrow::Type::DICTIONARY == block->type->id()) {
         if (arrow::Type::DICTIONARY == block->type->id()) {
             auto valType = static_cast<const arrow::DictionaryType&>(*block->type).value_type();
             auto valType = static_cast<const arrow::DictionaryType&>(*block->type).value_type();
-            if (valType->Equals(Settings_.ArrowType)) {
+            if (Settings_.IsTopLevelYson) {
+                return TopLevelYsonDictConverter_.Convert(block);
+            } else if (valType->Equals(Settings_.ArrowType)) {
                 // just unpack
                 // just unpack
                 return DictPrimitiveConverter_.Convert(block);
                 return DictPrimitiveConverter_.Convert(block);
             }  else if (arrow::Type::UINT8 == Settings_.ArrowType->id() && arrow::Type::BOOL == valType->id()) {
             }  else if (arrow::Type::UINT8 == Settings_.ArrowType->id() && arrow::Type::BOOL == valType->id()) {
@@ -651,8 +538,7 @@ public:
                 auto result = arrow::compute::Cast(DictPrimitiveConverter_.Convert(block), Settings_.ArrowType);
                 auto result = arrow::compute::Cast(DictPrimitiveConverter_.Convert(block), Settings_.ArrowType);
                 YQL_ENSURE(result.ok());
                 YQL_ENSURE(result.ok());
                 return *result;
                 return *result;
-            } else if (IsJson_ && arrow::Type::STRING == Settings_.ArrowType->id() && arrow::Type::BINARY == valType->id())
-            {
+            } else if (IsJson_ && arrow::Type::STRING == Settings_.ArrowType->id() && arrow::Type::BINARY == valType->id()) {
                 auto result = arrow::compute::Cast(DictPrimitiveConverter_.Convert(block), Settings_.ArrowType);
                 auto result = arrow::compute::Cast(DictPrimitiveConverter_.Convert(block), Settings_.ArrowType);
                 YQL_ENSURE(result.ok());
                 YQL_ENSURE(result.ok());
                 return *result;
                 return *result;
@@ -661,15 +547,15 @@ public:
             }
             }
         } else {
         } else {
             auto blockType = block->type;
             auto blockType = block->type;
-            auto noConvert = blockType->Equals(Settings_.ArrowType);
-            if (noConvert) {
+            if (Settings_.IsTopLevelYson) {
+                return TopLevelYsonConverter_.Convert(block);
+            } else if (blockType->Equals(Settings_.ArrowType)) {
                 return block;
                 return block;
             } else if (arrow::Type::UINT8 == Settings_.ArrowType->id() && arrow::Type::BOOL == blockType->id()) {
             } else if (arrow::Type::UINT8 == Settings_.ArrowType->id() && arrow::Type::BOOL == blockType->id()) {
                 auto result = arrow::compute::Cast(arrow::Datum(*block), Settings_.ArrowType);
                 auto result = arrow::compute::Cast(arrow::Datum(*block), Settings_.ArrowType);
                 YQL_ENSURE(result.ok());
                 YQL_ENSURE(result.ok());
                 return *result;
                 return *result;
-            } else if (IsJson_ && arrow::Type::STRING == Settings_.ArrowType->id() && arrow::Type::BINARY == blockType->id())
-            {
+            } else if (IsJson_ && arrow::Type::STRING == Settings_.ArrowType->id() && arrow::Type::BINARY == blockType->id()) {
                 auto result = arrow::compute::Cast(arrow::Datum(*block), Settings_.ArrowType);
                 auto result = arrow::compute::Cast(arrow::Datum(*block), Settings_.ArrowType);
                 YQL_ENSURE(result.ok());
                 YQL_ENSURE(result.ok());
                 return *result;
                 return *result;
@@ -681,18 +567,25 @@ public:
     }
     }
 private:
 private:
     TYtColumnConverterSettings Settings_;
     TYtColumnConverterSettings Settings_;
-    TYtYsonColumnConverter<Native, IsTopOptional, true> DictYsonConverter_;
-    TYtYsonColumnConverter<Native, IsTopOptional, false> YsonConverter_;
-    TPrimitiveColumnConverter<true> DictPrimitiveConverter_;
+    TComplexTypeYsonColumnConverter<Native, IsTopOptional, true> DictYsonConverter_;
+    TComplexTypeYsonColumnConverter<Native, IsTopOptional, false> YsonConverter_;
+    TPrimitiveColumnConverter<true, false> DictPrimitiveConverter_;
+    TPrimitiveColumnConverter<true, true> TopLevelYsonDictConverter_;
+    TPrimitiveColumnConverter<false, true> TopLevelYsonConverter_;
     bool IsJson_;
     bool IsJson_;
 };
 };
 
 
-TYtColumnConverterSettings::TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative) 
-    : Type(type), PgBuilder(pgBuilder), Pool(pool), IsNative(isNative), IsTopOptional(!isNative && type->IsOptional())
+TYtColumnConverterSettings::TYtColumnConverterSettings(TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative)
+    : Type(type)
+    , PgBuilder(pgBuilder)
+    , Pool(pool)
+    , IsNative(isNative)
+    , IsTopOptional(type->IsOptional())
+    , IsTopLevelYson(type->IsData() && static_cast<TDataType*>(Type)->GetDataSlot() == NUdf::EDataSlot::Yson)
 {
 {
     if (!isNative) {
     if (!isNative) {
         if (Type->IsOptional()) {
         if (Type->IsOptional()) {
-            Type = static_cast<NKikimr::NMiniKQL::TOptionalType*>(Type)->GetItemType();
+            Type = static_cast<TOptionalType*>(Type)->GetItemType();
         }
         }
     }
     }
     YQL_ENSURE(ConvertArrowType(type, ArrowType), "Can't convert type to arrow");
     YQL_ENSURE(ConvertArrowType(type, ArrowType), "Can't convert type to arrow");

+ 129 - 0
yt/yql/providers/yt/codec/yt_arrow_converter_details.h

@@ -0,0 +1,129 @@
+#pragma once
+
+#include <yql/essentials/public/udf/arrow/block_reader.h>
+
+#include <yql/essentials/utils/yql_panic.h>
+
+#include <library/cpp/yson/detail.h>
+#include <library/cpp/yson/varint.h>
+
+
+namespace NYql {
+
+using namespace NYson::NDetail;
+
+class TYsonBuffer {
+public:
+    TYsonBuffer(const std::string_view& s) : Data_(s.data()), Available_(s.size()) {}
+
+    constexpr char Next() {
+        YQL_ENSURE(Available_-- > 0);
+        return *(++Data_);
+    }
+
+    constexpr char Current() {
+        return *Data_;
+    }
+
+    ui32 ReadVarUI32() {
+        char prev = Current();
+        if (Y_LIKELY(!(prev & 0x80))) {
+            Next();
+            return prev;
+        }
+
+        return ReadVarSlow<ui32>();
+    }
+
+    ui64 ReadVarUI64() {
+        char prev = Current();
+        if (Y_LIKELY(!(prev & 0x80))) {
+            Next();
+            return prev;
+        }
+
+        return ReadVarSlow<ui64>();
+    }
+
+    i32 ReadVarI32() {
+        return NYson::ZigZagDecode32(ReadVarUI32());
+    }
+
+    i64 ReadVarI64() {
+        return NYson::ZigZagDecode64(ReadVarUI64());
+    }
+
+    double NextDouble() {
+        double val = *reinterpret_cast<const double*>(Data_);
+        Data_ += sizeof(double);
+        return val;
+    }
+
+    void Skip(i32 cnt) {
+        Data_ += cnt;
+    }
+
+    const char* Data() {
+        return Data_;
+    }
+
+    size_t Available() const {
+        return Available_;
+    }
+private:
+    template<typename T>
+    constexpr T ReadVarSlow() {
+        T shift = 0;
+        T value = Current() & 0x7f;
+        for (;;) {
+            shift += 7;
+            value |= T(Next() & 0x7f) << shift;
+            if (!(Current() & 0x80)) {
+                break;
+            }
+        }
+        Next();
+        return value;
+    }
+
+    const char* Data_;
+    size_t Available_;
+};
+
+
+template<bool Native>
+class IYsonComplexTypeReader {
+public:
+    using TIReaderPtr = std::unique_ptr<IYsonComplexTypeReader<Native>>;
+    virtual ~IYsonComplexTypeReader() = default;
+    virtual NUdf::TBlockItem GetItem(TYsonBuffer& buf) = 0;
+    virtual NUdf::TBlockItem GetNotNull(TYsonBuffer&) = 0;
+    NUdf::TBlockItem GetNullableItem(TYsonBuffer& buf) {
+        char prev = buf.Current();
+        if constexpr (Native) {
+            if (prev == EntitySymbol) {
+                buf.Next();
+                return NUdf::TBlockItem();
+            }
+            return GetNotNull(buf).MakeOptional();
+        }
+        buf.Next();
+        if (prev == EntitySymbol) {
+            return NUdf::TBlockItem();
+        }
+        YQL_ENSURE(prev == BeginListSymbol);
+        if (buf.Current() == EndListSymbol) {
+            buf.Next();
+            return NUdf::TBlockItem();
+        }
+        auto result = GetNotNull(buf);
+        if (buf.Current() == ListItemSeparatorSymbol) {
+            buf.Next();
+        }
+        YQL_ENSURE(buf.Current() == EndListSymbol);
+        buf.Next();
+        return result.MakeOptional();
+    }
+};
+
+}