123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934 |
- #include "pg_compat.h"
- #include "arrow.h"
- #include "arrow_impl.h"
- #include <yql/essentials/minikql/defs.h>
- #include <yql/essentials/parser/pg_wrapper/interface/arrow.h>
- #include <yql/essentials/parser/pg_wrapper/interface/utils.h>
- #include <yql/essentials/minikql/mkql_node_cast.h>
- #include <yql/essentials/minikql/arrow/arrow_util.h>
- #include <yql/essentials/types/dynumber/dynumber.h>
- #include <yql/essentials/public/decimal/yql_decimal.h>
- #include <util/generic/singleton.h>
- #include <arrow/compute/cast.h>
- #include <arrow/array.h>
- #include <arrow/array/builder_binary.h>
- #include <util/system/mutex.h>
- extern "C" {
- #include "utils/date.h"
- #include "utils/timestamp.h"
- #include "utils/fmgrprotos.h"
- }
- namespace NYql {
- extern "C" {
- Y_PRAGMA_DIAGNOSTIC_PUSH
- Y_PRAGMA("GCC diagnostic ignored \"-Wreturn-type-c-linkage\"")
- #include "pg_kernels_fwd.inc"
- Y_PRAGMA_DIAGNOSTIC_POP
- }
- struct TExecs {
- static TExecs& Instance() {
- return *Singleton<TExecs>();
- }
- TExecs();
- THashMap<Oid, TExecFunc> Table;
- };
- TExecFunc FindExec(Oid oid) {
- const auto& table = TExecs::Instance().Table;
- auto it = table.find(oid);
- if (it == table.end()) {
- return nullptr;
- }
- return it->second;
- }
- bool HasPgKernel(ui32 procOid) {
- return FindExec(procOid) != nullptr;
- }
- TExecs::TExecs()
- {
- #define RegisterExec(oid, func) Table[oid] = func
- #include "pg_kernels_register.all.inc"
- #undef RegisterExec
- }
- const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMiniKQL::TTupleType* tupleType,
- const std::vector<ui32>& argsColumns, NKikimr::NMiniKQL::TType* returnType, ui32 hint) {
- using namespace NKikimr::NMiniKQL;
- if (returnType) {
- MKQL_ENSURE(argsColumns.size() == 1, "Expected one column");
- TType* stateType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType();
- TType* returnItemType = AS_TYPE(TBlockType, returnType)->GetItemType();
- return NPg::LookupAggregation(name + "#" + ToString(hint), AS_TYPE(TPgType, stateType)->GetTypeId(), AS_TYPE(TPgType, returnItemType)->GetTypeId());
- } else {
- TVector<ui32> argTypeIds;
- for (const auto col : argsColumns) {
- argTypeIds.push_back(AS_TYPE(TPgType, AS_TYPE(TBlockType, tupleType->GetElementType(col))->GetItemType())->GetTypeId());
- }
- return NPg::LookupAggregation(name, argTypeIds);
- }
- }
- std::shared_ptr<arrow::Array> PgConvertBool(const std::shared_ptr<arrow::Array>& value) {
- const auto& data = value->data();
- size_t length = data->length;
- NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
- auto input = data->GetValues<ui8>(1, 0);
- builder.UnsafeReserve(length);
- auto output = builder.MutableData();
- for (size_t i = 0; i < length; ++i) {
- auto fullIndex = i + data->offset;
- output[i] = BoolGetDatum(arrow::BitUtil::GetBit(input, fullIndex));
- }
- auto dataBuffer = builder.Build(true).array()->buffers[1];
- return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { data->buffers[0], dataBuffer }));
- }
- template <typename T, typename F>
- std::shared_ptr<arrow::Array> PgConvertFixed(const std::shared_ptr<arrow::Array>& value, const F& f) {
- const auto& data = value->data();
- size_t length = data->length;
- NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
- auto input = data->GetValues<T>(1);
- builder.UnsafeReserve(length);
- auto output = builder.MutableData();
- for (size_t i = 0; i < length; ++i) {
- output[i] = f(input[i]);
- }
- auto dataBuffer = builder.Build(true).array()->buffers[1];
- return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { data->buffers[0], dataBuffer }));
- }
- template <bool IsCString>
- std::shared_ptr<arrow::Array> PgConvertString(const std::shared_ptr<arrow::Array>& value) {
- const auto& data = value->data();
- size_t length = data->length;
- arrow::BinaryBuilder builder;
- ARROW_OK(builder.Reserve(length));
- auto inputDataSize = arrow::BinaryArray(data).total_values_length();
- ARROW_OK(builder.ReserveData(inputDataSize + length * (sizeof(void*) + (IsCString ? 1 : VARHDRSZ))));
- NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
- std::vector<char> tmp;
- for (size_t i = 0; i < length; ++i) {
- auto item = reader.GetItem(*data, i);
- if (!item) {
- ARROW_OK(builder.AppendNull());
- continue;
- }
- auto originalLen = item.AsStringRef().Size();
- ui32 len;
- if constexpr (IsCString) {
- len = sizeof(void*) + 1 + originalLen;
- } else {
- len = sizeof(void*) + VARHDRSZ + originalLen;
- }
- if (Y_UNLIKELY(len < originalLen)) {
- ythrow yexception() << "Too long string";
- }
- if (tmp.capacity() < len) {
- tmp.reserve(Max<ui64>(len, tmp.capacity() * 2));
- }
- tmp.resize(len);
- NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*));
- if constexpr (IsCString) {
- memcpy(tmp.data() + sizeof(void*), item.AsStringRef().Data(), originalLen);
- tmp[len - 1] = 0;
- } else {
- memcpy(tmp.data() + sizeof(void*) + VARHDRSZ, item.AsStringRef().Data(), originalLen);
- UpdateCleanVarSize((text*)(tmp.data() + sizeof(void*)), originalLen);
- }
- ARROW_OK(builder.Append(tmp.data(), len));
- }
- std::shared_ptr<arrow::BinaryArray> ret;
- ARROW_OK(builder.Finish(&ret));
- return ret;
- }
- Numeric Uint64ToPgNumeric(ui64 value) {
- if (value <= (ui64)Max<i64>()) {
- return int64_to_numeric((i64)value);
- }
- auto ret1 = int64_to_numeric((i64)(value & ~(1ull << 63)));
- auto bit = int64_to_numeric(Min<i64>());
- bool haveError = false;
- auto ret2 = numeric_sub_opt_error(ret1, bit, &haveError);
- Y_ENSURE(!haveError);
- pfree(ret1);
- pfree(bit);
- return ret2;
- }
- Numeric DecimalToPgNumeric(const NUdf::TUnboxedValuePod& value, ui8 precision, ui8 scale) {
- const auto str = NYql::NDecimal::ToString(value.GetInt128(), precision, scale);
- Y_ENSURE(str);
- return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID,
- PointerGetDatum(str), Int32GetDatum(0), Int32GetDatum(-1));
- }
- Numeric DyNumberToPgNumeric(const NUdf::TUnboxedValuePod& value) {
- auto str = NKikimr::NDyNumber::DyNumberToString(value.AsStringRef());
- Y_ENSURE(str);
- return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID,
- PointerGetDatum(str->c_str()), Int32GetDatum(0), Int32GetDatum(-1));
- }
- Numeric PgFloatToNumeric(double item, ui64 scale, int digits) {
- double intPart, fracPart;
- bool error;
- fracPart = modf(item, &intPart);
- i64 fracInt = round(fracPart * scale);
- // scale compaction: represent 711.56000 as 711.56
- while (digits > 0 && fracInt % 10 == 0) {
- fracInt /= 10;
- digits -= 1;
- }
- if (digits == 0) {
- return int64_to_numeric(intPart);
- } else {
- return numeric_add_opt_error(
- int64_to_numeric(intPart),
- int64_div_fast_to_numeric(fracInt, digits),
- &error);
- }
- }
- std::shared_ptr<arrow::Array> PgDecimal128ConvertNumeric(const std::shared_ptr<arrow::Array>& value, int32_t precision, int32_t scale) {
- TArenaMemoryContext arena;
- const auto& data = value->data();
- size_t length = data->length;
- arrow::BinaryBuilder builder;
- bool error;
- Numeric high_bits_mul = numeric_mul_opt_error(int64_to_numeric(int64_t(1) << 62), int64_to_numeric(4), &error);
- auto input = data->GetValues<arrow::Decimal128>(1);
- for (size_t i = 0; i < length; ++i) {
- if (value->IsNull(i)) {
- ARROW_OK(builder.AppendNull());
- continue;
- }
- Numeric v = PgDecimal128ToNumeric(input[i], precision, scale, high_bits_mul);
- auto datum = NumericGetDatum(v);
- auto ptr = (char*)datum;
- auto len = GetFullVarSize((const text*)datum);
- NUdf::ZeroMemoryContext(ptr);
- ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*)));
- }
- std::shared_ptr<arrow::BinaryArray> ret;
- ARROW_OK(builder.Finish(&ret));
- return ret;
- }
- Numeric PgDecimal128ToNumeric(arrow::Decimal128 value, int32_t precision, int32_t scale, Numeric high_bits_mul) {
- uint64_t low_bits = value.low_bits();
- int64 high_bits = value.high_bits();
- if (low_bits > INT64_MAX){
- high_bits += 1;
- }
- bool error;
- Numeric low_bits_res = int64_div_fast_to_numeric(low_bits, scale);
- Numeric high_bits_res = numeric_mul_opt_error(int64_div_fast_to_numeric(high_bits, scale), high_bits_mul, &error);
- MKQL_ENSURE(error == false, "Bad numeric multiplication.");
- Numeric res = numeric_add_opt_error(high_bits_res, low_bits_res, &error);
- MKQL_ENSURE(error == false, "Bad numeric addition.");
- return res;
- }
- TColumnConverter BuildPgNumericColumnConverter(const std::shared_ptr<arrow::DataType>& originalType) {
- switch (originalType->id()) {
- case arrow::Type::INT16:
- return [](const std::shared_ptr<arrow::Array>& value) {
- return PgConvertNumeric<i16>(value);
- };
- case arrow::Type::INT32:
- return [](const std::shared_ptr<arrow::Array>& value) {
- return PgConvertNumeric<i32>(value);
- };
- case arrow::Type::INT64:
- return [](const std::shared_ptr<arrow::Array>& value) {
- return PgConvertNumeric<i64>(value);
- };
- case arrow::Type::FLOAT:
- return [](const std::shared_ptr<arrow::Array>& value) {
- return PgConvertNumeric<float>(value);
- };
- case arrow::Type::DOUBLE:
- return [](const std::shared_ptr<arrow::Array>& value) {
- return PgConvertNumeric<double>(value);
- };
- case arrow::Type::DECIMAL128: {
- auto decimal128Ptr = std::static_pointer_cast<arrow::Decimal128Type>(originalType);
- int32_t precision = decimal128Ptr->precision();
- int32_t scale = decimal128Ptr->scale();
- return [precision, scale](const std::shared_ptr<arrow::Array>& value) {
- return PgDecimal128ConvertNumeric(value, precision, scale);
- };
- }
- default:
- return {};
- }
- }
- template <typename T, typename F>
- TColumnConverter BuildPgFixedColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, const F& f) {
- auto primaryType = NKikimr::NMiniKQL::GetPrimitiveDataType<T>();
- if (!originalType->Equals(*primaryType) && !arrow::compute::CanCast(*originalType, *primaryType)) {
- return {};
- }
- return [primaryType, originalType, f](const std::shared_ptr<arrow::Array>& value) {
- auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
- return PgConvertFixed<T, F>(res, f);
- };
- }
- Datum MakePgDateFromUint16(ui16 value) {
- return DatumGetDateADT(UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE + value);
- }
- Datum MakePgTimestampFromInt64(i64 value) {
- return DatumGetTimestamp(USECS_PER_SEC * ((UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE) * SECS_PER_DAY + value));
- }
- TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType) {
- switch (targetType->GetTypeId()) {
- case BOOLOID: {
- auto primaryType = arrow::boolean();
- if (!originalType->Equals(*primaryType) && !arrow::compute::CanCast(*originalType, *primaryType)) {
- return {};
- }
- return [primaryType, originalType](const std::shared_ptr<arrow::Array>& value) {
- auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
- return PgConvertBool(res);
- };
- }
- case INT2OID: {
- return BuildPgFixedColumnConverter<i16>(originalType, [](auto value){ return Int16GetDatum(value); });
- }
- case INT4OID: {
- return BuildPgFixedColumnConverter<i32>(originalType, [](auto value){ return Int32GetDatum(value); });
- }
- case INT8OID: {
- return BuildPgFixedColumnConverter<i64>(originalType, [](auto value){ return Int64GetDatum(value); });
- }
- case FLOAT4OID: {
- return BuildPgFixedColumnConverter<float>(originalType, [](auto value){ return Float4GetDatum(value); });
- }
- case FLOAT8OID: {
- return BuildPgFixedColumnConverter<double>(originalType, [](auto value){ return Float8GetDatum(value); });
- }
- case NUMERICOID: {
- return BuildPgNumericColumnConverter(originalType);
- }
- case BYTEAOID:
- case VARCHAROID:
- case TEXTOID:
- case CSTRINGOID: {
- auto primaryType = (targetType->GetTypeId() == BYTEAOID) ? arrow::binary() : arrow::utf8();
- if (!arrow::compute::CanCast(*originalType, *primaryType)) {
- return {};
- }
- return [primaryType, originalType, isCString = NPg::LookupType(targetType->GetTypeId()).TypeLen == -2](const std::shared_ptr<arrow::Array>& value) {
- auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
- if (isCString) {
- return PgConvertString<true>(res);
- } else {
- return PgConvertString<false>(res);
- }
- };
- }
- case DATEOID: {
- if (originalType->Equals(arrow::uint16())) {
- return [](const std::shared_ptr<arrow::Array>& value) {
- return PgConvertFixed<ui16>(value, [](auto value){ return MakePgDateFromUint16(value); });
- };
- } else if (originalType->Equals(arrow::date32())) {
- return [](const std::shared_ptr<arrow::Array>& value) {
- return PgConvertFixed<i32>(value, [](auto value){ return MakePgDateFromUint16(value); });
- };
- } else {
- return {};
- }
- }
- case TIMESTAMPOID: {
- if (originalType->Equals(arrow::int64())) {
- return [](const std::shared_ptr<arrow::Array>& value) {
- return PgConvertFixed<i64>(value, [](auto value){ return MakePgTimestampFromInt64(value); });
- };
- } else {
- return {};
- }
- }
- }
- return {};
- }
- class IYsonBlockReaderForPg : public IYsonComplexTypeReader {
- public:
- virtual NUdf::TBlockItem GetNotNull(TYsonBuffer&) = 0;
- NUdf::TBlockItem GetNullableItem(TYsonBuffer& buf) {
- char prev = buf.Current();
- if (prev == NYson::NDetail::EntitySymbol) {
- buf.Next();
- return NUdf::TBlockItem();
- }
- if (prev == NYson::NDetail::BeginListSymbol) {
- buf.Next();
- YQL_ENSURE(buf.Current() == NYson::NDetail::EndListSymbol);
- buf.Next();
- return NUdf::TBlockItem();
- }
- return GetNotNull(buf);
- }
- };
- NUdf::TBlockItem BlockItemFromDatum(Datum datum, const NPg::TTypeDesc& desc, std::vector<char>& tmp) {
- if (desc.PassByValue) {
- return NUdf::TBlockItem((ui64)datum);
- }
- auto typeLen = desc.TypeLen;
- ui32 len;
- if (typeLen == -1) {
- len = GetFullVarSize((const text*)datum);
- } else if (typeLen == -2) {
- len = 1 + strlen((const char*)datum);
- } else {
- len = typeLen;
- }
- auto objlen = len;
- len += sizeof(void*);
- len = AlignUp<i32>(len, 8);
- tmp.resize(len);
- *(uint64_t*)tmp.data() = 0;
- memcpy(tmp.data() + sizeof(void*), (const char*) datum, objlen);
- return NUdf::TBlockItem(std::string_view(tmp.data(), len));
- }
- NUdf::TBlockItem PgBlockItemFromNativeBinary(const TStringBuf binary, ui32 pgTypeId, std::vector<char>& tmp) {
- NKikimr::NMiniKQL::TPAllocScope call;
- StringInfoData stringInfo;
- stringInfo.data = (char*)binary.Data();
- stringInfo.len = binary.Size();
- stringInfo.maxlen = binary.Size();
- stringInfo.cursor = 0;
- const auto& typeInfo = NPg::LookupType(pgTypeId);
- auto typeIOParam = MakeTypeIOParam(typeInfo);
- auto receiveFuncId = typeInfo.ReceiveFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
- }
- {
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(receiveFuncId);
- fmgr_info(receiveFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
- LOCAL_FCINFO(callInfo, 3);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 3;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { (Datum)&stringInfo, false };
- callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
- callInfo->args[2] = { Int32GetDatum(-1), false };
- auto x = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- if (stringInfo.cursor != stringInfo.len) {
- TStringBuilder errMsg;
- errMsg << "Not all data has been consumed by 'recv' function: " << NPg::LookupProc(receiveFuncId).Name << ", data size: " << stringInfo.len << ", consumed size: " << stringInfo.cursor;
- UdfTerminate(errMsg.c_str());
- }
- return BlockItemFromDatum(x, typeInfo, tmp);
- }
- }
- template<typename T>
- constexpr Datum FixedToDatum(T v) {
- if constexpr (std::is_same_v<T, bool>) {
- return BoolGetDatum(v);
- } else if constexpr (std::is_same_v<T, i16>) {
- return Int16GetDatum(v);
- } else if constexpr (std::is_same_v<T, i32>) {
- return Int32GetDatum(v);
- } else if constexpr (std::is_same_v<T, i64>) {
- return Int64GetDatum(v);
- } else if constexpr (std::is_same_v<T, float>) {
- return Float4GetDatum(v);
- } else if constexpr (std::is_same_v<T, double>) {
- return Float8GetDatum(v);
- }
- }
- template<typename T>
- class TPgYsonFixedConverter final : public IYsonBlockReaderForPg {
- public:
- NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
- return this->GetNullableItem(buf);
- }
- NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
- Datum val;
- if constexpr (std::is_same_v<T, bool>) {
- Y_ENSURE(buf.Current() == NYson::NDetail::FalseMarker || buf.Current() == NYson::NDetail::TrueMarker);
- val = FixedToDatum<T>(buf.Current() == NYson::NDetail::TrueMarker);
- buf.Next();
- } else if constexpr (std::is_integral_v<T>) {
- if constexpr (std::is_signed_v<T>) {
- Y_ENSURE(buf.Current() == NYson::NDetail::Int64Marker);
- buf.Next();
- val = FixedToDatum<T>(buf.ReadVarI64());
- } else {
- Y_ENSURE(buf.Current() == NYson::NDetail::Uint64Marker);
- buf.Next();
- val = FixedToDatum<T>(buf.ReadVarUI64());
- }
- } else {
- Y_ENSURE(buf.Current() == NYson::NDetail::DoubleMarker);
- buf.Next();
- val = FixedToDatum<T>(buf.NextDouble());
- }
- return NUdf::TBlockItem(val);
- }
- };
- template<bool IsCString, bool FixedLength>
- class TPgYsonStringConverter final : public IYsonBlockReaderForPg {
- public:
- TPgYsonStringConverter(i32 typeLen) : TypeLen_(typeLen) {
- if (typeLen == -2) {
- YQL_ENSURE(IsCString && !FixedLength);
- } else if (typeLen == -1) {
- YQL_ENSURE(!IsCString && !FixedLength);
- } else {
- YQL_ENSURE(typeLen >= 0 && FixedLength);
- }
- }
- NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
- return this->GetNullableItem(buf);
- }
- NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
- Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker);
- buf.Next();
- const i32 originalLen = buf.ReadVarI32();
- auto res = buf.Data();
- buf.Skip(originalLen);
- ui32 len;
- if constexpr (IsCString) {
- len = 1 + originalLen + sizeof(void*);
- } else if constexpr (FixedLength) {
- len = TypeLen_ + sizeof(void*);
- } else {
- len = VARHDRSZ + originalLen + sizeof(void*);
- }
- if (Tmp_.capacity() < len) {
- Tmp_.reserve(Max<ui64>(len, Tmp_.capacity() * 2));
- }
- len = AlignUp<ui32>(len, 8);
- Tmp_.resize(len);
- if constexpr (IsCString) {
- memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
- } else if constexpr (FixedLength) {
- memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
- } else {
- memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen);
- UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen);
- }
- return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len));
- }
- private:
- std::vector<char> Tmp_;
- i32 TypeLen_;
- };
- class TPgYsonOtherConverter : public IYsonBlockReaderForPg {
- public:
- TPgYsonOtherConverter(Oid typeId) : TypeId_(typeId) {}
- NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
- return this->GetNullableItem(buf);
- }
- NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
- if (buf.Current() != NYson::NDetail::StringMarker) {
- Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker);
- }
- buf.Next();
- const i32 len = buf.ReadVarI32();
- auto ptr = buf.Data();
- buf.Skip(len);
- return PgBlockItemFromNativeBinary(TStringBuf(ptr, len), TypeId_, Tmp_);
- }
- private:
- Oid TypeId_;
- std::vector<char> Tmp_;
- };
- template<typename T, arrow::Type::type Expected, typename ArrType>
- class TPgTopLevelFixedConverter : public IYtColumnConverter {
- public:
- using Fn = Datum(*)(const T&);
- TPgTopLevelFixedConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder) : Builder_(std::move(builder)) {}
- arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
- if (arrow::Type::DICTIONARY == data->type->id()) {
- auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
- Y_ENSURE(Expected == valType->id());
- return ConvertDict(data);
- } else {
- Y_ENSURE(Expected == data->type->id());
- return ConvertNonDict(data);
- }
- }
- arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
- ArrType arr(data);
- if (arr.null_count()) {
- for (i64 i = 0; i < data->length; ++i) {
- if (arr.IsNull(i)) {
- Builder_->Add(NUdf::TBlockItem{});
- } else {
- Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(arr.Value(i))));
- }
- }
- } else {
- for (i64 i = 0; i < data->length; ++i) {
- Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(arr.Value(i))));
- }
- }
- return Builder_->Build(false);
- }
- arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
- arrow::DictionaryArray dict(data);
- auto values = dict.dictionary()->data()->GetValues<T>(1);
- auto indices = dict.indices()->data()->GetValues<ui32>(1);
- if (dict.null_count()) {
- for (i64 i = 0; i < data->length; ++i) {
- if (dict.IsNull(i)) {
- Builder_->Add(NUdf::TBlockItem{});
- } else {
- Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(values[indices[i]])));
- }
- }
- } else {
- for (i64 i = 0; i < data->length; ++i) {
- Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(values[indices[i]])));
- }
- }
- return Builder_->Build(false);
- }
- private:
- std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
- };
- template<bool IsCString, bool FixedLength>
- class TPgTopLevelStringConverter : public IYtColumnConverter {
- public:
- TPgTopLevelStringConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, i32 typeLen) : Builder_(std::move(builder)), TypeLen_(typeLen) {
- if (typeLen == -2) {
- YQL_ENSURE(IsCString && !FixedLength);
- } else if (typeLen == -1) {
- YQL_ENSURE(!IsCString && !FixedLength);
- } else {
- YQL_ENSURE(typeLen >= 0 && FixedLength);
- }
- }
- constexpr NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t originalLen) {
- ui32 len;
- if constexpr (IsCString) {
- len = 1 + originalLen + sizeof(void*);
- } else if constexpr (FixedLength) {
- len = TypeLen_ + sizeof(void*);
- } else {
- len = VARHDRSZ + originalLen + sizeof(void*);
- }
- if (Tmp_.capacity() < len) {
- Tmp_.reserve(Max<ui64>(len, Tmp_.capacity() * 2));
- }
- len = AlignUp<ui32>(len, 8);
- Tmp_.resize(len);
- if constexpr (IsCString) {
- memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
- } else if constexpr (FixedLength) {
- memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
- } else {
- memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen);
- UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen);
- }
- return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len));
- }
- arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
- if (arrow::Type::DICTIONARY == data->type->id()) {
- auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
- Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id());
- return ConvertDict(data);
- } else {
- if (arrow::Type::STRING == data->type->id()) {
- auto res = arrow::compute::Cast(data, std::make_shared<arrow::BinaryType>());
- Y_ENSURE(res.ok());
- data = res->array();
- }
- Y_ENSURE(arrow::Type::BINARY == data->type->id());
- return ConvertNonDict(data);
- }
- }
- arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
- arrow::BinaryArray arr(data);
- if (arr.null_count()) {
- for (i64 i = 0; i < data->length; ++i) {
- if (arr.IsNull(i)) {
- Builder_->Add(NUdf::TBlockItem{});
- } else {
- i32 len;
- auto res = arr.GetValue(i, &len);
- Builder_->Add(ConvertOnce(res, len));
- }
- }
- } else {
- for (i64 i = 0; i < data->length; ++i) {
- i32 len;
- auto res = arr.GetValue(i, &len);
- Builder_->Add(ConvertOnce(res, len));
- }
- }
- return Builder_->Build(false);
- }
- arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
- arrow::DictionaryArray dict(data);
- if (arrow::Type::STRING == data->dictionary->type->id()) {
- auto res = arrow::compute::Cast(data->dictionary, std::make_shared<arrow::BinaryType>());
- Y_ENSURE(res.ok());
- data->dictionary = res->array();
- }
- arrow::BinaryArray arr(data->dictionary);
- auto indices = dict.indices()->data()->GetValues<ui32>(1);
- if (dict.null_count()) {
- for (i64 i = 0; i < data->length; ++i) {
- if (dict.IsNull(i)) {
- Builder_->Add(NUdf::TBlockItem{});
- } else {
- i32 len;
- auto res = arr.GetValue(indices[i], &len);
- Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
- }
- }
- } else {
- for (i64 i = 0; i < data->length; ++i) {
- i32 len;
- auto res = arr.GetValue(indices[i], &len);
- Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
- }
- }
- return Builder_->Build(false);
- }
- private:
- std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
- std::vector<char> Tmp_;
- i32 TypeLen_;
- };
- class TPgTopLevelOtherConverter : public IYtColumnConverter {
- public:
- TPgTopLevelOtherConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, Oid typeId) : Builder_(std::move(builder)), TypeId_(typeId) {}
- inline NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t len) {
- return PgBlockItemFromNativeBinary(TStringBuf(reinterpret_cast<const char*>(res), len), TypeId_, Tmp_);
- }
- arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
- if (arrow::Type::DICTIONARY == data->type->id()) {
- auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
- Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id());
- return ConvertDict(data);
- } else {
- Y_ENSURE(arrow::Type::BINARY == data->type->id() || arrow::Type::STRING == data->type->id());
- return ConvertNonDict(data);
- }
- }
- arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
- arrow::BinaryArray arr(data);
- if (arr.null_count()) {
- for (i64 i = 0; i < data->length; ++i) {
- if (arr.IsNull(i)) {
- Builder_->Add(NUdf::TBlockItem{});
- } else {
- i32 len;
- auto res = arr.GetValue(i, &len);
- Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
- }
- }
- } else {
- for (i64 i = 0; i < data->length; ++i) {
- i32 len;
- auto res = arr.GetValue(i, &len);
- Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
- }
- }
- return Builder_->Build(false);
- }
- arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
- arrow::DictionaryArray dict(data);
- if (arrow::Type::STRING == data->dictionary->type->id()) {
- auto res = arrow::compute::Cast(data->dictionary, std::make_shared<arrow::BinaryType>());
- Y_ENSURE(res.ok());
- data->dictionary = res->array();
- }
- arrow::BinaryArray arr(data->dictionary);
- auto indices = dict.indices()->data()->GetValues<ui32>(1);
- if (dict.null_count()) {
- for (i64 i = 0; i < data->length; ++i) {
- if (dict.IsNull(i)) {
- Builder_->Add(NUdf::TBlockItem{});
- } else {
- i32 len;
- auto res = arr.GetValue(indices[i], &len);
- Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
- }
- }
- } else {
- for (i64 i = 0; i < data->length; ++i) {
- i32 len;
- auto res = arr.GetValue(indices[i], &len);
- Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
- }
- }
- return Builder_->Build(false);
- }
- private:
- std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
- Oid TypeId_;
- std::vector<char> Tmp_;
- };
- std::unique_ptr<IYtColumnConverter> BuildPgTopLevelColumnReader(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, const NKikimr::NMiniKQL::TPgType* targetType) {
- YQL_ENSURE(targetType);
- switch (targetType->GetTypeId()) {
- case BOOLOID: {
- return std::make_unique<TPgTopLevelFixedConverter<bool, arrow::Type::BOOL, arrow::BooleanArray>>(std::move(builder));
- }
- case INT2OID: {
- return std::make_unique<TPgTopLevelFixedConverter<i16, arrow::Type::INT16, arrow::Int16Array>>(std::move(builder));
- }
- case INT4OID: {
- return std::make_unique<TPgTopLevelFixedConverter<i32, arrow::Type::INT32, arrow::Int32Array>>(std::move(builder));
- }
- case INT8OID: {
- return std::make_unique<TPgTopLevelFixedConverter<i64, arrow::Type::INT64, arrow::Int64Array>>(std::move(builder));
- }
- case FLOAT4OID: {
- return std::make_unique<TPgTopLevelFixedConverter<float, arrow::Type::DOUBLE, arrow::DoubleArray>>(std::move(builder));
- }
- case FLOAT8OID: {
- return std::make_unique<TPgTopLevelFixedConverter<double, arrow::Type::DOUBLE, arrow::DoubleArray>>(std::move(builder));
- }
- case BYTEAOID:
- case VARCHAROID:
- case TEXTOID:
- case NAMEOID:
- case CSTRINGOID: {
- auto typeLen = NPg::LookupType(targetType->GetTypeId()).TypeLen;
- if (typeLen == -2) {
- return std::make_unique<TPgTopLevelStringConverter<true, false>>(std::move(builder), typeLen);
- } else if (typeLen == -1) {
- return std::make_unique<TPgTopLevelStringConverter<false, false>>(std::move(builder), typeLen);
- } else {
- return std::make_unique<TPgTopLevelStringConverter<false, true>>(std::move(builder), typeLen);
- }
- }
- default:
- return std::make_unique<TPgTopLevelOtherConverter>(std::move(builder), targetType->GetTypeId());
- }
- }
- std::unique_ptr<IYsonComplexTypeReader> BuildPgYsonColumnReader(const NUdf::TPgTypeDescription& desc) {
- switch (desc.TypeId) {
- case BOOLOID: {
- return std::make_unique<TPgYsonFixedConverter<bool>>();
- }
- case INT2OID: {
- return std::make_unique<TPgYsonFixedConverter<i16>>();
- }
- case INT4OID: {
- return std::make_unique<TPgYsonFixedConverter<i32>>();
- }
- case INT8OID: {
- return std::make_unique<TPgYsonFixedConverter<i64>>();
- }
- case FLOAT4OID: {
- return std::make_unique<TPgYsonFixedConverter<float>>();
- }
- case FLOAT8OID: {
- return std::make_unique<TPgYsonFixedConverter<double>>();
- }
- case BYTEAOID:
- case NAMEOID:
- case VARCHAROID:
- case TEXTOID:
- case CSTRINGOID: {
- auto typeLen = NPg::LookupType(desc.TypeId).TypeLen;
- if (typeLen == -2) {
- return std::make_unique<TPgYsonStringConverter<true, false>>(typeLen);
- } else if (typeLen == -1) {
- return std::make_unique<TPgYsonStringConverter<false, false>>(typeLen);
- } else {
- return std::make_unique<TPgYsonStringConverter<false, true>>(typeLen);
- }
- }
- default:
- return std::make_unique<TPgYsonOtherConverter>(desc.TypeId);
- }
- }
- }
|