gearmand_con.c 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Gearmand Connection Definitions
  11. */
  12. #include "common.h"
  13. #include "gearmand.h"
  14. /*
  15. * Private declarations
  16. */
  17. /**
  18. * @addtogroup gearmand_con_private Private Gearmand Connection Functions
  19. * @ingroup gearmand_con
  20. * @{
  21. */
  22. static void _con_ready(int fd, short events, void *arg);
  23. static gearman_return_t _con_add(gearmand_thread_st *thread,
  24. gearmand_con_st *con);
  25. /** @} */
  26. /*
  27. * Public definitions
  28. */
  29. gearman_return_t gearmand_con_create(gearmand_st *gearmand, int fd,
  30. const char *host, const char *port,
  31. gearman_con_add_fn *add_fn)
  32. {
  33. gearmand_con_st *dcon;
  34. gearmand_con_st *free_dcon_list;
  35. uint32_t free_dcon_count;
  36. if (gearmand->free_dcon_count > 0)
  37. {
  38. dcon= gearmand->free_dcon_list;
  39. GEARMAN_LIST_DEL(gearmand->free_dcon, dcon,)
  40. }
  41. else
  42. {
  43. dcon= malloc(sizeof(gearmand_con_st));
  44. if (dcon == NULL)
  45. {
  46. close(fd);
  47. GEARMAN_FATAL(gearmand, "gearmand_con_create:malloc")
  48. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  49. }
  50. }
  51. dcon->last_events= 0;
  52. dcon->fd= fd;
  53. dcon->next= NULL;
  54. dcon->prev= NULL;
  55. dcon->server_con= NULL;
  56. dcon->con= NULL;
  57. dcon->add_fn= NULL;
  58. strncpy(dcon->host, host, NI_MAXHOST - 1);
  59. strncpy(dcon->port, port, NI_MAXSERV - 1);
  60. dcon->add_fn= add_fn;
  61. /* If we are not threaded, just add the connection now. */
  62. if (gearmand->threads == 0)
  63. {
  64. dcon->thread= gearmand->thread_list;
  65. return _con_add(gearmand->thread_list, dcon);
  66. }
  67. /* We do a simple round-robin connection queue algorithm here. */
  68. if (gearmand->thread_add_next == NULL)
  69. gearmand->thread_add_next= gearmand->thread_list;
  70. dcon->thread= gearmand->thread_add_next;
  71. /* We don't need to lock if the list is empty. */
  72. if (dcon->thread->dcon_add_count == 0 &&
  73. dcon->thread->free_dcon_count < gearmand->max_thread_free_dcon_count)
  74. {
  75. GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
  76. gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
  77. }
  78. else
  79. {
  80. (void ) pthread_mutex_lock(&(dcon->thread->lock));
  81. GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
  82. /* Take the free connection structures back to reuse. */
  83. free_dcon_list= dcon->thread->free_dcon_list;
  84. free_dcon_count= dcon->thread->free_dcon_count;
  85. dcon->thread->free_dcon_list= NULL;
  86. dcon->thread->free_dcon_count= 0;
  87. (void ) pthread_mutex_unlock(&(dcon->thread->lock));
  88. /* Only wakeup the thread if this is the first in the queue. We don't need
  89. to lock around the count check, worst case it was already picked up and
  90. we send an extra byte. */
  91. if (dcon->thread->dcon_add_count == 1)
  92. gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
  93. /* Put the free connection structures we grabbed on the main list. */
  94. while (free_dcon_list != NULL)
  95. {
  96. dcon= free_dcon_list;
  97. GEARMAN_LIST_DEL(free_dcon, dcon,)
  98. GEARMAN_LIST_ADD(gearmand->free_dcon, dcon,)
  99. }
  100. }
  101. gearmand->thread_add_next= gearmand->thread_add_next->next;
  102. return GEARMAN_SUCCESS;
  103. }
  104. void gearmand_con_free(gearmand_con_st *dcon)
  105. {
  106. assert(event_del(&(dcon->event)) == 0);
  107. /* This gets around a libevent bug when both POLLIN and POLLOUT are set. */
  108. event_set(&(dcon->event), dcon->fd, EV_READ, _con_ready, dcon);
  109. event_base_set(dcon->thread->base, &(dcon->event));
  110. event_add(&(dcon->event), NULL);
  111. assert(event_del(&(dcon->event)) == 0);
  112. gearman_server_con_free(dcon->server_con);
  113. GEARMAN_LIST_DEL(dcon->thread->dcon, dcon,)
  114. close(dcon->fd);
  115. if (dcon->thread->gearmand->free_dcon_count < GEARMAN_MAX_FREE_SERVER_CON)
  116. {
  117. if (dcon->thread->gearmand->threads == 0)
  118. GEARMAN_LIST_ADD(dcon->thread->gearmand->free_dcon, dcon,)
  119. else
  120. {
  121. /* Lock here because the main thread may be emptying this. */
  122. (void ) pthread_mutex_lock(&(dcon->thread->lock));
  123. GEARMAN_LIST_ADD(dcon->thread->free_dcon, dcon,)
  124. (void ) pthread_mutex_unlock(&(dcon->thread->lock));
  125. }
  126. }
  127. else
  128. free(dcon);
  129. }
  130. void gearmand_con_check_queue(gearmand_thread_st *thread)
  131. {
  132. gearmand_con_st *dcon;
  133. /* Dirty check is fine here, wakeup is always sent after add completes. */
  134. if (thread->dcon_add_count == 0)
  135. return;
  136. /* We want to add new connections inside the lock because other threads may
  137. walk the thread's dcon_list while holding the lock. */
  138. while (thread->dcon_add_list != NULL)
  139. {
  140. (void ) pthread_mutex_lock(&(thread->lock));
  141. dcon= thread->dcon_add_list;
  142. GEARMAN_LIST_DEL(thread->dcon_add, dcon,)
  143. (void ) pthread_mutex_unlock(&(thread->lock));
  144. if (_con_add(thread, dcon) != GEARMAN_SUCCESS)
  145. gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
  146. }
  147. }
  148. gearman_return_t gearmand_con_watch(gearman_con_st *con, short events,
  149. void *context __attribute__ ((unused)))
  150. {
  151. (void) context;
  152. gearmand_con_st *dcon;
  153. short set_events= 0;
  154. dcon= (gearmand_con_st *)gearman_con_context(con);
  155. dcon->con= con;
  156. if (events & POLLIN)
  157. set_events|= EV_READ;
  158. if (events & POLLOUT)
  159. set_events|= EV_WRITE;
  160. if (dcon->last_events != set_events)
  161. {
  162. if (dcon->last_events != 0)
  163. assert(event_del(&(dcon->event)) == 0);
  164. event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready,
  165. dcon);
  166. event_base_set(dcon->thread->base, &(dcon->event));
  167. if (event_add(&(dcon->event), NULL) == -1)
  168. {
  169. GEARMAN_FATAL(dcon->thread->gearmand, "_con_watch:event_add:-1")
  170. return GEARMAN_EVENT;
  171. }
  172. dcon->last_events= set_events;
  173. }
  174. GEARMAN_CRAZY(dcon->thread->gearmand, "[%4u] %15s:%5s Watching %6s %s",
  175. dcon->thread->count, dcon->host, dcon->port,
  176. events & POLLIN ? "POLLIN" : "",
  177. events & POLLOUT ? "POLLOUT" : "")
  178. return GEARMAN_SUCCESS;
  179. }
  180. /*
  181. * Private definitions
  182. */
  183. static void _con_ready(int fd __attribute__ ((unused)), short events,
  184. void *arg)
  185. {
  186. gearmand_con_st *dcon= (gearmand_con_st *)arg;
  187. short revents= 0;
  188. gearman_return_t ret;
  189. if (events & EV_READ)
  190. revents|= POLLIN;
  191. if (events & EV_WRITE)
  192. revents|= POLLOUT;
  193. ret= gearman_con_set_revents(dcon->con, revents);
  194. if (ret != GEARMAN_SUCCESS)
  195. {
  196. gearmand_con_free(dcon);
  197. return;
  198. }
  199. GEARMAN_CRAZY(dcon->thread->gearmand, "[%4u] %15s:%5s Ready %6s %s",
  200. dcon->thread->count, dcon->host, dcon->port,
  201. revents & POLLIN ? "POLLIN" : "",
  202. revents & POLLOUT ? "POLLOUT" : "")
  203. gearmand_thread_run(dcon->thread);
  204. }
  205. static gearman_return_t _con_add(gearmand_thread_st *thread,
  206. gearmand_con_st *dcon)
  207. {
  208. gearman_return_t ret;
  209. dcon->server_con= gearman_server_con_add(&(thread->server_thread), dcon->fd,
  210. dcon);
  211. if (dcon->server_con == NULL)
  212. {
  213. close(dcon->fd);
  214. free(dcon);
  215. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  216. }
  217. gearman_server_con_set_host(dcon->server_con, dcon->host);
  218. gearman_server_con_set_port(dcon->server_con, dcon->port);
  219. if (dcon->add_fn != NULL)
  220. {
  221. ret= (*dcon->add_fn)(gearman_server_con_con(dcon->server_con));
  222. if (ret != GEARMAN_SUCCESS)
  223. {
  224. gearman_server_con_free(dcon->server_con);
  225. close(dcon->fd);
  226. free(dcon);
  227. return ret;
  228. }
  229. }
  230. GEARMAN_INFO(thread->gearmand, "[%4u] %15s:%5s Connected", thread->count,
  231. dcon->host, dcon->port)
  232. GEARMAN_LIST_ADD(thread->dcon, dcon,)
  233. return GEARMAN_SUCCESS;
  234. }