block_reader.h 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. #pragma once
  2. #include "block_item.h"
  3. #include "block_io_buffer.h"
  4. #include "dispatch_traits.h"
  5. #include "util.h"
  6. #include <arrow/datum.h>
  7. #include <yql/essentials/public/decimal/yql_decimal.h>
  8. namespace NYql {
  9. namespace NUdf {
  10. class IBlockReader : private TNonCopyable {
  11. public:
  12. virtual ~IBlockReader() = default;
  13. // result will reference to Array/Scalar internals and will be valid until next call to GetItem/GetScalarItem
  14. virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0;
  15. virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0;
  16. virtual ui64 GetDataWeight(const arrow::ArrayData& data) const = 0;
  17. virtual ui64 GetDataWeight(TBlockItem item) const = 0;
  18. virtual ui64 GetDefaultValueWeight() const = 0;
  19. virtual void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const = 0;
  20. virtual void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const = 0;
  21. };
  22. struct TBlockItemSerializeProps {
  23. TMaybe<ui32> MaxSize = 0; // maximum size each block item can occupy in TOutputBuffer
  24. // (will be undefined for dynamic object like string)
  25. bool IsFixed = true; // true if each block item takes fixed size
  26. };
  27. template<typename T, bool Nullable, typename TDerived>
  28. class TFixedSizeBlockReaderBase : public IBlockReader {
  29. public:
  30. TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
  31. if constexpr (Nullable) {
  32. if (IsNull(data, index)) {
  33. return {};
  34. }
  35. }
  36. return static_cast<TDerived*>(this)->MakeBlockItem(data.GetValues<T>(1)[index]);
  37. }
  38. TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
  39. using namespace arrow::internal;
  40. if constexpr (Nullable) {
  41. if (!scalar.is_valid) {
  42. return {};
  43. }
  44. }
  45. if constexpr(std::is_same_v<T, NYql::NDecimal::TInt128>) {
  46. auto& fixedScalar = checked_cast<const arrow::FixedSizeBinaryScalar&>(scalar);
  47. T value; memcpy((void*)&value, fixedScalar.value->data(), sizeof(T));
  48. return static_cast<TDerived*>(this)->MakeBlockItem(value);
  49. } else {
  50. return static_cast<TDerived*>(this)->MakeBlockItem(
  51. *static_cast<const T*>(checked_cast<const PrimitiveScalarBase&>(scalar).data())
  52. );
  53. }
  54. }
  55. ui64 GetDataWeight(const arrow::ArrayData& data) const final {
  56. if constexpr (Nullable) {
  57. return (1 + sizeof(T)) * data.length;
  58. }
  59. return sizeof(T) * data.length;
  60. }
  61. ui64 GetDataWeight(TBlockItem item) const final {
  62. Y_UNUSED(item);
  63. return GetDefaultValueWeight();
  64. }
  65. ui64 GetDefaultValueWeight() const final {
  66. if constexpr (Nullable) {
  67. return 1 + sizeof(T);
  68. }
  69. return sizeof(T);
  70. }
  71. void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
  72. if constexpr (Nullable) {
  73. if (IsNull(data, index)) {
  74. return out.PushChar(0);
  75. }
  76. out.PushChar(1);
  77. }
  78. out.PushNumber(data.GetValues<T>(1)[index]);
  79. }
  80. void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
  81. if constexpr (Nullable) {
  82. if (!scalar.is_valid) {
  83. return out.PushChar(0);
  84. }
  85. out.PushChar(1);
  86. }
  87. if constexpr(std::is_same_v<T, NYql::NDecimal::TInt128>) {
  88. auto& fixedScalar = arrow::internal::checked_cast<const arrow::FixedSizeBinaryScalar&>(scalar);
  89. T value; memcpy((void*)&value, fixedScalar.value->data(), sizeof(T));
  90. out.PushNumber(value);
  91. } else {
  92. out.PushNumber(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
  93. }
  94. }
  95. };
  96. template<typename T, bool Nullable>
  97. class TFixedSizeBlockReader : public TFixedSizeBlockReaderBase<T, Nullable, TFixedSizeBlockReader<T, Nullable>> {
  98. public:
  99. TBlockItem MakeBlockItem(const T& item) const {
  100. return TBlockItem(item);
  101. }
  102. };
  103. template<bool Nullable>
  104. class TResourceBlockReader : public TFixedSizeBlockReaderBase<TUnboxedValuePod, Nullable, TResourceBlockReader<Nullable>> {
  105. public:
  106. TBlockItem MakeBlockItem(const TUnboxedValuePod& pod) const {
  107. TBlockItem item;
  108. std::memcpy(item.GetRawPtr(), pod.GetRawPtr(), sizeof(TBlockItem));
  109. return item;
  110. }
  111. };
  112. template<typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal = NKikimr::NUdf::EDataSlot::String>
  113. class TStringBlockReader final : public IBlockReader {
  114. public:
  115. using TOffset = typename TStringType::offset_type;
  116. TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
  117. Y_DEBUG_ABORT_UNLESS(data.buffers.size() == 3);
  118. if constexpr (Nullable) {
  119. if (IsNull(data, index)) {
  120. return {};
  121. }
  122. }
  123. const TOffset* offsets = data.GetValues<TOffset>(1);
  124. const char* strData = data.GetValues<char>(2, 0);
  125. std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]);
  126. return TBlockItem(str);
  127. }
  128. TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
  129. if constexpr (Nullable) {
  130. if (!scalar.is_valid) {
  131. return {};
  132. }
  133. }
  134. auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
  135. std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size());
  136. return TBlockItem(str);
  137. }
  138. ui64 GetDataWeight(const arrow::ArrayData& data) const final {
  139. ui64 size = 0;
  140. if constexpr (Nullable) {
  141. size += data.length;
  142. }
  143. size += data.buffers[2] ? data.buffers[2]->size() : 0;
  144. return size;
  145. }
  146. ui64 GetDataWeight(TBlockItem item) const final {
  147. if constexpr (Nullable) {
  148. return 1 + (item ? item.AsStringRef().Size() : 0);
  149. }
  150. return item.AsStringRef().Size();
  151. }
  152. ui64 GetDefaultValueWeight() const final {
  153. if constexpr (Nullable) {
  154. return 1;
  155. }
  156. return 0;
  157. }
  158. void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
  159. Y_DEBUG_ABORT_UNLESS(data.buffers.size() == 3);
  160. if constexpr (Nullable) {
  161. if (IsNull(data, index)) {
  162. return out.PushChar(0);
  163. }
  164. out.PushChar(1);
  165. }
  166. const TOffset* offsets = data.GetValues<TOffset>(1);
  167. const char* strData = data.GetValues<char>(2, 0);
  168. std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]);
  169. out.PushString(str);
  170. }
  171. void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
  172. if constexpr (Nullable) {
  173. if (!scalar.is_valid) {
  174. return out.PushChar(0);
  175. }
  176. out.PushChar(1);
  177. }
  178. auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
  179. std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size());
  180. out.PushString(str);
  181. }
  182. };
  183. template<bool Nullable, typename TDerived>
  184. class TTupleBlockReaderBase : public IBlockReader {
  185. public:
  186. TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
  187. if constexpr (Nullable) {
  188. if (IsNull(data, index)) {
  189. return {};
  190. }
  191. }
  192. return static_cast<TDerived*>(this)->GetChildrenItems(data, index);
  193. }
  194. TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
  195. if constexpr (Nullable) {
  196. if (!scalar.is_valid) {
  197. return {};
  198. }
  199. }
  200. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
  201. return static_cast<TDerived*>(this)->GetChildrenScalarItems(structScalar);
  202. }
  203. ui64 GetDataWeight(const arrow::ArrayData& data) const final {
  204. ui64 size = 0;
  205. if constexpr (Nullable) {
  206. size += data.length;
  207. }
  208. size += static_cast<const TDerived*>(this)->GetChildrenDataWeight(data);
  209. return size;
  210. }
  211. ui64 GetDataWeight(TBlockItem item) const final {
  212. return static_cast<const TDerived*>(this)->GetDataWeightImpl(item);
  213. }
  214. ui64 GetDefaultValueWeight() const final {
  215. ui64 size = 0;
  216. if constexpr (Nullable) {
  217. size = 1;
  218. }
  219. size += static_cast<const TDerived*>(this)->GetChildrenDefaultDataWeight();
  220. return size;
  221. }
  222. void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
  223. if constexpr (Nullable) {
  224. if (IsNull(data, index)) {
  225. return out.PushChar(0);
  226. }
  227. out.PushChar(1);
  228. }
  229. static_cast<const TDerived*>(this)->SaveChildrenItems(data, index, out);
  230. }
  231. void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
  232. if constexpr (Nullable) {
  233. if (!scalar.is_valid) {
  234. return out.PushChar(0);
  235. }
  236. out.PushChar(1);
  237. }
  238. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
  239. static_cast<const TDerived*>(this)->SaveChildrenScalarItems(structScalar, out);
  240. }
  241. };
  242. template<bool Nullable>
  243. class TTupleBlockReader final : public TTupleBlockReaderBase<Nullable, TTupleBlockReader<Nullable>> {
  244. public:
  245. TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children)
  246. : Children(std::move(children))
  247. , Items(Children.size())
  248. {}
  249. TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) {
  250. for (ui32 i = 0; i < Children.size(); ++i) {
  251. Items[i] = Children[i]->GetItem(*data.child_data[i], index);
  252. }
  253. return TBlockItem(Items.data());
  254. }
  255. TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) {
  256. for (ui32 i = 0; i < Children.size(); ++i) {
  257. Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]);
  258. }
  259. return TBlockItem(Items.data());
  260. }
  261. size_t GetDataWeightImpl(const TBlockItem& item) const {
  262. const TBlockItem* items = nullptr;
  263. ui64 size = 0;
  264. if constexpr (Nullable) {
  265. if (!item) {
  266. return this->GetDefaultValueWeight();
  267. }
  268. size = 1;
  269. items = item.GetOptionalValue().GetElements();
  270. } else {
  271. items = item.GetElements();
  272. }
  273. for (ui32 i = 0; i < Children.size(); ++i) {
  274. size += Children[i]->GetDataWeight(items[i]);
  275. }
  276. return size;
  277. }
  278. size_t GetChildrenDataWeight(const arrow::ArrayData& data) const {
  279. size_t size = 0;
  280. for (ui32 i = 0; i < Children.size(); ++i) {
  281. size += Children[i]->GetDataWeight(*data.child_data[i]);
  282. }
  283. return size;
  284. }
  285. size_t GetChildrenDefaultDataWeight() const {
  286. size_t size = 0;
  287. for (ui32 i = 0; i < Children.size(); ++i) {
  288. size += Children[i]->GetDefaultValueWeight();
  289. }
  290. return size;
  291. }
  292. void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const {
  293. for (ui32 i = 0; i < Children.size(); ++i) {
  294. Children[i]->SaveItem(*data.child_data[i], index, out);
  295. }
  296. }
  297. void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
  298. for (ui32 i = 0; i < Children.size(); ++i) {
  299. Children[i]->SaveScalarItem(*structScalar.value[i], out);
  300. }
  301. }
  302. private:
  303. const TVector<std::unique_ptr<IBlockReader>> Children;
  304. TVector<TBlockItem> Items;
  305. };
  306. template<typename TTzDate, bool Nullable>
  307. class TTzDateBlockReader final : public TTupleBlockReaderBase<Nullable, TTzDateBlockReader<TTzDate, Nullable>> {
  308. public:
  309. TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) {
  310. Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
  311. TBlockItem item {DateReader_.GetItem(*data.child_data[0], index)};
  312. item.SetTimezoneId(TimezoneReader_.GetItem(*data.child_data[1], index).Get<ui16>());
  313. return item;
  314. }
  315. TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) {
  316. Y_DEBUG_ABORT_UNLESS(structScalar.value.size() == 2);
  317. TBlockItem item {DateReader_.GetScalarItem(*structScalar.value[0])};
  318. item.SetTimezoneId(TimezoneReader_.GetScalarItem(*structScalar.value[1]).Get<ui16>());
  319. return item;
  320. }
  321. size_t GetChildrenDataWeight(const arrow::ArrayData& data) const {
  322. Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
  323. size_t size = 0;
  324. size += DateReader_.GetDataWeight(*data.child_data[0]);
  325. size += TimezoneReader_.GetDataWeight(*data.child_data[1]);
  326. return size;
  327. }
  328. size_t GetDataWeightImpl(const TBlockItem& item) const {
  329. Y_UNUSED(item);
  330. return GetChildrenDefaultDataWeight();
  331. }
  332. size_t GetChildrenDefaultDataWeight() const {
  333. ui64 size = 0;
  334. if constexpr (Nullable) {
  335. size = 1;
  336. }
  337. size += DateReader_.GetDefaultValueWeight();
  338. size += TimezoneReader_.GetDefaultValueWeight();
  339. return size;
  340. }
  341. void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const {
  342. DateReader_.SaveItem(*data.child_data[0], index, out);
  343. TimezoneReader_.SaveItem(*data.child_data[1], index, out);
  344. }
  345. void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
  346. DateReader_.SaveScalarItem(*structScalar.value[0], out);
  347. TimezoneReader_.SaveScalarItem(*structScalar.value[1], out);
  348. }
  349. private:
  350. TFixedSizeBlockReader<typename TDataType<TTzDate>::TLayout, /* Nullable */false> DateReader_;
  351. TFixedSizeBlockReader<ui16, /* Nullable */false> TimezoneReader_;
  352. };
  353. class TExternalOptionalBlockReader final : public IBlockReader {
  354. public:
  355. TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
  356. : Inner(std::move(inner))
  357. {}
  358. TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
  359. if (IsNull(data, index)) {
  360. return {};
  361. }
  362. return Inner->GetItem(*data.child_data.front(), index).MakeOptional();
  363. }
  364. TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
  365. if (!scalar.is_valid) {
  366. return {};
  367. }
  368. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
  369. return Inner->GetScalarItem(*structScalar.value.front()).MakeOptional();
  370. }
  371. ui64 GetDataWeight(const arrow::ArrayData& data) const final {
  372. return data.length + Inner->GetDataWeight(*data.child_data.front());
  373. }
  374. ui64 GetDataWeight(TBlockItem item) const final {
  375. if (!item) {
  376. return GetDefaultValueWeight();
  377. }
  378. return 1 + Inner->GetDataWeight(item.GetOptionalValue());
  379. }
  380. ui64 GetDefaultValueWeight() const final {
  381. return 1 + Inner->GetDefaultValueWeight();
  382. }
  383. void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
  384. if (IsNull(data, index)) {
  385. return out.PushChar(0);
  386. }
  387. out.PushChar(1);
  388. Inner->SaveItem(*data.child_data.front(), index, out);
  389. }
  390. void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
  391. if (!scalar.is_valid) {
  392. return out.PushChar(0);
  393. }
  394. out.PushChar(1);
  395. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
  396. Inner->SaveScalarItem(*structScalar.value.front(), out);
  397. }
  398. private:
  399. const std::unique_ptr<IBlockReader> Inner;
  400. };
  401. struct TReaderTraits {
  402. using TResult = IBlockReader;
  403. template <bool Nullable>
  404. using TTuple = TTupleBlockReader<Nullable>;
  405. template <typename T, bool Nullable>
  406. using TFixedSize = TFixedSizeBlockReader<T, Nullable>;
  407. template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal>
  408. using TStrings = TStringBlockReader<TStringType, Nullable, TOriginal>;
  409. using TExtOptional = TExternalOptionalBlockReader;
  410. template<bool Nullable>
  411. using TResource = TResourceBlockReader<Nullable>;
  412. template<typename TTzDate, bool Nullable>
  413. using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>;
  414. constexpr static bool PassType = false;
  415. static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) {
  416. Y_UNUSED(pgBuilder);
  417. if (desc.PassByValue) {
  418. return std::make_unique<TFixedSize<ui64, true>>();
  419. } else {
  420. return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>();
  421. }
  422. }
  423. static std::unique_ptr<TResult> MakeResource(bool isOptional) {
  424. if (isOptional) {
  425. return std::make_unique<TResource<true>>();
  426. } else {
  427. return std::make_unique<TResource<false>>();
  428. }
  429. }
  430. template<typename TTzDate>
  431. static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
  432. if (isOptional) {
  433. return std::make_unique<TTzDateReader<TTzDate, true>>();
  434. } else {
  435. return std::make_unique<TTzDateReader<TTzDate, false>>();
  436. }
  437. }
  438. };
  439. inline std::unique_ptr<IBlockReader> MakeBlockReader(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
  440. return DispatchByArrowTraits<TReaderTraits>(typeInfoHelper, type, nullptr);
  441. }
  442. inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper, const TType* type, TBlockItemSerializeProps& props) {
  443. if (!props.MaxSize.Defined()) {
  444. return;
  445. }
  446. for (;;) {
  447. TOptionalTypeInspector typeOpt(typeInfoHelper, type);
  448. if (!typeOpt) {
  449. break;
  450. }
  451. props.MaxSize = *props.MaxSize + 1;
  452. props.IsFixed = false;
  453. type = typeOpt.GetItemType();
  454. }
  455. TStructTypeInspector typeStruct(typeInfoHelper, type);
  456. if (typeStruct) {
  457. for (ui32 i = 0; i < typeStruct.GetMembersCount(); ++i) {
  458. UpdateBlockItemSerializeProps(typeInfoHelper, typeStruct.GetMemberType(i), props);
  459. }
  460. return;
  461. }
  462. TTupleTypeInspector typeTuple(typeInfoHelper, type);
  463. if (typeTuple) {
  464. for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
  465. UpdateBlockItemSerializeProps(typeInfoHelper, typeTuple.GetElementType(i), props);
  466. }
  467. return;
  468. }
  469. TDataTypeInspector typeData(typeInfoHelper, type);
  470. if (typeData) {
  471. auto typeId = typeData.GetTypeId();
  472. auto slot = GetDataSlot(typeId);
  473. auto& dataTypeInfo = GetDataTypeInfo(slot);
  474. if (dataTypeInfo.Features & DecimalType) {
  475. *props.MaxSize += 16;
  476. } else if (dataTypeInfo.Features & StringType) {
  477. props.MaxSize = {};
  478. props.IsFixed = false;
  479. } else if (dataTypeInfo.Features & TzDateType) {
  480. *props.MaxSize += dataTypeInfo.FixedSize + sizeof(TTimezoneId);
  481. }
  482. else {
  483. *props.MaxSize += dataTypeInfo.FixedSize;
  484. }
  485. return;
  486. }
  487. TPgTypeInspector typePg(typeInfoHelper, type);
  488. if (typePg) {
  489. auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
  490. if (desc->PassByValue) {
  491. *props.MaxSize += 1 + 8;
  492. } else {
  493. props.MaxSize = {};
  494. props.IsFixed = false;
  495. }
  496. return;
  497. }
  498. Y_ENSURE(false, "Unsupported type");
  499. }
  500. }
  501. }