Browse Source

Restoring authorship annotation for <nohttp@yandex-team.ru>. Commit 1 of 2.

nohttp 3 years ago
parent
commit
dbdd851418

+ 7 - 7
library/cpp/messagebus/remote_client_connection.cpp

@@ -67,7 +67,7 @@ void TRemoteClientConnection::TryConnect() {
 
     if (!WriterData.Channel) {
         if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {
-            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
+            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); 
             return;
         }
         LastConnectAttempt = now;
@@ -77,11 +77,11 @@ void TRemoteClientConnection::TryConnect() {
     }
 
     if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) {
-        // TryConnect is called from Writer::Act, which is called in cycle
-        // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.
-        return;
-    }
-
+        // TryConnect is called from Writer::Act, which is called in cycle 
+        // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects. 
+        return; 
+    } 
+ 
     ++WriterData.Status.ConnectSyscalls;
 
     int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());
@@ -113,7 +113,7 @@ void TRemoteClientConnection::TryConnect() {
             WriterData.Status.Connected = false;
             WriterData.Status.ConnectError = err;
 
-            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
+            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); 
         }
     }
 }

+ 3 - 3
library/cpp/messagebus/remote_connection.cpp

@@ -572,9 +572,9 @@ namespace NBus {
             BeforeTryWrite();
 
             WriterFillInFlight();
-
+ 
             WriterGetReconnectQueue()->DequeueAllLikelyEmpty();
-
+ 
             if (!WriterData.Status.Connected) {
                 TryConnect();
             } else {
@@ -584,7 +584,7 @@ namespace NBus {
                         GetWriterActor()->AddTaskFromActorLoop();
                         break;
                     }
-
+ 
                     if (WriterData.State == WRITER_FILLING) {
                         WriterFillBuffer();
 

+ 1 - 1
library/cpp/messagebus/session.cpp

@@ -59,7 +59,7 @@ namespace NBus {
                 // '[]' and '[<address>' are errors.
                 return false;
             }
-
+ 
             *hostName = host.substr(1, pos - 1);
 
             pos++;

+ 14 - 14
library/cpp/messagebus/session_impl.cpp

@@ -211,22 +211,22 @@ void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef
     }
 }
 
-size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const {
-    TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
-    if (!!conn) {
-        return conn->GetConnectSyscallsNumForTest();
-    } else {
-        return 0;
-    }
-}
-
+size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const { 
+    TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false); 
+    if (!!conn) { 
+        return conn->GetConnectSyscallsNumForTest(); 
+    } else { 
+        return 0; 
+    } 
+} 
+ 
 void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
     Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
-    for (size_t i = 0; i < addrs.size(); ++i) {
-        results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);
-    }
-}
-
+    for (size_t i = 0; i < addrs.size(); ++i) { 
+        results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]); 
+    } 
+} 
+ 
 void TBusSessionImpl::FillStatus() {
 }
 

+ 8 - 8
library/cpp/messagebus/test/helper/example.cpp

@@ -166,24 +166,24 @@ void TExampleClient::WaitReplies() {
     ResetCounters();
 }
 
-EMessageStatus TExampleClient::WaitForError() {
+EMessageStatus TExampleClient::WaitForError() { 
     WorkDone.WaitT(TDuration::Seconds(60));
 
     UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
     UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
     UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
     UNIT_ASSERT_VALUES_EQUAL(1, Errors);
-    EMessageStatus result = LastError;
+    EMessageStatus result = LastError; 
 
     ResetCounters();
-    return result;
-}
-
-void TExampleClient::WaitForError(EMessageStatus status) {
-    EMessageStatus error = WaitForError();
-    UNIT_ASSERT_VALUES_EQUAL(status, error);
+    return result; 
 }
 
+void TExampleClient::WaitForError(EMessageStatus status) { 
+    EMessageStatus error = WaitForError(); 
+    UNIT_ASSERT_VALUES_EQUAL(status, error); 
+} 
+ 
 void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
     SendMessages(count, addr);
     WaitReplies();

+ 126 - 126
library/cpp/messagebus/test/ut/messagebus_ut.cpp

@@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
         THolder<TExampleServer> server;
 
         TBusClientSessionConfig clientConfig;
-        clientConfig.RetryInterval = 0;
+        clientConfig.RetryInterval = 0; 
         TExampleClient client(clientConfig);
 
         server.Reset(new TExampleServer(port, "TExampleServer 1"));
@@ -106,25 +106,25 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
         client.SendMessagesWaitReplies(17, serverAddr);
 
         server.Destroy();
-
-        // Making the client to detect disconnection.
-        client.SendMessages(1, serverAddr);
-        EMessageStatus error = client.WaitForError();
-        if (error == MESSAGE_DELIVERY_FAILED) {
-            client.SendMessages(1, serverAddr);
-            error = client.WaitForError();
-        }
-        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);
-
+ 
+        // Making the client to detect disconnection. 
+        client.SendMessages(1, serverAddr); 
+        EMessageStatus error = client.WaitForError(); 
+        if (error == MESSAGE_DELIVERY_FAILED) { 
+            client.SendMessages(1, serverAddr); 
+            error = client.WaitForError(); 
+        } 
+        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); 
+ 
         server.Reset(new TExampleServer(port, "TExampleServer 2"));
 
         client.SendMessagesWaitReplies(19, serverAddr);
     }
 
     struct TestNoServerImplClient: public TExampleClient {
-        TTestSync TestSync;
-        int failures = 0;
-
+        TTestSync TestSync; 
+        int failures = 0; 
+ 
         template <typename... Args>
         TestNoServerImplClient(Args&&... args)
             : TExampleClient(std::forward<Args>(args)...)
@@ -132,48 +132,48 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
         }
 
         ~TestNoServerImplClient() override {
-            Session->Shutdown();
-        }
-
+            Session->Shutdown(); 
+        } 
+ 
         void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
             Y_UNUSED(message);
