grpc_client_low.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
  1. #include "grpc_client_low.h"
  2. #include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
  3. #include <contrib/libs/grpc/include/grpc/support/log.h>
  4. #include <library/cpp/containers/stack_vector/stack_vec.h>
  5. #include <util/string/printf.h>
  6. #include <util/system/thread.h>
  7. #include <util/random/random.h>
  8. #if !defined(_WIN32) && !defined(_WIN64)
  9. #include <sys/types.h>
  10. #include <sys/socket.h>
  11. #include <netinet/in.h>
  12. #include <netinet/tcp.h>
  13. #endif
  14. namespace NGrpc {
  15. void EnableGRpcTracing() {
  16. grpc_tracer_set_enabled("tcp", true);
  17. grpc_tracer_set_enabled("client_channel", true);
  18. grpc_tracer_set_enabled("channel", true);
  19. grpc_tracer_set_enabled("api", true);
  20. grpc_tracer_set_enabled("connectivity_state", true);
  21. grpc_tracer_set_enabled("handshaker", true);
  22. grpc_tracer_set_enabled("http", true);
  23. grpc_tracer_set_enabled("http2_stream_state", true);
  24. grpc_tracer_set_enabled("op_failure", true);
  25. grpc_tracer_set_enabled("timer", true);
  26. gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
  27. }
  28. class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
  29. public:
  30. TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
  31. : Idle_(idle)
  32. , Count_(count)
  33. , Interval_(interval)
  34. {
  35. grpc_socket_mutator_init(this, &VTable);
  36. }
  37. private:
  38. static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
  39. return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
  40. }
  41. template<typename TVal>
  42. bool SetOption(int fd, int level, int optname, const TVal& value) {
  43. return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
  44. }
  45. bool SetOption(int fd) {
  46. if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
  47. Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
  48. return false;
  49. }
  50. #ifdef _linux_
  51. if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
  52. Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
  53. return false;
  54. }
  55. if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
  56. Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
  57. return false;
  58. }
  59. if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
  60. Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
  61. return false;
  62. }
  63. #endif
  64. return true;
  65. }
  66. static bool Mutate(int fd, grpc_socket_mutator* mutator) {
  67. auto self = Cast(mutator);
  68. return self->SetOption(fd);
  69. }
  70. static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
  71. const auto* selfA = Cast(a);
  72. const auto* selfB = Cast(b);
  73. auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
  74. auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
  75. return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
  76. }
  77. static void Destroy(grpc_socket_mutator* mutator) {
  78. delete Cast(mutator);
  79. }
  80. static grpc_socket_mutator_vtable VTable;
  81. const int Idle_;
  82. const int Count_;
  83. const int Interval_;
  84. };
  85. grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
  86. {
  87. &TGRpcKeepAliveSocketMutator::Mutate,
  88. &TGRpcKeepAliveSocketMutator::Compare,
  89. &TGRpcKeepAliveSocketMutator::Destroy
  90. };
  91. TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
  92. : TcpKeepAliveSettings_(tcpKeepAliveSettings)
  93. , ExpireTime_(expireTime)
  94. , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
  95. {}
  96. void TChannelPool::GetStubsHolderLocked(
  97. const TString& channelId,
  98. const TGRpcClientConfig& config,
  99. std::function<void(TStubsHolder&)> cb)
  100. {
  101. {
  102. std::shared_lock readGuard(RWMutex_);
  103. const auto it = Pool_.find(channelId);
  104. if (it != Pool_.end()) {
  105. if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
  106. return cb(it->second);
  107. }
  108. }
  109. }
  110. {
  111. std::unique_lock writeGuard(RWMutex_);
  112. {
  113. auto it = Pool_.find(channelId);
  114. if (it != Pool_.end()) {
  115. if (!it->second.IsChannelBroken()) {
  116. EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
  117. auto now = Now();
  118. LastUsedQueue_.emplace(now, channelId);
  119. it->second.SetLastUseTime(now);
  120. return cb(it->second);
  121. } else {
  122. // This channel can't be used. Remove from pool to create new one
  123. EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
  124. Pool_.erase(it);
  125. }
  126. }
  127. }
  128. TGRpcKeepAliveSocketMutator* mutator = nullptr;
  129. // will be destroyed inside grpc
  130. if (TcpKeepAliveSettings_.Enabled) {
  131. mutator = new TGRpcKeepAliveSocketMutator(
  132. TcpKeepAliveSettings_.Idle,
  133. TcpKeepAliveSettings_.Count,
  134. TcpKeepAliveSettings_.Interval
  135. );
  136. }
  137. cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
  138. LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
  139. }
  140. }
  141. void TChannelPool::DeleteChannel(const TString& channelId) {
  142. std::unique_lock writeLock(RWMutex_);
  143. auto poolIt = Pool_.find(channelId);
  144. if (poolIt != Pool_.end()) {
  145. EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
  146. Pool_.erase(poolIt);
  147. }
  148. }
  149. void TChannelPool::DeleteExpiredStubsHolders() {
  150. std::unique_lock writeLock(RWMutex_);
  151. auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
  152. for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
  153. Pool_.erase(i->second);
  154. }
  155. LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
  156. }
  157. void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
  158. auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
  159. auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
  160. Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
  161. LastUsedQueue_.erase(pos);
  162. }
  163. static void PullEvents(grpc::CompletionQueue* cq) {
  164. TThread::SetCurrentThreadName("grpc_client");
  165. while (true) {
  166. void* tag;
  167. bool ok;
  168. if (!cq->Next(&tag, &ok)) {
  169. break;
  170. }
  171. if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
  172. if (!ev->Execute(ok)) {
  173. ev->Destroy();
  174. }
  175. }
  176. }
  177. }
  178. class TGRpcClientLow::TContextImpl final
  179. : public std::enable_shared_from_this<TContextImpl>
  180. , public IQueueClientContext
  181. {
  182. friend class TGRpcClientLow;
  183. using TCallback = std::function<void()>;
  184. using TContextPtr = std::shared_ptr<TContextImpl>;
  185. public:
  186. ~TContextImpl() override {
  187. Y_VERIFY(CountChildren() == 0,
  188. "Destructor called with non-empty children");
  189. if (Parent) {
  190. Parent->ForgetContext(this);
  191. } else if (Y_LIKELY(Owner)) {
  192. Owner->ForgetContext(this);
  193. }
  194. }
  195. /**
  196. * Helper for locking child pointer from a parent container
  197. */
  198. static TContextPtr LockChildPtr(TContextImpl* ptr) {
  199. if (ptr) {
  200. // N.B. it is safe to do as long as it's done under a mutex and
  201. // pointer is among valid children. When that's the case we
  202. // know that TContextImpl destructor has not finished yet, so
  203. // the object is valid. The lock() method may return nullptr
  204. // though, if the object is being destructed right now.
  205. return ptr->weak_from_this().lock();
  206. } else {
  207. return nullptr;
  208. }
  209. }
  210. void ForgetContext(TContextImpl* child) {
  211. std::unique_lock<std::mutex> guard(Mutex);
  212. auto removed = RemoveChild(child);
  213. Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child);
  214. }
  215. IQueueClientContextPtr CreateContext() override {
  216. auto self = shared_from_this();
  217. auto child = std::make_shared<TContextImpl>();
  218. {
  219. std::unique_lock<std::mutex> guard(Mutex);
  220. AddChild(child.get());
  221. // It's now safe to initialize parent and owner
  222. child->Parent = std::move(self);
  223. child->Owner = Owner;
  224. child->CQ = CQ;
  225. // Propagate cancellation to a child context
  226. if (Cancelled.load(std::memory_order_relaxed)) {
  227. child->Cancelled.store(true, std::memory_order_relaxed);
  228. }
  229. }
  230. return child;
  231. }
  232. grpc::CompletionQueue* CompletionQueue() override {
  233. Y_VERIFY(Owner, "Uninitialized context");
  234. return CQ;
  235. }
  236. bool IsCancelled() const override {
  237. return Cancelled.load(std::memory_order_acquire);
  238. }
  239. bool Cancel() override {
  240. TStackVec<TCallback, 1> callbacks;
  241. TStackVec<TContextPtr, 2> children;
  242. {
  243. std::unique_lock<std::mutex> guard(Mutex);
  244. if (Cancelled.load(std::memory_order_relaxed)) {
  245. // Already cancelled in another thread
  246. return false;
  247. }
  248. callbacks.reserve(Callbacks.size());
  249. children.reserve(CountChildren());
  250. for (auto& callback : Callbacks) {
  251. callbacks.emplace_back().swap(callback);
  252. }
  253. Callbacks.clear();
  254. // Collect all children we need to cancel
  255. // N.B. we don't clear children links (cleared by destructors)
  256. // N.B. some children may be stuck in destructors at the moment
  257. for (TContextImpl* ptr : InlineChildren) {
  258. if (auto child = LockChildPtr(ptr)) {
  259. children.emplace_back(std::move(child));
  260. }
  261. }
  262. for (auto* ptr : Children) {
  263. if (auto child = LockChildPtr(ptr)) {
  264. children.emplace_back(std::move(child));
  265. }
  266. }
  267. Cancelled.store(true, std::memory_order_release);
  268. }
  269. // Call directly subscribed callbacks
  270. if (callbacks) {
  271. RunCallbacksNoExcept(callbacks);
  272. }
  273. // Cancel all children
  274. for (auto& child : children) {
  275. child->Cancel();
  276. child.reset();
  277. }
  278. return true;
  279. }
  280. void SubscribeCancel(TCallback callback) override {
  281. Y_VERIFY(callback, "SubscribeCancel called with an empty callback");
  282. {
  283. std::unique_lock<std::mutex> guard(Mutex);
  284. if (!Cancelled.load(std::memory_order_relaxed)) {
  285. Callbacks.emplace_back().swap(callback);
  286. return;
  287. }
  288. }
  289. // Already cancelled, run immediately
  290. callback();
  291. }
  292. private:
  293. void AddChild(TContextImpl* child) {
  294. for (TContextImpl*& slot : InlineChildren) {
  295. if (!slot) {
  296. slot = child;
  297. return;
  298. }
  299. }
  300. Children.insert(child);
  301. }
  302. bool RemoveChild(TContextImpl* child) {
  303. for (TContextImpl*& slot : InlineChildren) {
  304. if (slot == child) {
  305. slot = nullptr;
  306. return true;
  307. }
  308. }
  309. return Children.erase(child);
  310. }
  311. size_t CountChildren() {
  312. size_t count = 0;
  313. for (TContextImpl* ptr : InlineChildren) {
  314. if (ptr) {
  315. ++count;
  316. }
  317. }
  318. return count + Children.size();
  319. }
  320. template<class TCallbacks>
  321. static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
  322. for (auto& callback : callbacks) {
  323. if (callback) {
  324. callback();
  325. callback = nullptr;
  326. }
  327. }
  328. }
  329. private:
  330. // We want a simple lock here, without extra memory allocations
  331. std::mutex Mutex;
  332. // These fields are initialized on successful registration
  333. TContextPtr Parent;
  334. TGRpcClientLow* Owner = nullptr;
  335. grpc::CompletionQueue* CQ = nullptr;
  336. // Some children are stored inline, others are in a set
  337. std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
  338. std::unordered_set<TContextImpl*> Children;
  339. // Single callback is stored without extra allocations
  340. TStackVec<TCallback, 1> Callbacks;
  341. // Atomic flag for a faster IsCancelled() implementation
  342. std::atomic<bool> Cancelled;
  343. };
  344. TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
  345. : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
  346. {
  347. Init(numWorkerThread);
  348. }
  349. void TGRpcClientLow::Init(size_t numWorkerThread) {
  350. SetCqState(WORKING);
  351. if (UseCompletionQueuePerThread_) {
  352. for (size_t i = 0; i < numWorkerThread; i++) {
  353. CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
  354. auto* cq = CQS_.back().get();
  355. WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
  356. PullEvents(cq);
  357. }).Release());
  358. }
  359. } else {
  360. CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
  361. auto* cq = CQS_.back().get();
  362. for (size_t i = 0; i < numWorkerThread; i++) {
  363. WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
  364. PullEvents(cq);
  365. }).Release());
  366. }
  367. }
  368. }
  369. void TGRpcClientLow::AddWorkerThreadForTest() {
  370. if (UseCompletionQueuePerThread_) {
  371. CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
  372. auto* cq = CQS_.back().get();
  373. WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
  374. PullEvents(cq);
  375. }).Release());
  376. } else {
  377. auto* cq = CQS_.back().get();
  378. WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
  379. PullEvents(cq);
  380. }).Release());
  381. }
  382. }
  383. TGRpcClientLow::~TGRpcClientLow() {
  384. StopInternal(true);
  385. WaitInternal();
  386. }
  387. void TGRpcClientLow::Stop(bool wait) {
  388. StopInternal(false);
  389. if (wait) {
  390. WaitInternal();
  391. }
  392. }
  393. void TGRpcClientLow::StopInternal(bool silent) {
  394. bool shutdown;
  395. TVector<TContextImpl::TContextPtr> cancelQueue;
  396. {
  397. std::unique_lock<std::mutex> guard(Mtx_);
  398. auto allowStateChange = [&]() {
  399. switch (GetCqState()) {
  400. case WORKING:
  401. return true;
  402. case STOP_SILENT:
  403. return !silent;
  404. case STOP_EXPLICIT:
  405. return false;
  406. }
  407. Y_UNREACHABLE();
  408. };
  409. if (!allowStateChange()) {
  410. // Completion queue is already stopping
  411. return;
  412. }
  413. SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
  414. if (!silent && !Contexts_.empty()) {
  415. cancelQueue.reserve(Contexts_.size());
  416. for (auto* ptr : Contexts_) {
  417. // N.B. some contexts may be stuck in destructors
  418. if (auto context = TContextImpl::LockChildPtr(ptr)) {
  419. cancelQueue.emplace_back(std::move(context));
  420. }
  421. }
  422. }
  423. shutdown = Contexts_.empty();
  424. }
  425. for (auto& context : cancelQueue) {
  426. context->Cancel();
  427. context.reset();
  428. }
  429. if (shutdown) {
  430. for (auto& cq : CQS_) {
  431. cq->Shutdown();
  432. }
  433. }
  434. }
  435. void TGRpcClientLow::WaitInternal() {
  436. std::unique_lock<std::mutex> guard(JoinMutex_);
  437. for (auto& ti : WorkerThreads_) {
  438. ti->Join();
  439. }
  440. }
  441. void TGRpcClientLow::WaitIdle() {
  442. std::unique_lock<std::mutex> guard(Mtx_);
  443. while (!Contexts_.empty()) {
  444. ContextsEmpty_.wait(guard);
  445. }
  446. }
  447. std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
  448. std::unique_lock<std::mutex> guard(Mtx_);
  449. auto allowCreateContext = [&]() {
  450. switch (GetCqState()) {
  451. case WORKING:
  452. return true;
  453. case STOP_SILENT:
  454. case STOP_EXPLICIT:
  455. return false;
  456. }
  457. Y_UNREACHABLE();
  458. };
  459. if (!allowCreateContext()) {
  460. // New context creation is forbidden
  461. return nullptr;
  462. }
  463. auto context = std::make_shared<TContextImpl>();
  464. Contexts_.insert(context.get());
  465. context->Owner = this;
  466. if (UseCompletionQueuePerThread_) {
  467. context->CQ = CQS_[RandomNumber(CQS_.size())].get();
  468. } else {
  469. context->CQ = CQS_[0].get();
  470. }
  471. return context;
  472. }
  473. void TGRpcClientLow::ForgetContext(TContextImpl* context) {
  474. bool shutdown = false;
  475. {
  476. std::unique_lock<std::mutex> guard(Mtx_);
  477. if (!Contexts_.erase(context)) {
  478. Y_FAIL("Unexpected ForgetContext(%p)", context);
  479. }
  480. if (Contexts_.empty()) {
  481. if (IsStopping()) {
  482. shutdown = true;
  483. }
  484. ContextsEmpty_.notify_all();
  485. }
  486. }
  487. if (shutdown) {
  488. // This was the last context, shutdown CQ
  489. for (auto& cq : CQS_) {
  490. cq->Shutdown();
  491. }
  492. }
  493. }
} // namespace NGrpc