block_reader.h 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778
  1. #pragma once
  2. #include "block_item.h"
  3. #include "block_io_buffer.h"
  4. #include "util.h"
  5. #include <arrow/datum.h>
  6. #include <yql/essentials/public/udf/udf_type_inspection.h>
  7. #include <yql/essentials/public/udf/udf_value_builder.h>
  8. namespace NYql {
  9. namespace NUdf {
  // Reader of values stored in Arrow block form (arrow::ArrayData columns or
  // arrow::Scalar constants). Converts single values to TBlockItem, estimates
  // data weight, and serializes values into a TOutputBuffer.
  // Non-copyable; the implementations in this header are created by MakeBlockReader().
  // NOTE(review): public UDF interface - presumably ABI-sensitive, so do not
  // reorder or insert virtual methods without checking UDF ABI versioning.
  10. class IBlockReader : private TNonCopyable {
  11. public:
  12. virtual ~IBlockReader() = default;
  13. // The result references Array/Scalar internals and stays valid only until the next call to GetItem/GetScalarItem.
  // Returns the value at row `index` of `data`; the readers in this header return an empty item for null rows.
  14. virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0;
  // Same as GetItem, for a scalar (constant) block value.
  15. virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0;
  // Approximate data size, in bytes, of the whole array.
  16. virtual ui64 GetDataWeight(const arrow::ArrayData& data) const = 0;
  // Approximate data size, in bytes, of a single item.
  17. virtual ui64 GetDataWeight(TBlockItem item) const = 0;
  // Weight of a default value; the tuple readers also use it for null items.
  18. virtual ui64 GetDefaultValueWeight() const = 0;
  // Serializes row `index` of `data` into `out`; nullable readers first write a presence byte (0 = null, 1 = value follows).
  19. virtual void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const = 0;
  // Same as SaveItem, for a scalar block value.
  20. virtual void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const = 0;
  21. };
  // Serialization size properties of a block item type, accumulated by
  // UpdateBlockItemSerializeProps() over all components of the type.
  22. struct TBlockItemSerializeProps {
  23. TMaybe<ui32> MaxSize = 0; // maximum number of bytes a single block item can occupy in TOutputBuffer
  24. // (undefined when there is no upper bound, e.g. for variable-length data such as strings)
  25. bool IsFixed = true; // true if every block item takes the same number of bytes
  26. };
  27. template<typename T, bool Nullable, typename TDerived>
  28. class TFixedSizeBlockReaderBase : public IBlockReader {
  29. public:
  30. TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
  31. if constexpr (Nullable) {
  32. if (IsNull(data, index)) {
  33. return {};
  34. }
  35. }
  36. return static_cast<TDerived*>(this)->MakeBlockItem(data.GetValues<T>(1)[index]);
  37. }
  38. TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
  39. using namespace arrow::internal;
  40. if constexpr (Nullable) {
  41. if (!scalar.is_valid) {
  42. return {};
  43. }
  44. }
  45. if constexpr(std::is_same_v<T, NYql::NDecimal::TInt128>) {
  46. auto& fixedScalar = checked_cast<const arrow::FixedSizeBinaryScalar&>(scalar);
  47. T value; memcpy((void*)&value, fixedScalar.value->data(), sizeof(T));
  48. return static_cast<TDerived*>(this)->MakeBlockItem(value);
  49. } else {
  50. return static_cast<TDerived*>(this)->MakeBlockItem(
  51. *static_cast<const T*>(checked_cast<const PrimitiveScalarBase&>(scalar).data())
  52. );
  53. }
  54. }
  55. ui64 GetDataWeight(const arrow::ArrayData& data) const final {
  56. if constexpr (Nullable) {
  57. return (1 + sizeof(T)) * data.length;
  58. }
  59. return sizeof(T) * data.length;
  60. }
  61. ui64 GetDataWeight(TBlockItem item) const final {
  62. Y_UNUSED(item);
  63. return GetDefaultValueWeight();
  64. }
  65. ui64 GetDefaultValueWeight() const final {
  66. if constexpr (Nullable) {
  67. return 1 + sizeof(T);
  68. }
  69. return sizeof(T);
  70. }
  71. void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
  72. if constexpr (Nullable) {
  73. if (IsNull(data, index)) {
  74. return out.PushChar(0);
  75. }
  76. out.PushChar(1);
  77. }
  78. out.PushNumber(data.GetValues<T>(1)[index]);
  79. }
  80. void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
  81. if constexpr (Nullable) {
  82. if (!scalar.is_valid) {
  83. return out.PushChar(0);
  84. }
  85. out.PushChar(1);
  86. }
  87. if constexpr(std::is_same_v<T, NYql::NDecimal::TInt128>) {
  88. auto& fixedScalar = arrow::internal::checked_cast<const arrow::FixedSizeBinaryScalar&>(scalar);
  89. T value; memcpy((void*)&value, fixedScalar.value->data(), sizeof(T));
  90. out.PushNumber(value);
  91. } else {
  92. out.PushNumber(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
  93. }
  94. }
  95. };
  96. template<typename T, bool Nullable>
  97. class TFixedSizeBlockReader : public TFixedSizeBlockReaderBase<T, Nullable, TFixedSizeBlockReader<T, Nullable>> {
  98. public:
  99. TBlockItem MakeBlockItem(const T& item) const {
  100. return TBlockItem(item);
  101. }
  102. };
  103. template<bool Nullable>
  104. class TResourceBlockReader : public TFixedSizeBlockReaderBase<TUnboxedValuePod, Nullable, TResourceBlockReader<Nullable>> {
  105. public:
  106. TBlockItem MakeBlockItem(const TUnboxedValuePod& pod) const {
  107. TBlockItem item;
  108. std::memcpy(item.GetRawPtr(), pod.GetRawPtr(), sizeof(TBlockItem));
  109. return item;
  110. }
  111. };
  112. template<typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal = NKikimr::NUdf::EDataSlot::String>
  113. class TStringBlockReader final : public IBlockReader {
  114. public:
  115. using TOffset = typename TStringType::offset_type;
  116. TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
  117. Y_DEBUG_ABORT_UNLESS(data.buffers.size() == 3);
  118. if constexpr (Nullable) {
  119. if (IsNull(data, index)) {
  120. return {};
  121. }
  122. }
  123. const TOffset* offsets = data.GetValues<TOffset>(1);
  124. const char* strData = data.GetValues<char>(2, 0);
  125. std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]);
  126. return TBlockItem(str);
  127. }
  128. TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
  129. if constexpr (Nullable) {
  130. if (!scalar.is_valid) {
  131. return {};
  132. }
  133. }
  134. auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
  135. std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size());
  136. return TBlockItem(str);
  137. }
  138. ui64 GetDataWeight(const arrow::ArrayData& data) const final {
  139. ui64 size = 0;
  140. if constexpr (Nullable) {
  141. size += data.length;
  142. }
  143. size += data.buffers[2] ? data.buffers[2]->size() : 0;
  144. return size;
  145. }
  146. ui64 GetDataWeight(TBlockItem item) const final {
  147. if constexpr (Nullable) {
  148. return 1 + (item ? item.AsStringRef().Size() : 0);
  149. }
  150. return item.AsStringRef().Size();
  151. }
  152. ui64 GetDefaultValueWeight() const final {
  153. if constexpr (Nullable) {
  154. return 1;
  155. }
  156. return 0;
  157. }
  158. void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
  159. Y_DEBUG_ABORT_UNLESS(data.buffers.size() == 3);
  160. if constexpr (Nullable) {
  161. if (IsNull(data, index)) {
  162. return out.PushChar(0);
  163. }
  164. out.PushChar(1);
  165. }
  166. const TOffset* offsets = data.GetValues<TOffset>(1);
  167. const char* strData = data.GetValues<char>(2, 0);
  168. std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]);
  169. out.PushString(str);
  170. }
  171. void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
  172. if constexpr (Nullable) {
  173. if (!scalar.is_valid) {
  174. return out.PushChar(0);
  175. }
  176. out.PushChar(1);
  177. }
  178. auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
  179. std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size());
  180. out.PushString(str);
  181. }
  182. };
  183. template<bool Nullable, typename TDerived>
  184. class TTupleBlockReaderBase : public IBlockReader {
  185. public:
  186. TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
  187. if constexpr (Nullable) {
  188. if (IsNull(data, index)) {
  189. return {};
  190. }
  191. }
  192. return static_cast<TDerived*>(this)->GetChildrenItems(data, index);
  193. }
  194. TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
  195. if constexpr (Nullable) {
  196. if (!scalar.is_valid) {
  197. return {};
  198. }
  199. }
  200. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
  201. return static_cast<TDerived*>(this)->GetChildrenScalarItems(structScalar);
  202. }
  203. ui64 GetDataWeight(const arrow::ArrayData& data) const final {
  204. ui64 size = 0;
  205. if constexpr (Nullable) {
  206. size += data.length;
  207. }
  208. size += static_cast<const TDerived*>(this)->GetChildrenDataWeight(data);
  209. return size;
  210. }
  211. ui64 GetDataWeight(TBlockItem item) const final {
  212. return static_cast<const TDerived*>(this)->GetDataWeightImpl(item);
  213. }
  214. ui64 GetDefaultValueWeight() const final {
  215. ui64 size = 0;
  216. if constexpr (Nullable) {
  217. size = 1;
  218. }
  219. size += static_cast<const TDerived*>(this)->GetChildrenDefaultDataWeight();
  220. return size;
  221. }
  222. void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
  223. if constexpr (Nullable) {
  224. if (IsNull(data, index)) {
  225. return out.PushChar(0);
  226. }
  227. out.PushChar(1);
  228. }
  229. static_cast<const TDerived*>(this)->SaveChildrenItems(data, index, out);
  230. }
  231. void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
  232. if constexpr (Nullable) {
  233. if (!scalar.is_valid) {
  234. return out.PushChar(0);
  235. }
  236. out.PushChar(1);
  237. }
  238. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
  239. static_cast<const TDerived*>(this)->SaveChildrenScalarItems(structScalar, out);
  240. }
  241. };
  242. template<bool Nullable>
  243. class TTupleBlockReader final : public TTupleBlockReaderBase<Nullable, TTupleBlockReader<Nullable>> {
  244. public:
  245. TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children)
  246. : Children(std::move(children))
  247. , Items(Children.size())
  248. {}
  249. TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) {
  250. for (ui32 i = 0; i < Children.size(); ++i) {
  251. Items[i] = Children[i]->GetItem(*data.child_data[i], index);
  252. }
  253. return TBlockItem(Items.data());
  254. }
  255. TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) {
  256. for (ui32 i = 0; i < Children.size(); ++i) {
  257. Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]);
  258. }
  259. return TBlockItem(Items.data());
  260. }
  261. size_t GetDataWeightImpl(const TBlockItem& item) const {
  262. const TBlockItem* items = nullptr;
  263. ui64 size = 0;
  264. if constexpr (Nullable) {
  265. if (!item) {
  266. return this->GetDefaultValueWeight();
  267. }
  268. size = 1;
  269. items = item.GetOptionalValue().GetElements();
  270. } else {
  271. items = item.GetElements();
  272. }
  273. for (ui32 i = 0; i < Children.size(); ++i) {
  274. size += Children[i]->GetDataWeight(items[i]);
  275. }
  276. return size;
  277. }
  278. size_t GetChildrenDataWeight(const arrow::ArrayData& data) const {
  279. size_t size = 0;
  280. for (ui32 i = 0; i < Children.size(); ++i) {
  281. size += Children[i]->GetDataWeight(*data.child_data[i]);
  282. }
  283. return size;
  284. }
  285. size_t GetChildrenDefaultDataWeight() const {
  286. size_t size = 0;
  287. for (ui32 i = 0; i < Children.size(); ++i) {
  288. size += Children[i]->GetDefaultValueWeight();
  289. }
  290. return size;
  291. }
  292. void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const {
  293. for (ui32 i = 0; i < Children.size(); ++i) {
  294. Children[i]->SaveItem(*data.child_data[i], index, out);
  295. }
  296. }
  297. void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
  298. for (ui32 i = 0; i < Children.size(); ++i) {
  299. Children[i]->SaveScalarItem(*structScalar.value[i], out);
  300. }
  301. }
  302. private:
  303. const TVector<std::unique_ptr<IBlockReader>> Children;
  304. TVector<TBlockItem> Items;
  305. };
  306. template<typename TTzDate, bool Nullable>
  307. class TTzDateBlockReader final : public TTupleBlockReaderBase<Nullable, TTzDateBlockReader<TTzDate, Nullable>> {
  308. public:
  309. TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) {
  310. Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
  311. TBlockItem item {DateReader_.GetItem(*data.child_data[0], index)};
  312. item.SetTimezoneId(TimezoneReader_.GetItem(*data.child_data[1], index).Get<ui16>());
  313. return item;
  314. }
  315. TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) {
  316. Y_DEBUG_ABORT_UNLESS(structScalar.value.size() == 2);
  317. TBlockItem item {DateReader_.GetScalarItem(*structScalar.value[0])};
  318. item.SetTimezoneId(TimezoneReader_.GetScalarItem(*structScalar.value[1]).Get<ui16>());
  319. return item;
  320. }
  321. size_t GetChildrenDataWeight(const arrow::ArrayData& data) const {
  322. Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
  323. size_t size = 0;
  324. size += DateReader_.GetDataWeight(*data.child_data[0]);
  325. size += TimezoneReader_.GetDataWeight(*data.child_data[1]);
  326. return size;
  327. }
  328. size_t GetDataWeightImpl(const TBlockItem& item) const {
  329. Y_UNUSED(item);
  330. return GetChildrenDefaultDataWeight();
  331. }
  332. size_t GetChildrenDefaultDataWeight() const {
  333. ui64 size = 0;
  334. if constexpr (Nullable) {
  335. size = 1;
  336. }
  337. size += DateReader_.GetDefaultValueWeight();
  338. size += TimezoneReader_.GetDefaultValueWeight();
  339. return size;
  340. }
  341. void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const {
  342. DateReader_.SaveItem(*data.child_data[0], index, out);
  343. TimezoneReader_.SaveItem(*data.child_data[1], index, out);
  344. }
  345. void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
  346. DateReader_.SaveScalarItem(*structScalar.value[0], out);
  347. TimezoneReader_.SaveScalarItem(*structScalar.value[1], out);
  348. }
  349. private:
  350. TFixedSizeBlockReader<typename TDataType<TTzDate>::TLayout, /* Nullable */false> DateReader_;
  351. TFixedSizeBlockReader<ui16, /* Nullable */false> TimezoneReader_;
  352. };
  353. class TExternalOptionalBlockReader final : public IBlockReader {
  354. public:
  355. TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
  356. : Inner(std::move(inner))
  357. {}
  358. TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
  359. if (IsNull(data, index)) {
  360. return {};
  361. }
  362. return Inner->GetItem(*data.child_data.front(), index).MakeOptional();
  363. }
  364. TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
  365. if (!scalar.is_valid) {
  366. return {};
  367. }
  368. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
  369. return Inner->GetScalarItem(*structScalar.value.front()).MakeOptional();
  370. }
  371. ui64 GetDataWeight(const arrow::ArrayData& data) const final {
  372. return data.length + Inner->GetDataWeight(*data.child_data.front());
  373. }
  374. ui64 GetDataWeight(TBlockItem item) const final {
  375. if (!item) {
  376. return GetDefaultValueWeight();
  377. }
  378. return 1 + Inner->GetDataWeight(item.GetOptionalValue());
  379. }
  380. ui64 GetDefaultValueWeight() const final {
  381. return 1 + Inner->GetDefaultValueWeight();
  382. }
  383. void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
  384. if (IsNull(data, index)) {
  385. return out.PushChar(0);
  386. }
  387. out.PushChar(1);
  388. Inner->SaveItem(*data.child_data.front(), index, out);
  389. }
  390. void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
  391. if (!scalar.is_valid) {
  392. return out.PushChar(0);
  393. }
  394. out.PushChar(1);
  395. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
  396. Inner->SaveScalarItem(*structScalar.value.front(), out);
  397. }
  398. private:
  399. const std::unique_ptr<IBlockReader> Inner;
  400. };
  401. struct TReaderTraits {
  402. using TResult = IBlockReader;
  403. template <bool Nullable>
  404. using TTuple = TTupleBlockReader<Nullable>;
  405. template <typename T, bool Nullable>
  406. using TFixedSize = TFixedSizeBlockReader<T, Nullable>;
  407. template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal>
  408. using TStrings = TStringBlockReader<TStringType, Nullable, TOriginal>;
  409. using TExtOptional = TExternalOptionalBlockReader;
  410. template<bool Nullable>
  411. using TResource = TResourceBlockReader<Nullable>;
  412. template<typename TTzDate, bool Nullable>
  413. using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>;
  414. static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) {
  415. Y_UNUSED(pgBuilder);
  416. if (desc.PassByValue) {
  417. return std::make_unique<TFixedSize<ui64, true>>();
  418. } else {
  419. return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>();
  420. }
  421. }
  422. static std::unique_ptr<TResult> MakeResource(bool isOptional) {
  423. if (isOptional) {
  424. return std::make_unique<TResource<true>>();
  425. } else {
  426. return std::make_unique<TResource<false>>();
  427. }
  428. }
  429. template<typename TTzDate>
  430. static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
  431. if (isOptional) {
  432. return std::make_unique<TTzDateReader<TTzDate, true>>();
  433. } else {
  434. return std::make_unique<TTzDateReader<TTzDate, false>>();
  435. }
  436. }
  437. };
  438. template <typename TTraits>
  439. std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children) {
  440. if (isOptional) {
  441. return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children));
  442. } else {
  443. return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children));
  444. }
  445. }
  /// Creates the fixed-size reader for T selected by TTraits; the nullable
  /// flavour is used when isOptional is true.
  template <typename TTraits, typename T>
  std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional) {
      if (!isOptional) {
          return std::make_unique<typename TTraits::template TFixedSize<T, false>>();
      }
      return std::make_unique<typename TTraits::template TFixedSize<T, true>>();
  }
  454. template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal>
  455. std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional) {
  456. if (isOptional) {
  457. return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>();
  458. } else {
  459. return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>();
  460. }
  461. }
  // Satisfied when TTraits can name a fixed-size type for 128-bit decimals,
  // i.e. TTraits::TFixedSize<NYql::NDecimal::TInt128, true> is a valid type.
  // MakeBlockReaderImpl() supports the Decimal data slot only for such traits;
  // for all other traits Decimal is rejected as an unsupported data slot.
  462. template<typename TTraits>
  463. concept CanInstantiateBlockReaderForDecimal = requires {
  464. typename TTraits::template TFixedSize<NYql::NDecimal::TInt128, true>;
  465. };
  466. template <typename TTraits>
  467. std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder) {
  468. const TType* unpacked = type;
  469. TOptionalTypeInspector typeOpt(typeInfoHelper, type);
  470. bool isOptional = false;
  471. if (typeOpt) {
  472. unpacked = typeOpt.GetItemType();
  473. isOptional = true;
  474. }
  475. TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked);
  476. TPgTypeInspector unpackedPg(typeInfoHelper, unpacked);
  477. if (unpackedOpt || typeOpt && unpackedPg) {
  478. // at least 2 levels of optionals
  479. ui32 nestLevel = 0;
  480. auto currentType = type;
  481. auto previousType = type;
  482. for (;;) {
  483. ++nestLevel;
  484. previousType = currentType;
  485. TOptionalTypeInspector currentOpt(typeInfoHelper, currentType);
  486. currentType = currentOpt.GetItemType();
  487. TOptionalTypeInspector nexOpt(typeInfoHelper, currentType);
  488. if (!nexOpt) {
  489. break;
  490. }
  491. }
  492. if (TPgTypeInspector(typeInfoHelper, currentType)) {
  493. previousType = currentType;
  494. ++nestLevel;
  495. }
  496. auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder);
  497. for (ui32 i = 1; i < nestLevel; ++i) {
  498. reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader));
  499. }
  500. return reader;
  501. }
  502. else {
  503. type = unpacked;
  504. }
  505. TStructTypeInspector typeStruct(typeInfoHelper, type);
  506. if (typeStruct) {
  507. TVector<std::unique_ptr<typename TTraits::TResult>> members;
  508. for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) {
  509. members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder));
  510. }
  511. // XXX: Use Tuple block reader for Struct.
  512. return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members));
  513. }
  514. TTupleTypeInspector typeTuple(typeInfoHelper, type);
  515. if (typeTuple) {
  516. TVector<std::unique_ptr<typename TTraits::TResult>> children;
  517. for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
  518. children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder));
  519. }
  520. return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children));
  521. }
  522. TDataTypeInspector typeData(typeInfoHelper, type);
  523. if (typeData) {
  524. auto typeId = typeData.GetTypeId();
  525. switch (GetDataSlot(typeId)) {
  526. case NUdf::EDataSlot::Int8:
  527. return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional);
  528. case NUdf::EDataSlot::Bool:
  529. case NUdf::EDataSlot::Uint8:
  530. return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional);
  531. case NUdf::EDataSlot::Int16:
  532. return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional);
  533. case NUdf::EDataSlot::Uint16:
  534. case NUdf::EDataSlot::Date:
  535. return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional);
  536. case NUdf::EDataSlot::Int32:
  537. case NUdf::EDataSlot::Date32:
  538. return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional);
  539. case NUdf::EDataSlot::Uint32:
  540. case NUdf::EDataSlot::Datetime:
  541. return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional);
  542. case NUdf::EDataSlot::Int64:
  543. case NUdf::EDataSlot::Interval:
  544. case NUdf::EDataSlot::Interval64:
  545. case NUdf::EDataSlot::Datetime64:
  546. case NUdf::EDataSlot::Timestamp64:
  547. return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional);
  548. case NUdf::EDataSlot::Uint64:
  549. case NUdf::EDataSlot::Timestamp:
  550. return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional);
  551. case NUdf::EDataSlot::Float:
  552. return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional);
  553. case NUdf::EDataSlot::Double:
  554. return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional);
  555. case NUdf::EDataSlot::String:
  556. return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional);
  557. case NUdf::EDataSlot::Yson:
  558. return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional);
  559. case NUdf::EDataSlot::JsonDocument:
  560. return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional);
  561. case NUdf::EDataSlot::Utf8:
  562. return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional);
  563. case NUdf::EDataSlot::Json:
  564. return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional);
  565. case NUdf::EDataSlot::TzDate:
  566. return TTraits::template MakeTzDate<TTzDate>(isOptional);
  567. case NUdf::EDataSlot::TzDatetime:
  568. return TTraits::template MakeTzDate<TTzDatetime>(isOptional);
  569. case NUdf::EDataSlot::TzTimestamp:
  570. return TTraits::template MakeTzDate<TTzTimestamp>(isOptional);
  571. case NUdf::EDataSlot::TzDate32:
  572. return TTraits::template MakeTzDate<TTzDate32>(isOptional);
  573. case NUdf::EDataSlot::TzDatetime64:
  574. return TTraits::template MakeTzDate<TTzDatetime64>(isOptional);
  575. case NUdf::EDataSlot::TzTimestamp64:
  576. return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional);
  577. case NUdf::EDataSlot::Decimal: {
  578. if constexpr (CanInstantiateBlockReaderForDecimal<TTraits>) {
  579. return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional);
  580. } else {
  581. Y_ENSURE(false, "Unsupported data slot");
  582. }
  583. }
  584. case NUdf::EDataSlot::Uuid:
  585. case NUdf::EDataSlot::DyNumber:
  586. Y_ENSURE(false, "Unsupported data slot");
  587. }
  588. }
  589. TResourceTypeInspector resource(typeInfoHelper, type);
  590. if (resource) {
  591. return TTraits::MakeResource(isOptional);
  592. }
  593. TPgTypeInspector typePg(typeInfoHelper, type);
  594. if (typePg) {
  595. auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
  596. return TTraits::MakePg(*desc, pgBuilder);
  597. }
  598. Y_ENSURE(false, "Unsupported type");
  599. }
  600. inline std::unique_ptr<IBlockReader> MakeBlockReader(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
  601. return MakeBlockReaderImpl<TReaderTraits>(typeInfoHelper, type, nullptr);
  602. }
  603. inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper, const TType* type, TBlockItemSerializeProps& props) {
  604. if (!props.MaxSize.Defined()) {
  605. return;
  606. }
  607. for (;;) {
  608. TOptionalTypeInspector typeOpt(typeInfoHelper, type);
  609. if (!typeOpt) {
  610. break;
  611. }
  612. props.MaxSize = *props.MaxSize + 1;
  613. props.IsFixed = false;
  614. type = typeOpt.GetItemType();
  615. }
  616. TStructTypeInspector typeStruct(typeInfoHelper, type);
  617. if (typeStruct) {
  618. for (ui32 i = 0; i < typeStruct.GetMembersCount(); ++i) {
  619. UpdateBlockItemSerializeProps(typeInfoHelper, typeStruct.GetMemberType(i), props);
  620. }
  621. return;
  622. }
  623. TTupleTypeInspector typeTuple(typeInfoHelper, type);
  624. if (typeTuple) {
  625. for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
  626. UpdateBlockItemSerializeProps(typeInfoHelper, typeTuple.GetElementType(i), props);
  627. }
  628. return;
  629. }
  630. TDataTypeInspector typeData(typeInfoHelper, type);
  631. if (typeData) {
  632. auto typeId = typeData.GetTypeId();
  633. auto slot = GetDataSlot(typeId);
  634. auto& dataTypeInfo = GetDataTypeInfo(slot);
  635. if (dataTypeInfo.Features & DecimalType) {
  636. *props.MaxSize += 16;
  637. } else if (dataTypeInfo.Features & StringType) {
  638. props.MaxSize = {};
  639. props.IsFixed = false;
  640. } else if (dataTypeInfo.Features & TzDateType) {
  641. *props.MaxSize += dataTypeInfo.FixedSize + sizeof(TTimezoneId);
  642. }
  643. else {
  644. *props.MaxSize += dataTypeInfo.FixedSize;
  645. }
  646. return;
  647. }
  648. TPgTypeInspector typePg(typeInfoHelper, type);
  649. if (typePg) {
  650. auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
  651. if (desc->PassByValue) {
  652. *props.MaxSize += 1 + 8;
  653. } else {
  654. props.MaxSize = {};
  655. props.IsFixed = false;
  656. }
  657. return;
  658. }
  659. Y_ENSURE(false, "Unsupported type");
  660. }
  661. }
  662. }