ib_mem.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #pragma once
  2. #include "block_chain.h"
  3. #include <util/thread/lfqueue.h>
  4. #include <util/system/thread.h>
  5. namespace NNetliba {
  6. // registered memory blocks
  7. class TMemoryRegion;
  8. class TIBContext;
  9. class TIBMemPool;
  10. struct TIBMemSuperBlock: public TThrRefBase, TNonCopyable {
  11. TIntrusivePtr<TIBMemPool> Pool;
  12. size_t SzLog;
  13. TAtomic UseCount;
  14. TIntrusivePtr<TMemoryRegion> MemRegion;
  15. TIBMemSuperBlock(TIBMemPool* pool, size_t szLog);
  16. ~TIBMemSuperBlock() override;
  17. char* GetData();
  18. size_t GetSize() {
  19. return ((ui64)1) << SzLog;
  20. }
  21. void IncRef() {
  22. AtomicAdd(UseCount, 1);
  23. }
  24. void DecRef();
  25. };
  26. class TIBMemBlock: public TThrRefBase, TNonCopyable {
  27. TIntrusivePtr<TIBMemSuperBlock> Super;
  28. char* Data;
  29. size_t Size;
  30. ~TIBMemBlock() override;
  31. public:
  32. TIBMemBlock(TPtrArg<TIBMemSuperBlock> super, char* data, size_t sz)
  33. : Super(super)
  34. , Data(data)
  35. , Size(sz)
  36. {
  37. Super->IncRef();
  38. }
  39. TIBMemBlock(size_t sz)
  40. : Super(nullptr)
  41. , Size(sz)
  42. {
  43. // not really IB mem block, but useful IB code debug without IB
  44. Data = new char[sz];
  45. }
  46. char* GetData() {
  47. return Data;
  48. }
  49. ui64 GetAddr() {
  50. return reinterpret_cast<ui64>(Data) / sizeof(char);
  51. }
  52. size_t GetSize() {
  53. return Size;
  54. }
  55. TMemoryRegion* GetMemRegion() {
  56. return Super.Get() ? Super->MemRegion.Get() : nullptr;
  57. }
  58. };
  59. const size_t IB_MEM_LARGE_BLOCK_LN = 20;
  60. const size_t IB_MEM_LARGE_BLOCK = 1ul << IB_MEM_LARGE_BLOCK_LN;
  61. const size_t IB_MEM_POOL_SIZE = 1024 * 1024 * 1024;
  62. class TIBMemPool: public TThrRefBase, TNonCopyable {
  63. public:
  64. struct TCopyResultStorage;
  65. private:
  66. class TIBMemSuperBlockPtr {
  67. TIntrusivePtr<TIBMemSuperBlock> Blk;
  68. public:
  69. ~TIBMemSuperBlockPtr() {
  70. Detach();
  71. }
  72. void Assign(TIntrusivePtr<TIBMemSuperBlock> p) {
  73. Detach();
  74. Blk = p;
  75. if (p.Get()) {
  76. AtomicAdd(p->UseCount, 1);
  77. }
  78. }
  79. void Detach() {
  80. if (Blk.Get()) {
  81. Blk->DecRef();
  82. Blk = nullptr;
  83. }
  84. }
  85. TIBMemSuperBlock* Get() {
  86. return Blk.Get();
  87. }
  88. };
  89. TIntrusivePtr<TIBContext> IBCtx;
  90. THashMap<size_t, TVector<TIntrusivePtr<TIBMemSuperBlock>>> AllocCache;
  91. size_t AllocCacheSize;
  92. TIBMemSuperBlockPtr CurrentBlk;
  93. int CurrentOffset;
  94. TMutex CacheLock;
  95. TThread WorkThread;
  96. TSystemEvent HasStarted;
  97. bool KeepRunning;
  98. struct TJobItem {
  99. TRopeDataPacket* Data;
  100. i64 MsgHandle;
  101. TIntrusivePtr<TThrRefBase> Context;
  102. TIntrusivePtr<TIBMemBlock> Block;
  103. TIntrusivePtr<TCopyResultStorage> ResultStorage;
  104. TJobItem(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst)
  105. : Data(data)
  106. , MsgHandle(msgHandle)
  107. , Context(context)
  108. , ResultStorage(dst)
  109. {
  110. }
  111. };
  112. TLockFreeQueue<TJobItem*> Requests;
  113. TSystemEvent HasWork;
  114. static void* ThreadFunc(void* param);
  115. void Return(TPtrArg<TIBMemSuperBlock> blk);
  116. TIntrusivePtr<TIBMemSuperBlock> AllocSuper(size_t sz);
  117. ~TIBMemPool() override;
  118. public:
  119. struct TCopyResultStorage: public TThrRefBase {
  120. TLockFreeStack<TJobItem*> Results;
  121. ~TCopyResultStorage() override {
  122. TJobItem* work;
  123. while (Results.Dequeue(&work)) {
  124. delete work;
  125. }
  126. }
  127. template <class T>
  128. bool GetCopyResult(TIntrusivePtr<TIBMemBlock>* resBlock, i64* resMsgHandle, TIntrusivePtr<T>* context) {
  129. TJobItem* work;
  130. if (Results.Dequeue(&work)) {
  131. *resBlock = work->Block;
  132. *resMsgHandle = work->MsgHandle;
  133. *context = static_cast<T*>(work->Context.Get()); // caller responsibility to make sure this makes sense
  134. delete work;
  135. return true;
  136. } else {
  137. return false;
  138. }
  139. }
  140. };
  141. public:
  142. TIBMemPool(TPtrArg<TIBContext> ctx);
  143. TIBContext* GetIBContext() {
  144. return IBCtx.Get();
  145. }
  146. TIBMemBlock* Alloc(size_t sz);
  147. void CopyData(TRopeDataPacket* data, i64 msgHandle, TThrRefBase* context, TPtrArg<TCopyResultStorage> dst) {
  148. Requests.Enqueue(new TJobItem(data, msgHandle, context, dst));
  149. HasWork.Signal();
  150. }
  151. friend class TIBMemBlock;
  152. friend struct TIBMemSuperBlock;
  153. };
  154. extern TIntrusivePtr<TIBMemPool> GetIBMemPool();
  155. }