// mkql_computation_pattern_cache.h
  1. #pragma once
  2. #include "mkql_computation_node.h"
  3. #include <yql/essentials/minikql/mkql_node.h>
  4. #include <library/cpp/threading/future/future.h>
  5. #include <memory>
  6. #include <mutex>
  7. namespace NKikimr::NMiniKQL {
  8. struct TPatternCacheEntry {
  9. TScopedAlloc Alloc;
  10. TTypeEnvironment Env;
  11. bool UseAlloc;
  12. TRuntimeNode ProgramNode;
  13. ui32 ProgramInputsCount;
  14. TRuntimeNode ProgramParams;
  15. TVector<TString> InputItemTypesRaw;
  16. TVector<TType*> InputItemTypes;
  17. TVector<TString> OutputItemTypesRaw;
  18. TVector<TType*> OutputItemTypes;
  19. TVector<TNode*> EntryPoints; // last entry node stands for parameters
  20. TStructType* ParamsStruct;
  21. IComputationPattern::TPtr Pattern;
  22. size_t SizeForCache = 0; // set only by cache to lock the size, which can slightly vary when pattern is used
  23. std::atomic<size_t> AccessTimes = 0; // set only by cache
  24. std::atomic<bool> IsInCache = false; // set only by cache
  25. void UpdateSizeForCache() {
  26. Y_DEBUG_ABORT_UNLESS(!SizeForCache);
  27. SizeForCache = Alloc.GetAllocated();
  28. }
  29. TPatternCacheEntry(bool useAlloc = true)
  30. : Alloc(__LOCATION__)
  31. , Env(Alloc)
  32. , UseAlloc(useAlloc)
  33. {
  34. // Release Alloc since it was implicitly acquired in Alloc's ctor
  35. Alloc.Release();
  36. }
  37. ~TPatternCacheEntry() {
  38. if (UseAlloc) {
  39. // If alloc was used it should be acquired so dtors of all member fields will use it to free memory
  40. // Release of Alloc will be called implicitly in Alloc's dtor
  41. Alloc.Acquire();
  42. }
  43. }
  44. };
  45. class TComputationPatternLRUCache {
  46. public:
  47. class TTicket : private TNonCopyable {
  48. public:
  49. TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
  50. : Serialized(serialized)
  51. , IsOwned(isOwned)
  52. , Future(future)
  53. , Cache(cache)
  54. {}
  55. ~TTicket() {
  56. if (Cache) {
  57. Cache->NotifyMissing(Serialized);
  58. }
  59. }
  60. bool HasFuture() const {
  61. return !IsOwned;
  62. }
  63. std::shared_ptr<TPatternCacheEntry> GetValueSync() const {
  64. Y_ABORT_UNLESS(HasFuture());
  65. return Future.GetValueSync();
  66. }
  67. void Close() {
  68. Cache = nullptr;
  69. }
  70. private:
  71. const TString Serialized;
  72. const bool IsOwned;
  73. const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future;
  74. TComputationPatternLRUCache* Cache;
  75. };
  /// Immutable settings of the pattern cache.
  ///
  /// All fields are `const`, so a `Config` can be copy-constructed but not assigned.
  /// NOTE(review): the two size limits are presumably byte budgets for all patterns and for
  /// compiled patterns respectively — confirm against the cache implementation.
  struct Config {
      /// Leaves `PatternAccessTimesBeforeTryToCompile` empty.
      Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes)
          : MaxSizeBytes(maxSizeBytes)
          , MaxCompiledSizeBytes(maxCompiledSizeBytes)
      {}

      /// Same as the two-argument form, additionally setting the access-count threshold.
      Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes, size_t patternAccessTimesBeforeTryToCompile)
          : MaxSizeBytes(maxSizeBytes)
          , MaxCompiledSizeBytes(maxCompiledSizeBytes)
          , PatternAccessTimesBeforeTryToCompile(patternAccessTimesBeforeTryToCompile)
      {}

      const size_t MaxSizeBytes;
      const size_t MaxCompiledSizeBytes;
      const std::optional<size_t> PatternAccessTimesBeforeTryToCompile;

      /// Field-wise equality. Const-qualified so that const objects (for example the cache's
      /// own `const Config` member) can appear on the left-hand side.
      bool operator==(const Config& rhs) const {
          return std::tie(MaxSizeBytes, MaxCompiledSizeBytes, PatternAccessTimesBeforeTryToCompile) ==
                 std::tie(rhs.MaxSizeBytes, rhs.MaxCompiledSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile);
      }

      /// Negation of `operator==`.
      bool operator!=(const Config& rhs) const {
          return !(*this == rhs);
      }
  };
  97. TComputationPatternLRUCache(const Config& configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>());
  98. ~TComputationPatternLRUCache();
  99. static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) {
  100. return std::make_shared<TPatternCacheEntry>(useAlloc);
  101. }
  102. std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram);
  103. TTicket FindOrSubscribe(const TString& serializedProgram);
  104. void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
  105. void NotifyPatternCompiled(const TString& serializedProgram);
  106. size_t GetSize() const;
  107. void CleanCache();
  108. Config GetConfiguration() const {
  109. std::lock_guard lock(Mutex);
  110. return Configuration;
  111. }
  112. size_t GetMaxSizeBytes() const {
  113. std::lock_guard lock(Mutex);
  114. return Configuration.MaxSizeBytes;
  115. }
  116. i64 GetCacheHits() const {
  117. return *Hits;
  118. }
  119. void IncNotSuitablePattern() {
  120. ++*NotSuitablePattern;
  121. }
  122. size_t GetPatternsToCompileSize() const {
  123. std::lock_guard lock(Mutex);
  124. return PatternsToCompile.size();
  125. }
  126. void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) {
  127. std::lock_guard lock(Mutex);
  128. result.swap(PatternsToCompile);
  129. }
  130. private:
  131. void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
  132. void NotifyMissing(const TString& serialized);
  133. static constexpr size_t CacheMaxElementsSize = 10000;
  134. friend class TTicket;
  135. mutable std::mutex Mutex;
  136. THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
  137. class TLRUPatternCacheImpl;
  138. std::unique_ptr<TLRUPatternCacheImpl> Cache;
  139. THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
  140. const Config Configuration;
  141. NMonitoring::TDynamicCounters::TCounterPtr Hits;
  142. NMonitoring::TDynamicCounters::TCounterPtr HitsCompiled;
  143. NMonitoring::TDynamicCounters::TCounterPtr Waits;
  144. NMonitoring::TDynamicCounters::TCounterPtr Misses;
  145. NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern;
  146. NMonitoring::TDynamicCounters::TCounterPtr SizeItems;
  147. NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledItems;
  148. NMonitoring::TDynamicCounters::TCounterPtr SizeBytes;
  149. NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledBytes;
  150. NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter;
  151. NMonitoring::TDynamicCounters::TCounterPtr MaxCompiledSizeBytesCounter;
  152. };
  153. } // namespace NKikimr::NMiniKQL