// util.h
  1. #pragma once
  2. #include "defs.h"
  3. #include <util/generic/vector.h>
  4. #include <arrow/compute/api.h>
  5. #include <arrow/datum.h>
  6. #include <arrow/memory_pool.h>
  7. #include <arrow/util/bit_util.h>
  8. #include <functional>
  9. namespace NYql {
  10. namespace NUdf {
/// String-representation flavor for PG (PostgreSQL) values.
/// NOTE(review): semantics inferred from enumerator names — None: not a string,
/// Text: variable-length text, CString: null-terminated, Fixed: fixed-size.
/// Confirm against the code that switches on this enum.
enum class EPgStringType {
    None,
    Text,
    CString,
    Fixed
};
/// \brief Allocates a bitmap buffer able to hold `bitCount` bits from `pool`
/// (presumably with extra reserve, per the name — see the .cpp for details).
std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool);
/// \brief Converts a sparse (byte-per-value) mask of `len` items into a dense
/// (bit-per-value) bitmap allocated from `pool`. NOTE(review): inferred from name.
std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
/// \brief Same as MakeDenseBitmap, but with each resulting bit negated.
std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool);
/// \brief Recursive version of ArrayData::Slice() method
std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len);
/// \brief Chops first len items of `data` as new ArrayData object
std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len);
/// \brief Unwrap array (decrease optional level)
std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, bool isNestedOptional);
/// \brief Invokes `func` for each ArrayData chunk contained in `datum`.
void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func);
/// \brief Builds a Datum from the given ArrayData chunks.
arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks);
  28. inline bool IsNull(const arrow::ArrayData& data, size_t index) {
  29. return data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset);
  30. }
/// \brief Returns the in-memory size of `batch` in bytes.
ui64 GetSizeOfArrowBatchInBytes(const arrow::RecordBatch& batch);
/// \brief Returns the in-memory size of the exec `batch` in bytes.
ui64 GetSizeOfArrowExecBatchInBytes(const arrow::compute::ExecBatch& batch);
  33. class TResizeableBuffer : public arrow::ResizableBuffer {
  34. public:
  35. explicit TResizeableBuffer(arrow::MemoryPool* pool)
  36. : ResizableBuffer(nullptr, 0, arrow::CPUDevice::memory_manager(pool))
  37. , Pool(pool)
  38. {
  39. }
  40. ~TResizeableBuffer() override {
  41. uint8_t* ptr = mutable_data();
  42. if (ptr) {
  43. Pool->Free(ptr, capacity_);
  44. }
  45. }
  46. arrow::Status Reserve(const int64_t capacity) override {
  47. if (capacity < 0) {
  48. return arrow::Status::Invalid("Negative buffer capacity: ", capacity);
  49. }
  50. uint8_t* ptr = mutable_data();
  51. if (!ptr || capacity > capacity_) {
  52. int64_t newCapacity = arrow::BitUtil::RoundUpToMultipleOf64(capacity);
  53. if (ptr) {
  54. ARROW_RETURN_NOT_OK(Pool->Reallocate(capacity_, newCapacity, &ptr));
  55. } else {
  56. ARROW_RETURN_NOT_OK(Pool->Allocate(newCapacity, &ptr));
  57. }
  58. data_ = ptr;
  59. capacity_ = newCapacity;
  60. }
  61. return arrow::Status::OK();
  62. }
  63. arrow::Status Resize(const int64_t newSize, bool shrink_to_fit = true) override {
  64. if (ARROW_PREDICT_FALSE(newSize < 0)) {
  65. return arrow::Status::Invalid("Negative buffer resize: ", newSize);
  66. }
  67. uint8_t* ptr = mutable_data();
  68. if (ptr && shrink_to_fit && newSize <= size_) {
  69. int64_t newCapacity = arrow::BitUtil::RoundUpToMultipleOf64(newSize);
  70. if (capacity_ != newCapacity) {
  71. ARROW_RETURN_NOT_OK(Pool->Reallocate(capacity_, newCapacity, &ptr));
  72. data_ = ptr;
  73. capacity_ = newCapacity;
  74. }
  75. } else {
  76. RETURN_NOT_OK(Reserve(newSize));
  77. }
  78. size_ = newSize;
  79. return arrow::Status::OK();
  80. }
  81. private:
  82. arrow::MemoryPool* Pool;
  83. };
  84. /// \brief same as arrow::AllocateResizableBuffer, but allows to control zero padding
  85. template<typename TBuffer = TResizeableBuffer>
  86. std::unique_ptr<arrow::ResizableBuffer> AllocateResizableBuffer(size_t size, arrow::MemoryPool* pool, bool zeroPad = false) {
  87. std::unique_ptr<TBuffer> result = std::make_unique<TBuffer>(pool);
  88. ARROW_OK(result->Reserve(size));
  89. if (zeroPad) {
  90. result->ZeroPadding();
  91. }
  92. return result;
  93. }
  94. /// \brief owning buffer that calls destructors
  95. template<typename T>
  96. class TResizableManagedBuffer final : public TResizeableBuffer {
  97. static_assert(!std::is_trivially_destructible_v<T>);
  98. public:
  99. explicit TResizableManagedBuffer(arrow::MemoryPool* pool)
  100. : TResizeableBuffer(pool) {}
  101. ~TResizableManagedBuffer() override {
  102. for (int64_t i = 0; i < size_; i += sizeof(T)) {
  103. auto* ptr = reinterpret_cast<T*>(mutable_data() + i);
  104. ptr->~T();
  105. }
  106. }
  107. };
