mkql_alloc.cpp

#include "mkql_alloc.h"
#include <util/system/align.h>
#include <yql/essentials/public/udf/udf_value.h>

#include <tuple>

namespace NKikimr {
namespace NMiniKQL {

Y_POD_THREAD(TAllocState*) TlsAllocState;

TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
TAllocState::TCurrentPages TAllocState::EmptyCurrentPages = { &TAllocState::EmptyPageHeader, &TAllocState::EmptyPageHeader };
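
// Intrusive doubly-linked list plumbing: Link() inserts this entry right
// after the given root; Unlink() splices the entry out and resets its own
// pointers via Clear().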
void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept {
    Left = root;
    Right = root->Right;
    Right->Left = Left->Right = this;
}

void TAllocState::TListEntry::Unlink() noexcept {
    std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right);
    Clear();
}
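
// The allocator state starts out with every intrusive list root self-linked
// and with the global palloc list selected as the current one.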
TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters& counters, bool supportsSizedAllocators)
    : TAlignedPagePool(location, counters)
    , SupportsSizedAllocators(supportsSizedAllocators)
    , CurrentPAllocList(&GlobalPAllocList)
{
    GetRoot()->InitLinks();
    OffloadedBlocksRoot.InitLinks();
    GlobalPAllocList.InitLinks();
    ArrowBlocksRoot.InitLinks();
}
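
// Frees every entry of a palloc list. Each entry is preceded by a
// TMkqlPAllocHeader, so the size handed to MKQLFreeWithSize includes the
// header. The root is left re-initialized (empty).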
void TAllocState::CleanupPAllocList(TListEntry* root) {
    for (auto curr = root->Right; curr != root; ) {
        auto next = curr->Right;
        auto size = ((TMkqlPAllocHeader*)curr)->Size;
        auto fullSize = size + sizeof(TMkqlPAllocHeader);
        MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
        curr = next;
    }

    root->InitLinks();
}
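
// Releases every block linked into an arrow list. Under
// PROFILE_MEMORY_ALLOCATIONS the blocks were obtained with malloc();
// otherwise they are aligned pages returned with their header-inclusive size.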
void TAllocState::CleanupArrowList(TListEntry* root) {
    for (auto curr = root->Right; curr != root; ) {
        auto next = curr->Right;
#ifdef PROFILE_MEMORY_ALLOCATIONS
        free(curr);
#else
        auto size = ((TMkqlArrowHeader*)curr)->Size;
        auto fullSize = size + sizeof(TMkqlArrowHeader);
        ReleaseAlignedPage(curr, fullSize);
#endif
        curr = next;
    }

    root->InitLinks();
}
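
// Tears down everything still owned by this state: all live boxed values
// (each is Ref()'ed before its destructor runs, keeping the refcount from
// reaching zero mid-destruction), the global palloc list, the offloaded
// blocks, and the tracked arrow blocks.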
void TAllocState::KillAllBoxed() {
    {
        const auto root = GetRoot();
        for (auto next = root->GetRight(); next != root; next = root->GetLeft()) {
            next->Ref();
            next->~TBoxedValueLink();
        }

        GetRoot()->InitLinks();
    }

    {
        Y_ABORT_UNLESS(CurrentPAllocList == &GlobalPAllocList);
        CleanupPAllocList(&GlobalPAllocList);
    }

    {
        const auto root = &OffloadedBlocksRoot;
        for (auto curr = root->Right; curr != root; ) {
            auto next = curr->Right;
            free(curr);
            curr = next;
        }

        OffloadedBlocksRoot.InitLinks();
    }

    CleanupArrowList(&ArrowBlocksRoot);

#ifndef NDEBUG
    ActiveMemInfo.clear();
#endif
}
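
// Debug builds only: invalidates exit-time checking for every registered
// memory-info tracker.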
void TAllocState::InvalidateMemInfo() {
#ifndef NDEBUG
    for (auto& pair : ActiveMemInfo) {
        pair.first->CheckOnExit(false);
    }
#endif
}
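
// Sums the Deallocated counters over all pages that still have live
// allocations (UseCount != 0).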
size_t TAllocState::GetDeallocatedInPages() const {
    size_t deallocated = 0;
    for (auto x : AllPages) {
        auto currPage = (TAllocPageHeader*)x;
        if (currPage->UseCount) {
            deallocated += currPage->Deallocated;
        }
    }

    return deallocated;
}
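
// Ref-locking: when UseRefLocking is set, LockObject() pins a string or
// boxed value by remembering its original refcount on the first lock and
// counting the locks; embedded values have nothing to pin and are skipped.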
void TAllocState::LockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
    if (!UseRefLocking) {
        return;
    }

    void* obj;
    if (value.IsString()) {
        obj = value.AsStringRef().Data();
    } else if (value.IsBoxed()) {
        obj = value.AsBoxed().Get();
    } else {
        return;
    }

    auto [it, isNew] = LockedObjectsRefs.emplace(obj, TLockInfo{ 0, 0 });
    if (isNew) {
        it->second.OriginalRefs = value.LockRef();
    }

    ++it->second.Locks;
}
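
// Inverse of LockObject(): on the last unlock the saved refcount is restored
// and the bookkeeping entry is removed.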
void TAllocState::UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
    if (!UseRefLocking) {
        return;
    }

    void* obj;
    if (value.IsString()) {
        obj = value.AsStringRef().Data();
    } else if (value.IsBoxed()) {
        obj = value.AsBoxed().Get();
    } else {
        return;
    }

    auto it = LockedObjectsRefs.find(obj);
    Y_ABORT_UNLESS(it != LockedObjectsRefs.end());
    if (--it->second.Locks == 0) {
        value.UnlockRef(it->second.OriginalRefs);
        LockedObjectsRefs.erase(it);
    }
}
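
// Attaches MyState_ to the current thread via TlsAllocState, saving the
// previously attached state so Acquire()/Release() pairs may nest; the PG
// thread context follows the active state. A minimal usage sketch (assuming
// the TScopedAlloc declaration in mkql_alloc.h, where the constructor
// performs the initial Acquire()):
//
//     TScopedAlloc alloc(__LOCATION__);
//     alloc.Acquire();   // nested acquire, must happen on the same thread
//     // ... MKQL allocations see TlsAllocState == &MyState_ ...
//     alloc.Release();   // matching release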
void TScopedAlloc::Acquire() {
    if (!AttachedCount_) {
        if (PrevState_) {
            PgReleaseThreadContext(PrevState_->MainContext);
        }

        PrevState_ = TlsAllocState;
        TlsAllocState = &MyState_;
        PgAcquireThreadContext(MyState_.MainContext);
    } else {
        Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
    }

    ++AttachedCount_;
}
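
// Detaches MyState_ when the last nested Acquire() is balanced and restores
// the previously attached state, if any.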
void TScopedAlloc::Release() {
    if (AttachedCount_ && --AttachedCount_ == 0) {
        Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
        PgReleaseThreadContext(MyState_.MainContext);
        TlsAllocState = PrevState_;

        if (PrevState_) {
            PgAcquireThreadContext(PrevState_->MainContext);
        }

        PrevState_ = nullptr;
    }
}
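
// Slow path of the MKQL small-object allocator: the current page cannot fit
// the request, so a fresh page of at least POOL_PAGE_SIZE bytes is taken
// from the page pool. The fresh page becomes the current one only if it has
// more space left over than the page it would displace.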
void* MKQLAllocSlow(size_t sz, TAllocState* state, const EMemorySubPool mPool) {
    auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
    auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
    auto currPage = (TAllocPageHeader*)state->GetBlock(capacity);
    currPage->Deallocated = 0;
    currPage->Capacity = capacity;
    currPage->Offset = roundedSize;

    auto& mPage = state->CurrentPages[(TMemorySubPoolIdx)mPool];
    auto newPageAvailable = capacity - roundedSize;
    auto curPageAvailable = mPage->Capacity - mPage->Offset;
    if (newPageAvailable > curPageAvailable) {
        mPage = currPage;
    }

    void* ret = (char*)currPage + sizeof(TAllocPageHeader);
    currPage->UseCount = 1;
    currPage->MyAlloc = state;
    currPage->Link = nullptr;
    return ret;
}
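
// Slow path of the MKQL deallocator: returns a fully drained page to the
// pool, after a debug check that the page is freed by the same allocator
// state that produced it.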
void MKQLFreeSlow(TAllocPageHeader* header, TAllocState* state, const EMemorySubPool mPool) noexcept {
    Y_DEBUG_ABORT_UNLESS(state);
    Y_DEBUG_ABORT_UNLESS(header->MyAlloc == state, "%s", (TStringBuilder() << "wrong allocator was used; "
        "allocated with: " << header->MyAlloc->GetDebugInfo() << " freed with: " << TlsAllocState->GetDebugInfo()).data());
    state->ReturnBlock(header, header->Capacity);
    if (header == state->CurrentPages[(TMemorySubPoolIdx)mPool]) {
        state->CurrentPages[(TMemorySubPoolIdx)mPool] = &TAllocState::EmptyPageHeader;
    }
}
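
// Arena slow path: chains a freshly obtained page to the previous one via
// Link so the whole chain can be released at once in Clear(); UseCount stays
// zero because arena allocations are not freed individually.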
void* TPagedArena::AllocSlow(const size_t sz, const EMemorySubPool mPool) {
    auto& currentPage = CurrentPages_[(TMemorySubPoolIdx)mPool];
    auto prevLink = currentPage;
    auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
    auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
    currentPage = (TAllocPageHeader*)PagePool_->GetBlock(capacity);
    currentPage->Capacity = capacity;
    void* ret = (char*)currentPage + sizeof(TAllocPageHeader);
    currentPage->Offset = roundedSize;
    currentPage->UseCount = 0;
    currentPage->MyAlloc = PagePool_;
    currentPage->Link = prevLink;
    return ret;
}
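
// Returns every page of each sub-pool chain to the pool and leaves the arena
// pointing at the shared empty page header.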
void TPagedArena::Clear() noexcept {
    for (auto&& i : CurrentPages_) {
        auto current = i;
        while (current != &TAllocState::EmptyPageHeader) {
            auto next = current->Link;
            PagePool_->ReturnBlock(current, current->Capacity);
            current = next;
        }

        i = &TAllocState::EmptyPageHeader;
    }
}
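
// Allocates an Arrow buffer with a hidden TMkqlArrowHeader placed in front
// of the returned pointer. With tracking enabled the header is linked into
// ArrowBlocksRoot, the user pointer is recorded in ArrowBuffers, and the
// full size is charged through OffloadAlloc().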
void* MKQLArrowAllocate(ui64 size) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    auto fullSize = size + sizeof(TMkqlArrowHeader);
    if (state->EnableArrowTracking) {
        state->OffloadAlloc(fullSize);
    }

#ifdef PROFILE_MEMORY_ALLOCATIONS
    auto ptr = malloc(fullSize);
    if (!ptr) {
        throw TMemoryLimitExceededException();
    }
#else
    auto ptr = GetAlignedPage(fullSize);
#endif
    auto header = (TMkqlArrowHeader*)ptr;
    if (state->EnableArrowTracking) {
        header->Entry.Link(&state->ArrowBlocksRoot);
        Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
    } else {
        header->Entry.Clear();
    }

    header->Size = size;
    return header + 1;
}
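
// Reallocation is allocate-copy-free: Min(prevSize, size) bytes survive.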
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
    auto res = MKQLArrowAllocate(size);
    memcpy(res, mem, Min(prevSize, size));
    MKQLArrowFree(mem, prevSize);
    return res;
}
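
// Frees a buffer from MKQLArrowAllocate(). For tracked buffers the
// accounting is rolled back first: the header is unlinked, the pointer is
// removed from ArrowBuffers, and the bytes are credited via OffloadFree().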
void MKQLArrowFree(const void* mem, ui64 size) {
    auto fullSize = size + sizeof(TMkqlArrowHeader);
    auto header = ((TMkqlArrowHeader*)mem) - 1;
    if (!header->Entry.IsUnlinked()) {
        TAllocState* state = TlsAllocState;
        Y_ENSURE(state);
        state->OffloadFree(fullSize);
        header->Entry.Unlink();

        auto it = state->ArrowBuffers.find(mem);
        Y_ENSURE(it != state->ArrowBuffers.end());
        state->ArrowBuffers.erase(it);
    }

    Y_ENSURE(size == header->Size);
#ifdef PROFILE_MEMORY_ALLOCATIONS
    free(header);
#else
    ReleaseAlignedPage(header, fullSize);
#endif
}
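
// Drops an Arrow buffer from the current state's tracking without freeing
// it, undoing the bookkeeping done in MKQLArrowAllocate().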
void MKQLArrowUntrack(const void* mem) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    if (!state->EnableArrowTracking) {
        return;
    }

    auto it = state->ArrowBuffers.find(mem);
    if (it == state->ArrowBuffers.end()) {
        return;
    }

    auto header = ((TMkqlArrowHeader*)mem) - 1;
    if (!header->Entry.IsUnlinked()) {
        header->Entry.Unlink();
        auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
        state->OffloadFree(fullSize);
        state->ArrowBuffers.erase(it);
    }
}

} // NMiniKQL
} // NKikimr