client.cc 43 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include <libgearman/common.h>
  39. #include <arpa/inet.h>
  40. #include <cassert>
  41. #include <cerrno>
  42. #include <cstdio>
  43. #include <cstdlib>
  44. #include <cstring>
  45. #include <memory>
  46. #include <netdb.h>
  47. #include <netinet/in.h>
  48. #include <sys/socket.h>
  49. #include <poll.h>
  50. #include <libgearman/visibility.h>
  51. #include <libgearman/constants.h>
  52. #include <libgearman/return.h>
  53. #include <libgearman/job_handle.h>
  54. #include <libgearman/actions.h>
  55. #include <libgearman/unique.h>
  56. #include <libgearman/universal.h>
  57. #include <libgearman/universal.hpp>
  58. #include <libgearman/allocator.hpp>
  59. #include <libgearman/string.h>
  60. #include <libgearman/allocator.hpp>
  61. #include <libgearman/client.h>
  62. #include <libgearman/packet.hpp>
  63. #include <libgearman/connection.hpp>
  64. #include <libgearman/add.hpp>
  65. #include <libgearman/connection.h>
  66. #include <libgearman/task.h>
  67. #include <libgearman/packet.hpp>
  68. #include <libgearman/run.hpp>
  69. #include <libgearman/parse.h>
  70. #include <libgearman/result.h>
  71. /** Allocate a client structure.
  72. */
  73. static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone);
  74. /**
  75. * Callback function used when parsing server lists.
  76. */
  77. static gearman_return_t _client_add_server(const char *host, in_port_t port,
  78. void *context);
  79. /**
  80. * Real do function.
  81. */
  82. static void *_client_do(gearman_client_st *client, gearman_command_t command,
  83. const char *function_name,
  84. const char *unique,
  85. const void *workload, size_t workload_size,
  86. size_t *result_size, gearman_return_t *ret_ptr);
  87. /**
  88. * Real background do function.
  89. */
  90. static gearman_return_t _client_do_background(gearman_client_st *client,
  91. gearman_command_t command,
  92. gearman_string_t &function,
  93. gearman_unique_t &unique,
  94. gearman_string_t &workload,
  95. gearman_job_handle_t job_handle);
  96. /*
  97. * Public Definitions
  98. */
  99. gearman_client_st *gearman_client_create(gearman_client_st *client)
  100. {
  101. return _client_allocate(client, false);
  102. }
  103. gearman_client_st *gearman_client_clone(gearman_client_st *client,
  104. const gearman_client_st *from)
  105. {
  106. if (not from)
  107. {
  108. return _client_allocate(client, false);
  109. }
  110. client= _client_allocate(client, true);
  111. if (not client)
  112. {
  113. return client;
  114. }
  115. client->options.non_blocking= from->options.non_blocking;
  116. client->options.unbuffered_result= from->options.unbuffered_result;
  117. client->options.no_new= from->options.no_new;
  118. client->options.free_tasks= from->options.free_tasks;
  119. client->actions= from->actions;
  120. client->_do_handle[0]= 0;
  121. gearman_universal_clone(client->universal, from->universal);
  122. return client;
  123. }
  124. void gearman_client_free(gearman_client_st *client)
  125. {
  126. if (not client)
  127. return;
  128. gearman_client_task_free_all(client);
  129. gearman_universal_free(client->universal);
  130. if (client->options.allocated)
  131. delete client;
  132. }
  133. const char *gearman_client_error(const gearman_client_st *client)
  134. {
  135. if (not client)
  136. return NULL;
  137. return gearman_universal_error(client->universal);
  138. }
  139. int gearman_client_errno(const gearman_client_st *client)
  140. {
  141. if (not client)
  142. return 0;
  143. return gearman_universal_errno(client->universal);
  144. }
  145. gearman_client_options_t gearman_client_options(const gearman_client_st *client)
  146. {
  147. int32_t options;
  148. memset(&options, 0, sizeof(int32_t));
  149. if (client->options.allocated)
  150. options|= int(GEARMAN_CLIENT_ALLOCATED);
  151. if (client->options.non_blocking)
  152. options|= int(GEARMAN_CLIENT_NON_BLOCKING);
  153. if (client->options.unbuffered_result)
  154. options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
  155. if (client->options.no_new)
  156. options|= int(GEARMAN_CLIENT_NO_NEW);
  157. if (client->options.free_tasks)
  158. options|= int(GEARMAN_CLIENT_FREE_TASKS);
  159. return gearman_client_options_t(options);
  160. }
  161. bool gearman_client_has_option(gearman_client_st *client,
  162. gearman_client_options_t option)
  163. {
  164. if (not client)
  165. return false;
  166. switch (option)
  167. {
  168. case GEARMAN_CLIENT_ALLOCATED:
  169. return client->options.allocated;
  170. case GEARMAN_CLIENT_NON_BLOCKING:
  171. return client->options.non_blocking;
  172. case GEARMAN_CLIENT_UNBUFFERED_RESULT:
  173. return client->options.unbuffered_result;
  174. case GEARMAN_CLIENT_NO_NEW:
  175. return client->options.no_new;
  176. case GEARMAN_CLIENT_FREE_TASKS:
  177. return client->options.free_tasks;
  178. default:
  179. case GEARMAN_CLIENT_TASK_IN_USE:
  180. case GEARMAN_CLIENT_MAX:
  181. return false;
  182. }
  183. }
  184. void gearman_client_set_options(gearman_client_st *client,
  185. gearman_client_options_t options)
  186. {
  187. if (not client)
  188. return;
  189. gearman_client_options_t usable_options[]= {
  190. GEARMAN_CLIENT_NON_BLOCKING,
  191. GEARMAN_CLIENT_UNBUFFERED_RESULT,
  192. GEARMAN_CLIENT_FREE_TASKS,
  193. GEARMAN_CLIENT_MAX
  194. };
  195. gearman_client_options_t *ptr;
  196. for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
  197. {
  198. if (options & *ptr)
  199. {
  200. gearman_client_add_options(client, *ptr);
  201. }
  202. else
  203. {
  204. gearman_client_remove_options(client, *ptr);
  205. }
  206. }
  207. }
  208. void gearman_client_add_options(gearman_client_st *client,
  209. gearman_client_options_t options)
  210. {
  211. if (not client)
  212. return;
  213. if (options & GEARMAN_CLIENT_NON_BLOCKING)
  214. {
  215. gearman_universal_add_options(client->universal, GEARMAN_NON_BLOCKING);
  216. client->options.non_blocking= true;
  217. }
  218. if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
  219. {
  220. client->options.unbuffered_result= true;
  221. }
  222. if (options & GEARMAN_CLIENT_FREE_TASKS)
  223. {
  224. client->options.free_tasks= true;
  225. }
  226. }
  227. void gearman_client_remove_options(gearman_client_st *client,
  228. gearman_client_options_t options)
  229. {
  230. if (not client)
  231. return;
  232. if (options & GEARMAN_CLIENT_NON_BLOCKING)
  233. {
  234. gearman_universal_remove_options(client->universal, GEARMAN_NON_BLOCKING);
  235. client->options.non_blocking= false;
  236. }
  237. if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
  238. {
  239. client->options.unbuffered_result= false;
  240. }
  241. if (options & GEARMAN_CLIENT_FREE_TASKS)
  242. {
  243. client->options.free_tasks= false;
  244. }
  245. }
  246. int gearman_client_timeout(gearman_client_st *client)
  247. {
  248. return gearman_universal_timeout(client->universal);
  249. }
  250. void gearman_client_set_timeout(gearman_client_st *client, int timeout)
  251. {
  252. if (not client)
  253. return;
  254. gearman_universal_set_timeout(client->universal, timeout);
  255. }
  256. void *gearman_client_context(const gearman_client_st *client)
  257. {
  258. if (not client)
  259. return NULL;
  260. return const_cast<void *>(client->context);
  261. }
  262. void gearman_client_set_context(gearman_client_st *client, void *context)
  263. {
  264. if (not client)
  265. return;
  266. client->context= context;
  267. }
  268. void gearman_client_set_log_fn(gearman_client_st *client,
  269. gearman_log_fn *function, void *context,
  270. gearman_verbose_t verbose)
  271. {
  272. if (not client)
  273. return;
  274. gearman_set_log_fn(client->universal, function, context, verbose);
  275. }
  276. void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
  277. gearman_malloc_fn *function,
  278. void *context)
  279. {
  280. if (not client)
  281. return;
  282. gearman_set_workload_malloc_fn(client->universal, function, context);
  283. }
  284. void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
  285. {
  286. if (not client)
  287. return;
  288. gearman_set_workload_free_fn(client->universal, function, context);
  289. }
  290. gearman_return_t gearman_client_add_server(gearman_client_st *client,
  291. const char *host, in_port_t port)
  292. {
  293. if (not client)
  294. {
  295. return GEARMAN_INVALID_ARGUMENT;
  296. }
  297. if (not gearman_connection_create_args(client->universal, host, port))
  298. {
  299. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  300. }
  301. return GEARMAN_SUCCESS;
  302. }
  303. gearman_return_t gearman_client_add_servers(gearman_client_st *client,
  304. const char *servers)
  305. {
  306. return gearman_parse_servers(servers, _client_add_server, client);
  307. }
  308. void gearman_client_remove_servers(gearman_client_st *client)
  309. {
  310. if (not client)
  311. return;
  312. gearman_free_all_cons(client->universal);
  313. }
  314. gearman_return_t gearman_client_wait(gearman_client_st *client)
  315. {
  316. if (not client)
  317. return GEARMAN_INVALID_ARGUMENT;
  318. return gearman_wait(client->universal);
  319. }
  320. void *gearman_client_do(gearman_client_st *client,
  321. const char *function,
  322. const char *unique,
  323. const void *workload,
  324. size_t workload_size, size_t *result_size,
  325. gearman_return_t *ret_ptr)
  326. {
  327. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
  328. function,
  329. unique,
  330. workload, workload_size,
  331. result_size, ret_ptr);
  332. }
  333. void *gearman_client_do_high(gearman_client_st *client,
  334. const char *function,
  335. const char *unique,
  336. const void *workload, size_t workload_size,
  337. size_t *result_size, gearman_return_t *ret_ptr)
  338. {
  339. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
  340. function,
  341. unique,
  342. workload, workload_size,
  343. result_size, ret_ptr);
  344. }
  345. void *gearman_client_do_low(gearman_client_st *client,
  346. const char *function,
  347. const char *unique,
  348. const void *workload, size_t workload_size,
  349. size_t *result_size, gearman_return_t *ret_ptr)
  350. {
  351. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
  352. function,
  353. unique,
  354. workload, workload_size,
  355. result_size, ret_ptr);
  356. }
  357. size_t gearman_client_count_tasks(gearman_client_st *client)
  358. {
  359. if (not client)
  360. return 0;
  361. size_t count= 1;
  362. gearman_task_st *search= client->task_list;
  363. while ((search= search->next))
  364. {
  365. count++;
  366. }
  367. return count;
  368. }
  369. #if 0
  370. static bool _active_tasks(gearman_client_st *client)
  371. {
  372. assert(client);
  373. gearman_task_st *search= client->task_list;
  374. if (not search)
  375. return false;
  376. do
  377. {
  378. if (gearman_task_is_active(search))
  379. {
  380. return true;
  381. }
  382. } while ((search= search->next));
  383. return false;
  384. }
  385. #endif
  386. const char *gearman_client_do_job_handle(gearman_client_st *self)
  387. {
  388. if (not self)
  389. {
  390. errno= EINVAL;
  391. return NULL;
  392. }
  393. return self->_do_handle;
  394. }
  395. void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
  396. {
  397. if (numerator)
  398. *numerator= 0;
  399. if (denominator)
  400. *denominator= 0;
  401. }
  402. gearman_return_t gearman_client_do_background(gearman_client_st *client,
  403. const char *function_name,
  404. const char *unique,
  405. const void *workload_str,
  406. size_t workload_size,
  407. gearman_job_handle_t job_handle)
  408. {
  409. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  410. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  411. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  412. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
  413. function,
  414. local_unique,
  415. workload,
  416. job_handle);
  417. }
  418. gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
  419. const char *function_name,
  420. const char *unique,
  421. const void *workload_str,
  422. size_t workload_size,
  423. gearman_job_handle_t job_handle)
  424. {
  425. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  426. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  427. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  428. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
  429. function,
  430. local_unique,
  431. workload,
  432. job_handle);
  433. }
  434. gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
  435. const char *function_name,
  436. const char *unique,
  437. const void *workload_str,
  438. size_t workload_size,
  439. gearman_job_handle_t job_handle)
  440. {
  441. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  442. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  443. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  444. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
  445. function,
  446. local_unique,
  447. workload,
  448. job_handle);
  449. }
  450. gearman_return_t gearman_client_job_status(gearman_client_st *client,
  451. const gearman_job_handle_t job_handle,
  452. bool *is_known, bool *is_running,
  453. uint32_t *numerator,
  454. uint32_t *denominator)
  455. {
  456. gearman_return_t ret;
  457. if (not client)
  458. return GEARMAN_INVALID_ARGUMENT;
  459. gearman_task_st do_task;
  460. gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
  461. job_handle, &ret);
  462. if (gearman_failed(ret))
  463. {
  464. return ret;
  465. }
  466. assert(do_task_ptr);
  467. do_task_ptr->type= GEARMAN_TASK_KIND_DO;
  468. gearman_task_clear_fn(do_task_ptr);
  469. do {
  470. ret= gearman_client_run_tasks(client);
  471. // If either of the following is ever true, we will end up in an
  472. // infinite loop
  473. assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
  474. } while (gearman_continue(ret));
  475. // @note we don't know if our task was run or not, we just know something
  476. // happened.
  477. if (gearman_success(ret))
  478. {
  479. if (is_known)
  480. *is_known= do_task.options.is_known;
  481. if (is_running)
  482. *is_running= do_task.options.is_running;
  483. if (numerator)
  484. *numerator= do_task.numerator;
  485. if (denominator)
  486. *denominator= do_task.denominator;
  487. if (not is_known and not is_running)
  488. {
  489. if (do_task.options.is_running)
  490. {
  491. ret= GEARMAN_IN_PROGRESS;
  492. }
  493. else if (do_task.options.is_known)
  494. {
  495. ret= GEARMAN_JOB_EXISTS;
  496. }
  497. }
  498. }
  499. else
  500. {
  501. if (is_known)
  502. *is_known= false;
  503. if (is_running)
  504. *is_running= false;
  505. if (numerator)
  506. *numerator= 0;
  507. if (denominator)
  508. *denominator= 0;
  509. }
  510. gearman_task_free(do_task_ptr);
  511. return ret;
  512. }
  513. gearman_return_t gearman_client_echo(gearman_client_st *client,
  514. const void *workload,
  515. size_t workload_size)
  516. {
  517. if (not client)
  518. return GEARMAN_INVALID_ARGUMENT;
  519. return gearman_echo(client->universal, workload, workload_size);
  520. }
  521. void gearman_client_task_free_all(gearman_client_st *client)
  522. {
  523. if (not client)
  524. return;
  525. while (client->task_list)
  526. {
  527. gearman_task_free(client->task_list);
  528. }
  529. }
  530. void gearman_client_set_task_context_free_fn(gearman_client_st *client,
  531. gearman_task_context_free_fn *function)
  532. {
  533. if (not client)
  534. return;
  535. client->task_context_free_fn= function;
  536. }
  537. gearman_return_t gearman_client_set_memory_allocators(gearman_client_st *client,
  538. gearman_malloc_fn *malloc_fn,
  539. gearman_free_fn *free_fn,
  540. gearman_realloc_fn *realloc_fn,
  541. gearman_calloc_fn *calloc_fn,
  542. void *context)
  543. {
  544. if (not client)
  545. return GEARMAN_INVALID_ARGUMENT;
  546. return gearman_set_memory_allocator(client->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
  547. }
  548. gearman_task_st *gearman_client_add_task(gearman_client_st *client,
  549. gearman_task_st *task,
  550. void *context,
  551. const char *function,
  552. const char *unique,
  553. const void *workload, size_t workload_size,
  554. gearman_return_t *ret_ptr)
  555. {
  556. return add_task(client, task,
  557. context, GEARMAN_COMMAND_SUBMIT_JOB,
  558. function,
  559. unique,
  560. workload, workload_size,
  561. time_t(0),
  562. ret_ptr,
  563. client->actions);
  564. }
  565. gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
  566. gearman_task_st *task,
  567. void *context,
  568. const char *function,
  569. const char *unique,
  570. const void *workload, size_t workload_size,
  571. gearman_return_t *ret_ptr)
  572. {
  573. return add_task(client, task, context,
  574. GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
  575. function,
  576. unique,
  577. workload, workload_size,
  578. time_t(0),
  579. ret_ptr,
  580. client->actions);
  581. }
  582. gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
  583. gearman_task_st *task,
  584. void *context,
  585. const char *function,
  586. const char *unique,
  587. const void *workload, size_t workload_size,
  588. gearman_return_t *ret_ptr)
  589. {
  590. return add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
  591. function,
  592. unique,
  593. workload, workload_size,
  594. time_t(0),
  595. ret_ptr,
  596. client->actions);
  597. }
  598. gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
  599. gearman_task_st *task,
  600. void *context,
  601. const char *function,
  602. const char *unique,
  603. const void *workload, size_t workload_size,
  604. gearman_return_t *ret_ptr)
  605. {
  606. return add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
  607. function,
  608. unique,
  609. workload, workload_size,
  610. time_t(0),
  611. ret_ptr,
  612. client->actions);
  613. }
  614. gearman_task_st *
  615. gearman_client_add_task_high_background(gearman_client_st *client,
  616. gearman_task_st *task,
  617. void *context,
  618. const char *function,
  619. const char *unique,
  620. const void *workload, size_t workload_size,
  621. gearman_return_t *ret_ptr)
  622. {
  623. return add_task(client, task, context,
  624. GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
  625. function,
  626. unique,
  627. workload, workload_size,
  628. time_t(0),
  629. ret_ptr,
  630. client->actions);
  631. }
  632. gearman_task_st *
  633. gearman_client_add_task_low_background(gearman_client_st *client,
  634. gearman_task_st *task,
  635. void *context,
  636. const char *function,
  637. const char *unique,
  638. const void *workload, size_t workload_size,
  639. gearman_return_t *ret_ptr)
  640. {
  641. return add_task(client, task, context,
  642. GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
  643. function,
  644. unique,
  645. workload, workload_size,
  646. time_t(0),
  647. ret_ptr,
  648. client->actions);
  649. }
  650. gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
  651. gearman_task_st *task,
  652. void *context,
  653. const gearman_job_handle_t job_handle,
  654. gearman_return_t *ret_ptr)
  655. {
  656. const void *args[1];
  657. size_t args_size[1];
  658. gearman_return_t unused;
  659. if (not ret_ptr)
  660. ret_ptr= &unused;
  661. if (not (task= gearman_task_internal_create(client, task)))
  662. {
  663. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  664. return NULL;
  665. }
  666. task->context= context;
  667. snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
  668. args[0]= job_handle;
  669. args_size[0]= strlen(job_handle);
  670. gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
  671. GEARMAN_MAGIC_REQUEST,
  672. GEARMAN_COMMAND_GET_STATUS,
  673. args, args_size, 1);
  674. if (gearman_success(rc))
  675. {
  676. client->new_tasks++;
  677. client->running_tasks++;
  678. task->options.send_in_use= true;
  679. }
  680. *ret_ptr= rc;
  681. return task;
  682. }
  683. void gearman_client_set_workload_fn(gearman_client_st *client,
  684. gearman_workload_fn *function)
  685. {
  686. if (not client)
  687. return;
  688. client->actions.workload_fn= function;
  689. }
  690. void gearman_client_set_created_fn(gearman_client_st *client,
  691. gearman_created_fn *function)
  692. {
  693. if (not client)
  694. return;
  695. client->actions.created_fn= function;
  696. }
  697. void gearman_client_set_data_fn(gearman_client_st *client,
  698. gearman_data_fn *function)
  699. {
  700. if (not client)
  701. return;
  702. client->actions.data_fn= function;
  703. }
  704. void gearman_client_set_warning_fn(gearman_client_st *client,
  705. gearman_warning_fn *function)
  706. {
  707. if (not client)
  708. return;
  709. client->actions.warning_fn= function;
  710. }
  711. void gearman_client_set_status_fn(gearman_client_st *client,
  712. gearman_universal_status_fn *function)
  713. {
  714. if (not client)
  715. return;
  716. client->actions.status_fn= function;
  717. }
  718. void gearman_client_set_complete_fn(gearman_client_st *client,
  719. gearman_complete_fn *function)
  720. {
  721. if (not client)
  722. return;
  723. client->actions.complete_fn= function;
  724. }
  725. void gearman_client_set_exception_fn(gearman_client_st *client,
  726. gearman_exception_fn *function)
  727. {
  728. if (not client)
  729. return;
  730. client->actions.exception_fn= function;
  731. }
  732. void gearman_client_set_fail_fn(gearman_client_st *client,
  733. gearman_fail_fn *function)
  734. {
  735. if (not client)
  736. return;
  737. client->actions.fail_fn= function;
  738. }
  739. void gearman_client_clear_fn(gearman_client_st *client)
  740. {
  741. if (not client)
  742. return;
  743. client->actions= gearman_actions_default();
  744. }
  745. static inline void _push_non_blocking(gearman_client_st *client)
  746. {
  747. client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
  748. client->universal.options.non_blocking= true;
  749. }
  750. static inline void _pop_non_blocking(gearman_client_st *client)
  751. {
  752. client->universal.options.non_blocking= client->options.non_blocking;
  753. assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
  754. }
  755. static inline void _push_blocking(gearman_client_st *client)
  756. {
  757. client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
  758. client->universal.options.non_blocking= false;
  759. }
  760. static inline void _pop_blocking(gearman_client_st *client)
  761. {
  762. client->universal.options.non_blocking= client->options.non_blocking;
  763. assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
  764. }
  765. static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
  766. {
  767. gearman_return_t ret= GEARMAN_MAX_RETURN;
  768. switch(client->state)
  769. {
  770. case GEARMAN_CLIENT_STATE_IDLE:
  771. while (1)
  772. {
  773. /* Start any new tasks. */
  774. if (client->new_tasks > 0 && ! (client->options.no_new))
  775. {
  776. for (client->task= client->task_list; client->task;
  777. client->task= client->task->next)
  778. {
  779. if (client->task->state != GEARMAN_TASK_STATE_NEW)
  780. {
  781. continue;
  782. }
  783. case GEARMAN_CLIENT_STATE_NEW:
  784. gearman_return_t local_ret= _client_run_task(client, client->task);
  785. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  786. {
  787. client->state= GEARMAN_CLIENT_STATE_NEW;
  788. return local_ret;
  789. }
  790. }
  791. if (client->new_tasks == 0)
  792. {
  793. gearman_return_t local_ret= gearman_flush_all(client->universal);
  794. if (gearman_failed(local_ret))
  795. {
  796. return local_ret;
  797. }
  798. }
  799. }
  800. /* See if there are any connections ready for I/O. */
  801. while ((client->con= gearman_ready(client->universal)))
  802. {
  803. if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
  804. {
  805. /* Socket is ready for writing, continue submitting jobs. */
  806. for (client->task= client->task_list; client->task;
  807. client->task= client->task->next)
  808. {
  809. if (client->task->con != client->con or
  810. (client->task->state != GEARMAN_TASK_STATE_SUBMIT and
  811. client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
  812. {
  813. continue;
  814. }
  815. case GEARMAN_CLIENT_STATE_SUBMIT:
  816. gearman_return_t local_ret= _client_run_task(client, client->task);
  817. if (local_ret == GEARMAN_COULD_NOT_CONNECT)
  818. {
  819. client->state= GEARMAN_CLIENT_STATE_IDLE;
  820. return local_ret;
  821. }
  822. else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  823. {
  824. client->state= GEARMAN_CLIENT_STATE_SUBMIT;
  825. return local_ret;
  826. }
  827. }
  828. }
  829. if (not (client->con->revents & POLLIN))
  830. continue;
  831. /* Socket is ready for reading. */
  832. while (1)
  833. {
  834. /* Read packet on connection and find which task it belongs to. */
  835. if (client->options.unbuffered_result)
  836. {
  837. /* If client is handling the data read, make sure it's complete. */
  838. if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
  839. {
  840. for (client->task= client->task_list; client->task;
  841. client->task= client->task->next)
  842. {
  843. if (client->task->con == client->con &&
  844. (client->task->state == GEARMAN_TASK_STATE_DATA or
  845. client->task->state == GEARMAN_TASK_STATE_COMPLETE))
  846. {
  847. break;
  848. }
  849. }
  850. /*
  851. Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't setup the client to fetch data correctly.
  852. Fatal error :(
  853. */
  854. return gearman_universal_set_error(client->universal, GEARMAN_INVALID_ARGUMENT, AT,
  855. "client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);
  856. }
  857. else
  858. {
  859. /* Read the next packet, without buffering the data part. */
  860. client->task= NULL;
  861. (void)client->con->receiving(client->con->_packet, ret, false);
  862. }
  863. }
  864. else
  865. {
  866. /* Read the next packet, buffering the data part. */
  867. client->task= NULL;
  868. (void)client->con->receiving(client->con->_packet, ret, true);
  869. }
  870. if (client->task == NULL)
  871. {
  872. assert(ret != GEARMAN_MAX_RETURN);
  873. /* Check the return of the gearman_connection_recv() calls above. */
  874. if (gearman_failed(ret))
  875. {
  876. if (ret == GEARMAN_IO_WAIT)
  877. break;
  878. client->state= GEARMAN_CLIENT_STATE_IDLE;
  879. return ret;
  880. }
  881. client->con->options.packet_in_use= true;
  882. /* We have a packet, see which task it belongs to. */
  883. for (client->task= client->task_list; client->task;
  884. client->task= client->task->next)
  885. {
  886. if (client->task->con != client->con)
  887. continue;
  888. if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
  889. {
  890. if (client->task->created_id != client->con->created_id)
  891. continue;
  892. /* New job created, drop through below and notify task. */
  893. client->con->created_id++;
  894. }
  895. else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
  896. {
  897. gearman_universal_set_error(client->universal, GEARMAN_SERVER_ERROR, AT,
  898. "%s:%.*s",
  899. static_cast<char *>(client->con->_packet.arg[0]),
  900. int(client->con->_packet.arg_size[1]),
  901. static_cast<char *>(client->con->_packet.arg[1]));
  902. return GEARMAN_SERVER_ERROR;
  903. }
  904. else if (strncmp(client->task->job_handle,
  905. static_cast<char *>(client->con->_packet.arg[0]),
  906. client->con->_packet.arg_size[0]) ||
  907. (client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
  908. strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
  909. (client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
  910. strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
  911. {
  912. continue;
  913. }
  914. /* Else, we have a matching result packet of some kind. */
  915. break;
  916. }
  917. if (not client->task)
  918. {
  919. /* The client has stopped waiting for the response, ignore it. */
  920. gearman_packet_free(&(client->con->_packet));
  921. client->con->options.packet_in_use= false;
  922. continue;
  923. }
  924. client->task->recv= &(client->con->_packet);
  925. }
  926. case GEARMAN_CLIENT_STATE_PACKET:
  927. /* Let task process job created or result packet. */
  928. gearman_return_t local_ret= _client_run_task(client, client->task);
  929. if (local_ret == GEARMAN_IO_WAIT)
  930. break;
  931. if (gearman_failed(local_ret))
  932. {
  933. client->state= GEARMAN_CLIENT_STATE_PACKET;
  934. return local_ret;
  935. }
  936. /* Clean up the packet. */
  937. gearman_packet_free(&(client->con->_packet));
  938. client->con->options.packet_in_use= false;
  939. /* If all tasks are done, return. */
  940. if (client->running_tasks == 0)
  941. break;
  942. }
  943. }
  944. /* If all tasks are done, return. */
  945. if (client->running_tasks == 0)
  946. {
  947. break;
  948. }
  949. if (client->new_tasks > 0 && ! (client->options.no_new))
  950. continue;
  951. if (client->options.non_blocking)
  952. {
  953. /* Let the caller wait for activity. */
  954. client->state= GEARMAN_CLIENT_STATE_IDLE;
  955. gearman_gerror(client->universal, GEARMAN_IO_WAIT);
  956. return GEARMAN_IO_WAIT;
  957. }
  958. /* Wait for activity on one of the connections. */
  959. gearman_return_t local_ret= gearman_wait(client->universal);
  960. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  961. {
  962. client->state= GEARMAN_CLIENT_STATE_IDLE;
  963. return local_ret;
  964. }
  965. }
  966. break;
  967. }
  968. client->state= GEARMAN_CLIENT_STATE_IDLE;
  969. return GEARMAN_SUCCESS;
  970. }
  971. gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
  972. {
  973. if (not client)
  974. {
  975. return GEARMAN_INVALID_ARGUMENT;
  976. }
  977. if (not client->task_list) // We are immediatly successful if all tasks are completed
  978. {
  979. return GEARMAN_SUCCESS;
  980. }
  981. _push_non_blocking(client);
  982. gearman_return_t rc= _client_run_tasks(client);
  983. _pop_non_blocking(client);
  984. if (rc == GEARMAN_COULD_NOT_CONNECT)
  985. {
  986. gearman_reset(client->universal);
  987. }
  988. return rc;
  989. }
  990. gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client)
  991. {
  992. if (not client)
  993. {
  994. return GEARMAN_INVALID_ARGUMENT;
  995. }
  996. if (not client->task_list) // We are immediatly successful if all tasks are completed
  997. {
  998. return GEARMAN_SUCCESS;
  999. }
  1000. _push_blocking(client);
  1001. gearman_return_t rc= _client_run_tasks(client);
  1002. _pop_blocking(client);
  1003. if (gearman_failed(rc))
  1004. {
  1005. if (rc == GEARMAN_COULD_NOT_CONNECT)
  1006. {
  1007. gearman_reset(client->universal);
  1008. }
  1009. assert(gearman_universal_error_code(client->universal) == rc);
  1010. }
  1011. return rc;
  1012. }
  1013. /*
  1014. * Static Definitions
  1015. */
  1016. static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
  1017. {
  1018. if (client)
  1019. {
  1020. client->options.allocated= false;
  1021. }
  1022. else
  1023. {
  1024. client= new (std::nothrow) gearman_client_st;
  1025. if (not client)
  1026. return NULL;
  1027. client->options.allocated= true;
  1028. }
  1029. client->options.non_blocking= false;
  1030. client->options.unbuffered_result= false;
  1031. client->options.no_new= false;
  1032. client->options.free_tasks= false;
  1033. client->state= GEARMAN_CLIENT_STATE_IDLE;
  1034. client->new_tasks= 0;
  1035. client->running_tasks= 0;
  1036. client->task_count= 0;
  1037. client->context= NULL;
  1038. client->con= NULL;
  1039. client->task= NULL;
  1040. client->task_list= NULL;
  1041. client->task_context_free_fn= NULL;
  1042. gearman_client_clear_fn(client);
  1043. if (not is_clone)
  1044. {
  1045. gearman_universal_initialize(client->universal);
  1046. }
  1047. return client;
  1048. }
  1049. static gearman_return_t _client_add_server(const char *host, in_port_t port,
  1050. void *context)
  1051. {
  1052. return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
  1053. }
  1054. static void *_client_do(gearman_client_st *client, gearman_command_t command,
  1055. const char *function_name,
  1056. const char *unique,
  1057. const void *workload_str, size_t workload_size,
  1058. size_t *result_size, gearman_return_t *ret_ptr)
  1059. {
  1060. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  1061. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  1062. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  1063. gearman_return_t unused;
  1064. if (not ret_ptr)
  1065. ret_ptr= &unused;
  1066. size_t unused_size;
  1067. if (not result_size)
  1068. result_size= &unused_size;
  1069. if (not client)
  1070. {
  1071. *ret_ptr= GEARMAN_INVALID_ARGUMENT;
  1072. errno= EINVAL;
  1073. return NULL;
  1074. }
  1075. if (not function_name)
  1076. {
  1077. *ret_ptr= GEARMAN_INVALID_ARGUMENT;
  1078. return NULL;
  1079. }
  1080. gearman_task_st do_task;
  1081. gearman_task_st *do_task_ptr= add_task(client, &do_task, NULL, command,
  1082. function,
  1083. local_unique,
  1084. workload,
  1085. time_t(0),
  1086. gearman_actions_do_default());
  1087. if (not do_task_ptr)
  1088. {
  1089. *ret_ptr= gearman_universal_error_code(client->universal);
  1090. return NULL;
  1091. }
  1092. do_task_ptr->type= GEARMAN_TASK_KIND_DO;
  1093. gearman_return_t ret;
  1094. do {
  1095. ret= gearman_client_run_tasks(client);
  1096. } while (gearman_continue(ret));
  1097. // gearman_client_run_tasks failed
  1098. assert(client->task_list); // Programmer error, we should always have the task that we used for do
  1099. char *returnable= NULL;
  1100. if (gearman_failed(ret))
  1101. {
  1102. if (ret == GEARMAN_COULD_NOT_CONNECT)
  1103. { }
  1104. else
  1105. {
  1106. gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
  1107. }
  1108. *ret_ptr= ret;
  1109. *result_size= 0;
  1110. }
  1111. else if (gearman_success(ret) and do_task_ptr->result_rc == GEARMAN_SUCCESS)
  1112. {
  1113. *ret_ptr= do_task_ptr->result_rc;
  1114. if (do_task_ptr->result_ptr)
  1115. {
  1116. if (gearman_has_allocator(client->universal))
  1117. {
  1118. gearman_string_t result= gearman_result_string(do_task_ptr->result_ptr);
  1119. returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
  1120. if (not returnable)
  1121. {
  1122. gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");
  1123. *result_size= 0;
  1124. }
  1125. else // NULL terminate
  1126. {
  1127. memcpy(returnable, gearman_c_str(result), gearman_size(result));
  1128. returnable[gearman_size(result)]= 0;
  1129. *result_size= gearman_size(result);
  1130. }
  1131. }
  1132. else
  1133. {
  1134. gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
  1135. *result_size= gearman_size(result);
  1136. returnable= const_cast<char *>(gearman_c_str(result));
  1137. }
  1138. }
  1139. else // NULL job
  1140. {
  1141. *result_size= 0;
  1142. }
  1143. }
  1144. else // gearman_client_run_tasks() was successful, but the task was not
  1145. {
  1146. gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
  1147. *ret_ptr= do_task_ptr->result_rc;
  1148. *result_size= 0;
  1149. }
  1150. gearman_task_free(&do_task);
  1151. client->new_tasks= 0;
  1152. client->running_tasks= 0;
  1153. return returnable;
  1154. }
  1155. static gearman_return_t _client_do_background(gearman_client_st *client,
  1156. gearman_command_t command,
  1157. gearman_string_t &function,
  1158. gearman_unique_t &unique,
  1159. gearman_string_t &workload,
  1160. gearman_job_handle_t job_handle)
  1161. {
  1162. if (not client)
  1163. {
  1164. errno= EINVAL;
  1165. return GEARMAN_INVALID_ARGUMENT;
  1166. }
  1167. if (gearman_size(function) == 0)
  1168. {
  1169. return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function arguement was empty");
  1170. }
  1171. client->_do_handle[0]= 0; // Reset the job_handle we store in client
  1172. gearman_task_st do_task, *do_task_ptr;
  1173. do_task_ptr= add_task(client, &do_task,
  1174. client,
  1175. command,
  1176. function,
  1177. unique,
  1178. workload,
  1179. time_t(0),
  1180. gearman_actions_do_default());
  1181. if (not do_task_ptr)
  1182. {
  1183. return gearman_universal_error_code(client->universal);
  1184. }
  1185. do_task_ptr->type= GEARMAN_TASK_KIND_DO;
  1186. gearman_return_t ret;
  1187. do {
  1188. ret= gearman_client_run_tasks(client);
  1189. // If either of the following is ever true, we will end up in an
  1190. // infinite loop
  1191. assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
  1192. } while (gearman_continue(ret));
  1193. if (job_handle)
  1194. {
  1195. strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
  1196. }
  1197. strncpy(client->_do_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
  1198. client->new_tasks= 0;
  1199. client->running_tasks= 0;
  1200. gearman_task_free(&do_task);
  1201. return ret;
  1202. }
  1203. bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
  1204. {
  1205. if (not first || not second)
  1206. return false;
  1207. if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
  1208. return false;
  1209. if (first->universal.con_list->port != second->universal.con_list->port)
  1210. return false;
  1211. return true;
  1212. }
  1213. bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
  1214. {
  1215. if (not self)
  1216. return false;
  1217. gearman_string_t option= { option_arg, option_arg_size };
  1218. return gearman_request_option(self->universal, option);
  1219. }
  1220. void gearman_client_set_namespace(gearman_client_st *self, const char *namespace_key, size_t namespace_key_size)
  1221. {
  1222. if (not self)
  1223. return;
  1224. gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
  1225. }