Browse Source

Queue agent cosmetics

akozhikhov 1 year ago
parent
commit
14eab774d5

+ 2 - 2
yt/yt/client/queue_client/common.h

@@ -16,8 +16,8 @@ struct TCrossClusterReference
     TString Cluster;
     NYPath::TYPath Path;
 
-    bool operator ==(const TCrossClusterReference& other) const;
-    bool operator <(const TCrossClusterReference& other) const;
+    bool operator==(const TCrossClusterReference& other) const;
+    bool operator<(const TCrossClusterReference& other) const;
 
     operator NYPath::TRichYPath() const;
 

+ 5 - 2
yt/yt/client/queue_client/config.cpp

@@ -37,7 +37,9 @@ void TQueueAutoTrimConfig::Register(TRegistrar registrar)
         .Default();
 
     registrar.Postprocessor([] (TThis* trimConfig) {
-        if (trimConfig->RetainedLifetimeDuration && trimConfig->RetainedLifetimeDuration->GetValue() % TDuration::Seconds(1).GetValue() != 0) {
+        if (trimConfig->RetainedLifetimeDuration &&
+            trimConfig->RetainedLifetimeDuration->GetValue() % TDuration::Seconds(1).GetValue() != 0)
+        {
             THROW_ERROR_EXCEPTION("The value of \"retained_lifetime_duration\" must be a multiple of 1000 (1 second)");
         }
     });
@@ -45,7 +47,8 @@ void TQueueAutoTrimConfig::Register(TRegistrar registrar)
 
 bool operator==(const TQueueAutoTrimConfig& lhs, const TQueueAutoTrimConfig& rhs)
 {
-    return std::tie(lhs.Enable, lhs.RetainedRows, lhs.RetainedLifetimeDuration) == std::tie(rhs.Enable, rhs.RetainedRows, rhs.RetainedLifetimeDuration);
+    return std::tie(lhs.Enable, lhs.RetainedRows, lhs.RetainedLifetimeDuration) ==
+        std::tie(rhs.Enable, rhs.RetainedRows, rhs.RetainedLifetimeDuration);
 }
 
 ////////////////////////////////////////////////////////////////////////////////

+ 0 - 1
yt/yt/client/queue_client/config.h

@@ -34,7 +34,6 @@ DEFINE_REFCOUNTED_TYPE(TPartitionReaderConfig)
 
 ////////////////////////////////////////////////////////////////////////////////
 
-
 //! Automatic trimming configuration for a single queue.
 //!
 //! All rows up to the smallest offset among vital consumers are considered trimmable.

+ 2 - 8
yt/yt/client/queue_client/consumer_client.cpp

@@ -73,10 +73,6 @@ public:
         , OffsetColumnId_(NameTable_->GetId(OffsetColumnName_))
         , SubConsumerColumnFilter_{PartitionIndexColumnId_, OffsetColumnId_}
         , DecrementOffset_(decrementOffset)
-        , SubConsumerSchema_(New<TTableSchema>(std::vector<TColumnSchema>{
-            TColumnSchema(TString(PartitionIndexColumnName_), EValueType::Uint64, ESortOrder::Ascending),
-            TColumnSchema(TString(OffsetColumnName_), EValueType::Uint64),
-        }, /*strict*/ true, /*uniqueKeys*/ true))
     {
         if (RowPrefix_.GetCount() == 0) {
             RowPrefixCondition_ = "1 = 1";
@@ -338,8 +334,6 @@ private:
     //! should be set to true.
     bool DecrementOffset_ = false;
 
-    TTableSchemaPtr SubConsumerSchema_;
-
     std::vector<TPartitionInfo> DoCollectPartitions(
         const IClientPtr& client,
         const TString& selectQuery,
@@ -370,7 +364,6 @@ private:
 
         std::vector<ui64> partitionIndices;
         for (auto row : selectRowsResult.Rowset->GetRows()) {
-
             YT_VERIFY(row.GetCount() == 2);
 
             const auto& partitionIndexValue = row[*partitionIndexRowsetColumnId];
@@ -514,7 +507,8 @@ public:
     }
 
 private:
-    TYPath Path_;
+    const TYPath Path_;
+
     static const TNameTablePtr NameTable_;
     static const int QueueClusterColumnId_;
     static const int QueuePathColumnId_;

+ 14 - 9
yt/yt/client/queue_client/partition_reader.cpp

@@ -59,8 +59,9 @@ public:
 private:
     const TPartitionReaderConfigPtr Config_;
     const IClientPtr Client_;
-    TYPath ConsumerPath_;
-    int PartitionIndex_;
+    const TYPath ConsumerPath_;
+    const int PartitionIndex_;
+
     NLogging::TLogger Logger;
 
     bool Opened_ = false;
@@ -127,9 +128,9 @@ private:
         }
 
     private:
-        IQueueRowsetPtr Rowset_;
-        TWeakPtr<TPartitionReader> PartitionReader_;
-        i64 CurrentOffset_;
+        const IQueueRowsetPtr Rowset_;
+        const TWeakPtr<TPartitionReader> PartitionReader_;
+        const i64 CurrentOffset_;
     };
 
     IPersistentQueueRowsetPtr DoRead()
@@ -299,6 +300,7 @@ private:
     const int PartitionIndex_;
     const TQueueRowBatchReadOptions RowBatchReadOptions_;
     const NLogging::TLogger Logger;
+
     TPullConsumerOptions PullConsumerOptions_;
 
     ISubConsumerClientPtr ConsumerClient_;
@@ -308,7 +310,10 @@ private:
         : public IPersistentQueueRowset
     {
     public:
-        TPersistentQueueRowset(IQueueRowsetPtr rowset, TWeakPtr<TMultiQueueConsumerPartitionReader> partitionReader, i64 currentOffset)
+        TPersistentQueueRowset(
+            IQueueRowsetPtr rowset,
+            TWeakPtr<TMultiQueueConsumerPartitionReader> partitionReader,
+            i64 currentOffset)
             : Rowset_(std::move(rowset))
             , PartitionReader_(std::move(partitionReader))
             , CurrentOffset_(currentOffset)
@@ -361,9 +366,9 @@ private:
         }
 
     private:
-        IQueueRowsetPtr Rowset_;
-        TWeakPtr<TMultiQueueConsumerPartitionReader> PartitionReader_;
-        i64 CurrentOffset_;
+        const IQueueRowsetPtr Rowset_;
+        const TWeakPtr<TMultiQueueConsumerPartitionReader> PartitionReader_;
+        const i64 CurrentOffset_;
     };
 
     IPersistentQueueRowsetPtr DoRead()