util.cpp

#include "util.h"
#include "bit_util.h"
#include "defs.h"

#include <arrow/array/array_base.h>
#include <arrow/array/util.h>
#include <arrow/chunked_array.h>
#include <arrow/record_batch.h>
// Assumed include for arrow::compute::ExecBatch (used below); it may already
// be pulled in transitively via util.h.
#include <arrow/compute/exec.h>

namespace NYql {
namespace NUdf {
namespace {

// Recursively estimates the memory footprint of an ArrayData: the struct
// itself, its buffer and child pointer slots, the buffer payloads, and all
// children.
ui64 GetSizeOfArrayDataInBytes(const arrow::ArrayData& data) {
    ui64 size = sizeof(data);
    size += data.buffers.size() * sizeof(void*);
    size += data.child_data.size() * sizeof(void*);
    for (const auto& b : data.buffers) {
        if (b) {
            size += b->size();
        }
    }
    for (const auto& c : data.child_data) {
        if (c) {
            size += GetSizeOfArrayDataInBytes(*c);
        }
    }
    return size;
}

ui64 GetSizeOfDatumInBytes(const arrow::Datum& datum) {
    ui64 size = sizeof(datum);
    if (datum.is_scalar()) {
        // Materialize the scalar as a one-element array so that scalars and
        // arrays are measured uniformly.
        const auto& scarray = ARROW_RESULT(arrow::MakeArrayFromScalar(*datum.scalar(), 1));
        return size + GetSizeOfArrayDataInBytes(*scarray->data());
    }
    if (datum.is_arraylike()) {
        ForEachArrayData(datum, [&size](const auto& arrayData) {
            size += GetSizeOfArrayDataInBytes(*arrayData);
        });
        return size;
    }
    Y_ABORT("Not yet implemented");
}

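// Worked example (a sketch; exact figures depend on the Arrow build): for an
// int32 array of 1024 values with a validity bitmap, GetSizeOfArrayDataInBytes
// counts roughly
//   sizeof(arrow::ArrayData)   // the struct itself
//   + 2 * sizeof(void*)        // two buffer pointer slots
//   + 128                      // validity bitmap: 1024 bits
//   + 4096                     // values buffer: 1024 * sizeof(int32)
// Note that a buffer shared between several ArrayData instances is counted
// once per reference, so sliced or shared data may be overestimated.
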
} // namespace

std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool) {
    // Round the bit count up to a multiple of 64.
    bitCount = (bitCount + 63u) & ~size_t(63u);
    // Reserve one extra 64-bit word past the array boundary; this simplifies
    // the bitmap compression code, which can then write a whole 64-bit word at
    // the tail without a bounds check.
    bitCount += 64;
    return ARROW_RESULT(arrow::AllocateBitmap(bitCount, pool));
}

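// Worked example of the reservation arithmetic above: for bitCount = 100,
// (100 + 63) & ~63 rounds up to 128, and the extra word brings the request to
// 192 bits (24 bytes), so the compression loop can store whole ui64 words
// without a special case for the tail.
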
std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) {
    auto bitmap = AllocateBitmapWithReserve(len, pool);
    CompressSparseBitmap(bitmap->mutable_data(), srcSparse, len);
    return bitmap;
}

std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool) {
    auto bitmap = AllocateBitmapWithReserve(len, pool);
    CompressSparseBitmapNegate(bitmap->mutable_data(), srcSparse, len);
    return bitmap;
}

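// Minimal usage sketch (the mask values are illustrative): packing a sparse
// byte-per-value mask into a dense Arrow-style bitmap:
//
//   ui8 sparse[4] = {1, 0, 1, 1};
//   auto dense = MakeDenseBitmap(sparse, 4, arrow::default_memory_pool());
//   // bits 0, 2 and 3 of dense->data()[0] are set;
//   // MakeDenseBitmapNegate would set bit 1 instead
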
std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) {
    Y_ENSURE(data->length >= 0);
    Y_ENSURE(offset + len <= (size_t)data->length);
    if (offset == 0 && len == (size_t)data->length) {
        return data;
    }
    std::shared_ptr<arrow::ArrayData> result = data->Copy();
    result->offset = data->offset + offset;
    result->length = len;
    if (data->null_count == data->length) {
        // All values are null, so any slice is all null as well.
        result->null_count = len;
    } else if (len == 0) {
        result->null_count = 0;
    } else {
        // A non-trivial slice of a partially-null array has an unknown null count.
        result->null_count = data->null_count != 0 ? arrow::kUnknownNullCount : 0;
    }
    for (size_t i = 0; i < data->child_data.size(); ++i) {
        result->child_data[i] = DeepSlice(data->child_data[i], offset, len);
    }
    return result;
}

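// Usage sketch: unlike a plain arrow::ArrayData::Slice, DeepSlice re-slices
// every child array as well, so nested layouts stay consistent:
//
//   // 'data' is assumed to hold 100 rows
//   auto middle = DeepSlice(data, 10, 50);  // rows [10, 60)
//   // middle->offset == data->offset + 10, middle->length == 50,
//   // and each middle->child_data[i] is sliced the same way
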
std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len) {
    auto first = DeepSlice(data, 0, len);
    data = DeepSlice(data, len, data->length - len);
    return first;
}

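// Usage sketch: cutting an array into fixed-size blocks; Chop returns the
// first 'len' rows and leaves the remainder in 'data' (the block size and
// ProcessBlock consumer are hypothetical):
//
//   const size_t blockLen = 1000;
//   while (data->length > 0) {
//       auto block = Chop(data, std::min<size_t>(blockLen, data->length));
//       ProcessBlock(block);
//   }
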
std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, bool isNestedOptional) {
    Y_ENSURE(data.GetNullCount() == 0);
    if (isNestedOptional) {
        Y_ENSURE(data.buffers.size() == 1);
        Y_ENSURE(data.child_data.size() == 1);
        return data.child_data.front();
    }
    auto result = data.Copy();
    result->buffers.front().reset();
    return result;
}

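// Usage sketch, assuming YQL's arrow layout for optionals: Unwrap peels one
// optional level off an array that is known to contain no nulls.
//
//   auto inner = Unwrap(*data, /*isNestedOptional=*/false);
//   // inner shares all buffers with 'data' except buffers.front(),
//   // the validity bitmap, which is dropped as it carries no information
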
void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func) {
    Y_ENSURE(datum.is_arraylike(), "Expected array");
    if (datum.is_array()) {
        func(datum.array());
    } else {
        for (auto& chunk : datum.chunks()) {
            func(chunk->data());
        }
    }
}

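// Usage sketch: counting the total number of rows in a Datum that may be
// either a single array or a chunked array:
//
//   ui64 rows = 0;
//   ForEachArrayData(datum, [&rows](const auto& arrayData) {
//       rows += arrayData->length;
//   });
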
arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks) {
    Y_ENSURE(!chunks.empty(), "Expected non-empty chunks");
    arrow::ArrayVector resultChunks;
    for (auto& chunk : chunks) {
        resultChunks.push_back(arrow::Datum(chunk).make_array());
    }
    if (resultChunks.size() > 1) {
        auto type = resultChunks.front()->type();
        auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(resultChunks), type));
        return arrow::Datum(chunked);
    }
    return arrow::Datum(resultChunks.front());
}

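// Usage sketch: MakeArray is a rough inverse of ForEachArrayData; it packs a
// list of ArrayData chunks back into a Datum, yielding a plain array for a
// single chunk and a ChunkedArray otherwise:
//
//   TVector<std::shared_ptr<arrow::ArrayData>> chunks;
//   ForEachArrayData(datum, [&chunks](const auto& d) { chunks.push_back(d); });
//   arrow::Datum rebuilt = MakeArray(chunks);
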
ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch) {
    ui64 size = sizeof(batch);
    size += batch.num_columns() * sizeof(void*);
    for (int i = 0; i < batch.num_columns(); ++i) {
        size += GetSizeOfArrayDataInBytes(*batch.column_data(i));
    }
    return size;
}

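// Usage sketch: a coarse memory estimate for an incoming batch, e.g. to
// decide when to spill (the threshold and SpillBatch are hypothetical):
//
//   if (GetSizeOfArrowBatchInBytes(*batch) > (64ull << 20)) {  // 64 MiB
//       SpillBatch(batch);
//   }
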
ui64 GetSizeOfArrowExecBatchInBytes(const arrow::compute::ExecBatch& batch) {
    ui64 size = sizeof(batch);
    size += batch.num_values() * sizeof(void*);
    for (const auto& datum : batch.values) {
        size += GetSizeOfDatumInBytes(datum);
    }
    return size;
}

} // namespace NUdf
} // namespace NYql