client.cc 44 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include <libgearman/common.h>
  39. #include <arpa/inet.h>
  40. #include <cassert>
  41. #include <cerrno>
  42. #include <cstdio>
  43. #include <cstdlib>
  44. #include <cstring>
  45. #include <memory>
  46. #include <netdb.h>
  47. #include <netinet/in.h>
  48. #include <sys/socket.h>
  49. #include <poll.h>
  50. #include <libgearman/visibility.h>
  51. #include <libgearman/constants.h>
  52. #include <libgearman/return.h>
  53. #include <libgearman/job_handle.h>
  54. #include <libgearman/actions.h>
  55. #include <libgearman/unique.h>
  56. #include <libgearman/universal.h>
  57. #include <libgearman/universal.hpp>
  58. #include <libgearman/allocator.hpp>
  59. #include <libgearman/string.h>
  60. #include <libgearman/allocator.hpp>
  61. #include <libgearman/client.h>
  62. #include <libgearman/packet.hpp>
  63. #include <libgearman/connection.hpp>
  64. #include <libgearman/add.hpp>
  65. #include <libgearman/connection.h>
  66. #include <libgearman/task.h>
  67. #include <libgearman/packet.hpp>
  68. #include <libgearman/run.hpp>
  69. #include <libgearman/parse.h>
  70. #include <libgearman/result.h>
  71. /** Allocate a client structure.
  72. */
  73. static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone);
  74. /**
  75. * Callback function used when parsing server lists.
  76. */
  77. static gearman_return_t _client_add_server(const char *host, in_port_t port,
  78. void *context);
  79. /**
  80. * Real do function.
  81. */
  82. static void *_client_do(gearman_client_st *client, gearman_command_t command,
  83. const char *function_name,
  84. const char *unique,
  85. const void *workload, size_t workload_size,
  86. size_t *result_size, gearman_return_t *ret_ptr);
  87. /**
  88. * Real background do function.
  89. */
  90. static gearman_return_t _client_do_background(gearman_client_st *client,
  91. gearman_command_t command,
  92. gearman_string_t &function,
  93. gearman_unique_t &unique,
  94. gearman_string_t &workload,
  95. gearman_job_handle_t job_handle);
  96. /*
  97. * Public Definitions
  98. */
  99. gearman_client_st *gearman_client_create(gearman_client_st *client)
  100. {
  101. return _client_allocate(client, false);
  102. }
  103. gearman_client_st *gearman_client_clone(gearman_client_st *client,
  104. const gearman_client_st *from)
  105. {
  106. if (not from)
  107. {
  108. return _client_allocate(client, false);
  109. }
  110. client= _client_allocate(client, true);
  111. if (not client)
  112. {
  113. return client;
  114. }
  115. client->options.non_blocking= from->options.non_blocking;
  116. client->options.unbuffered_result= from->options.unbuffered_result;
  117. client->options.no_new= from->options.no_new;
  118. client->options.free_tasks= from->options.free_tasks;
  119. client->actions= from->actions;
  120. client->_do_handle[0]= 0;
  121. gearman_universal_clone(client->universal, from->universal);
  122. return client;
  123. }
  124. void gearman_client_free(gearman_client_st *client)
  125. {
  126. if (not client)
  127. return;
  128. gearman_client_task_free_all(client);
  129. gearman_universal_free(client->universal);
  130. if (client->options.allocated)
  131. delete client;
  132. }
  133. const char *gearman_client_error(const gearman_client_st *client)
  134. {
  135. if (not client)
  136. return NULL;
  137. return gearman_universal_error(client->universal);
  138. }
  139. int gearman_client_errno(const gearman_client_st *client)
  140. {
  141. if (not client)
  142. return 0;
  143. return gearman_universal_errno(client->universal);
  144. }
  145. gearman_client_options_t gearman_client_options(const gearman_client_st *client)
  146. {
  147. int32_t options;
  148. memset(&options, 0, sizeof(int32_t));
  149. if (client->options.allocated)
  150. options|= int(GEARMAN_CLIENT_ALLOCATED);
  151. if (client->options.non_blocking)
  152. options|= int(GEARMAN_CLIENT_NON_BLOCKING);
  153. if (client->options.unbuffered_result)
  154. options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
  155. if (client->options.no_new)
  156. options|= int(GEARMAN_CLIENT_NO_NEW);
  157. if (client->options.free_tasks)
  158. options|= int(GEARMAN_CLIENT_FREE_TASKS);
  159. return gearman_client_options_t(options);
  160. }
  161. bool gearman_client_has_option(gearman_client_st *client,
  162. gearman_client_options_t option)
  163. {
  164. if (not client)
  165. return false;
  166. switch (option)
  167. {
  168. case GEARMAN_CLIENT_ALLOCATED:
  169. return client->options.allocated;
  170. case GEARMAN_CLIENT_NON_BLOCKING:
  171. return client->options.non_blocking;
  172. case GEARMAN_CLIENT_UNBUFFERED_RESULT:
  173. return client->options.unbuffered_result;
  174. case GEARMAN_CLIENT_NO_NEW:
  175. return client->options.no_new;
  176. case GEARMAN_CLIENT_FREE_TASKS:
  177. return client->options.free_tasks;
  178. default:
  179. case GEARMAN_CLIENT_TASK_IN_USE:
  180. case GEARMAN_CLIENT_MAX:
  181. return false;
  182. }
  183. }
  184. void gearman_client_set_options(gearman_client_st *client,
  185. gearman_client_options_t options)
  186. {
  187. if (not client)
  188. return;
  189. gearman_client_options_t usable_options[]= {
  190. GEARMAN_CLIENT_NON_BLOCKING,
  191. GEARMAN_CLIENT_UNBUFFERED_RESULT,
  192. GEARMAN_CLIENT_FREE_TASKS,
  193. GEARMAN_CLIENT_MAX
  194. };
  195. gearman_client_options_t *ptr;
  196. for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
  197. {
  198. if (options & *ptr)
  199. {
  200. gearman_client_add_options(client, *ptr);
  201. }
  202. else
  203. {
  204. gearman_client_remove_options(client, *ptr);
  205. }
  206. }
  207. }
  208. void gearman_client_add_options(gearman_client_st *client,
  209. gearman_client_options_t options)
  210. {
  211. if (not client)
  212. return;
  213. if (options & GEARMAN_CLIENT_NON_BLOCKING)
  214. {
  215. gearman_universal_add_options(client->universal, GEARMAN_NON_BLOCKING);
  216. client->options.non_blocking= true;
  217. }
  218. if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
  219. {
  220. client->options.unbuffered_result= true;
  221. }
  222. if (options & GEARMAN_CLIENT_FREE_TASKS)
  223. {
  224. client->options.free_tasks= true;
  225. }
  226. }
  227. void gearman_client_remove_options(gearman_client_st *client,
  228. gearman_client_options_t options)
  229. {
  230. if (not client)
  231. return;
  232. if (options & GEARMAN_CLIENT_NON_BLOCKING)
  233. {
  234. gearman_universal_remove_options(client->universal, GEARMAN_NON_BLOCKING);
  235. client->options.non_blocking= false;
  236. }
  237. if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
  238. {
  239. client->options.unbuffered_result= false;
  240. }
  241. if (options & GEARMAN_CLIENT_FREE_TASKS)
  242. {
  243. client->options.free_tasks= false;
  244. }
  245. }
  246. int gearman_client_timeout(gearman_client_st *client)
  247. {
  248. return gearman_universal_timeout(client->universal);
  249. }
  250. void gearman_client_set_timeout(gearman_client_st *client, int timeout)
  251. {
  252. if (not client)
  253. return;
  254. gearman_universal_set_timeout(client->universal, timeout);
  255. }
  256. void *gearman_client_context(const gearman_client_st *client)
  257. {
  258. if (not client)
  259. return NULL;
  260. return const_cast<void *>(client->context);
  261. }
  262. void gearman_client_set_context(gearman_client_st *client, void *context)
  263. {
  264. if (not client)
  265. return;
  266. client->context= context;
  267. }
  268. void gearman_client_set_log_fn(gearman_client_st *client,
  269. gearman_log_fn *function, void *context,
  270. gearman_verbose_t verbose)
  271. {
  272. if (not client)
  273. return;
  274. gearman_set_log_fn(client->universal, function, context, verbose);
  275. }
  276. void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
  277. gearman_malloc_fn *function,
  278. void *context)
  279. {
  280. if (not client)
  281. return;
  282. gearman_set_workload_malloc_fn(client->universal, function, context);
  283. }
  284. void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
  285. {
  286. if (not client)
  287. return;
  288. gearman_set_workload_free_fn(client->universal, function, context);
  289. }
  290. gearman_return_t gearman_client_add_server(gearman_client_st *client,
  291. const char *host, in_port_t port)
  292. {
  293. if (not client)
  294. {
  295. return GEARMAN_INVALID_ARGUMENT;
  296. }
  297. if (not gearman_connection_create_args(client->universal, host, port))
  298. {
  299. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  300. }
  301. return GEARMAN_SUCCESS;
  302. }
  303. gearman_return_t gearman_client_add_servers(gearman_client_st *client,
  304. const char *servers)
  305. {
  306. return gearman_parse_servers(servers, _client_add_server, client);
  307. }
  308. void gearman_client_remove_servers(gearman_client_st *client)
  309. {
  310. if (not client)
  311. return;
  312. gearman_free_all_cons(client->universal);
  313. }
  314. gearman_return_t gearman_client_wait(gearman_client_st *client)
  315. {
  316. if (not client)
  317. return GEARMAN_INVALID_ARGUMENT;
  318. return gearman_wait(client->universal);
  319. }
  320. void *gearman_client_do(gearman_client_st *client,
  321. const char *function,
  322. const char *unique,
  323. const void *workload,
  324. size_t workload_size, size_t *result_size,
  325. gearman_return_t *ret_ptr)
  326. {
  327. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
  328. function,
  329. unique,
  330. workload, workload_size,
  331. result_size, ret_ptr);
  332. }
  333. void *gearman_client_do_high(gearman_client_st *client,
  334. const char *function,
  335. const char *unique,
  336. const void *workload, size_t workload_size,
  337. size_t *result_size, gearman_return_t *ret_ptr)
  338. {
  339. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
  340. function,
  341. unique,
  342. workload, workload_size,
  343. result_size, ret_ptr);
  344. }
  345. void *gearman_client_do_low(gearman_client_st *client,
  346. const char *function,
  347. const char *unique,
  348. const void *workload, size_t workload_size,
  349. size_t *result_size, gearman_return_t *ret_ptr)
  350. {
  351. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
  352. function,
  353. unique,
  354. workload, workload_size,
  355. result_size, ret_ptr);
  356. }
  357. size_t gearman_client_count_tasks(gearman_client_st *client)
  358. {
  359. if (not client)
  360. return 0;
  361. size_t count= 1;
  362. gearman_task_st *search= client->task_list;
  363. while ((search= search->next))
  364. {
  365. count++;
  366. }
  367. return count;
  368. }
  369. #if 0
  370. static bool _active_tasks(gearman_client_st *client)
  371. {
  372. assert(client);
  373. gearman_task_st *search= client->task_list;
  374. if (not search)
  375. return false;
  376. do
  377. {
  378. if (gearman_task_is_active(search))
  379. {
  380. return true;
  381. }
  382. } while ((search= search->next));
  383. return false;
  384. }
  385. #endif
  386. const char *gearman_client_do_job_handle(gearman_client_st *self)
  387. {
  388. if (not self)
  389. {
  390. errno= EINVAL;
  391. return NULL;
  392. }
  393. return self->_do_handle;
  394. }
  395. void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
  396. {
  397. if (numerator)
  398. *numerator= 0;
  399. if (denominator)
  400. *denominator= 0;
  401. }
  402. gearman_return_t gearman_client_do_background(gearman_client_st *client,
  403. const char *function_name,
  404. const char *unique,
  405. const void *workload_str,
  406. size_t workload_size,
  407. gearman_job_handle_t job_handle)
  408. {
  409. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  410. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  411. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  412. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
  413. function,
  414. local_unique,
  415. workload,
  416. job_handle);
  417. }
  418. gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
  419. const char *function_name,
  420. const char *unique,
  421. const void *workload_str,
  422. size_t workload_size,
  423. gearman_job_handle_t job_handle)
  424. {
  425. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  426. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  427. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  428. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
  429. function,
  430. local_unique,
  431. workload,
  432. job_handle);
  433. }
  434. gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
  435. const char *function_name,
  436. const char *unique,
  437. const void *workload_str,
  438. size_t workload_size,
  439. gearman_job_handle_t job_handle)
  440. {
  441. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  442. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  443. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  444. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
  445. function,
  446. local_unique,
  447. workload,
  448. job_handle);
  449. }
  450. gearman_return_t gearman_client_job_status(gearman_client_st *client,
  451. const gearman_job_handle_t job_handle,
  452. bool *is_known, bool *is_running,
  453. uint32_t *numerator,
  454. uint32_t *denominator)
  455. {
  456. gearman_return_t ret;
  457. if (not client)
  458. return GEARMAN_INVALID_ARGUMENT;
  459. gearman_task_st do_task;
  460. gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
  461. job_handle, &ret);
  462. if (gearman_failed(ret))
  463. {
  464. return ret;
  465. }
  466. assert(do_task_ptr);
  467. do_task_ptr->type= GEARMAN_TASK_KIND_DO;
  468. gearman_task_clear_fn(do_task_ptr);
  469. do {
  470. ret= gearman_client_run_tasks(client);
  471. // If either of the following is ever true, we will end up in an
  472. // infinite loop
  473. assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
  474. } while (gearman_continue(ret));
  475. // @note we don't know if our task was run or not, we just know something
  476. // happened.
  477. if (gearman_success(ret))
  478. {
  479. if (is_known)
  480. *is_known= do_task.options.is_known;
  481. if (is_running)
  482. *is_running= do_task.options.is_running;
  483. if (numerator)
  484. *numerator= do_task.numerator;
  485. if (denominator)
  486. *denominator= do_task.denominator;
  487. if (not is_known and not is_running)
  488. {
  489. if (do_task.options.is_running)
  490. {
  491. ret= GEARMAN_IN_PROGRESS;
  492. }
  493. else if (do_task.options.is_known)
  494. {
  495. ret= GEARMAN_JOB_EXISTS;
  496. }
  497. }
  498. }
  499. else
  500. {
  501. if (is_known)
  502. *is_known= false;
  503. if (is_running)
  504. *is_running= false;
  505. if (numerator)
  506. *numerator= 0;
  507. if (denominator)
  508. *denominator= 0;
  509. }
  510. gearman_task_free(do_task_ptr);
  511. return ret;
  512. }
  513. gearman_return_t gearman_client_echo(gearman_client_st *client,
  514. const void *workload,
  515. size_t workload_size)
  516. {
  517. if (not client)
  518. return GEARMAN_INVALID_ARGUMENT;
  519. return gearman_echo(client->universal, workload, workload_size);
  520. }
  521. void gearman_client_task_free_all(gearman_client_st *client)
  522. {
  523. if (not client)
  524. return;
  525. while (client->task_list)
  526. {
  527. gearman_task_free(client->task_list);
  528. }
  529. }
  530. void gearman_client_set_task_context_free_fn(gearman_client_st *client,
  531. gearman_task_context_free_fn *function)
  532. {
  533. if (not client)
  534. return;
  535. client->task_context_free_fn= function;
  536. }
  537. gearman_return_t gearman_client_set_memory_allocators(gearman_client_st *client,
  538. gearman_malloc_fn *malloc_fn,
  539. gearman_free_fn *free_fn,
  540. gearman_realloc_fn *realloc_fn,
  541. gearman_calloc_fn *calloc_fn,
  542. void *context)
  543. {
  544. if (not client)
  545. return GEARMAN_INVALID_ARGUMENT;
  546. return gearman_set_memory_allocator(client->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
  547. }
  548. gearman_task_st *gearman_client_add_task(gearman_client_st *client,
  549. gearman_task_st *task,
  550. void *context,
  551. const char *function,
  552. const char *unique,
  553. const void *workload, size_t workload_size,
  554. gearman_return_t *ret_ptr)
  555. {
  556. return add_task(client, task,
  557. context, GEARMAN_COMMAND_SUBMIT_JOB,
  558. function,
  559. unique,
  560. workload, workload_size,
  561. time_t(0),
  562. ret_ptr,
  563. client->actions);
  564. }
  565. gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
  566. gearman_task_st *task,
  567. void *context,
  568. const char *function,
  569. const char *unique,
  570. const void *workload, size_t workload_size,
  571. gearman_return_t *ret_ptr)
  572. {
  573. return add_task(client, task, context,
  574. GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
  575. function,
  576. unique,
  577. workload, workload_size,
  578. time_t(0),
  579. ret_ptr,
  580. client->actions);
  581. }
  582. gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
  583. gearman_task_st *task,
  584. void *context,
  585. const char *function,
  586. const char *unique,
  587. const void *workload, size_t workload_size,
  588. gearman_return_t *ret_ptr)
  589. {
  590. return add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
  591. function,
  592. unique,
  593. workload, workload_size,
  594. time_t(0),
  595. ret_ptr,
  596. client->actions);
  597. }
  598. gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
  599. gearman_task_st *task,
  600. void *context,
  601. const char *function,
  602. const char *unique,
  603. const void *workload, size_t workload_size,
  604. gearman_return_t *ret_ptr)
  605. {
  606. return add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
  607. function,
  608. unique,
  609. workload, workload_size,
  610. time_t(0),
  611. ret_ptr,
  612. client->actions);
  613. }
  614. gearman_task_st *
  615. gearman_client_add_task_high_background(gearman_client_st *client,
  616. gearman_task_st *task,
  617. void *context,
  618. const char *function,
  619. const char *unique,
  620. const void *workload, size_t workload_size,
  621. gearman_return_t *ret_ptr)
  622. {
  623. return add_task(client, task, context,
  624. GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
  625. function,
  626. unique,
  627. workload, workload_size,
  628. time_t(0),
  629. ret_ptr,
  630. client->actions);
  631. }
  632. gearman_task_st *
  633. gearman_client_add_task_low_background(gearman_client_st *client,
  634. gearman_task_st *task,
  635. void *context,
  636. const char *function,
  637. const char *unique,
  638. const void *workload, size_t workload_size,
  639. gearman_return_t *ret_ptr)
  640. {
  641. return add_task(client, task, context,
  642. GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
  643. function,
  644. unique,
  645. workload, workload_size,
  646. time_t(0),
  647. ret_ptr,
  648. client->actions);
  649. }
  650. gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
  651. gearman_task_st *task,
  652. void *context,
  653. const gearman_job_handle_t job_handle,
  654. gearman_return_t *ret_ptr)
  655. {
  656. const void *args[1];
  657. size_t args_size[1];
  658. gearman_return_t unused;
  659. if (not ret_ptr)
  660. ret_ptr= &unused;
  661. if (not (task= gearman_task_internal_create(client, task)))
  662. {
  663. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  664. return NULL;
  665. }
  666. task->context= context;
  667. snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
  668. args[0]= job_handle;
  669. args_size[0]= strlen(job_handle);
  670. gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
  671. GEARMAN_MAGIC_REQUEST,
  672. GEARMAN_COMMAND_GET_STATUS,
  673. args, args_size, 1);
  674. if (gearman_success(rc))
  675. {
  676. client->new_tasks++;
  677. client->running_tasks++;
  678. task->options.send_in_use= true;
  679. }
  680. *ret_ptr= rc;
  681. return task;
  682. }
  683. void gearman_client_set_workload_fn(gearman_client_st *client,
  684. gearman_workload_fn *function)
  685. {
  686. if (not client)
  687. return;
  688. client->actions.workload_fn= function;
  689. }
  690. void gearman_client_set_created_fn(gearman_client_st *client,
  691. gearman_created_fn *function)
  692. {
  693. if (not client)
  694. return;
  695. client->actions.created_fn= function;
  696. }
  697. void gearman_client_set_data_fn(gearman_client_st *client,
  698. gearman_data_fn *function)
  699. {
  700. if (not client)
  701. return;
  702. client->actions.data_fn= function;
  703. }
  704. void gearman_client_set_warning_fn(gearman_client_st *client,
  705. gearman_warning_fn *function)
  706. {
  707. if (not client)
  708. return;
  709. client->actions.warning_fn= function;
  710. }
  711. void gearman_client_set_status_fn(gearman_client_st *client,
  712. gearman_universal_status_fn *function)
  713. {
  714. if (not client)
  715. return;
  716. client->actions.status_fn= function;
  717. }
  718. void gearman_client_set_complete_fn(gearman_client_st *client,
  719. gearman_complete_fn *function)
  720. {
  721. if (not client)
  722. return;
  723. client->actions.complete_fn= function;
  724. }
  725. void gearman_client_set_exception_fn(gearman_client_st *client,
  726. gearman_exception_fn *function)
  727. {
  728. if (not client)
  729. return;
  730. client->actions.exception_fn= function;
  731. }
  732. void gearman_client_set_fail_fn(gearman_client_st *client,
  733. gearman_fail_fn *function)
  734. {
  735. if (not client)
  736. return;
  737. client->actions.fail_fn= function;
  738. }
  739. void gearman_client_clear_fn(gearman_client_st *client)
  740. {
  741. if (not client)
  742. return;
  743. client->actions= gearman_actions_default();
  744. }
  745. static inline void _push_non_blocking(gearman_client_st *client)
  746. {
  747. client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
  748. client->universal.options.non_blocking= true;
  749. }
  750. static inline void _pop_non_blocking(gearman_client_st *client)
  751. {
  752. client->universal.options.non_blocking= client->options.non_blocking;
  753. assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
  754. }
  755. static inline void _push_blocking(gearman_client_st *client)
  756. {
  757. client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
  758. client->universal.options.non_blocking= false;
  759. }
  760. static inline void _pop_blocking(gearman_client_st *client)
  761. {
  762. client->universal.options.non_blocking= client->options.non_blocking;
  763. assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
  764. }
  765. static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
  766. {
  767. gearman_return_t ret= GEARMAN_MAX_RETURN;
  768. switch(client->state)
  769. {
  770. case GEARMAN_CLIENT_STATE_IDLE:
  771. while (1)
  772. {
  773. /* Start any new tasks. */
  774. if (client->new_tasks > 0 && ! (client->options.no_new))
  775. {
  776. for (client->task= client->task_list; client->task;
  777. client->task= client->task->next)
  778. {
  779. if (client->task->state != GEARMAN_TASK_STATE_NEW)
  780. {
  781. continue;
  782. }
  783. case GEARMAN_CLIENT_STATE_NEW:
  784. gearman_return_t local_ret= _client_run_task(client, client->task);
  785. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  786. {
  787. client->state= GEARMAN_CLIENT_STATE_NEW;
  788. return local_ret;
  789. }
  790. }
  791. if (client->new_tasks == 0)
  792. {
  793. gearman_return_t local_ret= gearman_flush_all(client->universal);
  794. if (gearman_failed(local_ret))
  795. {
  796. return local_ret;
  797. }
  798. }
  799. }
  800. /* See if there are any connections ready for I/O. */
  801. while ((client->con= gearman_ready(client->universal)))
  802. {
  803. if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
  804. {
  805. /* Socket is ready for writing, continue submitting jobs. */
  806. for (client->task= client->task_list; client->task;
  807. client->task= client->task->next)
  808. {
  809. if (client->task->con != client->con or
  810. (client->task->state != GEARMAN_TASK_STATE_SUBMIT and
  811. client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
  812. {
  813. continue;
  814. }
  815. case GEARMAN_CLIENT_STATE_SUBMIT:
  816. gearman_return_t local_ret= _client_run_task(client, client->task);
  817. if (local_ret == GEARMAN_COULD_NOT_CONNECT)
  818. {
  819. client->state= GEARMAN_CLIENT_STATE_IDLE;
  820. return local_ret;
  821. }
  822. else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  823. {
  824. client->state= GEARMAN_CLIENT_STATE_SUBMIT;
  825. return local_ret;
  826. }
  827. }
  828. /* Connection errors are fatal. */
  829. if (client->con->revents & (POLLERR | POLLHUP | POLLNVAL))
  830. {
  831. gearman_error(client->universal, GEARMAN_LOST_CONNECTION, "detected lost connection in _client_run_tasks()");
  832. client->con->close();
  833. client->state= GEARMAN_CLIENT_STATE_IDLE;
  834. return GEARMAN_LOST_CONNECTION;
  835. }
  836. }
  837. if (not (client->con->revents & POLLIN))
  838. continue;
  839. /* Socket is ready for reading. */
  840. while (1)
  841. {
  842. /* Read packet on connection and find which task it belongs to. */
  843. if (client->options.unbuffered_result)
  844. {
  845. /* If client is handling the data read, make sure it's complete. */
  846. if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
  847. {
  848. for (client->task= client->task_list; client->task;
  849. client->task= client->task->next)
  850. {
  851. if (client->task->con == client->con &&
  852. (client->task->state == GEARMAN_TASK_STATE_DATA or
  853. client->task->state == GEARMAN_TASK_STATE_COMPLETE))
  854. {
  855. break;
  856. }
  857. }
  858. /*
  859. Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't setup the client to fetch data correctly.
  860. Fatal error :(
  861. */
  862. return gearman_universal_set_error(client->universal, GEARMAN_INVALID_ARGUMENT, AT,
  863. "client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);
  864. }
  865. else
  866. {
  867. /* Read the next packet, without buffering the data part. */
  868. client->task= NULL;
  869. (void)client->con->receiving(client->con->_packet, ret, false);
  870. }
  871. }
  872. else
  873. {
  874. /* Read the next packet, buffering the data part. */
  875. client->task= NULL;
  876. (void)client->con->receiving(client->con->_packet, ret, true);
  877. }
  878. if (client->task == NULL)
  879. {
  880. assert(ret != GEARMAN_MAX_RETURN);
  881. /* Check the return of the gearman_connection_recv() calls above. */
  882. if (gearman_failed(ret))
  883. {
  884. if (ret == GEARMAN_IO_WAIT)
  885. break;
  886. client->state= GEARMAN_CLIENT_STATE_IDLE;
  887. return ret;
  888. }
  889. client->con->options.packet_in_use= true;
  890. /* We have a packet, see which task it belongs to. */
  891. for (client->task= client->task_list; client->task;
  892. client->task= client->task->next)
  893. {
  894. if (client->task->con != client->con)
  895. continue;
  896. if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
  897. {
  898. if (client->task->created_id != client->con->created_id)
  899. continue;
  900. /* New job created, drop through below and notify task. */
  901. client->con->created_id++;
  902. }
  903. else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
  904. {
  905. gearman_universal_set_error(client->universal, GEARMAN_SERVER_ERROR, AT,
  906. "%s:%.*s",
  907. static_cast<char *>(client->con->_packet.arg[0]),
  908. int(client->con->_packet.arg_size[1]),
  909. static_cast<char *>(client->con->_packet.arg[1]));
  910. /* Packet cleanup copied from "Clean up the packet" below, and must
  911. remain in sync with its reference. */
  912. gearman_packet_free(&(client->con->_packet));
  913. client->con->options.packet_in_use= false;
  914. /* This step copied from _client_run_tasks() above: */
  915. /* Increment this value because new job created then failed. */
  916. client->con->created_id++;
  917. return GEARMAN_SERVER_ERROR;
  918. }
  919. else if (strncmp(client->task->job_handle,
  920. static_cast<char *>(client->con->_packet.arg[0]),
  921. client->con->_packet.arg_size[0]) ||
  922. (client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
  923. strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
  924. (client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
  925. strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
  926. {
  927. continue;
  928. }
  929. /* Else, we have a matching result packet of some kind. */
  930. break;
  931. }
  932. if (not client->task)
  933. {
  934. /* The client has stopped waiting for the response, ignore it. */
  935. gearman_packet_free(&(client->con->_packet));
  936. client->con->options.packet_in_use= false;
  937. continue;
  938. }
  939. client->task->recv= &(client->con->_packet);
  940. }
  941. case GEARMAN_CLIENT_STATE_PACKET:
  942. /* Let task process job created or result packet. */
  943. gearman_return_t local_ret= _client_run_task(client, client->task);
  944. if (local_ret == GEARMAN_IO_WAIT)
  945. break;
  946. if (gearman_failed(local_ret))
  947. {
  948. client->state= GEARMAN_CLIENT_STATE_PACKET;
  949. return local_ret;
  950. }
  951. /* Clean up the packet. */
  952. gearman_packet_free(&(client->con->_packet));
  953. client->con->options.packet_in_use= false;
  954. /* If all tasks are done, return. */
  955. if (client->running_tasks == 0)
  956. break;
  957. }
  958. }
  959. /* If all tasks are done, return. */
  960. if (client->running_tasks == 0)
  961. {
  962. break;
  963. }
  964. if (client->new_tasks > 0 && ! (client->options.no_new))
  965. continue;
  966. if (client->options.non_blocking)
  967. {
  968. /* Let the caller wait for activity. */
  969. client->state= GEARMAN_CLIENT_STATE_IDLE;
  970. gearman_gerror(client->universal, GEARMAN_IO_WAIT);
  971. return GEARMAN_IO_WAIT;
  972. }
  973. /* Wait for activity on one of the connections. */
  974. gearman_return_t local_ret= gearman_wait(client->universal);
  975. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  976. {
  977. client->state= GEARMAN_CLIENT_STATE_IDLE;
  978. return local_ret;
  979. }
  980. }
  981. break;
  982. }
  983. client->state= GEARMAN_CLIENT_STATE_IDLE;
  984. return GEARMAN_SUCCESS;
  985. }
  986. gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
  987. {
  988. if (not client)
  989. {
  990. return GEARMAN_INVALID_ARGUMENT;
  991. }
  992. if (not client->task_list) // We are immediatly successful if all tasks are completed
  993. {
  994. return GEARMAN_SUCCESS;
  995. }
  996. _push_non_blocking(client);
  997. gearman_return_t rc= _client_run_tasks(client);
  998. _pop_non_blocking(client);
  999. if (rc == GEARMAN_COULD_NOT_CONNECT)
  1000. {
  1001. gearman_reset(client->universal);
  1002. }
  1003. return rc;
  1004. }
  1005. gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client)
  1006. {
  1007. if (not client)
  1008. {
  1009. return GEARMAN_INVALID_ARGUMENT;
  1010. }
  1011. if (not client->task_list) // We are immediatly successful if all tasks are completed
  1012. {
  1013. return GEARMAN_SUCCESS;
  1014. }
  1015. _push_blocking(client);
  1016. gearman_return_t rc= _client_run_tasks(client);
  1017. _pop_blocking(client);
  1018. if (gearman_failed(rc))
  1019. {
  1020. if (rc == GEARMAN_COULD_NOT_CONNECT)
  1021. {
  1022. gearman_reset(client->universal);
  1023. }
  1024. assert(gearman_universal_error_code(client->universal) == rc);
  1025. }
  1026. return rc;
  1027. }
  1028. /*
  1029. * Static Definitions
  1030. */
  1031. static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
  1032. {
  1033. if (client)
  1034. {
  1035. client->options.allocated= false;
  1036. }
  1037. else
  1038. {
  1039. client= new (std::nothrow) gearman_client_st;
  1040. if (not client)
  1041. return NULL;
  1042. client->options.allocated= true;
  1043. }
  1044. client->options.non_blocking= false;
  1045. client->options.unbuffered_result= false;
  1046. client->options.no_new= false;
  1047. client->options.free_tasks= false;
  1048. client->state= GEARMAN_CLIENT_STATE_IDLE;
  1049. client->new_tasks= 0;
  1050. client->running_tasks= 0;
  1051. client->task_count= 0;
  1052. client->context= NULL;
  1053. client->con= NULL;
  1054. client->task= NULL;
  1055. client->task_list= NULL;
  1056. client->task_context_free_fn= NULL;
  1057. gearman_client_clear_fn(client);
  1058. if (not is_clone)
  1059. {
  1060. gearman_universal_initialize(client->universal);
  1061. }
  1062. return client;
  1063. }
  1064. static gearman_return_t _client_add_server(const char *host, in_port_t port,
  1065. void *context)
  1066. {
  1067. return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
  1068. }
  1069. static void *_client_do(gearman_client_st *client, gearman_command_t command,
  1070. const char *function_name,
  1071. const char *unique,
  1072. const void *workload_str, size_t workload_size,
  1073. size_t *result_size, gearman_return_t *ret_ptr)
  1074. {
  1075. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  1076. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  1077. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  1078. gearman_return_t unused;
  1079. if (not ret_ptr)
  1080. ret_ptr= &unused;
  1081. size_t unused_size;
  1082. if (not result_size)
  1083. result_size= &unused_size;
  1084. if (not client)
  1085. {
  1086. *ret_ptr= GEARMAN_INVALID_ARGUMENT;
  1087. errno= EINVAL;
  1088. return NULL;
  1089. }
  1090. if (not function_name)
  1091. {
  1092. *ret_ptr= GEARMAN_INVALID_ARGUMENT;
  1093. return NULL;
  1094. }
  1095. gearman_task_st do_task;
  1096. gearman_task_st *do_task_ptr= add_task(client, &do_task, NULL, command,
  1097. function,
  1098. local_unique,
  1099. workload,
  1100. time_t(0),
  1101. gearman_actions_do_default());
  1102. if (not do_task_ptr)
  1103. {
  1104. *ret_ptr= gearman_universal_error_code(client->universal);
  1105. return NULL;
  1106. }
  1107. do_task_ptr->type= GEARMAN_TASK_KIND_DO;
  1108. gearman_return_t ret;
  1109. do {
  1110. ret= gearman_client_run_tasks(client);
  1111. } while (gearman_continue(ret));
  1112. // gearman_client_run_tasks failed
  1113. assert(client->task_list); // Programmer error, we should always have the task that we used for do
  1114. char *returnable= NULL;
  1115. if (gearman_failed(ret))
  1116. {
  1117. if (ret == GEARMAN_COULD_NOT_CONNECT)
  1118. { }
  1119. else
  1120. {
  1121. gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
  1122. }
  1123. *ret_ptr= ret;
  1124. *result_size= 0;
  1125. }
  1126. else if (gearman_success(ret) and do_task_ptr->result_rc == GEARMAN_SUCCESS)
  1127. {
  1128. *ret_ptr= do_task_ptr->result_rc;
  1129. if (do_task_ptr->result_ptr)
  1130. {
  1131. if (gearman_has_allocator(client->universal))
  1132. {
  1133. gearman_string_t result= gearman_result_string(do_task_ptr->result_ptr);
  1134. returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
  1135. if (not returnable)
  1136. {
  1137. gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");
  1138. *result_size= 0;
  1139. }
  1140. else // NULL terminate
  1141. {
  1142. memcpy(returnable, gearman_c_str(result), gearman_size(result));
  1143. returnable[gearman_size(result)]= 0;
  1144. *result_size= gearman_size(result);
  1145. }
  1146. }
  1147. else
  1148. {
  1149. gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
  1150. *result_size= gearman_size(result);
  1151. returnable= const_cast<char *>(gearman_c_str(result));
  1152. }
  1153. }
  1154. else // NULL job
  1155. {
  1156. *result_size= 0;
  1157. }
  1158. }
  1159. else // gearman_client_run_tasks() was successful, but the task was not
  1160. {
  1161. gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
  1162. *ret_ptr= do_task_ptr->result_rc;
  1163. *result_size= 0;
  1164. }
  1165. gearman_task_free(&do_task);
  1166. client->new_tasks= 0;
  1167. client->running_tasks= 0;
  1168. return returnable;
  1169. }
  1170. static gearman_return_t _client_do_background(gearman_client_st *client,
  1171. gearman_command_t command,
  1172. gearman_string_t &function,
  1173. gearman_unique_t &unique,
  1174. gearman_string_t &workload,
  1175. gearman_job_handle_t job_handle)
  1176. {
  1177. if (not client)
  1178. {
  1179. errno= EINVAL;
  1180. return GEARMAN_INVALID_ARGUMENT;
  1181. }
  1182. if (gearman_size(function) == 0)
  1183. {
  1184. return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function arguement was empty");
  1185. }
  1186. client->_do_handle[0]= 0; // Reset the job_handle we store in client
  1187. gearman_task_st do_task, *do_task_ptr;
  1188. do_task_ptr= add_task(client, &do_task,
  1189. client,
  1190. command,
  1191. function,
  1192. unique,
  1193. workload,
  1194. time_t(0),
  1195. gearman_actions_do_default());
  1196. if (not do_task_ptr)
  1197. {
  1198. return gearman_universal_error_code(client->universal);
  1199. }
  1200. do_task_ptr->type= GEARMAN_TASK_KIND_DO;
  1201. gearman_return_t ret;
  1202. do {
  1203. ret= gearman_client_run_tasks(client);
  1204. // If either of the following is ever true, we will end up in an
  1205. // infinite loop
  1206. assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
  1207. } while (gearman_continue(ret));
  1208. if (job_handle)
  1209. {
  1210. strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
  1211. }
  1212. strncpy(client->_do_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
  1213. client->new_tasks= 0;
  1214. client->running_tasks= 0;
  1215. gearman_task_free(&do_task);
  1216. return ret;
  1217. }
  1218. bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
  1219. {
  1220. if (not first || not second)
  1221. return false;
  1222. if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
  1223. return false;
  1224. if (first->universal.con_list->port != second->universal.con_list->port)
  1225. return false;
  1226. return true;
  1227. }
  1228. bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
  1229. {
  1230. if (not self)
  1231. return false;
  1232. gearman_string_t option= { option_arg, option_arg_size };
  1233. return gearman_request_option(self->universal, option);
  1234. }
  1235. void gearman_client_set_namespace(gearman_client_st *self, const char *namespace_key, size_t namespace_key_size)
  1236. {
  1237. if (not self)
  1238. return;
  1239. gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
  1240. }