gearmand_thread.cc 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Gearmand Thread Definitions
  11. */
  12. #include <config.h>
  13. #include <libgearman-server/common.h>
  14. #include <libgearman-server/gearmand.h>
  15. #include <cassert>
  16. #include <cerrno>
  17. #include <libgearman-server/list.h>
  18. /*
  19. * Private declarations
  20. */
  21. /**
  22. * @addtogroup gearmand_thread_private Private Gearmand Thread Functions
  23. * @ingroup gearmand_thread
  24. * @{
  25. */
  26. static void *_thread(void *data);
  27. static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread);
  28. static void _run(gearman_server_thread_st *thread, void *fn_arg);
  29. static gearmand_error_t _wakeup_init(gearmand_thread_st *thread);
  30. static void _wakeup_close(gearmand_thread_st *thread);
  31. static void _wakeup_clear(gearmand_thread_st *thread);
  32. static void _wakeup_event(int fd, short events, void *arg);
  33. static void _clear_events(gearmand_thread_st *thread);
  34. /** @} */
  35. /*
  36. * Public definitions
  37. */
  38. gearmand_error_t gearmand_thread_create(gearmand_st *gearmand)
  39. {
  40. gearmand_thread_st *thread;
  41. gearmand_error_t ret;
  42. thread= static_cast<gearmand_thread_st *>(malloc(sizeof(gearmand_thread_st)));
  43. if (not thread)
  44. {
  45. return gearmand_merror("malloc", gearmand_thread_st, 1);
  46. }
  47. if (! gearman_server_thread_init(gearmand_server(gearmand), &(thread->server_thread),
  48. _log, thread, gearmand_connection_watch))
  49. {
  50. free(thread);
  51. gearmand_fatal("gearman_server_thread_init(NULL)");
  52. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  53. }
  54. thread->is_thread_lock= false;
  55. thread->is_wakeup_event= false;
  56. thread->count= 0;
  57. thread->dcon_count= 0;
  58. thread->dcon_add_count= 0;
  59. thread->free_dcon_count= 0;
  60. thread->wakeup_fd[0]= -1;
  61. thread->wakeup_fd[1]= -1;
  62. gearmand_thread_list_add(thread);
  63. thread->dcon_list= NULL;
  64. thread->dcon_add_list= NULL;
  65. thread->free_dcon_list= NULL;
  66. /* If we have no threads, we still create a fake thread that uses the main
  67. libevent instance. Otherwise create a libevent instance for each thread. */
  68. if (gearmand->threads == 0)
  69. {
  70. thread->base= gearmand->base;
  71. }
  72. else
  73. {
  74. gearmand_debug("Initializing libevent for IO thread");
  75. thread->base= static_cast<struct event_base *>(event_base_new());
  76. if (thread->base == NULL)
  77. {
  78. gearmand_thread_free(thread);
  79. gearmand_fatal("event_base_new(NULL)");
  80. return GEARMAN_EVENT;
  81. }
  82. }
  83. ret= _wakeup_init(thread);
  84. if (ret != GEARMAN_SUCCESS)
  85. {
  86. gearmand_thread_free(thread);
  87. return ret;
  88. }
  89. /* If we are not running multi-threaded, just return the thread context. */
  90. if (gearmand->threads == 0)
  91. return GEARMAN_SUCCESS;
  92. thread->count= gearmand->thread_count;
  93. int pthread_ret;
  94. pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
  95. if (pthread_ret != 0)
  96. {
  97. thread->count= 0;
  98. gearmand_thread_free(thread);
  99. errno= pthread_ret;
  100. gearmand_fatal_perror("pthread_mutex_init");
  101. return GEARMAN_ERRNO;
  102. }
  103. thread->is_thread_lock= true;
  104. gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
  105. pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
  106. if (pthread_ret != 0)
  107. {
  108. thread->count= 0;
  109. gearmand_thread_free(thread);
  110. errno= pthread_ret;
  111. gearmand_perror("pthread_create");
  112. return GEARMAN_ERRNO;
  113. }
  114. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u created", thread->count);
  115. return GEARMAN_SUCCESS;
  116. }
  117. void gearmand_thread_free(gearmand_thread_st *thread)
  118. {
  119. gearmand_con_st *dcon;
  120. if (Gearmand()->threads && thread->count > 0)
  121. {
  122. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Shutting down thread %u", thread->count);
  123. gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
  124. (void) pthread_join(thread->id, NULL);
  125. }
  126. if (thread->is_thread_lock)
  127. {
  128. (void) pthread_mutex_destroy(&(thread->lock));
  129. }
  130. _wakeup_close(thread);
  131. while (thread->dcon_list != NULL)
  132. {
  133. gearmand_con_free(thread->dcon_list);
  134. }
  135. while (thread->dcon_add_list != NULL)
  136. {
  137. dcon= thread->dcon_add_list;
  138. thread->dcon_add_list= dcon->next;
  139. gearmand_sockfd_close(dcon->fd);
  140. free(dcon);
  141. }
  142. while (thread->free_dcon_list != NULL)
  143. {
  144. dcon= thread->free_dcon_list;
  145. thread->free_dcon_list= dcon->next;
  146. free(dcon);
  147. }
  148. gearman_server_thread_free(&(thread->server_thread));
  149. gearmand_thread_list_free(thread);
  150. if (Gearmand()->threads > 0)
  151. {
  152. if (thread->base != NULL)
  153. {
  154. event_base_free(thread->base);
  155. }
  156. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u shutdown complete", thread->count);
  157. }
  158. free(thread);
  159. }
  160. void gearmand_thread_wakeup(gearmand_thread_st *thread,
  161. gearmand_wakeup_t wakeup)
  162. {
  163. uint8_t buffer= wakeup;
  164. /* If this fails, there is not much we can really do. This should never fail
  165. though if the thread is still active. */
  166. if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
  167. {
  168. gearmand_perror("write");
  169. }
  170. }
  171. void gearmand_thread_run(gearmand_thread_st *thread)
  172. {
  173. while (1)
  174. {
  175. gearmand_error_t ret;
  176. gearmand_con_st *dcon= gearman_server_thread_run(&(thread->server_thread), &ret);
  177. if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
  178. ret == GEARMAN_SHUTDOWN_GRACEFUL)
  179. {
  180. return;
  181. }
  182. if (not dcon)
  183. {
  184. /* We either got a GEARMAN_SHUTDOWN or some other fatal internal error.
  185. Either way, we want to shut the server down. */
  186. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  187. return;
  188. }
  189. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Disconnected %s:%s", dcon->host, dcon->port);
  190. gearmand_con_free(dcon);
  191. }
  192. }
  193. #ifndef __INTEL_COMPILER
  194. #pragma GCC diagnostic ignored "-Wold-style-cast"
  195. #endif
  196. /*
  197. * Private definitions
  198. */
  199. static void *_thread(void *data)
  200. {
  201. gearmand_thread_st *thread= (gearmand_thread_st *)data;
  202. char buffer[BUFSIZ];
  203. snprintf(buffer, sizeof(buffer), "[%6u ]", thread->count);
  204. gearmand_initialize_thread_logging(buffer);
  205. gearmand_debug("Entering thread event loop");
  206. if (event_base_loop(thread->base, 0) == -1)
  207. {
  208. gearmand_fatal("event_base_loop(-1)");
  209. Gearmand()->ret= GEARMAN_EVENT;
  210. }
  211. gearmand_debug("Exiting thread event loop");
  212. return NULL;
  213. }
  214. static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread)
  215. {
  216. (void)dthread;
  217. (*Gearmand()->log_fn)(line, verbose, (void *)Gearmand()->log_context);
  218. }
  219. static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
  220. void *fn_arg)
  221. {
  222. gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
  223. gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
  224. }
  225. static gearmand_error_t _wakeup_init(gearmand_thread_st *thread)
  226. {
  227. int ret;
  228. gearmand_debug("Creating IO thread wakeup pipe");
  229. ret= pipe(thread->wakeup_fd);
  230. if (ret == -1)
  231. {
  232. gearmand_perror("pipe");
  233. return GEARMAN_ERRNO;
  234. }
  235. ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
  236. if (ret == -1)
  237. {
  238. gearmand_perror("fcntl(F_GETFL)");
  239. return GEARMAN_ERRNO;
  240. }
  241. ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
  242. if (ret == -1)
  243. {
  244. gearmand_perror("fcntl(F_SETFL)");
  245. return GEARMAN_ERRNO;
  246. }
  247. event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
  248. _wakeup_event, thread);
  249. event_base_set(thread->base, &(thread->wakeup_event));
  250. if (event_add(&(thread->wakeup_event), NULL) < 0)
  251. {
  252. gearmand_perror("event_add");
  253. return GEARMAN_EVENT;
  254. }
  255. thread->is_wakeup_event= true;
  256. return GEARMAN_SUCCESS;
  257. }
  258. static void _wakeup_close(gearmand_thread_st *thread)
  259. {
  260. _wakeup_clear(thread);
  261. if (thread->wakeup_fd[0] >= 0)
  262. {
  263. gearmand_debug("Closing IO thread wakeup pipe");
  264. gearmand_pipe_close(thread->wakeup_fd[0]);
  265. thread->wakeup_fd[0]= -1;
  266. gearmand_pipe_close(thread->wakeup_fd[1]);
  267. thread->wakeup_fd[1]= -1;
  268. }
  269. }
  270. static void _wakeup_clear(gearmand_thread_st *thread)
  271. {
  272. if (thread->is_wakeup_event)
  273. {
  274. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Clearing event for IO thread wakeup pipe %u", thread->count);
  275. if (event_del(&(thread->wakeup_event)) < 0)
  276. {
  277. gearmand_perror("event_del");
  278. assert(! "event_del");
  279. }
  280. thread->is_wakeup_event= false;
  281. }
  282. }
  283. static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
  284. {
  285. gearmand_thread_st *thread= (gearmand_thread_st *)arg;
  286. uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
  287. ssize_t ret;
  288. while (1)
  289. {
  290. ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
  291. if (ret == 0)
  292. {
  293. _clear_events(thread);
  294. gearmand_fatal("read(EOF)");
  295. Gearmand()->ret= GEARMAN_PIPE_EOF;
  296. return;
  297. }
  298. else if (ret == -1)
  299. {
  300. if (errno == EINTR)
  301. continue;
  302. if (errno == EAGAIN)
  303. break;
  304. _clear_events(thread);
  305. gearmand_perror("_wakeup_event:read");
  306. Gearmand()->ret= GEARMAN_ERRNO;
  307. return;
  308. }
  309. for (ssize_t x= 0; x < ret; x++)
  310. {
  311. switch ((gearmand_wakeup_t)buffer[x])
  312. {
  313. case GEARMAND_WAKEUP_PAUSE:
  314. gearmand_debug("Received PAUSE wakeup event");
  315. break;
  316. case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
  317. gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
  318. if (gearman_server_shutdown_graceful(&(Gearmand()->server)) == GEARMAN_SHUTDOWN)
  319. {
  320. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  321. }
  322. break;
  323. case GEARMAND_WAKEUP_SHUTDOWN:
  324. gearmand_debug("Received SHUTDOWN wakeup event");
  325. _clear_events(thread);
  326. break;
  327. case GEARMAND_WAKEUP_CON:
  328. gearmand_debug("Received CON wakeup event");
  329. gearmand_con_check_queue(thread);
  330. break;
  331. case GEARMAND_WAKEUP_RUN:
  332. gearmand_debug("Received RUN wakeup event");
  333. gearmand_thread_run(thread);
  334. break;
  335. default:
  336. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
  337. _clear_events(thread);
  338. Gearmand()->ret= GEARMAN_UNKNOWN_STATE;
  339. break;
  340. }
  341. }
  342. }
  343. }
  344. static void _clear_events(gearmand_thread_st *thread)
  345. {
  346. _wakeup_clear(thread);
  347. while (thread->dcon_list != NULL)
  348. {
  349. gearmand_con_free(thread->dcon_list);
  350. }
  351. }