udp_http.h 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. #pragma once
  2. #include "udp_address.h"
  3. #include "udp_debug.h"
  4. #include "net_queue_stat.h"
  5. #include <util/network/init.h>
  6. #include <util/generic/ptr.h>
  7. #include <util/generic/guid.h>
  8. #include <library/cpp/threading/mux_event/mux_event.h>
  9. #include <library/cpp/netliba/socket/socket.h>
  10. namespace NNetliba {
  11. const ui64 MAX_PACKET_SIZE = 0x70000000;
  12. struct TRequest;
  13. struct TUdpHttpRequest {
  14. TAutoPtr<TRequest> DataHolder;
  15. TGUID ReqId;
  16. TString Url;
  17. TUdpAddress PeerAddress;
  18. TVector<char> Data;
  19. ~TUdpHttpRequest();
  20. };
  21. struct TUdpHttpResponse {
  22. enum EResult {
  23. FAILED = 0,
  24. OK = 1,
  25. CANCELED = 2
  26. };
  27. TAutoPtr<TRequest> DataHolder;
  28. TGUID ReqId;
  29. TUdpAddress PeerAddress;
  30. TVector<char> Data;
  31. EResult Ok;
  32. TString Error;
  33. ~TUdpHttpResponse();
  34. };
  35. // vector<char> *data - vector will be cleared upon call
  36. struct IRequestOps: public TThrRefBase {
  37. class TWaitResponse: public TThrRefBase, public TNonCopyable {
  38. TGUID ReqId;
  39. TMuxEvent CompleteEvent;
  40. TUdpHttpResponse* Response;
  41. bool RequestSent;
  42. ~TWaitResponse() override {
  43. delete GetResponse();
  44. }
  45. public:
  46. TWaitResponse()
  47. : Response(nullptr)
  48. , RequestSent(false)
  49. {
  50. }
  51. void Wait() {
  52. CompleteEvent.Wait();
  53. }
  54. bool Wait(int ms) {
  55. return CompleteEvent.Wait(ms);
  56. }
  57. TUdpHttpResponse* GetResponse();
  58. bool IsRequestSent() const {
  59. return RequestSent;
  60. }
  61. void SetResponse(TUdpHttpResponse* r);
  62. void SetReqId(const TGUID& reqId) {
  63. ReqId = reqId;
  64. }
  65. const TGUID& GetReqId() {
  66. return ReqId;
  67. }
  68. void SetRequestSent() {
  69. RequestSent = true;
  70. }
  71. };
  72. // async
  73. virtual void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) = 0;
  74. TGUID SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) {
  75. TGUID reqId;
  76. CreateGuid(&reqId);
  77. SendRequest(addr, url, data, reqId);
  78. return reqId;
  79. }
  80. virtual void CancelRequest(const TGUID& reqId) = 0; //cancel request from requester side
  81. virtual void BreakRequest(const TGUID& reqId) = 0; //break request-response from requester side
  82. virtual void SendResponse(const TGUID& reqId, TVector<char>* data) = 0;
  83. virtual void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) = 0;
  84. virtual TUdpHttpRequest* GetRequest() = 0;
  85. virtual TUdpHttpResponse* GetResponse() = 0;
  86. virtual bool GetRequestCancel(TGUID* req) = 0;
  87. virtual bool GetSendRequestAcc(TGUID* req) = 0;
  88. // sync mode
  89. virtual TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) = 0;
  90. virtual TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) = 0;
  91. //
  92. virtual TMuxEvent& GetAsyncEvent() = 0;
  93. };
  94. struct IRequester: public IRequestOps {
  95. virtual int GetPort() = 0;
  96. virtual void StopNoWait() = 0;
  97. virtual TUdpAddress GetPeerAddress(const TGUID& reqId) = 0;
  98. virtual void GetPendingDataSize(TRequesterPendingDataStats* res) = 0;
  99. virtual bool HasRequest(const TGUID& reqId) = 0;
  100. virtual TString GetDebugInfo() = 0;
  101. virtual void GetRequestQueueSize(TRequesterQueueStats* res) = 0;
  102. virtual IRequestOps* CreateSubRequester() = 0;
  103. virtual void EnableReportRequestCancel() = 0;
  104. virtual void EnableReportSendRequestAcc() = 0;
  105. virtual TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) = 0;
  106. ui64 GetPendingDataSize() {
  107. TRequesterPendingDataStats pds;
  108. GetPendingDataSize(&pds);
  109. return pds.InpDataSize + pds.OutDataSize;
  110. }
  111. };
  112. IRequester* CreateHttpUdpRequester(int port);
  113. IRequester* CreateHttpUdpRequester(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket);
  114. void SetUdpMaxBandwidthPerIP(float f);
  115. void SetUdpSlowStart(bool enable);
  116. void SetCongCtrlChannelInflate(float inflate);
  117. void EnableUseTOSforAcks(bool enable);
  118. void EnableROCE(bool f);
  119. void AbortOnFailedRequest(TUdpHttpResponse* answer);
  120. TString GetDebugInfo(const TUdpAddress& addr, double timeout = 60);
  121. void Kill(const TUdpAddress& addr);
  122. void StopAllNetLibaThreads();
  123. // if heartbeat timeout is set and NetLibaHeartbeat() is not called for timeoutSec
  124. // then StopAllNetLibaThreads() will be called
  125. void SetNetLibaHeartbeatTimeout(double timeoutSec);
  126. void NetLibaHeartbeat();
  127. bool IsLocal(const TUdpAddress& addr);
  128. }