worker.c 31 KB


  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Worker Definitions
  11. */
  12. #include "common.h"
  13. /**
  14. Private structure.
  15. */
  16. struct _worker_function_st
  17. {
  18. struct {
  19. bool packet_in_use:1;
  20. bool change:1;
  21. bool remove:1;
  22. } options;
  23. struct _worker_function_st *next;
  24. struct _worker_function_st *prev;
  25. char *function_name;
  26. size_t function_length;
  27. gearman_worker_fn *worker_fn;
  28. void *context;
  29. gearman_packet_st packet;
  30. };
  31. /**
  32. * @addtogroup gearman_worker_static Static Worker Declarations
  33. * @ingroup gearman_worker
  34. * @{
  35. */
  36. static inline struct _worker_function_st *_function_exist(gearman_worker_st *worker, const char *function_name, size_t function_length)
  37. {
  38. struct _worker_function_st *function;
  39. for (function= worker->function_list; function != NULL;
  40. function= function->next)
  41. {
  42. if (function_length == function->function_length)
  43. {
  44. if (! memcmp(function_name, function->function_name, function_length))
  45. break;
  46. }
  47. }
  48. return function;
  49. }
  50. /**
  51. * Allocate a worker structure.
  52. */
  53. static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone);
  54. /**
  55. * Initialize common packets for later use.
  56. */
  57. static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
  58. /**
  59. * Callback function used when parsing server lists.
  60. */
  61. static gearman_return_t _worker_add_server(const char *host, in_port_t port,
  62. void *context);
  63. /**
  64. * Allocate and add a function to the register list.
  65. */
  66. static gearman_return_t _worker_function_create(gearman_worker_st *worker,
  67. const char *function_name,
  68. size_t function_length,
  69. uint32_t timeout,
  70. gearman_worker_fn *worker_fn,
  71. void *context);
  72. /**
  73. * Free a function.
  74. */
  75. static void _worker_function_free(gearman_worker_st *worker,
  76. struct _worker_function_st *function);
  77. /** @} */
  78. /*
  79. * Public Definitions
  80. */
  81. gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
  82. {
  83. worker= _worker_allocate(worker, false);
  84. if (worker == NULL)
  85. return NULL;
  86. if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
  87. {
  88. gearman_worker_free(worker);
  89. return NULL;
  90. }
  91. return worker;
  92. }
  93. gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
  94. const gearman_worker_st *from)
  95. {
  96. gearman_universal_st *check;
  97. if (! from)
  98. {
  99. return _worker_allocate(worker, false);
  100. }
  101. worker= _worker_allocate(worker, true);
  102. if (worker == NULL)
  103. {
  104. return worker;
  105. }
  106. worker->options.non_blocking= from->options.non_blocking;
  107. worker->options.grab_job_in_use= from->options.grab_job_in_use;
  108. worker->options.pre_sleep_in_use= from->options.pre_sleep_in_use;
  109. worker->options.work_job_in_use= from->options.work_job_in_use;
  110. worker->options.change= from->options.change;
  111. worker->options.grab_uniq= from->options.grab_uniq;
  112. worker->options.timeout_return= from->options.timeout_return;
  113. check= gearman_universal_clone(&(worker->universal), &from->universal);
  114. if (check == NULL)
  115. {
  116. gearman_worker_free(worker);
  117. return NULL;
  118. }
  119. if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
  120. {
  121. gearman_worker_free(worker);
  122. return NULL;
  123. }
  124. return worker;
  125. }
  126. void gearman_worker_free(gearman_worker_st *worker)
  127. {
  128. if (! worker)
  129. return;
  130. gearman_worker_unregister_all(worker);
  131. if (worker->options.packet_init)
  132. {
  133. gearman_packet_free(&(worker->grab_job));
  134. gearman_packet_free(&(worker->pre_sleep));
  135. }
  136. if (worker->job != NULL)
  137. gearman_job_free(worker->job);
  138. if (worker->options.work_job_in_use)
  139. gearman_job_free(&(worker->work_job));
  140. if (worker->work_result != NULL)
  141. {
  142. if ((&worker->universal)->workload_free_fn == NULL)
  143. free(worker->work_result);
  144. else
  145. {
  146. (&worker->universal)->workload_free_fn(worker->work_result,
  147. (void *)(&worker->universal)->workload_free_context);
  148. }
  149. }
  150. while (worker->function_list != NULL)
  151. _worker_function_free(worker, worker->function_list);
  152. gearman_job_free_all(worker);
  153. if ((&worker->universal) != NULL)
  154. gearman_universal_free((&worker->universal));
  155. if (worker->options.allocated)
  156. free(worker);
  157. }
  158. const char *gearman_worker_error(const gearman_worker_st *worker)
  159. {
  160. return gearman_universal_error((&worker->universal));
  161. }
  162. int gearman_worker_errno(gearman_worker_st *worker)
  163. {
  164. return gearman_universal_errno((&worker->universal));
  165. }
  166. gearman_worker_options_t gearman_worker_options(const gearman_worker_st *worker)
  167. {
  168. gearman_worker_options_t options;
  169. memset(&options, 0, sizeof(gearman_worker_options_t));
  170. if (worker->options.allocated)
  171. options|= GEARMAN_WORKER_ALLOCATED;
  172. if (worker->options.non_blocking)
  173. options|= GEARMAN_WORKER_NON_BLOCKING;
  174. if (worker->options.packet_init)
  175. options|= GEARMAN_WORKER_PACKET_INIT;
  176. if (worker->options.grab_job_in_use)
  177. options|= GEARMAN_WORKER_GRAB_JOB_IN_USE;
  178. if (worker->options.pre_sleep_in_use)
  179. options|= GEARMAN_WORKER_PRE_SLEEP_IN_USE;
  180. if (worker->options.work_job_in_use)
  181. options|= GEARMAN_WORKER_WORK_JOB_IN_USE;
  182. if (worker->options.change)
  183. options|= GEARMAN_WORKER_CHANGE;
  184. if (worker->options.grab_uniq)
  185. options|= GEARMAN_WORKER_GRAB_UNIQ;
  186. if (worker->options.timeout_return)
  187. options|= GEARMAN_WORKER_TIMEOUT_RETURN;
  188. return options;
  189. }
  190. void gearman_worker_set_options(gearman_worker_st *worker,
  191. gearman_worker_options_t options)
  192. {
  193. gearman_worker_options_t usable_options[]= {
  194. GEARMAN_WORKER_NON_BLOCKING,
  195. GEARMAN_WORKER_GRAB_UNIQ,
  196. GEARMAN_WORKER_TIMEOUT_RETURN,
  197. GEARMAN_WORKER_MAX
  198. };
  199. gearman_worker_options_t *ptr;
  200. for (ptr= usable_options; *ptr != GEARMAN_WORKER_MAX ; ptr++)
  201. {
  202. if (options & *ptr)
  203. {
  204. gearman_worker_add_options(worker, *ptr);
  205. }
  206. else
  207. {
  208. gearman_worker_remove_options(worker, *ptr);
  209. }
  210. }
  211. }
  212. void gearman_worker_add_options(gearman_worker_st *worker,
  213. gearman_worker_options_t options)
  214. {
  215. if (options & GEARMAN_WORKER_NON_BLOCKING)
  216. {
  217. gearman_universal_add_options((&worker->universal), GEARMAN_NON_BLOCKING);
  218. worker->options.non_blocking= true;
  219. }
  220. if (options & GEARMAN_WORKER_GRAB_UNIQ)
  221. {
  222. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
  223. (void)gearman_packet_pack_header(&(worker->grab_job));
  224. worker->options.grab_uniq= true;
  225. }
  226. if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
  227. {
  228. worker->options.timeout_return= true;
  229. }
  230. }
  231. void gearman_worker_remove_options(gearman_worker_st *worker,
  232. gearman_worker_options_t options)
  233. {
  234. if (options & GEARMAN_WORKER_NON_BLOCKING)
  235. {
  236. gearman_universal_remove_options((&worker->universal), GEARMAN_NON_BLOCKING);
  237. worker->options.non_blocking= false;
  238. }
  239. if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
  240. {
  241. worker->options.timeout_return= false;
  242. }
  243. if (options & GEARMAN_WORKER_GRAB_UNIQ)
  244. {
  245. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
  246. (void)gearman_packet_pack_header(&(worker->grab_job));
  247. worker->options.grab_uniq= false;
  248. }
  249. }
  250. int gearman_worker_timeout(gearman_worker_st *worker)
  251. {
  252. return gearman_universal_timeout((&worker->universal));
  253. }
  254. void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout)
  255. {
  256. gearman_worker_add_options(worker, GEARMAN_WORKER_TIMEOUT_RETURN);
  257. gearman_universal_set_timeout((&worker->universal), timeout);
  258. }
  259. void *gearman_worker_context(const gearman_worker_st *worker)
  260. {
  261. return worker->context;
  262. }
  263. void gearman_worker_set_context(gearman_worker_st *worker, void *context)
  264. {
  265. worker->context= context;
  266. }
  267. void gearman_worker_set_log_fn(gearman_worker_st *worker,
  268. gearman_log_fn *function, void *context,
  269. gearman_verbose_t verbose)
  270. {
  271. gearman_set_log_fn((&worker->universal), function, context, verbose);
  272. }
  273. void gearman_worker_set_workload_malloc_fn(gearman_worker_st *worker,
  274. gearman_malloc_fn *function,
  275. void *context)
  276. {
  277. gearman_set_workload_malloc_fn((&worker->universal), function, context);
  278. }
  279. void gearman_worker_set_workload_free_fn(gearman_worker_st *worker,
  280. gearman_free_fn *function,
  281. void *context)
  282. {
  283. gearman_set_workload_free_fn((&worker->universal), function, context);
  284. }
  285. gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
  286. const char *host, in_port_t port)
  287. {
  288. if (gearman_connection_create_args((&worker->universal), NULL, host, port) == NULL)
  289. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  290. return GEARMAN_SUCCESS;
  291. }
  292. gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
  293. const char *servers)
  294. {
  295. return gearman_parse_servers(servers, _worker_add_server, worker);
  296. }
  297. void gearman_worker_remove_servers(gearman_worker_st *worker)
  298. {
  299. gearman_free_all_cons((&worker->universal));
  300. }
  301. gearman_return_t gearman_worker_wait(gearman_worker_st *worker)
  302. {
  303. return gearman_wait((&worker->universal));
  304. }
  305. gearman_return_t gearman_worker_register(gearman_worker_st *worker,
  306. const char *function_name,
  307. uint32_t timeout)
  308. {
  309. return _worker_function_create(worker, function_name, strlen(function_name), timeout, NULL, NULL);
  310. }
  311. bool gearman_worker_function_exist(gearman_worker_st *worker,
  312. const char *function_name,
  313. size_t function_length)
  314. {
  315. struct _worker_function_st *function;
  316. function= _function_exist(worker, function_name, function_length);
  317. return (function && function->options.remove == false) ? true : false;
  318. }
  319. static inline gearman_return_t _worker_unregister(gearman_worker_st *worker,
  320. const char *function_name, size_t function_length)
  321. {
  322. struct _worker_function_st *function;
  323. gearman_return_t ret;
  324. const void *args[1];
  325. size_t args_size[1];
  326. function= _function_exist(worker, function_name, function_length);
  327. if (function == NULL || function->options.remove)
  328. return GEARMAN_NO_REGISTERED_FUNCTION;
  329. gearman_packet_free(&(function->packet));
  330. args[0]= function->function_name;
  331. args_size[0]= function->function_length;
  332. ret= gearman_packet_create_args((&worker->universal), &(function->packet),
  333. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
  334. args, args_size, 1);
  335. if (ret != GEARMAN_SUCCESS)
  336. {
  337. function->options.packet_in_use= false;
  338. return ret;
  339. }
  340. function->options.change= true;
  341. function->options.remove= true;
  342. worker->options.change= true;
  343. return GEARMAN_SUCCESS;
  344. }
  345. gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
  346. const char *function_name)
  347. {
  348. return _worker_unregister(worker, function_name, strlen(function_name));
  349. }
  350. gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
  351. {
  352. gearman_return_t ret;
  353. struct _worker_function_st *function;
  354. uint32_t count= 0;
  355. if (worker->function_list == NULL)
  356. return GEARMAN_NO_REGISTERED_FUNCTIONS;
  357. /* Lets find out if we have any functions left that are valid */
  358. for (function= worker->function_list; function != NULL;
  359. function= function->next)
  360. {
  361. if (function->options.remove == false)
  362. count++;
  363. }
  364. if (count == 0)
  365. return GEARMAN_NO_REGISTERED_FUNCTIONS;
  366. gearman_packet_free(&(worker->function_list->packet));
  367. ret= gearman_packet_create_args((&worker->universal),
  368. &(worker->function_list->packet),
  369. GEARMAN_MAGIC_REQUEST,
  370. GEARMAN_COMMAND_RESET_ABILITIES,
  371. NULL, NULL, 0);
  372. if (ret != GEARMAN_SUCCESS)
  373. {
  374. worker->function_list->options.packet_in_use= false;
  375. return ret;
  376. }
  377. while (worker->function_list->next != NULL)
  378. _worker_function_free(worker, worker->function_list->next);
  379. worker->function_list->options.change= true;
  380. worker->function_list->options.remove= true;
  381. worker->options.change= true;
  382. return GEARMAN_SUCCESS;
  383. }
  384. gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
  385. gearman_job_st *job,
  386. gearman_return_t *ret_ptr)
  387. {
  388. struct _worker_function_st *function;
  389. uint32_t active;
  390. while (1)
  391. {
  392. switch (worker->state)
  393. {
  394. case GEARMAN_WORKER_STATE_START:
  395. /* If there are any new functions changes, send them now. */
  396. if (worker->options.change)
  397. {
  398. worker->function= worker->function_list;
  399. while (worker->function != NULL)
  400. {
  401. if (! (worker->function->options.change))
  402. {
  403. worker->function= worker->function->next;
  404. continue;
  405. }
  406. for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
  407. worker->con= worker->con->next)
  408. {
  409. if (worker->con->fd == -1)
  410. continue;
  411. case GEARMAN_WORKER_STATE_FUNCTION_SEND:
  412. *ret_ptr= gearman_connection_send(worker->con, &(worker->function->packet),
  413. true);
  414. if (*ret_ptr != GEARMAN_SUCCESS)
  415. {
  416. if (*ret_ptr == GEARMAN_IO_WAIT)
  417. worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
  418. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  419. continue;
  420. return NULL;
  421. }
  422. }
  423. if (worker->function->options.remove)
  424. {
  425. function= worker->function->prev;
  426. _worker_function_free(worker, worker->function);
  427. if (function == NULL)
  428. worker->function= worker->function_list;
  429. else
  430. worker->function= function;
  431. }
  432. else
  433. {
  434. worker->function->options.change= false;
  435. worker->function= worker->function->next;
  436. }
  437. }
  438. worker->options.change= false;
  439. }
  440. if (worker->function_list == NULL)
  441. {
  442. gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
  443. "no functions have been registered");
  444. *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
  445. return NULL;
  446. }
  447. for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
  448. worker->con= worker->con->next)
  449. {
  450. /* If the connection to the job server is not active, start it. */
  451. if (worker->con->fd == -1)
  452. {
  453. for (worker->function= worker->function_list;
  454. worker->function != NULL;
  455. worker->function= worker->function->next)
  456. {
  457. case GEARMAN_WORKER_STATE_CONNECT:
  458. *ret_ptr= gearman_connection_send(worker->con, &(worker->function->packet),
  459. true);
  460. if (*ret_ptr != GEARMAN_SUCCESS)
  461. {
  462. if (*ret_ptr == GEARMAN_IO_WAIT)
  463. worker->state= GEARMAN_WORKER_STATE_CONNECT;
  464. else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT ||
  465. *ret_ptr == GEARMAN_LOST_CONNECTION)
  466. {
  467. break;
  468. }
  469. return NULL;
  470. }
  471. }
  472. if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
  473. continue;
  474. }
  475. case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
  476. if (worker->con->fd == -1)
  477. continue;
  478. *ret_ptr= gearman_connection_send(worker->con, &(worker->grab_job), true);
  479. if (*ret_ptr != GEARMAN_SUCCESS)
  480. {
  481. if (*ret_ptr == GEARMAN_IO_WAIT)
  482. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
  483. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  484. continue;
  485. return NULL;
  486. }
  487. if (worker->job == NULL)
  488. {
  489. worker->job= gearman_job_create(worker, job);
  490. if (worker->job == NULL)
  491. {
  492. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  493. return NULL;
  494. }
  495. }
  496. while (1)
  497. {
  498. case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
  499. (void)gearman_connection_recv(worker->con, &(worker->job->assigned), ret_ptr,
  500. true);
  501. if (*ret_ptr != GEARMAN_SUCCESS)
  502. {
  503. if (*ret_ptr == GEARMAN_IO_WAIT)
  504. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
  505. else
  506. {
  507. gearman_job_free(worker->job);
  508. worker->job= NULL;
  509. if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  510. break;
  511. }
  512. return NULL;
  513. }
  514. if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN ||
  515. worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
  516. {
  517. worker->job->options.assigned_in_use= true;
  518. worker->job->con= worker->con;
  519. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
  520. job= worker->job;
  521. worker->job= NULL;
  522. return job;
  523. }
  524. if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB)
  525. {
  526. gearman_packet_free(&(worker->job->assigned));
  527. break;
  528. }
  529. if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
  530. {
  531. gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
  532. "unexpected packet:%s",
  533. gearman_command_info_list[worker->job->assigned.command].name);
  534. gearman_packet_free(&(worker->job->assigned));
  535. gearman_job_free(worker->job);
  536. worker->job= NULL;
  537. *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
  538. return NULL;
  539. }
  540. gearman_packet_free(&(worker->job->assigned));
  541. }
  542. }
  543. case GEARMAN_WORKER_STATE_PRE_SLEEP:
  544. for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
  545. worker->con= worker->con->next)
  546. {
  547. if (worker->con->fd == -1)
  548. continue;
  549. *ret_ptr= gearman_connection_send(worker->con, &(worker->pre_sleep), true);
  550. if (*ret_ptr != GEARMAN_SUCCESS)
  551. {
  552. if (*ret_ptr == GEARMAN_IO_WAIT)
  553. worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
  554. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  555. continue;
  556. return NULL;
  557. }
  558. }
  559. worker->state= GEARMAN_WORKER_STATE_START;
  560. /* Set a watch on all active connections that we sent a PRE_SLEEP to. */
  561. active= 0;
  562. for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
  563. worker->con= worker->con->next)
  564. {
  565. if (worker->con->fd == -1)
  566. continue;
  567. *ret_ptr= gearman_connection_set_events(worker->con, POLLIN);
  568. if (*ret_ptr != GEARMAN_SUCCESS)
  569. return NULL;
  570. active++;
  571. }
  572. if ((&worker->universal)->options.non_blocking)
  573. {
  574. *ret_ptr= GEARMAN_NO_JOBS;
  575. return NULL;
  576. }
  577. if (active == 0)
  578. {
  579. if ((&worker->universal)->timeout < 0)
  580. usleep(GEARMAN_WORKER_WAIT_TIMEOUT * 1000);
  581. else
  582. {
  583. if ((&worker->universal)->timeout > 0)
  584. usleep((unsigned int)(&worker->universal)->timeout * 1000);
  585. if (worker->options.timeout_return)
  586. {
  587. gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
  588. "timeout reached");
  589. *ret_ptr= GEARMAN_TIMEOUT;
  590. return NULL;
  591. }
  592. }
  593. }
  594. else
  595. {
  596. *ret_ptr= gearman_wait((&worker->universal));
  597. if (*ret_ptr != GEARMAN_SUCCESS && (*ret_ptr != GEARMAN_TIMEOUT ||
  598. worker->options.timeout_return))
  599. {
  600. return NULL;
  601. }
  602. }
  603. break;
  604. default:
  605. gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
  606. "unknown state: %u", worker->state);
  607. *ret_ptr= GEARMAN_UNKNOWN_STATE;
  608. return NULL;
  609. }
  610. }
  611. }
  612. void gearman_job_free(gearman_job_st *job)
  613. {
  614. if (job->options.assigned_in_use)
  615. gearman_packet_free(&(job->assigned));
  616. if (job->options.work_in_use)
  617. gearman_packet_free(&(job->work));
  618. if (job->worker->job_list == job)
  619. job->worker->job_list= job->next;
  620. if (job->prev != NULL)
  621. job->prev->next= job->next;
  622. if (job->next != NULL)
  623. job->next->prev= job->prev;
  624. job->worker->job_count--;
  625. if (job->options.allocated)
  626. free(job);
  627. }
  628. void gearman_job_free_all(gearman_worker_st *worker)
  629. {
  630. while (worker->job_list != NULL)
  631. gearman_job_free(worker->job_list);
  632. }
  633. gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
  634. const char *function_name,
  635. uint32_t timeout,
  636. gearman_worker_fn *worker_fn,
  637. void *context)
  638. {
  639. if (function_name == NULL)
  640. {
  641. gearman_universal_set_error((&worker->universal), "gearman_worker_add_function",
  642. "function name not given");
  643. return GEARMAN_INVALID_FUNCTION_NAME;
  644. }
  645. if (worker_fn == NULL)
  646. {
  647. gearman_universal_set_error((&worker->universal), "gearman_worker_add_function",
  648. "function not given");
  649. return GEARMAN_INVALID_WORKER_FUNCTION;
  650. }
  651. return _worker_function_create(worker, function_name, strlen(function_name),
  652. timeout, worker_fn,
  653. context);
  654. }
  655. gearman_return_t gearman_worker_work(gearman_worker_st *worker)
  656. {
  657. gearman_job_st *check;
  658. gearman_return_t ret;
  659. switch (worker->work_state)
  660. {
  661. case GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB:
  662. check= gearman_worker_grab_job(worker, &(worker->work_job), &ret);
  663. (void)check; // @todo test this good values
  664. if (ret != GEARMAN_SUCCESS)
  665. return ret;
  666. for (worker->work_function= worker->function_list;
  667. worker->work_function != NULL;
  668. worker->work_function= worker->work_function->next)
  669. {
  670. if (!strcmp(gearman_job_function_name(&(worker->work_job)),
  671. worker->work_function->function_name))
  672. {
  673. break;
  674. }
  675. }
  676. if (worker->work_function == NULL)
  677. {
  678. gearman_job_free(&(worker->work_job));
  679. gearman_universal_set_error((&worker->universal), "gearman_worker_work",
  680. "function not found");
  681. return GEARMAN_INVALID_FUNCTION_NAME;
  682. }
  683. if (worker->work_function->worker_fn == NULL)
  684. {
  685. gearman_job_free(&(worker->work_job));
  686. gearman_universal_set_error((&worker->universal), "gearman_worker_work",
  687. "no callback function supplied");
  688. return GEARMAN_INVALID_FUNCTION_NAME;
  689. }
  690. worker->options.work_job_in_use= true;
  691. worker->work_result_size= 0;
  692. case GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION:
  693. worker->work_result= worker->work_function->worker_fn(&(worker->work_job),
  694. (void *)worker->work_function->context,
  695. &(worker->work_result_size), &ret);
  696. if (ret == GEARMAN_WORK_FAIL)
  697. {
  698. ret= gearman_job_send_fail(&(worker->work_job));
  699. if (ret != GEARMAN_SUCCESS)
  700. {
  701. if (ret == GEARMAN_LOST_CONNECTION)
  702. break;
  703. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
  704. return ret;
  705. }
  706. break;
  707. }
  708. if (ret != GEARMAN_SUCCESS)
  709. {
  710. if (ret == GEARMAN_LOST_CONNECTION)
  711. break;
  712. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION;
  713. return ret;
  714. }
  715. case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
  716. ret= gearman_job_send_complete(&(worker->work_job), worker->work_result,
  717. worker->work_result_size);
  718. if (ret == GEARMAN_IO_WAIT)
  719. {
  720. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
  721. return ret;
  722. }
  723. if (worker->work_result != NULL)
  724. {
  725. if ((&worker->universal)->workload_free_fn == NULL)
  726. {
  727. free(worker->work_result);
  728. }
  729. else
  730. {
  731. (&worker->universal)->workload_free_fn(worker->work_result,
  732. (&worker->universal)->workload_free_context);
  733. }
  734. worker->work_result= NULL;
  735. }
  736. if (ret != GEARMAN_SUCCESS)
  737. {
  738. if (ret == GEARMAN_LOST_CONNECTION)
  739. break;
  740. return ret;
  741. }
  742. break;
  743. case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
  744. ret= gearman_job_send_fail(&(worker->work_job));
  745. if (ret != GEARMAN_SUCCESS)
  746. {
  747. if (ret == GEARMAN_LOST_CONNECTION)
  748. break;
  749. return ret;
  750. }
  751. break;
  752. default:
  753. gearman_universal_set_error((&worker->universal), "gearman_worker_work",
  754. "unknown state: %u", worker->work_state);
  755. return GEARMAN_UNKNOWN_STATE;
  756. }
  757. gearman_job_free(&(worker->work_job));
  758. worker->options.work_job_in_use= false;
  759. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
  760. return GEARMAN_SUCCESS;
  761. }
  762. gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
  763. const void *workload,
  764. size_t workload_size)
  765. {
  766. return gearman_echo((&worker->universal), workload, workload_size);
  767. }
  768. /*
  769. * Static Definitions
  770. */
  771. static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone)
  772. {
  773. if (worker == NULL)
  774. {
  775. worker= malloc(sizeof(gearman_worker_st));
  776. if (worker == NULL)
  777. return NULL;
  778. worker->options.allocated= true;
  779. }
  780. else
  781. {
  782. worker->options.allocated= false;
  783. }
  784. worker->options.non_blocking= false;
  785. worker->options.packet_init= false;
  786. worker->options.grab_job_in_use= false;
  787. worker->options.pre_sleep_in_use= false;
  788. worker->options.work_job_in_use= false;
  789. worker->options.change= false;
  790. worker->options.grab_uniq= false;
  791. worker->options.timeout_return= false;
  792. worker->state= 0;
  793. worker->work_state= 0;
  794. worker->function_count= 0;
  795. worker->job_count= 0;
  796. worker->work_result_size= 0;
  797. worker->con= NULL;
  798. worker->job= NULL;
  799. worker->job_list= NULL;
  800. worker->function= NULL;
  801. worker->function_list= NULL;
  802. worker->work_function= NULL;
  803. worker->work_result= NULL;
  804. if (! is_clone)
  805. {
  806. gearman_universal_st *check;
  807. check= gearman_universal_create(&worker->universal, NULL);
  808. if (check == NULL)
  809. {
  810. gearman_worker_free(worker);
  811. return NULL;
  812. }
  813. gearman_universal_set_timeout((&worker->universal), GEARMAN_WORKER_WAIT_TIMEOUT);
  814. }
  815. return worker;
  816. }
  817. static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
  818. {
  819. gearman_return_t ret;
  820. ret= gearman_packet_create_args((&worker->universal), &(worker->grab_job),
  821. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB,
  822. NULL, NULL, 0);
  823. if (ret != GEARMAN_SUCCESS)
  824. return ret;
  825. ret= gearman_packet_create_args((&worker->universal), &(worker->pre_sleep),
  826. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
  827. NULL, NULL, 0);
  828. if (ret != GEARMAN_SUCCESS)
  829. {
  830. gearman_packet_free(&(worker->grab_job));
  831. return ret;
  832. }
  833. worker->options.packet_init= true;
  834. return GEARMAN_SUCCESS;
  835. }
  836. static gearman_return_t _worker_add_server(const char *host, in_port_t port,
  837. void *context)
  838. {
  839. return gearman_worker_add_server((gearman_worker_st *)context, host, port);
  840. }
  841. static gearman_return_t _worker_function_create(gearman_worker_st *worker,
  842. const char *function_name,
  843. size_t function_length,
  844. uint32_t timeout,
  845. gearman_worker_fn *worker_fn,
  846. void *context)
  847. {
  848. struct _worker_function_st *function;
  849. gearman_return_t ret;
  850. char timeout_buffer[11];
  851. const void *args[2];
  852. size_t args_size[2];
  853. function= malloc(sizeof(struct _worker_function_st));
  854. if (function == NULL)
  855. {
  856. gearman_universal_set_error((&worker->universal), "_worker_function_create", "malloc");
  857. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  858. }
  859. function->options.packet_in_use= true;
  860. function->options.change= true;
  861. function->options.remove= false;
  862. function->function_name= strdup(function_name);
  863. function->function_length= function_length;
  864. if (function->function_name == NULL)
  865. {
  866. free(function);
  867. gearman_universal_set_error((&worker->universal), "gearman_worker_add_function", "strdup");
  868. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  869. }
  870. function->worker_fn= worker_fn;
  871. function->context= context;
  872. if (timeout > 0)
  873. {
  874. snprintf(timeout_buffer, 11, "%u", timeout);
  875. args[0]= function_name;
  876. args_size[0]= strlen(function_name) + 1;
  877. args[1]= timeout_buffer;
  878. args_size[1]= strlen(timeout_buffer);
  879. ret= gearman_packet_create_args((&worker->universal), &(function->packet),
  880. GEARMAN_MAGIC_REQUEST,
  881. GEARMAN_COMMAND_CAN_DO_TIMEOUT,
  882. args, args_size, 2);
  883. }
  884. else
  885. {
  886. args[0]= function->function_name;
  887. args_size[0]= function->function_length= function_length;
  888. ret= gearman_packet_create_args((&worker->universal), &(function->packet),
  889. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
  890. args, args_size, 1);
  891. }
  892. if (ret != GEARMAN_SUCCESS)
  893. {
  894. free(function->function_name);
  895. free(function);
  896. return ret;
  897. }
  898. if (worker->function_list != NULL)
  899. worker->function_list->prev= function;
  900. function->next= worker->function_list;
  901. function->prev= NULL;
  902. worker->function_list= function;
  903. worker->function_count++;
  904. worker->options.change= true;
  905. return GEARMAN_SUCCESS;
  906. }
  907. static void _worker_function_free(gearman_worker_st *worker,
  908. struct _worker_function_st *function)
  909. {
  910. if (worker->function_list == function)
  911. worker->function_list= function->next;
  912. if (function->prev != NULL)
  913. function->prev->next= function->next;
  914. if (function->next != NULL)
  915. function->next->prev= function->prev;
  916. worker->function_count--;
  917. if (function->options.packet_in_use)
  918. gearman_packet_free(&(function->packet));
  919. free(function->function_name);
  920. free(function);
  921. }