util.h

#pragma once

#include "defs.h"

#include <arrow/compute/api.h>
#include <arrow/datum.h>
#include <arrow/memory_pool.h>
#include <arrow/util/bit_util.h>

#include <util/generic/maybe.h>
#include <util/generic/vector.h>

#include <algorithm>
#include <cstring>
#include <functional>
#include <memory>
#include <type_traits>

namespace NYql {
namespace NUdf {

enum class EPgStringType {
    None,
    Text,
    CString,
    Fixed
};

std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool);
std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
std::shared_ptr<arrow::Buffer> MakeDenseBitmapCopy(const ui8* src, size_t len, size_t offset, arrow::MemoryPool* pool);
std::shared_ptr<arrow::Buffer> MakeDenseFalseBitmap(int64_t len, arrow::MemoryPool* pool);
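
// Example (sketch, not part of this header): converting a sparse, byte-per-value
// validity mask into the dense, bit-packed form Arrow expects. This assumes the
// sparse encoding uses one non-zero byte per valid element; `mask` is illustrative.
//
//   const ui8 mask[] = {1, 0, 1, 1};
//   std::shared_ptr<arrow::Buffer> bitmap =
//       MakeDenseBitmap(mask, 4, arrow::default_memory_pool());
//   // First byte of bitmap->data() is now 0b00001101 (LSB-first; bit 1 cleared).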

/// \brief Recursive version of the ArrayData::Slice() method
std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len);

/// \brief Chops the first `len` items off `data` and returns them as a new ArrayData object
std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len);

/// \brief Unwraps an array (decreases the optional level)
std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, bool isNestedOptional);

void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func);
arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks);
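
// Example (sketch): ForEachArrayData visits every ArrayData chunk of a datum,
// whether it wraps a single array or a chunked array, and MakeArray packs chunks
// back into a Datum. `datum` here is a hypothetical input.
//
//   TVector<std::shared_ptr<arrow::ArrayData>> chunks;
//   ForEachArrayData(datum, [&](const std::shared_ptr<arrow::ArrayData>& chunk) {
//       chunks.push_back(chunk);
//   });
//   arrow::Datum rebuilt = MakeArray(chunks);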

// Buffer 0 is the validity bitmap; passing an explicit absolute offset of 0 to
// GetValues() suppresses the automatic offset adjustment, so the array offset
// is applied manually to the bit index instead.
inline bool IsNull(const arrow::ArrayData& data, size_t index) {
    return data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset);
}
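
// Example (sketch): IsNull applies data.offset itself, so it is safe to call on
// sliced arrays with logical indices. `data` is a hypothetical ArrayData.
//
//   for (size_t i = 0; i < size_t(data.length); ++i) {
//       if (IsNull(data, i)) {
//           // handle the null at logical position i
//       }
//   }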

ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch);
ui64 GetSizeOfArrowExecBatchInBytes(const arrow::compute::ExecBatch& batch);

class TResizeableBuffer : public arrow::ResizableBuffer {
public:
    explicit TResizeableBuffer(arrow::MemoryPool* pool)
        : ResizableBuffer(nullptr, 0, arrow::CPUDevice::memory_manager(pool))
        , Pool(pool)
    {
    }

    ~TResizeableBuffer() override {
        uint8_t* ptr = mutable_data();
        if (ptr) {
            Pool->Free(ptr, capacity_);
        }
    }

    arrow::Status Reserve(const int64_t capacity) override {
        if (capacity < 0) {
            return arrow::Status::Invalid("Negative buffer capacity: ", capacity);
        }
        uint8_t* ptr = mutable_data();
        if (!ptr || capacity > capacity_) {
            int64_t newCapacity = arrow::BitUtil::RoundUpToMultipleOf64(capacity);
            if (ptr) {
                ARROW_RETURN_NOT_OK(Pool->Reallocate(capacity_, newCapacity, &ptr));
            } else {
                ARROW_RETURN_NOT_OK(Pool->Allocate(newCapacity, &ptr));
            }
            data_ = ptr;
            capacity_ = newCapacity;
        }
        return arrow::Status::OK();
    }

    arrow::Status Resize(const int64_t newSize, bool shrink_to_fit = true) override {
        if (ARROW_PREDICT_FALSE(newSize < 0)) {
            return arrow::Status::Invalid("Negative buffer resize: ", newSize);
        }
        uint8_t* ptr = mutable_data();
        if (ptr && shrink_to_fit) {
            int64_t newCapacity = arrow::BitUtil::RoundUpToMultipleOf64(newSize);
            if (capacity_ != newCapacity) {
                ARROW_RETURN_NOT_OK(Pool->Reallocate(capacity_, newCapacity, &ptr));
                data_ = ptr;
                capacity_ = newCapacity;
            }
        } else {
            ARROW_RETURN_NOT_OK(Reserve(newSize));
        }
        size_ = newSize;
        return arrow::Status::OK();
    }

private:
    arrow::MemoryPool* Pool;
};

/// \brief Same as arrow::AllocateResizableBuffer, but allows control over zero padding
template<typename TBuffer = TResizeableBuffer>
std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t size, arrow::MemoryPool* pool, bool zeroPad = false) {
    std::unique_ptr<TBuffer> result = std::make_unique<TBuffer>(pool);
    ARROW_OK(result->Reserve(size));
    if (zeroPad) {
        result->ZeroPadding();
    }
    return result;
}
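
// Example (sketch): allocating a growable buffer without Arrow's usual zero
// padding, filling part of it, and freezing it into an immutable arrow::Buffer.
//
//   auto buf = AllocateResizableBuffer(256, arrow::default_memory_pool());
//   std::memset(buf->mutable_data(), 0xff, 128);
//   ARROW_OK(buf->Resize(128, /*shrink_to_fit=*/false));
//   std::shared_ptr<arrow::Buffer> frozen = std::move(buf);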

/// \brief Owning buffer that runs element destructors when it is destroyed
template<typename T>
class TResizableManagedBuffer final : public TResizeableBuffer {
    static_assert(!std::is_trivially_destructible_v<T>);

public:
    explicit TResizableManagedBuffer(arrow::MemoryPool* pool)
        : TResizeableBuffer(pool) {}

    ~TResizableManagedBuffer() override {
        for (int64_t i = 0; i < size_; i += sizeof(T)) {
            auto* ptr = reinterpret_cast<T*>(mutable_data() + i);
            ptr->~T();
        }
    }
};
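
// Note: the static_assert above restricts this type to payloads with non-trivial
// destructors; trivially-destructible payloads should use plain TResizeableBuffer.
// TTypedBufferBuilder below picks the appropriate buffer type automatically via
// std::conditional_t, so this class is rarely instantiated directly.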

// Similar to arrow::TypedBufferBuilder, but:
// 1) has an UnsafeAdvance() method
// 2) uses shrinkToFit = false
// 3) doesn't zero-pad the buffer
template<typename T>
class TTypedBufferBuilder {
    static_assert(!std::is_same_v<T, bool>);

    using TArrowBuffer = std::conditional_t<std::is_trivially_destructible_v<T>, TResizeableBuffer, TResizableManagedBuffer<T>>;

public:
    explicit TTypedBufferBuilder(arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage = {})
        : MinFillPercentage(minFillPercentage)
        , Pool(pool)
    {
        Y_ENSURE(!MinFillPercentage || *MinFillPercentage <= 100);
    }

    inline void Reserve(size_t size) {
        if (!Buffer) {
            bool zeroPad = false;
            Buffer = AllocateResizableBuffer<TArrowBuffer>(size * sizeof(T), Pool, zeroPad);
        } else {
            size_t requiredBytes = (size + Length()) * sizeof(T);
            size_t currentCapacity = Buffer->capacity();
            if (requiredBytes > currentCapacity) {
                size_t newCapacity = std::max(requiredBytes, currentCapacity * 2);
                ARROW_OK(Buffer->Reserve(newCapacity));
            }
        }
    }

    inline size_t Length() const {
        return Len;
    }

    inline size_t Capacity() const {
        return Buffer ? size_t(Buffer->capacity()) : 0;
    }

    inline T* MutableData() {
        return reinterpret_cast<T*>(Buffer->mutable_data());
    }

    inline T* End() {
        return MutableData() + Length();
    }

    inline const T* Data() const {
        return reinterpret_cast<const T*>(Buffer->data());
    }

    inline void UnsafeAppend(const T* values, size_t count) {
        Y_DEBUG_ABORT_UNLESS(count + Length() <= Buffer->capacity() / sizeof(T));
        std::memcpy(End(), values, count * sizeof(T));
        UnsafeAdvance(count);
    }

    inline void UnsafeAppend(size_t count, const T& value) {
        Y_DEBUG_ABORT_UNLESS(count + Length() <= Buffer->capacity() / sizeof(T));
        T* target = End();
        std::fill(target, target + count, value);
        UnsafeAdvance(count);
    }

    inline void UnsafeAppend(T&& value) {
        Y_DEBUG_ABORT_UNLESS(1 + Length() <= Buffer->capacity() / sizeof(T));
        *End() = std::move(value);
        UnsafeAdvance(1);
    }

    inline void UnsafeAdvance(size_t count) {
        Y_DEBUG_ABORT_UNLESS(count + Length() <= Buffer->capacity() / sizeof(T));
        Len += count;
    }

    inline std::shared_ptr<arrow::Buffer> Finish() {
        int64_t newSize = Len * sizeof(T);
        bool shrinkToFit = MinFillPercentage
            ? newSize <= Buffer->capacity() * *MinFillPercentage / 100
            : false;
        ARROW_OK(Buffer->Resize(newSize, shrinkToFit));
        std::shared_ptr<arrow::ResizableBuffer> result;
        std::swap(result, Buffer);
        Len = 0;
        return result;
    }

private:
    const TMaybe<ui8> MinFillPercentage;
    arrow::MemoryPool* const Pool;
    std::shared_ptr<arrow::ResizableBuffer> Buffer;
    size_t Len = 0;
};
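
// Example (sketch): a typical fill-and-finish cycle. The Unsafe* methods never
// grow the buffer, so Reserve() must cover the total element count up front;
// the Y_DEBUG_ABORT_UNLESS checks only fire in debug builds.
//
//   TTypedBufferBuilder<ui64> builder(arrow::default_memory_pool());
//   builder.Reserve(3);
//   const ui64 values[] = {1, 2, 3};
//   builder.UnsafeAppend(values, 3);
//   std::shared_ptr<arrow::Buffer> done = builder.Finish();
//   // The builder can be reused after another Reserve().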

// These helpers stash an owning memory-context pointer in the machine word
// immediately preceding `ptr`.
inline void* GetMemoryContext(const void* ptr) {
    return *(void**)((char*)ptr - sizeof(void*));
}

inline void SetMemoryContext(void* ptr, void* ctx) {
    *(void**)((char*)ptr - sizeof(void*)) = ctx;
}

inline void ZeroMemoryContext(void* ptr) {
    SetMemoryContext(ptr, nullptr);
}
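
// Example (sketch): because the context occupies the word before the payload,
// allocations must reserve sizeof(void*) extra bytes in front. `payloadSize`
// and `ctx` are hypothetical.
//
//   char* raw = static_cast<char*>(malloc(sizeof(void*) + payloadSize));
//   void* payload = raw + sizeof(void*);
//   SetMemoryContext(payload, ctx);
//   Y_ASSERT(GetMemoryContext(payload) == ctx);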

} // namespace NUdf
} // namespace NYql