-
+ 
             Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());
-
-            TestSync.CheckAndIncrement((failures++) * 2);
-        }
-    };
-
-    void TestNoServerImpl(unsigned port, bool oneWay) {
+ 
+            TestSync.CheckAndIncrement((failures++) * 2); 
+        } 
+    }; 
+ 
+    void TestNoServerImpl(unsigned port, bool oneWay) { 
         TNetAddr noServerAddr("localhost", port);
 
-        TestNoServerImplClient client;
-
-        int count = 0;
-        for (; count < 200; ++count) {
-            EMessageStatus status;
-            if (oneWay) {
-                status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
-            } else {
+        TestNoServerImplClient client; 
+ 
+        int count = 0; 
+        for (; count < 200; ++count) { 
+            EMessageStatus status; 
+            if (oneWay) { 
+                status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); 
+            } else { 
                 TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
-                status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
-            }
-
+                status = client.Session->SendMessageAutoPtr(message, &noServerAddr); 
+            } 
+ 
             Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
-
-            if (count == 0) {
-                // lame way to wait until it is connected
-                Sleep(TDuration::MilliSeconds(10));
-            }
-            client.TestSync.WaitForAndIncrement(count * 2 + 1);
-        }
-
+ 
+            if (count == 0) { 
+                // lame way to wait until it is connected 
+                Sleep(TDuration::MilliSeconds(10)); 
+            } 
+            client.TestSync.WaitForAndIncrement(count * 2 + 1); 
+        } 
+ 
         client.TestSync.WaitForAndIncrement(count * 2);
-    }
-
-    void HangingServerImpl(unsigned port) {
-        TNetAddr noServerAddr("localhost", port);
-
+    } 
+ 
+    void HangingServerImpl(unsigned port) { 
+        TNetAddr noServerAddr("localhost", port); 
+ 
         TExampleClient client;
 
         int count = 0;
@@ -199,13 +199,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
 
         THangingServer server(0);
 
-        HangingServerImpl(server.GetPort());
+        HangingServerImpl(server.GetPort()); 
     }
 
     Y_UNIT_TEST(TestNoServer) {
         TObjectCountCheck objectCountCheck;
 
-        TestNoServerImpl(17, false);
+        TestNoServerImpl(17, false); 
     }
 
     Y_UNIT_TEST(PauseInput) {
@@ -746,20 +746,20 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
 
         TResetAfterSendMessageOneWayDuringShutdown client;
 
-        TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);
-        EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);
+        TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); 
+        EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); 
         UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
 
-        client.TestSync.WaitForAndIncrement(2);
-
+        client.TestSync.WaitForAndIncrement(2); 
+ 
         client.Session->Shutdown();
 
-        ok = client.Session->SendMessageOneWay(message);
+        ok = client.Session->SendMessageOneWay(message); 
         Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data());
-
-        // check reset is possible here
-        message->Reset();
-        client.TestSync.CheckAndIncrement(3);
+ 
+        // check reset is possible here 
+        message->Reset(); 
+        client.TestSync.CheckAndIncrement(3); 
 
         delete message;
     }
@@ -767,7 +767,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
     Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) {
         TObjectCountCheck objectCountCheck;
 
-        TestNoServerImpl(17, true);
+        TestNoServerImpl(17, true); 
     }
 
     struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
