mkql_computation_pattern_cache.h 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #pragma once
#include "mkql_computation_node.h"
#include <yql/essentials/minikql/mkql_node.h>
#include <library/cpp/threading/future/future.h>

#include <atomic>
#include <memory>
#include <mutex>
#include <optional>
#include <tuple>
  7. namespace NKikimr::NMiniKQL {
  8. struct TPatternCacheEntry {
  9. TScopedAlloc Alloc;
  10. TTypeEnvironment Env;
  11. bool UseAlloc;
  12. TRuntimeNode ProgramNode;
  13. ui32 ProgramInputsCount;
  14. TRuntimeNode ProgramParams;
  15. TVector<TString> InputItemTypesRaw;
  16. TVector<TType*> InputItemTypes;
  17. TVector<TString> OutputItemTypesRaw;
  18. TVector<TType*> OutputItemTypes;
  19. TVector<TNode*> EntryPoints; // last entry node stands for parameters
  20. TStructType* ParamsStruct;
  21. IComputationPattern::TPtr Pattern;
  22. size_t SizeForCache = 0; // set only by cache to lock the size, which can slightly vary when pattern is used
  23. std::atomic<size_t> AccessTimes = 0; // set only by cache
  24. std::atomic<bool> IsInCache = false; // set only by cache
  25. void UpdateSizeForCache() {
  26. Y_DEBUG_ABORT_UNLESS(!SizeForCache);
  27. SizeForCache = Alloc.GetAllocated();
  28. }
  29. TPatternCacheEntry(bool useAlloc = true)
  30. : Alloc(__LOCATION__)
  31. , Env(Alloc)
  32. , UseAlloc(useAlloc)
  33. {
  34. // Release Alloc since it was implicitly acquired in Alloc's ctor
  35. Alloc.Release();
  36. }
  37. ~TPatternCacheEntry() {
  38. if (UseAlloc) {
  39. // If alloc was used it should be acquired so dtors of all member fields will use it to free memory
  40. // Release of Alloc will be called implicitly in Alloc's dtor
  41. Alloc.Acquire();
  42. }
  43. }
  44. };
  45. using TPatternCacheEntryPtr = std::shared_ptr<TPatternCacheEntry>;
  46. using TPatternCacheEntryFuture = NThreading::TFuture<TPatternCacheEntryPtr>;
  47. class TComputationPatternLRUCache {
  48. public:
  49. struct Config {
  50. Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes)
  51. : MaxSizeBytes(maxSizeBytes)
  52. , MaxCompiledSizeBytes(maxCompiledSizeBytes)
  53. {}
  54. Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes, size_t patternAccessTimesBeforeTryToCompile)
  55. : MaxSizeBytes(maxSizeBytes)
  56. , MaxCompiledSizeBytes(maxCompiledSizeBytes)
  57. , PatternAccessTimesBeforeTryToCompile(patternAccessTimesBeforeTryToCompile)
  58. {}
  59. const size_t MaxSizeBytes;
  60. const size_t MaxCompiledSizeBytes;
  61. const std::optional<size_t> PatternAccessTimesBeforeTryToCompile;
  62. bool operator==(const Config & rhs) {
  63. return std::tie(MaxSizeBytes, MaxCompiledSizeBytes, PatternAccessTimesBeforeTryToCompile) ==
  64. std::tie(rhs.MaxSizeBytes, rhs.MaxCompiledSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile);
  65. }
  66. bool operator!=(const Config & rhs) {
  67. return !(*this == rhs);
  68. }
  69. };
  70. TComputationPatternLRUCache(const Config& configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>());
  71. ~TComputationPatternLRUCache();
  72. static TPatternCacheEntryPtr CreateCacheEntry(bool useAlloc = true) {
  73. return std::make_shared<TPatternCacheEntry>(useAlloc);
  74. }
  75. TPatternCacheEntryPtr Find(const TString& serializedProgram);
  76. TPatternCacheEntryFuture FindOrSubscribe(const TString& serializedProgram);
  77. void EmplacePattern(const TString& serializedProgram, TPatternCacheEntryPtr& patternWithEnv);
  78. void NotifyPatternCompiled(const TString& serializedProgram);
  79. void NotifyPatternMissing(const TString& serializedProgram);
  80. size_t GetSize() const;
  81. void CleanCache();
  82. Config GetConfiguration() const {
  83. std::lock_guard lock(Mutex);
  84. return Configuration;
  85. }
  86. size_t GetMaxSizeBytes() const {
  87. std::lock_guard lock(Mutex);
  88. return Configuration.MaxSizeBytes;
  89. }
  90. i64 GetCacheHits() const {
  91. return *Hits;
  92. }
  93. void IncNotSuitablePattern() {
  94. ++*NotSuitablePattern;
  95. }
  96. size_t GetPatternsToCompileSize() const {
  97. std::lock_guard lock(Mutex);
  98. return PatternsToCompile.size();
  99. }
  100. void GetPatternsToCompile(THashMap<TString, TPatternCacheEntryPtr> & result) {
  101. std::lock_guard lock(Mutex);
  102. result.swap(PatternsToCompile);
  103. }
  104. private:
  105. class TLRUPatternCacheImpl;
  106. static constexpr size_t CacheMaxElementsSize = 10000;
  107. void AccessPattern(const TString& serializedProgram, TPatternCacheEntryPtr entry);
  108. mutable std::mutex Mutex;
  109. THashMap<TString, TVector<NThreading::TPromise<TPatternCacheEntryPtr>>> Notify; // protected by Mutex
  110. std::unique_ptr<TLRUPatternCacheImpl> Cache; // protected by Mutex
  111. THashMap<TString, TPatternCacheEntryPtr> PatternsToCompile; // protected by Mutex
  112. const Config Configuration;
  113. NMonitoring::TDynamicCounters::TCounterPtr Hits;
  114. NMonitoring::TDynamicCounters::TCounterPtr HitsCompiled;
  115. NMonitoring::TDynamicCounters::TCounterPtr Waits;
  116. NMonitoring::TDynamicCounters::TCounterPtr Misses;
  117. NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern;
  118. NMonitoring::TDynamicCounters::TCounterPtr SizeItems;
  119. NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledItems;
  120. NMonitoring::TDynamicCounters::TCounterPtr SizeBytes;
  121. NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledBytes;
  122. NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter;
  123. NMonitoring::TDynamicCounters::TCounterPtr MaxCompiledSizeBytesCounter;
  124. };
  125. } // namespace NKikimr::NMiniKQL