thread.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Server Thread Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <libgearman/command.h>
  45. #ifdef __cplusplus
  46. # include <cassert>
  47. # include <cerrno>
  48. #else
  49. # include <assert.h>
  50. # include <errno.h>
  51. #endif
  52. /*
  53. * Private declarations
  54. */
  55. /**
  56. * @addtogroup gearman_server_private Private Server Functions
  57. * @ingroup gearman_server
  58. * @{
  59. */
  60. /**
  61. * Try reading packets for a connection.
  62. */
  63. static gearmand_error_t _thread_packet_read(gearman_server_con_st *con);
  64. /**
  65. * Flush outgoing packets for a connection.
  66. */
  67. static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con);
  68. /**
  69. * Start processing thread for the server.
  70. */
  71. static gearmand_error_t _proc_thread_start(gearman_server_st *server);
  72. /**
  73. * Kill processing thread for the server.
  74. */
  75. static void _proc_thread_kill(gearman_server_st *server);
  76. /** @} */
  77. /*
  78. * Public definitions
  79. */
  80. bool gearman_server_thread_init(gearman_server_st *server,
  81. gearman_server_thread_st *thread,
  82. gearman_log_server_fn *log_function,
  83. gearmand_thread_st *context,
  84. gearmand_event_watch_fn *event_watch)
  85. {
  86. assert(server);
  87. assert(thread);
  88. if (server->thread_count == 1)
  89. {
  90. /* The server is going to be multi-threaded, start processing thread. */
  91. if (_proc_thread_start(server) != GEARMAN_SUCCESS)
  92. {
  93. return false;
  94. }
  95. }
  96. thread->con_count= 0;
  97. thread->io_count= 0;
  98. thread->proc_count= 0;
  99. thread->to_be_freed_count= 0;
  100. thread->free_con_count= 0;
  101. thread->free_packet_count= 0;
  102. thread->log_fn= log_function;
  103. thread->log_context= context;
  104. thread->run_fn= NULL;
  105. thread->run_fn_arg= NULL;
  106. thread->con_list= NULL;
  107. thread->io_list= NULL;
  108. thread->proc_list= NULL;
  109. thread->free_con_list= NULL;
  110. thread->free_packet_list= NULL;
  111. thread->to_be_freed_list= NULL;
  112. int error;
  113. if ((error= pthread_mutex_init(&(thread->lock), NULL)))
  114. {
  115. gearmand_perror(error, "pthread_mutex_init");
  116. return false;
  117. }
  118. GEARMAN_LIST_ADD(server->thread, thread,);
  119. thread->gearman= &(thread->gearmand_connection_list_static);
  120. gearmand_connection_list_init(thread->gearman, event_watch, NULL);
  121. return true;
  122. }
  123. void gearman_server_thread_free(gearman_server_thread_st *thread)
  124. {
  125. _proc_thread_kill(Server);
  126. while (thread->con_list != NULL)
  127. {
  128. gearman_server_con_free(thread->con_list);
  129. }
  130. while (thread->free_con_list != NULL)
  131. {
  132. gearman_server_con_st *con= thread->free_con_list;
  133. thread->free_con_list= con->next;
  134. destroy_gearman_server_con_st(con);
  135. }
  136. while (thread->free_packet_list != NULL)
  137. {
  138. gearman_server_packet_st *packet= thread->free_packet_list;
  139. thread->free_packet_list= packet->next;
  140. destroy_gearman_server_packet_st(packet);
  141. }
  142. if (thread->gearman != NULL)
  143. {
  144. gearman_connection_list_free(thread->gearman);
  145. }
  146. pthread_mutex_destroy(&(thread->lock));
  147. GEARMAN_LIST_DEL(Server->thread, thread,)
  148. }
  149. void gearman_server_thread_set_run(gearman_server_thread_st *thread,
  150. gearman_server_thread_run_fn *run_fn,
  151. void *run_fn_arg)
  152. {
  153. thread->run_fn= run_fn;
  154. thread->run_fn_arg= run_fn_arg;
  155. }
  156. gearmand_con_st *
  157. gearman_server_thread_run(gearman_server_thread_st *thread,
  158. gearmand_error_t *ret_ptr)
  159. {
  160. /* If we are multi-threaded, we may have packets to flush or connections that
  161. should start reading again. */
  162. if (Server->flags.threaded)
  163. {
  164. gearman_server_con_st *server_con;
  165. while ((server_con= gearman_server_con_to_be_freed_next(thread)) != NULL)
  166. {
  167. if (server_con->is_dead && server_con->proc_removed)
  168. gearman_server_con_free(server_con);
  169. else
  170. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "con %llu isn't dead %d or proc removed %d, but is in to_be_freed_list",
  171. server_con, server_con->is_dead, server_con->proc_removed);
  172. }
  173. while ((server_con= gearman_server_con_io_next(thread)) != NULL)
  174. {
  175. if (server_con->is_dead)
  176. {
  177. gearman_server_con_attempt_free(server_con);
  178. continue;
  179. }
  180. if (server_con->ret != GEARMAN_SUCCESS)
  181. {
  182. *ret_ptr= server_con->ret;
  183. return gearman_server_con_data(server_con);
  184. }
  185. /* See if any outgoing packets were queued. */
  186. *ret_ptr= _thread_packet_flush(server_con);
  187. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  188. {
  189. return gearman_server_con_data(server_con);
  190. }
  191. }
  192. }
  193. /* Check for new activity on connections. */
  194. {
  195. gearman_server_con_st *server_con;
  196. while ((server_con= gearmand_ready(thread->gearman)))
  197. {
  198. /* Try to read new packets. */
  199. if (server_con->con.revents & POLLIN)
  200. {
  201. *ret_ptr= _thread_packet_read(server_con);
  202. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  203. return gearman_server_con_data(server_con);
  204. }
  205. /* Flush existing outgoing packets. */
  206. if (server_con->con.revents & POLLOUT)
  207. {
  208. *ret_ptr= _thread_packet_flush(server_con);
  209. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  210. {
  211. return gearman_server_con_data(server_con);
  212. }
  213. }
  214. }
  215. }
  216. /* Start flushing new outgoing packets if we are single threaded. */
  217. if (! (Server->flags.threaded))
  218. {
  219. gearman_server_con_st *server_con;
  220. while ((server_con= gearman_server_con_io_next(thread)))
  221. {
  222. *ret_ptr= _thread_packet_flush(server_con);
  223. if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
  224. {
  225. return gearman_server_con_data(server_con);
  226. }
  227. }
  228. }
  229. /* Check for the two shutdown modes. */
  230. if (Server->shutdown)
  231. {
  232. *ret_ptr= GEARMAN_SHUTDOWN;
  233. }
  234. else if (Server->shutdown_graceful)
  235. {
  236. if (Server->job_count == 0)
  237. {
  238. *ret_ptr= GEARMAN_SHUTDOWN;
  239. }
  240. else
  241. {
  242. *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
  243. }
  244. }
  245. else
  246. {
  247. *ret_ptr= GEARMAN_SUCCESS;
  248. }
  249. return NULL;
  250. }
  251. /*
  252. * Private definitions
  253. */
  254. static gearmand_error_t _thread_packet_read(gearman_server_con_st *con)
  255. {
  256. while (1)
  257. {
  258. if (con->packet == NULL)
  259. {
  260. if (! (con->packet= gearman_server_packet_create(con->thread, true)))
  261. {
  262. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  263. }
  264. }
  265. gearmand_error_t ret;
  266. if (gearmand_failed(ret= gearman_io_recv(con, true)))
  267. {
  268. if (ret == GEARMAN_IO_WAIT)
  269. {
  270. break;
  271. }
  272. gearman_server_packet_free(con->packet, con->thread, true);
  273. con->packet= NULL;
  274. return ret;
  275. }
  276. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  277. "Received %s %s:%u",
  278. gearman_command_info(con->packet->packet.command)->name,
  279. con->_host == NULL ? "-" : con->_host,
  280. con->_port == NULL ? "-" : con->_port);
  281. /* We read a complete packet. */
  282. if (Server->flags.threaded)
  283. {
  284. /* Multi-threaded, queue for the processing thread to run. */
  285. gearman_server_proc_packet_add(con, con->packet);
  286. con->packet= NULL;
  287. }
  288. else
  289. {
  290. /* Single threaded, run the command here. */
  291. gearmand_error_t rc= gearman_server_run_command(con, &(con->packet->packet));
  292. gearmand_packet_free(&(con->packet->packet));
  293. gearman_server_packet_free(con->packet, con->thread, true);
  294. con->packet= NULL;
  295. if (gearmand_failed(rc))
  296. {
  297. return rc;
  298. }
  299. }
  300. }
  301. return GEARMAN_SUCCESS;
  302. }
  303. static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con)
  304. {
  305. /* Check to see if we've already tried to avoid excessive system calls. */
  306. if (con->con.events & POLLOUT)
  307. {
  308. return GEARMAN_IO_WAIT;
  309. }
  310. while (con->io_packet_list)
  311. {
  312. gearmand_error_t ret= gearman_io_send(con, &(con->io_packet_list->packet),
  313. con->io_packet_list->next == NULL ? true : false);
  314. if (gearmand_failed(ret))
  315. {
  316. return ret;
  317. }
  318. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  319. "Sent %s to %s:%d",
  320. gearman_command_info(con->io_packet_list->packet.command)->name,
  321. con->_host == NULL ? "-" : con->_host,
  322. con->_port == NULL ? "-" : con->_port);
  323. gearman_server_io_packet_remove(con);
  324. }
  325. /* Clear the POLLOUT flag. */
  326. return gearmand_io_set_events(con, POLLIN);
  327. }
  328. static gearmand_error_t _proc_thread_start(gearman_server_st *server)
  329. {
  330. int error;
  331. if ((error= pthread_mutex_init(&(server->proc_lock), NULL)))
  332. {
  333. return gearmand_perror(error, "pthread_mutex_init");
  334. }
  335. if ((error= pthread_cond_init(&(server->proc_cond), NULL)))
  336. {
  337. return gearmand_perror(error, "pthread_cond_init");
  338. }
  339. pthread_attr_t attr;
  340. if ((error= pthread_attr_init(&attr)))
  341. {
  342. return gearmand_perror(error, "pthread_attr_init");
  343. }
  344. if ((error= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
  345. {
  346. (void) pthread_attr_destroy(&attr);
  347. return gearmand_perror(error, "pthread_attr_setscope");
  348. }
  349. if ((error= pthread_create(&(server->proc_id), &attr, _proc, server)))
  350. {
  351. (void) pthread_attr_destroy(&attr);
  352. return gearmand_perror(error, "pthread_create");
  353. }
  354. if ((error= pthread_attr_destroy(&attr)))
  355. {
  356. gearmand_perror(error, "pthread_create");
  357. }
  358. server->flags.threaded= true;
  359. return GEARMAN_SUCCESS;
  360. }
  361. static void _proc_thread_kill(gearman_server_st *server)
  362. {
  363. if (! (server->flags.threaded) || server->proc_shutdown)
  364. {
  365. return;
  366. }
  367. server->proc_shutdown= true;
  368. /* Signal proc thread to shutdown. */
  369. int error;
  370. if ((error= pthread_mutex_lock(&(server->proc_lock))) == 0)
  371. {
  372. if ((error= pthread_cond_signal(&(server->proc_cond))))
  373. {
  374. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_cond_signal() failed, forcing a shutdown");
  375. }
  376. if ((error= pthread_mutex_unlock(&(server->proc_lock))))
  377. {
  378. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock() failed, forcing a shutdown");
  379. }
  380. }
  381. else
  382. {
  383. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock() failed, forcing a shutdown");
  384. }
  385. /* Wait for the proc thread to exit and then cleanup. */
  386. if ((error= pthread_join(server->proc_id, NULL)))
  387. {
  388. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_join() failed, forcing a shutdown");
  389. }
  390. if ((error= pthread_cond_destroy(&(server->proc_cond))))
  391. {
  392. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_cond_destroy() failed, forcing a shutdown");
  393. }
  394. if ((error= pthread_mutex_destroy(&(server->proc_lock))))
  395. {
  396. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_destroy() failed, forcing a shutdown");
  397. }
  398. }