finally, allow more than one listener thread

With a single poller thread handling incoming connections, any OS scheduler latency on that thread's wakeup directly affects request timings. With oneshot poll events, many threads can poll on the same poller, and if one thread has stalled for some reason, another will pick up its work on the next incoming event (see the sketch after the list below). So:
 - make a vector of listener threads instead of a single one;
 - add the nListenerThreads option;
 - stop the request queues and listening sockets from the last thread to finish;
 - check the incoming options and set OneShotPoll if needed.
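
A minimal sketch of the oneshot idea, using plain epoll and hypothetical names (the real code goes through TSocketPoller): with EPOLLONESHOT, the kernel delivers each event to exactly one of the threads blocked in epoll_wait, and the handler re-arms the fd when it is done.

```cpp
// Sketch only: plain epoll, hypothetical names; shutdown handling omitted.
#include <sys/epoll.h>
#include <thread>
#include <vector>

// With EPOLLONESHOT the fd is disabled after an event fires, so the
// handling thread must re-arm it with EPOLL_CTL_MOD when it is done.
void Rearm(int epollFd, int fd) {
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLONESHOT;
    ev.data.fd = fd;
    epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ev);
}

void ListenerLoop(int epollFd) {
    for (;;) {
        epoll_event ev{};
        // Many threads block here on the same epoll fd; the kernel wakes
        // one of them per event, so a stalled thread just loses its turn.
        const int n = epoll_wait(epollFd, &ev, 1, /*timeout ms*/ 100);
        if (n <= 0) {
            continue; // timeout or EINTR; real code also checks a stop flag
        }
        // ... handle the connection (accept/read/write) ...
        Rearm(epollFd, ev.data.fd);
    }
}

void StartListeners(int epollFd, size_t nListenerThreads) {
    std::vector<std::thread> threads;
    for (size_t i = 0; i < nListenerThreads; ++i) {
        threads.emplace_back(ListenerLoop, epollFd);
    }
    for (auto& t : threads) {
        t.join();
    }
}
```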

There is a problem with removing connections on the MaxConnections limit or ExpirationTimeout: there is no simple way to safely remove items from epoll (https://lwn.net/Articles/520012/) when the event data holds raw pointers. Handle it via postponed deletion of the connection objects, waiting until all listener threads are ready to reenter the poller wait and no thread can still be using the deleted object (a condensed sketch follows the list):
 - close the socket immediately after removing it from the poller, but instead of destroying the TClientConnection right away, put it on a "pending delete" list;
 - add a cleanup state with a thread mask, where each bit means the corresponding thread still has to reenter the poller;
 - call the Cleanup routine before each poller wait; it clears the current thread's bit on every pending connection;
 - when the thread mask becomes all zeros, really delete the connection;
 - enforce a timeout on the poller wait, so that all threads are guaranteed to reenter;
 - add more configurations to some tests.
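
A condensed view of that protocol, with hypothetical simplified names (the real fields live in TClientConnection::TCleanupState in the diff below):

```cpp
#include <cstdint>

// Sketch of the cleanup protocol from the diff below, names simplified.
// One bit per listener thread; a pending connection may be deleted only
// after every thread has cleared its bit by reentering the poller wait,
// i.e. when no thread can still hold the raw pointer it got from epoll.
struct TPendingCleanup {
    uint64_t ThreadMask = 0;

    // Called once, right after the socket is closed and the connection
    // is removed from the poller: mark every listener thread as pending.
    void Schedule(uint32_t nListenerThreads) {
        // requires nListenerThreads < 64, matching the Y_ENSURE below
        ThreadMask = (uint64_t(1) << nListenerThreads) - 1;
    }

    // Called by thread `threadNum` just before it reenters the poller.
    // Returns true once the connection is safe to really delete.
    bool OnReenterPoller(size_t threadNum) {
        ThreadMask &= ~(uint64_t(1) << threadNum);
        return ThreadMask == 0;
    }
};
```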

There are no significant changes or overhead for the standard case with a single listener thread: cleanup and pending deletion are simply skipped. There is also no overhead for the common case where connection removal is rare. Here is the same review with nListenerThreads = 4 by default: https://a.yandex-team.ru/review/4413226.
kulikov · commit 3f2ddee8b1 · 1 year ago

library/cpp/http/server/http.cpp (+78 -20)

@@ -56,6 +56,7 @@ public:
     inline void DeActivate();
     inline void Reject();
 
+    void ScheduleDelete();
 public:
     TSocket Socket_;
     NAddr::IRemoteAddrRef ListenerSockAddrRef_;
@@ -64,6 +65,11 @@ public:
     TInstant LastUsed;
     TInstant AcceptMoment;
     size_t ReceivedRequests = 0;
+
+    struct TCleanupState {
+        ui64 ThreadMask = 0;
+        bool Closed = false;
+    } CleanupState_;
 };
 
 class THttpServer::TImpl {
@@ -90,6 +96,24 @@ public:
             }
         }
 
+        void Cleanup(size_t threadNum) {
+            if (Options.nListenerThreads < 2) {
+                return;
+            }
+
+            TIntrusiveListWithAutoDelete<TClientConnection, TDelete> toDelete;
+
+            {
+                TGuard<TMutex> g(Mutex_);
+
+                PendingDelete_.ForEach([&toDelete, threadNum](TClientConnection * conn) {
+                    if (!(conn->CleanupState_.ThreadMask &= ~((ui64)1 << threadNum))) {
+                        toDelete.PushBack(conn);
+                    }
+                });
+            }
+        }
+
 
         inline void Erase(TClientConnection* c, TInstant now) noexcept {
             TGuard<TMutex> g(Mutex_);
@@ -119,7 +143,14 @@ public:
                 return false;
             }
             EraseUnsafe(c);
-            delete c;
+
+            if (Options.nListenerThreads > 1) {
+                c->ScheduleDelete();
+                PendingDelete_.PushBack(c);
+            } else {
+                delete c;
+            }
+
             return true;
         }
 
@@ -133,6 +164,7 @@ public:
     public:
         TMutex Mutex_;
         TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
+        TIntrusiveListWithAutoDelete<TClientConnection, TDelete> PendingDelete_;
         TSocketPoller* Poller_ = nullptr;
         const THttpServerOptions& Options;
     };
@@ -207,10 +239,13 @@ public:
         Cb_->OnListenStart();
 
         try {
-            ListenThread.Reset(new TThread([this]() {
-                ListenSocket();
-            }));
-            ListenThread->Start();
+            RunningListeners_.store(Options_.nListenerThreads);
+            for (size_t i = 0; i < Options_.nListenerThreads; ++i) {
+                ListenThreads.push_back(MakeHolder<TThread>([this, threadNum = i]() {
+                    ListenSocket(threadNum);
+                }));
+                ListenThreads.back()->Start();
+            }
         } catch (const yexception&) {
             SaveErrorCode();
             return false;
@@ -219,24 +254,24 @@ public:
         return true;
     }
 
-    void JoinListenerThread() {
-        if (ListenThread) {
-            ListenThread->Join();
-            ListenThread.Reset(nullptr);
+    void JoinListenerThreads() {
+        while (!ListenThreads.empty()) {
+            ListenThreads.back()->Join();
+            ListenThreads.pop_back();
         }
     }
 
     void Wait() {
         Cb_->OnWait();
         TGuard<TMutex> g(StopMutex);
-        JoinListenerThread();
+        JoinListenerThreads();
     }
 
     void Stop() {
         Shutdown();
 
         TGuard<TMutex> g(StopMutex);
-        JoinListenerThread();
+        JoinListenerThreads();
 
         while (ConnectionCount) {
             usleep(10000);
@@ -334,7 +369,7 @@ public:
         NAddr::IRemoteAddrRef SockAddrRef_;
     };
 
-    void ListenSocket() {
+    void ListenSocket(size_t threadNum) {
         TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());
 
         TVector<void*> events;
@@ -343,6 +378,8 @@ public:
         TInstant now = TInstant::Now();
         for (;;) {
             try {
+                Connections->Cleanup(threadNum);
+
                 const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout;
                 const size_t ret = Poller->WaitD(events.data(), events.size(), deadline);
 
@@ -371,15 +408,17 @@ public:
             }
         }
 
-        while (!Reqs.Empty()) {
-            THolder<TListenSocket> ls(Reqs.PopFront());
+        if (0 == --RunningListeners_) {
+            while (!Reqs.Empty()) {
+                THolder<TListenSocket> ls(Reqs.PopFront());
 
-            Poller->Unwait(ls->GetSocket());
-        }
+                Poller->Unwait(ls->GetSocket());
+            }
 
-        Requests->Stop();
-        FailRequests->Stop();
-        Cb_->OnListenStop();
+            Requests->Stop();
+            FailRequests->Stop();
+            Cb_->OnListenStop();
+        }
     }
 
     void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
@@ -396,6 +435,16 @@ public:
         , Cb_(cb)
         , Parent_(parent)
     {
+        if (Options_.nListenerThreads > 1) {
+            Options_.OneShotPoll = true;
+
+            const auto minPollTimeout = TDuration::MilliSeconds(100);
+            if (!Options_.PollTimeout || Options_.PollTimeout > minPollTimeout) {
+                Options_.PollTimeout = minPollTimeout;
+            }
+
+            Y_ENSURE(Options_.nListenerThreads < 64);
+        }
     }
 
     TImpl(THttpServer* parent, ICallBack* cb, const TOptions& options, IThreadFactory* factory)
@@ -434,7 +483,8 @@ public:
         return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);
     }
 
-    THolder<TThread> ListenThread;
+    TVector<THolder<TThread>> ListenThreads;
+    std::atomic<size_t> RunningListeners_ = 0;
     TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
     TPipeHandle ListenWakeupReadFd;
     TPipeHandle ListenWakeupWriteFd;
@@ -558,7 +608,15 @@ TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv,
 }
 
 TClientConnection::~TClientConnection() {
+    if (!CleanupState_.Closed) {
+        HttpServ_->DecreaseConnections();
+    }
+}
+void TClientConnection::ScheduleDelete() {
+    Socket_.Close();
     HttpServ_->DecreaseConnections();
+    CleanupState_.ThreadMask = ((ui64)1 << HttpServ_->Options().nListenerThreads) - 1;
+    CleanupState_.Closed = true;
 }
 
 void TClientConnection::OnPollEvent(TInstant now) {

library/cpp/http/server/http_ut.cpp (+23 -4)

@@ -822,6 +822,25 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
         TVector<TCounters> Counters_;
     };
 
+    struct TTestConfig {
+        bool OneShot = false;
+        ui32 ListenerThreads = 1;
+    };
+
+    TVector<TTestConfig> testConfigs = {
+        {.OneShot = false, .ListenerThreads = 1},
+        {.OneShot = true, .ListenerThreads = 1},
+        {.OneShot = true, .ListenerThreads = 4},
+        {.OneShot = true, .ListenerThreads = 63},
+    };
+
+    THttpServer::TOptions ApplyConfig(const THttpServer::TOptions& opts, const TTestConfig& cfg) {
+        THttpServer::TOptions res = opts;
+        res.OneShotPoll = cfg.OneShot;
+        res.nListenerThreads = cfg.ListenerThreads;
+        return res;
+    }
+
     Y_UNIT_TEST(TestStartStop) {
         TPortManager pm;
         const ui16 port = pm.GetPort();
@@ -830,9 +849,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
         TShooter shooter(threadCount, port);
 
         TString res = TestData();
-        for (bool oneShot : {true, false}) {
+        for (const auto& cfg : testConfigs) {
             TEchoServer serverImpl(res);
-            THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetOneShotPoll(oneShot));
+            THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true), cfg));
             for (size_t i = 0; i < 100; ++i) {
                 UNIT_ASSERT(server.Start());
                 shooter.WaitProgress();
@@ -884,9 +903,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
 
         TString res = TestData();
 
-        for (bool oneShot : {true, false}) {
+        for (const auto& cfg : testConfigs) {
             TMaxConnServer serverImpl(res);
-            THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxConnections(maxConnections).SetOneShotPoll(oneShot));
+            THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxConnections(maxConnections), cfg));
 
             UNIT_ASSERT(server.Start());
 

library/cpp/http/server/options.h (+7 -0)

@@ -145,6 +145,12 @@ public:
         return *this;
     }
 
+    inline THttpServerOptions& SetListenerThreads(ui32 val) {
+        nListenerThreads = val;
+
+        return *this;
+    }
+
     struct TAddr {
         TString Addr;
         ui16 Port;
@@ -182,4 +188,5 @@ public:
     TString FailRequestsThreadName = "HttpServer";
 
     bool OneShotPoll = false;
+    ui32 nListenerThreads = 1;
 };