Просмотр исходного кода

Restoring authorship annotation for <nohttp@yandex-team.ru>. Commit 2 of 2.

nohttp 3 лет назад
Родитель
Сommit
b168387799

+ 7 - 7
library/cpp/messagebus/remote_client_connection.cpp

@@ -67,7 +67,7 @@ void TRemoteClientConnection::TryConnect() {
 
     if (!WriterData.Channel) {
         if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {
-            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); 
+            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
             return;
         }
         LastConnectAttempt = now;
@@ -77,11 +77,11 @@ void TRemoteClientConnection::TryConnect() {
     }
 
     if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) {
-        // TryConnect is called from Writer::Act, which is called in cycle 
-        // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects. 
-        return; 
-    } 
- 
+        // TryConnect is called from Writer::Act, which is called in cycle
+        // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.
+        return;
+    }
+
     ++WriterData.Status.ConnectSyscalls;
 
     int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());
@@ -113,7 +113,7 @@ void TRemoteClientConnection::TryConnect() {
             WriterData.Status.Connected = false;
             WriterData.Status.ConnectError = err;
 
-            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); 
+            DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
         }
     }
 }

+ 3 - 3
library/cpp/messagebus/remote_connection.cpp

@@ -572,9 +572,9 @@ namespace NBus {
             BeforeTryWrite();
 
             WriterFillInFlight();
- 
+
             WriterGetReconnectQueue()->DequeueAllLikelyEmpty();
- 
+
             if (!WriterData.Status.Connected) {
                 TryConnect();
             } else {
@@ -584,7 +584,7 @@ namespace NBus {
                         GetWriterActor()->AddTaskFromActorLoop();
                         break;
                     }
- 
+
                     if (WriterData.State == WRITER_FILLING) {
                         WriterFillBuffer();
 

+ 1 - 1
library/cpp/messagebus/session.cpp

@@ -59,7 +59,7 @@ namespace NBus {
                 // '[]' and '[<address>' are errors.
                 return false;
             }
- 
+
             *hostName = host.substr(1, pos - 1);
 
             pos++;

+ 14 - 14
library/cpp/messagebus/session_impl.cpp

@@ -211,22 +211,22 @@ void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef
     }
 }
 
-size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const { 
-    TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false); 
-    if (!!conn) { 
-        return conn->GetConnectSyscallsNumForTest(); 
-    } else { 
-        return 0; 
-    } 
-} 
- 
+size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const {
+    TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
+    if (!!conn) {
+        return conn->GetConnectSyscallsNumForTest();
+    } else {
+        return 0;
+    }
+}
+
 void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
     Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
-    for (size_t i = 0; i < addrs.size(); ++i) { 
-        results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]); 
-    } 
-} 
- 
+    for (size_t i = 0; i < addrs.size(); ++i) {
+        results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);
+    }
+}
+
 void TBusSessionImpl::FillStatus() {
 }
 

+ 8 - 8
library/cpp/messagebus/test/helper/example.cpp

@@ -166,24 +166,24 @@ void TExampleClient::WaitReplies() {
     ResetCounters();
 }
 
-EMessageStatus TExampleClient::WaitForError() { 
+EMessageStatus TExampleClient::WaitForError() {
     WorkDone.WaitT(TDuration::Seconds(60));
 
     UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
     UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
     UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
     UNIT_ASSERT_VALUES_EQUAL(1, Errors);
-    EMessageStatus result = LastError; 
+    EMessageStatus result = LastError;
 
     ResetCounters();
-    return result; 
+    return result;
+}
+
+void TExampleClient::WaitForError(EMessageStatus status) {
+    EMessageStatus error = WaitForError();
+    UNIT_ASSERT_VALUES_EQUAL(status, error);
 }
 
-void TExampleClient::WaitForError(EMessageStatus status) { 
-    EMessageStatus error = WaitForError(); 
-    UNIT_ASSERT_VALUES_EQUAL(status, error); 
-} 
- 
 void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
     SendMessages(count, addr);
     WaitReplies();

+ 126 - 126
library/cpp/messagebus/test/ut/messagebus_ut.cpp

@@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
         THolder<TExampleServer> server;
 
         TBusClientSessionConfig clientConfig;
-        clientConfig.RetryInterval = 0; 
+        clientConfig.RetryInterval = 0;
         TExampleClient client(clientConfig);
 
         server.Reset(new TExampleServer(port, "TExampleServer 1"));
@@ -106,25 +106,25 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
         client.SendMessagesWaitReplies(17, serverAddr);
 
         server.Destroy();
- 
-        // Making the client to detect disconnection. 
-        client.SendMessages(1, serverAddr); 
-        EMessageStatus error = client.WaitForError(); 
-        if (error == MESSAGE_DELIVERY_FAILED) { 
-            client.SendMessages(1, serverAddr); 
-            error = client.WaitForError(); 
-        } 
-        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); 
- 
+
+        // Making the client to detect disconnection.
+        client.SendMessages(1, serverAddr);
+        EMessageStatus error = client.WaitForError();
+        if (error == MESSAGE_DELIVERY_FAILED) {
+            client.SendMessages(1, serverAddr);
+            error = client.WaitForError();
+        }
+        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);
+
         server.Reset(new TExampleServer(port, "TExampleServer 2"));
 
         client.SendMessagesWaitReplies(19, serverAddr);
     }
 
     struct TestNoServerImplClient: public TExampleClient {
-        TTestSync TestSync; 
-        int failures = 0; 
- 
+        TTestSync TestSync;
+        int failures = 0;
+
         template <typename... Args>
         TestNoServerImplClient(Args&&... args)
             : TExampleClient(std::forward<Args>(args)...)
@@ -132,48 +132,48 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
         }
 
         ~TestNoServerImplClient() override {
-            Session->Shutdown(); 
-        } 
- 
+            Session->Shutdown();
+        }
+
         void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
             Y_UNUSED(message);
