12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019 |
- #include "http.h"
- #include "http_ex.h"
- #include <library/cpp/testing/unittest/registar.h>
- #include <library/cpp/testing/unittest/tests_data.h>
- #include <util/generic/cast.h>
- #include <util/stream/output.h>
- #include <util/stream/zlib.h>
- #include <util/system/datetime.h>
- #include <util/system/mutex.h>
- #include <util/random/random.h>
- Y_UNIT_TEST_SUITE(THttpServerTest) {
- class TEchoServer: public THttpServer::ICallBack {
- class TRequest: public THttpClientRequestEx {
- public:
- inline TRequest(TEchoServer* parent)
- : Parent_(parent)
- {
- }
- bool Reply(void* /*tsr*/) override {
- if (!ProcessHeaders()) {
- return true;
- }
- Output() << "HTTP/1.1 200 Ok\r\n\r\n";
- if (Buf.Size()) {
- Output().Write(Buf.AsCharPtr(), Buf.Size());
- } else {
- Output() << Parent_->Res_;
- }
- Output().Finish();
- return true;
- }
- private:
- TEchoServer* Parent_ = nullptr;
- };
- public:
- inline TEchoServer(const TString& res)
- : Res_(res)
- {
- }
- TClientRequest* CreateClient() override {
- return new TRequest(this);
- }
- private:
- TString Res_;
- };
- class TSleepingServer: public THttpServer::ICallBack {
- class TReplier: public TRequestReplier {
- public:
- inline TReplier(TSleepingServer* server)
- : Server(server)
- {
- }
- bool BeforeParseRequestOk(void*) override {
- if (Server->Ttl && (TInstant::Now() - CreateTime > TDuration::MilliSeconds(Server->Ttl))) {
- Output().Write("HTTP/1.0 503 Created\nX-Server: sleeping server\n\nTTL Exceed");
- return false;
- } else {
- return true;
- }
- }
- bool DoReply(const TReplyParams& params) override {
- ++Server->Replies;
- with_lock (Server->Lock) {
- params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");
- params.Output.Finish();
- }
- return true;
- }
- using TClientRequest::Output;
- private:
- TSleepingServer* Server = nullptr;
- TInstant CreateTime = TInstant::Now();
- };
- public:
- TSleepingServer(size_t ttl = 0)
- : Ttl(ttl) {}
- TClientRequest* CreateClient() override {
- return new TReplier(this);
- }
- void OnMaxConn() override {
- ++MaxConns;
- }
- public:
- TMutex Lock;
- std::atomic<size_t> Replies;
- std::atomic<size_t> MaxConns;
- size_t Ttl;
- };
- static const TString CrLf = "\r\n";
- struct TTestRequest {
- TTestRequest(ui16 port, TString content = TString())
- : Port(port)
- , Content(std::move(content))
- {
- }
- void CheckContinue(TSocketInput& si) {
- if (Expect100Continue) {
- TStringStream ss;
- TString firstLine;
- si.ReadLine(firstLine);
- for (;;) {
- TString buf;
- si.ReadLine(buf);
- if (buf.size() == 0) {
- break;
- }
- ss << buf << CrLf;
- }
- UNIT_ASSERT_EQUAL(firstLine, "HTTP/1.1 100 Continue");
- }
- }
- TString Execute() {
- TSocket* s = nullptr;
- THolder<TSocket> singleReqSocket;
- if (KeepAliveConnection) {
- if (!KeepAlivedSocket) {
- KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10));
- }
- s = KeepAlivedSocket.Get();
- } else {
- TNetworkAddress addr("localhost", Port);
- singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10)));
- s = singleReqSocket.Get();
- }
- bool isPost = Type == "POST";
- TSocketInput si(*s);
- if (UseHttpOutput) {
- TSocketOutput so(*s);
- THttpOutput output(&so);
- output.EnableKeepAlive(KeepAliveConnection);
- output.EnableCompression(EnableResponseEncoding);
- TStringStream r;
- r << Type << " / HTTP/1.1" << CrLf;
- r << "Host: localhost:" + ToString(Port) << CrLf;
- if (isPost) {
- if (ContentEncoding.size()) {
- r << "Content-Encoding: " << ContentEncoding << CrLf;
- } else {
- r << "Transfer-Encoding: chunked" << CrLf;
- }
- if (Expect100Continue) {
- r << "Expect: 100-continue" << CrLf;
- }
- }
- r << CrLf;
- if (isPost) {
- output.Write(r.Str());
- output.Flush();
- CheckContinue(si);
- output.Write(Content);
- output.Finish();
- } else {
- output.Write(r.Str());
- output.Finish();
- }
- } else {
- TStringStream r;
- r << Type << " / HTTP/1.1" << CrLf;
- r << "Host: localhost:" + ToString(Port) << CrLf;
- if (KeepAliveConnection) {
- r << "Connection: Keep-Alive" << CrLf;
- } else {
- r << "Connection: Close" << CrLf;
- }
- if (EnableResponseEncoding) {
- r << "Accept-Encoding: gzip, deflate, x-gzip, x-deflate, y-lzo, y-lzf, y-lzq, y-bzip2, y-lzma" << CrLf;
- }
- if (isPost && Expect100Continue) {
- r << "Expect: 100-continue" << CrLf;
- }
- if (isPost && ContentEncoding.size() && Content.size()) {
- r << "Content-Encoding: " << ContentEncoding << CrLf;
- TStringStream compressedContent;
- {
- TZLibCompress zlib(&compressedContent);
- zlib.Write(Content.data(), Content.size());
- zlib.Flush();
- zlib.Finish();
- }
- r << "Content-Length: " << compressedContent.Size() << CrLf;
- r << CrLf;
- s->Send(r.Data(), r.Size());
- CheckContinue(si);
- Hdr = r.Str();
- TString tosend = compressedContent.Str();
- s->Send(tosend.data(), tosend.size());
- } else {
- if (isPost) {
- r << "Content-Length: " << Content.size() << CrLf;
- r << CrLf;
- s->Send(r.Data(), r.Size());
- CheckContinue(si);
- Hdr = r.Str();
- s->Send(Content.data(), Content.size());
- } else {
- r << CrLf;
- Hdr = r.Str();
- s->Send(r.Data(), r.Size());
- }
- }
- }
- THttpInput input(&si);
- TStringStream ss;
- TransferData(&input, &ss);
- return ss.Str();
- }
- TString GetDescription() const {
- if (UseHttpOutput) {
- TStringStream ss;
- ss << (KeepAliveConnection ? "keep-alive " : "") << Type;
- if (ContentEncoding.size()) {
- ss << " with encoding=" << ContentEncoding;
- }
- return ss.Str();
- } else {
- return Hdr;
- }
- }
- ui16 Port = 0;
- bool UseHttpOutput = true;
- TString Type = "GET";
- TString ContentEncoding;
- TString Content;
- bool KeepAliveConnection = false;
- THolder<TSocket> KeepAlivedSocket;
- bool EnableResponseEncoding = false;
- TString Hdr;
- bool Expect100Continue = false;
- };
- class TFailingMtpQueue: public TSimpleThreadPool {
- private:
- bool FailOnAdd_ = false;
- public:
- void SetFailOnAdd(bool fail = true) {
- FailOnAdd_ = fail;
- }
- [[nodiscard]] bool Add(IObjectInQueue* pObj) override {
- if (FailOnAdd_) {
- return false;
- }
- return TSimpleThreadPool::Add(pObj);
- }
- TFailingMtpQueue() = default;
- TFailingMtpQueue(IThreadFactory* pool)
- : TSimpleThreadPool(pool)
- {
- }
- };
- TString TestData(size_t size = 5 * 4096) {
- TString res;
- for (size_t i = 0; i < size; ++i) {
- res += (char)i;
- }
- return res;
- }
- Y_UNIT_TEST(TestEchoServer) {
- TString res = TestData();
- TPortManager pm;
- const ui16 port = pm.GetPort();
- const bool trueFalse[] = {true, false};
- TEchoServer serverImpl(res);
- THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
- for (int i = 0; i < 2; ++i) {
- UNIT_ASSERT(server.Start());
- TTestRequest r(port);
- r.Content = res;
- for (bool keepAlive : trueFalse) {
- r.KeepAliveConnection = keepAlive;
- // THttpOutput use chunked stream, else use Content-Length
- for (bool useHttpOutput : trueFalse) {
- r.UseHttpOutput = useHttpOutput;
- for (bool enableResponseEncoding : trueFalse) {
- r.EnableResponseEncoding = enableResponseEncoding;
- const TString reqTypes[] = {"GET", "POST"};
- for (const TString& reqType : reqTypes) {
- r.Type = reqType;
- const TString encoders[] = {"", "deflate"};
- for (const TString& encoder : encoders) {
- r.ContentEncoding = encoder;
- for (bool expect100Continue : trueFalse) {
- r.Expect100Continue = expect100Continue;
- TString resp = r.Execute();
- UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription());
- }
- }
- }
- }
- }
- }
- server.Stop();
- }
- }
- Y_UNIT_TEST(TestReusePortEnabled) {
- TString res = TestData();
- TPortManager pm;
- const ui16 port = pm.GetPort();
- TEchoServer serverImpl(res);
- TVector<THolder<THttpServer>> servers;
- for (ui32 i = 0; i < 10; i++) {
- servers.push_back(MakeHolder<THttpServer>(&serverImpl, THttpServer::TOptions(port).EnableReusePort(true)));
- }
- for (ui32 testRun = 0; testRun < 3; testRun++) {
- for (auto& server : servers) {
- // start servers one at a time and check at least one of them is replying
- UNIT_ASSERT(server->Start());
- TTestRequest r(port, res);
- UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription());
- }
- for (auto& server : servers) {
- // ping servers and stop them one at a time
- // at the last iteration only one server is still working and then gets stopped as well
- TTestRequest r(port, res);
- UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription());
- server->Stop();
- }
- }
- }
- Y_UNIT_TEST(TestReusePortDisabled) {
- // check that with the ReusePort option disabled it's impossible to start two servers on the same port
- // check that ReusePort option is disabled by default (don't set it explicitly in the test)
- TPortManager pm;
- const ui16 port = pm.GetPort();
- TEchoServer serverImpl(TString{});
- THttpServer server1(&serverImpl, THttpServer::TOptions(port));
- THttpServer server2(&serverImpl, THttpServer::TOptions(port));
- UNIT_ASSERT(true == server1.Start());
- UNIT_ASSERT(false == server2.Start());
- server1.Stop();
- // Stop() is a sync call, port should be free by now
- UNIT_ASSERT(true == server2.Start());
- UNIT_ASSERT(false == server1.Start());
- }
- Y_UNIT_TEST(TestFailServer) {
- /**
- * Emulate request processing failures
- * Data should be large enough not to fit into socket buffer
- **/
- TString res = TestData(10 * 1024 * 1024);
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
- TEchoServer serverImpl(res);
- THttpServer::TOptions options(port);
- options.EnableKeepAlive(true);
- options.EnableCompression(true);
- using TFailingServerMtpQueue = TThreadPoolBinder<TFailingMtpQueue, THttpServer::ICallBack>;
- THttpServer::TMtpQueueRef mainWorkers = new TFailingServerMtpQueue(&serverImpl, SystemThreadFactory());
- THttpServer::TMtpQueueRef failWorkers = new TThreadPool(SystemThreadFactory());
- THttpServer server(&serverImpl, mainWorkers, failWorkers, options);
- UNIT_ASSERT(server.Start());
- for (size_t i = 0; i < 3; ++i) {
- // should fail on 2nd request
- static_cast<TFailingMtpQueue*>(mainWorkers.Get())->SetFailOnAdd(i == 1);
- TTestRequest r(port);
- r.Content = res;
- r.Type = "POST";
- TString resp = r.Execute();
- if (i == 1) {
- UNIT_ASSERT(resp.Contains("Service Unavailable"));
- } else {
- UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription());
- }
- }
- server.Stop();
- }
- class TReleaseConnectionServer: public THttpServer::ICallBack {
- class TRequest: public THttpClientRequestEx {
- public:
- bool Reply(void* /*tsr*/) override {
- Output() << "HTTP/1.1 200 Ok\r\n\r\n";
- Output() << "reply";
- Output().Finish();
- ReleaseConnection();
- throw yexception() << "some error";
- return true;
- }
- };
- public:
- TClientRequest* CreateClient() override {
- return new TRequest();
- }
- void OnException() override {
- ExceptionMessage = CurrentExceptionMessage();
- }
- TString ExceptionMessage;
- };
- class TResetConnectionServer: public THttpServer::ICallBack {
- class TRequest: public TClientRequest {
- public:
- bool Reply(void* /*tsr*/) override {
- Output() << "HTTP/1.1";
- ResetConnection();
- return true;
- }
- };
- public:
- TClientRequest* CreateClient() override {
- return new TRequest();
- }
- void OnException() override {
- ExceptionMessage = CurrentExceptionMessage();
- }
- TString ExceptionMessage;
- };
- class TListenerSockAddrReplyServer: public THttpServer::ICallBack {
- class TRequest: public TClientRequest {
- public:
- bool Reply(void* /*tsr*/) override {
- Output() << "HTTP/1.1 200 Ok\r\n\r\n";
- Output() << PrintHostAndPort(*GetListenerSockAddrRef());
- Output().Finish();
- return true;
- }
- };
- public:
- TClientRequest* CreateClient() override {
- return new TRequest();
- }
- };
- Y_UNIT_TEST(TTestResetConnection) {
- TPortManager pm;
- const ui16 port = pm.GetPort();
- TResetConnectionServer serverImpl;
- THttpServer server(&serverImpl, THttpServer::TOptions(port));
- UNIT_ASSERT(server.Start());
- TTestRequest r(port, "request");
- UNIT_ASSERT_EXCEPTION_CONTAINS(r.Execute(), TSystemError, "Connection reset by peer");
- server.Stop();
- }
- Y_UNIT_TEST(TTestReleaseConnection) {
- TPortManager pm;
- const ui16 port = pm.GetPort();
- TReleaseConnectionServer serverImpl;
- THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true));
- UNIT_ASSERT(server.Start());
- TTestRequest r(port, "request");
- r.KeepAliveConnection = true;
- UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription());
- server.Stop();
- UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error");
- }
- THttpInput SendRequest(TSocket& socket, ui16 port) {
- TSocketInput si(socket);
- TSocketOutput so(socket);
- THttpOutput out(&so);
- out.EnableKeepAlive(true);
- out << "GET / HTTP/1.1" << CrLf;
- out << "Host: localhost:" + ToString(port) << CrLf;
- out << CrLf;
- out.Flush();
- THttpInput input(&si);
- input.ReadAll();
- return input;
- }
- THttpInput SendRequestWithBody(TSocket& socket, ui16 port, TString body) {
- TSocketInput si(socket);
- TSocketOutput so(socket);
- THttpOutput out(&so);
- out << "POST / HTTP/1.1" << CrLf;
- out << "Host: localhost:" + ToString(port) << CrLf;
- out << "Content-Length: " + ToString(body.size()) << CrLf;
- out << CrLf;
- out << body;
- out.Flush();
- THttpInput input(&si);
- input.ReadAll();
- return input;
- }
- Y_UNIT_TEST(TTestExpirationTimeout) {
- TPortManager pm;
- const ui16 port = pm.GetPort();
- TEchoServer serverImpl("test_data");
- THttpServer::TOptions options(port);
- options.nThreads = 1;
- options.MaxQueueSize = 0;
- options.MaxConnections = 0;
- options.KeepAliveEnabled = true;
- options.ExpirationTimeout = TDuration::Seconds(1);
- options.PollTimeout = TDuration::MilliSeconds(100);
- THttpServer server(&serverImpl, options);
- UNIT_ASSERT(server.Start());
- TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10));
- SendRequest(socket, port);
- SendRequest(socket, port);
- Sleep(TDuration::Seconds(5));
- UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException);
- server.Stop();
- }
- Y_UNIT_TEST(TTestContentLengthTooLarge) {
- TPortManager pm;
- const ui16 port = pm.GetPort();
- TEchoServer serverImpl("test_data");
- THttpServer::TOptions options(port);
- options.nThreads = 1;
- options.MaxQueueSize = 0;
- options.MaxInputContentLength = 2_KB;
- options.MaxConnections = 0;
- options.KeepAliveEnabled = false;
- options.ExpirationTimeout = TDuration::Seconds(1);
- options.PollTimeout = TDuration::MilliSeconds(100);
- THttpServer server(&serverImpl, options);
- UNIT_ASSERT(server.Start());
- TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(5));
- UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket, port, TString(1_KB, 'a')).FirstLine(), "HTTP/1.1 200 Ok");
- TSocket socket2(TNetworkAddress("localhost", port), TDuration::Seconds(5));
- UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket2, port, TString(10_KB, 'a')).FirstLine(), "HTTP/1.1 413 Payload Too Large");
- server.Stop();
- }
- Y_UNIT_TEST(TTestNullInRequest) {
- TPortManager pm;
- const ui16 port = pm.GetPort();
- TEchoServer serverImpl("test_data");
- THttpServer::TOptions options(port);
- options.nThreads = 1;
- options.MaxQueueSize = 0;
- options.MaxConnections = 0;
- options.KeepAliveEnabled = false;
- options.ExpirationTimeout = TDuration::Seconds(1);
- options.PollTimeout = TDuration::MilliSeconds(100);
- THttpServer server(&serverImpl, options);
- UNIT_ASSERT(server.Start());
- TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(5));
- TSocketInput si(socket);
- TSocketOutput so(socket);
- THttpOutput out(&so);
- out << "GET \0/ggg HTTP/1.1" << CrLf;
- out << "Host: localhost:" + ToString(port) << CrLf;
- out << CrLf;
- out.Flush();
- THttpInput input(&si);
- input.ReadAll();
- UNIT_ASSERT_STRING_CONTAINS(input.FirstLine(), "HTTP/1.1 4");
- server.Stop();
- }
- Y_UNIT_TEST(TTestCloseConnectionOnRequestLimit) {
- TPortManager pm;
- const ui16 port = pm.GetPort();
- TEchoServer serverImpl("test_data");
- THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxRequestsPerConnection(2));
- UNIT_ASSERT(server.Start());
- TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10));
- UNIT_ASSERT(SendRequest(socket, port).IsKeepAlive());
- UNIT_ASSERT(!SendRequest(socket, port).IsKeepAlive());
- UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException);
- server.Stop();
- }
- Y_UNIT_TEST(TTestListenerSockAddrConnection) {
- TPortManager pm;
- const ui16 port1 = pm.GetPort();
- const ui16 port2 = pm.GetPort();
- TListenerSockAddrReplyServer serverImpl;
- THttpServer server(&serverImpl, THttpServer::TOptions().EnableKeepAlive(true).AddBindAddress("127.0.0.1", port1).AddBindAddress("127.0.0.1", port2));
- UNIT_ASSERT(server.Start());
- TTestRequest r1(port1);
- r1.KeepAliveConnection = true;
- TString resp = r1.Execute();
- UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port1)));
- TTestRequest r2(port2);
- r2.KeepAliveConnection = true;
- resp = r2.Execute();
- UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port2)));
- server.Stop();
- }
- Y_UNIT_TEST(TestSocketsLeak) {
- TPortManager portManager;
- TString res = TestData(25);
- const bool trueFalse[] = {true, false};
- for (bool rejectExcessConnections : trueFalse) {
- for (bool keepAlive : trueFalse) {
- const ui16 port = portManager.GetPort();
- TSleepingServer server;
- THttpServer::TOptions options(port);
- options.nThreads = 1;
- options.MaxConnections = 1;
- options.MaxQueueSize = 10;
- options.MaxFQueueSize = 2;
- options.nFThreads = 2;
- options.KeepAliveEnabled = true;
- options.RejectExcessConnections = rejectExcessConnections;
- THttpServer srv(&server, options);
- UNIT_ASSERT(srv.Start());
- UNIT_ASSERT(server.Lock.TryAcquire());
- std::atomic<size_t> threadsFinished = 0;
- TVector<THolder<IThreadFactory::IThread>> threads;
- auto func = [port, keepAlive, &threadsFinished]() {
- try {
- TTestRequest r(port);
- r.KeepAliveConnection = keepAlive;
- r.Execute();
- } catch (...) {
- }
- ++threadsFinished;
- };
- threads.push_back(SystemThreadFactory()->Run(func));
- while (server.Replies.load() != 1) { //wait while we have one connection inside server
- Sleep(TDuration::MilliSeconds(1));
- }
- for (size_t i = 1; i < 3; ++i) {
- threads.push_back(SystemThreadFactory()->Run(func));
- //in case of rejectExcessConnections next requests will fail, otherwise will stuck inside server queue
- while ((rejectExcessConnections ? threadsFinished.load() : srv.GetRequestQueueSize()) != i) {
- Sleep(TDuration::MilliSeconds(1));
- }
- }
- server.Lock.Release();
- for (auto&& thread : threads) {
- thread->Join();
- }
- TStringStream opts;
- opts << " [" << rejectExcessConnections << ", " << keepAlive << "] ";
- UNIT_ASSERT_EQUAL_C(server.MaxConns, 2, opts.Str() + "we should get MaxConn notification 2 times, got " + ToString(server.MaxConns.load()));
- if (rejectExcessConnections) {
- UNIT_ASSERT_EQUAL_C(server.Replies, 1, opts.Str() + "only one request should have been processed, got " + ToString(server.Replies.load()));
- } else {
- UNIT_ASSERT_VALUES_EQUAL(server.Replies.load(), 3);
- }
- }
- }
- }
- class TShooter {
- public:
- struct TCounters {
- public:
- TCounters() = default;
- TCounters(const TCounters& other)
- : Fail(other.Fail.load())
- , Success(other.Success.load())
- {
- }
- public:
- std::atomic<size_t> Fail = 0;
- std::atomic<size_t> Success = 0;
- };
- public:
- TShooter(size_t threadCount, ui16 port)
- : Counters_(threadCount)
- {
- for (size_t i = 0; i < threadCount; ++i) {
- auto func = [i, port, this] () {
- for (;;) {
- try {
- TTestRequest r(port);
- r.KeepAliveConnection = true;
- for (size_t j = 0; j < 100; ++j) {
- if (Stopped_.load()) {
- return;
- }
- r.Execute();
- Sleep(TDuration::MilliSeconds(1) * RandomNumber<float>());
- Counters_[i].Success++;
- }
- } catch (TSystemError& e) {
- UNIT_ASSERT_C(e.Status() == ECONNRESET || e.Status() == ECONNREFUSED, CurrentExceptionMessage());
- Counters_[i].Fail++;
- } catch (THttpReadException&) {
- Counters_[i].Fail++;
- } catch (...) {
- UNIT_ASSERT_C(false, CurrentExceptionMessage());
- }
- }
- };
- Threads_.push_back(SystemThreadFactory()->Run(func));
- }
- }
- void Stop() {
- Stopped_.store(true);
- for (auto& thread : Threads_) {
- thread->Join();
- }
- }
- void WaitProgress() const {
- auto snapshot = Counters_;
- for (;;) {
- size_t haveProgress = 0;
- for (size_t i = 0; i < Counters_.size(); ++i) {
- haveProgress += (Counters_[i].Fail.load() + Counters_[i].Success.load()) > (snapshot[i].Fail + snapshot[i].Success);
- }
- if (haveProgress == Counters_.size()) {
- return;
- }
- Sleep(TDuration::MilliSeconds(1));
- }
- }
- const auto& GetCounters() const {
- return Counters_;
- }
- ~TShooter() {
- Stop();
- }
- private:
- TVector<THolder<IThreadFactory::IThread>> Threads_;
- std::atomic<bool> Stopped_ = false;
- TVector<TCounters> Counters_;
- };
- struct TTestConfig {
- bool OneShot = false;
- ui32 ListenerThreads = 1;
- };
- TVector<TTestConfig> testConfigs = {
- {.OneShot = false, .ListenerThreads = 1},
- {.OneShot = true, .ListenerThreads = 1},
- {.OneShot = true, .ListenerThreads = 4},
- {.OneShot = true, .ListenerThreads = 63},
- };
- THttpServer::TOptions ApplyConfig(const THttpServer::TOptions& opts, const TTestConfig& cfg) {
- THttpServer::TOptions res = opts;
- res.OneShotPoll = cfg.OneShot;
- res.nListenerThreads = cfg.ListenerThreads;
- return res;
- }
- Y_UNIT_TEST(TestStartStop) {
- TPortManager pm;
- const ui16 port = pm.GetPort();
- const size_t threadCount = 5;
- TShooter shooter(threadCount, port);
- TString res = TestData();
- for (const auto& cfg : testConfigs) {
- TEchoServer serverImpl(res);
- THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true), cfg));
- for (size_t i = 0; i < 100; ++i) {
- UNIT_ASSERT(server.Start());
- shooter.WaitProgress();
- {
- auto before = shooter.GetCounters();
- shooter.WaitProgress();
- auto after = shooter.GetCounters();
- for (size_t i = 0; i < before.size(); ++i) {
- UNIT_ASSERT(before[i].Success < after[i].Success);
- UNIT_ASSERT(before[i].Fail == after[i].Fail);
- }
- }
- server.Stop();
- shooter.WaitProgress();
- {
- auto before = shooter.GetCounters();
- shooter.WaitProgress();
- auto after = shooter.GetCounters();
- for (size_t i = 0; i < before.size(); ++i) {
- UNIT_ASSERT(before[i].Success == after[i].Success);
- UNIT_ASSERT(before[i].Fail < after[i].Fail);
- }
- }
- }
- }
- }
- Y_UNIT_TEST(TestMaxConnections) {
- class TMaxConnServer
- : public TEchoServer
- {
- public:
- using TEchoServer::TEchoServer;
- void OnMaxConn() override {
- ++MaxConns;
- }
- public:
- std::atomic<size_t> MaxConns = 0;
- };
- TPortManager pm;
- const ui16 port = pm.GetPort();
- const size_t maxConnections = 5;
- TString res = TestData();
- for (const auto& cfg : testConfigs) {
- TMaxConnServer serverImpl(res);
- THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxConnections(maxConnections), cfg));
- UNIT_ASSERT(server.Start());
- TShooter shooter(maxConnections + 1, port);
- for (size_t i = 0; i < 100; ++i) {
- const size_t prev = serverImpl.MaxConns.load();
- while (serverImpl.MaxConns.load() < prev + 100) {
- Sleep(TDuration::MilliSeconds(1));
- }
- }
- shooter.Stop();
- server.Stop();
- for (const auto& c : shooter.GetCounters()) {
- UNIT_ASSERT(c.Success > 0);
- UNIT_ASSERT(c.Fail > 0);
- UNIT_ASSERT(c.Success > c.Fail);
- }
- }
- }
- Y_UNIT_TEST(StartFail) {
- TString res = TestData();
- TEchoServer serverImpl(res);
- {
- THttpServer server(&serverImpl, THttpServer::TOptions(1));
- UNIT_ASSERT(!server.GetErrorCode());
- UNIT_ASSERT(!server.Start());
- UNIT_ASSERT(server.GetErrorCode());
- }
- {
- TPortManager pm;
- const ui16 port = pm.GetPort();
- THttpServer server1(&serverImpl, THttpServer::TOptions(port));
- UNIT_ASSERT(server1.Start());
- UNIT_ASSERT(!server1.GetErrorCode());
- THttpServer server2(&serverImpl, THttpServer::TOptions(port));
- UNIT_ASSERT(!server2.Start());
- UNIT_ASSERT(server2.GetErrorCode());
- }
- }
- inline TString ToString(const THashSet<TString>& hs) {
- TString res = "";
- for (auto s : hs) {
- if (res) {
- res.append(",");
- }
- res.append("\"").append(s).append("\"");
- }
- return res;
- }
- Y_UNIT_TEST(TestTTLExceed) {
- // Checks that one of request returns "TTL Exceed"
- // First request waits for server.Lock.Release() for one threaded TSleepingServer
- // So second request in queue should fail with TTL Exceed, because fist one lock thread pool for (ttl + 1) ms
- TPortManager portManager;
- const ui16 port = portManager.GetPort();
- TString res = TestData(25);
- const size_t ttl = 10;
- TSleepingServer server{ttl};
- THttpServer::TOptions options(port);
- options.nThreads = 1;
- options.MaxConnections = 2;
- THttpServer srv(&server, options);
- UNIT_ASSERT(srv.Start());
- UNIT_ASSERT(server.Lock.TryAcquire());
- THashSet<TString> results;
- TMutex resultLock;
- auto func = [port, &resultLock, &results]() {
- try {
- TTestRequest r(port);
- TString result = r.Execute();
- with_lock(resultLock) {
- results.insert(result);
- }
- } catch (...) {
- }
- };
- auto t1 = SystemThreadFactory()->Run(func);
- auto t2 = SystemThreadFactory()->Run(func);
- Sleep(TDuration::MilliSeconds(ttl + 1));
- server.Lock.Release();
- t1->Join();
- t2->Join();
- UNIT_ASSERT_EQUAL_C(results, (THashSet<TString>({"Zoooo", "TTL Exceed"})), "Results is {" + ToString(results) + "}");
- }
- }
|