Browse Source

KQP computation pattern cache serialized program (#634)

KQP computation pattern cache serialized program
Maksim Kita 1 year ago
parent
commit
d4ee658c51

+ 2 - 2
ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp

@@ -84,7 +84,7 @@ private:
             return;
         }
 
-        THashMap<TString, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile;
+        THashMap<NMiniKQL::TSerializedProgram, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile;
         patternCache->GetPatternsToCompile(patternsToCompile);
 
         TVector<std::pair<TPatternToCompile, size_t>> patternsToCompileWithAccessTimes;
@@ -115,7 +115,7 @@ private:
     TIntrusivePtr<TKqpCounters> Counters;
 
     struct TPatternToCompile {
-        TString SerializedProgram;
+        NMiniKQL::TSerializedProgram SerializedProgram;
         std::shared_ptr<NMiniKQL::TPatternCacheEntry> Entry;
     };
 

+ 13 - 13
ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp

@@ -33,7 +33,7 @@ public:
         return CurrentPatternsCompiledCodeSizeInBytes;
     }
 
-    std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) {
+    std::shared_ptr<TPatternCacheEntry>* Find(const TSerializedProgram& serializedProgram) {
         auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
         if (it == SerializedProgramToPatternCacheHolder.end()) {
             return nullptr;
@@ -44,7 +44,7 @@ public:
         return &it->second.Entry;
     }
 
-    void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
+    void Insert(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
         auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct,
             std::forward_as_tuple(serializedProgram),
             std::forward_as_tuple(serializedProgram, entry));
@@ -69,7 +69,7 @@ public:
         ClearIfNeeded();
     }
 
-    void NotifyPatternCompiled(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
+    void NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
         auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
         if (it == SerializedProgramToPatternCacheHolder.end()) {
             return;
@@ -108,7 +108,7 @@ private:
       * Most recently accessed items are in back of the lists, least recently accessed items are in front of the lists.
       */
     struct TPatternCacheHolder : public TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>, TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag> {
-        TPatternCacheHolder(TString serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
+        TPatternCacheHolder(TSerializedProgram serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
             : SerializedProgram(std::move(serializedProgram))
             , Entry(std::move(entry))
         {}
@@ -121,7 +121,7 @@ private:
             return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty();
         }
 
-        TString SerializedProgram;
+        TSerializedProgram SerializedProgram;
         std::shared_ptr<TPatternCacheEntry> Entry;
     };
 
@@ -195,7 +195,7 @@ private:
     size_t CurrentCompiledPatternsSize = 0;
     size_t CurrentPatternsCompiledCodeSizeInBytes = 0;
 
-    THashMap<TString, TPatternCacheHolder> SerializedProgramToPatternCacheHolder;
+    THashMap<TSerializedProgram, TPatternCacheHolder> SerializedProgramToPatternCacheHolder;
     TIntrusiveList<TPatternCacheHolder, TPatternLRUListTag> LRUPatternList;
     TIntrusiveList<TPatternCacheHolder, TCompiledPatternLRUListTag> LRUCompiledPatternList;
 };
@@ -223,7 +223,7 @@ TComputationPatternLRUCache::~TComputationPatternLRUCache() {
     CleanCache();
 }
 
-std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) {
+std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TSerializedProgram& serializedProgram) {
     std::lock_guard<std::mutex> lock(Mutex);
     if (auto it = Cache->Find(serializedProgram)) {
         ++*Hits;
@@ -238,7 +238,7 @@ std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TStr
     return {};
 }
 
-TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
+TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TSerializedProgram& serializedProgram) {
     std::lock_guard lock(Mutex);
     if (auto it = Cache->Find(serializedProgram)) {
         ++*Hits;
@@ -263,7 +263,7 @@ TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscrib
     return TTicket(serializedProgram, false, promise, nullptr);
 }
 
-void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
+void TComputationPatternLRUCache::EmplacePattern(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
     Y_DEBUG_ABORT_UNLESS(patternWithEnv && patternWithEnv->Pattern);
     TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
 
@@ -290,7 +290,7 @@ void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgra
     }
 }
 
-void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
+void TComputationPatternLRUCache::NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
     std::lock_guard lock(Mutex);
     Cache->NotifyPatternCompiled(serializedProgram, patternWithEnv);
 }
@@ -309,7 +309,7 @@ void TComputationPatternLRUCache::CleanCache() {
     Cache->Clear();
 }
 
-void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
+void TComputationPatternLRUCache::AccessPattern(const TSerializedProgram & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
     if (!Configuration.PatternAccessTimesBeforeTryToCompile || entry->Pattern->IsCompiled()) {
         return;
     }
@@ -321,11 +321,11 @@ void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgra
     }
 }
 
