// mkql_alloc.cpp

#include "mkql_alloc.h"

#include <util/system/align.h>
#include <yql/essentials/public/udf/udf_value.h>

#include <tuple>

namespace NKikimr {
namespace NMiniKQL {

Y_POD_THREAD(TAllocState*) TlsAllocState;

TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
TAllocState::TCurrentPages TAllocState::EmptyCurrentPages = { &TAllocState::EmptyPageHeader, &TAllocState::EmptyPageHeader };

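// TListEntry is a node of an intrusive, circular, doubly-linked list: a root
// entry whose Left/Right point back to itself acts as the sentinel, and
// Link()/Unlink() splice nodes in and out in O(1). A minimal usage sketch
// (illustrative only; InitLinks() and Clear() are declared in mkql_alloc.h):
//
//     TAllocState::TListEntry root, node;
//     root.InitLinks();   // root.Left == root.Right == &root
//     node.Link(&root);   // insert node right after root
//     node.Unlink();      // remove node and Clear() its pointers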
void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept {
    Left = root;
    Right = root->Right;
    Right->Left = Left->Right = this;
}

void TAllocState::TListEntry::Unlink() noexcept {
    std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right);
    Clear();
}

TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters& counters, bool supportsSizedAllocators)
    : TAlignedPagePool(location, counters)
    , SupportsSizedAllocators(supportsSizedAllocators)
    , CurrentPAllocList(&GlobalPAllocList)
{
    GetRoot()->InitLinks();
    OffloadedBlocksRoot.InitLinks();
    GlobalPAllocList.InitLinks();
    ArrowBlocksRoot.InitLinks();
}

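// Each entry in a palloc list is a TMkqlPAllocHeader immediately followed by
// its payload, so the list node pointer doubles as the allocation address and
// the full allocation size is header->Size + sizeof(TMkqlPAllocHeader).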
void TAllocState::CleanupPAllocList(TListEntry* root) {
    for (auto curr = root->Right; curr != root;) {
        auto next = curr->Right;
        auto size = ((TMkqlPAllocHeader*)curr)->Size;
        auto fullSize = size + sizeof(TMkqlPAllocHeader);
        MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
        curr = next;
    }

    root->InitLinks();
}

void TAllocState::CleanupArrowList(TListEntry* root) {
    for (auto curr = root->Right; curr != root;) {
        auto next = curr->Right;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
        if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
            free(curr);
        } else {
#endif
            auto size = ((TMkqlArrowHeader*)curr)->Size;
            auto fullSize = size + sizeof(TMkqlArrowHeader);
            ReleaseAlignedPage(curr, fullSize);
#if defined(ALLOW_DEFAULT_ALLOCATOR)
        }
#endif
        curr = next;
    }

    root->InitLinks();
}

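// Full teardown of everything still owned by this allocator state. Boxed
// values are destroyed first because their destructors may release pallocs and
// offloaded blocks; only then are the palloc, offloaded and Arrow lists drained.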
void TAllocState::KillAllBoxed() {
    {
        const auto root = GetRoot();
        for (auto next = root->GetRight(); next != root; next = root->GetLeft()) {
            next->Ref();
            next->~TBoxedValueLink();
        }

        GetRoot()->InitLinks();
    }

    {
        Y_ABORT_UNLESS(CurrentPAllocList == &GlobalPAllocList);
        CleanupPAllocList(&GlobalPAllocList);
    }

    {
        const auto root = &OffloadedBlocksRoot;
        for (auto curr = root->Right; curr != root;) {
            auto next = curr->Right;
            free(curr);
            curr = next;
        }

        OffloadedBlocksRoot.InitLinks();
    }

    CleanupArrowList(&ArrowBlocksRoot);

#ifndef NDEBUG
    ActiveMemInfo.clear();
#endif
}

void TAllocState::InvalidateMemInfo() {
#ifndef NDEBUG
    for (auto& pair : ActiveMemInfo) {
        pair.first->CheckOnExit(false);
    }
#endif
}

size_t TAllocState::GetDeallocatedInPages() const {
    size_t deallocated = 0;
    for (auto x : AllPages) {
        auto currPage = (TAllocPageHeader*)x;
        if (currPage->UseCount) {
            deallocated += currPage->Deallocated;
        }
    }

    return deallocated;
}

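// When UseRefLocking is enabled, LockObject/UnlockObject pin the refcount of a
// string or boxed value: the first LockObject for an object saves its original
// refcount via LockRef(), and the matching final UnlockObject restores it via
// UnlockRef(). A hypothetical caller pairs them around the protected region:
//
//     state->LockObject(value);
//     // ... value's refcount cannot be dropped here ...
//     state->UnlockObject(value);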
void TAllocState::LockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
    if (!UseRefLocking) {
        return;
    }

    void* obj;
    if (value.IsString()) {
        obj = value.AsStringRef().Data();
    } else if (value.IsBoxed()) {
        obj = value.AsBoxed().Get();
    } else {
        return;
    }

    auto [it, isNew] = LockedObjectsRefs.emplace(obj, TLockInfo{ 0, 0 });
    if (isNew) {
        it->second.OriginalRefs = value.LockRef();
    }

    ++it->second.Locks;
}

void TAllocState::UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
    if (!UseRefLocking) {
        return;
    }

    void* obj;
    if (value.IsString()) {
        obj = value.AsStringRef().Data();
    } else if (value.IsBoxed()) {
        obj = value.AsBoxed().Get();
    } else {
        return;
    }

    auto it = LockedObjectsRefs.find(obj);
    Y_ABORT_UNLESS(it != LockedObjectsRefs.end());
    if (--it->second.Locks == 0) {
        value.UnlockRef(it->second.OriginalRefs);
        LockedObjectsRefs.erase(it);
    }
}

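// TScopedAlloc binds MyState_ to the current thread via TlsAllocState.
// Acquire/Release nest through AttachedCount_, and any previously bound state
// is stashed in PrevState_ and restored on the final Release. A minimal sketch
// (constructor arguments are illustrative; see mkql_alloc.h for the real
// signature):
//
//     TScopedAlloc alloc(__LOCATION__);   // typically acquires on construction
//     alloc.Release();                    // detach from this thread
//     alloc.Acquire();                    // re-attach before further use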
void TScopedAlloc::Acquire() {
    if (!AttachedCount_) {
        if (PrevState_) {
            PgReleaseThreadContext(PrevState_->MainContext);
        }

        PrevState_ = TlsAllocState;
        TlsAllocState = &MyState_;
        PgAcquireThreadContext(MyState_.MainContext);
    } else {
        Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
    }

    ++AttachedCount_;
}

void TScopedAlloc::Release() {
    if (AttachedCount_ && --AttachedCount_ == 0) {
        Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
        PgReleaseThreadContext(MyState_.MainContext);
        TlsAllocState = PrevState_;
        if (PrevState_) {
            PgAcquireThreadContext(PrevState_->MainContext);
        }

        PrevState_ = nullptr;
    }
}

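// Slow path taken when the sub-pool's current page cannot satisfy a request:
// a fresh block of at least POOL_PAGE_SIZE is fetched, and it becomes the
// sub-pool's current page only if it leaves more free room than the old one.
// Illustrative arithmetic (the constants are assumptions, not checked values):
// with MKQL_ALIGNMENT == 16 and sizeof(TAllocPageHeader) == 48, a 100-byte
// request rounds to AlignUp(100 + 48, 16) == 160 bytes used at the page start.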
void* MKQLAllocSlow(size_t sz, TAllocState* state, const EMemorySubPool mPool) {
    auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
    auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
    auto currPage = (TAllocPageHeader*)state->GetBlock(capacity);
    currPage->Deallocated = 0;
    currPage->Capacity = capacity;
    currPage->Offset = roundedSize;

    auto& mPage = state->CurrentPages[(TMemorySubPoolIdx)mPool];
    auto newPageAvailable = capacity - roundedSize;
    auto curPageAvailable = mPage->Capacity - mPage->Offset;
    if (newPageAvailable > curPageAvailable) {
        mPage = currPage;
    }

    void* ret = (char*)currPage + sizeof(TAllocPageHeader);
    currPage->UseCount = 1;
    currPage->MyAlloc = state;
    currPage->Link = nullptr;
    return ret;
}

void MKQLFreeSlow(TAllocPageHeader* header, TAllocState* state, const EMemorySubPool mPool) noexcept {
    Y_DEBUG_ABORT_UNLESS(state);
    Y_DEBUG_ABORT_UNLESS(header->MyAlloc == state, "%s", (TStringBuilder() << "wrong allocator was used; "
        "allocated with: " << header->MyAlloc->GetDebugInfo() << " freed with: " << TlsAllocState->GetDebugInfo()).data());
    state->ReturnBlock(header, header->Capacity);
    if (header == state->CurrentPages[(TMemorySubPoolIdx)mPool]) {
        state->CurrentPages[(TMemorySubPoolIdx)mPool] = &TAllocState::EmptyPageHeader;
    }
}

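// TPagedArena pages differ from the sub-pool pages above: they are chained
// through Link (prevLink below), carry UseCount == 0, and are only returned to
// the pool wholesale by Clear(), so individual arena allocations are never
// freed one by one.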
void* TPagedArena::AllocSlow(const size_t sz, const EMemorySubPool mPool) {
    auto& currentPage = CurrentPages_[(TMemorySubPoolIdx)mPool];
    auto prevLink = currentPage;
    auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
    auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
    currentPage = (TAllocPageHeader*)PagePool_->GetBlock(capacity);
    currentPage->Capacity = capacity;
    void* ret = (char*)currentPage + sizeof(TAllocPageHeader);
    currentPage->Offset = roundedSize;
    currentPage->UseCount = 0;
    currentPage->MyAlloc = PagePool_;
    currentPage->Link = prevLink;
    return ret;
}

void TPagedArena::Clear() noexcept {
    for (auto&& i : CurrentPages_) {
        auto current = i;
        while (current != &TAllocState::EmptyPageHeader) {
            auto next = current->Link;
            PagePool_->ReturnBlock(current, current->Capacity);
            current = next;
        }

        i = &TAllocState::EmptyPageHeader;
    }
}

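// A hypothetical arena lifecycle (the constructor and Alloc signatures are
// assumed from mkql_alloc.h, not shown here):
//
//     TPagedArena arena(pool);      // pool is a TAlignedPagePool*
//     void* p = arena.Alloc(512);   // bump-pointer fast path, AllocSlow on overflow
//     arena.Clear();                // returns every chained page at once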
void* MKQLArrowAllocate(ui64 size) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    auto fullSize = size + sizeof(TMkqlArrowHeader);
    if (state->EnableArrowTracking) {
        state->OffloadAlloc(fullSize);
    }

    void* ptr;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
        ptr = malloc(fullSize);
        if (!ptr) {
            throw TMemoryLimitExceededException();
        }
    } else {
#endif
        ptr = GetAlignedPage(fullSize);
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    }
#endif

    auto* header = (TMkqlArrowHeader*)ptr;
    if (state->EnableArrowTracking) {
        header->Entry.Link(&state->ArrowBlocksRoot);
        Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
    } else {
        header->Entry.Clear();
    }

    header->Size = size;
    return header + 1;
}

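// Arrow buffers are laid out as [TMkqlArrowHeader][payload], and callers hold
// the payload pointer (header + 1). Reallocate/Free/Untrack therefore recover
// the header with ((TMkqlArrowHeader*)mem) - 1:
//
//     |<-- sizeof(TMkqlArrowHeader) -->|<-------- Size bytes -------->|
//     header                           mem (returned to the caller)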
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
    auto res = MKQLArrowAllocate(size);
    memcpy(res, mem, Min(prevSize, size));
    MKQLArrowFree(mem, prevSize);
    return res;
}

void MKQLArrowFree(const void* mem, ui64 size) {
    auto fullSize = size + sizeof(TMkqlArrowHeader);
    auto header = ((TMkqlArrowHeader*)mem) - 1;
    if (!header->Entry.IsUnlinked()) {
        TAllocState* state = TlsAllocState;
        Y_ENSURE(state);
        state->OffloadFree(fullSize);
        header->Entry.Unlink();

        auto it = state->ArrowBuffers.find(mem);
        Y_ENSURE(it != state->ArrowBuffers.end());
        state->ArrowBuffers.erase(it);
    }

    Y_ENSURE(size == header->Size);

#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
        free(header);
        return;
    }
#endif

    ReleaseAlignedPage(header, fullSize);
}

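// Untrack detaches a buffer from the per-allocator accounting without freeing
// it (e.g. when the buffer's ownership leaves this allocator's scope); a later
// MKQLArrowFree then skips the tracking branch because the entry is already
// unlinked.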
void MKQLArrowUntrack(const void* mem) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    if (!state->EnableArrowTracking) {
        return;
    }

    auto it = state->ArrowBuffers.find(mem);
    if (it == state->ArrowBuffers.end()) {
        return;
    }

    auto header = ((TMkqlArrowHeader*)mem) - 1;
    if (!header->Entry.IsUnlinked()) {
        header->Entry.Unlink();
        auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
        state->OffloadFree(fullSize);
        state->ArrowBuffers.erase(it);
    }
}

} // namespace NMiniKQL
} // namespace NKikimr