#pragma once #include #include #include #include #include struct grpc_cq_completion; namespace NUnifiedAgent { enum class EIOStatus { Ok, Error }; class IIOCallback { public: virtual ~IIOCallback() = default; virtual IIOCallback* Ref() = 0; virtual void OnIOCompleted(EIOStatus status) = 0; }; template class TIOCallback: public IIOCallback { public: explicit TIOCallback(TCallback&& callback, TCounter* counter) : Callback(std::move(callback)) , Counter(counter) { } IIOCallback* Ref() override { Counter->Ref(); return this; } void OnIOCompleted(EIOStatus status) override { Callback(status); Counter->UnRef(); } private: TCallback Callback; TCounter* Counter; }; template THolder MakeIOCallback(TCallback&& callback, TCounter* counter) { return MakeHolder>(std::move(callback), counter); } template THolder MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus), TCounter* counter = nullptr) { return MakeIOCallback([target, method](EIOStatus status) { ((*target).*method)(status); }, counter ? counter : target); } class TGrpcNotification: private ::grpc::internal::CompletionQueueTag { public: TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder&& ioCallback); ~TGrpcNotification(); void Trigger(); private: bool FinalizeResult(void** tag, bool* status) override; private: grpc::CompletionQueue& CompletionQueue; THolder IOCallback; THolder Completion; std::atomic InQueue; }; class TGrpcTimer: private IIOCallback { public: TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder&& ioCallback); void Set(TInstant triggerTime); void Cancel(); private: IIOCallback* Ref() override; void OnIOCompleted(EIOStatus status) override; private: grpc::CompletionQueue& CompletionQueue; THolder IOCallback; grpc::Alarm Alarm; bool AlarmIsSet; TFMaybe NextTriggerTime; }; class TGrpcCompletionQueuePoller { public: explicit TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue); void Start(); void Join(); private: grpc::CompletionQueue& Queue; std::thread Thread; }; class TGrpcCompletionQueueHost { public: TGrpcCompletionQueueHost(); void Start(); void Stop(); inline grpc::CompletionQueue& GetCompletionQueue() noexcept { return CompletionQueue; } private: grpc::CompletionQueue CompletionQueue; TGrpcCompletionQueuePoller Poller; }; gpr_timespec InstantToTimespec(TInstant instant); void EnsureGrpcConfigured(); void StartGrpcTracing(); void FinishGrpcTracing(); }