#include "aligned_page_pool.h" #include #include #include #include #include #include #include #include #include #if defined(_win_) # include #elif defined(_unix_) # include # include #endif namespace NKikimr { #if defined(ALLOW_DEFAULT_ALLOCATOR) # if defined(PROFILE_MEMORY_ALLOCATIONS) static bool IsDefaultAllocator = true; # else static bool IsDefaultAllocator = false; # endif void UseDefaultAllocator() { // TODO: check that we didn't already used the MKQL allocator IsDefaultAllocator = true; } #endif static ui64 SYS_PAGE_SIZE = NSystemInfo::GetPageSize(); constexpr ui32 MidLevels = 10; constexpr ui32 MaxMidSize = (1u << MidLevels) * TAlignedPagePool::POOL_PAGE_SIZE; static_assert(MaxMidSize == 64 * 1024 * 1024, "Upper memory block 64 Mb"); namespace { ui64 GetMaxMemoryMaps() { ui64 maxMapCount = 0; #if defined(_unix_) maxMapCount = FromString(Strip(TFileInput("/proc/sys/vm/max_map_count").ReadAll())); #endif return maxMapCount; } TString GetMemoryMapsString() { TStringStream ss; ss << " (maps: " << GetMemoryMapsCount() << " vs " << GetMaxMemoryMaps() << ")"; return ss.Str(); } template class TGlobalPools; template class TGlobalPagePool { friend class TGlobalPools; public: TGlobalPagePool(size_t pageSize) : PageSize(pageSize) {} ~TGlobalPagePool() { void* addr = nullptr; while (Pages.Dequeue(&addr)) { FreePage(addr); } } void* GetPage() { void *page = nullptr; if (Pages.Dequeue(&page)) { --Count; return page; } return nullptr; } ui64 GetPageCount() const { return Count.load(std::memory_order_relaxed); } size_t GetPageSize() const { return PageSize; } size_t GetSize() const { return GetPageCount() * GetPageSize(); } private: size_t PushPage(void* addr) { #if defined(ALLOW_DEFAULT_ALLOCATOR) if (Y_UNLIKELY(IsDefaultAllocator)) { FreePage(addr); return GetPageSize(); } #endif ++Count; Pages.Enqueue(addr); return 0; } void FreePage(void* addr) { auto res = T::Munmap(addr, PageSize); Y_DEBUG_ABORT_UNLESS(0 == res, "Munmap failed: %s", LastSystemErrorText()); } private: const size_t PageSize; std::atomic Count = 0; TLockFreeStack Pages; }; template class TGlobalPools { public: static TGlobalPools& Instance() { return *Singleton>(); } TGlobalPagePool& Get(ui32 index) { return *Pools[index]; } const TGlobalPagePool& Get(ui32 index) const { return *Pools[index]; } TGlobalPools() { Reset(); } void* DoMmap(size_t size) { #if defined(ALLOW_DEFAULT_ALLOCATOR) // No memory maps allowed while using default allocator Y_DEBUG_ABORT_UNLESS(!IsDefaultAllocator); #endif void* res = T::Mmap(size); TotalMmappedBytes += size; return res; } void DoCleanupFreeList(ui64 targetSize) { for(ui32 level = 0; level <= MidLevels; ++level) { auto& p = Get(level); const size_t pageSize = p.GetPageSize(); while(p.GetSize() >= targetSize) { void* page = p.GetPage(); if (!page) break; p.FreePage(page); i64 prev = TotalMmappedBytes.fetch_sub(pageSize); Y_DEBUG_ABORT_UNLESS(prev >= 0); } } } void PushPage(size_t level, void* addr) { auto& pool = Get(level); size_t free = pool.PushPage(addr); if (Y_UNLIKELY(free > 0)) { i64 prev = TotalMmappedBytes.fetch_sub(free); Y_DEBUG_ABORT_UNLESS(prev >= 0); } } void DoMunmap(void* addr, size_t size) { if (Y_UNLIKELY(0 != T::Munmap(addr, size))) { TStringStream mmaps; const auto lastError = LastSystemError(); if (lastError == ENOMEM) { mmaps << GetMemoryMapsString(); } ythrow yexception() << "Munmap(0x" << IntToString<16>(reinterpret_cast(addr)) << ", " << size << ") failed: " << LastSystemErrorText(lastError) << mmaps.Str(); } i64 prev = TotalMmappedBytes.fetch_sub(size); 
template<typename T, bool SysAlign>
class TGlobalPools {
public:
    static TGlobalPools<T, SysAlign>& Instance() {
        return *Singleton<TGlobalPools<T, SysAlign>>();
    }

    TGlobalPagePool<T, SysAlign>& Get(ui32 index) {
        return *Pools[index];
    }

    const TGlobalPagePool<T, SysAlign>& Get(ui32 index) const {
        return *Pools[index];
    }

    TGlobalPools() {
        Reset();
    }

    void* DoMmap(size_t size) {
#if defined(ALLOW_DEFAULT_ALLOCATOR)
        // No memory maps are allowed while the default allocator is in use
        Y_DEBUG_ABORT_UNLESS(!IsDefaultAllocator);
#endif
        void* res = T::Mmap(size);
        TotalMmappedBytes += size;
        return res;
    }

    void DoCleanupFreeList(ui64 targetSize) {
        for (ui32 level = 0; level <= MidLevels; ++level) {
            auto& p = Get(level);
            const size_t pageSize = p.GetPageSize();
            while (p.GetSize() >= targetSize) {
                void* page = p.GetPage();
                if (!page)
                    break;
                p.FreePage(page);
                i64 prev = TotalMmappedBytes.fetch_sub(pageSize);
                Y_DEBUG_ABORT_UNLESS(prev >= 0);
            }
        }
    }

    void PushPage(size_t level, void* addr) {
        auto& pool = Get(level);
        size_t free = pool.PushPage(addr);
        if (Y_UNLIKELY(free > 0)) {
            i64 prev = TotalMmappedBytes.fetch_sub(free);
            Y_DEBUG_ABORT_UNLESS(prev >= 0);
        }
    }

    void DoMunmap(void* addr, size_t size) {
        if (Y_UNLIKELY(0 != T::Munmap(addr, size))) {
            TStringStream mmaps;
            const auto lastError = LastSystemError();
            if (lastError == ENOMEM) {
                mmaps << GetMemoryMapsString();
            }
            ythrow yexception() << "Munmap(0x"
                << IntToString<16>(reinterpret_cast<uintptr_t>(addr))
                << ", " << size << ") failed: " << LastSystemErrorText(lastError) << mmaps.Str();
        }

        i64 prev = TotalMmappedBytes.fetch_sub(size);
        Y_DEBUG_ABORT_UNLESS(prev >= 0);
    }

    i64 GetTotalMmappedBytes() const {
        return TotalMmappedBytes.load();
    }

    i64 GetTotalFreeListBytes() const {
        i64 bytes = 0;
        for (ui32 i = 0; i <= MidLevels; ++i) {
            bytes += Get(i).GetSize();
        }

        return bytes;
    }

    void Reset() {
        Pools.clear();
        Pools.reserve(MidLevels + 1);
        for (ui32 i = 0; i <= MidLevels; ++i) {
            Pools.emplace_back(MakeHolder<TGlobalPagePool<T, SysAlign>>(TAlignedPagePool::POOL_PAGE_SIZE << i));
        }
    }

private:
    TVector<THolder<TGlobalPagePool<T, SysAlign>>> Pools;
    std::atomic<i64> TotalMmappedBytes{0};
};

} // unnamed

#ifdef _win_
#define MAP_FAILED (void*)(-1)
inline void* TSystemMmap::Mmap(size_t size)
{
    if (auto res = ::VirtualAlloc(0, size, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE)) {
        return res;
    } else {
        return MAP_FAILED;
    }
}

inline int TSystemMmap::Munmap(void* addr, size_t size)
{
    Y_ABORT_UNLESS(AlignUp(addr, SYS_PAGE_SIZE) == addr, "Got unaligned address");
    Y_ABORT_UNLESS(AlignUp(size, SYS_PAGE_SIZE) == size, "Got unaligned size");
    return !::VirtualFree(addr, size, MEM_DECOMMIT);
}
#else
inline void* TSystemMmap::Mmap(size_t size)
{
    return ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, 0, 0);
}

inline int TSystemMmap::Munmap(void* addr, size_t size)
{
    Y_DEBUG_ABORT_UNLESS(AlignUp(addr, SYS_PAGE_SIZE) == addr, "Got unaligned address");
    Y_DEBUG_ABORT_UNLESS(AlignUp(size, SYS_PAGE_SIZE) == size, "Got unaligned size");
    return ::munmap(addr, size);
}
#endif
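
// The fake providers below never touch the OS: Mmap() reports a fixed
// page-aligned address (deliberately misaligned for TFakeUnalignedMmap) and
// the OnMmap/OnMunmap hooks let unit tests observe the pool's mapping traffic.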
std::function<void(size_t size)> TFakeAlignedMmap::OnMmap = {};
std::function<void(void* addr, size_t size)> TFakeAlignedMmap::OnMunmap = {};

void* TFakeAlignedMmap::Mmap(size_t size) {
    if (OnMmap) {
        OnMmap(size);
    }
    return reinterpret_cast<void*>(TAlignedPagePool::POOL_PAGE_SIZE);
}

int TFakeAlignedMmap::Munmap(void* addr, size_t size) {
    if (OnMunmap) {
        OnMunmap(addr, size);
    }
    return 0;
}

std::function<void(size_t size)> TFakeUnalignedMmap::OnMmap = {};
std::function<void(void* addr, size_t size)> TFakeUnalignedMmap::OnMunmap = {};

void* TFakeUnalignedMmap::Mmap(size_t size) {
    if (OnMmap) {
        OnMmap(size);
    }
    return reinterpret_cast<void*>(TAlignedPagePool::POOL_PAGE_SIZE + 1);
}

int TFakeUnalignedMmap::Munmap(void* addr, size_t size) {
    if (OnMunmap) {
        OnMunmap(addr, size);
    }
    return 0;
}

TAlignedPagePoolCounters::TAlignedPagePoolCounters(::NMonitoring::TDynamicCounterPtr countersRoot, const TString& name) {
    if (!countersRoot || name.empty())
        return;
    ::NMonitoring::TDynamicCounterPtr subGroup = countersRoot->GetSubgroup("counters", "utils")->GetSubgroup("subsystem", "mkqlalloc");
    TotalBytesAllocatedCntr = subGroup->GetCounter(name + "/TotalBytesAllocated");
    AllocationsCntr = subGroup->GetCounter(name + "/Allocations", true);
    PoolsCntr = subGroup->GetCounter(name + "/Pools", true);
    LostPagesBytesFreeCntr = subGroup->GetCounter(name + "/LostPagesBytesFreed", true);
}

template<typename T>
TAlignedPagePoolImpl<T>::~TAlignedPagePoolImpl() {
    if (CheckLostMem && !UncaughtException()) {
        Y_DEBUG_ABORT_UNLESS(TotalAllocated == FreePages.size() * POOL_PAGE_SIZE,
            "memory leak; Expected %ld, actual %ld (%ld page(s), %ld offloaded); allocator created at: %s",
            TotalAllocated, FreePages.size() * POOL_PAGE_SIZE,
            FreePages.size(), OffloadedActiveBytes, GetDebugInfo().data());
        Y_DEBUG_ABORT_UNLESS(OffloadedActiveBytes == 0, "offloaded: %ld", OffloadedActiveBytes);
    }

    size_t activeBlocksSize = 0;
    for (auto it = ActiveBlocks.cbegin(); ActiveBlocks.cend() != it; ActiveBlocks.erase(it++)) {
        activeBlocksSize += it->second;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
        if (Y_UNLIKELY(IsDefaultAllocator)) {
            ReturnBlock(it->first, it->second);
            return;
        }
#endif
        Free(it->first, it->second);
    }

    if (activeBlocksSize > 0 || FreePages.size() != AllPages.size() || OffloadedActiveBytes) {
        if (Counters.LostPagesBytesFreeCntr) {
            (*Counters.LostPagesBytesFreeCntr) += OffloadedActiveBytes + activeBlocksSize + (AllPages.size() - FreePages.size()) * POOL_PAGE_SIZE;
        }
    }

    Y_DEBUG_ABORT_UNLESS(TotalAllocated == AllPages.size() * POOL_PAGE_SIZE + OffloadedActiveBytes,
        "Expected %ld, actual %ld (%ld page(s))", TotalAllocated,
        AllPages.size() * POOL_PAGE_SIZE + OffloadedActiveBytes, AllPages.size());

    for (auto& ptr : AllPages) {
        TGlobalPools<T, false>::Instance().PushPage(0, ptr);
    }

    if (Counters.TotalBytesAllocatedCntr) {
        (*Counters.TotalBytesAllocatedCntr) -= TotalAllocated;
    }
    if (Counters.PoolsCntr) {
        --(*Counters.PoolsCntr);
    }
    TotalAllocated = 0;
}

template<typename T>
void TAlignedPagePoolImpl<T>::ReleaseFreePages() {
    TotalAllocated -= FreePages.size() * POOL_PAGE_SIZE;
    if (Counters.TotalBytesAllocatedCntr) {
        (*Counters.TotalBytesAllocatedCntr) -= FreePages.size() * POOL_PAGE_SIZE;
    }

    for (; !FreePages.empty(); FreePages.pop()) {
        AllPages.erase(FreePages.top());
        TGlobalPools<T, false>::Instance().PushPage(0, FreePages.top());
    }
}

template<typename T>
void TAlignedPagePoolImpl<T>::OffloadAlloc(ui64 size) {
    if (Limit && TotalAllocated + size > Limit && !TryIncreaseLimit(TotalAllocated + size)) {
        throw TMemoryLimitExceededException();
    }

    if (AllocNotifyCallback) {
        if (AllocNotifyCurrentBytes > AllocNotifyBytes) {
            AllocNotifyCallback();
            AllocNotifyCurrentBytes = 0;
        }
    }

    ++OffloadedAllocCount;
    OffloadedBytes += size;
    OffloadedActiveBytes += size;
    TotalAllocated += size;
    if (AllocNotifyCallback) {
        AllocNotifyCurrentBytes += size;
    }

    if (Counters.TotalBytesAllocatedCntr) {
        (*Counters.TotalBytesAllocatedCntr) += size;
    }

    if (Counters.AllocationsCntr) {
        ++(*Counters.AllocationsCntr);
    }

    UpdatePeaks();
}

template<typename T>
void TAlignedPagePoolImpl<T>::OffloadFree(ui64 size) noexcept {
    TotalAllocated -= size;
    OffloadedActiveBytes -= size;
    if (Counters.TotalBytesAllocatedCntr) {
        (*Counters.TotalBytesAllocatedCntr) -= size;
    }
}

template<typename T>
void* TAlignedPagePoolImpl<T>::GetPage() {
    ++PageAllocCount;
    if (!FreePages.empty()) {
        ++PageHitCount;
        const auto res = FreePages.top();
        FreePages.pop();
        return res;
    }

    if (Limit && TotalAllocated + POOL_PAGE_SIZE > Limit && !TryIncreaseLimit(TotalAllocated + POOL_PAGE_SIZE)) {
        throw TMemoryLimitExceededException();
    }

#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_LIKELY(!IsDefaultAllocator)) {
#endif
        if (const auto ptr = TGlobalPools<T, false>::Instance().Get(0).GetPage()) {
            TotalAllocated += POOL_PAGE_SIZE;
            if (AllocNotifyCallback) {
                AllocNotifyCurrentBytes += POOL_PAGE_SIZE;
            }
            if (Counters.TotalBytesAllocatedCntr) {
                (*Counters.TotalBytesAllocatedCntr) += POOL_PAGE_SIZE;
            }
            ++PageGlobalHitCount;
            AllPages.emplace(ptr);

            UpdatePeaks();
            return ptr;
        }

        ++PageMissCount;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    }
#endif

    void* res;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(IsDefaultAllocator)) {
        res = GetBlock(POOL_PAGE_SIZE);
    } else {
#endif
        res = Alloc(POOL_PAGE_SIZE);
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    }
#endif

    AllPages.emplace(res);
    return res;
}

template<typename T>
void TAlignedPagePoolImpl<T>::ReturnPage(void* addr) noexcept {
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(IsDefaultAllocator)) {
        ReturnBlock(addr, POOL_PAGE_SIZE);
        return;
    }
#endif

    Y_DEBUG_ABORT_UNLESS(AllPages.find(addr) != AllPages.end());
    FreePages.emplace(addr);
}

template<typename T>
void* TAlignedPagePoolImpl<T>::GetBlock(size_t size) {
    Y_DEBUG_ABORT_UNLESS(size >= POOL_PAGE_SIZE);

#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(IsDefaultAllocator)) {
        OffloadAlloc(size);
        auto ret = malloc(size);
        if (!ret) {
            throw TMemoryLimitExceededException();
        }

        return ret;
    }
#endif

    if (size == POOL_PAGE_SIZE) {
        return GetPage();
    } else {
        const auto ptr = Alloc(size);
        Y_DEBUG_ABORT_UNLESS(ActiveBlocks.emplace(ptr, size).second);
        return ptr;
    }
}

template<typename T>
void TAlignedPagePoolImpl<T>::ReturnBlock(void* ptr, size_t size) noexcept {
    Y_DEBUG_ABORT_UNLESS(size >= POOL_PAGE_SIZE);

#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(IsDefaultAllocator)) {
        OffloadFree(size);
        free(ptr);
        UpdateMemoryYellowZone();
        return;
    }
#endif

    if (size == POOL_PAGE_SIZE) {
        ReturnPage(ptr);
    } else {
        Free(ptr, size);
        Y_DEBUG_ABORT_UNLESS(ActiveBlocks.erase(ptr));
    }
    UpdateMemoryYellowZone();
}
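
// Alloc() below first tries the matching mid-level global free list; on a miss
// it mmaps ALLOC_AHEAD_PAGES extra pool pages, unmaps the unaligned prefix and
// suffix, and donates any whole surplus pages to this pool's local free list.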
template<typename T>
void* TAlignedPagePoolImpl<T>::Alloc(size_t size) {
    void* res = nullptr;
    size = AlignUp(size, SYS_PAGE_SIZE);

    if (Limit && TotalAllocated + size > Limit && !TryIncreaseLimit(TotalAllocated + size)) {
        throw TMemoryLimitExceededException();
    }

    if (AllocNotifyCallback) {
        if (AllocNotifyCurrentBytes > AllocNotifyBytes) {
            AllocNotifyCallback();
            AllocNotifyCurrentBytes = 0;
        }
    }

    auto& globalPool = TGlobalPools<T, false>::Instance();

    if (size > POOL_PAGE_SIZE && size <= MaxMidSize) {
        size = FastClp2(size);
        auto level = LeastSignificantBit(size) - LeastSignificantBit(POOL_PAGE_SIZE);
        Y_DEBUG_ABORT_UNLESS(level >= 1 && level <= MidLevels);
        if ((res = globalPool.Get(level).GetPage())) {
            TotalAllocated += size;
            if (AllocNotifyCallback) {
                AllocNotifyCurrentBytes += size;
            }
            if (Counters.TotalBytesAllocatedCntr) {
                (*Counters.TotalBytesAllocatedCntr) += size;
            }
            ++PageGlobalHitCount;
        } else {
            ++PageMissCount;
        }
    }

    if (!res) {
        auto allocSize = size + ALLOC_AHEAD_PAGES * POOL_PAGE_SIZE;
        void* mem = globalPool.DoMmap(allocSize);
        if (Y_UNLIKELY(MAP_FAILED == mem)) {
            TStringStream mmaps;
            const auto lastError = LastSystemError();
            if (lastError == ENOMEM) {
                mmaps << GetMemoryMapsString();
            }
            ythrow yexception() << "Mmap failed to allocate " << allocSize << " bytes: " << LastSystemErrorText(lastError) << mmaps.Str();
        }

        res = AlignUp(mem, POOL_PAGE_SIZE);
        const size_t off = reinterpret_cast<uintptr_t>(res) - reinterpret_cast<uintptr_t>(mem);
        if (Y_UNLIKELY(off)) {
            // unmap the unaligned prefix
            globalPool.DoMunmap(mem, off);
        }

        // The extra space is also page-aligned; put it onto the free page list
        auto alignedSize = AlignUp(size, POOL_PAGE_SIZE);
        ui64 extraPages = (allocSize - off - alignedSize) / POOL_PAGE_SIZE;
        ui64 tail = (allocSize - off - alignedSize) % POOL_PAGE_SIZE;
        auto extraPage = reinterpret_cast<ui8*>(res) + alignedSize;
        for (ui64 i = 0; i < extraPages; ++i) {
            AllPages.emplace(extraPage);
            FreePages.emplace(extraPage);
            extraPage += POOL_PAGE_SIZE;
        }

        if (size != alignedSize) {
            // unmap the unaligned hole between the block end and the next page boundary
            globalPool.DoMunmap(reinterpret_cast<ui8*>(res) + size, alignedSize - size);
        }

        if (tail) {
            // unmap the suffix
            Y_DEBUG_ABORT_UNLESS(extraPage + tail <= reinterpret_cast<ui8*>(mem) + size + ALLOC_AHEAD_PAGES * POOL_PAGE_SIZE);
            globalPool.DoMunmap(extraPage, tail);
        }

        auto extraSize = extraPages * POOL_PAGE_SIZE;
        auto totalSize = size + extraSize;
        TotalAllocated += totalSize;
        if (AllocNotifyCallback) {
            AllocNotifyCurrentBytes += totalSize;
        }
        if (Counters.TotalBytesAllocatedCntr) {
            (*Counters.TotalBytesAllocatedCntr) += totalSize;
        }
    }

    if (Counters.AllocationsCntr) {
        ++(*Counters.AllocationsCntr);
    }
    ++AllocCount;
    UpdatePeaks();
    return res;
}
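
// A worked example of the trimming above, with POOL_PAGE_SIZE = 64 KiB and
// (purely for illustration) ALLOC_AHEAD_PAGES = 1: for size = 64 KiB, mmap
// returns allocSize = 128 KiB at, say, boundary + 16 KiB. Then res lands on
// the next page boundary, the off = 48 KiB prefix is unmapped, extraPages =
// (128 - 48 - 64) / 64 = 0, and the 16 KiB remainder is unmapped as the tail,
// so exactly one aligned pool page survives.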
template<typename T>
void TAlignedPagePoolImpl<T>::Free(void* ptr, size_t size) noexcept {
    size = AlignUp(size, SYS_PAGE_SIZE);
    if (size <= MaxMidSize)
        size = FastClp2(size);
    if (size <= MaxMidSize) {
        auto level = LeastSignificantBit(size) - LeastSignificantBit(POOL_PAGE_SIZE);
        Y_DEBUG_ABORT_UNLESS(level >= 1 && level <= MidLevels);
        TGlobalPools<T, false>::Instance().PushPage(level, ptr);
    } else {
        TGlobalPools<T, false>::Instance().DoMunmap(ptr, size);
    }

    Y_DEBUG_ABORT_UNLESS(TotalAllocated >= size);
    TotalAllocated -= size;
    if (Counters.TotalBytesAllocatedCntr) {
        (*Counters.TotalBytesAllocatedCntr) -= size;
    }
}

template<typename T>
void TAlignedPagePoolImpl<T>::DoCleanupGlobalFreeList(ui64 targetSize) {
    TGlobalPools<T, true>::Instance().DoCleanupFreeList(targetSize);
    TGlobalPools<T, false>::Instance().DoCleanupFreeList(targetSize);
}

template<typename T>
void TAlignedPagePoolImpl<T>::UpdateMemoryYellowZone() {
    if (Limit == 0) return;
    if (IsMemoryYellowZoneForcefullyChanged) return;
    if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;

    ui8 usedMemoryPercent = 100 * GetUsed() / Limit;
    if (usedMemoryPercent >= EnableMemoryYellowZoneThreshold) {
        IsMemoryYellowZoneReached = true;
    } else if (usedMemoryPercent <= DisableMemoryYellowZoneThreshold) {
        IsMemoryYellowZoneReached = false;
    }
}

template<typename T>
bool TAlignedPagePoolImpl<T>::TryIncreaseLimit(ui64 required) {
    if (!IncreaseMemoryLimitCallback) {
        return false;
    }
    IncreaseMemoryLimitCallback(Limit, required);
    return Limit >= required;
}

template<typename T>
ui64 TAlignedPagePoolImpl<T>::GetGlobalPagePoolSize() {
    ui64 size = 0;
    for (size_t level = 0; level <= MidLevels; ++level) {
        size += TGlobalPools<T, false>::Instance().Get(level).GetSize();
    }
    return size;
}

template<typename T>
void TAlignedPagePoolImpl<T>::PrintStat(size_t usedPages, IOutputStream& out) const {
    usedPages += GetFreePageCount();
    out << "Count of free pages: " << GetFreePageCount() << Endl;
    out << "Allocated for blocks: " << (GetAllocated() - usedPages * POOL_PAGE_SIZE) << Endl;
    out << "Total allocated by lists: " << GetAllocated() << Endl;
}

template<typename T>
void TAlignedPagePoolImpl<T>::ResetGlobalsUT() {
    TGlobalPools<T, false>::Instance().Reset();
}

#if defined(ALLOW_DEFAULT_ALLOCATOR)
// static
template<typename T>
bool TAlignedPagePoolImpl<T>::IsDefaultAllocatorUsed() {
    return IsDefaultAllocator;
}
#endif

template class TAlignedPagePoolImpl<>;
template class TAlignedPagePoolImpl<TFakeAlignedMmap>;
template class TAlignedPagePoolImpl<TFakeUnalignedMmap>;
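
// The free-standing helpers below bypass TAlignedPagePoolImpl entirely: they
// serve single aligned blocks straight from the SysAlign = true global pools,
// and on a miss mmap a full MaxMidSize chunk, slicing the unused remainder
// into same-level pages for the cache.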
template<typename TMmap>
void* GetAlignedPage(ui64 size) {
    size = AlignUp(size, SYS_PAGE_SIZE);
    if (size < TAlignedPagePool::POOL_PAGE_SIZE) {
        size = TAlignedPagePool::POOL_PAGE_SIZE;
    }

    auto& pool = TGlobalPools<TMmap, true>::Instance();

    if (size <= MaxMidSize) {
        size = FastClp2(size);
        auto level = LeastSignificantBit(size) - LeastSignificantBit(TAlignedPagePool::POOL_PAGE_SIZE);
        Y_DEBUG_ABORT_UNLESS(level <= MidLevels);
        if (auto res = pool.Get(level).GetPage()) {
            return res;
        }
    }

    auto allocSize = Max<ui64>(MaxMidSize, size);
    void* mem = pool.DoMmap(allocSize);
    if (Y_UNLIKELY(MAP_FAILED == mem)) {
        TStringStream mmaps;
        const auto lastError = LastSystemError();
        if (lastError == ENOMEM) {
            mmaps << GetMemoryMapsString();
        }
        ythrow yexception() << "Mmap failed to allocate " << allocSize << " bytes: " << LastSystemErrorText(lastError) << mmaps.Str();
    }

    if (size < MaxMidSize) {
        // push the extra allocated pages into the cache
        auto level = LeastSignificantBit(size) - LeastSignificantBit(TAlignedPagePool::POOL_PAGE_SIZE);
        Y_DEBUG_ABORT_UNLESS(level <= MidLevels);
        ui8* ptr = (ui8*)mem + size;
        ui8* const end = (ui8*)mem + MaxMidSize;
        while (ptr < end) {
            pool.PushPage(level, ptr);
            ptr += size;
        }
    }

    return mem;
}

template<typename TMmap>
void ReleaseAlignedPage(void* mem, ui64 size) {
    size = AlignUp(size, SYS_PAGE_SIZE);
    if (size < TAlignedPagePool::POOL_PAGE_SIZE) {
        size = TAlignedPagePool::POOL_PAGE_SIZE;
    }

    if (size <= MaxMidSize) {
        size = FastClp2(size);
        auto level = LeastSignificantBit(size) - LeastSignificantBit(TAlignedPagePool::POOL_PAGE_SIZE);
        Y_DEBUG_ABORT_UNLESS(level <= MidLevels);
        TGlobalPools<TMmap, true>::Instance().PushPage(level, mem);
        return;
    }

    TGlobalPools<TMmap, true>::Instance().DoMunmap(mem, size);
}

template<typename TMmap>
i64 GetTotalMmapedBytes() {
    return TGlobalPools<TMmap, true>::Instance().GetTotalMmappedBytes() +
        TGlobalPools<TMmap, false>::Instance().GetTotalMmappedBytes();
}

template<typename TMmap>
i64 GetTotalFreeListBytes() {
    return TGlobalPools<TMmap, true>::Instance().GetTotalFreeListBytes() +
        TGlobalPools<TMmap, false>::Instance().GetTotalFreeListBytes();
}

template i64 GetTotalMmapedBytes<>();
template i64 GetTotalMmapedBytes<TFakeAlignedMmap>();
template i64 GetTotalMmapedBytes<TFakeUnalignedMmap>();

template i64 GetTotalFreeListBytes<>();
template i64 GetTotalFreeListBytes<TFakeAlignedMmap>();
template i64 GetTotalFreeListBytes<TFakeUnalignedMmap>();

template void* GetAlignedPage<>(ui64);
template void* GetAlignedPage<TFakeAlignedMmap>(ui64);
template void* GetAlignedPage<TFakeUnalignedMmap>(ui64);

template void ReleaseAlignedPage<>(void*, ui64);
template void ReleaseAlignedPage<TFakeAlignedMmap>(void*, ui64);
template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*, ui64);

size_t GetMemoryMapsCount() {
    size_t lineCount = 0;
    TString line;
#if defined(_unix_)
    TFileInput file("/proc/self/maps");
    while (file.ReadLine(line)) {
        ++lineCount;
    }
#endif
    return lineCount;
}

} // NKikimr