#include "pg_compat.h" #include "arrow.h" #include "arrow_impl.h" #include #include #include #include #include #include #include #include #include #include #include #include extern "C" { #include "utils/date.h" #include "utils/timestamp.h" #include "utils/fmgrprotos.h" } namespace NYql { extern "C" { Y_PRAGMA_DIAGNOSTIC_PUSH Y_PRAGMA("GCC diagnostic ignored \"-Wreturn-type-c-linkage\"") #include "pg_kernels_fwd.inc" Y_PRAGMA_DIAGNOSTIC_POP } struct TExecs { static TExecs& Instance() { return *Singleton(); } TExecs(); THashMap Table; }; TExecFunc FindExec(Oid oid) { const auto& table = TExecs::Instance().Table; auto it = table.find(oid); if (it == table.end()) { return nullptr; } return it->second; } bool HasPgKernel(ui32 procOid) { return FindExec(procOid) != nullptr; } TExecs::TExecs() { #define RegisterExec(oid, func) Table[oid] = func #include "pg_kernels_register.all.inc" #undef RegisterExec } const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMiniKQL::TTupleType* tupleType, const std::vector& argsColumns, NKikimr::NMiniKQL::TType* returnType, ui32 hint) { using namespace NKikimr::NMiniKQL; if (returnType) { MKQL_ENSURE(argsColumns.size() == 1, "Expected one column"); TType* stateType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType(); TType* returnItemType = AS_TYPE(TBlockType, returnType)->GetItemType(); return NPg::LookupAggregation(name + "#" + ToString(hint), AS_TYPE(TPgType, stateType)->GetTypeId(), AS_TYPE(TPgType, returnItemType)->GetTypeId()); } else { TVector argTypeIds; for (const auto col : argsColumns) { argTypeIds.push_back(AS_TYPE(TPgType, AS_TYPE(TBlockType, tupleType->GetElementType(col))->GetItemType())->GetTypeId()); } return NPg::LookupAggregation(name, argTypeIds); } } std::shared_ptr PgConvertBool(const std::shared_ptr& value) { const auto& data = value->data(); size_t length = data->length; NUdf::TFixedSizeArrayBuilder builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length); auto input = data->GetValues(1, 0); builder.UnsafeReserve(length); auto output = builder.MutableData(); for (size_t i = 0; i < length; ++i) { auto fullIndex = i + data->offset; output[i] = BoolGetDatum(arrow::BitUtil::GetBit(input, fullIndex)); } auto dataBuffer = builder.Build(true).array()->buffers[1]; return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { data->buffers[0], dataBuffer })); } template std::shared_ptr PgConvertFixed(const std::shared_ptr& value, const F& f) { const auto& data = value->data(); size_t length = data->length; NUdf::TFixedSizeArrayBuilder builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length); auto input = data->GetValues(1); builder.UnsafeReserve(length); auto output = builder.MutableData(); for (size_t i = 0; i < length; ++i) { output[i] = f(input[i]); } auto dataBuffer = builder.Build(true).array()->buffers[1]; return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { data->buffers[0], dataBuffer })); } template std::shared_ptr PgConvertString(const std::shared_ptr& value) { const auto& data = value->data(); size_t length = data->length; arrow::BinaryBuilder builder; ARROW_OK(builder.Reserve(length)); auto inputDataSize = arrow::BinaryArray(data).total_values_length(); ARROW_OK(builder.ReserveData(inputDataSize + length * (sizeof(void*) + (IsCString ? 1 : VARHDRSZ)))); NUdf::TStringBlockReader reader; std::vector tmp; for (size_t i = 0; i < length; ++i) { auto item = reader.GetItem(*data, i); if (!item) { ARROW_OK(builder.AppendNull()); continue; } auto originalLen = item.AsStringRef().Size(); ui32 len; if constexpr (IsCString) { len = sizeof(void*) + 1 + originalLen; } else { len = sizeof(void*) + VARHDRSZ + originalLen; } if (Y_UNLIKELY(len < originalLen)) { ythrow yexception() << "Too long string"; } if (tmp.capacity() < len) { tmp.reserve(Max(len, tmp.capacity() * 2)); } tmp.resize(len); NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*)); if constexpr (IsCString) { memcpy(tmp.data() + sizeof(void*), item.AsStringRef().Data(), originalLen); tmp[len - 1] = 0; } else { memcpy(tmp.data() + sizeof(void*) + VARHDRSZ, item.AsStringRef().Data(), originalLen); UpdateCleanVarSize((text*)(tmp.data() + sizeof(void*)), originalLen); } ARROW_OK(builder.Append(tmp.data(), len)); } std::shared_ptr ret; ARROW_OK(builder.Finish(&ret)); return ret; } Numeric Uint64ToPgNumeric(ui64 value) { if (value <= (ui64)Max()) { return int64_to_numeric((i64)value); } auto ret1 = int64_to_numeric((i64)(value & ~(1ull << 63))); auto bit = int64_to_numeric(Min()); bool haveError = false; auto ret2 = numeric_sub_opt_error(ret1, bit, &haveError); Y_ENSURE(!haveError); pfree(ret1); pfree(bit); return ret2; } Numeric DecimalToPgNumeric(const NUdf::TUnboxedValuePod& value, ui8 precision, ui8 scale) { const auto str = NYql::NDecimal::ToString(value.GetInt128(), precision, scale); Y_ENSURE(str); return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID, PointerGetDatum(str), Int32GetDatum(0), Int32GetDatum(-1)); } Numeric DyNumberToPgNumeric(const NUdf::TUnboxedValuePod& value) { auto str = NKikimr::NDyNumber::DyNumberToString(value.AsStringRef()); Y_ENSURE(str); return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID, PointerGetDatum(str->c_str()), Int32GetDatum(0), Int32GetDatum(-1)); } Numeric PgFloatToNumeric(double item, ui64 scale, int digits) { double intPart, fracPart; bool error; fracPart = modf(item, &intPart); i64 fracInt = round(fracPart * scale); // scale compaction: represent 711.56000 as 711.56 while (digits > 0 && fracInt % 10 == 0) { fracInt /= 10; digits -= 1; } if (digits == 0) { return int64_to_numeric(intPart); } else { return numeric_add_opt_error( int64_to_numeric(intPart), int64_div_fast_to_numeric(fracInt, digits), &error); } } std::shared_ptr PgDecimal128ConvertNumeric(const std::shared_ptr& value, int32_t precision, int32_t scale) { TArenaMemoryContext arena; const auto& data = value->data(); size_t length = data->length; arrow::BinaryBuilder builder; bool error; Numeric high_bits_mul = numeric_mul_opt_error(int64_to_numeric(int64_t(1) << 62), int64_to_numeric(4), &error); auto input = data->GetValues(1); for (size_t i = 0; i < length; ++i) { if (value->IsNull(i)) { ARROW_OK(builder.AppendNull()); continue; } Numeric v = PgDecimal128ToNumeric(input[i], precision, scale, high_bits_mul); auto datum = NumericGetDatum(v); auto ptr = (char*)datum; auto len = GetFullVarSize((const text*)datum); NUdf::ZeroMemoryContext(ptr); ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*))); } std::shared_ptr ret; ARROW_OK(builder.Finish(&ret)); return ret; } Numeric PgDecimal128ToNumeric(arrow::Decimal128 value, int32_t precision, int32_t scale, Numeric high_bits_mul) { uint64_t low_bits = value.low_bits(); int64 high_bits = value.high_bits(); if (low_bits > INT64_MAX){ high_bits += 1; } bool error; Numeric low_bits_res = int64_div_fast_to_numeric(low_bits, scale); Numeric high_bits_res = numeric_mul_opt_error(int64_div_fast_to_numeric(high_bits, scale), high_bits_mul, &error); MKQL_ENSURE(error == false, "Bad numeric multiplication."); Numeric res = numeric_add_opt_error(high_bits_res, low_bits_res, &error); MKQL_ENSURE(error == false, "Bad numeric addition."); return res; } TColumnConverter BuildPgNumericColumnConverter(const std::shared_ptr& originalType) { switch (originalType->id()) { case arrow::Type::INT16: return [](const std::shared_ptr& value) { return PgConvertNumeric(value); }; case arrow::Type::INT32: return [](const std::shared_ptr& value) { return PgConvertNumeric(value); }; case arrow::Type::INT64: return [](const std::shared_ptr& value) { return PgConvertNumeric(value); }; case arrow::Type::FLOAT: return [](const std::shared_ptr& value) { return PgConvertNumeric(value); }; case arrow::Type::DOUBLE: return [](const std::shared_ptr& value) { return PgConvertNumeric(value); }; case arrow::Type::DECIMAL128: { auto decimal128Ptr = std::static_pointer_cast(originalType); int32_t precision = decimal128Ptr->precision(); int32_t scale = decimal128Ptr->scale(); return [precision, scale](const std::shared_ptr& value) { return PgDecimal128ConvertNumeric(value, precision, scale); }; } default: return {}; } } template TColumnConverter BuildPgFixedColumnConverter(const std::shared_ptr& originalType, const F& f) { auto primaryType = NKikimr::NMiniKQL::GetPrimitiveDataType(); if (!originalType->Equals(*primaryType) && !arrow::compute::CanCast(*originalType, *primaryType)) { return {}; } return [primaryType, originalType, f](const std::shared_ptr& value) { auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType)); return PgConvertFixed(res, f); }; } Datum MakePgDateFromUint16(ui16 value) { return DatumGetDateADT(UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE + value); } Datum MakePgTimestampFromInt64(i64 value) { return DatumGetTimestamp(USECS_PER_SEC * ((UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE) * SECS_PER_DAY + value)); } TColumnConverter BuildPgColumnConverter(const std::shared_ptr& originalType, NKikimr::NMiniKQL::TPgType* targetType) { switch (targetType->GetTypeId()) { case BOOLOID: { auto primaryType = arrow::boolean(); if (!originalType->Equals(*primaryType) && !arrow::compute::CanCast(*originalType, *primaryType)) { return {}; } return [primaryType, originalType](const std::shared_ptr& value) { auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType)); return PgConvertBool(res); }; } case INT2OID: { return BuildPgFixedColumnConverter(originalType, [](auto value){ return Int16GetDatum(value); }); } case INT4OID: { return BuildPgFixedColumnConverter(originalType, [](auto value){ return Int32GetDatum(value); }); } case INT8OID: { return BuildPgFixedColumnConverter(originalType, [](auto value){ return Int64GetDatum(value); }); } case FLOAT4OID: { return BuildPgFixedColumnConverter(originalType, [](auto value){ return Float4GetDatum(value); }); } case FLOAT8OID: { return BuildPgFixedColumnConverter(originalType, [](auto value){ return Float8GetDatum(value); }); } case NUMERICOID: { return BuildPgNumericColumnConverter(originalType); } case BYTEAOID: case VARCHAROID: case TEXTOID: case CSTRINGOID: { auto primaryType = (targetType->GetTypeId() == BYTEAOID) ? arrow::binary() : arrow::utf8(); if (!arrow::compute::CanCast(*originalType, *primaryType)) { return {}; } return [primaryType, originalType, isCString = NPg::LookupType(targetType->GetTypeId()).TypeLen == -2](const std::shared_ptr& value) { auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType)); if (isCString) { return PgConvertString(res); } else { return PgConvertString(res); } }; } case DATEOID: { if (originalType->Equals(arrow::uint16())) { return [](const std::shared_ptr& value) { return PgConvertFixed(value, [](auto value){ return MakePgDateFromUint16(value); }); }; } else if (originalType->Equals(arrow::date32())) { return [](const std::shared_ptr& value) { return PgConvertFixed(value, [](auto value){ return MakePgDateFromUint16(value); }); }; } else { return {}; } } case TIMESTAMPOID: { if (originalType->Equals(arrow::int64())) { return [](const std::shared_ptr& value) { return PgConvertFixed(value, [](auto value){ return MakePgTimestampFromInt64(value); }); }; } else { return {}; } } } return {}; } class IYsonBlockReaderForPg : public IYsonComplexTypeReader { public: virtual NUdf::TBlockItem GetNotNull(TYsonBuffer&) = 0; NUdf::TBlockItem GetNullableItem(TYsonBuffer& buf) { char prev = buf.Current(); if (prev == NYson::NDetail::EntitySymbol) { buf.Next(); return NUdf::TBlockItem(); } if (prev == NYson::NDetail::BeginListSymbol) { buf.Next(); YQL_ENSURE(buf.Current() == NYson::NDetail::EndListSymbol); buf.Next(); return NUdf::TBlockItem(); } return GetNotNull(buf); } }; NUdf::TBlockItem BlockItemFromDatum(Datum datum, const NPg::TTypeDesc& desc, std::vector& tmp) { if (desc.PassByValue) { return NUdf::TBlockItem((ui64)datum); } auto typeLen = desc.TypeLen; ui32 len; if (typeLen == -1) { len = GetFullVarSize((const text*)datum); } else if (typeLen == -2) { len = 1 + strlen((const char*)datum); } else { len = typeLen; } auto objlen = len; len += sizeof(void*); len = AlignUp(len, 8); tmp.resize(len); *(uint64_t*)tmp.data() = 0; memcpy(tmp.data() + sizeof(void*), (const char*) datum, objlen); return NUdf::TBlockItem(std::string_view(tmp.data(), len)); } NUdf::TBlockItem PgBlockItemFromNativeBinary(const TStringBuf binary, ui32 pgTypeId, std::vector& tmp) { NKikimr::NMiniKQL::TPAllocScope call; StringInfoData stringInfo; stringInfo.data = (char*)binary.Data(); stringInfo.len = binary.Size(); stringInfo.maxlen = binary.Size(); stringInfo.cursor = 0; const auto& typeInfo = NPg::LookupType(pgTypeId); auto typeIOParam = MakeTypeIOParam(typeInfo); auto receiveFuncId = typeInfo.ReceiveFuncId; if (typeInfo.TypeId == typeInfo.ArrayTypeId) { receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId; } { FmgrInfo finfo; Zero(finfo); Y_ENSURE(receiveFuncId); fmgr_info(receiveFuncId, &finfo); Y_ENSURE(!finfo.fn_retset); Y_ENSURE(finfo.fn_addr); Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); LOCAL_FCINFO(callInfo, 3); Zero(*callInfo); callInfo->flinfo = &finfo; callInfo->nargs = 3; callInfo->fncollation = DEFAULT_COLLATION_OID; callInfo->isnull = false; callInfo->args[0] = { (Datum)&stringInfo, false }; callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; callInfo->args[2] = { Int32GetDatum(-1), false }; auto x = finfo.fn_addr(callInfo); Y_ENSURE(!callInfo->isnull); if (stringInfo.cursor != stringInfo.len) { TStringBuilder errMsg; errMsg << "Not all data has been consumed by 'recv' function: " << NPg::LookupProc(receiveFuncId).Name << ", data size: " << stringInfo.len << ", consumed size: " << stringInfo.cursor; UdfTerminate(errMsg.c_str()); } return BlockItemFromDatum(x, typeInfo, tmp); } } template constexpr Datum FixedToDatum(T v) { if constexpr (std::is_same_v) { return BoolGetDatum(v); } else if constexpr (std::is_same_v) { return Int16GetDatum(v); } else if constexpr (std::is_same_v) { return Int32GetDatum(v); } else if constexpr (std::is_same_v) { return Int64GetDatum(v); } else if constexpr (std::is_same_v) { return Float4GetDatum(v); } else if constexpr (std::is_same_v) { return Float8GetDatum(v); } } template class TPgYsonFixedConverter final : public IYsonBlockReaderForPg { public: NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final { return this->GetNullableItem(buf); } NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final { Datum val; if constexpr (std::is_same_v) { Y_ENSURE(buf.Current() == NYson::NDetail::FalseMarker || buf.Current() == NYson::NDetail::TrueMarker); val = FixedToDatum(buf.Current() == NYson::NDetail::TrueMarker); buf.Next(); } else if constexpr (std::is_integral_v) { if constexpr (std::is_signed_v) { Y_ENSURE(buf.Current() == NYson::NDetail::Int64Marker); buf.Next(); val = FixedToDatum(buf.ReadVarI64()); } else { Y_ENSURE(buf.Current() == NYson::NDetail::Uint64Marker); buf.Next(); val = FixedToDatum(buf.ReadVarUI64()); } } else { Y_ENSURE(buf.Current() == NYson::NDetail::DoubleMarker); buf.Next(); val = FixedToDatum(buf.NextDouble()); } return NUdf::TBlockItem(val); } }; template class TPgYsonStringConverter final : public IYsonBlockReaderForPg { public: TPgYsonStringConverter(i32 typeLen) : TypeLen_(typeLen) { if (typeLen == -2) { YQL_ENSURE(IsCString && !FixedLength); } else if (typeLen == -1) { YQL_ENSURE(!IsCString && !FixedLength); } else { YQL_ENSURE(typeLen >= 0 && FixedLength); } } NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final { return this->GetNullableItem(buf); } NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final { Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker); buf.Next(); const i32 originalLen = buf.ReadVarI32(); auto res = buf.Data(); buf.Skip(originalLen); ui32 len; if constexpr (IsCString) { len = 1 + originalLen + sizeof(void*); } else if constexpr (FixedLength) { len = TypeLen_ + sizeof(void*); } else { len = VARHDRSZ + originalLen + sizeof(void*); } if (Tmp_.capacity() < len) { Tmp_.reserve(Max(len, Tmp_.capacity() * 2)); } len = AlignUp(len, 8); Tmp_.resize(len); if constexpr (IsCString) { memcpy(Tmp_.data() + sizeof(void*), res, originalLen); } else if constexpr (FixedLength) { memcpy(Tmp_.data() + sizeof(void*), res, originalLen); } else { memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen); UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen); } return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len)); } private: std::vector Tmp_; i32 TypeLen_; }; class TPgYsonOtherConverter : public IYsonBlockReaderForPg { public: TPgYsonOtherConverter(Oid typeId) : TypeId_(typeId) {} NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final { return this->GetNullableItem(buf); } NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final { if (buf.Current() != NYson::NDetail::StringMarker) { Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker); } buf.Next(); const i32 len = buf.ReadVarI32(); auto ptr = buf.Data(); buf.Skip(len); return PgBlockItemFromNativeBinary(TStringBuf(ptr, len), TypeId_, Tmp_); } private: Oid TypeId_; std::vector Tmp_; }; template class TPgTopLevelFixedConverter : public IYtColumnConverter { public: using Fn = Datum(*)(const T&); TPgTopLevelFixedConverter(std::unique_ptr&& builder) : Builder_(std::move(builder)) {} arrow::Datum Convert(std::shared_ptr data) override final { if (arrow::Type::DICTIONARY == data->type->id()) { auto valType = static_cast(*data->type).value_type(); Y_ENSURE(Expected == valType->id()); return ConvertDict(data); } else { Y_ENSURE(Expected == data->type->id()); return ConvertNonDict(data); } } arrow::Datum ConvertNonDict(std::shared_ptr data) { ArrType arr(data); if (arr.null_count()) { for (i64 i = 0; i < data->length; ++i) { if (arr.IsNull(i)) { Builder_->Add(NUdf::TBlockItem{}); } else { Builder_->Add(NUdf::TBlockItem(FixedToDatum(arr.Value(i)))); } } } else { for (i64 i = 0; i < data->length; ++i) { Builder_->Add(NUdf::TBlockItem(FixedToDatum(arr.Value(i)))); } } return Builder_->Build(false); } arrow::Datum ConvertDict(std::shared_ptr data) { arrow::DictionaryArray dict(data); auto values = dict.dictionary()->data()->GetValues(1); auto indices = dict.indices()->data()->GetValues(1); if (dict.null_count()) { for (i64 i = 0; i < data->length; ++i) { if (dict.IsNull(i)) { Builder_->Add(NUdf::TBlockItem{}); } else { Builder_->Add(NUdf::TBlockItem(FixedToDatum(values[indices[i]]))); } } } else { for (i64 i = 0; i < data->length; ++i) { Builder_->Add(NUdf::TBlockItem(FixedToDatum(values[indices[i]]))); } } return Builder_->Build(false); } private: std::unique_ptr Builder_; }; template class TPgTopLevelStringConverter : public IYtColumnConverter { public: TPgTopLevelStringConverter(std::unique_ptr&& builder, i32 typeLen) : Builder_(std::move(builder)), TypeLen_(typeLen) { if (typeLen == -2) { YQL_ENSURE(IsCString && !FixedLength); } else if (typeLen == -1) { YQL_ENSURE(!IsCString && !FixedLength); } else { YQL_ENSURE(typeLen >= 0 && FixedLength); } } constexpr NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t originalLen) { ui32 len; if constexpr (IsCString) { len = 1 + originalLen + sizeof(void*); } else if constexpr (FixedLength) { len = TypeLen_ + sizeof(void*); } else { len = VARHDRSZ + originalLen + sizeof(void*); } if (Tmp_.capacity() < len) { Tmp_.reserve(Max(len, Tmp_.capacity() * 2)); } len = AlignUp(len, 8); Tmp_.resize(len); if constexpr (IsCString) { memcpy(Tmp_.data() + sizeof(void*), res, originalLen); } else if constexpr (FixedLength) { memcpy(Tmp_.data() + sizeof(void*), res, originalLen); } else { memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen); UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen); } return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len)); } arrow::Datum Convert(std::shared_ptr data) override final { if (arrow::Type::DICTIONARY == data->type->id()) { auto valType = static_cast(*data->type).value_type(); Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id()); return ConvertDict(data); } else { if (arrow::Type::STRING == data->type->id()) { auto res = arrow::compute::Cast(data, std::make_shared()); Y_ENSURE(res.ok()); data = res->array(); } Y_ENSURE(arrow::Type::BINARY == data->type->id()); return ConvertNonDict(data); } } arrow::Datum ConvertNonDict(std::shared_ptr data) { arrow::BinaryArray arr(data); if (arr.null_count()) { for (i64 i = 0; i < data->length; ++i) { if (arr.IsNull(i)) { Builder_->Add(NUdf::TBlockItem{}); } else { i32 len; auto res = arr.GetValue(i, &len); Builder_->Add(ConvertOnce(res, len)); } } } else { for (i64 i = 0; i < data->length; ++i) { i32 len; auto res = arr.GetValue(i, &len); Builder_->Add(ConvertOnce(res, len)); } } return Builder_->Build(false); } arrow::Datum ConvertDict(std::shared_ptr data) { arrow::DictionaryArray dict(data); if (arrow::Type::STRING == data->dictionary->type->id()) { auto res = arrow::compute::Cast(data->dictionary, std::make_shared()); Y_ENSURE(res.ok()); data->dictionary = res->array(); } arrow::BinaryArray arr(data->dictionary); auto indices = dict.indices()->data()->GetValues(1); if (dict.null_count()) { for (i64 i = 0; i < data->length; ++i) { if (dict.IsNull(i)) { Builder_->Add(NUdf::TBlockItem{}); } else { i32 len; auto res = arr.GetValue(indices[i], &len); Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len))); } } } else { for (i64 i = 0; i < data->length; ++i) { i32 len; auto res = arr.GetValue(indices[i], &len); Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len))); } } return Builder_->Build(false); } private: std::unique_ptr Builder_; std::vector Tmp_; i32 TypeLen_; }; class TPgTopLevelOtherConverter : public IYtColumnConverter { public: TPgTopLevelOtherConverter(std::unique_ptr&& builder, Oid typeId) : Builder_(std::move(builder)), TypeId_(typeId) {} inline NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t len) { return PgBlockItemFromNativeBinary(TStringBuf(reinterpret_cast(res), len), TypeId_, Tmp_); } arrow::Datum Convert(std::shared_ptr data) override final { if (arrow::Type::DICTIONARY == data->type->id()) { auto valType = static_cast(*data->type).value_type(); Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id()); return ConvertDict(data); } else { Y_ENSURE(arrow::Type::BINARY == data->type->id() || arrow::Type::STRING == data->type->id()); return ConvertNonDict(data); } } arrow::Datum ConvertNonDict(std::shared_ptr data) { arrow::BinaryArray arr(data); if (arr.null_count()) { for (i64 i = 0; i < data->length; ++i) { if (arr.IsNull(i)) { Builder_->Add(NUdf::TBlockItem{}); } else { i32 len; auto res = arr.GetValue(i, &len); Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len))); } } } else { for (i64 i = 0; i < data->length; ++i) { i32 len; auto res = arr.GetValue(i, &len); Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len))); } } return Builder_->Build(false); } arrow::Datum ConvertDict(std::shared_ptr data) { arrow::DictionaryArray dict(data); if (arrow::Type::STRING == data->dictionary->type->id()) { auto res = arrow::compute::Cast(data->dictionary, std::make_shared()); Y_ENSURE(res.ok()); data->dictionary = res->array(); } arrow::BinaryArray arr(data->dictionary); auto indices = dict.indices()->data()->GetValues(1); if (dict.null_count()) { for (i64 i = 0; i < data->length; ++i) { if (dict.IsNull(i)) { Builder_->Add(NUdf::TBlockItem{}); } else { i32 len; auto res = arr.GetValue(indices[i], &len); Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len))); } } } else { for (i64 i = 0; i < data->length; ++i) { i32 len; auto res = arr.GetValue(indices[i], &len); Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len))); } } return Builder_->Build(false); } private: std::unique_ptr Builder_; Oid TypeId_; std::vector Tmp_; }; std::unique_ptr BuildPgTopLevelColumnReader(std::unique_ptr&& builder, const NKikimr::NMiniKQL::TPgType* targetType) { YQL_ENSURE(targetType); switch (targetType->GetTypeId()) { case BOOLOID: { return std::make_unique>(std::move(builder)); } case INT2OID: { return std::make_unique>(std::move(builder)); } case INT4OID: { return std::make_unique>(std::move(builder)); } case INT8OID: { return std::make_unique>(std::move(builder)); } case FLOAT4OID: { return std::make_unique>(std::move(builder)); } case FLOAT8OID: { return std::make_unique>(std::move(builder)); } case BYTEAOID: case VARCHAROID: case TEXTOID: case NAMEOID: case CSTRINGOID: { auto typeLen = NPg::LookupType(targetType->GetTypeId()).TypeLen; if (typeLen == -2) { return std::make_unique>(std::move(builder), typeLen); } else if (typeLen == -1) { return std::make_unique>(std::move(builder), typeLen); } else { return std::make_unique>(std::move(builder), typeLen); } } default: return std::make_unique(std::move(builder), targetType->GetTypeId()); } } std::unique_ptr BuildPgYsonColumnReader(const NUdf::TPgTypeDescription& desc) { switch (desc.TypeId) { case BOOLOID: { return std::make_unique>(); } case INT2OID: { return std::make_unique>(); } case INT4OID: { return std::make_unique>(); } case INT8OID: { return std::make_unique>(); } case FLOAT4OID: { return std::make_unique>(); } case FLOAT8OID: { return std::make_unique>(); } case BYTEAOID: case NAMEOID: case VARCHAROID: case TEXTOID: case CSTRINGOID: { auto typeLen = NPg::LookupType(desc.TypeId).TypeLen; if (typeLen == -2) { return std::make_unique>(typeLen); } else if (typeLen == -1) { return std::make_unique>(typeLen); } else { return std::make_unique>(typeLen); } } default: return std::make_unique(desc.TypeId); } } }