mkql_block_transport.cpp 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714
  1. #include "mkql_block_transport.h"
  2. #include "mkql_block_builder.h"
  3. #include <yql/essentials/minikql/mkql_type_builder.h>
  4. #include <yql/essentials/public/udf/arrow/dispatch_traits.h>
  5. #include <yql/essentials/public/udf/arrow/memory_pool.h>
  6. #include <yql/essentials/utils/yql_panic.h>
  7. namespace NKikimr::NMiniKQL {
  8. namespace {
  9. using NYql::TChunkedBuffer;
  10. TChunkedBuffer MakeChunkedBufferAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
  11. MKQLArrowUntrack(owner->data());
  12. return TChunkedBuffer(TStringBuf{data, size}, owner);
  13. }
  14. class TOwnedArrowBuffer : public arrow::Buffer {
  15. public:
  16. TOwnedArrowBuffer(TStringBuf span, const std::shared_ptr<const void>& owner)
  17. : arrow::Buffer(reinterpret_cast<const uint8_t*>(span.data()), span.size())
  18. , Owner_(owner)
  19. {
  20. }
  21. private:
  22. const std::shared_ptr<const void> Owner_;
  23. };
  24. std::shared_ptr<arrow::Buffer> MakeEmptyBuffer() {
  25. return std::make_shared<arrow::Buffer>(nullptr, 0);
  26. }
  27. bool HasArrrowAlignment(const void* buf) {
  28. return AlignUp(buf, NYql::NUdf::ArrowMemoryAlignment) == buf;
  29. }
  30. std::shared_ptr<arrow::Buffer> MakeZeroBuffer(size_t byteLen) {
  31. using namespace NYql::NUdf;
  32. if (!byteLen) {
  33. return MakeEmptyBuffer();
  34. }
  35. constexpr size_t NullWordCount = (MaxBlockSizeInBytes + sizeof(ui64) - 1) / sizeof(ui64);
  36. constexpr size_t ExtraAlignWords = (ArrowMemoryAlignment > sizeof(ui64)) ? (ArrowMemoryAlignment / sizeof(ui64) - 1) : 0;
  37. static const ui64 nulls[NullWordCount + ExtraAlignWords] = { 0 };
  38. // round all buffer length to 64 bytes
  39. size_t capacity = AlignUp(byteLen, size_t(64));
  40. if (capacity <= NullWordCount * sizeof(ui64)) {
  41. return std::make_shared<arrow::Buffer>(AlignUp(reinterpret_cast<const ui8*>(nulls), ArrowMemoryAlignment), byteLen);
  42. }
  43. auto result = AllocateResizableBuffer(byteLen, GetYqlMemoryPool());
  44. ARROW_OK(result->Resize(byteLen));
  45. std::memset(result->mutable_data(), 0, byteLen);
  46. return result;
  47. }
  48. std::shared_ptr<arrow::Buffer> MakeZeroBitmap(size_t bitCount) {
  49. // align up 8 byte boundary
  50. size_t byteCount = AlignUp(bitCount, size_t(64)) >> 3;
  51. return MakeZeroBuffer(byteCount);
  52. }
  53. bool NeedStoreBitmap(const arrow::ArrayData& data) {
  54. auto nullCount = data.GetNullCount();
  55. return nullCount != 0 && nullCount != data.length;
  56. }
  57. void StoreNullsSizes(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) {
  58. metaSink(data.GetNullCount());
  59. if (!NeedStoreBitmap(data)) {
  60. metaSink(0);
  61. return;
  62. }
  63. const ui64 desiredOffset = data.offset % 8;
  64. size_t nullBytes = AlignUp((size_t)data.length + desiredOffset, size_t(8)) >> 3;
  65. metaSink(nullBytes);
  66. }
  67. void LoadNullsSizes(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& nullsCount, TMaybe<ui64>& nullsSize) {
  68. YQL_ENSURE(!nullsCount.Defined() && !nullsSize.Defined(), "Attempt to load null sizes twice (most likely LoadArray() is not called)");
  69. nullsCount = metaSource();
  70. nullsSize = metaSource();
  71. }
  72. void StoreNulls(const arrow::ArrayData& data, TChunkedBuffer& dst) {
  73. if (!NeedStoreBitmap(data)) {
  74. return;
  75. }
  76. const ui64 desiredOffset = data.offset % 8;
  77. size_t nullBytes = AlignUp((size_t)data.length + desiredOffset, size_t(8)) >> 3;
  78. YQL_ENSURE(desiredOffset <= (size_t)data.offset);
  79. YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
  80. const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
  81. dst.Append(MakeChunkedBufferAndUntrack(data.buffers[0], nulls, nullBytes));
  82. }
  83. void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) {
  84. YQL_ENSURE(!result.Defined(), "Attempt to load buffer size twice (most likely LoadArray() is not called)");
  85. result = metaSource();
  86. }
  87. std::shared_ptr<arrow::Buffer> LoadBuffer(TChunkedBuffer& source, TMaybe<ui64> size) {
  88. using namespace NYql::NUdf;
  89. YQL_ENSURE(size.Defined(), "Buffer size is not loaded");
  90. if (!*size) {
  91. return MakeEmptyBuffer();
  92. }
  93. size_t toAppend = *size;
  94. const TChunkedBuffer::TChunk& front = source.Front();
  95. if (front.Buf.size() >= toAppend && HasArrrowAlignment(front.Buf.data())) {
  96. TStringBuf data = source.Front().Buf;
  97. data.Trunc(toAppend);
  98. auto result = std::make_shared<TOwnedArrowBuffer>(data, source.Front().Owner);
  99. source.Erase(toAppend);
  100. return result;
  101. }
  102. auto result = AllocateResizableBuffer(toAppend, NYql::NUdf::GetYqlMemoryPool());
  103. ARROW_OK(result->Resize((int64_t)toAppend));
  104. uint8_t* dst = result->mutable_data();
  105. while (toAppend) {
  106. const TChunkedBuffer::TChunk& front = source.Front();
  107. TStringBuf buf = front.Buf;
  108. YQL_ENSURE(!buf.empty(), "Premature end of buffer");
  109. size_t chunk = std::min(toAppend, buf.size());
  110. std::memcpy(dst, buf.data(), chunk);
  111. dst += chunk;
  112. toAppend -= chunk;
  113. source.Erase(chunk);
  114. }
  115. return result;
  116. }
  117. std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TChunkedBuffer& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) {
  118. YQL_ENSURE(nullCount.Defined(), "Bitmap null count is not loaded");
  119. YQL_ENSURE(bitmapSize.Defined(), "Bitmap size is not loaded");
  120. if (*nullCount == 0) {
  121. YQL_ENSURE(!*bitmapSize);
  122. return {};
  123. }
  124. YQL_ENSURE(*bitmapSize);
  125. return LoadBuffer(source, bitmapSize);
  126. }
  127. class TBlockDeserializerBase : public IBlockDeserializer {
  128. public:
  129. TBlockDeserializerBase() = default;
  130. virtual void SetArrowType(const std::shared_ptr<arrow::DataType>& type) {
  131. ArrowType_ = type;
  132. }
  133. void LoadMetadata(const TMetadataSource& metaSource) final {
  134. if (IsNullable()) {
  135. LoadNullsSizes(metaSource, NullsCount_, NullsSize_);
  136. }
  137. DoLoadMetadata(metaSource);
  138. }
  139. virtual std::shared_ptr<arrow::ArrayData> LoadArray(TChunkedBuffer& src, ui64 blockLen, ui64 offset) final {
  140. YQL_ENSURE(blockLen > 0, "Should be handled earlier");
  141. std::shared_ptr<arrow::Buffer> nulls;
  142. i64 nullsCount = 0;
  143. if (IsNullable()) {
  144. YQL_ENSURE(NullsCount_.Defined() && NullsSize_.Defined(), "Nulls metadata should be loaded");
  145. if (*NullsCount_ != 0) {
  146. if (*NullsSize_ == 0) {
  147. auto result = MakeDefaultValue(blockLen, offset);
  148. ResetMetadata();
  149. return result;
  150. }
  151. nulls = LoadNullsBitmap(src, NullsCount_, NullsSize_);
  152. nullsCount = *NullsCount_;
  153. }
  154. }
  155. auto result = DoLoadArray(src, nulls, nullsCount, blockLen, offset);
  156. ResetMetadata();
  157. return result;
  158. }
  159. void ResetMetadata() {
  160. NullsCount_ = NullsSize_ = {};
  161. DoResetMetadata();
  162. }
  163. std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) const {
  164. std::shared_ptr<arrow::Buffer> nulls;
  165. i64 nullsCount = 0;
  166. if (IsNullable()) {
  167. nulls = MakeZeroBitmap(blockLen + offset);
  168. nullsCount = blockLen;
  169. }
  170. return DoMakeDefaultValue(nulls, nullsCount, blockLen, offset);
  171. }
  172. protected:
  173. virtual void DoLoadMetadata(const TMetadataSource& metaSource) = 0;
  174. virtual void DoResetMetadata() = 0;
  175. virtual bool IsNullable() const = 0;
  176. virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const = 0;
  177. virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0;
  178. std::shared_ptr<arrow::DataType> ArrowType_;
  179. TMaybe<ui64> NullsCount_;
  180. TMaybe<ui64> NullsSize_;
  181. };
  182. template<size_t ObjectSize, bool Nullable>
  183. class TFixedSizeBlockSerializer final : public IBlockSerializer {
  184. public:
  185. TFixedSizeBlockSerializer() = default;
  186. size_t ArrayMetadataCount() const final {
  187. return Nullable ? 3 : 1;
  188. }
  189. void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
  190. if constexpr (Nullable) {
  191. StoreNullsSizes(data, metaSink);
  192. if (data.GetNullCount() == data.length) {
  193. metaSink(0);
  194. return;
  195. }
  196. }
  197. const ui64 desiredOffset = data.offset % 8;
  198. size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
  199. metaSink(dataBytes);
  200. }
  201. void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
  202. if constexpr (Nullable) {
  203. StoreNulls(data, dst);
  204. if (data.GetNullCount() == data.length) {
  205. return;
  206. }
  207. }
  208. const ui64 desiredOffset = data.offset % 8;
  209. const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize;
  210. size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
  211. dst.Append(MakeChunkedBufferAndUntrack(data.buffers[1], buf, dataBytes));
  212. }
  213. };
  214. template<size_t ObjectSize, bool Nullable>
  215. class TFixedSizeBlockDeserializer final : public TBlockDeserializerBase {
  216. public:
  217. TFixedSizeBlockDeserializer() = default;
  218. private:
  219. void DoLoadMetadata(const TMetadataSource& metaSource) final {
  220. LoadBufferSize(metaSource, DataSize_);
  221. }
  222. bool IsNullable() const final {
  223. return Nullable;
  224. }
  225. std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
  226. auto data = MakeZeroBuffer((blockLen + offset) * ObjectSize);
  227. return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, offset);
  228. }
  229. std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
  230. auto data = LoadBuffer(src, DataSize_);
  231. return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, offset);
  232. }
  233. void DoResetMetadata() final {
  234. DataSize_ = {};
  235. }
  236. TMaybe<ui64> DataSize_;
  237. };
  238. template<typename TStringType, bool Nullable>
  239. class TStringBlockSerializer final : public IBlockSerializer {
  240. using TOffset = typename TStringType::offset_type;
  241. public:
  242. TStringBlockSerializer() = default;
  243. private:
  244. size_t ArrayMetadataCount() const final {
  245. return Nullable ? 4 : 2;
  246. }
  247. void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
  248. if constexpr (Nullable) {
  249. StoreNullsSizes(data, metaSink);
  250. if (data.GetNullCount() == data.length) {
  251. metaSink(0);
  252. metaSink(0);
  253. return;
  254. }
  255. }
  256. const ui64 desiredOffset = data.offset % 8;
  257. size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
  258. metaSink(offsetsSize);
  259. metaSink(data.buffers[2]->size());
  260. }
  261. void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
  262. if constexpr (Nullable) {
  263. StoreNulls(data, dst);
  264. if (data.GetNullCount() == data.length) {
  265. return;
  266. }
  267. }
  268. const ui64 desiredOffset = data.offset % 8;
  269. const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset);
  270. size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
  271. dst.Append(MakeChunkedBufferAndUntrack(data.buffers[1], offsets, offsetsSize));
  272. const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data());
  273. size_t mainSize = data.buffers[2]->size();
  274. dst.Append(MakeChunkedBufferAndUntrack(data.buffers[2], mainData, mainSize));
  275. }
  276. };
  277. template<typename TStringType, bool Nullable>
  278. class TStringBlockDeserializer final : public TBlockDeserializerBase {
  279. using TOffset = typename TStringType::offset_type;
  280. public:
  281. TStringBlockDeserializer() = default;
  282. private:
  283. void DoLoadMetadata(const TMetadataSource& metaSource) final {
  284. LoadBufferSize(metaSource, OffsetsSize_);
  285. LoadBufferSize(metaSource, DataSize_);
  286. }
  287. std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
  288. auto offsets = MakeZeroBuffer((blockLen + 1 + offset) * sizeof(TOffset));
  289. auto data = MakeEmptyBuffer();
  290. return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, offset);
  291. }
  292. std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
  293. auto offsets = LoadBuffer(src, OffsetsSize_);
  294. auto data = LoadBuffer(src, DataSize_);
  295. return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, offset);
  296. }
  297. bool IsNullable() const final {
  298. return Nullable;
  299. }
  300. void DoResetMetadata() final {
  301. OffsetsSize_ = DataSize_ = {};
  302. }
  303. TMaybe<ui64> OffsetsSize_;
  304. TMaybe<ui64> DataSize_;
  305. };
  306. class TExtOptionalBlockSerializer final : public IBlockSerializer {
  307. public:
  308. explicit TExtOptionalBlockSerializer(std::unique_ptr<IBlockSerializer>&& inner)
  309. : Inner_(std::move(inner))
  310. {
  311. }
  312. private:
  313. size_t ArrayMetadataCount() const final {
  314. return 2 + Inner_->ArrayMetadataCount();
  315. }
  316. void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
  317. StoreNullsSizes(data, metaSink);
  318. if (data.GetNullCount() == data.length) {
  319. auto innerCount = Inner_->ArrayMetadataCount();
  320. for (size_t i = 0; i < innerCount; ++i) {
  321. metaSink(0);
  322. }
  323. } else {
  324. Inner_->StoreMetadata(*data.child_data[0], metaSink);
  325. }
  326. }
  327. void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
  328. StoreNulls(data, dst);
  329. if (data.GetNullCount() != data.length) {
  330. Inner_->StoreArray(*data.child_data[0], dst);
  331. }
  332. }
  333. const std::unique_ptr<IBlockSerializer> Inner_;
  334. };
  335. class TExtOptionalBlockDeserializer final : public TBlockDeserializerBase {
  336. public:
  337. explicit TExtOptionalBlockDeserializer(std::unique_ptr<TBlockDeserializerBase>&& inner)
  338. : Inner_(std::move(inner))
  339. {
  340. }
  341. private:
  342. void DoLoadMetadata(const TMetadataSource& metaSource) final {
  343. Inner_->LoadMetadata(metaSource);
  344. }
  345. std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
  346. return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, offset);
  347. }
  348. std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
  349. return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, offset);
  350. }
  351. bool IsNullable() const final {
  352. return true;
  353. }
  354. void DoResetMetadata() final {
  355. Inner_->ResetMetadata();
  356. }
  357. void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final {
  358. ArrowType_ = type;
  359. YQL_ENSURE(type->fields().size() == 1);
  360. Inner_->SetArrowType(type->fields().front()->type());
  361. }
  362. const std::unique_ptr<TBlockDeserializerBase> Inner_;
  363. };
  364. template<bool Nullable, typename TDerived>
  365. class TTupleBlockSerializerBase : public IBlockSerializer {
  366. size_t ArrayMetadataCount() const final {
  367. size_t result = static_cast<const TDerived*>(this)->GetChildrenMetaCount();
  368. if constexpr (Nullable) {
  369. result += 2;
  370. }
  371. return result;
  372. }
  373. void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
  374. if constexpr (Nullable) {
  375. StoreNullsSizes(data, metaSink);
  376. }
  377. if (data.GetNullCount() == data.length) {
  378. auto childCount = static_cast<const TDerived*>(this)->GetChildrenMetaCount();
  379. for (size_t i = 0; i < childCount; ++i) {
  380. metaSink(0);
  381. }
  382. } else {
  383. static_cast<const TDerived*>(this)->StoreChildrenMetadata(data.child_data, metaSink);
  384. }
  385. }
  386. void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
  387. if constexpr (Nullable) {
  388. StoreNulls(data, dst);
  389. }
  390. if (data.GetNullCount() != data.length) {
  391. static_cast<const TDerived*>(this)->StoreChildrenArrays(data.child_data, dst);
  392. }
  393. }
  394. };
  395. template<bool Nullable>
  396. class TTupleBlockSerializer final : public TTupleBlockSerializerBase<Nullable, TTupleBlockSerializer<Nullable>> {
  397. public:
  398. TTupleBlockSerializer(TVector<std::unique_ptr<IBlockSerializer>>&& children)
  399. : Children_(std::move(children))
  400. {}
  401. size_t GetChildrenMetaCount() const {
  402. size_t result = 0;
  403. for (const auto& child : Children_) {
  404. result += child->ArrayMetadataCount();
  405. }
  406. return result;
  407. }
  408. void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
  409. const IBlockSerializer::TMetadataSink& metaSink) const {
  410. for (size_t i = 0; i < Children_.size(); ++i) {
  411. Children_[i]->StoreMetadata(*child_data[i], metaSink);
  412. }
  413. }
  414. void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TChunkedBuffer& dst) const {
  415. for (size_t i = 0; i < Children_.size(); ++i) {
  416. Children_[i]->StoreArray(*child_data[i], dst);
  417. }
  418. }
  419. private:
  420. const TVector<std::unique_ptr<IBlockSerializer>> Children_;
  421. };
  422. template<typename TDate, bool Nullable>
  423. class TTzDateBlockSerializer final : public TTupleBlockSerializerBase<Nullable, TTzDateBlockSerializer<TDate, Nullable>> {
  424. public:
  425. TTzDateBlockSerializer() = default;
  426. size_t GetChildrenMetaCount() const {
  427. return DateSerialiser_.ArrayMetadataCount() + TzSerialiser_.ArrayMetadataCount();
  428. }
  429. void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
  430. const IBlockSerializer::TMetadataSink& metaSink) const {
  431. DateSerialiser_.StoreMetadata(*child_data[0], metaSink);
  432. TzSerialiser_.StoreMetadata(*child_data[1], metaSink);
  433. }
  434. void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TChunkedBuffer& dst) const {
  435. DateSerialiser_.StoreArray(*child_data[0], dst);
  436. TzSerialiser_.StoreArray(*child_data[1], dst);
  437. }
  438. private:
  439. using TDateLayout = typename NUdf::TDataType<TDate>::TLayout;
  440. TFixedSizeBlockSerializer<sizeof(TDateLayout), false> DateSerialiser_;
  441. TFixedSizeBlockSerializer<sizeof(NYql::NUdf::TTimezoneId), false> TzSerialiser_;
  442. };
  443. template<bool Nullable>
  444. class TTupleBlockDeserializer final : public TBlockDeserializerBase {
  445. public:
  446. explicit TTupleBlockDeserializer(TVector<std::unique_ptr<TBlockDeserializerBase>>&& children)
  447. : Children_(std::move(children))
  448. {
  449. }
  450. private:
  451. void DoLoadMetadata(const TMetadataSource& metaSource) final {
  452. for (auto& child : Children_) {
  453. child->LoadMetadata(metaSource);
  454. }
  455. }
  456. std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
  457. std::vector<std::shared_ptr<arrow::ArrayData>> childData;
  458. for (auto& child : Children_) {
  459. childData.emplace_back(child->MakeDefaultValue(blockLen, offset));
  460. }
  461. return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
  462. }
  463. std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
  464. std::vector<std::shared_ptr<arrow::ArrayData>> childData;
  465. for (auto& child : Children_) {
  466. childData.emplace_back(child->LoadArray(src, blockLen, offset));
  467. }
  468. return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
  469. }
  470. void DoResetMetadata() final {
  471. for (auto& child : Children_) {
  472. child->ResetMetadata();
  473. }
  474. }
  475. bool IsNullable() const final {
  476. return Nullable;
  477. }
  478. void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final {
  479. ArrowType_ = type;
  480. YQL_ENSURE(type->fields().size() == Children_.size());
  481. for (size_t i = 0; i < Children_.size(); ++i) {
  482. Children_[i]->SetArrowType(type->field(i)->type());
  483. }
  484. }
  485. const TVector<std::unique_ptr<TBlockDeserializerBase>> Children_;
  486. };
  487. template<typename TDate, bool Nullable>
  488. class TTzDateBlockDeserializer final : public TBlockDeserializerBase {
  489. public:
  490. TTzDateBlockDeserializer() = default;
  491. private:
  492. void DoLoadMetadata(const TMetadataSource& metaSource) final {
  493. DateDeserialiser_.LoadMetadata(metaSource);
  494. TzDeserialiser_.LoadMetadata(metaSource);
  495. }
  496. std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
  497. std::vector<std::shared_ptr<arrow::ArrayData>> childData;
  498. childData.emplace_back(DateDeserialiser_.MakeDefaultValue(blockLen, offset));
  499. childData.emplace_back(TzDeserialiser_.MakeDefaultValue(blockLen, offset));
  500. return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
  501. }
  502. std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
  503. std::vector<std::shared_ptr<arrow::ArrayData>> childData;
  504. childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset));
  505. childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset));
  506. return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
  507. }
  508. void DoResetMetadata() final {
  509. DateDeserialiser_.ResetMetadata();
  510. TzDeserialiser_.ResetMetadata();
  511. }
  512. bool IsNullable() const final {
  513. return Nullable;
  514. }
  515. void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final {
  516. YQL_ENSURE(type->fields().size() == 2);
  517. ArrowType_ = type;
  518. DateDeserialiser_.SetArrowType(type->field(0)->type());
  519. TzDeserialiser_.SetArrowType(type->field(1)->type());
  520. }
  521. using TDateLayout = typename NUdf::TDataType<TDate>::TLayout;
  522. TFixedSizeBlockDeserializer<sizeof(TDateLayout), false> DateDeserialiser_;
  523. TFixedSizeBlockDeserializer<sizeof(NYql::NUdf::TTimezoneId), false> TzDeserialiser_;
  524. };
  525. struct TSerializerTraits {
  526. using TResult = IBlockSerializer;
  527. template <bool Nullable>
  528. using TTuple = TTupleBlockSerializer<Nullable>;
  529. template <typename T, bool Nullable>
  530. using TFixedSize = TFixedSizeBlockSerializer<sizeof(T), Nullable>;
  531. template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
  532. using TStrings = TStringBlockSerializer<TStringType, Nullable>;
  533. using TExtOptional = TExtOptionalBlockSerializer;
  534. template<typename TTzDateType, bool Nullable>
  535. using TTzDate = TTzDateBlockSerializer<TTzDateType, Nullable>;
  536. constexpr static bool PassType = false;
  537. static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
  538. Y_UNUSED(pgBuilder);
  539. if (desc.PassByValue) {
  540. return std::make_unique<TFixedSize<ui64, true>>();
  541. }
  542. return std::make_unique<TStrings<arrow::BinaryType, true>>();
  543. }
  544. static std::unique_ptr<TResult> MakeResource(bool isOptional) {
  545. Y_UNUSED(isOptional);
  546. ythrow yexception() << "Serializer not implemented for block resources";
  547. }
  548. template<typename TTzDateType>
  549. static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
  550. if (isOptional) {
  551. return std::make_unique<TTzDate<TTzDateType, true>>();
  552. }
  553. else {
  554. return std::make_unique<TTzDate<TTzDateType, false>>();
  555. }
  556. }
  557. };
  558. struct TDeserializerTraits {
  559. using TResult = TBlockDeserializerBase;
  560. template <bool Nullable>
  561. using TTuple = TTupleBlockDeserializer<Nullable>;
  562. template <typename T, bool Nullable>
  563. using TFixedSize = TFixedSizeBlockDeserializer<sizeof(T), Nullable>;
  564. template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
  565. using TStrings = TStringBlockDeserializer<TStringType, Nullable>;
  566. using TExtOptional = TExtOptionalBlockDeserializer;
  567. template<typename TTzDateType, bool Nullable>
  568. using TTzDate = TTzDateBlockDeserializer<TTzDateType, Nullable>;
  569. constexpr static bool PassType = false;
  570. static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
  571. Y_UNUSED(pgBuilder);
  572. if (desc.PassByValue) {
  573. return std::make_unique<TFixedSize<ui64, true>>();
  574. }
  575. return std::make_unique<TStrings<arrow::BinaryType, true>>();
  576. }
  577. static std::unique_ptr<TResult> MakeResource(bool isOptional) {
  578. Y_UNUSED(isOptional);
  579. ythrow yexception() << "Deserializer not implemented for block resources";
  580. }
  581. template<typename TTzDateType>
  582. static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
  583. if (isOptional) {
  584. return std::make_unique<TTzDate<TTzDateType, true>>();
  585. }
  586. else {
  587. return std::make_unique<TTzDate<TTzDateType, false>>();
  588. }
  589. }
  590. };
  591. } // namespace
  592. std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
  593. return NYql::NUdf::DispatchByArrowTraits<TSerializerTraits>(typeInfoHelper, type, nullptr);
  594. }
  595. std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
  596. std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::DispatchByArrowTraits<TDeserializerTraits>(typeInfoHelper, type, nullptr);
  597. result->SetArrowType(NYql::NUdf::GetArrowType(typeInfoHelper, type));
  598. return std::move(result);
  599. }
  600. } // namespace NKikimr::NMiniKQL