// mkql_alloc.cpp

#include "mkql_alloc.h"

#include <util/system/align.h>
#include <yql/essentials/public/udf/udf_value.h>

#include <tuple>

namespace NKikimr {
namespace NMiniKQL {

// Arrow allocations up to a quarter of a pool page are served from a shared arena page.
constexpr ui64 ArrowSizeForArena = (TAllocState::POOL_PAGE_SIZE >> 2);

// Per-thread pointer to the allocator state currently bound to this thread.
Y_POD_THREAD(TAllocState*) TlsAllocState;

TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
TAllocState::TCurrentPages TAllocState::EmptyCurrentPages = { &TAllocState::EmptyPageHeader, &TAllocState::EmptyPageHeader };
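
// TListEntry implements an intrusive, circular doubly linked list: Link()
// inserts this entry right after the given root, Unlink() removes it from
// its list and resets its pointers via Clear().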
void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept {
    Left = root;
    Right = root->Right;
    Right->Left = Left->Right = this;
}

void TAllocState::TListEntry::Unlink() noexcept {
    std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right);
    Clear();
}
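
// A fresh allocator state starts with every intrusive root self-linked; the
// per-thread pAlloc list initially points at the global one.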
TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters& counters, bool supportsSizedAllocators)
    : TAlignedPagePool(location, counters)
    , SupportsSizedAllocators(supportsSizedAllocators)
    , CurrentPAllocList(&GlobalPAllocList)
{
    GetRoot()->InitLinks();
    OffloadedBlocksRoot.InitLinks();
    GlobalPAllocList.InitLinks();
    ArrowBlocksRoot.InitLinks();
}
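
// Walks a pAlloc list and returns every entry to the MKQL allocator. Each
// node is a TMkqlPAllocHeader, so the stored size is extended by the header
// size before freeing.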
void TAllocState::CleanupPAllocList(TListEntry* root) {
    for (auto curr = root->Right; curr != root; ) {
        auto next = curr->Right;
        auto size = ((TMkqlPAllocHeader*)curr)->Size;
        auto fullSize = size + sizeof(TMkqlPAllocHeader);
        MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
        curr = next;
    }

    root->InitLinks();
}
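
// Releases every tracked Arrow block. Depending on build flags a block may
// have come from the system allocator (ALLOW_DEFAULT_ALLOCATOR) or from an
// aligned page, so the matching release path is chosen per block.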
void TAllocState::CleanupArrowList(TListEntry* root) {
    for (auto curr = root->Right; curr != root; ) {
        auto next = curr->Right;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
        if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
            free(curr);
        } else {
#endif
            auto size = ((TMkqlArrowHeader*)curr)->Size;
            auto fullSize = size + sizeof(TMkqlArrowHeader);
            ReleaseAlignedPage(curr, fullSize);
#if defined(ALLOW_DEFAULT_ALLOCATOR)
        }
#endif
        curr = next;
    }

    root->InitLinks();
}
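
// Forcibly destroys everything still owned by this allocator state: boxed
// values, the global pAlloc list, offloaded blocks, and Arrow allocations.
// Used on allocator teardown.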
void TAllocState::KillAllBoxed() {
    {
        const auto root = GetRoot();
        for (auto next = root->GetRight(); next != root; next = root->GetLeft()) {
            next->Ref();
            next->~TBoxedValueLink();
        }

        GetRoot()->InitLinks();
    }

    {
        Y_ABORT_UNLESS(CurrentPAllocList == &GlobalPAllocList);
        CleanupPAllocList(&GlobalPAllocList);
    }

    {
        const auto root = &OffloadedBlocksRoot;
        for (auto curr = root->Right; curr != root; ) {
            auto next = curr->Right;
            free(curr);
            curr = next;
        }

        OffloadedBlocksRoot.InitLinks();
    }

    if (CurrentArrowPages) {
        MKQLArrowFree(CurrentArrowPages, 0);
        CurrentArrowPages = nullptr;
    }

    CleanupArrowList(&ArrowBlocksRoot);

#ifndef NDEBUG
    ActiveMemInfo.clear();
#endif
}
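
// Debug-only: asks every registered memory-usage tracker to skip its
// exit-time consistency check, so teardown does not trip leak assertions.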
void TAllocState::InvalidateMemInfo() {
#ifndef NDEBUG
    for (auto& pair : ActiveMemInfo) {
        pair.first->CheckOnExit(false);
    }
#endif
}
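
// Sums the Deallocated counters of all pages that are still in use: memory
// that has been freed logically but whose pages were not yet returned.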
size_t TAllocState::GetDeallocatedInPages() const {
    size_t deallocated = 0;
    for (auto x : AllPages) {
        auto currPage = (TAllocPageHeader*)x;
        if (currPage->UseCount) {
            deallocated += currPage->Deallocated;
        }
    }

    return deallocated;
}
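
// When UseRefLocking is enabled, LockObject/UnlockObject pin the refcount of
// a string or boxed value: the first lock remembers the original refcount,
// the last unlock restores it. Other value kinds are ignored.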
void TAllocState::LockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
    if (!UseRefLocking) {
        return;
    }

    void* obj;
    if (value.IsString()) {
        obj = value.AsStringRef().Data();
    } else if (value.IsBoxed()) {
        obj = value.AsBoxed().Get();
    } else {
        return;
    }

    auto [it, isNew] = LockedObjectsRefs.emplace(obj, TLockInfo{ 0, 0 });
    if (isNew) {
        it->second.OriginalRefs = value.LockRef();
    }

    ++it->second.Locks;
}

void TAllocState::UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
    if (!UseRefLocking) {
        return;
    }

    void* obj;
    if (value.IsString()) {
        obj = value.AsStringRef().Data();
    } else if (value.IsBoxed()) {
        obj = value.AsBoxed().Get();
    } else {
        return;
    }

    auto it = LockedObjectsRefs.find(obj);
    Y_ABORT_UNLESS(it != LockedObjectsRefs.end());
    if (--it->second.Locks == 0) {
        value.UnlockRef(it->second.OriginalRefs);
        LockedObjectsRefs.erase(it);
    }
}
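
// TScopedAlloc binds MyState_ to the current thread. Acquire/Release nest:
// only the outermost Acquire swaps the thread-local state (and the PG thread
// context); inner calls just bump the counter.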
void TScopedAlloc::Acquire() {
    if (!AttachedCount_) {
        if (PrevState_) {
            PgReleaseThreadContext(PrevState_->MainContext);
        }
        PrevState_ = TlsAllocState;
        TlsAllocState = &MyState_;
        PgAcquireThreadContext(MyState_.MainContext);
    } else {
        Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
    }

    ++AttachedCount_;
}

void TScopedAlloc::Release() {
    if (AttachedCount_ && --AttachedCount_ == 0) {
        Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
        PgReleaseThreadContext(MyState_.MainContext);
        TlsAllocState = PrevState_;
        if (PrevState_) {
            PgAcquireThreadContext(PrevState_->MainContext);
        }

        PrevState_ = nullptr;
    }
}
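
// Usage sketch (illustrative assumption, not part of this file): callers are
// expected to hold an acquired TScopedAlloc on the current thread before
// using the allocation entry points below, roughly:
//
//     TScopedAlloc alloc(__LOCATION__); // typically acquired on construction
//     // ... run MKQL code that allocates via TlsAllocState ...
//     // Release()/destruction detaches the state from the thread
//
// Slow path of MKQL allocation: takes a fresh block from the page pool and
// makes it the current page for the sub-pool if it leaves more free space
// than the page that is current now.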
void* MKQLAllocSlow(size_t sz, TAllocState* state, const EMemorySubPool mPool) {
    auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
    auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
    auto currPage = (TAllocPageHeader*)state->GetBlock(capacity);
    currPage->Deallocated = 0;
    currPage->Capacity = capacity;
    currPage->Offset = roundedSize;

    auto& mPage = state->CurrentPages[(TMemorySubPoolIdx)mPool];
    auto newPageAvailable = capacity - roundedSize;
    auto curPageAvailable = mPage->Capacity - mPage->Offset;
    if (newPageAvailable > curPageAvailable) {
        mPage = currPage;
    }

    void* ret = (char*)currPage + sizeof(TAllocPageHeader);
    currPage->UseCount = 1;
    currPage->MyAlloc = state;
    currPage->Link = nullptr;
    return ret;
}
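
// Slow path of MKQL free: verifies (in debug builds) that the block is
// returned to the allocator that produced it, returns the whole page to the
// pool, and resets the sub-pool's current-page pointer if it pointed here.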
void MKQLFreeSlow(TAllocPageHeader* header, TAllocState* state, const EMemorySubPool mPool) noexcept {
    Y_DEBUG_ABORT_UNLESS(state);
    Y_DEBUG_ABORT_UNLESS(header->MyAlloc == state, "%s", (TStringBuilder() << "wrong allocator was used; "
        "allocated with: " << header->MyAlloc->GetDebugInfo() << " freed with: " << TlsAllocState->GetDebugInfo()).data());
    state->ReturnBlock(header, header->Capacity);
    if (header == state->CurrentPages[(TMemorySubPoolIdx)mPool]) {
        state->CurrentPages[(TMemorySubPoolIdx)mPool] = &TAllocState::EmptyPageHeader;
    }
}
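
// TPagedArena slow path: allocates a new page for the sub-pool and chains it
// to the previous one via Link, so Clear() can walk and return all pages.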
void* TPagedArena::AllocSlow(const size_t sz, const EMemorySubPool mPool) {
    auto& currentPage = CurrentPages_[(TMemorySubPoolIdx)mPool];
    auto prevLink = currentPage;
    auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
    auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
    currentPage = (TAllocPageHeader*)PagePool_->GetBlock(capacity);
    currentPage->Capacity = capacity;
    void* ret = (char*)currentPage + sizeof(TAllocPageHeader);
    currentPage->Offset = roundedSize;
    currentPage->UseCount = 0;
    currentPage->MyAlloc = PagePool_;
    currentPage->Link = prevLink;
    return ret;
}
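
// Returns every page of the arena to the pool by following each sub-pool's
// Link chain, then resets the heads to the shared empty page header.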
void TPagedArena::Clear() noexcept {
    for (auto&& i : CurrentPages_) {
        auto current = i;
        while (current != &TAllocState::EmptyPageHeader) {
            auto next = current->Link;
            PagePool_->ReturnBlock(current, current->Capacity);
            current = next;
        }

        i = &TAllocState::EmptyPageHeader;
    }
}
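
// Small Arrow allocations (<= ArrowSizeForArena) are bump-allocated from a
// shared arena page. The page's UseCount starts at 1 for the page itself and
// is incremented per allocation; MKQLArrowFreeOnArena decrements it, and the
// page is released once it reaches zero.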
void* MKQLArrowAllocateOnArena(ui64 size) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);

    auto alignedSize = AlignUp(size, ArrowAlignment);
    auto& page = state->CurrentArrowPages;
    if (Y_UNLIKELY(!page || page->Offset + alignedSize > page->Size)) {
        const auto pageSize = TAllocState::POOL_PAGE_SIZE;
        if (state->EnableArrowTracking) {
            state->OffloadAlloc(pageSize);
        }

        if (page) {
            // Drop this thread's reference to the exhausted page; it is freed
            // once all allocations made from it are freed as well.
            MKQLArrowFree(page, 0);
        }

        page = (TMkqlArrowHeader*)GetAlignedPage();
        page->Offset = sizeof(TMkqlArrowHeader);
        page->Size = pageSize;
        page->UseCount = 1;

        if (state->EnableArrowTracking) {
            page->Entry.Link(&state->ArrowBlocksRoot);
            Y_ENSURE(state->ArrowBuffers.insert(page).second);
        } else {
            page->Entry.Clear();
        }
    }

    void* ptr = (ui8*)page + page->Offset;
    page->Offset += alignedSize;
    ++page->UseCount;
    return ptr;
}
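
// Large Arrow allocations get a dedicated block with a TMkqlArrowHeader in
// front; the caller receives the address right past the header. Tracked
// blocks are linked into ArrowBlocksRoot and registered in ArrowBuffers
// under the user-visible pointer.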
void* MKQLArrowAllocate(ui64 size) {
    if (size <= ArrowSizeForArena) {
        return MKQLArrowAllocateOnArena(size);
    }

    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);

    auto fullSize = size + sizeof(TMkqlArrowHeader);
    if (state->EnableArrowTracking) {
        state->OffloadAlloc(fullSize);
    }

    void* ptr;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
        ptr = malloc(fullSize);
        if (!ptr) {
            throw TMemoryLimitExceededException();
        }
    } else {
#endif
        ptr = GetAlignedPage(fullSize);
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    }
#endif

    auto* header = (TMkqlArrowHeader*)ptr;
    header->Offset = 0;
    header->UseCount = 0;

    if (state->EnableArrowTracking) {
        header->Entry.Link(&state->ArrowBlocksRoot);
        Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
    } else {
        header->Entry.Clear();
    }

    header->Size = size;
    return header + 1;
}
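
// Reallocation is implemented as allocate + copy + free; there is no
// in-place growth.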
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
    auto res = MKQLArrowAllocate(size);
    memcpy(res, mem, Min(prevSize, size));
    MKQLArrowFree(mem, prevSize);
    return res;
}
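
// Frees one arena allocation: decrements the owning page's UseCount and,
// when the last reference is gone, untracks the page and returns it.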
void MKQLArrowFreeOnArena(const void* ptr) {
    auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(ptr);
    if (page->UseCount.fetch_sub(1) == 1) {
        if (!page->Entry.IsUnlinked()) {
            TAllocState* state = TlsAllocState;
            Y_ENSURE(state);
            state->OffloadFree(page->Size);
            page->Entry.Unlink();

            auto it = state->ArrowBuffers.find(page);
            Y_ENSURE(it != state->ArrowBuffers.end());
            state->ArrowBuffers.erase(it);
        }

        ReleaseAlignedPage(page);
    }
}
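
// Frees a large Arrow block (arena-sized requests are forwarded to the arena
// path); if the block is still linked into this state, it is untracked first.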
void MKQLArrowFree(const void* mem, ui64 size) {
    if (size <= ArrowSizeForArena) {
        return MKQLArrowFreeOnArena(mem);
    }

    auto fullSize = size + sizeof(TMkqlArrowHeader);
    auto header = ((TMkqlArrowHeader*)mem) - 1;
    if (!header->Entry.IsUnlinked()) {
        TAllocState* state = TlsAllocState;
        Y_ENSURE(state);
        state->OffloadFree(fullSize);
        header->Entry.Unlink();

        auto it = state->ArrowBuffers.find(mem);
        Y_ENSURE(it != state->ArrowBuffers.end());
        state->ArrowBuffers.erase(it);
    }

    Y_ENSURE(size == header->Size);

#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
        free(header);
        return;
    }
#endif

    ReleaseAlignedPage(header, fullSize);
}
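
// Detaches a still-live Arrow allocation from the current state's tracking
// (presumably when ownership is transferred outside this allocator) without
// freeing it; the memory itself is released later through the free path.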
void MKQLArrowUntrack(const void* mem, ui64 size) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    if (!state->EnableArrowTracking) {
        return;
    }

    if (size <= ArrowSizeForArena) {
        auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(mem);

        auto it = state->ArrowBuffers.find(page);
        if (it == state->ArrowBuffers.end()) {
            return;
        }

        if (!page->Entry.IsUnlinked()) {
            page->Entry.Unlink(); // unlink page immediately so we don't accidentally free untracked memory within `TAllocState`
            state->ArrowBuffers.erase(it);
            state->OffloadFree(page->Size);
        }

        return;
    }

    auto it = state->ArrowBuffers.find(mem);
    if (it == state->ArrowBuffers.end()) {
        return;
    }

    auto* header = ((TMkqlArrowHeader*)mem) - 1;
    Y_ENSURE(header->UseCount == 0);

    if (!header->Entry.IsUnlinked()) {
        header->Entry.Unlink();
        auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
        state->OffloadFree(fullSize);
        state->ArrowBuffers.erase(it);
    }
}

} // NMiniKQL
} // NKikimr