#include "stdafx.h" #include "udp_http.h" #include "udp_client_server.h" #include "udp_socket.h" #include "cpu_affinity.h" #include #include #include #include #include #if !defined(_win_) #include #include #endif #include "block_chain.h" #include #include namespace NNetliba { const float HTTP_TIMEOUT = 15.0f; const int MIN_SHARED_MEM_PACKET = 1000; static ::NAtomic::TBool PanicAttack; static std::atomic LastHeartbeat; static std::atomic HeartbeatTimeout; static int GetPacketSize(TRequest* req) { if (req && req->Data.Get()) return req->Data->GetSize(); return 0; } static bool IsLocalFast(const TUdpAddress& addr) { if (addr.IsIPv4()) { return IsLocalIPv4(addr.GetIPv4()); } else { return IsLocalIPv6(addr.Network, addr.Interface); } } bool IsLocal(const TUdpAddress& addr) { InitLocalIPList(); return IsLocalFast(addr); } TUdpHttpRequest::~TUdpHttpRequest() { } TUdpHttpResponse::~TUdpHttpResponse() { } class TRequesterUserQueueSizes: public TThrRefBase { public: TAtomic ReqCount, RespCount; TAtomic ReqQueueSize, RespQueueSize; TRequesterUserQueueSizes() : ReqCount(0) , RespCount(0) , ReqQueueSize(0) , RespQueueSize(0) { } }; template void EraseList(TLockFreeQueue* data) { T* ptr = nullptr; while (data->Dequeue(&ptr)) { delete ptr; } } class TRequesterUserQueues: public TThrRefBase { TIntrusivePtr QueueSizes; TLockFreeQueue ReqList; TLockFreeQueue ResponseList; TLockFreeStack CancelList, SendRequestAccList; // any order will do TMuxEvent AsyncEvent; void UpdateAsyncSignalState() { // not sure about this one. Idea is that AsyncEvent.Reset() is a memory barrier if (ReqList.IsEmpty() && ResponseList.IsEmpty() && CancelList.IsEmpty() && SendRequestAccList.IsEmpty()) { AsyncEvent.Reset(); if (!ReqList.IsEmpty() || !ResponseList.IsEmpty() || !CancelList.IsEmpty() || !SendRequestAccList.IsEmpty()) AsyncEvent.Signal(); } } ~TRequesterUserQueues() override { EraseList(&ReqList); EraseList(&ResponseList); } public: TRequesterUserQueues(TRequesterUserQueueSizes* queueSizes) : QueueSizes(queueSizes) { } TUdpHttpRequest* GetRequest(); TUdpHttpResponse* GetResponse(); bool GetRequestCancel(TGUID* req) { bool res = CancelList.Dequeue(req); UpdateAsyncSignalState(); return res; } bool GetSendRequestAcc(TGUID* req) { bool res = SendRequestAccList.Dequeue(req); UpdateAsyncSignalState(); return res; } void AddRequest(TUdpHttpRequest* res) { AtomicAdd(QueueSizes->ReqCount, 1); AtomicAdd(QueueSizes->ReqQueueSize, GetPacketSize(res->DataHolder.Get())); ReqList.Enqueue(res); AsyncEvent.Signal(); } void AddResponse(TUdpHttpResponse* res) { AtomicAdd(QueueSizes->RespCount, 1); AtomicAdd(QueueSizes->RespQueueSize, GetPacketSize(res->DataHolder.Get())); ResponseList.Enqueue(res); AsyncEvent.Signal(); } void AddCancel(const TGUID& req) { CancelList.Enqueue(req); AsyncEvent.Signal(); } void AddSendRequestAcc(const TGUID& req) { SendRequestAccList.Enqueue(req); AsyncEvent.Signal(); } TMuxEvent& GetAsyncEvent() { return AsyncEvent; } void AsyncSignal() { AsyncEvent.Signal(); } }; struct TOutRequestState { enum EState { S_SENDING, S_WAITING, S_WAITING_PING_SENDING, S_WAITING_PING_SENT, S_CANCEL_AFTER_SENDING }; EState State; TUdpAddress Address; double TimePassed; int PingTransferId; TIntrusivePtr UserQueues; TOutRequestState() : State(S_SENDING) , TimePassed(0) , PingTransferId(-1) { } }; struct TInRequestState { enum EState { S_WAITING, S_RESPONSE_SENDING, S_CANCELED, }; EState State; TUdpAddress Address; TInRequestState() : State(S_WAITING) { } TInRequestState(const TUdpAddress& address) : State(S_WAITING) , Address(address) { } }; enum EHttpPacket { PKT_REQUEST, PKT_PING, PKT_PING_RESPONSE, PKT_RESPONSE, PKT_GETDEBUGINFO, PKT_LOCAL_REQUEST, PKT_LOCAL_RESPONSE, PKT_CANCEL, }; class TUdpHttp: public IRequester { enum EDir { DIR_OUT, DIR_IN }; struct TTransferPurpose { EDir Dir; TGUID Guid; TTransferPurpose() : Dir(DIR_OUT) { } TTransferPurpose(EDir dir, TGUID guid) : Dir(dir) , Guid(guid) { } }; struct TSendRequest { TUdpAddress Addr; TAutoPtr Data; TGUID ReqGuid; TIntrusivePtr WR; TIntrusivePtr UserQueues; ui32 Crc32; TSendRequest() : Crc32(0) { } TSendRequest(const TUdpAddress& addr, TAutoPtr* data, const TGUID& reqguid, TWaitResponse* wr, TRequesterUserQueues* userQueues) : Addr(addr) , Data(*data) , ReqGuid(reqguid) , WR(wr) , UserQueues(userQueues) , Crc32(CalcChecksum(Data->GetChain())) { } }; struct TSendResponse { TVector Data; TGUID ReqGuid; ui32 DataCrc32; EPacketPriority Priority; TSendResponse() : DataCrc32(0) , Priority(PP_NORMAL) { } TSendResponse(const TGUID& reqguid, EPacketPriority prior, TVector* data) : ReqGuid(reqguid) , DataCrc32(0) , Priority(prior) { if (data && !data->empty()) { data->swap(Data); DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize()); } } }; struct TCancelRequest { TGUID ReqGuid; TCancelRequest() = default; TCancelRequest(const TGUID& reqguid) : ReqGuid(reqguid) { } }; struct TBreakRequest { TGUID ReqGuid; TBreakRequest() = default; TBreakRequest(const TGUID& reqguid) : ReqGuid(reqguid) { } }; TThread myThread; bool KeepRunning, AbortTransactions; TSpinLock cs; TSystemEvent HasStarted; NHPTimer::STime PingsSendT; TIntrusivePtr Host; TIntrusivePtr Socket; typedef THashMap TOutRequestHash; typedef THashMap TInRequestHash; TOutRequestHash OutRequests; TInRequestHash InRequests; typedef THashMap TTransferHash; TTransferHash TransferHash; typedef THashMap, TGUIDHash> TSyncRequests; TSyncRequests SyncRequests; // hold it here to not construct on every DoSends() typedef THashSet TAnticipateCancels; TAnticipateCancels AnticipateCancels; TLockFreeQueue SendReqList; TLockFreeQueue SendRespList; TLockFreeQueue CancelReqList; TLockFreeQueue BreakReqList; TIntrusivePtr QueueSizes; TIntrusivePtr UserQueues; struct TStatsRequest: public TThrRefBase { enum EReq { PENDING_SIZE, DEBUG_INFO, HAS_IN_REQUEST, GET_PEER_ADDRESS, GET_PEER_QUEUE_STATS, }; EReq Req; TRequesterPendingDataStats PendingDataSize; TString DebugInfo; TGUID RequestId; TUdpAddress PeerAddress; TIntrusivePtr QueueStats; bool RequestFound; TSystemEvent Complete; TStatsRequest(EReq req) : Req(req) , RequestFound(false) { } }; TLockFreeQueue> StatsReqList; bool ReportRequestCancel; bool ReportSendRequestAcc; void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TAutoPtr data, const char* error = nullptr) { TOutRequestState& s = i->second; TUdpHttpResponse* res = new TUdpHttpResponse; res->DataHolder = data; res->ReqId = i->first; res->PeerAddress = s.Address; res->Ok = ok; if (ok == TUdpHttpResponse::FAILED) res->Error = error ? error : "request failed"; else if (ok == TUdpHttpResponse::CANCELED) res->Error = error ? error : "request cancelled"; TSyncRequests::iterator k = SyncRequests.find(res->ReqId); if (k != SyncRequests.end()) { TIntrusivePtr& wr = k->second; wr->SetResponse(res); SyncRequests.erase(k); } else { s.UserQueues->AddResponse(res); } OutRequests.erase(i); } int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr data) { ui32 crc32 = CalcChecksum(data->GetChain()); return Host->Send(addr, data.Release(), crc32, nullptr, PP_HIGH); } void ProcessIncomingPackets() { TVector> failedRequests; for (;;) { TAutoPtr req = Host->GetRequest(); if (req.Get() == nullptr) { if (!failedRequests.empty()) { // we want to handle following sequence of events // <- send ping // -> send response over IB // -> send ping response (no such request) over UDP // Now if we are lucky enough we can get IB response waiting in the IB receive queue // at the same time response sender will receive "send complete" from IB // indeed, IB delivered message (but it was not parsed by ib_cs.cpp yet) // so after receiving "send response complete" event resposne sender can legally response // to pings with "no such request" // but ping responses can be sent over UDP // So we can run into situation with negative ping response in // UDP receive queue and response waiting unprocessed in IB receive queue // to check that there is no response in the IB queue we have to process IB queues // so we call IBStep() Host->IBStep(); req = Host->GetRequest(); if (req.Get() == nullptr) { break; } } else { break; } } TBlockChainIterator reqData(req->Data->GetChain()); char pktType; reqData.Read(&pktType, 1); switch (pktType) { case PKT_REQUEST: case PKT_LOCAL_REQUEST: { //printf("recv PKT_REQUEST or PKT_LOCAL_REQUEST\n"); TGUID reqId = req->Guid; TInRequestHash::iterator z = InRequests.find(reqId); if (z != InRequests.end()) { // oops, this request already exists! // might happen if request can be stored in single packet // and this packet had source IP broken during transmission and managed to pass crc checks // since we already reported wrong source address for this request to the user // the best thing we can do is to stop the program to avoid further complications // but we just report the accident to stderr fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n", GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(), GetAddressAsString(req->Address).c_str()); } else { InRequests[reqId] = TInRequestState(req->Address); //printf("InReq %s PKT_REQUEST recv ... -> S_WAITING\n", GetGuidAsString(reqId).c_str()); TUdpHttpRequest* res = new TUdpHttpRequest; res->ReqId = reqId; res->PeerAddress = req->Address; res->DataHolder = req; UserQueues->AddRequest(res); } } break; case PKT_PING: { //printf("recv PKT_PING\n"); TGUID guid; reqData.Read(&guid, sizeof(guid)); bool ok = InRequests.find(guid) != InRequests.end(); TAutoPtr ms = new TRopeDataPacket; ms->Write((char)PKT_PING_RESPONSE); ms->Write(guid); ms->Write(ok); SendWithHighPriority(req->Address, ms.Release()); //printf("InReq %s PKT_PING recv Sending PKT_PING_RESPONSE\n", GetGuidAsString(guid).c_str()); //printf("got PKT_PING, responding %d\n", (int)ok); } break; case PKT_PING_RESPONSE: { //printf("recv PKT_PING_RESPONSE\n"); TGUID guid; bool ok; reqData.Read(&guid, sizeof(guid)); reqData.Read(&ok, sizeof(ok)); TOutRequestHash::iterator i = OutRequests.find(guid); if (i == OutRequests.end()) { ; //Y_ASSERT(0); // actually possible with some packet orders } else { if (!ok) { // can not delete request at this point // since we can receive failed ping and response at the same moment // consider sequence: client sends ping, server sends response // and replies false to ping as reply is sent // we can not receive failed ping_response earlier then response itself // but we can receive them simultaneously failedRequests.push_back(guid); //printf("OutReq %s PKT_PING_RESPONSE recv no such query -> failed\n", GetGuidAsString(guid).c_str()); } else { TOutRequestState& s = i->second; switch (s.State) { case TOutRequestState::S_WAITING_PING_SENDING: { Y_ASSERT(s.PingTransferId >= 0); TTransferHash::iterator k = TransferHash.find(s.PingTransferId); if (k != TransferHash.end()) TransferHash.erase(k); else Y_ASSERT(0); s.PingTransferId = -1; s.TimePassed = 0; s.State = TOutRequestState::S_WAITING; //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENDING -> S_WAITING\n", GetGuidAsString(guid).c_str()); } break; case TOutRequestState::S_WAITING_PING_SENT: s.TimePassed = 0; s.State = TOutRequestState::S_WAITING; //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENT -> S_WAITING\n", GetGuidAsString(guid).c_str()); break; default: Y_ASSERT(0); break; } } } } break; case PKT_RESPONSE: case PKT_LOCAL_RESPONSE: { //printf("recv PKT_RESPONSE or PKT_LOCAL_RESPONSE\n"); TGUID guid; reqData.Read(&guid, sizeof(guid)); TOutRequestHash::iterator i = OutRequests.find(guid); if (i == OutRequests.end()) { ; //Y_ASSERT(0); // does happen //printf("OutReq %s PKT_RESPONSE recv for non-existing req\n", GetGuidAsString(guid).c_str()); } else { FinishRequest(i, TUdpHttpResponse::OK, req); //printf("OutReq %s PKT_RESPONSE recv ... -> ok\n", GetGuidAsString(guid).c_str()); } } break; case PKT_CANCEL: { //printf("recv PKT_CANCEL\n"); TGUID guid; reqData.Read(&guid, sizeof(guid)); TInRequestHash::iterator i = InRequests.find(guid); if (i == InRequests.end()) { ; //Y_ASSERT(0); // may happen //printf("InReq %s PKT_CANCEL recv for non-existing req\n", GetGuidAsString(guid).c_str()); } else { TInRequestState& s = i->second; if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel) UserQueues->AddCancel(guid); s.State = TInRequestState::S_CANCELED; //printf("InReq %s PKT_CANCEL recv\n", GetGuidAsString(guid).c_str()); } } break; case PKT_GETDEBUGINFO: { //printf("recv PKT_GETDEBUGINFO\n"); TString dbgInfo = GetDebugInfoLocked(); TAutoPtr ms = new TRopeDataPacket; ms->Write(dbgInfo.c_str(), (int)dbgInfo.size()); SendWithHighPriority(req->Address, ms); } break; default: Y_ASSERT(0); } } // cleanup failed requests for (size_t k = 0; k < failedRequests.size(); ++k) { const TGUID& guid = failedRequests[k]; TOutRequestHash::iterator i = OutRequests.find(guid); if (i != OutRequests.end()) FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: recv no such query"); } } void AnalyzeSendResults() { TSendResult res; while (Host->GetSendResult(&res)) { //printf("Send result received\n"); TTransferHash::iterator k1 = TransferHash.find(res.TransferId); if (k1 != TransferHash.end()) { const TTransferPurpose& tp = k1->second; switch (tp.Dir) { case DIR_OUT: { TOutRequestHash::iterator i = OutRequests.find(tp.Guid); if (i != OutRequests.end()) { const TGUID& reqId = i->first; TOutRequestState& s = i->second; switch (s.State) { case TOutRequestState::S_SENDING: if (!res.Success) { FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING"); //printf("OutReq %s AnalyzeSendResults() S_SENDING -> failed\n", GetGuidAsString(reqId).c_str()); } else { if (ReportSendRequestAcc) { if (s.UserQueues.Get()) { s.UserQueues->AddSendRequestAcc(reqId); } else { // waitable request? TSyncRequests::iterator k2 = SyncRequests.find(reqId); if (k2 != SyncRequests.end()) { TIntrusivePtr& wr = k2->second; wr->SetRequestSent(); } } } s.State = TOutRequestState::S_WAITING; //printf("OutReq %s AnalyzeSendResults() S_SENDING -> S_WAITING\n", GetGuidAsString(reqId).c_str()); s.TimePassed = 0; } break; case TOutRequestState::S_CANCEL_AFTER_SENDING: DoSendCancel(s.Address, reqId); FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING"); break; case TOutRequestState::S_WAITING: case TOutRequestState::S_WAITING_PING_SENT: Y_ASSERT(0); break; case TOutRequestState::S_WAITING_PING_SENDING: Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId); if (!res.Success) { FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING"); //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> failed\n", GetGuidAsString(reqId).c_str()); } else { s.PingTransferId = -1; s.State = TOutRequestState::S_WAITING_PING_SENT; //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> S_WAITING_PING_SENT\n", GetGuidAsString(reqId).c_str()); s.TimePassed = 0; } break; default: Y_ASSERT(0); break; } } } break; case DIR_IN: { TInRequestHash::iterator i = InRequests.find(tp.Guid); if (i != InRequests.end()) { Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED); InRequests.erase(i); //if (res.Success) // printf("InReq %s AnalyzeSendResults() ... -> finished\n", GetGuidAsString(tp.Guid).c_str()); //else // printf("InReq %s AnalyzeSendResults() ... -> failed response send\n", GetGuidAsString(tp.Guid).c_str()); } } break; default: Y_ASSERT(0); break; } TransferHash.erase(k1); } } } void SendPingsIfNeeded() { NHPTimer::STime tChk = PingsSendT; float deltaT = (float)NHPTimer::GetTimePassed(&tChk); if (deltaT < 0.05) { return; } PingsSendT = tChk; deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3); { for (TOutRequestHash::iterator i = OutRequests.begin(); i != OutRequests.end();) { TOutRequestHash::iterator curIt = i++; TOutRequestState& s = curIt->second; const TGUID& guid = curIt->first; switch (s.State) { case TOutRequestState::S_WAITING: s.TimePassed += deltaT; if (s.TimePassed > HTTP_TIMEOUT) { TAutoPtr ms = new TRopeDataPacket; ms->Write((char)PKT_PING); ms->Write(guid); int transId = SendWithHighPriority(s.Address, ms.Release()); TransferHash[transId] = TTransferPurpose(DIR_OUT, guid); s.State = TOutRequestState::S_WAITING_PING_SENDING; //printf("OutReq %s SendPingsIfNeeded() S_WAITING -> S_WAITING_PING_SENDING\n", GetGuidAsString(guid).c_str()); s.PingTransferId = transId; } break; case TOutRequestState::S_WAITING_PING_SENT: s.TimePassed += deltaT; if (s.TimePassed > HTTP_TIMEOUT) { //printf("OutReq %s SendPingsIfNeeded() S_WAITING_PING_SENT -> failed\n", GetGuidAsString(guid).c_str()); FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT"); } break; default: break; } } } } void Step() { { TGuard lock(cs); DoSends(); } Host->Step(); for (TIntrusivePtr req; StatsReqList.Dequeue(&req);) { switch (req->Req) { case TStatsRequest::PENDING_SIZE: Host->GetPendingDataSize(&req->PendingDataSize); break; case TStatsRequest::DEBUG_INFO: { TGuard lock(cs); req->DebugInfo = GetDebugInfoLocked(); } break; case TStatsRequest::HAS_IN_REQUEST: { TGuard lock(cs); req->RequestFound = (InRequests.find(req->RequestId) != InRequests.end()); } break; case TStatsRequest::GET_PEER_ADDRESS: { TGuard lock(cs); TInRequestHash::const_iterator i = InRequests.find(req->RequestId); if (i != InRequests.end()) { req->PeerAddress = i->second.Address; } else { TOutRequestHash::const_iterator o = OutRequests.find(req->RequestId); if (o != OutRequests.end()) { req->PeerAddress = o->second.Address; } else { req->PeerAddress = TUdpAddress(); } } } break; case TStatsRequest::GET_PEER_QUEUE_STATS: req->QueueStats = Host->GetQueueStats(req->PeerAddress); break; default: Y_ASSERT(0); break; } req->Complete.Signal(); } { TGuard lock(cs); DoSends(); ProcessIncomingPackets(); AnalyzeSendResults(); SendPingsIfNeeded(); } } void Wait() { Host->Wait(0.1f); } void DoSendCancel(const TUdpAddress& addr, const TGUID& req) { TAutoPtr ms = new TRopeDataPacket; ms->Write((char)PKT_CANCEL); ms->Write(req); SendWithHighPriority(addr, ms); } void DoSends() { { TBreakRequest rb; while (BreakReqList.Dequeue(&rb)) { InRequests.erase(rb.ReqGuid); } } { // cancelling requests TCancelRequest rc; while (CancelReqList.Dequeue(&rc)) { TOutRequestHash::iterator i = OutRequests.find(rc.ReqGuid); if (i == OutRequests.end()) { AnticipateCancels.insert(rc.ReqGuid); continue; // cancelling non existing request is ok } TOutRequestState& s = i->second; if (s.State == TOutRequestState::S_SENDING) { // we are in trouble - have not sent request and we already have to cancel it, wait send s.State = TOutRequestState::S_CANCEL_AFTER_SENDING; } else { DoSendCancel(s.Address, rc.ReqGuid); FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side"); } } } { // sending replies for (TSendResponse* rd = nullptr; SendRespList.Dequeue(&rd); delete rd) { TInRequestHash::iterator i = InRequests.find(rd->ReqGuid); if (i == InRequests.end()) { Y_ASSERT(0); continue; } TInRequestState& s = i->second; if (s.State == TInRequestState::S_CANCELED) { // need not send response for the canceled request InRequests.erase(i); continue; } Y_ASSERT(s.State == TInRequestState::S_WAITING); s.State = TInRequestState::S_RESPONSE_SENDING; //printf("InReq %s SendResponse() ... -> S_RESPONSE_SENDING (pkt %s)\n", GetGuidAsString(reqId).c_str(), GetGuidAsString(lowPktGuid).c_str()); TAutoPtr ms = new TRopeDataPacket; ui32 crc32 = 0; int dataSize = rd->Data.ysize(); if (rd->Data.ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(s.Address)) { TIntrusivePtr shm = new TSharedMemory; if (shm->Create(dataSize)) { ms->Write((char)PKT_LOCAL_RESPONSE); ms->Write(rd->ReqGuid); memcpy(shm->GetPtr(), &rd->Data[0], dataSize); TVector empty; rd->Data.swap(empty); ms->AttachSharedData(shm); crc32 = CalcChecksum(ms->GetChain()); } } if (ms->GetSharedData() == nullptr) { ms->Write((char)PKT_RESPONSE); ms->Write(rd->ReqGuid); // to offload crc calcs from inner thread, crc of data[] is calced outside and passed in DataCrc32 // this means that we are calculating crc when shared memory is used // it is hard to avoid since in SendResponse() we don't know if shared mem will be used (peer address is not available there) TIncrementalChecksumCalcer csCalcer; AddChain(&csCalcer, ms->GetChain()); // here we are replicating the way WriteDestructive serializes data csCalcer.AddBlock(&dataSize, sizeof(dataSize)); csCalcer.AddBlockSum(rd->DataCrc32, dataSize); crc32 = csCalcer.CalcChecksum(); ms->WriteDestructive(&rd->Data); //ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since its slow for large responses //Y_ASSERT(chkCrc == crc32); } int transId = Host->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority); TransferHash[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid); } } { // sending requests for (TSendRequest* rd = nullptr; SendReqList.Dequeue(&rd); delete rd) { Y_ASSERT(OutRequests.find(rd->ReqGuid) == OutRequests.end()); { TOutRequestState& s = OutRequests[rd->ReqGuid]; s.State = TOutRequestState::S_SENDING; s.Address = rd->Addr; s.UserQueues = rd->UserQueues; //printf("OutReq %s SendRequest() ... -> S_SENDING\n", GetGuidAsString(guid).c_str()); } if (rd->WR.Get()) SyncRequests[rd->ReqGuid] = rd->WR; if (AnticipateCancels.find(rd->ReqGuid) != AnticipateCancels.end()) { FinishRequest(OutRequests.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "request canceled before transmitting"); } else { TGUID pktGuid = rd->ReqGuid; // request packet id should match request id int transId = Host->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL); TransferHash[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid); } } } if (!AnticipateCancels.empty()) { AnticipateCancels.clear(); } } public: void SendRequestImpl(const TUdpAddress& addr, const TString& url, TVector* data, const TGUID& reqId, TWaitResponse* wr, TRequesterUserQueues* userQueues) { if (data && data->size() > MAX_PACKET_SIZE) { Y_ABORT_UNLESS(0, "data size is too large"); } //printf("SendRequest(%s)\n", url.c_str()); if (wr) wr->SetReqId(reqId); TAutoPtr ms = new TRopeDataPacket; if (data && data->ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(addr)) { int dataSize = data->ysize(); TIntrusivePtr shm = new TSharedMemory; if (shm->Create(dataSize)) { ms->Write((char)PKT_LOCAL_REQUEST); ms->WriteStroka(url); memcpy(shm->GetPtr(), &(*data)[0], dataSize); TVector empty; data->swap(empty); ms->AttachSharedData(shm); } } if (ms->GetSharedData() == nullptr) { ms->Write((char)PKT_REQUEST); ms->WriteStroka(url); ms->WriteDestructive(data); } SendReqList.Enqueue(new TSendRequest(addr, &ms, reqId, wr, userQueues)); Host->CancelWait(); } void SendRequest(const TUdpAddress& addr, const TString& url, TVector* data, const TGUID& reqId) override { SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get()); } void CancelRequest(const TGUID& reqId) override { CancelReqList.Enqueue(TCancelRequest(reqId)); Host->CancelWait(); } void BreakRequest(const TGUID& reqId) override { BreakReqList.Enqueue(TBreakRequest(reqId)); Host->CancelWait(); } void SendResponseImpl(const TGUID& reqId, EPacketPriority prior, TVector* data) // non-virtual, for direct call from TRequestOps { if (data && data->size() > MAX_PACKET_SIZE) { Y_ABORT_UNLESS(0, "data size is too large"); } SendRespList.Enqueue(new TSendResponse(reqId, prior, data)); Host->CancelWait(); } void SendResponse(const TGUID& reqId, TVector* data) override { SendResponseImpl(reqId, PP_NORMAL, data); } void SendResponseLowPriority(const TGUID& reqId, TVector* data) override { SendResponseImpl(reqId, PP_LOW, data); } TUdpHttpRequest* GetRequest() override { return UserQueues->GetRequest(); } TUdpHttpResponse* GetResponse() override { return UserQueues->GetResponse(); } bool GetRequestCancel(TGUID* req) override { return UserQueues->GetRequestCancel(req); } bool GetSendRequestAcc(TGUID* req) override { return UserQueues->GetSendRequestAcc(req); } TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector* data) override { TIntrusivePtr wr = WaitableRequest(addr, url, data); wr->Wait(); return wr->GetResponse(); } TIntrusivePtr WaitableRequest(const TUdpAddress& addr, const TString& url, TVector* data) override { TIntrusivePtr wr = new TWaitResponse; TGUID reqId; CreateGuid(&reqId); SendRequestImpl(addr, url, data, reqId, wr.Get(), nullptr); return wr; } TMuxEvent& GetAsyncEvent() override { return UserQueues->GetAsyncEvent(); } int GetPort() override { return Socket.Get() ? Socket->GetPort() : 0; } void StopNoWait() override { AbortTransactions = true; KeepRunning = false; UserQueues->AsyncSignal(); // calcel all outgoing requests TGuard lock(cs); while (!OutRequests.empty()) { // cancel without informing peer that we are cancelling the request FinishRequest(OutRequests.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()"); } } void ExecStatsRequest(TIntrusivePtr req) { StatsReqList.Enqueue(req); Host->CancelWait(); req->Complete.Wait(); } TUdpAddress GetPeerAddress(const TGUID& reqId) override { TIntrusivePtr req = new TStatsRequest(TStatsRequest::GET_PEER_ADDRESS); req->RequestId = reqId; ExecStatsRequest(req); return req->PeerAddress; } void GetPendingDataSize(TRequesterPendingDataStats* res) override { TIntrusivePtr req = new TStatsRequest(TStatsRequest::PENDING_SIZE); ExecStatsRequest(req); *res = req->PendingDataSize; } bool HasRequest(const TGUID& reqId) override { TIntrusivePtr req = new TStatsRequest(TStatsRequest::HAS_IN_REQUEST); req->RequestId = reqId; ExecStatsRequest(req); return req->RequestFound; } private: void FinishOutstandingTransactions() { // wait all pending requests, all new requests are canceled while ((!OutRequests.empty() || !InRequests.empty() || !SendRespList.IsEmpty() || !SendReqList.IsEmpty()) && !PanicAttack) { while (TUdpHttpRequest* req = GetRequest()) { TInRequestHash::iterator i = InRequests.find(req->ReqId); //printf("dropping request(%s) (thread %d)\n", req->Url.c_str(), ThreadId()); delete req; if (i == InRequests.end()) { Y_ASSERT(0); continue; } InRequests.erase(i); } Step(); sleep(0); } } static void* ExecServerThread(void* param) { BindToSocket(0); SetHighestThreadPriority(); TUdpHttp* pThis = (TUdpHttp*)param; pThis->Host = CreateUdpHost(pThis->Socket); pThis->HasStarted.Signal(); if (!pThis->Host) { pThis->Socket.Drop(); return nullptr; } NHPTimer::GetTime(&pThis->PingsSendT); while (pThis->KeepRunning && !PanicAttack) { if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) { NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire); double passed = NHPTimer::GetTimePassed(&chk); if (passed > HeartbeatTimeout.load(std::memory_order_acquire)) { StopAllNetLibaThreads(); fprintf(stderr, "%s\tTUdpHttp\tWaiting for %0.2f, time limit %0.2f, commit a suicide!11\n", Now().ToStringUpToSeconds().c_str(), passed, HeartbeatTimeout.load(std::memory_order_acquire)); fflush(stderr); #ifndef _win_ killpg(0, SIGKILL); #endif abort(); break; } } pThis->Step(); pThis->Wait(); } if (!pThis->AbortTransactions && !PanicAttack) pThis->FinishOutstandingTransactions(); pThis->Host = nullptr; return nullptr; } ~TUdpHttp() override { if (myThread.Running()) { KeepRunning = false; myThread.Join(); } for (TIntrusivePtr req; StatsReqList.Dequeue(&req);) { req->Complete.Signal(); } } public: TUdpHttp() : myThread(TThread::TParams(ExecServerThread, (void*)this).SetName("nl6_udp_host")) , KeepRunning(true) , AbortTransactions(false) , PingsSendT(0) , ReportRequestCancel(false) , ReportSendRequestAcc(false) { NHPTimer::GetTime(&PingsSendT); QueueSizes = new TRequesterUserQueueSizes; UserQueues = new TRequesterUserQueues(QueueSizes.Get()); } bool Start(const TIntrusivePtr& socket) { Y_ASSERT(Host.Get() == nullptr); Socket = socket; myThread.Start(); HasStarted.Wait(); if (Host.Get()) { return true; } Socket.Drop(); return false; } TString GetDebugInfoLocked() { TString res = KeepRunning ? "State: running\n" : "State: stopping\n"; res += Host->GetDebugInfo(); char buf[1000]; TRequesterUserQueueSizes* qs = QueueSizes.Get(); snprintf(buf, sizeof(buf), "\nRequest queue %d (%d bytes)\n", (int)AtomicGet(qs->ReqCount), (int)AtomicGet(qs->ReqQueueSize)); res += buf; snprintf(buf, sizeof(buf), "Response queue %d (%d bytes)\n", (int)AtomicGet(qs->RespCount), (int)AtomicGet(qs->RespQueueSize)); res += buf; const char* outReqStateNames[] = { "S_SENDING", "S_WAITING", "S_WAITING_PING_SENDING", "S_WAITING_PING_SENT", "S_CANCEL_AFTER_SENDING"}; const char* inReqStateNames[] = { "S_WAITING", "S_RESPONSE_SENDING", "S_CANCELED"}; res += "\nOut requests:\n"; for (TOutRequestHash::const_iterator i = OutRequests.begin(); i != OutRequests.end(); ++i) { const TGUID& gg = i->first; const TOutRequestState& s = i->second; bool isSync = SyncRequests.find(gg) != SyncRequests.end(); snprintf(buf, sizeof(buf), "%s\t%s %s TimePassed: %g %s\n", GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), outReqStateNames[s.State], s.TimePassed * 1000, isSync ? "isSync" : ""); res += buf; } res += "\nIn requests:\n"; for (TInRequestHash::const_iterator i = InRequests.begin(); i != InRequests.end(); ++i) { const TGUID& gg = i->first; const TInRequestState& s = i->second; snprintf(buf, sizeof(buf), "%s\t%s %s\n", GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), inReqStateNames[s.State]); res += buf; } return res; } TString GetDebugInfo() override { TIntrusivePtr req = new TStatsRequest(TStatsRequest::DEBUG_INFO); ExecStatsRequest(req); return req->DebugInfo; } void GetRequestQueueSize(TRequesterQueueStats* res) override { TRequesterUserQueueSizes* qs = QueueSizes.Get(); res->ReqCount = (int)AtomicGet(qs->ReqCount); res->RespCount = (int)AtomicGet(qs->RespCount); res->ReqQueueSize = (int)AtomicGet(qs->ReqQueueSize); res->RespQueueSize = (int)AtomicGet(qs->RespQueueSize); } TRequesterUserQueueSizes* GetQueueSizes() const { return QueueSizes.Get(); } IRequestOps* CreateSubRequester() override; void EnableReportRequestCancel() override { ReportRequestCancel = true; } void EnableReportSendRequestAcc() override { ReportSendRequestAcc = true; } TIntrusivePtr GetQueueStats(const TUdpAddress& addr) override { TIntrusivePtr req = new TStatsRequest(TStatsRequest::GET_PEER_QUEUE_STATS); req->PeerAddress = addr; ExecStatsRequest(req); return req->QueueStats; } }; ////////////////////////////////////////////////////////////////////////// static void ReadShm(TSharedMemory* shm, TVector* data) { Y_ASSERT(shm); int dataSize = shm->GetSize(); data->yresize(dataSize); memcpy(&(*data)[0], shm->GetPtr(), dataSize); } static void LoadRequestData(TUdpHttpRequest* res) { if (!res) return; { TBlockChainIterator reqData(res->DataHolder->Data->GetChain()); char pktType; reqData.Read(&pktType, 1); ReadArr(&reqData, &res->Url); if (pktType == PKT_REQUEST) { ReadYArr(&reqData, &res->Data); } else if (pktType == PKT_LOCAL_REQUEST) { ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data); } else Y_ASSERT(0); if (reqData.HasFailed()) { Y_ASSERT(0 && "wrong format, memory corruption suspected"); res->Url = ""; res->Data.clear(); } } res->DataHolder.Reset(nullptr); } static void LoadResponseData(TUdpHttpResponse* res) { if (!res || res->DataHolder.Get() == nullptr) return; { TBlockChainIterator reqData(res->DataHolder->Data->GetChain()); char pktType; reqData.Read(&pktType, 1); TGUID guid; reqData.Read(&guid, sizeof(guid)); Y_ASSERT(res->ReqId == guid); if (pktType == PKT_RESPONSE) { ReadYArr(&reqData, &res->Data); } else if (pktType == PKT_LOCAL_RESPONSE) { ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data); } else Y_ASSERT(0); if (reqData.HasFailed()) { Y_ASSERT(0 && "wrong format, memory corruption suspected"); res->Ok = TUdpHttpResponse::FAILED; res->Data.clear(); res->Error = "wrong response format"; } } res->DataHolder.Reset(nullptr); } ////////////////////////////////////////////////////////////////////////// // IRequestOps::TWaitResponse TUdpHttpResponse* IRequestOps::TWaitResponse::GetResponse() { if (!Response) return nullptr; TUdpHttpResponse* res = Response; Response = nullptr; LoadResponseData(res); return res; } void IRequestOps::TWaitResponse::SetResponse(TUdpHttpResponse* r) { Y_ASSERT(Response == nullptr || r == nullptr); if (r) Response = r; CompleteEvent.Signal(); } ////////////////////////////////////////////////////////////////////////// // TRequesterUserQueues TUdpHttpRequest* TRequesterUserQueues::GetRequest() { TUdpHttpRequest* res = nullptr; ReqList.Dequeue(&res); if (res) { AtomicAdd(QueueSizes->ReqCount, -1); AtomicAdd(QueueSizes->ReqQueueSize, -GetPacketSize(res->DataHolder.Get())); } UpdateAsyncSignalState(); LoadRequestData(res); return res; } TUdpHttpResponse* TRequesterUserQueues::GetResponse() { TUdpHttpResponse* res = nullptr; ResponseList.Dequeue(&res); if (res) { AtomicAdd(QueueSizes->RespCount, -1); AtomicAdd(QueueSizes->RespQueueSize, -GetPacketSize(res->DataHolder.Get())); } UpdateAsyncSignalState(); LoadResponseData(res); return res; } ////////////////////////////////////////////////////////////////////////// class TRequestOps: public IRequestOps { TIntrusivePtr Requester; TIntrusivePtr UserQueues; public: TRequestOps(TUdpHttp* req) : Requester(req) { UserQueues = new TRequesterUserQueues(req->GetQueueSizes()); } void SendRequest(const TUdpAddress& addr, const TString& url, TVector* data, const TGUID& reqId) override { Requester->SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get()); } void CancelRequest(const TGUID& reqId) override { Requester->CancelRequest(reqId); } void BreakRequest(const TGUID& reqId) override { Requester->BreakRequest(reqId); } void SendResponse(const TGUID& reqId, TVector* data) override { Requester->SendResponseImpl(reqId, PP_NORMAL, data); } void SendResponseLowPriority(const TGUID& reqId, TVector* data) override { Requester->SendResponseImpl(reqId, PP_LOW, data); } TUdpHttpRequest* GetRequest() override { Y_ASSERT(0); //return UserQueues.GetRequest(); return nullptr; // all requests are routed to the main requester } TUdpHttpResponse* GetResponse() override { return UserQueues->GetResponse(); } bool GetRequestCancel(TGUID*) override { Y_ASSERT(0); return false; // all request cancels are routed to the main requester } bool GetSendRequestAcc(TGUID* req) override { return UserQueues->GetSendRequestAcc(req); } // sync mode TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector* data) override { return Requester->Request(addr, url, data); } TIntrusivePtr WaitableRequest(const TUdpAddress& addr, const TString& url, TVector* data) override { return Requester->WaitableRequest(addr, url, data); } // TMuxEvent& GetAsyncEvent() override { return UserQueues->GetAsyncEvent(); } }; IRequestOps* TUdpHttp::CreateSubRequester() { return new TRequestOps(this); } ////////////////////////////////////////////////////////////////////////// void AbortOnFailedRequest(TUdpHttpResponse* answer) { if (answer && answer->Ok == TUdpHttpResponse::FAILED) { fprintf(stderr, "Failed request to host %s\n", GetAddressAsString(answer->PeerAddress).data()); fprintf(stderr, "Error description: %s\n", answer->Error.data()); fflush(nullptr); Y_ASSERT(0); abort(); } } TString GetDebugInfo(const TUdpAddress& addr, double timeout) { NHPTimer::STime start; NHPTimer::GetTime(&start); TIntrusivePtr host = CreateUdpHost(0); { TAutoPtr rq = new TRopeDataPacket; rq->Write((char)PKT_GETDEBUGINFO); ui32 crc32 = CalcChecksum(rq->GetChain()); host->Send(addr, rq.Release(), crc32, nullptr, PP_HIGH); } for (;;) { TAutoPtr ptr = host->GetRequest(); if (ptr.Get()) { TBlockChainIterator reqData(ptr->Data->GetChain()); int sz = reqData.GetSize(); TString res; res.resize(sz); reqData.Read(res.begin(), sz); return res; } host->Step(); host->Wait(0.1f); NHPTimer::STime now; NHPTimer::GetTime(&now); if (NHPTimer::GetSeconds(now - start) > timeout) { return TString(); } } } void Kill(const TUdpAddress& addr) { TIntrusivePtr host = CreateUdpHost(0); host->Kill(addr); } void StopAllNetLibaThreads() { PanicAttack = true; // AAAA!!!! } void SetNetLibaHeartbeatTimeout(double timeoutSec) { NetLibaHeartbeat(); HeartbeatTimeout.store(timeoutSec, std::memory_order_release); } void NetLibaHeartbeat() { NHPTimer::STime now; NHPTimer::GetTime(&now); LastHeartbeat.store(now, std::memory_order_release); } IRequester* CreateHttpUdpRequester(int port) { if (PanicAttack) return nullptr; TIntrusivePtr socket = NNetlibaSocket::CreateSocket(); socket->Open(port); if (!socket->IsValid()) return nullptr; return CreateHttpUdpRequester(socket); } IRequester* CreateHttpUdpRequester(const TIntrusivePtr& socket) { if (PanicAttack) return nullptr; TIntrusivePtr res(new TUdpHttp); if (!res->Start(socket)) return nullptr; return res.Release(); } }