gearmand_thread.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearmand Thread Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <libgearman-server/gearmand.h>
  45. #include <cassert>
  46. #include <cerrno>
  47. #include <memory>
  48. #include <csignal>
  49. /*
  50. * Private declarations
  51. */
  namespace
  {

  #if defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
  /**
   * Store the current wall-clock time in ts.
   *
   * Only compiled when pthread_timedjoin_np() is available, since that is the
   * only consumer of the absolute timestamp in this file. clock_gettime() is
   * used when the build has it, otherwise gettimeofday() is converted by hand.
   *
   * @param[out] ts Receives the current time on success.
   * @return true on success, false (after logging the errno) on failure.
   */
  bool fill_timespec(struct timespec& ts)
  {
  #if defined(HAVE_CLOCK_GETTIME) && HAVE_CLOCK_GETTIME
    const int clock_rc= clock_gettime(CLOCK_REALTIME, &ts);
    if (clock_rc == -1)
    {
      gearmand_perror(errno, "clock_gettime(CLOCK_REALTIME)");
      return false;
    }
  #else
    struct timeval now;
    if (gettimeofday(&now, NULL) == -1)
    {
      gearmand_perror(errno, "gettimeofday()");
      return false;
    }

    /* Microseconds to nanoseconds, same as TIMEVAL_TO_TIMESPEC(). */
    ts.tv_sec= now.tv_sec;
    ts.tv_nsec= now.tv_usec * 1000;
  #endif

    return true;
  }
  #endif // defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP

  }
  81. /**
  82. * @addtogroup gearmand_thread_private Private Gearmand Thread Functions
  83. * @ingroup gearmand_thread
  84. * @{
  85. */
  86. static void *_thread(void *data);
  87. static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread);
  88. static void _run(gearman_server_thread_st *thread, void *fn_arg);
  89. static gearmand_error_t _wakeup_init(gearmand_thread_st *thread);
  90. static void _wakeup_close(gearmand_thread_st *thread);
  91. static void _wakeup_clear(gearmand_thread_st *thread);
  92. static void _wakeup_event(int fd, short events, void *arg);
  93. static void _clear_events(gearmand_thread_st *thread);
  94. namespace {
  95. gearmand_error_t gearmand_connection_watch(gearmand_io_st *con, short events, void *)
  96. {
  97. short set_events= 0;
  98. gearmand_con_st* dcon= gearman_io_context(con);
  99. if (events & POLLIN)
  100. {
  101. set_events|= EV_READ;
  102. }
  103. if (events & POLLOUT)
  104. {
  105. set_events|= EV_WRITE;
  106. }
  107. if (dcon->last_events != set_events)
  108. {
  109. if (dcon->last_events)
  110. {
  111. if (event_del(&(dcon->event)) == -1)
  112. {
  113. gearmand_perror(errno, "event_del");
  114. assert_msg(false, "event_del");
  115. }
  116. }
  117. event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready, dcon);
  118. if (event_base_set(dcon->thread->base, &(dcon->event)) == -1)
  119. {
  120. gearmand_perror(errno, "event_base_set");
  121. assert_msg(false, "event_del");
  122. }
  123. if (event_add(&(dcon->event), NULL) == -1)
  124. {
  125. gearmand_perror(errno, "event_add");
  126. return GEARMAND_EVENT;
  127. }
  128. dcon->last_events= set_events;
  129. }
  130. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  131. "%15s:%5s Watching %6s %s",
  132. dcon->host, dcon->port,
  133. events & POLLIN ? "POLLIN" : "",
  134. events & POLLOUT ? "POLLOUT" : "");
  135. return GEARMAND_SUCCESS;
  136. }
  137. }
  138. gearmand_thread_st::gearmand_thread_st(gearmand_st& gearmand_):
  139. is_thread_lock(false),
  140. is_wakeup_event(false),
  141. count(0),
  142. dcon_count(0),
  143. dcon_add_count(0),
  144. free_dcon_count(0),
  145. _gearmand(gearmand_),
  146. next(NULL),
  147. prev(NULL),
  148. base(NULL),
  149. dcon_list(NULL),
  150. dcon_add_list(NULL),
  151. free_dcon_list(0)
  152. {
  153. }
  154. /** @} */
  155. /*
  156. * Public definitions
  157. */
  158. gearmand_error_t gearmand_thread_create(gearmand_st& gearmand)
  159. {
  160. gearmand_thread_st* thread= new (std::nothrow) gearmand_thread_st(gearmand);
  161. if (thread == NULL)
  162. {
  163. return gearmand_merror("new", gearmand_thread_st, 1);
  164. }
  165. if (! gearman_server_thread_init(gearmand_server(&gearmand), &(thread->server_thread),
  166. _log, thread, gearmand_connection_watch))
  167. {
  168. delete thread;
  169. gearmand_fatal("gearman_server_thread_init(NULL)");
  170. return GEARMAND_MEMORY_ALLOCATION_FAILURE;
  171. }
  172. thread->is_thread_lock= false;
  173. thread->is_wakeup_event= false;
  174. thread->count= 0;
  175. thread->dcon_count= 0;
  176. thread->dcon_add_count= 0;
  177. thread->free_dcon_count= 0;
  178. thread->wakeup_fd[0]= -1;
  179. thread->wakeup_fd[1]= -1;
  180. GEARMAND_LIST__ADD(Gearmand()->thread, thread);
  181. thread->dcon_list= NULL;
  182. thread->dcon_add_list= NULL;
  183. thread->free_dcon_list= NULL;
  184. /* If we have no threads, we still create a fake thread that uses the main
  185. libevent instance. Otherwise create a libevent instance for each thread. */
  186. if (gearmand.threads == 0)
  187. {
  188. thread->base= gearmand.base;
  189. }
  190. else
  191. {
  192. gearmand_debug("Initializing libevent for IO thread");
  193. assert(thread->base == NULL);
  194. thread->base= event_base_new();
  195. if (thread->base == NULL)
  196. {
  197. gearmand_thread_free(thread);
  198. gearmand_fatal("event_base_new()");
  199. return GEARMAND_EVENT;
  200. }
  201. }
  202. gearmand_error_t ret;
  203. if (gearmand_failed(ret= _wakeup_init(thread)))
  204. {
  205. gearmand_thread_free(thread);
  206. return ret;
  207. }
  208. /* If we are not running multi-threaded, just return the thread context. */
  209. if (gearmand.threads == 0)
  210. {
  211. return GEARMAND_SUCCESS;
  212. }
  213. thread->count= gearmand.thread_count;
  214. int pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
  215. if (pthread_ret != 0)
  216. {
  217. thread->count= 0;
  218. gearmand_thread_free(thread);
  219. return gearmand_fatal_perror(pthread_ret, "pthread_mutex_init");
  220. }
  221. thread->is_thread_lock= true;
  222. thread->server_thread.run(_run, thread);
  223. pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
  224. if (pthread_ret != 0)
  225. {
  226. thread->count= 0;
  227. gearmand_thread_free(thread);
  228. return gearmand_perror(pthread_ret, "pthread_create");
  229. }
  230. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u created", thread->count);
  231. return GEARMAND_SUCCESS;
  232. }
  233. void gearmand_thread_free(gearmand_thread_st *thread)
  234. {
  235. if (thread)
  236. {
  237. if (Gearmand()->threads and thread->count > 0)
  238. {
  239. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Shutting down thread %u", thread->count);
  240. gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
  241. int pthread_error= -1;
  242. #if defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
  243. {
  244. struct timespec ts;
  245. if (fill_timespec(ts))
  246. {
  247. ts.tv_sec+= 300;
  248. pthread_error= pthread_timedjoin_np(thread->id, NULL, &ts);
  249. if (pthread_error)
  250. {
  251. gearmand_perror(pthread_error, "pthread_timedjoin_np");
  252. }
  253. }
  254. if (pthread_error != 0)
  255. {
  256. pthread_error= pthread_kill(thread->id, SIGQUIT);
  257. if (pthread_error)
  258. {
  259. gearmand_perror(pthread_error, "pthread_kill(, SIGQUIT)");
  260. }
  261. pthread_error= pthread_join(thread->id, NULL);
  262. }
  263. }
  264. #else
  265. pthread_error= pthread_join(thread->id, NULL);
  266. #endif
  267. if (pthread_error)
  268. {
  269. gearmand_perror(pthread_error, "pthread_join");
  270. }
  271. }
  272. if (thread->is_thread_lock)
  273. {
  274. int pthread_error;
  275. if ((pthread_error= pthread_mutex_destroy(&(thread->lock))))
  276. {
  277. gearmand_perror(pthread_error, "pthread_mutex_destroy");
  278. }
  279. }
  280. _wakeup_close(thread);
  281. while (thread->dcon_list != NULL)
  282. {
  283. gearmand_con_free(thread->dcon_list);
  284. }
  285. while (thread->dcon_add_list != NULL)
  286. {
  287. gearmand_con_st* dcon= thread->dcon_add_list;
  288. thread->dcon_add_list= dcon->next;
  289. dcon->close_socket();
  290. delete dcon;
  291. }
  292. while (thread->free_dcon_list != NULL)
  293. {
  294. gearmand_con_st* dcon= thread->free_dcon_list;
  295. thread->free_dcon_list= dcon->next;
  296. delete dcon;
  297. }
  298. gearman_server_thread_free(&(thread->server_thread));
  299. GEARMAND_LIST__DEL(Gearmand()->thread, thread);
  300. if (Gearmand()->threads > 0)
  301. {
  302. if (thread->base != NULL)
  303. {
  304. if (Gearmand()->base == thread->base)
  305. {
  306. Gearmand()->base= NULL;
  307. }
  308. event_base_free(thread->base);
  309. thread->base= NULL;
  310. }
  311. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u shutdown complete", thread->count);
  312. }
  313. delete thread;
  314. }
  315. }
  316. void gearmand_thread_wakeup(gearmand_thread_st *thread,
  317. gearmand_wakeup_t wakeup)
  318. {
  319. uint8_t buffer= wakeup;
  320. /* If this fails, there is not much we can really do. This should never fail
  321. though if the main gearmand thread is still active. */
  322. int limit= 5;
  323. ssize_t written;
  324. while (--limit)
  325. {
  326. if ((written= write(thread->wakeup_fd[1], &buffer, 1)) != 1)
  327. {
  328. if (written < 0)
  329. {
  330. switch (errno)
  331. {
  332. case EINTR:
  333. continue;
  334. default:
  335. break;
  336. }
  337. gearmand_perror(errno, gearmand_strwakeup(wakeup));
  338. }
  339. else
  340. {
  341. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  342. "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
  343. }
  344. }
  345. break;
  346. }
  347. }
  348. void gearmand_thread_run(gearmand_thread_st *thread)
  349. {
  350. while (1)
  351. {
  352. gearmand_error_t ret;
  353. gearmand_con_st *dcon= gearman_server_thread_run(&(thread->server_thread), &ret);
  354. if (ret == GEARMAND_SUCCESS or
  355. ret == GEARMAND_IO_WAIT or
  356. ret == GEARMAND_SHUTDOWN_GRACEFUL)
  357. {
  358. return;
  359. }
  360. if (dcon == NULL)
  361. {
  362. /* We either got a GEARMAND_SHUTDOWN or some other fatal internal error.
  363. Either way, we want to shut the server down. */
  364. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  365. return;
  366. }
  367. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Disconnected %s:%s", dcon->host, dcon->port);
  368. gearmand_con_free(dcon);
  369. }
  370. }
  371. #pragma GCC diagnostic push
  372. #ifndef __INTEL_COMPILER
  373. # pragma GCC diagnostic ignored "-Wold-style-cast"
  374. #endif
  375. /*
  376. * Private definitions
  377. */
  378. static void *_thread(void *data)
  379. {
  380. gearmand_thread_st *thread= (gearmand_thread_st *)data;
  381. char buffer[BUFSIZ];
  382. int length= snprintf(buffer, sizeof(buffer), "[%6u ]", thread->count);
  383. if (length <= 0 or sizeof(length) >= sizeof(buffer))
  384. {
  385. assert(0);
  386. buffer[0]= 0;
  387. }
  388. (void)gearmand_initialize_thread_logging(buffer);
  389. gearmand_debug("Entering thread event loop");
  390. if (event_base_loop(thread->base, 0) == -1)
  391. {
  392. gearmand_fatal("event_base_loop(-1)");
  393. Gearmand()->ret= GEARMAND_EVENT;
  394. }
  395. gearmand_debug("Exiting thread event loop");
  396. return NULL;
  397. }
  398. static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st*)
  399. {
  400. if (Gearmand())
  401. {
  402. (*Gearmand()->log_fn)(line, verbose, (void *)Gearmand()->log_context);
  403. }
  404. }
  405. static void _run(gearman_server_thread_st*, void *fn_arg)
  406. {
  407. if (fn_arg)
  408. {
  409. gearmand_thread_st *dthread= (gearmand_thread_st*)fn_arg;
  410. gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
  411. }
  412. }
  413. static gearmand_error_t _wakeup_init(gearmand_thread_st *thread)
  414. {
  415. gearmand_debug("Creating IO thread wakeup pipe");
  416. #if defined(HAVE_PIPE2) && HAVE_PIPE2
  417. if (pipe2(thread->wakeup_fd, O_NONBLOCK) == -1)
  418. {
  419. return gearmand_perror(errno, "pipe");
  420. }
  421. #else
  422. if (pipe(thread->wakeup_fd) == -1)
  423. {
  424. return gearmand_perror(errno, "pipe");
  425. }
  426. gearmand_error_t local_ret;
  427. if ((local_ret= gearmand_sockfd_nonblock(thread->wakeup_fd[0])))
  428. {
  429. return local_ret;
  430. }
  431. #endif
  432. event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
  433. _wakeup_event, thread);
  434. if (event_base_set(thread->base, &(thread->wakeup_event)) == -1)
  435. {
  436. gearmand_perror(errno, "event_base_set");
  437. }
  438. if (event_add(&(thread->wakeup_event), NULL) < 0)
  439. {
  440. gearmand_perror(errno, "event_add");
  441. return GEARMAND_EVENT;
  442. }
  443. thread->is_wakeup_event= true;
  444. return GEARMAND_SUCCESS;
  445. }
  446. static void _wakeup_close(gearmand_thread_st *thread)
  447. {
  448. _wakeup_clear(thread);
  449. if (thread->wakeup_fd[0] >= 0)
  450. {
  451. gearmand_debug("Closing IO thread wakeup pipe");
  452. gearmand_pipe_close(thread->wakeup_fd[0]);
  453. thread->wakeup_fd[0]= -1;
  454. gearmand_pipe_close(thread->wakeup_fd[1]);
  455. thread->wakeup_fd[1]= -1;
  456. }
  457. }
  458. static void _wakeup_clear(gearmand_thread_st *thread)
  459. {
  460. if (thread->is_wakeup_event)
  461. {
  462. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Clearing event for IO thread wakeup pipe %u", thread->count);
  463. if (event_del(&(thread->wakeup_event)) < 0)
  464. {
  465. gearmand_perror(errno, "event_del() failure, shutdown may hang");
  466. }
  467. thread->is_wakeup_event= false;
  468. }
  469. }
  470. #pragma GCC diagnostic push
  471. #pragma GCC diagnostic ignored "-Wunreachable-code"
  472. static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
  473. {
  474. gearmand_thread_st *thread= (gearmand_thread_st *)arg;
  475. uint8_t buffer[GEARMAND_PIPE_BUFFER_SIZE];
  476. ssize_t ret;
  477. while (1)
  478. {
  479. ret= read(fd, buffer, GEARMAND_PIPE_BUFFER_SIZE);
  480. if (ret == 0)
  481. {
  482. _clear_events(thread);
  483. gearmand_fatal("read(EOF)");
  484. Gearmand()->ret= GEARMAND_PIPE_EOF;
  485. return;
  486. }
  487. else if (ret == -1)
  488. {
  489. int local_errno= errno;
  490. if (local_errno == EINTR)
  491. {
  492. continue;
  493. }
  494. if (local_errno == EAGAIN)
  495. {
  496. break;
  497. }
  498. _clear_events(thread);
  499. gearmand_perror(local_errno, "_wakeup_event:read");
  500. Gearmand()->ret= GEARMAND_ERRNO;
  501. return;
  502. }
  503. for (ssize_t x= 0; x < ret; x++)
  504. {
  505. switch ((gearmand_wakeup_t)buffer[x])
  506. {
  507. case GEARMAND_WAKEUP_PAUSE:
  508. gearmand_debug("Received PAUSE wakeup event");
  509. break;
  510. case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
  511. gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
  512. if (gearman_server_shutdown_graceful(&(Gearmand()->server)) == GEARMAND_SHUTDOWN)
  513. {
  514. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  515. }
  516. break;
  517. case GEARMAND_WAKEUP_SHUTDOWN:
  518. gearmand_debug("Received SHUTDOWN wakeup event");
  519. _clear_events(thread);
  520. break;
  521. case GEARMAND_WAKEUP_CON:
  522. gearmand_debug("Received CON wakeup event");
  523. gearmand_con_check_queue(thread);
  524. break;
  525. case GEARMAND_WAKEUP_RUN:
  526. gearmand_debug("Received RUN wakeup event");
  527. gearmand_thread_run(thread);
  528. break;
  529. default:
  530. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
  531. _clear_events(thread);
  532. Gearmand()->ret= GEARMAND_UNKNOWN_STATE;
  533. break;
  534. }
  535. }
  536. }
  537. }
  538. #pragma GCC diagnostic pop
  539. static void _clear_events(gearmand_thread_st *thread)
  540. {
  541. _wakeup_clear(thread);
  542. while (thread->dcon_list != NULL)
  543. {
  544. gearmand_con_free(thread->dcon_list);
  545. }
  546. }
  547. #pragma GCC diagnostic pop