worker.cc 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include "gear_config.h"
  39. #include <libgearman/common.h>
  40. #include "libgearman/uuid.hpp"
  41. #include <libgearman/function/base.hpp>
  42. #include <libgearman/function/make.hpp>
  43. #include "libgearman/assert.hpp"
  44. #include "libgearman/log.hpp"
  45. #include <cstdio>
  46. #include <cstdlib>
  47. #include <cstring>
  48. #include <memory>
  49. #include <unistd.h>
  50. #include <fcntl.h>
  51. #include <cerrno>
  52. /**
  53. * @addtogroup gearman_worker_static Static Worker Declarations
  54. * @ingroup gearman_worker
  55. * @{
  56. */
  57. static inline struct _worker_function_st *_function_exist(Worker* worker, const char *function_name, const size_t function_length)
  58. {
  59. struct _worker_function_st *function;
  60. for (function= worker->function_list; function;
  61. function= function->next)
  62. {
  63. if (function_length == function->function_length())
  64. {
  65. if (memcmp(function_name, function->function_name(), function_length) == 0)
  66. {
  67. break;
  68. }
  69. }
  70. }
  71. return function;
  72. }
  73. /**
  74. * Allocate a worker structure.
  75. */
  76. static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone);
  77. /**
  78. * Initialize common packets for later use.
  79. */
  80. static gearman_return_t _worker_packet_init(Worker*);
  81. /**
  82. * Callback function used when parsing server lists.
  83. */
  84. static gearman_return_t _worker_add_server(const char *host, in_port_t port, void *context);
  85. /**
  86. * Allocate and add a function to the register list.
  87. */
  88. static gearman_return_t _worker_function_create(Worker *worker,
  89. const char *function_name, const size_t function_length,
  90. const gearman_function_t &function,
  91. uint32_t timeout,
  92. void *context);
  93. /**
  94. * Free a function.
  95. */
  96. static void _worker_function_free(Worker* worker,
  97. struct _worker_function_st *function);
  98. /** @} */
  99. /*
  100. * Public Definitions
  101. */
  102. gearman_worker_st *gearman_worker_create(gearman_worker_st *worker_shell)
  103. {
  104. worker_shell= _worker_allocate(worker_shell, false);
  105. if (worker_shell and worker_shell->impl())
  106. {
  107. Worker* worker= worker_shell->impl();
  108. gearman_return_t ret;
  109. if (gearman_failed((ret= _worker_packet_init(worker))))
  110. {
  111. gearman_worker_free(worker_shell);
  112. return NULL;
  113. }
  114. }
  115. return worker_shell;
  116. }
  117. gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker_shell,
  118. const gearman_worker_st *source_shell)
  119. {
  120. if (source_shell == NULL)
  121. {
  122. return gearman_worker_create(worker_shell);
  123. }
  124. worker_shell= _worker_allocate(worker_shell, true);
  125. if (worker_shell and worker_shell->impl())
  126. {
  127. Worker* worker= worker_shell->impl();
  128. Worker* source= source_shell->impl();
  129. worker->options.change= source->options.change;
  130. worker->options.grab_uniq= source->options.grab_uniq;
  131. worker->options.grab_all= source->options.grab_all;
  132. worker->options.timeout_return= source->options.timeout_return;
  133. worker->ssl(source->ssl());
  134. gearman_universal_clone(worker->universal, source->universal);
  135. if (gearman_failed(_worker_packet_init(worker)))
  136. {
  137. gearman_worker_free(worker_shell);
  138. return NULL;
  139. }
  140. for (struct _worker_function_st* function= source->function_list;
  141. function;
  142. function= function->next)
  143. {
  144. _worker_function_create(worker,
  145. function->function_name(), function->function_length(),
  146. function->function(),
  147. function->timeout(),
  148. function->context());
  149. }
  150. }
  151. return worker_shell;
  152. }
  153. void gearman_worker_free(gearman_worker_st *worker_shell)
  154. {
  155. #ifndef NDEBUG
  156. if (worker_shell)
  157. {
  158. assert(worker_shell->impl());
  159. }
  160. #endif
  161. if (worker_shell and worker_shell->impl())
  162. {
  163. Worker* worker= worker_shell->impl();
  164. gearman_worker_unregister_all(worker_shell);
  165. if (worker->options.packet_init)
  166. {
  167. gearman_packet_free(&worker->grab_job);
  168. gearman_packet_free(&worker->pre_sleep);
  169. }
  170. worker->job(NULL);
  171. if (worker->work_result)
  172. {
  173. gearman_free(worker->universal, worker->work_result);
  174. }
  175. while (worker->function_list)
  176. {
  177. _worker_function_free(worker, worker->function_list);
  178. }
  179. gearman_job_free_all(worker_shell);
  180. gearman_universal_free(worker->universal);
  181. delete worker;
  182. }
  183. }
  184. const char *gearman_worker_error(const gearman_worker_st *worker_shell)
  185. {
  186. if (worker_shell and worker_shell->impl())
  187. {
  188. return worker_shell->impl()->universal.error();
  189. }
  190. return NULL;
  191. }
  192. int gearman_worker_errno(gearman_worker_st *worker_shell)
  193. {
  194. if (worker_shell and worker_shell->impl())
  195. {
  196. return worker_shell->impl()->universal.last_errno();
  197. }
  198. return EINVAL;
  199. }
  200. gearman_worker_options_t gearman_worker_options(const gearman_worker_st *worker_shell)
  201. {
  202. if (worker_shell and worker_shell->impl())
  203. {
  204. Worker* worker= worker_shell->impl();
  205. int options;
  206. memset(&options, 0, sizeof(gearman_worker_options_t));
  207. if (gearman_is_allocated(worker_shell))
  208. {
  209. options|= int(GEARMAN_WORKER_ALLOCATED);
  210. }
  211. if (worker->options.non_blocking)
  212. options|= int(GEARMAN_WORKER_NON_BLOCKING);
  213. if (worker->options.packet_init)
  214. options|= int(GEARMAN_WORKER_PACKET_INIT);
  215. if (worker->options.change)
  216. options|= int(GEARMAN_WORKER_CHANGE);
  217. if (worker->options.grab_uniq)
  218. options|= int(GEARMAN_WORKER_GRAB_UNIQ);
  219. if (worker->options.grab_all)
  220. options|= int(GEARMAN_WORKER_GRAB_ALL);
  221. if (worker->options.timeout_return)
  222. options|= int(GEARMAN_WORKER_TIMEOUT_RETURN);
  223. if (worker->ssl())
  224. options|= int(GEARMAN_WORKER_SSL);
  225. if (worker->has_identifier())
  226. options|= int(GEARMAN_WORKER_IDENTIFIER);
  227. return gearman_worker_options_t(options);
  228. }
  229. return gearman_worker_options_t();
  230. }
  231. void gearman_worker_set_options(gearman_worker_st *worker,
  232. gearman_worker_options_t options)
  233. {
  234. if (worker and worker->impl())
  235. {
  236. gearman_worker_options_t usable_options[]= {
  237. GEARMAN_WORKER_NON_BLOCKING,
  238. GEARMAN_WORKER_GRAB_UNIQ,
  239. GEARMAN_WORKER_GRAB_ALL,
  240. GEARMAN_WORKER_TIMEOUT_RETURN,
  241. GEARMAN_WORKER_SSL,
  242. GEARMAN_WORKER_IDENTIFIER,
  243. GEARMAN_WORKER_MAX
  244. };
  245. for (gearman_worker_options_t* ptr= usable_options; *ptr != GEARMAN_WORKER_MAX ; ++ptr)
  246. {
  247. if (options & *ptr)
  248. {
  249. gearman_worker_add_options(worker, *ptr);
  250. }
  251. else
  252. {
  253. gearman_worker_remove_options(worker, *ptr);
  254. }
  255. }
  256. }
  257. }
  258. void gearman_worker_add_options(gearman_worker_st *worker_shell,
  259. gearman_worker_options_t options)
  260. {
  261. if (worker_shell and worker_shell->impl())
  262. {
  263. Worker* worker= worker_shell->impl();
  264. if (options & GEARMAN_WORKER_NON_BLOCKING)
  265. {
  266. gearman_universal_add_options(worker->universal, GEARMAN_UNIVERSAL_NON_BLOCKING);
  267. worker->options.non_blocking= true;
  268. }
  269. if (options & GEARMAN_WORKER_GRAB_UNIQ)
  270. {
  271. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
  272. gearman_return_t rc= gearman_packet_pack_header(&(worker->grab_job));
  273. (void)(rc);
  274. assert(gearman_success(rc));
  275. worker->options.grab_uniq= true;
  276. }
  277. if (options & GEARMAN_WORKER_GRAB_ALL)
  278. {
  279. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_ALL;
  280. gearman_return_t rc= gearman_packet_pack_header(&(worker->grab_job));
  281. (void)(rc);
  282. assert(gearman_success(rc));
  283. worker->options.grab_all= true;
  284. }
  285. if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
  286. {
  287. worker->options.timeout_return= true;
  288. }
  289. if (options & GEARMAN_WORKER_SSL)
  290. {
  291. worker->ssl(true);
  292. }
  293. if (options & GEARMAN_WORKER_IDENTIFIER)
  294. {
  295. char uuid_buffer[GEARMAN_MAX_IDENTIFIER];
  296. size_t length= GEARMAN_MAX_IDENTIFIER;
  297. safe_uuid_generate(uuid_buffer, length);
  298. worker->universal.identifier(uuid_buffer, length);
  299. }
  300. }
  301. }
  302. void gearman_worker_remove_options(gearman_worker_st *worker_shell,
  303. gearman_worker_options_t options)
  304. {
  305. if (worker_shell and worker_shell->impl())
  306. {
  307. Worker* worker= worker_shell->impl();
  308. if (options & GEARMAN_WORKER_NON_BLOCKING)
  309. {
  310. gearman_universal_remove_options(worker->universal, GEARMAN_UNIVERSAL_NON_BLOCKING);
  311. worker->options.non_blocking= false;
  312. }
  313. if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
  314. {
  315. worker->options.timeout_return= false;
  316. gearman_universal_set_timeout(worker->universal, GEARMAN_WORKER_WAIT_TIMEOUT);
  317. }
  318. if (options & GEARMAN_WORKER_GRAB_UNIQ)
  319. {
  320. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
  321. (void)gearman_packet_pack_header(&(worker->grab_job));
  322. worker->options.grab_uniq= false;
  323. }
  324. if (options & GEARMAN_WORKER_GRAB_ALL)
  325. {
  326. worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
  327. (void)gearman_packet_pack_header(&(worker->grab_job));
  328. worker->options.grab_all= false;
  329. }
  330. if (options & GEARMAN_WORKER_IDENTIFIER)
  331. {
  332. worker->universal.identifier(NULL, 0);
  333. }
  334. }
  335. }
  336. int gearman_worker_timeout(gearman_worker_st *worker)
  337. {
  338. if (worker and worker->impl())
  339. {
  340. return gearman_universal_timeout(worker->impl()->universal);
  341. }
  342. return 0;
  343. }
  344. void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout)
  345. {
  346. if (worker and worker->impl())
  347. {
  348. gearman_worker_add_options(worker, GEARMAN_WORKER_TIMEOUT_RETURN);
  349. gearman_universal_set_timeout(worker->impl()->universal, timeout);
  350. }
  351. }
  352. void *gearman_worker_context(const gearman_worker_st *worker)
  353. {
  354. if (worker and worker->impl())
  355. {
  356. return worker->impl()->context;
  357. }
  358. return NULL;
  359. }
  360. void gearman_worker_set_context(gearman_worker_st *worker, void *context)
  361. {
  362. if (worker and worker->impl())
  363. {
  364. worker->impl()->context= context;
  365. }
  366. }
  367. void gearman_worker_set_log_fn(gearman_worker_st *worker,
  368. gearman_log_fn *function, void *context,
  369. gearman_verbose_t verbose)
  370. {
  371. if (worker and worker->impl())
  372. {
  373. gearman_set_log_fn(worker->impl()->universal, function, context, verbose);
  374. }
  375. }
  376. void gearman_worker_set_workload_malloc_fn(gearman_worker_st *worker,
  377. gearman_malloc_fn *function,
  378. void *context)
  379. {
  380. if (worker and worker->impl())
  381. {
  382. gearman_set_workload_malloc_fn(worker->impl()->universal, function, context);
  383. }
  384. }
  385. void gearman_worker_set_workload_free_fn(gearman_worker_st *worker,
  386. gearman_free_fn *function,
  387. void *context)
  388. {
  389. if (worker and worker->impl())
  390. {
  391. gearman_set_workload_free_fn(worker->impl()->universal, function, context);
  392. }
  393. }
  394. gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
  395. const char *host, in_port_t port)
  396. {
  397. if (worker and worker->impl())
  398. {
  399. if (gearman_connection_create(worker->impl()->universal, host, port) == NULL)
  400. {
  401. return gearman_universal_error_code(worker->impl()->universal);
  402. }
  403. return GEARMAN_SUCCESS;
  404. }
  405. return GEARMAN_INVALID_ARGUMENT;
  406. }
  407. gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker, const char *servers)
  408. {
  409. if (worker and worker->impl())
  410. {
  411. return gearman_parse_servers(servers, _worker_add_server, worker);
  412. }
  413. return GEARMAN_INVALID_ARGUMENT;
  414. }
  415. void gearman_worker_remove_servers(gearman_worker_st *worker)
  416. {
  417. if (worker and worker->impl())
  418. {
  419. gearman_free_all_cons(worker->impl()->universal);
  420. }
  421. }
  422. gearman_return_t gearman_worker_wait(gearman_worker_st *worker)
  423. {
  424. if (worker and worker->impl())
  425. {
  426. return gearman_wait(worker->impl()->universal);
  427. }
  428. return GEARMAN_INVALID_ARGUMENT;
  429. }
  430. gearman_return_t gearman_worker_register(gearman_worker_st *worker_shell,
  431. const char *function_name,
  432. uint32_t timeout)
  433. {
  434. if (worker_shell and worker_shell->impl())
  435. {
  436. Worker* worker= worker_shell->impl();
  437. gearman_function_t null_func= gearman_function_create_null();
  438. return _worker_function_create(worker, function_name, strlen(function_name), null_func, timeout, NULL);
  439. }
  440. return GEARMAN_INVALID_ARGUMENT;
  441. }
  442. bool gearman_worker_function_exist(gearman_worker_st *worker_shell,
  443. const char *function_name,
  444. size_t function_length)
  445. {
  446. if (worker_shell and worker_shell->impl())
  447. {
  448. struct _worker_function_st *function;
  449. Worker* worker= worker_shell->impl();
  450. function= _function_exist(worker, function_name, function_length);
  451. return (function && function->options.remove == false) ? true : false;
  452. }
  453. return false;
  454. }
  455. static inline gearman_return_t _worker_unregister(gearman_worker_st *worker_shell,
  456. const char *function_name, size_t function_length)
  457. {
  458. if (worker_shell and worker_shell->impl())
  459. {
  460. Worker* worker= worker_shell->impl();
  461. _worker_function_st *function= _function_exist(worker, function_name, function_length);
  462. if (function == NULL or function->options.remove)
  463. {
  464. return GEARMAN_NO_REGISTERED_FUNCTION;
  465. }
  466. if (function->options.packet_in_use)
  467. {
  468. gearman_packet_free(&(function->packet()));
  469. function->options.packet_in_use= false;
  470. }
  471. const void *args[1];
  472. size_t args_size[1];
  473. args[0]= function->name();
  474. args_size[0]= function->length();
  475. gearman_return_t ret= gearman_packet_create_args(worker->universal, function->packet(),
  476. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
  477. args, args_size, 1);
  478. if (gearman_failed(ret))
  479. {
  480. function->options.packet_in_use= false;
  481. return ret;
  482. }
  483. function->options.packet_in_use= true;
  484. function->options.change= true;
  485. function->options.remove= true;
  486. worker->options.change= true;
  487. return GEARMAN_SUCCESS;
  488. }
  489. return GEARMAN_INVALID_ARGUMENT;
  490. }
  491. gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
  492. const char *function_name)
  493. {
  494. return _worker_unregister(worker, function_name, strlen(function_name));
  495. }
  496. gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker_shell)
  497. {
  498. if (worker_shell and worker_shell->impl())
  499. {
  500. Worker* worker= worker_shell->impl();
  501. struct _worker_function_st *function;
  502. uint32_t count= 0;
  503. if (worker->function_list == NULL)
  504. {
  505. return GEARMAN_NO_REGISTERED_FUNCTIONS;
  506. }
  507. /* Lets find out if we have any functions left that are valid */
  508. for (function= worker->function_list; function;
  509. function= function->next)
  510. {
  511. if (function->options.remove == false)
  512. {
  513. count++;
  514. }
  515. }
  516. if (count == 0)
  517. {
  518. return GEARMAN_NO_REGISTERED_FUNCTIONS;
  519. }
  520. gearman_packet_free(&(worker->function_list->packet()));
  521. gearman_return_t ret= gearman_packet_create_args(worker->universal,
  522. worker->function_list->packet(),
  523. GEARMAN_MAGIC_REQUEST,
  524. GEARMAN_COMMAND_RESET_ABILITIES,
  525. NULL, NULL, 0);
  526. if (gearman_failed(ret))
  527. {
  528. worker->function_list->options.packet_in_use= false;
  529. return ret;
  530. }
  531. while (worker->function_list->next)
  532. {
  533. _worker_function_free(worker, worker->function_list->next);
  534. }
  535. worker->function_list->options.change= true;
  536. worker->function_list->options.remove= true;
  537. worker->options.change= true;
  538. return GEARMAN_SUCCESS;
  539. }
  540. return GEARMAN_INVALID_ARGUMENT;
  541. }
  542. #if __GNUC__ >= 7
  543. #pragma GCC diagnostic warning "-Wimplicit-fallthrough"
  544. #endif
  /**
   * Ask the configured job servers for a job.
   *
   * @param worker_shell  Worker to operate on.
   * @param job           Passed to gearman_job_create() when a job object has
   *                      to be created; debug builds assert that it is NULL at
   *                      that point.
   * @param ret_ptr       Receives the status code. May be NULL, in which case
   *                      a local scratch variable is used instead.
   *
   * @return The assigned job when a JOB_ASSIGN, JOB_ASSIGN_ALL or
   *         JOB_ASSIGN_UNIQ packet is received, otherwise NULL with the
   *         reason stored through ret_ptr (for example GEARMAN_NO_SERVERS,
   *         GEARMAN_NO_REGISTERED_FUNCTIONS, GEARMAN_NO_JOBS,
   *         GEARMAN_IO_WAIT, GEARMAN_TIMEOUT).
   *
   * The function is a resumable state machine driven by worker->state. The
   * case labels are placed inside the loops on purpose: when a send or
   * receive reports GEARMAN_IO_WAIT the current state is saved, NULL is
   * returned, and the next call jumps straight back to the same point,
   * relying on worker->function and worker->con still holding the values
   * they had when the call was interrupted. Statement order therefore
   * matters throughout.
   *
   * NOTE(review): when worker_shell is NULL or has no implementation,
   * control reaches the final assert without ret_ptr having been written
   * or replaced; in a debug build with a NULL ret_ptr that assert
   * dereferences NULL.
   */
  545. gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker_shell,
  546. gearman_job_st *job,
  547. gearman_return_t *ret_ptr)
  548. {
  549. if (worker_shell and worker_shell->impl())
  550. {
  551. Worker* worker= worker_shell->impl();
  552. struct _worker_function_st *function;
  553. uint32_t active;
  554. bool no_job= false;
  555. if (worker->in_work() == false)
  556. {
  557. worker->universal.reset_error();
  558. }
  /* Allow callers to pass ret_ptr == NULL: store through a scratch variable. */
  559. gearman_return_t unused;
  560. if (ret_ptr == NULL)
  561. {
  562. ret_ptr= &unused;
  563. }
  /* Sentinel: the asserts placed before the returns check it was overwritten. */
  564. *ret_ptr= GEARMAN_MAX_RETURN;
  565. if (worker->universal.con_list == NULL)
  566. {
  567. *ret_ptr= GEARMAN_NO_SERVERS;
  568. return NULL;
  569. }
  570. while (1)
  571. {
  572. switch (worker->state)
  573. {
  574. case GEARMAN_WORKER_STATE_START:
  575. /* If there are any new functions changes, send them now. */
  576. if (worker->options.change)
  577. {
  578. worker->function= worker->function_list;
  579. while (worker->function)
  580. {
  581. if (not (worker->function->options.change))
  582. {
  583. worker->function= worker->function->next;
  584. continue;
  585. }
  /* Send the changed function's packet on every connection with a valid descriptor. */
  586. for (worker->con= (&worker->universal)->con_list; worker->con;
  587. worker->con= worker->con->next_connection())
  588. {
  589. if (worker->con->socket_descriptor_is_valid() == false)
  590. {
  591. continue;
  592. }
  /* Resume point after GEARMAN_IO_WAIT during a function packet send. */
  593. case GEARMAN_WORKER_STATE_FUNCTION_SEND:
  594. *ret_ptr= worker->con->send_packet(worker->function->packet(), true);
  595. if (gearman_failed(*ret_ptr))
  596. {
  597. if (*ret_ptr == GEARMAN_IO_WAIT)
  598. {
  599. worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
  600. }
  601. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  602. {
  /* Applies to the enclosing for loop: move on to the next connection. */
  603. continue;
  604. }
  605. assert(*ret_ptr != GEARMAN_MAX_RETURN);
  606. return NULL;
  607. }
  608. }
  /*
   * An entry flagged for removal is freed once its packet has been sent;
   * iteration continues from its predecessor, or from the list head when
   * it had none. Otherwise the change flag is cleared and the walk moves on.
   */
  609. if (worker->function->options.remove)
  610. {
  611. function= worker->function->prev;
  612. _worker_function_free(worker, worker->function);
  613. if (function == NULL)
  614. {
  615. worker->function= worker->function_list;
  616. }
  617. else
  618. {
  619. worker->function= function;
  620. }
  621. }
  622. else
  623. {
  624. worker->function->options.change= false;
  625. worker->function= worker->function->next;
  626. }
  627. }
  628. worker->options.change= false;
  629. }
  630. if (not worker->function_list)
  631. {
  632. gearman_error(worker->universal, GEARMAN_NO_REGISTERED_FUNCTIONS, "no functions have been registered");
  633. *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
  634. return NULL;
  635. }
  /*
   * Visit each connection in turn: send the function packets on connections
   * whose descriptor is not valid yet, send the grab-job packet, then read
   * replies until a job arrives or the server reports none.
   */
  636. for (worker->con= (&worker->universal)->con_list; worker->con;
  637. worker->con= worker->con->next_connection())
  638. {
  639. /* If the connection to the job server is not active, start it. */
  640. if (worker->con->socket_descriptor_is_valid() == false)
  641. {
  642. for (worker->function= worker->function_list;
  643. worker->function;
  644. worker->function= worker->function->next)
  645. {
  /* Resume point after GEARMAN_IO_WAIT while sending function packets on a new connection. */
  646. case GEARMAN_WORKER_STATE_CONNECT:
  647. *ret_ptr= worker->con->send_packet(worker->function->packet(), true);
  648. if (gearman_failed(*ret_ptr))
  649. {
  650. if (*ret_ptr == GEARMAN_IO_WAIT)
  651. {
  652. worker->state= GEARMAN_WORKER_STATE_CONNECT;
  653. }
  654. else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT or *ret_ptr == GEARMAN_LOST_CONNECTION)
  655. {
  /* Leaves only the function loop; the connection loop decides what to do next. */
  656. break;
  657. }
  658. assert(*ret_ptr != GEARMAN_MAX_RETURN);
  659. return NULL;
  660. }
  661. }
  662. if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
  663. {
  664. continue;
  665. }
  666. }
  667. /* fall-thru */
  668. case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
  669. if (worker->con->socket_descriptor_is_valid() == false)
  670. {
  671. continue;
  672. }
  673. *ret_ptr= worker->con->send_packet(worker->grab_job, true);
  674. if (gearman_failed(*ret_ptr))
  675. {
  676. if (*ret_ptr == GEARMAN_IO_WAIT)
  677. {
  678. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
  679. }
  680. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  681. {
  682. continue;
  683. }
  684. assert(*ret_ptr != GEARMAN_MAX_RETURN);
  685. return NULL;
  686. }
  /* Create the job object that will receive the server's reply, if none is pending. */
  687. if (worker->job() == NULL)
  688. {
  689. assert(job == NULL);
  690. worker->job(gearman_job_create(worker, job));
  691. if (worker->job() == NULL)
  692. {
  693. *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  694. return NULL;
  695. }
  696. assert(worker->job()->impl());
  697. }
  /*
   * Read packets until a job is assigned, the server answers NO_JOB or
   * OPTION_RES, or an error occurs. NOOP packets are freed and skipped.
   */
  698. while (1)
  699. {
  700. /* fall-thru */
  701. case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
  702. assert(worker);
  703. assert(worker->job());
  704. assert(worker->job()->impl());
  705. (void)worker->con->receiving(worker->job()->impl()->assigned, *ret_ptr, true);
  706. if (gearman_failed(*ret_ptr))
  707. {
  708. if (*ret_ptr == GEARMAN_IO_WAIT)
  709. {
  710. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
  711. }
  712. else
  713. {
  714. worker->job(NULL);
  715. if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  716. {
  /* Leaves the receive loop so the next connection is tried. */
  717. break;
  718. }
  719. }
  720. assert(*ret_ptr != GEARMAN_MAX_RETURN);
  721. return NULL;
  722. }
  723. if (worker->job()->impl()->assigned.command == GEARMAN_COMMAND_NOOP)
  724. {
  725. gearman_log_debug(worker->universal, "Received NOOP");
  726. }
  727. if (worker->job()->impl()->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN or
  728. worker->job()->impl()->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL or
  729. worker->job()->impl()->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
  730. {
  731. worker->job()->impl()->options.assigned_in_use= true;
  732. worker->job()->impl()->con= worker->con;
  /* The next call re-enters at GRAB_JOB_SEND with worker->con unchanged. */
  733. worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
  734. job= worker->take_job();
  735. assert(*ret_ptr != GEARMAN_MAX_RETURN);
  736. return job;
  737. }
  738. if (worker->job()->impl()->assigned.command == GEARMAN_COMMAND_NO_JOB or
  739. worker->job()->impl()->assigned.command == GEARMAN_COMMAND_OPTION_RES)
  740. {
  741. no_job= true;
  742. gearman_packet_free(&(worker->job()->impl()->assigned));
  743. break;
  744. }
  745. if (worker->job()->impl()->assigned.command != GEARMAN_COMMAND_NOOP)
  746. {
  747. gearman_universal_set_error(worker->universal, GEARMAN_UNEXPECTED_PACKET, GEARMAN_AT,
  748. "unexpected packet:%s",
  749. gearman_command_info(worker->job()->impl()->assigned.command)->name);
  750. gearman_packet_free(&(worker->job()->impl()->assigned));
  751. worker->job(NULL);
  752. *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
  753. return NULL;
  754. }
  755. gearman_packet_free(&(worker->job()->impl()->assigned));
  756. }
  757. }
  /*
   * This break leaves the switch; the GEARMAN_NO_JOBS check after the
   * switch then leaves the outer while loop.
   */
  758. if (worker->in_work() == false and no_job)
  759. {
  760. *ret_ptr= GEARMAN_NO_JOBS;
  761. break;
  762. }
  763. /* fall-thru */
  764. case GEARMAN_WORKER_STATE_PRE_SLEEP:
  /* Send the pre_sleep packet on every connection with a valid descriptor. */
  765. for (worker->con= (&worker->universal)->con_list; worker->con;
  766. worker->con= worker->con->next_connection())
  767. {
  768. if (worker->con->socket_descriptor_is_valid() == false)
  769. {
  770. continue;
  771. }
  772. *ret_ptr= worker->con->send_packet(worker->pre_sleep, true);
  773. if (gearman_failed(*ret_ptr))
  774. {
  775. if (*ret_ptr == GEARMAN_IO_WAIT)
  776. {
  777. worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
  778. }
  779. else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
  780. {
  781. continue;
  782. }
  783. assert(*ret_ptr != GEARMAN_MAX_RETURN);
  784. return NULL;
  785. }
  786. }
  787. worker->state= GEARMAN_WORKER_STATE_START;
  788. /* Set a watch on all active connections that we sent a PRE_SLEEP to. */
  789. active= 0;
  790. for (worker->con= worker->universal.con_list; worker->con; worker->con= worker->con->next_connection())
  791. {
  792. if (worker->con->socket_descriptor_is_valid())
  793. {
  794. worker->con->set_events(POLLIN);
  795. active++;
  796. }
  797. }
  /* A non-blocking worker never waits here: report that no job is available. */
  798. if ((&worker->universal)->options.non_blocking)
  799. {
  800. *ret_ptr= GEARMAN_NO_JOBS;
  801. return NULL;
  802. }
  /*
   * With no usable connection there is nothing to poll: either give up
   * (could not connect), nap for the default wait period (negative
   * timeout), or nap for the configured timeout and optionally report
   * GEARMAN_TIMEOUT.
   */
  803. if (active == 0)
  804. {
  805. if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
  806. {
  807. return NULL;
  808. }
  809. else if (worker->universal.timeout < 0)
  810. {
  811. gearman_nap(GEARMAN_WORKER_WAIT_TIMEOUT);
  812. }
  813. else
  814. {
  815. if (worker->universal.timeout > 0)
  816. {
  817. gearman_nap(worker->universal);
  818. }
  819. if (worker->options.timeout_return)
  820. {
  821. *ret_ptr= gearman_error(worker->universal, GEARMAN_TIMEOUT, "Option timeout return reached");
  822. return NULL;
  823. }
  824. }
  825. }
  826. else
  827. {
  /* A GEARMAN_TIMEOUT from the wait is only returned when timeout_return is set. */
  828. *ret_ptr= gearman_wait(worker->universal);
  829. if (gearman_failed(*ret_ptr) and (*ret_ptr != GEARMAN_TIMEOUT or worker->options.timeout_return))
  830. {
  831. assert(*ret_ptr != GEARMAN_MAX_RETURN);
  832. return NULL;
  833. }
  834. }
  835. break;
  836. }
  837. if (*ret_ptr == GEARMAN_NO_JOBS)
  838. {
  839. break;
  840. }
  841. }
  842. }
  843. assert(*ret_ptr != GEARMAN_MAX_RETURN);
  844. return NULL;
  845. }
  846. void gearman_job_free_all(gearman_worker_st *worker_shell)
  847. {
  848. if (worker_shell and worker_shell->impl())
  849. {
  850. Worker* worker= worker_shell->impl();
  851. while (worker->job_list)
  852. {
  853. gearman_job_free(worker->job_list->shell());
  854. }
  855. }
  856. }
  857. gearman_return_t gearman_worker_add_function(gearman_worker_st *worker_shell,
  858. const char *function_name,
  859. uint32_t timeout,
  860. gearman_worker_fn *worker_fn,
  861. void *context)
  862. {
  863. if (worker_shell and worker_shell->impl())
  864. {
  865. Worker* worker= worker_shell->impl();
  866. if (function_name == NULL)
  867. {
  868. return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name not given");
  869. }
  870. if (worker_fn == NULL)
  871. {
  872. return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function not given");
  873. }
  874. gearman_function_t local= gearman_function_create_v1(worker_fn);
  875. return _worker_function_create(worker,
  876. function_name, strlen(function_name),
  877. local,
  878. timeout,
  879. context);
  880. }
  881. return GEARMAN_INVALID_ARGUMENT;
  882. }
  883. gearman_return_t gearman_worker_define_function(gearman_worker_st *worker_shell,
  884. const char *function_name, const size_t function_name_length,
  885. const gearman_function_t function,
  886. const uint32_t timeout,
  887. void *context)
  888. {
  889. if (worker_shell and worker_shell->impl())
  890. {
  891. Worker* worker= worker_shell->impl();
  892. if (function_name == NULL or function_name_length == 0)
  893. {
  894. return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name not given");
  895. }
  896. return _worker_function_create(worker,
  897. function_name, function_name_length,
  898. function,
  899. timeout,
  900. context);
  901. }
  902. return GEARMAN_INVALID_ARGUMENT;
  903. }
  904. void gearman_worker_reset_error(Worker& worker)
  905. {
  906. universal_reset_error(worker.universal);
  907. }
  908. void gearman_worker_reset_error(gearman_worker_st *worker)
  909. {
  910. if (worker and worker->impl())
  911. {
  912. universal_reset_error(worker->impl()->universal);
  913. }
  914. }
  915. #if __GNUC__ >= 7
  916. #pragma GCC diagnostic warning "-Wimplicit-fallthrough"
  917. #endif
  918. gearman_return_t gearman_worker_work(gearman_worker_st *worker_shell)
  919. {
  920. if (worker_shell and worker_shell->impl())
  921. {
  922. Worker* worker= worker_shell->impl();
  923. bool shutdown= false;
  924. universal_reset_error(worker->universal);
  925. switch (worker->work_state)
  926. {
  927. case GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB:
  928. {
  929. worker->in_work(true);
  930. gearman_return_t ret;
  931. worker->work_job(gearman_worker_grab_job(worker->shell(), NULL, &ret));
  932. worker->in_work(false);
  933. if (gearman_failed(ret))
  934. {
  935. if (ret == GEARMAN_COULD_NOT_CONNECT)
  936. {
  937. worker->universal.reset();
  938. }
  939. return ret;
  940. }
  941. assert(worker->has_work_job());
  942. for (worker->work_function= worker->function_list;
  943. worker->work_function;
  944. worker->work_function= worker->work_function->next)
  945. {
  946. if (strcmp(gearman_job_function_name(worker->work_job()), worker->work_function->name()) == 0)
  947. {
  948. break;
  949. }
  950. }
  951. if (not worker->work_function)
  952. {
  953. worker->work_job(NULL);
  954. return gearman_error(worker->universal, GEARMAN_INVALID_FUNCTION_NAME, "Function not found");
  955. }
  956. if (not worker->work_function->has_callback())
  957. {
  958. worker->work_job(NULL);
  959. return gearman_error(worker->universal, GEARMAN_INVALID_FUNCTION_NAME, "Neither a gearman_worker_fn, or gearman_function_fn callback was supplied");
  960. }
  961. worker->work_result_size= 0;
  962. }
  963. /* fall-thru */
  964. case GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION:
  965. {
  966. switch (worker->work_function->callback(worker->work_job(),
  967. static_cast<void *>(worker->work_function->context())))
  968. {
  969. case GEARMAN_FUNCTION_FATAL:
  970. if (gearman_job_send_fail_fin(worker->work_job()->impl()) == GEARMAN_LOST_CONNECTION) // If we fail this, we have no connection, @note this causes us to lose the current error
  971. {
  972. worker->work_job()->impl()->_error_code= GEARMAN_LOST_CONNECTION;
  973. break;
  974. }
  975. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
  976. return worker->work_job()->impl()->_error_code;
  977. case GEARMAN_FUNCTION_ERROR: // retry
  978. worker->universal.reset();
  979. worker->work_job()->impl()->_error_code= GEARMAN_LOST_CONNECTION;
  980. break;
  981. case GEARMAN_FUNCTION_SHUTDOWN:
  982. shutdown= true;
  983. case GEARMAN_FUNCTION_SUCCESS:
  984. break;
  985. }
  986. if (worker->work_job()->impl()->_error_code == GEARMAN_LOST_CONNECTION)
  987. {
  988. break;
  989. }
  990. }
  991. /* fall-thru */
  992. case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
  993. {
  994. worker->work_job()->impl()->_error_code= gearman_job_send_complete_fin(worker->work_job()->impl(),
  995. worker->work_result, worker->work_result_size);
  996. if (worker->work_job()->impl()->_error_code == GEARMAN_IO_WAIT)
  997. {
  998. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
  999. return gearman_error(worker->universal, worker->work_job()->impl()->_error_code,
  1000. "A failure occurred after worker had successful complete, unless gearman_job_send_complete() was called directly by worker, client has not been informed of success.");
  1001. }
  1002. if (worker->work_result)
  1003. {
  1004. gearman_free(worker->universal, worker->work_result);
  1005. worker->work_result= NULL;
  1006. }
  1007. // If we lost the connection, we retry the work, otherwise we error
  1008. if (worker->work_job()->impl()->_error_code == GEARMAN_LOST_CONNECTION)
  1009. {
  1010. break;
  1011. }
  1012. else if (worker->work_job()->impl()->_error_code == GEARMAN_SHUTDOWN)
  1013. { }
  1014. else if (gearman_failed(worker->work_job()->impl()->_error_code))
  1015. {
  1016. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
  1017. return worker->work_job()->impl()->_error_code;
  1018. }
  1019. }
  1020. break;
  1021. case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
  1022. {
  1023. if (gearman_failed(worker->work_job()->impl()->_error_code= gearman_job_send_fail_fin(worker->work_job()->impl())))
  1024. {
  1025. if (worker->work_job()->impl()->_error_code == GEARMAN_LOST_CONNECTION)
  1026. {
  1027. break;
  1028. }
  1029. return worker->work_job()->impl()->_error_code;
  1030. }
  1031. }
  1032. break;
  1033. }
  1034. worker->work_job(NULL);
  1035. worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
  1036. if (shutdown)
  1037. {
  1038. return GEARMAN_SHUTDOWN;
  1039. }
  1040. return GEARMAN_SUCCESS;
  1041. }
  1042. return GEARMAN_INVALID_ARGUMENT;
  1043. }
  1044. gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
  1045. const void *workload,
  1046. size_t workload_size)
  1047. {
  1048. if (worker and worker->impl())
  1049. {
  1050. return gearman_echo(worker->impl()->universal, workload, workload_size);
  1051. }
  1052. return GEARMAN_INVALID_ARGUMENT;
  1053. }
  1054. /*
  1055. * Static Definitions
  1056. */
  1057. static gearman_worker_st *_worker_allocate(gearman_worker_st *worker_shell, bool is_clone)
  1058. {
  1059. Worker *worker= new (std::nothrow) Worker(worker_shell);
  1060. if (worker)
  1061. {
  1062. if (is_clone == false)
  1063. {
  1064. if (getenv("GEARMAN_SERVERS"))
  1065. {
  1066. if (gearman_worker_add_servers(worker->shell(), getenv("GEARMAN_SERVERS")))
  1067. {
  1068. gearman_worker_free(worker->shell());
  1069. return NULL;
  1070. }
  1071. }
  1072. }
  1073. if (worker->universal.wakeup(true) == false)
  1074. {
  1075. delete worker;
  1076. return NULL;
  1077. }
  1078. return worker->shell();
  1079. }
  1080. #if defined(DEBUG) && DEBUG
  1081. perror("new Worker");
  1082. #endif
  1083. return NULL;
  1084. }
  1085. static gearman_return_t _worker_packet_init(Worker* worker)
  1086. {
  1087. gearman_return_t ret= gearman_packet_create_args(worker->universal, worker->grab_job,
  1088. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB_ALL,
  1089. NULL, NULL, 0);
  1090. if (gearman_failed(ret))
  1091. {
  1092. return ret;
  1093. }
  1094. ret= gearman_packet_create_args(worker->universal, worker->pre_sleep,
  1095. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
  1096. NULL, NULL, 0);
  1097. if (gearman_failed(ret))
  1098. {
  1099. gearman_packet_free(&(worker->grab_job));
  1100. return ret;
  1101. }
  1102. worker->options.packet_init= true;
  1103. return GEARMAN_SUCCESS;
  1104. }
  1105. static gearman_return_t _worker_add_server(const char *host, in_port_t port, void *context)
  1106. {
  1107. return gearman_worker_add_server(static_cast<gearman_worker_st *>(context), host, port);
  1108. }
  1109. static gearman_return_t _worker_function_create(Worker *worker,
  1110. const char *function_name_, const size_t function_length_,
  1111. const gearman_function_t &function_arg,
  1112. uint32_t timeout,
  1113. void *context)
  1114. {
  1115. const void *args[2];
  1116. size_t args_size[2];
  1117. if (function_length_ == 0 or function_name_ == NULL or function_length_ > GEARMAN_FUNCTION_MAX_SIZE)
  1118. {
  1119. if (function_length_ > GEARMAN_FUNCTION_MAX_SIZE)
  1120. {
  1121. gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name longer then GEARMAN_MAX_FUNCTION_SIZE");
  1122. }
  1123. else
  1124. {
  1125. gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "invalid function");
  1126. }
  1127. return GEARMAN_INVALID_ARGUMENT;
  1128. }
  1129. _worker_function_st *function= make(worker->universal._namespace,
  1130. function_name_, function_length_,
  1131. function_arg, context, timeout);
  1132. if (function == NULL)
  1133. {
  1134. gearman_perror(worker->universal, errno, "_worker_function_st::new()");
  1135. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  1136. }
  1137. gearman_return_t ret;
  1138. if (timeout > 0)
  1139. {
  1140. char timeout_buffer[GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
  1141. snprintf(timeout_buffer, sizeof(timeout_buffer), "%u", timeout);
  1142. timeout_buffer[GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH]= 0;
  1143. args[0]= function->name();
  1144. args_size[0]= function->length() + 1;
  1145. args[1]= timeout_buffer;
  1146. args_size[1]= strlen(timeout_buffer);
  1147. ret= gearman_packet_create_args(worker->universal, function->packet(),
  1148. GEARMAN_MAGIC_REQUEST,
  1149. GEARMAN_COMMAND_CAN_DO_TIMEOUT,
  1150. args, args_size, 2);
  1151. }
  1152. else
  1153. {
  1154. args[0]= function->name();
  1155. args_size[0]= function->length();
  1156. ret= gearman_packet_create_args(worker->universal, function->packet(),
  1157. GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
  1158. args, args_size, 1);
  1159. }
  1160. if (gearman_failed(ret))
  1161. {
  1162. delete function;
  1163. return ret;
  1164. }
  1165. if (worker->function_list)
  1166. {
  1167. worker->function_list->prev= function;
  1168. }
  1169. function->next= worker->function_list;
  1170. function->prev= NULL;
  1171. worker->function_list= function;
  1172. worker->function_count++;
  1173. worker->options.change= true;
  1174. return GEARMAN_SUCCESS;
  1175. }
  1176. static void _worker_function_free(Worker* worker,
  1177. struct _worker_function_st *function)
  1178. {
  1179. if (worker->function_list == function)
  1180. {
  1181. worker->function_list= function->next;
  1182. }
  1183. if (function->prev)
  1184. {
  1185. function->prev->next= function->next;
  1186. }
  1187. if (function->next)
  1188. {
  1189. function->next->prev= function->prev;
  1190. }
  1191. worker->function_count--;
  1192. delete function;
  1193. }
  1194. gearman_return_t gearman_worker_set_memory_allocators(gearman_worker_st *worker,
  1195. gearman_malloc_fn *malloc_fn,
  1196. gearman_free_fn *free_fn,
  1197. gearman_realloc_fn *realloc_fn,
  1198. gearman_calloc_fn *calloc_fn,
  1199. void *context)
  1200. {
  1201. if (worker and worker->impl())
  1202. {
  1203. return gearman_set_memory_allocator(worker->impl()->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
  1204. }
  1205. return GEARMAN_INVALID_ARGUMENT;
  1206. }
  1207. bool gearman_worker_set_server_option(gearman_worker_st *worker_shell, const char *option_arg, size_t option_arg_size)
  1208. {
  1209. if (worker_shell and worker_shell->impl())
  1210. {
  1211. Worker* worker= worker_shell->impl();
  1212. gearman_string_t option= { option_arg, option_arg_size };
  1213. if (gearman_success(gearman_server_option(worker->universal, option)))
  1214. {
  1215. if (gearman_request_option(worker->universal, option))
  1216. {
  1217. return true;
  1218. }
  1219. }
  1220. }
  1221. return false;
  1222. }
  1223. void gearman_worker_set_namespace(gearman_worker_st *self, const char *namespace_key, size_t namespace_key_size)
  1224. {
  1225. if (self and self->impl())
  1226. {
  1227. gearman_universal_set_namespace(self->impl()->universal, namespace_key, namespace_key_size);
  1228. }
  1229. }
  1230. gearman_id_t gearman_worker_id(gearman_worker_st *self)
  1231. {
  1232. if (self == NULL)
  1233. {
  1234. gearman_id_t handle= { INVALID_SOCKET, INVALID_SOCKET };
  1235. return handle;
  1236. }
  1237. return gearman_universal_id(self->impl()->universal);
  1238. }
  1239. gearman_return_t gearman_worker_set_identifier(gearman_worker_st *worker,
  1240. const char *id, size_t id_size)
  1241. {
  1242. if (worker and worker->impl())
  1243. {
  1244. return gearman_set_identifier(worker->impl()->universal, id, id_size);
  1245. }
  1246. return GEARMAN_INVALID_ARGUMENT;
  1247. }
  1248. const char *gearman_worker_namespace(gearman_worker_st* worker)
  1249. {
  1250. if (worker and worker->impl())
  1251. {
  1252. return gearman_univeral_namespace(worker->impl()->universal);
  1253. }
  1254. return NULL;
  1255. }
  1256. gearman_job_st* Worker::take_job()
  1257. {
  1258. gearman_job_st* tmp= _job->shell();
  1259. _job= NULL;
  1260. return tmp;
  1261. }
  1262. gearman_job_st* Worker::job()
  1263. {
  1264. if (_job)
  1265. {
  1266. return _job->shell();
  1267. }
  1268. return NULL;
  1269. }
  1270. void Worker::job(gearman_job_st* job_)
  1271. {
  1272. if (job_)
  1273. {
  1274. assert(job_->impl());
  1275. assert(_job == NULL);
  1276. _job= job_->impl();
  1277. }
  1278. else if (_job)
  1279. {
  1280. gearman_job_free(_job->shell());
  1281. _job= NULL;
  1282. }
  1283. }
  1284. gearman_job_st* Worker::work_job()
  1285. {
  1286. assert(_work_job);
  1287. return _work_job->shell();
  1288. }
  1289. void Worker::work_job(gearman_job_st* work_job_)
  1290. {
  1291. if (work_job_)
  1292. {
  1293. assert(_work_job == NULL);
  1294. _work_job= work_job_->impl();
  1295. }
  1296. else if (_work_job)
  1297. {
  1298. gearman_job_free(_work_job->shell());
  1299. _work_job= NULL;
  1300. }
  1301. }