gearmand_con.cc 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearmand Connection Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <libgearman-server/gearmand.h>
  45. #include <libgearman-server/queue.h>
  46. #include <cstring>
  47. #include <cerrno>
  48. #include <cassert>
  49. /*
  50. * Private declarations
  51. */
  52. void gearmand_con_st::close_socket()
  53. {
  54. gearmand_sockfd_close(fd);
  55. }
  56. /**
  57. * @addtogroup gearmand_con_private Private Gearmand Connection Functions
  58. * @ingroup gearmand_con
  59. * @{
  60. */
  61. static gearmand_error_t _con_add(gearmand_thread_st *thread,
  62. gearmand_con_st *dcon)
  63. {
  64. gearmand_error_t ret= GEARMAND_SUCCESS;
  65. dcon->server_con= gearman_server_con_add(&(thread->server_thread), dcon, ret);
  66. assert(dcon->server_con || ret != GEARMAND_SUCCESS);
  67. assert(! dcon->server_con || ret == GEARMAND_SUCCESS);
  68. if (dcon->server_con == NULL)
  69. {
  70. dcon->close_socket();
  71. return ret;
  72. }
  73. if (dcon->port_st())
  74. {
  75. ret= dcon->add_fn(dcon->server_con);
  76. if (gearmand_failed(ret))
  77. {
  78. gearman_server_con_free(dcon->server_con);
  79. dcon->close_socket();
  80. return ret;
  81. }
  82. }
  83. GEARMAND_LIST__ADD(thread->dcon, dcon);
  84. return GEARMAND_SUCCESS;
  85. }
  86. gearmand_error_t gearmand_con_st::add_fn(gearman_server_con_st* con_st_)
  87. {
  88. assert(_port_st);
  89. assert(con_st_);
  90. assert(con_st_ == server_con);
  91. return _port_st->add_fn(con_st_);
  92. }
  93. gearmand_error_t gearmand_con_st::remove_fn(gearman_server_con_st* con_st_)
  94. {
  95. assert(_port_st);
  96. assert(con_st_);
  97. return _port_st->remove_fn(con_st_);
  98. }
  99. void _con_ready(int, short events, void *arg)
  100. {
  101. gearmand_con_st *dcon= (gearmand_con_st *)(arg);
  102. short revents= 0;
  103. if (events & EV_READ)
  104. {
  105. revents|= POLLIN;
  106. }
  107. if (events & EV_WRITE)
  108. {
  109. revents|= POLLOUT;
  110. }
  111. gearmand_error_t ret= gearmand_io_set_revents(dcon->server_con, revents);
  112. if (gearmand_failed(ret))
  113. {
  114. gearmand_gerror("gearmand_io_set_revents", ret);
  115. gearmand_con_free(dcon);
  116. return;
  117. }
  118. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  119. "%s:%s Ready %6s %s",
  120. dcon->host, dcon->port,
  121. revents & POLLIN ? "POLLIN" : "",
  122. revents & POLLOUT ? "POLLOUT" : "");
  123. gearmand_thread_run(dcon->thread);
  124. }
  125. /** @} */
  126. /*
  127. * Public definitions
  128. */
  129. /**
  130. * Generate hash key for job handles and unique IDs.
  131. */
  132. uint32_t _server_job_hash(const char *key, size_t key_size)
  133. {
  134. const char *ptr= key;
  135. int32_t value= 0;
  136. while (key_size--)
  137. {
  138. value += (int32_t)*ptr++;
  139. value += (value << 10);
  140. value ^= (value >> 6);
  141. }
  142. value += (value << 3);
  143. value ^= (value >> 11);
  144. value += (value << 15);
  145. return (uint32_t)(value == 0 ? 1 : value);
  146. }
  147. void _server_con_worker_list_append(gearman_server_worker_st *list,
  148. gearman_server_worker_st *worker)
  149. {
  150. worker->con_prev= NULL;
  151. worker->con_next= list;
  152. while (worker->con_next != NULL)
  153. {
  154. worker->con_prev= worker->con_next;
  155. worker->con_next= worker->con_next->con_next;
  156. }
  157. if (worker->con_prev)
  158. {
  159. worker->con_prev->con_next= worker;
  160. }
  161. }
  162. void destroy_gearman_server_job_st(gearman_server_job_st* arg)
  163. {
  164. gearmand_debug("delete gearman_server_job_st");
  165. delete arg;
  166. }
  167. gearman_server_job_st *gearman_server_job_get_by_unique(gearman_server_st *server,
  168. const char *unique,
  169. const size_t unique_length,
  170. gearman_server_con_st *worker_con)
  171. {
  172. uint32_t key= _server_job_hash(unique, unique_length);
  173. gearman_server_job_st *server_job;
  174. for (server_job= server->unique_hash[key % server->hashtable_buckets];
  175. server_job != NULL; server_job= server_job->unique_next)
  176. {
  177. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "COMPARE unique \"%s\"(%u) == \"%s\"(%u)",
  178. bool(server_job->unique[0]) ? server_job->unique : "<null>", uint32_t(strlen(server_job->unique)),
  179. unique, uint32_t(unique_length));
  180. if (bool(server_job->unique[0]) and
  181. (strcmp(server_job->unique, unique) == 0))
  182. {
  183. /* Check to make sure the worker asking for the job still owns the job. */
  184. if (worker_con != NULL and
  185. (server_job->worker == NULL or server_job->worker->con != worker_con))
  186. {
  187. return NULL;
  188. }
  189. return server_job;
  190. }
  191. }
  192. return NULL;
  193. }
  194. gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
  195. const char *job_handle,
  196. const size_t job_handle_length,
  197. gearman_server_con_st *worker_con)
  198. {
  199. uint32_t key= _server_job_hash(job_handle, job_handle_length);
  200. for (gearman_server_job_st *server_job= server->job_hash[key % server->hashtable_buckets];
  201. server_job != NULL; server_job= server_job->next)
  202. {
  203. if (server_job->job_handle_key == key and
  204. strncmp(server_job->job_handle, job_handle, GEARMAND_JOB_HANDLE_SIZE) == 0)
  205. {
  206. /* Check to make sure the worker asking for the job still owns the job. */
  207. if (worker_con != NULL and
  208. (server_job->worker == NULL or server_job->worker->con != worker_con))
  209. {
  210. return NULL;
  211. }
  212. return server_job;
  213. }
  214. }
  215. return NULL;
  216. }
  217. gearmand_error_t gearman_server_job_cancel(gearman_server_st& server,
  218. const char *job_handle,
  219. const size_t job_handle_length)
  220. {
  221. gearmand_error_t ret= GEARMAND_NO_JOBS;
  222. uint32_t key= _server_job_hash(job_handle, job_handle_length);
  223. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "cancel: %.*s", int(job_handle_length), job_handle);
  224. for (gearman_server_job_st *server_job= server.job_hash[key % server.hashtable_buckets];
  225. server_job != NULL;
  226. server_job= server_job->next)
  227. {
  228. if (server_job->job_handle_key == key and
  229. strncmp(server_job->job_handle, job_handle, GEARMAND_JOB_HANDLE_SIZE) == 0)
  230. {
  231. /* Queue the fail packet for all clients. */
  232. for (gearman_server_client_st* client= server_job->client_list; client != NULL; client= client->job_next)
  233. {
  234. ret= gearman_server_io_packet_add(client->con, false,
  235. GEARMAN_MAGIC_RESPONSE,
  236. GEARMAN_COMMAND_WORK_FAIL,
  237. server_job->job_handle,
  238. (size_t)strlen(server_job->job_handle),
  239. NULL);
  240. if (gearmand_failed(ret))
  241. {
  242. gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send WORK_FAIL packet to %s:%s", client->con->host(), client->con->port());
  243. }
  244. }
  245. /* Remove from persistent queue if one exists. */
  246. if (server_job->job_queued)
  247. {
  248. ret= gearman_queue_done(Server,
  249. server_job->unique,
  250. server_job->unique_length,
  251. server_job->function->function_name,
  252. server_job->function->function_name_size);
  253. if (gearmand_failed(ret))
  254. {
  255. return gearmand_gerror("Remove from persistent queue", ret);
  256. }
  257. }
  258. server_job->ignore_job= true;
  259. server_job->job_queued= false;
  260. return GEARMAND_SUCCESS;
  261. }
  262. }
  263. return ret;
  264. }
  265. gearman_server_job_st * gearman_server_job_peek(gearman_server_con_st *server_con)
  266. {
  267. for (gearman_server_worker_st *server_worker= server_con->worker_list;
  268. server_worker != NULL;
  269. server_worker= server_worker->con_next)
  270. {
  271. if (server_worker->function->job_count != 0)
  272. {
  273. for (gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_HIGH;
  274. priority != GEARMAN_JOB_PRIORITY_MAX;
  275. priority= gearman_job_priority_t(int(priority) +1))
  276. {
  277. gearman_server_job_st *server_job;
  278. server_job= server_worker->function->job_list[priority];
  279. int64_t current_time= (int64_t)time(NULL);
  280. while(server_job &&
  281. server_job->when != 0 &&
  282. server_job->when > current_time)
  283. {
  284. server_job= server_job->function_next;
  285. }
  286. if (server_job != NULL)
  287. {
  288. if (server_job->ignore_job)
  289. {
  290. /* This is only happens when a client disconnects from a foreground
  291. job. We do this because we don't want to run the job anymore. */
  292. server_job->ignore_job= false;
  293. gearman_server_job_free(gearman_server_job_take(server_con));
  294. return gearman_server_job_peek(server_con);
  295. }
  296. return server_job;
  297. }
  298. }
  299. }
  300. }
  301. return NULL;
  302. }
  303. gearman_server_job_st *gearman_server_job_take(gearman_server_con_st *server_con)
  304. {
  305. for (gearman_server_worker_st *server_worker= server_con->worker_list; server_worker; server_worker= server_worker->con_next)
  306. {
  307. if (server_worker->function and server_worker->function->job_count)
  308. {
  309. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Jobs available for %.*s: %lu",
  310. (int)server_worker->function->function_name_size, server_worker->function->function_name,
  311. (unsigned long)(server_worker->function->job_count));
  312. if (Server->flags.round_robin)
  313. {
  314. GEARMAND_LIST_DEL(server_con->worker, server_worker, con_)
  315. _server_con_worker_list_append(server_con->worker_list, server_worker);
  316. ++server_con->worker_count;
  317. if (server_con->worker_list == NULL)
  318. {
  319. server_con->worker_list= server_worker;
  320. }
  321. }
  322. gearman_job_priority_t priority;
  323. for (priority= GEARMAN_JOB_PRIORITY_HIGH; priority < GEARMAN_JOB_PRIORITY_LOW;
  324. priority= gearman_job_priority_t(int(priority) +1))
  325. {
  326. if (server_worker->function->job_list[priority])
  327. {
  328. break;
  329. }
  330. }
  331. gearman_server_job_st *server_job= server_worker->function->job_list[priority];
  332. gearman_server_job_st *previous_job= server_job;
  333. int64_t current_time= (int64_t)time(NULL);
  334. while (server_job and server_job->when != 0 and server_job->when > current_time)
  335. {
  336. previous_job= server_job;
  337. server_job= server_job->function_next;
  338. }
  339. if (server_job)
  340. {
  341. if (server_job->function->job_list[priority] == server_job)
  342. {
  343. // If it's the head of the list, advance it
  344. server_job->function->job_list[priority]= server_job->function_next;
  345. }
  346. else
  347. {
  348. // Otherwise, just remove the item from the list
  349. previous_job->function_next= server_job->function_next;
  350. }
  351. // If it's the tail of the list, move the tail back
  352. if (server_job->function->job_end[priority] == server_job)
  353. {
  354. server_job->function->job_end[priority]= previous_job;
  355. }
  356. server_job->function->job_count--;
  357. server_job->worker= server_worker;
  358. GEARMAND_LIST_ADD(server_worker->job, server_job, worker_);
  359. server_job->function->job_running++;
  360. if (server_job->ignore_job)
  361. {
  362. gearman_server_job_free(server_job);
  363. return gearman_server_job_take(server_con);
  364. }
  365. return server_job;
  366. }
  367. }
  368. }
  369. return NULL;
  370. }
  371. void *_proc(void *data)
  372. {
  373. gearman_server_st *server= (gearman_server_st *)data;
  374. (void)gearmand_initialize_thread_logging("[ proc ]");
  375. while (1)
  376. {
  377. int pthread_error;
  378. if ((pthread_error= pthread_mutex_lock(&(server->proc_lock))))
  379. {
  380. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
  381. return NULL;
  382. }
  383. while (server->proc_wakeup == false)
  384. {
  385. if (server->proc_shutdown)
  386. {
  387. if ((pthread_error= pthread_mutex_unlock(&(server->proc_lock))))
  388. {
  389. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
  390. }
  391. return NULL;
  392. }
  393. (void) pthread_cond_wait(&(server->proc_cond), &(server->proc_lock));
  394. }
  395. server->proc_wakeup= false;
  396. {
  397. if ((pthread_error= pthread_mutex_unlock(&(server->proc_lock))))
  398. {
  399. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
  400. }
  401. }
  402. for (gearman_server_thread_st *thread= server->thread_list; thread != NULL; thread= thread->next)
  403. {
  404. gearman_server_con_st *con;
  405. while ((con= gearman_server_con_proc_next(thread)) != NULL)
  406. {
  407. bool packet_sent = false;
  408. while (1)
  409. {
  410. gearman_server_packet_st *packet= gearman_server_proc_packet_remove(con);
  411. if (packet == NULL)
  412. {
  413. break;
  414. }
  415. con->ret= gearman_server_run_command(con, &(packet->packet));
  416. packet_sent = true;
  417. gearmand_packet_free(&(packet->packet));
  418. gearman_server_packet_free(packet, con->thread, false);
  419. }
  420. // if a packet was sent in above block, and connection is dead,
  421. // queue up into io thread so it comes back to the PROC queue for
  422. // marking proc_removed. this prevents leaking any connection objects
  423. if (packet_sent)
  424. {
  425. if (con->is_dead)
  426. {
  427. gearman_server_con_io_add(con);
  428. }
  429. }
  430. else if (con->is_dead)
  431. {
  432. gearman_server_con_free_workers(con);
  433. while (con->client_list != NULL)
  434. gearman_server_client_free(con->client_list);
  435. con->proc_removed= true;
  436. gearman_server_con_to_be_freed_add(con);
  437. }
  438. }
  439. }
  440. }
  441. }
  442. gearman_server_job_st * gearman_server_job_create(gearman_server_st *server)
  443. {
  444. gearman_server_job_st *server_job;
  445. if (server->free_job_count > 0)
  446. {
  447. server_job= server->free_job_list;
  448. GEARMAND_LIST__DEL(server->free_job, server_job);
  449. }
  450. else
  451. {
  452. server_job= new (std::nothrow) gearman_server_job_st;
  453. if (server_job == NULL)
  454. {
  455. return NULL;
  456. }
  457. }
  458. server_job->ignore_job= false;
  459. server_job->job_queued= false;
  460. server_job->retries= 0;
  461. server_job->priority= GEARMAN_JOB_PRIORITY_NORMAL;
  462. server_job->job_handle_key= 0;
  463. server_job->unique_key= 0;
  464. server_job->client_count= 0;
  465. server_job->numerator= 0;
  466. server_job->denominator= 0;
  467. server_job->data_size= 0;
  468. server_job->next= NULL;
  469. server_job->prev= NULL;
  470. server_job->unique_next= NULL;
  471. server_job->unique_prev= NULL;
  472. server_job->worker_next= NULL;
  473. server_job->worker_prev= NULL;
  474. server_job->function= NULL;
  475. server_job->function_next= NULL;
  476. server_job->data= NULL;
  477. server_job->client_list= NULL;
  478. server_job->worker= NULL;
  479. server_job->job_handle[0]= 0;
  480. server_job->unique[0]= 0;
  481. server_job->unique_length= 0;
  482. return server_job;
  483. }
  484. gearmand_error_t gearmand_con_create(gearmand_st *gearmand, int& fd,
  485. const char *host, const char *port,
  486. struct gearmand_port_st* port_st_)
  487. {
  488. gearmand_con_st *dcon;
  489. if (gearmand->free_dcon_count > 0)
  490. {
  491. dcon= gearmand->free_dcon_list;
  492. GEARMAND_LIST__DEL(gearmand->free_dcon, dcon);
  493. }
  494. else
  495. {
  496. dcon= new (std::nothrow) gearmand_con_st;
  497. if (dcon == NULL)
  498. {
  499. gearmand_perror(errno, "new build_gearmand_con_st");
  500. gearmand_sockfd_close(fd);
  501. return GEARMAND_MEMORY_ALLOCATION_FAILURE;
  502. }
  503. memset(&dcon->event, 0, sizeof(struct event));
  504. }
  505. dcon->last_events= 0;
  506. dcon->fd= fd;
  507. dcon->next= NULL;
  508. dcon->prev= NULL;
  509. dcon->server_con= NULL;
  510. strncpy(dcon->host, host, NI_MAXHOST);
  511. dcon->host[NI_MAXHOST -1]= 0;
  512. strncpy(dcon->port, port, NI_MAXSERV);
  513. dcon->port[NI_MAXSERV -1]= 0;
  514. dcon->_port_st= port_st_;
  515. /* If we are not threaded, just add the connection now. */
  516. if (gearmand->threads == 0)
  517. {
  518. dcon->thread= gearmand->thread_list;
  519. return _con_add(gearmand->thread_list, dcon);
  520. }
  521. /* We do a simple round-robin connection queue algorithm here. */
  522. if (gearmand->thread_add_next == NULL)
  523. {
  524. gearmand->thread_add_next= gearmand->thread_list;
  525. }
  526. dcon->thread= gearmand->thread_add_next;
  527. /* We don't need to lock if the list is empty. */
  528. if (dcon->thread->dcon_add_count == 0 &&
  529. dcon->thread->free_dcon_count < gearmand->max_thread_free_dcon_count)
  530. {
  531. GEARMAND_LIST__ADD(dcon->thread->dcon_add, dcon);
  532. gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
  533. }
  534. else
  535. {
  536. uint32_t free_dcon_count;
  537. gearmand_con_st *free_dcon_list= NULL;
  538. int pthread_error;
  539. if ((pthread_error= pthread_mutex_lock(&(dcon->thread->lock))) == 0)
  540. {
  541. GEARMAND_LIST__ADD(dcon->thread->dcon_add, dcon);
  542. /* Take the free connection structures back to reuse. */
  543. free_dcon_list= dcon->thread->free_dcon_list;
  544. free_dcon_count= dcon->thread->free_dcon_count;
  545. dcon->thread->free_dcon_list= NULL;
  546. dcon->thread->free_dcon_count= 0;
  547. if ((pthread_error= pthread_mutex_unlock(&(dcon->thread->lock))))
  548. {
  549. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
  550. }
  551. }
  552. else
  553. {
  554. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
  555. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  556. }
  557. /* Only wakeup the thread if this is the first in the queue. We don't need
  558. to lock around the count check, worst case it was already picked up and
  559. we send an extra byte. */
  560. if (dcon->thread->dcon_add_count == 1)
  561. {
  562. gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
  563. }
  564. /* Put the free connection structures we grabbed on the main list. */
  565. while (free_dcon_list != NULL)
  566. {
  567. dcon= free_dcon_list;
  568. GEARMAND_LIST__DEL(free_dcon, dcon);
  569. GEARMAND_LIST__ADD(gearmand->free_dcon, dcon);
  570. }
  571. }
  572. gearmand->thread_add_next= gearmand->thread_add_next->next;
  573. return GEARMAND_SUCCESS;
  574. }
  575. void gearmand_con_free(gearmand_con_st *dcon)
  576. {
  577. if (event_initialized(&(dcon->event)))
  578. {
  579. if (event_del(&(dcon->event)) == -1)
  580. {
  581. gearmand_perror(errno, "event_del");
  582. }
  583. else
  584. {
  585. /* This gets around a libevent bug when both POLLIN and POLLOUT are set. */
  586. event_set(&(dcon->event), dcon->fd, EV_READ, _con_ready, dcon);
  587. if (event_base_set(dcon->thread->base, &(dcon->event)) == -1)
  588. {
  589. gearmand_perror(errno, "event_base_set");
  590. }
  591. if (event_add(&(dcon->event), NULL) == -1)
  592. {
  593. gearmand_perror(errno, "event_add");
  594. }
  595. else
  596. {
  597. if (event_del(&(dcon->event)) == -1)
  598. {
  599. gearmand_perror(errno, "event_del");
  600. }
  601. }
  602. }
  603. }
  604. // @note server_con could be null if we failed to complete the initial
  605. // connection.
  606. if (dcon->server_con)
  607. {
  608. gearman_server_con_attempt_free(dcon->server_con);
  609. }
  610. GEARMAND_LIST__DEL(dcon->thread->dcon, dcon);
  611. dcon->close_socket();
  612. if (Gearmand()->free_dcon_count < GEARMAND_MAX_FREE_SERVER_CON)
  613. {
  614. if (Gearmand()->threads == 0)
  615. {
  616. GEARMAND_LIST__ADD(Gearmand()->free_dcon, dcon);
  617. }
  618. else
  619. {
  620. /* Lock here because the main thread may be emptying this. */
  621. int error;
  622. if ((error= pthread_mutex_lock(&(dcon->thread->lock))) == 0)
  623. {
  624. GEARMAND_LIST__ADD(dcon->thread->free_dcon, dcon);
  625. if ((error= pthread_mutex_unlock(&(dcon->thread->lock))))
  626. {
  627. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
  628. }
  629. }
  630. else
  631. {
  632. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
  633. }
  634. }
  635. }
  636. else
  637. {
  638. delete dcon;
  639. }
  640. }
  641. void gearmand_con_check_queue(gearmand_thread_st *thread)
  642. {
  643. /* Dirty check is fine here, wakeup is always sent after add completes. */
  644. if (thread->dcon_add_count == 0)
  645. {
  646. return;
  647. }
  648. /* We want to add new connections inside the lock because other threads may
  649. walk the thread's dcon_list while holding the lock. */
  650. while (thread->dcon_add_list != NULL)
  651. {
  652. int error;
  653. if ((error= pthread_mutex_lock(&(thread->lock))) == 0)
  654. {
  655. gearmand_con_st *dcon= thread->dcon_add_list;
  656. GEARMAND_LIST__DEL(thread->dcon_add, dcon);
  657. if ((error= pthread_mutex_unlock(&(thread->lock))))
  658. {
  659. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
  660. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  661. }
  662. gearmand_error_t rc;
  663. if ((rc= _con_add(thread, dcon)) != GEARMAND_SUCCESS)
  664. {
  665. gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, rc, "%s:%s _con_add() has failed, please report any crashes that occur immediately after this.",
  666. dcon->host,
  667. dcon->port);
  668. gearmand_con_free(dcon);
  669. }
  670. }
  671. else
  672. {
  673. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
  674. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  675. }
  676. }
  677. }