ib_mem.cpp 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. #include "stdafx.h"
  2. #include "ib_mem.h"
  3. #include "ib_low.h"
  4. #include "cpu_affinity.h"
  5. #if defined(_unix_)
  6. #include <pthread.h>
  7. #endif
  8. namespace NNetliba {
  9. TIBMemSuperBlock::TIBMemSuperBlock(TIBMemPool* pool, size_t szLog)
  10. : Pool(pool)
  11. , SzLog(szLog)
  12. , UseCount(0)
  13. {
  14. size_t sz = GetSize();
  15. MemRegion = new TMemoryRegion(pool->GetIBContext(), sz);
  16. //printf("Alloc super block, size %" PRId64 "\n", sz);
  17. }
  18. TIBMemSuperBlock::~TIBMemSuperBlock() {
  19. Y_ASSERT(AtomicGet(UseCount) == 0);
  20. }
  21. char* TIBMemSuperBlock::GetData() {
  22. return MemRegion->GetData();
  23. }
  24. void TIBMemSuperBlock::DecRef() {
  25. if (AtomicAdd(UseCount, -1) == 0) {
  26. Pool->Return(this);
  27. }
  28. }
  29. TIBMemBlock::~TIBMemBlock() {
  30. if (Super.Get()) {
  31. Super->DecRef();
  32. } else {
  33. delete[] Data;
  34. }
  35. }
  36. //////////////////////////////////////////////////////////////////////////
  37. TIBMemPool::TIBMemPool(TPtrArg<TIBContext> ctx)
  38. : IBCtx(ctx)
  39. , AllocCacheSize(0)
  40. , CurrentOffset(IB_MEM_LARGE_BLOCK)
  41. , WorkThread(TThread::TParams(ThreadFunc, (void*)this).SetName("nl6_ib_mem_pool"))
  42. , KeepRunning(true)
  43. {
  44. WorkThread.Start();
  45. HasStarted.Wait();
  46. }
  47. TIBMemPool::~TIBMemPool() {
  48. Y_ASSERT(WorkThread.Running());
  49. KeepRunning = false;
  50. HasWork.Signal();
  51. WorkThread.Join();
  52. {
  53. TJobItem* work = nullptr;
  54. while (Requests.Dequeue(&work)) {
  55. delete work;
  56. }
  57. }
  58. }
  59. TIntrusivePtr<TIBMemSuperBlock> TIBMemPool::AllocSuper(size_t szArg) {
  60. // assume CacheLock is taken
  61. size_t szLog = 12;
  62. while ((((size_t)1) << szLog) < szArg) {
  63. ++szLog;
  64. }
  65. TIntrusivePtr<TIBMemSuperBlock> super;
  66. {
  67. TVector<TIntrusivePtr<TIBMemSuperBlock>>& cc = AllocCache[szLog];
  68. if (!cc.empty()) {
  69. super = cc.back();
  70. cc.resize(cc.size() - 1);
  71. AllocCacheSize -= 1ll << super->SzLog;
  72. }
  73. }
  74. if (super.Get() == nullptr) {
  75. super = new TIBMemSuperBlock(this, szLog);
  76. }
  77. return super;
  78. }
  79. TIBMemBlock* TIBMemPool::Alloc(size_t sz) {
  80. TGuard<TMutex> gg(CacheLock);
  81. if (sz > IB_MEM_LARGE_BLOCK) {
  82. TIntrusivePtr<TIBMemSuperBlock> super = AllocSuper(sz);
  83. return new TIBMemBlock(super, super->GetData(), sz);
  84. } else {
  85. if (CurrentOffset + sz > IB_MEM_LARGE_BLOCK) {
  86. CurrentBlk.Assign(AllocSuper(IB_MEM_LARGE_BLOCK));
  87. CurrentOffset = 0;
  88. }
  89. CurrentOffset += sz;
  90. return new TIBMemBlock(CurrentBlk.Get(), CurrentBlk.Get()->GetData() + CurrentOffset - sz, sz);
  91. }
  92. }
  93. void TIBMemPool::Return(TPtrArg<TIBMemSuperBlock> blk) {
  94. TGuard<TMutex> gg(CacheLock);
  95. Y_ASSERT(AtomicGet(blk->UseCount) == 0);
  96. size_t sz = 1ull << blk->SzLog;
  97. if (sz + AllocCacheSize > IB_MEM_POOL_SIZE) {
  98. AllocCache.clear();
  99. AllocCacheSize = 0;
  100. }
  101. {
  102. TVector<TIntrusivePtr<TIBMemSuperBlock>>& cc = AllocCache[blk->SzLog];
  103. cc.push_back(blk.Get());
  104. AllocCacheSize += sz;
  105. }
  106. }
  107. void* TIBMemPool::ThreadFunc(void* param) {
  108. BindToSocket(0);
  109. SetHighestThreadPriority();
  110. TIBMemPool* pThis = (TIBMemPool*)param;
  111. pThis->HasStarted.Signal();
  112. while (pThis->KeepRunning) {
  113. TJobItem* work = nullptr;
  114. if (!pThis->Requests.Dequeue(&work)) {
  115. pThis->HasWork.Reset();
  116. if (!pThis->Requests.Dequeue(&work)) {
  117. pThis->HasWork.Wait();
  118. }
  119. }
  120. if (work) {
  121. //printf("mem copy got work\n");
  122. int sz = work->Data->GetSize();
  123. work->Block = pThis->Alloc(sz);
  124. TBlockChainIterator bc(work->Data->GetChain());
  125. bc.Read(work->Block->GetData(), sz);
  126. TIntrusivePtr<TCopyResultStorage> dst = work->ResultStorage;
  127. work->ResultStorage = nullptr;
  128. dst->Results.Enqueue(work);
  129. //printf("mem copy completed\n");
  130. }
  131. }
  132. return nullptr;
  133. }
  134. //////////////////////////////////////////////////////////////////////////
  135. static TMutex IBMemMutex;
  136. static TIntrusivePtr<TIBMemPool> IBMemPool;
  137. static bool IBWasInitialized;
  138. TIntrusivePtr<TIBMemPool> GetIBMemPool() {
  139. TGuard<TMutex> gg(IBMemMutex);
  140. if (IBWasInitialized) {
  141. return IBMemPool;
  142. }
  143. IBWasInitialized = true;
  144. TIntrusivePtr<TIBPort> ibPort = GetIBDevice();
  145. if (ibPort.Get() == nullptr) {
  146. return nullptr;
  147. }
  148. IBMemPool = new TIBMemPool(ibPort->GetCtx());
  149. return IBMemPool;
  150. }
  151. }