thread.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Server Thread Definitions
  11. */
  12. #include <libgearman-server/common.h>
  13. #define GEARMAN_CORE
  14. #include <libgearman/command.h>
  15. #ifdef __cplusplus
  16. #include <cassert>
  17. #include <cerrno>
  18. #else
  19. #include <assert.h>
  20. #include <errno.h>
  21. #endif
  22. /*
  23. * Private declarations
  24. */
  25. /**
  26. * @addtogroup gearman_server_private Private Server Functions
  27. * @ingroup gearman_server
  28. * @{
  29. */
  30. /**
  31. * Try reading packets for a connection.
  32. */
  33. static gearmand_error_t _thread_packet_read(gearman_server_con_st *con);
  34. /**
  35. * Flush outgoing packets for a connection.
  36. */
  37. static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con);
  38. /**
  39. * Start processing thread for the server.
  40. */
  41. static gearmand_error_t _proc_thread_start(gearman_server_st *server);
  42. /**
  43. * Kill processing thread for the server.
  44. */
  45. static void _proc_thread_kill(gearman_server_st *server);
  46. /**
  47. * Processing thread.
  48. */
  49. static void *_proc(void *data);
  50. /** @} */
  51. /*
  52. * Public definitions
  53. */
  54. bool gearman_server_thread_init(gearman_server_st *server,
  55. gearman_server_thread_st *thread,
  56. gearman_log_server_fn *log_function,
  57. gearmand_thread_st *context,
  58. gearmand_event_watch_fn *event_watch)
  59. {
  60. assert(server);
  61. assert(thread);
  62. if (server->thread_count == 1)
  63. {
  64. /* The server is going to be multi-threaded, start processing thread. */
  65. if (_proc_thread_start(server) != GEARMAN_SUCCESS)
  66. {
  67. return false;
  68. }
  69. }
  70. thread->con_count= 0;
  71. thread->io_count= 0;
  72. thread->proc_count= 0;
  73. thread->free_con_count= 0;
  74. thread->free_packet_count= 0;
  75. thread->log_fn= log_function;
  76. thread->log_context= context;
  77. thread->run_fn= NULL;
  78. thread->run_fn_arg= NULL;
  79. thread->con_list= NULL;
  80. thread->io_list= NULL;
  81. thread->proc_list= NULL;
  82. thread->free_con_list= NULL;
  83. thread->free_packet_list= NULL;
  84. int error;
  85. if ((error= pthread_mutex_init(&(thread->lock), NULL)))
  86. {
  87. errno= error;
  88. gearmand_perror("pthread_mutex_init");
  89. return false;
  90. }
  91. GEARMAN_LIST_ADD(server->thread, thread,);
  92. thread->gearman= &(thread->gearmand_connection_list_static);
  93. gearmand_connection_list_init(thread->gearman, event_watch, NULL);
  94. return true;
  95. }
  96. void gearman_server_thread_free(gearman_server_thread_st *thread)
  97. {
  98. gearman_server_con_st *con;
  99. gearman_server_packet_st *packet;
  100. _proc_thread_kill(Server);
  101. while (thread->con_list != NULL)
  102. {
  103. gearman_server_con_free(thread->con_list);
  104. }
  105. while (thread->free_con_list != NULL)
  106. {
  107. con= thread->free_con_list;
  108. thread->free_con_list= con->next;
  109. gearmand_debug("free");
  110. free(con);
  111. }
  112. while (thread->free_packet_list != NULL)
  113. {
  114. packet= thread->free_packet_list;
  115. thread->free_packet_list= packet->next;
  116. gearmand_debug("free");
  117. free(packet);
  118. }
  119. if (thread->gearman != NULL)
  120. gearman_connection_list_free(thread->gearman);
  121. pthread_mutex_destroy(&(thread->lock));
  122. GEARMAN_LIST_DEL(Server->thread, thread,)
  123. }
  124. void gearman_server_thread_set_run(gearman_server_thread_st *thread,
  125. gearman_server_thread_run_fn *run_fn,
  126. void *run_fn_arg)
  127. {
  128. thread->run_fn= run_fn;
  129. thread->run_fn_arg= run_fn_arg;
  130. }
  131. gearmand_con_st *
  132. gearman_server_thread_run(gearman_server_thread_st *thread,
  133. gearmand_error_t *ret_ptr)
  134. {
  135. /* If we are multi-threaded, we may have packets to flush or connections that
  136. should start reading again. */
  137. if (Server->flags.threaded)
  138. {
  139. gearman_server_con_st *server_con;
  140. while ((server_con= gearman_server_con_io_next(thread)) != NULL)
  141. {
  142. if (server_con->is_dead)
  143. {
  144. if (server_con->proc_removed)
  145. {
  146. gearman_server_con_free(server_con);
  147. }
  148. continue;
  149. }
  150. if (server_con->ret != GEARMAN_SUCCESS)
  151. {
  152. *ret_ptr= server_con->ret;
  153. return gearman_server_con_data(server_con);
  154. }
  155. /* See if any outgoing packets were queued. */
  156. *ret_ptr= _thread_packet_flush(server_con);
  157. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  158. {
  159. return gearman_server_con_data(server_con);
  160. }
  161. }
  162. }
  163. /* Check for new activity on connections. */
  164. {
  165. gearman_server_con_st *server_con;
  166. while ((server_con= gearmand_ready(thread->gearman)))
  167. {
  168. /* Try to read new packets. */
  169. if (server_con->con.revents & POLLIN)
  170. {
  171. *ret_ptr= _thread_packet_read(server_con);
  172. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  173. return gearman_server_con_data(server_con);
  174. }
  175. /* Flush existing outgoing packets. */
  176. if (server_con->con.revents & POLLOUT)
  177. {
  178. *ret_ptr= _thread_packet_flush(server_con);
  179. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  180. {
  181. return gearman_server_con_data(server_con);
  182. }
  183. }
  184. }
  185. }
  186. /* Start flushing new outgoing packets if we are single threaded. */
  187. if (! (Server->flags.threaded))
  188. {
  189. gearman_server_con_st *server_con;
  190. while ((server_con= gearman_server_con_io_next(thread)))
  191. {
  192. *ret_ptr= _thread_packet_flush(server_con);
  193. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  194. {
  195. return gearman_server_con_data(server_con);
  196. }
  197. }
  198. }
  199. /* Check for the two shutdown modes. */
  200. if (Server->shutdown)
  201. {
  202. *ret_ptr= GEARMAN_SHUTDOWN;
  203. }
  204. else if (Server->shutdown_graceful)
  205. {
  206. if (Server->job_count == 0)
  207. {
  208. *ret_ptr= GEARMAN_SHUTDOWN;
  209. }
  210. else
  211. {
  212. *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
  213. }
  214. }
  215. else
  216. {
  217. *ret_ptr= GEARMAN_SUCCESS;
  218. }
  219. return NULL;
  220. }
  221. /*
  222. * Private definitions
  223. */
  224. static gearmand_error_t _thread_packet_read(gearman_server_con_st *con)
  225. {
  226. while (1)
  227. {
  228. if (con->packet == NULL)
  229. {
  230. if (! (con->packet= gearman_server_packet_create(con->thread, true)))
  231. {
  232. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  233. }
  234. }
  235. gearmand_error_t ret;
  236. if (gearmand_failed(ret= gearman_io_recv(con, true)))
  237. {
  238. if (ret == GEARMAN_IO_WAIT)
  239. {
  240. break;
  241. }
  242. gearman_server_packet_free(con->packet, con->thread, true);
  243. con->packet= NULL;
  244. return ret;
  245. }
  246. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  247. "Received %s %s:%u",
  248. gearman_command_info(con->packet->packet.command)->name,
  249. con->_host == NULL ? "-" : con->_host,
  250. con->_port == NULL ? "-" : con->_port);
  251. /* We read a complete packet. */
  252. if (Server->flags.threaded)
  253. {
  254. /* Multi-threaded, queue for the processing thread to run. */
  255. gearman_server_proc_packet_add(con, con->packet);
  256. con->packet= NULL;
  257. }
  258. else
  259. {
  260. /* Single threaded, run the command here. */
  261. gearmand_error_t rc= gearman_server_run_command(con, &(con->packet->packet));
  262. gearmand_packet_free(&(con->packet->packet));
  263. gearman_server_packet_free(con->packet, con->thread, true);
  264. con->packet= NULL;
  265. if (gearmand_failed(rc))
  266. return rc;
  267. }
  268. }
  269. return GEARMAN_SUCCESS;
  270. }
  271. static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con)
  272. {
  273. /* Check to see if we've already tried to avoid excessive system calls. */
  274. if (con->con.events & POLLOUT)
  275. return GEARMAN_IO_WAIT;
  276. while (con->io_packet_list)
  277. {
  278. gearmand_error_t ret;
  279. ret= gearman_io_send(con, &(con->io_packet_list->packet),
  280. con->io_packet_list->next == NULL ? true : false);
  281. if (gearmand_failed(ret))
  282. {
  283. return ret;
  284. }
  285. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  286. "Sent %s to %s:%d",
  287. gearman_command_info(con->io_packet_list->packet.command)->name,
  288. con->_host == NULL ? "-" : con->_host,
  289. con->_port == NULL ? "-" : con->_port);
  290. gearman_server_io_packet_remove(con);
  291. }
  292. /* Clear the POLLOUT flag. */
  293. return gearmand_io_set_events(con, POLLIN);
  294. }
  295. static gearmand_error_t _proc_thread_start(gearman_server_st *server)
  296. {
  297. if ((errno= pthread_mutex_init(&(server->proc_lock), NULL)))
  298. {
  299. gearmand_perror("pthread_mutex_init");
  300. return GEARMAN_ERRNO;
  301. }
  302. if ((errno= pthread_cond_init(&(server->proc_cond), NULL)))
  303. {
  304. gearmand_perror("pthread_cond_init");
  305. return GEARMAN_ERRNO;
  306. }
  307. pthread_attr_t attr;
  308. if ((errno= pthread_attr_init(&attr)))
  309. {
  310. gearmand_perror("pthread_attr_init");
  311. return GEARMAN_ERRNO;
  312. }
  313. if ((errno= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
  314. {
  315. gearmand_perror("pthread_attr_setscope");
  316. (void) pthread_attr_destroy(&attr);
  317. return GEARMAN_ERRNO;
  318. }
  319. if ((errno= pthread_create(&(server->proc_id), &attr, _proc, server)))
  320. {
  321. gearmand_perror("pthread_create");
  322. (void) pthread_attr_destroy(&attr);
  323. return GEARMAN_ERRNO;
  324. }
  325. (void) pthread_attr_destroy(&attr);
  326. server->flags.threaded= true;
  327. return GEARMAN_SUCCESS;
  328. }
  329. static void _proc_thread_kill(gearman_server_st *server)
  330. {
  331. if (! (server->flags.threaded) || server->proc_shutdown)
  332. return;
  333. server->proc_shutdown= true;
  334. /* Signal proc thread to shutdown. */
  335. (void) pthread_mutex_lock(&(server->proc_lock));
  336. (void) pthread_cond_signal(&(server->proc_cond));
  337. (void) pthread_mutex_unlock(&(server->proc_lock));
  338. /* Wait for the proc thread to exit and then cleanup. */
  339. (void) pthread_join(server->proc_id, NULL);
  340. (void) pthread_cond_destroy(&(server->proc_cond));
  341. (void) pthread_mutex_destroy(&(server->proc_lock));
  342. }
  343. static void *_proc(void *data)
  344. {
  345. gearman_server_st *server= (gearman_server_st *)data;
  346. gearman_server_thread_st *thread;
  347. gearman_server_con_st *con;
  348. gearman_server_packet_st *packet;
  349. gearmand_initialize_thread_logging("[ proc ]");
  350. while (1)
  351. {
  352. (void) pthread_mutex_lock(&(server->proc_lock));
  353. while (server->proc_wakeup == false)
  354. {
  355. if (server->proc_shutdown)
  356. {
  357. (void) pthread_mutex_unlock(&(server->proc_lock));
  358. return NULL;
  359. }
  360. (void) pthread_cond_wait(&(server->proc_cond), &(server->proc_lock));
  361. }
  362. server->proc_wakeup= false;
  363. (void) pthread_mutex_unlock(&(server->proc_lock));
  364. for (thread= server->thread_list; thread != NULL; thread= thread->next)
  365. {
  366. while ((con= gearman_server_con_proc_next(thread)) != NULL)
  367. {
  368. if (con->is_dead)
  369. {
  370. gearman_server_con_free_workers(con);
  371. while (con->client_list != NULL)
  372. gearman_server_client_free(con->client_list);
  373. con->proc_removed= true;
  374. gearman_server_con_io_add(con);
  375. continue;
  376. }
  377. while (1)
  378. {
  379. packet= gearman_server_proc_packet_remove(con);
  380. if (packet == NULL)
  381. break;
  382. con->ret= gearman_server_run_command(con, &(packet->packet));
  383. gearmand_packet_free(&(packet->packet));
  384. gearman_server_packet_free(packet, con->thread, false);
  385. }
  386. }
  387. }
  388. }
  389. }