http.cpp 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. #include "http.h"
  2. #include "http_ex.h"
  3. #include <library/cpp/threading/equeue/equeue.h>
  4. #include <util/generic/buffer.h>
  5. #include <util/generic/intrlist.h>
  6. #include <util/generic/yexception.h>
  7. #include <util/network/address.h>
  8. #include <util/network/socket.h>
  9. #include <util/network/poller.h>
  10. #include <library/cpp/deprecated/atomic/atomic.h>
  11. #include <util/system/compat.h> // stricmp, strnicmp, strlwr, strupr, stpcpy
  12. #include <util/system/defaults.h>
  13. #include <util/system/event.h>
  14. #include <util/system/mutex.h>
  15. #include <util/system/pipe.h>
  16. #include <util/system/thread.h>
  17. #include <util/thread/factory.h>
  18. #include <cerrno>
  19. #include <cstring>
  20. using namespace NAddr;
  21. namespace {
  22. class IPollAble {
  23. public:
  24. inline IPollAble() noexcept {
  25. }
  26. virtual ~IPollAble() {
  27. }
  28. virtual void OnPollEvent(TInstant now) = 0;
  29. };
  30. struct TShouldStop {
  31. };
  32. struct TWakeupPollAble: public IPollAble {
  33. void OnPollEvent(TInstant) override {
  34. throw TShouldStop();
  35. }
  36. };
  37. }
  38. class TClientConnection: public IPollAble, public TIntrusiveListItem<TClientConnection> {
  39. public:
  40. TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef);
  41. ~TClientConnection() override;
  42. void OnPollEvent(TInstant now) override;
  43. inline void Activate(TInstant now) noexcept;
  44. inline void DeActivate();
  45. inline void Reject();
  46. void ScheduleDelete();
  47. public:
  48. TSocket Socket_;
  49. NAddr::IRemoteAddrRef ListenerSockAddrRef_;
  50. THttpServer::TImpl* HttpServ_ = nullptr;
  51. bool Reject_ = false;
  52. TInstant LastUsed;
  53. TInstant AcceptMoment;
  54. size_t ReceivedRequests = 0;
  55. struct TCleanupState {
  56. ui64 ThreadMask = 0;
  57. bool Closed = false;
  58. } CleanupState_;
  59. };
  60. class THttpServer::TImpl {
  61. public:
  62. class TConnections {
  63. public:
  64. inline TConnections(TSocketPoller* poller, const THttpServerOptions& options)
  65. : Poller_(poller)
  66. , Options(options)
  67. {
  68. }
  69. inline ~TConnections() {
  70. }
  71. inline void Add(TClientConnection* c) noexcept {
  72. TGuard<TMutex> g(Mutex_);
  73. Conns_.PushBack(c);
  74. if (Options.OneShotPoll) {
  75. Poller_->WaitReadOneShot(c->Socket_, (void*)static_cast<const IPollAble*>(c));
  76. } else {
  77. Poller_->WaitRead(c->Socket_, (void*)static_cast<const IPollAble*>(c));
  78. }
  79. }
  80. void Cleanup(size_t threadNum) {
  81. if (Options.nListenerThreads < 2) {
  82. return;
  83. }
  84. TIntrusiveListWithAutoDelete<TClientConnection, TDelete> toDelete;
  85. {
  86. TGuard<TMutex> g(Mutex_);
  87. PendingDelete_.ForEach([&toDelete, threadNum](TClientConnection * conn) {
  88. if (!(conn->CleanupState_.ThreadMask &= ~((ui64)1 << threadNum))) {
  89. toDelete.PushBack(conn);
  90. }
  91. });
  92. }
  93. }
  94. inline void Erase(TClientConnection* c, TInstant now) noexcept {
  95. TGuard<TMutex> g(Mutex_);
  96. EraseUnsafe(c, /*removeFromPoller*/!Options.OneShotPoll);
  97. if (Options.ExpirationTimeout > TDuration::Zero()) {
  98. TryRemovingUnsafe(now - Options.ExpirationTimeout);
  99. }
  100. }
  101. inline void Clear() noexcept {
  102. TGuard<TMutex> g(Mutex_);
  103. Conns_.Clear();
  104. }
  105. inline bool RemoveOld(TInstant border) noexcept {
  106. TGuard<TMutex> g(Mutex_);
  107. return TryRemovingUnsafe(border);
  108. }
  109. bool TryRemovingUnsafe(TInstant border) noexcept {
  110. if (Conns_.Empty()) {
  111. return false;
  112. }
  113. TClientConnection* c = &*(Conns_.Begin());
  114. if (c->LastUsed > border) {
  115. return false;
  116. }
  117. EraseUnsafe(c);
  118. if (Options.nListenerThreads > 1) {
  119. c->ScheduleDelete();
  120. PendingDelete_.PushBack(c);
  121. } else {
  122. delete c;
  123. }
  124. return true;
  125. }
  126. void EraseUnsafe(TClientConnection* c, bool removeFromPoller = true) noexcept {
  127. if (removeFromPoller) {
  128. Poller_->Unwait(c->Socket_);
  129. }
  130. c->Unlink();
  131. }
  132. public:
  133. TMutex Mutex_;
  134. TIntrusiveListWithAutoDelete<TClientConnection, TDelete> Conns_;
  135. TIntrusiveListWithAutoDelete<TClientConnection, TDelete> PendingDelete_;
  136. TSocketPoller* Poller_ = nullptr;
  137. const THttpServerOptions& Options;
  138. };
  139. TAutoPtr<TClientRequest> CreateRequest(TAutoPtr<TClientConnection> c) {
  140. THolder<TClientRequest> obj(Cb_->CreateClient());
  141. obj->Conn_.Reset(c.Release());
  142. return obj;
  143. }
  144. void AddRequestFromSocket(const TSocket& s, TInstant now, NAddr::IRemoteAddrRef listenerSockAddrRef) {
  145. if (MaxRequestsReached()) {
  146. Cb_->OnMaxConn();
  147. bool wasRemoved = Connections->RemoveOld(TInstant::Max());
  148. if (!wasRemoved && Options_.RejectExcessConnections) {
  149. (new TClientConnection(s, this, listenerSockAddrRef))->Reject();
  150. return;
  151. }
  152. }
  153. auto connection = new TClientConnection(s, this, listenerSockAddrRef);
  154. connection->LastUsed = now;
  155. connection->DeActivate();
  156. }
  157. void SaveErrorCode() {
  158. ErrorCode = WSAGetLastError();
  159. }
  160. int GetErrorCode() const {
  161. return ErrorCode;
  162. }
  163. const char* GetError() const {
  164. return LastSystemErrorText(ErrorCode);
  165. }
  166. bool Start() {
  167. Poller.Reset(new TSocketPoller());
  168. Connections.Reset(new TConnections(Poller.Get(), Options_));
  169. // throws on error
  170. TPipeHandle::Pipe(ListenWakeupReadFd, ListenWakeupWriteFd);
  171. SetNonBlock(ListenWakeupWriteFd, true);
  172. SetNonBlock(ListenWakeupReadFd, true);
  173. Poller->WaitRead(ListenWakeupReadFd, &WakeupPollAble);
  174. ErrorCode = 0;
  175. std::function<void(TSocket)> callback = [&](TSocket socket) {
  176. THolder<TListenSocket> ls(new TListenSocket(socket, this));
  177. if (Options_.OneShotPoll) {
  178. Poller->WaitReadOneShot(socket, static_cast<IPollAble*>(ls.Get()));
  179. } else {
  180. Poller->WaitRead(socket, static_cast<IPollAble*>(ls.Get()));
  181. }
  182. Reqs.PushBack(ls.Release());
  183. };
  184. bool addressesBound = TryToBindAddresses(Options_, &callback);
  185. if (!addressesBound) {
  186. SaveErrorCode();
  187. return false;
  188. }
  189. Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
  190. FailRequests->Start(Options_.nFThreads, Options_.MaxFQueueSize);
  191. Cb_->OnListenStart();
  192. try {
  193. RunningListeners_.store(Options_.nListenerThreads);
  194. for (size_t i = 0; i < Options_.nListenerThreads; ++i) {
  195. ListenThreads.push_back(MakeHolder<TThread>([this, threadNum = i]() {
  196. ListenSocket(threadNum);
  197. }));
  198. ListenThreads.back()->Start();
  199. }
  200. } catch (const yexception&) {
  201. SaveErrorCode();
  202. return false;
  203. }
  204. return true;
  205. }
  206. void JoinListenerThreads() {
  207. while (!ListenThreads.empty()) {
  208. ListenThreads.back()->Join();
  209. ListenThreads.pop_back();
  210. }
  211. }
  212. void Wait() {
  213. Cb_->OnWait();
  214. TGuard<TMutex> g(StopMutex);
  215. JoinListenerThreads();
  216. }
  217. void Stop() {
  218. Shutdown();
  219. TGuard<TMutex> g(StopMutex);
  220. JoinListenerThreads();
  221. while (AtomicGet(ConnectionCount)) {
  222. usleep(10000);
  223. Connections->Clear();
  224. }
  225. Connections.Destroy();
  226. Poller.Destroy();
  227. }
  228. void Shutdown() {
  229. ListenWakeupWriteFd.Write("", 1);
  230. // ignore result
  231. }
  232. void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {
  233. struct TFailRequest: public THttpClientRequestEx {
  234. inline TFailRequest(TAutoPtr<TClientRequest> parent) {
  235. Conn_.Reset(parent->Conn_.Release());
  236. HttpConn_.Reset(parent->HttpConn_.Release());
  237. }
  238. bool Reply(void*) override {
  239. if (!ProcessHeaders()) {
  240. return true;
  241. }
  242. ProcessFailRequest(0);
  243. return true;
  244. }
  245. };
  246. if (!fail && Requests->Add(req.Get())) {
  247. Y_UNUSED(req.Release());
  248. } else {
  249. req = new TFailRequest(req);
  250. if (FailRequests->Add(req.Get())) {
  251. Y_UNUSED(req.Release());
  252. } else {
  253. Cb_->OnFailRequest(-1);
  254. }
  255. }
  256. }
  257. size_t GetRequestQueueSize() const {
  258. return Requests->Size();
  259. }
  260. size_t GetFailQueueSize() const {
  261. return FailRequests->Size();
  262. }
  263. const IThreadPool& GetRequestQueue() const {
  264. return *Requests;
  265. }
  266. const IThreadPool& GetFailQueue() const {
  267. return *FailRequests;
  268. }
  269. class TListenSocket: public IPollAble, public TIntrusiveListItem<TListenSocket> {
  270. public:
  271. inline TListenSocket(const TSocket& s, TImpl* parent)
  272. : S_(s)
  273. , Server_(parent)
  274. , SockAddrRef_(GetSockAddr(S_))
  275. {
  276. }
  277. ~TListenSocket() override {
  278. }
  279. void OnPollEvent(TInstant) override {
  280. SOCKET s = ::accept(S_, nullptr, nullptr);
  281. if (Server_->Options_.OneShotPoll) {
  282. Server_->Poller->WaitReadOneShot(S_, this);
  283. }
  284. if (s == INVALID_SOCKET) {
  285. ythrow yexception() << "accept: " << LastSystemErrorText();
  286. }
  287. Server_->AddRequestFromSocket(s, TInstant::Now(), SockAddrRef_);
  288. }
  289. SOCKET GetSocket() const noexcept {
  290. return S_;
  291. }
  292. private:
  293. TSocket S_;
  294. TImpl* Server_ = nullptr;
  295. NAddr::IRemoteAddrRef SockAddrRef_;
  296. };
  297. void ListenSocket(size_t threadNum) {
  298. TThread::SetCurrentThreadName(Options_.ListenThreadName.c_str());
  299. TVector<void*> events;
  300. events.resize(Options_.EpollMaxEvents);
  301. TInstant now = TInstant::Now();
  302. for (;;) {
  303. try {
  304. Connections->Cleanup(threadNum);
  305. const TInstant deadline = Options_.PollTimeout == TDuration::Zero() ? TInstant::Max() : now + Options_.PollTimeout;
  306. const size_t ret = Poller->WaitD(events.data(), events.size(), deadline);
  307. now = TInstant::Now();
  308. for (size_t i = 0; i < ret; ++i) {
  309. ((IPollAble*)events[i])->OnPollEvent(now);
  310. }
  311. if (ret == 0 && Options_.ExpirationTimeout > TDuration::Zero()) {
  312. Connections->RemoveOld(now - Options_.ExpirationTimeout);
  313. }
  314. // When MaxConnections is limited or ExpirationTimeout is set, OnPollEvent can call
  315. // RemoveOld and destroy other IPollAble* objects in the
  316. // poller. Thus in this case we can safely process only one
  317. // event from the poller at a time.
  318. if (!Options_.MaxConnections && Options_.ExpirationTimeout == TDuration::Zero()) {
  319. if (ret >= events.size()) {
  320. events.resize(ret * 2);
  321. }
  322. }
  323. } catch (const TShouldStop&) {
  324. break;
  325. } catch (...) {
  326. Cb_->OnException();
  327. }
  328. }
  329. if (0 == --RunningListeners_) {
  330. while (!Reqs.Empty()) {
  331. THolder<TListenSocket> ls(Reqs.PopFront());
  332. Poller->Unwait(ls->GetSocket());
  333. }
  334. Requests->Stop();
  335. FailRequests->Stop();
  336. Cb_->OnListenStop();
  337. }
  338. }
  339. void RestartRequestThreads(ui32 nTh, ui32 maxQS) {
  340. Requests->Stop();
  341. Options_.nThreads = nTh;
  342. Options_.MaxQueueSize = maxQS;
  343. Requests->Start(Options_.nThreads, Options_.MaxQueueSize);
  344. }
  345. TImpl(THttpServer* parent, ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options_)
  346. : Requests(mainWorkers)
  347. , FailRequests(failWorkers)
  348. , Options_(options_)
  349. , Cb_(cb)
  350. , Parent_(parent)
  351. {
  352. if (Options_.nListenerThreads > 1) {
  353. Options_.OneShotPoll = true;
  354. const auto minPollTimeout = TDuration::MilliSeconds(100);
  355. if (!Options_.PollTimeout || Options_.PollTimeout > minPollTimeout) {
  356. Options_.PollTimeout = minPollTimeout;
  357. }
  358. Y_ENSURE(Options_.nListenerThreads < 64);
  359. }
  360. }
  361. TImpl(THttpServer* parent, ICallBack* cb, const TOptions& options, IThreadFactory* factory)
  362. : TImpl(
  363. parent,
  364. cb,
  365. MakeThreadPool<TSimpleThreadPool>(factory, options.UseElasticQueues, cb, options.RequestsThreadName),
  366. MakeThreadPool<TThreadPool>(factory, options.UseElasticQueues, nullptr, options.FailRequestsThreadName),
  367. options) {
  368. }
  369. ~TImpl() {
  370. try {
  371. Stop();
  372. } catch (...) {
  373. }
  374. }
  375. inline const TOptions& Options() const noexcept {
  376. return Options_;
  377. }
  378. inline void DecreaseConnections() noexcept {
  379. AtomicDecrement(ConnectionCount);
  380. }
  381. inline void IncreaseConnections() noexcept {
  382. AtomicIncrement(ConnectionCount);
  383. }
  384. inline i64 GetClientCount() const {
  385. return AtomicGet(ConnectionCount);
  386. }
  387. inline bool MaxRequestsReached() const {
  388. return Options_.MaxConnections && ((size_t)GetClientCount() >= Options_.MaxConnections);
  389. }
  390. TVector<THolder<TThread>> ListenThreads;
  391. std::atomic<size_t> RunningListeners_ = 0;
  392. TIntrusiveListWithAutoDelete<TListenSocket, TDelete> Reqs;
  393. TPipeHandle ListenWakeupReadFd;
  394. TPipeHandle ListenWakeupWriteFd;
  395. TMtpQueueRef Requests;
  396. TMtpQueueRef FailRequests;
  397. TAtomic ConnectionCount = 0;
  398. THolder<TSocketPoller> Poller;
  399. THolder<TConnections> Connections;
  400. int ErrorCode = 0;
  401. TOptions Options_;
  402. ICallBack* Cb_ = nullptr;
  403. THttpServer* Parent_ = nullptr;
  404. TWakeupPollAble WakeupPollAble;
  405. TMutex StopMutex;
  406. private:
  407. template <class TThreadPool_>
  408. static THolder<IThreadPool> MakeThreadPool(IThreadFactory* factory, bool elastic, ICallBack* callback = nullptr, const TString& threadName = {}) {
  409. if (!factory) {
  410. factory = SystemThreadFactory();
  411. }
  412. THolder<IThreadPool> pool;
  413. const auto params = IThreadPool::TParams().SetFactory(factory).SetThreadName(threadName);
  414. if (callback) {
  415. pool = MakeHolder<TThreadPoolBinder<TThreadPool_, THttpServer::ICallBack>>(callback, params);
  416. } else {
  417. pool = MakeHolder<TThreadPool_>(params);
  418. }
  419. if (elastic) {
  420. pool = MakeHolder<TElasticQueue>(std::move(pool));
  421. }
  422. return pool;
  423. }
  424. };
  425. THttpServer::THttpServer(ICallBack* cb, const TOptions& options, IThreadFactory* pool)
  426. : Impl_(new TImpl(this, cb, options, pool))
  427. {
  428. }
  429. THttpServer::THttpServer(ICallBack* cb, TMtpQueueRef mainWorkers, TMtpQueueRef failWorkers, const TOptions& options)
  430. : Impl_(new TImpl(this, cb, mainWorkers, failWorkers, options))
  431. {
  432. }
  433. THttpServer::~THttpServer() {
  434. }
  435. i64 THttpServer::GetClientCount() const {
  436. return Impl_->GetClientCount();
  437. }
  438. bool THttpServer::Start() {
  439. return Impl_->Start();
  440. }
  441. void THttpServer::Stop() {
  442. Impl_->Stop();
  443. }
  444. void THttpServer::Shutdown() {
  445. Impl_->Shutdown();
  446. }
  447. void THttpServer::Wait() {
  448. Impl_->Wait();
  449. }
  450. int THttpServer::GetErrorCode() {
  451. return Impl_->GetErrorCode();
  452. }
  453. const char* THttpServer::GetError() {
  454. return Impl_->GetError();
  455. }
  456. void THttpServer::RestartRequestThreads(ui32 n, ui32 queue) {
  457. Impl_->RestartRequestThreads(n, queue);
  458. }
  459. const THttpServer::TOptions& THttpServer::Options() const noexcept {
  460. return Impl_->Options();
  461. }
  462. size_t THttpServer::GetRequestQueueSize() const {
  463. return Impl_->GetRequestQueueSize();
  464. }
  465. size_t THttpServer::GetFailQueueSize() const {
  466. return Impl_->GetFailQueueSize();
  467. }
  468. const IThreadPool& THttpServer::GetRequestQueue() const {
  469. return Impl_->GetRequestQueue();
  470. }
  471. const IThreadPool& THttpServer::GetFailQueue() const {
  472. return Impl_->GetFailQueue();
  473. }
  474. bool THttpServer::MaxRequestsReached() const {
  475. return Impl_->MaxRequestsReached();
  476. }
  477. TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, NAddr::IRemoteAddrRef listenerSockAddrRef)
  478. : Socket_(s)
  479. , ListenerSockAddrRef_(listenerSockAddrRef)
  480. , HttpServ_(serv)
  481. {
  482. SetNoDelay(Socket_, true);
  483. const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout;
  484. if (clientTimeout != TDuration::Zero()) {
  485. SetSocketTimeout(Socket_, (long)clientTimeout.Seconds(), clientTimeout.MilliSecondsOfSecond());
  486. }
  487. HttpServ_->IncreaseConnections();
  488. }
  489. TClientConnection::~TClientConnection() {
  490. if (!CleanupState_.Closed) {
  491. HttpServ_->DecreaseConnections();
  492. }
  493. }
  494. void TClientConnection::ScheduleDelete() {
  495. Socket_.Close();
  496. HttpServ_->DecreaseConnections();
  497. CleanupState_.ThreadMask = ((ui64)1 << HttpServ_->Options().nListenerThreads) - 1;
  498. CleanupState_.Closed = true;
  499. }
  500. void TClientConnection::OnPollEvent(TInstant now) {
  501. THolder<TClientConnection> this_(this);
  502. Activate(now);
  503. {
  504. char tmp[1];
  505. if (::recv(Socket_, tmp, 1, MSG_PEEK) < 1) {
  506. /*
  507. * We can received a FIN so our socket was moved to
  508. * TCP_CLOSE_WAIT state. Check it before adding work
  509. * for this socket.
  510. */
  511. return;
  512. }
  513. }
  514. THolder<TClientRequest> obj(HttpServ_->CreateRequest(this_));
  515. AcceptMoment = now;
  516. HttpServ_->AddRequest(obj, Reject_);
  517. }
  518. void TClientConnection::Activate(TInstant now) noexcept {
  519. HttpServ_->Connections->Erase(this, now);
  520. LastUsed = now;
  521. ++ReceivedRequests;
  522. }
  523. void TClientConnection::DeActivate() {
  524. HttpServ_->Connections->Add(this);
  525. }
  526. void TClientConnection::Reject() {
  527. Reject_ = true;
  528. HttpServ_->Connections->Add(this);
  529. }
  530. TClientRequest::TClientRequest() {
  531. }
  532. TClientRequest::~TClientRequest() {
  533. }
  534. bool TClientRequest::Reply(void* /*ThreadSpecificResource*/) {
  535. if (strnicmp(RequestString.data(), "GET ", 4)) {
  536. Output() << "HTTP/1.0 501 Not Implemented\r\n\r\n";
  537. } else {
  538. Output() << "HTTP/1.0 200 OK\r\n"
  539. "Content-Type: text/html\r\n"
  540. "\r\n"
  541. "Hello World!\r\n";
  542. }
  543. return true;
  544. }
  545. bool TClientRequest::IsLocal() const {
  546. return HasLocalAddress(Socket());
  547. }
  // Returns true only for requests coming from a local address. Otherwise
  // an error page is written to the output and false is returned:
  //  - 500 with the exception text when the locality check throws;
  //  - 403 when the peer is not local.
  bool TClientRequest::CheckLoopback() {
      bool fromLocalhost = false;

      try {
          fromLocalhost = IsLocal();
      } catch (const yexception& e) {
          Output() << "HTTP/1.0 500 Oops\r\n\r\n"
                   << e.what() << "\r\n";
          return false;
      }

      if (fromLocalhost) {
          return true;
      }

      Output() << "HTTP/1.0 403 Permission denied\r\n"
                  "Content-Type: text/html; charset=windows-1251\r\n"
                  "Connection: close\r\n"
                  "\r\n"
                  "<html><head><title>Permission denied</title></head>"
                  "<body><h1>Permission denied</h1>"
                  "<p>This request must be sent from the localhost.</p>"
                  "</body></html>\r\n";
      return false;
  }
  570. void TClientRequest::ReleaseConnection() {
  571. if (Conn_ && HttpConn_ && HttpServ()->Options().KeepAliveEnabled && HttpConn_->CanBeKeepAlive() && (!HttpServ()->Options().RejectExcessConnections || !HttpServ()->MaxRequestsReached())) {
  572. Output().Finish();
  573. Conn_->DeActivate();
  574. Y_UNUSED(Conn_.Release());
  575. }
  576. }
  577. void TClientRequest::ResetConnection() {
  578. if (HttpConn_) {
  579. // send RST packet to client
  580. HttpConn_->Reset();
  581. HttpConn_.Destroy();
  582. }
  583. }
  584. void TClientRequest::Process(void* ThreadSpecificResource) {
  585. THolder<TClientRequest> this_(this);
  586. auto* serverImpl = Conn_->HttpServ_;
  587. try {
  588. if (!HttpConn_) {
  589. const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize;
  590. if (outputBufferSize) {
  591. HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize));
  592. } else {
  593. HttpConn_.Reset(new THttpServerConn(Socket()));
  594. }
  595. auto maxRequestsPerConnection = HttpServ()->Options().MaxRequestsPerConnection;
  596. HttpConn_->Output()->EnableKeepAlive(HttpServ()->Options().KeepAliveEnabled && (!maxRequestsPerConnection || Conn_->ReceivedRequests < maxRequestsPerConnection));
  597. HttpConn_->Output()->EnableCompression(HttpServ()->Options().CompressionEnabled);
  598. }
  599. if (!BeforeParseRequestOk(ThreadSpecificResource)) {
  600. ReleaseConnection();
  601. return;
  602. }
  603. if (ParsedHeaders.empty()) {
  604. RequestString = Input().FirstLine();
  605. const THttpHeaders& h = Input().Headers();
  606. ParsedHeaders.reserve(h.Count());
  607. for (THttpHeaders::TConstIterator it = h.Begin(); it != h.End(); ++it) {
  608. ParsedHeaders.emplace_back(it->Name(), it->Value());
  609. }
  610. }
  611. if (Reply(ThreadSpecificResource)) {
  612. ReleaseConnection();
  613. /*
  614. * *this will be destroyed...
  615. */
  616. return;
  617. }
  618. } catch (...) {
  619. serverImpl->Cb_->OnException();
  620. throw;
  621. }
  622. Y_UNUSED(this_.Release());
  623. }
  624. void TClientRequest::ProcessFailRequest(int failstate) {
  625. Output() << "HTTP/1.1 503 Service Unavailable\r\n"
  626. "Content-Type: text/plain\r\n"
  627. "Content-Length: 21\r\n"
  628. "\r\n"
  629. "Service Unavailable\r\n";
  630. TString url;
  631. if (!strnicmp(RequestString.data(), "GET ", 4)) {
  632. // Trying to extract url...
  633. const char* str = RequestString.data();
  634. // Skipping spaces before url...
  635. size_t start = 3;
  636. while (str[start] == ' ') {
  637. ++start;
  638. }
  639. if (str[start]) {
  640. // Traversing url...
  641. size_t idx = start;
  642. while (str[idx] != ' ' && str[idx]) {
  643. ++idx;
  644. }
  645. url = RequestString.substr(start, idx - start);
  646. }
  647. }
  648. const THttpServer::ICallBack::TFailLogData d = {
  649. failstate,
  650. url,
  651. };
  652. // Handling failure...
  653. Conn_->HttpServ_->Cb_->OnFailRequestEx(d);
  654. Output().Flush();
  655. }
  656. THttpServer* TClientRequest::HttpServ() const noexcept {
  657. return Conn_->HttpServ_->Parent_;
  658. }
  659. const TSocket& TClientRequest::Socket() const noexcept {
  660. return Conn_->Socket_;
  661. }
  662. NAddr::IRemoteAddrRef TClientRequest::GetListenerSockAddrRef() const noexcept {
  663. return Conn_->ListenerSockAddrRef_;
  664. }
  665. TInstant TClientRequest::AcceptMoment() const noexcept {
  666. return Conn_->AcceptMoment;
  667. }
  668. /*
  669. * TRequestReplier
  670. */
  671. TRequestReplier::TRequestReplier() {
  672. }
  673. TRequestReplier::~TRequestReplier() {
  674. }
  675. bool TRequestReplier::Reply(void* threadSpecificResource) {
  676. const TReplyParams params = {
  677. threadSpecificResource, Input(), Output()};
  678. return DoReply(params);
  679. }
  680. bool TryToBindAddresses(const THttpServerOptions& options, const std::function<void(TSocket)>* callbackOnBoundAddress) {
  681. THttpServerOptions::TBindAddresses addrs;
  682. try {
  683. options.BindAddresses(addrs);
  684. } catch (const std::exception&) {
  685. return false;
  686. }
  687. for (const auto& na : addrs) {
  688. for (TNetworkAddress::TIterator ai = na.Begin(); ai != na.End(); ++ai) {
  689. NAddr::TAddrInfo addr(&*ai);
  690. TSocket socket(::socket(addr.Addr()->sa_family, SOCK_STREAM, 0));
  691. if (socket == INVALID_SOCKET) {
  692. return false;
  693. }
  694. FixIPv6ListenSocket(socket);
  695. if (options.ReuseAddress) {
  696. int yes = 1;
  697. ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes));
  698. }
  699. if (options.ReusePort) {
  700. SetReusePort(socket, true);
  701. }
  702. if (::bind(socket, addr.Addr(), addr.Len()) == SOCKET_ERROR) {
  703. return false;
  704. }
  705. if (::listen(socket, options.ListenBacklog) == SOCKET_ERROR) {
  706. return false;
  707. }
  708. if (callbackOnBoundAddress != nullptr) {
  709. (*callbackOnBoundAddress)(socket);
  710. }
  711. }
  712. }
  713. return true;
  714. }