gearmand_thread.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearmand Thread Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <libgearman-server/gearmand.h>
  45. #include <cassert>
  46. #include <cerrno>
  47. #include <memory>
  48. #include <csignal>
  49. /*
  50. * Private declarations
  51. */
  52. namespace
  53. {
  54. #if defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
  55. bool fill_timespec(struct timespec& ts)
  56. {
  57. #if defined(HAVE_CLOCK_GETTIME) && HAVE_CLOCK_GETTIME
  58. if (HAVE_CLOCK_GETTIME) // This won't be called on OSX, etc,...
  59. {
  60. if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
  61. {
  62. gearmand_perror(errno, "clock_gettime(CLOCK_REALTIME)");
  63. return false;
  64. }
  65. }
  66. #else
  67. {
  68. struct timeval tv;
  69. if (gettimeofday(&tv, NULL) == -1)
  70. {
  71. gearmand_perror(errno, "gettimeofday()");
  72. return false;
  73. }
  74. TIMEVAL_TO_TIMESPEC(&tv, &ts);
  75. }
  76. #endif
  77. return true;
  78. }
  79. #endif // defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
  80. }
  81. /**
  82. * @addtogroup gearmand_thread_private Private Gearmand Thread Functions
  83. * @ingroup gearmand_thread
  84. * @{
  85. */
/* Start routine handed to pthread_create(); runs the thread's libevent loop. */
static void *_thread(void *data);
/* Log callback registered with gearman_server_thread_init(). */
static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread);
/* Run callback registered on the server thread; posts a RUN wakeup. */
static void _run(gearman_server_thread_st *thread, void *fn_arg);
/* Create the wakeup pipe and register its read end with the thread's event base. */
static gearmand_error_t _wakeup_init(gearmand_thread_st *thread);
/* Unregister the wakeup event and close both ends of the wakeup pipe. */
static void _wakeup_close(gearmand_thread_st *thread);
/* Unregister the wakeup event if it is currently registered. */
static void _wakeup_clear(gearmand_thread_st *thread);
/* libevent callback fired when the wakeup pipe becomes readable. */
static void _wakeup_event(int fd, short events, void *arg);
/* Unregister the wakeup event and free every connection on the thread's dcon_list. */
static void _clear_events(gearmand_thread_st *thread);
  94. namespace {
  95. gearmand_error_t gearmand_connection_watch(gearmand_io_st *con, short events, void *)
  96. {
  97. short set_events= 0;
  98. gearmand_con_st* dcon= gearman_io_context(con);
  99. if (events & POLLIN)
  100. {
  101. set_events|= EV_READ;
  102. }
  103. if (events & POLLOUT)
  104. {
  105. set_events|= EV_WRITE;
  106. }
  107. if (dcon->last_events != set_events)
  108. {
  109. if (dcon->last_events)
  110. {
  111. if (event_del(&(dcon->event)) == -1)
  112. {
  113. gearmand_perror(errno, "event_del");
  114. assert_msg(false, "event_del");
  115. }
  116. }
  117. event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready, dcon);
  118. if (event_base_set(dcon->thread->base, &(dcon->event)) == -1)
  119. {
  120. gearmand_perror(errno, "event_base_set");
  121. assert_msg(false, "event_del");
  122. }
  123. if (event_add(&(dcon->event), NULL) == -1)
  124. {
  125. gearmand_perror(errno, "event_add");
  126. return GEARMAND_EVENT;
  127. }
  128. dcon->last_events= set_events;
  129. }
  130. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  131. "%15s:%5s Watching %6s %s",
  132. dcon->host, dcon->port,
  133. events & POLLIN ? "POLLIN" : "",
  134. events & POLLOUT ? "POLLOUT" : "");
  135. return GEARMAND_SUCCESS;
  136. }
  137. }
/**
 * Construct a thread context bound to @a gearmand_.
 *
 * All flags and counters start at false/zero and every list and base pointer
 * starts null. wakeup_fd is value-initialised here; gearmand_thread_create()
 * overwrites both entries with -1 before the pipe is created.
 */
gearmand_thread_st::gearmand_thread_st(gearmand_st& gearmand_):
  is_thread_lock{false},
  is_wakeup_event{false},
  count{0},
  dcon_count{0},
  dcon_add_count{0},
  free_dcon_count{0},
  wakeup_fd{},
  // NOTE(review): GCC 4.x before 4.9 gets parentheses instead of braces here,
  // presumably to work around a brace-initialisation problem with this
  // member -- confirm against the member's declaration.
#if __GNUC__ == 4 && __GNUC_MINOR__ < 9
  _gearmand(gearmand_),
#else
  _gearmand{gearmand_},
