gearmand.cc 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearmand Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <cerrno>
  45. #include <netdb.h>
  46. #include <sys/socket.h>
  47. #include <sys/types.h>
  48. #include <sys/utsname.h>
  49. #include <unistd.h>
  50. #include <set>
  51. #include <string>
  52. #include <vector>
  53. #include <libgearman-server/gearmand.h>
  54. #include "libgearman-server/struct/port.h"
  55. #include "libgearman-server/plugins.h"
  56. #include "libgearman-server/timer.h"
  57. #include "libgearman-server/queue.h"
  58. #include "util/memory.h"
  59. using namespace org::tangent;
  60. using namespace gearmand;
  61. #ifndef SOCK_NONBLOCK
  62. # define SOCK_NONBLOCK 0
  63. #endif
  64. #ifndef SOL_TCP
  65. # define SOL_TCP 0
  66. #endif
  67. #ifndef TCP_KEEPIDLE
  68. # define TCP_KEEPIDLE 0
  69. #endif
  70. #ifndef TCP_KEEPINTVL
  71. # define TCP_KEEPINTVL 0
  72. #endif
  73. #ifndef TCP_KEEPCNT
  74. # define TCP_KEEPCNT 0
  75. #endif
  76. /*
  77. * Private declarations
  78. */
  79. /**
  80. * @addtogroup gearmand_private Private Gearman Daemon Functions
  81. * @ingroup gearmand
  82. * @{
  83. */
  84. static gearmand_error_t _listen_init(gearmand_st *gearmand);
  85. static void _listen_close(gearmand_st *gearmand);
  86. static gearmand_error_t _listen_watch(gearmand_st *gearmand);
  87. static void _listen_clear(gearmand_st *gearmand);
  88. static void _listen_event(int fd, short events, void *arg);
  89. static gearmand_error_t _wakeup_init(gearmand_st *gearmand);
  90. static void _wakeup_close(gearmand_st *gearmand);
  91. static gearmand_error_t _wakeup_watch(gearmand_st *gearmand);
  92. static void _wakeup_clear(gearmand_st *gearmand);
  93. static void _wakeup_event(int fd, short events, void *arg);
  94. static gearmand_error_t _watch_events(gearmand_st *gearmand);
  95. static void _clear_events(gearmand_st *gearmand);
  96. static void _close_events(gearmand_st *gearmand);
  97. static bool gearman_server_create(gearman_server_st& server,
  98. const uint32_t job_retries,
  99. const char *job_handle_prefix,
  100. uint8_t worker_wakeup,
  101. bool round_robin,
  102. uint32_t hashtable_buckets);
  103. static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
  104. void *context, const gearmand_verbose_t verbose);
  105. static void gearman_server_free(gearman_server_st& server)
  106. {
  107. /* All threads should be cleaned up before calling this. */
  108. assert(server.thread_list == NULL);
  109. for (uint32_t key= 0; key < server.hashtable_buckets; key++)
  110. {
  111. while (server.job_hash[key] != NULL)
  112. {
  113. gearman_server_save_job(server, server.job_hash[key]);
  114. gearman_server_job_free(server.job_hash[key]);
  115. }
  116. }
  117. gearman_queue_flush(&server);
  118. for (uint32_t function_key= 0; function_key < GEARMAND_DEFAULT_HASH_SIZE;
  119. function_key++)
  120. {
  121. while(server.function_hash[function_key] != NULL)
  122. {
  123. gearman_server_function_free(&server, server.function_hash[function_key]);
  124. }
  125. }
  126. while (server.free_packet_list != NULL)
  127. {
  128. gearman_server_packet_st *packet= server.free_packet_list;
  129. server.free_packet_list= packet->next;
  130. delete packet;
  131. }
  132. while (server.free_job_list != NULL)
  133. {
  134. gearman_server_job_st* job= server.free_job_list;
  135. server.free_job_list= job->next;
  136. delete job;
  137. }
  138. while (server.free_client_list != NULL)
  139. {
  140. gearman_server_client_st* client= server.free_client_list;
  141. server.free_client_list= client->con_next;
  142. delete client;
  143. }
  144. while (server.free_worker_list != NULL)
  145. {
  146. gearman_server_worker_st* worker= server.free_worker_list;
  147. server.free_worker_list= worker->con_next;
  148. delete worker;
  149. }
  150. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "removing queue: %s", (server.queue_version == QUEUE_VERSION_CLASS) ? "CLASS" : "FUNCTION");
  151. if (server.queue_version == QUEUE_VERSION_CLASS)
  152. {
  153. delete server.queue.object;
  154. assert(server.queue.functions == NULL);
  155. }
  156. else if (server.queue_version == QUEUE_VERSION_FUNCTION)
  157. {
  158. delete server.queue.functions;
  159. assert(server.queue.object == NULL);
  160. }
  161. else
  162. {
  163. gearmand_debug("Unknown queue type in removal");
  164. }
  165. free(server.job_hash);
  166. free(server.unique_hash);
  167. free(server.function_hash);
  168. }
  169. /** @} */
  170. #pragma GCC diagnostic push
  171. #ifndef __INTEL_COMPILER
  172. #pragma GCC diagnostic ignored "-Wold-style-cast"
  173. #endif
  174. /*
  175. * Public definitions
  176. */
  177. static gearmand_st *_global_gearmand= NULL;
  178. gearmand_st *Gearmand(void)
  179. {
  180. if (_global_gearmand == NULL)
  181. {
  182. assert_msg(false, "Gearmand() was called before it was allocated");
  183. gearmand_error("Gearmand() was called before it was allocated");
  184. }
  185. assert(_global_gearmand);
  186. return _global_gearmand;
  187. }
  188. gearmand_st *gearmand_create(gearmand_config_st *config,
  189. const char *host_arg,
  190. uint32_t threads_arg,
  191. int backlog_arg,
  192. const uint32_t job_retries,
  193. const char *job_handle_prefix,
  194. uint8_t worker_wakeup,
  195. gearmand_log_fn *log_function, void *log_context, const gearmand_verbose_t verbose_arg,
  196. bool round_robin,
  197. bool exceptions_,
  198. uint32_t hashtable_buckets)
  199. {
  200. assert(_global_gearmand == NULL);
  201. if (_global_gearmand)
  202. {
  203. gearmand_error("You have called gearmand_create() twice within your application.");
  204. _exit(EXIT_FAILURE);
  205. }
  206. #ifdef HAVE_LIBEVENT2_PTHREADS
  207. evthread_use_pthreads();
  208. #else
  209. #warn "HAVE_LIBEVENT2_PTHREADS not defined, can't initialize thread-support. This build will emit warnings and disable threads."
  210. if (threads_arg > 0)
  211. {
  212. threads_arg= 0;
  213. gearmand_log_warning("Threads disabled because this build could not locate libevent2 or its pthreads support.");
  214. }
  215. #endif
  216. gearmand_st* gearmand= new (std::nothrow) gearmand_st(host_arg, threads_arg, backlog_arg, verbose_arg, exceptions_);
  217. if (gearmand == NULL)
  218. {
  219. gearmand_perror(errno, "Failed to new() gearmand_st");
  220. return NULL;
  221. }
  222. _global_gearmand= gearmand;
  223. gearmand->socketopt()= config->config.sockopt();
  224. if (gearman_server_create(gearmand->server, job_retries,
  225. job_handle_prefix, worker_wakeup,
  226. round_robin, hashtable_buckets) == false)
  227. {
  228. delete gearmand;
  229. _global_gearmand= NULL;
  230. return NULL;
  231. }
  232. gearmand_set_log_fn(gearmand, log_function, log_context, verbose_arg);
  233. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "THREADS: %u", threads_arg);
  234. return gearmand;
  235. }
  236. void gearmand_free(gearmand_st *gearmand)
  237. {
  238. if (gearmand)
  239. {
  240. _close_events(gearmand);
  241. if (gearmand->threads > 0)
  242. {
  243. gearmand_debug("Shutting down all threads");
  244. }
  245. while (gearmand->thread_list != NULL)
  246. {
  247. gearmand_thread_free(gearmand->thread_list);
  248. }
  249. while (gearmand->free_dcon_list != NULL)
  250. {
  251. gearmand_con_st* dcon= gearmand->free_dcon_list;
  252. gearmand->free_dcon_list= dcon->next;
  253. delete dcon;
  254. }
  255. if (gearmand->base != NULL)
  256. {
  257. event_base_free(gearmand->base);
  258. gearmand->base= NULL;
  259. }
  260. gearman_server_free(gearmand->server);
  261. gearmand_info("Shutdown complete");
  262. delete gearmand;
  263. }
  264. }
  265. static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
  266. void *context, const gearmand_verbose_t verbose)
  267. {
  268. gearmand->log_fn= function;
  269. gearmand->log_context= context;
  270. gearmand->verbose= verbose;
  271. }
  272. bool gearmand_exceptions(gearmand_st *gearmand)
  273. {
  274. return gearmand->exceptions();
  275. }
  276. gearmand_error_t gearmand_port_add(gearmand_st *gearmand, const char *port,
  277. gearmand_connection_add_fn *function,
  278. gearmand_connection_remove_fn* remove_)
  279. {
  280. assert(gearmand);
  281. gearmand->_port_list.resize(gearmand->_port_list.size() +1);
  282. memcpy(gearmand->_port_list.back().port, port, NI_MAXSERV);
  283. gearmand->_port_list.back().port[NI_MAXSERV -1]= '\0';
  284. gearmand->_port_list.back().add_fn(function);
  285. gearmand->_port_list.back().remove_fn(remove_);
  286. return GEARMAND_SUCCESS;
  287. }
  288. gearman_server_st *gearmand_server(gearmand_st *gearmand)
  289. {
  290. return &gearmand->server;
  291. }
  292. gearmand_error_t gearmand_run(gearmand_st *gearmand)
  293. {
  294. libgearman::server::Epoch epoch;
  295. /* Initialize server components. */
  296. if (gearmand->base == NULL)
  297. {
  298. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up with pid %lu, verbose is set to %s",
  299. (unsigned long)(getpid()),
  300. gearmand_verbose_name(gearmand->verbose));
  301. if (gearmand->threads > 0)
  302. {
  303. /* Set the number of free connection structures each thread should keep
  304. around before the main thread is forced to take them. We compute this
  305. here so we don't need to on every new connection. */
  306. gearmand->max_thread_free_dcon_count= ((GEARMAND_MAX_FREE_SERVER_CON /
  307. gearmand->threads) / 2);
  308. }
  309. gearmand->base= static_cast<struct event_base *>(event_base_new());
  310. if (gearmand->base == NULL)
  311. {
  312. gearmand_fatal("event_base_new(NULL)");
  313. return GEARMAND_EVENT;
  314. }
  315. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Method for libevent: %s", event_base_get_method(gearmand->base));
  316. gearmand->ret= _listen_init(gearmand);
  317. if (gearmand->ret != GEARMAND_SUCCESS)
  318. {
  319. return gearmand->ret;
  320. }
  321. gearmand->ret= _wakeup_init(gearmand);
  322. if (gearmand->ret != GEARMAND_SUCCESS)
  323. {
  324. return gearmand->ret;
  325. }
  326. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Creating %u threads", gearmand->threads);
  327. /* If we have 0 threads we still need to create a fake one for context. */
  328. uint32_t x= 0;
  329. do
  330. {
  331. gearmand->ret= gearmand_thread_create(*gearmand);
  332. if (gearmand->ret != GEARMAND_SUCCESS)
  333. return gearmand->ret;
  334. x++;
  335. }
  336. while (x < gearmand->threads);
  337. gearmand_debug("replaying queue: begin");
  338. gearmand->ret= gearman_server_queue_replay(gearmand->server);
  339. if (gearmand_failed(gearmand->ret))
  340. {
  341. return gearmand_gerror("failed to reload queue", gearmand->ret);
  342. }
  343. gearmand_debug("replaying queue: end");
  344. }
  345. gearmand->ret= _watch_events(gearmand);
  346. if (gearmand_failed(gearmand->ret))
  347. {
  348. return gearmand->ret;
  349. }
  350. gearmand_debug("Entering main event loop");
  351. if (event_base_loop(gearmand->base, 0) == -1)
  352. {
  353. gearmand_fatal("event_base_loop(-1)");
  354. return GEARMAND_EVENT;
  355. }
  356. gearmand_debug("Exited main event loop");
  357. return gearmand->ret;
  358. }
  359. void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
  360. {
  361. /*
  362. If this fails, there is not much we can really do. This should never fail though if the main gearmand thread is still active.
  363. */
  364. if (gearmand->wakeup_fd[1] != -1)
  365. {
  366. int limit= 5;
  367. while (--limit) // limit is for EINTR
  368. {
  369. ssize_t written;
  370. uint8_t buffer= wakeup;
  371. if ((written= write(gearmand->wakeup_fd[1], &buffer, 1)) != 1)
  372. {
  373. if (written < 0)
  374. {
  375. switch (errno)
  376. {
  377. case EINTR:
  378. continue;
  379. default:
  380. break;
  381. }
  382. gearmand_perror(errno, gearmand_strwakeup(wakeup));
  383. }
  384. else
  385. {
  386. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  387. "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
  388. }
  389. }
  390. return;
  391. }
  392. }
  393. }
  394. /*
  395. * Private definitions
  396. */
  397. gearmand_error_t set_socket(gearmand_st* gearmand, int& fd, struct addrinfo *addrinfo_next)
  398. {
  399. /* Call to socket() can fail for some getaddrinfo results, try another. */
  400. fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
  401. addrinfo_next->ai_protocol);
  402. if (fd == -1)
  403. {
  404. return gearmand_perror(errno, "socket()");
  405. }
  406. #ifdef IPV6_V6ONLY
  407. {
  408. int flags= 1;
  409. if (addrinfo_next->ai_family == AF_INET6)
  410. {
  411. flags= 1;
  412. if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &flags, sizeof(flags)) == -1)
  413. {
  414. return gearmand_perror(errno, "setsockopt(IPV6_V6ONLY)");
  415. }
  416. }
  417. }
  418. #endif
  419. if (0) // Add in when we have server working as a library again.
  420. {
  421. if (FD_CLOEXEC)
  422. {
  423. int flags;
  424. do
  425. {
  426. flags= fcntl(fd, F_GETFD, 0);
  427. } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
  428. if (flags != -1)
  429. {
  430. int rval;
  431. do
  432. {
  433. rval= fcntl (fd, F_SETFD, flags | FD_CLOEXEC);
  434. } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
  435. // we currently ignore the case where rval is -1
  436. }
  437. }
  438. }
  439. {
  440. int flags= 1;
  441. if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) == -1)
  442. {
  443. return gearmand_perror(errno, "setsockopt(SO_REUSEADDR)");
  444. }
  445. }
  446. if (gearmand->socketopt().keepalive())
  447. {
  448. int flags= 1;
  449. if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
  450. {
  451. return gearmand_perror(errno, "setsockopt(SO_KEEPALIVE)");
  452. }
  453. if (SOL_TCP)
  454. {
  455. #if defined(TCP_KEEPIDLE) && TCP_KEEPIDLE
  456. if (TCP_KEEPIDLE and gearmand->socketopt().keepalive_idle() != -1)
  457. {
  458. int optval= gearmand->socketopt().keepalive_idle();
  459. if (setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1)
  460. {
  461. return gearmand_perror(errno, "setsockopt(TCP_KEEPIDLE)");
  462. }
  463. }
  464. #endif
  465. #if defined(TCP_KEEPINTVL) && TCP_KEEPINTVL
  466. if (TCP_KEEPINTVL and gearmand->socketopt().keepalive_interval() != -1)
  467. {
  468. int optval= gearmand->socketopt().keepalive_interval();
  469. if (setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval)) == -1)
  470. {
  471. return gearmand_perror(errno, "setsockopt(TCP_KEEPINTVL)");
  472. }
  473. }
  474. #endif
  475. #if defined(TCP_KEEPCNT) && TCP_KEEPCNT
  476. if (TCP_KEEPCNT and gearmand->socketopt().keepalive_count() != -1)
  477. {
  478. int optval= gearmand->socketopt().keepalive_count();
  479. if (setsockopt(fd, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(optval)) == -1)
  480. {
  481. return gearmand_perror(errno, "setsockopt(TCP_KEEPCNT)");
  482. }
  483. }
  484. #endif
  485. }
  486. }
  487. {
  488. struct linger ling= {0, 0};
  489. if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) == -1)
  490. {
  491. return gearmand_perror(errno, "setsockopt(SO_LINGER)");
  492. }
  493. }
  494. {
  495. int flags= 1;
  496. if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)) == -1)
  497. {
  498. return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
  499. }
  500. }
  501. return GEARMAND_SUCCESS;
  502. }
  503. static const uint32_t bind_timeout= 20; // Number is not special, but look at INFO messages if you decide to change it.
  504. typedef std::pair<std::string, std::string> host_port_t;
  505. static gearmand_error_t _listen_init(gearmand_st *gearmand)
  506. {
  507. for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
  508. {
  509. struct addrinfo hints;
  510. struct addrinfo *addrinfo= NULL;
  511. gearmand_port_st *port= &gearmand->_port_list[x];
  512. memset(&hints, 0, sizeof(struct addrinfo));
  513. hints.ai_flags= AI_PASSIVE;
  514. hints.ai_socktype= SOCK_STREAM;
  515. {
  516. int ret= getaddrinfo(gearmand->host, port->port, &hints, &addrinfo);
  517. if (ret != 0)
  518. {
  519. char buffer[1024];
  520. int length= snprintf(buffer, sizeof(buffer), "%s:%s", gearmand->host ? gearmand->host : "<any>", port->port);
  521. if (length <= 0 or size_t(length) >= sizeof(buffer))
  522. {
  523. buffer[0]= 0;
  524. }
  525. if (addrinfo)
  526. {
  527. freeaddrinfo(addrinfo);
  528. }
  529. return gearmand_gai_error(buffer, ret);
  530. }
  531. }
  532. std::set<host_port_t> unique_hosts;
  533. for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
  534. addrinfo_next= addrinfo_next->ai_next)
  535. {
  536. char host[NI_MAXHOST];
  537. {
  538. int ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
  539. NI_MAXHOST, port->port, NI_MAXSERV,
  540. NI_NUMERICHOST | NI_NUMERICSERV);
  541. if (ret != 0)
  542. {
  543. gearmand_gai_error("getaddrinfo", ret);
  544. strncpy(host, "-", sizeof(host));
  545. strncpy(port->port, "-", sizeof(port->port));
  546. }
  547. }
  548. std::string host_string(host);
  549. std::string port_string(port->port);
  550. host_port_t check= std::make_pair(host_string, port_string);
  551. if (unique_hosts.find(check) != unique_hosts.end())
  552. {
  553. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Already listening on %s:%s", host, port->port);
  554. continue;
  555. }
  556. unique_hosts.insert(check);
  557. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Trying to listen on %s:%s", host, port->port);
  558. /*
  559. @note logic for this pulled from Drizzle.
  560. Sometimes the port is not released fast enough when stopping and
  561. restarting the server. This happens quite often with the test suite
  562. on busy Linux systems. Retry to bind the address at these intervals:
  563. Sleep intervals: 1, 2, 4, 6, 9, 13, 17, 22, ...
  564. Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ...
  565. Limit the sequence by drizzled_bind_timeout.
  566. */
  567. uint32_t waited;
  568. uint32_t this_wait;
  569. uint32_t retry;
  570. int ret= -1;
  571. int fd;
  572. for (waited= 0, retry= 1; ; retry++, waited+= this_wait)
  573. {
  574. {
  575. gearmand_error_t socket_ret;
  576. if (gearmand_failed(socket_ret= set_socket(gearmand, fd, addrinfo_next)))
  577. {
  578. gearmand_sockfd_close(fd);
  579. freeaddrinfo(addrinfo);
  580. return socket_ret;
  581. }
  582. }
  583. errno= 0;
  584. if ((ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen)) == 0)
  585. {
  586. // Success
  587. break;
  588. }
  589. // Protect our error
  590. ret= errno;
  591. gearmand_sockfd_close(fd);
  592. if (waited >= bind_timeout)
  593. {
  594. freeaddrinfo(addrinfo);
  595. return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Timeout occurred when calling bind() for %s:%s", host, port->port);
  596. }
  597. if (ret != EADDRINUSE)
  598. {
  599. freeaddrinfo(addrinfo);
  600. return gearmand_perror(ret, "bind");
  601. }
  602. this_wait= retry * retry / 3 + 1;
  603. // We are in single user threads, so strerror() is fine.
  604. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u + %u >= %u",
  605. strerror(ret), host, port->port,
  606. waited, this_wait, bind_timeout);
  607. struct timespec requested;
  608. requested.tv_sec= this_wait;
  609. requested.tv_nsec= 0;
  610. nanosleep(&requested, NULL);
  611. }
  612. if (listen(fd, gearmand->backlog) == -1)
  613. {
  614. gearmand_perror(errno, "listen");
  615. gearmand_sockfd_close(fd);
  616. freeaddrinfo(addrinfo);
  617. return GEARMAND_ERRNO;
  618. }
  619. // Scoping note for eventual transformation
  620. {
  621. int* fd_list= (int *)realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
  622. if (fd_list == NULL)
  623. {
  624. gearmand_perror(errno, "realloc");
  625. gearmand_sockfd_close(fd);
  626. freeaddrinfo(addrinfo);
  627. return GEARMAND_ERRNO;
  628. }
  629. port->listen_fd= fd_list;
  630. }
  631. port->listen_fd[port->listen_count]= fd;
  632. port->listen_count++;
  633. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Listening on %s:%s (%d)", host, port->port, fd);
  634. }
  635. freeaddrinfo(addrinfo);
  636. /* Report last socket() error if we couldn't find an address to bind. */
  637. if (port->listen_fd == NULL)
  638. {
  639. return gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Could not bind/listen to any addresses");
  640. }
  641. assert(port->listen_event == NULL);
  642. port->listen_event= (struct event *)malloc(sizeof(struct event) * port->listen_count); // libevent POD
  643. if (port->listen_event == NULL)
  644. {
  645. return gearmand_merror("malloc(sizeof(struct event) * port->listen_count)", struct event, port->listen_count);
  646. }
  647. for (uint32_t y= 0; y < port->listen_count; ++y)
  648. {
  649. event_set(&(port->listen_event[y]), port->listen_fd[y], EV_READ | EV_PERSIST, _listen_event, port);
  650. if (event_base_set(gearmand->base, &(port->listen_event[y])) == -1)
  651. {
  652. return gearmand_perror(errno, "event_base_set()");
  653. }
  654. }
  655. }
  656. return GEARMAND_SUCCESS;
  657. }
  658. static void _listen_close(gearmand_st *gearmand)
  659. {
  660. _listen_clear(gearmand);
  661. for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
  662. {
  663. for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; ++y)
  664. {
  665. if (gearmand->_port_list[x].listen_fd[y] >= 0)
  666. {
  667. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Closing listening socket (%d)", gearmand->_port_list[x].listen_fd[y]);
  668. gearmand_sockfd_close(gearmand->_port_list[x].listen_fd[y]);
  669. gearmand->_port_list[x].listen_fd[y]= -1;
  670. }
  671. }
  672. }
  673. }
  674. static gearmand_error_t _listen_watch(gearmand_st *gearmand)
  675. {
  676. if (gearmand->is_listen_event)
  677. {
  678. return GEARMAND_SUCCESS;
  679. }
  680. for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
  681. {
  682. for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; y++)
  683. {
  684. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Adding event for listening socket (%d)",
  685. gearmand->_port_list[x].listen_fd[y]);
  686. if (event_add(&(gearmand->_port_list[x].listen_event[y]), NULL) < 0)
  687. {
  688. gearmand_perror(errno, "event_add");
  689. return GEARMAND_EVENT;
  690. }
  691. }
  692. }
  693. gearmand->is_listen_event= true;
  694. return GEARMAND_SUCCESS;
  695. }
  696. static void _listen_clear(gearmand_st *gearmand)
  697. {
  698. if (gearmand->is_listen_event)
  699. {
  700. for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
  701. {
  702. for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; y++)
  703. {
  704. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
  705. "Clearing event for listening socket (%d)",
  706. gearmand->_port_list[x].listen_fd[y]);
  707. if (event_del(&(gearmand->_port_list[x].listen_event[y])) == -1)
  708. {
  709. gearmand_perror(errno, "We tried to event_del() an event which no longer existed");
  710. assert_msg(false, "We tried to event_del() an event which no longer existed");
  711. }
  712. }
  713. }
  714. gearmand->is_listen_event= false;
  715. }
  716. }
  717. static void _listen_event(int event_fd, short events __attribute__ ((unused)), void *arg)
  718. {
  719. gearmand_port_st *port= (gearmand_port_st *)arg;
  720. struct sockaddr sa;
  721. socklen_t sa_len= sizeof(sa);
  722. #if defined(HAVE_ACCEPT4) && HAVE_ACCEPT4
  723. int fd= accept4(event_fd, &sa, &sa_len, SOCK_NONBLOCK); // SOCK_NONBLOCK);
  724. #else
  725. int fd= accept(event_fd, &sa, &sa_len);
  726. #endif
  727. if (fd == -1)
  728. {
  729. int local_error= errno;
  730. switch (local_error)
  731. {
  732. case EINTR:
  733. case EMFILE:
  734. return;
  735. case ECONNABORTED:
  736. gearmand_perror(local_error, "accept");
  737. return;
  738. default:
  739. break;
  740. }
  741. _clear_events(Gearmand());
  742. Gearmand()->ret= gearmand_perror(local_error, "accept");
  743. return;
  744. }
  745. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "accept() fd:%d", fd);
  746. /*
  747. Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
  748. */
  749. char host[NI_MAXHOST];
  750. char port_str[NI_MAXSERV];
  751. int error= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
  752. NI_NUMERICHOST | NI_NUMERICSERV);
  753. if (error != 0)
  754. {
  755. gearmand_gai_error("getnameinfo", error);
  756. strncpy(host, "-", sizeof(host));
  757. strncpy(port_str, "-", sizeof(port_str));
  758. }
  759. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Accepted connection from %s:%s", host, port_str);
  760. {
  761. int flags= 1;
  762. if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
  763. {
  764. gearmand_log_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "%s:%s setsockopt(SO_KEEPALIVE)", host, port_str);
  765. }
  766. }
  767. gearmand_error_t ret= gearmand_con_create(Gearmand(), fd, host, port_str, port);
  768. if (ret == GEARMAND_MEMORY_ALLOCATION_FAILURE)
  769. {
  770. gearmand_sockfd_close(fd);
  771. return;
  772. }
  773. else if (ret != GEARMAND_SUCCESS)
  774. {
  775. Gearmand()->ret= ret;
  776. _clear_events(Gearmand());
  777. }
  778. }
  779. static gearmand_error_t _wakeup_init(gearmand_st *gearmand)
  780. {
  781. gearmand_debug("Creating wakeup pipe");
  782. #if defined(HAVE_PIPE2) && HAVE_PIPE2
  783. if (pipe2(gearmand->wakeup_fd, O_NONBLOCK) == -1)
  784. {
  785. return gearmand_fatal_perror(errno, "pipe2(gearmand->wakeup_fd)");
  786. }
  787. #else
  788. if (pipe(gearmand->wakeup_fd) == -1)
  789. {
  790. return gearmand_fatal_perror(errno, "pipe(gearmand->wakeup_fd)");
  791. }
  792. gearmand_error_t local_ret;
  793. if ((local_ret= gearmand_sockfd_nonblock(gearmand->wakeup_fd[0])))
  794. {
  795. return local_ret;
  796. }
  797. #endif
  798. event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
  799. EV_READ | EV_PERSIST, _wakeup_event, gearmand);
  800. if (event_base_set(gearmand->base, &(gearmand->wakeup_event)) == -1)
  801. {
  802. gearmand_perror(errno, "event_base_set");
  803. }
  804. return GEARMAND_SUCCESS;
  805. }
  806. static void _wakeup_close(gearmand_st *gearmand)
  807. {
  808. _wakeup_clear(gearmand);
  809. if (gearmand->wakeup_fd[0] >= 0)
  810. {
  811. gearmand_debug("Closing wakeup pipe");
  812. gearmand_pipe_close(gearmand->wakeup_fd[0]);
  813. gearmand->wakeup_fd[0]= -1;
  814. gearmand_pipe_close(gearmand->wakeup_fd[1]);
  815. gearmand->wakeup_fd[1]= -1;
  816. }
  817. }
  818. static gearmand_error_t _wakeup_watch(gearmand_st *gearmand)
  819. {
  820. if (gearmand->is_wakeup_event)
  821. {
  822. return GEARMAND_SUCCESS;
  823. }
  824. gearmand_debug("Adding event for wakeup pipe");
  825. if (event_add(&(gearmand->wakeup_event), NULL) < 0)
  826. {
  827. gearmand_perror(errno, "event_add");
  828. return GEARMAND_EVENT;
  829. }
  830. gearmand->is_wakeup_event= true;
  831. return GEARMAND_SUCCESS;
  832. }
  833. static void _wakeup_clear(gearmand_st *gearmand)
  834. {
  835. if (gearmand->is_wakeup_event)
  836. {
  837. gearmand_debug("Clearing event for wakeup pipe");
  838. if (event_del(&(gearmand->wakeup_event)) < 0)
  839. {
  840. gearmand_perror(errno, "We tried to event_del() an event which no longer existed");
  841. assert_msg(false, "We tried to event_del() an event which no longer existed");
  842. }
  843. gearmand->is_wakeup_event= false;
  844. }
  845. }
  846. static void _wakeup_event(int fd, short, void *arg)
  847. {
  848. gearmand_st *gearmand= (gearmand_st *)arg;
  849. while (1)
  850. {
  851. uint8_t buffer[GEARMAND_PIPE_BUFFER_SIZE];
  852. ssize_t ret= read(fd, buffer, GEARMAND_PIPE_BUFFER_SIZE);
  853. if (ret == 0)
  854. {
  855. _clear_events(gearmand);
  856. gearmand_fatal("read(EOF)");
  857. gearmand->ret= GEARMAND_PIPE_EOF;
  858. return;
  859. }
  860. else if (ret == -1)
  861. {
  862. int local_error= errno;
  863. if (local_error == EINTR)
  864. {
  865. continue;
  866. }
  867. if (local_error == EAGAIN)
  868. {
  869. break;
  870. }
  871. _clear_events(gearmand);
  872. gearmand->ret= gearmand_perror(local_error, "_wakeup_event:read");
  873. return;
  874. }
  875. for (ssize_t x= 0; x < ret; ++x)
  876. {
  877. switch ((gearmand_wakeup_t)buffer[x])
  878. {
  879. case GEARMAND_WAKEUP_PAUSE:
  880. gearmand_debug("Received PAUSE wakeup event");
  881. _clear_events(gearmand);
  882. gearmand->ret= GEARMAND_PAUSE;
  883. break;
  884. case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
  885. gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
  886. _listen_close(gearmand);
  887. for (gearmand_thread_st* thread= gearmand->thread_list;
  888. thread != NULL;
  889. thread= thread->next)
  890. {
  891. gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
  892. }
  893. gearmand->ret= GEARMAND_SHUTDOWN_GRACEFUL;
  894. break;
  895. case GEARMAND_WAKEUP_SHUTDOWN:
  896. gearmand_debug("Received SHUTDOWN wakeup event");
  897. _clear_events(gearmand);
  898. gearmand->ret= GEARMAND_SHUTDOWN;
  899. break;
  900. case GEARMAND_WAKEUP_CON:
  901. case GEARMAND_WAKEUP_RUN:
  902. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
  903. _clear_events(gearmand);
  904. gearmand->ret= GEARMAND_UNKNOWN_STATE;
  905. break;
  906. }
  907. }
  908. }
  909. }
  910. static gearmand_error_t _watch_events(gearmand_st *gearmand)
  911. {
  912. gearmand_error_t ret;
  913. if (gearmand_failed(ret= _listen_watch(gearmand)))
  914. {
  915. return ret;
  916. }
  917. if (gearmand_failed(ret= _wakeup_watch(gearmand)))
  918. {
  919. return ret;
  920. }
  921. return GEARMAND_SUCCESS;
  922. }
  923. static void _clear_events(gearmand_st *gearmand)
  924. {
  925. _listen_clear(gearmand);
  926. _wakeup_clear(gearmand);
  927. /*
  928. If we are not threaded, tell the fake thread to shutdown now to clear
  929. connections. Otherwise we will never exit the libevent loop.
  930. */
  931. if (gearmand->threads == 0 && gearmand->thread_list != NULL)
  932. {
  933. gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
  934. }
  935. }
  936. static void _close_events(gearmand_st *gearmand)
  937. {
  938. _listen_close(gearmand);
  939. _wakeup_close(gearmand);
  940. }
  941. /** @} */
  942. /*
  943. * Public Definitions
  944. */
  945. const char *gearmand_version(void)
  946. {
  947. return PACKAGE_VERSION;
  948. }
  949. const char *gearmand_bugreport(void)
  950. {
  951. return PACKAGE_BUGREPORT;
  952. }
  953. const char *gearmand_verbose_name(gearmand_verbose_t verbose)
  954. {
  955. switch (verbose)
  956. {
  957. case GEARMAND_VERBOSE_FATAL:
  958. return "FATAL";
  959. case GEARMAND_VERBOSE_ALERT:
  960. return "ALERT";
  961. case GEARMAND_VERBOSE_CRITICAL:
  962. return "CRITICAL";
  963. case GEARMAND_VERBOSE_ERROR:
  964. return "ERROR";
  965. case GEARMAND_VERBOSE_WARN:
  966. return "WARNING";
  967. case GEARMAND_VERBOSE_NOTICE:
  968. return "NOTICE";
  969. case GEARMAND_VERBOSE_INFO:
  970. return "INFO";
  971. case GEARMAND_VERBOSE_DEBUG:
  972. return "DEBUG";
  973. }
  974. assert_msg(false, "Invalid result");
  975. gearmand_fatal("Invalid gearmand_verbose_t used.");
  976. return "UNKNOWN";
  977. }
  978. bool gearmand_verbose_check(const char *name, gearmand_verbose_t& level)
  979. {
  980. bool success= true;
  981. if (strcmp("FATAL", name) == 0)
  982. {
  983. level= GEARMAND_VERBOSE_FATAL;
  984. }
  985. else if (strcmp("ALERT", name) == 0)
  986. {
  987. level= GEARMAND_VERBOSE_ALERT;
  988. }
  989. else if (strcmp("CRITICAL", name) == 0)
  990. {
  991. level= GEARMAND_VERBOSE_CRITICAL;
  992. }
  993. else if (strcmp("ERROR", name) == 0)
  994. {
  995. level= GEARMAND_VERBOSE_ERROR;
  996. }
  997. else if (strcmp("WARNING", name) == 0)
  998. {
  999. level= GEARMAND_VERBOSE_WARN;
  1000. }
  1001. else if (strcmp("NOTICE", name) == 0)
  1002. {
  1003. level= GEARMAND_VERBOSE_NOTICE;
  1004. }
  1005. else if (strcmp("INFO", name) == 0)
  1006. {
  1007. level= GEARMAND_VERBOSE_INFO;
  1008. }
  1009. else if (strcmp("DEBUG", name) == 0)
  1010. {
  1011. level= GEARMAND_VERBOSE_DEBUG;
  1012. }
  1013. else
  1014. {
  1015. success= false;
  1016. }
  1017. return success;
  1018. }
  1019. static bool gearman_server_create(gearman_server_st& server,
  1020. const uint32_t job_retries_arg,
  1021. const char *job_handle_prefix,
  1022. uint8_t worker_wakeup_arg,
  1023. bool round_robin_arg,
  1024. uint32_t hashtable_buckets)
  1025. {
  1026. server.state.queue_startup= false;
  1027. server.flags.round_robin= round_robin_arg;
  1028. server.flags.threaded= false;
  1029. server.shutdown= false;
  1030. server.shutdown_graceful= false;
  1031. server.proc_wakeup= false;
  1032. server.proc_shutdown= false;
  1033. server.job_retries= job_retries_arg;
  1034. server.worker_wakeup= worker_wakeup_arg;
  1035. server.thread_count= 0;
  1036. server.free_packet_count= 0;
  1037. server.function_count= 0;
  1038. server.job_count= 0;
  1039. server.unique_count= 0;
  1040. server.free_job_count= 0;
  1041. server.free_client_count= 0;
  1042. server.free_worker_count= 0;
  1043. server.thread_list= NULL;
  1044. server.free_packet_list= NULL;
  1045. server.free_job_list= NULL;
  1046. server.free_client_list= NULL;
  1047. server.free_worker_list= NULL;
  1048. server.queue_version= QUEUE_VERSION_NONE;
  1049. server.queue.object= NULL;
  1050. server.queue.functions= NULL;
  1051. server.function_hash= (gearman_server_function_st **) calloc(GEARMAND_DEFAULT_HASH_SIZE, sizeof(gearman_server_function_st *));
  1052. if (server.function_hash == NULL)
  1053. {
  1054. gearmand_merror("calloc", server.function_hash, GEARMAND_DEFAULT_HASH_SIZE);
  1055. return false;
  1056. }
  1057. server.hashtable_buckets= hashtable_buckets;
  1058. server.job_hash= (gearman_server_job_st **) calloc(hashtable_buckets, sizeof(gearman_server_job_st *));
  1059. if (server.job_hash == NULL)
  1060. {
  1061. gearmand_merror("calloc", server.job_hash, hashtable_buckets);
  1062. return false;
  1063. }
  1064. server.unique_hash= (gearman_server_job_st **) calloc(hashtable_buckets, sizeof(gearman_server_job_st *));
  1065. if (server.unique_hash == NULL)
  1066. {
  1067. gearmand_merror("calloc", server.unique_hash, hashtable_buckets);
  1068. return false;
  1069. }
  1070. int checked_length= -1;
  1071. if (job_handle_prefix)
  1072. {
  1073. checked_length= snprintf(server.job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "%s", job_handle_prefix);
  1074. }
  1075. else
  1076. {
  1077. struct utsname un;
  1078. if (uname(&un) == -1)
  1079. {
  1080. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "uname(&un) failed");
  1081. gearman_server_free(server);
  1082. return false;
  1083. }
  1084. checked_length= snprintf(server.job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "H:%s", un.nodename);
  1085. }
  1086. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
  1087. {
  1088. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Available length %d not enough to store job handle prefix %s",
  1089. GEARMAND_JOB_HANDLE_SIZE, server.job_handle_prefix);
  1090. gearman_server_free(server);
  1091. return false;
  1092. }
  1093. server.job_handle_count= 1;
  1094. return true;
  1095. }
  1096. gearmand_error_t gearmand_set_socket_keepalive(gearmand_st *gearmand, bool keepalive_)
  1097. {
  1098. if (gearmand)
  1099. {
  1100. gearmand->socketopt().keepalive(keepalive_);
  1101. return GEARMAND_SUCCESS;
  1102. }
  1103. return GEARMAND_INVALID_ARGUMENT;
  1104. }
  1105. gearmand_error_t gearmand_set_socket_keepalive_idle(gearmand_st *gearmand, int keepalive_idle_)
  1106. {
  1107. if (gearmand)
  1108. {
  1109. gearmand->socketopt().keepalive_idle(keepalive_idle_);
  1110. return GEARMAND_SUCCESS;
  1111. }
  1112. return GEARMAND_INVALID_ARGUMENT;
  1113. }
  1114. gearmand_error_t gearmand_set_socket_keepalive_interval(gearmand_st *gearmand, int keepalive_interval_)
  1115. {
  1116. if (gearmand)
  1117. {
  1118. gearmand->socketopt().keepalive_interval(keepalive_interval_);
  1119. return GEARMAND_SUCCESS;
  1120. }
  1121. return GEARMAND_INVALID_ARGUMENT;
  1122. }
  1123. gearmand_error_t gearmand_set_socket_keepalive_count(gearmand_st *gearmand, int keepalive_count_)
  1124. {
  1125. if (gearmand)
  1126. {
  1127. gearmand->socketopt().keepalive_count(keepalive_count_);
  1128. return GEARMAND_SUCCESS;
  1129. }
  1130. return GEARMAND_INVALID_ARGUMENT;
  1131. }
  1132. #pragma GCC diagnostic pop