mkql_computation_pattern_cache.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. #include "mkql_computation_pattern_cache.h"
  2. #include <util/generic/intrlist.h>
  3. namespace NKikimr::NMiniKQL {
  4. class TComputationPatternLRUCache::TLRUPatternCacheImpl
  5. {
  6. public:
  7. TLRUPatternCacheImpl(size_t maxPatternsSize,
  8. size_t maxPatternsSizeBytes,
  9. size_t maxCompiledPatternsSize,
  10. size_t maxCompiledPatternsSizeBytes)
  11. : MaxPatternsSize(maxPatternsSize)
  12. , MaxPatternsSizeBytes(maxPatternsSizeBytes)
  13. , MaxCompiledPatternsSize(maxCompiledPatternsSize)
  14. , MaxCompiledPatternsSizeBytes(maxCompiledPatternsSizeBytes)
  15. {}
  16. size_t PatternsSize() const {
  17. return SerializedProgramToPatternCacheHolder.size();
  18. }
  19. size_t PatternsSizeInBytes() const {
  20. return CurrentPatternsSizeBytes;
  21. }
  22. size_t CompiledPatternsSize() const {
  23. return CurrentCompiledPatternsSize;
  24. }
  25. size_t PatternsCompiledCodeSizeInBytes() const {
  26. return CurrentPatternsCompiledCodeSizeInBytes;
  27. }
  28. std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) {
  29. auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
  30. if (it == SerializedProgramToPatternCacheHolder.end()) {
  31. return nullptr;
  32. }
  33. PromoteEntry(&it->second);
  34. return &it->second.Entry;
  35. }
  36. void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
  37. auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct,
  38. std::forward_as_tuple(serializedProgram),
  39. std::forward_as_tuple(serializedProgram, entry));
  40. if (!inserted) {
  41. RemoveEntryFromLists(&it->second);
  42. }
  43. entry->UpdateSizeForCache();
  44. /// New item is inserted, insert it in the back of both LRU lists and recalculate sizes
  45. CurrentPatternsSizeBytes += entry->SizeForCache;
  46. LRUPatternList.PushBack(&it->second);
  47. if (entry->Pattern->IsCompiled()) {
  48. ++CurrentCompiledPatternsSize;
  49. CurrentPatternsCompiledCodeSizeInBytes += entry->Pattern->CompiledCodeSize();
  50. LRUCompiledPatternList.PushBack(&it->second);
  51. }
  52. entry->IsInCache.store(true);
  53. ClearIfNeeded();
  54. }
  55. void NotifyPatternCompiled(const TString & serializedProgram) {
  56. auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
  57. if (it == SerializedProgramToPatternCacheHolder.end()) {
  58. return;
  59. }
  60. const auto& entry = it->second.Entry;
  61. // TODO(ilezhankin): wait until migration of yql to arcadia is complete and merge the proper fix from here:
  62. // https://github.com/ydb-platform/ydb/pull/11129
  63. if (!entry->Pattern->IsCompiled()) {
  64. return;
  65. }
  66. if (it->second.LinkedInCompiledPatternLRUList()) {
  67. return;
  68. }
  69. PromoteEntry(&it->second);
  70. ++CurrentCompiledPatternsSize;
  71. CurrentPatternsCompiledCodeSizeInBytes += entry->Pattern->CompiledCodeSize();
  72. LRUCompiledPatternList.PushBack(&it->second);
  73. ClearIfNeeded();
  74. }
  75. void Clear() {
  76. CurrentPatternsSizeBytes = 0;
  77. CurrentCompiledPatternsSize = 0;
  78. CurrentPatternsCompiledCodeSizeInBytes = 0;
  79. SerializedProgramToPatternCacheHolder.clear();
  80. for (auto & holder : LRUPatternList) {
  81. holder.Entry->IsInCache.store(false);
  82. }
  83. LRUPatternList.Clear();
  84. LRUCompiledPatternList.Clear();
  85. }
  86. private:
  87. struct TPatternLRUListTag {};
  88. struct TCompiledPatternLRUListTag {};
  89. /** Cache holder is used to store serialized program and pattern cache entry in intrusive LRU lists.
  90. * Most recently accessed items are in back of the lists, least recently accessed items are in front of the lists.
  91. */
  92. struct TPatternCacheHolder : public TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>, TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag> {
  93. TPatternCacheHolder(TString serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
  94. : SerializedProgram(std::move(serializedProgram))
  95. , Entry(std::move(entry))
  96. {}
  97. bool LinkedInPatternLRUList() const {
  98. return !TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>::Empty();
  99. }
  100. bool LinkedInCompiledPatternLRUList() const {
  101. return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty();
  102. }
  103. TString SerializedProgram;
  104. std::shared_ptr<TPatternCacheEntry> Entry;
  105. };
  106. void PromoteEntry(TPatternCacheHolder* holder) {
  107. Y_ASSERT(holder->LinkedInPatternLRUList());
  108. LRUPatternList.Remove(holder);
  109. LRUPatternList.PushBack(holder);
  110. if (!holder->LinkedInCompiledPatternLRUList()) {
  111. return;
  112. }
  113. LRUCompiledPatternList.Remove(holder);
  114. LRUCompiledPatternList.PushBack(holder);
  115. }
  116. void RemoveEntryFromLists(TPatternCacheHolder* holder) {
  117. Y_ASSERT(holder->LinkedInPatternLRUList());
  118. LRUPatternList.Remove(holder);
  119. Y_ASSERT(holder->Entry->SizeForCache <= CurrentPatternsSizeBytes);
  120. CurrentPatternsSizeBytes -= holder->Entry->SizeForCache;
  121. if (!holder->LinkedInCompiledPatternLRUList()) {
  122. return;
  123. }
  124. Y_ASSERT(CurrentCompiledPatternsSize > 0);
  125. --CurrentCompiledPatternsSize;
  126. size_t patternCompiledCodeSize = holder->Entry->Pattern->CompiledCodeSize();
  127. Y_ASSERT(patternCompiledCodeSize <= CurrentPatternsCompiledCodeSizeInBytes);
  128. CurrentPatternsCompiledCodeSizeInBytes -= patternCompiledCodeSize;
  129. LRUCompiledPatternList.Remove(holder);
  130. holder->Entry->IsInCache.store(false);
  131. }
  132. void ClearIfNeeded() {
  133. /// Remove from pattern LRU list and compiled pattern LRU list
  134. while (SerializedProgramToPatternCacheHolder.size() > MaxPatternsSize || CurrentPatternsSizeBytes > MaxPatternsSizeBytes) {
  135. TPatternCacheHolder* holder = LRUPatternList.Front();
  136. RemoveEntryFromLists(holder);
  137. SerializedProgramToPatternCacheHolder.erase(holder->SerializedProgram);
  138. }
  139. /// Only remove from compiled pattern LRU list
  140. while (CurrentCompiledPatternsSize > MaxCompiledPatternsSize || CurrentPatternsCompiledCodeSizeInBytes > MaxCompiledPatternsSizeBytes) {
  141. TPatternCacheHolder* holder = LRUCompiledPatternList.PopFront();
  142. Y_ASSERT(CurrentCompiledPatternsSize > 0);
  143. --CurrentCompiledPatternsSize;
  144. auto & pattern = holder->Entry->Pattern;
  145. size_t patternCompiledSize = pattern->CompiledCodeSize();
  146. Y_ASSERT(patternCompiledSize <= CurrentPatternsCompiledCodeSizeInBytes);
  147. CurrentPatternsCompiledCodeSizeInBytes -= patternCompiledSize;
  148. pattern->RemoveCompiledCode();
  149. holder->Entry->AccessTimes.store(0);
  150. }
  151. }
  152. const size_t MaxPatternsSize;
  153. const size_t MaxPatternsSizeBytes;
  154. const size_t MaxCompiledPatternsSize;
  155. const size_t MaxCompiledPatternsSizeBytes;
  156. size_t CurrentPatternsSizeBytes = 0;
  157. size_t CurrentCompiledPatternsSize = 0;
  158. size_t CurrentPatternsCompiledCodeSizeInBytes = 0;
  159. THashMap<TString, TPatternCacheHolder> SerializedProgramToPatternCacheHolder;
  160. TIntrusiveList<TPatternCacheHolder, TPatternLRUListTag> LRUPatternList;
  161. TIntrusiveList<TPatternCacheHolder, TCompiledPatternLRUListTag> LRUCompiledPatternList;
  162. };
  163. TComputationPatternLRUCache::TComputationPatternLRUCache(const TComputationPatternLRUCache::Config& configuration, NMonitoring::TDynamicCounterPtr counters)
  164. : Cache(std::make_unique<TLRUPatternCacheImpl>(CacheMaxElementsSize, configuration.MaxSizeBytes, CacheMaxElementsSize, configuration.MaxCompiledSizeBytes))
  165. , Configuration(configuration)
  166. , Hits(counters->GetCounter("PatternCache/Hits", true))
  167. , HitsCompiled(counters->GetCounter("PatternCache/HitsCompiled", true))
  168. , Waits(counters->GetCounter("PatternCache/Waits", true))
  169. , Misses(counters->GetCounter("PatternCache/Misses", true))
  170. , NotSuitablePattern(counters->GetCounter("PatternCache/NotSuitablePattern", true))
  171. , SizeItems(counters->GetCounter("PatternCache/SizeItems", false))
  172. , SizeCompiledItems(counters->GetCounter("PatternCache/SizeCompiledItems", false))
  173. , SizeBytes(counters->GetCounter("PatternCache/SizeBytes", false))
  174. , SizeCompiledBytes(counters->GetCounter("PatternCache/SizeCompiledBytes", false))
  175. , MaxSizeBytesCounter(counters->GetCounter("PatternCache/MaxSizeBytes", false))
  176. , MaxCompiledSizeBytesCounter(counters->GetCounter("PatternCache/MaxCompiledSizeBytes", false))
  177. {
  178. *MaxSizeBytesCounter = Configuration.MaxSizeBytes;
  179. *MaxCompiledSizeBytesCounter = Configuration.MaxCompiledSizeBytes;
  180. }
  181. TComputationPatternLRUCache::~TComputationPatternLRUCache() {
  182. CleanCache();
  183. }
  184. std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) {
  185. std::lock_guard<std::mutex> lock(Mutex);
  186. if (auto it = Cache->Find(serializedProgram)) {
  187. ++*Hits;
  188. if ((*it)->Pattern->IsCompiled())
  189. ++*HitsCompiled;
  190. return *it;
  191. }
  192. ++*Misses;
  193. return {};
  194. }
  195. TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
  196. std::lock_guard lock(Mutex);
  197. if (auto it = Cache->Find(serializedProgram)) {
  198. ++*Hits;
  199. AccessPattern(serializedProgram, *it);
  200. return TTicket(serializedProgram, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr);
  201. }
  202. auto [notifyIt, isNew] = Notify.emplace(serializedProgram, Nothing());
  203. if (isNew) {
  204. ++*Misses;
  205. return TTicket(serializedProgram, true, {}, this);
  206. }
  207. ++*Waits;
  208. auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>();
  209. auto& subscribers = notifyIt->second;
  210. if (!subscribers) {
  211. subscribers.ConstructInPlace();
  212. }
  213. subscribers->push_back(promise);
  214. return TTicket(serializedProgram, false, promise, nullptr);
  215. }
  216. void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
  217. Y_DEBUG_ABORT_UNLESS(patternWithEnv && patternWithEnv->Pattern);
  218. TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
  219. {
  220. std::lock_guard<std::mutex> lock(Mutex);
  221. Cache->Insert(serializedProgram, patternWithEnv);
  222. auto notifyIt = Notify.find(serializedProgram);
  223. if (notifyIt != Notify.end()) {
  224. subscribers.swap(notifyIt->second);
  225. Notify.erase(notifyIt);
  226. }
  227. *SizeItems = Cache->PatternsSize();
  228. *SizeBytes = Cache->PatternsSizeInBytes();
  229. *SizeCompiledItems = Cache->CompiledPatternsSize();
  230. *SizeCompiledBytes = Cache->PatternsCompiledCodeSizeInBytes();
  231. }
  232. if (subscribers) {
  233. for (auto& subscriber : *subscribers) {
  234. subscriber.SetValue(patternWithEnv);
  235. }
  236. }
  237. }
  238. void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serializedProgram) {
  239. std::lock_guard lock(Mutex);
  240. Cache->NotifyPatternCompiled(serializedProgram);
  241. }
  242. size_t TComputationPatternLRUCache::GetSize() const {
  243. std::lock_guard lock(Mutex);
  244. return Cache->PatternsSize();
  245. }
  246. void TComputationPatternLRUCache::CleanCache() {
  247. *SizeItems = 0;
  248. *SizeBytes = 0;
  249. *MaxSizeBytesCounter = 0;
  250. std::lock_guard lock(Mutex);
  251. PatternsToCompile.clear();
  252. Cache->Clear();
  253. }
  254. void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
  255. if (!Configuration.PatternAccessTimesBeforeTryToCompile || entry->Pattern->IsCompiled()) {
  256. return;
  257. }
  258. size_t PatternAccessTimes = entry->AccessTimes.fetch_add(1) + 1;
  259. if (PatternAccessTimes == *Configuration.PatternAccessTimesBeforeTryToCompile ||
  260. (*Configuration.PatternAccessTimesBeforeTryToCompile == 0 && PatternAccessTimes == 1)) {
  261. PatternsToCompile.emplace(serializedProgram, entry);
  262. }
  263. }
  264. void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
  265. TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
  266. {
  267. std::lock_guard<std::mutex> lock(Mutex);
  268. auto notifyIt = Notify.find(serialized);
  269. if (notifyIt != Notify.end()) {
  270. subscribers.swap(notifyIt->second);
  271. Notify.erase(notifyIt);
  272. }
  273. }
  274. if (subscribers) {
  275. for (auto& subscriber : *subscribers) {
  276. subscriber.SetValue(nullptr);
  277. }
  278. }
  279. }
  280. } // namespace NKikimr::NMiniKQL