job.c 21 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Server job definitions
  41. */
  42. #include <config.h>
  43. #include <libgearman-server/common.h>
  44. #include <string.h>
  45. #include <libgearman-server/list.h>
  46. #include <libgearman-server/hash.h>
  47. /*
  48. * Private declarations
  49. */
  50. /**
  51. * @addtogroup gearman_server_job_private Private Server Job Functions
  52. * @ingroup gearman_server_job
  53. * @{
  54. */
  55. /**
  56. * Generate hash key for job handles and unique IDs.
  57. */
  58. static uint32_t _server_job_hash(const char *key, size_t key_size);
  59. /**
  60. * Appends a worker onto the end of a list of workers.
  61. */
  62. static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
  63. gearman_server_worker_st *worker);
  64. /**
  65. * Get a server job structure from the unique ID. If data_size is non-zero,
  66. * then unique points to the workload data and not a real unique key.
  67. */
  68. static gearman_server_job_st *
  69. _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
  70. gearman_server_function_st *server_function,
  71. const char *unique, size_t data_size);
  72. /** @} */
  73. #pragma GCC diagnostic ignored "-Wold-style-cast"
  74. /*
  75. * Public definitions
  76. */
  77. gearman_server_job_st * gearman_server_job_add(gearman_server_st *server,
  78. const char *function_name, size_t function_name_size,
  79. const char *unique, size_t unique_size,
  80. const void *data, size_t data_size,
  81. gearmand_job_priority_t priority,
  82. gearman_server_client_st *server_client,
  83. gearmand_error_t *ret_ptr,
  84. int64_t when)
  85. {
  86. return gearman_server_job_add_reducer(server,
  87. function_name, function_name_size,
  88. unique, unique_size,
  89. NULL, 0, // reducer
  90. data, data_size,
  91. priority, server_client, ret_ptr, when);
  92. }
  93. gearman_server_job_st *
  94. gearman_server_job_add_reducer(gearman_server_st *server,
  95. const char *function_name, size_t function_name_size,
  96. const char *unique, size_t unique_size,
  97. const char *reducer_name, size_t reducer_size,
  98. const void *data, size_t data_size,
  99. gearmand_job_priority_t priority,
  100. gearman_server_client_st *server_client,
  101. gearmand_error_t *ret_ptr,
  102. int64_t when)
  103. {
  104. gearman_server_function_st *server_function= gearman_server_function_get(server, function_name, function_name_size);
  105. if (server_function == NULL)
  106. {
  107. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  108. return NULL;
  109. }
  110. uint32_t key;
  111. gearman_server_job_st *server_job;
  112. if (unique_size == 0)
  113. {
  114. server_job= NULL;
  115. key= 0;
  116. }
  117. else
  118. {
  119. if (unique_size == 1 && *unique == '-')
  120. {
  121. if (data_size == 0)
  122. {
  123. key= 0;
  124. server_job= NULL;
  125. }
  126. else
  127. {
  128. /* Look up job via unique data when unique = '-'. */
  129. key= _server_job_hash(data, data_size);
  130. server_job= _server_job_get_unique(server, key, server_function, data,
  131. data_size);
  132. }
  133. }
  134. else
  135. {
  136. /* Look up job via unique ID first to make sure it's not a duplicate. */
  137. key= _server_job_hash(unique, unique_size);
  138. server_job= _server_job_get_unique(server, key, server_function, unique,
  139. 0);
  140. }
  141. }
  142. if (server_job == NULL)
  143. {
  144. if (server_function->max_queue_size > 0 &&
  145. server_function->job_total >= server_function->max_queue_size)
  146. {
  147. *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
  148. return NULL;
  149. }
  150. server_job= gearman_server_job_create(server);
  151. if (server_job == NULL)
  152. {
  153. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  154. return NULL;
  155. }
  156. server_job->priority= priority;
  157. server_job->function= server_function;
  158. server_function->job_total++;
  159. int checked_length;
  160. checked_length= snprintf(server_job->job_handle, GEARMAND_JOB_HANDLE_SIZE, "%s:%u",
  161. server->job_handle_prefix, server->job_handle_count);
  162. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
  163. {
  164. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Job handle plus handle count beyond GEARMAND_JOB_HANDLE_SIZE: %s:%u",
  165. server->job_handle_prefix, server->job_handle_count);
  166. }
  167. checked_length= snprintf(server_job->unique, GEARMAN_UNIQUE_SIZE, "%.*s",
  168. (int)unique_size, unique);
  169. if (checked_length >= GEARMAN_UNIQUE_SIZE || checked_length < 0)
  170. {
  171. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "We recieved a unique beyond GEARMAN_UNIQUE_SIZE: %.*s", (int)unique_size, unique);
  172. }
  173. server->job_handle_count++;
  174. server_job->data= data;
  175. server_job->data_size= data_size;
  176. server_job->when= when;
  177. if (reducer_size)
  178. {
  179. strncpy(server_job->reducer, reducer_name, reducer_size);
  180. server_job->reducer[reducer_size]= 0;
  181. }
  182. else
  183. {
  184. server_job->reducer[0]= 0;
  185. }
  186. server_job->unique_key= key;
  187. key= key % GEARMAND_JOB_HASH_SIZE;
  188. GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
  189. key= _server_job_hash(server_job->job_handle,
  190. strlen(server_job->job_handle));
  191. server_job->job_handle_key= key;
  192. key= key % GEARMAND_JOB_HASH_SIZE;
  193. gearmand_hash_server_add(server, key, server_job);
  194. if (server->state.queue_startup)
  195. {
  196. server_job->job_queued= true;
  197. }
  198. else if (server_client == NULL && server->queue._add_fn != NULL)
  199. {
  200. *ret_ptr= (*(server->queue._add_fn))(server,
  201. (void *)server->queue._context,
  202. server_job->unique,
  203. unique_size,
  204. function_name,
  205. function_name_size,
  206. data, data_size, priority,
  207. when);
  208. if (gearmand_failed(*ret_ptr))
  209. {
  210. server_job->data= NULL;
  211. gearman_server_job_free(server_job);
  212. return NULL;
  213. }
  214. if (server->queue._flush_fn != NULL)
  215. {
  216. *ret_ptr= (*(server->queue._flush_fn))(server,
  217. (void *)server->queue._context);
  218. if (*ret_ptr != GEARMAN_SUCCESS)
  219. {
  220. server_job->data= NULL;
  221. gearman_server_job_free(server_job);
  222. return NULL;
  223. }
  224. }
  225. server_job->job_queued= true;
  226. }
  227. *ret_ptr= gearman_server_job_queue(server_job);
  228. if (gearmand_failed(*ret_ptr))
  229. {
  230. if (server_client == NULL && server->queue._done_fn != NULL)
  231. {
  232. /* Do our best to remove the job from the queue. */
  233. (void)(*(server->queue._done_fn))(server,
  234. (void *)server->queue._context,
  235. server_job->unique, unique_size,
  236. server_job->function->function_name,
  237. server_job->function->function_name_size);
  238. }
  239. gearman_server_job_free(server_job);
  240. return NULL;
  241. }
  242. }
  243. else
  244. {
  245. *ret_ptr= GEARMAN_JOB_EXISTS;
  246. }
  247. if (server_client)
  248. {
  249. server_client->job= server_job;
  250. GEARMAN_LIST_ADD(server_job->client, server_client, job_)
  251. }
  252. return server_job;
  253. }
  254. gearman_server_job_st *
  255. gearman_server_job_create(gearman_server_st *server)
  256. {
  257. gearman_server_job_st *server_job;
  258. if (server->free_job_count > 0)
  259. {
  260. server_job= server->free_job_list;
  261. gearmand_server_free_job_list_free(server, server_job);
  262. }
  263. else
  264. {
  265. server_job= (gearman_server_job_st *)malloc(sizeof(gearman_server_job_st));
  266. if (server_job == NULL)
  267. return NULL;
  268. }
  269. server_job->ignore_job= false;
  270. server_job->job_queued= false;
  271. server_job->retries= 0;
  272. server_job->priority= 0;
  273. server_job->job_handle_key= 0;
  274. server_job->unique_key= 0;
  275. server_job->client_count= 0;
  276. server_job->numerator= 0;
  277. server_job->denominator= 0;
  278. server_job->data_size= 0;
  279. server_job->next= NULL;
  280. server_job->prev= NULL;
  281. server_job->unique_next= NULL;
  282. server_job->unique_prev= NULL;
  283. server_job->worker_next= NULL;
  284. server_job->worker_prev= NULL;
  285. server_job->function= NULL;
  286. server_job->function_next= NULL;
  287. server_job->data= NULL;
  288. server_job->client_list= NULL;
  289. server_job->worker= NULL;
  290. server_job->job_handle[0]= 0;
  291. server_job->unique[0]= 0;
  292. return server_job;
  293. }
  294. void gearman_server_job_free(gearman_server_job_st *server_job)
  295. {
  296. uint32_t key;
  297. if (! server_job)
  298. {
  299. return;
  300. }
  301. if (server_job->worker != NULL)
  302. {
  303. server_job->function->job_running--;
  304. }
  305. server_job->function->job_total--;
  306. if (server_job->data != NULL)
  307. free((void *)(server_job->data));
  308. while (server_job->client_list != NULL)
  309. gearman_server_client_free(server_job->client_list);
  310. if (server_job->worker != NULL)
  311. GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_)
  312. key= server_job->unique_key % GEARMAND_JOB_HASH_SIZE;
  313. GEARMAN_HASH_DEL(Server->unique, key, server_job, unique_);
  314. key= server_job->job_handle_key % GEARMAND_JOB_HASH_SIZE;
  315. gearmand_hash_server_free(Server, key, server_job);
  316. if (Server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
  317. {
  318. gearmand_server_job_list_add(Server, server_job);
  319. }
  320. else
  321. {
  322. free(server_job);
  323. }
  324. }
  325. gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
  326. const char *job_handle,
  327. gearman_server_con_st *worker_con)
  328. {
  329. uint32_t key;
  330. key= _server_job_hash(job_handle, strlen(job_handle));
  331. for (gearman_server_job_st *server_job= server->job_hash[key % GEARMAND_JOB_HASH_SIZE];
  332. server_job != NULL; server_job= server_job->next)
  333. {
  334. if (server_job->job_handle_key == key &&
  335. !strcmp(server_job->job_handle, job_handle))
  336. {
  337. /* Check to make sure the worker asking for the job still owns the job. */
  338. if (worker_con != NULL &&
  339. (server_job->worker == NULL || server_job->worker->con != worker_con))
  340. {
  341. return NULL;
  342. }
  343. return server_job;
  344. }
  345. }
  346. return NULL;
  347. }
  348. gearman_server_job_st *
  349. gearman_server_job_peek(gearman_server_con_st *server_con)
  350. {
  351. for (gearman_server_worker_st *server_worker= server_con->worker_list;
  352. server_worker != NULL;
  353. server_worker= server_worker->con_next)
  354. {
  355. if (server_worker->function->job_count != 0)
  356. {
  357. for (gearmand_job_priority_t priority= GEARMAND_JOB_PRIORITY_HIGH;
  358. priority != GEARMAND_JOB_PRIORITY_MAX; priority++)
  359. {
  360. gearman_server_job_st *server_job;
  361. server_job= server_worker->function->job_list[priority];
  362. int64_t current_time= (int64_t)time(NULL);
  363. while(server_job &&
  364. server_job->when != 0 &&
  365. server_job->when > current_time)
  366. {
  367. server_job= server_job->function_next;
  368. }
  369. if (server_job != NULL)
  370. {
  371. if (server_job->ignore_job)
  372. {
  373. /* This is only happens when a client disconnects from a foreground
  374. job. We do this because we don't want to run the job anymore. */
  375. server_job->ignore_job= false;
  376. gearman_server_job_free(gearman_server_job_take(server_con));
  377. return gearman_server_job_peek(server_con);
  378. }
  379. return server_job;
  380. }
  381. }
  382. }
  383. }
  384. return NULL;
  385. }
  386. static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
  387. gearman_server_worker_st *worker)
  388. {
  389. worker->con_prev= NULL;
  390. worker->con_next= list;
  391. while (worker->con_next != NULL)
  392. {
  393. worker->con_prev= worker->con_next;
  394. worker->con_next= worker->con_next->con_next;
  395. }
  396. if (worker->con_prev)
  397. worker->con_prev->con_next= worker;
  398. }
  399. gearman_server_job_st *gearman_server_job_take(gearman_server_con_st *server_con)
  400. {
  401. for (gearman_server_worker_st *server_worker= server_con->worker_list; server_worker; server_worker= server_worker->con_next)
  402. {
  403. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Jobs available %lu", (unsigned long)(server_worker->function->job_count));
  404. if (server_worker->function->job_count)
  405. {
  406. if (server_worker == NULL)
  407. {
  408. return NULL;
  409. }
  410. if (Server->flags.round_robin)
  411. {
  412. GEARMAN_LIST_DEL(server_con->worker, server_worker, con_)
  413. _server_con_worker_list_append(server_con->worker_list, server_worker);
  414. ++server_con->worker_count;
  415. if (server_con->worker_list == NULL)
  416. {
  417. server_con->worker_list= server_worker;
  418. }
  419. }
  420. gearmand_job_priority_t priority;
  421. for (priority= GEARMAND_JOB_PRIORITY_HIGH; priority < GEARMAND_JOB_PRIORITY_LOW; priority++)
  422. {
  423. if (server_worker->function->job_list[priority])
  424. {
  425. break;
  426. }
  427. }
  428. gearman_server_job_st *server_job= server_job= server_worker->function->job_list[priority];
  429. gearman_server_job_st *previous_job= server_job;
  430. int64_t current_time= (int64_t)time(NULL);
  431. while (server_job && server_job->when != 0 && server_job->when > current_time)
  432. {
  433. previous_job= server_job;
  434. server_job= server_job->function_next;
  435. }
  436. if (server_job)
  437. {
  438. if (server_job->function->job_list[priority] == server_job)
  439. {
  440. // If it's the head of the list, advance it
  441. server_job->function->job_list[priority]= server_job->function_next;
  442. }
  443. else
  444. {
  445. // Otherwise, just remove the item from the list
  446. previous_job->function_next= server_job->function_next;
  447. }
  448. // If it's the tail of the list, move the tail back
  449. if (server_job->function->job_end[priority] == server_job)
  450. {
  451. server_job->function->job_end[priority]= previous_job;
  452. }
  453. server_job->function->job_count--;
  454. server_job->worker= server_worker;
  455. GEARMAN_LIST_ADD(server_worker->job, server_job, worker_)
  456. server_job->function->job_running++;
  457. if (server_job->ignore_job)
  458. {
  459. gearman_server_job_free(server_job);
  460. return gearman_server_job_take(server_con);
  461. }
  462. return server_job;
  463. }
  464. }
  465. }
  466. return NULL;
  467. }
  468. gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job)
  469. {
  470. gearman_server_client_st *client;
  471. gearman_server_worker_st *worker;
  472. uint32_t noop_sent;
  473. gearmand_error_t ret;
  474. if (job->worker)
  475. {
  476. job->retries++;
  477. if (Server->job_retries == job->retries)
  478. {
  479. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  480. "Dropped job due to max retry count: %s %s",
  481. job->job_handle, job->unique);
  482. for (client= job->client_list; client != NULL; client= client->job_next)
  483. {
  484. ret= gearman_server_io_packet_add(client->con, false,
  485. GEARMAN_MAGIC_RESPONSE,
  486. GEARMAN_COMMAND_WORK_FAIL,
  487. job->job_handle,
  488. (size_t)strlen(job->job_handle),
  489. NULL);
  490. if (gearmand_failed(ret))
  491. {
  492. return ret;
  493. }
  494. }
  495. /* Remove from persistent queue if one exists. */
  496. if (job->job_queued && Server->queue._done_fn != NULL)
  497. {
  498. ret= (*(Server->queue._done_fn))(Server,
  499. (void *)Server->queue._context,
  500. job->unique,
  501. (size_t)strlen(job->unique),
  502. job->function->function_name,
  503. job->function->function_name_size);
  504. if (ret != GEARMAN_SUCCESS)
  505. return ret;
  506. }
  507. gearman_server_job_free(job);
  508. return GEARMAN_SUCCESS;
  509. }
  510. GEARMAN_LIST_DEL(job->worker->job, job, worker_)
  511. job->worker= NULL;
  512. job->function->job_running--;
  513. job->function_next= NULL;
  514. job->numerator= 0;
  515. job->denominator= 0;
  516. }
  517. /* Queue NOOP for possible sleeping workers. */
  518. if (job->function->worker_list != NULL)
  519. {
  520. worker= job->function->worker_list;
  521. noop_sent= 0;
  522. do
  523. {
  524. if (worker->con->is_sleeping && ! (worker->con->is_noop_sent))
  525. {
  526. ret= gearman_server_io_packet_add(worker->con, false,
  527. GEARMAN_MAGIC_RESPONSE,
  528. GEARMAN_COMMAND_NOOP, NULL);
  529. if (gearmand_failed(ret))
  530. {
  531. gearmand_gerror("gearman_server_io_packet_add", ret);
  532. return ret;
  533. }
  534. worker->con->is_noop_sent= true;
  535. noop_sent++;
  536. }
  537. worker= worker->function_next;
  538. }
  539. while (worker != job->function->worker_list &&
  540. (Server->worker_wakeup == 0 ||
  541. noop_sent < Server->worker_wakeup));
  542. job->function->worker_list= worker;
  543. }
  544. /* Queue the job to be run. */
  545. if (job->function->job_list[job->priority] == NULL)
  546. {
  547. job->function->job_list[job->priority]= job;
  548. }
  549. else
  550. {
  551. job->function->job_end[job->priority]->function_next= job;
  552. }
  553. job->function->job_end[job->priority]= job;
  554. job->function->job_count++;
  555. return GEARMAN_SUCCESS;
  556. }
  557. /*
  558. * Private definitions
  559. */
  560. static uint32_t _server_job_hash(const char *key, size_t key_size)
  561. {
  562. const char *ptr= key;
  563. int32_t value= 0;
  564. while (key_size--)
  565. {
  566. value += (int32_t)*ptr++;
  567. value += (value << 10);
  568. value ^= (value >> 6);
  569. }
  570. value += (value << 3);
  571. value ^= (value >> 11);
  572. value += (value << 15);
  573. return (uint32_t)(value == 0 ? 1 : value);
  574. }
  575. static gearman_server_job_st *
  576. _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
  577. gearman_server_function_st *server_function,
  578. const char *unique, size_t data_size)
  579. {
  580. gearman_server_job_st *server_job;
  581. for (server_job= server->unique_hash[unique_key % GEARMAND_JOB_HASH_SIZE];
  582. server_job != NULL; server_job= server_job->unique_next)
  583. {
  584. if (data_size == 0)
  585. {
  586. if (server_job->function == server_function &&
  587. server_job->unique_key == unique_key &&
  588. !strcmp(server_job->unique, unique))
  589. {
  590. return server_job;
  591. }
  592. }
  593. else
  594. {
  595. if (server_job->function == server_function &&
  596. server_job->unique_key == unique_key &&
  597. server_job->data_size == data_size &&
  598. !memcmp(server_job->data, unique, data_size))
  599. {
  600. return server_job;
  601. }
  602. }
  603. }
  604. return NULL;
  605. }