block_builder.h 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557
  1. #pragma once
  2. #include "util.h"
  3. #include "bit_util.h"
  4. #include "block_io_buffer.h"
  5. #include "block_item.h"
  6. #include <yql/essentials/public/udf/udf_value.h>
  7. #include <yql/essentials/public/udf/udf_value_builder.h>
  8. #include <yql/essentials/public/udf/udf_type_inspection.h>
  9. #include <arrow/datum.h>
  10. #include <arrow/c/bridge.h>
  11. #include <deque>
  12. namespace NYql {
  13. namespace NUdf {
  14. class IArrayBuilder {
  15. public:
  16. struct TArrayDataItem {
  17. const arrow::ArrayData* Data = nullptr;
  18. ui64 StartOffset;
  19. };
  20. virtual ~IArrayBuilder() = default;
  21. virtual size_t MaxLength() const = 0;
  22. virtual void Add(NUdf::TUnboxedValuePod value) = 0;
  23. virtual void Add(TBlockItem value) = 0;
  24. virtual void Add(TBlockItem value, size_t count) = 0;
  25. virtual void Add(TInputBuffer& input) = 0;
  26. virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0;
  27. virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) = 0;
  28. virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) = 0;
  29. virtual arrow::Datum Build(bool finish) = 0;
  30. };
  31. inline const IArrayBuilder::TArrayDataItem* LookupArrayDataItem(const IArrayBuilder::TArrayDataItem* arrays, size_t arrayCount, ui64& idx) {
  32. IArrayBuilder::TArrayDataItem lookup{ nullptr, idx };
  33. auto it = std::lower_bound(arrays, arrays + arrayCount, lookup, [](const auto& left, const auto& right) {
  34. return left.StartOffset < right.StartOffset;
  35. });
  36. if (it == arrays + arrayCount || it->StartOffset > idx) {
  37. --it;
  38. }
  39. Y_DEBUG_ABORT_UNLESS(it->StartOffset <= idx);
  40. idx -= it->StartOffset;
  41. return it;
  42. }
  // Interface for turning a single value into an arrow::Datum (presumably a scalar datum,
  // per the class name). The order of the virtual functions is part of the interface layout.
  43. class IScalarBuilder {
  44. public:
  45. virtual ~IScalarBuilder() = default;
  // Builds a datum from a block item.
  46. virtual arrow::Datum Build(TBlockItem value) const = 0;
  // Builds a datum from an unboxed value.
  47. virtual arrow::Datum Build(NUdf::TUnboxedValuePod value) const = 0;
  48. };
  49. inline std::shared_ptr<arrow::DataType> GetArrowType(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
  50. auto arrowTypeHandle = typeInfoHelper.MakeArrowType(type);
  51. Y_ENSURE(arrowTypeHandle);
  52. ArrowSchema s;
  53. arrowTypeHandle->Export(&s);
  54. return ARROW_RESULT(arrow::ImportType(&s));
  55. }
  56. class TArrayBuilderBase : public IArrayBuilder {
  57. using Self = TArrayBuilderBase;
  58. public:
  59. using Ptr = std::unique_ptr<TArrayBuilderBase>;
  60. struct TBlockArrayTree {
  61. using Ptr = std::shared_ptr<TBlockArrayTree>;
  62. std::deque<std::shared_ptr<arrow::ArrayData>> Payload;
  63. std::vector<TBlockArrayTree::Ptr> Children;
  64. };
  65. TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated)
  66. : ArrowType(std::move(arrowType))
  67. , Pool(&pool)
  68. , MaxLen(maxLen)
  69. , MaxBlockSizeInBytes(typeInfoHelper.GetMaxBlockBytes())
  70. , TotalAllocated_(totalAllocated)
  71. {
  72. Y_ABORT_UNLESS(ArrowType);
  73. Y_ABORT_UNLESS(maxLen > 0);
  74. }
  75. TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated)
  76. : TArrayBuilderBase(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen, totalAllocated)
  77. {
  78. }
  79. size_t MaxLength() const final {
  80. return MaxLen;
  81. }
  82. void Add(NUdf::TUnboxedValuePod value) final {
  83. Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
  84. DoAdd(value);
  85. CurrLen++;
  86. }
  87. void Add(TBlockItem value) final {
  88. Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
  89. DoAdd(value);
  90. CurrLen++;
  91. }
  92. void Add(TBlockItem value, size_t count) final {
  93. Y_DEBUG_ABORT_UNLESS(CurrLen + count <= MaxLen);
  94. DoAdd(value, count);
  95. CurrLen += count;
  96. }
  97. void Add(TInputBuffer& input) final {
  98. Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
  99. DoAdd(input);
  100. CurrLen++;
  101. }
  102. void AddDefault() {
  103. Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
  104. DoAddDefault();
  105. CurrLen++;
  106. }
  107. inline void AddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
  108. TArrayDataItem item = { &array, 0 };
  109. Self::AddMany(&item, 1, beginIndex, count);
  110. }
  111. inline void AddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
  112. TArrayDataItem item = { &array, 0 };
  113. Self::AddMany(&item, 1, indexes, count);
  114. }
  115. void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final {
  116. Y_ABORT_UNLESS(size_t(array.length) == bitmapSize);
  117. Y_ABORT_UNLESS(popCount <= bitmapSize);
  118. Y_ABORT_UNLESS(CurrLen + popCount <= MaxLen);
  119. if (popCount) {
  120. DoAddMany(array, sparseBitmap, popCount);
  121. }
  122. CurrLen += popCount;
  123. }
  124. void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) final {
  125. Y_ABORT_UNLESS(arrays);
  126. Y_ABORT_UNLESS(arrayCount > 0);
  127. if (arrayCount == 1) {
  128. Y_ABORT_UNLESS(arrays->Data);
  129. DoAddMany(*arrays->Data, beginIndex, count);
  130. } else {
  131. ui64 idx = beginIndex;
  132. auto item = LookupArrayDataItem(arrays, arrayCount, idx);
  133. size_t avail = item->Data->length;
  134. size_t toAdd = count;
  135. Y_ABORT_UNLESS(idx <= avail);
  136. while (toAdd) {
  137. size_t adding = std::min(avail, toAdd);
  138. DoAddMany(*item->Data, idx, adding);
  139. avail -= adding;
  140. toAdd -= adding;
  141. if (!avail && toAdd) {
  142. ++item;
  143. Y_ABORT_UNLESS(item < arrays + arrayCount);
  144. avail = item->Data->length;
  145. idx = 0;
  146. }
  147. }
  148. }
  149. CurrLen += count;
  150. }
  151. void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) final {
  152. Y_ABORT_UNLESS(arrays);
  153. Y_ABORT_UNLESS(arrayCount > 0);
  154. Y_ABORT_UNLESS(indexes);
  155. Y_ABORT_UNLESS(CurrLen + count <= MaxLen);
  156. if (arrayCount == 1) {
  157. Y_ABORT_UNLESS(arrays->Data);
  158. DoAddMany(*arrays->Data, indexes, count);
  159. CurrLen += count;
  160. } else {
  161. const IArrayBuilder::TArrayDataItem* currData = nullptr;
  162. TVector<ui64> currDataIndexes;
  163. for (size_t i = 0; i < count; ++i) {
  164. ui64 idx = indexes[i];
  165. const IArrayBuilder::TArrayDataItem* data = LookupArrayDataItem(arrays, arrayCount, idx);
  166. if (!currData) {
  167. currData = data;
  168. }
  169. if (data != currData) {
  170. DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size());
  171. CurrLen += currDataIndexes.size();
  172. currDataIndexes.clear();
  173. currData = data;
  174. }
  175. currDataIndexes.push_back(idx);
  176. }
  177. if (!currDataIndexes.empty()) {
  178. DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size());
  179. CurrLen += currDataIndexes.size();
  180. }
  181. }
  182. }
  183. arrow::Datum Build(bool finish) final {
  184. auto tree = BuildTree(finish);
  185. TVector<std::shared_ptr<arrow::ArrayData>> chunks;
  186. while (size_t size = CalcSliceSize(*tree)) {
  187. chunks.push_back(Slice(*tree, size));
  188. }
  189. return MakeArray(chunks);
  190. }
  191. TBlockArrayTree::Ptr BuildTree(bool finish) {
  192. auto result = DoBuildTree(finish);
  193. CurrLen = 0;
  194. return result;
  195. }
  196. protected:
  197. virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0;
  198. virtual void DoAdd(TBlockItem value) = 0;
  199. virtual void DoAdd(TBlockItem value, size_t count) {
  200. for (size_t i = 0; i < count; ++i) {
  201. DoAdd(value);
  202. }
  203. }
  204. virtual void DoAdd(TInputBuffer& input) = 0;
  205. virtual void DoAddDefault() = 0;
  206. virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0;
  207. virtual void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) = 0;
  208. virtual void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) = 0;
  209. virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0;
  210. virtual void DoReserve() = 0;
  211. // will be called immediately after DoReserve()
  212. virtual size_t GetAllocatedSize() const = 0;
  213. private:
  214. static size_t CalcSliceSize(const TBlockArrayTree& tree) {
  215. if (tree.Payload.empty()) {
  216. return 0;
  217. }
  218. if (!tree.Children.empty()) {
  219. Y_ABORT_UNLESS(tree.Payload.size() == 1);
  220. size_t result = std::numeric_limits<size_t>::max();
  221. for (auto& child : tree.Children) {
  222. size_t childSize = CalcSliceSize(*child);
  223. result = std::min(result, childSize);
  224. }
  225. Y_ABORT_UNLESS(result <= size_t(tree.Payload.front()->length));
  226. return result;
  227. }
  228. int64_t result = tree.Payload.front()->length;
  229. Y_ABORT_UNLESS(result > 0);
  230. return static_cast<size_t>(result);
  231. }
  232. static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) {
  233. Y_ABORT_UNLESS(size > 0);
  234. Y_ABORT_UNLESS(!tree.Payload.empty());
  235. auto& main = tree.Payload.front();
  236. std::shared_ptr<arrow::ArrayData> sliced;
  237. if (size == size_t(main->length)) {
  238. sliced = main;
  239. tree.Payload.pop_front();
  240. } else {
  241. Y_ABORT_UNLESS(size < size_t(main->length));
  242. sliced = Chop(main, size);
  243. }
  244. if (!tree.Children.empty()) {
  245. std::vector<std::shared_ptr<arrow::ArrayData>> children;
  246. for (auto& child : tree.Children) {
  247. children.push_back(Slice(*child, size));
  248. }
  249. sliced->child_data = std::move(children);
  250. if (tree.Payload.empty()) {
  251. tree.Children.clear();
  252. }
  253. }
  254. return sliced;
  255. }
  256. protected:
  257. size_t GetCurrLen() const {
  258. return CurrLen;
  259. }
  260. void SetCurrLen(size_t len) {
  261. Y_ABORT_UNLESS(len <= MaxLen);
  262. CurrLen = len;
  263. }
  264. void Reserve() {
  265. DoReserve();
  266. if (TotalAllocated_) {
  267. *TotalAllocated_ += GetAllocatedSize();
  268. }
  269. }
  270. void AddExtraAllocated(size_t bytes) {
  271. if (TotalAllocated_) {
  272. *TotalAllocated_ += bytes;
  273. }
  274. }
  275. const std::shared_ptr<arrow::DataType> ArrowType;
  276. arrow::MemoryPool* const Pool;
  277. const size_t MaxLen;
  278. const size_t MaxBlockSizeInBytes;
  279. private:
  280. size_t CurrLen = 0;
  281. size_t* TotalAllocated_ = nullptr;
  282. };
  283. template<typename TLayout, bool Nullable, typename TDerived>
  284. class TFixedSizeArrayBuilderBase : public TArrayBuilderBase {
  285. public:
  286. TFixedSizeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated)
  287. : TArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated)
  288. {
  289. Reserve();
  290. }
  291. TFixedSizeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated)
  292. : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
  293. {
  294. Reserve();
  295. }
  296. void UnsafeReserve(size_t length) {
  297. SetCurrLen(length);
  298. }
  299. TLayout* MutableData() {
  300. return DataPtr;
  301. }
  302. ui8* MutableValidMask() {
  303. return NullPtr;
  304. }
  305. void DoAdd(NUdf::TUnboxedValuePod value) final {
  306. if constexpr (Nullable) {
  307. if (!value) {
  308. DoAddNull();
  309. return;
  310. }
  311. NullPtr[GetCurrLen()] = 1;
  312. }
  313. static_cast<TDerived*>(this)->DoAddNotNull(value);
  314. }
  315. void DoAdd(TBlockItem value) final {
  316. if constexpr (Nullable) {
  317. if (!value) {
  318. DoAddNull();
  319. return;
  320. }
  321. NullPtr[GetCurrLen()] = 1;
  322. }
  323. static_cast<TDerived*>(this)->DoAddNotNull(value);
  324. }
  325. void DoAddNull() {
  326. if constexpr (Nullable) {
  327. NullPtr[GetCurrLen()] = 0;
  328. PlaceItem(TLayout{});
  329. }
  330. }
  331. void DoAdd(TBlockItem value, size_t count) final {
  332. if constexpr (Nullable) {
  333. if (!value) {
  334. std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 0);
  335. std::fill(DataPtr + GetCurrLen(), DataPtr + GetCurrLen() + count, TLayout{});
  336. return;
  337. }
  338. std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 1);
  339. }
  340. static_cast<TDerived*>(this)->DoAddNotNull(value, count);
  341. }
  342. void DoAdd(TInputBuffer &input) final {
  343. if constexpr (Nullable) {
  344. if (!input.PopChar()) {
  345. DoAddNull();
  346. return;
  347. }
  348. }
  349. static_cast<TDerived*>(this)->DoAddNotNull(input);
  350. }
  351. void DoAddDefault() final {
  352. if constexpr (Nullable) {
  353. NullPtr[GetCurrLen()] = 1;
  354. }
  355. PlaceItem(TLayout{});
  356. }
  357. void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
  358. Y_ABORT_UNLESS(array.buffers.size() > 1);
  359. if constexpr (Nullable) {
  360. if (array.buffers.front()) {
  361. ui8* dstBitmap = NullPtr + GetCurrLen();
  362. CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
  363. } else {
  364. ui8* dstBitmap = NullPtr + GetCurrLen();
  365. std::fill_n(dstBitmap, popCount, 1);
  366. }
  367. }
  368. const TLayout* src = array.GetValues<TLayout>(1);
  369. TLayout* dst = DataPtr + GetCurrLen();
  370. CompressArray(src, sparseBitmap, dst, array.length);
  371. }
  372. void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
  373. Y_ABORT_UNLESS(array.buffers.size() > 1);
  374. if constexpr (Nullable) {
  375. for (size_t i = beginIndex; i < beginIndex + count; ++i) {
  376. NullPtr[GetCurrLen() + i - beginIndex] = !IsNull(array, i);
  377. }
  378. }
  379. const TLayout* values = array.GetValues<TLayout>(1);
  380. for (size_t i = beginIndex; i < beginIndex + count; ++i) {
  381. ::new(DataPtr + GetCurrLen() + i - beginIndex) TLayout(values[i]);
  382. }
  383. }
  384. void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
  385. Y_ABORT_UNLESS(array.buffers.size() > 1);
  386. if constexpr (Nullable) {
  387. for (size_t i = 0; i < count; ++i) {
  388. NullPtr[GetCurrLen() + i] = !IsNull(array, indexes[i]);
  389. }
  390. }
  391. const TLayout* values = array.GetValues<TLayout>(1);
  392. for (size_t i = 0; i < count; ++i) {
  393. ::new(DataPtr + GetCurrLen() + i) TLayout(values[indexes[i]]);
  394. }
  395. }
  396. TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
  397. const size_t len = GetCurrLen();
  398. std::shared_ptr<arrow::Buffer> nulls;
  399. if constexpr (Nullable) {
  400. NullBuilder->UnsafeAdvance(len);
  401. nulls = NullBuilder->Finish();
  402. nulls = MakeDenseBitmap(nulls->data(), len, Pool);
  403. }
  404. DataBuilder->UnsafeAdvance(len);
  405. std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
  406. TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
  407. result->Payload.push_back(arrow::ArrayData::Make(ArrowType, len, {nulls, data}));
  408. NullBuilder.reset();
  409. DataBuilder.reset();
  410. if (!finish) {
  411. Reserve();
  412. }
  413. return result;
  414. }
  415. protected:
  416. void PlaceItem(TLayout&& value) {
  417. ::new(DataPtr + GetCurrLen()) TLayout(std::move(value));
  418. }
  419. TLayout* DataPtr = nullptr;
  420. private:
  421. void DoReserve() final {
  422. DataBuilder = std::make_unique<TTypedBufferBuilder<TLayout>>(Pool);
  423. DataBuilder->Reserve(MaxLen + 1);
  424. DataPtr = DataBuilder->MutableData();
  425. if constexpr (Nullable) {
  426. NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
  427. NullBuilder->Reserve(MaxLen + 1);
  428. NullPtr = NullBuilder->MutableData();
  429. }
  430. }
  431. size_t GetAllocatedSize() const final {
  432. Y_ENSURE(DataBuilder);
  433. size_t result = DataBuilder->Capacity();
  434. if constexpr (Nullable) {
  435. Y_ENSURE(NullBuilder);
  436. result += NullBuilder->Capacity();
  437. }
  438. return result;
  439. }
  440. std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
  441. std::unique_ptr<TTypedBufferBuilder<TLayout>> DataBuilder;
  442. ui8* NullPtr = nullptr;
  443. };
  444. template<typename TLayout, bool Nullable>
  445. class TFixedSizeArrayBuilder final: public TFixedSizeArrayBuilderBase<TLayout, Nullable, TFixedSizeArrayBuilder<TLayout, Nullable>> {
  446. using TSelf = TFixedSizeArrayBuilder<TLayout, Nullable>;
  447. using TBase = TFixedSizeArrayBuilderBase<TLayout, Nullable, TSelf>;
  448. public:
  449. TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  450. : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated)
  451. {}
  452. TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  453. : TBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
  454. {}
  455. void DoAddNotNull(TUnboxedValuePod value) {
  456. this->PlaceItem(value.Get<TLayout>());
  457. }
  458. void DoAddNotNull(TBlockItem value) {
  459. this->PlaceItem(value.Get<TLayout>());
  460. }
  461. void DoAddNotNull(TInputBuffer& input) {
  462. this->DoAdd(TBlockItem(input.PopNumber<TLayout>()));
  463. }
  464. void DoAddNotNull(TBlockItem value, size_t count) {
  465. std::fill(this->DataPtr + this->GetCurrLen(), this->DataPtr + this->GetCurrLen() + count, value.Get<TLayout>());
  466. }
  467. };
  468. template<bool Nullable>
  469. class TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable> final: public TFixedSizeArrayBuilderBase<NYql::NDecimal::TInt128, Nullable, TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>> {
  470. using TSelf = TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>;
  471. using TBase = TFixedSizeArrayBuilderBase<NYql::NDecimal::TInt128, Nullable, TSelf>;
  472. public:
  473. TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  474. : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated)
  475. {}
  476. TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  477. : TBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
  478. {}
  479. void DoAddNotNull(TUnboxedValuePod value) {
  480. this->PlaceItem(value.GetInt128());
  481. }
  482. void DoAddNotNull(TBlockItem value) {
  483. this->PlaceItem(value.GetInt128());
  484. }
  485. void DoAddNotNull(TInputBuffer& input) {
  486. this->DoAdd(TBlockItem(input.PopNumber<NYql::NDecimal::TInt128>()));
  487. }
  488. void DoAddNotNull(TBlockItem value, size_t count) {
  489. std::fill(this->DataPtr + this->GetCurrLen(), this->DataPtr + this->GetCurrLen() + count, value.GetInt128());
  490. }
  491. };
  492. template<bool Nullable>
  493. class TResourceArrayBuilder final: public TFixedSizeArrayBuilderBase<TUnboxedValue, Nullable, TResourceArrayBuilder<Nullable>> {
  494. public:
  495. TResourceArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  496. : TFixedSizeArrayBuilderBase<TUnboxedValue, Nullable, TResourceArrayBuilder<Nullable>>(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated)
  497. {}
  498. TResourceArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  499. : TFixedSizeArrayBuilderBase<TUnboxedValue, Nullable, TResourceArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated)
  500. {}
  501. void DoAddNotNull(TUnboxedValuePod value) {
  502. this->PlaceItem(TUnboxedValue(value));
  503. }
  504. TUnboxedValue FromBlockItem(TBlockItem item) {
  505. TUnboxedValue val;
  506. std::memcpy(val.GetRawPtr(), item.GetRawPtr(), sizeof(val));
  507. val.Ref();
  508. return val;
  509. }
  510. void DoAddNotNull(TBlockItem item) {
  511. this->PlaceItem(FromBlockItem(item));
  512. }
  513. void DoAddNotNull(TInputBuffer& input) {
  514. this->DoAdd(input.PopNumber<TUnboxedValuePod>());
  515. }
  516. void DoAddNotNull(TBlockItem item, size_t count) {
  517. for (size_t i = 0; i < count; ++i) {
  518. ::new(this->DataPtr + this->GetCurrLen() + i) TUnboxedValue(FromBlockItem(item));
  519. }
  520. }
  521. };
  522. template<typename TStringType, bool Nullable, EPgStringType PgString = EPgStringType::None>
  523. class TStringArrayBuilder final : public TArrayBuilderBase {
  524. public:
  525. using TOffset = typename TStringType::offset_type;
  526. TStringArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  527. : TArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated)
  528. {
  529. Reserve();
  530. }
  531. TStringArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  532. : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
  533. {
  534. Reserve();
  535. }
  536. void SetPgBuilder(const NUdf::IPgBuilder* pgBuilder, i32 typeLen) {
  537. Y_ENSURE(PgString != EPgStringType::None);
  538. PgBuilder = pgBuilder;
  539. TypeLen = typeLen;
  540. }
  541. void DoAdd(NUdf::TUnboxedValuePod value) final {
  542. if constexpr (Nullable) {
  543. if (!value) {
  544. return DoAdd(TBlockItem{});
  545. }
  546. }
  547. if constexpr (PgString == EPgStringType::CString) {
  548. static_assert(Nullable);
  549. auto buf = PgBuilder->AsCStringBuffer(value);
  550. AddPgItem(buf);
  551. } else if constexpr (PgString == EPgStringType::Text) {
  552. static_assert(Nullable);
  553. auto buf = PgBuilder->AsTextBuffer(value);
  554. AddPgItem(buf);
  555. } else if constexpr (PgString == EPgStringType::Fixed) {
  556. static_assert(Nullable);
  557. auto buf = PgBuilder->AsFixedStringBuffer(value, TypeLen);
  558. AddPgItem(buf);
  559. } else {
  560. DoAdd(TBlockItem(value.AsStringRef()));
  561. }
  562. }
  563. template <bool AddCStringZero = false, ui32 AddVarHdr = 0>
  564. ui8* AddPgItem(TStringRef buf) {
  565. auto alignedSize = AlignUp(buf.Size() + sizeof(void*) + AddVarHdr + (AddCStringZero ? 1 : 0), sizeof(void*));
  566. auto ptr = AddNoFill(alignedSize);
  567. *(void**)ptr = nullptr;
  568. if (alignedSize > sizeof(void*)) {
  569. // clear padding too
  570. *(void**)(ptr + alignedSize - sizeof(void*)) = nullptr;
  571. }
  572. std::memcpy(ptr + sizeof(void*) + AddVarHdr, buf.Data(), buf.Size());
  573. if constexpr (AddCStringZero) {
  574. ptr[sizeof(void*) + buf.Size()] = 0;
  575. }
  576. return ptr;
  577. }
  578. ui8* AddNoFill(size_t size) {
  579. size_t currentLen = DataBuilder->Length();
  580. // empty string can always be appended
  581. if (size > 0 && currentLen + size > MaxBlockSizeInBytes) {
  582. if (currentLen) {
  583. FlushChunk(false);
  584. }
  585. if (size > MaxBlockSizeInBytes) {
  586. ReserveForLargeString(size);
  587. }
  588. }
  589. AppendCurrentOffset();
  590. auto ret = DataBuilder->End();
  591. DataBuilder->UnsafeAdvance(size);
  592. if constexpr (Nullable) {
  593. NullBuilder->UnsafeAppend(1);
  594. }
  595. return ret;
  596. }
  597. void DoAdd(TBlockItem value) final {
  598. if constexpr (Nullable) {
  599. if (!value) {
  600. NullBuilder->UnsafeAppend(0);
  601. AppendCurrentOffset();
  602. return;
  603. }
  604. }
  605. const std::string_view str = value.AsStringRef();
  606. auto ptr = AddNoFill(str.size());
  607. std::memcpy(ptr, str.data(), str.size());
  608. }
  609. void DoAdd(TInputBuffer& input) final {
  610. if constexpr (Nullable) {
  611. if (!input.PopChar()) {
  612. return DoAdd(TBlockItem{});
  613. }
  614. }
  615. auto str = input.PopString();
  616. TStringRef ref(str.data(), str.size());
  617. DoAdd(TBlockItem(ref));
  618. }
  619. void DoAddDefault() final {
  620. if constexpr (Nullable) {
  621. NullBuilder->UnsafeAppend(1);
  622. }
  623. AppendCurrentOffset();
  624. }
  // Appends, in order, the elements of `array` whose byte in `sparseBitmap` is non-zero.
  // `sparseBitmap` holds one byte per element of `array`. `popCount` is not needed here
  // because the bitmap is scanned directly. Item-count bookkeeping (CurrLen) is done by the
  // caller, TArrayBuilderBase::AddMany.
  //
  // String bytes are copied in runs. Selected strings that are contiguous in the source data
  // buffer are accumulated in [chunkStart, chunkEnd) and copied with one UnsafeAppend.
  // Offsets and validity bytes are written through raw cursors (dstOffset/dstNulls) and
  // committed to their builders only via UnsafeAdvance(countAdded).
  625. void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
  626. Y_UNUSED(popCount);
  627. Y_ABORT_UNLESS(array.buffers.size() > 2);
  // Validity and offsets builders must be in lock-step before raw cursors are taken.
  628. Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
  // Source validity bitmap with no offset applied (indexed by i + array.offset below);
  // nullptr when the source has no validity buffer.
  629. const ui8* srcNulls = array.GetValues<ui8>(0, 0);
  // Offsets already shifted by array.offset; data buffer taken from its very start.
  630. const TOffset* srcOffset = array.GetValues<TOffset>(1);
  631. const ui8* srcData = array.GetValues<ui8>(2, 0);
  // Pending run of source bytes not yet copied into DataBuilder.
  632. const ui8* chunkStart = srcData;
  633. const ui8* chunkEnd = chunkStart;
  // Logical data length of the current block, including the pending run.
  634. size_t dataLen = DataBuilder->Length();
  635. ui8* dstNulls = Nullable ? NullBuilder->End() : nullptr;
  636. TOffset* dstOffset = OffsetsBuilder->End();
  // Items written through the raw cursors but not yet committed with UnsafeAdvance.
  637. size_t countAdded = 0;
  638. for (size_t i = 0; i < size_t(array.length); i++) {
  639. if (!sparseBitmap[i]) {
  640. continue;
  641. }
  642. const ui8* begin = srcData + srcOffset[i];
  643. const ui8* end = srcData + srcOffset[i + 1];
  644. const size_t strSize = end - begin;
  // Room left in the current block; 0 once a large string has pushed dataLen past the limit.
  645. size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
  // Retries once more after a flush or a large-string reservation.
  646. for (;;) {
  647. // try to append ith string
  648. if (strSize <= availBytes) {
  // Extend the pending run if the string is adjacent to it, otherwise copy the run
  // and start a new one at this string.
  649. if (begin == chunkEnd) {
  650. chunkEnd = end;
  651. } else {
  652. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  653. chunkStart = begin;
  654. chunkEnd = end;
  655. }
  656. size_t nullOffset = i + array.offset;
  657. if constexpr (Nullable) {
  // Read bit nullOffset of the source validity bitmap; no bitmap means present.
  658. *dstNulls++ = srcNulls ? ((srcNulls[nullOffset >> 3] >> (nullOffset & 7)) & 1) : 1u;
  659. }
  660. *dstOffset++ = dataLen;
  661. dataLen += strSize;
  662. ++countAdded;
  663. break;
  664. }
  // The string does not fit into the current block.
  665. if (dataLen) {
  // Non-empty block: copy the pending run, commit the cursors and flush the block,
  // then retry this string in a fresh block.
  666. if (chunkStart != chunkEnd) {
  667. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  668. chunkStart = chunkEnd = srcData;
  669. }
  670. Y_ABORT_UNLESS(dataLen == DataBuilder->Length());
  671. OffsetsBuilder->UnsafeAdvance(countAdded);
  672. if constexpr (Nullable) {
  673. NullBuilder->UnsafeAdvance(countAdded);
  674. }
  675. FlushChunk(false);
  676. dataLen = 0;
  677. countAdded = 0;
  678. if constexpr (Nullable) {
  679. dstNulls = NullBuilder->End();
  680. }
  681. dstOffset = OffsetsBuilder->End();
  // NOTE(review): availBytes is not recomputed here, so the retry always takes the
  // ReserveForLargeString branch below, even for strings that would fit a regular
  // block. Presumably harmless if ReserveForLargeString is a no-op when capacity
  // already suffices — confirm.
  682. } else {
  // Empty block: the string alone exceeds the limit; reserve room for it.
  683. ReserveForLargeString(strSize);
  684. availBytes = strSize;
  685. }
  686. }
  687. }
  // Copy the last pending run and commit the remaining offsets/validity bytes.
  688. if (chunkStart != chunkEnd) {
  689. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  690. }
  691. Y_ABORT_UNLESS(dataLen == DataBuilder->Length());
  692. OffsetsBuilder->UnsafeAdvance(countAdded);
  693. if constexpr (Nullable) {
  694. NullBuilder->UnsafeAdvance(countAdded);
  695. }
  696. }
  697. void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
  698. Y_ABORT_UNLESS(array.buffers.size() > 2);
  699. Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
  700. size_t dataLen = DataBuilder->Length();
  701. const TOffset* offsets = array.GetValues<TOffset>(1);
  702. const ui8* srcData = array.GetValues<ui8>(2, 0);
  703. const ui8* chunkStart = srcData + offsets[beginIndex];
  704. const ui8* chunkEnd = chunkStart;
  705. for (size_t i = beginIndex; i < beginIndex + count; ++i) {
  706. const ui8* begin = srcData + offsets[i];
  707. const ui8* end = srcData + offsets[i + 1];
  708. const size_t strSize = end - begin;
  709. size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
  710. for (;;) {
  711. if (strSize <= availBytes) {
  712. if constexpr (Nullable) {
  713. NullBuilder->UnsafeAppend(!IsNull(array, i));
  714. }
  715. OffsetsBuilder->UnsafeAppend(TOffset(dataLen));
  716. chunkEnd = end;
  717. dataLen += strSize;
  718. break;
  719. }
  720. if (dataLen) {
  721. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  722. chunkStart = begin;
  723. chunkEnd = end;
  724. FlushChunk(false);
  725. dataLen = 0;
  726. } else {
  727. ReserveForLargeString(strSize);
  728. availBytes = strSize;
  729. }
  730. }
  731. }
  732. if (chunkStart != chunkEnd) {
  733. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  734. }
  735. }
  736. void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
  737. Y_ABORT_UNLESS(array.buffers.size() > 2);
  738. Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
  739. size_t dataLen = DataBuilder->Length();
  740. const TOffset* offsets = array.GetValues<TOffset>(1);
  741. const char* strData = array.GetValues<char>(2, 0);
  742. for (size_t i = 0; i < count; ++i) {
  743. ui64 idx = indexes[i];
  744. std::string_view str(strData + offsets[idx], offsets[idx + 1] - offsets[idx]);
  745. size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
  746. for (;;) {
  747. if (str.size() <= availBytes) {
  748. if constexpr (Nullable) {
  749. NullBuilder->UnsafeAppend(!IsNull(array, idx));
  750. }
  751. OffsetsBuilder->UnsafeAppend(TOffset(dataLen));
  752. DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size());
  753. dataLen += str.size();
  754. break;
  755. }
  756. if (dataLen) {
  757. FlushChunk(false);
  758. dataLen = 0;
  759. } else {
  760. ReserveForLargeString(str.size());
  761. availBytes = str.size();
  762. }
  763. }
  764. }
  765. }
  766. TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
  767. FlushChunk(finish);
  768. TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
  769. result->Payload = std::move(Chunks);
  770. Chunks.clear();
  771. return result;
  772. }
  773. private:
  774. void DoReserve() final {
  775. if constexpr (Nullable) {
  776. NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
  777. NullBuilder->Reserve(MaxLen + 1);
  778. }
  779. OffsetsBuilder = std::make_unique<TTypedBufferBuilder<TOffset>>(Pool);
  780. OffsetsBuilder->Reserve(MaxLen + 1);
  781. DataBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
  782. DataBuilder->Reserve(MaxBlockSizeInBytes);
  783. }
  784. size_t GetAllocatedSize() const final {
  785. Y_ENSURE(DataBuilder && OffsetsBuilder);
  786. size_t result = DataBuilder->Capacity() + OffsetsBuilder->Capacity();
  787. if constexpr (Nullable) {
  788. Y_ENSURE(NullBuilder);
  789. result += NullBuilder->Capacity();
  790. }
  791. return result;
  792. }
  793. void ReserveForLargeString(size_t strSize) {
  794. size_t before = DataBuilder->Capacity();
  795. DataBuilder->Reserve(strSize);
  796. size_t after = DataBuilder->Capacity();
  797. Y_ENSURE(before <= after);
  798. AddExtraAllocated(after - before);
  799. }
  800. void AppendCurrentOffset() {
  801. OffsetsBuilder->UnsafeAppend(DataBuilder->Length());
  802. }
  803. void FlushChunk(bool finish) {
  804. const auto length = OffsetsBuilder->Length();
  805. Y_ABORT_UNLESS(length > 0);
  806. AppendCurrentOffset();
  807. std::shared_ptr<arrow::Buffer> nullBitmap;
  808. if constexpr (Nullable) {
  809. nullBitmap = NullBuilder->Finish();
  810. nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
  811. }
  812. std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder->Finish();
  813. std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
  814. Chunks.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap, offsets, data }));
  815. if (!finish) {
  816. Reserve();
  817. }
  818. }
  819. std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
  820. std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder;
  821. std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder;
  822. std::deque<std::shared_ptr<arrow::ArrayData>> Chunks;
  823. const IPgBuilder* PgBuilder = nullptr;
  824. i32 TypeLen = 0;
  825. };
  826. template<bool Nullable, typename TDerived>
  827. class TTupleArrayBuilderBase : public TArrayBuilderBase {
  828. public:
  829. TTupleArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  830. : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
  831. {
  832. Reserve();
  833. }
  834. void DoAdd(NUdf::TUnboxedValuePod value) final {
  835. if constexpr (Nullable) {
  836. if (!value) {
  837. NullBuilder->UnsafeAppend(0);
  838. static_cast<TDerived*>(this)->AddToChildrenDefault();
  839. return;
  840. }
  841. NullBuilder->UnsafeAppend(1);
  842. }
  843. static_cast<TDerived*>(this)->AddToChildren(value);
  844. }
  845. void DoAdd(TBlockItem value) final {
  846. if constexpr (Nullable) {
  847. if (!value) {
  848. NullBuilder->UnsafeAppend(0);
  849. static_cast<TDerived*>(this)->AddToChildrenDefault();
  850. return;
  851. }
  852. NullBuilder->UnsafeAppend(1);
  853. }
  854. static_cast<TDerived*>(this)->AddToChildren(value);
  855. }
  856. void DoAdd(TInputBuffer& input) final {
  857. if constexpr (Nullable) {
  858. if (!input.PopChar()) {
  859. NullBuilder->UnsafeAppend(0);
  860. static_cast<TDerived*>(this)->AddToChildrenDefault();
  861. return;
  862. }
  863. NullBuilder->UnsafeAppend(1);
  864. }
  865. static_cast<TDerived*>(this)->AddToChildren(input);
  866. }
  867. void DoAddDefault() final {
  868. if constexpr (Nullable) {
  869. NullBuilder->UnsafeAppend(1);
  870. }
  871. static_cast<TDerived*>(this)->AddToChildrenDefault();
  872. }
  873. void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
  874. Y_ABORT_UNLESS(!array.buffers.empty());
  875. if constexpr (Nullable) {
  876. if (array.buffers.front()) {
  877. ui8* dstBitmap = NullBuilder->End();
  878. CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
  879. NullBuilder->UnsafeAdvance(popCount);
  880. } else {
  881. NullBuilder->UnsafeAppend(popCount, 1);
  882. }
  883. }
  884. static_cast<TDerived*>(this)->AddManyToChildren(array, sparseBitmap, popCount);
  885. }
  886. void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
  887. Y_ABORT_UNLESS(!array.buffers.empty());
  888. if constexpr (Nullable) {
  889. for (ui64 i = beginIndex; i < beginIndex + count; ++i) {
  890. NullBuilder->UnsafeAppend(!IsNull(array, i));
  891. }
  892. }
  893. static_cast<TDerived*>(this)->AddManyToChildren(array, beginIndex, count);
  894. }
  895. void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
  896. Y_ABORT_UNLESS(!array.buffers.empty());
  897. if constexpr (Nullable) {
  898. for (size_t i = 0; i < count; ++i) {
  899. NullBuilder->UnsafeAppend(!IsNull(array, indexes[i]));
  900. }
  901. }
  902. static_cast<TDerived*>(this)->AddManyToChildren(array, indexes, count);
  903. }
  904. TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
  905. TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
  906. std::shared_ptr<arrow::Buffer> nullBitmap;
  907. const size_t length = GetCurrLen();
  908. if constexpr (Nullable) {
  909. Y_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
  910. nullBitmap = NullBuilder->Finish();
  911. nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
  912. }
  913. Y_ABORT_UNLESS(length);
  914. result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap }));
  915. static_cast<TDerived*>(this)->BuildChildrenTree(finish, result->Children);
  916. if (!finish) {
  917. Reserve();
  918. }
  919. return result;
  920. }
  921. private:
  922. void DoReserve() final {
  923. if constexpr (Nullable) {
  924. NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
  925. NullBuilder->Reserve(MaxLen + 1);
  926. }
  927. }
  928. size_t GetAllocatedSize() const final {
  929. size_t result = 0;
  930. if constexpr (Nullable) {
  931. Y_ENSURE(NullBuilder);
  932. result += NullBuilder->Capacity();
  933. }
  934. return result;
  935. }
  936. private:
  937. std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
  938. };
  939. template<bool Nullable>
  940. class TTupleArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>> {
  941. public:
  942. TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen,
  943. TVector<TArrayBuilderBase::Ptr>&& children, size_t* totalAllocated = nullptr)
  944. : TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated)
  945. , Children_(std::move(children)) {}
  946. void AddToChildrenDefault() {
  947. for (ui32 i = 0; i < Children_.size(); ++i) {
  948. Children_[i]->AddDefault();
  949. }
  950. }
  951. void AddToChildren(NUdf::TUnboxedValuePod value) {
  952. auto elements = value.GetElements();
  953. if (elements) {
  954. for (ui32 i = 0; i < Children_.size(); ++i) {
  955. Children_[i]->Add(elements[i]);
  956. }
  957. } else {
  958. for (ui32 i = 0; i < Children_.size(); ++i) {
  959. auto element = value.GetElement(i);
  960. Children_[i]->Add(element);
  961. }
  962. }
  963. }
  964. void AddToChildren(TBlockItem value) {
  965. auto elements = value.AsTuple();
  966. for (ui32 i = 0; i < Children_.size(); ++i) {
  967. Children_[i]->Add(elements[i]);
  968. }
  969. }
  970. void AddToChildren(TInputBuffer& input) {
  971. for (ui32 i = 0; i < Children_.size(); ++i) {
  972. Children_[i]->Add(input);
  973. }
  974. }
  975. void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) {
  976. Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
  977. for (size_t i = 0; i < Children_.size(); ++i) {
  978. Children_[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length);
  979. }
  980. }
  981. void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
  982. Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
  983. for (size_t i = 0; i < Children_.size(); ++i) {
  984. Children_[i]->AddMany(*array.child_data[i], beginIndex, count);
  985. }
  986. }
  987. void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
  988. Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
  989. for (size_t i = 0; i < Children_.size(); ++i) {
  990. Children_[i]->AddMany(*array.child_data[i], indexes, count);
  991. }
  992. }
  993. void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) {
  994. resultChildren.reserve(Children_.size());
  995. for (ui32 i = 0; i < Children_.size(); ++i) {
  996. resultChildren.emplace_back(Children_[i]->BuildTree(finish));
  997. }
  998. }
  999. private:
  1000. TVector<std::unique_ptr<TArrayBuilderBase>> Children_;
  1001. };
  1002. template<typename TDate, bool Nullable>
  1003. class TTzDateArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>> {
  1004. using TDateLayout = typename TDataType<TDate>::TLayout;
  1005. static constexpr auto DataSlot = TDataType<TDate>::Slot;
  1006. public:
  1007. TTzDateArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
  1008. : TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated)
  1009. , DateBuilder_(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen)
  1010. , TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen)
  1011. {
  1012. }
  1013. void AddToChildrenDefault() {
  1014. DateBuilder_.AddDefault();
  1015. TimezoneBuilder_.AddDefault();
  1016. }
  1017. void AddToChildren(NUdf::TUnboxedValuePod value) {
  1018. DateBuilder_.Add(value);
  1019. TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId()));
  1020. }
  1021. void AddToChildren(TBlockItem value) {
  1022. DateBuilder_.Add(value);
  1023. TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId()));
  1024. }
  1025. void AddToChildren(TInputBuffer& input) {
  1026. DateBuilder_.Add(input);
  1027. TimezoneBuilder_.Add(input);
  1028. }
  1029. void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) {
  1030. Y_ABORT_UNLESS(array.child_data.size() == 2);
  1031. DateBuilder_.AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
  1032. TimezoneBuilder_.AddMany(*array.child_data[1], popCount, sparseBitmap, array.length);
  1033. }
  1034. void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
  1035. Y_ABORT_UNLESS(array.child_data.size() == 2);
  1036. DateBuilder_.AddMany(*array.child_data[0], beginIndex, count);
  1037. TimezoneBuilder_.AddMany(*array.child_data[1], beginIndex, count);
  1038. }
  1039. void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
  1040. Y_ABORT_UNLESS(array.child_data.size() == 2);
  1041. DateBuilder_.AddMany(*array.child_data[0], indexes, count);
  1042. TimezoneBuilder_.AddMany(*array.child_data[1], indexes, count);
  1043. }
  1044. void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) {
  1045. resultChildren.emplace_back(DateBuilder_.BuildTree(finish));
  1046. resultChildren.emplace_back(TimezoneBuilder_.BuildTree(finish));
  1047. }
  1048. private:
  1049. TFixedSizeArrayBuilder<TDateLayout, false> DateBuilder_;
  1050. TFixedSizeArrayBuilder<ui16, false> TimezoneBuilder_;
  1051. };
  1052. class TExternalOptionalArrayBuilder final : public TArrayBuilderBase {
  1053. public:
  1054. TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen,
  1055. std::unique_ptr<TArrayBuilderBase>&& inner, size_t* totalAllocated)
  1056. : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
  1057. , Inner(std::move(inner))
  1058. {
  1059. Reserve();
  1060. }
  1061. void DoAdd(NUdf::TUnboxedValuePod value) final {
  1062. if (!value) {
  1063. NullBuilder->UnsafeAppend(0);
  1064. Inner->AddDefault();
  1065. return;
  1066. }
  1067. NullBuilder->UnsafeAppend(1);
  1068. Inner->Add(value.GetOptionalValue());
  1069. }
  1070. void DoAdd(TBlockItem value) final {
  1071. if (!value) {
  1072. NullBuilder->UnsafeAppend(0);
  1073. Inner->AddDefault();
  1074. return;
  1075. }
  1076. NullBuilder->UnsafeAppend(1);
  1077. Inner->Add(value.GetOptionalValue());
  1078. }
  1079. void DoAdd(TInputBuffer& input) final {
  1080. if (!input.PopChar()) {
  1081. NullBuilder->UnsafeAppend(0);
  1082. Inner->AddDefault();
  1083. return;
  1084. }
  1085. NullBuilder->UnsafeAppend(1);
  1086. Inner->Add(input);
  1087. }
  1088. void DoAddDefault() final {
  1089. NullBuilder->UnsafeAppend(1);
  1090. Inner->AddDefault();
  1091. }
  1092. void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
  1093. Y_ABORT_UNLESS(!array.buffers.empty());
  1094. Y_ABORT_UNLESS(array.child_data.size() == 1);
  1095. if (array.buffers.front()) {
  1096. ui8* dstBitmap = NullBuilder->End();
  1097. CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
  1098. NullBuilder->UnsafeAdvance(popCount);
  1099. } else {
  1100. NullBuilder->UnsafeAppend(popCount, 1);
  1101. }
  1102. Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
  1103. }
  1104. void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
  1105. Y_ABORT_UNLESS(!array.buffers.empty());
  1106. Y_ABORT_UNLESS(array.child_data.size() == 1);
  1107. for (ui64 i = beginIndex; i < beginIndex + count; ++i) {
  1108. NullBuilder->UnsafeAppend(!IsNull(array, i));
  1109. }
  1110. Inner->AddMany(*array.child_data[0], beginIndex, count);
  1111. }
  1112. void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
  1113. Y_ABORT_UNLESS(!array.buffers.empty());
  1114. Y_ABORT_UNLESS(array.child_data.size() == 1);
  1115. for (size_t i = 0; i < count; ++i) {
  1116. NullBuilder->UnsafeAppend(!IsNull(array, indexes[i]));
  1117. }
  1118. Inner->AddMany(*array.child_data[0], indexes, count);
  1119. }
  1120. TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
  1121. TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
  1122. std::shared_ptr<arrow::Buffer> nullBitmap;
  1123. const size_t length = GetCurrLen();
  1124. Y_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
  1125. nullBitmap = NullBuilder->Finish();
  1126. nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
  1127. Y_ABORT_UNLESS(length);
  1128. result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap }));
  1129. result->Children.emplace_back(Inner->BuildTree(finish));
  1130. if (!finish) {
  1131. Reserve();
  1132. }
  1133. return result;
  1134. }
  1135. private:
  1136. void DoReserve() final {
  1137. NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
  1138. NullBuilder->Reserve(MaxLen + 1);
  1139. }
  1140. size_t GetAllocatedSize() const final {
  1141. Y_ENSURE(NullBuilder);
  1142. return NullBuilder->Capacity();
  1143. }
  1144. private:
  1145. std::unique_ptr<TArrayBuilderBase> Inner;
  1146. std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
  1147. };
  1148. std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase(
  1149. const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
  1150. size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated);
  1151. template<bool Nullable>
  1152. inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderImpl(
  1153. const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
  1154. size_t maxLen, const IPgBuilder* pgBuilder, size_t* totalAllocated)
  1155. {
  1156. if constexpr (Nullable) {
  1157. TOptionalTypeInspector typeOpt(typeInfoHelper, type);
  1158. type = typeOpt.GetItemType();
  1159. }
  1160. TStructTypeInspector typeStruct(typeInfoHelper, type);
  1161. if (typeStruct) {
  1162. TVector<std::unique_ptr<TArrayBuilderBase>> members;
  1163. for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) {
  1164. const TType* memberType = typeStruct.GetMemberType(i);
  1165. auto memberBuilder = MakeArrayBuilderBase(typeInfoHelper, memberType, pool, maxLen, pgBuilder, totalAllocated);
  1166. members.push_back(std::move(memberBuilder));
  1167. }
  1168. // XXX: Use Tuple array builder for Struct.
  1169. return std::make_unique<TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, std::move(members), totalAllocated);
  1170. }
  1171. TTupleTypeInspector typeTuple(typeInfoHelper, type);
  1172. if (typeTuple) {
  1173. TVector<std::unique_ptr<TArrayBuilderBase>> children;
  1174. for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
  1175. const TType* childType = typeTuple.GetElementType(i);
  1176. auto childBuilder = MakeArrayBuilderBase(typeInfoHelper, childType, pool, maxLen, pgBuilder, totalAllocated);
  1177. children.push_back(std::move(childBuilder));
  1178. }
  1179. return std::make_unique<TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, std::move(children), totalAllocated);
  1180. }
  1181. TDataTypeInspector typeData(typeInfoHelper, type);
  1182. if (typeData) {
  1183. auto typeId = typeData.GetTypeId();
  1184. switch (GetDataSlot(typeId)) {
  1185. case NUdf::EDataSlot::Int8:
  1186. return std::make_unique<TFixedSizeArrayBuilder<i8, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1187. case NUdf::EDataSlot::Uint8:
  1188. case NUdf::EDataSlot::Bool:
  1189. return std::make_unique<TFixedSizeArrayBuilder<ui8, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1190. case NUdf::EDataSlot::Int16:
  1191. return std::make_unique<TFixedSizeArrayBuilder<i16, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1192. case NUdf::EDataSlot::Uint16:
  1193. case NUdf::EDataSlot::Date:
  1194. return std::make_unique<TFixedSizeArrayBuilder<ui16, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1195. case NUdf::EDataSlot::Int32:
  1196. case NUdf::EDataSlot::Date32:
  1197. return std::make_unique<TFixedSizeArrayBuilder<i32, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1198. case NUdf::EDataSlot::Uint32:
  1199. case NUdf::EDataSlot::Datetime:
  1200. return std::make_unique<TFixedSizeArrayBuilder<ui32, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1201. case NUdf::EDataSlot::Int64:
  1202. case NUdf::EDataSlot::Interval:
  1203. case NUdf::EDataSlot::Interval64:
  1204. case NUdf::EDataSlot::Datetime64:
  1205. case NUdf::EDataSlot::Timestamp64:
  1206. return std::make_unique<TFixedSizeArrayBuilder<i64, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1207. case NUdf::EDataSlot::Uint64:
  1208. case NUdf::EDataSlot::Timestamp:
  1209. return std::make_unique<TFixedSizeArrayBuilder<ui64, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1210. case NUdf::EDataSlot::Float:
  1211. return std::make_unique<TFixedSizeArrayBuilder<float, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1212. case NUdf::EDataSlot::Double:
  1213. return std::make_unique<TFixedSizeArrayBuilder<double, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1214. case NUdf::EDataSlot::String:
  1215. case NUdf::EDataSlot::Yson:
  1216. case NUdf::EDataSlot::JsonDocument:
  1217. return std::make_unique<TStringArrayBuilder<arrow::BinaryType, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1218. case NUdf::EDataSlot::Utf8:
  1219. case NUdf::EDataSlot::Json:
  1220. return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen);
  1221. case NUdf::EDataSlot::TzDate:
  1222. return std::make_unique<TTzDateArrayBuilder<TTzDate, Nullable>>(typeInfoHelper, type, pool, maxLen);
  1223. case NUdf::EDataSlot::TzDatetime:
  1224. return std::make_unique<TTzDateArrayBuilder<TTzDatetime, Nullable>>(typeInfoHelper, type, pool, maxLen);
  1225. case NUdf::EDataSlot::TzTimestamp:
  1226. return std::make_unique<TTzDateArrayBuilder<TTzTimestamp, Nullable>>(typeInfoHelper, type, pool, maxLen);
  1227. case NUdf::EDataSlot::TzDate32:
  1228. return std::make_unique<TTzDateArrayBuilder<TTzDate32, Nullable>>(typeInfoHelper, type, pool, maxLen);
  1229. case NUdf::EDataSlot::TzDatetime64:
  1230. return std::make_unique<TTzDateArrayBuilder<TTzDatetime64, Nullable>>(typeInfoHelper, type, pool, maxLen);
  1231. case NUdf::EDataSlot::TzTimestamp64:
  1232. return std::make_unique<TTzDateArrayBuilder<TTzTimestamp64, Nullable>>(typeInfoHelper, type, pool, maxLen);
  1233. case NUdf::EDataSlot::Decimal:
  1234. return std::make_unique<TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1235. default:
  1236. Y_ENSURE(false, "Unsupported data slot");
  1237. }
  1238. }
  1239. TResourceTypeInspector resource(typeInfoHelper, type);
  1240. if (resource) {
  1241. return std::make_unique<TResourceArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1242. }
  1243. TPgTypeInspector typePg(typeInfoHelper, type);
  1244. if (typePg) {
  1245. auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
  1246. if (desc->PassByValue) {
  1247. return std::make_unique<TFixedSizeArrayBuilder<ui64, true>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1248. } else {
  1249. if (desc->Typelen == -1) {
  1250. auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Text>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1251. ret->SetPgBuilder(pgBuilder, desc->Typelen);
  1252. return ret;
  1253. } else if (desc->Typelen == -2) {
  1254. auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::CString>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1255. ret->SetPgBuilder(pgBuilder, desc->Typelen);
  1256. return ret;
  1257. } else {
  1258. auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Fixed>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
  1259. ret->SetPgBuilder(pgBuilder, desc->Typelen);
  1260. return ret;
  1261. }
  1262. }
  1263. }
  1264. Y_ENSURE(false, "Unsupported type");
  1265. }
  1266. inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase(
  1267. const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
  1268. size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated) {
  1269. const TType* unpacked = type;
  1270. TOptionalTypeInspector typeOpt(typeInfoHelper, type);
  1271. if (typeOpt) {
  1272. unpacked = typeOpt.GetItemType();
  1273. }
  1274. TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked);
  1275. TPgTypeInspector unpackedPg(typeInfoHelper, unpacked);
  1276. if (unpackedOpt || typeOpt && unpackedPg) {
  1277. // at least 2 levels of optionals
  1278. ui32 nestLevel = 0;
  1279. auto currentType = type;
  1280. auto previousType = type;
  1281. TVector<const TType*> types;
  1282. for (;;) {
  1283. ++nestLevel;
  1284. previousType = currentType;
  1285. types.push_back(currentType);
  1286. TOptionalTypeInspector currentOpt(typeInfoHelper, currentType);
  1287. currentType = currentOpt.GetItemType();
  1288. TOptionalTypeInspector nexOpt(typeInfoHelper, currentType);
  1289. if (!nexOpt) {
  1290. break;
  1291. }
  1292. }
  1293. if (TPgTypeInspector(typeInfoHelper, currentType)) {
  1294. previousType = currentType;
  1295. ++nestLevel;
  1296. }
  1297. auto builder = MakeArrayBuilderBase(typeInfoHelper, previousType, pool, maxBlockLength, pgBuilder, totalAllocated);
  1298. for (ui32 i = 1; i < nestLevel; ++i) {
  1299. builder = std::make_unique<TExternalOptionalArrayBuilder>(typeInfoHelper, types[nestLevel - 1 - i], pool, maxBlockLength, std::move(builder), totalAllocated);
  1300. }
  1301. return builder;
  1302. } else {
  1303. if (typeOpt) {
  1304. return MakeArrayBuilderImpl<true>(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, totalAllocated);
  1305. } else {
  1306. return MakeArrayBuilderImpl<false>(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, totalAllocated);
  1307. }
  1308. }
  1309. }
  1310. inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
  1311. const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
  1312. size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated = nullptr) {
  1313. return MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, totalAllocated);
  1314. }
  1315. inline std::unique_ptr<IScalarBuilder> MakeScalarBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
  1316. Y_UNUSED(typeInfoHelper);
  1317. Y_UNUSED(type);
  1318. Y_ENSURE(false);
  1319. return nullptr;
  1320. }
  1321. }
  1322. }