thread.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Server Thread Definitions
  11. */
  12. #include <config.h>
  13. #include <libgearman-server/common.h>
  14. #define GEARMAN_CORE
  15. #include <libgearman/command.h>
  16. #ifdef __cplusplus
  17. #include <cassert>
  18. #include <cerrno>
  19. #else
  20. #include <assert.h>
  21. #include <errno.h>
  22. #endif
  23. /*
  24. * Private declarations
  25. */
  26. /**
  27. * @addtogroup gearman_server_private Private Server Functions
  28. * @ingroup gearman_server
  29. * @{
  30. */
  31. /**
  32. * Try reading packets for a connection.
  33. */
  34. static gearmand_error_t _thread_packet_read(gearman_server_con_st *con);
  35. /**
  36. * Flush outgoing packets for a connection.
  37. */
  38. static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con);
  39. /**
  40. * Start processing thread for the server.
  41. */
  42. static gearmand_error_t _proc_thread_start(gearman_server_st *server);
  43. /**
  44. * Kill processing thread for the server.
  45. */
  46. static void _proc_thread_kill(gearman_server_st *server);
  47. /**
  48. * Processing thread.
  49. */
  50. static void *_proc(void *data);
  51. /** @} */
  52. /*
  53. * Public definitions
  54. */
  55. bool gearman_server_thread_init(gearman_server_st *server,
  56. gearman_server_thread_st *thread,
  57. gearman_log_server_fn *log_function,
  58. gearmand_thread_st *context,
  59. gearmand_event_watch_fn *event_watch)
  60. {
  61. assert(server);
  62. assert(thread);
  63. if (server->thread_count == 1)
  64. {
  65. /* The server is going to be multi-threaded, start processing thread. */
  66. if (_proc_thread_start(server) != GEARMAN_SUCCESS)
  67. {
  68. return false;
  69. }
  70. }
  71. thread->con_count= 0;
  72. thread->io_count= 0;
  73. thread->proc_count= 0;
  74. thread->free_con_count= 0;
  75. thread->free_packet_count= 0;
  76. thread->log_fn= log_function;
  77. thread->log_context= context;
  78. thread->run_fn= NULL;
  79. thread->run_fn_arg= NULL;
  80. thread->con_list= NULL;
  81. thread->io_list= NULL;
  82. thread->proc_list= NULL;
  83. thread->free_con_list= NULL;
  84. thread->free_packet_list= NULL;
  85. int error;
  86. if ((error= pthread_mutex_init(&(thread->lock), NULL)))
  87. {
  88. errno= error;
  89. gearmand_perror("pthread_mutex_init");
  90. return false;
  91. }
  92. GEARMAN_LIST_ADD(server->thread, thread,);
  93. thread->gearman= &(thread->gearmand_connection_list_static);
  94. gearmand_connection_list_init(thread->gearman, event_watch, NULL);
  95. return true;
  96. }
  97. void gearman_server_thread_free(gearman_server_thread_st *thread)
  98. {
  99. gearman_server_con_st *con;
  100. gearman_server_packet_st *packet;
  101. _proc_thread_kill(Server);
  102. while (thread->con_list != NULL)
  103. {
  104. gearman_server_con_free(thread->con_list);
  105. }
  106. while (thread->free_con_list != NULL)
  107. {
  108. con= thread->free_con_list;
  109. thread->free_con_list= con->next;
  110. gearmand_debug("free");
  111. free(con);
  112. }
  113. while (thread->free_packet_list != NULL)
  114. {
  115. packet= thread->free_packet_list;
  116. thread->free_packet_list= packet->next;
  117. gearmand_debug("free");
  118. free(packet);
  119. }
  120. if (thread->gearman != NULL)
  121. gearman_connection_list_free(thread->gearman);
  122. pthread_mutex_destroy(&(thread->lock));
  123. GEARMAN_LIST_DEL(Server->thread, thread,)
  124. }
  125. void gearman_server_thread_set_run(gearman_server_thread_st *thread,
  126. gearman_server_thread_run_fn *run_fn,
  127. void *run_fn_arg)
  128. {
  129. thread->run_fn= run_fn;
  130. thread->run_fn_arg= run_fn_arg;
  131. }
  132. gearmand_con_st *
  133. gearman_server_thread_run(gearman_server_thread_st *thread,
  134. gearmand_error_t *ret_ptr)
  135. {
  136. /* If we are multi-threaded, we may have packets to flush or connections that
  137. should start reading again. */
  138. if (Server->flags.threaded)
  139. {
  140. gearman_server_con_st *server_con;
  141. while ((server_con= gearman_server_con_io_next(thread)) != NULL)
  142. {
  143. if (server_con->is_dead)
  144. {
  145. if (server_con->proc_removed)
  146. {
  147. gearman_server_con_free(server_con);
  148. }
  149. continue;
  150. }
  151. if (server_con->ret != GEARMAN_SUCCESS)
  152. {
  153. *ret_ptr= server_con->ret;
  154. return gearman_server_con_data(server_con);
  155. }
  156. /* See if any outgoing packets were queued. */
  157. *ret_ptr= _thread_packet_flush(server_con);
  158. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  159. {
  160. return gearman_server_con_data(server_con);
  161. }
  162. }
  163. }
  164. /* Check for new activity on connections. */
  165. {
  166. gearman_server_con_st *server_con;
  167. while ((server_con= gearmand_ready(thread->gearman)))
  168. {
  169. /* Try to read new packets. */
  170. if (server_con->con.revents & POLLIN)
  171. {
  172. *ret_ptr= _thread_packet_read(server_con);
  173. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  174. return gearman_server_con_data(server_con);
  175. }
  176. /* Flush existing outgoing packets. */
  177. if (server_con->con.revents & POLLOUT)
  178. {
  179. *ret_ptr= _thread_packet_flush(server_con);
  180. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  181. {
  182. return gearman_server_con_data(server_con);
  183. }
  184. }
  185. }
  186. }
  187. /* Start flushing new outgoing packets if we are single threaded. */
  188. if (! (Server->flags.threaded))
  189. {
  190. gearman_server_con_st *server_con;
  191. while ((server_con= gearman_server_con_io_next(thread)))
  192. {
  193. *ret_ptr= _thread_packet_flush(server_con);
  194. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  195. {
  196. return gearman_server_con_data(server_con);
  197. }
  198. }
  199. }
  200. /* Check for the two shutdown modes. */
  201. if (Server->shutdown)
  202. {
  203. *ret_ptr= GEARMAN_SHUTDOWN;
  204. }
  205. else if (Server->shutdown_graceful)
  206. {
  207. if (Server->job_count == 0)
  208. {
  209. *ret_ptr= GEARMAN_SHUTDOWN;
  210. }
  211. else
  212. {
  213. *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
  214. }
  215. }
  216. else
  217. {
  218. *ret_ptr= GEARMAN_SUCCESS;
  219. }
  220. return NULL;
  221. }
  222. /*
  223. * Private definitions
  224. */
  225. static gearmand_error_t _thread_packet_read(gearman_server_con_st *con)
  226. {
  227. while (1)
  228. {
  229. if (con->packet == NULL)
  230. {
  231. if (! (con->packet= gearman_server_packet_create(con->thread, true)))
  232. {
  233. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  234. }
  235. }
  236. gearmand_error_t ret;
  237. if (gearmand_failed(ret= gearman_io_recv(con, true)))
  238. {
  239. if (ret == GEARMAN_IO_WAIT)
  240. {
  241. break;
  242. }
  243. gearman_server_packet_free(con->packet, con->thread, true);
  244. con->packet= NULL;
  245. return ret;
  246. }
  247. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  248. "Received %s %s:%u",
  249. gearman_command_info(con->packet->packet.command)->name,
  250. con->_host == NULL ? "-" : con->_host,
  251. con->_port == NULL ? "-" : con->_port);
  252. /* We read a complete packet. */
  253. if (Server->flags.threaded)
  254. {
  255. /* Multi-threaded, queue for the processing thread to run. */
  256. gearman_server_proc_packet_add(con, con->packet);
  257. con->packet= NULL;
  258. }
  259. else
  260. {
  261. /* Single threaded, run the command here. */
  262. gearmand_error_t rc= gearman_server_run_command(con, &(con->packet->packet));
  263. gearmand_packet_free(&(con->packet->packet));
  264. gearman_server_packet_free(con->packet, con->thread, true);
  265. con->packet= NULL;
  266. if (gearmand_failed(rc))
  267. return rc;
  268. }
  269. }
  270. return GEARMAN_SUCCESS;
  271. }
  272. static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con)
  273. {
  274. /* Check to see if we've already tried to avoid excessive system calls. */
  275. if (con->con.events & POLLOUT)
  276. return GEARMAN_IO_WAIT;
  277. while (con->io_packet_list)
  278. {
  279. gearmand_error_t ret;
  280. ret= gearman_io_send(con, &(con->io_packet_list->packet),
  281. con->io_packet_list->next == NULL ? true : false);
  282. if (gearmand_failed(ret))
  283. {
  284. return ret;
  285. }
  286. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  287. "Sent %s to %s:%d",
  288. gearman_command_info(con->io_packet_list->packet.command)->name,
  289. con->_host == NULL ? "-" : con->_host,
  290. con->_port == NULL ? "-" : con->_port);
  291. gearman_server_io_packet_remove(con);
  292. }
  293. /* Clear the POLLOUT flag. */
  294. return gearmand_io_set_events(con, POLLIN);
  295. }
  296. static gearmand_error_t _proc_thread_start(gearman_server_st *server)
  297. {
  298. if ((errno= pthread_mutex_init(&(server->proc_lock), NULL)))
  299. {
  300. gearmand_perror("pthread_mutex_init");
  301. return GEARMAN_ERRNO;
  302. }
  303. if ((errno= pthread_cond_init(&(server->proc_cond), NULL)))
  304. {
  305. gearmand_perror("pthread_cond_init");
  306. return GEARMAN_ERRNO;
  307. }
  308. pthread_attr_t attr;
  309. if ((errno= pthread_attr_init(&attr)))
  310. {
  311. gearmand_perror("pthread_attr_init");
  312. return GEARMAN_ERRNO;
  313. }
  314. if ((errno= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
  315. {
  316. gearmand_perror("pthread_attr_setscope");
  317. (void) pthread_attr_destroy(&attr);
  318. return GEARMAN_ERRNO;
  319. }
  320. if ((errno= pthread_create(&(server->proc_id), &attr, _proc, server)))
  321. {
  322. gearmand_perror("pthread_create");
  323. (void) pthread_attr_destroy(&attr);
  324. return GEARMAN_ERRNO;
  325. }
  326. (void) pthread_attr_destroy(&attr);
  327. server->flags.threaded= true;
  328. return GEARMAN_SUCCESS;
  329. }
  330. static void _proc_thread_kill(gearman_server_st *server)
  331. {
  332. if (! (server->flags.threaded) || server->proc_shutdown)
  333. return;
  334. server->proc_shutdown= true;
  335. /* Signal proc thread to shutdown. */
  336. (void) pthread_mutex_lock(&(server->proc_lock));
  337. (void) pthread_cond_signal(&(server->proc_cond));
  338. (void) pthread_mutex_unlock(&(server->proc_lock));
  339. /* Wait for the proc thread to exit and then cleanup. */
  340. (void) pthread_join(server->proc_id, NULL);
  341. (void) pthread_cond_destroy(&(server->proc_cond));
  342. (void) pthread_mutex_destroy(&(server->proc_lock));
  343. }
  344. static void *_proc(void *data)
  345. {
  346. gearman_server_st *server= (gearman_server_st *)data;
  347. gearman_server_thread_st *thread;
  348. gearman_server_con_st *con;
  349. gearman_server_packet_st *packet;
  350. gearmand_initialize_thread_logging("[ proc ]");
  351. while (1)
  352. {
  353. (void) pthread_mutex_lock(&(server->proc_lock));
  354. while (server->proc_wakeup == false)
  355. {
  356. if (server->proc_shutdown)
  357. {
  358. (void) pthread_mutex_unlock(&(server->proc_lock));
  359. return NULL;
  360. }
  361. (void) pthread_cond_wait(&(server->proc_cond), &(server->proc_lock));
  362. }
  363. server->proc_wakeup= false;
  364. (void) pthread_mutex_unlock(&(server->proc_lock));
  365. for (thread= server->thread_list; thread != NULL; thread= thread->next)
  366. {
  367. while ((con= gearman_server_con_proc_next(thread)) != NULL)
  368. {
  369. if (con->is_dead)
  370. {
  371. gearman_server_con_free_workers(con);
  372. while (con->client_list != NULL)
  373. gearman_server_client_free(con->client_list);
  374. con->proc_removed= true;
  375. gearman_server_con_io_add(con);
  376. continue;
  377. }
  378. while (1)
  379. {
  380. packet= gearman_server_proc_packet_remove(con);
  381. if (packet == NULL)
  382. break;
  383. con->ret= gearman_server_run_command(con, &(packet->packet));
  384. gearmand_packet_free(&(packet->packet));
  385. gearman_server_packet_free(packet, con->thread, false);
  386. }
  387. }
  388. }
  389. }
  390. }