grpc_server.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. #pragma once
  2. #include "grpc_request_base.h"
  3. #include "logger.h"
  4. #include <library/cpp/threading/future/future.h>
  5. #include <util/generic/ptr.h>
  6. #include <util/generic/string.h>
  7. #include <util/generic/vector.h>
  8. #include <util/generic/maybe.h>
  9. #include <util/generic/queue.h>
  10. #include <util/generic/hash_set.h>
  11. #include <util/system/types.h>
  12. #include <util/system/mutex.h>
  13. #include <util/thread/factory.h>
  14. #include <grpc++/grpc++.h>
  15. namespace NGrpc {
  16. constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
  17. struct TSslData {
  18. TString Cert;
  19. TString Key;
  20. TString Root;
  21. bool DoRequestClientCertificate = false;
  22. };
  23. struct IExternalListener
  24. : public TThrRefBase
  25. {
  26. using TPtr = TIntrusivePtr<IExternalListener>;
  27. virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0;
  28. virtual void Start() = 0;
  29. virtual void Stop() = 0;
  30. };
  31. //! Server's options.
  32. struct TServerOptions {
  33. #define DECLARE_FIELD(name, type, default) \
  34. type name{default}; \
  35. inline TServerOptions& Set##name(const type& value) { \
  36. name = value; \
  37. return *this; \
  38. }
  39. //! Hostname of server to bind to.
  40. DECLARE_FIELD(Host, TString, "[::]");
  41. //! Service port.
  42. DECLARE_FIELD(Port, ui16, 0);
  43. //! Number of worker threads.
  44. DECLARE_FIELD(WorkerThreads, size_t, 2);
  45. //! Number of workers per completion queue, i.e. when
  46. // WorkerThreads=8 and PriorityWorkersPerCompletionQueue=2
  47. // there will be 4 completion queues. When set to 0 then
  48. // only UseCompletionQueuePerThread affects number of CQ.
  49. DECLARE_FIELD(WorkersPerCompletionQueue, size_t, 0);
  50. //! Obsolete. Create one completion queue per thread.
  51. // Setting true equals to the WorkersPerCompletionQueue=1
  52. DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
  53. //! Memory quota size for grpc server in bytes. Zero means unlimited.
  54. DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
  55. //! Enable Grpc memory quota feature.
  56. DECLARE_FIELD(EnableGRpcMemoryQuota, bool, false);
  57. //! How long to wait until pending rpcs are forcefully terminated.
  58. DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30));
  59. //! In/Out message size limit
  60. DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);
  61. //! Use GRpc keepalive
  62. DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>());
  63. //! GRPC_ARG_KEEPALIVE_TIME_MS setting
  64. DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0);
  65. //! Deprecated, ths option ignored. Will be removed soon.
  66. DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0);
  67. //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting
  68. DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0);
  69. //! Max number of requests processing by services (global limit for grpc server)
  70. DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000);
  71. //! SSL server data
  72. DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>());
  73. //! GRPC auth
  74. DECLARE_FIELD(UseAuth, bool, false);
  75. //! Default compression level. Used when no compression options provided by client.
  76. // Mapping to particular compression algorithm depends on client.
  77. DECLARE_FIELD(DefaultCompressionLevel, grpc_compression_level, GRPC_COMPRESS_LEVEL_NONE);
  78. //! Custom configurator for ServerBuilder.
  79. DECLARE_FIELD(ServerBuilderMutator, std::function<void(grpc::ServerBuilder&)>, [](grpc::ServerBuilder&){});
  80. DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr);
  81. //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
  82. DECLARE_FIELD(Logger, TLoggerPtr, nullptr);
  83. #undef DECLARE_FIELD
  84. };
  85. class IQueueEvent {
  86. public:
  87. virtual ~IQueueEvent() = default;
  88. //! Execute an action defined by implementation.
  89. virtual bool Execute(bool ok) = 0;
  90. //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also
  91. // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does
  92. // not implement in flight management.
  93. virtual void Process() {}
  94. //! Finish and destroy request.
  95. virtual void DestroyRequest() = 0;
  96. };
  97. class ICancelableContext {
  98. public:
  99. virtual void Shutdown() = 0;
  100. virtual ~ICancelableContext() = default;
  101. private:
  102. template<class T>
  103. friend class TGrpcServiceBase;
  104. // Shard assigned by RegisterRequestCtx. This field is not thread-safe
  105. // because RegisterRequestCtx may only be called once for a single service,
  106. // so it's only assigned once.
  107. size_t ShardIndex = size_t(-1);
  108. };
  109. template <class TLimit>
  110. class TInFlightLimiterImpl {
  111. public:
  112. explicit TInFlightLimiterImpl(const TLimit& limit)
  113. : Limit_(limit)
  114. {}
  115. bool Inc() {
  116. i64 newVal;
  117. i64 prev;
  118. do {
  119. prev = AtomicGet(CurInFlightReqs_);
  120. Y_ABORT_UNLESS(prev >= 0);
  121. if (Limit_ && prev > Limit_) {
  122. return false;
  123. }
  124. newVal = prev + 1;
  125. } while (!AtomicCas(&CurInFlightReqs_, newVal, prev));
  126. return true;
  127. }
  128. void Dec() {
  129. i64 newVal = AtomicDecrement(CurInFlightReqs_);
  130. Y_ABORT_UNLESS(newVal >= 0);
  131. }
  132. i64 GetCurrentInFlight() const {
  133. return AtomicGet(CurInFlightReqs_);
  134. }
  135. private:
  136. const TLimit Limit_;
  137. TAtomic CurInFlightReqs_ = 0;
  138. };
  139. using TGlobalLimiter = TInFlightLimiterImpl<i64>;
  140. class IGRpcService: public TThrRefBase {
  141. public:
  142. virtual grpc::Service* GetService() = 0;
  143. virtual void StopService() noexcept = 0;
  144. virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
  145. virtual void InitService(
  146. const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs,
  147. TLoggerPtr logger,
  148. size_t index)
  149. {
  150. InitService(cqs[index % cqs.size()].get(), logger);
  151. }
  152. virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
  153. virtual bool IsUnsafeToShutdown() const = 0;
  154. virtual size_t RequestsInProgress() const = 0;
  155. /**
  156. * Called before service is added to the server builder. This allows
  157. * service to inspect server options and initialize accordingly.
  158. */
  159. virtual void SetServerOptions(const TServerOptions& options) = 0;
  160. };
  161. template<typename T>
  162. class TGrpcServiceBase: public IGRpcService {
  163. public:
  164. class TShutdownGuard {
  165. using TOwner = TGrpcServiceBase<T>;
  166. friend class TGrpcServiceBase<T>;
  167. public:
  168. TShutdownGuard()
  169. : Owner(nullptr)
  170. { }
  171. ~TShutdownGuard() {
  172. Release();
  173. }
  174. TShutdownGuard(TShutdownGuard&& other)
  175. : Owner(other.Owner)
  176. {
  177. other.Owner = nullptr;
  178. }
  179. TShutdownGuard& operator=(TShutdownGuard&& other) {
  180. if (Y_LIKELY(this != &other)) {
  181. Release();
  182. Owner = other.Owner;
  183. other.Owner = nullptr;
  184. }
  185. return *this;
  186. }
  187. explicit operator bool() const {
  188. return bool(Owner);
  189. }
  190. void Release() {
  191. if (Owner) {
  192. AtomicDecrement(Owner->GuardCount_);
  193. Owner = nullptr;
  194. }
  195. }
  196. TShutdownGuard(const TShutdownGuard&) = delete;
  197. TShutdownGuard& operator=(const TShutdownGuard&) = delete;
  198. private:
  199. explicit TShutdownGuard(TOwner* owner)
  200. : Owner(owner)
  201. { }
  202. private:
  203. TOwner* Owner;
  204. };
  205. public:
  206. using TCurrentGRpcService = T;
  207. void StopService() noexcept override {
  208. AtomicSet(ShuttingDown_, 1);
  209. for (auto& shard : Shards_) {
  210. with_lock(shard.Lock_) {
  211. // Send TryCansel to event (can be send after finishing).
  212. // Actual dtors will be called from grpc thread, so deadlock impossible
  213. for (auto* request : shard.Requests_) {
  214. request->Shutdown();
  215. }
  216. }
  217. }
  218. }
  219. TShutdownGuard ProtectShutdown() noexcept {
  220. AtomicIncrement(GuardCount_);
  221. if (IsShuttingDown()) {
  222. AtomicDecrement(GuardCount_);
  223. return { };
  224. }
  225. return TShutdownGuard(this);
  226. }
  227. bool IsUnsafeToShutdown() const override {
  228. return AtomicGet(GuardCount_) > 0;
  229. }
  230. size_t RequestsInProgress() const override {
  231. size_t c = 0;
  232. for (auto& shard : Shards_) {
  233. with_lock(shard.Lock_) {
  234. c += shard.Requests_.size();
  235. }
  236. }
  237. return c;
  238. }
  239. void SetServerOptions(const TServerOptions& options) override {
  240. SslServer_ = bool(options.SslData);
  241. NeedAuth_ = options.UseAuth;
  242. }
  243. void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
  244. //! Check if the server is going to shut down.
  245. bool IsShuttingDown() const {
  246. return AtomicGet(ShuttingDown_);
  247. }
  248. bool SslServer() const {
  249. return SslServer_;
  250. }
  251. bool NeedAuth() const {
  252. return NeedAuth_;
  253. }
  254. bool RegisterRequestCtx(ICancelableContext* req) {
  255. if (Y_LIKELY(req->ShardIndex == size_t(-1))) {
  256. req->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size();
  257. }
  258. auto& shard = Shards_[req->ShardIndex];
  259. with_lock(shard.Lock_) {
  260. if (IsShuttingDown()) {
  261. return false;
  262. }
  263. auto r = shard.Requests_.emplace(req);
  264. Y_ABORT_UNLESS(r.second, "Ctx already registered");
  265. }
  266. return true;
  267. }
  268. void DeregisterRequestCtx(ICancelableContext* req) {
  269. Y_ABORT_UNLESS(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
  270. auto& shard = Shards_[req->ShardIndex];
  271. with_lock(shard.Lock_) {
  272. Y_ABORT_UNLESS(shard.Requests_.erase(req), "Ctx is not registered");
  273. }
  274. }
  275. protected:
  276. using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
  277. TGrpcAsyncService Service_;
  278. TGrpcAsyncService* GetService() override {
  279. return &Service_;
  280. }
  281. private:
  282. TAtomic ShuttingDown_ = 0;
  283. TAtomic GuardCount_ = 0;
  284. bool SslServer_ = false;
  285. bool NeedAuth_ = false;
  286. struct TShard {
  287. TAdaptiveLock Lock_;
  288. THashSet<ICancelableContext*> Requests_;
  289. };
  290. // Note: benchmarks showed 4 shards is enough to scale to ~30 threads
  291. TVector<TShard> Shards_{ size_t(4) };
  292. std::atomic<size_t> NextShard_{ 0 };
  293. };
  294. class TGRpcServer {
  295. public:
  296. using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
  297. TGRpcServer(const TServerOptions& opts);
  298. ~TGRpcServer();
  299. void AddService(IGRpcServicePtr service);
  300. void Start();
  301. // Send stop to registred services and call Shutdown on grpc server
  302. // This method MUST be called before destroying TGRpcServer
  303. void Stop();
  304. ui16 GetPort() const;
  305. TString GetHost() const;
  306. const TVector<IGRpcServicePtr>& GetServices() const;
  307. private:
  308. using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
  309. const TServerOptions Options_;
  310. std::unique_ptr<grpc::Server> Server_;
  311. std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
  312. TVector<IThreadRef> Ts;
  313. TVector<IGRpcServicePtr> Services_;
  314. TGlobalLimiter Limiter_;
  315. };
  316. } // namespace NGrpc