gearman.cc 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include "gear_config.h"
  39. #include <assert.h>
  40. #include <errno.h>
  41. #include <fcntl.h>
  42. #include <signal.h>
  43. #include <stdio.h>
  44. #include <stdlib.h>
  45. #include <string.h>
  46. #include <sys/stat.h>
  47. #include <sys/types.h>
  48. #include <sys/wait.h>
  49. #include <unistd.h>
  50. #include <iostream>
  51. #include <vector>
  52. #include <libgearman-1.0/gearman.h>
  53. #include "bin/arguments.h"
  54. #include "libgearman/client.hpp"
  55. #include "libgearman/worker.hpp"
  56. using namespace org::gearmand;
  57. #include "util/pidfile.hpp"
  58. #include "bin/error.h"
  59. using namespace gearman_client;
  60. using namespace datadifferential;
  61. #define GEARMAN_INITIAL_WORKLOAD_SIZE 8192
  62. struct worker_argument_t
  63. {
  64. Args &args;
  65. Function &function;
  66. worker_argument_t(Args &args_arg, Function &function_arg) :
  67. args(args_arg),
  68. function(function_arg)
  69. {
  70. }
  71. };
  72. /**
  73. * Function to run in client mode.
  74. */
  75. static void _client(Args &args);
  76. /**
  77. * Run client jobs.
  78. */
  79. static void _client_run(libgearman::Client& client, Args &args,
  80. const void *workload, size_t workload_size);
  81. static gearman_return_t _client_created(gearman_task_st *task);
  82. /**
  83. * Client data/complete callback function.
  84. */
  85. static gearman_return_t _client_data(gearman_task_st *task);
  86. /**
  87. * Client warning/exception callback function.
  88. */
  89. static gearman_return_t _client_warning(gearman_task_st *task);
  90. /**
  91. * Client status callback function.
  92. */
  93. static gearman_return_t _client_status(gearman_task_st *task);
  94. /**
  95. * Client fail callback function.
  96. */
  97. static gearman_return_t _client_fail(gearman_task_st *task);
  98. /**
  99. * Function to run in worker mode.
  100. */
  101. static void _worker(Args &args);
  102. /**
  103. * Callback function when worker gets a job.
  104. */
  105. static void *_worker_cb(gearman_job_st *job, void *context,
  106. size_t *result_size, gearman_return_t *ret_ptr);
  107. /**
  108. * Read workload chunk from a file descriptor and put into allocated memory.
  109. */
  110. static void _read_workload(int fd, Bytes& workload);
  111. /**
  112. * Print usage information.
  113. */
  114. static void usage(Args&, char *name);
  115. extern "C"
  116. {
  117. static void signal_setup()
  118. {
  119. if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
  120. {
  121. error::perror("signal");
  122. }
  123. }
  124. }
  125. int main(int argc, char *argv[])
  126. {
  127. Args args(argc, argv);
  128. bool close_stdio= false;
  129. if (args.usage())
  130. {
  131. usage(args, argv[0]);
  132. return EXIT_SUCCESS;
  133. }
  134. if (args.is_error())
  135. {
  136. usage(args, argv[0]);
  137. return EXIT_FAILURE;
  138. }
  139. signal_setup();
  140. if (args.daemon())
  141. {
  142. switch (fork())
  143. {
  144. case -1:
  145. error::perror("fork");
  146. return EXIT_FAILURE;
  147. case 0:
  148. break;
  149. default:
  150. return EXIT_SUCCESS;
  151. }
  152. if (setsid() == -1)
  153. {
  154. error::perror("setsid");
  155. return EXIT_FAILURE;
  156. }
  157. close_stdio= true;
  158. }
  159. if (close_stdio)
  160. {
  161. /* If we can't remap stdio, it should not a fatal error. */
  162. int fd= open("/dev/null", O_RDWR, 0);
  163. if (fd != -1)
  164. {
  165. if (dup2(fd, STDIN_FILENO) == -1)
  166. {
  167. error::perror("dup2");
  168. return EXIT_FAILURE;
  169. }
  170. if (dup2(fd, STDOUT_FILENO) == -1)
  171. {
  172. error::perror("dup2");
  173. return EXIT_FAILURE;
  174. }
  175. if (dup2(fd, STDERR_FILENO) == -1)
  176. {
  177. error::perror("dup2");
  178. return EXIT_FAILURE;
  179. }
  180. close(fd);
  181. }
  182. }
  183. util::Pidfile _pid_file(args.pid_file());
  184. if (not _pid_file.create())
  185. {
  186. error::perror(_pid_file.error_message().c_str());
  187. return EXIT_FAILURE;
  188. }
  189. if (args.worker())
  190. {
  191. _worker(args);
  192. }
  193. else
  194. {
  195. _client(args);
  196. }
  197. return args.error();
  198. }
  199. void _client(Args &args)
  200. {
  201. libgearman::Client client;
  202. Bytes workload;
  203. if (args.timeout() >= 0)
  204. {
  205. gearman_client_set_timeout(&client, args.timeout());
  206. }
  207. if (getenv("GEARMAN_SERVER"))
  208. {
  209. if (gearman_failed(gearman_client_add_servers(&client, getenv("GEARMAN_SERVER"))))
  210. {
  211. error::message("Error occurred while parsing GEARMAN_SERVER", &client);
  212. return;
  213. }
  214. }
  215. else if (gearman_failed(gearman_client_add_server(&client, args.host(), args.port())))
  216. {
  217. error::message("gearman_client_add_server", &client);
  218. return;
  219. }
  220. gearman_client_set_created_fn(&client, _client_created);
  221. gearman_client_set_data_fn(&client, _client_data);
  222. gearman_client_set_warning_fn(&client, _client_warning);
  223. gearman_client_set_status_fn(&client, _client_status);
  224. gearman_client_set_complete_fn(&client, _client_data);
  225. gearman_client_set_exception_fn(&client, _client_warning);
  226. gearman_client_set_fail_fn(&client, _client_fail);
  227. if (not args.arguments())
  228. {
  229. if (args.suppress_input())
  230. {
  231. _client_run(client, args, NULL, 0);
  232. }
  233. else if (args.job_per_newline())
  234. {
  235. workload.resize(GEARMAN_INITIAL_WORKLOAD_SIZE);
  236. while (1)
  237. {
  238. if (fgets(&workload[0], static_cast<int>(workload.size()), stdin) == NULL)
  239. {
  240. break;
  241. }
  242. if (args.strip_newline())
  243. {
  244. _client_run(client, args, &workload[0], strlen(&workload[0]) - 1);
  245. }
  246. else
  247. {
  248. _client_run(client, args, &workload[0], strlen(&workload[0]));
  249. }
  250. }
  251. }
  252. else
  253. {
  254. _read_workload(STDIN_FILENO, workload);
  255. _client_run(client, args, &workload[0], workload.size());
  256. }
  257. }
  258. else
  259. {
  260. for (size_t x= 0; args.argument(x) != NULL; x++)
  261. {
  262. _client_run(client, args, args.argument(x), strlen(args.argument(x)));
  263. }
  264. }
  265. }
  266. void _client_run(libgearman::Client& client, Args &args,
  267. const void *workload, size_t workload_size)
  268. {
  269. gearman_return_t ret;
  270. for (Function::vector::iterator iter= args.begin();
  271. iter != args.end();
  272. ++iter)
  273. {
  274. Function &function= *iter;
  275. /* This is a bit nasty, but all we have currently is multiple function
  276. calls. */
  277. if (args.background())
  278. {
  279. switch (args.priority())
  280. {
  281. case GEARMAN_JOB_PRIORITY_HIGH:
  282. (void)gearman_client_add_task_high_background(&client,
  283. NULL,
  284. &args,
  285. function.name(),
  286. args.unique(),
  287. workload,
  288. workload_size, &ret);
  289. break;
  290. case GEARMAN_JOB_PRIORITY_NORMAL:
  291. (void)gearman_client_add_task_background(&client,
  292. NULL,
  293. &args,
  294. function.name(),
  295. args.unique(),
  296. workload,
  297. workload_size, &ret);
  298. break;
  299. case GEARMAN_JOB_PRIORITY_LOW:
  300. (void)gearman_client_add_task_low_background(&client,
  301. NULL,
  302. &args,
  303. function.name(),
  304. args.unique(),
  305. workload,
  306. workload_size, &ret);
  307. break;
  308. case GEARMAN_JOB_PRIORITY_MAX:
  309. default:
  310. /* This should never happen. */
  311. ret= GEARMAN_UNKNOWN_STATE;
  312. break;
  313. }
  314. }
  315. else
  316. {
  317. switch (args.priority())
  318. {
  319. case GEARMAN_JOB_PRIORITY_HIGH:
  320. (void)gearman_client_add_task_high(&client,
  321. NULL,
  322. &args,
  323. function.name(),
  324. args.unique(),
  325. workload, workload_size, &ret);
  326. break;
  327. case GEARMAN_JOB_PRIORITY_NORMAL:
  328. (void)gearman_client_add_task(&client,
  329. NULL,
  330. &args,
  331. function.name(),
  332. args.unique(),
  333. workload,
  334. workload_size, &ret);
  335. break;
  336. case GEARMAN_JOB_PRIORITY_LOW:
  337. (void)gearman_client_add_task_low(&client,
  338. NULL,
  339. &args,
  340. function.name(),
  341. args.unique(),
  342. workload, workload_size, &ret);
  343. break;
  344. case GEARMAN_JOB_PRIORITY_MAX:
  345. default:
  346. /* This should never happen. */
  347. ret= GEARMAN_UNKNOWN_STATE;
  348. break;
  349. }
  350. }
  351. if (gearman_failed(ret))
  352. {
  353. error::message("gearman_client_add_task", &client);
  354. }
  355. }
  356. if (gearman_failed(gearman_client_run_tasks(&client)))
  357. {
  358. error::message("gearman_client_run_tasks", &client);
  359. }
  360. }
  361. static gearman_return_t _client_created(gearman_task_st *task)
  362. {
  363. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  364. if (args->verbose())
  365. {
  366. fprintf(stdout, "Task created: %s\n", gearman_task_job_handle(task));
  367. }
  368. return GEARMAN_SUCCESS;
  369. }
  370. static gearman_return_t _client_data(gearman_task_st *task)
  371. {
  372. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  373. if (args->prefix())
  374. {
  375. fprintf(stdout, "%s: ", gearman_task_function_name(task));
  376. fflush(stdout);
  377. }
  378. if (write(fileno(stdout), gearman_task_data(task), gearman_task_data_size(task)) == -1)
  379. {
  380. error::perror("write");
  381. return GEARMAN_ERRNO;
  382. }
  383. return GEARMAN_SUCCESS;
  384. }
  385. static gearman_return_t _client_warning(gearman_task_st *task)
  386. {
  387. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  388. if (args->prefix())
  389. {
  390. fprintf(stderr, "%s: ", gearman_task_function_name(task));
  391. fflush(stderr);
  392. }
  393. if (write(fileno(stderr), gearman_task_data(task), gearman_task_data_size(task)) == -1)
  394. {
  395. error::perror("write");
  396. }
  397. return GEARMAN_SUCCESS;
  398. }
  399. static gearman_return_t _client_status(gearman_task_st *task)
  400. {
  401. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  402. if (args->prefix())
  403. printf("%s: ", gearman_task_function_name(task));
  404. printf("%u%% Complete\n", (gearman_task_numerator(task) * 100) /
  405. gearman_task_denominator(task));
  406. return GEARMAN_SUCCESS;
  407. }
  408. static gearman_return_t _client_fail(gearman_task_st *task)
  409. {
  410. const Args *args= static_cast<const Args *>(gearman_task_context(task));
  411. if (args->prefix())
  412. fprintf(stderr, "%s: ", gearman_task_function_name(task));
  413. fprintf(stderr, "Job failed\n");
  414. args->set_error();
  415. return GEARMAN_SUCCESS;
  416. }
  417. static void _worker_free(void *, void *)
  418. {
  419. }
  420. void _worker(Args &args)
  421. {
  422. libgearman::Worker worker;
  423. if (args.timeout() >= 0)
  424. {
  425. gearman_worker_set_timeout(&worker, args.timeout());
  426. }
  427. if (getenv("GEARMAN_SERVER"))
  428. {
  429. if (gearman_failed(gearman_worker_add_servers(&worker, getenv("GEARMAN_SERVER"))))
  430. {
  431. error::message("Error occurred while parsing GEARMAN_SERVER", &worker);
  432. return;
  433. }
  434. }
  435. else if (gearman_failed(gearman_worker_add_server(&worker, args.host(), args.port())))
  436. {
  437. error::message("gearman_worker_add_server", &worker);
  438. _exit(EXIT_FAILURE);
  439. }
  440. gearman_worker_set_workload_free_fn(&worker, _worker_free, NULL);
  441. for (Function::vector::iterator iter= args.begin();
  442. iter != args.end();
  443. ++iter)
  444. {
  445. Function &function= *iter;
  446. worker_argument_t pass(args, *iter);
  447. if (gearman_failed(gearman_worker_add_function(&worker, function.name(), 0, _worker_cb, &pass)))
  448. {
  449. error::message("gearman_worker_add_function", &worker);
  450. _exit(EXIT_FAILURE);
  451. }
  452. }
  453. while (1)
  454. {
  455. if (gearman_failed(gearman_worker_work(&worker)))
  456. {
  457. error::message("gearman_worker_work", &worker);
  458. }
  459. if (args.count() > 0)
  460. {
  461. --args.count();
  462. if (args.count() == 0)
  463. break;
  464. }
  465. }
  466. }
  467. extern "C" {
  468. static bool local_wexitstatus(int status)
  469. {
  470. if (WEXITSTATUS(status) != 0)
  471. return true;
  472. return false;
  473. }
  474. }
  475. static void *_worker_cb(gearman_job_st *job, void *context,
  476. size_t *result_size, gearman_return_t *ret_ptr)
  477. {
  478. worker_argument_t *arguments= static_cast<worker_argument_t *>(context);
  479. Args &args= arguments->args;
  480. Function &function= arguments->function;
  481. function.buffer().clear();
  482. *ret_ptr= GEARMAN_SUCCESS;
  483. if (not args.arguments())
  484. {
  485. if (write(STDOUT_FILENO, gearman_job_workload(job),
  486. gearman_job_workload_size(job)) == -1)
  487. {
  488. error::perror("write");
  489. }
  490. }
  491. else
  492. {
  493. int in_fds[2];
  494. int out_fds[2];
  495. if (pipe(in_fds) == -1 or pipe(out_fds) == -1)
  496. {
  497. error::perror("pipe");
  498. }
  499. pid_t pid;
  500. switch ((pid= fork()))
  501. {
  502. case -1:
  503. error::perror("fork");
  504. return NULL;
  505. case 0:
  506. if (dup2(in_fds[0], 0) == -1)
  507. {
  508. error::perror("dup2");
  509. return NULL;
  510. }
  511. if (close(in_fds[1]) < 0)
  512. {
  513. error::perror("close");
  514. return NULL;
  515. }
  516. if (dup2(out_fds[1], 1) == -1)
  517. {
  518. error::perror("dup2");
  519. return NULL;
  520. }
  521. if (close(out_fds[0]) < 0)
  522. {
  523. error::perror("close");
  524. return NULL;
  525. }
  526. if (execvp(args.argument(0), args.argumentv()) < 0)
  527. {
  528. error::perror("execvp");
  529. return NULL;
  530. }
  531. default:
  532. break;
  533. }
  534. if (close(in_fds[0]) < 0)
  535. {
  536. error::perror("close");
  537. }
  538. if (close(out_fds[1]) < 0)
  539. {
  540. error::perror("close");
  541. }
  542. if (gearman_job_workload_size(job) > 0)
  543. {
  544. if (write(in_fds[1], gearman_job_workload(job),
  545. gearman_job_workload_size(job)) == -1)
  546. {
  547. error::perror("write");
  548. }
  549. }
  550. if (close(in_fds[1]) < 0)
  551. {
  552. error::perror("close");
  553. }
  554. if (args.job_per_newline())
  555. {
  556. FILE *f= fdopen(out_fds[0], "r");
  557. if (f == NULL)
  558. {
  559. error::perror("fdopen");
  560. }
  561. function.buffer().clear();
  562. while (1)
  563. {
  564. char buffer[1024];
  565. if (fgets(buffer, sizeof(buffer), f) == NULL)
  566. {
  567. break;
  568. }
  569. size_t length= strlen(buffer);
  570. for (size_t x= 0; x < length ; x++)
  571. {
  572. function.buffer().push_back(buffer[x]);
  573. }
  574. if (args.strip_newline())
  575. {
  576. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size() - 1);
  577. }
  578. else
  579. {
  580. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size());
  581. }
  582. if (*ret_ptr != GEARMAN_SUCCESS)
  583. {
  584. error::message("gearman_job_send_data() failed with", *ret_ptr);
  585. break;
  586. }
  587. }
  588. function.buffer().clear();
  589. fclose(f);
  590. }
  591. else
  592. {
  593. _read_workload(out_fds[0], function.buffer());
  594. if (close(out_fds[0]) < 0)
  595. {
  596. error::perror("close");
  597. }
  598. *result_size= function.buffer().size();
  599. }
  600. int status;
  601. if (wait(&status) == -1)
  602. {
  603. error::perror("wait");
  604. }
  605. if (local_wexitstatus(status))
  606. {
  607. if (not function.buffer().empty())
  608. {
  609. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size());
  610. if (*ret_ptr != GEARMAN_SUCCESS)
  611. return NULL;
  612. }
  613. *ret_ptr= GEARMAN_WORK_FAIL;
  614. return NULL;
  615. }
  616. }
  617. return function.buffer_ptr();
  618. }
  619. void _read_workload(int fd, Bytes& workload)
  620. {
  621. while (1)
  622. {
  623. char buffer[1024];
  624. ssize_t read_ret= read(fd, buffer, sizeof(buffer));
  625. if (read_ret == -1)
  626. {
  627. error::perror("read");
  628. }
  629. else if (read_ret == 0)
  630. {
  631. break;
  632. }
  633. workload.reserve(workload.size() + static_cast<size_t>(read_ret));
  634. for (size_t x= 0; x < static_cast<size_t>(read_ret); x++)
  635. {
  636. workload.push_back(buffer[x]);
  637. }
  638. }
  639. }
  640. static void usage(Args& args, char *name)
  641. {
  642. FILE *stream;
  643. if (args.is_error())
  644. {
  645. stream= stderr;
  646. fprintf(stream, "\n%s\tError in usage(%s).\n\n", name, args.arg_error());
  647. }
  648. else
  649. {
  650. stream= stdout;
  651. fprintf(stream, "\n%s\tError in usage(%s).\n\n", name, args.arg_error());
  652. }
  653. fprintf(stream, "Client mode: %s [options] [<data>]\n", name);
  654. fprintf(stream, "Worker mode: %s -w [options] [<command> [<args> ...]]\n", name);
  655. fprintf(stream, "\nCommon options to both client and worker modes.\n");
  656. fprintf(stream, "\t-f <function> - Function name to use for jobs (can give many)\n");
  657. fprintf(stream, "\t-h <host> - Job server host\n");
  658. fprintf(stream, "\t-H - Print this help menu\n");
  659. fprintf(stream, "\t-v - Print diagnostic information to stdout(%s)\n", args.verbose() ? "true" : "false");
  660. fprintf(stream, "\t-p <port> - Job server port\n");
  661. fprintf(stream, "\t-t <timeout> - Timeout in milliseconds\n");
  662. fprintf(stream, "\t-i <pidfile> - Create a pidfile for the process\n");
  663. fprintf(stream, "\nClient options:\n");
  664. fprintf(stream, "\t-b - Run jobs in the background(%s)\n", args.background() ? "true" : "false");
  665. fprintf(stream, "\t-I - Run jobs as high priority\n");
  666. fprintf(stream, "\t-L - Run jobs as low priority\n");
  667. fprintf(stream, "\t-n - Run one job per line(%s)\n", args.job_per_newline() ? "true" : "false");
  668. fprintf(stream, "\t-N - Same as -n, but strip off the newline(%s)\n", args.strip_newline() ? "true" : "false");
  669. fprintf(stream, "\t-P - Prefix all output lines with functions names\n");
  670. fprintf(stream, "\t-s - Send job without reading from standard input\n");
  671. fprintf(stream, "\t-u <unique> - Unique key to use for job\n");
  672. fprintf(stream, "\nWorker options:\n");
  673. fprintf(stream, "\t-c <count> - Number of jobs for worker to run before exiting\n");
  674. fprintf(stream, "\t-n - Send data packet for each line(%s)\n", args.job_per_newline() ? "true" : "false");
  675. fprintf(stream, "\t-N - Same as -n, but strip off the newline(%s)\n", args.strip_newline() ? "true" : "false");
  676. fprintf(stream, "\t-w - Run in worker mode(%s)\n", args.worker() ? "true" : "false");
  677. }