@@ -904,40 +904,40 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
                 Sync.WaitForAndIncrement(2);
             }
         }
-
+ 
         void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
-            // We do not check for message errors in this test.
-        }
-
+            // We do not check for message errors in this test. 
+        } 
+ 
         void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
-        }
+        } 
     };
 
     struct TOnConnectionEventServer: public TExampleServer {
-        TOnConnectionEventServer()
+        TOnConnectionEventServer() 
             : TExampleServer("TOnConnectionEventServer")
         {
         }
-
+ 
         ~TOnConnectionEventServer() override {
-            Session->Shutdown();
-        }
-
+            Session->Shutdown(); 
+        } 
+ 
         void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
-            // We do not check for server message errors in this test.
-        }
-    };
-
+            // We do not check for server message errors in this test. 
+        } 
+    }; 
+ 
     Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) {
         TObjectCountCheck objectCountCheck;
 
-        TOnConnectionEventServer server;
+        TOnConnectionEventServer server; 
 
         TOnConnectionEventClient client;
 
         TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort());
 
-        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
+        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); 
 
         client.Sync.WaitForAndIncrement(1);
 
@@ -949,12 +949,12 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
     Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) {
         TObjectCountCheck objectCountCheck;
 
-        THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);
+        THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); 
 
         TOnConnectionEventClient client;
         TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort());
 
-        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
+        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); 
 
         client.Sync.WaitForAndIncrement(1);
 
@@ -1059,66 +1059,66 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
         client.WaitReplies();
 
         server.WaitForOnMessageCount(test_msg_count);
-    };
-
+    }; 
+ 
     Y_UNIT_TEST(TestConnectionAttempts) {
-        TObjectCountCheck objectCountCheck;
-
-        TNetAddr noServerAddr("localhost", 17);
-        TBusClientSessionConfig clientConfig;
+        TObjectCountCheck objectCountCheck; 
+ 
+        TNetAddr noServerAddr("localhost", 17); 
+        TBusClientSessionConfig clientConfig; 
         clientConfig.RetryInterval = 100;
         TestNoServerImplClient client(clientConfig);
-
-        int count = 0;
-        for (; count < 10; ++count) {
-            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
-                                                                      &noServerAddr);
-
+ 
+        int count = 0; 
+        for (; count < 10; ++count) { 
+            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), 
+                                                                      &noServerAddr); 
+ 
             Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
-            client.TestSync.WaitForAndIncrement(count * 2 + 1);
-
-            // First connection attempt is for connect call; second one is to get connect result.
-            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
-        }
-        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval));
-        for (; count < 10; ++count) {
-            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
-                                                                      &noServerAddr);
-
+            client.TestSync.WaitForAndIncrement(count * 2 + 1); 
+ 
+            // First connection attempt is for connect call; second one is to get connect result. 
+            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); 
+        } 
+        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval)); 
+        for (; count < 10; ++count) { 
+            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), 
+                                                                      &noServerAddr); 
+ 
             Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
-            client.TestSync.WaitForAndIncrement(count * 2 + 1);
-
-            // First connection attempt is for connect call; second one is to get connect result.
-            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4);
-        }
-    };
-
+            client.TestSync.WaitForAndIncrement(count * 2 + 1); 
+ 
+            // First connection attempt is for connect call; second one is to get connect result. 
+            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4); 
+        } 
+    }; 
+ 
     Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) {
-        TObjectCountCheck objectCountCheck;
-
-        TNetAddr noServerAddr("localhost", 17);
-        TBusClientSessionConfig clientConfig;
+        TObjectCountCheck objectCountCheck; 
+ 
+        TNetAddr noServerAddr("localhost", 17); 
+        TBusClientSessionConfig clientConfig; 
         clientConfig.RetryInterval = 100;
         clientConfig.ReconnectWhenIdle = false;
         TestNoServerImplClient client(clientConfig);
-
-        int count = 0;
-        for (; count < 10; ++count) {
-            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
-                                                                      &noServerAddr);
-
+ 
+        int count = 0; 
+        for (; count < 10; ++count) { 
+            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), 
+                                                                      &noServerAddr); 
+ 
             Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
-            client.TestSync.WaitForAndIncrement(count * 2 + 1);
-
-            // First connection attempt is for connect call; second one is to get connect result.
-            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
-        }
-
+            client.TestSync.WaitForAndIncrement(count * 2 + 1); 
+ 
+            // First connection attempt is for connect call; second one is to get connect result. 
+            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); 
+        } 
+ 
         Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
-        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); 
         Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
-        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
-    };
+        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); 
+    }; 
 
     Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) {
         TObjectCountCheck objectCountCheck;