// similar to arrow::TypedBufferBuilder, but:
// 1) with UnsafeAdvance() method
// 2) shrinkToFit = false
// 3) doesn't zero pad buffer
template<typename T>
class TTypedBufferBuilder {
    static_assert(!std::is_same_v<T, bool>);
    // Non-trivially-destructible element types get TResizableManagedBuffer,
    // which runs ~T() for every stored element when the buffer dies.
    using TArrowBuffer = std::conditional_t<std::is_trivially_destructible_v<T>, TResizeableBuffer, TResizableManagedBuffer<T>>;
public:
    explicit TTypedBufferBuilder(arrow::MemoryPool* pool)
        : Pool(pool)
    {
    }

    // Makes room for at least `size` additional items on top of Length().
    // Existing capacity at least doubles, keeping repeated appends amortized O(1).
    inline void Reserve(size_t size) {
        if (!Buffer) {
            bool zeroPad = false;
            Buffer = AllocateResizableBuffer<TArrowBuffer>(size * sizeof(T), Pool, zeroPad);
        } else {
            size_t requiredBytes = (size + Length()) * sizeof(T);
            size_t currentCapacity = Buffer->capacity();
            if (requiredBytes > currentCapacity) {
                size_t newCapacity = std::max(requiredBytes, currentCapacity * 2);
                ARROW_OK(Buffer->Reserve(newCapacity));
            }
        }
    }

    // Number of items appended so far.
    inline size_t Length() const {
        return Len;
    }

    // Current capacity in BYTES (not items); 0 before the first Reserve().
    inline size_t Capacity() const {
        return Buffer ? size_t(Buffer->capacity()) : 0;
    }

    inline T* MutableData() {
        return reinterpret_cast<T*>(Buffer->mutable_data());
    }

    // One past the last appended item — the next write position.
    inline T* End() {
        return MutableData() + Length();
    }

    inline const T* Data() const {
        return reinterpret_cast<const T*>(Buffer->data());
    }

    // Appends `count` items; caller must have Reserve()d enough space first.
    // NOTE(review): memcpy assumes T is trivially copyable here — confirm callers
    // only use this overload for trivial T.
    inline void UnsafeAppend(const T* values, size_t count) {
        Y_DEBUG_ABORT_UNLESS(count + Length() <= Buffer->capacity() / sizeof(T));
        std::memcpy(End(), values, count * sizeof(T));
        UnsafeAdvance(count);
    }

    // Appends `count` copies of `value`; capacity must already be reserved.
    inline void UnsafeAppend(size_t count, const T& value) {
        Y_DEBUG_ABORT_UNLESS(count + Length() <= Buffer->capacity() / sizeof(T));
        T* target = End();
        std::fill(target, target + count, value);
        UnsafeAdvance(count);
    }

    // Move-appends a single item; capacity must already be reserved.
    // NOTE(review): this assigns into raw (unconstructed) storage, which is only
    // well-defined for trivial T — verify non-trivial T usage.
    inline void UnsafeAppend(T&& value) {
        Y_DEBUG_ABORT_UNLESS(1 + Length() <= Buffer->capacity() / sizeof(T));
        *End() = std::move(value);
        UnsafeAdvance(1);
    }

    // Bumps the logical length without writing — for use after the caller
    // filled memory directly through End()/MutableData().
    inline void UnsafeAdvance(size_t count) {
        Y_DEBUG_ABORT_UNLESS(count + Length() <= Buffer->capacity() / sizeof(T));
        Len += count;
    }

    // Trims the buffer's logical size to the appended length (capacity is kept:
    // shrinkToFit=false) and releases ownership; the builder resets to empty.
    inline std::shared_ptr<arrow::Buffer> Finish() {
        bool shrinkToFit = false;
        ARROW_OK(Buffer->Resize(Len * sizeof(T), shrinkToFit));
        std::shared_ptr<arrow::ResizableBuffer> result;
        std::swap(result, Buffer);
        Len = 0;
        return result;
    }
private:
    arrow::MemoryPool* const Pool;
    std::shared_ptr<arrow::ResizableBuffer> Buffer;
    size_t Len = 0;
};
  182. inline void* GetMemoryContext(const void* ptr) {
  183. return *(void**)((char*)ptr - sizeof(void*));
  184. }
  185. inline void SetMemoryContext(void* ptr, void* ctx) {
  186. *(void**)((char*)ptr - sizeof(void*)) = ctx;
  187. }
/// \brief Resets the context slot in front of `ptr` to nullptr.
inline void ZeroMemoryContext(void* ptr) {
    SetMemoryContext(ptr, nullptr);
}
  191. }
  192. }