grpc_server.cpp 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. #include "grpc_server.h"
  2. #include <util/string/join.h>
  3. #include <util/generic/yexception.h>
  4. #include <util/system/thread.h>
  5. #include <util/generic/map.h>
  6. #include <grpc++/resource_quota.h>
  7. #include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
  8. #if !defined(_WIN32) && !defined(_WIN64)
  9. #include <sys/socket.h>
  10. #include <netinet/in.h>
  11. #include <netinet/tcp.h>
  12. #endif
  13. namespace NGrpc {
  14. using NThreading::TFuture;
  15. static void PullEvents(grpc::ServerCompletionQueue* cq) {
  16. TThread::SetCurrentThreadName("grpc_server");
  17. while (true) {
  18. void* tag; // uniquely identifies a request.
  19. bool ok;
  20. if (cq->Next(&tag, &ok)) {
  21. IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
  22. if (!ev->Execute(ok)) {
  23. ev->DestroyRequest();
  24. }
  25. } else {
  26. break;
  27. }
  28. }
  29. }
  30. TGRpcServer::TGRpcServer(const TServerOptions& opts)
  31. : Options_(opts)
  32. , Limiter_(Options_.MaxGlobalRequestInFlight)
  33. {}
  34. TGRpcServer::~TGRpcServer() {
  35. Y_ABORT_UNLESS(Ts.empty());
  36. Services_.clear();
  37. }
  38. void TGRpcServer::AddService(IGRpcServicePtr service) {
  39. Services_.push_back(service);
  40. }
  41. void TGRpcServer::Start() {
  42. TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695
  43. using grpc::ServerBuilder;
  44. using grpc::ResourceQuota;
  45. ServerBuilder builder;
  46. auto credentials = grpc::InsecureServerCredentials();
  47. if (Options_.SslData) {
  48. grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
  49. keycert.cert_chain = std::move(Options_.SslData->Cert);
  50. keycert.private_key = std::move(Options_.SslData->Key);
  51. grpc::SslServerCredentialsOptions sslOps;
  52. sslOps.pem_root_certs = std::move(Options_.SslData->Root);
  53. sslOps.pem_key_cert_pairs.push_back(keycert);
  54. if (Options_.SslData->DoRequestClientCertificate) {
  55. sslOps.client_certificate_request = GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY;
  56. }
  57. credentials = grpc::SslServerCredentials(sslOps);
  58. }
  59. if (Options_.ExternalListener) {
  60. Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(
  61. ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
  62. credentials
  63. ));
  64. } else {
  65. builder.AddListeningPort(server_address, credentials);
  66. }
  67. builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
  68. builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
  69. for (IGRpcServicePtr service : Services_) {
  70. service->SetServerOptions(Options_);
  71. builder.RegisterService(service->GetService());
  72. service->SetGlobalLimiterHandle(&Limiter_);
  73. }
  74. class TKeepAliveOption: public grpc::ServerBuilderOption {
  75. public:
  76. TKeepAliveOption(int idle, int interval)
  77. : Idle(idle)
  78. , Interval(interval)
  79. , KeepAliveEnabled(true)
  80. {}
  81. TKeepAliveOption()
  82. : Idle(0)
  83. , Interval(0)
  84. , KeepAliveEnabled(false)
  85. {}
  86. void UpdateArguments(grpc::ChannelArguments *args) override {
  87. args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
  88. args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
  89. if (KeepAliveEnabled) {
  90. args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
  91. args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
  92. args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
  93. args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
  94. args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
  95. }
  96. }
  97. void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
  98. {}
  99. private:
  100. const int Idle;
  101. const int Interval;
  102. const bool KeepAliveEnabled;
  103. };
  104. if (Options_.KeepAliveEnable) {
  105. builder.SetOption(std::make_unique<TKeepAliveOption>(
  106. Options_.KeepAliveIdleTimeoutTriggerSec,
  107. Options_.KeepAliveProbeIntervalSec));
  108. } else {
  109. builder.SetOption(std::make_unique<TKeepAliveOption>());
  110. }
  111. size_t completionQueueCount = 1;
  112. if (Options_.WorkersPerCompletionQueue) {
  113. size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue);
  114. completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling
  115. } else if (Options_.UseCompletionQueuePerThread) {
  116. completionQueueCount = Options_.WorkerThreads;
  117. }
  118. CQS_.reserve(completionQueueCount);
  119. for (size_t i = 0; i < completionQueueCount; ++i) {
  120. CQS_.push_back(builder.AddCompletionQueue());
  121. }
  122. if (Options_.GRpcMemoryQuotaBytes) {
  123. // See details KIKIMR-6932
  124. if (Options_.EnableGRpcMemoryQuota) {
  125. grpc::ResourceQuota quota("memory_bound");
  126. quota.Resize(Options_.GRpcMemoryQuotaBytes);
  127. builder.SetResourceQuota(quota);
  128. Cerr << "Set GRpc memory quota to: " << Options_.GRpcMemoryQuotaBytes << Endl;
  129. } else {
  130. Cerr << "GRpc memory quota was set but disabled due to issues with grpc quoter"
  131. ", to enable it use EnableGRpcMemoryQuota option" << Endl;
  132. }
  133. }
  134. Options_.ServerBuilderMutator(builder);
  135. builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel);
  136. Server_ = builder.BuildAndStart();
  137. if (!Server_) {
  138. ythrow yexception() << "can't start grpc server on " << server_address;
  139. }
  140. size_t index = 0;
  141. for (IGRpcServicePtr service : Services_) {
  142. // TODO: provide something else for services instead of ServerCompletionQueue
  143. service->InitService(CQS_, Options_.Logger, index++);
  144. }
  145. Ts.reserve(Options_.WorkerThreads);
  146. for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
  147. auto* cq = &CQS_[i % CQS_.size()];
  148. Ts.push_back(SystemThreadFactory()->Run([cq] {
  149. PullEvents(cq->get());
  150. }));
  151. }
  152. if (Options_.ExternalListener) {
  153. Options_.ExternalListener->Start();
  154. }
  155. }
  156. void TGRpcServer::Stop() {
  157. for (auto& service : Services_) {
  158. service->StopService();
  159. }
  160. auto now = TInstant::Now();
  161. if (Server_) {
  162. i64 sec = Options_.GRpcShutdownDeadline.Seconds();
  163. Y_ABORT_UNLESS(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
  164. i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
  165. Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
  166. }
  167. for (ui64 attempt = 0; ; ++attempt) {
  168. bool unsafe = false;
  169. size_t infly = 0;
  170. for (auto& service : Services_) {
  171. unsafe |= service->IsUnsafeToShutdown();
  172. infly += service->RequestsInProgress();
  173. }
  174. if (!unsafe && !infly)
  175. break;
  176. auto spent = (TInstant::Now() - now).SecondsFloat();
  177. if (attempt % 300 == 0) {
  178. // don't log too much
  179. Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl;
  180. }
  181. if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
  182. break;
  183. Sleep(TDuration::MilliSeconds(10));
  184. }
  185. // Always shutdown the completion queue after the server.
  186. for (auto& cq : CQS_) {
  187. cq->Shutdown();
  188. }
  189. for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
  190. (*ti)->Join();
  191. }
  192. Ts.clear();
  193. if (Options_.ExternalListener) {
  194. Options_.ExternalListener->Stop();
  195. }
  196. }
  197. ui16 TGRpcServer::GetPort() const {
  198. return Options_.Port;
  199. }
  200. TString TGRpcServer::GetHost() const {
  201. return Options_.Host;
  202. }
  203. const TVector<TGRpcServer::IGRpcServicePtr>& TGRpcServer::GetServices() const {
  204. return Services_;
  205. }
  206. } // namespace NGrpc