thread.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Server Thread Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <libgearman/command.h>
  45. #ifdef __cplusplus
  46. # include <cassert>
  47. # include <cerrno>
  48. #else
  49. # include <assert.h>
  50. # include <errno.h>
  51. #endif
  52. /*
  53. * Private declarations
  54. */
  55. /**
  56. * @addtogroup gearman_server_private Private Server Functions
  57. * @ingroup gearman_server
  58. * @{
  59. */
  60. /**
  61. * Try reading packets for a connection.
   *
   * Keeps receiving until the receive fails. A failure of GEARMAN_IO_WAIT ends
   * the loop and the function reports GEARMAN_SUCCESS; any other failure is
   * returned after the partially read packet has been released. Each complete
   * packet is either queued for the processing thread (threaded server) or
   * executed immediately (single-threaded server).
  62. */
  63. static gearmand_error_t _thread_packet_read(gearman_server_con_st *con);
  64. /**
  65. * Flush outgoing packets for a connection.
   *
   * Returns GEARMAN_IO_WAIT without sending anything when POLLOUT is already
   * set in the connection's requested events. Otherwise sends the queued
   * packets in order, returning the first failed send result; once the queue
   * is empty it returns the result of resetting the events to POLLIN.
  66. */
  67. static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con);
  68. /**
  69. * Start processing thread for the server.
   *
   * Initializes proc_lock and proc_cond, creates a system-scope thread running
   * _proc with the server as its argument, and sets flags.threaded on success.
   * Returns GEARMAN_ERRNO (after logging) if any pthread call fails.
  70. */
  71. static gearmand_error_t _proc_thread_start(gearman_server_st *server);
  72. /**
  73. * Kill processing thread for the server.
   *
   * Does nothing unless the server is threaded and proc_shutdown is not yet
   * set. Otherwise sets proc_shutdown, signals proc_cond while holding
   * proc_lock, joins the thread, and destroys proc_cond and proc_lock.
  74. */
  75. static void _proc_thread_kill(gearman_server_st *server);
  76. /** @} */
  77. /*
  78. * Public definitions
  79. */
  80. bool gearman_server_thread_init(gearman_server_st *server,
  81. gearman_server_thread_st *thread,
  82. gearman_log_server_fn *log_function,
  83. gearmand_thread_st *context,
  84. gearmand_event_watch_fn *event_watch)
  85. {
  86. assert(server);
  87. assert(thread);
  88. if (server->thread_count == 1)
  89. {
  90. /* The server is going to be multi-threaded, start processing thread. */
  91. if (_proc_thread_start(server) != GEARMAN_SUCCESS)
  92. {
  93. return false;
  94. }
  95. }
  96. thread->con_count= 0;
  97. thread->io_count= 0;
  98. thread->proc_count= 0;
  99. thread->to_be_freed_count= 0;
  100. thread->free_con_count= 0;
  101. thread->free_packet_count= 0;
  102. thread->log_fn= log_function;
  103. thread->log_context= context;
  104. thread->run_fn= NULL;
  105. thread->run_fn_arg= NULL;
  106. thread->con_list= NULL;
  107. thread->io_list= NULL;
  108. thread->proc_list= NULL;
  109. thread->free_con_list= NULL;
  110. thread->free_packet_list= NULL;
  111. thread->to_be_freed_list= NULL;
  112. int error;
  113. if ((error= pthread_mutex_init(&(thread->lock), NULL)))
  114. {
  115. errno= error;
  116. gearmand_perror("pthread_mutex_init");
  117. return false;
  118. }
  119. GEARMAN_LIST_ADD(server->thread, thread,);
  120. thread->gearman= &(thread->gearmand_connection_list_static);
  121. gearmand_connection_list_init(thread->gearman, event_watch, NULL);
  122. return true;
  123. }
  124. void gearman_server_thread_free(gearman_server_thread_st *thread)
  125. {
  126. _proc_thread_kill(Server);
  127. while (thread->con_list != NULL)
  128. {
  129. gearman_server_con_free(thread->con_list);
  130. }
  131. while (thread->free_con_list != NULL)
  132. {
  133. gearman_server_con_st *con= thread->free_con_list;
  134. thread->free_con_list= con->next;
  135. destroy_gearman_server_con_st(con);
  136. }
  137. while (thread->free_packet_list != NULL)
  138. {
  139. gearman_server_packet_st *packet= thread->free_packet_list;
  140. thread->free_packet_list= packet->next;
  141. destroy_gearman_server_packet_st(packet);
  142. }
  143. if (thread->gearman != NULL)
  144. {
  145. gearman_connection_list_free(thread->gearman);
  146. }
  147. pthread_mutex_destroy(&(thread->lock));
  148. GEARMAN_LIST_DEL(Server->thread, thread,)
  149. }
  150. void gearman_server_thread_set_run(gearman_server_thread_st *thread,
  151. gearman_server_thread_run_fn *run_fn,
  152. void *run_fn_arg)
  153. {
  154. thread->run_fn= run_fn;
  155. thread->run_fn_arg= run_fn_arg;
  156. }
  157. gearmand_con_st *
  158. gearman_server_thread_run(gearman_server_thread_st *thread,
  159. gearmand_error_t *ret_ptr)
  160. {
  161. /* If we are multi-threaded, we may have packets to flush or connections that
  162. should start reading again. */
  163. if (Server->flags.threaded)
  164. {
  165. gearman_server_con_st *server_con;
  166. while ((server_con= gearman_server_con_to_be_freed_next(thread)) != NULL)
  167. {
  168. if (server_con->is_dead && server_con->proc_removed)
  169. gearman_server_con_free(server_con);
  170. else
  171. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "con %llu isn't dead %d or proc removed %d, but is in to_be_freed_list",
  172. server_con, server_con->is_dead, server_con->proc_removed);
  173. }
  174. while ((server_con= gearman_server_con_io_next(thread)) != NULL)
  175. {
  176. if (server_con->is_dead)
  177. {
  178. gearman_server_con_attempt_free(server_con);
  179. continue;
  180. }
  181. if (server_con->ret != GEARMAN_SUCCESS)
  182. {
  183. *ret_ptr= server_con->ret;
  184. return gearman_server_con_data(server_con);
  185. }
  186. /* See if any outgoing packets were queued. */
  187. *ret_ptr= _thread_packet_flush(server_con);
  188. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  189. {
  190. return gearman_server_con_data(server_con);
  191. }
  192. }
  193. }
  194. /* Check for new activity on connections. */
  195. {
  196. gearman_server_con_st *server_con;
  197. while ((server_con= gearmand_ready(thread->gearman)))
  198. {
  199. /* Try to read new packets. */
  200. if (server_con->con.revents & POLLIN)
  201. {
  202. *ret_ptr= _thread_packet_read(server_con);
  203. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  204. return gearman_server_con_data(server_con);
  205. }
  206. /* Flush existing outgoing packets. */
  207. if (server_con->con.revents & POLLOUT)
  208. {
  209. *ret_ptr= _thread_packet_flush(server_con);
  210. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  211. {
  212. return gearman_server_con_data(server_con);
  213. }
  214. }
  215. }
  216. }
  217. /* Start flushing new outgoing packets if we are single threaded. */
  218. if (! (Server->flags.threaded))
  219. {
  220. gearman_server_con_st *server_con;
  221. while ((server_con= gearman_server_con_io_next(thread)))
  222. {
  223. *ret_ptr= _thread_packet_flush(server_con);
  224. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  225. {
  226. return gearman_server_con_data(server_con);
  227. }
  228. }
  229. }
  230. /* Check for the two shutdown modes. */
  231. if (Server->shutdown)
  232. {
  233. *ret_ptr= GEARMAN_SHUTDOWN;
  234. }
  235. else if (Server->shutdown_graceful)
  236. {
  237. if (Server->job_count == 0)
  238. {
  239. *ret_ptr= GEARMAN_SHUTDOWN;
  240. }
  241. else
  242. {
  243. *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
  244. }
  245. }
  246. else
  247. {
  248. *ret_ptr= GEARMAN_SUCCESS;
  249. }
  250. return NULL;
  251. }
  252. /*
  253. * Private definitions
  254. */
  255. static gearmand_error_t _thread_packet_read(gearman_server_con_st *con)
  256. {
  257. while (1)
  258. {
  259. if (con->packet == NULL)
  260. {
  261. if (! (con->packet= gearman_server_packet_create(con->thread, true)))
  262. {
  263. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  264. }
  265. }
  266. gearmand_error_t ret;
  267. if (gearmand_failed(ret= gearman_io_recv(con, true)))
  268. {
  269. if (ret == GEARMAN_IO_WAIT)
  270. {
  271. break;
  272. }
  273. gearman_server_packet_free(con->packet, con->thread, true);
  274. con->packet= NULL;
  275. return ret;
  276. }
  277. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  278. "Received %s %s:%u",
  279. gearman_command_info(con->packet->packet.command)->name,
  280. con->_host == NULL ? "-" : con->_host,
  281. con->_port == NULL ? "-" : con->_port);
  282. /* We read a complete packet. */
  283. if (Server->flags.threaded)
  284. {
  285. /* Multi-threaded, queue for the processing thread to run. */
  286. gearman_server_proc_packet_add(con, con->packet);
  287. con->packet= NULL;
  288. }
  289. else
  290. {
  291. /* Single threaded, run the command here. */
  292. gearmand_error_t rc= gearman_server_run_command(con, &(con->packet->packet));
  293. gearmand_packet_free(&(con->packet->packet));
  294. gearman_server_packet_free(con->packet, con->thread, true);
  295. con->packet= NULL;
  296. if (gearmand_failed(rc))
  297. {
  298. return rc;
  299. }
  300. }
  301. }
  302. return GEARMAN_SUCCESS;
  303. }
  304. static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con)
  305. {
  306. /* Check to see if we've already tried to avoid excessive system calls. */
  307. if (con->con.events & POLLOUT)
  308. {
  309. return GEARMAN_IO_WAIT;
  310. }
  311. while (con->io_packet_list)
  312. {
  313. gearmand_error_t ret= gearman_io_send(con, &(con->io_packet_list->packet),
  314. con->io_packet_list->next == NULL ? true : false);
  315. if (gearmand_failed(ret))
  316. {
  317. return ret;
  318. }
  319. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  320. "Sent %s to %s:%d",
  321. gearman_command_info(con->io_packet_list->packet.command)->name,
  322. con->_host == NULL ? "-" : con->_host,
  323. con->_port == NULL ? "-" : con->_port);
  324. gearman_server_io_packet_remove(con);
  325. }
  326. /* Clear the POLLOUT flag. */
  327. return gearmand_io_set_events(con, POLLIN);
  328. }
  329. static gearmand_error_t _proc_thread_start(gearman_server_st *server)
  330. {
  331. if ((errno= pthread_mutex_init(&(server->proc_lock), NULL)))
  332. {
  333. gearmand_perror("pthread_mutex_init");
  334. return GEARMAN_ERRNO;
  335. }
  336. if ((errno= pthread_cond_init(&(server->proc_cond), NULL)))
  337. {
  338. gearmand_perror("pthread_cond_init");
  339. return GEARMAN_ERRNO;
  340. }
  341. pthread_attr_t attr;
  342. if ((errno= pthread_attr_init(&attr)))
  343. {
  344. gearmand_perror("pthread_attr_init");
  345. return GEARMAN_ERRNO;
  346. }
  347. if ((errno= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
  348. {
  349. gearmand_perror("pthread_attr_setscope");
  350. (void) pthread_attr_destroy(&attr);
  351. return GEARMAN_ERRNO;
  352. }
  353. if ((errno= pthread_create(&(server->proc_id), &attr, _proc, server)))
  354. {
  355. gearmand_perror("pthread_create");
  356. (void) pthread_attr_destroy(&attr);
  357. return GEARMAN_ERRNO;
  358. }
  359. (void) pthread_attr_destroy(&attr);
  360. server->flags.threaded= true;
  361. return GEARMAN_SUCCESS;
  362. }
  363. static void _proc_thread_kill(gearman_server_st *server)
  364. {
  365. if (! (server->flags.threaded) || server->proc_shutdown)
  366. {
  367. return;
  368. }
  369. server->proc_shutdown= true;
  370. /* Signal proc thread to shutdown. */
  371. (void) pthread_mutex_lock(&(server->proc_lock));
  372. (void) pthread_cond_signal(&(server->proc_cond));
  373. (void) pthread_mutex_unlock(&(server->proc_lock));
  374. /* Wait for the proc thread to exit and then cleanup. */
  375. (void) pthread_join(server->proc_id, NULL);
  376. (void) pthread_cond_destroy(&(server->proc_cond));
  377. (void) pthread_mutex_destroy(&(server->proc_lock));
  378. }