job.c 21 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Server job definitions
  41. */
  42. #include <config.h>
  43. #include <libgearman-server/common.h>
  44. #include <string.h>
  45. #include <libgearman-server/list.h>
  46. #include <libgearman-server/hash.h>
  47. /*
  48. * Private declarations
  49. */
  50. /**
  51. * @addtogroup gearman_server_job_private Private Server Job Functions
  52. * @ingroup gearman_server_job
  53. * @{
  54. */
  55. /**
  56. * Generate hash key for job handles and unique IDs.
  57. */
  58. static uint32_t _server_job_hash(const char *key, size_t key_size);
  59. /**
  60. * Appends a worker onto the end of a list of workers.
  61. */
  62. static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
  63. gearman_server_worker_st *worker);
  64. /**
  65. * Get a server job structure from the unique ID. If data_size is non-zero,
  66. * then unique points to the workload data and not a real unique key.
  67. */
  68. static gearman_server_job_st *
  69. _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
  70. gearman_server_function_st *server_function,
  71. const char *unique, size_t data_size);
  72. /** @} */
  73. #pragma GCC diagnostic ignored "-Wold-style-cast"
  74. /*
  75. * Public definitions
  76. */
  77. gearman_server_job_st * gearman_server_job_add(gearman_server_st *server,
  78. const char *function_name, size_t function_name_size,
  79. const char *unique, size_t unique_size,
  80. const void *data, size_t data_size,
  81. gearmand_job_priority_t priority,
  82. gearman_server_client_st *server_client,
  83. gearmand_error_t *ret_ptr,
  84. int64_t when)
  85. {
  86. return gearman_server_job_add_reducer(server,
  87. function_name, function_name_size,
  88. unique, unique_size,
  89. NULL, 0, // reducer
  90. data, data_size,
  91. priority, server_client, ret_ptr, when);
  92. }
  93. gearman_server_job_st *
  94. gearman_server_job_add_reducer(gearman_server_st *server,
  95. const char *function_name, size_t function_name_size,
  96. const char *unique, size_t unique_size,
  97. const char *reducer_name, size_t reducer_size,
  98. const void *data, size_t data_size,
  99. gearmand_job_priority_t priority,
  100. gearman_server_client_st *server_client,
  101. gearmand_error_t *ret_ptr,
  102. int64_t when)
  103. {
  104. gearman_server_function_st *server_function= gearman_server_function_get(server, function_name, function_name_size);
  105. if (server_function == NULL)
  106. {
  107. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  108. return NULL;
  109. }
  110. uint32_t key;
  111. gearman_server_job_st *server_job;
  112. if (unique_size == 0)
  113. {
  114. server_job= NULL;
  115. key= 0;
  116. }
  117. else
  118. {
  119. if (unique_size == 1 && *unique == '-')
  120. {
  121. if (data_size == 0)
  122. {
  123. key= 0;
  124. server_job= NULL;
  125. }
  126. else
  127. {
  128. /* Look up job via unique data when unique = '-'. */
  129. key= _server_job_hash(data, data_size);
  130. server_job= _server_job_get_unique(server, key, server_function, data,
  131. data_size);
  132. }
  133. }
  134. else
  135. {
  136. /* Look up job via unique ID first to make sure it's not a duplicate. */
  137. key= _server_job_hash(unique, unique_size);
  138. server_job= _server_job_get_unique(server, key, server_function, unique,
  139. 0);
  140. }
  141. }
  142. if (server_job == NULL)
  143. {
  144. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Comparing queue %u to limit %u for priority %u",
  145. server_function->job_total, server_function->max_queue_size[priority],
  146. priority);
  147. if (server_function->max_queue_size[priority] > 0 &&
  148. server_function->job_total >= server_function->max_queue_size[priority])
  149. {
  150. *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
  151. return NULL;
  152. }
  153. server_job= gearman_server_job_create(server);
  154. if (server_job == NULL)
  155. {
  156. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  157. return NULL;
  158. }
  159. server_job->priority= priority;
  160. server_job->function= server_function;
  161. server_function->job_total++;
  162. int checked_length;
  163. checked_length= snprintf(server_job->job_handle, GEARMAND_JOB_HANDLE_SIZE, "%s:%u",
  164. server->job_handle_prefix, server->job_handle_count);
  165. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
  166. {
  167. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Job handle plus handle count beyond GEARMAND_JOB_HANDLE_SIZE: %s:%u",
  168. server->job_handle_prefix, server->job_handle_count);
  169. }
  170. checked_length= snprintf(server_job->unique, GEARMAN_UNIQUE_SIZE, "%.*s",
  171. (int)unique_size, unique);
  172. if (checked_length >= GEARMAN_UNIQUE_SIZE || checked_length < 0)
  173. {
  174. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "We received a unique beyond GEARMAN_UNIQUE_SIZE: %.*s", (int)unique_size, unique);
  175. }
  176. server->job_handle_count++;
  177. server_job->data= data;
  178. server_job->data_size= data_size;
  179. server_job->when= when;
  180. if (reducer_size)
  181. {
  182. strncpy(server_job->reducer, reducer_name, reducer_size);
  183. server_job->reducer[reducer_size]= 0;
  184. }
  185. else
  186. {
  187. server_job->reducer[0]= 0;
  188. }
  189. server_job->unique_key= key;
  190. key= key % GEARMAND_JOB_HASH_SIZE;
  191. GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
  192. key= _server_job_hash(server_job->job_handle,
  193. strlen(server_job->job_handle));
  194. server_job->job_handle_key= key;
  195. key= key % GEARMAND_JOB_HASH_SIZE;
  196. gearmand_hash_server_add(server, key, server_job);
  197. if (server->state.queue_startup)
  198. {
  199. server_job->job_queued= true;
  200. }
  201. else if (server_client == NULL && server->queue._add_fn != NULL)
  202. {
  203. *ret_ptr= (*(server->queue._add_fn))(server,
  204. (void *)server->queue._context,
  205. server_job->unique,
  206. unique_size,
  207. function_name,
  208. function_name_size,
  209. data, data_size, priority,
  210. when);
  211. if (gearmand_failed(*ret_ptr))
  212. {
  213. server_job->data= NULL;
  214. gearman_server_job_free(server_job);
  215. return NULL;
  216. }
  217. if (server->queue._flush_fn != NULL)
  218. {
  219. *ret_ptr= (*(server->queue._flush_fn))(server,
  220. (void *)server->queue._context);
  221. if (*ret_ptr != GEARMAN_SUCCESS)
  222. {
  223. server_job->data= NULL;
  224. gearman_server_job_free(server_job);
  225. return NULL;
  226. }
  227. }
  228. server_job->job_queued= true;
  229. }
  230. *ret_ptr= gearman_server_job_queue(server_job);
  231. if (gearmand_failed(*ret_ptr))
  232. {
  233. if (server_client == NULL && server->queue._done_fn != NULL)
  234. {
  235. /* Do our best to remove the job from the queue. */
  236. (void)(*(server->queue._done_fn))(server,
  237. (void *)server->queue._context,
  238. server_job->unique, unique_size,
  239. server_job->function->function_name,
  240. server_job->function->function_name_size);
  241. }
  242. gearman_server_job_free(server_job);
  243. return NULL;
  244. }
  245. }
  246. else
  247. {
  248. *ret_ptr= GEARMAN_JOB_EXISTS;
  249. }
  250. if (server_client)
  251. {
  252. server_client->job= server_job;
  253. GEARMAN_LIST_ADD(server_job->client, server_client, job_)
  254. }
  255. return server_job;
  256. }
  257. gearman_server_job_st *
  258. gearman_server_job_create(gearman_server_st *server)
  259. {
  260. gearman_server_job_st *server_job;
  261. if (server->free_job_count > 0)
  262. {
  263. server_job= server->free_job_list;
  264. gearmand_server_free_job_list_free(server, server_job);
  265. }
  266. else
  267. {
  268. server_job= (gearman_server_job_st *)malloc(sizeof(gearman_server_job_st));
  269. if (server_job == NULL)
  270. return NULL;
  271. }
  272. server_job->ignore_job= false;
  273. server_job->job_queued= false;
  274. server_job->retries= 0;
  275. server_job->priority= 0;
  276. server_job->job_handle_key= 0;
  277. server_job->unique_key= 0;
  278. server_job->client_count= 0;
  279. server_job->numerator= 0;
  280. server_job->denominator= 0;
  281. server_job->data_size= 0;
  282. server_job->next= NULL;
  283. server_job->prev= NULL;
  284. server_job->unique_next= NULL;
  285. server_job->unique_prev= NULL;
  286. server_job->worker_next= NULL;
  287. server_job->worker_prev= NULL;
  288. server_job->function= NULL;
  289. server_job->function_next= NULL;
  290. server_job->data= NULL;
  291. server_job->client_list= NULL;
  292. server_job->worker= NULL;
  293. server_job->job_handle[0]= 0;
  294. server_job->unique[0]= 0;
  295. return server_job;
  296. }
  297. void gearman_server_job_free(gearman_server_job_st *server_job)
  298. {
  299. uint32_t key;
  300. if (! server_job)
  301. {
  302. return;
  303. }
  304. if (server_job->worker != NULL)
  305. {
  306. server_job->function->job_running--;
  307. }
  308. server_job->function->job_total--;
  309. if (server_job->data != NULL)
  310. free((void *)(server_job->data));
  311. while (server_job->client_list != NULL)
  312. gearman_server_client_free(server_job->client_list);
  313. if (server_job->worker != NULL)
  314. GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_)
  315. key= server_job->unique_key % GEARMAND_JOB_HASH_SIZE;
  316. GEARMAN_HASH_DEL(Server->unique, key, server_job, unique_);
  317. key= server_job->job_handle_key % GEARMAND_JOB_HASH_SIZE;
  318. gearmand_hash_server_free(Server, key, server_job);
  319. if (Server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
  320. {
  321. gearmand_server_job_list_add(Server, server_job);
  322. }
  323. else
  324. {
  325. free(server_job);
  326. }
  327. }
  328. gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
  329. const char *job_handle,
  330. gearman_server_con_st *worker_con)
  331. {
  332. uint32_t key;
  333. key= _server_job_hash(job_handle, strlen(job_handle));
  334. for (gearman_server_job_st *server_job= server->job_hash[key % GEARMAND_JOB_HASH_SIZE];
  335. server_job != NULL; server_job= server_job->next)
  336. {
  337. if (server_job->job_handle_key == key &&
  338. !strcmp(server_job->job_handle, job_handle))
  339. {
  340. /* Check to make sure the worker asking for the job still owns the job. */
  341. if (worker_con != NULL &&
  342. (server_job->worker == NULL || server_job->worker->con != worker_con))
  343. {
  344. return NULL;
  345. }
  346. return server_job;
  347. }
  348. }
  349. return NULL;
  350. }
  351. gearman_server_job_st *
  352. gearman_server_job_peek(gearman_server_con_st *server_con)
  353. {
  354. for (gearman_server_worker_st *server_worker= server_con->worker_list;
  355. server_worker != NULL;
  356. server_worker= server_worker->con_next)
  357. {
  358. if (server_worker->function->job_count != 0)
  359. {
  360. for (gearmand_job_priority_t priority= GEARMAND_JOB_PRIORITY_HIGH;
  361. priority != GEARMAND_JOB_PRIORITY_MAX; priority++)
  362. {
  363. gearman_server_job_st *server_job;
  364. server_job= server_worker->function->job_list[priority];
  365. int64_t current_time= (int64_t)time(NULL);
  366. while(server_job &&
  367. server_job->when != 0 &&
  368. server_job->when > current_time)
  369. {
  370. server_job= server_job->function_next;
  371. }
  372. if (server_job != NULL)
  373. {
  374. if (server_job->ignore_job)
  375. {
  376. /* This is only happens when a client disconnects from a foreground
  377. job. We do this because we don't want to run the job anymore. */
  378. server_job->ignore_job= false;
  379. gearman_server_job_free(gearman_server_job_take(server_con));
  380. return gearman_server_job_peek(server_con);
  381. }
  382. return server_job;
  383. }
  384. }
  385. }
  386. }
  387. return NULL;
  388. }
  389. static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
  390. gearman_server_worker_st *worker)
  391. {
  392. worker->con_prev= NULL;
  393. worker->con_next= list;
  394. while (worker->con_next != NULL)
  395. {
  396. worker->con_prev= worker->con_next;
  397. worker->con_next= worker->con_next->con_next;
  398. }
  399. if (worker->con_prev)
  400. worker->con_prev->con_next= worker;
  401. }
  402. gearman_server_job_st *gearman_server_job_take(gearman_server_con_st *server_con)
  403. {
  404. for (gearman_server_worker_st *server_worker= server_con->worker_list; server_worker; server_worker= server_worker->con_next)
  405. {
  406. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Jobs available %lu", (unsigned long)(server_worker->function->job_count));
  407. if (server_worker->function->job_count)
  408. {
  409. if (server_worker == NULL)
  410. {
  411. return NULL;
  412. }
  413. if (Server->flags.round_robin)
  414. {
  415. GEARMAN_LIST_DEL(server_con->worker, server_worker, con_)
  416. _server_con_worker_list_append(server_con->worker_list, server_worker);
  417. ++server_con->worker_count;
  418. if (server_con->worker_list == NULL)
  419. {
  420. server_con->worker_list= server_worker;
  421. }
  422. }
  423. gearmand_job_priority_t priority;
  424. for (priority= GEARMAND_JOB_PRIORITY_HIGH; priority < GEARMAND_JOB_PRIORITY_LOW; priority++)
  425. {
  426. if (server_worker->function->job_list[priority])
  427. {
  428. break;
  429. }
  430. }
  431. gearman_server_job_st *server_job= server_job= server_worker->function->job_list[priority];
  432. gearman_server_job_st *previous_job= server_job;
  433. int64_t current_time= (int64_t)time(NULL);
  434. while (server_job && server_job->when != 0 && server_job->when > current_time)
  435. {
  436. previous_job= server_job;
  437. server_job= server_job->function_next;
  438. }
  439. if (server_job)
  440. {
  441. if (server_job->function->job_list[priority] == server_job)
  442. {
  443. // If it's the head of the list, advance it
  444. server_job->function->job_list[priority]= server_job->function_next;
  445. }
  446. else
  447. {
  448. // Otherwise, just remove the item from the list
  449. previous_job->function_next= server_job->function_next;
  450. }
  451. // If it's the tail of the list, move the tail back
  452. if (server_job->function->job_end[priority] == server_job)
  453. {
  454. server_job->function->job_end[priority]= previous_job;
  455. }
  456. server_job->function->job_count--;
  457. server_job->worker= server_worker;
  458. GEARMAN_LIST_ADD(server_worker->job, server_job, worker_)
  459. server_job->function->job_running++;
  460. if (server_job->ignore_job)
  461. {
  462. gearman_server_job_free(server_job);
  463. return gearman_server_job_take(server_con);
  464. }
  465. return server_job;
  466. }
  467. }
  468. }
  469. return NULL;
  470. }
  471. gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job)
  472. {
  473. gearman_server_client_st *client;
  474. gearman_server_worker_st *worker;
  475. uint32_t noop_sent;
  476. gearmand_error_t ret;
  477. if (job->worker)
  478. {
  479. job->retries++;
  480. if (Server->job_retries == job->retries)
  481. {
  482. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  483. "Dropped job due to max retry count: %s %s",
  484. job->job_handle, job->unique);
  485. for (client= job->client_list; client != NULL; client= client->job_next)
  486. {
  487. ret= gearman_server_io_packet_add(client->con, false,
  488. GEARMAN_MAGIC_RESPONSE,
  489. GEARMAN_COMMAND_WORK_FAIL,
  490. job->job_handle,
  491. (size_t)strlen(job->job_handle),
  492. NULL);
  493. if (gearmand_failed(ret))
  494. {
  495. return ret;
  496. }
  497. }
  498. /* Remove from persistent queue if one exists. */
  499. if (job->job_queued && Server->queue._done_fn != NULL)
  500. {
  501. ret= (*(Server->queue._done_fn))(Server,
  502. (void *)Server->queue._context,
  503. job->unique,
  504. (size_t)strlen(job->unique),
  505. job->function->function_name,
  506. job->function->function_name_size);
  507. if (ret != GEARMAN_SUCCESS)
  508. return ret;
  509. }
  510. gearman_server_job_free(job);
  511. return GEARMAN_SUCCESS;
  512. }
  513. GEARMAN_LIST_DEL(job->worker->job, job, worker_)
  514. job->worker= NULL;
  515. job->function->job_running--;
  516. job->function_next= NULL;
  517. job->numerator= 0;
  518. job->denominator= 0;
  519. }
  520. /* Queue NOOP for possible sleeping workers. */
  521. if (job->function->worker_list != NULL)
  522. {
  523. worker= job->function->worker_list;
  524. noop_sent= 0;
  525. do
  526. {
  527. if (worker->con->is_sleeping && ! (worker->con->is_noop_sent))
  528. {
  529. ret= gearman_server_io_packet_add(worker->con, false,
  530. GEARMAN_MAGIC_RESPONSE,
  531. GEARMAN_COMMAND_NOOP, NULL);
  532. if (gearmand_failed(ret))
  533. {
  534. gearmand_gerror("gearman_server_io_packet_add", ret);
  535. return ret;
  536. }
  537. worker->con->is_noop_sent= true;
  538. noop_sent++;
  539. }
  540. worker= worker->function_next;
  541. }
  542. while (worker != job->function->worker_list &&
  543. (Server->worker_wakeup == 0 ||
  544. noop_sent < Server->worker_wakeup));
  545. job->function->worker_list= worker;
  546. }
  547. /* Queue the job to be run. */
  548. if (job->function->job_list[job->priority] == NULL)
  549. {
  550. job->function->job_list[job->priority]= job;
  551. }
  552. else
  553. {
  554. job->function->job_end[job->priority]->function_next= job;
  555. }
  556. job->function->job_end[job->priority]= job;
  557. job->function->job_count++;
  558. return GEARMAN_SUCCESS;
  559. }
  560. /*
  561. * Private definitions
  562. */
  /**
   * Generate hash key for job handles and unique IDs.
   *
   * One-at-a-time style mixing over the key bytes. The additions and left
   * shifts are done in uint32_t so they wrap with defined behaviour; the
   * previous int32_t arithmetic overflowed and left-shifted negative values,
   * both of which are undefined. The right shifts still go through int32_t so
   * that the sign bit is propagated exactly as before on two's-complement
   * targets with arithmetic right shift, which keeps every hash value the same.
   *
   * @param key      bytes to hash; not read when key_size is 0.
   * @param key_size number of bytes to hash.
   * @return the hash, never 0 (a computed 0 is mapped to 1).
   */
  static uint32_t _server_job_hash(const char *key, size_t key_size)
  {
    uint32_t value= 0;

    for (size_t i= 0; i < key_size; i++)
    {
      /* Widen through int32_t so a negative char is sign-extended as before. */
      value+= (uint32_t)(int32_t)key[i];
      value+= value << 10;
      value^= (uint32_t)((int32_t)value >> 6);
    }

    value+= value << 3;
    value^= (uint32_t)((int32_t)value >> 11);
    value+= value << 15;

    return value == 0 ? 1 : value;
  }
  578. static gearman_server_job_st *
  579. _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
  580. gearman_server_function_st *server_function,
  581. const char *unique, size_t data_size)
  582. {
  583. gearman_server_job_st *server_job;
  584. for (server_job= server->unique_hash[unique_key % GEARMAND_JOB_HASH_SIZE];
  585. server_job != NULL; server_job= server_job->unique_next)
  586. {
  587. if (data_size == 0)
  588. {
  589. if (server_job->function == server_function &&
  590. server_job->unique_key == unique_key &&
  591. !strcmp(server_job->unique, unique))
  592. {
  593. return server_job;
  594. }
  595. }
  596. else
  597. {
  598. if (server_job->function == server_function &&
  599. server_job->unique_key == unique_key &&
  600. server_job->data_size == data_size &&
  601. !memcmp(server_job->data, unique, data_size))
  602. {
  603. return server_job;
  604. }
  605. }
  606. }
  607. return NULL;
  608. }