-void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
+void TComputationPatternLRUCache::NotifyMissing(const TSerializedProgram& serializedProgram) {
     TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
     {
         std::lock_guard<std::mutex> lock(Mutex);
-        auto notifyIt = Notify.find(serialized);
+        auto notifyIt = Notify.find(serializedProgram);
         if (notifyIt != Notify.end()) {
             subscribers.swap(notifyIt->second);
             Notify.erase(notifyIt);

+ 14 - 13
ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h

@@ -1,6 +1,7 @@
 #pragma once
 
 #include "mkql_computation_node.h"
+#include "mkql_serialized_program.h"
 
 #include <ydb/library/yql/minikql/mkql_node.h>
 #include <library/cpp/threading/future/future.h>
@@ -57,8 +58,8 @@ class TComputationPatternLRUCache {
 public:
     class TTicket : private TNonCopyable {
     public:
-        TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
-            : Serialized(serialized)
+        TTicket(const TSerializedProgram& serializedProgram, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
+            : SerializedProgram(serializedProgram)
             , IsOwned(isOwned)
             , Future(future)
             , Cache(cache)
@@ -66,7 +67,7 @@ public:
 
         ~TTicket() {
             if (Cache) {
-                Cache->NotifyMissing(Serialized);
+                Cache->NotifyMissing(SerializedProgram);
             }
         }
 
@@ -84,7 +85,7 @@ public:
         }
 
     private:
-        const TString Serialized;
+        const TSerializedProgram SerializedProgram;
         const bool IsOwned;
         const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future;
         TComputationPatternLRUCache* Cache;
@@ -124,13 +125,13 @@ public:
         return std::make_shared<TPatternCacheEntry>(useAlloc);
     }
 
-    std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram);
+    std::shared_ptr<TPatternCacheEntry> Find(const TSerializedProgram& serializedProgram);
 
-    TTicket FindOrSubscribe(const TString& serializedProgram);
+    TTicket FindOrSubscribe(const TSerializedProgram& serializedProgram);
 
-    void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
+    void EmplacePattern(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
 
-    void NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
+    void NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
 
     size_t GetSize() const;
 
@@ -159,27 +160,27 @@ public:
         return PatternsToCompile.size();
     }
 
-    void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) {
+    void GetPatternsToCompile(THashMap<TSerializedProgram, std::shared_ptr<TPatternCacheEntry>> & result) {
         std::lock_guard lock(Mutex);
         result.swap(PatternsToCompile);
     }
 
 private:
-    void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
+    void AccessPattern(const TSerializedProgram & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
 
-    void NotifyMissing(const TString& serialized);
+    void NotifyMissing(const TSerializedProgram& serializedProgram);
 
     static constexpr size_t CacheMaxElementsSize = 10000;
 
     friend class TTicket;
 
     mutable std::mutex Mutex;
-    THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
+    THashMap<TSerializedProgram, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
 
     class TLRUPatternCacheImpl;
     std::unique_ptr<TLRUPatternCacheImpl> Cache;
 
-    THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
+    THashMap<TSerializedProgram, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
 
     const Config Configuration;
 

+ 49 - 0
ydb/library/yql/minikql/computation/mkql_serialized_program.h

@@ -0,0 +1,49 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <util/generic/hash.h>
+
+namespace NKikimr::NMiniKQL {
+
+/// Serialized program with precomputed hash
+class TSerializedProgram
+{
+public:
+    TSerializedProgram(TString data)
+        : Data(std::move(data))
+        , Hash(THash<TString>()(data))
+    {}
+
+    const TString & GetData() const
+    {
+        return Data;
+    }
+
+    uint64_t GetHash() const
+    {
+        return Hash;
+    }
+
+    friend bool operator==(const TSerializedProgram & lhs, const TSerializedProgram & rhs)
+    {
+        return lhs.Hash == rhs.Hash && lhs.Data == rhs.Data;
+    }
+
+    friend bool operator!=(const TSerializedProgram & lhs, const TSerializedProgram & rhs)
+    {
+        return !(lhs == rhs);
+    }
+
+private:
+    TString Data;
+    ui64 Hash;
+};
+
+}
+
+template<>
+struct THash<NKikimr::NMiniKQL::TSerializedProgram> {
+    inline ui64 operator()(const NKikimr::NMiniKQL::TSerializedProgram& serializedProgram) const noexcept {
+        return serializedProgram.GetHash();
+    }
+};