job.c 14 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Server job definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <string.h>
  45. #include <libgearman-server/list.h>
  46. #include <libgearman-server/hash.h>
  47. #include <libgearman-server/queue.h>
  48. /*
  49. * Private declarations
  50. */
  51. /**
  52. * @addtogroup gearman_server_job_private Private Server Job Functions
  53. * @ingroup gearman_server_job
  54. * @{
  55. */
  56. /**
  57. * Get a server job structure from the unique ID. If data_size is non-zero,
  58. * then unique points to the workload data and not a real unique key.
  59. */
  60. static gearman_server_job_st * _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
  61. gearman_server_function_st *server_function,
  62. const char *unique, size_t data_size)
  63. {
  64. gearman_server_job_st *server_job;
  65. for (server_job= server->unique_hash[unique_key % GEARMAND_JOB_HASH_SIZE];
  66. server_job != NULL; server_job= server_job->unique_next)
  67. {
  68. if (data_size == 0)
  69. {
  70. if (server_job->function == server_function &&
  71. server_job->unique_key == unique_key &&
  72. !strcmp(server_job->unique, unique))
  73. {
  74. return server_job;
  75. }
  76. }
  77. else
  78. {
  79. if (server_job->function == server_function &&
  80. server_job->unique_key == unique_key &&
  81. server_job->data_size == data_size &&
  82. memcmp(server_job->data, unique, data_size) == 0)
  83. {
  84. return server_job;
  85. }
  86. }
  87. }
  88. return NULL;
  89. }
  90. /** @} */
  91. #pragma GCC diagnostic ignored "-Wold-style-cast"
  92. /*
  93. * Public definitions
  94. */
  95. gearman_server_job_st * gearman_server_job_add(gearman_server_st *server,
  96. const char *function_name, size_t function_name_size,
  97. const char *unique, size_t unique_size,
  98. const void *data, size_t data_size,
  99. gearman_job_priority_t priority,
  100. gearman_server_client_st *server_client,
  101. gearmand_error_t *ret_ptr,
  102. int64_t when)
  103. {
  104. return gearman_server_job_add_reducer(server,
  105. function_name, function_name_size,
  106. unique, unique_size,
  107. NULL, 0, // reducer
  108. data, data_size,
  109. priority, server_client, ret_ptr, when);
  110. }
  111. gearman_server_job_st *
  112. gearman_server_job_add_reducer(gearman_server_st *server,
  113. const char *function_name, size_t function_name_size,
  114. const char *unique, size_t unique_size,
  115. const char *reducer_name, size_t reducer_size,
  116. const void *data, size_t data_size,
  117. gearman_job_priority_t priority,
  118. gearman_server_client_st *server_client,
  119. gearmand_error_t *ret_ptr,
  120. int64_t when)
  121. {
  122. gearman_server_function_st *server_function= gearman_server_function_get(server, function_name, function_name_size);
  123. if (server_function == NULL)
  124. {
  125. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  126. return NULL;
  127. }
  128. uint32_t key;
  129. gearman_server_job_st *server_job;
  130. if (unique_size == 0)
  131. {
  132. server_job= NULL;
  133. key= 0;
  134. }
  135. else
  136. {
  137. if (unique_size == 1 && *unique == '-')
  138. {
  139. if (data_size == 0)
  140. {
  141. key= 0;
  142. server_job= NULL;
  143. }
  144. else
  145. {
  146. /* Look up job via unique data when unique = '-'. */
  147. key= _server_job_hash((const char*)data, data_size);
  148. server_job= _server_job_get_unique(server, key, server_function, (const char*)data, data_size);
  149. }
  150. }
  151. else
  152. {
  153. /* Look up job via unique ID first to make sure it's not a duplicate. */
  154. key= _server_job_hash(unique, unique_size);
  155. server_job= _server_job_get_unique(server, key, server_function, unique, 0);
  156. }
  157. }
  158. if (server_job == NULL)
  159. {
  160. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Comparing queue %u to limit %u for priority %u",
  161. server_function->job_total, server_function->max_queue_size[priority],
  162. priority);
  163. if (server_function->max_queue_size[priority] > 0 &&
  164. server_function->job_total >= server_function->max_queue_size[priority])
  165. {
  166. *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
  167. return NULL;
  168. }
  169. server_job= gearman_server_job_create(server);
  170. if (server_job == NULL)
  171. {
  172. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  173. return NULL;
  174. }
  175. server_job->priority= priority;
  176. server_job->function= server_function;
  177. server_function->job_total++;
  178. int checked_length;
  179. checked_length= snprintf(server_job->job_handle, GEARMAND_JOB_HANDLE_SIZE, "%s:%u",
  180. server->job_handle_prefix, server->job_handle_count);
  181. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
  182. {
  183. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Job handle plus handle count beyond GEARMAND_JOB_HANDLE_SIZE: %s:%u",
  184. server->job_handle_prefix, server->job_handle_count);
  185. }
  186. server_job->unique_length= unique_size;
  187. checked_length= snprintf(server_job->unique, GEARMAN_MAX_UNIQUE_SIZE, "%.*s",
  188. (int)unique_size, unique);
  189. if (checked_length >= GEARMAN_MAX_UNIQUE_SIZE || checked_length < 0)
  190. {
  191. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "We received a unique beyond GEARMAN_MAX_UNIQUE_SIZE: %.*s", (int)unique_size, unique);
  192. }
  193. server->job_handle_count++;
  194. server_job->data= data;
  195. server_job->data_size= data_size;
  196. server_job->when= when;
  197. if (reducer_size)
  198. {
  199. strncpy(server_job->reducer, reducer_name, reducer_size);
  200. server_job->reducer[reducer_size]= 0;
  201. }
  202. else
  203. {
  204. server_job->reducer[0]= 0;
  205. }
  206. server_job->unique_key= key;
  207. key= key % GEARMAND_JOB_HASH_SIZE;
  208. GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
  209. key= _server_job_hash(server_job->job_handle,
  210. strlen(server_job->job_handle));
  211. server_job->job_handle_key= key;
  212. key= key % GEARMAND_JOB_HASH_SIZE;
  213. gearmand_hash_server_add(server, key, server_job);
  214. if (server->state.queue_startup)
  215. {
  216. server_job->job_queued= true;
  217. }
  218. else if (server_client == NULL)
  219. {
  220. *ret_ptr= gearman_queue_add(server,
  221. server_job->unique, unique_size,
  222. function_name,
  223. function_name_size,
  224. data, data_size, priority,
  225. when);
  226. if (gearmand_failed(*ret_ptr))
  227. {
  228. server_job->data= NULL;
  229. gearman_server_job_free(server_job);
  230. return NULL;
  231. }
  232. {
  233. *ret_ptr= gearman_queue_flush(server);
  234. if (*ret_ptr != GEARMAN_SUCCESS)
  235. {
  236. server_job->data= NULL;
  237. gearman_server_job_free(server_job);
  238. return NULL;
  239. }
  240. }
  241. server_job->job_queued= true;
  242. }
  243. *ret_ptr= gearman_server_job_queue(server_job);
  244. if (gearmand_failed(*ret_ptr))
  245. {
  246. if (server_client == NULL)
  247. {
  248. /* Do our best to remove the job from the queue. */
  249. (void)gearman_queue_done(server,
  250. server_job->unique, unique_size,
  251. server_job->function->function_name,
  252. server_job->function->function_name_size);
  253. }
  254. gearman_server_job_free(server_job);
  255. return NULL;
  256. }
  257. }
  258. else
  259. {
  260. *ret_ptr= GEARMAN_JOB_EXISTS;
  261. }
  262. if (server_client)
  263. {
  264. server_client->job= server_job;
  265. GEARMAN_LIST_ADD(server_job->client, server_client, job_)
  266. }
  267. return server_job;
  268. }
  269. void gearman_server_job_free(gearman_server_job_st *server_job)
  270. {
  271. if (server_job == NULL)
  272. {
  273. return;
  274. }
  275. if (server_job->worker != NULL)
  276. {
  277. server_job->function->job_running--;
  278. }
  279. server_job->function->job_total--;
  280. if (server_job->data != NULL)
  281. {
  282. free((void *)(server_job->data));
  283. server_job->data= NULL;
  284. }
  285. while (server_job->client_list != NULL)
  286. {
  287. gearman_server_client_free(server_job->client_list);
  288. }
  289. if (server_job->worker != NULL)
  290. {
  291. GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_)
  292. }
  293. uint32_t key= server_job->unique_key % GEARMAND_JOB_HASH_SIZE;
  294. GEARMAN_HASH_DEL(Server->unique, key, server_job, unique_);
  295. key= server_job->job_handle_key % GEARMAND_JOB_HASH_SIZE;
  296. gearmand_hash_server_free(Server, key, server_job);
  297. if (Server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
  298. {
  299. gearmand_server_job_list_add(Server, server_job);
  300. }
  301. else
  302. {
  303. destroy_gearman_server_job_st(server_job);
  304. }
  305. }
  306. gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job)
  307. {
  308. if (job->worker)
  309. {
  310. job->retries++;
  311. if (Server->job_retries == job->retries)
  312. {
  313. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  314. "Dropped job due to max retry count: %s %.*s",
  315. job->job_handle,
  316. (int)job->unique_length, job->unique);
  317. gearman_server_client_st *client;
  318. for (client= job->client_list; client != NULL; client= client->job_next)
  319. {
  320. gearmand_error_t ret= gearman_server_io_packet_add(client->con, false,
  321. GEARMAN_MAGIC_RESPONSE,
  322. GEARMAN_COMMAND_WORK_FAIL,
  323. job->job_handle,
  324. (size_t)strlen(job->job_handle),
  325. NULL);
  326. if (gearmand_failed(ret))
  327. {
  328. return ret;
  329. }
  330. }
  331. /* Remove from persistent queue if one exists. */
  332. if (job->job_queued)
  333. {
  334. gearmand_error_t ret= gearman_queue_done(Server,
  335. job->unique, job->unique_length,
  336. job->function->function_name,
  337. job->function->function_name_size);
  338. if (ret != GEARMAN_SUCCESS)
  339. {
  340. return ret;
  341. }
  342. }
  343. gearman_server_job_free(job);
  344. return GEARMAN_SUCCESS;
  345. }
  346. GEARMAN_LIST_DEL(job->worker->job, job, worker_)
  347. job->worker= NULL;
  348. job->function->job_running--;
  349. job->function_next= NULL;
  350. job->numerator= 0;
  351. job->denominator= 0;
  352. }
  353. /* Queue NOOP for possible sleeping workers. */
  354. if (job->function->worker_list != NULL)
  355. {
  356. gearman_server_worker_st *worker= job->function->worker_list;
  357. uint32_t noop_sent= 0;
  358. do
  359. {
  360. if (worker->con->is_sleeping && ! (worker->con->is_noop_sent))
  361. {
  362. gearmand_error_t ret= gearman_server_io_packet_add(worker->con, false,
  363. GEARMAN_MAGIC_RESPONSE,
  364. GEARMAN_COMMAND_NOOP, NULL);
  365. if (gearmand_failed(ret))
  366. {
  367. gearmand_gerror("gearman_server_io_packet_add", ret);
  368. return ret;
  369. }
  370. worker->con->is_noop_sent= true;
  371. noop_sent++;
  372. }
  373. worker= worker->function_next;
  374. }
  375. while (worker != job->function->worker_list &&
  376. (Server->worker_wakeup == 0 ||
  377. noop_sent < Server->worker_wakeup));
  378. job->function->worker_list= worker;
  379. }
  380. /* Queue the job to be run. */
  381. if (job->function->job_list[job->priority] == NULL)
  382. {
  383. job->function->job_list[job->priority]= job;
  384. }
  385. else
  386. {
  387. job->function->job_end[job->priority]->function_next= job;
  388. }
  389. job->function->job_end[job->priority]= job;
  390. job->function->job_count++;
  391. return GEARMAN_SUCCESS;
  392. }
  393. /*
  394. * Private definitions
  395. */