gearmand.cc 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Gearmand Definitions
  11. */
  12. #include <config.h>
  13. #include <libgearman-server/common.h>
  14. #include <cerrno>
  15. #include <netdb.h>
  16. #include <sys/socket.h>
  17. #include <sys/types.h>
  18. #include <sys/utsname.h>
  19. #include <set>
  20. #include <string>
  21. #include <libgearman-server/gearmand.h>
  22. #include <libgearman-server/struct/port.h>
  23. #include <libgearman-server/plugins.h>
  24. #include <libgearman-server/timer.h>
  25. using namespace gearmand;
  26. /*
  27. * Private declarations
  28. */
  29. /**
  30. * @addtogroup gearmand_private Private Gearman Daemon Functions
  31. * @ingroup gearmand
  32. * @{
  33. */
  34. static gearmand_error_t _listen_init(gearmand_st *gearmand);
  35. static void _listen_close(gearmand_st *gearmand);
  36. static gearmand_error_t _listen_watch(gearmand_st *gearmand);
  37. static void _listen_clear(gearmand_st *gearmand);
  38. static void _listen_event(int fd, short events, void *arg);
  39. static gearmand_error_t _wakeup_init(gearmand_st *gearmand);
  40. static void _wakeup_close(gearmand_st *gearmand);
  41. static gearmand_error_t _wakeup_watch(gearmand_st *gearmand);
  42. static void _wakeup_clear(gearmand_st *gearmand);
  43. static void _wakeup_event(int fd, short events, void *arg);
  44. static gearmand_error_t _watch_events(gearmand_st *gearmand);
  45. static void _clear_events(gearmand_st *gearmand);
  46. static void _close_events(gearmand_st *gearmand);
  47. static bool gearman_server_create(gearman_server_st *server,
  48. uint8_t job_retries,
  49. uint8_t worker_wakeup,
  50. bool round_robin);
  51. static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
  52. void *context, const gearmand_verbose_t verbose);
  53. static void gearman_server_free(gearman_server_st *server)
  54. {
  55. /* All threads should be cleaned up before calling this. */
  56. assert(server->thread_list == NULL);
  57. for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++)
  58. {
  59. while (server->job_hash[key] != NULL)
  60. {
  61. gearman_server_job_free(server->job_hash[key]);
  62. }
  63. }
  64. while (server->function_list != NULL)
  65. {
  66. gearman_server_function_free(server, server->function_list);
  67. }
  68. while (server->free_packet_list != NULL)
  69. {
  70. gearman_server_packet_st *packet= server->free_packet_list;
  71. server->free_packet_list= packet->next;
  72. delete packet;
  73. }
  74. while (server->free_job_list != NULL)
  75. {
  76. gearman_server_job_st* job= server->free_job_list;
  77. server->free_job_list= job->next;
  78. delete job;
  79. }
  80. while (server->free_client_list != NULL)
  81. {
  82. gearman_server_client_st* client= server->free_client_list;
  83. server->free_client_list= client->con_next;
  84. delete client;
  85. }
  86. while (server->free_worker_list != NULL)
  87. {
  88. gearman_server_worker_st* worker= server->free_worker_list;
  89. server->free_worker_list= worker->con_next;
  90. delete worker;
  91. }
  92. }
  93. /** @} */
  94. #ifndef __INTEL_COMPILER
  95. #pragma GCC diagnostic ignored "-Wold-style-cast"
  96. #endif
  97. /*
  98. * Public definitions
  99. */
  100. static gearmand_st *_global_gearmand= NULL;
  101. gearmand_st *Gearmand(void)
  102. {
  103. if (!_global_gearmand)
  104. {
  105. gearmand_error("Gearmand() was called before it was allocated");
  106. assert(! "Gearmand() was called before it was allocated");
  107. }
  108. assert(_global_gearmand);
  109. return _global_gearmand;
  110. }
  111. gearmand_st *gearmand_create(const char *host_arg,
  112. uint32_t threads_arg,
  113. int backlog_arg,
  114. uint8_t job_retries,
  115. uint8_t worker_wakeup,
  116. gearmand_log_fn *log_function, void *log_context, const gearmand_verbose_t verbose_arg,
  117. bool round_robin)
  118. {
  119. gearmand_st *gearmand;
  120. assert(_global_gearmand == NULL);
  121. if (_global_gearmand)
  122. {
  123. gearmand_error("You have called gearmand_create() twice within your application.");
  124. _exit(EXIT_FAILURE);
  125. }
  126. gearmand= new (std::nothrow) gearmand_st;
  127. if (gearmand == NULL)
  128. {
  129. gearmand_merror("new", gearmand_st, 0);
  130. return NULL;
  131. }
  132. if (gearman_server_create(&(gearmand->server), job_retries, worker_wakeup, round_robin) == false)
  133. {
  134. delete gearmand;
  135. return NULL;
  136. }
  137. gearmand->is_listen_event= false;
  138. gearmand->is_wakeup_event= false;
  139. gearmand->verbose= verbose_arg;
  140. gearmand->timeout= -1;
  141. gearmand->ret= GEARMAN_SUCCESS;
  142. gearmand->backlog= backlog_arg;
  143. gearmand->threads= threads_arg;
  144. gearmand->port_count= 0;
  145. gearmand->thread_count= 0;
  146. gearmand->free_dcon_count= 0;
  147. gearmand->max_thread_free_dcon_count= 0;
  148. gearmand->wakeup_fd[0]= -1;
  149. gearmand->wakeup_fd[1]= -1;
  150. gearmand->host= host_arg;
  151. gearmand->log_fn= NULL;
  152. gearmand->log_context= NULL;
  153. gearmand->base= NULL;
  154. gearmand->port_list= NULL;
  155. gearmand->thread_list= NULL;
  156. gearmand->thread_add_next= NULL;
  157. gearmand->free_dcon_list= NULL;
  158. _global_gearmand= gearmand;
  159. gearmand_set_log_fn(gearmand, log_function, log_context, verbose_arg);
  160. return gearmand;
  161. }
  162. void gearmand_free(gearmand_st *gearmand)
  163. {
  164. _close_events(gearmand);
  165. if (gearmand->threads > 0)
  166. {
  167. gearmand_debug("Shutting down all threads");
  168. }
  169. while (gearmand->thread_list != NULL)
  170. {
  171. gearmand_thread_free(gearmand->thread_list);
  172. }
  173. while (gearmand->free_dcon_list != NULL)
  174. {
  175. gearmand_con_st *dcon;
  176. dcon= gearmand->free_dcon_list;
  177. gearmand->free_dcon_list= dcon->next;
  178. free(dcon);
  179. }
  180. if (gearmand->base != NULL)
  181. {
  182. event_base_free(gearmand->base);
  183. }
  184. gearman_server_free(&(gearmand->server));
  185. for (uint32_t x= 0; x < gearmand->port_count; x++)
  186. {
  187. if (gearmand->port_list[x].listen_fd != NULL)
  188. {
  189. free(gearmand->port_list[x].listen_fd);
  190. }
  191. if (gearmand->port_list[x].listen_event != NULL)
  192. {
  193. free(gearmand->port_list[x].listen_event);
  194. }
  195. }
  196. if (gearmand->port_list != NULL)
  197. {
  198. free(gearmand->port_list);
  199. }
  200. gearmand_info("Shutdown complete");
  201. delete gearmand;
  202. }
  203. static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
  204. void *context, const gearmand_verbose_t verbose)
  205. {
  206. gearmand->log_fn= function;
  207. gearmand->log_context= context;
  208. gearmand->verbose= verbose;
  209. }
  210. gearmand_error_t gearmand_port_add(gearmand_st *gearmand, const char *port,
  211. gearmand_connection_add_fn *function)
  212. {
  213. gearmand_port_st *port_list;
  214. port_list= (gearmand_port_st *)realloc(gearmand->port_list,
  215. sizeof(gearmand_port_st) * (gearmand->port_count + 1));
  216. if (port_list == NULL)
  217. {
  218. gearmand_perror("realloc");
  219. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  220. }
  221. strncpy(port_list[gearmand->port_count].port, port, NI_MAXSERV);
  222. port_list[gearmand->port_count].listen_count= 0;
  223. port_list[gearmand->port_count].add_fn= function;
  224. port_list[gearmand->port_count].listen_fd= NULL;
  225. port_list[gearmand->port_count].listen_event= NULL;
  226. gearmand->port_list= port_list;
  227. gearmand->port_count++;
  228. return GEARMAN_SUCCESS;
  229. }
  230. gearman_server_st *gearmand_server(gearmand_st *gearmand)
  231. {
  232. return &gearmand->server;
  233. }
  234. gearmand_error_t gearmand_run(gearmand_st *gearmand)
  235. {
  236. libgearman::server::Epoch epoch;
  237. /* Initialize server components. */
  238. if (gearmand->base == NULL)
  239. {
  240. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up, verbose set to %s",
  241. gearmand_verbose_name(gearmand->verbose));
  242. if (gearmand->threads > 0)
  243. {
  244. /* Set the number of free connection structures each thread should keep
  245. around before the main thread is forced to take them. We compute this
  246. here so we don't need to on every new connection. */
  247. gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
  248. gearmand->threads) / 2);
  249. }
  250. gearmand->base= static_cast<struct event_base *>(event_base_new());
  251. if (gearmand->base == NULL)
  252. {
  253. gearmand_fatal("event_base_new(NULL)");
  254. return GEARMAN_EVENT;
  255. }
  256. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Method for libevent: %s", event_base_get_method(gearmand->base));
  257. gearmand->ret= _listen_init(gearmand);
  258. if (gearmand->ret != GEARMAN_SUCCESS)
  259. {
  260. return gearmand->ret;
  261. }
  262. gearmand->ret= _wakeup_init(gearmand);
  263. if (gearmand->ret != GEARMAN_SUCCESS)
  264. {
  265. return gearmand->ret;
  266. }
  267. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Creating %u threads", gearmand->threads);
  268. /* If we have 0 threads we still need to create a fake one for context. */
  269. uint32_t x= 0;
  270. do
  271. {
  272. gearmand->ret= gearmand_thread_create(gearmand);
  273. if (gearmand->ret != GEARMAN_SUCCESS)
  274. return gearmand->ret;
  275. x++;
  276. }
  277. while (x < gearmand->threads);
  278. gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
  279. if (gearmand->ret != GEARMAN_SUCCESS)
  280. {
  281. return gearmand->ret;
  282. }
  283. }
  284. gearmand->ret= _watch_events(gearmand);
  285. if (gearmand->ret != GEARMAN_SUCCESS)
  286. {
  287. return gearmand->ret;
  288. }
  289. gearmand_debug("Entering main event loop");
  290. if (event_base_loop(gearmand->base, 0) == -1)
  291. {
  292. gearmand_fatal("event_base_loop(-1)");
  293. return GEARMAN_EVENT;
  294. }
  295. gearmand_debug("Exited main event loop");
  296. return gearmand->ret;
  297. }
  298. void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
  299. {
  300. uint8_t buffer= wakeup;
  301. /* If this fails, there is not much we can really do. This should never fail
  302. though if the main gearmand thread is still active. */
  303. ssize_t written;
  304. if ((written= write(gearmand->wakeup_fd[1], &buffer, 1)) != 1)
  305. {
  306. if (written < 0)
  307. {
  308. gearmand_perror(gearmand_strwakeup(wakeup));
  309. }
  310. else
  311. {
  312. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  313. "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
  314. }
  315. }
  316. }
  317. /*
  318. * Private definitions
  319. */
  320. static const uint32_t bind_timeout= 6; // Number is not special, but look at INFO messages if you decide to change it.
  321. typedef std::pair<std::string, std::string> host_port_t;
  322. static gearmand_error_t _listen_init(gearmand_st *gearmand)
  323. {
  324. for (uint32_t x= 0; x < gearmand->port_count; x++)
  325. {
  326. struct linger ling= {0, 0};
  327. struct addrinfo hints;
  328. struct addrinfo *addrinfo;
  329. gearmand_port_st *port= &gearmand->port_list[x];
  330. memset(&hints, 0, sizeof(struct addrinfo));
  331. hints.ai_flags= AI_PASSIVE;
  332. hints.ai_socktype= SOCK_STREAM;
  333. {
  334. int ret= getaddrinfo(gearmand->host, port->port, &hints, &addrinfo);
  335. if (ret != 0)
  336. {
  337. char buffer[1024];
  338. snprintf(buffer, sizeof(buffer), "%s:%s", gearmand->host ? gearmand->host : "<any>", port->port);
  339. gearmand_gai_error(buffer, ret);
  340. return GEARMAN_ERRNO;
  341. }
  342. }
  343. std::set<host_port_t> unique_hosts;
  344. for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
  345. addrinfo_next= addrinfo_next->ai_next)
  346. {
  347. int fd;
  348. char host[NI_MAXHOST];
  349. {
  350. int ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
  351. NI_MAXHOST, port->port, NI_MAXSERV,
  352. NI_NUMERICHOST | NI_NUMERICSERV);
  353. if (ret != 0)
  354. {
  355. gearmand_gai_error("getaddrinfo", ret);
  356. strncpy(host, "-", sizeof(host));
  357. strncpy(port->port, "-", sizeof(port->port));
  358. }
  359. }
  360. std::string host_string(host);
  361. std::string port_string(port->port);
  362. host_port_t check= std::make_pair(host_string, port_string);
  363. if (unique_hosts.find(check) != unique_hosts.end())
  364. {
  365. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Already listening on %s:%s", host, port->port);
  366. continue;
  367. }
  368. unique_hosts.insert(check);
  369. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Trying to listen on %s:%s", host, port->port);
  370. /* Call to socket() can fail for some getaddrinfo results, try another. */
  371. fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
  372. addrinfo_next->ai_protocol);
  373. if (fd == -1)
  374. {
  375. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to listen on %s:%s", host, port->port);
  376. continue;
  377. }
  378. int flags= 1;
  379. #ifdef IPV6_V6ONLY
  380. if (addrinfo_next->ai_family == AF_INET6)
  381. {
  382. flags= 1;
  383. if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &flags, sizeof(flags)) == -1)
  384. {
  385. gearmand_perror("setsockopt(IPV6_V6ONLY)");
  386. return GEARMAN_ERRNO;
  387. }
  388. }
  389. #endif
  390. {
  391. int ret= fcntl(fd, F_SETFD, FD_CLOEXEC);
  392. if (ret != 0 || !(fcntl(fd, F_GETFD, 0) & FD_CLOEXEC))
  393. {
  394. gearmand_perror("fcntl(FD_CLOEXEC)");
  395. return GEARMAN_ERRNO;
  396. }
  397. }
  398. {
  399. if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) == -1)
  400. {
  401. gearmand_perror("setsockopt(SO_REUSEADDR)");
  402. return GEARMAN_ERRNO;
  403. }
  404. }
  405. {
  406. if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
  407. {
  408. gearmand_perror("setsockopt(SO_KEEPALIVE)");
  409. return GEARMAN_ERRNO;
  410. }
  411. }
  412. {
  413. if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) == -1)
  414. {
  415. gearmand_perror("setsockopt(SO_LINGER)");
  416. return GEARMAN_ERRNO;
  417. }
  418. }
  419. {
  420. if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)) == -1)
  421. {
  422. gearmand_perror("setsockopt(TCP_NODELAY)");
  423. return GEARMAN_ERRNO;
  424. }
  425. }
  426. /*
  427. @note logic for this pulled from Drizzle.
  428. Sometimes the port is not released fast enough when stopping and
  429. restarting the server. This happens quite often with the test suite
  430. on busy Linux systems. Retry to bind the address at these intervals:
  431. Sleep intervals: 1, 2, 4, 6, 9, 13, 17, 22, ...
  432. Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ...
  433. Limit the sequence by drizzled_bind_timeout.
  434. */
  435. uint32_t waited;
  436. uint32_t this_wait;
  437. uint32_t retry;
  438. int ret= -1;
  439. for (waited= 0, retry= 1; ; retry++, waited+= this_wait)
  440. {
  441. if (((ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen)) == 0) or
  442. (errno != EADDRINUSE) || (waited >= bind_timeout))
  443. {
  444. break;
  445. }
  446. // We are in single user threads, so strerror() is fine.
  447. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u >= %u",
  448. strerror(errno), host, port->port,
  449. waited, bind_timeout);
  450. this_wait= retry * retry / 3 + 1;
  451. sleep(this_wait);
  452. }
  453. if (ret < 0)
  454. {
  455. gearmand_perror("bind");
  456. gearmand_sockfd_close(fd);
  457. if (errno == EADDRINUSE)
  458. {
  459. if (port->listen_fd == NULL)
  460. {
  461. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Address already in use %s:%s", host, port->port);
  462. }
  463. continue;
  464. }
  465. gearmand_perror("bind");
  466. return GEARMAN_ERRNO;
  467. }
  468. if (listen(fd, gearmand->backlog) == -1)
  469. {
  470. gearmand_perror("listen");
  471. gearmand_sockfd_close(fd);
  472. return GEARMAN_ERRNO;
  473. }
  474. // Scoping note for eventual transformation
  475. {
  476. int *fd_list;
  477. fd_list= (int *)realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
  478. if (fd_list == NULL)
  479. {
  480. gearmand_perror("realloc");
  481. gearmand_sockfd_close(fd);
  482. return GEARMAN_ERRNO;
  483. }
  484. port->listen_fd= fd_list;
  485. }
  486. port->listen_fd[port->listen_count]= fd;
  487. port->listen_count++;
  488. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Listening on %s:%s (%d)", host, port->port, fd);
  489. }
  490. freeaddrinfo(addrinfo);
  491. /* Report last socket() error if we couldn't find an address to bind. */
  492. if (port->listen_fd == NULL)
  493. {
  494. gearmand_fatal("Could not bind/listen to any addresses");
  495. return GEARMAN_ERRNO;
  496. }
  497. port->listen_event= (struct event *)malloc(sizeof(struct event) * port->listen_count);
  498. if (port->listen_event == NULL)
  499. {
  500. return gearmand_merror("malloc", struct event, port->listen_count);
  501. }
  502. for (uint32_t y= 0; y < port->listen_count; y++)
  503. {
  504. event_set(&(port->listen_event[y]), port->listen_fd[y],
  505. EV_READ | EV_PERSIST, _listen_event, port);
  506. event_base_set(gearmand->base, &(port->listen_event[y]));
  507. }
  508. }
  509. return GEARMAN_SUCCESS;
  510. }
  511. static void _listen_close(gearmand_st *gearmand)
  512. {
  513. _listen_clear(gearmand);
  514. for (uint32_t x= 0; x < gearmand->port_count; x++)
  515. {
  516. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  517. {
  518. if (gearmand->port_list[x].listen_fd[y] >= 0)
  519. {
  520. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Closing listening socket (%d)", gearmand->port_list[x].listen_fd[y]);
  521. gearmand_sockfd_close(gearmand->port_list[x].listen_fd[y]);
  522. gearmand->port_list[x].listen_fd[y]= -1;
  523. }
  524. }
  525. }
  526. }
  527. static gearmand_error_t _listen_watch(gearmand_st *gearmand)
  528. {
  529. if (gearmand->is_listen_event)
  530. {
  531. return GEARMAN_SUCCESS;
  532. }
  533. for (uint32_t x= 0; x < gearmand->port_count; x++)
  534. {
  535. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  536. {
  537. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Adding event for listening socket (%d)",
  538. gearmand->port_list[x].listen_fd[y]);
  539. if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) < 0)
  540. {
  541. gearmand_perror("event_add");
  542. return GEARMAN_EVENT;
  543. }
  544. }
  545. }
  546. gearmand->is_listen_event= true;
  547. return GEARMAN_SUCCESS;
  548. }
  549. static void _listen_clear(gearmand_st *gearmand)
  550. {
  551. if (gearmand->is_listen_event == false)
  552. {
  553. return;
  554. }
  555. for (uint32_t x= 0; x < gearmand->port_count; x++)
  556. {
  557. for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
  558. {
  559. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
  560. "Clearing event for listening socket (%d)",
  561. gearmand->port_list[x].listen_fd[y]);
  562. if (event_del(&(gearmand->port_list[x].listen_event[y])) < 0)
  563. {
  564. gearmand_perror("We tried to event_del() an event which no longer existed");
  565. assert(! "We tried to event_del() an event which no longer existed");
  566. }
  567. }
  568. }
  569. gearmand->is_listen_event= false;
  570. }
  571. static void _listen_event(int fd, short events __attribute__ ((unused)), void *arg)
  572. {
  573. gearmand_port_st *port= (gearmand_port_st *)arg;
  574. struct sockaddr sa;
  575. socklen_t sa_len= sizeof(sa);
  576. fd= accept(fd, &sa, &sa_len);
  577. if (fd == -1)
  578. {
  579. if (errno == EINTR)
  580. {
  581. return;
  582. }
  583. else if (errno == ECONNABORTED)
  584. {
  585. gearmand_perror("accept");
  586. return;
  587. }
  588. else if (errno == EMFILE)
  589. {
  590. gearmand_perror("accept");
  591. return;
  592. }
  593. _clear_events(Gearmand());
  594. gearmand_perror("accept");
  595. Gearmand()->ret= GEARMAN_ERRNO;
  596. return;
  597. }
  598. /*
  599. Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
  600. */
  601. char host[NI_MAXHOST];
  602. char port_str[NI_MAXSERV];
  603. int error= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
  604. NI_NUMERICHOST | NI_NUMERICSERV);
  605. if (error != 0)
  606. {
  607. gearmand_gai_error("getnameinfo", error);
  608. strncpy(host, "-", sizeof(host));
  609. strncpy(port_str, "-", sizeof(port_str));
  610. }
  611. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Accepted connection from %s:%s", host, port_str);
  612. gearmand_error_t ret;
  613. ret= gearmand_con_create(Gearmand(), fd, host, port_str, port->add_fn);
  614. if (ret == GEARMAN_MEMORY_ALLOCATION_FAILURE)
  615. {
  616. gearmand_sockfd_close(fd);
  617. return;
  618. }
  619. else if (ret != GEARMAN_SUCCESS)
  620. {
  621. Gearmand()->ret= ret;
  622. _clear_events(Gearmand());
  623. }
  624. }
  625. static gearmand_error_t _wakeup_init(gearmand_st *gearmand)
  626. {
  627. gearmand_debug("Creating wakeup pipe");
  628. if (pipe(gearmand->wakeup_fd) < 0)
  629. {
  630. gearmand_perror("pipe");
  631. return GEARMAN_ERRNO;
  632. }
  633. int returned_flags;
  634. if ((returned_flags= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0)) < 0)
  635. {
  636. gearmand_perror("fcntl:F_GETFL");
  637. return GEARMAN_ERRNO;
  638. }
  639. if (fcntl(gearmand->wakeup_fd[0], F_SETFL, returned_flags | O_NONBLOCK) < 0)
  640. {
  641. gearmand_perror("F_SETFL");
  642. return GEARMAN_ERRNO;
  643. }
  644. event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
  645. EV_READ | EV_PERSIST, _wakeup_event, gearmand);
  646. event_base_set(gearmand->base, &(gearmand->wakeup_event));
  647. return GEARMAN_SUCCESS;
  648. }
  649. static void _wakeup_close(gearmand_st *gearmand)
  650. {
  651. _wakeup_clear(gearmand);
  652. if (gearmand->wakeup_fd[0] >= 0)
  653. {
  654. gearmand_debug("Closing wakeup pipe");
  655. gearmand_pipe_close(gearmand->wakeup_fd[0]);
  656. gearmand->wakeup_fd[0]= -1;
  657. gearmand_pipe_close(gearmand->wakeup_fd[1]);
  658. gearmand->wakeup_fd[1]= -1;
  659. }
  660. }
  661. static gearmand_error_t _wakeup_watch(gearmand_st *gearmand)
  662. {
  663. if (gearmand->is_wakeup_event)
  664. {
  665. return GEARMAN_SUCCESS;
  666. }
  667. gearmand_debug("Adding event for wakeup pipe");
  668. if (event_add(&(gearmand->wakeup_event), NULL) < 0)
  669. {
  670. gearmand_perror("event_add");
  671. return GEARMAN_EVENT;
  672. }
  673. gearmand->is_wakeup_event= true;
  674. return GEARMAN_SUCCESS;
  675. }
  676. static void _wakeup_clear(gearmand_st *gearmand)
  677. {
  678. if (gearmand->is_wakeup_event)
  679. {
  680. gearmand_debug("Clearing event for wakeup pipe");
  681. if (event_del(&(gearmand->wakeup_event)) < 0)
  682. {
  683. gearmand_perror("We tried to event_del() an event which no longer existed");
  684. assert(! "We tried to event_del() an event which no longer existed");
  685. }
  686. gearmand->is_wakeup_event= false;
  687. }
  688. }
  689. static void _wakeup_event(int fd, short events __attribute__ ((unused)),
  690. void *arg)
  691. {
  692. gearmand_st *gearmand= (gearmand_st *)arg;
  693. uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
  694. gearmand_thread_st *thread;
  695. while (1)
  696. {
  697. ssize_t ret;
  698. ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
  699. if (ret == 0)
  700. {
  701. _clear_events(gearmand);
  702. gearmand_fatal("read(EOF)");
  703. gearmand->ret= GEARMAN_PIPE_EOF;
  704. return;
  705. }
  706. else if (ret == -1)
  707. {
  708. if (errno == EINTR)
  709. {
  710. continue;
  711. }
  712. if (errno == EAGAIN)
  713. {
  714. break;
  715. }
  716. _clear_events(gearmand);
  717. gearmand_perror("_wakeup_event:read");
  718. gearmand->ret= GEARMAN_ERRNO;
  719. return;
  720. }
  721. for (ssize_t x= 0; x < ret; x++)
  722. {
  723. switch ((gearmand_wakeup_t)buffer[x])
  724. {
  725. case GEARMAND_WAKEUP_PAUSE:
  726. gearmand_debug("Received PAUSE wakeup event");
  727. _clear_events(gearmand);
  728. gearmand->ret= GEARMAN_PAUSE;
  729. break;
  730. case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
  731. gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
  732. _listen_close(gearmand);
  733. for (thread= gearmand->thread_list; thread != NULL;
  734. thread= thread->next)
  735. {
  736. gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
  737. }
  738. gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
  739. break;
  740. case GEARMAND_WAKEUP_SHUTDOWN:
  741. gearmand_debug("Received SHUTDOWN wakeup event");
  742. _clear_events(gearmand);
  743. gearmand->ret= GEARMAN_SHUTDOWN;
  744. break;
  745. case GEARMAND_WAKEUP_CON:
  746. case GEARMAND_WAKEUP_RUN:
  747. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
  748. _clear_events(gearmand);
  749. gearmand->ret= GEARMAN_UNKNOWN_STATE;
  750. break;
  751. }
  752. }
  753. }
  754. }
  755. static gearmand_error_t _watch_events(gearmand_st *gearmand)
  756. {
  757. gearmand_error_t ret;
  758. ret= _listen_watch(gearmand);
  759. if (ret != GEARMAN_SUCCESS)
  760. {
  761. return ret;
  762. }
  763. ret= _wakeup_watch(gearmand);
  764. if (ret != GEARMAN_SUCCESS)
  765. {
  766. return ret;
  767. }
  768. return GEARMAN_SUCCESS;
  769. }
  770. static void _clear_events(gearmand_st *gearmand)
  771. {
  772. _listen_clear(gearmand);
  773. _wakeup_clear(gearmand);
  774. /*
  775. If we are not threaded, tell the fake thread to shutdown now to clear
  776. connections. Otherwise we will never exit the libevent loop.
  777. */
  778. if (gearmand->threads == 0 && gearmand->thread_list != NULL)
  779. {
  780. gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
  781. }
  782. }
  783. static void _close_events(gearmand_st *gearmand)
  784. {
  785. _listen_close(gearmand);
  786. _wakeup_close(gearmand);
  787. }
  788. /** @} */
  789. /*
  790. * Public Definitions
  791. */
  792. const char *gearmand_version(void)
  793. {
  794. return PACKAGE_VERSION;
  795. }
  796. const char *gearmand_bugreport(void)
  797. {
  798. return PACKAGE_BUGREPORT;
  799. }
  800. const char *gearmand_verbose_name(gearmand_verbose_t verbose)
  801. {
  802. switch (verbose)
  803. {
  804. case GEARMAND_VERBOSE_FATAL:
  805. return "FATAL";
  806. case GEARMAND_VERBOSE_ALERT:
  807. return "ALERT";
  808. case GEARMAND_VERBOSE_CRITICAL:
  809. return "CRITICAL";
  810. case GEARMAND_VERBOSE_ERROR:
  811. return "ERROR";
  812. case GEARMAND_VERBOSE_WARN:
  813. return "WARNING";
  814. case GEARMAND_VERBOSE_NOTICE:
  815. return "NOTICE";
  816. case GEARMAND_VERBOSE_INFO:
  817. return "INFO";
  818. case GEARMAND_VERBOSE_DEBUG:
  819. return "DEBUG";
  820. default:
  821. break;
  822. }
  823. return "UNKNOWN";
  824. }
  825. bool gearmand_verbose_check(const char *name, gearmand_verbose_t& level)
  826. {
  827. bool success= true;
  828. if (strcmp("FATAL", name) == 0)
  829. {
  830. level= GEARMAND_VERBOSE_FATAL;
  831. }
  832. else if (strcmp("ALERT", name) == 0)
  833. {
  834. level= GEARMAND_VERBOSE_ALERT;
  835. }
  836. else if (strcmp("CRITICAL", name) == 0)
  837. {
  838. level= GEARMAND_VERBOSE_CRITICAL;
  839. }
  840. else if (strcmp("ERROR", name) == 0)
  841. {
  842. level= GEARMAND_VERBOSE_ERROR;
  843. }
  844. else if (strcmp("WARNING", name) == 0)
  845. {
  846. level= GEARMAND_VERBOSE_WARN;
  847. }
  848. else if (strcmp("NOTICE", name) == 0)
  849. {
  850. level= GEARMAND_VERBOSE_NOTICE;
  851. }
  852. else if (strcmp("INFO", name) == 0)
  853. {
  854. level= GEARMAND_VERBOSE_INFO;
  855. }
  856. else if (strcmp("DEBUG", name) == 0)
  857. {
  858. level= GEARMAND_VERBOSE_DEBUG;
  859. }
  860. else
  861. {
  862. success= false;
  863. }
  864. return success;
  865. }
  866. static bool gearman_server_create(gearman_server_st *server,
  867. uint8_t job_retries_arg,
  868. uint8_t worker_wakeup_arg,
  869. bool round_robin_arg)
  870. {
  871. struct utsname un;
  872. assert(server);
  873. server->state.queue_startup= false;
  874. server->flags.round_robin= round_robin_arg;
  875. server->flags.threaded= false;
  876. server->shutdown= false;
  877. server->shutdown_graceful= false;
  878. server->proc_wakeup= false;
  879. server->proc_shutdown= false;
  880. server->job_retries= job_retries_arg;
  881. server->worker_wakeup= worker_wakeup_arg;
  882. server->thread_count= 0;
  883. server->free_packet_count= 0;
  884. server->function_count= 0;
  885. server->job_count= 0;
  886. server->unique_count= 0;
  887. server->free_job_count= 0;
  888. server->free_client_count= 0;
  889. server->free_worker_count= 0;
  890. server->thread_list= NULL;
  891. server->free_packet_list= NULL;
  892. server->function_list= NULL;
  893. server->free_job_list= NULL;
  894. server->free_client_list= NULL;
  895. server->free_worker_list= NULL;
  896. server->queue._context= NULL;
  897. server->queue._add_fn= NULL;
  898. server->queue._flush_fn= NULL;
  899. server->queue._done_fn= NULL;
  900. server->queue._replay_fn= NULL;
  901. memset(server->job_hash, 0,
  902. sizeof(gearman_server_job_st *) * GEARMAND_JOB_HASH_SIZE);
  903. memset(server->unique_hash, 0,
  904. sizeof(gearman_server_job_st *) * GEARMAND_JOB_HASH_SIZE);
  905. if (uname(&un) == -1)
  906. {
  907. gearman_server_free(server);
  908. return false;
  909. }
  910. int checked_length= snprintf(server->job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "H:%s", un.nodename);
  911. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
  912. {
  913. gearman_server_free(server);
  914. return false;
  915. }
  916. server->job_handle_count= 1;
  917. return true;
  918. }