#endif
  next{nullptr},
  prev{nullptr},
  base{nullptr},
  dcon_list{nullptr},
  dcon_add_list{nullptr},
  free_dcon_list{nullptr},
  server_thread{},
  id{},
  lock{}
  // TODO: wakeup_event is not initialised in this list; uniform
  // initialisation does not help with it.
{
}
  163. /** @} */
  164. /*
  165. * Public definitions
  166. */
  167. gearmand_error_t gearmand_thread_create(gearmand_st& gearmand)
  168. {
  169. gearmand_thread_st* thread= new (std::nothrow) gearmand_thread_st(gearmand);
  170. if (!thread)
  171. {
  172. return gearmand_merror("new", gearmand_thread_st, 1);
  173. }
  174. if (! gearman_server_thread_init(gearmand_server(&gearmand), &(thread->server_thread),
  175. _log, thread, gearmand_connection_watch))
  176. {
  177. delete thread;
  178. gearmand_fatal("gearman_server_thread_init(NULL)");
  179. return GEARMAND_MEMORY_ALLOCATION_FAILURE;
  180. }
  181. thread->is_thread_lock= false;
  182. thread->is_wakeup_event= false;
  183. thread->count= 0;
  184. thread->dcon_count= 0;
  185. thread->dcon_add_count= 0;
  186. thread->free_dcon_count= 0;
  187. thread->wakeup_fd[0]= -1;
  188. thread->wakeup_fd[1]= -1;
  189. GEARMAND_LIST__ADD(Gearmand()->thread, thread);
  190. thread->dcon_list= nullptr;
  191. thread->dcon_add_list= nullptr;
  192. thread->free_dcon_list= nullptr;
  193. /* If we have no threads, we still create a fake thread that uses the main
  194. libevent instance. Otherwise create a libevent instance for each thread. */
  195. if (gearmand.threads == 0)
  196. {
  197. thread->base= gearmand.base;
  198. }
  199. else
  200. {
  201. gearmand_debug("Initializing libevent for IO thread");
  202. assert(thread->base == nullptr);
  203. thread->base= event_base_new();
  204. if (!thread->base)
  205. {
  206. gearmand_thread_free(thread);
  207. gearmand_fatal("event_base_new()");
  208. return GEARMAND_EVENT;
  209. }
  210. }
  211. gearmand_error_t ret;
  212. if (gearmand_failed(ret= _wakeup_init(thread)))
  213. {
  214. gearmand_thread_free(thread);
  215. return ret;
  216. }
  217. /* If we are not running multi-threaded, just return the thread context. */
  218. if (gearmand.threads == 0)
  219. {
  220. return GEARMAND_SUCCESS;
  221. }
  222. thread->count= gearmand.thread_count;
  223. int pthread_ret= pthread_mutex_init(&(thread->lock), nullptr);
  224. if (pthread_ret != 0)
  225. {
  226. thread->count= 0;
  227. gearmand_thread_free(thread);
  228. return gearmand_fatal_perror(pthread_ret, "pthread_mutex_init");
  229. }
  230. thread->is_thread_lock= true;
  231. thread->server_thread.run(_run, thread);
  232. pthread_ret= pthread_create(&(thread->id), nullptr, _thread, thread);
  233. if (pthread_ret != 0)
  234. {
  235. thread->count= 0;
  236. gearmand_thread_free(thread);
  237. return gearmand_perror(pthread_ret, "pthread_create");
  238. }
  239. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u created", thread->count);
  240. return GEARMAND_SUCCESS;
  241. }
  242. void gearmand_thread_free(gearmand_thread_st *thread)
  243. {
  244. if (thread)
  245. {
  246. if (Gearmand()->threads and thread->count > 0)
  247. {
  248. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Shutting down thread %u", thread->count);
  249. gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
  250. int pthread_error= -1;
  251. #if defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
  252. {
  253. struct timespec ts;
  254. if (fill_timespec(ts))
  255. {
  256. ts.tv_sec+= 300;
  257. pthread_error= pthread_timedjoin_np(thread->id, nullptr, &ts);
  258. if (pthread_error)
  259. {
  260. gearmand_perror(pthread_error, "pthread_timedjoin_np");
  261. }
  262. }
  263. if (pthread_error != 0)
  264. {
  265. pthread_error= pthread_kill(thread->id, SIGQUIT);
  266. if (pthread_error)
  267. {
  268. gearmand_perror(pthread_error, "pthread_kill(, SIGQUIT)");
  269. }
  270. pthread_error= pthread_join(thread->id, nullptr);
  271. }
  272. }
  273. #else
  274. pthread_error= pthread_join(thread->id, nullptr);
  275. #endif
  276. if (pthread_error)
  277. {
  278. gearmand_perror(pthread_error, "pthread_join");
  279. }
  280. }
  281. if (thread->is_thread_lock)
  282. {
  283. int pthread_error;
  284. if ((pthread_error= pthread_mutex_destroy(&(thread->lock))))
  285. {
  286. gearmand_perror(pthread_error, "pthread_mutex_destroy");
  287. }
  288. }
  289. _wakeup_close(thread);
  290. while (thread->dcon_list)
  291. {
  292. gearmand_con_free(thread->dcon_list);
  293. }
  294. while (thread->dcon_add_list)
  295. {
  296. gearmand_con_st* dcon= thread->dcon_add_list;
  297. thread->dcon_add_list= dcon->next;
  298. dcon->close_socket();
  299. delete dcon;
  300. }
  301. while (thread->free_dcon_list)
  302. {
  303. gearmand_con_st* dcon= thread->free_dcon_list;
  304. thread->free_dcon_list= dcon->next;
  305. delete dcon;
  306. }
  307. gearman_server_thread_free(&(thread->server_thread));
  308. GEARMAND_LIST__DEL(Gearmand()->thread, thread);
  309. if (Gearmand()->threads > 0)
  310. {
  311. if (thread->base)
  312. {
  313. if (Gearmand()->base == thread->base)
  314. {
  315. Gearmand()->base= nullptr;
  316. }
  317. event_base_free(thread->base);
  318. thread->base= nullptr;
  319. }
  320. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u shutdown complete", thread->count);
  321. }
  322. delete thread;
  323. }
  324. }
  325. void gearmand_thread_wakeup(gearmand_thread_st *thread,
  326. gearmand_wakeup_t wakeup)
  327. {
  328. uint8_t buffer= wakeup;
  329. /* If this fails, there is not much we can really do. This should never fail
  330. though if the main gearmand thread is still active. */
  331. int limit= 5;
  332. ssize_t written;
  333. while (--limit)
  334. {
  335. if ((written= write(thread->wakeup_fd[1], &buffer, 1)) != 1)
  336. {
  337. if (written < 0)
  338. {
  339. switch (errno)
  340. {
  341. case EINTR:
  342. continue;
  343. default:
  344. break;
  345. }
  346. gearmand_perror(errno, gearmand_strwakeup(wakeup));
  347. }
  348. else
  349. {
  350. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  351. "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
  352. }
  353. }
  354. break;
  355. }
  356. }
  357. void gearmand_thread_run(gearmand_thread_st *thread)
  358. {
  359. while (1)
  360. {
  361. gearmand_error_t ret;
  362. gearmand_con_st *dcon= gearman_server_thread_run(&(thread->server_thread), &ret);
  363. if (ret == GEARMAND_SUCCESS or
  364. ret == GEARMAND_IO_WAIT or
  365. ret == GEARMAND_SHUTDOWN_GRACEFUL)
  366. {
  367. return;
  368. }
  369. if (!dcon)
  370. {
  371. /* We either got a GEARMAND_SHUTDOWN or some other fatal internal error.
  372. Either way, we want to shut the server down. */
  373. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  374. return;
  375. }
  376. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Disconnected %s:%s", dcon->host, dcon->port);
  377. gearmand_con_free(dcon);
  378. }
  379. }
  380. #pragma GCC diagnostic push
  381. #ifndef __INTEL_COMPILER
  382. # pragma GCC diagnostic ignored "-Wold-style-cast"
  383. #endif
  384. /*
  385. * Private definitions
  386. */
  387. static void *_thread(void *data)
  388. {
  389. gearmand_thread_st *thread= (gearmand_thread_st *)data;
  390. char buffer[BUFSIZ];
  391. int length= snprintf(buffer, sizeof(buffer), "[%6u ]", thread->count);
  392. if (length <= 0 or sizeof(length) >= sizeof(buffer))
  393. {
  394. assert(0);
  395. buffer[0]= 0;
  396. }
  397. (void)gearmand_initialize_thread_logging(buffer);
  398. gearmand_debug("Entering thread event loop");
  399. if (event_base_loop(thread->base, 0) == -1)
  400. {
  401. gearmand_fatal("event_base_loop(-1)");
  402. Gearmand()->ret= GEARMAND_EVENT;
  403. }
  404. gearmand_debug("Exiting thread event loop");
  405. return nullptr;
  406. }
  407. static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st*)
  408. {
  409. if (Gearmand())
  410. {
  411. (*Gearmand()->log_fn)(line, verbose, (void *)Gearmand()->log_context);
  412. }
  413. }
  414. static void _run(gearman_server_thread_st*, void *fn_arg)
  415. {
  416. if (fn_arg)
  417. {
  418. gearmand_thread_st *dthread= (gearmand_thread_st*)fn_arg;
  419. gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
  420. }
  421. }
  422. static gearmand_error_t _wakeup_init(gearmand_thread_st *thread)
  423. {
  424. gearmand_debug("Creating IO thread wakeup pipe");
  425. #if defined(HAVE_PIPE2) && HAVE_PIPE2
  426. if (pipe2(thread->wakeup_fd, O_NONBLOCK) == -1)
  427. {
  428. return gearmand_perror(errno, "pipe");
  429. }
  430. #else
  431. if (pipe(thread->wakeup_fd) == -1)
  432. {
  433. return gearmand_perror(errno, "pipe");
  434. }
  435. gearmand_error_t local_ret;
  436. if ((local_ret= gearmand_sockfd_nonblock(thread->wakeup_fd[0])))
  437. {
  438. return local_ret;
  439. }
  440. #endif
  441. event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
  442. _wakeup_event, thread);
  443. if (event_base_set(thread->base, &(thread->wakeup_event)) == -1)
  444. {
  445. gearmand_perror(errno, "event_base_set");
  446. }
  447. if (event_add(&(thread->wakeup_event), nullptr) < 0)
  448. {
  449. gearmand_perror(errno, "event_add");
  450. return GEARMAND_EVENT;
  451. }
  452. thread->is_wakeup_event= true;
  453. return GEARMAND_SUCCESS;
  454. }
  455. static void _wakeup_close(gearmand_thread_st *thread)
  456. {
  457. _wakeup_clear(thread);
  458. if (thread->wakeup_fd[0] >= 0)
  459. {
  460. gearmand_debug("Closing IO thread wakeup pipe");
  461. gearmand_pipe_close(thread->wakeup_fd[0]);
  462. thread->wakeup_fd[0]= -1;
  463. gearmand_pipe_close(thread->wakeup_fd[1]);
  464. thread->wakeup_fd[1]= -1;
  465. }
  466. }
  467. static void _wakeup_clear(gearmand_thread_st *thread)
  468. {
  469. if (thread->is_wakeup_event)
  470. {
  471. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Clearing event for IO thread wakeup pipe %u", thread->count);
  472. if (event_del(&(thread->wakeup_event)) < 0)
  473. {
  474. gearmand_perror(errno, "event_del() failure, shutdown may hang");
  475. }
  476. thread->is_wakeup_event= false;
  477. }
  478. }
  479. #pragma GCC diagnostic push
  480. #pragma GCC diagnostic ignored "-Wunreachable-code"
  481. static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
  482. {
  483. gearmand_thread_st *thread= (gearmand_thread_st *)arg;
  484. uint8_t buffer[GEARMAND_PIPE_BUFFER_SIZE];
  485. ssize_t ret;
  486. while (1)
  487. {
  488. ret= read(fd, buffer, GEARMAND_PIPE_BUFFER_SIZE);
  489. if (ret == 0)
  490. {
  491. _clear_events(thread);
  492. gearmand_fatal("read(EOF)");
  493. Gearmand()->ret= GEARMAND_PIPE_EOF;
  494. return;
  495. }
  496. else if (ret == -1)
  497. {
  498. int local_errno= errno;
  499. if (local_errno == EINTR)
  500. {
  501. continue;
  502. }
  503. if (local_errno == EAGAIN)
  504. {
  505. break;
  506. }
  507. _clear_events(thread);
  508. gearmand_perror(local_errno, "_wakeup_event:read");
  509. Gearmand()->ret= GEARMAND_ERRNO;
  510. return;
  511. }
  512. for (ssize_t x= 0; x < ret; x++)
  513. {
  514. switch ((gearmand_wakeup_t)buffer[x])
  515. {
  516. case GEARMAND_WAKEUP_PAUSE:
  517. gearmand_debug("Received PAUSE wakeup event");
  518. break;
  519. case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
  520. gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
  521. if (gearman_server_shutdown_graceful(&(Gearmand()->server)) == GEARMAND_SHUTDOWN)
  522. {
  523. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  524. }
  525. break;
  526. case GEARMAND_WAKEUP_SHUTDOWN:
  527. gearmand_debug("Received SHUTDOWN wakeup event");
  528. _clear_events(thread);
  529. break;
  530. case GEARMAND_WAKEUP_CON:
  531. gearmand_debug("Received CON wakeup event");
  532. gearmand_con_check_queue(thread);
  533. break;
  534. case GEARMAND_WAKEUP_RUN:
  535. gearmand_debug("Received RUN wakeup event");
  536. gearmand_thread_run(thread);
  537. break;
  538. default:
  539. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
  540. _clear_events(thread);
  541. Gearmand()->ret= GEARMAND_UNKNOWN_STATE;
  542. break;
  543. }
  544. }
  545. }
  546. }
  547. #pragma GCC diagnostic pop
  548. static void _clear_events(gearmand_thread_st *thread)
  549. {
  550. _wakeup_clear(thread);
  551. while (thread->dcon_list)
  552. {
  553. gearmand_con_free(thread->dcon_list);
  554. }
  555. }
  556. #pragma GCC diagnostic pop