|
- #include "http2.h"
- #include "conn_cache.h"
- #include "details.h"
- #include "factory.h"
- #include "http_common.h"
- #include "smart_ptr.h"
- #include "utils.h"
- #include <library/cpp/http/push_parser/http_parser.h>
- #include <library/cpp/http/misc/httpcodes.h>
- #include <library/cpp/http/misc/parsed_request.h>
- #include <library/cpp/neh/asio/executor.h>
- #include <util/generic/singleton.h>
- #include <util/generic/vector.h>
- #include <util/network/iovec.h>
- #include <util/stream/output.h>
- #include <util/stream/zlib.h>
- #include <util/system/condvar.h>
- #include <util/system/mutex.h>
- #include <util/system/spinlock.h>
- #include <util/system/yassert.h>
- #include <util/thread/factory.h>
- #include <util/thread/singleton.h>
- #include <util/system/sanitizers.h>
- #include <util/system/thread.h>
- #include <atomic>
- #if defined(_unix_)
- #include <sys/ioctl.h>
- #endif
- #if defined(_linux_)
- #undef SIOCGSTAMP
- #undef SIOCGSTAMPNS
- #include <linux/sockios.h>
- #define FIONWRITE SIOCOUTQ
- #endif
- //#define DEBUG_HTTP2
- #ifdef DEBUG_HTTP2
- #define DBGOUT(args) Cout << args << Endl;
- #else
- #define DBGOUT(args)
- #endif
- using namespace NDns;
- using namespace NAsio;
- using namespace NNeh;
- using namespace NNeh::NHttp;
- using namespace NNeh::NHttp2;
- using namespace std::placeholders;
- //
- // has complex keep-alive references between entities in multi-thread enviroment,
- // this create risks for races/memory leak, etc..
- // so connecting/disconnecting entities must be doing carefully
- //
- // handler <=-> request <==> connection(socket) <= handlers, stored in io_service
- // ^
- // +== connections_cache
- // '=>' -- shared/intrusive ptr
- // '->' -- weak_ptr
- //
- static TDuration FixTimeoutForSanitizer(const TDuration timeout) {
- ui64 multiplier = 1;
- if (NSan::ASanIsOn()) {
- // https://github.com/google/sanitizers/wiki/AddressSanitizer
- multiplier = 4;
- } else if (NSan::MSanIsOn()) {
- // via https://github.com/google/sanitizers/wiki/MemorySanitizer
- multiplier = 3;
- } else if (NSan::TSanIsOn()) {
- // via https://clang.llvm.org/docs/ThreadSanitizer.html
- multiplier = 15;
- }
- return TDuration::FromValue(timeout.GetValue() * multiplier);
- }
- TDuration THttp2Options::ConnectTimeout = FixTimeoutForSanitizer(TDuration::MilliSeconds(1000));
- TDuration THttp2Options::InputDeadline = TDuration::Max();
- TDuration THttp2Options::OutputDeadline = TDuration::Max();
- TDuration THttp2Options::SymptomSlowConnect = FixTimeoutForSanitizer(TDuration::MilliSeconds(10));
- size_t THttp2Options::InputBufferSize = 16 * 1024;
- bool THttp2Options::KeepInputBufferForCachedConnections = false;
- size_t THttp2Options::AsioThreads = 4;
- size_t THttp2Options::AsioServerThreads = 4;
- bool THttp2Options::EnsureSendingCompleteByAck = false;
- int THttp2Options::Backlog = 100;
- TDuration THttp2Options::ServerInputDeadline = FixTimeoutForSanitizer(TDuration::MilliSeconds(500));
- TDuration THttp2Options::ServerOutputDeadline = TDuration::Max();
- TDuration THttp2Options::ServerInputDeadlineKeepAliveMax = FixTimeoutForSanitizer(TDuration::Seconds(120));
- TDuration THttp2Options::ServerInputDeadlineKeepAliveMin = FixTimeoutForSanitizer(TDuration::Seconds(10));
- bool THttp2Options::ServerUseDirectWrite = false;
- bool THttp2Options::UseResponseAsErrorMessage = false;
- bool THttp2Options::FullHeadersAsErrorMessage = false;
- bool THttp2Options::ErrorDetailsAsResponseBody = false;
- bool THttp2Options::RedirectionNotError = false;
- bool THttp2Options::AnyResponseIsNotError = false;
- bool THttp2Options::TcpKeepAlive = false;
- i32 THttp2Options::LimitRequestsPerConnection = -1;
- bool THttp2Options::QuickAck = false;
- bool THttp2Options::UseAsyncSendRequest = false;
- bool THttp2Options::RespectHostInHttpServerNetworkAddress = false;
- bool THttp2Options::Set(TStringBuf name, TStringBuf value) {
- #define HTTP2_TRY_SET(optType, optName) \
- if (name == TStringBuf(#optName)) { \
- optName = FromString<optType>(value); \
- }
- HTTP2_TRY_SET(TDuration, ConnectTimeout)
- else HTTP2_TRY_SET(TDuration, InputDeadline)
- else HTTP2_TRY_SET(TDuration, OutputDeadline)
- else HTTP2_TRY_SET(TDuration, SymptomSlowConnect) else HTTP2_TRY_SET(size_t, InputBufferSize) else HTTP2_TRY_SET(bool, KeepInputBufferForCachedConnections) else HTTP2_TRY_SET(size_t, AsioThreads) else HTTP2_TRY_SET(size_t, AsioServerThreads) else HTTP2_TRY_SET(bool, EnsureSendingCompleteByAck) else HTTP2_TRY_SET(int, Backlog) else HTTP2_TRY_SET(TDuration, ServerInputDeadline) else HTTP2_TRY_SET(TDuration, ServerOutputDeadline) else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMax) else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMin) else HTTP2_TRY_SET(bool, ServerUseDirectWrite) else HTTP2_TRY_SET(bool, UseResponseAsErrorMessage) else HTTP2_TRY_SET(bool, FullHeadersAsErrorMessage) else HTTP2_TRY_SET(bool, ErrorDetailsAsResponseBody) else HTTP2_TRY_SET(bool, RedirectionNotError) else HTTP2_TRY_SET(bool, AnyResponseIsNotError) else HTTP2_TRY_SET(bool, TcpKeepAlive) else HTTP2_TRY_SET(i32, LimitRequestsPerConnection) else HTTP2_TRY_SET(bool, QuickAck)
- else HTTP2_TRY_SET(bool, UseAsyncSendRequest) else {
- return false;
- }
- return true;
- }
- namespace NNeh {
- const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType);
- }
- namespace {
- //#define DEBUG_STAT
- #ifdef DEBUG_STAT
- struct TDebugStat {
- static std::atomic<size_t> ConnTotal;
- static std::atomic<size_t> ConnActive;
- static std::atomic<size_t> ConnCached;
- static std::atomic<size_t> ConnDestroyed;
- static std::atomic<size_t> ConnFailed;
- static std::atomic<size_t> ConnConnCanceled;
- static std::atomic<size_t> ConnSlow;
- static std::atomic<size_t> Conn2Success;
- static std::atomic<size_t> ConnPurgedInCache;
- static std::atomic<size_t> ConnDestroyedInCache;
- static std::atomic<size_t> RequestTotal;
- static std::atomic<size_t> RequestSuccessed;
- static std::atomic<size_t> RequestFailed;
- static void Print() {
- Cout << "ct=" << ConnTotal.load(std::memory_order_acquire)
- << " ca=" << ConnActive.load(std::memory_order_acquire)
- << " cch=" << ConnCached.load(std::memory_order_acquire)
- << " cd=" << ConnDestroyed.load(std::memory_order_acquire)
- << " cf=" << ConnFailed.load(std::memory_order_acquire)
- << " ccc=" << ConnConnCanceled.load(std::memory_order_acquire)
- << " csl=" << ConnSlow.load(std::memory_order_acquire)
- << " c2s=" << Conn2Success.load(std::memory_order_acquire)
- << " cpc=" << ConnPurgedInCache.load(std::memory_order_acquire)
- << " cdc=" << ConnDestroyedInCache.load(std::memory_order_acquire)
- << " rt=" << RequestTotal.load(std::memory_order_acquire)
- << " rs=" << RequestSuccessed.load(std::memory_order_acquire)
- << " rf=" << RequestFailed.load(std::memory_order_acquire)
- << Endl;
- }
- };
- std::atomic<size_t> TDebugStat::ConnTotal = 0;
- std::atomic<size_t> TDebugStat::ConnActive = 0;
- std::atomic<size_t> TDebugStat::ConnCached = 0;
- std::atomic<size_t> TDebugStat::ConnDestroyed = 0;
- std::atomic<size_t> TDebugStat::ConnFailed = 0;
- std::atomic<size_t> TDebugStat::ConnConnCanceled = 0;
- std::atomic<size_t> TDebugStat::ConnSlow = 0;
- std::atomic<size_t> TDebugStat::Conn2Success = 0;
- std::atomic<size_t> TDebugStat::ConnPurgedInCache = 0;
- std::atomic<size_t> TDebugStat::ConnDestroyedInCache = 0;
- std::atomic<size_t> TDebugStat::RequestTotal = 0;
- std::atomic<size_t> TDebugStat::RequestSuccessed = 0;
- std::atomic<size_t> TDebugStat::RequestFailed = 0;
- #endif
- inline void PrepareSocket(SOCKET s, const TRequestSettings& requestSettings = TRequestSettings()) {
- if (requestSettings.NoDelay) {
- SetNoDelay(s, true);
- }
- }
- bool Compress(TData& data, const TString& compressionScheme) {
- if (compressionScheme == "gzip" && data.size() > 23) { // there is no string less than 24 bytes long that might be compressed with gzip
- try {
- TData gzipped(data.size());
- TMemoryOutput out(gzipped.data(), gzipped.size());
- TZLibCompress c(&out, ZLib::GZip);
- c.Write(data.data(), data.size());
- c.Finish();
- gzipped.resize(out.Buf() - gzipped.data());
- data.swap(gzipped);
- return true;
- } catch (yexception&) {
- // gzipped data occupies more space than original data
- }
- }
- return false;
- }
- class THttpRequestBuffers: public NAsio::TTcpSocket::IBuffers {
- public:
- THttpRequestBuffers(TRequestData::TPtr rd)
- : Req_(rd)
- , Parts_(Req_->Parts())
- , IOvec_(Parts_.data(), Parts_.size())
- {
- }
- TContIOVector* GetIOvec() override {
- return &IOvec_;
- }
- private:
- TRequestData::TPtr Req_;
- TVector<IOutputStream::TPart> Parts_;
- TContIOVector IOvec_;
- };
- struct TRequestGet1: public TRequestGet {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("http");
- }
- };
- struct TRequestPost1: public TRequestPost {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("post");
- }
- };
- struct TRequestFull1: public TRequestFull {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("full");
- }
- };
- struct TRequestGet2: public TRequestGet {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("http2");
- }
- };
- struct TRequestPost2: public TRequestPost {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("post2");
- }
- };
- struct TRequestFull2: public TRequestFull {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("full2");
- }
- };
- struct TRequestUnixSocketGet: public TRequestGet {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("http+unix");
- }
- static TRequestSettings RequestSettings() {
- return TRequestSettings()
- .SetNoDelay(false)
- .SetResolverType(EResolverType::EUNIXSOCKET);
- }
- };
- struct TRequestUnixSocketPost: public TRequestPost {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("post+unix");
- }
- static TRequestSettings RequestSettings() {
- return TRequestSettings()
- .SetNoDelay(false)
- .SetResolverType(EResolverType::EUNIXSOCKET);
- }
- };
- struct TRequestUnixSocketFull: public TRequestFull {
- static inline TStringBuf Name() noexcept {
- return TStringBuf("full+unix");
- }
- static TRequestSettings RequestSettings() {
- return TRequestSettings()
- .SetNoDelay(false)
- .SetResolverType(EResolverType::EUNIXSOCKET);
- }
- };
- typedef TAutoPtr<THttpRequestBuffers> THttpRequestBuffersPtr;
- class THttpRequest;
- typedef TSharedPtrB<THttpRequest> THttpRequestRef;
- class THttpConn;
- typedef TIntrusivePtr<THttpConn> THttpConnRef;
- typedef std::function<TRequestData::TPtr(const TMessage&, const TParsedLocation&)> TRequestBuilder;
- class THttpRequest {
- public:
- class THandle: public TSimpleHandle {
- public:
- THandle(IOnRecv* f, const TMessage& msg, TStatCollector* s) noexcept
- : TSimpleHandle(f, msg, s)
- {
- }
- bool MessageSendedCompletely() const noexcept override {
- if (TSimpleHandle::MessageSendedCompletely()) {
- return true;
- }
- THttpRequestRef req(GetRequest());
- if (!!req && req->RequestSendedCompletely()) {
- const_cast<THandle*>(this)->SetSendComplete();
- }
- return TSimpleHandle::MessageSendedCompletely();
- }
- void Cancel() noexcept override {
- if (TSimpleHandle::Canceled()) {
- return;
- }
- THttpRequestRef req(GetRequest());
- if (!!req) {
- TSimpleHandle::Cancel();
- req->Cancel();
- }
- }
- void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
- #ifdef DEBUG_STAT
- ++TDebugStat::RequestFailed;
- #endif
- if (rsp) {
- TSimpleHandle::NotifyError(error, rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
- } else {
- TSimpleHandle::NotifyError(error);
- }
- ReleaseRequest();
- }
- //not thread safe!
- void SetRequest(const TWeakPtrB<THttpRequest>& r) noexcept {
- Req_ = r;
- }
- private:
- THttpRequestRef GetRequest() const noexcept {
- TGuard<TSpinLock> g(SP_);
- return Req_;
- }
- void ReleaseRequest() noexcept {
- TWeakPtrB<THttpRequest> tmp;
- TGuard<TSpinLock> g(SP_);
- tmp.Swap(Req_);
- }
- mutable TSpinLock SP_;
- TWeakPtrB<THttpRequest> Req_;
- };
- typedef TIntrusivePtr<THandle> THandleRef;
- static void Run(THandleRef& h, const TMessage& msg, TRequestBuilder f, const TRequestSettings& s) {
- THttpRequestRef req(new THttpRequest(h, msg, f, s));
- req->WeakThis_ = req;
- h->SetRequest(req->WeakThis_);
- req->Run(req);
- }
- ~THttpRequest() {
- DBGOUT("~THttpRequest()");
- }
- private:
- THttpRequest(THandleRef& h, TMessage msg, TRequestBuilder f, const TRequestSettings& s)
- : Hndl_(h)
- , RequestBuilder_(f)
- , RequestSettings_(s)
- , Msg_(std::move(msg))
- , Loc_(Msg_.Addr)
- , Addr_(Resolve(Loc_.Host, Loc_.GetPort(), RequestSettings_.ResolverType))
- , AddrIter_(Addr_->Addr.Begin())
- , Canceled_(false)
- , RequestSendedCompletely_(false)
- {
- }
- void Run(THttpRequestRef& req);
- public:
- THttpRequestBuffersPtr BuildRequest() {
- return new THttpRequestBuffers(RequestBuilder_(Msg_, Loc_));
- }
- TRequestSettings RequestSettings() {
- return RequestSettings_;
- }
- //can create a spare socket in an attempt to decrease connecting time
- void OnDetectSlowConnecting();
- //remove extra connection on success connec
- void OnConnect(THttpConn* c);
- //have some response input
- void OnBeginRead() noexcept {
- RequestSendedCompletely_ = true;
- }
- void OnResponse(TAutoPtr<THttpParser>& rsp);
- void OnConnectFailed(THttpConn* c, const TErrorCode& ec);
- void OnSystemError(THttpConn* c, const TErrorCode& ec);
- void OnError(THttpConn* c, const TString& errorText);
- bool RequestSendedCompletely() noexcept;
- void Cancel() noexcept;
- private:
- void NotifyResponse(const TString& resp, const TString& firstLine, const THttpHeaders& headers) {
- THandleRef h(ReleaseHandler());
- if (!!h) {
- h->NotifyResponse(resp, firstLine, headers);
- }
- }
- void NotifyError(
- const TString& errorText,
- TError::TType errorType = TError::UnknownType,
- i32 errorCode = 0, i32 systemErrorCode = 0) {
- NotifyError(new TError(errorText, errorType, errorCode, systemErrorCode));
- }
- void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
- THandleRef h(ReleaseHandler());
- if (!!h) {
- h->NotifyError(error, rsp);
- }
- }
- void Finalize(THttpConn* skipConn = nullptr) noexcept;
- inline THandleRef ReleaseHandler() noexcept {
- THandleRef h;
- {
- TGuard<TSpinLock> g(SL_);
- h.Swap(Hndl_);
- }
- return h;
- }
- inline THttpConnRef GetConn() noexcept {
- TGuard<TSpinLock> g(SL_);
- return Conn_;
- }
- inline THttpConnRef ReleaseConn() noexcept {
- THttpConnRef c;
- {
- TGuard<TSpinLock> g(SL_);
- c.Swap(Conn_);
- }
- return c;
- }
- inline THttpConnRef ReleaseConn2() noexcept {
- THttpConnRef c;
- {
- TGuard<TSpinLock> g(SL_);
- c.Swap(Conn2_);
- }
- return c;
- }
- TSpinLock SL_; //guaranted calling notify() only once (prevent race between asio thread and current)
- THandleRef Hndl_;
- TRequestBuilder RequestBuilder_;
- TRequestSettings RequestSettings_;
- const TMessage Msg_;
- const TParsedLocation Loc_;
- const TResolvedHost* Addr_;
- TNetworkAddress::TIterator AddrIter_;
- THttpConnRef Conn_;
- THttpConnRef Conn2_; //concurrent connection used, if detected slow connecting on first connection
- TWeakPtrB<THttpRequest> WeakThis_;
- TAtomicBool Canceled_;
- TAtomicBool RequestSendedCompletely_;
- };
- TAtomicCounter* HttpOutConnCounter();
- class THttpConn: public TThrRefBase {
- public:
- static THttpConnRef Create(TIOService& srv);
- ~THttpConn() override {
- DBGOUT("~THttpConn()");
- Req_.Reset();
- HttpOutConnCounter()->Dec();
- #ifdef DEBUG_STAT
- ++TDebugStat::ConnDestroyed;
- #endif
- }
- void StartRequest(THttpRequestRef req, const TEndpoint& ep, size_t addrId, TDuration slowConn) {
- {
- //thread safe linking connection->request
- TGuard<TSpinLock> g(SL_);
- Req_ = req;
- }
- AddrId_ = addrId;
- try {
- TDuration connectDeadline(THttp2Options::ConnectTimeout);
- if (THttp2Options::ConnectTimeout > slowConn) {
- //use append non fatal connect deadline, so on first timedout
- //report about slow connecting to THttpRequest, and continue wait ConnectDeadline_ period
- connectDeadline = slowConn;
- ConnectDeadline_ = THttp2Options::ConnectTimeout - slowConn;
- }
- DBGOUT("AsyncConnect to " << ep.IpToString());
- AS_.AsyncConnect(ep, std::bind(&THttpConn::OnConnect, THttpConnRef(this), _1, _2), connectDeadline);
- } catch (...) {
- ReleaseRequest();
- throw;
- }
- }
- //start next request on keep-alive connection
- bool StartNextRequest(THttpRequestRef& req) {
- if (Finalized_) {
- return false;
- }
- {
- //thread safe linking connection->request
- TGuard<TSpinLock> g(SL_);
- Req_ = req;
- }
- RequestWritten_ = false;
- BeginReadResponse_ = false;
- try {
- TErrorCode ec;
- SendRequest(req->BuildRequest(), ec); //throw std::bad_alloc
- if (ec.Value() == ECANCELED) {
- OnCancel();
- } else if (ec) {
- OnError(ec);
- }
- } catch (...) {
- OnError(CurrentExceptionMessage());
- throw;
- }
- return true;
- }
- //connection received from cache must be validated before using
- //(process removing closed conection from cache consume some time)
- inline bool IsValid() const noexcept {
- return !Finalized_;
- }
- void SetCached(bool v) noexcept {
- Cached_ = v;
- }
- void Close() noexcept {
- try {
- Cancel();
- } catch (...) {
- }
- }
- void DetachRequest() noexcept {
- ReleaseRequest();
- }
- void Cancel() { //throw std::bad_alloc
- if (!Canceled_) {
- Canceled_ = true;
- Finalized_ = true;
- OnCancel();
- AS_.AsyncCancel();
- }
- }
- void OnCancel() {
- THttpRequestRef r(ReleaseRequest());
- if (!!r) {
- static const TString reqCanceled("request canceled");
- r->OnError(this, reqCanceled);
- }
- }
- bool RequestSendedCompletely() const noexcept {
- DBGOUT("RequestSendedCompletely()");
- if (!Connected_ || !RequestWritten_) {
- return false;
- }
- if (BeginReadResponse_) {
- return true;
- }
- #if defined(FIONWRITE)
- if (THttp2Options::EnsureSendingCompleteByAck) {
- int nbytes = Max<int>();
- int err = ioctl(AS_.Native(), FIONWRITE, &nbytes);
- return err ? false : nbytes == 0;
- }
- #endif
- return true;
- }
- TIOService& GetIOService() const noexcept {
- return AS_.GetIOService();
- }
- private:
- THttpConn(TIOService& srv)
- : AddrId_(0)
- , AS_(srv)
- , BuffSize_(THttp2Options::InputBufferSize)
- , Connected_(false)
- , Cached_(false)
- , Canceled_(false)
- , Finalized_(false)
- , InAsyncRead_(false)
- , RequestWritten_(false)
- , BeginReadResponse_(false)
- {
- HttpOutConnCounter()->Inc();
- }
- //can be called only from asio
- void OnConnect(const TErrorCode& ec, IHandlingContext& ctx) {
- DBGOUT("THttpConn::OnConnect: " << ec.Value());
- if (Y_UNLIKELY(ec)) {
- if (ec.Value() == ETIMEDOUT && ConnectDeadline_.GetValue()) {
- //detect slow connecting (yet not reached final timeout)
- DBGOUT("OnConnectTimingCheck");
- THttpRequestRef req(GetRequest());
- if (!req) {
- return; //cancel from client thread can ahead us
- }
- TDuration newDeadline(ConnectDeadline_);
- ConnectDeadline_ = TDuration::Zero(); //next timeout is final
- req->OnDetectSlowConnecting();
- //continue wait connect
- ctx.ContinueUseHandler(newDeadline);
- return;
- }
- #ifdef DEBUG_STAT
- if (ec.Value() != ECANCELED) {
- ++TDebugStat::ConnFailed;
- } else {
- ++TDebugStat::ConnConnCanceled;
- }
- #endif
- if (ec.Value() == EIO) {
- //try get more detail error info
- char buf[1];
- TErrorCode errConnect;
- AS_.ReadSome(buf, 1, errConnect);
- OnConnectFailed(errConnect.Value() ? errConnect : ec);
- } else if (ec.Value() == ECANCELED) {
- // not try connecting to next host ip addr, simple fail
- OnError(ec);
- } else {
- OnConnectFailed(ec);
- }
- } else {
- Connected_ = true;
- THttpRequestRef req(GetRequest());
- if (!req || Canceled_) {
- return;
- }
- try {
- PrepareSocket(AS_.Native(), req->RequestSettings());
- if (THttp2Options::TcpKeepAlive) {
- SetKeepAlive(AS_.Native(), true);
- }
- } catch (TSystemError& err) {
- TErrorCode ec2(err.Status());
- OnError(ec2);
- return;
- }
- req->OnConnect(this);
- THttpRequestBuffersPtr ptr(req->BuildRequest());
- PrepareParser();
- TErrorCode ec3;
- SendRequest(ptr, ec3);
- if (ec3) {
- OnError(ec3);
- }
- }
- }
- void PrepareParser() {
- Prs_ = new THttpParser();
- Prs_->Prepare();
- }
- void SendRequest(const THttpRequestBuffersPtr& bfs, TErrorCode& ec) { //throw std::bad_alloc
- if (!THttp2Options::UseAsyncSendRequest) {
- size_t amount = AS_.WriteSome(*bfs->GetIOvec(), ec);
- if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK && ec.Value() != EINPROGRESS) {
- return;
- }
- ec.Assign(0);
- bfs->GetIOvec()->Proceed(amount);
- if (bfs->GetIOvec()->Complete()) {
- RequestWritten_ = true;
- StartRead();
- } else {
- SendRequestAsync(bfs);
- }
- } else {
- SendRequestAsync(bfs);
- }
- }
- void SendRequestAsync(const THttpRequestBuffersPtr& bfs) {
- NAsio::TTcpSocket::TSendedData sd(bfs.Release());
- AS_.AsyncWrite(sd, std::bind(&THttpConn::OnWrite, THttpConnRef(this), _1, _2, _3), THttp2Options::OutputDeadline);
- }
- void OnWrite(const TErrorCode& err, size_t amount, IHandlingContext& ctx) {
- Y_UNUSED(amount);
- Y_UNUSED(ctx);
- if (err) {
- OnError(err);
- } else {
- DBGOUT("OnWrite()");
- RequestWritten_ = true;
- StartRead();
- }
- }
- inline void StartRead() {
- if (!InAsyncRead_ && !Canceled_) {
- InAsyncRead_ = true;
- AS_.AsyncPollRead(std::bind(&THttpConn::OnCanRead, THttpConnRef(this), _1, _2), THttp2Options::InputDeadline);
- }
- }
- //can be called only from asio
- void OnReadSome(const TErrorCode& err, size_t bytes, IHandlingContext& ctx) {
- if (Y_UNLIKELY(err)) {
- OnError(err);
- return;
- }
- if (!BeginReadResponse_) {
- //used in MessageSendedCompletely()
- BeginReadResponse_ = true;
- THttpRequestRef r(GetRequest());
- if (!!r) {
- r->OnBeginRead();
- }
- }
- DBGOUT("receive:" << TStringBuf(Buff_.Get(), bytes));
- try {
- if (!Prs_) {
- throw yexception() << TStringBuf("receive some data while not in request");
- }
- #if defined(_linux_)
- if (THttp2Options::QuickAck) {
- SetSockOpt(AS_.Native(), SOL_TCP, TCP_QUICKACK, (int)1);
- }
- #endif
- DBGOUT("parse:");
- while (!Prs_->Parse(Buff_.Get(), bytes)) {
- if (BuffSize_ == bytes) {
- TErrorCode ec;
- bytes = AS_.ReadSome(Buff_.Get(), BuffSize_, ec);
- if (!ec) {
- continue;
- }
- if (ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) {
- OnError(ec);
- return;
- }
- }
- //continue async. read from socket
- ctx.ContinueUseHandler(THttp2Options::InputDeadline);
- return;
- }
- //succesfully reach end of http response
- THttpRequestRef r(ReleaseRequest());
- if (!r) {
- //lost race to req. canceling
- DBGOUT("connection failed");
- return;
- }
- DBGOUT("response:");
- bool keepALive = Prs_->IsKeepAlive();
- r->OnResponse(Prs_);
- if (!keepALive) {
- return;
- }
- //continue use connection (keep-alive mode)
- PrepareParser();
- if (!THttp2Options::KeepInputBufferForCachedConnections) {
- Buff_.Destroy();
- }
- //continue async. read from socket
- ctx.ContinueUseHandler(THttp2Options::InputDeadline);
- PutSelfToCache();
- } catch (...) {
- OnError(CurrentExceptionMessage());
- }
- }
- void PutSelfToCache();
- //method for reaction on input data for re-used keep-alive connection,
- //which free/release buffer when was placed in cache
- void OnCanRead(const TErrorCode& err, IHandlingContext& ctx) {
- if (Y_UNLIKELY(err)) {
- OnError(err);
- } else {
- if (!Buff_) {
- Buff_.Reset(new char[BuffSize_]);
- }
- TErrorCode ec;
- OnReadSome(ec, AS_.ReadSome(Buff_.Get(), BuffSize_, ec), ctx);
- }
- }
- //unlink connection and request, thread-safe mark connection as non valid
- inline THttpRequestRef GetRequest() noexcept {
- TGuard<TSpinLock> g(SL_);
- return Req_;
- }
- inline THttpRequestRef ReleaseRequest() noexcept {
- THttpRequestRef r;
- {
- TGuard<TSpinLock> g(SL_);
- r.Swap(Req_);
- }
- return r;
- }
- void OnConnectFailed(const TErrorCode& ec);
- inline void OnError(const TErrorCode& ec) {
- OnError(ec.Text());
- }
- inline void OnError(const TString& errText);
- size_t AddrId_;
- NAsio::TTcpSocket AS_;
- TArrayHolder<char> Buff_; //input buffer
- const size_t BuffSize_;
- TAutoPtr<THttpParser> Prs_; //input data parser & parsed info storage
- TSpinLock SL_;
- THttpRequestRef Req_; //current request
- TDuration ConnectDeadline_;
- TAtomicBool Connected_;
- TAtomicBool Cached_;
- TAtomicBool Canceled_;
- TAtomicBool Finalized_;
- bool InAsyncRead_;
- TAtomicBool RequestWritten_;
- TAtomicBool BeginReadResponse_;
- };
- //conn limits monitoring, cache clean, contain used in http clients asio threads/executors
- class THttpConnManager: public IThreadFactory::IThreadAble {
- public:
- THttpConnManager()
- : TotalConn(0)
- , EP_(THttp2Options::AsioThreads)
- , InPurging_(0)
- , MaxConnId_(0)
- , Shutdown_(false)
- {
- T_ = SystemThreadFactory()->Run(this);
- Limits.SetSoft(40000);
- Limits.SetHard(50000);
- }
- ~THttpConnManager() override {
- {
- TGuard<TMutex> g(PurgeMutex_);
- Shutdown_ = true;
- CondPurge_.Signal();
- }
- EP_.SyncShutdown();
- T_->Join();
- }
- inline void SetLimits(size_t softLimit, size_t hardLimit) noexcept {
- Limits.SetSoft(softLimit);
- Limits.SetHard(hardLimit);
- }
- inline std::pair<size_t, size_t> GetLimits() const noexcept {
- return {Limits.Soft(), Limits.Hard()};
- }
- inline void CheckLimits() {
- if (ExceedSoftLimit()) {
- SuggestPurgeCache();
- if (ExceedHardLimit()) {
- Y_ABORT("neh::http2 output connections limit reached");
- //ythrow yexception() << "neh::http2 output connections limit reached";
- }
- }
- }
- inline bool Get(THttpConnRef& conn, size_t addrId) {
- #ifdef DEBUG_STAT
- TDebugStat::ConnTotal.store(TotalConn.Val(), std::memory_order_release);
- TDebugStat::ConnActive.store(Active(), std::memory_order_release);
- TDebugStat::ConnCached.store(Cache_.Size(), std::memory_order_release);
- #endif
- return Cache_.Get(conn, addrId);
- }
- inline void Put(THttpConnRef& conn, size_t addrId) {
- if (Y_LIKELY(!Shutdown_ && !ExceedHardLimit() && !CacheDisabled())) {
- if (Y_UNLIKELY(addrId > (size_t)AtomicGet(MaxConnId_))) {
- AtomicSet(MaxConnId_, addrId);
- }
- Cache_.Put(conn, addrId);
- } else {
- conn->Close();
- conn.Drop();
- }
- }
- inline size_t OnConnError(size_t addrId) {
- return Cache_.Validate(addrId);
- }
- TIOService& GetIOService() {
- return EP_.GetExecutor().GetIOService();
- }
- bool CacheDisabled() const {
- return Limits.Soft() == 0;
- }
- bool IsShutdown() const noexcept {
- return Shutdown_;
- }
- TAtomicCounter TotalConn;
- private:
- inline size_t Total() const noexcept {
- return TotalConn.Val();
- }
- inline size_t Active() const noexcept {
- return TFdLimits::ExceedLimit(Total(), Cache_.Size());
- }
- inline size_t ExceedSoftLimit() const noexcept {
- return TFdLimits::ExceedLimit(Total(), Limits.Soft());
- }
- inline size_t ExceedHardLimit() const noexcept {
- return TFdLimits::ExceedLimit(Total(), Limits.Hard());
- }
- void SuggestPurgeCache() {
- if (AtomicTryLock(&InPurging_)) {
- //evaluate the usefulness of purging the cache
- //если в кеше мало соединений (< MaxConnId_/16 или 64), не чистим кеш
- if (Cache_.Size() > (Min((size_t)AtomicGet(MaxConnId_), (size_t)1024U) >> 4)) {
- //по мере приближения к hardlimit нужда в чистке cache приближается к 100%
- size_t closenessToHardLimit256 = ((Active() + 1) << 8) / (Limits.Delta() + 1);
- //чем больше соединений в кеше, а не в работе, тем менее нужен кеш (можно его почистить)
- size_t cacheUselessness256 = ((Cache_.Size() + 1) << 8) / (Active() + 1);
- //итого, - пороги срабатывания:
- //при достижении soft-limit, если соединения в кеше, а не в работе
- //на полпути от soft-limit к hard-limit, если в кеше больше половины соединений
- //при приближении к hardlimit пытаться почистить кеш почти постоянно
- if ((closenessToHardLimit256 + cacheUselessness256) >= 256U) {
- TGuard<TMutex> g(PurgeMutex_);
- CondPurge_.Signal();
- return; //memo: thread MUST unlock InPurging_ (see DoExecute())
- }
- }
- AtomicUnlock(&InPurging_);
- }
- }
- void DoExecute() override {
- TThread::SetCurrentThreadName("NehHttpConnMngr");
- while (true) {
- {
- TGuard<TMutex> g(PurgeMutex_);
- if (Shutdown_)
- return;
- CondPurge_.WaitI(PurgeMutex_);
- }
- PurgeCache();
- AtomicUnlock(&InPurging_);
- }
- }
- void PurgeCache() noexcept {
- //try remove at least ExceedSoftLimit() oldest connections from cache
- //вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша)
- size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (Cache_.Size() + 1))), (size_t)256U);
- size_t processed = 0;
- size_t maxConnId = AtomicGet(MaxConnId_);
- for (size_t i = 0; i <= maxConnId && !Shutdown_; ++i) {
- processed += Cache_.Purge(i, frac256);
- if (processed > 32) {
- #ifdef DEBUG_STAT
- TDebugStat::ConnPurgedInCache += processed;
- #endif
- processed = 0;
- Sleep(TDuration::MilliSeconds(10)); //prevent big spike cpu/system usage
- }
- }
- }
- TFdLimits Limits;
- TExecutorsPool EP_;
- TConnCache<THttpConn> Cache_;
- TAtomic InPurging_;
- TAtomic MaxConnId_;
- TAutoPtr<IThreadFactory::IThread> T_;
- TCondVar CondPurge_;
- TMutex PurgeMutex_;
- TAtomicBool Shutdown_;
- };
- THttpConnManager* HttpConnManager() {
- return Singleton<THttpConnManager>();
- }
- TAtomicCounter* HttpOutConnCounter() {
- return &HttpConnManager()->TotalConn;
- }
- THttpConnRef THttpConn::Create(TIOService& srv) {
- if (HttpConnManager()->IsShutdown()) {
- throw yexception() << "can't create connection with shutdowned service";
- }
- return new THttpConn(srv);
- }
- void THttpConn::PutSelfToCache() {
- THttpConnRef c(this);
- HttpConnManager()->Put(c, AddrId_);
- }
- void THttpConn::OnConnectFailed(const TErrorCode& ec) {
- THttpRequestRef r(GetRequest());
- if (!!r) {
- r->OnConnectFailed(this, ec);
- }
- OnError(ec);
- }
- void THttpConn::OnError(const TString& errText) {
- Finalized_ = true;
- if (Connected_) {
- Connected_ = false;
- TErrorCode ec;
- AS_.Shutdown(NAsio::TTcpSocket::ShutdownBoth, ec);
- } else {
- if (AS_.IsOpen()) {
- AS_.AsyncCancel();
- }
- }
- THttpRequestRef r(ReleaseRequest());
- if (!!r) {
- r->OnError(this, errText);
- } else {
- if (Cached_) {
- size_t res = HttpConnManager()->OnConnError(AddrId_);
- Y_UNUSED(res);
- #ifdef DEBUG_STAT
- TDebugStat::ConnDestroyedInCache += res;
- #endif
- }
- }
- }
- void THttpRequest::Run(THttpRequestRef& req) {
- #ifdef DEBUG_STAT
- if ((++TDebugStat::RequestTotal & 0xFFF) == 0) {
- TDebugStat::Print();
- }
- #endif
- try {
- while (!Canceled_) {
- THttpConnRef conn;
- if (HttpConnManager()->Get(conn, Addr_->Id)) {
- DBGOUT("Use connection from cache");
- Conn_ = conn; //thread magic
- if (!conn->StartNextRequest(req)) {
- continue; //if use connection from cache, ignore write error and try another conn
- }
- } else {
- HttpConnManager()->CheckLimits(); //here throw exception if reach hard limit (or atexit() state)
- Conn_ = THttpConn::Create(HttpConnManager()->GetIOService());
- TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
- Conn_->StartRequest(req, ep, Addr_->Id, THttp2Options::SymptomSlowConnect); // can throw
- }
- break;
- }
- } catch (...) {
- Conn_.Reset();
- throw;
- }
- }
- //it seems we have lost TCP SYN packet, create extra connection for decrease response time
- void THttpRequest::OnDetectSlowConnecting() {
- #ifdef DEBUG_STAT
- ++TDebugStat::ConnSlow;
- #endif
- //use some io_service (Run() thread-executor), from first conn. for more thread safety
- THttpConnRef conn = GetConn();
- if (!conn) {
- return;
- }
- THttpConnRef conn2;
- try {
- conn2 = THttpConn::Create(conn->GetIOService());
- } catch (...) {
- return; // cant create spare connection, simple continue use only main
- }
- {
- TGuard<TSpinLock> g(SL_);
- Conn2_ = conn2;
- }
- if (Y_UNLIKELY(Canceled_)) {
- ReleaseConn2();
- } else {
- //use connect timeout for disable detecting slow connecting on second conn.
- TEndpoint ep(new NAddr::TAddrInfo(&*Addr_->Addr.Begin()));
- try {
- conn2->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::ConnectTimeout);
- } catch (...) {
- // ignore errors on spare connection
- ReleaseConn2();
- }
- }
- }
- void THttpRequest::OnConnect(THttpConn* c) {
- THttpConnRef extraConn;
- {
- TGuard<TSpinLock> g(SL_);
- if (Y_UNLIKELY(!!Conn2_)) {
- //has pair concurrent conn, 'should stay only one'
- if (Conn2_.Get() == c) {
- #ifdef DEBUG_STAT
- ++TDebugStat::Conn2Success;
- #endif
- Conn2_.Swap(Conn_);
- }
- extraConn.Swap(Conn2_);
- }
- }
- if (!!extraConn) {
- extraConn->DetachRequest(); //prevent call OnError()
- extraConn->Close();
- }
- }
- void THttpRequest::OnResponse(TAutoPtr<THttpParser>& rsp) {
- DBGOUT("THttpRequest::OnResponse()");
- ReleaseConn();
- if (Y_LIKELY(((rsp->RetCode() >= 200 && rsp->RetCode() < (!THttp2Options::RedirectionNotError ? 300 : 400)) || THttp2Options::AnyResponseIsNotError))) {
- NotifyResponse(rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
- } else {
- TString message;
- if (THttp2Options::FullHeadersAsErrorMessage) {
- TStringStream err;
- err << rsp->FirstLine();
- THttpHeaders hdrs = rsp->Headers();
- for (auto h = hdrs.begin(); h < hdrs.end(); h++) {
- err << h->ToString() << TStringBuf("\r\n");
- }
- message = err.Str();
- } else if (THttp2Options::UseResponseAsErrorMessage) {
- message = rsp->DecodedContent();
- } else {
- TStringStream err;
- err << TStringBuf("request failed(") << rsp->FirstLine() << TStringBuf(")");
- message = err.Str();
- }
- NotifyError(new TError(message, TError::ProtocolSpecific, rsp->RetCode()), rsp.Get());
- }
- }
- void THttpRequest::OnConnectFailed(THttpConn* c, const TErrorCode& ec) {
- DBGOUT("THttpRequest::OnConnectFailed()");
- //detach/discard failed conn, try connect to next ip addr (if can)
- THttpConnRef cc(GetConn());
- if (c != cc.Get() || AddrIter_ == Addr_->Addr.End() || ++AddrIter_ == Addr_->Addr.End() || Canceled_) {
- return OnSystemError(c, ec);
- }
- // can try next host addr
- c->DetachRequest();
- c->Close();
- THttpConnRef nextConn;
- try {
- nextConn = THttpConn::Create(HttpConnManager()->GetIOService());
- } catch (...) {
- OnSystemError(nullptr, ec);
- return;
- }
- {
- THttpConnRef nc = nextConn;
- TGuard<TSpinLock> g(SL_);
- Conn_.Swap(nc);
- }
- TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
- try {
- nextConn->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::SymptomSlowConnect);
- } catch (...) {
- OnError(nullptr, CurrentExceptionMessage());
- return;
- }
- if (Canceled_) {
- OnError(nullptr, "canceled");
- }
- }
- void THttpRequest::OnSystemError(THttpConn* c, const TErrorCode& ec) {
- DBGOUT("THttpRequest::OnSystemError()");
- NotifyError(ec.Text(), TError::TType::UnknownType, 0, ec.Value());
- Finalize(c);
- }
- void THttpRequest::OnError(THttpConn* c, const TString& errorText) {
- DBGOUT("THttpRequest::OnError()");
- NotifyError(errorText);
- Finalize(c);
- }
- bool THttpRequest::RequestSendedCompletely() noexcept {
- if (RequestSendedCompletely_) {
- return true;
- }
- THttpConnRef c(GetConn());
- return !!c ? c->RequestSendedCompletely() : false;
- }
- void THttpRequest::Cancel() noexcept {
- if (!Canceled_) {
- Canceled_ = true;
- try {
- static const TString canceled("Canceled");
- NotifyError(canceled, TError::Cancelled);
- Finalize();
- } catch (...) {
- }
- }
- }
- inline void FinalizeConn(THttpConnRef& c, THttpConn* skipConn) noexcept {
- if (!!c && c.Get() != skipConn) {
- c->DetachRequest();
- c->Close();
- }
- }
- void THttpRequest::Finalize(THttpConn* skipConn) noexcept {
- THttpConnRef c1(ReleaseConn());
- FinalizeConn(c1, skipConn);
- THttpConnRef c2(ReleaseConn2());
- FinalizeConn(c2, skipConn);
- }
- /////////////////////////////////// server side ////////////////////////////////////
- TAtomicCounter* HttpInConnCounter() {
- return Singleton<TAtomicCounter>();
- }
- TFdLimits* HttpInConnLimits() {
- return Singleton<TFdLimits>();
- }
- class THttpServer: public IRequester {
- typedef TAutoPtr<TTcpAcceptor> TTcpAcceptorPtr;
- typedef TAtomicSharedPtr<TTcpSocket> TTcpSocketRef;
- class TConn;
- typedef TSharedPtrB<TConn> TConnRef;
- class TRequest: public IHttpRequest {
- public:
- TRequest(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser>& p)
- : C_(c)
- , P_(p)
- , RemoteHost_(C_->RemoteHost())
- , CompressionScheme_(P_->GetBestCompressionScheme())
- , H_(TStringBuf(P_->FirstLine()))
- {
- }
- ~TRequest() override {
- if (!!C_) {
- try {
- C_->SendError(Id(), 503, "service unavailable (request ignored)", P_->HttpVersion(), {});
- } catch (...) {
- DBGOUT("~TRequest()::SendFail() exception");
- }
- }
- }
- TAtomicBase Id() const {
- return Id_;
- }
- protected:
- TStringBuf Scheme() const override {
- return TStringBuf("http");
- }
- TString RemoteHost() const override {
- return RemoteHost_;
- }
- TStringBuf Service() const override {
- return TStringBuf(H_.Path).Skip(1);
- }
- const THttpHeaders& Headers() const override {
- return P_->Headers();
- }
- TStringBuf Method() const override {
- return H_.Method;
- }
- TStringBuf Body() const override {
- return P_->DecodedContent();
- }
- TStringBuf Cgi() const override {
- return H_.Cgi;
- }
- TStringBuf RequestId() const override {
- return TStringBuf();
- }
- bool Canceled() const override {
- if (!C_) {
- return false;
- }
- return C_->IsCanceled();
- }
- void SendReply(TData& data) override {
- SendReply(data, TString(), HttpCodes::HTTP_OK);
- }
- void SendReply(TData& data, const TString& headers, int httpCode) override {
- if (!!C_) {
- C_->Send(Id(), data, CompressionScheme_, P_->HttpVersion(), headers, httpCode);
- C_.Reset();
- }
- }
- void SendError(TResponseError err, const THttpErrorDetails& details) override {
- static const unsigned errorToHttpCode[IRequest::MaxResponseError] =
- {
- 400,
- 403,
- 404,
- 429,
- 500,
- 501,
- 502,
- 503,
- 509};
- if (!!C_) {
- C_->SendError(Id(), errorToHttpCode[err], details.Details, P_->HttpVersion(), details.Headers);
- C_.Reset();
- }
- }
- static TAtomicBase NextId() {
- static TAtomic idGenerator = 0;
- TAtomicBase id = 0;
- do {
- id = AtomicIncrement(idGenerator);
- } while (!id);
- return id;
- }
- TSharedPtrB<TConn> C_;
- TAutoPtr<THttpParser> P_;
- TString RemoteHost_;
- TString CompressionScheme_;
- TParsedHttpFull H_;
- TAtomicBase Id_ = NextId();
- };
- class TRequestGet: public TRequest {
- public:
- TRequestGet(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
- : TRequest(c, p)
- {
- }
- TStringBuf Data() const override {
- return H_.Cgi;
- }
- };
- class TRequestPost: public TRequest {
- public:
- TRequestPost(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
- : TRequest(c, p)
- {
- }
- TStringBuf Data() const override {
- return P_->DecodedContent();
- }
- };
- class TConn {
- private:
- TConn(THttpServer& hs, const TTcpSocketRef& s)
- : HS_(hs)
- , AS_(s)
- , RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
- , BuffSize_(THttp2Options::InputBufferSize)
- , Buff_(new char[BuffSize_])
- , Canceled_(false)
- , LeftRequestsToDisconnect_(hs.LimitRequestsPerConnection)
- {
- DBGOUT("THttpServer::TConn()");
- HS_.OnCreateConn();
- }
- inline TConnRef SelfRef() noexcept {
- return WeakThis_;
- }
- public:
- static void Create(THttpServer& hs, const TTcpSocketRef& s) {
- TSharedPtrB<TConn> conn(new TConn(hs, s));
- conn->WeakThis_ = conn;
- conn->ExpectNewRequest();
- conn->AS_->AsyncPollRead(std::bind(&TConn::OnCanRead, conn, _1, _2), THttp2Options::ServerInputDeadline);
- }
- ~TConn() {
- DBGOUT("~THttpServer::TConn(" << (!AS_ ? -666 : AS_->Native()) << ")");
- HS_.OnDestroyConn();
- }
- private:
- void ExpectNewRequest() {
- P_.Reset(new THttpParser(THttpParser::Request));
- P_->Prepare();
- }
- void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
- if (ec) {
- OnError();
- } else {
- TErrorCode ec2;
- OnReadSome(ec2, AS_->ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
- }
- }
- void OnError() {
- DBGOUT("Srv OnError(" << (!AS_ ? -666 : AS_->Native()) << ")");
- Canceled_ = true;
- AS_->AsyncCancel();
- }
- void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
- if (ec || !amount) {
- OnError();
- return;
- }
- DBGOUT("ReadSome(" << (!AS_ ? -666 : AS_->Native()) << "): " << amount);
- try {
- size_t buffPos = 0;
- //DBGOUT("receive and parse: " << TStringBuf(Buff_.Get(), amount));
- while (P_->Parse(Buff_.Get() + buffPos, amount - buffPos)) {
- if (!P_->IsKeepAlive() || LeftRequestsToDisconnect_ == 1) {
- SeenMessageWithoutKeepalive_ = true;
- }
- char rt = *P_->FirstLine().data();
- const size_t extraDataSize = P_->GetExtraDataSize();
- if (rt == 'P' || rt == 'p') {
- OnRequest(new TRequestPost(WeakThis_, P_));
- } else {
- OnRequest(new TRequestGet(WeakThis_, P_));
- }
- if (extraDataSize) {
- // has http pipelining
- buffPos = amount - extraDataSize;
- ExpectNewRequest();
- } else {
- ExpectNewRequest();
- ctx.ContinueUseHandler(HS_.GetKeepAliveTimeout());
- return;
- }
- }
- ctx.ContinueUseHandler(THttp2Options::ServerInputDeadline);
- } catch (...) {
- OnError();
- }
- }
- void OnRequest(TRequest* r) {
- DBGOUT("OnRequest()");
- if (AtomicGet(PrimaryResponse_)) {
- // has pipelining
- PipelineOrder_.Enqueue(r->Id());
- } else {
- AtomicSet(PrimaryResponse_, r->Id());
- }
- HS_.OnRequest(r);
- OnRequestDone();
- }
- void OnRequestDone() {
- DBGOUT("OnRequestDone()");
- if (LeftRequestsToDisconnect_ > 0) {
- --LeftRequestsToDisconnect_;
- }
- }
- static void PrintHttpVersion(IOutputStream& out, const THttpVersion& ver) {
- out << TStringBuf("HTTP/") << ver.Major << TStringBuf(".") << ver.Minor;
- }
- struct TResponseData : TThrRefBase {
- TResponseData(size_t reqId, TTcpSocket::TSendedData data)
- : RequestId_(reqId)
- , Data_(data)
- {
- }
- size_t RequestId_;
- TTcpSocket::TSendedData Data_;
- };
- typedef TIntrusivePtr<TResponseData> TResponseDataRef;
- public:
- //called non thread-safe (from outside thread)
- void Send(TAtomicBase requestId, TData& data, const TString& compressionScheme, const THttpVersion& ver, const TString& headers, int httpCode) {
- class THttpResponseFormatter {
- public:
- THttpResponseFormatter(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection) {
- Header.Reserve(128 + contentEncoding.size() + theHeaders.size());
- PrintHttpVersion(Header, theVer);
- Header << TStringBuf(" ") << theHttpCode << ' ' << HttpCodeStr(theHttpCode);
- if (Compress(theData, contentEncoding)) {
- Header << TStringBuf("\r\nContent-Encoding: ") << contentEncoding;
- }
- Header << TStringBuf("\r\nContent-Length: ") << theData.size();
- if (closeConnection) {
- Header << TStringBuf("\r\nConnection: close");
- } else if (Y_LIKELY(theVer.Major > 1 || theVer.Minor > 0)) {
- // since HTTP/1.1 Keep-Alive is default behaviour
- Header << TStringBuf("\r\nConnection: Keep-Alive");
- }
- if (theHeaders) {
- Header << theHeaders;
- }
- Header << TStringBuf("\r\n\r\n");
- Body.swap(theData);
- Parts[0].buf = Header.Data();
- Parts[0].len = Header.Size();
- Parts[1].buf = Body.data();
- Parts[1].len = Body.size();
- }
- TStringStream Header;
- TData Body;
- IOutputStream::TPart Parts[2];
- };
- class TBuffers: public THttpResponseFormatter, public TTcpSocket::IBuffers {
- public:
- TBuffers(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection)
- : THttpResponseFormatter(theData, contentEncoding, theVer, theHeaders, theHttpCode, closeConnection)
- , IOVec(Parts, 2)
- {
- }
- TContIOVector* GetIOvec() override {
- return &IOVec;
- }
- TContIOVector IOVec;
- };
- TTcpSocket::TSendedData sd(new TBuffers(data, compressionScheme, ver, headers, httpCode, SeenMessageWithoutKeepalive_));
- SendData(requestId, sd);
- }
- //called non thread-safe (from outside thread)
- void SendError(TAtomicBase requestId, unsigned httpCode, const TString& descr, const THttpVersion& ver, const TString& headers) {
- if (Canceled_) {
- return;
- }
- class THttpErrorResponseFormatter {
- public:
- THttpErrorResponseFormatter(unsigned theHttpCode, const TString& theDescr, const THttpVersion& theVer, bool closeConnection, const TString& headers) {
- PrintHttpVersion(Answer, theVer);
- Answer << TStringBuf(" ") << theHttpCode << TStringBuf(" ");
- if (theDescr.size() && !THttp2Options::ErrorDetailsAsResponseBody) {
- // Reason-Phrase = *<TEXT, excluding CR, LF>
- // replace bad chars to '.'
- TString reasonPhrase = theDescr;
- for (TString::iterator it = reasonPhrase.begin(); it != reasonPhrase.end(); ++it) {
- char& ch = *it;
- if (ch == ' ') {
- continue;
- }
- if (((ch & 31) == ch) || static_cast<unsigned>(ch) == 127 || (static_cast<unsigned>(ch) & 0x80)) {
- //CTLs || DEL(127) || non ascii
- // (ch <= 32) || (ch >= 127)
- ch = '.';
- }
- }
- Answer << reasonPhrase;
- } else {
- Answer << HttpCodeStr(static_cast<int>(theHttpCode));
- }
- if (closeConnection) {
- Answer << TStringBuf("\r\nConnection: close");
- }
- if (headers) {
- Answer << "\r\n" << headers;
- }
- if (THttp2Options::ErrorDetailsAsResponseBody) {
- Answer << TStringBuf("\r\nContent-Length:") << theDescr.size() << "\r\n\r\n" << theDescr;
- } else {
- Answer << "\r\n"
- "Content-Length:0\r\n\r\n"sv;
- }
- Parts[0].buf = Answer.Data();
- Parts[0].len = Answer.Size();
- }
- TStringStream Answer;
- IOutputStream::TPart Parts[1];
- };
- class TBuffers: public THttpErrorResponseFormatter, public TTcpSocket::IBuffers {
- public:
- TBuffers(
- unsigned theHttpCode,
- const TString& theDescr,
- const THttpVersion& theVer,
- bool closeConnection,
- const TString& headers
- )
- : THttpErrorResponseFormatter(theHttpCode, theDescr, theVer, closeConnection, headers)
- , IOVec(Parts, 1)
- {
- }
- TContIOVector* GetIOvec() override {
- return &IOVec;
- }
- TContIOVector IOVec;
- };
- TTcpSocket::TSendedData sd(new TBuffers(httpCode, descr, ver, SeenMessageWithoutKeepalive_, headers));
- SendData(requestId, sd);
- }
- void ProcessPipeline() {
- // on successfull response to current (PrimaryResponse_) request
- TAtomicBase requestId;
- if (PipelineOrder_.Dequeue(&requestId)) {
- TAtomicBase oldReqId;
- do {
- oldReqId = AtomicGet(PrimaryResponse_);
- Y_ABORT_UNLESS(oldReqId, "race inside http pipelining");
- } while (!AtomicCas(&PrimaryResponse_, requestId, oldReqId));
- ProcessResponsesData();
- } else {
- TAtomicBase oldReqId = AtomicGet(PrimaryResponse_);
- if (oldReqId) {
- while (!AtomicCas(&PrimaryResponse_, 0, oldReqId)) {
- Y_ABORT_UNLESS(oldReqId == AtomicGet(PrimaryResponse_), "race inside http pipelining [2]");
- }
- }
- }
- }
- void ProcessResponsesData() {
- // process responses data queue, send response (if already have next PrimaryResponse_)
- TResponseDataRef rd;
- while (ResponsesDataQueue_.Dequeue(&rd)) {
- ResponsesData_[rd->RequestId_] = rd;
- }
- TAtomicBase requestId = AtomicGet(PrimaryResponse_);
- if (requestId) {
- THashMap<TAtomicBase, TResponseDataRef>::iterator it = ResponsesData_.find(requestId);
- if (it != ResponsesData_.end()) {
- // has next primary response
- rd = it->second;
- ResponsesData_.erase(it);
- AS_->AsyncWrite(rd->Data_, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
- }
- }
- }
- private:
- void SendData(TAtomicBase requestId, TTcpSocket::TSendedData sd) {
- TContIOVector& vec = *sd->GetIOvec();
- if (requestId != AtomicGet(PrimaryResponse_)) {
- // already has another request for response first, so push this to queue
- // + enqueue event for safe checking queue (at local/transport thread)
- TResponseDataRef rdr = new TResponseData(requestId, sd);
- ResponsesDataQueue_.Enqueue(rdr);
- AS_->GetIOService().Post(std::bind(&TConn::ProcessResponsesData, SelfRef()));
- return;
- }
- if (THttp2Options::ServerUseDirectWrite) {
- vec.Proceed(AS_->WriteSome(vec));
- }
- if (!vec.Complete()) {
- DBGOUT("AsyncWrite()");
- AS_->AsyncWrite(sd, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
- } else {
- // run ProcessPipeline at safe thread
- AS_->GetIOService().Post(std::bind(&TConn::ProcessPipeline, SelfRef()));
- }
- }
- void OnSend(const TErrorCode& ec, size_t amount, IHandlingContext&) {
- Y_UNUSED(amount);
- if (ec) {
- OnError();
- } else {
- ProcessPipeline();
- }
- if (SeenMessageWithoutKeepalive_) {
- TErrorCode shutdown_ec;
- AS_->Shutdown(TTcpSocket::ShutdownBoth, shutdown_ec);
- }
- }
- public:
- bool IsCanceled() const noexcept {
- return Canceled_;
- }
- const TString& RemoteHost() const noexcept {
- return RemoteHost_;
- }
- private:
- TWeakPtrB<TConn> WeakThis_;
- THttpServer& HS_;
- TTcpSocketRef AS_;
- TString RemoteHost_;
- size_t BuffSize_;
- TArrayHolder<char> Buff_;
- TAutoPtr<THttpParser> P_;
- // pipeline supporting
- TAtomic PrimaryResponse_ = 0;
- TLockFreeQueue<TAtomicBase> PipelineOrder_;
- TLockFreeQueue<TResponseDataRef> ResponsesDataQueue_;
- THashMap<TAtomicBase, TResponseDataRef> ResponsesData_;
- TAtomicBool Canceled_;
- TAtomicBool SeenMessageWithoutKeepalive_ = false;
- i32 LeftRequestsToDisconnect_ = -1;
- };
- ///////////////////////////////////////////////////////////
- public:
- THttpServer(IOnRequest* cb, const TParsedLocation& loc)
- : E_(THttp2Options::AsioServerThreads)
- , CB_(cb)
- , LimitRequestsPerConnection(THttp2Options::LimitRequestsPerConnection)
- {
- TNetworkAddress addr = THttp2Options::RespectHostInHttpServerNetworkAddress ?
- TNetworkAddress(TString(loc.Host), loc.GetPort())
- : TNetworkAddress(loc.GetPort());
- for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
- TEndpoint ep(new NAddr::TAddrInfo(&*it));
- TTcpAcceptorPtr a(new TTcpAcceptor(AcceptExecutor_.GetIOService()));
- DBGOUT("bind:" << ep.IpToString() << ":" << ep.Port());
- a->Bind(ep);
- a->Listen(THttp2Options::Backlog);
- StartAccept(a.Get());
- A_.push_back(a);
- }
- }
- ~THttpServer() override {
- AcceptExecutor_.SyncShutdown(); //cancel operation for all current sockets (include acceptors)
- A_.clear(); //stop listening
- E_.SyncShutdown();
- }
- void OnAccept(TTcpAcceptor* a, TAtomicSharedPtr<TTcpSocket> s, const TErrorCode& ec, IHandlingContext&) {
- if (Y_UNLIKELY(ec)) {
- if (ec.Value() == ECANCELED) {
- return;
- } else if (ec.Value() == EMFILE || ec.Value() == ENFILE || ec.Value() == ENOMEM || ec.Value() == ENOBUFS) {
- //reach some os limit, suspend accepting
- TAtomicSharedPtr<TDeadlineTimer> dt(new TDeadlineTimer(a->GetIOService()));
- dt->AsyncWaitExpireAt(TDuration::Seconds(30), std::bind(&THttpServer::OnTimeoutSuspendAccept, this, a, dt, _1, _2));
- return;
- } else {
- Cdbg << "acc: " << ec.Text() << Endl;
- }
- } else {
- if (static_cast<size_t>(HttpInConnCounter()->Val()) < HttpInConnLimits()->Hard()) {
- try {
- SetNonBlock(s->Native());
- PrepareSocket(s->Native());
- TConn::Create(*this, s);
- } catch (TSystemError& err) {
- TErrorCode ec2(err.Status());
- Cdbg << "acc: " << ec2.Text() << Endl;
- }
- } //else accepted socket will be closed
- }
- StartAccept(a); //continue accepting
- }
- void OnTimeoutSuspendAccept(TTcpAcceptor* a, TAtomicSharedPtr<TDeadlineTimer>, const TErrorCode& ec, IHandlingContext&) {
- if (!ec) {
- DBGOUT("resume acceptor")
- StartAccept(a);
- }
- }
- void OnRequest(IRequest* r) {
- try {
- CB_->OnRequest(r);
- } catch (...) {
- Cdbg << CurrentExceptionMessage() << Endl;
- }
- }
- protected:
- void OnCreateConn() noexcept {
- HttpInConnCounter()->Inc();
- }
- void OnDestroyConn() noexcept {
- HttpInConnCounter()->Dec();
- }
- TDuration GetKeepAliveTimeout() const noexcept {
- size_t cc = HttpInConnCounter()->Val();
- TFdLimits lim(*HttpInConnLimits());
- if (!TFdLimits::ExceedLimit(cc, lim.Soft())) {
- return THttp2Options::ServerInputDeadlineKeepAliveMax;
- }
- if (cc > lim.Hard()) {
- cc = lim.Hard();
- }
- TDuration::TValue softTuneRange = THttp2Options::ServerInputDeadlineKeepAliveMax.Seconds() - THttp2Options::ServerInputDeadlineKeepAliveMin.Seconds();
- return TDuration::Seconds((softTuneRange * (cc - lim.Soft())) / (lim.Hard() - lim.Soft() + 1)) + THttp2Options::ServerInputDeadlineKeepAliveMin;
- }
- private:
- void StartAccept(TTcpAcceptor* a) {
- TAtomicSharedPtr<TTcpSocket> s(new TTcpSocket(E_.Size() ? E_.GetExecutor().GetIOService() : AcceptExecutor_.GetIOService()));
- a->AsyncAccept(*s, std::bind(&THttpServer::OnAccept, this, a, s, _1, _2));
- }
- TIOServiceExecutor AcceptExecutor_;
- TVector<TTcpAcceptorPtr> A_;
- TExecutorsPool E_;
- IOnRequest* CB_;
- public:
- const i32 LimitRequestsPerConnection;
- };
- template <class T>
- class THttp2Protocol: public IProtocol {
- public:
- IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
- return new THttpServer(cb, loc);
- }
- THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
- THttpRequest::THandleRef ret(new THttpRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
- try {
- THttpRequest::Run(ret, msg, &T::Build, T::RequestSettings());
- } catch (...) {
- ret->ResetOnRecv();
- throw;
- }
- return ret.Get();
- }
- TStringBuf Scheme() const noexcept override {
- return T::Name();
- }
- bool SetOption(TStringBuf name, TStringBuf value) override {
- return THttp2Options::Set(name, value);
- }
- };
- }
- namespace NNeh {
- IProtocol* Http1Protocol() {
- return Singleton<THttp2Protocol<TRequestGet1>>();
- }
- IProtocol* Post1Protocol() {
- return Singleton<THttp2Protocol<TRequestPost1>>();
- }
- IProtocol* Full1Protocol() {
- return Singleton<THttp2Protocol<TRequestFull1>>();
- }
- IProtocol* Http2Protocol() {
- return Singleton<THttp2Protocol<TRequestGet2>>();
- }
- IProtocol* Post2Protocol() {
- return Singleton<THttp2Protocol<TRequestPost2>>();
- }
- IProtocol* Full2Protocol() {
- return Singleton<THttp2Protocol<TRequestFull2>>();
- }
- IProtocol* UnixSocketGetProtocol() {
- return Singleton<THttp2Protocol<TRequestUnixSocketGet>>();
- }
- IProtocol* UnixSocketPostProtocol() {
- return Singleton<THttp2Protocol<TRequestUnixSocketPost>>();
- }
- IProtocol* UnixSocketFullProtocol() {
- return Singleton<THttp2Protocol<TRequestUnixSocketFull>>();
- }
- void SetHttp2OutputConnectionsLimits(size_t softLimit, size_t hardLimit) {
- HttpConnManager()->SetLimits(softLimit, hardLimit);
- }
- void SetHttp2InputConnectionsLimits(size_t softLimit, size_t hardLimit) {
- HttpInConnLimits()->SetSoft(softLimit);
- HttpInConnLimits()->SetHard(hardLimit);
- }
- TAtomicBase GetHttpOutputConnectionCount() {
- return HttpOutConnCounter()->Val();
- }
- std::pair<size_t, size_t> GetHttpOutputConnectionLimits() {
- return HttpConnManager()->GetLimits();
- }
- TAtomicBase GetHttpInputConnectionCount() {
- return HttpInConnCounter()->Val();
- }
- void SetHttp2InputConnectionsTimeouts(unsigned minSeconds, unsigned maxSeconds) {
- THttp2Options::ServerInputDeadlineKeepAliveMin = TDuration::Seconds(minSeconds);
- THttp2Options::ServerInputDeadlineKeepAliveMax = TDuration::Seconds(maxSeconds);
- }
- class TUnixSocketResolver {
- public:
- NDns::TResolvedHost* Resolve(const TString& path) {
- TString unixSocketPath = path;
- if (path.size() > 2 && path[0] == '[' && path[path.size() - 1] == ']') {
- unixSocketPath = path.substr(1, path.size() - 2);
- }
- if (auto resolvedUnixSocket = ResolvedUnixSockets_.FindPtr(unixSocketPath)) {
- return resolvedUnixSocket->Get();
- }
- TNetworkAddress na{TUnixSocketPath(unixSocketPath)};
- ResolvedUnixSockets_[unixSocketPath] = MakeHolder<NDns::TResolvedHost>(unixSocketPath, na);
- return ResolvedUnixSockets_[unixSocketPath].Get();
- }
- private:
- THashMap<TString, THolder<NDns::TResolvedHost>> ResolvedUnixSockets_;
- };
- TUnixSocketResolver* UnixSocketResolver() {
- return FastTlsSingleton<TUnixSocketResolver>();
- }
- const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType) {
- if (resolverType == EResolverType::EUNIXSOCKET) {
- return UnixSocketResolver()->Resolve(TString(host));
- }
- return NDns::CachedResolve(NDns::TResolveInfo(host, port));
- }
- }
|