job.cc 20 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include "gear_config.h"
  39. #include <libgearman/common.h>
  40. #include "libgearman/assert.hpp"
  41. #include "libgearman/vector.h"
  42. #include "libgearman/protocol/work_exception.h"
  43. #include <cstdio>
  44. #include <cstring>
  45. #include <memory>
  46. struct gearman_job_reducer_st {
  47. gearman_universal_st &universal;
  48. gearman_client_st *client;
  49. gearman_result_st result;
  50. gearman_vector_st *reducer_function;
  51. gearman_aggregator_fn *aggregator_fn;
  52. gearman_job_reducer_st(gearman_universal_st &universal_arg,
  53. const gearman_string_t &reducer_function_name,
  54. gearman_aggregator_fn *aggregator_fn_arg):
  55. universal(universal_arg),
  56. client(NULL),
  57. reducer_function(NULL),
  58. aggregator_fn(aggregator_fn_arg)
  59. {
  60. assert_msg(gearman_size(reducer_function_name), "Trying to creat a function with zero length");
  61. reducer_function= gearman_string_create(NULL, gearman_size(reducer_function_name));
  62. gearman_string_append(reducer_function, gearman_string_param(reducer_function_name));
  63. }
  64. const char* name() const
  65. {
  66. if (reducer_function)
  67. {
  68. return reducer_function->c_str();
  69. }
  70. return "__UNKNOWN";
  71. }
  72. ~gearman_job_reducer_st()
  73. {
  74. gearman_client_free(client);
  75. gearman_string_free(reducer_function);
  76. }
  77. bool init()
  78. {
  79. client= gearman_client_create(NULL);
  80. if (client)
  81. {
  82. gearman_universal_clone(client->impl()->universal, universal);
  83. #if 0
  84. if (universal._namespace)
  85. {
  86. gearman_client_set_namespace(client,
  87. gearman_string_value(universal._namespace),
  88. gearman_string_length(universal._namespace));
  89. }
  90. for (gearman_connection_st *con= universal.con_list; con; con= con->next_connection())
  91. {
  92. if (gearman_failed(client->impl()->add_server(con->_host, con->_service)))
  93. {
  94. return false;
  95. }
  96. }
  97. #endif
  98. return true;
  99. }
  100. return false;
  101. }
  102. bool add(gearman_argument_t &arguments)
  103. {
  104. gearman_string_t function= gearman_string(reducer_function);
  105. gearman_unique_t unique= gearman_unique_make(0, 0);
  106. gearman_task_st *task= add_task(*(client->impl()),
  107. NULL,
  108. GEARMAN_COMMAND_SUBMIT_JOB,
  109. function,
  110. unique,
  111. arguments.value,
  112. time_t(0),
  113. gearman_actions_execute_defaults());
  114. if (task == NULL)
  115. {
  116. gearman_universal_error_code(client->impl()->universal);
  117. return false;
  118. }
  119. return true;
  120. }
  121. gearman_return_t complete()
  122. {
  123. gearman_return_t rc;
  124. if (gearman_failed(rc= gearman_client_run_tasks(client)))
  125. {
  126. return rc;
  127. }
  128. gearman_task_st *check_task= client->impl()->task_list;
  129. if (check_task)
  130. {
  131. do
  132. {
  133. if (gearman_failed(check_task->impl()->error_code()))
  134. {
  135. return check_task->impl()->error_code();
  136. }
  137. } while ((check_task= gearman_next(check_task)));
  138. if (aggregator_fn)
  139. {
  140. gearman_aggregator_st aggregator(client->impl()->context);
  141. aggregator_fn(&aggregator, client->impl()->task_list, &result);
  142. }
  143. }
  144. return GEARMAN_SUCCESS;
  145. }
  146. };
  147. /**
  148. * @addtogroup gearman_job_static Static Job Declarations
  149. * @ingroup gearman_job
  150. * @{
  151. */
  152. /**
  153. * Send a packet for a job.
  154. */
  155. static gearman_return_t _job_send(Job* job);
  156. /*
  157. * Public Definitions
  158. */
  159. gearman_job_st *gearman_job_create(Worker* worker, gearman_job_st *job_shell)
  160. {
  161. assert(worker);
  162. if (worker)
  163. {
  164. Job* job;
  165. assert(job_shell == NULL);
  166. if (job_shell)
  167. {
  168. job= job_shell->impl();
  169. assert(job);
  170. }
  171. else
  172. {
  173. job= new (std::nothrow) Job(job_shell, *(worker));
  174. if (job == NULL)
  175. {
  176. gearman_error(worker->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "new failed for Job");
  177. return NULL;
  178. }
  179. }
  180. job->reducer= NULL;
  181. job->_error_code= GEARMAN_UNKNOWN_STATE;
  182. if (job->_worker.job_list)
  183. {
  184. job->_worker.job_list->prev= job;
  185. }
  186. job->next= job->_worker.job_list;
  187. job->prev= NULL;
  188. job->_worker.job_list= job;
  189. job->_worker.job_count++;
  190. job->con= NULL;
  191. return job->shell();
  192. }
  193. return NULL;
  194. }
  195. Job::Job(gearman_job_st* shell_, Worker& worker_):
  196. _worker(worker_),
  197. _client(NULL),
  198. next(NULL),
  199. prev(NULL),
  200. con(NULL),
  201. reducer(NULL),
  202. _error_code(GEARMAN_UNKNOWN_STATE),
  203. _shell(shell_)
  204. {
  205. if (shell_)
  206. {
  207. gearman_set_allocated(_shell, false);
  208. }
  209. else
  210. {
  211. _shell= &_owned_shell;
  212. gearman_set_allocated(_shell, true);
  213. }
  214. _shell->impl(this);
  215. gearman_set_initialized(_shell, true);
  216. }
  217. Job::~Job()
  218. {
  219. if (_client)
  220. {
  221. gearman_client_free(_client);
  222. }
  223. delete reducer;
  224. }
  225. gearman_client_st* Job::client()
  226. {
  227. if (_client == NULL)
  228. {
  229. _client= gearman_client_create(NULL);
  230. if (_client)
  231. {
  232. gearman_universal_clone(_client->impl()->universal, _worker.universal);
  233. }
  234. }
  235. return _client;
  236. }
  237. bool gearman_job_build_reducer(Job* job, gearman_aggregator_fn *aggregator_fn)
  238. {
  239. if (job->reducer)
  240. {
  241. return true;
  242. }
  243. gearman_string_t reducer_func= gearman_job_reducer_string(job);
  244. job->reducer= new (std::nothrow) gearman_job_reducer_st(job->universal(), reducer_func, aggregator_fn);
  245. if (job->reducer == NULL)
  246. {
  247. gearman_job_free(job->shell());
  248. return false;
  249. }
  250. if (job->reducer->init() == false)
  251. {
  252. gearman_job_free(job->shell());
  253. return false;
  254. }
  255. return true;
  256. }
  257. gearman_worker_st *gearman_job_clone_worker(gearman_job_st *job_shell)
  258. {
  259. if (job_shell and job_shell->impl())
  260. {
  261. return gearman_worker_clone(NULL, job_shell->impl()->_worker.shell());
  262. }
  263. return NULL;
  264. }
  265. gearman_client_st *gearman_job_use_client(gearman_job_st *job_shell)
  266. {
  267. if (job_shell and job_shell->impl())
  268. {
  269. return job_shell->impl()->client();
  270. }
  271. return NULL;
  272. }
  273. gearman_return_t gearman_job_send_data(gearman_job_st *job_shell, const void *data, size_t data_size)
  274. {
  275. if (job_shell and job_shell->impl())
  276. {
  277. Job* job= job_shell->impl();
  278. if (job->finished() == false)
  279. {
  280. const void *args[2];
  281. size_t args_size[2];
  282. if (job->reducer)
  283. {
  284. gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(data), data_size);
  285. job->reducer->add(value);
  286. return GEARMAN_SUCCESS;
  287. }
  288. if ((job->options.work_in_use) == false)
  289. {
  290. args[0]= job->assigned.arg[0];
  291. args_size[0]= job->assigned.arg_size[0];
  292. args[1]= data;
  293. args_size[1]= data_size;
  294. gearman_return_t ret= gearman_packet_create_args(job->universal(), job->work,
  295. GEARMAN_MAGIC_REQUEST,
  296. GEARMAN_COMMAND_WORK_DATA,
  297. args, args_size, 2);
  298. if (gearman_failed(ret))
  299. {
  300. return ret;
  301. }
  302. job->options.work_in_use= true;
  303. }
  304. return _job_send(job);
  305. }
  306. return GEARMAN_SUCCESS;
  307. }
  308. return GEARMAN_INVALID_ARGUMENT;
  309. }
  310. gearman_return_t gearman_job_send_warning(gearman_job_st *job_shell,
  311. const void *warning,
  312. size_t warning_size)
  313. {
  314. if (job_shell and job_shell->impl())
  315. {
  316. Job* job= job_shell->impl();
  317. if (job->finished() == false)
  318. {
  319. const void *args[2];
  320. size_t args_size[2];
  321. if ((job->options.work_in_use) == false)
  322. {
  323. args[0]= job->assigned.arg[0];
  324. args_size[0]= job->assigned.arg_size[0];
  325. args[1]= warning;
  326. args_size[1]= warning_size;
  327. gearman_return_t ret;
  328. ret= gearman_packet_create_args(job->universal(), job->work,
  329. GEARMAN_MAGIC_REQUEST,
  330. GEARMAN_COMMAND_WORK_WARNING,
  331. args, args_size, 2);
  332. if (gearman_failed(ret))
  333. {
  334. return ret;
  335. }
  336. job->options.work_in_use= true;
  337. }
  338. return _job_send(job);
  339. }
  340. return GEARMAN_SUCCESS;
  341. }
  342. return GEARMAN_INVALID_ARGUMENT;
  343. }
  344. gearman_return_t gearman_job_send_status(gearman_job_st *job_shell,
  345. uint32_t numerator,
  346. uint32_t denominator)
  347. {
  348. if (job_shell and job_shell->impl())
  349. {
  350. Job* job= job_shell->impl();
  351. if (job->finished() == false)
  352. {
  353. char numerator_string[12];
  354. char denominator_string[12];
  355. const void *args[3];
  356. size_t args_size[3];
  357. if (not (job->options.work_in_use))
  358. {
  359. snprintf(numerator_string, 12, "%u", numerator);
  360. snprintf(denominator_string, 12, "%u", denominator);
  361. args[0]= job->assigned.arg[0];
  362. args_size[0]= job->assigned.arg_size[0];
  363. args[1]= numerator_string;
  364. args_size[1]= strlen(numerator_string) + 1;
  365. args[2]= denominator_string;
  366. args_size[2]= strlen(denominator_string);
  367. gearman_return_t ret;
  368. ret= gearman_packet_create_args(job->universal(), job->work,
  369. GEARMAN_MAGIC_REQUEST,
  370. GEARMAN_COMMAND_WORK_STATUS,
  371. args, args_size, 3);
  372. if (gearman_failed(ret))
  373. {
  374. return ret;
  375. }
  376. job->options.work_in_use= true;
  377. }
  378. return _job_send(job);
  379. }
  380. return GEARMAN_SUCCESS;
  381. }
  382. return GEARMAN_INVALID_ARGUMENT;
  383. }
  384. gearman_return_t gearman_job_send_complete(gearman_job_st *job_shell,
  385. const void *result,
  386. size_t result_size)
  387. {
  388. if (job_shell and job_shell->impl())
  389. {
  390. Job* job= job_shell->impl();
  391. if (job->finished() == false)
  392. {
  393. if (job->reducer)
  394. {
  395. return GEARMAN_INVALID_ARGUMENT;
  396. }
  397. return gearman_job_send_complete_fin(job, result, result_size);
  398. }
  399. return GEARMAN_SUCCESS;
  400. }
  401. return GEARMAN_INVALID_ARGUMENT;
  402. }
  403. gearman_return_t gearman_job_send_complete_fin(Job* job,
  404. const void *result, size_t result_size)
  405. {
  406. if (job->finished() == false)
  407. {
  408. if (job->reducer)
  409. {
  410. if (result_size)
  411. {
  412. gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(result), result_size);
  413. job->reducer->add(value);
  414. }
  415. gearman_return_t rc= job->reducer->complete();
  416. if (gearman_failed(rc))
  417. {
  418. return gearman_universal_set_error(job->universal(), rc, GEARMAN_AT, "%s couldn't call complete()", job->reducer->name());
  419. }
  420. const gearman_vector_st *reduced_value= job->reducer->result.string();
  421. if (reduced_value)
  422. {
  423. result= gearman_string_value(reduced_value);
  424. result_size= gearman_string_length(reduced_value);
  425. }
  426. else
  427. {
  428. result= NULL;
  429. result_size= 0;
  430. }
  431. }
  432. const void *args[2];
  433. size_t args_size[2];
  434. if (not (job->options.work_in_use))
  435. {
  436. args[0]= job->assigned.arg[0];
  437. args_size[0]= job->assigned.arg_size[0];
  438. args[1]= result;
  439. args_size[1]= result_size;
  440. gearman_return_t ret= gearman_packet_create_args(job->_worker.universal, job->work,
  441. GEARMAN_MAGIC_REQUEST,
  442. GEARMAN_COMMAND_WORK_COMPLETE,
  443. args, args_size, 2);
  444. if (gearman_failed(ret))
  445. {
  446. return ret;
  447. }
  448. job->options.work_in_use= true;
  449. }
  450. gearman_return_t ret= _job_send(job);
  451. if (gearman_failed(ret))
  452. {
  453. return ret;
  454. }
  455. job->finished(true);
  456. }
  457. return GEARMAN_SUCCESS;
  458. }
  459. gearman_return_t gearman_job_send_exception(gearman_job_st *job_shell,
  460. const void *exception,
  461. size_t exception_size)
  462. {
  463. if (job_shell and job_shell->impl())
  464. {
  465. Job* job= job_shell->impl();
  466. if (exception == NULL or exception_size == 0)
  467. {
  468. return gearman_error(job->universal(), GEARMAN_INVALID_ARGUMENT, "No exception was provided");
  469. }
  470. if (job->finished() == false)
  471. {
  472. if (job->options.work_in_use == false)
  473. {
  474. gearman_string_t handle_string= { static_cast<const char *>(job->assigned.arg[0]), job->assigned.arg_size[0] };
  475. gearman_string_t exception_string= { static_cast<const char *>(exception), exception_size };
  476. gearman_return_t ret= libgearman::protocol::work_exception(job->_worker.universal, job->work, handle_string, exception_string);
  477. if (gearman_failed(ret))
  478. {
  479. return ret;
  480. }
  481. job->options.work_in_use= true;
  482. }
  483. if (gearman_failed(_job_send(job)))
  484. {
  485. return job->error_code();
  486. }
  487. job->finished(true);
  488. }
  489. return GEARMAN_SUCCESS;
  490. }
  491. return GEARMAN_INVALID_ARGUMENT;
  492. }
  493. gearman_return_t gearman_job_send_fail(gearman_job_st *job_shell)
  494. {
  495. if (job_shell and job_shell->impl())
  496. {
  497. Job* job= job_shell->impl();
  498. if (job->finished() == false)
  499. {
  500. if (job->reducer)
  501. {
  502. return gearman_error(job->universal(), GEARMAN_INVALID_ARGUMENT, "Job has a reducer");
  503. }
  504. return gearman_job_send_fail_fin(job);
  505. }
  506. return GEARMAN_SUCCESS;
  507. }
  508. return GEARMAN_INVALID_ARGUMENT;
  509. }
  510. gearman_return_t gearman_job_send_fail_fin(Job* job)
  511. {
  512. assert(job);
  513. if (job)
  514. {
  515. const void *args[1];
  516. size_t args_size[1];
  517. if (job->finished() == false)
  518. {
  519. if (not (job->options.work_in_use))
  520. {
  521. args[0]= job->assigned.arg[0];
  522. args_size[0]= job->assigned.arg_size[0] - 1;
  523. gearman_return_t ret= gearman_packet_create_args(job->_worker.universal, job->work,
  524. GEARMAN_MAGIC_REQUEST,
  525. GEARMAN_COMMAND_WORK_FAIL,
  526. args, args_size, 1);
  527. if (gearman_failed(ret))
  528. {
  529. return ret;
  530. }
  531. job->options.work_in_use= true;
  532. }
  533. gearman_return_t ret= _job_send(job);
  534. if (gearman_failed(ret))
  535. {
  536. return ret;
  537. }
  538. job->finished(true);
  539. }
  540. return GEARMAN_SUCCESS;
  541. }
  542. return GEARMAN_INVALID_ARGUMENT;
  543. }
  544. const char *gearman_job_handle(const gearman_job_st *job_shell)
  545. {
  546. if (job_shell and job_shell->impl())
  547. {
  548. Job* job= job_shell->impl();
  549. return static_cast<const char *>(job->assigned.arg[0]);
  550. }
  551. return NULL;
  552. }
  553. const char *gearman_job_function_name(const gearman_job_st *job_shell)
  554. {
  555. if (job_shell and job_shell->impl())
  556. {
  557. Job* job= job_shell->impl();
  558. return static_cast<char *>(job->assigned.arg[1]);
  559. }
  560. return NULL;
  561. }
  562. gearman_string_t gearman_job_function_name_string(const Job* job)
  563. {
  564. assert(job);
  565. if (job)
  566. {
  567. gearman_string_t temp= { job->assigned.arg[1], job->assigned.arg_size[1] };
  568. return temp;
  569. }
  570. static gearman_string_t ret= {0, 0};
  571. return ret;
  572. }
  573. const char *gearman_job_unique(const gearman_job_st *job_shell)
  574. {
  575. if (job_shell and job_shell->impl())
  576. {
  577. Job* job= job_shell->impl();
  578. if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ or
  579. job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL)
  580. {
  581. return static_cast<const char *>(job->assigned.arg[2]);
  582. }
  583. return "";
  584. }
  585. return NULL;
  586. }
  587. bool gearman_job_is_map(const Job* job)
  588. {
  589. assert(job);
  590. return bool(job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL) and job->assigned.arg_size[3] > 1;
  591. }
  592. gearman_string_t gearman_job_reducer_string(const Job* job)
  593. {
  594. if (job)
  595. {
  596. if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL and job->assigned.arg_size[3] > 1)
  597. {
  598. gearman_string_t temp= { job->assigned.arg[3], job->assigned.arg_size[3] -1 };
  599. return temp;
  600. }
  601. static gearman_string_t null_temp= { gearman_literal_param("") };
  602. return null_temp;
  603. }
  604. static gearman_string_t ret= {0, 0};
  605. return ret;
  606. }
  607. const char *gearman_job_reducer(const gearman_job_st *job_shell)
  608. {
  609. if (job_shell and job_shell->impl())
  610. {
  611. Job* job= job_shell->impl();
  612. if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL)
  613. {
  614. return static_cast<const char *>(job->assigned.arg[3]);
  615. }
  616. return "";
  617. }
  618. return NULL;
  619. }
  620. const void *gearman_job_workload(const gearman_job_st *job_shell)
  621. {
  622. if (job_shell and job_shell->impl())
  623. {
  624. Job* job= job_shell->impl();
  625. return job->assigned.data;
  626. }
  627. return NULL;
  628. }
  629. size_t gearman_job_workload_size(const gearman_job_st *job_shell)
  630. {
  631. if (job_shell and job_shell->impl())
  632. {
  633. Job* job= job_shell->impl();
  634. return job->assigned.data_size;
  635. }
  636. return 0;
  637. }
  638. void *gearman_job_take_workload(gearman_job_st *job_shell, size_t *data_size)
  639. {
  640. if (job_shell and job_shell->impl())
  641. {
  642. return gearman_packet_take_data(job_shell->impl()->assigned, data_size);
  643. }
  644. return NULL;
  645. }
  646. void gearman_job_free(gearman_job_st *job_shell)
  647. {
  648. #ifndef NDEBUG
  649. if (job_shell)
  650. {
  651. assert(job_shell->impl());
  652. }
  653. #endif
  654. if (job_shell and job_shell->impl())
  655. {
  656. Job* job= job_shell->impl();
  657. if (job->options.assigned_in_use)
  658. {
  659. gearman_packet_free(&(job->assigned));
  660. }
  661. if (job->options.work_in_use)
  662. {
  663. gearman_packet_free(&(job->work));
  664. }
  665. if (job->_worker.job_list == job)
  666. {
  667. job->_worker.job_list= job->next;
  668. }
  669. if (job->prev)
  670. {
  671. job->prev->next= job->next;
  672. }
  673. if (job->next)
  674. {
  675. job->next->prev= job->prev;
  676. }
  677. job->_worker.job_count--;
  678. delete job;
  679. }
  680. else if (job_shell)
  681. {
  682. assert(job_shell->impl());
  683. }
  684. }
  685. /*
  686. * Static Definitions
  687. */
  688. static gearman_return_t _job_send(Job *job)
  689. {
  690. gearman_return_t ret= job->con->send_packet(job->work, true);
  691. while ((ret == GEARMAN_IO_WAIT) or (ret == GEARMAN_TIMEOUT))
  692. {
  693. ret= gearman_wait(job->universal());
  694. if (ret == GEARMAN_SUCCESS)
  695. {
  696. ret= job->con->send_packet(job->work, true);
  697. }
  698. }
  699. if (gearman_failed(ret))
  700. {
  701. return ret;
  702. }
  703. gearman_packet_free(&(job->work));
  704. job->options.work_in_use= false;
  705. return GEARMAN_SUCCESS;
  706. }
  707. const char *gearman_job_error(gearman_job_st *job_shell)
  708. {
  709. if (job_shell and job_shell->impl())
  710. {
  711. return job_shell->impl()->_worker.error();
  712. }
  713. return NULL;
  714. }