- 
+
             Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());
- 
-            TestSync.CheckAndIncrement((failures++) * 2); 
-        } 
-    }; 
- 
-    void TestNoServerImpl(unsigned port, bool oneWay) { 
+
+            TestSync.CheckAndIncrement((failures++) * 2);
+        }
+    };
+
+    void TestNoServerImpl(unsigned port, bool oneWay) {
         TNetAddr noServerAddr("localhost", port);
 
-        TestNoServerImplClient client; 
- 
-        int count = 0; 
-        for (; count < 200; ++count) { 
-            EMessageStatus status; 
-            if (oneWay) { 
-                status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); 
-            } else { 
+        TestNoServerImplClient client;
+
+        int count = 0;
+        for (; count < 200; ++count) {
+            EMessageStatus status;
+            if (oneWay) {
+                status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
+            } else {
                 TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
-                status = client.Session->SendMessageAutoPtr(message, &noServerAddr); 
-            } 
- 
+                status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
+            }
+
             Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
- 
-            if (count == 0) { 
-                // lame way to wait until it is connected 
-                Sleep(TDuration::MilliSeconds(10)); 
-            } 
-            client.TestSync.WaitForAndIncrement(count * 2 + 1); 
-        } 
- 
+
+            if (count == 0) {
+                // lame way to wait until it is connected
+                Sleep(TDuration::MilliSeconds(10));
+            }
+            client.TestSync.WaitForAndIncrement(count * 2 + 1);
+        }
+
         client.TestSync.WaitForAndIncrement(count * 2);
-    } 
- 
-    void HangingServerImpl(unsigned port) { 
-        TNetAddr noServerAddr("localhost", port); 
- 
+    }
+
+    void HangingServerImpl(unsigned port) {
+        TNetAddr noServerAddr("localhost", port);
+
         TExampleClient client;
 
         int count = 0;
@@ -199,13 +199,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
 
         THangingServer server(0);
 
-        HangingServerImpl(server.GetPort()); 
+        HangingServerImpl(server.GetPort());
     }
 
     Y_UNIT_TEST(TestNoServer) {
         TObjectCountCheck objectCountCheck;
 
-        TestNoServerImpl(17, false); 
+        TestNoServerImpl(17, false);
     }
 
     Y_UNIT_TEST(PauseInput) {
@@ -746,20 +746,20 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
 
         TResetAfterSendMessageOneWayDuringShutdown client;
 
-        TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); 
-        EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); 
+        TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);
+        EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);
         UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
 
-        client.TestSync.WaitForAndIncrement(2); 
- 
+        client.TestSync.WaitForAndIncrement(2);
+
         client.Session->Shutdown();
 
-        ok = client.Session->SendMessageOneWay(message); 
+        ok = client.Session->SendMessageOneWay(message);
         Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data());
- 
-        // check reset is possible here 
-        message->Reset(); 
-        client.TestSync.CheckAndIncrement(3); 
+
+        // check reset is possible here
+        message->Reset();
+        client.TestSync.CheckAndIncrement(3);
 
         delete message;
     }
