grpc_io.cpp 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. #include "grpc_io.h"
  2. #include <contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h>
  3. #include <contrib/libs/grpc/src/core/lib/iomgr/exec_ctx.h>
  4. #include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
  5. #include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h>
  6. #include <contrib/libs/grpc/include/grpc/impl/codegen/log.h>
  7. #include <util/generic/yexception.h>
  8. #include <util/string/cast.h>
  9. #include <util/system/env.h>
  10. #include <util/system/mutex.h>
  11. #include <util/system/thread.h>
  12. namespace NUnifiedAgent {
  13. namespace {
  14. std::once_flag GrpcConfigured{};
  15. }
  16. TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
  17. : CompletionQueue(completionQueue)
  18. , IOCallback(std::move(ioCallback))
  19. , Completion(MakeHolder<grpc_cq_completion>())
  20. , InQueue(false)
  21. {
  22. }
  23. TGrpcNotification::~TGrpcNotification() = default;
  24. void TGrpcNotification::Trigger() {
  25. {
  26. bool inQueue = false;
  27. if (!InQueue.compare_exchange_strong(inQueue, true)) {
  28. return;
  29. }
  30. }
  31. grpc_core::ApplicationCallbackExecCtx callbackExecCtx;
  32. grpc_core::ExecCtx execCtx;
  33. IOCallback->Ref();
  34. Y_ABORT_UNLESS(grpc_cq_begin_op(CompletionQueue.cq(), this));
  35. grpc_cq_end_op(CompletionQueue.cq(), this, y_absl::OkStatus(),
  36. [](void* self, grpc_cq_completion*) {
  37. Y_ABORT_UNLESS(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false));
  38. },
  39. this, Completion.Get());
  40. }
  41. bool TGrpcNotification::FinalizeResult(void** tag, bool*) {
  42. *tag = IOCallback.Get();
  43. return true;
  44. }
  45. TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
  46. : CompletionQueue(completionQueue)
  47. , IOCallback(std::move(ioCallback))
  48. , Alarm()
  49. , AlarmIsSet(false)
  50. , NextTriggerTime(Nothing())
  51. {
  52. }
  53. void TGrpcTimer::Set(TInstant triggerTime) {
  54. if (AlarmIsSet) {
  55. NextTriggerTime = triggerTime;
  56. Alarm.Cancel();
  57. } else {
  58. AlarmIsSet = true;
  59. Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref());
  60. }
  61. }
  62. void TGrpcTimer::Cancel() {
  63. NextTriggerTime.Clear();
  64. if (AlarmIsSet) {
  65. Alarm.Cancel();
  66. }
  67. }
  68. IIOCallback* TGrpcTimer::Ref() {
  69. IOCallback->Ref();
  70. return this;
  71. }
  72. void TGrpcTimer::OnIOCompleted(EIOStatus status) {
  73. Y_ABORT_UNLESS(AlarmIsSet);
  74. if (NextTriggerTime) {
  75. Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this);
  76. NextTriggerTime.Clear();
  77. } else {
  78. AlarmIsSet = false;
  79. IOCallback->OnIOCompleted(status);
  80. }
  81. }
  82. TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue)
  83. : Queue(queue)
  84. , Thread()
  85. {
  86. }
  87. void TGrpcCompletionQueuePoller::Start() {
  88. Thread = std::thread([this]() {
  89. TThread::SetCurrentThreadName("ua_grpc_cq");
  90. void* tag;
  91. bool ok;
  92. while (Queue.Next(&tag, &ok)) {
  93. try {
  94. static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error);
  95. } catch (...) {
  96. Y_ABORT("unexpected exception [%s]", CurrentExceptionMessage().c_str());
  97. }
  98. }
  99. });
  100. }
  101. void TGrpcCompletionQueuePoller::Join() {
  102. Thread.join();
  103. }
  104. TGrpcCompletionQueueHost::TGrpcCompletionQueueHost()
  105. : CompletionQueue()
  106. , Poller(CompletionQueue)
  107. {
  108. }
  109. void TGrpcCompletionQueueHost::Start() {
  110. Poller.Start();
  111. }
  112. void TGrpcCompletionQueueHost::Stop() {
  113. CompletionQueue.Shutdown();
  114. Poller.Join();
  115. }
  116. gpr_timespec InstantToTimespec(TInstant instant) {
  117. gpr_timespec result;
  118. result.clock_type = GPR_CLOCK_REALTIME;
  119. result.tv_sec = static_cast<int64_t>(instant.Seconds());
  120. result.tv_nsec = instant.NanoSecondsOfSecond();
  121. return result;
  122. }
  123. void EnsureGrpcConfigured() {
  124. std::call_once(GrpcConfigured, []() {
  125. const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT");
  126. ui64 limit;
  127. if (limitStr.empty() || !TryFromString(limitStr, limit)) {
  128. limit = 2;
  129. }
  130. grpc_core::Executor::SetThreadsLimit(limit);
  131. grpc_event_engine::experimental::ThreadPool::SetThreadsLimit(limit);
  132. });
  133. }
  134. void StartGrpcTracing() {
  135. grpc_tracer_set_enabled("all", true);
  136. gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
  137. }
  138. void FinishGrpcTracing() {
  139. grpc_tracer_set_enabled("all", false);
  140. gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
  141. }
  142. }