block_builder.h 51 KB


  1. #pragma once
  2. #include "util.h"
  3. #include "bit_util.h"
  4. #include "block_io_buffer.h"
  5. #include "block_item.h"
  6. #include "dispatch_traits.h"
  7. #include <yql/essentials/public/udf/udf_value.h>
  8. #include <yql/essentials/public/udf/udf_value_builder.h>
  9. #include <yql/essentials/public/udf/udf_type_inspection.h>
  10. #include <arrow/datum.h>
  11. #include <arrow/c/bridge.h>
  12. #include <deque>
  13. namespace NYql {
  14. namespace NUdf {
  15. class IArrayBuilder {
  16. public:
  17. struct TArrayDataItem {
  18. const arrow::ArrayData* Data = nullptr;
  19. ui64 StartOffset;
  20. };
  21. virtual ~IArrayBuilder() = default;
  22. virtual size_t MaxLength() const = 0;
  23. virtual void Add(NUdf::TUnboxedValuePod value) = 0;
  24. virtual void Add(TBlockItem value) = 0;
  25. virtual void Add(TBlockItem value, size_t count) = 0;
  26. virtual void Add(TInputBuffer& input) = 0;
  27. virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0;
  28. virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) = 0;
  29. virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) = 0;
  30. virtual arrow::Datum Build(bool finish) = 0;
  31. };
  32. inline const IArrayBuilder::TArrayDataItem* LookupArrayDataItem(const IArrayBuilder::TArrayDataItem* arrays, size_t arrayCount, ui64& idx) {
  33. IArrayBuilder::TArrayDataItem lookup{ nullptr, idx };
  34. auto it = std::lower_bound(arrays, arrays + arrayCount, lookup, [](const auto& left, const auto& right) {
  35. return left.StartOffset < right.StartOffset;
  36. });
  37. if (it == arrays + arrayCount || it->StartOffset > idx) {
  38. --it;
  39. }
  40. Y_DEBUG_ABORT_UNLESS(it->StartOffset <= idx);
  41. idx -= it->StartOffset;
  42. return it;
  43. }
  44. class IScalarBuilder {
  45. public:
  46. virtual ~IScalarBuilder() = default;
  47. virtual arrow::Datum Build(TBlockItem value) const = 0;
  48. virtual arrow::Datum Build(NUdf::TUnboxedValuePod value) const = 0;
  49. };
  50. inline std::shared_ptr<arrow::DataType> GetArrowType(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
  51. auto arrowTypeHandle = typeInfoHelper.MakeArrowType(type);
  52. Y_ENSURE(arrowTypeHandle);
  53. ArrowSchema s;
  54. arrowTypeHandle->Export(&s);
  55. return ARROW_RESULT(arrow::ImportType(&s));
  56. }
  57. class TArrayBuilderBase : public IArrayBuilder {
  58. using Self = TArrayBuilderBase;
  59. public:
  60. using Ptr = std::unique_ptr<TArrayBuilderBase>;
  61. struct TBlockArrayTree {
  62. using Ptr = std::shared_ptr<TBlockArrayTree>;
  63. std::deque<std::shared_ptr<arrow::ArrayData>> Payload;
  64. std::vector<TBlockArrayTree::Ptr> Children;
  65. };
  66. struct TParams {
  67. size_t* TotalAllocated = nullptr;
  68. TMaybe<ui8> MinFillPercentage; // if an internal buffer size is smaller than % of capacity, then shrink the buffer.
  69. };
  70. TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params)
  71. : ArrowType(std::move(arrowType))
  72. , Pool(&pool)
  73. , MaxLen(maxLen)
  74. , MaxBlockSizeInBytes(typeInfoHelper.GetMaxBlockBytes())
  75. , MinFillPercentage(params.MinFillPercentage)
  76. , TotalAllocated_(params.TotalAllocated)
  77. {
  78. Y_ABORT_UNLESS(ArrowType);
  79. Y_ABORT_UNLESS(maxLen > 0);
  80. }
  81. TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params)
  82. : TArrayBuilderBase(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen, params)
  83. {
  84. }
  85. size_t MaxLength() const final {
  86. return MaxLen;
  87. }
  88. void Add(NUdf::TUnboxedValuePod value) final {
  89. Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
  90. DoAdd(value);
  91. CurrLen++;
  92. }
  93. void Add(TBlockItem value) final {
  94. Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
  95. DoAdd(value);
  96. CurrLen++;
  97. }
  98. void Add(TBlockItem value, size_t count) final {
  99. Y_DEBUG_ABORT_UNLESS(CurrLen + count <= MaxLen);
  100. DoAdd(value, count);
  101. CurrLen += count;
  102. }
  103. void Add(TInputBuffer& input) final {
  104. Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
  105. DoAdd(input);
  106. CurrLen++;
  107. }
  108. void AddDefault() {
  109. Y_DEBUG_ABORT_UNLESS(CurrLen < MaxLen);
  110. DoAddDefault();
  111. CurrLen++;
  112. }
  113. inline void AddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
  114. TArrayDataItem item = { &array, 0 };
  115. Self::AddMany(&item, 1, beginIndex, count);
  116. }
  117. inline void AddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
  118. TArrayDataItem item = { &array, 0 };
  119. Self::AddMany(&item, 1, indexes, count);
  120. }
  121. void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final {
  122. Y_ABORT_UNLESS(size_t(array.length) == bitmapSize);
  123. Y_ABORT_UNLESS(popCount <= bitmapSize);
  124. Y_ABORT_UNLESS(CurrLen + popCount <= MaxLen);
  125. if (popCount) {
  126. DoAddMany(array, sparseBitmap, popCount);
  127. }
  128. CurrLen += popCount;
  129. }
  130. void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) final {
  131. Y_ABORT_UNLESS(arrays);
  132. Y_ABORT_UNLESS(arrayCount > 0);
  133. if (arrayCount == 1) {
  134. Y_ABORT_UNLESS(arrays->Data);
  135. DoAddMany(*arrays->Data, beginIndex, count);
  136. } else {
  137. ui64 idx = beginIndex;
  138. auto item = LookupArrayDataItem(arrays, arrayCount, idx);
  139. size_t avail = item->Data->length;
  140. size_t toAdd = count;
  141. Y_ABORT_UNLESS(idx <= avail);
  142. while (toAdd) {
  143. size_t adding = std::min(avail, toAdd);
  144. DoAddMany(*item->Data, idx, adding);
  145. avail -= adding;
  146. toAdd -= adding;
  147. if (!avail && toAdd) {
  148. ++item;
  149. Y_ABORT_UNLESS(item < arrays + arrayCount);
  150. avail = item->Data->length;
  151. idx = 0;
  152. }
  153. }
  154. }
  155. CurrLen += count;
  156. }
  157. void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) final {
  158. Y_ABORT_UNLESS(arrays);
  159. Y_ABORT_UNLESS(arrayCount > 0);
  160. Y_ABORT_UNLESS(indexes);
  161. Y_ABORT_UNLESS(CurrLen + count <= MaxLen);
  162. if (arrayCount == 1) {
  163. Y_ABORT_UNLESS(arrays->Data);
  164. DoAddMany(*arrays->Data, indexes, count);
  165. CurrLen += count;
  166. } else {
  167. const IArrayBuilder::TArrayDataItem* currData = nullptr;
  168. TVector<ui64> currDataIndexes;
  169. for (size_t i = 0; i < count; ++i) {
  170. ui64 idx = indexes[i];
  171. const IArrayBuilder::TArrayDataItem* data = LookupArrayDataItem(arrays, arrayCount, idx);
  172. if (!currData) {
  173. currData = data;
  174. }
  175. if (data != currData) {
  176. DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size());
  177. CurrLen += currDataIndexes.size();
  178. currDataIndexes.clear();
  179. currData = data;
  180. }
  181. currDataIndexes.push_back(idx);
  182. }
  183. if (!currDataIndexes.empty()) {
  184. DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size());
  185. CurrLen += currDataIndexes.size();
  186. }
  187. }
  188. }
  189. arrow::Datum Build(bool finish) final {
  190. auto tree = BuildTree(finish);
  191. TVector<std::shared_ptr<arrow::ArrayData>> chunks;
  192. while (size_t size = CalcSliceSize(*tree)) {
  193. chunks.push_back(Slice(*tree, size));
  194. }
  195. return MakeArray(chunks);
  196. }
  197. TBlockArrayTree::Ptr BuildTree(bool finish) {
  198. auto result = DoBuildTree(finish);
  199. CurrLen = 0;
  200. return result;
  201. }
  202. protected:
  203. virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0;
  204. virtual void DoAdd(TBlockItem value) = 0;
  205. virtual void DoAdd(TBlockItem value, size_t count) {
  206. for (size_t i = 0; i < count; ++i) {
  207. DoAdd(value);
  208. }
  209. }
  210. virtual void DoAdd(TInputBuffer& input) = 0;
  211. virtual void DoAddDefault() = 0;
  212. virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0;
  213. virtual void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) = 0;
  214. virtual void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) = 0;
  215. virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0;
  216. // returns the newly allocated size in bytes
  217. virtual size_t DoReserve() = 0;
  218. private:
  219. static size_t CalcSliceSize(const TBlockArrayTree& tree) {
  220. if (tree.Payload.empty()) {
  221. return 0;
  222. }
  223. if (!tree.Children.empty()) {
  224. Y_ABORT_UNLESS(tree.Payload.size() == 1);
  225. size_t result = std::numeric_limits<size_t>::max();
  226. for (auto& child : tree.Children) {
  227. size_t childSize = CalcSliceSize(*child);
  228. result = std::min(result, childSize);
  229. }
  230. Y_ABORT_UNLESS(result <= size_t(tree.Payload.front()->length));
  231. return result;
  232. }
  233. int64_t result = tree.Payload.front()->length;
  234. Y_ABORT_UNLESS(result > 0);
  235. return static_cast<size_t>(result);
  236. }
  237. static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) {
  238. Y_ABORT_UNLESS(size > 0);
  239. Y_ABORT_UNLESS(!tree.Payload.empty());
  240. auto& main = tree.Payload.front();
  241. std::shared_ptr<arrow::ArrayData> sliced;
  242. if (size == size_t(main->length)) {
  243. sliced = main;
  244. tree.Payload.pop_front();
  245. } else {
  246. Y_ABORT_UNLESS(size < size_t(main->length));
  247. sliced = Chop(main, size);
  248. }
  249. if (!tree.Children.empty()) {
  250. std::vector<std::shared_ptr<arrow::ArrayData>> children;
  251. for (auto& child : tree.Children) {
  252. children.push_back(Slice(*child, size));
  253. }
  254. sliced->child_data = std::move(children);
  255. if (tree.Payload.empty()) {
  256. tree.Children.clear();
  257. }
  258. }
  259. return sliced;
  260. }
  261. protected:
  262. size_t GetCurrLen() const {
  263. return CurrLen;
  264. }
  265. void SetCurrLen(size_t len) {
  266. Y_ABORT_UNLESS(len <= MaxLen);
  267. CurrLen = len;
  268. }
  269. void Reserve() {
  270. auto allocated = DoReserve();
  271. if (TotalAllocated_) {
  272. *TotalAllocated_ += allocated;
  273. }
  274. }
  275. void AddExtraAllocated(size_t bytes) {
  276. if (TotalAllocated_) {
  277. *TotalAllocated_ += bytes;
  278. }
  279. }
  280. const std::shared_ptr<arrow::DataType> ArrowType;
  281. arrow::MemoryPool* const Pool;
  282. const size_t MaxLen;
  283. const size_t MaxBlockSizeInBytes;
  284. const TMaybe<ui8> MinFillPercentage;
  285. private:
  286. size_t CurrLen = 0;
  287. size_t* TotalAllocated_ = nullptr;
  288. };
  289. template<typename TLayout, bool Nullable, typename TDerived>
  290. class TFixedSizeArrayBuilderBase : public TArrayBuilderBase {
  291. public:
  292. TFixedSizeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params)
  293. : TArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
  294. {
  295. Reserve();
  296. }
  297. TFixedSizeArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params)
  298. : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
  299. {
  300. Reserve();
  301. }
  302. void UnsafeReserve(size_t length) {
  303. SetCurrLen(length);
  304. }
  305. TLayout* MutableData() {
  306. return DataPtr;
  307. }
  308. ui8* MutableValidMask() {
  309. return NullPtr;
  310. }
  311. void DoAdd(NUdf::TUnboxedValuePod value) final {
  312. if constexpr (Nullable) {
  313. if (!value) {
  314. DoAddNull();
  315. return;
  316. }
  317. NullPtr[GetCurrLen()] = 1;
  318. }
  319. static_cast<TDerived*>(this)->DoAddNotNull(value);
  320. }
  321. void DoAdd(TBlockItem value) final {
  322. if constexpr (Nullable) {
  323. if (!value) {
  324. DoAddNull();
  325. return;
  326. }
  327. NullPtr[GetCurrLen()] = 1;
  328. }
  329. static_cast<TDerived*>(this)->DoAddNotNull(value);
  330. }
  331. void DoAddNull() {
  332. if constexpr (Nullable) {
  333. NullPtr[GetCurrLen()] = 0;
  334. PlaceItem(TLayout{});
  335. }
  336. }
  337. void DoAdd(TBlockItem value, size_t count) final {
  338. if constexpr (Nullable) {
  339. if (!value) {
  340. std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 0);
  341. std::fill(DataPtr + GetCurrLen(), DataPtr + GetCurrLen() + count, TLayout{});
  342. return;
  343. }
  344. std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 1);
  345. }
  346. static_cast<TDerived*>(this)->DoAddNotNull(value, count);
  347. }
  348. void DoAdd(TInputBuffer &input) final {
  349. if constexpr (Nullable) {
  350. if (!input.PopChar()) {
  351. DoAddNull();
  352. return;
  353. }
  354. }
  355. static_cast<TDerived*>(this)->DoAddNotNull(input);
  356. }
  357. void DoAddDefault() final {
  358. if constexpr (Nullable) {
  359. NullPtr[GetCurrLen()] = 1;
  360. }
  361. PlaceItem(TLayout{});
  362. }
  363. void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
  364. Y_ABORT_UNLESS(array.buffers.size() > 1);
  365. if constexpr (Nullable) {
  366. if (array.buffers.front()) {
  367. ui8* dstBitmap = NullPtr + GetCurrLen();
  368. CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
  369. } else {
  370. ui8* dstBitmap = NullPtr + GetCurrLen();
  371. std::fill_n(dstBitmap, popCount, 1);
  372. }
  373. }
  374. const TLayout* src = array.GetValues<TLayout>(1);
  375. TLayout* dst = DataPtr + GetCurrLen();
  376. CompressArray(src, sparseBitmap, dst, array.length);
  377. }
  378. void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
  379. Y_ABORT_UNLESS(array.buffers.size() > 1);
  380. if constexpr (Nullable) {
  381. for (size_t i = beginIndex; i < beginIndex + count; ++i) {
  382. NullPtr[GetCurrLen() + i - beginIndex] = !IsNull(array, i);
  383. }
  384. }
  385. const TLayout* values = array.GetValues<TLayout>(1);
  386. for (size_t i = beginIndex; i < beginIndex + count; ++i) {
  387. ::new(DataPtr + GetCurrLen() + i - beginIndex) TLayout(values[i]);
  388. }
  389. }
  390. void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
  391. Y_ABORT_UNLESS(array.buffers.size() > 1);
  392. if constexpr (Nullable) {
  393. for (size_t i = 0; i < count; ++i) {
  394. NullPtr[GetCurrLen() + i] = !IsNull(array, indexes[i]);
  395. }
  396. }
  397. const TLayout* values = array.GetValues<TLayout>(1);
  398. for (size_t i = 0; i < count; ++i) {
  399. ::new(DataPtr + GetCurrLen() + i) TLayout(values[indexes[i]]);
  400. }
  401. }
  402. TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
  403. const size_t len = GetCurrLen();
  404. std::shared_ptr<arrow::Buffer> nulls;
  405. if constexpr (Nullable) {
  406. NullBuilder->UnsafeAdvance(len);
  407. nulls = NullBuilder->Finish();
  408. nulls = MakeDenseBitmap(nulls->data(), len, Pool);
  409. }
  410. DataBuilder->UnsafeAdvance(len);
  411. std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
  412. TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
  413. result->Payload.push_back(arrow::ArrayData::Make(ArrowType, len, {nulls, data}));
  414. NullBuilder.reset();
  415. DataBuilder.reset();
  416. if (!finish) {
  417. Reserve();
  418. }
  419. return result;
  420. }
  421. protected:
  422. void PlaceItem(TLayout&& value) {
  423. ::new(DataPtr + GetCurrLen()) TLayout(std::move(value));
  424. }
  425. TLayout* DataPtr = nullptr;
  426. private:
  427. size_t DoReserve() final {
  428. DataBuilder = std::make_unique<TTypedBufferBuilder<TLayout>>(Pool, MinFillPercentage);
  429. DataBuilder->Reserve(MaxLen + 1);
  430. DataPtr = DataBuilder->MutableData();
  431. auto result = DataBuilder->Capacity();
  432. if constexpr (Nullable) {
  433. NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
  434. NullBuilder->Reserve(MaxLen + 1);
  435. NullPtr = NullBuilder->MutableData();
  436. result += NullBuilder->Capacity();
  437. }
  438. return result;
  439. }
  440. std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
  441. std::unique_ptr<TTypedBufferBuilder<TLayout>> DataBuilder;
  442. ui8* NullPtr = nullptr;
  443. };
  444. template<typename TLayout, bool Nullable>
  445. class TFixedSizeArrayBuilder final: public TFixedSizeArrayBuilderBase<TLayout, Nullable, TFixedSizeArrayBuilder<TLayout, Nullable>> {
  446. using TSelf = TFixedSizeArrayBuilder<TLayout, Nullable>;
  447. using TBase = TFixedSizeArrayBuilderBase<TLayout, Nullable, TSelf>;
  448. using TParams = TArrayBuilderBase::TParams;
  449. public:
  450. TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  451. : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
  452. {}
  453. TFixedSizeArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  454. : TBase(typeInfoHelper, type, pool, maxLen, params)
  455. {}
  456. void DoAddNotNull(TUnboxedValuePod value) {
  457. this->PlaceItem(value.Get<TLayout>());
  458. }
  459. void DoAddNotNull(TBlockItem value) {
  460. this->PlaceItem(value.Get<TLayout>());
  461. }
  462. void DoAddNotNull(TInputBuffer& input) {
  463. this->DoAdd(TBlockItem(input.PopNumber<TLayout>()));
  464. }
  465. void DoAddNotNull(TBlockItem value, size_t count) {
  466. std::fill(this->DataPtr + this->GetCurrLen(), this->DataPtr + this->GetCurrLen() + count, value.Get<TLayout>());
  467. }
  468. };
  469. template<bool Nullable>
  470. class TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable> final: public TFixedSizeArrayBuilderBase<NYql::NDecimal::TInt128, Nullable, TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>> {
  471. using TSelf = TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>;
  472. using TBase = TFixedSizeArrayBuilderBase<NYql::NDecimal::TInt128, Nullable, TSelf>;
  473. using TParams = TArrayBuilderBase::TParams;
  474. public:
  475. TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  476. : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
  477. {}
  478. TFixedSizeArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  479. : TBase(typeInfoHelper, type, pool, maxLen, params)
  480. {}
  481. void DoAddNotNull(TUnboxedValuePod value) {
  482. this->PlaceItem(value.GetInt128());
  483. }
  484. void DoAddNotNull(TBlockItem value) {
  485. this->PlaceItem(value.GetInt128());
  486. }
  487. void DoAddNotNull(TInputBuffer& input) {
  488. this->DoAdd(TBlockItem(input.PopNumber<NYql::NDecimal::TInt128>()));
  489. }
  490. void DoAddNotNull(TBlockItem value, size_t count) {
  491. std::fill(this->DataPtr + this->GetCurrLen(), this->DataPtr + this->GetCurrLen() + count, value.GetInt128());
  492. }
  493. };
  494. template<bool Nullable>
  495. class TResourceArrayBuilder final: public TFixedSizeArrayBuilderBase<TUnboxedValue, Nullable, TResourceArrayBuilder<Nullable>> {
  496. using TBase = TFixedSizeArrayBuilderBase<TUnboxedValue, Nullable, TResourceArrayBuilder<Nullable>>;
  497. using TParams = TArrayBuilderBase::TParams;
  498. public:
  499. TResourceArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  500. : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
  501. {}
  502. TResourceArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  503. : TBase(typeInfoHelper, type, pool, maxLen, params)
  504. {}
  505. void DoAddNotNull(TUnboxedValuePod value) {
  506. this->PlaceItem(TUnboxedValue(value));
  507. }
  508. TUnboxedValue FromBlockItem(TBlockItem item) {
  509. TUnboxedValue val;
  510. std::memcpy(val.GetRawPtr(), item.GetRawPtr(), sizeof(val));
  511. val.Ref();
  512. return val;
  513. }
  514. void DoAddNotNull(TBlockItem item) {
  515. this->PlaceItem(FromBlockItem(item));
  516. }
  517. void DoAddNotNull(TInputBuffer& input) {
  518. this->DoAdd(input.PopNumber<TUnboxedValuePod>());
  519. }
  520. void DoAddNotNull(TBlockItem item, size_t count) {
  521. for (size_t i = 0; i < count; ++i) {
  522. ::new(this->DataPtr + this->GetCurrLen() + i) TUnboxedValue(FromBlockItem(item));
  523. }
  524. }
  525. };
  526. template<typename TStringType, bool Nullable, EPgStringType PgString = EPgStringType::None>
  527. class TStringArrayBuilder final : public TArrayBuilderBase {
  528. using TOffset = typename TStringType::offset_type;
  529. public:
  530. TStringArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  531. : TArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
  532. {
  533. Reserve();
  534. }
  535. TStringArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  536. : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
  537. {
  538. Reserve();
  539. }
  540. void SetPgBuilder(const NUdf::IPgBuilder* pgBuilder, i32 typeLen) {
  541. Y_ENSURE(PgString != EPgStringType::None);
  542. PgBuilder = pgBuilder;
  543. TypeLen = typeLen;
  544. }
  545. void DoAdd(NUdf::TUnboxedValuePod value) final {
  546. if constexpr (Nullable) {
  547. if (!value) {
  548. return DoAdd(TBlockItem{});
  549. }
  550. }
  551. if constexpr (PgString == EPgStringType::CString) {
  552. static_assert(Nullable);
  553. auto buf = PgBuilder->AsCStringBuffer(value);
  554. AddPgItem(buf);
  555. } else if constexpr (PgString == EPgStringType::Text) {
  556. static_assert(Nullable);
  557. auto buf = PgBuilder->AsTextBuffer(value);
  558. AddPgItem(buf);
  559. } else if constexpr (PgString == EPgStringType::Fixed) {
  560. static_assert(Nullable);
  561. auto buf = PgBuilder->AsFixedStringBuffer(value, TypeLen);
  562. AddPgItem(buf);
  563. } else {
  564. DoAdd(TBlockItem(value.AsStringRef()));
  565. }
  566. }
  567. template <bool AddCStringZero = false, ui32 AddVarHdr = 0>
  568. ui8* AddPgItem(TStringRef buf) {
  569. auto alignedSize = AlignUp(buf.Size() + sizeof(void*) + AddVarHdr + (AddCStringZero ? 1 : 0), sizeof(void*));
  570. auto ptr = AddNoFill(alignedSize);
  571. *(void**)ptr = nullptr;
  572. if (alignedSize > sizeof(void*)) {
  573. // clear padding too
  574. *(void**)(ptr + alignedSize - sizeof(void*)) = nullptr;
  575. }
  576. std::memcpy(ptr + sizeof(void*) + AddVarHdr, buf.Data(), buf.Size());
  577. if constexpr (AddCStringZero) {
  578. ptr[sizeof(void*) + buf.Size()] = 0;
  579. }
  580. return ptr;
  581. }
  582. ui8* AddNoFill(size_t size) {
  583. size_t currentLen = DataBuilder->Length();
  584. // empty string can always be appended
  585. if (size > 0 && currentLen + size > MaxBlockSizeInBytes) {
  586. if (currentLen) {
  587. FlushChunk(false);
  588. }
  589. if (size > MaxBlockSizeInBytes) {
  590. ReserveForLargeString(size);
  591. }
  592. }
  593. AppendCurrentOffset();
  594. auto ret = DataBuilder->End();
  595. DataBuilder->UnsafeAdvance(size);
  596. if constexpr (Nullable) {
  597. NullBuilder->UnsafeAppend(1);
  598. }
  599. return ret;
  600. }
  601. void DoAdd(TBlockItem value) final {
  602. if constexpr (Nullable) {
  603. if (!value) {
  604. NullBuilder->UnsafeAppend(0);
  605. AppendCurrentOffset();
  606. return;
  607. }
  608. }
  609. const std::string_view str = value.AsStringRef();
  610. auto ptr = AddNoFill(str.size());
  611. std::memcpy(ptr, str.data(), str.size());
  612. }
  613. void DoAdd(TInputBuffer& input) final {
  614. if constexpr (Nullable) {
  615. if (!input.PopChar()) {
  616. return DoAdd(TBlockItem{});
  617. }
  618. }
  619. auto str = input.PopString();
  620. TStringRef ref(str.data(), str.size());
  621. DoAdd(TBlockItem(ref));
  622. }
  623. void DoAddDefault() final {
  624. if constexpr (Nullable) {
  625. NullBuilder->UnsafeAppend(1);
  626. }
  627. AppendCurrentOffset();
  628. }
  629. void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
  630. Y_UNUSED(popCount);
  631. Y_ABORT_UNLESS(array.buffers.size() > 2);
  632. Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
  633. const ui8* srcNulls = array.GetValues<ui8>(0, 0);
  634. const TOffset* srcOffset = array.GetValues<TOffset>(1);
  635. const ui8* srcData = array.GetValues<ui8>(2, 0);
  636. const ui8* chunkStart = srcData;
  637. const ui8* chunkEnd = chunkStart;
  638. size_t dataLen = DataBuilder->Length();
  639. ui8* dstNulls = Nullable ? NullBuilder->End() : nullptr;
  640. TOffset* dstOffset = OffsetsBuilder->End();
  641. size_t countAdded = 0;
  642. for (size_t i = 0; i < size_t(array.length); i++) {
  643. if (!sparseBitmap[i]) {
  644. continue;
  645. }
  646. const ui8* begin = srcData + srcOffset[i];
  647. const ui8* end = srcData + srcOffset[i + 1];
  648. const size_t strSize = end - begin;
  649. size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
  650. for (;;) {
  651. // try to append ith string
  652. if (strSize <= availBytes) {
  653. if (begin == chunkEnd) {
  654. chunkEnd = end;
  655. } else {
  656. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  657. chunkStart = begin;
  658. chunkEnd = end;
  659. }
  660. size_t nullOffset = i + array.offset;
  661. if constexpr (Nullable) {
  662. *dstNulls++ = srcNulls ? ((srcNulls[nullOffset >> 3] >> (nullOffset & 7)) & 1) : 1u;
  663. }
  664. *dstOffset++ = dataLen;
  665. dataLen += strSize;
  666. ++countAdded;
  667. break;
  668. }
  669. if (dataLen) {
  670. if (chunkStart != chunkEnd) {
  671. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  672. chunkStart = chunkEnd = srcData;
  673. }
  674. Y_ABORT_UNLESS(dataLen == DataBuilder->Length());
  675. OffsetsBuilder->UnsafeAdvance(countAdded);
  676. if constexpr (Nullable) {
  677. NullBuilder->UnsafeAdvance(countAdded);
  678. }
  679. FlushChunk(false);
  680. dataLen = 0;
  681. countAdded = 0;
  682. if constexpr (Nullable) {
  683. dstNulls = NullBuilder->End();
  684. }
  685. dstOffset = OffsetsBuilder->End();
  686. } else {
  687. ReserveForLargeString(strSize);
  688. availBytes = strSize;
  689. }
  690. }
  691. }
  692. if (chunkStart != chunkEnd) {
  693. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  694. }
  695. Y_ABORT_UNLESS(dataLen == DataBuilder->Length());
  696. OffsetsBuilder->UnsafeAdvance(countAdded);
  697. if constexpr (Nullable) {
  698. NullBuilder->UnsafeAdvance(countAdded);
  699. }
  700. }
  701. void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
  702. Y_ABORT_UNLESS(array.buffers.size() > 2);
  703. Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
  704. size_t dataLen = DataBuilder->Length();
  705. const TOffset* offsets = array.GetValues<TOffset>(1);
  706. const ui8* srcData = array.GetValues<ui8>(2, 0);
  707. const ui8* chunkStart = srcData + offsets[beginIndex];
  708. const ui8* chunkEnd = chunkStart;
  709. for (size_t i = beginIndex; i < beginIndex + count; ++i) {
  710. const ui8* begin = srcData + offsets[i];
  711. const ui8* end = srcData + offsets[i + 1];
  712. const size_t strSize = end - begin;
  713. size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
  714. for (;;) {
  715. if (strSize <= availBytes) {
  716. if constexpr (Nullable) {
  717. NullBuilder->UnsafeAppend(!IsNull(array, i));
  718. }
  719. OffsetsBuilder->UnsafeAppend(TOffset(dataLen));
  720. chunkEnd = end;
  721. dataLen += strSize;
  722. break;
  723. }
  724. if (dataLen) {
  725. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  726. chunkStart = begin;
  727. chunkEnd = end;
  728. FlushChunk(false);
  729. dataLen = 0;
  730. } else {
  731. ReserveForLargeString(strSize);
  732. availBytes = strSize;
  733. }
  734. }
  735. }
  736. if (chunkStart != chunkEnd) {
  737. DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
  738. }
  739. }
  740. void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
  741. Y_ABORT_UNLESS(array.buffers.size() > 2);
  742. Y_ABORT_UNLESS(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
  743. size_t dataLen = DataBuilder->Length();
  744. const TOffset* offsets = array.GetValues<TOffset>(1);
  745. const char* strData = array.GetValues<char>(2, 0);
  746. for (size_t i = 0; i < count; ++i) {
  747. ui64 idx = indexes[i];
  748. std::string_view str(strData + offsets[idx], offsets[idx + 1] - offsets[idx]);
  749. size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
  750. for (;;) {
  751. if (str.size() <= availBytes) {
  752. if constexpr (Nullable) {
  753. NullBuilder->UnsafeAppend(!IsNull(array, idx));
  754. }
  755. OffsetsBuilder->UnsafeAppend(TOffset(dataLen));
  756. DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size());
  757. dataLen += str.size();
  758. break;
  759. }
  760. if (dataLen) {
  761. FlushChunk(false);
  762. dataLen = 0;
  763. } else {
  764. ReserveForLargeString(str.size());
  765. availBytes = str.size();
  766. }
  767. }
  768. }
  769. }
  770. TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
  771. FlushChunk(finish);
  772. TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
  773. result->Payload = std::move(Chunks);
  774. Chunks.clear();
  775. return result;
  776. }
  777. private:
  778. size_t DoReserve() final {
  779. OffsetsBuilder = std::make_unique<TTypedBufferBuilder<TOffset>>(Pool, MinFillPercentage);
  780. OffsetsBuilder->Reserve(MaxLen + 1);
  781. DataBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
  782. DataBuilder->Reserve(MaxBlockSizeInBytes);
  783. auto result = OffsetsBuilder->Capacity() + DataBuilder->Capacity();
  784. if constexpr (Nullable) {
  785. NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
  786. NullBuilder->Reserve(MaxLen + 1);
  787. result += NullBuilder->Capacity();
  788. }
  789. return result;
  790. }
  791. void ReserveForLargeString(size_t strSize) {
  792. size_t before = DataBuilder->Capacity();
  793. DataBuilder->Reserve(strSize);
  794. size_t after = DataBuilder->Capacity();
  795. Y_ENSURE(before <= after);
  796. AddExtraAllocated(after - before);
  797. }
  798. void AppendCurrentOffset() {
  799. OffsetsBuilder->UnsafeAppend(DataBuilder->Length());
  800. }
  801. void FlushChunk(bool finish) {
  802. const auto length = OffsetsBuilder->Length();
  803. Y_ABORT_UNLESS(length > 0);
  804. AppendCurrentOffset();
  805. std::shared_ptr<arrow::Buffer> nullBitmap;
  806. if constexpr (Nullable) {
  807. nullBitmap = NullBuilder->Finish();
  808. nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
  809. }
  810. std::shared_ptr<arrow::Buffer> offsets = OffsetsBuilder->Finish();
  811. std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
  812. Chunks.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap, offsets, data }));
  813. if (!finish) {
  814. Reserve();
  815. }
  816. }
  817. std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
  818. std::unique_ptr<TTypedBufferBuilder<TOffset>> OffsetsBuilder;
  819. std::unique_ptr<TTypedBufferBuilder<ui8>> DataBuilder;
  820. std::deque<std::shared_ptr<arrow::ArrayData>> Chunks;
  821. const IPgBuilder* PgBuilder = nullptr;
  822. i32 TypeLen = 0;
  823. };
  824. template<bool Nullable, typename TDerived>
  825. class TTupleArrayBuilderBase : public TArrayBuilderBase {
  826. public:
  827. TTupleArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  828. : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
  829. {
  830. Reserve();
  831. }
  832. void DoAdd(NUdf::TUnboxedValuePod value) final {
  833. if constexpr (Nullable) {
  834. if (!value) {
  835. NullBuilder->UnsafeAppend(0);
  836. static_cast<TDerived*>(this)->AddToChildrenDefault();
  837. return;
  838. }
  839. NullBuilder->UnsafeAppend(1);
  840. }
  841. static_cast<TDerived*>(this)->AddToChildren(value);
  842. }
  843. void DoAdd(TBlockItem value) final {
  844. if constexpr (Nullable) {
  845. if (!value) {
  846. NullBuilder->UnsafeAppend(0);
  847. static_cast<TDerived*>(this)->AddToChildrenDefault();
  848. return;
  849. }
  850. NullBuilder->UnsafeAppend(1);
  851. }
  852. static_cast<TDerived*>(this)->AddToChildren(value);
  853. }
  854. void DoAdd(TInputBuffer& input) final {
  855. if constexpr (Nullable) {
  856. if (!input.PopChar()) {
  857. NullBuilder->UnsafeAppend(0);
  858. static_cast<TDerived*>(this)->AddToChildrenDefault();
  859. return;
  860. }
  861. NullBuilder->UnsafeAppend(1);
  862. }
  863. static_cast<TDerived*>(this)->AddToChildren(input);
  864. }
  865. void DoAddDefault() final {
  866. if constexpr (Nullable) {
  867. NullBuilder->UnsafeAppend(1);
  868. }
  869. static_cast<TDerived*>(this)->AddToChildrenDefault();
  870. }
  871. void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
  872. Y_ABORT_UNLESS(!array.buffers.empty());
  873. if constexpr (Nullable) {
  874. if (array.buffers.front()) {
  875. ui8* dstBitmap = NullBuilder->End();
  876. CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
  877. NullBuilder->UnsafeAdvance(popCount);
  878. } else {
  879. NullBuilder->UnsafeAppend(popCount, 1);
  880. }
  881. }
  882. static_cast<TDerived*>(this)->AddManyToChildren(array, sparseBitmap, popCount);
  883. }
  884. void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
  885. Y_ABORT_UNLESS(!array.buffers.empty());
  886. if constexpr (Nullable) {
  887. for (ui64 i = beginIndex; i < beginIndex + count; ++i) {
  888. NullBuilder->UnsafeAppend(!IsNull(array, i));
  889. }
  890. }
  891. static_cast<TDerived*>(this)->AddManyToChildren(array, beginIndex, count);
  892. }
  893. void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
  894. Y_ABORT_UNLESS(!array.buffers.empty());
  895. if constexpr (Nullable) {
  896. for (size_t i = 0; i < count; ++i) {
  897. NullBuilder->UnsafeAppend(!IsNull(array, indexes[i]));
  898. }
  899. }
  900. static_cast<TDerived*>(this)->AddManyToChildren(array, indexes, count);
  901. }
  902. TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
  903. TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
  904. std::shared_ptr<arrow::Buffer> nullBitmap;
  905. const size_t length = GetCurrLen();
  906. if constexpr (Nullable) {
  907. Y_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
  908. nullBitmap = NullBuilder->Finish();
  909. nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
  910. }
  911. Y_ABORT_UNLESS(length);
  912. result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap }));
  913. static_cast<TDerived*>(this)->BuildChildrenTree(finish, result->Children);
  914. if (!finish) {
  915. Reserve();
  916. }
  917. return result;
  918. }
  919. private:
  920. size_t DoReserve() final {
  921. if constexpr (Nullable) {
  922. NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
  923. NullBuilder->Reserve(MaxLen + 1);
  924. return NullBuilder->Capacity();
  925. }
  926. return 0;
  927. }
  928. private:
  929. std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
  930. };
  931. template<bool Nullable>
  932. class TTupleArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>> {
  933. using TBase = TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>>;
  934. using TParams = TArrayBuilderBase::TParams;
  935. public:
  936. TTupleArrayBuilder(TVector<TArrayBuilderBase::Ptr>&& children, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool,
  937. size_t maxLen, const TParams& params = {})
  938. : TBase(typeInfoHelper, type, pool, maxLen, params)
  939. , Children_(std::move(children))
  940. {
  941. }
  942. void AddToChildrenDefault() {
  943. for (ui32 i = 0; i < Children_.size(); ++i) {
  944. Children_[i]->AddDefault();
  945. }
  946. }
  947. void AddToChildren(NUdf::TUnboxedValuePod value) {
  948. auto elements = value.GetElements();
  949. if (elements) {
  950. for (ui32 i = 0; i < Children_.size(); ++i) {
  951. Children_[i]->Add(elements[i]);
  952. }
  953. } else {
  954. for (ui32 i = 0; i < Children_.size(); ++i) {
  955. auto element = value.GetElement(i);
  956. Children_[i]->Add(element);
  957. }
  958. }
  959. }
  960. void AddToChildren(TBlockItem value) {
  961. auto elements = value.AsTuple();
  962. for (ui32 i = 0; i < Children_.size(); ++i) {
  963. Children_[i]->Add(elements[i]);
  964. }
  965. }
  966. void AddToChildren(TInputBuffer& input) {
  967. for (ui32 i = 0; i < Children_.size(); ++i) {
  968. Children_[i]->Add(input);
  969. }
  970. }
  971. void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) {
  972. Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
  973. for (size_t i = 0; i < Children_.size(); ++i) {
  974. Children_[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length);
  975. }
  976. }
  977. void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
  978. Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
  979. for (size_t i = 0; i < Children_.size(); ++i) {
  980. Children_[i]->AddMany(*array.child_data[i], beginIndex, count);
  981. }
  982. }
  983. void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
  984. Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
  985. for (size_t i = 0; i < Children_.size(); ++i) {
  986. Children_[i]->AddMany(*array.child_data[i], indexes, count);
  987. }
  988. }
  989. void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) {
  990. resultChildren.reserve(Children_.size());
  991. for (ui32 i = 0; i < Children_.size(); ++i) {
  992. resultChildren.emplace_back(Children_[i]->BuildTree(finish));
  993. }
  994. }
  995. private:
  996. TVector<std::unique_ptr<TArrayBuilderBase>> Children_;
  997. };
  998. template<typename TDate, bool Nullable>
  999. class TTzDateArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>> {
  1000. using TBase = TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>>;
  1001. using TParams = TArrayBuilderBase::TParams;
  1002. using TDateLayout = typename TDataType<TDate>::TLayout;
  1003. static constexpr auto DataSlot = TDataType<TDate>::Slot;
  1004. public:
  1005. TTzDateArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
  1006. : TBase(typeInfoHelper, type, pool, maxLen, params)
  1007. , DateBuilder_(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen, params)
  1008. , TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen, params)
  1009. {
  1010. }
  1011. void AddToChildrenDefault() {
  1012. DateBuilder_.AddDefault();
  1013. TimezoneBuilder_.AddDefault();
  1014. }
  1015. void AddToChildren(NUdf::TUnboxedValuePod value) {
  1016. DateBuilder_.Add(value);
  1017. TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId()));
  1018. }
  1019. void AddToChildren(TBlockItem value) {
  1020. DateBuilder_.Add(value);
  1021. TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId()));
  1022. }
  1023. void AddToChildren(TInputBuffer& input) {
  1024. DateBuilder_.Add(input);
  1025. TimezoneBuilder_.Add(input);
  1026. }
  1027. void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) {
  1028. Y_ABORT_UNLESS(array.child_data.size() == 2);
  1029. DateBuilder_.AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
  1030. TimezoneBuilder_.AddMany(*array.child_data[1], popCount, sparseBitmap, array.length);
  1031. }
  1032. void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
  1033. Y_ABORT_UNLESS(array.child_data.size() == 2);
  1034. DateBuilder_.AddMany(*array.child_data[0], beginIndex, count);
  1035. TimezoneBuilder_.AddMany(*array.child_data[1], beginIndex, count);
  1036. }
  1037. void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
  1038. Y_ABORT_UNLESS(array.child_data.size() == 2);
  1039. DateBuilder_.AddMany(*array.child_data[0], indexes, count);
  1040. TimezoneBuilder_.AddMany(*array.child_data[1], indexes, count);
  1041. }
  1042. void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) {
  1043. resultChildren.emplace_back(DateBuilder_.BuildTree(finish));
  1044. resultChildren.emplace_back(TimezoneBuilder_.BuildTree(finish));
  1045. }
  1046. private:
  1047. TFixedSizeArrayBuilder<TDateLayout, false> DateBuilder_;
  1048. TFixedSizeArrayBuilder<ui16, false> TimezoneBuilder_;
  1049. };
  1050. class TExternalOptionalArrayBuilder final : public TArrayBuilderBase {
  1051. public:
  1052. TExternalOptionalArrayBuilder(std::unique_ptr<TArrayBuilderBase>&& inner, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool,
  1053. size_t maxLen, const TParams& params = {})
  1054. : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
  1055. , Inner(std::move(inner))
  1056. {
  1057. Reserve();
  1058. }
  1059. void DoAdd(NUdf::TUnboxedValuePod value) final {
  1060. if (!value) {
  1061. NullBuilder->UnsafeAppend(0);
  1062. Inner->AddDefault();
  1063. return;
  1064. }
  1065. NullBuilder->UnsafeAppend(1);
  1066. Inner->Add(value.GetOptionalValue());
  1067. }
  1068. void DoAdd(TBlockItem value) final {
  1069. if (!value) {
  1070. NullBuilder->UnsafeAppend(0);
  1071. Inner->AddDefault();
  1072. return;
  1073. }
  1074. NullBuilder->UnsafeAppend(1);
  1075. Inner->Add(value.GetOptionalValue());
  1076. }
  1077. void DoAdd(TInputBuffer& input) final {
  1078. if (!input.PopChar()) {
  1079. NullBuilder->UnsafeAppend(0);
  1080. Inner->AddDefault();
  1081. return;
  1082. }
  1083. NullBuilder->UnsafeAppend(1);
  1084. Inner->Add(input);
  1085. }
  1086. void DoAddDefault() final {
  1087. NullBuilder->UnsafeAppend(1);
  1088. Inner->AddDefault();
  1089. }
  1090. void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
  1091. Y_ABORT_UNLESS(!array.buffers.empty());
  1092. Y_ABORT_UNLESS(array.child_data.size() == 1);
  1093. if (array.buffers.front()) {
  1094. ui8* dstBitmap = NullBuilder->End();
  1095. CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
  1096. NullBuilder->UnsafeAdvance(popCount);
  1097. } else {
  1098. NullBuilder->UnsafeAppend(popCount, 1);
  1099. }
  1100. Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
  1101. }
  1102. void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
  1103. Y_ABORT_UNLESS(!array.buffers.empty());
  1104. Y_ABORT_UNLESS(array.child_data.size() == 1);
  1105. for (ui64 i = beginIndex; i < beginIndex + count; ++i) {
  1106. NullBuilder->UnsafeAppend(!IsNull(array, i));
  1107. }
  1108. Inner->AddMany(*array.child_data[0], beginIndex, count);
  1109. }
  1110. void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
  1111. Y_ABORT_UNLESS(!array.buffers.empty());
  1112. Y_ABORT_UNLESS(array.child_data.size() == 1);
  1113. for (size_t i = 0; i < count; ++i) {
  1114. NullBuilder->UnsafeAppend(!IsNull(array, indexes[i]));
  1115. }
  1116. Inner->AddMany(*array.child_data[0], indexes, count);
  1117. }
  1118. TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
  1119. TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
  1120. std::shared_ptr<arrow::Buffer> nullBitmap;
  1121. const size_t length = GetCurrLen();
  1122. Y_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
  1123. nullBitmap = NullBuilder->Finish();
  1124. nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
  1125. Y_ABORT_UNLESS(length);
  1126. result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap }));
  1127. result->Children.emplace_back(Inner->BuildTree(finish));
  1128. if (!finish) {
  1129. Reserve();
  1130. }
  1131. return result;
  1132. }
  1133. private:
  1134. size_t DoReserve() final {
  1135. NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool, MinFillPercentage);
  1136. NullBuilder->Reserve(MaxLen + 1);
  1137. return NullBuilder->Capacity();
  1138. }
  1139. private:
  1140. std::unique_ptr<TArrayBuilderBase> Inner;
  1141. std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
  1142. };
  1143. using TArrayBuilderParams = TArrayBuilderBase::TParams;
  1144. struct TBuilderTraits {
  1145. using TResult = TArrayBuilderBase;
  1146. template <bool Nullable>
  1147. using TTuple = TTupleArrayBuilder<Nullable>;
  1148. template <typename T, bool Nullable>
  1149. using TFixedSize = TFixedSizeArrayBuilder<T, Nullable>;
  1150. template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal>
  1151. using TStrings = TStringArrayBuilder<TStringType, Nullable>;
  1152. using TExtOptional = TExternalOptionalArrayBuilder;
  1153. template<bool Nullable>
  1154. using TResource = TResourceArrayBuilder<Nullable>;
  1155. template<typename TTzDate, bool Nullable>
  1156. using TTzDateReader = TTzDateArrayBuilder<TTzDate, Nullable>;
  1157. constexpr static bool PassType = true;
  1158. static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
  1159. if (desc.PassByValue) {
  1160. return std::make_unique<TFixedSize<ui64, true>>(type, typeInfoHelper, pool, maxLen, params);
  1161. } else {
  1162. if (desc.Typelen == -1) {
  1163. auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Text>>(type, typeInfoHelper, pool, maxLen, params);
  1164. ret->SetPgBuilder(pgBuilder, desc.Typelen);
  1165. return ret;
  1166. } else if (desc.Typelen == -2) {
  1167. auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::CString>>(type, typeInfoHelper, pool, maxLen, params);
  1168. ret->SetPgBuilder(pgBuilder, desc.Typelen);
  1169. return ret;
  1170. } else {
  1171. auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Fixed>>(type, typeInfoHelper, pool, maxLen, params);
  1172. ret->SetPgBuilder(pgBuilder, desc.Typelen);
  1173. return ret;
  1174. }
  1175. }
  1176. }
  1177. static std::unique_ptr<TResult> MakeResource(bool isOptional, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
  1178. if (isOptional) {
  1179. return std::make_unique<TResource<true>>(type, typeInfoHelper, pool, maxLen, params);
  1180. } else {
  1181. return std::make_unique<TResource<false>>(type, typeInfoHelper, pool, maxLen, params);
  1182. }
  1183. }
  1184. template<typename TTzDate>
  1185. static std::unique_ptr<TResult> MakeTzDate(bool isOptional, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
  1186. if (isOptional) {
  1187. return std::make_unique<TTzDateReader<TTzDate, true>>(type, typeInfoHelper, pool, maxLen, params);
  1188. } else {
  1189. return std::make_unique<TTzDateReader<TTzDate, false>>(type, typeInfoHelper, pool, maxLen, params);
  1190. }
  1191. }
  1192. };
  1193. inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
  1194. const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
  1195. size_t maxBlockLength, const IPgBuilder* pgBuilder)
  1196. {
  1197. return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, TArrayBuilderParams {});
  1198. }
  1199. inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
  1200. const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
  1201. size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated)
  1202. {
  1203. return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, TArrayBuilderParams {.TotalAllocated = totalAllocated});
  1204. }
  1205. inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
  1206. const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
  1207. size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params)
  1208. {
  1209. return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, params);
  1210. }
  1211. inline std::unique_ptr<IScalarBuilder> MakeScalarBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
  1212. Y_UNUSED(typeInfoHelper);
  1213. Y_UNUSED(type);
  1214. Y_ENSURE(false);
  1215. return nullptr;
  1216. }
  1217. } // namespace NUdf
  1218. } // namespace NYql