123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- #include "mkql_alloc.h"
- #include <util/system/align.h>
- #include <yql/essentials/public/udf/udf_value.h>
- #include <tuple>
- namespace NKikimr {
- namespace NMiniKQL {
- Y_POD_THREAD(TAllocState*) TlsAllocState;
- TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
- TAllocState::TCurrentPages TAllocState::EmptyCurrentPages = { &TAllocState::EmptyPageHeader, &TAllocState::EmptyPageHeader };
- void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept {
- Left = root;
- Right = root->Right;
- Right->Left = Left->Right = this;
- }
- void TAllocState::TListEntry::Unlink() noexcept {
- std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right);
- Clear();
- }
- TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters &counters, bool supportsSizedAllocators)
- : TAlignedPagePool(location, counters)
- , SupportsSizedAllocators(supportsSizedAllocators)
- , CurrentPAllocList(&GlobalPAllocList)
- {
- GetRoot()->InitLinks();
- OffloadedBlocksRoot.InitLinks();
- GlobalPAllocList.InitLinks();
- ArrowBlocksRoot.InitLinks();
- }
- void TAllocState::CleanupPAllocList(TListEntry* root) {
- for (auto curr = root->Right; curr != root; ) {
- auto next = curr->Right;
- auto size = ((TMkqlPAllocHeader*)curr)->Size;
- auto fullSize = size + sizeof(TMkqlPAllocHeader);
- MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
- curr = next;
- }
- root->InitLinks();
- }
- void TAllocState::CleanupArrowList(TListEntry* root) {
- for (auto curr = root->Right; curr != root; ) {
- auto next = curr->Right;
- #if defined(ALLOW_DEFAULT_ALLOCATOR)
- if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
- free(curr);
- } else {
- #endif
- auto size = ((TMkqlArrowHeader*)curr)->Size;
- auto fullSize = size + sizeof(TMkqlArrowHeader);
- ReleaseAlignedPage(curr, fullSize);
- #if defined(ALLOW_DEFAULT_ALLOCATOR)
- }
- #endif
- curr = next;
- }
- root->InitLinks();
- }
- void TAllocState::KillAllBoxed() {
- {
- const auto root = GetRoot();
- for (auto next = root->GetRight(); next != root; next = root->GetLeft()) {
- next->Ref();
- next->~TBoxedValueLink();
- }
- GetRoot()->InitLinks();
- }
- {
- Y_ABORT_UNLESS(CurrentPAllocList == &GlobalPAllocList);
- CleanupPAllocList(&GlobalPAllocList);
- }
- {
- const auto root = &OffloadedBlocksRoot;
- for (auto curr = root->Right; curr != root; ) {
- auto next = curr->Right;
- free(curr);
- curr = next;
- }
- OffloadedBlocksRoot.InitLinks();
- }
- CleanupArrowList(&ArrowBlocksRoot);
- #ifndef NDEBUG
- ActiveMemInfo.clear();
- #endif
- }
- void TAllocState::InvalidateMemInfo() {
- #ifndef NDEBUG
- for (auto& pair : ActiveMemInfo) {
- pair.first->CheckOnExit(false);
- }
- #endif
- }
- size_t TAllocState::GetDeallocatedInPages() const {
- size_t deallocated = 0;
- for (auto x : AllPages) {
- auto currPage = (TAllocPageHeader*)x;
- if (currPage->UseCount) {
- deallocated += currPage->Deallocated;
- }
- }
- return deallocated;
- }
- void TAllocState::LockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
- if (!UseRefLocking) {
- return;
- }
- void* obj;
- if (value.IsString()) {
- obj = value.AsStringRef().Data();
- } else if (value.IsBoxed()) {
- obj = value.AsBoxed().Get();
- } else {
- return;
- }
- auto [it, isNew] = LockedObjectsRefs.emplace(obj, TLockInfo{ 0, 0 });
- if (isNew) {
- it->second.OriginalRefs = value.LockRef();
- }
- ++it->second.Locks;
- }
- void TAllocState::UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
- if (!UseRefLocking) {
- return;
- }
- void* obj;
- if (value.IsString()) {
- obj = value.AsStringRef().Data();
- } else if (value.IsBoxed()) {
- obj = value.AsBoxed().Get();
- } else {
- return;
- }
- auto it = LockedObjectsRefs.find(obj);
- Y_ABORT_UNLESS(it != LockedObjectsRefs.end());
- if (--it->second.Locks == 0) {
- value.UnlockRef(it->second.OriginalRefs);
- LockedObjectsRefs.erase(it);
- }
- }
- void TScopedAlloc::Acquire() {
- if (!AttachedCount_) {
- if (PrevState_) {
- PgReleaseThreadContext(PrevState_->MainContext);
- }
- PrevState_ = TlsAllocState;
- TlsAllocState = &MyState_;
- PgAcquireThreadContext(MyState_.MainContext);
- } else {
- Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
- }
- ++AttachedCount_;
- }
- void TScopedAlloc::Release() {
- if (AttachedCount_ && --AttachedCount_ == 0) {
- Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
- PgReleaseThreadContext(MyState_.MainContext);
- TlsAllocState = PrevState_;
- if (PrevState_) {
- PgAcquireThreadContext(PrevState_->MainContext);
- }
- PrevState_ = nullptr;
- }
- }
- void* MKQLAllocSlow(size_t sz, TAllocState* state, const EMemorySubPool mPool) {
- auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
- auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
- auto currPage = (TAllocPageHeader*)state->GetBlock(capacity);
- currPage->Deallocated = 0;
- currPage->Capacity = capacity;
- currPage->Offset = roundedSize;
- auto& mPage = state->CurrentPages[(TMemorySubPoolIdx)mPool];
- auto newPageAvailable = capacity - roundedSize;
- auto curPageAvailable = mPage->Capacity - mPage->Offset;
- if (newPageAvailable > curPageAvailable) {
- mPage = currPage;
- }
- void* ret = (char*)currPage + sizeof(TAllocPageHeader);
- currPage->UseCount = 1;
- currPage->MyAlloc = state;
- currPage->Link = nullptr;
- return ret;
- }
- void MKQLFreeSlow(TAllocPageHeader* header, TAllocState *state, const EMemorySubPool mPool) noexcept {
- Y_DEBUG_ABORT_UNLESS(state);
- Y_DEBUG_ABORT_UNLESS(header->MyAlloc == state, "%s", (TStringBuilder() << "wrong allocator was used; "
- "allocated with: " << header->MyAlloc->GetDebugInfo() << " freed with: " << TlsAllocState->GetDebugInfo()).data());
- state->ReturnBlock(header, header->Capacity);
- if (header == state->CurrentPages[(TMemorySubPoolIdx)mPool]) {
- state->CurrentPages[(TMemorySubPoolIdx)mPool] = &TAllocState::EmptyPageHeader;
- }
- }
- void* TPagedArena::AllocSlow(const size_t sz, const EMemorySubPool mPool) {
- auto& currentPage = CurrentPages_[(TMemorySubPoolIdx)mPool];
- auto prevLink = currentPage;
- auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
- auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
- currentPage = (TAllocPageHeader*)PagePool_->GetBlock(capacity);
- currentPage->Capacity = capacity;
- void* ret = (char*)currentPage + sizeof(TAllocPageHeader);
- currentPage->Offset = roundedSize;
- currentPage->UseCount = 0;
- currentPage->MyAlloc = PagePool_;
- currentPage->Link = prevLink;
- return ret;
- }
- void TPagedArena::Clear() noexcept {
- for (auto&& i : CurrentPages_) {
- auto current = i;
- while (current != &TAllocState::EmptyPageHeader) {
- auto next = current->Link;
- PagePool_->ReturnBlock(current, current->Capacity);
- current = next;
- }
- i = &TAllocState::EmptyPageHeader;
- }
- }
- void* MKQLArrowAllocate(ui64 size) {
- TAllocState* state = TlsAllocState;
- Y_ENSURE(state);
- auto fullSize = size + sizeof(TMkqlArrowHeader);
- if (state->EnableArrowTracking) {
- state->OffloadAlloc(fullSize);
- }
- void* ptr;
- #if defined(ALLOW_DEFAULT_ALLOCATOR)
- if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
- ptr = malloc(fullSize);
- if (!ptr) {
- throw TMemoryLimitExceededException();
- }
- } else {
- #endif
- ptr = GetAlignedPage(fullSize);
- #if defined(ALLOW_DEFAULT_ALLOCATOR)
- }
- #endif
- auto* header = (TMkqlArrowHeader*)ptr;
- if (state->EnableArrowTracking) {
- header->Entry.Link(&state->ArrowBlocksRoot);
- Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
- } else {
- header->Entry.Clear();
- }
- header->Size = size;
- return header + 1;
- }
- void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
- auto res = MKQLArrowAllocate(size);
- memcpy(res, mem, Min(prevSize, size));
- MKQLArrowFree(mem, prevSize);
- return res;
- }
- void MKQLArrowFree(const void* mem, ui64 size) {
- auto fullSize = size + sizeof(TMkqlArrowHeader);
- auto header = ((TMkqlArrowHeader*)mem) - 1;
- if (!header->Entry.IsUnlinked()) {
- TAllocState* state = TlsAllocState;
- Y_ENSURE(state);
- state->OffloadFree(fullSize);
- header->Entry.Unlink();
- auto it = state->ArrowBuffers.find(mem);
- Y_ENSURE(it != state->ArrowBuffers.end());
- state->ArrowBuffers.erase(it);
- }
- Y_ENSURE(size == header->Size);
- #if defined(ALLOW_MEMORY_ALLOCATOR)
- if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
- free(header);
- return;
- }
- #endif
- ReleaseAlignedPage(header, fullSize);
- }
- void MKQLArrowUntrack(const void* mem) {
- TAllocState* state = TlsAllocState;
- Y_ENSURE(state);
- if (!state->EnableArrowTracking) {
- return;
- }
- auto it = state->ArrowBuffers.find(mem);
- if (it == state->ArrowBuffers.end()) {
- return;
- }
- auto header = ((TMkqlArrowHeader*)mem) - 1;
- if (!header->Entry.IsUnlinked()) {
- header->Entry.Unlink();
- auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
- state->OffloadFree(fullSize);
- state->ArrowBuffers.erase(it);
- }
- }
- } // NMiniKQL
- } // NKikimr
|