123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- #include "stdafx.h"
- #include "ib_mem.h"
- #include "ib_low.h"
- #include "cpu_affinity.h"
- #if defined(_unix_)
- #include <pthread.h>
- #endif
- namespace NNetliba {
- TIBMemSuperBlock::TIBMemSuperBlock(TIBMemPool* pool, size_t szLog)
- : Pool(pool)
- , SzLog(szLog)
- , UseCount(0)
- {
- size_t sz = GetSize();
- MemRegion = new TMemoryRegion(pool->GetIBContext(), sz);
- //printf("Alloc super block, size %" PRId64 "\n", sz);
- }
- TIBMemSuperBlock::~TIBMemSuperBlock() {
- Y_ASSERT(AtomicGet(UseCount) == 0);
- }
- char* TIBMemSuperBlock::GetData() {
- return MemRegion->GetData();
- }
- void TIBMemSuperBlock::DecRef() {
- if (AtomicAdd(UseCount, -1) == 0) {
- Pool->Return(this);
- }
- }
- TIBMemBlock::~TIBMemBlock() {
- if (Super.Get()) {
- Super->DecRef();
- } else {
- delete[] Data;
- }
- }
- //////////////////////////////////////////////////////////////////////////
- TIBMemPool::TIBMemPool(TPtrArg<TIBContext> ctx)
- : IBCtx(ctx)
- , AllocCacheSize(0)
- , CurrentOffset(IB_MEM_LARGE_BLOCK)
- , WorkThread(TThread::TParams(ThreadFunc, (void*)this).SetName("nl6_ib_mem_pool"))
- , KeepRunning(true)
- {
- WorkThread.Start();
- HasStarted.Wait();
- }
- TIBMemPool::~TIBMemPool() {
- Y_ASSERT(WorkThread.Running());
- KeepRunning = false;
- HasWork.Signal();
- WorkThread.Join();
- {
- TJobItem* work = nullptr;
- while (Requests.Dequeue(&work)) {
- delete work;
- }
- }
- }
- TIntrusivePtr<TIBMemSuperBlock> TIBMemPool::AllocSuper(size_t szArg) {
- // assume CacheLock is taken
- size_t szLog = 12;
- while ((((size_t)1) << szLog) < szArg) {
- ++szLog;
- }
- TIntrusivePtr<TIBMemSuperBlock> super;
- {
- TVector<TIntrusivePtr<TIBMemSuperBlock>>& cc = AllocCache[szLog];
- if (!cc.empty()) {
- super = cc.back();
- cc.resize(cc.size() - 1);
- AllocCacheSize -= 1ll << super->SzLog;
- }
- }
- if (super.Get() == nullptr) {
- super = new TIBMemSuperBlock(this, szLog);
- }
- return super;
- }
- TIBMemBlock* TIBMemPool::Alloc(size_t sz) {
- TGuard<TMutex> gg(CacheLock);
- if (sz > IB_MEM_LARGE_BLOCK) {
- TIntrusivePtr<TIBMemSuperBlock> super = AllocSuper(sz);
- return new TIBMemBlock(super, super->GetData(), sz);
- } else {
- if (CurrentOffset + sz > IB_MEM_LARGE_BLOCK) {
- CurrentBlk.Assign(AllocSuper(IB_MEM_LARGE_BLOCK));
- CurrentOffset = 0;
- }
- CurrentOffset += sz;
- return new TIBMemBlock(CurrentBlk.Get(), CurrentBlk.Get()->GetData() + CurrentOffset - sz, sz);
- }
- }
- void TIBMemPool::Return(TPtrArg<TIBMemSuperBlock> blk) {
- TGuard<TMutex> gg(CacheLock);
- Y_ASSERT(AtomicGet(blk->UseCount) == 0);
- size_t sz = 1ull << blk->SzLog;
- if (sz + AllocCacheSize > IB_MEM_POOL_SIZE) {
- AllocCache.clear();
- AllocCacheSize = 0;
- }
- {
- TVector<TIntrusivePtr<TIBMemSuperBlock>>& cc = AllocCache[blk->SzLog];
- cc.push_back(blk.Get());
- AllocCacheSize += sz;
- }
- }
- void* TIBMemPool::ThreadFunc(void* param) {
- BindToSocket(0);
- SetHighestThreadPriority();
- TIBMemPool* pThis = (TIBMemPool*)param;
- pThis->HasStarted.Signal();
- while (pThis->KeepRunning) {
- TJobItem* work = nullptr;
- if (!pThis->Requests.Dequeue(&work)) {
- pThis->HasWork.Reset();
- if (!pThis->Requests.Dequeue(&work)) {
- pThis->HasWork.Wait();
- }
- }
- if (work) {
- //printf("mem copy got work\n");
- int sz = work->Data->GetSize();
- work->Block = pThis->Alloc(sz);
- TBlockChainIterator bc(work->Data->GetChain());
- bc.Read(work->Block->GetData(), sz);
- TIntrusivePtr<TCopyResultStorage> dst = work->ResultStorage;
- work->ResultStorage = nullptr;
- dst->Results.Enqueue(work);
- //printf("mem copy completed\n");
- }
- }
- return nullptr;
- }
- //////////////////////////////////////////////////////////////////////////
- static TMutex IBMemMutex;
- static TIntrusivePtr<TIBMemPool> IBMemPool;
- static bool IBWasInitialized;
- TIntrusivePtr<TIBMemPool> GetIBMemPool() {
- TGuard<TMutex> gg(IBMemMutex);
- if (IBWasInitialized) {
- return IBMemPool;
- }
- IBWasInitialized = true;
- TIntrusivePtr<TIBPort> ibPort = GetIBDevice();
- if (ibPort.Get() == nullptr) {
- return nullptr;
- }
- IBMemPool = new TIBMemPool(ibPort->GetCtx());
- return IBMemPool;
- }
- }
|