@@ -767,7 +767,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
     Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) {
         TObjectCountCheck objectCountCheck;
 
-        TestNoServerImpl(17, true); 
+        TestNoServerImpl(17, true);
     }
 
     struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
@@ -904,40 +904,40 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
                 Sync.WaitForAndIncrement(2);
             }
         }
- 
+
         void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
-            // We do not check for message errors in this test. 
-        } 
- 
+            // We do not check for message errors in this test.
+        }
+
         void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
-        } 
+        }
     };
 
     struct TOnConnectionEventServer: public TExampleServer {
-        TOnConnectionEventServer() 
+        TOnConnectionEventServer()
             : TExampleServer("TOnConnectionEventServer")
         {
         }
- 
+
         ~TOnConnectionEventServer() override {
-            Session->Shutdown(); 
-        } 
- 
+            Session->Shutdown();
+        }
+
         void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
-            // We do not check for server message errors in this test. 
-        } 
-    }; 
- 
+            // We do not check for server message errors in this test.
+        }
+    };
+
     Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) {
         TObjectCountCheck objectCountCheck;
 
-        TOnConnectionEventServer server; 
+        TOnConnectionEventServer server;
 
         TOnConnectionEventClient client;
 
         TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort());
 
-        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); 
+        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
 
         client.Sync.WaitForAndIncrement(1);
 
@@ -949,12 +949,12 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
     Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) {
         TObjectCountCheck objectCountCheck;
 
-        THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); 
+        THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);
 
         TOnConnectionEventClient client;
         TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort());
 
-        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); 
+        client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
 
         client.Sync.WaitForAndIncrement(1);
 
@@ -1059,66 +1059,66 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
         client.WaitReplies();
 
         server.WaitForOnMessageCount(test_msg_count);
-    }; 
- 
+    };
+
     Y_UNIT_TEST(TestConnectionAttempts) {
-        TObjectCountCheck objectCountCheck; 
- 
-        TNetAddr noServerAddr("localhost", 17); 
-        TBusClientSessionConfig clientConfig; 
+        TObjectCountCheck objectCountCheck;
+
+        TNetAddr noServerAddr("localhost", 17);
+        TBusClientSessionConfig clientConfig;
         clientConfig.RetryInterval = 100;
         TestNoServerImplClient client(clientConfig);
- 
-        int count = 0; 
-        for (; count < 10; ++count) { 
-            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), 
-                                                                      &noServerAddr); 
- 
+
+        int count = 0;
+        for (; count < 10; ++count) {
+            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+                                                                      &noServerAddr);
+
             Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
-            client.TestSync.WaitForAndIncrement(count * 2 + 1); 
- 
-            // First connection attempt is for connect call; second one is to get connect result. 
-            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); 
-        } 
-        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval)); 
-        for (; count < 10; ++count) { 
-            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), 
-                                                                      &noServerAddr); 
- 
+            client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+            // First connection attempt is for connect call; second one is to get connect result.
+            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+        }
+        Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval));
+        for (; count < 10; ++count) {
+            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+                                                                      &noServerAddr);
+
             Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
-            client.TestSync.WaitForAndIncrement(count * 2 + 1); 
- 
-            // First connection attempt is for connect call; second one is to get connect result. 
-            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4); 
-        } 
-    }; 
- 
+            client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+            // First connection attempt is for connect call; second one is to get connect result.
+            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4);
+        }
+    };
+
     Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) {
-        TObjectCountCheck objectCountCheck; 
- 
-        TNetAddr noServerAddr("localhost", 17); 
-        TBusClientSessionConfig clientConfig; 
+        TObjectCountCheck objectCountCheck;
+
+        TNetAddr noServerAddr("localhost", 17);
+        TBusClientSessionConfig clientConfig;
         clientConfig.RetryInterval = 100;
         clientConfig.ReconnectWhenIdle = false;
         TestNoServerImplClient client(clientConfig);
- 
-        int count = 0; 
-        for (; count < 10; ++count) { 
-            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), 
-                                                                      &noServerAddr); 
- 
+
+        int count = 0;
+        for (; count < 10; ++count) {
+            EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+                                                                      &noServerAddr);
+
             Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
-            client.TestSync.WaitForAndIncrement(count * 2 + 1); 
- 
-            // First connection attempt is for connect call; second one is to get connect result. 
-            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); 
-        } 
- 
+            client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+            // First connection attempt is for connect call; second one is to get connect result.
+            UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+        }
+
         Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
-        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); 
+        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
         Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
-        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); 
-    }; 
+        UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+    };
 
     Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) {
         TObjectCountCheck objectCountCheck;