Browse Source

fix use after free part 1

gvit 1 year ago
parent
commit
995eb3873b

+ 3 - 3
ydb/core/kqp/executer_actor/kqp_executer_impl.h

@@ -991,9 +991,9 @@ protected:
         IActor* proxy;
         if (txResult.IsStream) {
             proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType,
-                txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats.get(), this->SelfId());
+                txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats, this->SelfId());
         } else {
-            proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), this->SelfId(),
+            proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats, this->SelfId(),
                 channel.DstInputIndex, ResponseEv.get());
         }
 
@@ -1070,7 +1070,7 @@ protected:
     const TString Database;
     const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
     TKqpRequestCounters::TPtr Counters;
-    std::unique_ptr<TQueryExecutionStats> Stats;
+    std::shared_ptr<TQueryExecutionStats> Stats;
     TInstant StartTime;
     TMaybe<TInstant> Deadline;
     TMaybe<TInstant> CancelAt;

+ 2 - 2
ydb/core/kqp/executer_actor/kqp_executer_stats.h

@@ -25,8 +25,8 @@ public:
     // basic stats
     std::unordered_set<ui64> AffectedShards;
     ui32 TotalTasks = 0;
-    ui64 ResultBytes = 0;
-    ui64 ResultRows = 0;
+    std::atomic<ui64> ResultBytes = 0;
+    std::atomic<ui64> ResultRows = 0;    
     TDuration ExecuterCpuTime;
 
     TInstant StartTs;

+ 11 - 11
ydb/core/kqp/executer_actor/kqp_result_channel.cpp

@@ -33,11 +33,11 @@ public:
         return NKikimrServices::TActivity::KQP_RESULT_CHANNEL_PROXY;
     }
 
-    TResultCommonChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, TActorId executer)
+    TResultCommonChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats, TActorId executer)
         : TActor(&TResultCommonChannelProxy::WorkState)
         , TxId(txId)
         , ChannelId(channelId)
-        , Stats(stats)
+        , Stats(std::move(stats))
         , Executer(executer) {}
 
 protected:
@@ -130,7 +130,7 @@ private:
 private:
     const ui64 TxId;
     const ui64 ChannelId;
-    TQueryExecutionStats* Stats; // owned by KqpExecuter
+    std::shared_ptr<TQueryExecutionStats> Stats; // owned by KqpExecuter
     const NActors::TActorId Executer;
     NActors::TActorId ComputeActor;
 };
@@ -138,9 +138,9 @@ private:
 class TResultStreamChannelProxy : public TResultCommonChannelProxy {
 public:
     TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
-        const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats,
+        const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, std::shared_ptr<TQueryExecutionStats> stats,
         TActorId executer)
-        : TResultCommonChannelProxy(txId, channelId, stats, executer)
+        : TResultCommonChannelProxy(txId, channelId, std::move(stats), executer)
         , ColumnOrder(columnOrder)
         , ItemType(itemType)
         , QueryResultIndex(queryResultIndex)
@@ -179,9 +179,9 @@ private:
 
 class TResultDataChannelProxy : public TResultCommonChannelProxy {
 public:
-    TResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, TActorId executer,
+    TResultDataChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats, TActorId executer,
         ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultReceiver)
-        : TResultCommonChannelProxy(txId, channelId, stats, executer)
+        : TResultCommonChannelProxy(txId, channelId, std::move(stats), executer)
         , InputIndex(inputIndex)
         , ResultReceiver(resultReceiver) {}
 
@@ -212,7 +212,7 @@ private:
 } // anonymous namespace end
 
 NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
-    const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats,
+    const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, std::shared_ptr<TQueryExecutionStats> stats,
     TActorId executer)
 {
     LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
@@ -221,11 +221,11 @@ NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKiki
     );
 
     return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, queryResultIndex, target,
-        stats, executer);
+        std::move(stats), executer);
 }
 
 NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId,
-    TQueryExecutionStats* stats, TActorId executer,
+    std::shared_ptr<TQueryExecutionStats> stats, TActorId executer,
     ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultsReceiver)
 {
     LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
@@ -233,7 +233,7 @@ NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId,
         ", channelId: " << channelId
     );
 
-    return new TResultDataChannelProxy(txId, channelId, stats, executer, inputIndex, resultsReceiver);
+    return new TResultDataChannelProxy(txId, channelId, std::move(stats), executer, inputIndex, resultsReceiver);
 }
 
 } // namespace NKqp

+ 2 - 2
ydb/core/kqp/executer_actor/kqp_result_channel.h

@@ -26,10 +26,10 @@ struct TQueryExecutionStats;
 struct TKqpExecuterTxResult;
 
 NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
-    const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target, TQueryExecutionStats* stats,
+    const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target, std::shared_ptr<TQueryExecutionStats> stats,
     NActors::TActorId executer);
 
-NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats,
+NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats,
     NActors::TActorId executer, ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* receiver);
 
 } // namespace NKikimr::NKqp