123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808 |
- #include "netliba_udp_http.h"
- #include "utils.h"
- #include <library/cpp/netliba/v6/cpu_affinity.h>
- #include <library/cpp/netliba/v6/stdafx.h>
- #include <library/cpp/netliba/v6/udp_client_server.h>
- #include <library/cpp/netliba/v6/udp_socket.h>
- #include <library/cpp/netliba/v6/block_chain.h> // depend on another headers
- #include <util/system/hp_timer.h>
- #include <util/system/shmat.h>
- #include <util/system/spinlock.h>
- #include <util/system/thread.h>
- #include <util/system/types.h>
- #include <util/system/yassert.h>
- #include <util/thread/lfqueue.h>
- #include <atomic>
- #if !defined(_win_)
- #include <signal.h>
- #include <pthread.h>
- #endif
- using namespace NNetliba;
- namespace {
- const float HTTP_TIMEOUT = 15.0f;
- const size_t MIN_SHARED_MEM_PACKET = 1000;
- const size_t MAX_PACKET_SIZE = 0x70000000;
- NNeh::TAtomicBool PanicAttack;
- std::atomic<NHPTimer::STime> LastHeartbeat;
- std::atomic<double> HeartbeatTimeout;
- bool IsLocal(const TUdpAddress& addr) {
- return addr.IsIPv4() ? IsLocalIPv4(addr.GetIPv4()) : IsLocalIPv6(addr.Network, addr.Interface);
- }
- void StopAllNetLibaThreads() {
- PanicAttack = true; // AAAA!!!!
- }
- void ReadShm(TSharedMemory* shm, TVector<char>* data) {
- Y_ASSERT(shm);
- int dataSize = shm->GetSize();
- data->yresize(dataSize);
- memcpy(&(*data)[0], shm->GetPtr(), dataSize);
- }
- void ReadShm(TSharedMemory* shm, TString* data) {
- Y_ASSERT(shm);
- size_t dataSize = shm->GetSize();
- data->ReserveAndResize(dataSize);
- memcpy(data->begin(), shm->GetPtr(), dataSize);
- }
- template <class T>
- void EraseList(TLockFreeQueue<T*>* data) {
- T* ptr = 0;
- while (data->Dequeue(&ptr)) {
- delete ptr;
- }
- }
- enum EHttpPacket {
- PKT_REQUEST,
- PKT_PING,
- PKT_PING_RESPONSE,
- PKT_RESPONSE,
- PKT_LOCAL_REQUEST,
- PKT_LOCAL_RESPONSE,
- PKT_CANCEL,
- };
- }
- namespace NNehNetliba {
- TUdpHttpMessage::TUdpHttpMessage(const TGUID& reqId, const TUdpAddress& peerAddr)
- : ReqId(reqId)
- , PeerAddress(peerAddr)
- {
- }
- TUdpHttpRequest::TUdpHttpRequest(TAutoPtr<TRequest>& dataHolder, const TGUID& reqId, const TUdpAddress& peerAddr)
- : TUdpHttpMessage(reqId, peerAddr)
- {
- TBlockChainIterator reqData(dataHolder->Data->GetChain());
- char pktType;
- reqData.Read(&pktType, 1);
- ReadArr(&reqData, &Url);
- if (pktType == PKT_REQUEST) {
- ReadYArr(&reqData, &Data);
- } else if (pktType == PKT_LOCAL_REQUEST) {
- ReadShm(dataHolder->Data->GetSharedData(), &Data);
- } else {
- Y_ASSERT(0);
- }
- if (reqData.HasFailed()) {
- Y_ASSERT(0 && "wrong format, memory corruption suspected");
- Url = "";
- Data.clear();
- }
- }
- TUdpHttpResponse::TUdpHttpResponse(TAutoPtr<TRequest>& dataHolder, const TGUID& reqId, const TUdpAddress& peerAddr, EResult result, const char* error)
- : TUdpHttpMessage(reqId, peerAddr)
- , Ok(result)
- {
- if (result == TUdpHttpResponse::FAILED) {
- Error = error ? error : "request failed";
- } else if (result == TUdpHttpResponse::CANCELED) {
- Error = error ? error : "request cancelled";
- } else {
- TBlockChainIterator reqData(dataHolder->Data->GetChain());
- if (Y_UNLIKELY(reqData.HasFailed())) {
- Y_ASSERT(0 && "wrong format, memory corruption suspected");
- Ok = TUdpHttpResponse::FAILED;
- Data.clear();
- Error = "wrong response format";
- } else {
- char pktType;
- reqData.Read(&pktType, 1);
- TGUID guid;
- reqData.Read(&guid, sizeof(guid));
- Y_ASSERT(ReqId == guid);
- if (pktType == PKT_RESPONSE) {
- ReadArr<TString>(&reqData, &Data);
- } else if (pktType == PKT_LOCAL_RESPONSE) {
- ReadShm(dataHolder->Data->GetSharedData(), &Data);
- } else {
- Y_ASSERT(0);
- }
- }
- }
- }
- class TUdpHttp: public IRequester {
- enum EDir {
- DIR_OUT,
- DIR_IN
- };
- struct TInRequestState {
- enum EState {
- S_WAITING,
- S_RESPONSE_SENDING,
- S_CANCELED,
- };
- TInRequestState()
- : State(S_WAITING)
- {
- }
- TInRequestState(const TUdpAddress& address)
- : State(S_WAITING)
- , Address(address)
- {
- }
- EState State;
- TUdpAddress Address;
- };
- struct TOutRequestState {
- enum EState {
- S_SENDING,
- S_WAITING,
- S_WAITING_PING_SENDING,
- S_WAITING_PING_SENT,
- S_CANCEL_AFTER_SENDING
- };
- TOutRequestState()
- : State(S_SENDING)
- , TimePassed(0)
- , PingTransferId(-1)
- {
- }
- EState State;
- TUdpAddress Address;
- double TimePassed;
- int PingTransferId;
- IEventsCollectorRef EventsCollector;
- };
- struct TTransferPurpose {
- EDir Dir;
- TGUID Guid;
- TTransferPurpose()
- : Dir(DIR_OUT)
- {
- }
- TTransferPurpose(EDir dir, TGUID guid)
- : Dir(dir)
- , Guid(guid)
- {
- }
- };
- struct TSendRequest {
- TSendRequest() = default;
- TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqGuid, const IEventsCollectorRef& eventsCollector)
- : Addr(addr)
- , Data(*data)
- , ReqGuid(reqGuid)
- , EventsCollector(eventsCollector)
- , Crc32(CalcChecksum(Data->GetChain()))
- {
- }
- TUdpAddress Addr;
- TAutoPtr<TRopeDataPacket> Data;
- TGUID ReqGuid;
- IEventsCollectorRef EventsCollector;
- ui32 Crc32;
- };
- struct TSendResponse {
- TSendResponse() = default;
- TSendResponse(const TGUID& reqGuid, EPacketPriority prior, TVector<char>* data)
- : ReqGuid(reqGuid)
- , DataCrc32(0)
- , Priority(prior)
- {
- if (data && !data->empty()) {
- data->swap(Data);
- DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize());
- }
- }
- TVector<char> Data;
- TGUID ReqGuid;
- ui32 DataCrc32;
- EPacketPriority Priority;
- };
- typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash;
- typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash;
- public:
- TUdpHttp(const IEventsCollectorRef& eventsCollector)
- : MyThread_(ExecServerThread, (void*)this)
- , AbortTransactions_(false)
- , Port_(0)
- , EventCollector_(eventsCollector)
- , ReportRequestCancel_(false)
- , ReporRequestAck_(false)
- , PhysicalCpu_(-1)
- {
- }
- ~TUdpHttp() override {
- if (MyThread_.Running()) {
- AtomicSet(KeepRunning_, 0);
- MyThread_.Join();
- }
- }
- bool Start(int port, int physicalCpu) {
- Y_ASSERT(Host_.Get() == nullptr);
- Port_ = port;
- PhysicalCpu_ = physicalCpu;
- MyThread_.Start();
- HasStarted_.Wait();
- return Host_.Get() != nullptr;
- }
- void EnableReportRequestCancel() override {
- ReportRequestCancel_ = true;
- }
- void EnableReportRequestAck() override {
- ReporRequestAck_ = true;
- }
- void SendRequest(const TUdpAddress& addr, const TString& url, const TString& data, const TGUID& reqId) override {
- Y_ABORT_UNLESS(
- data.size() < MAX_PACKET_SIZE,
- "data size is too large; data.size()=%" PRISZT ", MAX_PACKET_SIZE=%" PRISZT,
- data.size(), MAX_PACKET_SIZE);
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- if (data.size() > MIN_SHARED_MEM_PACKET && IsLocal(addr)) {
- TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
- if (shm->Create(data.size())) {
- ms->Write((char)PKT_LOCAL_REQUEST);
- ms->WriteStroka(url);
- memcpy(shm->GetPtr(), data.begin(), data.size());
- ms->AttachSharedData(shm);
- }
- }
- if (ms->GetSharedData() == nullptr) {
- ms->Write((char)PKT_REQUEST);
- ms->WriteStroka(url);
- struct TStrokaStorage: public TThrRefBase, public TString {
- TStrokaStorage(const TString& s)
- : TString(s)
- {
- }
- };
- TStrokaStorage* ss = new TStrokaStorage(data);
- ms->Write((int)ss->size());
- ms->AddBlock(ss, ss->begin(), ss->size());
- }
- SendReqList_.Enqueue(new TSendRequest(addr, &ms, reqId, EventCollector_));
- Host_->CancelWait();
- }
- void CancelRequest(const TGUID& reqId) override {
- CancelReqList_.Enqueue(reqId);
- Host_->CancelWait();
- }
- void SendResponse(const TGUID& reqId, TVector<char>* data) override {
- if (data && data->size() > MAX_PACKET_SIZE) {
- Y_ABORT(
- "data size is too large; data->size()=%" PRISZT ", MAX_PACKET_SIZE=%" PRISZT,
- data->size(), MAX_PACKET_SIZE);
- }
- SendRespList_.Enqueue(new TSendResponse(reqId, PP_NORMAL, data));
- Host_->CancelWait();
- }
- void StopNoWait() override {
- AbortTransactions_ = true;
- AtomicSet(KeepRunning_, 0);
- // calcel all outgoing requests
- TGuard<TSpinLock> lock(Spn_);
- while (!OutRequests_.empty()) {
- // cancel without informing peer that we are cancelling the request
- FinishRequest(OutRequests_.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()");
- }
- }
- private:
- void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TRequestPtr data, const char* error = nullptr) {
- TOutRequestState& s = i->second;
- s.EventsCollector->AddResponse(new TUdpHttpResponse(data, i->first, s.Address, ok, error));
- OutRequests_.erase(i);
- }
- int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) {
- ui32 crc32 = CalcChecksum(data->GetChain());
- return Host_->Send(addr, data.Release(), crc32, nullptr, PP_HIGH);
- }
- void ProcessIncomingPackets() {
- TVector<TGUID> failedRequests;
- for (;;) {
- TAutoPtr<TRequest> req = Host_->GetRequest();
- if (req.Get() == nullptr)
- break;
- TBlockChainIterator reqData(req->Data->GetChain());
- char pktType;
- reqData.Read(&pktType, 1);
- switch (pktType) {
- case PKT_REQUEST:
- case PKT_LOCAL_REQUEST: {
- TGUID reqId = req->Guid;
- TInRequestHash::iterator z = InRequests_.find(reqId);
- if (z != InRequests_.end()) {
- // oops, this request already exists!
- // might happen if request can be stored in single packet
- // and this packet had source IP broken during transmission and managed to pass crc checks
- // since we already reported wrong source address for this request to the user
- // the best thing we can do is to stop the program to avoid further complications
- // but we just report the accident to stderr
- fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n",
- GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(),
- GetAddressAsString(req->Address).c_str());
- } else {
- InRequests_[reqId] = TInRequestState(req->Address);
- EventCollector_->AddRequest(new TUdpHttpRequest(req, reqId, req->Address));
- }
- } break;
- case PKT_PING: {
- TGUID guid;
- reqData.Read(&guid, sizeof(guid));
- bool ok = InRequests_.find(guid) != InRequests_.end();
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ms->Write((char)PKT_PING_RESPONSE);
- ms->Write(guid);
- ms->Write(ok);
- SendWithHighPriority(req->Address, ms.Release());
- } break;
- case PKT_PING_RESPONSE: {
- TGUID guid;
- bool ok;
- reqData.Read(&guid, sizeof(guid));
- reqData.Read(&ok, sizeof(ok));
- TOutRequestHash::iterator i = OutRequests_.find(guid);
- if (i == OutRequests_.end())
- ; //Y_ASSERT(0); // actually possible with some packet orders
- else {
- if (!ok) {
- // can not delete request at this point
- // since we can receive failed ping and response at the same moment
- // consider sequence: client sends ping, server sends response
- // and replies false to ping as reply is sent
- // we can not receive failed ping_response earlier then response itself
- // but we can receive them simultaneously
- failedRequests.push_back(guid);
- } else {
- TOutRequestState& s = i->second;
- switch (s.State) {
- case TOutRequestState::S_WAITING_PING_SENDING: {
- Y_ASSERT(s.PingTransferId >= 0);
- TTransferHash::iterator k = TransferHash_.find(s.PingTransferId);
- if (k != TransferHash_.end())
- TransferHash_.erase(k);
- else
- Y_ASSERT(0);
- s.PingTransferId = -1;
- s.TimePassed = 0;
- s.State = TOutRequestState::S_WAITING;
- } break;
- case TOutRequestState::S_WAITING_PING_SENT:
- s.TimePassed = 0;
- s.State = TOutRequestState::S_WAITING;
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- }
- }
- } break;
- case PKT_RESPONSE:
- case PKT_LOCAL_RESPONSE: {
- TGUID guid;
- reqData.Read(&guid, sizeof(guid));
- TOutRequestHash::iterator i = OutRequests_.find(guid);
- if (i == OutRequests_.end()) {
- ; //Y_ASSERT(0); // does happen
- } else {
- FinishRequest(i, TUdpHttpResponse::OK, req);
- }
- } break;
- case PKT_CANCEL: {
- TGUID guid;
- reqData.Read(&guid, sizeof(guid));
- TInRequestHash::iterator i = InRequests_.find(guid);
- if (i == InRequests_.end()) {
- ; //Y_ASSERT(0); // may happen
- } else {
- TInRequestState& s = i->second;
- if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel_)
- EventCollector_->AddCancel(guid);
- s.State = TInRequestState::S_CANCELED;
- }
- } break;
- default:
- Y_ASSERT(0);
- }
- }
- // cleanup failed requests
- for (size_t k = 0; k < failedRequests.size(); ++k) {
- const TGUID& guid = failedRequests[k];
- TOutRequestHash::iterator i = OutRequests_.find(guid);
- if (i != OutRequests_.end())
- FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "failed udp ping");
- }
- }
- void AnalyzeSendResults() {
- TSendResult res;
- while (Host_->GetSendResult(&res)) {
- TTransferHash::iterator k = TransferHash_.find(res.TransferId);
- if (k != TransferHash_.end()) {
- const TTransferPurpose& tp = k->second;
- switch (tp.Dir) {
- case DIR_OUT: {
- TOutRequestHash::iterator i = OutRequests_.find(tp.Guid);
- if (i != OutRequests_.end()) {
- const TGUID& reqId = i->first;
- TOutRequestState& s = i->second;
- switch (s.State) {
- case TOutRequestState::S_SENDING:
- if (!res.Success) {
- FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING");
- } else {
- if (ReporRequestAck_ && !!s.EventsCollector) {
- s.EventsCollector->AddRequestAck(reqId);
- }
- s.State = TOutRequestState::S_WAITING;
- s.TimePassed = 0;
- }
- break;
- case TOutRequestState::S_CANCEL_AFTER_SENDING:
- DoSendCancel(s.Address, reqId);
- FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING");
- break;
- case TOutRequestState::S_WAITING:
- case TOutRequestState::S_WAITING_PING_SENT:
- Y_ASSERT(0);
- break;
- case TOutRequestState::S_WAITING_PING_SENDING:
- Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId);
- if (!res.Success) {
- FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING");
- } else {
- s.PingTransferId = -1;
- s.State = TOutRequestState::S_WAITING_PING_SENT;
- s.TimePassed = 0;
- }
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- }
- } break;
- case DIR_IN: {
- TInRequestHash::iterator i = InRequests_.find(tp.Guid);
- if (i != InRequests_.end()) {
- Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED);
- InRequests_.erase(i);
- }
- } break;
- default:
- Y_ASSERT(0);
- break;
- }
- TransferHash_.erase(k);
- }
- }
- }
- void SendPingsIfNeeded() {
- NHPTimer::STime tChk = PingsSendT_;
- float deltaT = (float)NHPTimer::GetTimePassed(&tChk);
- if (deltaT < 0.05) {
- return;
- }
- PingsSendT_ = tChk;
- deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3);
- {
- for (TOutRequestHash::iterator i = OutRequests_.begin(); i != OutRequests_.end();) {
- TOutRequestHash::iterator curIt = i++;
- TOutRequestState& s = curIt->second;
- const TGUID& guid = curIt->first;
- switch (s.State) {
- case TOutRequestState::S_WAITING:
- s.TimePassed += deltaT;
- if (s.TimePassed > HTTP_TIMEOUT) {
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ms->Write((char)PKT_PING);
- ms->Write(guid);
- int transId = SendWithHighPriority(s.Address, ms.Release());
- TransferHash_[transId] = TTransferPurpose(DIR_OUT, guid);
- s.State = TOutRequestState::S_WAITING_PING_SENDING;
- s.PingTransferId = transId;
- }
- break;
- case TOutRequestState::S_WAITING_PING_SENT:
- s.TimePassed += deltaT;
- if (s.TimePassed > HTTP_TIMEOUT) {
- FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT");
- }
- break;
- default:
- break;
- }
- }
- }
- }
- void Step() {
- {
- TGuard<TSpinLock> lock(Spn_);
- DoSends();
- }
- Host_->Step();
- {
- TGuard<TSpinLock> lock(Spn_);
- DoSends();
- ProcessIncomingPackets();
- AnalyzeSendResults();
- SendPingsIfNeeded();
- }
- }
- void Wait() {
- Host_->Wait(0.1f);
- }
- void DoSendCancel(const TUdpAddress& addr, const TGUID& req) {
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ms->Write((char)PKT_CANCEL);
- ms->Write(req);
- SendWithHighPriority(addr, ms);
- }
- void DoSends() {
- {
- // cancelling requests
- TGUID reqGuid;
- while (CancelReqList_.Dequeue(&reqGuid)) {
- TOutRequestHash::iterator i = OutRequests_.find(reqGuid);
- if (i == OutRequests_.end()) {
- AnticipateCancels_.insert(reqGuid);
- continue; // cancelling non existing request is ok
- }
- TOutRequestState& s = i->second;
- if (s.State == TOutRequestState::S_SENDING) {
- // we are in trouble - have not sent request and we already have to cancel it, wait send
- s.State = TOutRequestState::S_CANCEL_AFTER_SENDING;
- s.EventsCollector->AddCancel(i->first);
- } else {
- DoSendCancel(s.Address, reqGuid);
- FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side");
- }
- }
- }
- {
- // sending replies
- for (TSendResponse* rd = nullptr; SendRespList_.Dequeue(&rd); delete rd) {
- TInRequestHash::iterator i = InRequests_.find(rd->ReqGuid);
- if (i == InRequests_.end()) {
- Y_ASSERT(0);
- continue;
- }
- TInRequestState& s = i->second;
- if (s.State == TInRequestState::S_CANCELED) {
- // need not send response for the canceled request
- InRequests_.erase(i);
- continue;
- }
- Y_ASSERT(s.State == TInRequestState::S_WAITING);
- s.State = TInRequestState::S_RESPONSE_SENDING;
- TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
- ui32 crc32 = 0;
- int dataSize = rd->Data.ysize();
- if (rd->Data.size() > MIN_SHARED_MEM_PACKET && IsLocal(s.Address)) {
- TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
- if (shm->Create(dataSize)) {
- ms->Write((char)PKT_LOCAL_RESPONSE);
- ms->Write(rd->ReqGuid);
- memcpy(shm->GetPtr(), &rd->Data[0], dataSize);
- TVector<char> empty;
- rd->Data.swap(empty);
- ms->AttachSharedData(shm);
- crc32 = CalcChecksum(ms->GetChain());
- }
- }
- if (ms->GetSharedData() == nullptr) {
- ms->Write((char)PKT_RESPONSE);
- ms->Write(rd->ReqGuid);
- // to offload crc calcs from inner thread, crc of data[] is calced outside and passed in DataCrc32
- // this means that we are calculating crc when shared memory is used
- // it is hard to avoid since in SendResponse() we don't know if shared mem will be used
- // (peer address is not available there)
- TIncrementalChecksumCalcer csCalcer;
- AddChain(&csCalcer, ms->GetChain());
- // here we are replicating the way WriteDestructive serializes data
- csCalcer.AddBlock(&dataSize, sizeof(dataSize));
- csCalcer.AddBlockSum(rd->DataCrc32, dataSize);
- crc32 = csCalcer.CalcChecksum();
- ms->WriteDestructive(&rd->Data);
- //ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since its slow for large responses
- //Y_ASSERT(chkCrc == crc32);
- }
- int transId = Host_->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority);
- TransferHash_[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid);
- }
- }
- {
- // sending requests
- for (TSendRequest* rd = nullptr; SendReqList_.Dequeue(&rd); delete rd) {
- Y_ASSERT(OutRequests_.find(rd->ReqGuid) == OutRequests_.end());
- {
- TOutRequestState& s = OutRequests_[rd->ReqGuid];
- s.State = TOutRequestState::S_SENDING;
- s.Address = rd->Addr;
- s.EventsCollector = rd->EventsCollector;
- }
- if (AnticipateCancels_.find(rd->ReqGuid) != AnticipateCancels_.end()) {
- FinishRequest(OutRequests_.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "Canceled (before transmit)");
- } else {
- TGUID pktGuid = rd->ReqGuid; // request packet id should match request id
- int transId = Host_->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL);
- TransferHash_[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid);
- }
- }
- }
- if (!AnticipateCancels_.empty()) {
- AnticipateCancels_.clear();
- }
- }
- void FinishOutstandingTransactions() {
- // wait all pending requests, all new requests are canceled
- while ((!OutRequests_.empty() || !InRequests_.empty() || !SendRespList_.IsEmpty() || !SendReqList_.IsEmpty()) && !PanicAttack) {
- Step();
- sleep(0);
- }
- }
- static void* ExecServerThread(void* param) {
- TUdpHttp* pThis = (TUdpHttp*)param;
- if (pThis->GetPhysicalCpu() >= 0) {
- BindToSocket(pThis->GetPhysicalCpu());
- }
- SetHighestThreadPriority();
- TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket();
- socket->Open(pThis->Port_);
- if (socket->IsValid()) {
- pThis->Port_ = socket->GetPort();
- pThis->Host_ = CreateUdpHost(socket);
- } else {
- pThis->Host_ = nullptr;
- }
- pThis->HasStarted_.Signal();
- if (!pThis->Host_)
- return nullptr;
- NHPTimer::GetTime(&pThis->PingsSendT_);
- while (AtomicGet(pThis->KeepRunning_) && !PanicAttack) {
- if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) {
- NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire);
- if (NHPTimer::GetTimePassed(&chk) > HeartbeatTimeout.load(std::memory_order_acquire)) {
- StopAllNetLibaThreads();
- #ifndef _win_
- killpg(0, SIGKILL);
- #endif
- abort();
- break;
- }
- }
- pThis->Step();
- pThis->Wait();
- }
- if (!pThis->AbortTransactions_ && !PanicAttack) {
- pThis->FinishOutstandingTransactions();
- }
- pThis->Host_ = nullptr;
- return nullptr;
- }
- int GetPhysicalCpu() const noexcept {
- return PhysicalCpu_;
- }
- private:
- TThread MyThread_;
- TAtomic KeepRunning_ = 1;
- bool AbortTransactions_;
- TSpinLock Spn_;
- TSystemEvent HasStarted_;
- NHPTimer::STime PingsSendT_;
- TIntrusivePtr<IUdpHost> Host_;
- int Port_;
- TOutRequestHash OutRequests_;
- TInRequestHash InRequests_;
- typedef THashMap<int, TTransferPurpose> TTransferHash;
- TTransferHash TransferHash_;
- // hold it here to not construct on every DoSends()
- typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels;
- TAnticipateCancels AnticipateCancels_;
- TLockFreeQueue<TSendRequest*> SendReqList_;
- TLockFreeQueue<TSendResponse*> SendRespList_;
- TLockFreeQueue<TGUID> CancelReqList_;
- TIntrusivePtr<IEventsCollector> EventCollector_;
- bool ReportRequestCancel_;
- bool ReporRequestAck_;
- int PhysicalCpu_;
- };
- IRequesterRef CreateHttpUdpRequester(int port, const IEventsCollectorRef& ec, int physicalCpu) {
- TUdpHttp* udpHttp = new TUdpHttp(ec);
- IRequesterRef res(udpHttp);
- if (!udpHttp->Start(port, physicalCpu)) {
- if (port) {
- ythrow yexception() << "netliba can't bind port=" << port;
- } else {
- ythrow yexception() << "netliba can't bind random port";
- }
- }
- return res;
- }
- }
|