connection.cc 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Server connection definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <string.h>
  45. #include <errno.h>
  46. #include <cassert>
  47. #include <algorithm>
  48. static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
  49. gearmand_error_t& ret);
  50. /*
  51. * Public definitions
  52. */
  53. gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread, gearmand_con_st *dcon, gearmand_error_t& ret)
  54. {
  55. gearman_server_con_st *con= _server_con_create(thread, dcon, ret);
  56. if (con)
  57. {
  58. if ((ret= gearman_io_set_fd(&(con->con), dcon->fd)) != GEARMAND_SUCCESS)
  59. {
  60. gearman_server_con_free(con);
  61. return NULL;
  62. }
  63. ret= gearmand_io_set_events(con, POLLIN);
  64. if (ret != GEARMAND_SUCCESS)
  65. {
  66. gearmand_gerror("gearmand_io_set_events", ret);
  67. gearman_server_con_free(con);
  68. return NULL;
  69. }
  70. }
  71. return con;
  72. }
  73. static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread,
  74. gearmand_con_st *dcon,
  75. gearmand_error_t& ret)
  76. {
  77. gearman_server_con_st *con;
  78. if (thread->free_con_count > 0)
  79. {
  80. con= thread->free_con_list;
  81. GEARMAND_LIST__DEL(thread->free_con, con);
  82. }
  83. else
  84. {
  85. con= new (std::nothrow) gearman_server_con_st;
  86. if (con == NULL)
  87. {
  88. ret= gearmand_perror(errno, "new() build_gearman_server_con_st");
  89. return NULL;
  90. }
  91. }
  92. assert(con);
  93. if (con == NULL)
  94. {
  95. gearmand_error("Neigther an allocated gearman_server_con_st() or free listed could be found");
  96. ret= GEARMAND_MEMORY_ALLOCATION_FAILURE;
  97. return NULL;
  98. }
  99. gearmand_connection_options_t options[]= { GEARMAND_CON_MAX };
  100. gearmand_connection_init(thread->gearman, &(con->con), dcon, options);
  101. con->con.root= con;
  102. con->is_sleeping= false;
  103. con->is_exceptions= Gearmand()->_exceptions;
  104. con->is_dead= false;
  105. con->is_cleaned_up = false;
  106. con->is_noop_sent= false;
  107. con->ret= GEARMAND_SUCCESS;
  108. con->io_list= false;
  109. con->proc_list= false;
  110. con->to_be_freed_list= false;
  111. con->proc_removed= false;
  112. con->io_packet_count= 0;
  113. con->proc_packet_count= 0;
  114. con->worker_count= 0;
  115. con->client_count= 0;
  116. con->thread= thread;
  117. con->packet= NULL;
  118. con->io_packet_list= NULL;
  119. con->io_packet_end= NULL;
  120. con->proc_packet_list= NULL;
  121. con->proc_packet_end= NULL;
  122. con->io_next= NULL;
  123. con->io_prev= NULL;
  124. con->proc_next= NULL;
  125. con->proc_prev= NULL;
  126. con->to_be_freed_next= NULL;
  127. con->to_be_freed_prev= NULL;
  128. con->worker_list= NULL;
  129. con->client_list= NULL;
  130. con->_host= dcon->host;
  131. con->_port= dcon->port;
  132. strcpy(con->id, "-");
  133. con->timeout_event= NULL;
  134. con->protocol= NULL;
  135. con->_ssl= NULL;
  136. int error;
  137. if ((error= pthread_mutex_lock(&thread->lock)) == 0)
  138. {
  139. GEARMAND_LIST__ADD(thread->con, con);
  140. if ((error= pthread_mutex_unlock(&thread->lock)))
  141. {
  142. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
  143. gearman_server_con_free(con);
  144. ret= GEARMAND_ERRNO;
  145. return NULL;
  146. }
  147. }
  148. else
  149. {
  150. assert(error);
  151. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
  152. gearman_server_con_free(con);
  153. ret= GEARMAND_ERRNO;
  154. return NULL;
  155. }
  156. return con;
  157. }
  158. void gearman_server_con_attempt_free(gearman_server_con_st *con)
  159. {
  160. con->_host= NULL;
  161. con->_port= NULL;
  162. if (Server->flags.threaded)
  163. {
  164. if (!(con->proc_removed) and !(Server->proc_shutdown))
  165. {
  166. gearman_server_con_delete_timeout(con);
  167. con->is_dead= true;
  168. con->is_sleeping= false;
  169. con->is_exceptions= Gearmand()->_exceptions;
  170. con->is_noop_sent= false;
  171. gearman_server_con_proc_add(con);
  172. }
  173. }
  174. else
  175. {
  176. gearman_server_con_free(con);
  177. }
  178. }
  179. void gearman_server_con_free(gearman_server_con_st *con)
  180. {
  181. gearman_server_thread_st *thread= con->thread;
  182. con->_host= NULL;
  183. con->_port= NULL;
  184. // Correct location?
  185. #if defined(HAVE_SSL) && HAVE_SSL
  186. if (con->_ssl)
  187. {
  188. SSL_shutdown(con->_ssl);
  189. SSL_free(con->_ssl);
  190. con->_ssl= NULL;
  191. }
  192. #endif // defined(HAVE_SSL)
  193. gearman_server_con_delete_timeout(con);
  194. if (con->is_cleaned_up)
  195. {
  196. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "con %p is already cleaned-up. returning", con);
  197. return;
  198. }
  199. gearmand_io_free(&(con->con));
  200. con->protocol_release();
  201. if (con->packet != NULL)
  202. {
  203. if (&(con->packet->packet) != con->con.recv_packet)
  204. {
  205. gearmand_packet_free(&(con->packet->packet));
  206. }
  207. gearman_server_packet_free(con->packet, con->thread, true);
  208. }
  209. while (con->io_packet_list != NULL)
  210. {
  211. gearman_server_io_packet_remove(con);
  212. }
  213. while (con->proc_packet_list != NULL)
  214. {
  215. gearman_server_packet_st* packet= gearman_server_proc_packet_remove(con);
  216. gearmand_packet_free(&(packet->packet));
  217. gearman_server_packet_free(packet, con->thread, true);
  218. }
  219. gearman_server_con_free_workers(con);
  220. while (con->client_list != NULL)
  221. {
  222. gearman_server_client_free(con->client_list);
  223. }
  224. if (con->timeout_event != NULL)
  225. {
  226. if (event_del(con->timeout_event) == -1)
  227. {
  228. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "calling event_del() on timeout_event failed");
  229. }
  230. free(con->timeout_event);
  231. con->timeout_event= NULL;
  232. }
  233. if (con->proc_list)
  234. {
  235. gearman_server_con_proc_remove(con);
  236. }
  237. if (con->io_list)
  238. {
  239. gearman_server_con_io_remove(con);
  240. }
  241. int lock_error;
  242. if ((lock_error= pthread_mutex_lock(&thread->lock)) == 0)
  243. {
  244. GEARMAND_LIST__DEL(con->thread->con, con);
  245. if ((lock_error= pthread_mutex_unlock(&thread->lock)))
  246. {
  247. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  248. }
  249. }
  250. else
  251. {
  252. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  253. }
  254. assert(lock_error == 0);
  255. if (thread->free_con_count < GEARMAND_MAX_FREE_SERVER_CON)
  256. {
  257. GEARMAND_LIST__ADD(thread->free_con, con);
  258. con->is_cleaned_up = true;
  259. return;
  260. }
  261. delete con;
  262. }
  263. gearmand_io_st *gearman_server_con_con(gearman_server_con_st *con)
  264. {
  265. assert(con);
  266. return &con->con;
  267. }
  268. gearmand_con_st *gearman_server_con_data(gearman_server_con_st *con)
  269. {
  270. assert(con);
  271. return gearman_io_context(&(con->con));
  272. }
  273. const char *gearman_server_con_id(gearman_server_con_st *con)
  274. {
  275. assert(con);
  276. return con->id;
  277. }
  278. void gearman_server_con_set_id(gearman_server_con_st *con,
  279. const char *id,
  280. const size_t size)
  281. {
  282. size_t min_size= std::min(size, size_t(GEARMAND_SERVER_CON_ID_SIZE -1));
  283. memcpy(con->id, id, min_size);
  284. con->id[min_size]= 0;
  285. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  286. "identifier set to %.*s",
  287. (uint32_t)min_size, con->id);
  288. }
  289. void gearman_server_con_free_worker(gearman_server_con_st *con,
  290. char *function_name,
  291. size_t function_name_size)
  292. {
  293. gearman_server_worker_st *worker= con->worker_list;
  294. gearman_server_worker_st *prev_worker= NULL;
  295. while (worker != NULL)
  296. {
  297. if (worker->function->function_name_size == function_name_size &&
  298. !memcmp(worker->function->function_name, function_name,
  299. function_name_size))
  300. {
  301. gearman_server_worker_free(worker);
  302. /* Set worker to the last kept worker, or the beginning of the list. */
  303. if (prev_worker == NULL)
  304. {
  305. worker= con->worker_list;
  306. }
  307. else
  308. {
  309. worker= prev_worker;
  310. }
  311. }
  312. else
  313. {
  314. /* Save this so we don't need to scan the list again if one is removed. */
  315. prev_worker= worker;
  316. worker= worker->con_next;
  317. }
  318. }
  319. }
  320. void gearman_server_con_free_workers(gearman_server_con_st *con)
  321. {
  322. while (con->worker_list != NULL)
  323. {
  324. gearman_server_worker_free(con->worker_list);
  325. }
  326. }
  327. void gearman_server_con_to_be_freed_add(gearman_server_con_st *con)
  328. {
  329. int lock_error;
  330. if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
  331. {
  332. if (con->to_be_freed_list)
  333. {
  334. if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
  335. {
  336. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  337. }
  338. assert(lock_error == 0);
  339. return;
  340. }
  341. }
  342. else
  343. {
  344. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
  345. }
  346. assert(lock_error == 0);
  347. GEARMAND_LIST_ADD(con->thread->to_be_freed, con, to_be_freed_);
  348. con->to_be_freed_list = true;
  349. /* Looks funny, but need to check to_be_freed_count locked, but call run unlocked. */
  350. if (con->thread->to_be_freed_count == 1 && con->thread->run_fn)
  351. {
  352. if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
  353. {
  354. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  355. }
  356. assert(lock_error == 0);
  357. (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
  358. }
  359. else
  360. {
  361. if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
  362. {
  363. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  364. }
  365. assert(lock_error == 0);
  366. }
  367. }
  368. gearman_server_con_st * gearman_server_con_to_be_freed_next(gearman_server_thread_st *thread)
  369. {
  370. gearman_server_con_st *con;
  371. if (thread->to_be_freed_list == NULL)
  372. {
  373. return NULL;
  374. }
  375. int lock_error;
  376. if ((lock_error= pthread_mutex_lock(&thread->lock)) == 0)
  377. {
  378. con= thread->to_be_freed_list;
  379. while (con != NULL)
  380. {
  381. GEARMAND_LIST_DEL(thread->to_be_freed, con, to_be_freed_);
  382. if (con->to_be_freed_list)
  383. {
  384. con->to_be_freed_list= false;
  385. break;
  386. }
  387. con= thread->to_be_freed_list;
  388. }
  389. if ((lock_error= pthread_mutex_unlock(&thread->lock)))
  390. {
  391. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  392. }
  393. return con;
  394. }
  395. else
  396. {
  397. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
  398. }
  399. assert(lock_error == 0);
  400. return NULL;
  401. }
  402. void gearman_server_con_io_add(gearman_server_con_st *con)
  403. {
  404. if (con->io_list)
  405. {
  406. return;
  407. }
  408. int lock_error;
  409. if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
  410. {
  411. GEARMAND_LIST_ADD(con->thread->io, con, io_);
  412. con->io_list= true;
  413. /* Looks funny, but need to check io_count locked, but call run unlocked. */
  414. if (con->thread->io_count == 1 && con->thread->run_fn)
  415. {
  416. if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
  417. {
  418. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  419. }
  420. (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
  421. }
  422. else
  423. {
  424. if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
  425. {
  426. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  427. }
  428. }
  429. }
  430. else
  431. {
  432. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "pthread_mutex_lock(%d), programming error, please report", lock_error);
  433. }
  434. assert(lock_error == 0);
  435. }
  436. void gearman_server_con_io_remove(gearman_server_con_st *con)
  437. {
  438. int lock_error;
  439. if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
  440. {
  441. if (con->io_list)
  442. {
  443. GEARMAND_LIST_DEL(con->thread->io, con, io_);
  444. con->io_list= false;
  445. }
  446. if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
  447. {
  448. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
  449. }
  450. }
  451. else
  452. {
  453. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
  454. }
  455. assert(lock_error == 0);
  456. }
  457. gearman_server_con_st *
  458. gearman_server_con_io_next(gearman_server_thread_st *thread)
  459. {
  460. gearman_server_con_st *con= thread->io_list;
  461. if (con)
  462. {
  463. gearman_server_con_io_remove(con);
  464. }
  465. return con;
  466. }
  467. void gearman_server_con_proc_add(gearman_server_con_st *con)
  468. {
  469. if (con->proc_list)
  470. {
  471. return;
  472. }
  473. int pthread_error;
  474. if ((pthread_error= pthread_mutex_lock(&con->thread->lock)) == 0)
  475. {
  476. GEARMAND_LIST_ADD(con->thread->proc, con, proc_);
  477. con->proc_list= true;
  478. if ((pthread_error= pthread_mutex_unlock(&con->thread->lock)))
  479. {
  480. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
  481. }
  482. if (! (Server->proc_shutdown) && !(Server->proc_wakeup))
  483. {
  484. if ((pthread_error= pthread_mutex_lock(&(Server->proc_lock))) == 0)
  485. {
  486. Server->proc_wakeup= true;
  487. if ((pthread_error= pthread_cond_signal(&(Server->proc_cond))) == 0)
  488. {
  489. if ((pthread_error= pthread_mutex_unlock(&(Server->proc_lock))))
  490. {
  491. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
  492. }
  493. }
  494. else
  495. {
  496. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_cond_signal");
  497. }
  498. }
  499. else
  500. {
  501. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
  502. }
  503. }
  504. }
  505. else
  506. {
  507. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
  508. }
  509. }
  510. void gearman_server_con_proc_remove(gearman_server_con_st *con)
  511. {
  512. int pthread_error;
  513. if ((pthread_error= pthread_mutex_lock(&con->thread->lock)) == 0)
  514. {
  515. if (con->proc_list)
  516. {
  517. GEARMAND_LIST_DEL(con->thread->proc, con, proc_);
  518. con->proc_list= false;
  519. }
  520. if ((pthread_error= pthread_mutex_unlock(&con->thread->lock)))
  521. {
  522. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
  523. }
  524. }
  525. else
  526. {
  527. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
  528. }
  529. }
  530. gearman_server_con_st *
  531. gearman_server_con_proc_next(gearman_server_thread_st *thread)
  532. {
  533. if (thread->proc_list == NULL)
  534. {
  535. return NULL;
  536. }
  537. gearman_server_con_st *con= NULL;
  538. int pthread_error;
  539. if ((pthread_error= pthread_mutex_lock(&thread->lock)) == 0)
  540. {
  541. con= thread->proc_list;
  542. while (con != NULL)
  543. {
  544. GEARMAND_LIST_DEL(thread->proc, con, proc_);
  545. con->proc_list= false;
  546. if (!(con->proc_removed))
  547. {
  548. break;
  549. }
  550. con= thread->proc_list;
  551. }
  552. if ((pthread_error= pthread_mutex_unlock(&thread->lock)))
  553. {
  554. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
  555. }
  556. }
  557. else
  558. {
  559. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
  560. }
  561. return con;
  562. }
  563. static void _server_job_timeout(int fd, short event, void *arg)
  564. {
  565. (void)fd;
  566. (void)event;
  567. gearman_server_job_st *job= (gearman_server_job_st *)arg;
  568. /* A timeout has ocurred on a job, re-queue it */
  569. gearmand_log_warning(GEARMAN_DEFAULT_LOG_PARAM,
  570. "Worker timeout reached on job, requeueing: %s %s",
  571. job->job_handle, job->unique);
  572. gearmand_error_t ret= gearman_server_job_queue(job);
  573. if (ret != GEARMAND_SUCCESS)
  574. {
  575. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  576. "Failed trying to requeue job after timeout, job lost: %s %s",
  577. job->job_handle, job->unique);
  578. gearman_server_job_free(job);
  579. }
  580. }
  581. gearmand_error_t gearman_server_con_add_job_timeout(gearman_server_con_st *con, gearman_server_job_st *job)
  582. {
  583. if (job)
  584. {
  585. gearman_server_worker_st *worker;
  586. for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
  587. {
  588. /* Assumes the functions are always fetched from the same server structure */
  589. if (worker->function == job->function)
  590. {
  591. break;
  592. }
  593. }
  594. /* It makes no sense to add a timeout to a connection that has no workers for a job */
  595. assert(worker);
  596. if (worker)
  597. {
  598. // We treat 0 and -1 as being the same (i.e. no timer)
  599. if (worker->timeout > 0)
  600. {
  601. if (worker->timeout < 1000)
  602. {
  603. worker->timeout= 1000;
  604. }
  605. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Adding timeout on %s for %s (%ld)",
  606. job->function->function_name,
  607. job->job_handle,
  608. worker->timeout);
  609. if (con->timeout_event == NULL)
  610. {
  611. gearmand_con_st *dcon= con->con.context;
  612. con->timeout_event= (struct event *)malloc(sizeof(struct event)); // libevent POD
  613. if (con->timeout_event == NULL)
  614. {
  615. return gearmand_merror("malloc(sizeof(struct event)", struct event, 1);
  616. }
  617. timeout_set(con->timeout_event, _server_job_timeout, job);
  618. if (event_base_set(dcon->thread->base, con->timeout_event) == -1)
  619. {
  620. gearmand_perror(errno, "event_base_set");
  621. }
  622. }
  623. /* XXX Right now, if a worker has diff timeouts for functions I think
  624. this will overwrite any existing timeouts on that event. One
  625. solution to that would be to record the timeout from last time,
  626. and only set this one if it is longer than that one. */
  627. struct timeval timeout_tv = { 0 , 0 };
  628. time_t milliseconds= worker->timeout;
  629. timeout_tv.tv_sec= milliseconds / 1000;
  630. timeout_tv.tv_usec= (suseconds_t)((milliseconds % 1000) * 1000);
  631. timeout_add(con->timeout_event, &timeout_tv);
  632. }
  633. else if (con->timeout_event) // Delete the timer if it exists
  634. {
  635. gearman_server_con_delete_timeout(con);
  636. }
  637. }
  638. }
  639. return GEARMAND_SUCCESS;
  640. }
  641. void gearman_server_con_delete_timeout(gearman_server_con_st *con)
  642. {
  643. if (con->timeout_event)
  644. {
  645. timeout_del(con->timeout_event);
  646. free(con->timeout_event);
  647. con->timeout_event= NULL;
  648. }
  649. }
  650. gearman_server_con_st *gearmand_ready(gearmand_connection_list_st *universal)
  651. {
  652. if (universal->ready_con_list)
  653. {
  654. gearmand_io_st *con= universal->ready_con_list;
  655. con->options.ready= false;
  656. GEARMAND_LIST_DEL(universal->ready_con, con, ready_);
  657. return con->root;
  658. }
  659. return NULL;
  660. }