client.cc 53 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Client Definitions
  41. */
  42. #include <libgearman/common.h>
  43. #include <libgearman/add.h>
  44. #include <libgearman/connection.h>
  45. #include <cassert>
  46. #include <cerrno>
  47. #include <cstdio>
  48. #include <cstdlib>
  49. #include <cstring>
  50. #include <memory>
  51. /**
  52. * @addtogroup gearman_client_static Static Client Declarations
  53. * @ingroup gearman_client
  54. * @{
  55. */
  56. /**
  57. * Allocate a client structure.
  58. */
  59. static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone);
  60. /**
  61. * Callback function used when parsing server lists.
  62. */
  63. static gearman_return_t _client_add_server(const char *host, in_port_t port,
  64. void *context);
  65. /**
  66. * Task state machine.
  67. */
  68. static gearman_return_t _client_run_task(gearman_client_st *client,
  69. gearman_task_st *task);
  70. /**
  71. * Real do function.
  72. */
  73. static void *_client_do(gearman_client_st *client, gearman_command_t command,
  74. gearman_string_t &function,
  75. gearman_unique_t &unique,
  76. const void *workload, size_t workload_size,
  77. size_t *result_size, gearman_return_t *ret_ptr);
  78. /**
  79. * Real background do function.
  80. */
  81. static gearman_return_t _client_do_background(gearman_client_st *client,
  82. gearman_command_t command,
  83. gearman_string_t &function,
  84. gearman_unique_t &unique,
  85. gearman_string_t &workload,
  86. char *job_handle);
  87. /** @} */
  #if 0
  /*
   * Disabled debugging helper: maps the client's state to the name of the
   * matching enumerator, or "" when the state is not one of the four below.
   */
  const char *gearman_client_strstate(gearman_client_st *self)
  {
    const char *name= "";

    switch (self->state)
    {
    case GEARMAN_CLIENT_STATE_IDLE:
      name= "GEARMAN_CLIENT_STATE_IDLE";
      break;

    case GEARMAN_CLIENT_STATE_NEW:
      name= "GEARMAN_CLIENT_STATE_NEW";
      break;

    case GEARMAN_CLIENT_STATE_SUBMIT:
      name= "GEARMAN_CLIENT_STATE_SUBMIT";
      break;

    case GEARMAN_CLIENT_STATE_PACKET:
      name= "GEARMAN_CLIENT_STATE_PACKET";
      break;
    }

    return name;
  }
  #endif
  101. /*
  102. * Public Definitions
  103. */
  104. gearman_client_st *gearman_client_create(gearman_client_st *client)
  105. {
  106. return _client_allocate(client, false);
  107. }
  108. gearman_client_st *gearman_client_clone(gearman_client_st *client,
  109. const gearman_client_st *from)
  110. {
  111. gearman_universal_st *check;
  112. if (not from)
  113. {
  114. return _client_allocate(client, false);
  115. }
  116. client= _client_allocate(client, true);
  117. if (client == NULL)
  118. {
  119. return client;
  120. }
  121. client->options.non_blocking= from->options.non_blocking;
  122. client->options.unbuffered_result= from->options.unbuffered_result;
  123. client->options.no_new= from->options.no_new;
  124. client->options.free_tasks= from->options.free_tasks;
  125. client->actions= from->actions;
  126. check= gearman_universal_clone((&client->universal), &(from->universal));
  127. if (not check)
  128. {
  129. gearman_client_free(client);
  130. return NULL;
  131. }
  132. return client;
  133. }
  134. void gearman_client_free(gearman_client_st *client)
  135. {
  136. gearman_client_task_free_all(client);
  137. gearman_universal_free(&client->universal);
  138. if (client->options.allocated)
  139. delete client;
  140. }
  141. const char *gearman_client_error(const gearman_client_st *client)
  142. {
  143. if (not client)
  144. return NULL;
  145. return gearman_universal_error(&client->universal);
  146. }
  147. gearman_return_t gearman_client_error_code(const gearman_client_st *client)
  148. {
  149. return gearman_universal_error_code(&client->universal);
  150. }
  151. int gearman_client_errno(const gearman_client_st *client)
  152. {
  153. return gearman_universal_errno(&client->universal);
  154. }
  155. gearman_client_options_t gearman_client_options(const gearman_client_st *client)
  156. {
  157. int32_t options;
  158. memset(&options, 0, sizeof(int32_t));
  159. if (client->options.allocated)
  160. options|= int(GEARMAN_CLIENT_ALLOCATED);
  161. if (client->options.non_blocking)
  162. options|= int(GEARMAN_CLIENT_NON_BLOCKING);
  163. if (client->options.unbuffered_result)
  164. options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
  165. if (client->options.no_new)
  166. options|= int(GEARMAN_CLIENT_NO_NEW);
  167. if (client->options.free_tasks)
  168. options|= int(GEARMAN_CLIENT_FREE_TASKS);
  169. return gearman_client_options_t(options);
  170. }
  171. void gearman_client_set_options(gearman_client_st *client,
  172. gearman_client_options_t options)
  173. {
  174. gearman_client_options_t usable_options[]= {
  175. GEARMAN_CLIENT_NON_BLOCKING,
  176. GEARMAN_CLIENT_UNBUFFERED_RESULT,
  177. GEARMAN_CLIENT_FREE_TASKS,
  178. GEARMAN_CLIENT_MAX
  179. };
  180. gearman_client_options_t *ptr;
  181. for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
  182. {
  183. if (options & *ptr)
  184. {
  185. gearman_client_add_options(client, *ptr);
  186. }
  187. else
  188. {
  189. gearman_client_remove_options(client, *ptr);
  190. }
  191. }
  192. }
  193. void gearman_client_add_options(gearman_client_st *client,
  194. gearman_client_options_t options)
  195. {
  196. if (options & GEARMAN_CLIENT_NON_BLOCKING)
  197. {
  198. gearman_universal_add_options(&client->universal, GEARMAN_NON_BLOCKING);
  199. client->options.non_blocking= true;
  200. }
  201. if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
  202. {
  203. client->options.unbuffered_result= true;
  204. }
  205. if (options & GEARMAN_CLIENT_FREE_TASKS)
  206. {
  207. client->options.free_tasks= true;
  208. }
  209. }
  210. void gearman_client_remove_options(gearman_client_st *client,
  211. gearman_client_options_t options)
  212. {
  213. if (options & GEARMAN_CLIENT_NON_BLOCKING)
  214. {
  215. gearman_universal_remove_options(&client->universal, GEARMAN_NON_BLOCKING);
  216. client->options.non_blocking= false;
  217. }
  218. if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
  219. {
  220. client->options.unbuffered_result= false;
  221. }
  222. if (options & GEARMAN_CLIENT_FREE_TASKS)
  223. {
  224. client->options.free_tasks= false;
  225. }
  226. }
  227. int gearman_client_timeout(gearman_client_st *client)
  228. {
  229. return gearman_universal_timeout(&client->universal);
  230. }
  231. void gearman_client_set_timeout(gearman_client_st *client, int timeout)
  232. {
  233. gearman_universal_set_timeout(&client->universal, timeout);
  234. }
  235. void *gearman_client_context(const gearman_client_st *client)
  236. {
  237. return const_cast<void *>(client->context);
  238. }
  239. void gearman_client_set_context(gearman_client_st *client, void *context)
  240. {
  241. client->context= context;
  242. }
  243. void gearman_client_set_log_fn(gearman_client_st *client,
  244. gearman_log_fn *function, void *context,
  245. gearman_verbose_t verbose)
  246. {
  247. gearman_set_log_fn(&client->universal, function, context, verbose);
  248. }
  249. void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
  250. gearman_malloc_fn *function,
  251. void *context)
  252. {
  253. gearman_set_workload_malloc_fn(&client->universal, function, context);
  254. }
  255. void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
  256. {
  257. gearman_set_workload_free_fn(&client->universal, function, context);
  258. }
  259. gearman_return_t gearman_client_add_server(gearman_client_st *client,
  260. const char *host, in_port_t port)
  261. {
  262. if (not client)
  263. {
  264. return GEARMAN_INVALID_ARGUMENT;
  265. }
  266. if (gearman_connection_create_args(&client->universal, NULL, host, port) == NULL)
  267. {
  268. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  269. }
  270. return GEARMAN_SUCCESS;
  271. }
  272. gearman_return_t gearman_client_add_servers(gearman_client_st *client,
  273. const char *servers)
  274. {
  275. return gearman_parse_servers(servers, _client_add_server, client);
  276. }
  277. void gearman_client_remove_servers(gearman_client_st *client)
  278. {
  279. gearman_free_all_cons(&client->universal);
  280. }
  281. gearman_return_t gearman_client_wait(gearman_client_st *client)
  282. {
  283. return gearman_wait(&client->universal);
  284. }
  285. void *gearman_client_do(gearman_client_st *client, const char *function_name,
  286. const char *unique, const void *workload,
  287. size_t workload_size, size_t *result_size,
  288. gearman_return_t *ret_ptr)
  289. {
  290. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  291. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  292. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
  293. function,
  294. local_unique,
  295. workload, workload_size, result_size, ret_ptr);
  296. }
  297. void *gearman_client_do_high(gearman_client_st *client,
  298. const char *function_name, const char *unique,
  299. const void *workload, size_t workload_size,
  300. size_t *result_size, gearman_return_t *ret_ptr)
  301. {
  302. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  303. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  304. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
  305. function,
  306. local_unique,
  307. workload, workload_size, result_size, ret_ptr);
  308. }
  309. void *gearman_client_do_low(gearman_client_st *client,
  310. const char *function_name, const char *unique,
  311. const void *workload, size_t workload_size,
  312. size_t *result_size, gearman_return_t *ret_ptr)
  313. {
  314. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  315. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  316. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
  317. function,
  318. local_unique,
  319. workload, workload_size, result_size, ret_ptr);
  320. }
  321. static inline gearman_command_t pick_command_by_priority(const gearman_job_priority_t &arg)
  322. {
  323. if (arg == GEARMAN_JOB_PRIORITY_NORMAL)
  324. return GEARMAN_COMMAND_SUBMIT_JOB;
  325. else if (arg == GEARMAN_JOB_PRIORITY_HIGH)
  326. return GEARMAN_COMMAND_SUBMIT_JOB_HIGH;
  327. return GEARMAN_COMMAND_SUBMIT_JOB_LOW;
  328. }
  329. static inline gearman_command_t pick_command_by_priority_background(const gearman_job_priority_t &arg)
  330. {
  331. if (arg == GEARMAN_JOB_PRIORITY_NORMAL)
  332. return GEARMAN_COMMAND_SUBMIT_JOB_BG;
  333. else if (arg == GEARMAN_JOB_PRIORITY_HIGH)
  334. return GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG;
  335. return GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG;
  336. }
  337. size_t gearman_client_count_tasks(gearman_client_st *client)
  338. {
  339. size_t count= 1;
  340. gearman_task_st *search= client->task_list;
  341. while ((search= search->next))
  342. {
  343. count++;
  344. }
  345. return count;
  346. }
  #if 0
  /*
   * Disabled helper: reports whether any task on the client's list is
   * active according to gearman_task_is_active().
   */
  static bool _active_tasks(gearman_client_st *client)
  {
    assert(client);

    for (gearman_task_st *search= client->task_list; search; search= search->next)
    {
      if (gearman_task_is_active(search))
      {
        return true;
      }
    }

    return false;
  }
  #endif
  364. gearman_task_st *gearman_client_execute(gearman_client_st *client,
  365. const char *function_str, size_t function_length,
  366. const char *unique_str, size_t unique_length,
  367. gearman_work_t *workload,
  368. gearman_argument_t *arguments)
  369. {
  370. if (not client)
  371. {
  372. errno= EINVAL;
  373. return NULL;
  374. }
  375. if (not function_str or not function_length)
  376. {
  377. errno= EINVAL;
  378. gearman_perror(&client->universal, "gearman_function_st was NULL");
  379. return NULL;
  380. }
  381. assert(function_str and function_length);
  382. gearman_task_st *task= NULL;
  383. gearman_unique_t unique= gearman_unique_make(unique_str, unique_length);
  384. gearman_string_t function= { function_str, function_length };
  385. if (workload)
  386. {
  387. switch (workload->kind)
  388. {
  389. case GEARMAN_WORK_KIND_BACKGROUND:
  390. task= add_task(client,
  391. workload->context,
  392. pick_command_by_priority_background(workload->priority),
  393. function,
  394. unique,
  395. arguments->value,
  396. time_t(0),
  397. gearman_actions_execute_defaults());
  398. break;
  399. case GEARMAN_WORK_KIND_EPOCH:
  400. task= add_task(client,
  401. workload->context,
  402. GEARMAN_COMMAND_SUBMIT_JOB_EPOCH,
  403. function,
  404. unique,
  405. arguments->value,
  406. gearman_workload_epoch(workload),
  407. gearman_actions_execute_defaults());
  408. break;
  409. case GEARMAN_WORK_KIND_FOREGROUND:
  410. task= add_task(client,
  411. workload->context,
  412. pick_command_by_priority(workload->priority),
  413. function,
  414. unique,
  415. arguments->value,
  416. time_t(0),
  417. gearman_actions_execute_defaults());
  418. break;
  419. }
  420. }
  421. else
  422. {
  423. task= add_task(client,
  424. NULL,
  425. GEARMAN_COMMAND_SUBMIT_JOB,
  426. function,
  427. unique,
  428. arguments->value,
  429. 0,
  430. gearman_actions_execute_defaults());
  431. }
  432. if (not task)
  433. {
  434. gearman_universal_error_code(&client->universal);
  435. return NULL;
  436. }
  437. if (not workload) // We have no description, so we just run it
  438. {
  439. gearman_client_run_tasks(client);
  440. }
  441. else // Everything else, we do now.
  442. {
  443. gearman_client_run_tasks(client);
  444. }
  445. return task;
  446. }
  447. gearman_task_st *gearman_client_execute_reduce(gearman_client_st *client,
  448. const char *function_str, const size_t function_length,
  449. const char *reducer_str, const size_t reducer_length,
  450. const char *unique_str, const size_t unique_length,
  451. gearman_work_t *workload,
  452. gearman_argument_t *arguments)
  453. {
  454. if (not client)
  455. {
  456. errno= EINVAL;
  457. return NULL;
  458. }
  459. if (not function_str or not function_length)
  460. {
  461. errno= EINVAL;
  462. gearman_perror(&client->universal, "gearman_function_st was NULL");
  463. return NULL;
  464. }
  465. assert(function_str and function_length);
  466. gearman_task_st *task= NULL;
  467. gearman_unique_t unique= gearman_unique_make(unique_str, unique_length);
  468. gearman_string_t function= { function_str, function_length };
  469. gearman_string_t reducer= { reducer_str, reducer_length };
  470. if (workload)
  471. {
  472. switch (workload->kind)
  473. {
  474. case GEARMAN_WORK_KIND_BACKGROUND:
  475. task= add_task(client,
  476. GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND,
  477. workload->priority,
  478. function,
  479. reducer,
  480. unique,
  481. arguments->value,
  482. gearman_actions_execute_defaults(),
  483. time_t(0),
  484. workload->context);
  485. break;
  486. case GEARMAN_WORK_KIND_EPOCH:
  487. task= add_task(client,
  488. GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND,
  489. workload->priority,
  490. function,
  491. reducer,
  492. unique,
  493. arguments->value,
  494. gearman_actions_execute_defaults(),
  495. gearman_workload_epoch(workload),
  496. workload->context);
  497. break;
  498. case GEARMAN_WORK_KIND_FOREGROUND:
  499. task= add_task(client,
  500. GEARMAN_COMMAND_SUBMIT_REDUCE_JOB,
  501. workload->priority,
  502. function,
  503. reducer,
  504. unique,
  505. arguments->value,
  506. gearman_actions_execute_defaults(),
  507. time_t(0),
  508. workload->context);
  509. break;
  510. }
  511. }
  512. else
  513. {
  514. task= add_task(client,
  515. GEARMAN_COMMAND_SUBMIT_REDUCE_JOB,
  516. GEARMAN_JOB_PRIORITY_NORMAL,
  517. function,
  518. reducer,
  519. unique,
  520. arguments->value,
  521. gearman_actions_execute_defaults(),
  522. time_t(0),
  523. NULL);
  524. }
  525. if (task)
  526. {
  527. // Run!
  528. gearman_client_run_tasks(client);
  529. }
  530. return task;
  531. }
  532. const char *gearman_client_do_job_handle(const gearman_client_st *self)
  533. {
  534. if (not self)
  535. {
  536. errno= EINVAL;
  537. return NULL;
  538. }
  539. if (not self->task_list)
  540. {
  541. errno= EINVAL;
  542. return NULL;
  543. }
  544. return gearman_task_job_handle(self->task_list);
  545. }
  546. void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
  547. {
  548. if (numerator)
  549. *numerator= 0;
  550. if (denominator)
  551. *denominator= 0;
  552. }
  553. gearman_return_t gearman_client_do_background(gearman_client_st *client,
  554. const char *function_name,
  555. const char *unique,
  556. const void *workload_str,
  557. size_t workload_size,
  558. char *job_handle)
  559. {
  560. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  561. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  562. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  563. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
  564. function,
  565. local_unique,
  566. workload,
  567. job_handle);
  568. }
  569. gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
  570. const char *function_name,
  571. const char *unique,
  572. const void *workload_str,
  573. size_t workload_size,
  574. char *job_handle)
  575. {
  576. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  577. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  578. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  579. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
  580. function,
  581. local_unique,
  582. workload,
  583. job_handle);
  584. }
  585. gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
  586. const char *function_name,
  587. const char *unique,
  588. const void *workload_str,
  589. size_t workload_size,
  590. char *job_handle)
  591. {
  592. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  593. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  594. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  595. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
  596. function,
  597. local_unique,
  598. workload,
  599. job_handle);
  600. }
  601. gearman_return_t gearman_client_job_status(gearman_client_st *client,
  602. const char *job_handle,
  603. bool *is_known, bool *is_running,
  604. uint32_t *numerator,
  605. uint32_t *denominator)
  606. {
  607. gearman_return_t ret;
  608. gearman_task_st do_task, *do_task_ptr;
  609. do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
  610. job_handle, &ret);
  611. if (gearman_failed(ret))
  612. return ret;
  613. assert(do_task_ptr);
  614. gearman_task_clear_fn(do_task_ptr);
  615. ret= gearman_client_run_tasks(client);
  616. if (ret != GEARMAN_IO_WAIT)
  617. {
  618. if (is_known)
  619. *is_known= do_task.options.is_known;
  620. if (is_running)
  621. *is_running= do_task.options.is_running;
  622. if (numerator)
  623. *numerator= do_task.numerator;
  624. if (denominator)
  625. *denominator= do_task.denominator;
  626. }
  627. gearman_task_free(do_task_ptr);
  628. return ret;
  629. }
  630. gearman_return_t gearman_client_echo(gearman_client_st *client,
  631. const void *workload,
  632. size_t workload_size)
  633. {
  634. return gearman_echo(&client->universal, workload, workload_size);
  635. }
  636. void gearman_client_task_free_all(gearman_client_st *client)
  637. {
  638. while (client->task_list)
  639. gearman_task_free(client->task_list);
  640. }
  641. void gearman_client_set_task_context_free_fn(gearman_client_st *client,
  642. gearman_task_context_free_fn *function)
  643. {
  644. client->task_context_free_fn= function;
  645. }
  646. gearman_task_st *gearman_client_add_task(gearman_client_st *client,
  647. gearman_task_st *task,
  648. void *context,
  649. const char *function_name,
  650. const char *unique,
  651. const void *workload_str, size_t workload_size,
  652. gearman_return_t *ret_ptr)
  653. {
  654. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  655. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  656. gearman_string_t workload= { static_cast<const char *>(workload_str), workload_size };
  657. if (not client)
  658. {
  659. errno= EINVAL;
  660. return NULL;
  661. }
  662. task= add_task(client, task,
  663. context, GEARMAN_COMMAND_SUBMIT_JOB,
  664. function,
  665. local_unique,
  666. workload,
  667. time_t(0));
  668. if (not task and ret_ptr)
  669. {
  670. *ret_ptr= gearman_universal_error_code(&client->universal);
  671. }
  672. else if (ret_ptr)
  673. {
  674. task->func= client->actions;
  675. *ret_ptr= GEARMAN_SUCCESS;
  676. }
  677. return task;
  678. }
  679. gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
  680. gearman_task_st *task,
  681. void *context,
  682. const char *function_name,
  683. const char *unique,
  684. const void *workload_str, size_t workload_size,
  685. gearman_return_t *ret_ptr)
  686. {
  687. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  688. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  689. gearman_string_t workload= { static_cast<const char *>(workload_str), workload_size };
  690. task= add_task(client, task, context,
  691. GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
  692. function,
  693. local_unique,
  694. workload, time_t(0));
  695. if (not task and ret_ptr)
  696. {
  697. *ret_ptr= gearman_universal_error_code(&client->universal);
  698. }
  699. else if (ret_ptr)
  700. {
  701. *ret_ptr= GEARMAN_SUCCESS;
  702. }
  703. return task;
  704. }
  705. gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
  706. gearman_task_st *task,
  707. void *context,
  708. const char *function_name,
  709. const char *unique,
  710. const void *workload_str, size_t workload_size,
  711. gearman_return_t *ret_ptr)
  712. {
  713. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  714. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  715. gearman_string_t workload= { static_cast<const char *>(workload_str), workload_size };
  716. task= add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
  717. function,
  718. local_unique,
  719. workload,
  720. time_t(0));
  721. if (not task and ret_ptr)
  722. {
  723. *ret_ptr= gearman_universal_error_code(&client->universal);
  724. }
  725. else if (ret_ptr)
  726. {
  727. *ret_ptr= GEARMAN_SUCCESS;
  728. }
  729. return task;
  730. }
  731. gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
  732. gearman_task_st *task,
  733. void *context,
  734. const char *function_name,
  735. const char *unique,
  736. const void *workload_str, size_t workload_size,
  737. gearman_return_t *ret_ptr)
  738. {
  739. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  740. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  741. gearman_string_t workload= { static_cast<const char *>(workload_str), workload_size };
  742. task= add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
  743. function,
  744. local_unique,
  745. workload,
  746. time_t(0));
  747. if (not task and ret_ptr)
  748. {
  749. *ret_ptr= gearman_universal_error_code(&client->universal);
  750. }
  751. else if (ret_ptr)
  752. {
  753. *ret_ptr= GEARMAN_SUCCESS;
  754. }
  755. return task;
  756. }
  757. gearman_task_st *
  758. gearman_client_add_task_high_background(gearman_client_st *client,
  759. gearman_task_st *task,
  760. void *context,
  761. const char *function_name,
  762. const char *unique,
  763. const void *workload_str, size_t workload_size,
  764. gearman_return_t *ret_ptr)
  765. {
  766. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  767. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  768. gearman_string_t workload= { static_cast<const char *>(workload_str), workload_size };
  769. task= add_task(client, task, context,
  770. GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
  771. function,
  772. local_unique,
  773. workload,
  774. time_t(0));
  775. if (not task and ret_ptr)
  776. {
  777. *ret_ptr= gearman_universal_error_code(&client->universal);
  778. }
  779. else if (ret_ptr)
  780. {
  781. *ret_ptr= GEARMAN_SUCCESS;
  782. }
  783. return task;
  784. }
  785. gearman_task_st *
  786. gearman_client_add_task_low_background(gearman_client_st *client,
  787. gearman_task_st *task,
  788. void *context,
  789. const char *function_name,
  790. const char *unique,
  791. const void *workload_str, size_t workload_size,
  792. gearman_return_t *ret_ptr)
  793. {
  794. gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
  795. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  796. gearman_string_t workload= { static_cast<const char *>(workload_str), workload_size };
  797. task= add_task(client, task, context,
  798. GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
  799. function,
  800. local_unique,
  801. workload,
  802. time_t(0));
  803. if (not task and ret_ptr)
  804. {
  805. *ret_ptr= gearman_universal_error_code(&client->universal);
  806. }
  807. else if (ret_ptr)
  808. {
  809. *ret_ptr= GEARMAN_SUCCESS;
  810. }
  811. return task;
  812. }
  813. gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
  814. gearman_task_st *task,
  815. void *context,
  816. const char *job_handle,
  817. gearman_return_t *ret_ptr)
  818. {
  819. const void *args[1];
  820. size_t args_size[1];
  821. task= gearman_task_internal_create(client, task);
  822. if (task == NULL)
  823. {
  824. if (ret_ptr)
  825. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  826. return NULL;
  827. }
  828. task->context= context;
  829. snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
  830. args[0]= job_handle;
  831. args_size[0]= strlen(job_handle);
  832. gearman_return_t rc;
  833. rc= gearman_packet_create_args(&client->universal, &(task->send),
  834. GEARMAN_MAGIC_REQUEST,
  835. GEARMAN_COMMAND_GET_STATUS,
  836. args, args_size, 1);
  837. if (gearman_success(rc))
  838. {
  839. client->new_tasks++;
  840. client->running_tasks++;
  841. task->options.send_in_use= true;
  842. }
  843. if (ret_ptr)
  844. *ret_ptr= rc;
  845. return task;
  846. }
  847. void gearman_client_set_workload_fn(gearman_client_st *client,
  848. gearman_workload_fn *function)
  849. {
  850. client->actions.workload_fn= function;
  851. }
  852. void gearman_client_set_created_fn(gearman_client_st *client,
  853. gearman_created_fn *function)
  854. {
  855. client->actions.created_fn= function;
  856. }
  857. void gearman_client_set_data_fn(gearman_client_st *client,
  858. gearman_data_fn *function)
  859. {
  860. client->actions.data_fn= function;
  861. }
  862. void gearman_client_set_warning_fn(gearman_client_st *client,
  863. gearman_warning_fn *function)
  864. {
  865. client->actions.warning_fn= function;
  866. }
  867. void gearman_client_set_status_fn(gearman_client_st *client,
  868. gearman_universal_status_fn *function)
  869. {
  870. client->actions.status_fn= function;
  871. }
  872. void gearman_client_set_complete_fn(gearman_client_st *client,
  873. gearman_complete_fn *function)
  874. {
  875. client->actions.complete_fn= function;
  876. }
  877. void gearman_client_set_exception_fn(gearman_client_st *client,
  878. gearman_exception_fn *function)
  879. {
  880. client->actions.exception_fn= function;
  881. }
  882. void gearman_client_set_fail_fn(gearman_client_st *client,
  883. gearman_fail_fn *function)
  884. {
  885. client->actions.fail_fn= function;
  886. }
  887. void gearman_client_clear_fn(gearman_client_st *client)
  888. {
  889. client->actions= gearman_actions_default();
  890. }
  891. static inline void _push_non_blocking(gearman_client_st *client)
  892. {
  893. client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
  894. client->universal.options.non_blocking= true;
  895. }
  896. static inline void _pop_non_blocking(gearman_client_st *client)
  897. {
  898. client->universal.options.non_blocking= client->options.non_blocking;
  899. assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
  900. }
  901. static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
  902. {
  903. gearman_return_t ret= GEARMAN_MAX_RETURN;
  904. switch(client->state)
  905. {
  906. case GEARMAN_CLIENT_STATE_IDLE:
  907. while (1)
  908. {
  909. /* Start any new tasks. */
  910. if (client->new_tasks > 0 && ! (client->options.no_new))
  911. {
  912. for (client->task= client->task_list; client->task;
  913. client->task= client->task->next)
  914. {
  915. if (client->task->state != GEARMAN_TASK_STATE_NEW)
  916. continue;
  917. case GEARMAN_CLIENT_STATE_NEW:
  918. gearman_return_t local_ret= _client_run_task(client, client->task);
  919. if (gearman_failed(ret) and local_ret != GEARMAN_IO_WAIT)
  920. {
  921. client->state= GEARMAN_CLIENT_STATE_NEW;
  922. return local_ret;
  923. }
  924. }
  925. if (client->new_tasks == 0)
  926. {
  927. gearman_return_t local_ret= gearman_flush_all(&client->universal);
  928. if (gearman_failed(local_ret))
  929. {
  930. return local_ret;
  931. }
  932. }
  933. }
  934. /* See if there are any connections ready for I/O. */
  935. while ((client->con= gearman_ready(&client->universal)))
  936. {
  937. if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
  938. {
  939. /* Socket is ready for writing, continue submitting jobs. */
  940. for (client->task= client->task_list; client->task;
  941. client->task= client->task->next)
  942. {
  943. if (client->task->con != client->con ||
  944. (client->task->state != GEARMAN_TASK_STATE_SUBMIT &&
  945. client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
  946. {
  947. continue;
  948. }
  949. case GEARMAN_CLIENT_STATE_SUBMIT:
  950. gearman_return_t local_ret= _client_run_task(client, client->task);
  951. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  952. {
  953. client->state= GEARMAN_CLIENT_STATE_SUBMIT;
  954. return local_ret;
  955. }
  956. }
  957. }
  958. if (not (client->con->revents & POLLIN))
  959. continue;
  960. /* Socket is ready for reading. */
  961. while (1)
  962. {
  963. /* Read packet on connection and find which task it belongs to. */
  964. if (client->options.unbuffered_result)
  965. {
  966. /* If client is handling the data read, make sure it's complete. */
  967. if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
  968. {
  969. for (client->task= client->task_list; client->task;
  970. client->task= client->task->next)
  971. {
  972. if (client->task->con == client->con &&
  973. (client->task->state == GEARMAN_TASK_STATE_DATA ||
  974. client->task->state == GEARMAN_TASK_STATE_COMPLETE))
  975. {
  976. break;
  977. }
  978. }
  979. assert(client->task);
  980. }
  981. else
  982. {
  983. /* Read the next packet, without buffering the data part. */
  984. client->task= NULL;
  985. (void)gearman_connection_recv(client->con, &(client->con->packet), &ret, false);
  986. }
  987. }
  988. else
  989. {
  990. /* Read the next packet, buffering the data part. */
  991. client->task= NULL;
  992. (void)gearman_connection_recv(client->con, &(client->con->packet), &ret, true);
  993. }
  994. if (client->task == NULL)
  995. {
  996. assert(ret != GEARMAN_MAX_RETURN);
  997. /* Check the return of the gearman_connection_recv() calls above. */
  998. if (gearman_failed(ret))
  999. {
  1000. if (ret == GEARMAN_IO_WAIT)
  1001. break;
  1002. client->state= GEARMAN_CLIENT_STATE_IDLE;
  1003. return ret;
  1004. }
  1005. client->con->options.packet_in_use= true;
  1006. /* We have a packet, see which task it belongs to. */
  1007. for (client->task= client->task_list; client->task;
  1008. client->task= client->task->next)
  1009. {
  1010. if (client->task->con != client->con)
  1011. continue;
  1012. if (client->con->packet.command == GEARMAN_COMMAND_JOB_CREATED)
  1013. {
  1014. if (client->task->created_id != client->con->created_id)
  1015. continue;
  1016. /* New job created, drop through below and notify task. */
  1017. client->con->created_id++;
  1018. }
  1019. else if (client->con->packet.command == GEARMAN_COMMAND_ERROR)
  1020. {
  1021. gearman_universal_set_error(&client->universal,
  1022. GEARMAN_SERVER_ERROR,
  1023. "gearman_client_run_tasks",
  1024. "%s:%.*s",
  1025. static_cast<char *>(client->con->packet.arg[0]),
  1026. int(client->con->packet.arg_size[1]),
  1027. static_cast<char *>(client->con->packet.arg[1]));
  1028. return GEARMAN_SERVER_ERROR;
  1029. }
  1030. else if (strncmp(client->task->job_handle,
  1031. static_cast<char *>(client->con->packet.arg[0]),
  1032. client->con->packet.arg_size[0]) ||
  1033. (client->con->packet.command != GEARMAN_COMMAND_WORK_FAIL &&
  1034. strlen(client->task->job_handle) != client->con->packet.arg_size[0] - 1) ||
  1035. (client->con->packet.command == GEARMAN_COMMAND_WORK_FAIL &&
  1036. strlen(client->task->job_handle) != client->con->packet.arg_size[0]))
  1037. {
  1038. continue;
  1039. }
  1040. /* Else, we have a matching result packet of some kind. */
  1041. break;
  1042. }
  1043. if (client->task == NULL)
  1044. {
  1045. /* The client has stopped waiting for the response, ignore it. */
  1046. gearman_packet_free(&(client->con->packet));
  1047. client->con->options.packet_in_use= false;
  1048. continue;
  1049. }
  1050. client->task->recv= &(client->con->packet);
  1051. }
  1052. case GEARMAN_CLIENT_STATE_PACKET:
  1053. /* Let task process job created or result packet. */
  1054. gearman_return_t local_ret= _client_run_task(client, client->task);
  1055. if (local_ret == GEARMAN_IO_WAIT)
  1056. break;
  1057. if (gearman_failed(local_ret))
  1058. {
  1059. client->state= GEARMAN_CLIENT_STATE_PACKET;
  1060. return local_ret;
  1061. }
  1062. /* Clean up the packet. */
  1063. gearman_packet_free(&(client->con->packet));
  1064. client->con->options.packet_in_use= false;
  1065. /* If all tasks are done, return. */
  1066. if (client->running_tasks == 0)
  1067. break;
  1068. }
  1069. }
  1070. /* If all tasks are done, return. */
  1071. if (client->running_tasks == 0)
  1072. {
  1073. break;
  1074. }
  1075. if (client->new_tasks > 0 && ! (client->options.no_new))
  1076. continue;
  1077. if (client->options.non_blocking)
  1078. {
  1079. /* Let the caller wait for activity. */
  1080. client->state= GEARMAN_CLIENT_STATE_IDLE;
  1081. return GEARMAN_IO_WAIT;
  1082. }
  1083. /* Wait for activity on one of the connections. */
  1084. gearman_return_t local_ret= gearman_wait(&client->universal);
  1085. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  1086. {
  1087. client->state= GEARMAN_CLIENT_STATE_IDLE;
  1088. return ret;
  1089. }
  1090. }
  1091. break;
  1092. }
  1093. client->state= GEARMAN_CLIENT_STATE_IDLE;
  1094. return GEARMAN_SUCCESS;
  1095. }
  1096. gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
  1097. {
  1098. if (not client)
  1099. {
  1100. return GEARMAN_INVALID_ARGUMENT;
  1101. }
  1102. if (not client->task_list)
  1103. {
  1104. gearman_error(&client->universal, GEARMAN_INVALID_ARGUMENT, "No active tasks");
  1105. return GEARMAN_INVALID_ARGUMENT;
  1106. }
  1107. _push_non_blocking(client);
  1108. gearman_return_t rc= _client_run_tasks(client);
  1109. _pop_non_blocking(client);
  1110. if (gearman_failed(rc))
  1111. {
  1112. assert(gearman_universal_error_code(&client->universal) == rc);
  1113. }
  1114. return rc;
  1115. }
  1116. /*
  1117. * Static Definitions
  1118. */
  1119. static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
  1120. {
  1121. if (client == NULL)
  1122. {
  1123. client= new (std::nothrow) gearman_client_st;
  1124. if (not client)
  1125. return NULL;
  1126. client->options.allocated= true;
  1127. }
  1128. else
  1129. {
  1130. client->options.allocated= false;
  1131. }
  1132. client->options.non_blocking= false;
  1133. client->options.unbuffered_result= false;
  1134. client->options.no_new= false;
  1135. client->options.free_tasks= false;
  1136. client->state= GEARMAN_CLIENT_STATE_IDLE;
  1137. client->new_tasks= 0;
  1138. client->running_tasks= 0;
  1139. client->task_count= 0;
  1140. client->context= NULL;
  1141. client->con= NULL;
  1142. client->task= NULL;
  1143. client->task_list= NULL;
  1144. client->task_context_free_fn= NULL;
  1145. gearman_client_clear_fn(client);
  1146. if (not is_clone)
  1147. {
  1148. gearman_universal_st *check;
  1149. check= gearman_universal_create(&client->universal, NULL);
  1150. if (check == NULL)
  1151. {
  1152. gearman_client_free(client);
  1153. return NULL;
  1154. }
  1155. }
  1156. return client;
  1157. }
  1158. static gearman_return_t _client_add_server(const char *host, in_port_t port,
  1159. void *context)
  1160. {
  1161. return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
  1162. }
  1163. static gearman_return_t _client_run_task(gearman_client_st *client, gearman_task_st *task)
  1164. {
  1165. switch(task->state)
  1166. {
  1167. case GEARMAN_TASK_STATE_NEW:
  1168. if (task->client->universal.con_list == NULL)
  1169. {
  1170. client->new_tasks--;
  1171. client->running_tasks--;
  1172. gearman_universal_set_error(&client->universal,
  1173. GEARMAN_NO_SERVERS,
  1174. "_client_run_task",
  1175. "no servers added");
  1176. return GEARMAN_NO_SERVERS;
  1177. }
  1178. for (task->con= task->client->universal.con_list; task->con;
  1179. task->con= task->con->next)
  1180. {
  1181. if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
  1182. break;
  1183. }
  1184. if (task->con == NULL)
  1185. {
  1186. client->options.no_new= true;
  1187. return GEARMAN_IO_WAIT;
  1188. }
  1189. client->new_tasks--;
  1190. if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
  1191. {
  1192. task->created_id= task->con->created_id_next;
  1193. task->con->created_id_next++;
  1194. }
  1195. case GEARMAN_TASK_STATE_SUBMIT:
  1196. while (1)
  1197. {
  1198. gearman_return_t ret;
  1199. ret= gearman_connection_send(task->con, &(task->send),
  1200. client->new_tasks == 0 ? true : false);
  1201. if (gearman_success(ret))
  1202. {
  1203. break;
  1204. }
  1205. else if (ret == GEARMAN_IO_WAIT)
  1206. {
  1207. task->state= GEARMAN_TASK_STATE_SUBMIT;
  1208. return GEARMAN_IO_WAIT;
  1209. }
  1210. else if (ret != GEARMAN_SUCCESS)
  1211. {
  1212. /* Increment this since the job submission failed. */
  1213. task->con->created_id++;
  1214. if (ret == GEARMAN_COULD_NOT_CONNECT)
  1215. {
  1216. for (task->con= task->con->next; task->con;
  1217. task->con= task->con->next)
  1218. {
  1219. if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
  1220. break;
  1221. }
  1222. }
  1223. else
  1224. {
  1225. task->con= NULL;
  1226. }
  1227. if (task->con == NULL)
  1228. {
  1229. client->running_tasks--;
  1230. return ret;
  1231. }
  1232. if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
  1233. {
  1234. task->created_id= task->con->created_id_next;
  1235. task->con->created_id_next++;
  1236. }
  1237. }
  1238. }
  1239. if (task->send.data_size > 0 && task->send.data == NULL)
  1240. {
  1241. if (not task->func.workload_fn)
  1242. {
  1243. gearman_universal_set_error(&client->universal,
  1244. GEARMAN_NEED_WORKLOAD_FN,
  1245. "_client_run_task",
  1246. "workload size > 0, but no data pointer or workload_fn was given");
  1247. return GEARMAN_NEED_WORKLOAD_FN;
  1248. }
  1249. case GEARMAN_TASK_STATE_WORKLOAD:
  1250. gearman_return_t ret= task->func.workload_fn(task);
  1251. if (gearman_failed(ret))
  1252. {
  1253. task->state= GEARMAN_TASK_STATE_WORKLOAD;
  1254. return ret;
  1255. }
  1256. }
  1257. client->options.no_new= false;
  1258. task->state= GEARMAN_TASK_STATE_WORK;
  1259. return gearman_connection_set_events(task->con, POLLIN);
  1260. case GEARMAN_TASK_STATE_WORK:
  1261. if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)
  1262. {
  1263. snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
  1264. int(task->recv->arg_size[0]),
  1265. static_cast<char *>(task->recv->arg[0]));
  1266. case GEARMAN_TASK_STATE_CREATED:
  1267. if (task->func.created_fn)
  1268. {
  1269. gearman_return_t ret= task->func.created_fn(task);
  1270. if (gearman_failed(ret))
  1271. {
  1272. task->state= GEARMAN_TASK_STATE_CREATED;
  1273. return ret;
  1274. }
  1275. }
  1276. if (task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
  1277. task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
  1278. task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG ||
  1279. task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH ||
  1280. task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)
  1281. {
  1282. break;
  1283. }
  1284. }
  1285. else if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
  1286. {
  1287. case GEARMAN_TASK_STATE_DATA:
  1288. if (task->func.data_fn)
  1289. {
  1290. gearman_return_t ret= task->func.data_fn(task);
  1291. if (gearman_failed(ret))
  1292. {
  1293. task->state= GEARMAN_TASK_STATE_DATA;
  1294. return ret;
  1295. }
  1296. }
  1297. }
  1298. else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
  1299. {
  1300. case GEARMAN_TASK_STATE_WARNING:
  1301. if (task->func.warning_fn)
  1302. {
  1303. gearman_return_t ret= task->func.warning_fn(task);
  1304. if (gearman_failed(ret))
  1305. {
  1306. task->state= GEARMAN_TASK_STATE_WARNING;
  1307. return ret;
  1308. }
  1309. }
  1310. }
  1311. else if (task->recv->command == GEARMAN_COMMAND_WORK_STATUS ||
  1312. task->recv->command == GEARMAN_COMMAND_STATUS_RES)
  1313. {
  1314. uint8_t x;
  1315. if (task->recv->command == GEARMAN_COMMAND_STATUS_RES)
  1316. {
  1317. if (atoi(static_cast<char *>(task->recv->arg[1])) == 0)
  1318. task->options.is_known= false;
  1319. else
  1320. task->options.is_known= true;
  1321. if (atoi(static_cast<char *>(task->recv->arg[2])) == 0)
  1322. task->options.is_running= false;
  1323. else
  1324. task->options.is_running= true;
  1325. x= 3;
  1326. }
  1327. else
  1328. {
  1329. x= 1;
  1330. }
  1331. task->numerator= uint32_t(atoi(static_cast<char *>(task->recv->arg[x])));
  1332. char status_buffer[11]; /* Max string size to hold a uint32_t. */
  1333. snprintf(status_buffer, 11, "%.*s",
  1334. int(task->recv->arg_size[x + 1]),
  1335. static_cast<char *>(task->recv->arg[x + 1]));
  1336. task->denominator= uint32_t(atoi(status_buffer));
  1337. case GEARMAN_TASK_STATE_STATUS:
  1338. if (task->func.status_fn)
  1339. {
  1340. gearman_return_t ret= task->func.status_fn(task);
  1341. if (gearman_failed(ret))
  1342. {
  1343. task->state= GEARMAN_TASK_STATE_STATUS;
  1344. return ret;
  1345. }
  1346. }
  1347. if (task->send.command == GEARMAN_COMMAND_GET_STATUS)
  1348. break;
  1349. }
  1350. else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
  1351. {
  1352. task->result_rc= GEARMAN_SUCCESS;
  1353. case GEARMAN_TASK_STATE_COMPLETE:
  1354. if (task->func.complete_fn)
  1355. {
  1356. gearman_return_t ret= task->func.complete_fn(task);
  1357. if (gearman_failed(ret))
  1358. {
  1359. task->state= GEARMAN_TASK_STATE_COMPLETE;
  1360. return ret;
  1361. }
  1362. }
  1363. if (task->reducer.each_fn and not task->options.was_reduced)
  1364. {
  1365. task->reducer.each_fn(task, task->context);
  1366. task->options.was_reduced= true;
  1367. }
  1368. break;
  1369. }
  1370. else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
  1371. {
  1372. case GEARMAN_TASK_STATE_EXCEPTION:
  1373. if (task->func.exception_fn)
  1374. {
  1375. gearman_return_t ret= task->func.exception_fn(task);
  1376. if (gearman_failed(ret))
  1377. {
  1378. task->state= GEARMAN_TASK_STATE_EXCEPTION;
  1379. return ret;
  1380. }
  1381. }
  1382. }
  1383. else if (task->recv->command == GEARMAN_COMMAND_WORK_FAIL)
  1384. {
  1385. // If things fail we need to delete the result, and set the result_rc
  1386. // correctly.
  1387. delete task->result_ptr;
  1388. task->result_ptr= NULL;
  1389. task->result_rc= GEARMAN_WORK_FAIL;
  1390. case GEARMAN_TASK_STATE_FAIL:
  1391. if (task->func.fail_fn)
  1392. {
  1393. gearman_return_t ret= task->func.fail_fn(task);
  1394. if (gearman_failed(ret))
  1395. {
  1396. task->state= GEARMAN_TASK_STATE_FAIL;
  1397. return ret;
  1398. }
  1399. }
  1400. break;
  1401. }
  1402. task->state= GEARMAN_TASK_STATE_WORK;
  1403. return GEARMAN_SUCCESS;
  1404. case GEARMAN_TASK_STATE_FINISHED:
  1405. break;
  1406. }
  1407. client->running_tasks--;
  1408. task->state= GEARMAN_TASK_STATE_FINISHED;
  1409. if (client->options.free_tasks)
  1410. {
  1411. gearman_task_free(task);
  1412. }
  1413. return GEARMAN_SUCCESS;
  1414. }
  1415. static void *_client_do(gearman_client_st *client, gearman_command_t command,
  1416. gearman_string_t &function,
  1417. gearman_unique_t &unique,
  1418. const void *workload_str, size_t workload_size,
  1419. size_t *result_size, gearman_return_t *ret_ptr)
  1420. {
  1421. gearman_task_st do_task, *do_task_ptr;
  1422. gearman_client_task_free_all(client);
  1423. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  1424. do_task_ptr= add_task(client, &do_task, NULL, command,
  1425. function,
  1426. unique,
  1427. workload,
  1428. 0);
  1429. if (not do_task_ptr)
  1430. {
  1431. if (ret_ptr)
  1432. *ret_ptr= gearman_universal_error_code(&client->universal);
  1433. return NULL;
  1434. }
  1435. do_task_ptr->func= gearman_actions_do_default();
  1436. gearman_return_t ret= gearman_client_run_tasks(client);
  1437. const void *returnable= NULL;
  1438. // gearman_client_run_tasks failed
  1439. if (gearman_failed(ret))
  1440. {
  1441. gearman_error(&client->universal, ret, "occured during gearman_client_run_tasks()");
  1442. if (ret_ptr)
  1443. *ret_ptr= ret;
  1444. *result_size= 0;
  1445. }
  1446. else if (ret == GEARMAN_SUCCESS and do_task_ptr->result_rc == GEARMAN_SUCCESS)
  1447. {
  1448. *ret_ptr= do_task_ptr->result_rc;
  1449. assert(do_task_ptr);
  1450. if (do_task_ptr->result_ptr)
  1451. {
  1452. gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
  1453. *result_size= gearman_size(result);
  1454. returnable= gearman_c_str(result);
  1455. }
  1456. else // NULL job
  1457. {
  1458. *result_size= 0;
  1459. }
  1460. }
  1461. else // gearman_client_run_tasks() was successful, but the task was not
  1462. {
  1463. gearman_error(&client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
  1464. *ret_ptr= do_task_ptr->result_rc;
  1465. *result_size= 0;
  1466. }
  1467. assert(client->task_list);
  1468. gearman_task_free(&do_task);
  1469. client->new_tasks= 0;
  1470. client->running_tasks= 0;
  1471. return const_cast<void *>(returnable);
  1472. }
  1473. static gearman_return_t _client_do_background(gearman_client_st *client,
  1474. gearman_command_t command,
  1475. gearman_string_t &function,
  1476. gearman_unique_t &unique,
  1477. gearman_string_t &workload,
  1478. char *job_handle)
  1479. {
  1480. gearman_task_st do_task, *do_task_ptr;
  1481. do_task_ptr= add_task(client, &do_task,
  1482. client,
  1483. command,
  1484. function,
  1485. unique,
  1486. workload,
  1487. 0);
  1488. if (not do_task_ptr)
  1489. {
  1490. return gearman_universal_error_code(&client->universal);
  1491. }
  1492. gearman_task_clear_fn(do_task_ptr);
  1493. gearman_return_t ret= gearman_client_run_tasks(client);
  1494. if (ret != GEARMAN_IO_WAIT)
  1495. {
  1496. if (job_handle)
  1497. {
  1498. strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
  1499. }
  1500. client->new_tasks= 0;
  1501. client->running_tasks= 0;
  1502. }
  1503. gearman_task_free(&do_task);
  1504. return ret;
  1505. }
  1506. bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
  1507. {
  1508. if (not first || not second)
  1509. return false;
  1510. if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
  1511. return false;
  1512. if (first->universal.con_list->port != second->universal.con_list->port)
  1513. return false;
  1514. return true;
  1515. }
  1516. bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
  1517. {
  1518. gearman_string_t option= { option_arg, option_arg_size };
  1519. return gearman_request_option(self->universal, option);
  1520. }