gearmand_con.c 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Gearmand Connection Definitions
  11. */
  12. #include <libgearman-server/common.h>
  13. #include <libgearman-server/gearmand.h>
  14. #include <string.h>
  15. #include <errno.h>
  16. #include <assert.h>
  17. /*
  18. * Private declarations
  19. */
  20. /**
  21. * @addtogroup gearmand_con_private Private Gearmand Connection Functions
  22. * @ingroup gearmand_con
  23. * @{
  24. */
  25. static void _con_ready(int fd, short events, void *arg);
  26. static gearmand_error_t _con_add(gearmand_thread_st *thread,
  27. gearmand_con_st *con);
  28. /** @} */
  29. /*
  30. * Public definitions
  31. */
  32. gearmand_error_t gearmand_con_create(gearmand_st *gearmand, int fd,
  33. const char *host, const char *port,
  34. gearmand_connection_add_fn *add_fn)
  35. {
  36. gearmand_con_st *dcon;
  37. if (gearmand->free_dcon_count > 0)
  38. {
  39. dcon= gearmand->free_dcon_list;
  40. GEARMAN_LIST_DEL(gearmand->free_dcon, dcon,)
  41. }
  42. else
  43. {
  44. dcon= (gearmand_con_st *)malloc(sizeof(gearmand_con_st));
  45. if (dcon == NULL)
  46. {
  47. gearmand_perror("malloc");
  48. gearmand_sockfd_close(fd);
  49. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  50. }
  51. memset(&dcon->event, 0, sizeof(struct event));
  52. }
  53. dcon->last_events= 0;
  54. dcon->fd= fd;
  55. dcon->next= NULL;
  56. dcon->prev= NULL;
  57. dcon->server_con= NULL;
  58. dcon->add_fn= NULL;
  59. strncpy(dcon->host, host, NI_MAXHOST - 1);
  60. strncpy(dcon->port, port, NI_MAXSERV - 1);
  61. dcon->add_fn= add_fn;
  62. /* If we are not threaded, just add the connection now. */
  63. if (gearmand->threads == 0)
  64. {
  65. dcon->thread= gearmand->thread_list;
  66. return _con_add(gearmand->thread_list, dcon);
  67. }
  68. /* We do a simple round-robin connection queue algorithm here. */
  69. if (gearmand->thread_add_next == NULL)
  70. gearmand->thread_add_next= gearmand->thread_list;
  71. dcon->thread= gearmand->thread_add_next;
  72. /* We don't need to lock if the list is empty. */
  73. if (dcon->thread->dcon_add_count == 0 &&
  74. dcon->thread->free_dcon_count < gearmand->max_thread_free_dcon_count)
  75. {
  76. GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
  77. gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
  78. }
  79. else
  80. {
  81. uint32_t free_dcon_count;
  82. gearmand_con_st *free_dcon_list;
  83. (void ) pthread_mutex_lock(&(dcon->thread->lock));
  84. GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
  85. /* Take the free connection structures back to reuse. */
  86. free_dcon_list= dcon->thread->free_dcon_list;
  87. free_dcon_count= dcon->thread->free_dcon_count;
  88. dcon->thread->free_dcon_list= NULL;
  89. dcon->thread->free_dcon_count= 0;
  90. (void ) pthread_mutex_unlock(&(dcon->thread->lock));
  91. /* Only wakeup the thread if this is the first in the queue. We don't need
  92. to lock around the count check, worst case it was already picked up and
  93. we send an extra byte. */
  94. if (dcon->thread->dcon_add_count == 1)
  95. gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
  96. /* Put the free connection structures we grabbed on the main list. */
  97. while (free_dcon_list != NULL)
  98. {
  99. dcon= free_dcon_list;
  100. GEARMAN_LIST_DEL(free_dcon, dcon,)
  101. GEARMAN_LIST_ADD(gearmand->free_dcon, dcon,)
  102. }
  103. }
  104. gearmand->thread_add_next= gearmand->thread_add_next->next;
  105. return GEARMAN_SUCCESS;
  106. }
  107. void gearmand_con_free(gearmand_con_st *dcon)
  108. {
  109. if (event_initialized(&(dcon->event)))
  110. {
  111. if (event_del(&(dcon->event)) < 0)
  112. {
  113. gearmand_perror("event_del");
  114. }
  115. else
  116. {
  117. /* This gets around a libevent bug when both POLLIN and POLLOUT are set. */
  118. event_set(&(dcon->event), dcon->fd, EV_READ, _con_ready, dcon);
  119. event_base_set(dcon->thread->base, &(dcon->event));
  120. if (event_add(&(dcon->event), NULL) < 0)
  121. {
  122. gearmand_perror("event_add");
  123. }
  124. else
  125. {
  126. if (event_del(&(dcon->event)) < 0)
  127. {
  128. gearmand_perror("event_del");
  129. }
  130. }
  131. }
  132. }
  133. // @note server_con could be null if we failed to complete the initial
  134. // connection.
  135. if (dcon->server_con)
  136. {
  137. gearman_server_con_free(dcon->server_con);
  138. }
  139. GEARMAN_LIST_DEL(dcon->thread->dcon, dcon,)
  140. gearmand_sockfd_close(dcon->fd);
  141. if (Gearmand()->free_dcon_count < GEARMAN_MAX_FREE_SERVER_CON)
  142. {
  143. if (Gearmand()->threads == 0)
  144. {
  145. GEARMAN_LIST_ADD(Gearmand()->free_dcon, dcon,)
  146. }
  147. else
  148. {
  149. /* Lock here because the main thread may be emptying this. */
  150. int error;
  151. if (! (error= pthread_mutex_lock(&(dcon->thread->lock))))
  152. {
  153. GEARMAN_LIST_ADD(dcon->thread->free_dcon, dcon,);
  154. (void ) pthread_mutex_unlock(&(dcon->thread->lock));
  155. }
  156. else
  157. {
  158. errno= error;
  159. gearmand_perror("pthread_mutex_lock");
  160. }
  161. }
  162. }
  163. else
  164. {
  165. gearmand_crazy("free");
  166. free(dcon);
  167. }
  168. }
  169. void gearmand_con_check_queue(gearmand_thread_st *thread)
  170. {
  171. /* Dirty check is fine here, wakeup is always sent after add completes. */
  172. if (thread->dcon_add_count == 0)
  173. return;
  174. /* We want to add new connections inside the lock because other threads may
  175. walk the thread's dcon_list while holding the lock. */
  176. while (thread->dcon_add_list != NULL)
  177. {
  178. int error;
  179. if (! (error= pthread_mutex_lock(&(thread->lock))))
  180. {
  181. gearmand_con_st *dcon= thread->dcon_add_list;
  182. GEARMAN_LIST_DEL(thread->dcon_add, dcon,);
  183. if ((error= pthread_mutex_unlock(&(thread->lock))) != 0)
  184. {
  185. errno= error;
  186. gearmand_perror("pthread_mutex_unlock");
  187. gearmand_fatal("Error in locking forcing a shutdown");
  188. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  189. }
  190. gearmand_error_t rc;
  191. if ((rc= _con_add(thread, dcon)) != GEARMAN_SUCCESS)
  192. {
  193. gearmand_gerror("_con_add() has failed, please report any crashes that occur immediatly after this.", rc);
  194. gearmand_con_free(dcon);
  195. }
  196. }
  197. else
  198. {
  199. errno= error;
  200. gearmand_perror("pthread_mutex_lock");
  201. gearmand_fatal("Lock could not be taken on thread->, shutdown to occur");
  202. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  203. }
  204. }
  205. }
  206. gearmand_error_t gearmand_connection_watch(gearmand_io_st *con, short events,
  207. void *context __attribute__ ((unused)))
  208. {
  209. (void) context;
  210. gearmand_con_st *dcon;
  211. short set_events= 0;
  212. dcon= (gearmand_con_st *)gearman_io_context(con);
  213. if (events & POLLIN)
  214. set_events|= EV_READ;
  215. if (events & POLLOUT)
  216. set_events|= EV_WRITE;
  217. if (dcon->last_events != set_events)
  218. {
  219. if (dcon->last_events != 0)
  220. {
  221. if (event_del(&(dcon->event)) < 0)
  222. {
  223. gearmand_perror("event_del");
  224. assert(! "event_del");
  225. }
  226. }
  227. event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready, dcon);
  228. event_base_set(dcon->thread->base, &(dcon->event));
  229. if (event_add(&(dcon->event), NULL) < 0)
  230. {
  231. gearmand_perror("event_add");
  232. return GEARMAN_EVENT;
  233. }
  234. dcon->last_events= set_events;
  235. }
  236. gearmand_log_crazy("%15s:%5s Watching %6s %s",
  237. dcon->host, dcon->port,
  238. events & POLLIN ? "POLLIN" : "",
  239. events & POLLOUT ? "POLLOUT" : "");
  240. return GEARMAN_SUCCESS;
  241. }
  242. /*
  243. * Private definitions
  244. */
  245. static void _con_ready(int fd __attribute__ ((unused)), short events,
  246. void *arg)
  247. {
  248. gearmand_con_st *dcon= (gearmand_con_st *)arg;
  249. short revents= 0;
  250. if (events & EV_READ)
  251. revents|= POLLIN;
  252. if (events & EV_WRITE)
  253. revents|= POLLOUT;
  254. gearmand_error_t ret;
  255. ret= gearmand_io_set_revents(dcon->server_con, revents);
  256. if (ret != GEARMAN_SUCCESS)
  257. {
  258. gearmand_gerror("gearmand_io_set_revents", ret);
  259. gearmand_con_free(dcon);
  260. return;
  261. }
  262. gearmand_log_crazy("%15s:%5s Ready %6s %s",
  263. dcon->host, dcon->port,
  264. revents & POLLIN ? "POLLIN" : "",
  265. revents & POLLOUT ? "POLLOUT" : "");
  266. gearmand_thread_run(dcon->thread);
  267. }
  268. static gearmand_error_t _con_add(gearmand_thread_st *thread,
  269. gearmand_con_st *dcon)
  270. {
  271. gearmand_error_t ret= GEARMAN_SUCCESS;
  272. dcon->server_con= gearman_server_con_add(&(thread->server_thread), dcon, &ret);
  273. assert(dcon->server_con || ret != GEARMAN_SUCCESS);
  274. assert(! dcon->server_con || ret == GEARMAN_SUCCESS);
  275. if (dcon->server_con == NULL)
  276. {
  277. gearmand_sockfd_close(dcon->fd);
  278. return ret;
  279. }
  280. if (dcon->add_fn != NULL)
  281. {
  282. ret= (*dcon->add_fn)(dcon->server_con);
  283. if (ret != GEARMAN_SUCCESS)
  284. {
  285. gearman_server_con_free(dcon->server_con);
  286. gearmand_sockfd_close(dcon->fd);
  287. return ret;
  288. }
  289. }
  290. gearmand_log_info("%15s:%5s Connected", dcon->host, dcon->port);
  291. GEARMAN_LIST_ADD(thread->dcon, dcon,)
  292. return GEARMAN_SUCCESS;
  293. }