12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354 |
- #include "stdafx.h"
- #include "udp_http.h"
- #include "udp_client_server.h"
- #include "udp_socket.h"
- #include "cpu_affinity.h"
- #include <library/cpp/threading/atomic/bool.h>
- #include <util/system/hp_timer.h>
- #include <util/thread/lfqueue.h>
- #include <util/system/thread.h>
- #include <util/system/spinlock.h>
- #if !defined(_win_)
- #include <signal.h>
- #include <pthread.h>
- #endif
- #include "block_chain.h"
- #include <util/system/shmat.h>
- #include <atomic>
- namespace NNetliba {
- const float HTTP_TIMEOUT = 15.0f;
- const int MIN_SHARED_MEM_PACKET = 1000;
- static ::NAtomic::TBool PanicAttack;
- static std::atomic<NHPTimer::STime> LastHeartbeat;
- static std::atomic<double> HeartbeatTimeout;
- static int GetPacketSize(TRequest* req) {
- if (req && req->Data.Get())
- return req->Data->GetSize();
- return 0;
- }
- static bool IsLocalFast(const TUdpAddress& addr) {
- if (addr.IsIPv4()) {
- return IsLocalIPv4(addr.GetIPv4());
- } else {
- return IsLocalIPv6(addr.Network, addr.Interface);
- }
- }
- bool IsLocal(const TUdpAddress& addr) {
- InitLocalIPList();
- return IsLocalFast(addr);
- }
- TUdpHttpRequest::~TUdpHttpRequest() {
- }
- TUdpHttpResponse::~TUdpHttpResponse() {
- }
- class TRequesterUserQueueSizes: public TThrRefBase {
- public:
- TAtomic ReqCount, RespCount;
- TAtomic ReqQueueSize, RespQueueSize;
- TRequesterUserQueueSizes()
- : ReqCount(0)
- , RespCount(0)
- , ReqQueueSize(0)
- , RespQueueSize(0)
- {
- }
- };
- template <class T>
- void EraseList(TLockFreeQueue<T*>* data) {
- T* ptr = nullptr;
- while (data->Dequeue(&ptr)) {
- delete ptr;
- }
- }
- class TRequesterUserQueues: public TThrRefBase {
- TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes;
- TLockFreeQueue<TUdpHttpRequest*> ReqList;
- TLockFreeQueue<TUdpHttpResponse*> ResponseList;
- TLockFreeStack<TGUID> CancelList, SendRequestAccList; // any order will do
- TMuxEvent AsyncEvent;
- void UpdateAsyncSignalState() {
- // not sure about this one. Idea is that AsyncEvent.Reset() is a memory barrier
- if (ReqList.IsEmpty() && ResponseList.IsEmpty() && CancelList.IsEmpty() && SendRequestAccList.IsEmpty()) {
- AsyncEvent.Reset();
- if (!ReqList.IsEmpty() || !ResponseList.IsEmpty() || !CancelList.IsEmpty() || !SendRequestAccList.IsEmpty())
- AsyncEvent.Signal();
- }
- }
- ~TRequesterUserQueues() override {
- EraseList(&ReqList);
- EraseList(&ResponseList);
- }
- public:
- TRequesterUserQueues(TRequesterUserQueueSizes* queueSizes)
- : QueueSizes(queueSizes)
- {
- }
- TUdpHttpRequest* GetRequest();
- TUdpHttpResponse* GetResponse();
- bool GetRequestCancel(TGUID* req) {
- bool res = CancelList.Dequeue(req);
- UpdateAsyncSignalState();
- return res;
- }
- bool GetSendRequestAcc(TGUID* req) {
- bool res = SendRequestAccList.Dequeue(req);
- UpdateAsyncSignalState();
- return res;
- }
- void AddRequest(TUdpHttpRequest* res) {
- AtomicAdd(QueueSizes->ReqCount, 1);
- AtomicAdd(QueueSizes->ReqQueueSize, GetPacketSize(res->DataHolder.Get()));
- ReqList.Enqueue(res);
- AsyncEvent.Signal();
- }
- void AddResponse(TUdpHttpResponse* res) {
- AtomicAdd(QueueSizes->RespCount, 1);
- AtomicAdd(QueueSizes->RespQueueSize, GetPacketSize(res->DataHolder.Get()));
- ResponseList.Enqueue(res);
- AsyncEvent.Signal();
- }
- void AddCancel(const TGUID& req) {
- CancelList.Enqueue(req);
- AsyncEvent.Signal();
- }
- void AddSendRequestAcc(const TGUID& req) {
- SendRequestAccList.Enqueue(req);
- AsyncEvent.Signal();
- }
- TMuxEvent& GetAsyncEvent() {
- return AsyncEvent;
- }
- void AsyncSignal() {
- AsyncEvent.Signal();
- }
- };
- struct TOutRequestState {
- enum EState {
- S_SENDING,
- S_WAITING,
- S_WAITING_PING_SENDING,
- S_WAITING_PING_SENT,
- S_CANCEL_AFTER_SENDING
- };
- EState State;
- TUdpAddress Address;
- double TimePassed;
- int PingTransferId;
- TIntrusivePtr<TRequesterUserQueues> UserQueues;
- TOutRequestState()
- : State(S_SENDING)
- , TimePassed(0)
- , PingTransferId(-1)
- {
- }
- };
- struct TInRequestState {
- enum EState {
- S_WAITING,
- S_RESPONSE_SENDING,
- S_CANCELED,
- };
- EState State;
- TUdpAddress Address;
- TInRequestState()
- : State(S_WAITING)
- {
- }
- TInRequestState(const TUdpAddress& address)
- : State(S_WAITING)
- , Address(address)
- {
- }
- };
// Packet type tag. It is written as a single char at the start of a packet and
// read back the same way, so the numeric values must stay as they are.
enum EHttpPacket {
    PKT_REQUEST = 0,
    PKT_PING = 1,
    PKT_PING_RESPONSE = 2,
    PKT_RESPONSE = 3,
    PKT_GETDEBUGINFO = 4,
    PKT_LOCAL_REQUEST = 5,
    PKT_LOCAL_RESPONSE = 6,
    PKT_CANCEL = 7,
};
- class TUdpHttp: public IRequester {
// Which request table a transfer belongs to: DIR_OUT entries are looked up in
// OutRequests, DIR_IN entries in InRequests.
enum EDir {
    DIR_OUT = 0,
    DIR_IN = 1
};
- struct TTransferPurpose {
- EDir Dir;
- TGUID Guid;
- TTransferPurpose()
- : Dir(DIR_OUT)
- {
- }
- TTransferPurpose(EDir dir, TGUID guid)
- : Dir(dir)
- , Guid(guid)
- {
- }
- };
- struct TSendRequest {
- TUdpAddress Addr;
- TAutoPtr<TRopeDataPacket> Data;
- TGUID ReqGuid;
- TIntrusivePtr<TWaitResponse> WR;
- TIntrusivePtr<TRequesterUserQueues> UserQueues;
- ui32 Crc32;
- TSendRequest()
- : Crc32(0)
- {
- }
- TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqguid, TWaitResponse* wr, TRequesterUserQueues* userQueues)
- : Addr(addr)
- , Data(*data)
- , ReqGuid(reqguid)
- , WR(wr)
- , UserQueues(userQueues)
- , Crc32(CalcChecksum(Data->GetChain()))
- {
- }
- };
- struct TSendResponse {
- TVector<char> Data;
- TGUID ReqGuid;
- ui32 DataCrc32;
- EPacketPriority Priority;
- TSendResponse()
- : DataCrc32(0)
- , Priority(PP_NORMAL)
- {
- }
- TSendResponse(const TGUID& reqguid, EPacketPriority prior, TVector<char>* data)
- : ReqGuid(reqguid)
- , DataCrc32(0)
- , Priority(prior)
- {
- if (data && !data->empty()) {
- data->swap(Data);
- DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize());
- }
- }
- };
- struct TCancelRequest {
- TGUID ReqGuid;
- TCancelRequest() = default;
- TCancelRequest(const TGUID& reqguid)
- : ReqGuid(reqguid)
- {
- }
- };
- struct TBreakRequest {
- TGUID ReqGuid;
- TBreakRequest() = default;
- TBreakRequest(const TGUID& reqguid)
- : ReqGuid(reqguid)
- {
- }
- };
- TThread myThread;
- bool KeepRunning, AbortTransactions;
- TSpinLock cs;
- TSystemEvent HasStarted;
- NHPTimer::STime PingsSendT;
- TIntrusivePtr<IUdpHost> Host;
- TIntrusivePtr<NNetlibaSocket::ISocket> Socket;
- typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash;
- typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash;
- TOutRequestHash OutRequests;
- TInRequestHash InRequests;
- typedef THashMap<int, TTransferPurpose> TTransferHash;
- TTransferHash TransferHash;
- typedef THashMap<TGUID, TIntrusivePtr<TWaitResponse>, TGUIDHash> TSyncRequests;
- TSyncRequests SyncRequests;
- // hold it here to not construct on every DoSends()
- typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels;
- TAnticipateCancels AnticipateCancels;
- TLockFreeQueue<TSendRequest*> SendReqList;
- TLockFreeQueue<TSendResponse*> SendRespList;
- TLockFreeQueue<TCancelRequest> CancelReqList;
- TLockFreeQueue<TBreakRequest> BreakReqList;
- TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes;
- TIntrusivePtr<TRequesterUserQueues> UserQueues;
- struct TStatsRequest: public TThrRefBase {
- enum EReq {
- PENDING_SIZE,
- DEBUG_INFO,
- HAS_IN_REQUEST,
- GET_PEER_ADDRESS,
- GET_PEER_QUEUE_STATS,
- };
- EReq Req;
- TRequesterPendingDataStats PendingDataSize;
- TString DebugInfo;
- TGUID RequestId;
- TUdpAddress PeerAddress;
- TIntrusivePtr<IPeerQueueStats> QueueStats;
- bool RequestFound;
- TSystemEvent Complete;
- TStatsRequest(EReq req)
- : Req(req)
- , RequestFound(false)
- {
- }
- };
- TLockFreeQueue<TIntrusivePtr<TStatsRequest>> StatsReqList;
- bool ReportRequestCancel;
- bool ReportSendRequestAcc;
- void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TAutoPtr<TRequest> data, const char* error = nullptr) {
- TOutRequestState& s = i->second;
- TUdpHttpResponse* res = new TUdpHttpResponse;
- res->DataHolder = data;
- res->ReqId = i->first;
- res->PeerAddress = s.Address;
- res->Ok = ok;
- if (ok == TUdpHttpResponse::FAILED)
- res->Error = error ? error : "request failed";
- else if (ok == TUdpHttpResponse::CANCELED)
- res->Error = error ? error : "request cancelled";
- TSyncRequests::iterator k = SyncRequests.find(res->ReqId);
- if (k != SyncRequests.end()) {
- TIntrusivePtr<TWaitResponse>& wr = k->second;
- wr->SetResponse(res);
- SyncRequests.erase(k);
- } else {
- s.UserQueues->AddResponse(res);
- }
- OutRequests.erase(i);
- }
- int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) {
- ui32 crc32 = CalcChecksum(data->GetChain());
- return Host->Send(addr, data.Release(), crc32, nullptr, PP_HIGH);
- }
- void ProcessIncomingPackets() {
- TVector<TGUID, TCustomAllocator<TGUID>> failedRequests;
- for (;;) {
- TAutoPtr<TRequest> req = Host->GetRequest();
- if (req.Get() == nullptr) {
- if (!failedRequests.empty()) {
- // we want to handle following sequence of events
- // <- send ping
- // -> send response over IB
- // -> send ping response (no such request) over UDP
- // Now if we are lucky enough we can get IB response waiting in the IB receive queue
- // at the same time response sender will receive "send complete" from IB
- // indeed, IB delivered message (but it was not parsed by ib_cs.cpp yet)
- // so after receiving "send response complete" event resposne sender can legally response
- // to pings with "no such request"
- // but ping responses can be sent over UDP
- // So we can run into situation with negative ping response in
- // UDP receive queue and response waiting unprocessed in IB receive queue
- // to check that there is no response in the IB queue we have to process IB queues
- // so we call IBStep()
- Host->IBStep();
- req = Host->GetRequest();
- if (req.Get() == nullptr) {
- break;
- }
- } else {
- break;
- }
- }
- TBlockChainIterator reqData(req->Data->GetChain());
- char pktType;
- reqData.Read(&pktType, 1);
- switch (pktType) {
- case PKT_REQUEST:
- case PKT_LOCAL_REQUEST: {
- //printf("recv PKT_REQUEST or PKT_LOCAL_REQUEST\n");
- TGUID reqId = req->Guid;
- TInRequestHash::iterator z = InRequests.find(reqId);
- if (z != InRequests.end()) {
- // oops, this request already exists!
- // might happen if request can be stored in single packet
- // and this packet had source IP broken during transmission and managed to pass crc checks
- // since we already reported wrong source address for this request to the user
- // the best thing we can do is to stop the program to avoid further complications
- // but we just report the accident to stderr
- fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n",
- GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(),
- GetAddressAsString(req->Address).c_str());
- } else {
- InRequests[reqId] = TInRequestState(req->Address);
- //printf("InReq %s PKT_REQUEST recv ... -> S_WAITING\n", GetGuidAsString(reqId).c_str());
- TUdpHttpRequest* res = new TUdpHttpRequest;
- res->ReqId = reqId;
- res->PeerAddress = req->Address;
- res->DataHolder = req;
- UserQueues->AddRequest(res);
- }
- } break;
- case PKT_PING: {
- //printf("recv PKT_PING\n");
- TGUID guid;
- reqData.Read(&guid, sizeof(guid));
- bool ok = InRequests.find(guid) != InRequests.end();
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ms->Write((char)PKT_PING_RESPONSE);
- ms->Write(guid);
- ms->Write(ok);
- SendWithHighPriority(req->Address, ms.Release());
- //printf("InReq %s PKT_PING recv Sending PKT_PING_RESPONSE\n", GetGuidAsString(guid).c_str());
- //printf("got PKT_PING, responding %d\n", (int)ok);
- } break;
- case PKT_PING_RESPONSE: {
- //printf("recv PKT_PING_RESPONSE\n");
- TGUID guid;
- bool ok;
- reqData.Read(&guid, sizeof(guid));
- reqData.Read(&ok, sizeof(ok));
- TOutRequestHash::iterator i = OutRequests.find(guid);
- if (i == OutRequests.end()) {
- ; //Y_ASSERT(0); // actually possible with some packet orders
- } else {
- if (!ok) {
- // can not delete request at this point
- // since we can receive failed ping and response at the same moment
- // consider sequence: client sends ping, server sends response
- // and replies false to ping as reply is sent
- // we can not receive failed ping_response earlier then response itself
- // but we can receive them simultaneously
- failedRequests.push_back(guid);
- //printf("OutReq %s PKT_PING_RESPONSE recv no such query -> failed\n", GetGuidAsString(guid).c_str());
- } else {
- TOutRequestState& s = i->second;
- switch (s.State) {
- case TOutRequestState::S_WAITING_PING_SENDING: {
- Y_ASSERT(s.PingTransferId >= 0);
- TTransferHash::iterator k = TransferHash.find(s.PingTransferId);
- if (k != TransferHash.end())
- TransferHash.erase(k);
- else
- Y_ASSERT(0);
- s.PingTransferId = -1;
- s.TimePassed = 0;
- s.State = TOutRequestState::S_WAITING;
- //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENDING -> S_WAITING\n", GetGuidAsString(guid).c_str());
- } break;
- case TOutRequestState::S_WAITING_PING_SENT:
- s.TimePassed = 0;
- s.State = TOutRequestState::S_WAITING;
- //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENT -> S_WAITING\n", GetGuidAsString(guid).c_str());
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- }
- }
- } break;
- case PKT_RESPONSE:
- case PKT_LOCAL_RESPONSE: {
- //printf("recv PKT_RESPONSE or PKT_LOCAL_RESPONSE\n");
- TGUID guid;
- reqData.Read(&guid, sizeof(guid));
- TOutRequestHash::iterator i = OutRequests.find(guid);
- if (i == OutRequests.end()) {
- ; //Y_ASSERT(0); // does happen
- //printf("OutReq %s PKT_RESPONSE recv for non-existing req\n", GetGuidAsString(guid).c_str());
- } else {
- FinishRequest(i, TUdpHttpResponse::OK, req);
- //printf("OutReq %s PKT_RESPONSE recv ... -> ok\n", GetGuidAsString(guid).c_str());
- }
- } break;
- case PKT_CANCEL: {
- //printf("recv PKT_CANCEL\n");
- TGUID guid;
- reqData.Read(&guid, sizeof(guid));
- TInRequestHash::iterator i = InRequests.find(guid);
- if (i == InRequests.end()) {
- ; //Y_ASSERT(0); // may happen
- //printf("InReq %s PKT_CANCEL recv for non-existing req\n", GetGuidAsString(guid).c_str());
- } else {
- TInRequestState& s = i->second;
- if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel)
- UserQueues->AddCancel(guid);
- s.State = TInRequestState::S_CANCELED;
- //printf("InReq %s PKT_CANCEL recv\n", GetGuidAsString(guid).c_str());
- }
- } break;
- case PKT_GETDEBUGINFO: {
- //printf("recv PKT_GETDEBUGINFO\n");
- TString dbgInfo = GetDebugInfoLocked();
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ms->Write(dbgInfo.c_str(), (int)dbgInfo.size());
- SendWithHighPriority(req->Address, ms);
- } break;
- default:
- Y_ASSERT(0);
- }
- }
- // cleanup failed requests
- for (size_t k = 0; k < failedRequests.size(); ++k) {
- const TGUID& guid = failedRequests[k];
- TOutRequestHash::iterator i = OutRequests.find(guid);
- if (i != OutRequests.end())
- FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: recv no such query");
- }
- }
- void AnalyzeSendResults() {
- TSendResult res;
- while (Host->GetSendResult(&res)) {
- //printf("Send result received\n");
- TTransferHash::iterator k1 = TransferHash.find(res.TransferId);
- if (k1 != TransferHash.end()) {
- const TTransferPurpose& tp = k1->second;
- switch (tp.Dir) {
- case DIR_OUT: {
- TOutRequestHash::iterator i = OutRequests.find(tp.Guid);
- if (i != OutRequests.end()) {
- const TGUID& reqId = i->first;
- TOutRequestState& s = i->second;
- switch (s.State) {
- case TOutRequestState::S_SENDING:
- if (!res.Success) {
- FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING");
- //printf("OutReq %s AnalyzeSendResults() S_SENDING -> failed\n", GetGuidAsString(reqId).c_str());
- } else {
- if (ReportSendRequestAcc) {
- if (s.UserQueues.Get()) {
- s.UserQueues->AddSendRequestAcc(reqId);
- } else {
- // waitable request?
- TSyncRequests::iterator k2 = SyncRequests.find(reqId);
- if (k2 != SyncRequests.end()) {
- TIntrusivePtr<TWaitResponse>& wr = k2->second;
- wr->SetRequestSent();
- }
- }
- }
- s.State = TOutRequestState::S_WAITING;
- //printf("OutReq %s AnalyzeSendResults() S_SENDING -> S_WAITING\n", GetGuidAsString(reqId).c_str());
- s.TimePassed = 0;
- }
- break;
- case TOutRequestState::S_CANCEL_AFTER_SENDING:
- DoSendCancel(s.Address, reqId);
- FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING");
- break;
- case TOutRequestState::S_WAITING:
- case TOutRequestState::S_WAITING_PING_SENT:
- Y_ASSERT(0);
- break;
- case TOutRequestState::S_WAITING_PING_SENDING:
- Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId);
- if (!res.Success) {
- FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING");
- //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> failed\n", GetGuidAsString(reqId).c_str());
- } else {
- s.PingTransferId = -1;
- s.State = TOutRequestState::S_WAITING_PING_SENT;
- //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> S_WAITING_PING_SENT\n", GetGuidAsString(reqId).c_str());
- s.TimePassed = 0;
- }
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- }
- } break;
- case DIR_IN: {
- TInRequestHash::iterator i = InRequests.find(tp.Guid);
- if (i != InRequests.end()) {
- Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED);
- InRequests.erase(i);
- //if (res.Success)
- // printf("InReq %s AnalyzeSendResults() ... -> finished\n", GetGuidAsString(tp.Guid).c_str());
- //else
- // printf("InReq %s AnalyzeSendResults() ... -> failed response send\n", GetGuidAsString(tp.Guid).c_str());
- }
- } break;
- default:
- Y_ASSERT(0);
- break;
- }
- TransferHash.erase(k1);
- }
- }
- }
- void SendPingsIfNeeded() {
- NHPTimer::STime tChk = PingsSendT;
- float deltaT = (float)NHPTimer::GetTimePassed(&tChk);
- if (deltaT < 0.05) {
- return;
- }
- PingsSendT = tChk;
- deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3);
- {
- for (TOutRequestHash::iterator i = OutRequests.begin(); i != OutRequests.end();) {
- TOutRequestHash::iterator curIt = i++;
- TOutRequestState& s = curIt->second;
- const TGUID& guid = curIt->first;
- switch (s.State) {
- case TOutRequestState::S_WAITING:
- s.TimePassed += deltaT;
- if (s.TimePassed > HTTP_TIMEOUT) {
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ms->Write((char)PKT_PING);
- ms->Write(guid);
- int transId = SendWithHighPriority(s.Address, ms.Release());
- TransferHash[transId] = TTransferPurpose(DIR_OUT, guid);
- s.State = TOutRequestState::S_WAITING_PING_SENDING;
- //printf("OutReq %s SendPingsIfNeeded() S_WAITING -> S_WAITING_PING_SENDING\n", GetGuidAsString(guid).c_str());
- s.PingTransferId = transId;
- }
- break;
- case TOutRequestState::S_WAITING_PING_SENT:
- s.TimePassed += deltaT;
- if (s.TimePassed > HTTP_TIMEOUT) {
- //printf("OutReq %s SendPingsIfNeeded() S_WAITING_PING_SENT -> failed\n", GetGuidAsString(guid).c_str());
- FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT");
- }
- break;
- default:
- break;
- }
- }
- }
- }
- void Step() {
- {
- TGuard<TSpinLock> lock(cs);
- DoSends();
- }
- Host->Step();
- for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) {
- switch (req->Req) {
- case TStatsRequest::PENDING_SIZE:
- Host->GetPendingDataSize(&req->PendingDataSize);
- break;
- case TStatsRequest::DEBUG_INFO: {
- TGuard<TSpinLock> lock(cs);
- req->DebugInfo = GetDebugInfoLocked();
- } break;
- case TStatsRequest::HAS_IN_REQUEST: {
- TGuard<TSpinLock> lock(cs);
- req->RequestFound = (InRequests.find(req->RequestId) != InRequests.end());
- } break;
- case TStatsRequest::GET_PEER_ADDRESS: {
- TGuard<TSpinLock> lock(cs);
- TInRequestHash::const_iterator i = InRequests.find(req->RequestId);
- if (i != InRequests.end()) {
- req->PeerAddress = i->second.Address;
- } else {
- TOutRequestHash::const_iterator o = OutRequests.find(req->RequestId);
- if (o != OutRequests.end()) {
- req->PeerAddress = o->second.Address;
- } else {
- req->PeerAddress = TUdpAddress();
- }
- }
- } break;
- case TStatsRequest::GET_PEER_QUEUE_STATS:
- req->QueueStats = Host->GetQueueStats(req->PeerAddress);
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- req->Complete.Signal();
- }
- {
- TGuard<TSpinLock> lock(cs);
- DoSends();
- ProcessIncomingPackets();
- AnalyzeSendResults();
- SendPingsIfNeeded();
- }
- }
- void Wait() {
- Host->Wait(0.1f);
- }
- void DoSendCancel(const TUdpAddress& addr, const TGUID& req) {
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ms->Write((char)PKT_CANCEL);
- ms->Write(req);
- SendWithHighPriority(addr, ms);
- }
- void DoSends() {
- {
- TBreakRequest rb;
- while (BreakReqList.Dequeue(&rb)) {
- InRequests.erase(rb.ReqGuid);
- }
- }
- {
- // cancelling requests
- TCancelRequest rc;
- while (CancelReqList.Dequeue(&rc)) {
- TOutRequestHash::iterator i = OutRequests.find(rc.ReqGuid);
- if (i == OutRequests.end()) {
- AnticipateCancels.insert(rc.ReqGuid);
- continue; // cancelling non existing request is ok
- }
- TOutRequestState& s = i->second;
- if (s.State == TOutRequestState::S_SENDING) {
- // we are in trouble - have not sent request and we already have to cancel it, wait send
- s.State = TOutRequestState::S_CANCEL_AFTER_SENDING;
- } else {
- DoSendCancel(s.Address, rc.ReqGuid);
- FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side");
- }
- }
- }
- {
- // sending replies
- for (TSendResponse* rd = nullptr; SendRespList.Dequeue(&rd); delete rd) {
- TInRequestHash::iterator i = InRequests.find(rd->ReqGuid);
- if (i == InRequests.end()) {
- Y_ASSERT(0);
- continue;
- }
- TInRequestState& s = i->second;
- if (s.State == TInRequestState::S_CANCELED) {
- // need not send response for the canceled request
- InRequests.erase(i);
- continue;
- }
- Y_ASSERT(s.State == TInRequestState::S_WAITING);
- s.State = TInRequestState::S_RESPONSE_SENDING;
- //printf("InReq %s SendResponse() ... -> S_RESPONSE_SENDING (pkt %s)\n", GetGuidAsString(reqId).c_str(), GetGuidAsString(lowPktGuid).c_str());
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ui32 crc32 = 0;
- int dataSize = rd->Data.ysize();
- if (rd->Data.ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(s.Address)) {
- TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
- if (shm->Create(dataSize)) {
- ms->Write((char)PKT_LOCAL_RESPONSE);
- ms->Write(rd->ReqGuid);
- memcpy(shm->GetPtr(), &rd->Data[0], dataSize);
- TVector<char> empty;
- rd->Data.swap(empty);
- ms->AttachSharedData(shm);
- crc32 = CalcChecksum(ms->GetChain());
- }
- }
- if (ms->GetSharedData() == nullptr) {
- ms->Write((char)PKT_RESPONSE);
- ms->Write(rd->ReqGuid);
- // to offload crc calcs from inner thread, crc of data[] is calced outside and passed in DataCrc32
- // this means that we are calculating crc when shared memory is used
- // it is hard to avoid since in SendResponse() we don't know if shared mem will be used (peer address is not available there)
- TIncrementalChecksumCalcer csCalcer;
- AddChain(&csCalcer, ms->GetChain());
- // here we are replicating the way WriteDestructive serializes data
- csCalcer.AddBlock(&dataSize, sizeof(dataSize));
- csCalcer.AddBlockSum(rd->DataCrc32, dataSize);
- crc32 = csCalcer.CalcChecksum();
- ms->WriteDestructive(&rd->Data);
- //ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since its slow for large responses
- //Y_ASSERT(chkCrc == crc32);
- }
- int transId = Host->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority);
- TransferHash[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid);
- }
- }
- {
- // sending requests
- for (TSendRequest* rd = nullptr; SendReqList.Dequeue(&rd); delete rd) {
- Y_ASSERT(OutRequests.find(rd->ReqGuid) == OutRequests.end());
- {
- TOutRequestState& s = OutRequests[rd->ReqGuid];
- s.State = TOutRequestState::S_SENDING;
- s.Address = rd->Addr;
- s.UserQueues = rd->UserQueues;
- //printf("OutReq %s SendRequest() ... -> S_SENDING\n", GetGuidAsString(guid).c_str());
- }
- if (rd->WR.Get())
- SyncRequests[rd->ReqGuid] = rd->WR;
- if (AnticipateCancels.find(rd->ReqGuid) != AnticipateCancels.end()) {
- FinishRequest(OutRequests.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "request canceled before transmitting");
- } else {
- TGUID pktGuid = rd->ReqGuid; // request packet id should match request id
- int transId = Host->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL);
- TransferHash[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid);
- }
- }
- }
- if (!AnticipateCancels.empty()) {
- AnticipateCancels.clear();
- }
- }
- public:
- void SendRequestImpl(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId,
- TWaitResponse* wr, TRequesterUserQueues* userQueues) {
- if (data && data->size() > MAX_PACKET_SIZE) {
- Y_ABORT_UNLESS(0, "data size is too large");
- }
- //printf("SendRequest(%s)\n", url.c_str());
- if (wr)
- wr->SetReqId(reqId);
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- if (data && data->ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(addr)) {
- int dataSize = data->ysize();
- TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
- if (shm->Create(dataSize)) {
- ms->Write((char)PKT_LOCAL_REQUEST);
- ms->WriteStroka(url);
- memcpy(shm->GetPtr(), &(*data)[0], dataSize);
- TVector<char> empty;
- data->swap(empty);
- ms->AttachSharedData(shm);
- }
- }
- if (ms->GetSharedData() == nullptr) {
- ms->Write((char)PKT_REQUEST);
- ms->WriteStroka(url);
- ms->WriteDestructive(data);
- }
- SendReqList.Enqueue(new TSendRequest(addr, &ms, reqId, wr, userQueues));
- Host->CancelWait();
- }
- void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override {
- SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get());
- }
- void CancelRequest(const TGUID& reqId) override {
- CancelReqList.Enqueue(TCancelRequest(reqId));
- Host->CancelWait();
- }
- void BreakRequest(const TGUID& reqId) override {
- BreakReqList.Enqueue(TBreakRequest(reqId));
- Host->CancelWait();
- }
- void SendResponseImpl(const TGUID& reqId, EPacketPriority prior, TVector<char>* data) // non-virtual, for direct call from TRequestOps
- {
- if (data && data->size() > MAX_PACKET_SIZE) {
- Y_ABORT_UNLESS(0, "data size is too large");
- }
- SendRespList.Enqueue(new TSendResponse(reqId, prior, data));
- Host->CancelWait();
- }
- void SendResponse(const TGUID& reqId, TVector<char>* data) override {
- SendResponseImpl(reqId, PP_NORMAL, data);
- }
- void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override {
- SendResponseImpl(reqId, PP_LOW, data);
- }
- TUdpHttpRequest* GetRequest() override {
- return UserQueues->GetRequest();
- }
- TUdpHttpResponse* GetResponse() override {
- return UserQueues->GetResponse();
- }
- bool GetRequestCancel(TGUID* req) override {
- return UserQueues->GetRequestCancel(req);
- }
- bool GetSendRequestAcc(TGUID* req) override {
- return UserQueues->GetSendRequestAcc(req);
- }
- TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
- TIntrusivePtr<TWaitResponse> wr = WaitableRequest(addr, url, data);
- wr->Wait();
- return wr->GetResponse();
- }
- TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
- TIntrusivePtr<TWaitResponse> wr = new TWaitResponse;
- TGUID reqId;
- CreateGuid(&reqId);
- SendRequestImpl(addr, url, data, reqId, wr.Get(), nullptr);
- return wr;
- }
- TMuxEvent& GetAsyncEvent() override {
- return UserQueues->GetAsyncEvent();
- }
- int GetPort() override {
- return Socket.Get() ? Socket->GetPort() : 0;
- }
- void StopNoWait() override {
- AbortTransactions = true;
- KeepRunning = false;
- UserQueues->AsyncSignal();
- // calcel all outgoing requests
- TGuard<TSpinLock> lock(cs);
- while (!OutRequests.empty()) {
- // cancel without informing peer that we are cancelling the request
- FinishRequest(OutRequests.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()");
- }
- }
- void ExecStatsRequest(TIntrusivePtr<TStatsRequest> req) {
- StatsReqList.Enqueue(req);
- Host->CancelWait();
- req->Complete.Wait();
- }
- TUdpAddress GetPeerAddress(const TGUID& reqId) override {
- TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_ADDRESS);
- req->RequestId = reqId;
- ExecStatsRequest(req);
- return req->PeerAddress;
- }
- void GetPendingDataSize(TRequesterPendingDataStats* res) override {
- TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::PENDING_SIZE);
- ExecStatsRequest(req);
- *res = req->PendingDataSize;
- }
- bool HasRequest(const TGUID& reqId) override {
- TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::HAS_IN_REQUEST);
- req->RequestId = reqId;
- ExecStatsRequest(req);
- return req->RequestFound;
- }
- private:
- void FinishOutstandingTransactions() {
- // wait all pending requests, all new requests are canceled
- while ((!OutRequests.empty() || !InRequests.empty() || !SendRespList.IsEmpty() || !SendReqList.IsEmpty()) && !PanicAttack) {
- while (TUdpHttpRequest* req = GetRequest()) {
- TInRequestHash::iterator i = InRequests.find(req->ReqId);
- //printf("dropping request(%s) (thread %d)\n", req->Url.c_str(), ThreadId());
- delete req;
- if (i == InRequests.end()) {
- Y_ASSERT(0);
- continue;
- }
- InRequests.erase(i);
- }
- Step();
- sleep(0);
- }
- }
- static void* ExecServerThread(void* param) {
- BindToSocket(0);
- SetHighestThreadPriority();
- TUdpHttp* pThis = (TUdpHttp*)param;
- pThis->Host = CreateUdpHost(pThis->Socket);
- pThis->HasStarted.Signal();
- if (!pThis->Host) {
- pThis->Socket.Drop();
- return nullptr;
- }
- NHPTimer::GetTime(&pThis->PingsSendT);
- while (pThis->KeepRunning && !PanicAttack) {
- if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) {
- NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire);
- double passed = NHPTimer::GetTimePassed(&chk);
- if (passed > HeartbeatTimeout.load(std::memory_order_acquire)) {
- StopAllNetLibaThreads();
- fprintf(stderr, "%s\tTUdpHttp\tWaiting for %0.2f, time limit %0.2f, commit a suicide!11\n", Now().ToStringUpToSeconds().c_str(), passed, HeartbeatTimeout.load(std::memory_order_acquire));
- fflush(stderr);
- #ifndef _win_
- killpg(0, SIGKILL);
- #endif
- abort();
- break;
- }
- }
- pThis->Step();
- pThis->Wait();
- }
- if (!pThis->AbortTransactions && !PanicAttack)
- pThis->FinishOutstandingTransactions();
- pThis->Host = nullptr;
- return nullptr;
- }
- ~TUdpHttp() override {
- if (myThread.Running()) {
- KeepRunning = false;
- myThread.Join();
- }
- for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) {
- req->Complete.Signal();
- }
- }
- public:
- TUdpHttp()
- : myThread(TThread::TParams(ExecServerThread, (void*)this).SetName("nl6_udp_host"))
- , KeepRunning(true)
- , AbortTransactions(false)
- , PingsSendT(0)
- , ReportRequestCancel(false)
- , ReportSendRequestAcc(false)
- {
- NHPTimer::GetTime(&PingsSendT);
- QueueSizes = new TRequesterUserQueueSizes;
- UserQueues = new TRequesterUserQueues(QueueSizes.Get());
- }
- bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
- Y_ASSERT(Host.Get() == nullptr);
- Socket = socket;
- myThread.Start();
- HasStarted.Wait();
- if (Host.Get()) {
- return true;
- }
- Socket.Drop();
- return false;
- }
- TString GetDebugInfoLocked() {
- TString res = KeepRunning ? "State: running\n" : "State: stopping\n";
- res += Host->GetDebugInfo();
- char buf[1000];
- TRequesterUserQueueSizes* qs = QueueSizes.Get();
- snprintf(buf, sizeof(buf), "\nRequest queue %d (%d bytes)\n", (int)AtomicGet(qs->ReqCount), (int)AtomicGet(qs->ReqQueueSize));
- res += buf;
- snprintf(buf, sizeof(buf), "Response queue %d (%d bytes)\n", (int)AtomicGet(qs->RespCount), (int)AtomicGet(qs->RespQueueSize));
- res += buf;
- const char* outReqStateNames[] = {
- "S_SENDING",
- "S_WAITING",
- "S_WAITING_PING_SENDING",
- "S_WAITING_PING_SENT",
- "S_CANCEL_AFTER_SENDING"};
- const char* inReqStateNames[] = {
- "S_WAITING",
- "S_RESPONSE_SENDING",
- "S_CANCELED"};
- res += "\nOut requests:\n";
- for (TOutRequestHash::const_iterator i = OutRequests.begin(); i != OutRequests.end(); ++i) {
- const TGUID& gg = i->first;
- const TOutRequestState& s = i->second;
- bool isSync = SyncRequests.find(gg) != SyncRequests.end();
- snprintf(buf, sizeof(buf), "%s\t%s %s TimePassed: %g %s\n",
- GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), outReqStateNames[s.State],
- s.TimePassed * 1000,
- isSync ? "isSync" : "");
- res += buf;
- }
- res += "\nIn requests:\n";
- for (TInRequestHash::const_iterator i = InRequests.begin(); i != InRequests.end(); ++i) {
- const TGUID& gg = i->first;
- const TInRequestState& s = i->second;
- snprintf(buf, sizeof(buf), "%s\t%s %s\n",
- GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), inReqStateNames[s.State]);
- res += buf;
- }
- return res;
- }
- TString GetDebugInfo() override {
- TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::DEBUG_INFO);
- ExecStatsRequest(req);
- return req->DebugInfo;
- }
- void GetRequestQueueSize(TRequesterQueueStats* res) override {
- TRequesterUserQueueSizes* qs = QueueSizes.Get();
- res->ReqCount = (int)AtomicGet(qs->ReqCount);
- res->RespCount = (int)AtomicGet(qs->RespCount);
- res->ReqQueueSize = (int)AtomicGet(qs->ReqQueueSize);
- res->RespQueueSize = (int)AtomicGet(qs->RespQueueSize);
- }
- TRequesterUserQueueSizes* GetQueueSizes() const {
- return QueueSizes.Get();
- }
- IRequestOps* CreateSubRequester() override;
- void EnableReportRequestCancel() override {
- ReportRequestCancel = true;
- }
- void EnableReportSendRequestAcc() override {
- ReportSendRequestAcc = true;
- }
- TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override {
- TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_QUEUE_STATS);
- req->PeerAddress = addr;
- ExecStatsRequest(req);
- return req->QueueStats;
- }
- };
- //////////////////////////////////////////////////////////////////////////
- static void ReadShm(TSharedMemory* shm, TVector<char>* data) {
- Y_ASSERT(shm);
- int dataSize = shm->GetSize();
- data->yresize(dataSize);
- memcpy(&(*data)[0], shm->GetPtr(), dataSize);
- }
- static void LoadRequestData(TUdpHttpRequest* res) {
- if (!res)
- return;
- {
- TBlockChainIterator reqData(res->DataHolder->Data->GetChain());
- char pktType;
- reqData.Read(&pktType, 1);
- ReadArr(&reqData, &res->Url);
- if (pktType == PKT_REQUEST) {
- ReadYArr(&reqData, &res->Data);
- } else if (pktType == PKT_LOCAL_REQUEST) {
- ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data);
- } else
- Y_ASSERT(0);
- if (reqData.HasFailed()) {
- Y_ASSERT(0 && "wrong format, memory corruption suspected");
- res->Url = "";
- res->Data.clear();
- }
- }
- res->DataHolder.Reset(nullptr);
- }
- static void LoadResponseData(TUdpHttpResponse* res) {
- if (!res || res->DataHolder.Get() == nullptr)
- return;
- {
- TBlockChainIterator reqData(res->DataHolder->Data->GetChain());
- char pktType;
- reqData.Read(&pktType, 1);
- TGUID guid;
- reqData.Read(&guid, sizeof(guid));
- Y_ASSERT(res->ReqId == guid);
- if (pktType == PKT_RESPONSE) {
- ReadYArr(&reqData, &res->Data);
- } else if (pktType == PKT_LOCAL_RESPONSE) {
- ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data);
- } else
- Y_ASSERT(0);
- if (reqData.HasFailed()) {
- Y_ASSERT(0 && "wrong format, memory corruption suspected");
- res->Ok = TUdpHttpResponse::FAILED;
- res->Data.clear();
- res->Error = "wrong response format";
- }
- }
- res->DataHolder.Reset(nullptr);
- }
- //////////////////////////////////////////////////////////////////////////
- // IRequestOps::TWaitResponse
- TUdpHttpResponse* IRequestOps::TWaitResponse::GetResponse() {
- if (!Response)
- return nullptr;
- TUdpHttpResponse* res = Response;
- Response = nullptr;
- LoadResponseData(res);
- return res;
- }
- void IRequestOps::TWaitResponse::SetResponse(TUdpHttpResponse* r) {
- Y_ASSERT(Response == nullptr || r == nullptr);
- if (r)
- Response = r;
- CompleteEvent.Signal();
- }
- //////////////////////////////////////////////////////////////////////////
- // TRequesterUserQueues
- TUdpHttpRequest* TRequesterUserQueues::GetRequest() {
- TUdpHttpRequest* res = nullptr;
- ReqList.Dequeue(&res);
- if (res) {
- AtomicAdd(QueueSizes->ReqCount, -1);
- AtomicAdd(QueueSizes->ReqQueueSize, -GetPacketSize(res->DataHolder.Get()));
- }
- UpdateAsyncSignalState();
- LoadRequestData(res);
- return res;
- }
- TUdpHttpResponse* TRequesterUserQueues::GetResponse() {
- TUdpHttpResponse* res = nullptr;
- ResponseList.Dequeue(&res);
- if (res) {
- AtomicAdd(QueueSizes->RespCount, -1);
- AtomicAdd(QueueSizes->RespQueueSize, -GetPacketSize(res->DataHolder.Get()));
- }
- UpdateAsyncSignalState();
- LoadResponseData(res);
- return res;
- }
- //////////////////////////////////////////////////////////////////////////
- class TRequestOps: public IRequestOps {
- TIntrusivePtr<TUdpHttp> Requester;
- TIntrusivePtr<TRequesterUserQueues> UserQueues;
- public:
- TRequestOps(TUdpHttp* req)
- : Requester(req)
- {
- UserQueues = new TRequesterUserQueues(req->GetQueueSizes());
- }
- void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override {
- Requester->SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get());
- }
- void CancelRequest(const TGUID& reqId) override {
- Requester->CancelRequest(reqId);
- }
- void BreakRequest(const TGUID& reqId) override {
- Requester->BreakRequest(reqId);
- }
- void SendResponse(const TGUID& reqId, TVector<char>* data) override {
- Requester->SendResponseImpl(reqId, PP_NORMAL, data);
- }
- void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override {
- Requester->SendResponseImpl(reqId, PP_LOW, data);
- }
- TUdpHttpRequest* GetRequest() override {
- Y_ASSERT(0);
- //return UserQueues.GetRequest();
- return nullptr; // all requests are routed to the main requester
- }
- TUdpHttpResponse* GetResponse() override {
- return UserQueues->GetResponse();
- }
- bool GetRequestCancel(TGUID*) override {
- Y_ASSERT(0);
- return false; // all request cancels are routed to the main requester
- }
- bool GetSendRequestAcc(TGUID* req) override {
- return UserQueues->GetSendRequestAcc(req);
- }
- // sync mode
- TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
- return Requester->Request(addr, url, data);
- }
- TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
- return Requester->WaitableRequest(addr, url, data);
- }
- //
- TMuxEvent& GetAsyncEvent() override {
- return UserQueues->GetAsyncEvent();
- }
- };
- IRequestOps* TUdpHttp::CreateSubRequester() {
- return new TRequestOps(this);
- }
- //////////////////////////////////////////////////////////////////////////
- void AbortOnFailedRequest(TUdpHttpResponse* answer) {
- if (answer && answer->Ok == TUdpHttpResponse::FAILED) {
- fprintf(stderr, "Failed request to host %s\n", GetAddressAsString(answer->PeerAddress).data());
- fprintf(stderr, "Error description: %s\n", answer->Error.data());
- fflush(nullptr);
- Y_ASSERT(0);
- abort();
- }
- }
- TString GetDebugInfo(const TUdpAddress& addr, double timeout) {
- NHPTimer::STime start;
- NHPTimer::GetTime(&start);
- TIntrusivePtr<IUdpHost> host = CreateUdpHost(0);
- {
- TAutoPtr<TRopeDataPacket> rq = new TRopeDataPacket;
- rq->Write((char)PKT_GETDEBUGINFO);
- ui32 crc32 = CalcChecksum(rq->GetChain());
- host->Send(addr, rq.Release(), crc32, nullptr, PP_HIGH);
- }
- for (;;) {
- TAutoPtr<TRequest> ptr = host->GetRequest();
- if (ptr.Get()) {
- TBlockChainIterator reqData(ptr->Data->GetChain());
- int sz = reqData.GetSize();
- TString res;
- res.resize(sz);
- reqData.Read(res.begin(), sz);
- return res;
- }
- host->Step();
- host->Wait(0.1f);
- NHPTimer::STime now;
- NHPTimer::GetTime(&now);
- if (NHPTimer::GetSeconds(now - start) > timeout) {
- return TString();
- }
- }
- }
- void Kill(const TUdpAddress& addr) {
- TIntrusivePtr<IUdpHost> host = CreateUdpHost(0);
- host->Kill(addr);
- }
- void StopAllNetLibaThreads() {
- PanicAttack = true; // AAAA!!!!
- }
- void SetNetLibaHeartbeatTimeout(double timeoutSec) {
- NetLibaHeartbeat();
- HeartbeatTimeout.store(timeoutSec, std::memory_order_release);
- }
- void NetLibaHeartbeat() {
- NHPTimer::STime now;
- NHPTimer::GetTime(&now);
- LastHeartbeat.store(now, std::memory_order_release);
- }
- IRequester* CreateHttpUdpRequester(int port) {
- if (PanicAttack)
- return nullptr;
- TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket();
- socket->Open(port);
- if (!socket->IsValid())
- return nullptr;
- return CreateHttpUdpRequester(socket);
- }
- IRequester* CreateHttpUdpRequester(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
- if (PanicAttack)
- return nullptr;
- TIntrusivePtr<TUdpHttp> res(new TUdpHttp);
- if (!res->Start(socket))
- return nullptr;
- return res.Release();
- }
- }
|