arrow.cpp 34 KB


#include "pg_compat.h"
#include "arrow.h"
#include "arrow_impl.h"
#include <yql/essentials/minikql/defs.h>
#include <yql/essentials/parser/pg_wrapper/interface/arrow.h>
#include <yql/essentials/parser/pg_wrapper/interface/utils.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/types/dynumber/dynumber.h>
#include <yql/essentials/public/decimal/yql_decimal.h>
#include <util/generic/singleton.h>
#include <arrow/compute/cast.h>
#include <arrow/array.h>
#include <arrow/array/builder_binary.h>
#include <arrow/util/bitmap_ops.h>
#include <util/system/mutex.h>
extern "C" {
#include "utils/date.h"
#include "utils/timestamp.h"
#include "utils/fmgrprotos.h"
}
  21. namespace NYql {
  22. extern "C" {
  23. Y_PRAGMA_DIAGNOSTIC_PUSH
  24. Y_PRAGMA("GCC diagnostic ignored \"-Wreturn-type-c-linkage\"")
  25. #include "pg_kernels_fwd.inc"
  26. Y_PRAGMA_DIAGNOSTIC_POP
  27. }
  28. struct TExecs {
  29. static TExecs& Instance() {
  30. return *Singleton<TExecs>();
  31. }
  32. TExecs();
  33. THashMap<Oid, TExecFunc> Table;
  34. };
  35. TExecFunc FindExec(Oid oid) {
  36. const auto& table = TExecs::Instance().Table;
  37. auto it = table.find(oid);
  38. if (it == table.end()) {
  39. return nullptr;
  40. }
  41. return it->second;
  42. }
  43. bool HasPgKernel(ui32 procOid) {
  44. return FindExec(procOid) != nullptr;
  45. }
  46. TExecs::TExecs()
  47. {
  48. #define RegisterExec(oid, func) Table[oid] = func
  49. #include "pg_kernels_register.all.inc"
  50. #undef RegisterExec
  51. }
  52. const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMiniKQL::TTupleType* tupleType,
  53. const std::vector<ui32>& argsColumns, NKikimr::NMiniKQL::TType* returnType, ui32 hint) {
  54. using namespace NKikimr::NMiniKQL;
  55. if (returnType) {
  56. MKQL_ENSURE(argsColumns.size() == 1, "Expected one column");
  57. TType* stateType = AS_TYPE(TBlockType, tupleType->GetElementType(argsColumns[0]))->GetItemType();
  58. TType* returnItemType = AS_TYPE(TBlockType, returnType)->GetItemType();
  59. return NPg::LookupAggregation(name + "#" + ToString(hint), AS_TYPE(TPgType, stateType)->GetTypeId(), AS_TYPE(TPgType, returnItemType)->GetTypeId());
  60. } else {
  61. TVector<ui32> argTypeIds;
  62. for (const auto col : argsColumns) {
  63. argTypeIds.push_back(AS_TYPE(TPgType, AS_TYPE(TBlockType, tupleType->GetElementType(col))->GetItemType())->GetTypeId());
  64. }
  65. return NPg::LookupAggregation(name, argTypeIds);
  66. }
  67. }
  68. std::shared_ptr<arrow::Array> PgConvertBool(const std::shared_ptr<arrow::Array>& value) {
  69. const auto& data = value->data();
  70. size_t length = data->length;
  71. NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
  72. auto input = data->GetValues<ui8>(1, 0);
  73. builder.UnsafeReserve(length);
  74. auto output = builder.MutableData();
  75. for (size_t i = 0; i < length; ++i) {
  76. auto fullIndex = i + data->offset;
  77. output[i] = BoolGetDatum(arrow::BitUtil::GetBit(input, fullIndex));
  78. }
  79. auto dataBuffer = builder.Build(true).array()->buffers[1];
  80. return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { data->buffers[0], dataBuffer }));
  81. }
  82. template <typename T, typename F>
  83. std::shared_ptr<arrow::Array> PgConvertFixed(const std::shared_ptr<arrow::Array>& value, const F& f) {
  84. const auto& data = value->data();
  85. size_t length = data->length;
  86. NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
  87. auto input = data->GetValues<T>(1);
  88. builder.UnsafeReserve(length);
  89. auto output = builder.MutableData();
  90. for (size_t i = 0; i < length; ++i) {
  91. output[i] = f(input[i]);
  92. }
  93. auto dataBuffer = builder.Build(true).array()->buffers[1];
  94. return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { data->buffers[0], dataBuffer }));
  95. }
  96. template <bool IsCString>
  97. std::shared_ptr<arrow::Array> PgConvertString(const std::shared_ptr<arrow::Array>& value) {
  98. const auto& data = value->data();
  99. size_t length = data->length;
  100. arrow::BinaryBuilder builder;
  101. ARROW_OK(builder.Reserve(length));
  102. auto inputDataSize = arrow::BinaryArray(data).total_values_length();
  103. ARROW_OK(builder.ReserveData(inputDataSize + length * (sizeof(void*) + (IsCString ? 1 : VARHDRSZ))));
  104. NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
  105. std::vector<char> tmp;
  106. for (size_t i = 0; i < length; ++i) {
  107. auto item = reader.GetItem(*data, i);
  108. if (!item) {
  109. ARROW_OK(builder.AppendNull());
  110. continue;
  111. }
  112. auto originalLen = item.AsStringRef().Size();
  113. ui32 len;
  114. if constexpr (IsCString) {
  115. len = sizeof(void*) + 1 + originalLen;
  116. } else {
  117. len = sizeof(void*) + VARHDRSZ + originalLen;
  118. }
  119. if (Y_UNLIKELY(len < originalLen)) {
  120. ythrow yexception() << "Too long string";
  121. }
  122. if (tmp.capacity() < len) {
  123. tmp.reserve(Max<ui64>(len, tmp.capacity() * 2));
  124. }
  125. tmp.resize(len);
  126. NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*));
  127. if constexpr (IsCString) {
  128. memcpy(tmp.data() + sizeof(void*), item.AsStringRef().Data(), originalLen);
  129. tmp[len - 1] = 0;
  130. } else {
  131. memcpy(tmp.data() + sizeof(void*) + VARHDRSZ, item.AsStringRef().Data(), originalLen);
  132. UpdateCleanVarSize((text*)(tmp.data() + sizeof(void*)), originalLen);
  133. }
  134. ARROW_OK(builder.Append(tmp.data(), len));
  135. }
  136. std::shared_ptr<arrow::BinaryArray> ret;
  137. ARROW_OK(builder.Finish(&ret));
  138. return ret;
  139. }
  140. Numeric Uint64ToPgNumeric(ui64 value) {
  141. if (value <= (ui64)Max<i64>()) {
  142. return int64_to_numeric((i64)value);
  143. }
  144. auto ret1 = int64_to_numeric((i64)(value & ~(1ull << 63)));
  145. auto bit = int64_to_numeric(Min<i64>());
  146. bool haveError = false;
  147. auto ret2 = numeric_sub_opt_error(ret1, bit, &haveError);
  148. Y_ENSURE(!haveError);
  149. pfree(ret1);
  150. pfree(bit);
  151. return ret2;
  152. }
  153. Numeric DecimalToPgNumeric(const NUdf::TUnboxedValuePod& value, ui8 precision, ui8 scale) {
  154. const auto str = NYql::NDecimal::ToString(value.GetInt128(), precision, scale);
  155. Y_ENSURE(str);
  156. return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID,
  157. PointerGetDatum(str), Int32GetDatum(0), Int32GetDatum(-1));
  158. }
  159. Numeric DyNumberToPgNumeric(const NUdf::TUnboxedValuePod& value) {
  160. auto str = NKikimr::NDyNumber::DyNumberToString(value.AsStringRef());
  161. Y_ENSURE(str);
  162. return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID,
  163. PointerGetDatum(str->c_str()), Int32GetDatum(0), Int32GetDatum(-1));
  164. }
  165. Numeric PgFloatToNumeric(double item, ui64 scale, int digits) {
  166. double intPart, fracPart;
  167. bool error;
  168. fracPart = modf(item, &intPart);
  169. i64 fracInt = round(fracPart * scale);
  170. // scale compaction: represent 711.56000 as 711.56
  171. while (digits > 0 && fracInt % 10 == 0) {
  172. fracInt /= 10;
  173. digits -= 1;
  174. }
  175. if (digits == 0) {
  176. return int64_to_numeric(intPart);
  177. } else {
  178. return numeric_add_opt_error(
  179. int64_to_numeric(intPart),
  180. int64_div_fast_to_numeric(fracInt, digits),
  181. &error);
  182. }
  183. }
  184. std::shared_ptr<arrow::Array> PgDecimal128ConvertNumeric(const std::shared_ptr<arrow::Array>& value, int32_t precision, int32_t scale) {
  185. TArenaMemoryContext arena;
  186. const auto& data = value->data();
  187. size_t length = data->length;
  188. arrow::BinaryBuilder builder;
  189. bool error;
  190. Numeric high_bits_mul = numeric_mul_opt_error(int64_to_numeric(int64_t(1) << 62), int64_to_numeric(4), &error);
  191. auto input = data->GetValues<arrow::Decimal128>(1);
  192. for (size_t i = 0; i < length; ++i) {
  193. if (value->IsNull(i)) {
  194. ARROW_OK(builder.AppendNull());
  195. continue;
  196. }
  197. Numeric v = PgDecimal128ToNumeric(input[i], precision, scale, high_bits_mul);
  198. auto datum = NumericGetDatum(v);
  199. auto ptr = (char*)datum;
  200. auto len = GetFullVarSize((const text*)datum);
  201. NUdf::ZeroMemoryContext(ptr);
  202. ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*)));
  203. }
  204. std::shared_ptr<arrow::BinaryArray> ret;
  205. ARROW_OK(builder.Finish(&ret));
  206. return ret;
  207. }
  208. Numeric PgDecimal128ToNumeric(arrow::Decimal128 value, int32_t precision, int32_t scale, Numeric high_bits_mul) {
  209. uint64_t low_bits = value.low_bits();
  210. int64 high_bits = value.high_bits();
  211. if (low_bits > INT64_MAX){
  212. high_bits += 1;
  213. }
  214. bool error;
  215. Numeric low_bits_res = int64_div_fast_to_numeric(low_bits, scale);
  216. Numeric high_bits_res = numeric_mul_opt_error(int64_div_fast_to_numeric(high_bits, scale), high_bits_mul, &error);
  217. MKQL_ENSURE(error == false, "Bad numeric multiplication.");
  218. Numeric res = numeric_add_opt_error(high_bits_res, low_bits_res, &error);
  219. MKQL_ENSURE(error == false, "Bad numeric addition.");
  220. return res;
  221. }
  222. TColumnConverter BuildPgNumericColumnConverter(const std::shared_ptr<arrow::DataType>& originalType) {
  223. switch (originalType->id()) {
  224. case arrow::Type::INT16:
  225. return [](const std::shared_ptr<arrow::Array>& value) {
  226. return PgConvertNumeric<i16>(value);
  227. };
  228. case arrow::Type::INT32:
  229. return [](const std::shared_ptr<arrow::Array>& value) {
  230. return PgConvertNumeric<i32>(value);
  231. };
  232. case arrow::Type::INT64:
  233. return [](const std::shared_ptr<arrow::Array>& value) {
  234. return PgConvertNumeric<i64>(value);
  235. };
  236. case arrow::Type::FLOAT:
  237. return [](const std::shared_ptr<arrow::Array>& value) {
  238. return PgConvertNumeric<float>(value);
  239. };
  240. case arrow::Type::DOUBLE:
  241. return [](const std::shared_ptr<arrow::Array>& value) {
  242. return PgConvertNumeric<double>(value);
  243. };
  244. case arrow::Type::DECIMAL128: {
  245. auto decimal128Ptr = std::static_pointer_cast<arrow::Decimal128Type>(originalType);
  246. int32_t precision = decimal128Ptr->precision();
  247. int32_t scale = decimal128Ptr->scale();
  248. return [precision, scale](const std::shared_ptr<arrow::Array>& value) {
  249. return PgDecimal128ConvertNumeric(value, precision, scale);
  250. };
  251. }
  252. default:
  253. return {};
  254. }
  255. }
  256. template <typename T, typename F>
  257. TColumnConverter BuildPgFixedColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, const F& f) {
  258. auto primaryType = NKikimr::NMiniKQL::GetPrimitiveDataType<T>();
  259. if (!originalType->Equals(*primaryType) && !arrow::compute::CanCast(*originalType, *primaryType)) {
  260. return {};
  261. }
  262. return [primaryType, originalType, f](const std::shared_ptr<arrow::Array>& value) {
  263. auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
  264. return PgConvertFixed<T, F>(res, f);
  265. };
  266. }
  267. Datum MakePgDateFromUint16(ui16 value) {
  268. return DatumGetDateADT(UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE + value);
  269. }
  270. Datum MakePgTimestampFromInt64(i64 value) {
  271. return DatumGetTimestamp(USECS_PER_SEC * ((UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE) * SECS_PER_DAY + value));
  272. }
  273. TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType) {
  274. switch (targetType->GetTypeId()) {
  275. case BOOLOID: {
  276. auto primaryType = arrow::boolean();
  277. if (!originalType->Equals(*primaryType) && !arrow::compute::CanCast(*originalType, *primaryType)) {
  278. return {};
  279. }
  280. return [primaryType, originalType](const std::shared_ptr<arrow::Array>& value) {
  281. auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
  282. return PgConvertBool(res);
  283. };
  284. }
  285. case INT2OID: {
  286. return BuildPgFixedColumnConverter<i16>(originalType, [](auto value){ return Int16GetDatum(value); });
  287. }
  288. case INT4OID: {
  289. return BuildPgFixedColumnConverter<i32>(originalType, [](auto value){ return Int32GetDatum(value); });
  290. }
  291. case INT8OID: {
  292. return BuildPgFixedColumnConverter<i64>(originalType, [](auto value){ return Int64GetDatum(value); });
  293. }
  294. case FLOAT4OID: {
  295. return BuildPgFixedColumnConverter<float>(originalType, [](auto value){ return Float4GetDatum(value); });
  296. }
  297. case FLOAT8OID: {
  298. return BuildPgFixedColumnConverter<double>(originalType, [](auto value){ return Float8GetDatum(value); });
  299. }
  300. case NUMERICOID: {
  301. return BuildPgNumericColumnConverter(originalType);
  302. }
  303. case BYTEAOID:
  304. case VARCHAROID:
  305. case TEXTOID:
  306. case CSTRINGOID: {
  307. auto primaryType = (targetType->GetTypeId() == BYTEAOID) ? arrow::binary() : arrow::utf8();
  308. if (!arrow::compute::CanCast(*originalType, *primaryType)) {
  309. return {};
  310. }
  311. return [primaryType, originalType, isCString = NPg::LookupType(targetType->GetTypeId()).TypeLen == -2](const std::shared_ptr<arrow::Array>& value) {
  312. auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
  313. if (isCString) {
  314. return PgConvertString<true>(res);
  315. } else {
  316. return PgConvertString<false>(res);
  317. }
  318. };
  319. }
  320. case DATEOID: {
  321. if (originalType->Equals(arrow::uint16())) {
  322. return [](const std::shared_ptr<arrow::Array>& value) {
  323. return PgConvertFixed<ui16>(value, [](auto value){ return MakePgDateFromUint16(value); });
  324. };
  325. } else if (originalType->Equals(arrow::date32())) {
  326. return [](const std::shared_ptr<arrow::Array>& value) {
  327. return PgConvertFixed<i32>(value, [](auto value){ return MakePgDateFromUint16(value); });
  328. };
  329. } else {
  330. return {};
  331. }
  332. }
  333. case TIMESTAMPOID: {
  334. if (originalType->Equals(arrow::int64())) {
  335. return [](const std::shared_ptr<arrow::Array>& value) {
  336. return PgConvertFixed<i64>(value, [](auto value){ return MakePgTimestampFromInt64(value); });
  337. };
  338. } else {
  339. return {};
  340. }
  341. }
  342. }
  343. return {};
  344. }
  345. class IYsonBlockReaderForPg : public IYsonComplexTypeReader {
  346. public:
  347. virtual NUdf::TBlockItem GetNotNull(TYsonBuffer&) = 0;
  348. NUdf::TBlockItem GetNullableItem(TYsonBuffer& buf) {
  349. char prev = buf.Current();
  350. if (prev == NYson::NDetail::EntitySymbol) {
  351. buf.Next();
  352. return NUdf::TBlockItem();
  353. }
  354. if (prev == NYson::NDetail::BeginListSymbol) {
  355. buf.Next();
  356. YQL_ENSURE(buf.Current() == NYson::NDetail::EndListSymbol);
  357. buf.Next();
  358. return NUdf::TBlockItem();
  359. }
  360. return GetNotNull(buf);
  361. }
  362. };
  363. NUdf::TBlockItem BlockItemFromDatum(Datum datum, const NPg::TTypeDesc& desc, std::vector<char>& tmp) {
  364. if (desc.PassByValue) {
  365. return NUdf::TBlockItem((ui64)datum);
  366. }
  367. auto typeLen = desc.TypeLen;
  368. ui32 len;
  369. if (typeLen == -1) {
  370. len = GetFullVarSize((const text*)datum);
  371. } else if (typeLen == -2) {
  372. len = 1 + strlen((const char*)datum);
  373. } else {
  374. len = typeLen;
  375. }
  376. auto objlen = len;
  377. len += sizeof(void*);
  378. len = AlignUp<i32>(len, 8);
  379. tmp.resize(len);
  380. *(uint64_t*)tmp.data() = 0;
  381. memcpy(tmp.data() + sizeof(void*), (const char*) datum, objlen);
  382. return NUdf::TBlockItem(std::string_view(tmp.data(), len));
  383. }
