grpc_io.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. #include "grpc_io.h"
  2. #include <contrib/libs/grpc/src/core/lib/iomgr/exec_ctx.h>
  3. #include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
  4. #include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h>
  5. #include <contrib/libs/grpc/include/grpc/impl/codegen/log.h>
  6. #include <util/generic/yexception.h>
  7. #include <util/string/cast.h>
  8. #include <util/system/env.h>
  9. #include <util/system/mutex.h>
  10. #include <util/system/thread.h>
  11. namespace NUnifiedAgent {
  namespace {
      // One-shot flag consumed by EnsureGrpcConfigured() so that the gRPC
      // setup it performs runs at most once per process.
      std::once_flag GrpcConfigured;
  } // namespace
  15. TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
  16. : CompletionQueue(completionQueue)
  17. , IOCallback(std::move(ioCallback))
  18. , Completion(MakeHolder<grpc_cq_completion>())
  19. , InQueue(false)
  20. {
  21. }
  22. TGrpcNotification::~TGrpcNotification() = default;
  23. void TGrpcNotification::Trigger() {
  24. {
  25. bool inQueue = false;
  26. if (!InQueue.compare_exchange_strong(inQueue, true)) {
  27. return;
  28. }
  29. }
  30. grpc_core::ApplicationCallbackExecCtx callbackExecCtx;
  31. grpc_core::ExecCtx execCtx;
  32. IOCallback->Ref();
  33. Y_ABORT_UNLESS(grpc_cq_begin_op(CompletionQueue.cq(), this));
  34. grpc_cq_end_op(CompletionQueue.cq(), this, y_absl::OkStatus(),
  35. [](void* self, grpc_cq_completion*) {
  36. Y_ABORT_UNLESS(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false));
  37. },
  38. this, Completion.Get());
  39. }
  40. bool TGrpcNotification::FinalizeResult(void** tag, bool*) {
  41. *tag = IOCallback.Get();
  42. return true;
  43. }
  44. TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
  45. : CompletionQueue(completionQueue)
  46. , IOCallback(std::move(ioCallback))
  47. , Alarm()
  48. , AlarmIsSet(false)
  49. , NextTriggerTime(Nothing())
  50. {
  51. }
  52. void TGrpcTimer::Set(TInstant triggerTime) {
  53. if (AlarmIsSet) {
  54. NextTriggerTime = triggerTime;
  55. Alarm.Cancel();
  56. } else {
  57. AlarmIsSet = true;
  58. Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref());
  59. }
  60. }
  61. void TGrpcTimer::Cancel() {
  62. NextTriggerTime.Clear();
  63. if (AlarmIsSet) {
  64. Alarm.Cancel();
  65. }
  66. }
  67. IIOCallback* TGrpcTimer::Ref() {
  68. IOCallback->Ref();
  69. return this;
  70. }
  71. void TGrpcTimer::OnIOCompleted(EIOStatus status) {
  72. Y_ABORT_UNLESS(AlarmIsSet);
  73. if (NextTriggerTime) {
  74. Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this);
  75. NextTriggerTime.Clear();
  76. } else {
  77. AlarmIsSet = false;
  78. IOCallback->OnIOCompleted(status);
  79. }
  80. }
  81. TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue)
  82. : Queue(queue)
  83. , Thread()
  84. {
  85. }
  86. void TGrpcCompletionQueuePoller::Start() {
  87. Thread = std::thread([this]() {
  88. TThread::SetCurrentThreadName("ua_grpc_cq");
  89. void* tag;
  90. bool ok;
  91. while (Queue.Next(&tag, &ok)) {
  92. try {
  93. static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error);
  94. } catch (...) {
  95. Y_ABORT("unexpected exception [%s]", CurrentExceptionMessage().c_str());
  96. }
  97. }
  98. });
  99. }
  100. void TGrpcCompletionQueuePoller::Join() {
  101. Thread.join();
  102. }
  103. TGrpcCompletionQueueHost::TGrpcCompletionQueueHost()
  104. : CompletionQueue()
  105. , Poller(CompletionQueue)
  106. {
  107. }
  108. void TGrpcCompletionQueueHost::Start() {
  109. Poller.Start();
  110. }
  111. void TGrpcCompletionQueueHost::Stop() {
  112. CompletionQueue.Shutdown();
  113. Poller.Join();
  114. }
  115. gpr_timespec InstantToTimespec(TInstant instant) {
  116. gpr_timespec result;
  117. result.clock_type = GPR_CLOCK_REALTIME;
  118. result.tv_sec = static_cast<int64_t>(instant.Seconds());
  119. result.tv_nsec = instant.NanoSecondsOfSecond();
  120. return result;
  121. }
  122. void EnsureGrpcConfigured() {
  123. std::call_once(GrpcConfigured, []() {
  124. const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT");
  125. ui64 limit;
  126. if (limitStr.Empty() || !TryFromString(limitStr, limit)) {
  127. limit = 2;
  128. }
  129. grpc_core::Executor::SetThreadsLimit(limit);
  130. });
  131. }
  132. void StartGrpcTracing() {
  133. grpc_tracer_set_enabled("all", true);
  134. gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
  135. }
  136. void FinishGrpcTracing() {
  137. grpc_tracer_set_enabled("all", false);
  138. gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
  139. }
  140. }