gearmand.cc 31 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearmand Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <cerrno>
  45. #include <netdb.h>
  46. #include <sys/socket.h>
  47. #include <sys/types.h>
  48. #include <sys/utsname.h>
  49. #include <unistd.h>
  50. #include <set>
  51. #include <string>
  52. #include <libgearman-server/gearmand.h>
  53. #include <libgearman-server/struct/port.h>
  54. #include <libgearman-server/plugins.h>
  55. #include <libgearman-server/timer.h>
  56. using namespace gearmand;
  57. #ifndef SOCK_NONBLOCK
  58. # define SOCK_NONBLOCK 0
  59. #endif
  60. /*
  61. * Private declarations
  62. */
  63. /**
  64. * @addtogroup gearmand_private Private Gearman Daemon Functions
  65. * @ingroup gearmand
  66. * @{
  67. */
  68. static gearmand_error_t _listen_init(gearmand_st *gearmand);
  69. static void _listen_close(gearmand_st *gearmand);
  70. static gearmand_error_t _listen_watch(gearmand_st *gearmand);
  71. static void _listen_clear(gearmand_st *gearmand);
  72. static void _listen_event(int fd, short events, void *arg);
  73. static gearmand_error_t _wakeup_init(gearmand_st *gearmand);
  74. static void _wakeup_close(gearmand_st *gearmand);
  75. static gearmand_error_t _wakeup_watch(gearmand_st *gearmand);
  76. static void _wakeup_clear(gearmand_st *gearmand);
  77. static void _wakeup_event(int fd, short events, void *arg);
  78. static gearmand_error_t _watch_events(gearmand_st *gearmand);
  79. static void _clear_events(gearmand_st *gearmand);
  80. static void _close_events(gearmand_st *gearmand);
  81. static bool gearman_server_create(gearman_server_st& server,
  82. uint8_t job_retries,
  83. uint8_t worker_wakeup,
  84. bool round_robin);
  85. static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
  86. void *context, const gearmand_verbose_t verbose);
  87. static void gearman_server_free(gearman_server_st& server)
  88. {
  89. /* All threads should be cleaned up before calling this. */
  90. assert(server.thread_list == NULL);
  91. for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++)
  92. {
  93. while (server.job_hash[key] != NULL)
  94. {
  95. gearman_server_job_free(server.job_hash[key]);
  96. }
  97. }
  98. while (server.function_list != NULL)
  99. {
  100. gearman_server_function_free(&server, server.function_list);
  101. }
  102. while (server.free_packet_list != NULL)
  103. {
  104. gearman_server_packet_st *packet= server.free_packet_list;
  105. server.free_packet_list= packet->next;
  106. delete packet;
  107. }
  108. while (server.free_job_list != NULL)
  109. {
  110. gearman_server_job_st* job= server.free_job_list;
  111. server.free_job_list= job->next;
  112. delete job;
  113. }
  114. while (server.free_client_list != NULL)
  115. {
  116. gearman_server_client_st* client= server.free_client_list;
  117. server.free_client_list= client->con_next;
  118. delete client;
  119. }
  120. while (server.free_worker_list != NULL)
  121. {
  122. gearman_server_worker_st* worker= server.free_worker_list;
  123. server.free_worker_list= worker->con_next;
  124. delete worker;
  125. }
  126. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "removing queue: %s", (server.queue_version == QUEUE_VERSION_CLASS) ? "CLASS" : "FUNCTION");
  127. if (server.queue_version == QUEUE_VERSION_CLASS)
  128. {
  129. delete server.queue.object;
  130. assert(server.queue.functions == NULL);
  131. }
  132. else if (server.queue_version == QUEUE_VERSION_FUNCTION)
  133. {
  134. delete server.queue.functions;
  135. assert(server.queue.object == NULL);
  136. }
  137. else
  138. {
  139. gearmand_debug("Unknown queue type in removal");
  140. }
  141. }
  142. /** @} */
  143. #ifndef __INTEL_COMPILER
  144. #pragma GCC diagnostic ignored "-Wold-style-cast"
  145. #endif
  146. /*
  147. * Public definitions
  148. */
  149. static gearmand_st *_global_gearmand= NULL;
  150. gearmand_st *Gearmand(void)
  151. {
  152. if (!_global_gearmand)
  153. {
  154. gearmand_error("Gearmand() was called before it was allocated");
  155. assert(! "Gearmand() was called before it was allocated");
  156. }
  157. assert(_global_gearmand);
  158. return _global_gearmand;
  159. }
  160. gearmand_st *gearmand_create(const char *host_arg,
  161. uint32_t threads_arg,
  162. int backlog_arg,
  163. uint8_t job_retries,
  164. uint8_t worker_wakeup,
  165. gearmand_log_fn *log_function, void *log_context, const gearmand_verbose_t verbose_arg,
  166. bool round_robin,
  167. bool exceptions_)
  168. {
  169. gearmand_st *gearmand;
  170. assert(_global_gearmand == NULL);
  171. if (_global_gearmand)
  172. {
  173. gearmand_error("You have called gearmand_create() twice within your application.");
  174. _exit(EXIT_FAILURE);
  175. }
  176. gearmand= new (std::nothrow) gearmand_st;
  177. if (gearmand == NULL)
  178. {
  179. gearmand_merror("new", gearmand_st, 0);
  180. return NULL;
  181. }
  182. if (gearman_server_create(gearmand->server, job_retries, worker_wakeup, round_robin) == false)
  183. {
  184. delete gearmand;
  185. return NULL;
  186. }
  187. gearmand->is_listen_event= false;
  188. gearmand->is_wakeup_event= false;
  189. gearmand->_exceptions= exceptions_;
  190. gearmand->verbose= verbose_arg;
  191. gearmand->timeout= -1;
  192. gearmand->ret= GEARMAN_SUCCESS;
  193. gearmand->backlog= backlog_arg;
  194. gearmand->threads= threads_arg;
  195. gearmand->port_count= 0;
  196. gearmand->thread_count= 0;
  197. gearmand->free_dcon_count= 0;
  198. gearmand->max_thread_free_dcon_count= 0;
  199. gearmand->wakeup_fd[0]= -1;
  200. gearmand->wakeup_fd[1]= -1;
  201. gearmand->host= host_arg;
  202. gearmand->log_fn= NULL;
  203. gearmand->log_context= NULL;
  204. gearmand->base= NULL;
  205. gearmand->port_list= NULL;
  206. gearmand->thread_list= NULL;
  207. gearmand->thread_add_next= NULL;
  208. gearmand->free_dcon_list= NULL;
  209. _global_gearmand= gearmand;
  210. gearmand_set_log_fn(gearmand, log_function, log_context, verbose_arg);
  211. return gearmand;
  212. }
  213. void gearmand_free(gearmand_st *gearmand)
  214. {
  215. if (gearmand)
  216. {
  217. _close_events(gearmand);
  218. if (gearmand->threads > 0)
  219. {
  220. gearmand_debug("Shutting down all threads");
  221. }
  222. while (gearmand->thread_list != NULL)
  223. {
  224. gearmand_thread_free(gearmand->thread_list);
  225. }
  226. while (gearmand->free_dcon_list != NULL)
  227. {
  228. gearmand_con_st *dcon;
  229. dcon= gearmand->free_dcon_list;
  230. gearmand->free_dcon_list= dcon->next;
  231. delete dcon;
  232. }
  233. if (gearmand->base != NULL)
  234. {
  235. event_base_free(gearmand->base);
  236. gearmand->base= NULL;
  237. }
  238. gearman_server_free(gearmand->server);
  239. for (uint32_t x= 0; x < gearmand->port_count; x++)
  240. {
  241. if (gearmand->port_list[x].listen_fd != NULL)
  242. {
  243. free(gearmand->port_list[x].listen_fd);
  244. }
  245. if (gearmand->port_list[x].listen_event != NULL)
  246. {
  247. free(gearmand->port_list[x].listen_event);
  248. }
  249. }
  250. if (gearmand->port_list != NULL)
  251. {
  252. free(gearmand->port_list);
  253. }
  254. gearmand_info("Shutdown complete");
  255. delete gearmand;
  256. }
  257. }
  258. static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
  259. void *context, const gearmand_verbose_t verbose)
  260. {
  261. gearmand->log_fn= function;
  262. gearmand->log_context= context;
  263. gearmand->verbose= verbose;
  264. }
  265. gearmand_error_t gearmand_port_add(gearmand_st *gearmand, const char *port,
  266. gearmand_connection_add_fn *function)
  267. {
  268. gearmand_port_st *port_list;
  269. port_list= (gearmand_port_st *)realloc(gearmand->port_list,
  270. sizeof(gearmand_port_st) * (gearmand->port_count + 1));
  271. if (port_list == NULL)
  272. {
  273. gearmand_perror(errno, "realloc");
  274. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  275. }
  276. strncpy(port_list[gearmand->port_count].port, port, NI_MAXSERV);
  277. port_list[gearmand->port_count].listen_count= 0;
  278. port_list[gearmand->port_count].add_fn= function;
  279. port_list[gearmand->port_count].listen_fd= NULL;
  280. port_list[gearmand->port_count].listen_event= NULL;
  281. gearmand->port_list= port_list;
  282. gearmand->port_count++;
  283. return GEARMAN_SUCCESS;
  284. }
  285. gearman_server_st *gearmand_server(gearmand_st *gearmand)
  286. {
  287. return &gearmand->server;
  288. }
  289. gearmand_error_t gearmand_run(gearmand_st *gearmand)
  290. {
  291. libgearman::server::Epoch epoch;
  292. /* Initialize server components. */
  293. if (gearmand->base == NULL)
  294. {
  295. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up(%lu), verbose set to %s",
  296. (unsigned long)(getpid()),
  297. gearmand_verbose_name(gearmand->verbose));
  298. if (gearmand->threads > 0)
  299. {
  300. /* Set the number of free connection structures each thread should keep
  301. around before the main thread is forced to take them. We compute this
  302. here so we don't need to on every new connection. */
  303. gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
  304. gearmand->threads) / 2);
  305. }
  306. gearmand->base= static_cast<struct event_base *>(event_base_new());
  307. if (gearmand->base == NULL)
  308. {
  309. gearmand_fatal("event_base_new(NULL)");
  310. return GEARMAN_EVENT;
  311. }
  312. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Method for libevent: %s", event_base_get_method(gearmand->base));
  313. gearmand->ret= _listen_init(gearmand);
  314. if (gearmand->ret != GEARMAN_SUCCESS)
  315. {
  316. return gearmand->ret;
  317. }
  318. gearmand->ret= _wakeup_init(gearmand);
  319. if (gearmand->ret != GEARMAN_SUCCESS)
  320. {
  321. return gearmand->ret;
  322. }
  323. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Creating %u threads", gearmand->threads);
  324. /* If we have 0 threads we still need to create a fake one for context. */
  325. uint32_t x= 0;
  326. do
  327. {
  328. gearmand->ret= gearmand_thread_create(gearmand);
  329. if (gearmand->ret != GEARMAN_SUCCESS)
  330. return gearmand->ret;
  331. x++;
  332. }
  333. while (x < gearmand->threads);
  334. gearmand_debug("replaying queue: begin");
  335. gearmand->ret= gearman_server_queue_replay(gearmand->server);
  336. if (gearmand_failed(gearmand->ret))
  337. {
  338. return gearmand_gerror("failed to reload queue", gearmand->ret);
  339. }
  340. gearmand_debug("replaying queue: end");
  341. }
  342. gearmand->ret= _watch_events(gearmand);
  343. if (gearmand_failed(gearmand->ret))
  344. {
  345. return gearmand->ret;
  346. }
  347. gearmand_debug("Entering main event loop");
  348. if (event_base_loop(gearmand->base, 0) == -1)
  349. {
  350. gearmand_fatal("event_base_loop(-1)");
  351. return GEARMAN_EVENT;
  352. }
  353. gearmand_debug("Exited main event loop");
  354. return gearmand->ret;
  355. }
  356. void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
  357. {
  358. uint8_t buffer= wakeup;
  359. /* If this fails, there is not much we can really do. This should never fail
  360. though if the main gearmand thread is still active. */
  361. int limit= 5;
  362. ssize_t written;
  363. while (--limit)
  364. {
  365. if ((written= write(gearmand->wakeup_fd[1], &buffer, 1)) != 1)
  366. {
  367. if (written < 0)
  368. {
  369. switch (errno)
  370. {
  371. case EINTR:
  372. continue;
  373. default:
  374. break;
  375. }
  376. gearmand_perror(errno, gearmand_strwakeup(wakeup));
  377. }
  378. else
  379. {
  380. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  381. "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
  382. }
  383. }
  384. break;
  385. }
  386. }
  387. /*
  388. * Private definitions
  389. */
  390. gearmand_error_t set_socket(int& fd, struct addrinfo *addrinfo_next)
  391. {
  392. /* Call to socket() can fail for some getaddrinfo results, try another. */
  393. fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
  394. addrinfo_next->ai_protocol);
  395. if (fd == -1)
  396. {
  397. return gearmand_perror(errno, "socket()");
  398. }
  399. #ifdef IPV6_V6ONLY
  400. {
  401. int flags= 1;
  402. if (addrinfo_next->ai_family == AF_INET6)
  403. {
  404. flags= 1;
  405. if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &flags, sizeof(flags)) == -1)
  406. {
  407. return gearmand_perror(errno, "setsockopt(IPV6_V6ONLY)");
  408. }
  409. }
  410. }
  411. #endif
  412. if (0) // Add in when we have server working as a library again.
  413. {
  414. if (FD_CLOEXEC)
  415. {
  416. int flags;
  417. do
  418. {
  419. flags= fcntl(fd, F_GETFD, 0);
  420. } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
  421. if (flags != -1)
  422. {
  423. int rval;
  424. do
  425. {
  426. rval= fcntl (fd, F_SETFD, flags | FD_CLOEXEC);
  427. } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
  428. // we currently ignore the case where rval is -1
  429. }
  430. }
  431. }
  432. {
  433. int flags= 1;
  434. if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) == -1)
  435. {
  436. return gearmand_perror(errno, "setsockopt(SO_REUSEADDR)");
  437. }
  438. }
  439. {
  440. int flags= 1;
  441. if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
  442. {
  443. return gearmand_perror(errno, "setsockopt(SO_KEEPALIVE)");
  444. }
  445. }
  446. {
  447. struct linger ling= {0, 0};
  448. if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) == -1)
  449. {
  450. return gearmand_perror(errno, "setsockopt(SO_LINGER)");
  451. }
  452. }
  453. {
  454. int flags= 1;
  455. if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)) == -1)
  456. {
  457. return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
  458. }
  459. }
  460. return GEARMAN_SUCCESS;
  461. }
  462. static const uint32_t bind_timeout= 20; // Number is not special, but look at INFO messages if you decide to change it.
  463. typedef std::pair<std::string, std::string> host_port_t;
  464. static gearmand_error_t _listen_init(gearmand_st *gearmand)
  465. {
  466. for (uint32_t x= 0; x < gearmand->port_count; x++)
  467. {
  468. struct addrinfo hints;
  469. struct addrinfo *addrinfo;
  470. gearmand_port_st *port= &gearmand->port_list[x];
  471. memset(&hints, 0, sizeof(struct addrinfo));
  472. hints.ai_flags= AI_PASSIVE;
  473. hints.ai_socktype= SOCK_STREAM;
  474. {
  475. int ret= getaddrinfo(gearmand->host, port->port, &hints, &addrinfo);
  476. if (ret != 0)
  477. {
  478. char buffer[1024];
  479. int length= snprintf(buffer, sizeof(buffer), "%s:%s", gearmand->host ? gearmand->host : "<any>", port->port);
  480. if (length <= 0 or size_t(length) >= sizeof(buffer))
  481. {
  482. buffer[0]= 0;
  483. }
  484. return gearmand_gai_error(buffer, ret);
  485. }
  486. }
  487. std::set<host_port_t> unique_hosts;
  488. for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
  489. addrinfo_next= addrinfo_next->ai_next)
  490. {
  491. char host[NI_MAXHOST];
  492. {
  493. int ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
  494. NI_MAXHOST, port->port, NI_MAXSERV,
  495. NI_NUMERICHOST | NI_NUMERICSERV);
  496. if (ret != 0)
  497. {
  498. gearmand_gai_error("getaddrinfo", ret);
  499. strncpy(host, "-", sizeof(host));
  500. strncpy(port->port, "-", sizeof(port->port));
  501. }
  502. }
  503. std::string host_string(host);
  504. std::string port_string(port->port);
  505. host_port_t check= std::make_pair(host_string, port_string);
  506. if (unique_hosts.find(check) != unique_hosts.end())
  507. {
  508. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Already listening on %s:%s", host, port->port);
  509. continue;
  510. }
  511. unique_hosts.insert(check);
  512. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Trying to listen on %s:%s", host, port->port);
  513. /*
  514. @note logic for this pulled from Drizzle.
  515. Sometimes the port is not released fast enough when stopping and
  516. restarting the server. This happens quite often with the test suite
  517. on busy Linux systems. Retry to bind the address at these intervals:
  518. Sleep intervals: 1, 2, 4, 6, 9, 13, 17, 22, ...
  519. Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ...
  520. Limit the sequence by drizzled_bind_timeout.
  521. */
  522. uint32_t waited;
  523. uint32_t this_wait;
  524. uint32_t retry;
  525. int ret= -1;
  526. int fd;
  527. for (waited= 0, retry= 1; ; retry++, waited+= this_wait)
  528. {
  529. {
  530. gearmand_error_t socket_ret;
  531. if (gearmand_failed(socket_ret= set_socket(fd, addrinfo_next)))
  532. {
  533. gearmand_sockfd_close(fd);
  534. return socket_ret;
  535. }
  536. }
  537. errno= 0;
  538. if ((ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen)) == 0)
  539. {
  540. // Success
  541. break;
  542. }
  543. // Protect our error
  544. ret= errno;
  545. gearmand_sockfd_close(fd);
  546. if (waited >= bind_timeout)
  547. {
  548. return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Timeout occured when calling bind() for %s:%s", host, port->port);
  549. }
  550. if (ret != EADDRINUSE)
  551. {
  552. return gearmand_perror(ret, "bind");
  553. }
  554. this_wait= retry * retry / 3 + 1;
  555. // We are in single user threads, so strerror() is fine.
  556. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u + %u >= %u",
  557. strerror(ret), host, port->port,
  558. waited, this_wait, bind_timeout);
  559. struct timespec requested= { this_wait, 0 };
  560. nanosleep(&requested, NULL);
  561. }
  562. if (listen(fd, gearmand->backlog) == -1)
  563. {
  564. gearmand_perror(errno, "listen");
  565. gearmand_sockfd_close(fd);
  566. return GEARMAN_ERRNO;
  567. }
  568. // Scoping note for eventual transformation
  569. {
  570. int* fd_list= (int *)realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
  571. if (fd_list == NULL)
  572. {
  573. gearmand_perror(errno, "realloc");
  574. gearmand_sockfd_close(fd);
  575. return GEARMAN_ERRNO;
  576. }
  577. port->listen_fd= fd_list;
  578. }
  579. port->listen_fd[port->listen_count]= fd;
  580. port->listen_count++;
  581. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Listening on %s:%s (%d)", host, port->port, fd);
  582. }
  583. freeaddrinfo(addrinfo);
  584. /* Report last socket() error if we couldn't find an address to bind. */
  585. if (port->listen_fd == NULL)
  586. {
  587. gearmand_fatal("Could not bind/listen to any addresses");
  588. return GEARMAN_ERRNO;
  589. }
  590. port->listen_event= (struct event *)malloc(sizeof(struct event) * port->listen_count);
  591. if (port->listen_event == NULL)
  592. {
  593. return gearmand_merror("malloc", struct event, port->listen_count);
  594. }
  595. for (uint32_t y= 0; y < port->listen_count; y++)
  596. {
  597. event_set(&(port->listen_event[y]), port->listen_fd[y],
  598. EV_READ | EV_PERSIST, _listen_event, port);
  599. event_base_set(gearmand->base, &(port->listen_event[y]));
  600. }
  601. }
  602. return GEARMAN_SUCCESS;
  603. }
  604. static void _listen_close(gearmand_st *gearmand)
  605. {
  606. _listen_clear(gearmand);
  607. for (uint32_t x= 0; x < gearmand->port_count; x++)
  608. {
  609. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  610. {
  611. if (gearmand->port_list[x].listen_fd[y] >= 0)
  612. {
  613. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Closing listening socket (%d)", gearmand->port_list[x].listen_fd[y]);
  614. gearmand_sockfd_close(gearmand->port_list[x].listen_fd[y]);
  615. gearmand->port_list[x].listen_fd[y]= -1;
  616. }
  617. }
  618. }
  619. }
  620. static gearmand_error_t _listen_watch(gearmand_st *gearmand)
  621. {
  622. if (gearmand->is_listen_event)
  623. {
  624. return GEARMAN_SUCCESS;
  625. }
  626. for (uint32_t x= 0; x < gearmand->port_count; x++)
  627. {
  628. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  629. {
  630. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Adding event for listening socket (%d)",
  631. gearmand->port_list[x].listen_fd[y]);
  632. if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) < 0)
  633. {
  634. gearmand_perror(errno, "event_add");
  635. return GEARMAN_EVENT;
  636. }
  637. }
  638. }
  639. gearmand->is_listen_event= true;
  640. return GEARMAN_SUCCESS;
  641. }
  642. static void _listen_clear(gearmand_st *gearmand)
  643. {
  644. if (gearmand->is_listen_event)
  645. {
  646. for (uint32_t x= 0; x < gearmand->port_count; x++)
  647. {
  648. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  649. {
  650. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
  651. "Clearing event for listening socket (%d)",
  652. gearmand->port_list[x].listen_fd[y]);
  653. if (event_del(&(gearmand->port_list[x].listen_event[y])) < 0)
  654. {
  655. gearmand_perror(errno, "We tried to event_del() an event which no longer existed");
  656. assert(! "We tried to event_del() an event which no longer existed");
  657. }
  658. }
  659. }
  660. gearmand->is_listen_event= false;
  661. }
  662. }
  663. static void _listen_event(int event_fd, short events __attribute__ ((unused)), void *arg)
  664. {
  665. gearmand_port_st *port= (gearmand_port_st *)arg;
  666. struct sockaddr sa;
  667. socklen_t sa_len= sizeof(sa);
  668. int fd= -1;
  669. #if defined(HAVE_ACCEPT4) && HAVE_ACCEPT4
  670. fd= accept4(event_fd, &sa, &sa_len, 0); // SOCK_NONBLOCK);
  671. #endif
  672. if (fd == -1)
  673. {
  674. fd= accept(event_fd, &sa, &sa_len);
  675. }
  676. if (fd == -1)
  677. {
  678. int local_error= errno;
  679. switch (local_error)
  680. {
  681. case EINTR:
  682. return;
  683. case ECONNABORTED:
  684. case EMFILE:
  685. gearmand_perror(local_error, "accept");
  686. return;
  687. default:
  688. break;
  689. }
  690. _clear_events(Gearmand());
  691. Gearmand()->ret= gearmand_perror(local_error, "accept");
  692. return;
  693. }
  694. /*
  695. Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
  696. */
  697. char host[NI_MAXHOST];
  698. char port_str[NI_MAXSERV];
  699. int error= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
  700. NI_NUMERICHOST | NI_NUMERICSERV);
  701. if (error != 0)
  702. {
  703. gearmand_gai_error("getnameinfo", error);
  704. strncpy(host, "-", sizeof(host));
  705. strncpy(port_str, "-", sizeof(port_str));
  706. }
  707. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Accepted connection from %s:%s", host, port_str);
  708. gearmand_error_t ret;
  709. ret= gearmand_con_create(Gearmand(), fd, host, port_str, port->add_fn);
  710. if (ret == GEARMAN_MEMORY_ALLOCATION_FAILURE)
  711. {
  712. gearmand_sockfd_close(fd);
  713. return;
  714. }
  715. else if (ret != GEARMAN_SUCCESS)
  716. {
  717. Gearmand()->ret= ret;
  718. _clear_events(Gearmand());
  719. }
  720. }
  721. static gearmand_error_t _wakeup_init(gearmand_st *gearmand)
  722. {
  723. gearmand_debug("Creating wakeup pipe");
  724. if (pipe(gearmand->wakeup_fd) < 0)
  725. {
  726. gearmand_perror(errno, "pipe");
  727. return GEARMAN_ERRNO;
  728. }
  729. int returned_flags;
  730. if ((returned_flags= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0)) < 0)
  731. {
  732. gearmand_perror(errno, "fcntl:F_GETFL");
  733. return GEARMAN_ERRNO;
  734. }
  735. if (fcntl(gearmand->wakeup_fd[0], F_SETFL, returned_flags | O_NONBLOCK) < 0)
  736. {
  737. gearmand_perror(errno, "F_SETFL");
  738. return GEARMAN_ERRNO;
  739. }
  740. event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
  741. EV_READ | EV_PERSIST, _wakeup_event, gearmand);
  742. event_base_set(gearmand->base, &(gearmand->wakeup_event));
  743. return GEARMAN_SUCCESS;
  744. }
  745. static void _wakeup_close(gearmand_st *gearmand)
  746. {
  747. _wakeup_clear(gearmand);
  748. if (gearmand->wakeup_fd[0] >= 0)
  749. {
  750. gearmand_debug("Closing wakeup pipe");
  751. gearmand_pipe_close(gearmand->wakeup_fd[0]);
  752. gearmand->wakeup_fd[0]= -1;
  753. gearmand_pipe_close(gearmand->wakeup_fd[1]);
  754. gearmand->wakeup_fd[1]= -1;
  755. }
  756. }
  757. static gearmand_error_t _wakeup_watch(gearmand_st *gearmand)
  758. {
  759. if (gearmand->is_wakeup_event)
  760. {
  761. return GEARMAN_SUCCESS;
  762. }
  763. gearmand_debug("Adding event for wakeup pipe");
  764. if (event_add(&(gearmand->wakeup_event), NULL) < 0)
  765. {
  766. gearmand_perror(errno, "event_add");
  767. return GEARMAN_EVENT;
  768. }
  769. gearmand->is_wakeup_event= true;
  770. return GEARMAN_SUCCESS;
  771. }
  772. static void _wakeup_clear(gearmand_st *gearmand)
  773. {
  774. if (gearmand->is_wakeup_event)
  775. {
  776. gearmand_debug("Clearing event for wakeup pipe");
  777. if (event_del(&(gearmand->wakeup_event)) < 0)
  778. {
  779. gearmand_perror(errno, "We tried to event_del() an event which no longer existed");
  780. assert(! "We tried to event_del() an event which no longer existed");
  781. }
  782. gearmand->is_wakeup_event= false;
  783. }
  784. }
  785. static void _wakeup_event(int fd, short, void *arg)
  786. {
  787. gearmand_st *gearmand= (gearmand_st *)arg;
  788. while (1)
  789. {
  790. uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
  791. ssize_t ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
  792. if (ret == 0)
  793. {
  794. _clear_events(gearmand);
  795. gearmand_fatal("read(EOF)");
  796. gearmand->ret= GEARMAN_PIPE_EOF;
  797. return;
  798. }
  799. else if (ret == -1)
  800. {
  801. int local_error= errno;
  802. if (local_error == EINTR)
  803. {
  804. continue;
  805. }
  806. if (local_error == EAGAIN)
  807. {
  808. break;
  809. }
  810. _clear_events(gearmand);
  811. gearmand_perror(local_error, "_wakeup_event:read");
  812. gearmand->ret= GEARMAN_ERRNO;
  813. return;
  814. }
  815. for (ssize_t x= 0; x < ret; x++)
  816. {
  817. switch ((gearmand_wakeup_t)buffer[x])
  818. {
  819. case GEARMAND_WAKEUP_PAUSE:
  820. gearmand_debug("Received PAUSE wakeup event");
  821. _clear_events(gearmand);
  822. gearmand->ret= GEARMAN_PAUSE;
  823. break;
  824. case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
  825. gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
  826. _listen_close(gearmand);
  827. for (gearmand_thread_st* thread= gearmand->thread_list;
  828. thread != NULL;
  829. thread= thread->next)
  830. {
  831. gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
  832. }
  833. gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
  834. break;
  835. case GEARMAND_WAKEUP_SHUTDOWN:
  836. gearmand_debug("Received SHUTDOWN wakeup event");
  837. _clear_events(gearmand);
  838. gearmand->ret= GEARMAN_SHUTDOWN;
  839. break;
  840. case GEARMAND_WAKEUP_CON:
  841. case GEARMAND_WAKEUP_RUN:
  842. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
  843. _clear_events(gearmand);
  844. gearmand->ret= GEARMAN_UNKNOWN_STATE;
  845. break;
  846. }
  847. }
  848. }
  849. }
  850. static gearmand_error_t _watch_events(gearmand_st *gearmand)
  851. {
  852. gearmand_error_t ret= _listen_watch(gearmand);
  853. if (ret != GEARMAN_SUCCESS)
  854. {
  855. return ret;
  856. }
  857. ret= _wakeup_watch(gearmand);
  858. if (ret != GEARMAN_SUCCESS)
  859. {
  860. return ret;
  861. }
  862. return GEARMAN_SUCCESS;
  863. }
  864. static void _clear_events(gearmand_st *gearmand)
  865. {
  866. _listen_clear(gearmand);
  867. _wakeup_clear(gearmand);
  868. /*
  869. If we are not threaded, tell the fake thread to shutdown now to clear
  870. connections. Otherwise we will never exit the libevent loop.
  871. */
  872. if (gearmand->threads == 0 && gearmand->thread_list != NULL)
  873. {
  874. gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
  875. }
  876. }
  877. static void _close_events(gearmand_st *gearmand)
  878. {
  879. _listen_close(gearmand);
  880. _wakeup_close(gearmand);
  881. }
  882. /** @} */
  883. /*
  884. * Public Definitions
  885. */
  886. const char *gearmand_version(void)
  887. {
  888. return PACKAGE_VERSION;
  889. }
  890. const char *gearmand_bugreport(void)
  891. {
  892. return PACKAGE_BUGREPORT;
  893. }
  894. const char *gearmand_verbose_name(gearmand_verbose_t verbose)
  895. {
  896. switch (verbose)
  897. {
  898. case GEARMAND_VERBOSE_FATAL:
  899. return "FATAL";
  900. case GEARMAND_VERBOSE_ALERT:
  901. return "ALERT";
  902. case GEARMAND_VERBOSE_CRITICAL:
  903. return "CRITICAL";
  904. case GEARMAND_VERBOSE_ERROR:
  905. return "ERROR";
  906. case GEARMAND_VERBOSE_WARN:
  907. return "WARNING";
  908. case GEARMAND_VERBOSE_NOTICE:
  909. return "NOTICE";
  910. case GEARMAND_VERBOSE_INFO:
  911. return "INFO";
  912. case GEARMAND_VERBOSE_DEBUG:
  913. return "DEBUG";
  914. default:
  915. break;
  916. }
  917. return "UNKNOWN";
  918. }
  919. bool gearmand_verbose_check(const char *name, gearmand_verbose_t& level)
  920. {
  921. bool success= true;
  922. if (strcmp("FATAL", name) == 0)
  923. {
  924. level= GEARMAND_VERBOSE_FATAL;
  925. }
  926. else if (strcmp("ALERT", name) == 0)
  927. {
  928. level= GEARMAND_VERBOSE_ALERT;
  929. }
  930. else if (strcmp("CRITICAL", name) == 0)
  931. {
  932. level= GEARMAND_VERBOSE_CRITICAL;
  933. }
  934. else if (strcmp("ERROR", name) == 0)
  935. {
  936. level= GEARMAND_VERBOSE_ERROR;
  937. }
  938. else if (strcmp("WARNING", name) == 0)
  939. {
  940. level= GEARMAND_VERBOSE_WARN;
  941. }
  942. else if (strcmp("NOTICE", name) == 0)
  943. {
  944. level= GEARMAND_VERBOSE_NOTICE;
  945. }
  946. else if (strcmp("INFO", name) == 0)
  947. {
  948. level= GEARMAND_VERBOSE_INFO;
  949. }
  950. else if (strcmp("DEBUG", name) == 0)
  951. {
  952. level= GEARMAND_VERBOSE_DEBUG;
  953. }
  954. else
  955. {
  956. success= false;
  957. }
  958. return success;
  959. }
  960. static bool gearman_server_create(gearman_server_st& server,
  961. uint8_t job_retries_arg,
  962. uint8_t worker_wakeup_arg,
  963. bool round_robin_arg)
  964. {
  965. struct utsname un;
  966. server.state.queue_startup= false;
  967. server.flags.round_robin= round_robin_arg;
  968. server.flags.threaded= false;
  969. server.shutdown= false;
  970. server.shutdown_graceful= false;
  971. server.proc_wakeup= false;
  972. server.proc_shutdown= false;
  973. server.job_retries= job_retries_arg;
  974. server.worker_wakeup= worker_wakeup_arg;
  975. server.thread_count= 0;
  976. server.free_packet_count= 0;
  977. server.function_count= 0;
  978. server.job_count= 0;
  979. server.unique_count= 0;
  980. server.free_job_count= 0;
  981. server.free_client_count= 0;
  982. server.free_worker_count= 0;
  983. server.thread_list= NULL;
  984. server.free_packet_list= NULL;
  985. server.function_list= NULL;
  986. server.free_job_list= NULL;
  987. server.free_client_list= NULL;
  988. server.free_worker_list= NULL;
  989. server.queue_version= QUEUE_VERSION_NONE;
  990. server.queue.object= NULL;
  991. server.queue.functions= NULL;
  992. memset(server.job_hash, 0,
  993. sizeof(gearman_server_job_st *) * GEARMAND_JOB_HASH_SIZE);
  994. memset(server.unique_hash, 0,
  995. sizeof(gearman_server_job_st *) * GEARMAND_JOB_HASH_SIZE);
  996. if (uname(&un) == -1)
  997. {
  998. gearman_server_free(server);
  999. return false;
  1000. }
  1001. int checked_length= snprintf(server.job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "H:%s", un.nodename);
  1002. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE or checked_length <= 0)
  1003. {
  1004. gearman_server_free(server);
  1005. return false;
  1006. }
  1007. server.job_handle_count= 1;
  1008. return true;
  1009. }