123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- #pragma once
- #include "wfmo.h"
- #include "stat.h"
- #include <library/cpp/http/io/headers.h>
- #include <util/generic/ptr.h>
- #include <util/generic/string.h>
- #include <util/datetime/base.h>
- #include <utility>
- namespace NNeh {
- struct TMessage {
- TMessage() = default;
- inline TMessage(TString addr, TString data)
- : Addr(std::move(addr))
- , Data(std::move(data))
- {
- }
- static TMessage FromString(TStringBuf request);
- TString Addr;
- TString Data;
- };
- using TMessageRef = TAutoPtr<TMessage>;
- struct TError {
- public:
- enum TType {
- UnknownType,
- Cancelled,
- ProtocolSpecific
- };
- TError(TString text, TType type = UnknownType, i32 code = 0, i32 systemCode = 0)
- : Type(std::move(type))
- , Code(code)
- , Text(text)
- , SystemCode(systemCode)
- {
- }
- TType Type = UnknownType;
- i32 Code = 0; // protocol specific code (example(http): 404)
- TString Text;
- i32 SystemCode = 0; // system error code
- };
- using TErrorRef = TAutoPtr<TError>;
- struct TResponse;
- using TResponseRef = TAutoPtr<TResponse>;
- struct TResponse {
- inline TResponse(TMessage req,
- TString data,
- const TDuration duration)
- : TResponse(std::move(req), std::move(data), duration, {} /* firstLine */, {} /* headers */, {} /* error */)
- {
- }
- inline TResponse(TMessage req,
- TString data,
- const TDuration duration,
- TString firstLine,
- THttpHeaders headers)
- : TResponse(std::move(req), std::move(data), duration, std::move(firstLine), std::move(headers), {} /* error */)
- {
- }
- inline TResponse(TMessage req,
- TString data,
- const TDuration duration,
- TString firstLine,
- THttpHeaders headers,
- TErrorRef error)
- : Request(std::move(req))
- , Data(std::move(data))
- , Duration(duration)
- , FirstLine(std::move(firstLine))
- , Headers(std::move(headers))
- , Error_(std::move(error))
- {
- }
- inline static TResponseRef FromErrorText(TMessage msg, TString error, const TDuration duration) {
- return new TResponse(std::move(msg), {} /* data */, duration, {} /* firstLine */, {} /* headers */, new TError(std::move(error)));
- }
- inline static TResponseRef FromError(TMessage msg, TErrorRef error, const TDuration duration) {
- return new TResponse(std::move(msg), {} /* data */, duration, {} /* firstLine */, {} /* headers */, error);
- }
- inline static TResponseRef FromError(TMessage msg, TErrorRef error, const TDuration duration,
- TString data, TString firstLine, THttpHeaders headers)
- {
- return new TResponse(std::move(msg), std::move(data), duration, std::move(firstLine), std::move(headers), error);
- }
- inline static TResponseRef FromError(
- TMessage msg,
- TErrorRef error,
- TString data,
- const TDuration duration,
- TString firstLine,
- THttpHeaders headers)
- {
- return new TResponse(std::move(msg), std::move(data), duration, std::move(firstLine), std::move(headers), error);
- }
- inline bool IsError() const {
- return Error_.Get();
- }
- inline TError::TType GetErrorType() const {
- return Error_.Get() ? Error_->Type : TError::UnknownType;
- }
- inline i32 GetErrorCode() const {
- return Error_.Get() ? Error_->Code : 0;
- }
- inline i32 GetSystemErrorCode() const {
- return Error_.Get() ? Error_->SystemCode : 0;
- }
- inline TString GetErrorText() const {
- return Error_.Get() ? Error_->Text : TString();
- }
- const TMessage Request;
- const TString Data;
- const TDuration Duration;
- const TString FirstLine;
- THttpHeaders Headers;
- private:
- THolder<TError> Error_;
- };
class THandle;

// Callback interface that a THandle reports to.
class IOnRecv {
public:
    virtual ~IOnRecv() = default;

    // Callback on receive response; does nothing unless overridden.
    virtual void OnNotify(THandle&) {
    }

    // The response was extracted by the Wait() method, so OnRecv() will not
    // be called; does nothing unless overridden.
    virtual void OnEnd() {
    }

    // Callback on destroy handler.
    virtual void OnRecv(THandle& resp) = 0;
};
- class THandle: public TThrRefBase, public TWaitHandle {
- public:
- inline THandle(IOnRecv* f, TStatCollector* s = nullptr) noexcept
- : F_(f)
- , Stat_(s)
- {
- }
- ~THandle() override {
- if (F_) {
- try {
- F_->OnRecv(*this);
- } catch (...) {
- }
- }
- }
- virtual bool MessageSendedCompletely() const noexcept {
- //TODO
- return true;
- }
- virtual void Cancel() noexcept {
- //TODO
- if (!!Stat_)
- Stat_->OnCancel();
- }
- inline const TResponse* Response() const noexcept {
- return R_.Get();
- }
- //method MUST be called only after success Wait() for this handle or from callback IOnRecv::OnRecv()
- //else exist chance for memory leak (race between Get()/Notify())
- inline TResponseRef Get() noexcept {
- return R_;
- }
- inline bool Wait(TResponseRef& msg, const TInstant deadLine) {
- if (WaitForOne(*this, deadLine)) {
- if (F_) {
- F_->OnEnd();
- F_ = nullptr;
- }
- msg = Get();
- return true;
- }
- return false;
- }
- inline bool Wait(TResponseRef& msg, const TDuration timeOut) {
- return Wait(msg, timeOut.ToDeadLine());
- }
- inline bool Wait(TResponseRef& msg) {
- return Wait(msg, TInstant::Max());
- }
- inline TResponseRef Wait(const TInstant deadLine) {
- TResponseRef ret;
- Wait(ret, deadLine);
- return ret;
- }
- inline TResponseRef Wait(const TDuration timeOut) {
- return Wait(timeOut.ToDeadLine());
- }
- inline TResponseRef Wait() {
- return Wait(TInstant::Max());
- }
- protected:
- inline void Notify(TResponseRef resp) {
- if (!!Stat_) {
- if (!resp || resp->IsError()) {
- Stat_->OnFail();
- } else {
- Stat_->OnSuccess();
- }
- }
- R_.Swap(resp);
- if (F_) {
- try {
- F_->OnNotify(*this);
- } catch (...) {
- }
- }
- Signal();
- }
- IOnRecv* F_;
- private:
- TResponseRef R_;
- THolder<TStatCollector> Stat_;
- };
- using THandleRef = TIntrusivePtr<THandle>;
- THandleRef Request(const TMessage& msg, IOnRecv* fallback);
- inline THandleRef Request(const TMessage& msg) {
- return Request(msg, nullptr);
- }
- THandleRef Request(const TString& req, IOnRecv* fallback);
- inline THandleRef Request(const TString& req) {
- return Request(req, nullptr);
- }
- class IMultiRequester {
- public:
- virtual ~IMultiRequester() = default;
- virtual void Add(const THandleRef& req) = 0;
- virtual void Del(const THandleRef& req) = 0;
- virtual bool Wait(THandleRef& req, TInstant deadLine) = 0;
- virtual bool IsEmpty() const = 0;
- inline void Schedule(const TString& req) {
- Add(Request(req));
- }
- inline bool Wait(THandleRef& req, TDuration timeOut) {
- return Wait(req, timeOut.ToDeadLine());
- }
- inline bool Wait(THandleRef& req) {
- return Wait(req, TInstant::Max());
- }
- inline bool Wait(TResponseRef& resp, TInstant deadLine) {
- THandleRef req;
- while (Wait(req, deadLine)) {
- resp = req->Get();
- if (!!resp) {
- return true;
- }
- }
- return false;
- }
- inline bool Wait(TResponseRef& resp) {
- return Wait(resp, TInstant::Max());
- }
- };
- using IMultiRequesterRef = TAutoPtr<IMultiRequester>;
- IMultiRequesterRef CreateRequester();
- bool SetProtocolOption(TStringBuf protoOption, TStringBuf value);
- }
|