connection.c 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Server connection definitions
  11. */
  12. #include <config.h>
  13. #include <libgearman-server/common.h>
  14. #include <string.h>
  15. #include <errno.h>
  16. #include <assert.h>
  17. static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
  18. gearmand_error_t *ret);
  19. /*
  20. * Public definitions
  21. */
  22. gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread, gearmand_con_st *dcon, gearmand_error_t *ret)
  23. {
  24. gearman_server_con_st *con;
  25. con= _server_con_create(thread, dcon, ret);
  26. if (con == NULL)
  27. {
  28. return NULL;
  29. }
  30. if ((*ret= gearman_io_set_fd(&(con->con), dcon->fd)) != GEARMAN_SUCCESS)
  31. {
  32. gearman_server_con_free(con);
  33. return NULL;
  34. }
  35. *ret= gearmand_io_set_events(con, POLLIN);
  36. if (*ret != GEARMAN_SUCCESS)
  37. {
  38. gearmand_gerror("gearmand_io_set_events", *ret);
  39. gearman_server_con_free(con);
  40. return NULL;
  41. }
  42. return con;
  43. }
  44. static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
  45. gearmand_error_t *ret)
  46. {
  47. gearman_server_con_st *con;
  48. if (thread->free_con_count > 0)
  49. {
  50. con= thread->free_con_list;
  51. GEARMAN_LIST_DEL(thread->free_con, con,)
  52. }
  53. else
  54. {
  55. con= (gearman_server_con_st *)malloc(sizeof(gearman_server_con_st));
  56. if (con == NULL)
  57. {
  58. gearmand_perror("malloc");
  59. *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  60. return NULL;
  61. }
  62. }
  63. assert(con);
  64. if (!con)
  65. {
  66. gearmand_error("Neigther an allocated gearman_server_con_st() or free listed could be found");
  67. *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  68. return NULL;
  69. }
  70. gearmand_connection_options_t options[]= { GEARMAND_CON_MAX };
  71. gearmand_connection_init(thread->gearman, &(con->con), dcon, options);
  72. con->con.root= con;
  73. con->is_sleeping= false;
  74. con->is_exceptions= false;
  75. con->is_dead= false;
  76. con->is_noop_sent= false;
  77. con->ret= 0;
  78. con->io_list= false;
  79. con->proc_list= false;
  80. con->proc_removed= false;
  81. con->io_packet_count= 0;
  82. con->proc_packet_count= 0;
  83. con->worker_count= 0;
  84. con->client_count= 0;
  85. con->thread= thread;
  86. con->packet= NULL;
  87. con->io_packet_list= NULL;
  88. con->io_packet_end= NULL;
  89. con->proc_packet_list= NULL;
  90. con->proc_packet_end= NULL;
  91. con->io_next= NULL;
  92. con->io_prev= NULL;
  93. con->proc_next= NULL;
  94. con->proc_prev= NULL;
  95. con->worker_list= NULL;
  96. con->client_list= NULL;
  97. con->_host= dcon->host;
  98. con->_port= dcon->port;
  99. strcpy(con->id, "-");
  100. con->protocol.context= NULL;
  101. con->protocol.context_free_fn= NULL;
  102. con->protocol.packet_pack_fn= gearmand_packet_pack;
  103. con->protocol.packet_unpack_fn= gearmand_packet_unpack;
  104. int error;
  105. if (! (error= pthread_mutex_lock(&thread->lock)))
  106. {
  107. GEARMAN_LIST_ADD(thread->con, con,);
  108. (void) pthread_mutex_unlock(&thread->lock);
  109. }
  110. else
  111. {
  112. errno= error;
  113. gearmand_perror("pthread_mutex_lock");
  114. gearman_server_con_free(con);
  115. *ret= GEARMAN_ERRNO;
  116. return NULL;
  117. }
  118. return con;
  119. }
  120. void gearman_server_con_free(gearman_server_con_st *con)
  121. {
  122. gearman_server_thread_st *thread= con->thread;
  123. gearman_server_packet_st *packet;
  124. con->_host= NULL;
  125. con->_port= NULL;
  126. if (Server->flags.threaded && !(con->proc_removed) && !(Server->proc_shutdown))
  127. {
  128. con->is_dead= true;
  129. con->is_sleeping= false;
  130. con->is_exceptions= false;
  131. con->is_noop_sent= false;
  132. gearman_server_con_proc_add(con);
  133. return;
  134. }
  135. gearmand_io_free(&(con->con));
  136. if (con->protocol.context != NULL && con->protocol.context_free_fn != NULL)
  137. {
  138. con->protocol.context_free_fn(con, (void *)con->protocol.context);
  139. }
  140. if (con->proc_list)
  141. {
  142. gearman_server_con_proc_remove(con);
  143. }
  144. if (con->io_list)
  145. {
  146. gearman_server_con_io_remove(con);
  147. }
  148. if (con->packet != NULL)
  149. {
  150. if (&(con->packet->packet) != con->con.recv_packet)
  151. {
  152. gearmand_packet_free(&(con->packet->packet));
  153. }
  154. gearman_server_packet_free(con->packet, con->thread, true);
  155. }
  156. while (con->io_packet_list != NULL)
  157. {
  158. gearman_server_io_packet_remove(con);
  159. }
  160. while (con->proc_packet_list != NULL)
  161. {
  162. packet= gearman_server_proc_packet_remove(con);
  163. gearmand_packet_free(&(packet->packet));
  164. gearman_server_packet_free(packet, con->thread, true);
  165. }
  166. gearman_server_con_free_workers(con);
  167. while (con->client_list != NULL)
  168. {
  169. gearman_server_client_free(con->client_list);
  170. }
  171. (void) pthread_mutex_lock(&thread->lock);
  172. GEARMAN_LIST_DEL(con->thread->con, con,)
  173. (void) pthread_mutex_unlock(&thread->lock);
  174. if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
  175. {
  176. GEARMAN_LIST_ADD(thread->free_con, con,)
  177. }
  178. else
  179. {
  180. gearmand_debug("free");
  181. free(con);
  182. }
  183. }
  184. gearmand_io_st *gearman_server_con_con(gearman_server_con_st *con)
  185. {
  186. return &con->con;
  187. }
  188. gearmand_con_st *gearman_server_con_data(gearman_server_con_st *con)
  189. {
  190. return gearman_io_context(&(con->con));
  191. }
  192. const char *gearman_server_con_id(gearman_server_con_st *con)
  193. {
  194. return con->id;
  195. }
  196. void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
  197. size_t size)
  198. {
  199. if (size >= GEARMAN_SERVER_CON_ID_SIZE)
  200. size= GEARMAN_SERVER_CON_ID_SIZE - 1;
  201. memcpy(con->id, id, size);
  202. con->id[size]= 0;
  203. }
  204. void gearman_server_con_free_worker(gearman_server_con_st *con,
  205. char *function_name,
  206. size_t function_name_size)
  207. {
  208. gearman_server_worker_st *worker= con->worker_list;
  209. gearman_server_worker_st *prev_worker= NULL;
  210. while (worker != NULL)
  211. {
  212. if (worker->function->function_name_size == function_name_size &&
  213. !memcmp(worker->function->function_name, function_name,
  214. function_name_size))
  215. {
  216. gearman_server_worker_free(worker);
  217. /* Set worker to the last kept worker, or the beginning of the list. */
  218. if (prev_worker == NULL)
  219. worker= con->worker_list;
  220. else
  221. worker= prev_worker;
  222. }
  223. else
  224. {
  225. /* Save this so we don't need to scan the list again if one is removed. */
  226. prev_worker= worker;
  227. worker= worker->con_next;
  228. }
  229. }
  230. }
  231. void gearman_server_con_free_workers(gearman_server_con_st *con)
  232. {
  233. while (con->worker_list != NULL)
  234. gearman_server_worker_free(con->worker_list);
  235. }
  236. void gearman_server_con_io_add(gearman_server_con_st *con)
  237. {
  238. if (con->io_list)
  239. return;
  240. (void) pthread_mutex_lock(&con->thread->lock);
  241. GEARMAN_LIST_ADD(con->thread->io, con, io_)
  242. con->io_list= true;
  243. /* Looks funny, but need to check io_count locked, but call run unlocked. */
  244. if (con->thread->io_count == 1 && con->thread->run_fn)
  245. {
  246. (void) pthread_mutex_unlock(&con->thread->lock);
  247. (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
  248. }
  249. else
  250. {
  251. (void) pthread_mutex_unlock(&con->thread->lock);
  252. }
  253. }
  254. void gearman_server_con_io_remove(gearman_server_con_st *con)
  255. {
  256. (void) pthread_mutex_lock(&con->thread->lock);
  257. if (con->io_list)
  258. {
  259. GEARMAN_LIST_DEL(con->thread->io, con, io_)
  260. con->io_list= false;
  261. }
  262. (void) pthread_mutex_unlock(&con->thread->lock);
  263. }
  264. gearman_server_con_st *
  265. gearman_server_con_io_next(gearman_server_thread_st *thread)
  266. {
  267. gearman_server_con_st *con= thread->io_list;
  268. if (con == NULL)
  269. return NULL;
  270. gearman_server_con_io_remove(con);
  271. return con;
  272. }
  273. void gearman_server_con_proc_add(gearman_server_con_st *con)
  274. {
  275. if (con->proc_list)
  276. return;
  277. (void) pthread_mutex_lock(&con->thread->lock);
  278. GEARMAN_LIST_ADD(con->thread->proc, con, proc_)
  279. con->proc_list= true;
  280. (void) pthread_mutex_unlock(&con->thread->lock);
  281. if (! (Server->proc_shutdown) && !(Server->proc_wakeup))
  282. {
  283. (void) pthread_mutex_lock(&(Server->proc_lock));
  284. Server->proc_wakeup= true;
  285. (void) pthread_cond_signal(&(Server->proc_cond));
  286. (void) pthread_mutex_unlock(&(Server->proc_lock));
  287. }
  288. }
  289. void gearman_server_con_proc_remove(gearman_server_con_st *con)
  290. {
  291. (void) pthread_mutex_lock(&con->thread->lock);
  292. if (con->proc_list)
  293. {
  294. GEARMAN_LIST_DEL(con->thread->proc, con, proc_)
  295. con->proc_list= false;
  296. }
  297. (void) pthread_mutex_unlock(&con->thread->lock);
  298. }
  299. gearman_server_con_st *
  300. gearman_server_con_proc_next(gearman_server_thread_st *thread)
  301. {
  302. gearman_server_con_st *con;
  303. if (thread->proc_list == NULL)
  304. return NULL;
  305. (void) pthread_mutex_lock(&thread->lock);
  306. con= thread->proc_list;
  307. while (con != NULL)
  308. {
  309. GEARMAN_LIST_DEL(thread->proc, con, proc_)
  310. con->proc_list= false;
  311. if (!(con->proc_removed))
  312. break;
  313. con= thread->proc_list;
  314. }
  315. (void) pthread_mutex_unlock(&thread->lock);
  316. return con;
  317. }
  318. void gearmand_connection_set_protocol(gearman_server_con_st *connection, void *context,
  319. gearmand_connection_protocol_context_free_fn *free_fn,
  320. gearmand_packet_pack_fn *pack,
  321. gearmand_packet_unpack_fn *unpack)
  322. {
  323. connection->protocol.context= context;
  324. connection->protocol.context_free_fn= free_fn;
  325. connection->protocol.packet_pack_fn= pack;
  326. connection->protocol.packet_unpack_fn= unpack;
  327. }
  328. void *gearmand_connection_protocol_context(const gearman_server_con_st *connection)
  329. {
  330. return connection->protocol.context;
  331. }