// Decodes one value of PG type `pgTypeId` from its PG binary representation by calling the
// type's receive function, then packs the resulting Datum with BlockItemFromDatum, using `tmp`
// as backing storage for non-pass-by-value types.
// Calls UdfTerminate if the receive function does not consume all of `binary`.
NUdf::TBlockItem PgBlockItemFromNativeBinary(const TStringBuf binary, ui32 pgTypeId, std::vector<char>& tmp) {
    // NOTE(review): presumably scopes PG allocations made while decoding; the Datum is copied
    // into `tmp` by BlockItemFromDatum before this scope ends.
    NKikimr::NMiniKQL::TPAllocScope call;
    // Expose the input bytes as a StringInfo read buffer; `cursor` records how much was consumed.
    // The const is cast away only to fit the StringInfo field type.
    StringInfoData stringInfo;
    stringInfo.data = (char*)binary.Data();
    stringInfo.len = binary.Size();
    stringInfo.maxlen = binary.Size();
    stringInfo.cursor = 0;
    const auto& typeInfo = NPg::LookupType(pgTypeId);
    auto typeIOParam = MakeTypeIOParam(typeInfo);
    auto receiveFuncId = typeInfo.ReceiveFuncId;
    // NOTE(review): TypeId == ArrayTypeId appears to identify an array type; such values are
    // decoded with the generic array_recv — confirm against NPg::TTypeDesc.
    if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
        receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
    }
    {
        // Resolve the receive function and check it is a non-set-returning function taking
        // between 1 and 3 arguments.
        FmgrInfo finfo;
        Zero(finfo);
        Y_ENSURE(receiveFuncId);
        fmgr_info(receiveFuncId, &finfo);
        Y_ENSURE(!finfo.fn_retset);
        Y_ENSURE(finfo.fn_addr);
        Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
        // Always pass three arguments: the buffer, the type I/O parameter and typmod -1.
        LOCAL_FCINFO(callInfo, 3);
        Zero(*callInfo);
        callInfo->flinfo = &finfo;
        callInfo->nargs = 3;
        callInfo->fncollation = DEFAULT_COLLATION_OID;
        callInfo->isnull = false;
        callInfo->args[0] = { (Datum)&stringInfo, false };
        callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
        callInfo->args[2] = { Int32GetDatum(-1), false };
        auto x = finfo.fn_addr(callInfo);
        Y_ENSURE(!callInfo->isnull);
        // Leftover input bytes are treated as a fatal error.
        if (stringInfo.cursor != stringInfo.len) {
            TStringBuilder errMsg;
            errMsg << "Not all data has been consumed by 'recv' function: " << NPg::LookupProc(receiveFuncId).Name << ", data size: " << stringInfo.len << ", consumed size: " << stringInfo.cursor;
            UdfTerminate(errMsg.c_str());
        }
        return BlockItemFromDatum(x, typeInfo, tmp);
    }
}
  424. template<typename T>
  425. constexpr Datum FixedToDatum(T v) {
  426. if constexpr (std::is_same_v<T, bool>) {
  427. return BoolGetDatum(v);
  428. } else if constexpr (std::is_same_v<T, i16>) {
  429. return Int16GetDatum(v);
  430. } else if constexpr (std::is_same_v<T, i32>) {
  431. return Int32GetDatum(v);
  432. } else if constexpr (std::is_same_v<T, i64>) {
  433. return Int64GetDatum(v);
  434. } else if constexpr (std::is_same_v<T, float>) {
  435. return Float4GetDatum(v);
  436. } else if constexpr (std::is_same_v<T, double>) {
  437. return Float8GetDatum(v);
  438. }
  439. }
  440. template<typename T>
  441. class TPgYsonFixedConverter final : public IYsonBlockReaderForPg {
  442. public:
  443. NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
  444. return this->GetNullableItem(buf);
  445. }
  446. NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
  447. Datum val;
  448. if constexpr (std::is_same_v<T, bool>) {
  449. Y_ENSURE(buf.Current() == NYson::NDetail::FalseMarker || buf.Current() == NYson::NDetail::TrueMarker);
  450. val = FixedToDatum<T>(buf.Current() == NYson::NDetail::TrueMarker);
  451. buf.Next();
  452. } else if constexpr (std::is_integral_v<T>) {
  453. if constexpr (std::is_signed_v<T>) {
  454. Y_ENSURE(buf.Current() == NYson::NDetail::Int64Marker);
  455. buf.Next();
  456. val = FixedToDatum<T>(buf.ReadVarI64());
  457. } else {
  458. Y_ENSURE(buf.Current() == NYson::NDetail::Uint64Marker);
  459. buf.Next();
  460. val = FixedToDatum<T>(buf.ReadVarUI64());
  461. }
  462. } else {
  463. Y_ENSURE(buf.Current() == NYson::NDetail::DoubleMarker);
  464. buf.Next();
  465. val = FixedToDatum<T>(buf.NextDouble());
  466. }
  467. return NUdf::TBlockItem(val);
  468. }
  469. };
  470. template<bool IsCString, bool FixedLength>
  471. class TPgYsonStringConverter final : public IYsonBlockReaderForPg {
  472. public:
  473. TPgYsonStringConverter(i32 typeLen) : TypeLen_(typeLen) {
  474. if (typeLen == -2) {
  475. YQL_ENSURE(IsCString && !FixedLength);
  476. } else if (typeLen == -1) {
  477. YQL_ENSURE(!IsCString && !FixedLength);
  478. } else {
  479. YQL_ENSURE(typeLen >= 0 && FixedLength);
  480. }
  481. }
  482. NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
  483. return this->GetNullableItem(buf);
  484. }
  485. NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
  486. Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker);
  487. buf.Next();
  488. const i32 originalLen = buf.ReadVarI32();
  489. auto res = buf.Data();
  490. buf.Skip(originalLen);
  491. ui32 len;
  492. if constexpr (IsCString) {
  493. len = 1 + originalLen + sizeof(void*);
  494. } else if constexpr (FixedLength) {
  495. len = TypeLen_ + sizeof(void*);
  496. } else {
  497. len = VARHDRSZ + originalLen + sizeof(void*);
  498. }
  499. if (Tmp_.capacity() < len) {
  500. Tmp_.reserve(Max<ui64>(len, Tmp_.capacity() * 2));
  501. }
  502. len = AlignUp<ui32>(len, 8);
  503. Tmp_.resize(len);
  504. if constexpr (IsCString) {
  505. memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
  506. } else if constexpr (FixedLength) {
  507. memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
  508. } else {
  509. memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen);
  510. UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen);
  511. }
  512. return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len));
  513. }
  514. private:
  515. std::vector<char> Tmp_;
  516. i32 TypeLen_;
  517. };
  518. class TPgYsonOtherConverter : public IYsonBlockReaderForPg {
  519. public:
  520. TPgYsonOtherConverter(Oid typeId) : TypeId_(typeId) {}
  521. NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
  522. return this->GetNullableItem(buf);
  523. }
  524. NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
  525. if (buf.Current() != NYson::NDetail::StringMarker) {
  526. Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker);
  527. }
  528. buf.Next();
  529. const i32 len = buf.ReadVarI32();
  530. auto ptr = buf.Data();
  531. buf.Skip(len);
  532. return PgBlockItemFromNativeBinary(TStringBuf(ptr, len), TypeId_, Tmp_);
  533. }
  534. private:
  535. Oid TypeId_;
  536. std::vector<char> Tmp_;
  537. };
  538. template<typename T, arrow::Type::type Expected, typename ArrType>
  539. class TPgTopLevelFixedConverter : public IYtColumnConverter {
  540. public:
  541. using Fn = Datum(*)(const T&);
  542. TPgTopLevelFixedConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder) : Builder_(std::move(builder)) {}
  543. arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
  544. if (arrow::Type::DICTIONARY == data->type->id()) {
  545. auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
  546. Y_ENSURE(Expected == valType->id());
  547. return ConvertDict(data);
  548. } else {
  549. Y_ENSURE(Expected == data->type->id());
  550. return ConvertNonDict(data);
  551. }
  552. }
  553. arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
  554. ArrType arr(data);
  555. if (arr.null_count()) {
  556. for (i64 i = 0; i < data->length; ++i) {
  557. if (arr.IsNull(i)) {
  558. Builder_->Add(NUdf::TBlockItem{});
  559. } else {
  560. Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(arr.Value(i))));
  561. }
  562. }
  563. } else {
  564. for (i64 i = 0; i < data->length; ++i) {
  565. Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(arr.Value(i))));
  566. }
  567. }
  568. return Builder_->Build(false);
  569. }
  570. arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
  571. arrow::DictionaryArray dict(data);
  572. auto values = dict.dictionary()->data()->GetValues<T>(1);
  573. auto indices = dict.indices()->data()->GetValues<ui32>(1);
  574. if (dict.null_count()) {
  575. for (i64 i = 0; i < data->length; ++i) {
  576. if (dict.IsNull(i)) {
  577. Builder_->Add(NUdf::TBlockItem{});
  578. } else {
  579. Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(values[indices[i]])));
  580. }
  581. }
  582. } else {
  583. for (i64 i = 0; i < data->length; ++i) {
  584. Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(values[indices[i]])));
  585. }
  586. }
  587. return Builder_->Build(false);
  588. }
  589. private:
  590. std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
  591. };
  592. template<bool IsCString, bool FixedLength>
  593. class TPgTopLevelStringConverter : public IYtColumnConverter {
  594. public:
  595. TPgTopLevelStringConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, i32 typeLen) : Builder_(std::move(builder)), TypeLen_(typeLen) {
  596. if (typeLen == -2) {
  597. YQL_ENSURE(IsCString && !FixedLength);
  598. } else if (typeLen == -1) {
  599. YQL_ENSURE(!IsCString && !FixedLength);
  600. } else {
  601. YQL_ENSURE(typeLen >= 0 && FixedLength);
  602. }
  603. }
  604. constexpr NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t originalLen) {
  605. ui32 len;
  606. if constexpr (IsCString) {
  607. len = 1 + originalLen + sizeof(void*);
  608. } else if constexpr (FixedLength) {
  609. len = TypeLen_ + sizeof(void*);
  610. } else {
  611. len = VARHDRSZ + originalLen + sizeof(void*);
  612. }
  613. if (Tmp_.capacity() < len) {
  614. Tmp_.reserve(Max<ui64>(len, Tmp_.capacity() * 2));
  615. }
  616. len = AlignUp<ui32>(len, 8);
  617. Tmp_.resize(len);
  618. if constexpr (IsCString) {
  619. memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
  620. } else if constexpr (FixedLength) {
  621. memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
  622. } else {
  623. memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen);
  624. UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen);
  625. }
  626. return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len));
  627. }
  628. arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
  629. if (arrow::Type::DICTIONARY == data->type->id()) {
  630. auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
  631. Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id());
  632. return ConvertDict(data);
  633. } else {
  634. if (arrow::Type::STRING == data->type->id()) {
  635. auto res = arrow::compute::Cast(data, std::make_shared<arrow::BinaryType>());
  636. Y_ENSURE(res.ok());
  637. data = res->array();
  638. }
  639. Y_ENSURE(arrow::Type::BINARY == data->type->id());
  640. return ConvertNonDict(data);
  641. }
  642. }
  643. arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
  644. arrow::BinaryArray arr(data);
  645. if (arr.null_count()) {
  646. for (i64 i = 0; i < data->length; ++i) {
  647. if (arr.IsNull(i)) {
  648. Builder_->Add(NUdf::TBlockItem{});
  649. } else {
  650. i32 len;
  651. auto res = arr.GetValue(i, &len);
  652. Builder_->Add(ConvertOnce(res, len));
  653. }
  654. }
  655. } else {
  656. for (i64 i = 0; i < data->length; ++i) {
  657. i32 len;
  658. auto res = arr.GetValue(i, &len);
  659. Builder_->Add(ConvertOnce(res, len));
  660. }
  661. }
  662. return Builder_->Build(false);
  663. }
  664. arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
  665. arrow::DictionaryArray dict(data);
  666. if (arrow::Type::STRING == data->dictionary->type->id()) {
  667. auto res = arrow::compute::Cast(data->dictionary, std::make_shared<arrow::BinaryType>());
  668. Y_ENSURE(res.ok());
  669. data->dictionary = res->array();
  670. }
  671. arrow::BinaryArray arr(data->dictionary);
  672. auto indices = dict.indices()->data()->GetValues<ui32>(1);
  673. if (dict.null_count()) {
  674. for (i64 i = 0; i < data->length; ++i) {
  675. if (dict.IsNull(i)) {
  676. Builder_->Add(NUdf::TBlockItem{});
  677. } else {
  678. i32 len;
  679. auto res = arr.GetValue(indices[i], &len);
  680. Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
  681. }
  682. }
  683. } else {
  684. for (i64 i = 0; i < data->length; ++i) {
  685. i32 len;
  686. auto res = arr.GetValue(indices[i], &len);
  687. Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
  688. }
  689. }
  690. return Builder_->Build(false);
  691. }
  692. private:
  693. std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
  694. std::vector<char> Tmp_;
  695. i32 TypeLen_;
  696. };
  697. class TPgTopLevelOtherConverter : public IYtColumnConverter {
  698. public:
  699. TPgTopLevelOtherConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, Oid typeId) : Builder_(std::move(builder)), TypeId_(typeId) {}
  700. inline NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t len) {
  701. return PgBlockItemFromNativeBinary(TStringBuf(reinterpret_cast<const char*>(res), len), TypeId_, Tmp_);
  702. }
  703. arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
  704. if (arrow::Type::DICTIONARY == data->type->id()) {
  705. auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
  706. Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id());
  707. return ConvertDict(data);
  708. } else {
  709. Y_ENSURE(arrow::Type::BINARY == data->type->id() || arrow::Type::STRING == data->type->id());
  710. return ConvertNonDict(data);
  711. }
  712. }
  713. arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
  714. arrow::BinaryArray arr(data);
  715. if (arr.null_count()) {
  716. for (i64 i = 0; i < data->length; ++i) {
  717. if (arr.IsNull(i)) {
  718. Builder_->Add(NUdf::TBlockItem{});
  719. } else {
  720. i32 len;
  721. auto res = arr.GetValue(i, &len);
  722. Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
  723. }
  724. }
  725. } else {
  726. for (i64 i = 0; i < data->length; ++i) {
  727. i32 len;
  728. auto res = arr.GetValue(i, &len);
  729. Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
  730. }
  731. }
  732. return Builder_->Build(false);
  733. }
  734. arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
  735. arrow::DictionaryArray dict(data);
  736. if (arrow::Type::STRING == data->dictionary->type->id()) {
  737. auto res = arrow::compute::Cast(data->dictionary, std::make_shared<arrow::BinaryType>());
  738. Y_ENSURE(res.ok());
  739. data->dictionary = res->array();
  740. }
  741. arrow::BinaryArray arr(data->dictionary);
  742. auto indices = dict.indices()->data()->GetValues<ui32>(1);
  743. if (dict.null_count()) {
  744. for (i64 i = 0; i < data->length; ++i) {
  745. if (dict.IsNull(i)) {
  746. Builder_->Add(NUdf::TBlockItem{});
  747. } else {
  748. i32 len;
  749. auto res = arr.GetValue(indices[i], &len);
  750. Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
  751. }
  752. }
  753. } else {
  754. for (i64 i = 0; i < data->length; ++i) {
  755. i32 len;
  756. auto res = arr.GetValue(indices[i], &len);
  757. Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
  758. }
  759. }
  760. return Builder_->Build(false);
  761. }
  762. private:
  763. std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
  764. Oid TypeId_;
  765. std::vector<char> Tmp_;
  766. };
  767. std::unique_ptr<IYtColumnConverter> BuildPgTopLevelColumnReader(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, const NKikimr::NMiniKQL::TPgType* targetType) {
  768. YQL_ENSURE(targetType);
  769. switch (targetType->GetTypeId()) {
  770. case BOOLOID: {
  771. return std::make_unique<TPgTopLevelFixedConverter<bool, arrow::Type::BOOL, arrow::BooleanArray>>(std::move(builder));
  772. }
  773. case INT2OID: {
  774. return std::make_unique<TPgTopLevelFixedConverter<i16, arrow::Type::INT16, arrow::Int16Array>>(std::move(builder));
  775. }
  776. case INT4OID: {
  777. return std::make_unique<TPgTopLevelFixedConverter<i32, arrow::Type::INT32, arrow::Int32Array>>(std::move(builder));
  778. }
  779. case INT8OID: {
  780. return std::make_unique<TPgTopLevelFixedConverter<i64, arrow::Type::INT64, arrow::Int64Array>>(std::move(builder));
  781. }
  782. case FLOAT4OID: {
  783. return std::make_unique<TPgTopLevelFixedConverter<float, arrow::Type::DOUBLE, arrow::DoubleArray>>(std::move(builder));
  784. }
  785. case FLOAT8OID: {
  786. return std::make_unique<TPgTopLevelFixedConverter<double, arrow::Type::DOUBLE, arrow::DoubleArray>>(std::move(builder));
  787. }
  788. case BYTEAOID:
  789. case VARCHAROID:
  790. case TEXTOID:
  791. case NAMEOID:
  792. case CSTRINGOID: {
  793. auto typeLen = NPg::LookupType(targetType->GetTypeId()).TypeLen;
  794. if (typeLen == -2) {
  795. return std::make_unique<TPgTopLevelStringConverter<true, false>>(std::move(builder), typeLen);
  796. } else if (typeLen == -1) {
  797. return std::make_unique<TPgTopLevelStringConverter<false, false>>(std::move(builder), typeLen);
  798. } else {
  799. return std::make_unique<TPgTopLevelStringConverter<false, true>>(std::move(builder), typeLen);
  800. }
  801. }
  802. default:
  803. return std::make_unique<TPgTopLevelOtherConverter>(std::move(builder), targetType->GetTypeId());
  804. }
  805. }
  806. std::unique_ptr<IYsonComplexTypeReader> BuildPgYsonColumnReader(const NUdf::TPgTypeDescription& desc) {
  807. switch (desc.TypeId) {
  808. case BOOLOID: {
  809. return std::make_unique<TPgYsonFixedConverter<bool>>();
  810. }
  811. case INT2OID: {
  812. return std::make_unique<TPgYsonFixedConverter<i16>>();
  813. }
  814. case INT4OID: {
  815. return std::make_unique<TPgYsonFixedConverter<i32>>();
  816. }
  817. case INT8OID: {
  818. return std::make_unique<TPgYsonFixedConverter<i64>>();
  819. }
  820. case FLOAT4OID: {
  821. return std::make_unique<TPgYsonFixedConverter<float>>();
  822. }
  823. case FLOAT8OID: {
  824. return std::make_unique<TPgYsonFixedConverter<double>>();
  825. }
  826. case BYTEAOID:
  827. case NAMEOID:
  828. case VARCHAROID:
  829. case TEXTOID:
  830. case CSTRINGOID: {
  831. auto typeLen = NPg::LookupType(desc.TypeId).TypeLen;
  832. if (typeLen == -2) {
  833. return std::make_unique<TPgYsonStringConverter<true, false>>(typeLen);
  834. } else if (typeLen == -1) {
  835. return std::make_unique<TPgYsonStringConverter<false, false>>(typeLen);
  836. } else {
  837. return std::make_unique<TPgYsonStringConverter<false, true>>(typeLen);
  838. }
  839. }
  840. default:
  841. return std::make_unique<TPgYsonOtherConverter>(desc.TypeId);
  842. }
  843. }
  844. }