connection.c 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Server connection definitions
  11. */
  12. #include "common.h"
  13. static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
  14. gearmand_error_t *ret);
  15. /*
  16. * Public definitions
  17. */
  18. gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread, gearmand_con_st *dcon, gearmand_error_t *ret)
  19. {
  20. gearman_server_con_st *con;
  21. con= _server_con_create(thread, dcon, ret);
  22. if (con == NULL)
  23. {
  24. return NULL;
  25. }
  26. if ((*ret= gearman_io_set_fd(&(con->con), dcon->fd)) != GEARMAN_SUCCESS)
  27. {
  28. gearman_server_con_free(con);
  29. return NULL;
  30. }
  31. *ret= gearmand_io_set_events(con, POLLIN);
  32. if (*ret != GEARMAN_SUCCESS)
  33. {
  34. gearmand_gerror("gearmand_io_set_events", *ret);
  35. gearman_server_con_free(con);
  36. return NULL;
  37. }
  38. return con;
  39. }
  40. static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
  41. gearmand_error_t *ret)
  42. {
  43. gearman_server_con_st *con;
  44. if (thread->free_con_count > 0)
  45. {
  46. con= thread->free_con_list;
  47. GEARMAN_LIST_DEL(thread->free_con, con,)
  48. }
  49. else
  50. {
  51. con= (gearman_server_con_st *)malloc(sizeof(gearman_server_con_st));
  52. if (con == NULL)
  53. {
  54. gearmand_perror("malloc");
  55. *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  56. return NULL;
  57. }
  58. }
  59. assert(con);
  60. if (!con)
  61. {
  62. gearmand_error("Neigther an allocated gearman_server_con_st() or free listed could be found");
  63. *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  64. return NULL;
  65. }
  66. gearmand_connection_options_t options[] = { GEARMAND_CON_MAX };
  67. gearmand_connection_init(thread->gearman, &(con->con), dcon, options);
  68. con->con.root= con;
  69. con->is_sleeping= false;
  70. con->is_exceptions= false;
  71. con->is_dead= false;
  72. con->is_noop_sent= false;
  73. con->ret= 0;
  74. con->io_list= false;
  75. con->proc_list= false;
  76. con->proc_removed= false;
  77. con->io_packet_count= 0;
  78. con->proc_packet_count= 0;
  79. con->worker_count= 0;
  80. con->client_count= 0;
  81. con->thread= thread;
  82. con->packet= NULL;
  83. con->io_packet_list= NULL;
  84. con->io_packet_end= NULL;
  85. con->proc_packet_list= NULL;
  86. con->proc_packet_end= NULL;
  87. con->io_next= NULL;
  88. con->io_prev= NULL;
  89. con->proc_next= NULL;
  90. con->proc_prev= NULL;
  91. con->worker_list= NULL;
  92. con->client_list= NULL;
  93. con->_host= dcon->host;
  94. con->_port= dcon->port;
  95. strcpy(con->id, "-");
  96. con->protocol.context= NULL;
  97. con->protocol.context_free_fn= NULL;
  98. con->protocol.packet_pack_fn= gearmand_packet_pack;
  99. con->protocol.packet_unpack_fn= gearmand_packet_unpack;
  100. int error;
  101. if (! (error= pthread_mutex_lock(&thread->lock)))
  102. {
  103. GEARMAN_LIST_ADD(thread->con, con,);
  104. (void) pthread_mutex_unlock(&thread->lock);
  105. }
  106. else
  107. {
  108. errno= error;
  109. gearmand_perror("pthread_mutex_lock");
  110. gearman_server_con_free(con);
  111. *ret= GEARMAN_ERRNO;
  112. return NULL;
  113. }
  114. return con;
  115. }
  116. void gearman_server_con_free(gearman_server_con_st *con)
  117. {
  118. gearman_server_thread_st *thread= con->thread;
  119. gearman_server_packet_st *packet;
  120. con->_host= NULL;
  121. con->_port= NULL;
  122. if (Server->flags.threaded &&
  123. !(con->proc_removed) && !(Server->proc_shutdown))
  124. {
  125. con->is_dead= true;
  126. con->is_sleeping= false;
  127. con->is_exceptions= false;
  128. con->is_noop_sent= false;
  129. gearman_server_con_proc_add(con);
  130. return;
  131. }
  132. gearmand_io_free(&(con->con));
  133. if (con->protocol.context != NULL && con->protocol.context_free_fn != NULL)
  134. {
  135. con->protocol.context_free_fn(con, (void *)con->protocol.context);
  136. }
  137. if (con->proc_list)
  138. {
  139. gearman_server_con_proc_remove(con);
  140. }
  141. if (con->io_list)
  142. {
  143. gearman_server_con_io_remove(con);
  144. }
  145. if (con->packet != NULL)
  146. {
  147. if (&(con->packet->packet) != con->con.recv_packet)
  148. {
  149. gearmand_packet_free(&(con->packet->packet));
  150. }
  151. gearman_server_packet_free(con->packet, con->thread, true);
  152. }
  153. while (con->io_packet_list != NULL)
  154. {
  155. gearman_server_io_packet_remove(con);
  156. }
  157. while (con->proc_packet_list != NULL)
  158. {
  159. packet= gearman_server_proc_packet_remove(con);
  160. gearmand_packet_free(&(packet->packet));
  161. gearman_server_packet_free(packet, con->thread, true);
  162. }
  163. gearman_server_con_free_workers(con);
  164. while (con->client_list != NULL)
  165. {
  166. gearman_server_client_free(con->client_list);
  167. }
  168. (void) pthread_mutex_lock(&thread->lock);
  169. GEARMAN_LIST_DEL(con->thread->con, con,)
  170. (void) pthread_mutex_unlock(&thread->lock);
  171. if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
  172. {
  173. GEARMAN_LIST_ADD(thread->free_con, con,)
  174. }
  175. else
  176. {
  177. gearmand_crazy("free");
  178. free(con);
  179. }
  180. }
  181. gearmand_io_st *gearman_server_con_con(gearman_server_con_st *con)
  182. {
  183. return &con->con;
  184. }
  185. gearmand_con_st *gearman_server_con_data(gearman_server_con_st *con)
  186. {
  187. return gearman_io_context(&(con->con));
  188. }
  189. const char *gearman_server_con_id(gearman_server_con_st *con)
  190. {
  191. return con->id;
  192. }
  193. void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
  194. size_t size)
  195. {
  196. if (size >= GEARMAN_SERVER_CON_ID_SIZE)
  197. size= GEARMAN_SERVER_CON_ID_SIZE - 1;
  198. memcpy(con->id, id, size);
  199. con->id[size]= 0;
  200. }
  201. void gearman_server_con_free_worker(gearman_server_con_st *con,
  202. char *function_name,
  203. size_t function_name_size)
  204. {
  205. gearman_server_worker_st *worker= con->worker_list;
  206. gearman_server_worker_st *prev_worker= NULL;
  207. while (worker != NULL)
  208. {
  209. if (worker->function->function_name_size == function_name_size &&
  210. !memcmp(worker->function->function_name, function_name,
  211. function_name_size))
  212. {
  213. gearman_server_worker_free(worker);
  214. /* Set worker to the last kept worker, or the beginning of the list. */
  215. if (prev_worker == NULL)
  216. worker= con->worker_list;
  217. else
  218. worker= prev_worker;
  219. }
  220. else
  221. {
  222. /* Save this so we don't need to scan the list again if one is removed. */
  223. prev_worker= worker;
  224. worker= worker->con_next;
  225. }
  226. }
  227. }
  228. void gearman_server_con_free_workers(gearman_server_con_st *con)
  229. {
  230. while (con->worker_list != NULL)
  231. gearman_server_worker_free(con->worker_list);
  232. }
  233. void gearman_server_con_io_add(gearman_server_con_st *con)
  234. {
  235. if (con->io_list)
  236. return;
  237. (void) pthread_mutex_lock(&con->thread->lock);
  238. GEARMAN_LIST_ADD(con->thread->io, con, io_)
  239. con->io_list= true;
  240. /* Looks funny, but need to check io_count locked, but call run unlocked. */
  241. if (con->thread->io_count == 1 && con->thread->run_fn)
  242. {
  243. (void) pthread_mutex_unlock(&con->thread->lock);
  244. (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
  245. }
  246. else
  247. {
  248. (void) pthread_mutex_unlock(&con->thread->lock);
  249. }
  250. }
  251. void gearman_server_con_io_remove(gearman_server_con_st *con)
  252. {
  253. (void) pthread_mutex_lock(&con->thread->lock);
  254. if (con->io_list)
  255. {
  256. GEARMAN_LIST_DEL(con->thread->io, con, io_)
  257. con->io_list= false;
  258. }
  259. (void) pthread_mutex_unlock(&con->thread->lock);
  260. }
  261. gearman_server_con_st *
  262. gearman_server_con_io_next(gearman_server_thread_st *thread)
  263. {
  264. gearman_server_con_st *con= thread->io_list;
  265. if (con == NULL)
  266. return NULL;
  267. gearman_server_con_io_remove(con);
  268. return con;
  269. }
  270. void gearman_server_con_proc_add(gearman_server_con_st *con)
  271. {
  272. if (con->proc_list)
  273. return;
  274. (void) pthread_mutex_lock(&con->thread->lock);
  275. GEARMAN_LIST_ADD(con->thread->proc, con, proc_)
  276. con->proc_list= true;
  277. (void) pthread_mutex_unlock(&con->thread->lock);
  278. if (! (Server->proc_shutdown) &&
  279. !(Server->proc_wakeup))
  280. {
  281. (void) pthread_mutex_lock(&(Server->proc_lock));
  282. Server->proc_wakeup= true;
  283. (void) pthread_cond_signal(&(Server->proc_cond));
  284. (void) pthread_mutex_unlock(&(Server->proc_lock));
  285. }
  286. }
  287. void gearman_server_con_proc_remove(gearman_server_con_st *con)
  288. {
  289. (void) pthread_mutex_lock(&con->thread->lock);
  290. if (con->proc_list)
  291. {
  292. GEARMAN_LIST_DEL(con->thread->proc, con, proc_)
  293. con->proc_list= false;
  294. }
  295. (void) pthread_mutex_unlock(&con->thread->lock);
  296. }
  297. gearman_server_con_st *
  298. gearman_server_con_proc_next(gearman_server_thread_st *thread)
  299. {
  300. gearman_server_con_st *con;
  301. if (thread->proc_list == NULL)
  302. return NULL;
  303. (void) pthread_mutex_lock(&thread->lock);
  304. con= thread->proc_list;
  305. while (con != NULL)
  306. {
  307. GEARMAN_LIST_DEL(thread->proc, con, proc_)
  308. con->proc_list= false;
  309. if (!(con->proc_removed))
  310. break;
  311. con= thread->proc_list;
  312. }
  313. (void) pthread_mutex_unlock(&thread->lock);
  314. return con;
  315. }
  316. void gearmand_connection_set_protocol(gearman_server_con_st *connection, void *context,
  317. gearmand_connection_protocol_context_free_fn *free_fn,
  318. gearmand_packet_pack_fn *pack,
  319. gearmand_packet_unpack_fn *unpack)
  320. {
  321. connection->protocol.context= context;
  322. connection->protocol.context_free_fn= free_fn;
  323. connection->protocol.packet_pack_fn= pack;
  324. connection->protocol.packet_unpack_fn= unpack;
  325. }
  326. void *gearmand_connection_protocol_context(const gearman_server_con_st *connection)
  327. {
  328. return connection->protocol.context;
  329. }