worker.cc 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Worker Definitions
  11. */
  12. #include <libgearman/common.h>
  13. #include <libgearman/connection.h>
  14. #include <cassert>
  15. #include <cstdio>
  16. #include <cstdlib>
  17. #include <cstring>
  18. #include <memory>
  19. #include <unistd.h>
  20. /**
  21. Private structure.
  22. */
  23. struct _worker_function_st
  24. {
  25. struct {
  26. bool packet_in_use:1;
  27. bool change:1;
  28. bool remove:1;
  29. } options;
  30. struct _worker_function_st *next;
  31. struct _worker_function_st *prev;
  32. char *function_name;
  33. size_t function_length;
  34. gearman_worker_fn *worker_fn;
  35. gearman_mapper_fn *mapper_fn;
  36. void *context;
  37. gearman_packet_st packet;
  38. _worker_function_st(gearman_worker_fn *worker_fn_arg, gearman_mapper_fn *mapper_fn_arg, void *context_arg) :
  39. function_name(NULL),
  40. function_length(0),
  41. worker_fn(worker_fn_arg),
  42. mapper_fn(mapper_fn_arg),
  43. context(context_arg)
  44. {
  45. options.packet_in_use= true;
  46. options.change= true;
  47. options.remove= false;
  48. }
  49. bool init(const char *name_arg, size_t size)
  50. {
  51. function_name= new (std::nothrow) char[size +1];
  52. if (not function_name)
  53. {
  54. return false;
  55. }
  56. memcpy(function_name, name_arg, size);
  57. function_length= size;
  58. function_name[size]= 0;
  59. return true;
  60. }
  61. const char *name() const
  62. {
  63. return function_name;
  64. }
  65. size_t length() const
  66. {
  67. return function_length;
  68. }
  69. ~_worker_function_st()
  70. {
  71. delete [] function_name;
  72. }
  73. };
  74. /**
  75. * @addtogroup gearman_worker_static Static Worker Declarations
  76. * @ingroup gearman_worker
  77. * @{
  78. */
  79. static inline struct _worker_function_st *_function_exist(gearman_worker_st *worker, const char *function_name, size_t function_length)
  80. {
  81. struct _worker_function_st *function;
  82. for (function= worker->function_list; function;
  83. function= function->next)
  84. {
  85. if (function_length == function->function_length)
  86. {
  87. if (! memcmp(function_name, function->function_name, function_length))
  88. break;
  89. }
  90. }
  91. return function;
  92. }
  93. /**
  94. * Allocate a worker structure.
  95. */
  96. static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone);
  97. /**
  98. * Initialize common packets for later use.
  99. */
  100. static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
  101. /**
  102. * Callback function used when parsing server lists.
  103. */
  104. static gearman_return_t _worker_add_server(const char *host, in_port_t port, void *context);
  105. /**
  106. * Allocate and add a function to the register list.
  107. */
  108. static gearman_return_t _worker_function_create(gearman_worker_st *worker,
  109. const char *function_name,
  110. size_t function_length,
  111. uint32_t timeout,
  112. gearman_worker_fn *worker_fb,
  113. gearman_mapper_fn *mapper_fn,
  114. void *context);
  115. /**
  116. * Free a function.
  117. */
  118. static void _worker_function_free(gearman_worker_st *worker,
  119. struct _worker_function_st *function);
  120. /** @} */
  121. /*
  122. * Public Definitions
  123. */
  124. gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
  125. {
  126. worker= _worker_allocate(worker, false);
  127. if (not worker)
  128. return NULL;
  129. if (gearman_failed(_worker_packet_init(worker)))
  130. {
  131. gearman_worker_free(worker);
  132. return NULL;
  133. }
  134. return worker;
  135. }
  136. gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
  137. const gearman_worker_st *from)
  138. {
  139. if (not from)
  140. {
  141. return _worker_allocate(worker, false);
  142. }
  143. worker= _worker_allocate(worker, true);
  144. if (not worker)
  145. return worker;
  146. worker->options.non_blocking= from->options.non_blocking;
  147. worker->options.change= from->options.change;
  148. worker->options.grab_uniq= from->options.grab_uniq;
  149. worker->options.grab_all= from->options.grab_all;
  150. worker->options.timeout_return= from->options.timeout_return;
  151. if (not gearman_universal_clone(&(worker->universal), &from->universal))
  152. {
  153. gearman_worker_free(worker);
  154. return NULL;
  155. }
  156. if (gearman_failed(_worker_packet_init(worker)))
  157. {
  158. gearman_worker_free(worker);
  159. return NULL;
  160. }
  161. return worker;
  162. }
  163. void gearman_worker_free(gearman_worker_st *worker)
  164. {
  165. if (not worker)
  166. return;
  167. gearman_worker_unregister_all(worker);
  168. if (worker->options.packet_init)
  169. {
  170. gearman_packet_free(&(worker->grab_job));
  171. gearman_packet_free(&(worker->pre_sleep));
  172. }
  173. gearman_job_free(worker->job);
  174. worker->work_job= NULL;
  175. if (worker->work_result)
  176. {
  177. if ((&worker->universal)->workload_free_fn == NULL)
  178. {
  179. // Created with malloc
  180. free(worker->work_result);
  181. }
  182. else
  183. {
  184. (&worker->universal)->workload_free_fn(worker->work_result,
  185. static_cast<void *>((&worker->universal)->workload_free_context));
  186. }
  187. }
  188. while (worker->function_list)
  189. _worker_function_free(worker, worker->function_list);
  190. gearman_job_free_all(worker);
  191. if ((&worker->universal))
  192. gearman_universal_free((&worker->universal));
  193. if (worker->options.allocated)
  194. delete worker;
  195. }
  196. const char *gearman_worker_error(const gearman_worker_st *worker)
  197. {
  198. return gearman_universal_error((&worker->universal));
  199. }
  200. int gearman_worker_errno(gearman_worker_st *worker)
  201. {
  202. return gearman_universal_errno((&worker->universal));
  203. }
  204. gearman_worker_options_t gearman_worker_options(const gearman_worker_st *worker)
  205. {
  206. int options;
  207. memset(&options, 0, sizeof(gearman_worker_options_t));
  208. if (worker->options.allocated)
  209. options|= int(GEARMAN_WORKER_ALLOCATED);
  210. if (worker->options.non_blocking)
  211. options|= int(GEARMAN_WORKER_NON_BLOCKING);
  212. if (worker->options.packet_init)
  213. options|= int(GEARMAN_WORKER_PACKET_INIT);
  214. if (worker->options.change)
  215. options|= int(GEARMAN_WORKER_CHANGE);
  216. if (worker->options.grab_uniq)
  217. options|= int(GEARMAN_WORKER_GRAB_UNIQ);
  218. if (worker->options.grab_all)
  219. options|= int(GEARMAN_WORKER_GRAB_ALL);
  220. if (worker->options.timeout_return)
  221. options|= int(GEARMAN_WORKER_TIMEOUT_RETURN);
  222. return gearman_worker_options_t(options);
  223. }
  224. void gearman_worker_set_options(gearman_worker_st *worker,
  225. gearman_worker_options_t options)
  226. {
  227. gearman_worker_options_t usable_options[]= {
  228. GEARMAN_WORKER_NON_BLOCKING,
  229. GEARMAN_WORKER_GRAB_UNIQ,
  230. GEARMAN_WORKER_GRAB_ALL,
  231. GEARMAN_WORKER_TIMEOUT_RETURN,
  232. GEARMAN_WORKER_MAX
  233. };
  234. gearman_worker_options_t *ptr;
  235. for (ptr= usable_options; *ptr != GEARMAN_WORKER_MAX ; ptr++)
  236. {
  237. if (options & *ptr)
  238. {
  239. gearman_worker_add_options(worker, *ptr);
  240. }
  241. else
  242. {
  243. gearman_worker_remove_options(worker, *ptr);
  244. }
  245. }
  246. }
  247. void gearman_worker_add_options(gearman_worker_st *worker,
  248. gearman_worker_options_t options)
  249. {
  250. if (options & GEARMAN_WORKER_NON_BLOCKING)
  251. {
  252. gearman_universal_add_options((&worker->universal), GEARMAN_NON_BLOCKING);
  253. worker->options.non_blocking= true;
  254. }
  255. if (options & GEARMAN_WORKER_GRAB_UNIQ)
  256. {
  257. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
  258. gearman_return_t rc= gearman_packet_pack_header(&(worker->grab_job));
  259. assert(gearman_success(rc));
  260. worker->options.grab_uniq= true;
  261. }
  262. if (options & GEARMAN_WORKER_GRAB_ALL)
  263. {
  264. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_ALL;
  265. gearman_return_t rc= gearman_packet_pack_header(&(worker->grab_job));
  266. assert(gearman_success(rc));
  267. worker->options.grab_all= true;
  268. }
  269. if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
  270. {
  271. worker->options.timeout_return= true;
  272. }
  273. }
  274. void gearman_worker_remove_options(gearman_worker_st *worker,
  275. gearman_worker_options_t options)
  276. {
  277. if (options & GEARMAN_WORKER_NON_BLOCKING)
  278. {
  279. gearman_universal_remove_options((&worker->universal), GEARMAN_NON_BLOCKING);
  280. worker->options.non_blocking= false;
  281. }
  282. if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
  283. {
  284. worker->options.timeout_return= false;
  285. }
  286. if (options & GEARMAN_WORKER_GRAB_UNIQ)
  287. {
  288. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
  289. (void)gearman_packet_pack_header(&(worker->grab_job));
  290. worker->options.grab_uniq= false;
  291. }
  292. if (options & GEARMAN_WORKER_GRAB_ALL)
  293. {
  294. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
  295. (void)gearman_packet_pack_header(&(worker->grab_job));
  296. worker->options.grab_all= false;
  297. }
  298. }
  299. int gearman_worker_timeout(gearman_worker_st *worker)
  300. {
  301. return gearman_universal_timeout((&worker->universal));
  302. }
  303. void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout)
  304. {
  305. gearman_worker_add_options(worker, GEARMAN_WORKER_TIMEOUT_RETURN);
  306. gearman_universal_set_timeout((&worker->universal), timeout);
  307. }
  308. void *gearman_worker_context(const gearman_worker_st *worker)
  309. {
  310. if (not worker)
  311. return NULL;
  312. return worker->context;
  313. }
  314. void gearman_worker_set_context(gearman_worker_st *worker, void *context)
  315. {
  316. if (not worker)
  317. return;
  318. worker->context= context;
  319. }
  320. void gearman_worker_set_log_fn(gearman_worker_st *worker,
  321. gearman_log_fn *function, void *context,
  322. gearman_verbose_t verbose)
  323. {
  324. gearman_set_log_fn((&worker->universal), function, context, verbose);
  325. }
  326. void gearman_worker_set_workload_malloc_fn(gearman_worker_st *worker,
  327. gearman_malloc_fn *function,
  328. void *context)
  329. {
  330. gearman_set_workload_malloc_fn((&worker->universal), function, context);
  331. }
  332. void gearman_worker_set_workload_free_fn(gearman_worker_st *worker,
  333. gearman_free_fn *function,
  334. void *context)
  335. {
  336. gearman_set_workload_free_fn((&worker->universal), function, context);
  337. }
  338. gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
  339. const char *host, in_port_t port)
  340. {
  341. if (gearman_connection_create_args((&worker->universal), NULL, host, port) == NULL)
  342. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  343. return GEARMAN_SUCCESS;
  344. }
  345. gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
  346. const char *servers)
  347. {
  348. return gearman_parse_servers(servers, _worker_add_server, worker);
  349. }
  350. void gearman_worker_remove_servers(gearman_worker_st *worker)
  351. {
  352. gearman_free_all_cons((&worker->universal));
  353. }
  354. gearman_return_t gearman_worker_wait(gearman_worker_st *worker)
  355. {
  356. return gearman_wait((&worker->universal));
  357. }
  358. gearman_return_t gearman_worker_register(gearman_worker_st *worker,
  359. const char *function_name,
  360. uint32_t timeout)
  361. {
  362. return _worker_function_create(worker, function_name, strlen(function_name), timeout, NULL, NULL, NULL);
  363. }
  364. bool gearman_worker_function_exist(gearman_worker_st *worker,
  365. const char *function_name,
  366. size_t function_length)
  367. {
  368. struct _worker_function_st *function;
  369. function= _function_exist(worker, function_name, function_length);
  370. return (function && function->options.remove == false) ? true : false;
  371. }
  372. static inline gearman_return_t _worker_unregister(gearman_worker_st *worker,
  373. const char *function_name, size_t function_length)
  374. {
  375. struct _worker_function_st *function;
  376. gearman_return_t ret;
  377. const void *args[1];
  378. size_t args_size[1];
  379. function= _function_exist(worker, function_name, function_length);
  380. if (function == NULL || function->options.remove)
  381. return GEARMAN_NO_REGISTERED_FUNCTION;
  382. gearman_packet_free(&(function->packet));
  383. args[0]= function->name();
  384. args_size[0]= function->length();
  385. ret= gearman_packet_create_args((&worker->universal), &(function->packet),
  386. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
  387. args, args_size, 1);
  388. if (ret != GEARMAN_SUCCESS)
  389. {
  390. function->options.packet_in_use= false;
  391. return ret;
  392. }
  393. function->options.change= true;
  394. function->options.remove= true;
  395. worker->options.change= true;
  396. return GEARMAN_SUCCESS;
  397. }
  398. gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
  399. const char *function_name)
  400. {
  401. return _worker_unregister(worker, function_name, strlen(function_name));
  402. }
  403. gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
  404. {
  405. gearman_return_t ret;
  406. struct _worker_function_st *function;
  407. uint32_t count= 0;
  408. if (not worker->function_list)
  409. return GEARMAN_NO_REGISTERED_FUNCTIONS;
  410. /* Lets find out if we have any functions left that are valid */
  411. for (function= worker->function_list; function;
  412. function= function->next)
  413. {
  414. if (function->options.remove == false)
  415. count++;
  416. }
  417. if (count == 0)
  418. return GEARMAN_NO_REGISTERED_FUNCTIONS;
  419. gearman_packet_free(&(worker->function_list->packet));
  420. ret= gearman_packet_create_args((&worker->universal),
  421. &(worker->function_list->packet),
  422. GEARMAN_MAGIC_REQUEST,
  423. GEARMAN_COMMAND_RESET_ABILITIES,
  424. NULL, NULL, 0);
  425. if (ret != GEARMAN_SUCCESS)
  426. {
  427. worker->function_list->options.packet_in_use= false;
  428. return ret;
  429. }
  430. while (worker->function_list->next)
  431. _worker_function_free(worker, worker->function_list->next);
  432. worker->function_list->options.change= true;
  433. worker->function_list->options.remove= true;
  434. worker->options.change= true;
  435. return GEARMAN_SUCCESS;
  436. }
  437. gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
  438. gearman_job_st *job,
  439. gearman_return_t *ret_ptr)
  440. {
  441. struct _worker_function_st *function;
  442. uint32_t active;
  443. while (1)
  444. {
  445. switch (worker->state)
  446. {
  447. case GEARMAN_WORKER_STATE_START:
  448. /* If there are any new functions changes, send them now. */
  449. if (worker->options.change)
  450. {
  451. worker->function= worker->function_list;
  452. while (worker->function)
  453. {
  454. if (! (worker->function->options.change))
  455. {
  456. worker->function= worker->function->next;
  457. continue;
  458. }
  459. for (worker->con= (&worker->universal)->con_list; worker->con;
  460. worker->con= worker->con->next)
  461. {
  462. if (worker->con->fd == -1)
  463. continue;
  464. case GEARMAN_WORKER_STATE_FUNCTION_SEND:
  465. *ret_ptr= gearman_connection_send(worker->con, &(worker->function->packet),
  466. true);
  467. if (*ret_ptr != GEARMAN_SUCCESS)
  468. {
  469. if (*ret_ptr == GEARMAN_IO_WAIT)
  470. worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
  471. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  472. continue;
  473. return NULL;
  474. }
  475. }
  476. if (worker->function->options.remove)
  477. {
  478. function= worker->function->prev;
  479. _worker_function_free(worker, worker->function);
  480. if (function == NULL)
  481. worker->function= worker->function_list;
  482. else
  483. worker->function= function;
  484. }
  485. else
  486. {
  487. worker->function->options.change= false;
  488. worker->function= worker->function->next;
  489. }
  490. }
  491. worker->options.change= false;
  492. }
  493. if (worker->function_list == NULL)
  494. {
  495. gearman_error((&worker->universal), GEARMAN_NO_REGISTERED_FUNCTIONS, "no functions have been registered");
  496. *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
  497. return NULL;
  498. }
  499. for (worker->con= (&worker->universal)->con_list; worker->con;
  500. worker->con= worker->con->next)
  501. {
  502. /* If the connection to the job server is not active, start it. */
  503. if (worker->con->fd == -1)
  504. {
  505. for (worker->function= worker->function_list;
  506. worker->function;
  507. worker->function= worker->function->next)
  508. {
  509. case GEARMAN_WORKER_STATE_CONNECT:
  510. *ret_ptr= gearman_connection_send(worker->con, &(worker->function->packet),
  511. true);
  512. if (*ret_ptr != GEARMAN_SUCCESS)
  513. {
  514. if (*ret_ptr == GEARMAN_IO_WAIT)
  515. {
  516. worker->state= GEARMAN_WORKER_STATE_CONNECT;
  517. }
  518. else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT || *ret_ptr == GEARMAN_LOST_CONNECTION)
  519. {
  520. break;
  521. }
  522. return NULL;
  523. }
  524. }
  525. if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
  526. continue;
  527. }
  528. case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
  529. if (worker->con->fd == -1)
  530. continue;
  531. *ret_ptr= gearman_connection_send(worker->con, &(worker->grab_job), true);
  532. if (gearman_failed(*ret_ptr))
  533. {
  534. if (*ret_ptr == GEARMAN_IO_WAIT)
  535. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
  536. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  537. continue;
  538. return NULL;
  539. }
  540. if (not worker->job)
  541. {
  542. worker->job= gearman_job_create(worker, job);
  543. if (not worker->job)
  544. {
  545. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  546. return NULL;
  547. }
  548. }
  549. while (1)
  550. {
  551. case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
  552. assert(&(worker->job->assigned));
  553. (void)gearman_connection_recv(worker->con, &(worker->job->assigned), ret_ptr, true);
  554. if (gearman_failed(*ret_ptr))
  555. {
  556. if (*ret_ptr == GEARMAN_IO_WAIT)
  557. {
  558. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
  559. }
  560. else
  561. {
  562. gearman_job_free(worker->job);
  563. worker->job= NULL;
  564. if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  565. break;
  566. }
  567. return NULL;
  568. }
  569. if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN or
  570. worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL or
  571. worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
  572. {
  573. worker->job->options.assigned_in_use= true;
  574. worker->job->con= worker->con;
  575. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
  576. job= worker->job;
  577. worker->job= NULL;
  578. return job;
  579. }
  580. if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB)
  581. {
  582. gearman_packet_free(&(worker->job->assigned));
  583. break;
  584. }
  585. if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
  586. {
  587. gearman_universal_set_error((&worker->universal), GEARMAN_UNEXPECTED_PACKET,
  588. "gearman_worker_grab_job",
  589. "unexpected packet:%s",
  590. gearman_command_info(worker->job->assigned.command)->name);
  591. gearman_packet_free(&(worker->job->assigned));
  592. gearman_job_free(worker->job);
  593. worker->job= NULL;
  594. *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
  595. return NULL;
  596. }
  597. gearman_packet_free(&(worker->job->assigned));
  598. }
  599. }
  600. case GEARMAN_WORKER_STATE_PRE_SLEEP:
  601. for (worker->con= (&worker->universal)->con_list; worker->con;
  602. worker->con= worker->con->next)
  603. {
  604. if (worker->con->fd == -1)
  605. continue;
  606. *ret_ptr= gearman_connection_send(worker->con, &(worker->pre_sleep), true);
  607. if (*ret_ptr != GEARMAN_SUCCESS)
  608. {
  609. if (*ret_ptr == GEARMAN_IO_WAIT)
  610. worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
  611. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  612. continue;
  613. return NULL;
  614. }
  615. }
  616. worker->state= GEARMAN_WORKER_STATE_START;
  617. /* Set a watch on all active connections that we sent a PRE_SLEEP to. */
  618. active= 0;
  619. for (worker->con= (&worker->universal)->con_list; worker->con;
  620. worker->con= worker->con->next)
  621. {
  622. if (worker->con->fd == -1)
  623. continue;
  624. *ret_ptr= gearman_connection_set_events(worker->con, POLLIN);
  625. if (*ret_ptr != GEARMAN_SUCCESS)
  626. {
  627. return NULL;
  628. }
  629. active++;
  630. }
  631. if ((&worker->universal)->options.non_blocking)
  632. {
  633. *ret_ptr= GEARMAN_NO_JOBS;
  634. return NULL;
  635. }
  636. if (active == 0)
  637. {
  638. if ((&worker->universal)->timeout < 0)
  639. {
  640. usleep(GEARMAN_WORKER_WAIT_TIMEOUT * 1000);
  641. }
  642. else
  643. {
  644. if ((&worker->universal)->timeout > 0)
  645. {
  646. usleep(static_cast<unsigned int>((&worker->universal)->timeout) * 1000);
  647. }
  648. if (worker->options.timeout_return)
  649. {
  650. gearman_error((&worker->universal), GEARMAN_TIMEOUT, "timeout reached");
  651. *ret_ptr= GEARMAN_TIMEOUT;
  652. return NULL;
  653. }
  654. }
  655. }
  656. else
  657. {
  658. *ret_ptr= gearman_wait((&worker->universal));
  659. if (*ret_ptr != GEARMAN_SUCCESS && (*ret_ptr != GEARMAN_TIMEOUT ||
  660. worker->options.timeout_return))
  661. {
  662. return NULL;
  663. }
  664. }
  665. break;
  666. }
  667. }
  668. }
  669. void gearman_job_free_all(gearman_worker_st *worker)
  670. {
  671. while (worker->job_list)
  672. gearman_job_free(worker->job_list);
  673. }
  674. gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
  675. const char *function_name,
  676. uint32_t timeout,
  677. gearman_worker_fn *worker_fn,
  678. void *context)
  679. {
  680. return gearman_worker_add_map_function(worker, function_name, strlen(function_name), timeout, worker_fn, NULL, context);
  681. }
  682. gearman_return_t gearman_worker_add_map_function(gearman_worker_st *worker,
  683. const char *function_name,
  684. size_t functiona_name_length,
  685. uint32_t timeout,
  686. gearman_worker_fn *worker_fn,
  687. gearman_mapper_fn *mapper_fn,
  688. void *context)
  689. {
  690. if (not function_name)
  691. {
  692. gearman_universal_set_error((&worker->universal), GEARMAN_INVALID_ARGUMENT, AT,
  693. "function name not given");
  694. return GEARMAN_INVALID_ARGUMENT;
  695. }
  696. if (not worker_fn)
  697. {
  698. gearman_universal_set_error((&worker->universal), GEARMAN_INVALID_ARGUMENT, AT,
  699. "function not given");
  700. return GEARMAN_INVALID_ARGUMENT;
  701. }
  702. return _worker_function_create(worker,
  703. function_name, functiona_name_length,
  704. timeout,
  705. worker_fn,
  706. mapper_fn,
  707. context);
  708. }
  709. gearman_return_t gearman_worker_work(gearman_worker_st *worker)
  710. {
  711. switch (worker->work_state)
  712. {
  713. case GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB:
  714. {
  715. gearman_return_t ret;
  716. worker->work_job= gearman_worker_grab_job(worker, NULL, &ret);
  717. if (gearman_failed(ret))
  718. {
  719. return ret;
  720. }
  721. assert(worker->work_job);
  722. for (worker->work_function= worker->function_list;
  723. worker->work_function;
  724. worker->work_function= worker->work_function->next)
  725. {
  726. if (not strcmp(gearman_job_function_name(worker->work_job),
  727. worker->work_function->function_name))
  728. {
  729. break;
  730. }
  731. }
  732. if (not worker->work_function)
  733. {
  734. gearman_job_free(worker->work_job);
  735. worker->work_job= NULL;
  736. gearman_universal_set_error((&worker->universal), GEARMAN_INVALID_FUNCTION_NAME, AT, "function not found");
  737. return GEARMAN_INVALID_FUNCTION_NAME;
  738. }
  739. if (not worker->work_function->worker_fn)
  740. {
  741. gearman_job_free(worker->work_job);
  742. worker->work_job= NULL;
  743. gearman_universal_set_error((&worker->universal), GEARMAN_INVALID_FUNCTION_NAME, AT, "no callback function supplied");
  744. return GEARMAN_INVALID_FUNCTION_NAME;
  745. }
  746. worker->work_result_size= 0;
  747. }
  748. case GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION:
  749. {
  750. if (gearman_job_is_map(worker->work_job))
  751. {
  752. gearman_job_build_reducer(worker->work_job, worker->work_function->mapper_fn);
  753. }
  754. gearman_return_t ret= GEARMAN_WORK_FAIL;
  755. worker->work_result= worker->work_function->worker_fn(worker->work_job,
  756. static_cast<void *>(worker->work_function->context),
  757. &(worker->work_result_size), &ret);
  758. if (ret == GEARMAN_WORK_FAIL)
  759. {
  760. gearman_return_t rc= gearman_job_send_fail(worker->work_job);
  761. if (gearman_failed(rc))
  762. {
  763. if (rc == GEARMAN_LOST_CONNECTION)
  764. break;
  765. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
  766. return rc;
  767. }
  768. break;
  769. }
  770. if (gearman_failed(ret))
  771. {
  772. if (ret == GEARMAN_LOST_CONNECTION)
  773. break;
  774. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION;
  775. return ret;
  776. }
  777. }
  778. case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
  779. {
  780. gearman_return_t ret= gearman_job_send_complete(worker->work_job, worker->work_result,
  781. worker->work_result_size);
  782. if (ret == GEARMAN_IO_WAIT)
  783. {
  784. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
  785. return ret;
  786. }
  787. if (worker->work_result)
  788. {
  789. if ((&worker->universal)->workload_free_fn == NULL)
  790. {
  791. free(worker->work_result);
  792. }
  793. else
  794. {
  795. (&worker->universal)->workload_free_fn(worker->work_result,
  796. (&worker->universal)->workload_free_context);
  797. }
  798. worker->work_result= NULL;
  799. }
  800. if (gearman_failed(ret))
  801. {
  802. if (ret == GEARMAN_LOST_CONNECTION)
  803. break;
  804. return ret;
  805. }
  806. }
  807. break;
  808. case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
  809. {
  810. gearman_return_t ret;
  811. if (gearman_failed(ret= gearman_job_send_fail(worker->work_job)))
  812. {
  813. if (ret == GEARMAN_LOST_CONNECTION)
  814. break;
  815. return ret;
  816. }
  817. }
  818. break;
  819. }
  820. gearman_job_free(worker->work_job);
  821. worker->work_job= NULL;
  822. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
  823. return GEARMAN_SUCCESS;
  824. }
  825. void gearman_worker_set_reducer(gearman_worker_st *self, gearman_reducer_t reducer)
  826. {
  827. if (not self)
  828. return;
  829. self->reducer= reducer;
  830. }
  831. gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
  832. const void *workload,
  833. size_t workload_size)
  834. {
  835. return gearman_echo((&worker->universal), workload, workload_size);
  836. }
  837. /*
  838. * Static Definitions
  839. */
  840. static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone)
  841. {
  842. if (not worker)
  843. {
  844. worker= new (std::nothrow) gearman_worker_st;
  845. if (worker == NULL)
  846. {
  847. gearman_perror((&worker->universal), "gearman_worker_st new");
  848. return NULL;
  849. }
  850. worker->options.allocated= true;
  851. }
  852. else
  853. {
  854. worker->options.allocated= false;
  855. }
  856. worker->options.non_blocking= false;
  857. worker->options.packet_init= false;
  858. worker->options.change= false;
  859. worker->options.grab_uniq= false;
  860. worker->options.timeout_return= false;
  861. worker->state= GEARMAN_WORKER_STATE_START;
  862. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
  863. worker->function_count= 0;
  864. worker->job_count= 0;
  865. worker->work_result_size= 0;
  866. worker->context= NULL;
  867. worker->con= NULL;
  868. worker->job= NULL;
  869. worker->job_list= NULL;
  870. worker->function= NULL;
  871. worker->function_list= NULL;
  872. worker->work_function= NULL;
  873. worker->work_result= NULL;
  874. worker->reducer= gearman_reducer_make(0);
  875. if (not is_clone)
  876. {
  877. gearman_universal_st *check;
  878. check= gearman_universal_create(&worker->universal, NULL);
  879. if (not check)
  880. {
  881. gearman_worker_free(worker);
  882. return NULL;
  883. }
  884. gearman_universal_set_timeout((&worker->universal), GEARMAN_WORKER_WAIT_TIMEOUT);
  885. }
  886. return worker;
  887. }
  888. static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
  889. {
  890. gearman_return_t ret;
  891. ret= gearman_packet_create_args((&worker->universal), &(worker->grab_job),
  892. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB,
  893. NULL, NULL, 0);
  894. if (gearman_failed(ret))
  895. return ret;
  896. ret= gearman_packet_create_args((&worker->universal), &(worker->pre_sleep),
  897. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
  898. NULL, NULL, 0);
  899. if (gearman_failed(ret))
  900. {
  901. gearman_packet_free(&(worker->grab_job));
  902. return ret;
  903. }
  904. worker->options.packet_init= true;
  905. return GEARMAN_SUCCESS;
  906. }
  907. static gearman_return_t _worker_add_server(const char *host, in_port_t port, void *context)
  908. {
  909. return gearman_worker_add_server(static_cast<gearman_worker_st *>(context), host, port);
  910. }
  911. static gearman_return_t _worker_function_create(gearman_worker_st *worker,
  912. const char *function_name,
  913. size_t function_length,
  914. uint32_t timeout,
  915. gearman_worker_fn *worker_function,
  916. gearman_mapper_fn *mapper_fn,
  917. void *context)
  918. {
  919. const void *args[2];
  920. size_t args_size[2];
  921. _worker_function_st *function= new (std::nothrow) _worker_function_st(worker_function, mapper_fn, context);
  922. if (not function)
  923. {
  924. gearman_perror((&worker->universal), "_worker_function_st::new()");
  925. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  926. }
  927. if (not function->init(function_name, function_length))
  928. {
  929. gearman_perror((&worker->universal), "_worker_function_st::init()");
  930. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  931. }
  932. gearman_return_t ret;
  933. if (timeout > 0)
  934. {
  935. char timeout_buffer[11];
  936. snprintf(timeout_buffer, sizeof(timeout_buffer), "%u", timeout);
  937. args[0]= function->name();
  938. args_size[0]= function->length() + 1;
  939. args[1]= timeout_buffer;
  940. args_size[1]= strlen(timeout_buffer);
  941. ret= gearman_packet_create_args((&worker->universal), &(function->packet),
  942. GEARMAN_MAGIC_REQUEST,
  943. GEARMAN_COMMAND_CAN_DO_TIMEOUT,
  944. args, args_size, 2);
  945. }
  946. else
  947. {
  948. args[0]= function->name();
  949. args_size[0]= function->length();
  950. ret= gearman_packet_create_args((&worker->universal), &(function->packet),
  951. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
  952. args, args_size, 1);
  953. }
  954. if (gearman_failed(ret))
  955. {
  956. delete function;
  957. return ret;
  958. }
  959. if (worker->function_list)
  960. worker->function_list->prev= function;
  961. function->next= worker->function_list;
  962. function->prev= NULL;
  963. worker->function_list= function;
  964. worker->function_count++;
  965. worker->options.change= true;
  966. return GEARMAN_SUCCESS;
  967. }
  968. static void _worker_function_free(gearman_worker_st *worker,
  969. struct _worker_function_st *function)
  970. {
  971. if (worker->function_list == function)
  972. worker->function_list= function->next;
  973. if (function->prev)
  974. function->prev->next= function->next;
  975. if (function->next)
  976. function->next->prev= function->prev;
  977. worker->function_count--;
  978. if (function->options.packet_in_use)
  979. gearman_packet_free(&(function->packet));
  980. delete function;
  981. }
  982. bool gearman_worker_set_server_option(gearman_worker_st *self, const char *option_arg, size_t option_arg_size)
  983. {
  984. gearman_string_t option= { option_arg, option_arg_size };
  985. return gearman_request_option(self->universal, option);
  986. }