gearmand.cc 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Gearmand Definitions
  11. */
  12. #include <libgearman-server/common.h>
  13. #include <errno.h>
  14. #include <netdb.h>
  15. #include <sys/socket.h>
  16. #include <sys/types.h>
  17. #include <sys/utsname.h>
  18. #include <set>
  19. #include <string>
  20. #include <libgearman-server/gearmand.h>
  21. #include <libgearman-server/struct/port.h>
  22. #include <libgearman-server/plugins.h>
  23. using namespace gearmand;
  24. /*
  25. * Private declarations
  26. */
  27. /**
  28. * @addtogroup gearmand_private Private Gearman Daemon Functions
  29. * @ingroup gearmand
  30. * @{
  31. */
  32. static gearmand_error_t _listen_init(gearmand_st *gearmand);
  33. static void _listen_close(gearmand_st *gearmand);
  34. static gearmand_error_t _listen_watch(gearmand_st *gearmand);
  35. static void _listen_clear(gearmand_st *gearmand);
  36. static void _listen_event(int fd, short events, void *arg);
  37. static gearmand_error_t _wakeup_init(gearmand_st *gearmand);
  38. static void _wakeup_close(gearmand_st *gearmand);
  39. static gearmand_error_t _wakeup_watch(gearmand_st *gearmand);
  40. static void _wakeup_clear(gearmand_st *gearmand);
  41. static void _wakeup_event(int fd, short events, void *arg);
  42. static gearmand_error_t _watch_events(gearmand_st *gearmand);
  43. static void _clear_events(gearmand_st *gearmand);
  44. static void _close_events(gearmand_st *gearmand);
  45. static bool gearman_server_create(gearman_server_st *server,
  46. uint8_t job_retries,
  47. uint8_t worker_wakeup,
  48. bool round_robin);
  49. static void gearman_server_free(gearman_server_st *server);
  50. static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
  51. void *context, const gearmand_verbose_t verbose);
  52. /** @} */
  53. #ifndef __INTEL_COMPILER
  54. #pragma GCC diagnostic ignored "-Wold-style-cast"
  55. #endif
  56. /*
  57. * Public definitions
  58. */
  59. static gearmand_st *_global_gearmand= NULL;
  60. gearmand_st *Gearmand(void)
  61. {
  62. if (!_global_gearmand)
  63. {
  64. gearmand_error("Gearmand() was called before it was allocated");
  65. assert(! "Gearmand() was called before it was allocated");
  66. }
  67. assert(_global_gearmand);
  68. return _global_gearmand;
  69. }
  70. gearmand_st *gearmand_create(const char *host_arg,
  71. const char *port,
  72. uint32_t threads_arg,
  73. int backlog_arg,
  74. uint8_t job_retries,
  75. uint8_t worker_wakeup,
  76. gearmand_log_fn *log_function, void *log_context, const gearmand_verbose_t verbose_arg,
  77. bool round_robin)
  78. {
  79. gearmand_st *gearmand;
  80. assert(_global_gearmand == NULL);
  81. if (_global_gearmand)
  82. {
  83. gearmand_error("You have called gearmand_create() twice within your application.");
  84. _exit(EXIT_FAILURE);
  85. }
  86. gearmand= (gearmand_st *)malloc(sizeof(gearmand_st));
  87. if (gearmand == NULL)
  88. {
  89. gearmand_merror("malloc", gearmand_st, 0);
  90. return NULL;
  91. }
  92. if (! gearman_server_create(&(gearmand->server), job_retries, worker_wakeup, round_robin))
  93. {
  94. gearmand_debug("free");
  95. free(gearmand);
  96. return NULL;
  97. }
  98. gearmand->is_listen_event= false;
  99. gearmand->is_wakeup_event= false;
  100. gearmand->verbose= verbose_arg;
  101. gearmand->timeout= -1;
  102. gearmand->ret= GEARMAN_SUCCESS;
  103. gearmand->backlog= backlog_arg;
  104. gearmand->threads= threads_arg;
  105. gearmand->port_count= 0;
  106. gearmand->thread_count= 0;
  107. gearmand->free_dcon_count= 0;
  108. gearmand->max_thread_free_dcon_count= 0;
  109. gearmand->wakeup_fd[0]= -1;
  110. gearmand->wakeup_fd[1]= -1;
  111. gearmand->host= host_arg;
  112. gearmand->log_fn= NULL;
  113. gearmand->log_context= NULL;
  114. gearmand->base= NULL;
  115. gearmand->port_list= NULL;
  116. gearmand->thread_list= NULL;
  117. gearmand->thread_add_next= NULL;
  118. gearmand->free_dcon_list= NULL;
  119. gearmand_error_t rc;
  120. if (port && port[0] == 0)
  121. {
  122. struct servent *gearman_servent= getservbyname(GEARMAN_DEFAULT_TCP_SERVICE, NULL);
  123. if (gearman_servent && gearman_servent->s_name)
  124. {
  125. rc= gearmand_port_add(gearmand, gearman_servent->s_name, NULL);
  126. }
  127. else
  128. {
  129. rc= gearmand_port_add(gearmand, GEARMAN_DEFAULT_TCP_PORT_STRING, NULL);
  130. }
  131. }
  132. else
  133. {
  134. rc= gearmand_port_add(gearmand, port, NULL);
  135. }
  136. if (rc != GEARMAN_SUCCESS)
  137. {
  138. gearmand_free(gearmand);
  139. return NULL;
  140. }
  141. _global_gearmand= gearmand;
  142. gearmand_set_log_fn(gearmand, log_function, log_context, verbose_arg);
  143. return gearmand;
  144. }
  145. void gearmand_free(gearmand_st *gearmand)
  146. {
  147. _close_events(gearmand);
  148. if (gearmand->threads > 0)
  149. {
  150. gearmand_debug("Shutting down all threads");
  151. }
  152. while (gearmand->thread_list != NULL)
  153. {
  154. gearmand_thread_free(gearmand->thread_list);
  155. }
  156. while (gearmand->free_dcon_list != NULL)
  157. {
  158. gearmand_con_st *dcon;
  159. dcon= gearmand->free_dcon_list;
  160. gearmand->free_dcon_list= dcon->next;
  161. gearmand_debug("free");
  162. free(dcon);
  163. }
  164. if (gearmand->base != NULL)
  165. {
  166. event_base_free(gearmand->base);
  167. }
  168. gearman_server_free(&(gearmand->server));
  169. for (uint32_t x= 0; x < gearmand->port_count; x++)
  170. {
  171. if (gearmand->port_list[x].listen_fd != NULL)
  172. {
  173. gearmand_debug("free");
  174. free(gearmand->port_list[x].listen_fd);
  175. }
  176. if (gearmand->port_list[x].listen_event != NULL)
  177. {
  178. gearmand_debug("free");
  179. free(gearmand->port_list[x].listen_event);
  180. }
  181. }
  182. if (gearmand->port_list != NULL)
  183. {
  184. gearmand_debug("free");
  185. free(gearmand->port_list);
  186. }
  187. gearmand_info("Shutdown complete");
  188. gearmand_debug("free");
  189. free(gearmand);
  190. }
  191. static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
  192. void *context, const gearmand_verbose_t verbose)
  193. {
  194. gearmand->log_fn= function;
  195. gearmand->log_context= context;
  196. gearmand->verbose= verbose;
  197. }
  198. gearmand_error_t gearmand_port_add(gearmand_st *gearmand, const char *port,
  199. gearmand_connection_add_fn *function)
  200. {
  201. gearmand_port_st *port_list;
  202. port_list= (gearmand_port_st *)realloc(gearmand->port_list,
  203. sizeof(gearmand_port_st) * (gearmand->port_count + 1));
  204. if (port_list == NULL)
  205. {
  206. gearmand_perror("realloc");
  207. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  208. }
  209. strncpy(port_list[gearmand->port_count].port, port, NI_MAXSERV);
  210. port_list[gearmand->port_count].listen_count= 0;
  211. port_list[gearmand->port_count].add_fn= function;
  212. port_list[gearmand->port_count].listen_fd= NULL;
  213. port_list[gearmand->port_count].listen_event= NULL;
  214. gearmand->port_list= port_list;
  215. gearmand->port_count++;
  216. return GEARMAN_SUCCESS;
  217. }
  218. gearman_server_st *gearmand_server(gearmand_st *gearmand)
  219. {
  220. return &gearmand->server;
  221. }
  222. gearmand_error_t gearmand_run(gearmand_st *gearmand)
  223. {
  224. uint32_t x;
  225. /* Initialize server components. */
  226. if (gearmand->base == NULL)
  227. {
  228. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up, verbose set to %s",
  229. gearmand_verbose_name(gearmand->verbose));
  230. if (gearmand->threads > 0)
  231. {
  232. /* Set the number of free connection structures each thread should keep
  233. around before the main thread is forced to take them. We compute this
  234. here so we don't need to on every new connection. */
  235. gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
  236. gearmand->threads) / 2);
  237. }
  238. gearmand_debug("Initializing libevent for main thread");
  239. gearmand->base= static_cast<struct event_base *>(event_base_new());
  240. if (gearmand->base == NULL)
  241. {
  242. gearmand_fatal("event_base_new(NULL)");
  243. return GEARMAN_EVENT;
  244. }
  245. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Method for libevent: %s", event_base_get_method(gearmand->base));
  246. gearmand->ret= _listen_init(gearmand);
  247. if (gearmand->ret != GEARMAN_SUCCESS)
  248. return gearmand->ret;
  249. gearmand->ret= _wakeup_init(gearmand);
  250. if (gearmand->ret != GEARMAN_SUCCESS)
  251. return gearmand->ret;
  252. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Creating %u threads", gearmand->threads);
  253. /* If we have 0 threads we still need to create a fake one for context. */
  254. x= 0;
  255. do
  256. {
  257. gearmand->ret= gearmand_thread_create(gearmand);
  258. if (gearmand->ret != GEARMAN_SUCCESS)
  259. return gearmand->ret;
  260. x++;
  261. }
  262. while (x < gearmand->threads);
  263. gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
  264. if (gearmand->ret != GEARMAN_SUCCESS)
  265. return gearmand->ret;
  266. }
  267. gearmand->ret= _watch_events(gearmand);
  268. if (gearmand->ret != GEARMAN_SUCCESS)
  269. return gearmand->ret;
  270. gearmand_debug("Entering main event loop");
  271. if (event_base_loop(gearmand->base, 0) == -1)
  272. {
  273. gearmand_fatal("event_base_loop(-1)");
  274. return GEARMAN_EVENT;
  275. }
  276. gearmand_debug("Exited main event loop");
  277. return gearmand->ret;
  278. }
  279. void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
  280. {
  281. uint8_t buffer= wakeup;
  282. /* If this fails, there is not much we can really do. This should never fail
  283. though if the main gearmand thread is still active. */
  284. ssize_t written;
  285. if ((written= write(gearmand->wakeup_fd[1], &buffer, 1)) != 1)
  286. {
  287. if (written < 0)
  288. {
  289. gearmand_perror(gearmand_strwakeup(wakeup));
  290. }
  291. else
  292. {
  293. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  294. "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
  295. }
  296. }
  297. }
  298. /*
  299. * Private definitions
  300. */
  301. static const uint32_t bind_timeout= 6; // Number is not special, but look at INFO messages if you decide to change it.
  302. typedef std::pair<std::string, std::string> host_port_t;
  303. static gearmand_error_t _listen_init(gearmand_st *gearmand)
  304. {
  305. for (uint32_t x= 0; x < gearmand->port_count; x++)
  306. {
  307. struct linger ling= {0, 0};
  308. struct gearmand_port_st *port;
  309. struct addrinfo hints;
  310. struct addrinfo *addrinfo;
  311. port= &gearmand->port_list[x];
  312. memset(&hints, 0, sizeof(struct addrinfo));
  313. hints.ai_flags= AI_PASSIVE;
  314. hints.ai_socktype= SOCK_STREAM;
  315. int ret= getaddrinfo(gearmand->host, port->port, &hints, &addrinfo);
  316. if (ret != 0)
  317. {
  318. char buffer[1024];
  319. snprintf(buffer, sizeof(buffer), "%s:%s", gearmand->host ? gearmand->host : "<any>", port->port);
  320. gearmand_gai_error(buffer, ret);
  321. return GEARMAN_ERRNO;
  322. }
  323. std::set<host_port_t> unique_hosts;
  324. for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
  325. addrinfo_next= addrinfo_next->ai_next)
  326. {
  327. int fd;
  328. char host[NI_MAXHOST];
  329. ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
  330. NI_MAXHOST, port->port, NI_MAXSERV,
  331. NI_NUMERICHOST | NI_NUMERICSERV);
  332. if (ret != 0)
  333. {
  334. gearmand_gai_error("getaddrinfo", ret);
  335. strcpy(host, "-");
  336. strcpy(port->port, "-");
  337. }
  338. std::string host_string(host);
  339. std::string port_string(port->port);
  340. host_port_t check= std::make_pair(host_string, port_string);
  341. if (unique_hosts.find(check) != unique_hosts.end())
  342. {
  343. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Already listening on %s:%s", host, port->port);
  344. continue;
  345. }
  346. unique_hosts.insert(check);
  347. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Trying to listen on %s:%s", host, port->port);
  348. /* Call to socket() can fail for some getaddrinfo results, try another. */
  349. fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
  350. addrinfo_next->ai_protocol);
  351. if (fd == -1)
  352. {
  353. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to listen on %s:%s", host, port->port);
  354. continue;
  355. }
  356. int flags= 1;
  357. #ifdef IPV6_V6ONLY
  358. if (addrinfo_next->ai_family == AF_INET6)
  359. {
  360. flags= 1;
  361. ret= setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &flags, sizeof(flags));
  362. if (ret != 0)
  363. {
  364. gearmand_perror("setsockopt(IPV6_V6ONLY)");
  365. return GEARMAN_ERRNO;
  366. }
  367. }
  368. #endif
  369. ret= fcntl(fd, F_SETFD, FD_CLOEXEC);
  370. if (ret != 0 || !(fcntl(fd, F_GETFD, 0) & FD_CLOEXEC))
  371. {
  372. gearmand_perror("fcntl(FD_CLOEXEC)");
  373. return GEARMAN_ERRNO;
  374. }
  375. ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
  376. if (ret != 0)
  377. {
  378. gearmand_perror("setsockopt(SO_REUSEADDR)");
  379. return GEARMAN_ERRNO;
  380. }
  381. ret= setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
  382. if (ret != 0)
  383. {
  384. gearmand_perror("setsockopt(SO_KEEPALIVE)");
  385. return GEARMAN_ERRNO;
  386. }
  387. ret= setsockopt(fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
  388. if (ret != 0)
  389. {
  390. gearmand_perror("setsockopt(SO_LINGER)");
  391. return GEARMAN_ERRNO;
  392. }
  393. ret= setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
  394. if (ret != 0)
  395. {
  396. gearmand_perror("setsockopt(TCP_NODELAY)");
  397. return GEARMAN_ERRNO;
  398. }
  399. /*
  400. @note logic for this pulled from Drizzle.
  401. Sometimes the port is not released fast enough when stopping and
  402. restarting the server. This happens quite often with the test suite
  403. on busy Linux systems. Retry to bind the address at these intervals:
  404. Sleep intervals: 1, 2, 4, 6, 9, 13, 17, 22, ...
  405. Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ...
  406. Limit the sequence by drizzled_bind_timeout.
  407. */
  408. uint32_t waited;
  409. uint32_t this_wait;
  410. uint32_t retry;
  411. for (waited= 0, retry= 1; ; retry++, waited+= this_wait)
  412. {
  413. if (((ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen)) == 0) or
  414. (errno != EADDRINUSE) || (waited >= bind_timeout))
  415. {
  416. break;
  417. }
  418. // We are in single user threads, so strerror() is fine.
  419. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u >= %u", strerror(ret), host, port->port,
  420. waited, bind_timeout);
  421. this_wait= retry * retry / 3 + 1;
  422. sleep(this_wait);
  423. }
  424. if (ret < 0)
  425. {
  426. gearmand_perror("bind");
  427. gearmand_sockfd_close(fd);
  428. if (errno == EADDRINUSE)
  429. {
  430. if (not port->listen_fd)
  431. {
  432. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Address already in use %s:%s", host, port->port);
  433. }
  434. continue;
  435. }
  436. gearmand_perror("bind");
  437. return GEARMAN_ERRNO;
  438. }
  439. if (listen(fd, gearmand->backlog) == -1)
  440. {
  441. gearmand_perror("listen");
  442. gearmand_sockfd_close(fd);
  443. return GEARMAN_ERRNO;
  444. }
  445. // Scoping note for eventual transformation
  446. {
  447. int *fd_list;
  448. fd_list= (int *)realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
  449. if (fd_list == NULL)
  450. {
  451. gearmand_perror("realloc");
  452. gearmand_sockfd_close(fd);
  453. return GEARMAN_ERRNO;
  454. }
  455. port->listen_fd= fd_list;
  456. }
  457. port->listen_fd[port->listen_count]= fd;
  458. port->listen_count++;
  459. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Listening on %s:%s (%d)", host, port->port, fd);
  460. }
  461. freeaddrinfo(addrinfo);
  462. /* Report last socket() error if we couldn't find an address to bind. */
  463. if (port->listen_fd == NULL)
  464. {
  465. gearmand_fatal("Could not bind/listen to any addresses");
  466. return GEARMAN_ERRNO;
  467. }
  468. port->listen_event= (struct event *)malloc(sizeof(struct event) * port->listen_count);
  469. if (port->listen_event == NULL)
  470. {
  471. return gearmand_merror("malloc", struct event, port->listen_count);
  472. }
  473. for (uint32_t y= 0; y < port->listen_count; y++)
  474. {
  475. event_set(&(port->listen_event[y]), port->listen_fd[y],
  476. EV_READ | EV_PERSIST, _listen_event, port);
  477. event_base_set(gearmand->base, &(port->listen_event[y]));
  478. }
  479. }
  480. return GEARMAN_SUCCESS;
  481. }
  482. static void _listen_close(gearmand_st *gearmand)
  483. {
  484. _listen_clear(gearmand);
  485. for (uint32_t x= 0; x < gearmand->port_count; x++)
  486. {
  487. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  488. {
  489. if (gearmand->port_list[x].listen_fd[y] >= 0)
  490. {
  491. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Closing listening socket (%d)", gearmand->port_list[x].listen_fd[y]);
  492. gearmand_sockfd_close(gearmand->port_list[x].listen_fd[y]);
  493. gearmand->port_list[x].listen_fd[y]= -1;
  494. }
  495. }
  496. }
  497. }
  498. static gearmand_error_t _listen_watch(gearmand_st *gearmand)
  499. {
  500. if (gearmand->is_listen_event)
  501. return GEARMAN_SUCCESS;
  502. for (uint32_t x= 0; x < gearmand->port_count; x++)
  503. {
  504. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  505. {
  506. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Adding event for listening socket (%d)",
  507. gearmand->port_list[x].listen_fd[y]);
  508. if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) < 0)
  509. {
  510. gearmand_perror("event_add");
  511. return GEARMAN_EVENT;
  512. }
  513. }
  514. }
  515. gearmand->is_listen_event= true;
  516. return GEARMAN_SUCCESS;
  517. }
  518. static void _listen_clear(gearmand_st *gearmand)
  519. {
  520. if (! (gearmand->is_listen_event))
  521. return;
  522. for (uint32_t x= 0; x < gearmand->port_count; x++)
  523. {
  524. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  525. {
  526. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
  527. "Clearing event for listening socket (%d)",
  528. gearmand->port_list[x].listen_fd[y]);
  529. if (event_del(&(gearmand->port_list[x].listen_event[y])) < 0)
  530. {
  531. gearmand_perror("We tried to event_del() an event which no longer existed");
  532. assert(! "We tried to event_del() an event which no longer existed");
  533. }
  534. }
  535. }
  536. gearmand->is_listen_event= false;
  537. }
  538. static void _listen_event(int fd, short events __attribute__ ((unused)), void *arg)
  539. {
  540. gearmand_port_st *port= (gearmand_port_st *)arg;
  541. struct sockaddr sa;
  542. socklen_t sa_len;
  543. char host[NI_MAXHOST];
  544. char port_str[NI_MAXSERV];
  545. sa_len= sizeof(sa);
  546. fd= accept(fd, &sa, &sa_len);
  547. if (fd == -1)
  548. {
  549. if (errno == EINTR)
  550. {
  551. return;
  552. }
  553. else if (errno == EMFILE)
  554. {
  555. gearmand_perror("accept");
  556. return;
  557. }
  558. _clear_events(Gearmand());
  559. gearmand_perror("accept");
  560. Gearmand()->ret= GEARMAN_ERRNO;
  561. return;
  562. }
  563. /*
  564. Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
  565. */
  566. int error;
  567. error= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
  568. NI_NUMERICHOST | NI_NUMERICSERV);
  569. if (error != 0)
  570. {
  571. gearmand_gai_error("getnameinfo", error);
  572. strcpy(host, "-");
  573. strcpy(port_str, "-");
  574. }
  575. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Accepted connection from %s:%s", host, port_str);
  576. gearmand_error_t ret;
  577. ret= gearmand_con_create(Gearmand(), fd, host, port_str, port->add_fn);
  578. if (ret == GEARMAN_MEMORY_ALLOCATION_FAILURE)
  579. {
  580. gearmand_sockfd_close(fd);
  581. return;
  582. }
  583. else if (ret != GEARMAN_SUCCESS)
  584. {
  585. Gearmand()->ret= ret;
  586. _clear_events(Gearmand());
  587. }
  588. }
  589. static gearmand_error_t _wakeup_init(gearmand_st *gearmand)
  590. {
  591. gearmand_debug("Creating wakeup pipe");
  592. if (pipe(gearmand->wakeup_fd) < 0)
  593. {
  594. gearmand_perror("pipe");
  595. return GEARMAN_ERRNO;
  596. }
  597. int returned_flags;
  598. if ((returned_flags= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0)) < 0)
  599. {
  600. gearmand_perror("fcntl:F_GETFL");
  601. return GEARMAN_ERRNO;
  602. }
  603. if (fcntl(gearmand->wakeup_fd[0], F_SETFL, returned_flags | O_NONBLOCK) < 0)
  604. {
  605. gearmand_perror("F_SETFL");
  606. return GEARMAN_ERRNO;
  607. }
  608. event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
  609. EV_READ | EV_PERSIST, _wakeup_event, gearmand);
  610. event_base_set(gearmand->base, &(gearmand->wakeup_event));
  611. return GEARMAN_SUCCESS;
  612. }
  613. static void _wakeup_close(gearmand_st *gearmand)
  614. {
  615. _wakeup_clear(gearmand);
  616. if (gearmand->wakeup_fd[0] >= 0)
  617. {
  618. gearmand_debug("Closing wakeup pipe");
  619. gearmand_pipe_close(gearmand->wakeup_fd[0]);
  620. gearmand->wakeup_fd[0]= -1;
  621. gearmand_pipe_close(gearmand->wakeup_fd[1]);
  622. gearmand->wakeup_fd[1]= -1;
  623. }
  624. }
  625. static gearmand_error_t _wakeup_watch(gearmand_st *gearmand)
  626. {
  627. if (gearmand->is_wakeup_event)
  628. return GEARMAN_SUCCESS;
  629. gearmand_debug("Adding event for wakeup pipe");
  630. if (event_add(&(gearmand->wakeup_event), NULL) < 0)
  631. {
  632. gearmand_perror("event_add");
  633. return GEARMAN_EVENT;
  634. }
  635. gearmand->is_wakeup_event= true;
  636. return GEARMAN_SUCCESS;
  637. }
  638. static void _wakeup_clear(gearmand_st *gearmand)
  639. {
  640. if (gearmand->is_wakeup_event)
  641. {
  642. gearmand_debug("Clearing event for wakeup pipe");
  643. if (event_del(&(gearmand->wakeup_event)) < 0)
  644. {
  645. gearmand_perror("We tried to event_del() an event which no longer existed");
  646. assert(! "We tried to event_del() an event which no longer existed");
  647. }
  648. gearmand->is_wakeup_event= false;
  649. }
  650. }
  651. static void _wakeup_event(int fd, short events __attribute__ ((unused)),
  652. void *arg)
  653. {
  654. gearmand_st *gearmand= (gearmand_st *)arg;
  655. uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
  656. gearmand_thread_st *thread;
  657. while (1)
  658. {
  659. ssize_t ret;
  660. ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
  661. if (ret == 0)
  662. {
  663. _clear_events(gearmand);
  664. gearmand_fatal("read(EOF)");
  665. gearmand->ret= GEARMAN_PIPE_EOF;
  666. return;
  667. }
  668. else if (ret == -1)
  669. {
  670. if (errno == EINTR)
  671. continue;
  672. if (errno == EAGAIN)
  673. break;
  674. _clear_events(gearmand);
  675. gearmand_perror("_wakeup_event:read");
  676. gearmand->ret= GEARMAN_ERRNO;
  677. return;
  678. }
  679. for (ssize_t x= 0; x < ret; x++)
  680. {
  681. switch ((gearmand_wakeup_t)buffer[x])
  682. {
  683. case GEARMAND_WAKEUP_PAUSE:
  684. gearmand_debug("Received PAUSE wakeup event");
  685. _clear_events(gearmand);
  686. gearmand->ret= GEARMAN_PAUSE;
  687. break;
  688. case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
  689. gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
  690. _listen_close(gearmand);
  691. for (thread= gearmand->thread_list; thread != NULL;
  692. thread= thread->next)
  693. {
  694. gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
  695. }
  696. gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
  697. break;
  698. case GEARMAND_WAKEUP_SHUTDOWN:
  699. gearmand_debug("Received SHUTDOWN wakeup event");
  700. _clear_events(gearmand);
  701. gearmand->ret= GEARMAN_SHUTDOWN;
  702. break;
  703. case GEARMAND_WAKEUP_CON:
  704. case GEARMAND_WAKEUP_RUN:
  705. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
  706. _clear_events(gearmand);
  707. gearmand->ret= GEARMAN_UNKNOWN_STATE;
  708. break;
  709. }
  710. }
  711. }
  712. }
  713. static gearmand_error_t _watch_events(gearmand_st *gearmand)
  714. {
  715. gearmand_error_t ret;
  716. ret= _listen_watch(gearmand);
  717. if (ret != GEARMAN_SUCCESS)
  718. return ret;
  719. ret= _wakeup_watch(gearmand);
  720. if (ret != GEARMAN_SUCCESS)
  721. return ret;
  722. return GEARMAN_SUCCESS;
  723. }
  724. static void _clear_events(gearmand_st *gearmand)
  725. {
  726. _listen_clear(gearmand);
  727. _wakeup_clear(gearmand);
  728. /*
  729. If we are not threaded, tell the fake thread to shutdown now to clear
  730. connections. Otherwise we will never exit the libevent loop.
  731. */
  732. if (gearmand->threads == 0 && gearmand->thread_list != NULL)
  733. {
  734. gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
  735. }
  736. }
  737. static void _close_events(gearmand_st *gearmand)
  738. {
  739. _listen_close(gearmand);
  740. _wakeup_close(gearmand);
  741. }
  742. /** @} */
  743. /*
  744. * Public Definitions
  745. */
  746. const char *gearmand_version(void)
  747. {
  748. return PACKAGE_VERSION;
  749. }
  750. const char *gearmand_bugreport(void)
  751. {
  752. return PACKAGE_BUGREPORT;
  753. }
  754. const char *gearmand_verbose_name(gearmand_verbose_t verbose)
  755. {
  756. switch (verbose)
  757. {
  758. case GEARMAND_VERBOSE_FATAL:
  759. return "FATAL";
  760. case GEARMAND_VERBOSE_ALERT:
  761. return "ALERT";
  762. case GEARMAND_VERBOSE_CRITICAL:
  763. return "CRITICAL";
  764. case GEARMAND_VERBOSE_ERROR:
  765. return "ERROR";
  766. case GEARMAND_VERBOSE_WARN:
  767. return "WARNING";
  768. case GEARMAND_VERBOSE_NOTICE:
  769. return "NOTICE";
  770. case GEARMAND_VERBOSE_INFO:
  771. return "INFO";
  772. case GEARMAND_VERBOSE_DEBUG:
  773. return "DEBUG";
  774. default:
  775. break;
  776. }
  777. return "UNKNOWN";
  778. }
  779. bool gearmand_verbose_check(const char *name, gearmand_verbose_t& level)
  780. {
  781. bool success= true;
  782. if (strcmp("FATAL", name) == 0)
  783. {
  784. level= GEARMAND_VERBOSE_FATAL;
  785. }
  786. else if (strcmp("ALERT", name) == 0)
  787. {
  788. level= GEARMAND_VERBOSE_ALERT;
  789. }
  790. else if (strcmp("CRITICAL", name) == 0)
  791. {
  792. level= GEARMAND_VERBOSE_CRITICAL;
  793. }
  794. else if (strcmp("ERROR", name) == 0)
  795. {
  796. level= GEARMAND_VERBOSE_ERROR;
  797. }
  798. else if (strcmp("WARNING", name) == 0)
  799. {
  800. level= GEARMAND_VERBOSE_WARN;
  801. }
  802. else if (strcmp("NOTICE", name) == 0)
  803. {
  804. level= GEARMAND_VERBOSE_NOTICE;
  805. }
  806. else if (strcmp("INFO", name) == 0)
  807. {
  808. level= GEARMAND_VERBOSE_INFO;
  809. }
  810. else if (strcmp("DEBUG", name) == 0)
  811. {
  812. level= GEARMAND_VERBOSE_DEBUG;
  813. }
  814. else
  815. {
  816. success= false;
  817. }
  818. return success;
  819. }
  820. static bool gearman_server_create(gearman_server_st *server,
  821. uint8_t job_retries_arg,
  822. uint8_t worker_wakeup_arg,
  823. bool round_robin_arg)
  824. {
  825. struct utsname un;
  826. assert(server);
  827. server->state.queue_startup= false;
  828. server->flags.round_robin= round_robin_arg;
  829. server->flags.threaded= false;
  830. server->shutdown= false;
  831. server->shutdown_graceful= false;
  832. server->proc_wakeup= false;
  833. server->proc_shutdown= false;
  834. server->job_retries= job_retries_arg;
  835. server->worker_wakeup= worker_wakeup_arg;
  836. server->thread_count= 0;
  837. server->free_packet_count= 0;
  838. server->function_count= 0;
  839. server->job_count= 0;
  840. server->unique_count= 0;
  841. server->free_job_count= 0;
  842. server->free_client_count= 0;
  843. server->free_worker_count= 0;
  844. server->thread_list= NULL;
  845. server->free_packet_list= NULL;
  846. server->function_list= NULL;
  847. server->free_job_list= NULL;
  848. server->free_client_list= NULL;
  849. server->free_worker_list= NULL;
  850. server->queue._context= NULL;
  851. server->queue._add_fn= NULL;
  852. server->queue._flush_fn= NULL;
  853. server->queue._done_fn= NULL;
  854. server->queue._replay_fn= NULL;
  855. memset(server->job_hash, 0,
  856. sizeof(gearman_server_job_st *) * GEARMAND_JOB_HASH_SIZE);
  857. memset(server->unique_hash, 0,
  858. sizeof(gearman_server_job_st *) * GEARMAND_JOB_HASH_SIZE);
  859. if (uname(&un) == -1)
  860. {
  861. gearman_server_free(server);
  862. return false;
  863. }
  864. int checked_length= snprintf(server->job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "H:%s", un.nodename);
  865. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
  866. {
  867. gearman_server_free(server);
  868. return false;
  869. }
  870. server->job_handle_count= 1;
  871. return true;
  872. }
  873. static void gearman_server_free(gearman_server_st *server)
  874. {
  875. uint32_t key;
  876. gearman_server_packet_st *packet;
  877. gearman_server_job_st *job;
  878. gearman_server_client_st *client;
  879. gearman_server_worker_st *worker;
  880. /* All threads should be cleaned up before calling this. */
  881. assert(server->thread_list == NULL);
  882. for (key= 0; key < GEARMAND_JOB_HASH_SIZE; key++)
  883. {
  884. while (server->job_hash[key] != NULL)
  885. {
  886. gearman_server_job_free(server->job_hash[key]);
  887. }
  888. }
  889. while (server->function_list != NULL)
  890. {
  891. gearman_server_function_free(server, server->function_list);
  892. }
  893. while (server->free_packet_list != NULL)
  894. {
  895. packet= server->free_packet_list;
  896. server->free_packet_list= packet->next;
  897. free(packet);
  898. }
  899. while (server->free_job_list != NULL)
  900. {
  901. job= server->free_job_list;
  902. server->free_job_list= job->next;
  903. free(job);
  904. }
  905. while (server->free_client_list != NULL)
  906. {
  907. client= server->free_client_list;
  908. server->free_client_list= client->con_next;
  909. free(client);
  910. }
  911. while (server->free_worker_list != NULL)
  912. {
  913. worker= server->free_worker_list;
  914. server->free_worker_list= worker->con_next;
  915. free(worker);
  916. }
  917. }