gearmand_con.c 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Gearmand Connection Definitions
  11. */
  12. #include <libgearman-server/common.h>
  13. #include <libgearman-server/gearmand.h>
  14. #include <string.h>
  15. #include <errno.h>
  16. #include <assert.h>
  17. #include <iso646.h>
  18. /*
  19. * Private declarations
  20. */
  21. /**
  22. * @addtogroup gearmand_con_private Private Gearmand Connection Functions
  23. * @ingroup gearmand_con
  24. * @{
  25. */
  26. static void _con_ready(int fd, short events, void *arg);
  27. static gearmand_error_t _con_add(gearmand_thread_st *thread,
  28. gearmand_con_st *con);
  29. /** @} */
  30. /*
  31. * Public definitions
  32. */
  33. gearmand_error_t gearmand_con_create(gearmand_st *gearmand, int fd,
  34. const char *host, const char *port,
  35. gearmand_connection_add_fn *add_fn)
  36. {
  37. gearmand_con_st *dcon;
  38. if (gearmand->free_dcon_count > 0)
  39. {
  40. dcon= gearmand->free_dcon_list;
  41. GEARMAN_LIST_DEL(gearmand->free_dcon, dcon,)
  42. }
  43. else
  44. {
  45. dcon= (gearmand_con_st *)malloc(sizeof(gearmand_con_st));
  46. if (dcon == NULL)
  47. {
  48. gearmand_perror("malloc");
  49. gearmand_sockfd_close(fd);
  50. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  51. }
  52. memset(&dcon->event, 0, sizeof(struct event));
  53. }
  54. dcon->last_events= 0;
  55. dcon->fd= fd;
  56. dcon->next= NULL;
  57. dcon->prev= NULL;
  58. dcon->server_con= NULL;
  59. dcon->add_fn= NULL;
  60. strncpy(dcon->host, host, NI_MAXHOST - 1);
  61. strncpy(dcon->port, port, NI_MAXSERV - 1);
  62. dcon->add_fn= add_fn;
  63. /* If we are not threaded, just add the connection now. */
  64. if (gearmand->threads == 0)
  65. {
  66. dcon->thread= gearmand->thread_list;
  67. return _con_add(gearmand->thread_list, dcon);
  68. }
  69. /* We do a simple round-robin connection queue algorithm here. */
  70. if (gearmand->thread_add_next == NULL)
  71. gearmand->thread_add_next= gearmand->thread_list;
  72. dcon->thread= gearmand->thread_add_next;
  73. /* We don't need to lock if the list is empty. */
  74. if (dcon->thread->dcon_add_count == 0 &&
  75. dcon->thread->free_dcon_count < gearmand->max_thread_free_dcon_count)
  76. {
  77. GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
  78. gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
  79. }
  80. else
  81. {
  82. uint32_t free_dcon_count;
  83. gearmand_con_st *free_dcon_list;
  84. (void ) pthread_mutex_lock(&(dcon->thread->lock));
  85. GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
  86. /* Take the free connection structures back to reuse. */
  87. free_dcon_list= dcon->thread->free_dcon_list;
  88. free_dcon_count= dcon->thread->free_dcon_count;
  89. dcon->thread->free_dcon_list= NULL;
  90. dcon->thread->free_dcon_count= 0;
  91. (void ) pthread_mutex_unlock(&(dcon->thread->lock));
  92. /* Only wakeup the thread if this is the first in the queue. We don't need
  93. to lock around the count check, worst case it was already picked up and
  94. we send an extra byte. */
  95. if (dcon->thread->dcon_add_count == 1)
  96. gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
  97. /* Put the free connection structures we grabbed on the main list. */
  98. while (free_dcon_list != NULL)
  99. {
  100. dcon= free_dcon_list;
  101. GEARMAN_LIST_DEL(free_dcon, dcon,)
  102. GEARMAN_LIST_ADD(gearmand->free_dcon, dcon,)
  103. }
  104. }
  105. gearmand->thread_add_next= gearmand->thread_add_next->next;
  106. return GEARMAN_SUCCESS;
  107. }
  108. void gearmand_con_free(gearmand_con_st *dcon)
  109. {
  110. if (event_initialized(&(dcon->event)))
  111. {
  112. if (event_del(&(dcon->event)) < 0)
  113. {
  114. gearmand_perror("event_del");
  115. }
  116. else
  117. {
  118. /* This gets around a libevent bug when both POLLIN and POLLOUT are set. */
  119. event_set(&(dcon->event), dcon->fd, EV_READ, _con_ready, dcon);
  120. event_base_set(dcon->thread->base, &(dcon->event));
  121. if (event_add(&(dcon->event), NULL) < 0)
  122. {
  123. gearmand_perror("event_add");
  124. }
  125. else
  126. {
  127. if (event_del(&(dcon->event)) < 0)
  128. {
  129. gearmand_perror("event_del");
  130. }
  131. }
  132. }
  133. }
  134. // @note server_con could be null if we failed to complete the initial
  135. // connection.
  136. if (dcon->server_con)
  137. {
  138. gearman_server_con_free(dcon->server_con);
  139. }
  140. GEARMAN_LIST_DEL(dcon->thread->dcon, dcon,)
  141. gearmand_sockfd_close(dcon->fd);
  142. if (Gearmand()->free_dcon_count < GEARMAN_MAX_FREE_SERVER_CON)
  143. {
  144. if (Gearmand()->threads == 0)
  145. {
  146. GEARMAN_LIST_ADD(Gearmand()->free_dcon, dcon,)
  147. }
  148. else
  149. {
  150. /* Lock here because the main thread may be emptying this. */
  151. int error;
  152. if (not (error= pthread_mutex_lock(&(dcon->thread->lock))))
  153. {
  154. GEARMAN_LIST_ADD(dcon->thread->free_dcon, dcon,);
  155. (void ) pthread_mutex_unlock(&(dcon->thread->lock));
  156. }
  157. else
  158. {
  159. errno= error;
  160. gearmand_perror("pthread_mutex_lock");
  161. }
  162. }
  163. }
  164. else
  165. {
  166. gearmand_debug("free");
  167. free(dcon);
  168. }
  169. }
  170. void gearmand_con_check_queue(gearmand_thread_st *thread)
  171. {
  172. /* Dirty check is fine here, wakeup is always sent after add completes. */
  173. if (thread->dcon_add_count == 0)
  174. return;
  175. /* We want to add new connections inside the lock because other threads may
  176. walk the thread's dcon_list while holding the lock. */
  177. while (thread->dcon_add_list != NULL)
  178. {
  179. int error;
  180. if (not (error= pthread_mutex_lock(&(thread->lock))))
  181. {
  182. gearmand_con_st *dcon= thread->dcon_add_list;
  183. GEARMAN_LIST_DEL(thread->dcon_add, dcon,);
  184. if ((error= pthread_mutex_unlock(&(thread->lock))) != 0)
  185. {
  186. errno= error;
  187. gearmand_perror("pthread_mutex_unlock");
  188. gearmand_fatal("Error in locking forcing a shutdown");
  189. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  190. }
  191. gearmand_error_t rc;
  192. if ((rc= _con_add(thread, dcon)) != GEARMAN_SUCCESS)
  193. {
  194. gearmand_gerror("_con_add() has failed, please report any crashes that occur immediatly after this.", rc);
  195. gearmand_con_free(dcon);
  196. }
  197. }
  198. else
  199. {
  200. errno= error;
  201. gearmand_perror("pthread_mutex_lock");
  202. gearmand_fatal("Lock could not be taken on thread->, shutdown to occur");
  203. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  204. }
  205. }
  206. }
  207. gearmand_error_t gearmand_connection_watch(gearmand_io_st *con, short events,
  208. void *context __attribute__ ((unused)))
  209. {
  210. (void) context;
  211. gearmand_con_st *dcon;
  212. short set_events= 0;
  213. dcon= (gearmand_con_st *)gearman_io_context(con);
  214. if (events & POLLIN)
  215. set_events|= EV_READ;
  216. if (events & POLLOUT)
  217. set_events|= EV_WRITE;
  218. if (dcon->last_events != set_events)
  219. {
  220. if (dcon->last_events)
  221. {
  222. if (event_del(&(dcon->event)) < 0)
  223. {
  224. gearmand_perror("event_del");
  225. assert(! "event_del");
  226. }
  227. }
  228. event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready, dcon);
  229. event_base_set(dcon->thread->base, &(dcon->event));
  230. if (event_add(&(dcon->event), NULL) < 0)
  231. {
  232. gearmand_perror("event_add");
  233. return GEARMAN_EVENT;
  234. }
  235. dcon->last_events= set_events;
  236. }
  237. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  238. "%15s:%5s Watching %6s %s",
  239. dcon->host, dcon->port,
  240. events & POLLIN ? "POLLIN" : "",
  241. events & POLLOUT ? "POLLOUT" : "");
  242. return GEARMAN_SUCCESS;
  243. }
  244. /*
  245. * Private definitions
  246. */
  247. static void _con_ready(int fd __attribute__ ((unused)), short events,
  248. void *arg)
  249. {
  250. gearmand_con_st *dcon= (gearmand_con_st *)arg;
  251. short revents= 0;
  252. if (events & EV_READ)
  253. revents|= POLLIN;
  254. if (events & EV_WRITE)
  255. revents|= POLLOUT;
  256. gearmand_error_t ret= gearmand_io_set_revents(dcon->server_con, revents);
  257. if (gearmand_failed(ret))
  258. {
  259. gearmand_gerror("gearmand_io_set_revents", ret);
  260. gearmand_con_free(dcon);
  261. return;
  262. }
  263. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  264. "%15:%5 Ready %6s %s",
  265. dcon->host, dcon->port,
  266. revents & POLLIN ? "POLLIN" : "",
  267. revents & POLLOUT ? "POLLOUT" : "");
  268. gearmand_thread_run(dcon->thread);
  269. }
  270. static gearmand_error_t _con_add(gearmand_thread_st *thread,
  271. gearmand_con_st *dcon)
  272. {
  273. gearmand_error_t ret= GEARMAN_SUCCESS;
  274. dcon->server_con= gearman_server_con_add(&(thread->server_thread), dcon, &ret);
  275. assert(dcon->server_con || ret != GEARMAN_SUCCESS);
  276. assert(! dcon->server_con || ret == GEARMAN_SUCCESS);
  277. if (not dcon->server_con)
  278. {
  279. gearmand_sockfd_close(dcon->fd);
  280. return ret;
  281. }
  282. if (dcon->add_fn)
  283. {
  284. ret= (*dcon->add_fn)(dcon->server_con);
  285. if (gearmand_failed(ret))
  286. {
  287. gearman_server_con_free(dcon->server_con);
  288. gearmand_sockfd_close(dcon->fd);
  289. return ret;
  290. }
  291. }
  292. GEARMAN_LIST_ADD(thread->dcon, dcon,)
  293. return GEARMAN_SUCCESS;
  294. }