#pragma once #include "mkql_computation_node.h" #include #include #include #include namespace NKikimr::NMiniKQL { struct TPatternCacheEntry { TScopedAlloc Alloc; TTypeEnvironment Env; bool UseAlloc; TRuntimeNode ProgramNode; ui32 ProgramInputsCount; TRuntimeNode ProgramParams; TVector InputItemTypesRaw; TVector InputItemTypes; TVector OutputItemTypesRaw; TVector OutputItemTypes; TVector EntryPoints; // last entry node stands for parameters TStructType* ParamsStruct; IComputationPattern::TPtr Pattern; size_t SizeForCache = 0; // set only by cache to lock the size, which can slightly vary when pattern is used std::atomic AccessTimes = 0; // set only by cache std::atomic IsInCache = false; // set only by cache void UpdateSizeForCache() { Y_DEBUG_ABORT_UNLESS(!SizeForCache); SizeForCache = Alloc.GetAllocated(); } TPatternCacheEntry(bool useAlloc = true) : Alloc(__LOCATION__) , Env(Alloc) , UseAlloc(useAlloc) { // Release Alloc since it was implicitly acquired in Alloc's ctor Alloc.Release(); } ~TPatternCacheEntry() { if (UseAlloc) { // If alloc was used it should be acquired so dtors of all member fields will use it to free memory // Release of Alloc will be called implicitly in Alloc's dtor Alloc.Acquire(); } } }; class TComputationPatternLRUCache { public: class TTicket : private TNonCopyable { public: TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture>& future, TComputationPatternLRUCache* cache) : Serialized(serialized) , IsOwned(isOwned) , Future(future) , Cache(cache) {} ~TTicket() { if (Cache) { Cache->NotifyMissing(Serialized); } } bool HasFuture() const { return !IsOwned; } std::shared_ptr GetValueSync() const { Y_ABORT_UNLESS(HasFuture()); return Future.GetValueSync(); } void Close() { Cache = nullptr; } private: const TString Serialized; const bool IsOwned; const NThreading::TFuture> Future; TComputationPatternLRUCache* Cache; }; struct Config { Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes) : MaxSizeBytes(maxSizeBytes) , MaxCompiledSizeBytes(maxCompiledSizeBytes) {} Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes, size_t patternAccessTimesBeforeTryToCompile) : MaxSizeBytes(maxSizeBytes) , MaxCompiledSizeBytes(maxCompiledSizeBytes) , PatternAccessTimesBeforeTryToCompile(patternAccessTimesBeforeTryToCompile) {} const size_t MaxSizeBytes; const size_t MaxCompiledSizeBytes; const std::optional PatternAccessTimesBeforeTryToCompile; bool operator==(const Config & rhs) { return std::tie(MaxSizeBytes, MaxCompiledSizeBytes, PatternAccessTimesBeforeTryToCompile) == std::tie(rhs.MaxSizeBytes, rhs.MaxCompiledSizeBytes, rhs.PatternAccessTimesBeforeTryToCompile); } bool operator!=(const Config & rhs) { return !(*this == rhs); } }; TComputationPatternLRUCache(const Config& configuration, NMonitoring::TDynamicCounterPtr counters = MakeIntrusive()); ~TComputationPatternLRUCache(); static std::shared_ptr CreateCacheEntry(bool useAlloc = true) { return std::make_shared(useAlloc); } std::shared_ptr Find(const TString& serializedProgram); TTicket FindOrSubscribe(const TString& serializedProgram); void EmplacePattern(const TString& serializedProgram, std::shared_ptr patternWithEnv); void NotifyPatternCompiled(const TString& serializedProgram); size_t GetSize() const; void CleanCache(); Config GetConfiguration() const { std::lock_guard lock(Mutex); return Configuration; } size_t GetMaxSizeBytes() const { std::lock_guard lock(Mutex); return Configuration.MaxSizeBytes; } i64 GetCacheHits() const { return *Hits; } void IncNotSuitablePattern() { ++*NotSuitablePattern; } size_t GetPatternsToCompileSize() const { std::lock_guard lock(Mutex); return PatternsToCompile.size(); } void GetPatternsToCompile(THashMap> & result) { std::lock_guard lock(Mutex); result.swap(PatternsToCompile); } private: void AccessPattern(const TString & serializedProgram, std::shared_ptr & entry); void NotifyMissing(const TString& serialized); static constexpr size_t CacheMaxElementsSize = 10000; friend class TTicket; mutable std::mutex Mutex; THashMap>>>> Notify; class TLRUPatternCacheImpl; std::unique_ptr Cache; THashMap> PatternsToCompile; const Config Configuration; NMonitoring::TDynamicCounters::TCounterPtr Hits; NMonitoring::TDynamicCounters::TCounterPtr HitsCompiled; NMonitoring::TDynamicCounters::TCounterPtr Waits; NMonitoring::TDynamicCounters::TCounterPtr Misses; NMonitoring::TDynamicCounters::TCounterPtr NotSuitablePattern; NMonitoring::TDynamicCounters::TCounterPtr SizeItems; NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledItems; NMonitoring::TDynamicCounters::TCounterPtr SizeBytes; NMonitoring::TDynamicCounters::TCounterPtr SizeCompiledBytes; NMonitoring::TDynamicCounters::TCounterPtr MaxSizeBytesCounter; NMonitoring::TDynamicCounters::TCounterPtr MaxCompiledSizeBytesCounter; }; } // namespace NKikimr::NMiniKQL