grpc_client_low.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. #include "grpc_client_low.h"
  2. #include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
  3. #include <contrib/libs/grpc/include/grpc/support/log.h>
  4. #include <library/cpp/containers/stack_vector/stack_vec.h>
  5. #include <util/string/printf.h>
  6. #include <util/system/thread.h>
  7. #include <util/random/random.h>
  8. #if !defined(_WIN32) && !defined(_WIN64)
  9. #include <sys/types.h>
  10. #include <sys/socket.h>
  11. #include <netinet/in.h>
  12. #include <netinet/tcp.h>
  13. #endif
  14. namespace NGrpc {
  15. void EnableGRpcTracing() {
  16. grpc_tracer_set_enabled("tcp", true);
  17. grpc_tracer_set_enabled("client_channel", true);
  18. grpc_tracer_set_enabled("channel", true);
  19. grpc_tracer_set_enabled("api", true);
  20. grpc_tracer_set_enabled("connectivity_state", true);
  21. grpc_tracer_set_enabled("handshaker", true);
  22. grpc_tracer_set_enabled("http", true);
  23. grpc_tracer_set_enabled("http2_stream_state", true);
  24. grpc_tracer_set_enabled("op_failure", true);
  25. grpc_tracer_set_enabled("timer", true);
  26. gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
  27. }
  28. class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
  29. public:
  30. TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
  31. : Idle_(idle)
  32. , Count_(count)
  33. , Interval_(interval)
  34. {
  35. grpc_socket_mutator_init(this, &VTable);
  36. }
  37. private:
  38. static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
  39. return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
  40. }
  41. template<typename TVal>
  42. bool SetOption(int fd, int level, int optname, const TVal& value) {
  43. return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
  44. }
  45. bool SetOption(int fd) {
  46. if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
  47. Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
  48. return false;
  49. }
  50. #ifdef _linux_
  51. if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
  52. Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
  53. return false;
  54. }
  55. if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
  56. Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
  57. return false;
  58. }
  59. if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
  60. Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
  61. return false;
  62. }
  63. #endif
  64. return true;
  65. }
  66. static bool Mutate(int fd, grpc_socket_mutator* mutator) {
  67. auto self = Cast(mutator);
  68. return self->SetOption(fd);
  69. }
  70. static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
  71. const auto* selfA = Cast(a);
  72. const auto* selfB = Cast(b);
  73. auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
  74. auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
  75. return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
  76. }
  77. static void Destroy(grpc_socket_mutator* mutator) {
  78. delete Cast(mutator);
  79. }
  80. static bool Mutate2(const grpc_mutate_socket_info* info, grpc_socket_mutator* mutator) {
  81. auto self = Cast(mutator);
  82. return self->SetOption(info->fd);
  83. }
  84. static grpc_socket_mutator_vtable VTable;
  85. const int Idle_;
  86. const int Count_;
  87. const int Interval_;
  88. };
  89. grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
  90. {
  91. &TGRpcKeepAliveSocketMutator::Mutate,
  92. &TGRpcKeepAliveSocketMutator::Compare,
  93. &TGRpcKeepAliveSocketMutator::Destroy,
  94. &TGRpcKeepAliveSocketMutator::Mutate2
  95. };
  96. TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
  97. : TcpKeepAliveSettings_(tcpKeepAliveSettings)
  98. , ExpireTime_(expireTime)
  99. , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
  100. {}
  101. void TChannelPool::GetStubsHolderLocked(
  102. const TString& channelId,
  103. const TGRpcClientConfig& config,
  104. std::function<void(TStubsHolder&)> cb)
  105. {
  106. {
  107. std::shared_lock readGuard(RWMutex_);
  108. const auto it = Pool_.find(channelId);
  109. if (it != Pool_.end()) {
  110. if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
  111. return cb(it->second);
  112. }
  113. }
  114. }
  115. {
  116. std::unique_lock writeGuard(RWMutex_);
  117. {
  118. auto it = Pool_.find(channelId);
  119. if (it != Pool_.end()) {
  120. if (!it->second.IsChannelBroken()) {
  121. EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
  122. auto now = Now();
  123. LastUsedQueue_.emplace(now, channelId);
  124. it->second.SetLastUseTime(now);
  125. return cb(it->second);
  126. } else {
  127. // This channel can't be used. Remove from pool to create new one
  128. EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
  129. Pool_.erase(it);
  130. }
  131. }
  132. }
  133. TGRpcKeepAliveSocketMutator* mutator = nullptr;
  134. // will be destroyed inside grpc
  135. if (TcpKeepAliveSettings_.Enabled) {
  136. mutator = new TGRpcKeepAliveSocketMutator(
  137. TcpKeepAliveSettings_.Idle,
  138. TcpKeepAliveSettings_.Count,
  139. TcpKeepAliveSettings_.Interval
  140. );
  141. }
  142. cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
  143. LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
  144. }
  145. }
  146. void TChannelPool::DeleteChannel(const TString& channelId) {
  147. std::unique_lock writeLock(RWMutex_);
  148. auto poolIt = Pool_.find(channelId);
  149. if (poolIt != Pool_.end()) {
  150. EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
  151. Pool_.erase(poolIt);
  152. }
  153. }
  154. void TChannelPool::DeleteExpiredStubsHolders() {
  155. std::unique_lock writeLock(RWMutex_);
  156. auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
  157. for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
  158. Pool_.erase(i->second);
  159. }
  160. LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
  161. }
  162. void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
  163. auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
  164. auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
  165. Y_ABORT_UNLESS(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
  166. LastUsedQueue_.erase(pos);
  167. }
  168. static void PullEvents(grpc::CompletionQueue* cq) {
  169. TThread::SetCurrentThreadName("grpc_client");
  170. while (true) {
  171. void* tag;
  172. bool ok;
  173. if (!cq->Next(&tag, &ok)) {
  174. break;
  175. }
  176. if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
  177. if (!ev->Execute(ok)) {
  178. ev->Destroy();
  179. }
  180. }
  181. }
  182. }
  183. class TGRpcClientLow::TContextImpl final
  184. : public std::enable_shared_from_this<TContextImpl>
  185. , public IQueueClientContext
  186. {
  187. friend class TGRpcClientLow;
  188. using TCallback = std::function<void()>;
  189. using TContextPtr = std::shared_ptr<TContextImpl>;
  190. public:
  191. ~TContextImpl() override {
  192. Y_ABORT_UNLESS(CountChildren() == 0,
  193. "Destructor called with non-empty children");
  194. if (Parent) {
  195. Parent->ForgetContext(this);
  196. } else if (Y_LIKELY(Owner)) {
  197. Owner->ForgetContext(this);
  198. }
  199. }
  200. /**
  201. * Helper for locking child pointer from a parent container
  202. */
  203. static TContextPtr LockChildPtr(TContextImpl* ptr) {
  204. if (ptr) {
  205. // N.B. it is safe to do as long as it's done under a mutex and
  206. // pointer is among valid children. When that's the case we
  207. // know that TContextImpl destructor has not finished yet, so
  208. // the object is valid. The lock() method may return nullptr
  209. // though, if the object is being destructed right now.
  210. return ptr->weak_from_this().lock();
  211. } else {
  212. return nullptr;
  213. }
  214. }
  215. void ForgetContext(TContextImpl* child) {
  216. std::unique_lock<std::mutex> guard(Mutex);
  217. auto removed = RemoveChild(child);
  218. Y_ABORT_UNLESS(removed, "Unexpected ForgetContext(%p)", child);
  219. }
  220. IQueueClientContextPtr CreateContext() override {
  221. auto self = shared_from_this();
  222. auto child = std::make_shared<TContextImpl>();
  223. {
  224. std::unique_lock<std::mutex> guard(Mutex);
  225. AddChild(child.get());
  226. // It's now safe to initialize parent and owner
  227. child->Parent = std::move(self);
  228. child->Owner = Owner;
  229. child->CQ = CQ;
  230. // Propagate cancellation to a child context
  231. if (Cancelled.load(std::memory_order_relaxed)) {
  232. child->Cancelled.store(true, std::memory_order_relaxed);
  233. }
  234. }
  235. return child;
  236. }
  237. grpc::CompletionQueue* CompletionQueue() override {
  238. Y_ABORT_UNLESS(Owner, "Uninitialized context");
  239. return CQ;
  240. }
  241. bool IsCancelled() const override {
  242. return Cancelled.load(std::memory_order_acquire);
  243. }
  244. bool Cancel() override {
  245. TStackVec<TCallback, 1> callbacks;
  246. TStackVec<TContextPtr, 2> children;
  247. {
  248. std::unique_lock<std::mutex> guard(Mutex);
  249. if (Cancelled.load(std::memory_order_relaxed)) {
  250. // Already cancelled in another thread
  251. return false;
  252. }
  253. callbacks.reserve(Callbacks.size());
  254. children.reserve(CountChildren());
  255. for (auto& callback : Callbacks) {
  256. callbacks.emplace_back().swap(callback);
  257. }
  258. Callbacks.clear();
  259. // Collect all children we need to cancel
  260. // N.B. we don't clear children links (cleared by destructors)
  261. // N.B. some children may be stuck in destructors at the moment
  262. for (TContextImpl* ptr : InlineChildren) {
  263. if (auto child = LockChildPtr(ptr)) {
  264. children.emplace_back(std::move(child));
  265. }
  266. }
  267. for (auto* ptr : Children) {
  268. if (auto child = LockChildPtr(ptr)) {
  269. children.emplace_back(std::move(child));
  270. }
  271. }
  272. Cancelled.store(true, std::memory_order_release);
  273. }
  274. // Call directly subscribed callbacks
  275. if (callbacks) {
  276. RunCallbacksNoExcept(callbacks);
  277. }
  278. // Cancel all children
  279. for (auto& child : children) {
  280. child->Cancel();
  281. child.reset();
  282. }
  283. return true;
  284. }
  285. void SubscribeCancel(TCallback callback) override {
  286. Y_ABORT_UNLESS(callback, "SubscribeCancel called with an empty callback");
  287. {
  288. std::unique_lock<std::mutex> guard(Mutex);
  289. if (!Cancelled.load(std::memory_order_relaxed)) {
  290. Callbacks.emplace_back().swap(callback);
  291. return;
  292. }
  293. }
  294. // Already cancelled, run immediately
  295. callback();
  296. }
  297. private:
  298. void AddChild(TContextImpl* child) {
  299. for (TContextImpl*& slot : InlineChildren) {
  300. if (!slot) {
  301. slot = child;
  302. return;
  303. }
  304. }
  305. Children.insert(child);
  306. }
  307. bool RemoveChild(TContextImpl* child) {
  308. for (TContextImpl*& slot : InlineChildren) {
  309. if (slot == child) {
  310. slot = nullptr;
  311. return true;
  312. }
  313. }
  314. return Children.erase(child);
  315. }
  316. size_t CountChildren() {
  317. size_t count = 0;
  318. for (TContextImpl* ptr : InlineChildren) {
  319. if (ptr) {
  320. ++count;
  321. }
  322. }
  323. return count + Children.size();
  324. }
  325. template<class TCallbacks>
  326. static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
  327. for (auto& callback : callbacks) {
  328. if (callback) {
  329. callback();
  330. callback = nullptr;
  331. }
  332. }
  333. }
  334. private:
  335. // We want a simple lock here, without extra memory allocations
  336. std::mutex Mutex;
  337. // These fields are initialized on successful registration
  338. TContextPtr Parent;
  339. TGRpcClientLow* Owner = nullptr;
  340. grpc::CompletionQueue* CQ = nullptr;
  341. // Some children are stored inline, others are in a set
  342. std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
  343. std::unordered_set<TContextImpl*> Children;
  344. // Single callback is stored without extra allocations
  345. TStackVec<TCallback, 1> Callbacks;
  346. // Atomic flag for a faster IsCancelled() implementation
  347. std::atomic<bool> Cancelled;
  348. };
  349. TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
  350. : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
  351. {
  352. Init(numWorkerThread);
  353. }
  354. void TGRpcClientLow::Init(size_t numWorkerThread) {
  355. SetCqState(WORKING);
  356. if (UseCompletionQueuePerThread_) {
  357. for (size_t i = 0; i < numWorkerThread; i++) {
  358. CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
  359. auto* cq = CQS_.back().get();
  360. WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
  361. PullEvents(cq);
  362. }).Release());
  363. }
  364. } else {
  365. CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
  366. auto* cq = CQS_.back().get();
  367. for (size_t i = 0; i < numWorkerThread; i++) {
  368. WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
  369. PullEvents(cq);
  370. }).Release());
  371. }
  372. }
  373. }
  374. void TGRpcClientLow::AddWorkerThreadForTest() {
  375. if (UseCompletionQueuePerThread_) {
  376. CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
  377. auto* cq = CQS_.back().get();
  378. WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
  379. PullEvents(cq);
  380. }).Release());
  381. } else {
  382. auto* cq = CQS_.back().get();
  383. WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
  384. PullEvents(cq);
  385. }).Release());
  386. }
  387. }
  388. TGRpcClientLow::~TGRpcClientLow() {
  389. StopInternal(true);
  390. WaitInternal();
  391. }
  392. void TGRpcClientLow::Stop(bool wait) {
  393. StopInternal(false);
  394. if (wait) {
  395. WaitInternal();
  396. }
  397. }
  398. void TGRpcClientLow::StopInternal(bool silent) {
  399. bool shutdown;
  400. TVector<TContextImpl::TContextPtr> cancelQueue;
  401. {
  402. std::unique_lock<std::mutex> guard(Mtx_);
  403. auto allowStateChange = [&]() {
  404. switch (GetCqState()) {
  405. case WORKING:
  406. return true;
  407. case STOP_SILENT:
  408. return !silent;
  409. case STOP_EXPLICIT:
  410. return false;
  411. }
  412. Y_UNREACHABLE();
  413. };
  414. if (!allowStateChange()) {
  415. // Completion queue is already stopping
  416. return;
  417. }
  418. SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
  419. if (!silent && !Contexts_.empty()) {
  420. cancelQueue.reserve(Contexts_.size());
  421. for (auto* ptr : Contexts_) {
  422. // N.B. some contexts may be stuck in destructors
  423. if (auto context = TContextImpl::LockChildPtr(ptr)) {
  424. cancelQueue.emplace_back(std::move(context));
  425. }
  426. }
  427. }
  428. shutdown = Contexts_.empty();
  429. }
  430. for (auto& context : cancelQueue) {
  431. context->Cancel();
  432. context.reset();
  433. }
  434. if (shutdown) {
  435. for (auto& cq : CQS_) {
  436. cq->Shutdown();
  437. }
  438. }
  439. }
  440. void TGRpcClientLow::WaitInternal() {
  441. std::unique_lock<std::mutex> guard(JoinMutex_);
  442. for (auto& ti : WorkerThreads_) {
  443. ti->Join();
  444. }
  445. }
  446. void TGRpcClientLow::WaitIdle() {
  447. std::unique_lock<std::mutex> guard(Mtx_);
  448. while (!Contexts_.empty()) {
  449. ContextsEmpty_.wait(guard);
  450. }
  451. }
  452. std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
  453. std::unique_lock<std::mutex> guard(Mtx_);
  454. auto allowCreateContext = [&]() {
  455. switch (GetCqState()) {
  456. case WORKING:
  457. return true;
  458. case STOP_SILENT:
  459. case STOP_EXPLICIT:
  460. return false;
  461. }
  462. Y_UNREACHABLE();
  463. };
  464. if (!allowCreateContext()) {
  465. // New context creation is forbidden
  466. return nullptr;
  467. }
  468. auto context = std::make_shared<TContextImpl>();
  469. Contexts_.insert(context.get());
  470. context->Owner = this;
  471. if (UseCompletionQueuePerThread_) {
  472. context->CQ = CQS_[RandomNumber(CQS_.size())].get();
  473. } else {
  474. context->CQ = CQS_[0].get();
  475. }
  476. return context;
  477. }
  478. void TGRpcClientLow::ForgetContext(TContextImpl* context) {
  479. bool shutdown = false;
  480. {
  481. std::unique_lock<std::mutex> guard(Mtx_);
  482. if (!Contexts_.erase(context)) {
  483. Y_ABORT("Unexpected ForgetContext(%p)", context);
  484. }
  485. if (Contexts_.empty()) {
  486. if (IsStopping()) {
  487. shutdown = true;
  488. }
  489. ContextsEmpty_.notify_all();
  490. }
  491. }
  492. if (shutdown) {
  493. // This was the last context, shutdown CQ
  494. for (auto& cq : CQS_) {
  495. cq->Shutdown();
  496. }
  497. }
  498. }
  } // namespace NGrpc