client.cc 38 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include <config.h>
  39. #include <arpa/inet.h>
  40. #include <cassert>
  41. #include <cerrno>
  42. #include <cstdio>
  43. #include <cstdlib>
  44. #include <cstring>
  45. #include <memory>
  46. #include <netdb.h>
  47. #include <netinet/in.h>
  48. #include <sys/socket.h>
  49. #include <poll.h>
  50. #define BUILDING_LIBGEARMAN
  51. #define GEARMAN_CORE
  52. #include <libgearman/visibility.h>
  53. #include <libgearman/constants.h>
  54. #include <libgearman/return.h>
  55. #include <libgearman/job_handle.h>
  56. #include <libgearman/actions.h>
  57. #include <libgearman/unique.h>
  58. #include <libgearman/universal.h>
  59. #include <libgearman/universal.hpp>
  60. #include <libgearman/string.h>
  61. #include <libgearman/client.h>
  62. #include <libgearman/packet.hpp>
  63. #include <libgearman/connection.hpp>
  64. #include <libgearman/add.hpp>
  65. #include <libgearman/connection.h>
  66. #include <libgearman/task.h>
  67. #include <libgearman/packet.hpp>
  68. #include <libgearman/run.hpp>
  69. #include <libgearman/parse.h>
  70. #include <libgearman/result.h>
  71. /** Allocate a client structure, or initialize caller-provided storage when client is non-NULL.
  72. */
  73. static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone);
  74. /**
  75. * Callback handed to gearman_parse_servers(); context is the gearman_client_st receiving the server.
  76. */
  77. static gearman_return_t _client_add_server(const char *host, in_port_t port,
  78. void *context);
  79. /**
  80. * Shared implementation behind gearman_client_do(), gearman_client_do_high() and gearman_client_do_low().
  81. */
  82. static void *_client_do(gearman_client_st *client, gearman_command_t command,
  83. const char *function_name,
  84. const char *unique,
  85. const void *workload, size_t workload_size,
  86. size_t *result_size, gearman_return_t *ret_ptr);
  87. /**
  88. * Shared implementation behind the gearman_client_do_*background() entry points.
  89. */
  90. static gearman_return_t _client_do_background(gearman_client_st *client,
  91. gearman_command_t command,
  92. gearman_string_t &function,
  93. gearman_unique_t &unique,
  94. gearman_string_t &workload,
  95. char *job_handle);
  96. /*
  97. * Public Definitions
  98. */
  99. gearman_client_st *gearman_client_create(gearman_client_st *client)
  100. {
  101. return _client_allocate(client, false);
  102. }
  103. gearman_client_st *gearman_client_clone(gearman_client_st *client,
  104. const gearman_client_st *from)
  105. {
  106. if (not from)
  107. {
  108. return _client_allocate(client, false);
  109. }
  110. client= _client_allocate(client, true);
  111. if (client == NULL)
  112. {
  113. return client;
  114. }
  115. client->options.non_blocking= from->options.non_blocking;
  116. client->options.unbuffered_result= from->options.unbuffered_result;
  117. client->options.no_new= from->options.no_new;
  118. client->options.free_tasks= from->options.free_tasks;
  119. client->actions= from->actions;
  120. gearman_universal_clone(client->universal, from->universal);
  121. return client;
  122. }
  123. void gearman_client_free(gearman_client_st *client)
  124. {
  125. gearman_client_task_free_all(client);
  126. gearman_universal_free(client->universal);
  127. if (client->options.allocated)
  128. delete client;
  129. }
  130. const char *gearman_client_error(const gearman_client_st *client)
  131. {
  132. if (not client)
  133. return NULL;
  134. return gearman_universal_error(client->universal);
  135. }
  136. gearman_return_t gearman_client_error_code(const gearman_client_st *client)
  137. {
  138. if (not client)
  139. return GEARMAN_SUCCESS;
  140. return gearman_universal_error_code(client->universal);
  141. }
  142. int gearman_client_errno(const gearman_client_st *client)
  143. {
  144. if (not client)
  145. return 0;
  146. return gearman_universal_errno(client->universal);
  147. }
  148. gearman_client_options_t gearman_client_options(const gearman_client_st *client)
  149. {
  150. int32_t options;
  151. memset(&options, 0, sizeof(int32_t));
  152. if (client->options.allocated)
  153. options|= int(GEARMAN_CLIENT_ALLOCATED);
  154. if (client->options.non_blocking)
  155. options|= int(GEARMAN_CLIENT_NON_BLOCKING);
  156. if (client->options.unbuffered_result)
  157. options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
  158. if (client->options.no_new)
  159. options|= int(GEARMAN_CLIENT_NO_NEW);
  160. if (client->options.free_tasks)
  161. options|= int(GEARMAN_CLIENT_FREE_TASKS);
  162. return gearman_client_options_t(options);
  163. }
  164. void gearman_client_set_options(gearman_client_st *client,
  165. gearman_client_options_t options)
  166. {
  167. gearman_client_options_t usable_options[]= {
  168. GEARMAN_CLIENT_NON_BLOCKING,
  169. GEARMAN_CLIENT_UNBUFFERED_RESULT,
  170. GEARMAN_CLIENT_FREE_TASKS,
  171. GEARMAN_CLIENT_MAX
  172. };
  173. gearman_client_options_t *ptr;
  174. for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
  175. {
  176. if (options & *ptr)
  177. {
  178. gearman_client_add_options(client, *ptr);
  179. }
  180. else
  181. {
  182. gearman_client_remove_options(client, *ptr);
  183. }
  184. }
  185. }
  186. void gearman_client_add_options(gearman_client_st *client,
  187. gearman_client_options_t options)
  188. {
  189. if (options & GEARMAN_CLIENT_NON_BLOCKING)
  190. {
  191. gearman_universal_add_options(client->universal, GEARMAN_NON_BLOCKING);
  192. client->options.non_blocking= true;
  193. }
  194. if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
  195. {
  196. client->options.unbuffered_result= true;
  197. }
  198. if (options & GEARMAN_CLIENT_FREE_TASKS)
  199. {
  200. client->options.free_tasks= true;
  201. }
  202. }
  203. void gearman_client_remove_options(gearman_client_st *client,
  204. gearman_client_options_t options)
  205. {
  206. if (not client)
  207. return;
  208. if (options & GEARMAN_CLIENT_NON_BLOCKING)
  209. {
  210. gearman_universal_remove_options(client->universal, GEARMAN_NON_BLOCKING);
  211. client->options.non_blocking= false;
  212. }
  213. if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
  214. {
  215. client->options.unbuffered_result= false;
  216. }
  217. if (options & GEARMAN_CLIENT_FREE_TASKS)
  218. {
  219. client->options.free_tasks= false;
  220. }
  221. }
  222. int gearman_client_timeout(gearman_client_st *client)
  223. {
  224. return gearman_universal_timeout(client->universal);
  225. }
  226. void gearman_client_set_timeout(gearman_client_st *client, int timeout)
  227. {
  228. if (not client)
  229. return;
  230. gearman_universal_set_timeout(client->universal, timeout);
  231. }
  232. void *gearman_client_context(const gearman_client_st *client)
  233. {
  234. if (not client)
  235. return NULL;
  236. return const_cast<void *>(client->context);
  237. }
  238. void gearman_client_set_context(gearman_client_st *client, void *context)
  239. {
  240. if (not client)
  241. return;
  242. client->context= context;
  243. }
  244. void gearman_client_set_log_fn(gearman_client_st *client,
  245. gearman_log_fn *function, void *context,
  246. gearman_verbose_t verbose)
  247. {
  248. if (not client)
  249. return;
  250. gearman_set_log_fn(client->universal, function, context, verbose);
  251. }
  252. void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
  253. gearman_malloc_fn *function,
  254. void *context)
  255. {
  256. if (not client)
  257. return;
  258. gearman_set_workload_malloc_fn(client->universal, function, context);
  259. }
  260. void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
  261. {
  262. if (not client)
  263. return;
  264. gearman_set_workload_free_fn(client->universal, function, context);
  265. }
  266. gearman_return_t gearman_client_add_server(gearman_client_st *client,
  267. const char *host, in_port_t port)
  268. {
  269. if (not client)
  270. {
  271. return GEARMAN_INVALID_ARGUMENT;
  272. }
  273. if (not gearman_connection_create_args(client->universal, host, port))
  274. {
  275. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  276. }
  277. return GEARMAN_SUCCESS;
  278. }
  279. gearman_return_t gearman_client_add_servers(gearman_client_st *client,
  280. const char *servers)
  281. {
  282. return gearman_parse_servers(servers, _client_add_server, client);
  283. }
  284. void gearman_client_remove_servers(gearman_client_st *client)
  285. {
  286. if (not client)
  287. return;
  288. gearman_free_all_cons(client->universal);
  289. }
  290. gearman_return_t gearman_client_wait(gearman_client_st *client)
  291. {
  292. return gearman_wait(client->universal);
  293. }
  294. void *gearman_client_do(gearman_client_st *client,
  295. const char *function,
  296. const char *unique,
  297. const void *workload,
  298. size_t workload_size, size_t *result_size,
  299. gearman_return_t *ret_ptr)
  300. {
  301. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
  302. function,
  303. unique,
  304. workload, workload_size,
  305. result_size, ret_ptr);
  306. }
  307. void *gearman_client_do_high(gearman_client_st *client,
  308. const char *function,
  309. const char *unique,
  310. const void *workload, size_t workload_size,
  311. size_t *result_size, gearman_return_t *ret_ptr)
  312. {
  313. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
  314. function,
  315. unique,
  316. workload, workload_size,
  317. result_size, ret_ptr);
  318. }
  319. void *gearman_client_do_low(gearman_client_st *client,
  320. const char *function,
  321. const char *unique,
  322. const void *workload, size_t workload_size,
  323. size_t *result_size, gearman_return_t *ret_ptr)
  324. {
  325. return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
  326. function,
  327. unique,
  328. workload, workload_size,
  329. result_size, ret_ptr);
  330. }
  331. size_t gearman_client_count_tasks(gearman_client_st *client)
  332. {
  333. size_t count= 1;
  334. gearman_task_st *search= client->task_list;
  335. while ((search= search->next))
  336. {
  337. count++;
  338. }
  339. return count;
  340. }
  #if 0
  /* Disabled helper: reports whether any task on the client's list is active. */
  static bool _active_tasks(gearman_client_st *client)
  {
    assert(client);

    for (gearman_task_st *task= client->task_list; task; task= task->next)
    {
      if (gearman_task_is_active(task))
      {
        return true;
      }
    }

    return false;
  }
  #endif
  358. const char *gearman_client_do_job_handle(gearman_client_st *self)
  359. {
  360. if (not self)
  361. {
  362. errno= EINVAL;
  363. return NULL;
  364. }
  365. if (not self->task_list)
  366. {
  367. gearman_error(self->universal, GEARMAN_INVALID_ARGUMENT, "client has an empty task list");
  368. return NULL;
  369. }
  370. return gearman_task_job_handle(self->task_list);
  371. }
  372. void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
  373. {
  374. if (numerator)
  375. *numerator= 0;
  376. if (denominator)
  377. *denominator= 0;
  378. }
  379. gearman_return_t gearman_client_do_background(gearman_client_st *client,
  380. const char *function_name,
  381. const char *unique,
  382. const void *workload_str,
  383. size_t workload_size,
  384. gearman_job_handle_t job_handle)
  385. {
  386. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  387. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  388. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  389. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
  390. function,
  391. local_unique,
  392. workload,
  393. job_handle);
  394. }
  395. gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
  396. const char *function_name,
  397. const char *unique,
  398. const void *workload_str,
  399. size_t workload_size,
  400. gearman_job_handle_t job_handle)
  401. {
  402. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  403. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  404. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  405. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
  406. function,
  407. local_unique,
  408. workload,
  409. job_handle);
  410. }
  411. gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
  412. const char *function_name,
  413. const char *unique,
  414. const void *workload_str,
  415. size_t workload_size,
  416. gearman_job_handle_t job_handle)
  417. {
  418. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  419. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  420. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  421. return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
  422. function,
  423. local_unique,
  424. workload,
  425. job_handle);
  426. }
  427. gearman_return_t gearman_client_job_status(gearman_client_st *client,
  428. const char *job_handle,
  429. bool *is_known, bool *is_running,
  430. uint32_t *numerator,
  431. uint32_t *denominator)
  432. {
  433. gearman_return_t ret;
  434. gearman_task_st do_task, *do_task_ptr;
  435. do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
  436. job_handle, &ret);
  437. if (gearman_failed(ret))
  438. return ret;
  439. assert(do_task_ptr);
  440. gearman_task_clear_fn(do_task_ptr);
  441. ret= gearman_client_run_tasks(client);
  442. if (ret != GEARMAN_IO_WAIT)
  443. {
  444. if (is_known)
  445. *is_known= do_task.options.is_known;
  446. if (is_running)
  447. *is_running= do_task.options.is_running;
  448. if (numerator)
  449. *numerator= do_task.numerator;
  450. if (denominator)
  451. *denominator= do_task.denominator;
  452. }
  453. gearman_task_free(do_task_ptr);
  454. return ret;
  455. }
  456. gearman_return_t gearman_client_echo(gearman_client_st *client,
  457. const void *workload,
  458. size_t workload_size)
  459. {
  460. if (not client)
  461. return GEARMAN_INVALID_ARGUMENT;
  462. return gearman_echo(client->universal, workload, workload_size);
  463. }
  464. void gearman_client_task_free_all(gearman_client_st *client)
  465. {
  466. while (client->task_list)
  467. {
  468. gearman_task_free(client->task_list);
  469. }
  470. }
  471. void gearman_client_set_task_context_free_fn(gearman_client_st *client,
  472. gearman_task_context_free_fn *function)
  473. {
  474. client->task_context_free_fn= function;
  475. }
  476. gearman_task_st *gearman_client_add_task(gearman_client_st *client,
  477. gearman_task_st *task,
  478. void *context,
  479. const char *function,
  480. const char *unique,
  481. const void *workload, size_t workload_size,
  482. gearman_return_t *ret_ptr)
  483. {
  484. return add_task(client, task,
  485. context, GEARMAN_COMMAND_SUBMIT_JOB,
  486. function,
  487. unique,
  488. workload, workload_size,
  489. time_t(0),
  490. ret_ptr,
  491. client->actions);
  492. }
  493. gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
  494. gearman_task_st *task,
  495. void *context,
  496. const char *function,
  497. const char *unique,
  498. const void *workload, size_t workload_size,
  499. gearman_return_t *ret_ptr)
  500. {
  501. return add_task(client, task, context,
  502. GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
  503. function,
  504. unique,
  505. workload, workload_size,
  506. time_t(0),
  507. ret_ptr,
  508. client->actions);
  509. }
  510. gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
  511. gearman_task_st *task,
  512. void *context,
  513. const char *function,
  514. const char *unique,
  515. const void *workload, size_t workload_size,
  516. gearman_return_t *ret_ptr)
  517. {
  518. return add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
  519. function,
  520. unique,
  521. workload, workload_size,
  522. time_t(0),
  523. ret_ptr,
  524. client->actions);
  525. }
  526. gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
  527. gearman_task_st *task,
  528. void *context,
  529. const char *function,
  530. const char *unique,
  531. const void *workload, size_t workload_size,
  532. gearman_return_t *ret_ptr)
  533. {
  534. return add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
  535. function,
  536. unique,
  537. workload, workload_size,
  538. time_t(0),
  539. ret_ptr,
  540. client->actions);
  541. }
  542. gearman_task_st *
  543. gearman_client_add_task_high_background(gearman_client_st *client,
  544. gearman_task_st *task,
  545. void *context,
  546. const char *function,
  547. const char *unique,
  548. const void *workload, size_t workload_size,
  549. gearman_return_t *ret_ptr)
  550. {
  551. return add_task(client, task, context,
  552. GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
  553. function,
  554. unique,
  555. workload, workload_size,
  556. time_t(0),
  557. ret_ptr,
  558. client->actions);
  559. }
  560. gearman_task_st *
  561. gearman_client_add_task_low_background(gearman_client_st *client,
  562. gearman_task_st *task,
  563. void *context,
  564. const char *function,
  565. const char *unique,
  566. const void *workload, size_t workload_size,
  567. gearman_return_t *ret_ptr)
  568. {
  569. return add_task(client, task, context,
  570. GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
  571. function,
  572. unique,
  573. workload, workload_size,
  574. time_t(0),
  575. ret_ptr,
  576. client->actions);
  577. }
  578. gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
  579. gearman_task_st *task,
  580. void *context,
  581. const char *job_handle,
  582. gearman_return_t *ret_ptr)
  583. {
  584. const void *args[1];
  585. size_t args_size[1];
  586. gearman_return_t unused;
  587. if (not ret_ptr)
  588. ret_ptr= &unused;
  589. if (not (task= gearman_task_internal_create(client, task)))
  590. {
  591. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  592. return NULL;
  593. }
  594. task->context= context;
  595. snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
  596. args[0]= job_handle;
  597. args_size[0]= strlen(job_handle);
  598. gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
  599. GEARMAN_MAGIC_REQUEST,
  600. GEARMAN_COMMAND_GET_STATUS,
  601. args, args_size, 1);
  602. if (gearman_success(rc))
  603. {
  604. client->new_tasks++;
  605. client->running_tasks++;
  606. task->options.send_in_use= true;
  607. }
  608. *ret_ptr= rc;
  609. return task;
  610. }
  611. void gearman_client_set_workload_fn(gearman_client_st *client,
  612. gearman_workload_fn *function)
  613. {
  614. client->actions.workload_fn= function;
  615. }
  616. void gearman_client_set_created_fn(gearman_client_st *client,
  617. gearman_created_fn *function)
  618. {
  619. client->actions.created_fn= function;
  620. }
  621. void gearman_client_set_data_fn(gearman_client_st *client,
  622. gearman_data_fn *function)
  623. {
  624. client->actions.data_fn= function;
  625. }
  626. void gearman_client_set_warning_fn(gearman_client_st *client,
  627. gearman_warning_fn *function)
  628. {
  629. client->actions.warning_fn= function;
  630. }
  631. void gearman_client_set_status_fn(gearman_client_st *client,
  632. gearman_universal_status_fn *function)
  633. {
  634. client->actions.status_fn= function;
  635. }
  636. void gearman_client_set_complete_fn(gearman_client_st *client,
  637. gearman_complete_fn *function)
  638. {
  639. client->actions.complete_fn= function;
  640. }
  641. void gearman_client_set_exception_fn(gearman_client_st *client,
  642. gearman_exception_fn *function)
  643. {
  644. client->actions.exception_fn= function;
  645. }
  646. void gearman_client_set_fail_fn(gearman_client_st *client,
  647. gearman_fail_fn *function)
  648. {
  649. client->actions.fail_fn= function;
  650. }
  651. void gearman_client_clear_fn(gearman_client_st *client)
  652. {
  653. client->actions= gearman_actions_default();
  654. }
  655. static inline void _push_non_blocking(gearman_client_st *client)
  656. {
  657. client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
  658. client->universal.options.non_blocking= true;
  659. }
  660. static inline void _pop_non_blocking(gearman_client_st *client)
  661. {
  662. client->universal.options.non_blocking= client->options.non_blocking;
  663. assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
  664. }
  665. static inline void _push_blocking(gearman_client_st *client)
  666. {
  667. client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
  668. client->universal.options.non_blocking= false;
  669. }
  670. static inline void _pop_blocking(gearman_client_st *client)
  671. {
  672. client->universal.options.non_blocking= client->options.non_blocking;
  673. assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
  674. }
  /**
   * Drive the client's tasks: start new ones, continue submissions on
   * writable connections, and read packets and hand them to their tasks.
   *
   * This is a resumable state machine. The outer switch on client->state has
   * case labels placed INSIDE the loops below. Before most failure returns,
   * client->state records where the call stopped (NEW, SUBMIT or PACKET), so
   * the next call jumps straight back to that _client_run_task() invocation,
   * with client->task and client->con still pointing at the interrupted work.
   * Statement order and label placement are therefore load-bearing.
   *
   * Returns GEARMAN_SUCCESS once the loop ends (state reset to IDLE),
   * GEARMAN_IO_WAIT when client->options.non_blocking is set and tasks are
   * still running, GEARMAN_SERVER_ERROR when an ERROR packet is received, or
   * the failure code of the call that failed.
   */
  675. static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
  676. {
  677. gearman_return_t ret= GEARMAN_MAX_RETURN;
  678. switch(client->state)
  679. {
  680. case GEARMAN_CLIENT_STATE_IDLE:
  681. while (1)
  682. {
  683. /* Start any new tasks. */
  684. if (client->new_tasks > 0 && ! (client->options.no_new))
  685. {
  686. for (client->task= client->task_list; client->task;
  687. client->task= client->task->next)
  688. {
  689. if (client->task->state != GEARMAN_TASK_STATE_NEW)
  690. continue;
  /* Re-entry point: a previous call returned from here with state NEW. */
  691. case GEARMAN_CLIENT_STATE_NEW:
  692. gearman_return_t local_ret= _client_run_task(client, client->task);
  693. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  694. {
  695. client->state= GEARMAN_CLIENT_STATE_NEW;
  696. return local_ret;
  697. }
  698. }
  699. if (client->new_tasks == 0)
  700. {
  701. gearman_return_t local_ret= gearman_flush_all(client->universal);
  702. if (gearman_failed(local_ret))
  703. {
  /* NOTE(review): client->state is not updated on this return path - confirm intended. */
  704. return local_ret;
  705. }
  706. }
  707. }
  708. /* See if there are any connections ready for I/O. */
  709. while ((client->con= gearman_ready(client->universal)))
  710. {
  711. if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
  712. {
  713. /* Socket is ready for writing, continue submitting jobs. */
  714. for (client->task= client->task_list; client->task;
  715. client->task= client->task->next)
  716. {
  /* Only tasks bound to this connection that are in SUBMIT or WORKLOAD state are advanced. */
  717. if (client->task->con != client->con ||
  718. (client->task->state != GEARMAN_TASK_STATE_SUBMIT &&
  719. client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
  720. {
  721. continue;
  722. }
  /* Re-entry point: a previous call returned from here with state SUBMIT. */
  723. case GEARMAN_CLIENT_STATE_SUBMIT:
  724. gearman_return_t local_ret= _client_run_task(client, client->task);
  725. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  726. {
  727. client->state= GEARMAN_CLIENT_STATE_SUBMIT;
  728. return local_ret;
  729. }
  730. }
  731. }
  732. if (not (client->con->revents & POLLIN))
  733. continue;
  734. /* Socket is ready for reading. */
  735. while (1)
  736. {
  737. /* Read packet on connection and find which task it belongs to. */
  738. if (client->options.unbuffered_result)
  739. {
  740. /* If client is handling the data read, make sure it's complete. */
  741. if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
  742. {
  743. for (client->task= client->task_list; client->task;
  744. client->task= client->task->next)
  745. {
  746. if (client->task->con == client->con &&
  747. (client->task->state == GEARMAN_TASK_STATE_DATA ||
  748. client->task->state == GEARMAN_TASK_STATE_COMPLETE))
  749. {
  750. break;
  751. }
  752. }
  753. assert(client->task);
  754. }
  755. else
  756. {
  757. /* Read the next packet, without buffering the data part. */
  758. client->task= NULL;
  759. (void)client->con->receiving(client->con->_packet, ret, false);
  760. }
  761. }
  762. else
  763. {
  764. /* Read the next packet, buffering the data part. */
  765. client->task= NULL;
  766. (void)client->con->receiving(client->con->_packet, ret, true);
  767. }
  /* client->task is NULL exactly when one of the receiving() calls above ran. */
  768. if (client->task == NULL)
  769. {
  770. assert(ret != GEARMAN_MAX_RETURN);
  771. /* Check the result that the receiving() calls above stored in ret. */
  772. if (gearman_failed(ret))
  773. {
  774. if (ret == GEARMAN_IO_WAIT)
  775. break;
  776. client->state= GEARMAN_CLIENT_STATE_IDLE;
  777. return ret;
  778. }
  779. client->con->options.packet_in_use= true;
  780. /* We have a packet, see which task it belongs to. */
  781. for (client->task= client->task_list; client->task;
  782. client->task= client->task->next)
  783. {
  784. if (client->task->con != client->con)
  785. continue;
  786. if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
  787. {
  /* JOB_CREATED packets are matched by comparing the task's created_id with the connection's counter. */
  788. if (client->task->created_id != client->con->created_id)
  789. continue;
  790. /* New job created, drop through below and notify task. */
  791. client->con->created_id++;
  792. }
  793. else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
  794. {
  795. gearman_universal_set_error(client->universal, GEARMAN_SERVER_ERROR, AT,
  796. "%s:%.*s",
  797. static_cast<char *>(client->con->_packet.arg[0]),
  798. int(client->con->_packet.arg_size[1]),
  799. static_cast<char *>(client->con->_packet.arg[1]));
  /* NOTE(review): returns with packet_in_use still true, the packet not freed and client->state unchanged - confirm intended. */
  800. return GEARMAN_SERVER_ERROR;
  801. }
  /* Otherwise match on job handle: compare arg_size[0] bytes and check the length (arg_size[0] - 1 unless WORK_FAIL; presumably the argument carries a trailing NUL in that case - verify). */
  802. else if (strncmp(client->task->job_handle,
  803. static_cast<char *>(client->con->_packet.arg[0]),
  804. client->con->_packet.arg_size[0]) ||
  805. (client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
  806. strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
  807. (client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
  808. strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
  809. {
  810. continue;
  811. }
  812. /* Else, we have a matching result packet of some kind. */
  813. break;
  814. }
  815. if (not client->task)
  816. {
  817. /* The client has stopped waiting for the response, ignore it. */
  818. gearman_packet_free(&(client->con->_packet));
  819. client->con->options.packet_in_use= false;
  820. continue;
  821. }
  822. client->task->recv= &(client->con->_packet);
  823. }
  /* Re-entry point: a previous call returned from here with state PACKET. */
  824. case GEARMAN_CLIENT_STATE_PACKET:
  825. /* Let task process job created or result packet. */
  826. gearman_return_t local_ret= _client_run_task(client, client->task);
  827. if (local_ret == GEARMAN_IO_WAIT)
  828. break;
  829. if (gearman_failed(local_ret))
  830. {
  831. client->state= GEARMAN_CLIENT_STATE_PACKET;
  832. return local_ret;
  833. }
  834. /* Clean up the packet. */
  835. gearman_packet_free(&(client->con->_packet));
  836. client->con->options.packet_in_use= false;
  837. /* If all tasks are done, return. */
  838. if (client->running_tasks == 0)
  839. break;
  840. }
  841. }
  842. /* If all tasks are done, return. */
  843. if (client->running_tasks == 0)
  844. {
  845. break;
  846. }
  /* More new tasks are pending: loop again to start them. */
  847. if (client->new_tasks > 0 && ! (client->options.no_new))
  848. continue;
  849. if (client->options.non_blocking)
  850. {
  851. /* Let the caller wait for activity. */
  852. client->state= GEARMAN_CLIENT_STATE_IDLE;
  853. gearman_gerror(client->universal, GEARMAN_IO_WAIT);
  854. return GEARMAN_IO_WAIT;
  855. }
  856. /* Wait for activity on one of the connections. */
  857. gearman_return_t local_ret= gearman_wait(client->universal);
  858. if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
  859. {
  860. client->state= GEARMAN_CLIENT_STATE_IDLE;
  861. return local_ret;
  862. }
  863. }
  864. break;
  865. }
  866. client->state= GEARMAN_CLIENT_STATE_IDLE;
  867. return GEARMAN_SUCCESS;
  868. }
  869. gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
  870. {
  871. if (not client)
  872. {
  873. return GEARMAN_INVALID_ARGUMENT;
  874. }
  875. if (not client->task_list)
  876. {
  877. return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "No active tasks");
  878. }
  879. _push_non_blocking(client);
  880. gearman_return_t rc= _client_run_tasks(client);
  881. _pop_non_blocking(client);
  882. if (gearman_failed(rc))
  883. {
  884. assert(gearman_universal_error_code(client->universal) == rc);
  885. }
  886. return rc;
  887. }
  888. gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client)
  889. {
  890. if (not client)
  891. {
  892. return GEARMAN_INVALID_ARGUMENT;
  893. }
  894. if (not client->task_list)
  895. {
  896. return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "No active tasks");
  897. }
  898. _push_blocking(client);
  899. gearman_return_t rc= _client_run_tasks(client);
  900. _pop_blocking(client);
  901. if (gearman_failed(rc))
  902. {
  903. assert(gearman_universal_error_code(client->universal) == rc);
  904. }
  905. return rc;
  906. }
  907. /*
  908. * Static Definitions
  909. */
  910. static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
  911. {
  912. if (client)
  913. {
  914. client->options.allocated= false;
  915. }
  916. else
  917. {
  918. client= new (std::nothrow) gearman_client_st;
  919. if (not client)
  920. return NULL;
  921. client->options.allocated= true;
  922. }
  923. client->options.non_blocking= false;
  924. client->options.unbuffered_result= false;
  925. client->options.no_new= false;
  926. client->options.free_tasks= false;
  927. client->state= GEARMAN_CLIENT_STATE_IDLE;
  928. client->new_tasks= 0;
  929. client->running_tasks= 0;
  930. client->task_count= 0;
  931. client->context= NULL;
  932. client->con= NULL;
  933. client->task= NULL;
  934. client->task_list= NULL;
  935. client->task_context_free_fn= NULL;
  936. gearman_client_clear_fn(client);
  937. if (not is_clone)
  938. {
  939. gearman_universal_initialize(client->universal);
  940. }
  941. return client;
  942. }
  943. static gearman_return_t _client_add_server(const char *host, in_port_t port,
  944. void *context)
  945. {
  946. return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
  947. }
  948. static void *_client_do(gearman_client_st *client, gearman_command_t command,
  949. const char *function_name,
  950. const char *unique,
  951. const void *workload_str, size_t workload_size,
  952. size_t *result_size, gearman_return_t *ret_ptr)
  953. {
  954. gearman_task_st do_task, *do_task_ptr;
  955. gearman_string_t function= { gearman_string_param_cstr(function_name) };
  956. gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
  957. gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
  958. gearman_return_t unused;
  959. if (not ret_ptr)
  960. ret_ptr= &unused;
  961. if (not client)
  962. {
  963. *ret_ptr= GEARMAN_ERRNO;
  964. errno= EINVAL;
  965. return NULL;
  966. }
  967. do_task_ptr= add_task(client, &do_task, NULL, command,
  968. function,
  969. local_unique,
  970. workload,
  971. time_t(0),
  972. gearman_actions_do_default());
  973. if (not do_task_ptr)
  974. {
  975. *ret_ptr= gearman_universal_error_code(client->universal);
  976. return NULL;
  977. }
  978. gearman_return_t ret= gearman_client_run_tasks(client);
  979. const void *returnable= NULL;
  980. // gearman_client_run_tasks failed
  981. if (gearman_failed(ret))
  982. {
  983. gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
  984. *ret_ptr= ret;
  985. *result_size= 0;
  986. }
  987. else if (ret == GEARMAN_SUCCESS and do_task_ptr->result_rc == GEARMAN_SUCCESS)
  988. {
  989. *ret_ptr= do_task_ptr->result_rc;
  990. assert(do_task_ptr);
  991. if (do_task_ptr->result_ptr)
  992. {
  993. gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
  994. *result_size= gearman_size(result);
  995. returnable= gearman_c_str(result);
  996. }
  997. else // NULL job
  998. {
  999. *result_size= 0;
  1000. }
  1001. }
  1002. else // gearman_client_run_tasks() was successful, but the task was not
  1003. {
  1004. gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
  1005. *ret_ptr= do_task_ptr->result_rc;
  1006. *result_size= 0;
  1007. }
  1008. assert(client->task_list);
  1009. gearman_task_free(&do_task);
  1010. client->new_tasks= 0;
  1011. client->running_tasks= 0;
  1012. return const_cast<void *>(returnable);
  1013. }
  1014. static gearman_return_t _client_do_background(gearman_client_st *client,
  1015. gearman_command_t command,
  1016. gearman_string_t &function,
  1017. gearman_unique_t &unique,
  1018. gearman_string_t &workload,
  1019. char *job_handle)
  1020. {
  1021. gearman_task_st do_task, *do_task_ptr;
  1022. do_task_ptr= add_task(client, &do_task,
  1023. client,
  1024. command,
  1025. function,
  1026. unique,
  1027. workload,
  1028. time_t(0),
  1029. gearman_actions_do_default());
  1030. if (not do_task_ptr)
  1031. {
  1032. return gearman_universal_error_code(client->universal);
  1033. }
  1034. gearman_task_clear_fn(do_task_ptr);
  1035. gearman_return_t ret= gearman_client_run_tasks(client);
  1036. if (ret != GEARMAN_IO_WAIT)
  1037. {
  1038. if (job_handle)
  1039. {
  1040. strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
  1041. }
  1042. client->new_tasks= 0;
  1043. client->running_tasks= 0;
  1044. }
  1045. gearman_task_free(&do_task);
  1046. return ret;
  1047. }
  1048. bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
  1049. {
  1050. if (not first || not second)
  1051. return false;
  1052. if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
  1053. return false;
  1054. if (first->universal.con_list->port != second->universal.con_list->port)
  1055. return false;
  1056. return true;
  1057. }
  1058. bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
  1059. {
  1060. gearman_string_t option= { option_arg, option_arg_size };
  1061. return gearman_request_option(self->universal, option);
  1062. }
  1063. void gearman_client_set_namespace(gearman_client_st *self, const char *namespace_key, size_t namespace_key_size)
  1064. {
  1065. if (not self)
  1066. return;
  1067. gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
  1068. }