grpc_io.h 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. #pragma once
  2. #include <library/cpp/unified_agent_client/async_joiner.h>
  3. #include <library/cpp/unified_agent_client/f_maybe.h>
  4. #include <contrib/libs/grpc/include/grpcpp/alarm.h>
  5. #include <contrib/libs/grpc/include/grpc++/grpc++.h>
  6. #include <thread>
  7. struct grpc_cq_completion;
  8. namespace NUnifiedAgent {
  9. enum class EIOStatus {
  10. Ok,
  11. Error
  12. };
  13. class IIOCallback {
  14. public:
  15. virtual ~IIOCallback() = default;
  16. virtual IIOCallback* Ref() = 0;
  17. virtual void OnIOCompleted(EIOStatus status) = 0;
  18. };
  19. template<typename TCallback, typename TCounter>
  20. class TIOCallback: public IIOCallback {
  21. public:
  22. explicit TIOCallback(TCallback&& callback, TCounter* counter)
  23. : Callback(std::move(callback))
  24. , Counter(counter)
  25. {
  26. }
  27. IIOCallback* Ref() override {
  28. Counter->Ref();
  29. return this;
  30. }
  31. void OnIOCompleted(EIOStatus status) override {
  32. Callback(status);
  33. Counter->UnRef();
  34. }
  35. private:
  36. TCallback Callback;
  37. TCounter* Counter;
  38. };
  39. template<typename TCallback, typename TCounter>
  40. THolder<IIOCallback> MakeIOCallback(TCallback&& callback, TCounter* counter) {
  41. return MakeHolder<TIOCallback<TCallback, TCounter>>(std::move(callback), counter);
  42. }
  43. template<typename TTarget, typename TCounter = TTarget>
  44. THolder<IIOCallback> MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus),
  45. TCounter* counter = nullptr)
  46. {
  47. return MakeIOCallback([target, method](EIOStatus status) { ((*target).*method)(status); },
  48. counter ? counter : target);
  49. }
  50. class TGrpcNotification: private ::grpc::internal::CompletionQueueTag {
  51. public:
  52. TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);
  53. ~TGrpcNotification();
  54. void Trigger();
  55. private:
  56. bool FinalizeResult(void** tag, bool* status) override;
  57. private:
  58. grpc::CompletionQueue& CompletionQueue;
  59. THolder<IIOCallback> IOCallback;
  60. THolder<grpc_cq_completion> Completion;
  61. std::atomic<bool> InQueue;
  62. };
  63. class TGrpcTimer: private IIOCallback {
  64. public:
  65. TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);
  66. void Set(TInstant triggerTime);
  67. void Cancel();
  68. private:
  69. IIOCallback* Ref() override;
  70. void OnIOCompleted(EIOStatus status) override;
  71. private:
  72. grpc::CompletionQueue& CompletionQueue;
  73. THolder<IIOCallback> IOCallback;
  74. grpc::Alarm Alarm;
  75. bool AlarmIsSet;
  76. TFMaybe<TInstant> NextTriggerTime;
  77. };
  78. class TGrpcCompletionQueuePoller {
  79. public:
  80. explicit TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue);
  81. void Start();
  82. void Join();
  83. private:
  84. grpc::CompletionQueue& Queue;
  85. std::thread Thread;
  86. };
  87. class TGrpcCompletionQueueHost {
  88. public:
  89. TGrpcCompletionQueueHost();
  90. void Start();
  91. void Stop();
  92. inline grpc::CompletionQueue& GetCompletionQueue() noexcept {
  93. return CompletionQueue;
  94. }
  95. private:
  96. grpc::CompletionQueue CompletionQueue;
  97. TGrpcCompletionQueuePoller Poller;
  98. };
  99. gpr_timespec InstantToTimespec(TInstant instant);
  100. void EnsureGrpcConfigured();
  101. void StartGrpcTracing();
  102. void FinishGrpcTracing();
  103. }