gearman.cc 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include "gear_config.h"
  39. #include <assert.h>
  40. #include <errno.h>
  41. #include <fcntl.h>
  42. #include <signal.h>
  43. #include <stdio.h>
  44. #include <stdlib.h>
  45. #include <string.h>
  46. #include <sys/stat.h>
  47. #include <sys/types.h>
  48. #include <sys/wait.h>
  49. #include <unistd.h>
  50. #include <iostream>
  51. #include <vector>
  52. #include <libgearman-1.0/gearman.h>
  53. #include "bin/arguments.h"
  54. #include "libgearman/client.hpp"
  55. #include "libgearman/worker.hpp"
  56. using namespace org::gearmand;
  57. #include "util/pidfile.hpp"
  58. #include "bin/error.h"
  59. using namespace gearman_client;
  60. using namespace datadifferential;
  61. #define GEARMAN_INITIAL_WORKLOAD_SIZE 8192
  62. struct worker_argument_t
  63. {
  64. Args &args;
  65. Function &function;
  66. worker_argument_t(Args &args_arg, Function &function_arg) :
  67. args(args_arg),
  68. function(function_arg)
  69. {
  70. }
  71. };
  72. /**
  73. * Function to run in client mode.
  74. */
  75. static void _client(Args &args);
  76. /**
  77. * Run client jobs.
  78. */
  79. static void _client_run(libgearman::Client& client, Args &args,
  80. const void *workload, size_t workload_size);
  81. static gearman_return_t _client_created(gearman_task_st *task);
  82. /**
  83. * Client data/complete callback function.
  84. */
  85. static gearman_return_t _client_data(gearman_task_st *task);
  86. /**
  87. * Client warning/exception callback function.
  88. */
  89. static gearman_return_t _client_warning(gearman_task_st *task);
  90. /**
  91. * Client status callback function.
  92. */
  93. static gearman_return_t _client_status(gearman_task_st *task);
  94. /**
  95. * Client fail callback function.
  96. */
  97. static gearman_return_t _client_fail(gearman_task_st *task);
  98. /**
  99. * Function to run in worker mode.
  100. */
  101. static void _worker(Args &args);
  102. /**
  103. * Callback function when worker gets a job.
  104. */
  105. static void *_worker_cb(gearman_job_st *job, void *context,
  106. size_t *result_size, gearman_return_t *ret_ptr);
  107. /**
  108. * Read workload chunk from a file descriptor and put into allocated memory.
  109. */
  110. static void _read_workload(int fd, Bytes& workload);
  111. /**
  112. * Print usage information.
  113. */
  114. static void usage(Args&, char *name);
  115. extern "C"
  116. {
  117. static void signal_setup()
  118. {
  119. if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
  120. {
  121. error::perror("signal");
  122. }
  123. }
  124. }
  125. int main(int argc, char *argv[])
  126. {
  127. Args args(argc, argv);
  128. bool close_stdio= false;
  129. if (args.usage())
  130. {
  131. usage(args, argv[0]);
  132. return EXIT_SUCCESS;
  133. }
  134. if (args.is_error())
  135. {
  136. usage(args, argv[0]);
  137. return EXIT_FAILURE;
  138. }
  139. signal_setup();
  140. if (args.daemon())
  141. {
  142. switch (fork())
  143. {
  144. case -1:
  145. error::perror("fork");
  146. return EXIT_FAILURE;
  147. case 0:
  148. break;
  149. default:
  150. return EXIT_SUCCESS;
  151. }
  152. if (setsid() == -1)
  153. {
  154. error::perror("setsid");
  155. return EXIT_FAILURE;
  156. }
  157. close_stdio= true;
  158. }
  159. if (close_stdio)
  160. {
  161. /* If we can't remap stdio, it should not a fatal error. */
  162. int fd= open("/dev/null", O_RDWR, 0);
  163. if (fd != -1)
  164. {
  165. if (dup2(fd, STDIN_FILENO) == -1)
  166. {
  167. error::perror("dup2");
  168. return EXIT_FAILURE;
  169. }
  170. if (dup2(fd, STDOUT_FILENO) == -1)
  171. {
  172. error::perror("dup2");
  173. return EXIT_FAILURE;
  174. }
  175. if (dup2(fd, STDERR_FILENO) == -1)
  176. {
  177. error::perror("dup2");
  178. return EXIT_FAILURE;
  179. }
  180. close(fd);
  181. }
  182. }
  183. util::Pidfile _pid_file(args.pid_file());
  184. if (not _pid_file.create())
  185. {
  186. error::perror(_pid_file.error_message().c_str());
  187. return EXIT_FAILURE;
  188. }
  189. if (args.worker())
  190. {
  191. _worker(args);
  192. }
  193. else
  194. {
  195. _client(args);
  196. }
  197. return args.error();
  198. }
  199. void _client(Args &args)
  200. {
  201. libgearman::Client client;
  202. Bytes workload;
  203. if (args.timeout() >= 0)
  204. {
  205. gearman_client_set_timeout(&client, args.timeout());
  206. }
  207. if (getenv("GEARMAN_SERVER"))
  208. {
  209. if (gearman_failed(gearman_client_add_servers(&client, getenv("GEARMAN_SERVER"))))
  210. {
  211. error::message("Error occurred while parsing GEARMAN_SERVER", &client);
  212. return;
  213. }
  214. }
  215. else if (gearman_failed(gearman_client_add_server(&client, args.host(), args.port())))
  216. {
  217. error::message("gearman_client_add_server", &client);
  218. return;
  219. }
  220. if (args.use_ssl())
  221. {
  222. gearman_client_add_options(&client, GEARMAN_CLIENT_SSL);
  223. }
  224. gearman_client_set_created_fn(&client, _client_created);
  225. gearman_client_set_data_fn(&client, _client_data);
  226. gearman_client_set_warning_fn(&client, _client_warning);
  227. gearman_client_set_status_fn(&client, _client_status);
  228. gearman_client_set_complete_fn(&client, _client_data);
  229. gearman_client_set_exception_fn(&client, _client_warning);
  230. gearman_client_set_fail_fn(&client, _client_fail);
  231. if (not args.arguments())
  232. {
  233. if (args.suppress_input())
  234. {
  235. _client_run(client, args, NULL, 0);
  236. }
  237. else if (args.job_per_newline())
  238. {
  239. workload.resize(GEARMAN_INITIAL_WORKLOAD_SIZE);
  240. while (1)
  241. {
  242. if (fgets(&workload[0], static_cast<int>(workload.size()), stdin) == NULL)
  243. {
  244. break;
  245. }
  246. if (args.strip_newline())
  247. {
  248. _client_run(client, args, &workload[0], strlen(&workload[0]) - 1);
  249. }
  250. else
  251. {
  252. _client_run(client, args, &workload[0], strlen(&workload[0]));
  253. }
  254. }
  255. }
  256. else
  257. {
  258. _read_workload(STDIN_FILENO, workload);
  259. _client_run(client, args, &workload[0], workload.size());
  260. }
  261. }
  262. else
  263. {
  264. for (size_t x= 0; args.argument(x) != NULL; x++)
  265. {
  266. _client_run(client, args, args.argument(x), strlen(args.argument(x)));
  267. }
  268. }
  269. }
  270. void _client_run(libgearman::Client& client, Args &args,
  271. const void *workload, size_t workload_size)
  272. {
  273. gearman_return_t ret;
  274. for (Function::vector::iterator iter= args.begin();
  275. iter != args.end();
  276. ++iter)
  277. {
  278. Function &function= *iter;
  279. /* This is a bit nasty, but all we have currently is multiple function
  280. calls. */
  281. if (args.background())
  282. {
  283. switch (args.priority())
  284. {
  285. case GEARMAN_JOB_PRIORITY_HIGH:
  286. (void)gearman_client_add_task_high_background(&client,
  287. NULL,
  288. &args,
  289. function.name(),
  290. args.unique(),
  291. workload,
  292. workload_size, &ret);
  293. break;
  294. case GEARMAN_JOB_PRIORITY_NORMAL:
  295. (void)gearman_client_add_task_background(&client,
  296. NULL,
  297. &args,
  298. function.name(),
  299. args.unique(),
  300. workload,
  301. workload_size, &ret);
  302. break;
  303. case GEARMAN_JOB_PRIORITY_LOW:
  304. (void)gearman_client_add_task_low_background(&client,
  305. NULL,
  306. &args,
  307. function.name(),
  308. args.unique(),
  309. workload,
  310. workload_size, &ret);
  311. break;
  312. case GEARMAN_JOB_PRIORITY_MAX:
  313. default:
  314. /* This should never happen. */
  315. ret= GEARMAN_UNKNOWN_STATE;
  316. break;
  317. }
  318. }
  319. else
  320. {
  321. switch (args.priority())
  322. {
  323. case GEARMAN_JOB_PRIORITY_HIGH:
  324. (void)gearman_client_add_task_high(&client,
  325. NULL,
  326. &args,
  327. function.name(),
  328. args.unique(),
  329. workload, workload_size, &ret);
  330. break;
  331. case GEARMAN_JOB_PRIORITY_NORMAL:
  332. (void)gearman_client_add_task(&client,
  333. NULL,
  334. &args,
  335. function.name(),
  336. args.unique(),
  337. workload,
  338. workload_size, &ret);
  339. break;
  340. case GEARMAN_JOB_PRIORITY_LOW:
  341. (void)gearman_client_add_task_low(&client,
  342. NULL,
  343. &args,
  344. function.name(),
  345. args.unique(),
  346. workload, workload_size, &ret);
  347. break;
  348. case GEARMAN_JOB_PRIORITY_MAX:
  349. default:
  350. /* This should never happen. */
  351. ret= GEARMAN_UNKNOWN_STATE;
  352. break;
  353. }
  354. }
  355. if (gearman_failed(ret))
  356. {
  357. error::message("gearman_client_add_task", &client);
  358. }
  359. }
  360. if (gearman_failed(gearman_client_run_tasks(&client)))
  361. {
  362. error::message("gearman_client_run_tasks", &client);
  363. }
  364. }
  365. static gearman_return_t _client_created(gearman_task_st *task)
  366. {
  367. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  368. if (args->verbose())
  369. {
  370. fprintf(stdout, "Task created: %s\n", gearman_task_job_handle(task));
  371. }
  372. return GEARMAN_SUCCESS;
  373. }
  374. static gearman_return_t _client_data(gearman_task_st *task)
  375. {
  376. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  377. if (args->prefix())
  378. {
  379. fprintf(stdout, "%s: ", gearman_task_function_name(task));
  380. fflush(stdout);
  381. }
  382. if (write(fileno(stdout), gearman_task_data(task), gearman_task_data_size(task)) == -1)
  383. {
  384. error::perror("write");
  385. return GEARMAN_ERRNO;
  386. }
  387. return GEARMAN_SUCCESS;
  388. }
  389. static gearman_return_t _client_warning(gearman_task_st *task)
  390. {
  391. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  392. if (args->prefix())
  393. {
  394. fprintf(stderr, "%s: ", gearman_task_function_name(task));
  395. fflush(stderr);
  396. }
  397. if (write(fileno(stderr), gearman_task_data(task), gearman_task_data_size(task)) == -1)
  398. {
  399. error::perror("write");
  400. }
  401. return GEARMAN_SUCCESS;
  402. }
  403. static gearman_return_t _client_status(gearman_task_st *task)
  404. {
  405. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  406. if (args->prefix())
  407. printf("%s: ", gearman_task_function_name(task));
  408. printf("%u%% Complete\n", (gearman_task_numerator(task) * 100) /
  409. gearman_task_denominator(task));
  410. return GEARMAN_SUCCESS;
  411. }
  412. static gearman_return_t _client_fail(gearman_task_st *task)
  413. {
  414. const Args *args= static_cast<const Args *>(gearman_task_context(task));
  415. if (args->prefix())
  416. fprintf(stderr, "%s: ", gearman_task_function_name(task));
  417. fprintf(stderr, "Job failed\n");
  418. args->set_error();
  419. return GEARMAN_SUCCESS;
  420. }
  421. static void _worker_free(void *, void *)
  422. {
  423. }
  424. void _worker(Args &args)
  425. {
  426. libgearman::Worker worker;
  427. if (args.timeout() >= 0)
  428. {
  429. gearman_worker_set_timeout(&worker, args.timeout());
  430. }
  431. if (getenv("GEARMAN_SERVER"))
  432. {
  433. if (gearman_failed(gearman_worker_add_servers(&worker, getenv("GEARMAN_SERVER"))))
  434. {
  435. error::message("Error occurred while parsing GEARMAN_SERVER", &worker);
  436. return;
  437. }
  438. }
  439. else if (gearman_failed(gearman_worker_add_server(&worker, args.host(), args.port())))
  440. {
  441. error::message("gearman_worker_add_server", &worker);
  442. _exit(EXIT_FAILURE);
  443. }
  444. if (args.use_ssl())
  445. {
  446. gearman_worker_add_options(&worker, GEARMAN_WORKER_SSL);
  447. }
  448. gearman_worker_set_workload_free_fn(&worker, _worker_free, NULL);
  449. for (Function::vector::iterator iter= args.begin();
  450. iter != args.end();
  451. ++iter)
  452. {
  453. Function &function= *iter;
  454. worker_argument_t pass(args, *iter);
  455. if (gearman_failed(gearman_worker_add_function(&worker, function.name(), 0, _worker_cb, &pass)))
  456. {
  457. error::message("gearman_worker_add_function", &worker);
  458. _exit(EXIT_FAILURE);
  459. }
  460. }
  461. while (1)
  462. {
  463. if (gearman_failed(gearman_worker_work(&worker)))
  464. {
  465. error::message("gearman_worker_work", &worker);
  466. }
  467. if (args.count() > 0)
  468. {
  469. --args.count();
  470. if (args.count() == 0)
  471. break;
  472. }
  473. }
  474. }
  475. extern "C" {
  476. static bool local_wexitstatus(int status)
  477. {
  478. if (WEXITSTATUS(status) != 0)
  479. return true;
  480. return false;
  481. }
  482. }
  483. static void *_worker_cb(gearman_job_st *job, void *context,
  484. size_t *result_size, gearman_return_t *ret_ptr)
  485. {
  486. worker_argument_t *arguments= static_cast<worker_argument_t *>(context);
  487. Args &args= arguments->args;
  488. Function &function= arguments->function;
  489. function.buffer().clear();
  490. *ret_ptr= GEARMAN_SUCCESS;
  491. if (not args.arguments())
  492. {
  493. if (write(STDOUT_FILENO, gearman_job_workload(job),
  494. gearman_job_workload_size(job)) == -1)
  495. {
  496. error::perror("write");
  497. }
  498. }
  499. else
  500. {
  501. int in_fds[2];
  502. int out_fds[2];
  503. if (pipe(in_fds) == -1 or pipe(out_fds) == -1)
  504. {
  505. error::perror("pipe");
  506. }
  507. pid_t pid;
  508. switch ((pid= fork()))
  509. {
  510. case -1:
  511. error::perror("fork");
  512. return NULL;
  513. case 0:
  514. if (dup2(in_fds[0], 0) == -1)
  515. {
  516. error::perror("dup2");
  517. return NULL;
  518. }
  519. if (close(in_fds[1]) < 0)
  520. {
  521. error::perror("close");
  522. return NULL;
  523. }
  524. if (dup2(out_fds[1], 1) == -1)
  525. {
  526. error::perror("dup2");
  527. return NULL;
  528. }
  529. if (close(out_fds[0]) < 0)
  530. {
  531. error::perror("close");
  532. return NULL;
  533. }
  534. if (execvp(args.argument(0), args.argumentv()) < 0)
  535. {
  536. error::perror("execvp");
  537. return NULL;
  538. }
  539. default:
  540. break;
  541. }
  542. if (close(in_fds[0]) < 0)
  543. {
  544. error::perror("close");
  545. }
  546. if (close(out_fds[1]) < 0)
  547. {
  548. error::perror("close");
  549. }
  550. if (gearman_job_workload_size(job) > 0)
  551. {
  552. if (write(in_fds[1], gearman_job_workload(job),
  553. gearman_job_workload_size(job)) == -1)
  554. {
  555. error::perror("write");
  556. }
  557. }
  558. if (close(in_fds[1]) < 0)
  559. {
  560. error::perror("close");
  561. }
  562. if (args.job_per_newline())
  563. {
  564. FILE *f= fdopen(out_fds[0], "r");
  565. if (f == NULL)
  566. {
  567. error::perror("fdopen");
  568. }
  569. function.buffer().clear();
  570. while (1)
  571. {
  572. char buffer[1024];
  573. if (fgets(buffer, sizeof(buffer), f) == NULL)
  574. {
  575. break;
  576. }
  577. size_t length= strlen(buffer);
  578. for (size_t x= 0; x < length ; x++)
  579. {
  580. function.buffer().push_back(buffer[x]);
  581. }
  582. if (args.strip_newline())
  583. {
  584. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size() - 1);
  585. }
  586. else
  587. {
  588. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size());
  589. }
  590. if (*ret_ptr != GEARMAN_SUCCESS)
  591. {
  592. error::message("gearman_job_send_data() failed with", *ret_ptr);
  593. break;
  594. }
  595. }
  596. function.buffer().clear();
  597. fclose(f);
  598. }
  599. else
  600. {
  601. _read_workload(out_fds[0], function.buffer());
  602. if (close(out_fds[0]) < 0)
  603. {
  604. error::perror("close");
  605. }
  606. *result_size= function.buffer().size();
  607. }
  608. int status;
  609. if (wait(&status) == -1)
  610. {
  611. error::perror("wait");
  612. }
  613. if (local_wexitstatus(status))
  614. {
  615. if (not function.buffer().empty())
  616. {
  617. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size());
  618. if (*ret_ptr != GEARMAN_SUCCESS)
  619. return NULL;
  620. }
  621. *ret_ptr= GEARMAN_WORK_FAIL;
  622. return NULL;
  623. }
  624. }
  625. return function.buffer_ptr();
  626. }
  627. void _read_workload(int fd, Bytes& workload)
  628. {
  629. while (1)
  630. {
  631. char buffer[1024];
  632. ssize_t read_ret= read(fd, buffer, sizeof(buffer));
  633. if (read_ret == -1)
  634. {
  635. error::perror("read");
  636. }
  637. else if (read_ret == 0)
  638. {
  639. break;
  640. }
  641. workload.reserve(workload.size() + static_cast<size_t>(read_ret));
  642. for (size_t x= 0; x < static_cast<size_t>(read_ret); x++)
  643. {
  644. workload.push_back(buffer[x]);
  645. }
  646. }
  647. }
  648. static void usage(Args& args, char *name)
  649. {
  650. FILE *stream;
  651. if (args.is_error())
  652. {
  653. stream= stderr;
  654. fprintf(stream, "\n%s\tError in usage(%s).\n\n", name, args.arg_error());
  655. }
  656. else
  657. {
  658. stream= stdout;
  659. fprintf(stream, "\n%s\tError in usage(%s).\n\n", name, args.arg_error());
  660. }
  661. fprintf(stream, "Client mode: %s [options] [<data>]\n", name);
  662. fprintf(stream, "Worker mode: %s -w [options] [<command> [<args> ...]]\n", name);
  663. fprintf(stream, "\nCommon options to both client and worker modes.\n");
  664. fprintf(stream, "\t-f <function> - Function name to use for jobs (can give many)\n");
  665. fprintf(stream, "\t-h <host> - Job server host\n");
  666. fprintf(stream, "\t-H - Print this help menu\n");
  667. fprintf(stream, "\t-v - Print diagnostic information to stdout(%s)\n", args.verbose() ? "true" : "false");
  668. fprintf(stream, "\t-p <port> - Job server port\n");
  669. fprintf(stream, "\t-t <timeout> - Timeout in milliseconds\n");
  670. fprintf(stream, "\t-i <pidfile> - Create a pidfile for the process\n");
  671. fprintf(stream, "\t-S - Enable SSL connections\n");
  672. fprintf(stream, "\nClient options:\n");
  673. fprintf(stream, "\t-b - Run jobs in the background(%s)\n", args.background() ? "true" : "false");
  674. fprintf(stream, "\t-I - Run jobs as high priority\n");
  675. fprintf(stream, "\t-L - Run jobs as low priority\n");
  676. fprintf(stream, "\t-n - Run one job per line(%s)\n", args.job_per_newline() ? "true" : "false");
  677. fprintf(stream, "\t-N - Same as -n, but strip off the newline(%s)\n", args.strip_newline() ? "true" : "false");
  678. fprintf(stream, "\t-P - Prefix all output lines with functions names\n");
  679. fprintf(stream, "\t-s - Send job without reading from standard input\n");
  680. fprintf(stream, "\t-u <unique> - Unique key to use for job\n");
  681. fprintf(stream, "\nWorker options:\n");
  682. fprintf(stream, "\t-c <count> - Number of jobs for worker to run before exiting\n");
  683. fprintf(stream, "\t-n - Send data packet for each line(%s)\n", args.job_per_newline() ? "true" : "false");
  684. fprintf(stream, "\t-N - Same as -n, but strip off the newline(%s)\n", args.strip_newline() ? "true" : "false");
  685. fprintf(stream, "\t-w - Run in worker mode(%s)\n", args.worker() ? "true" : "false");
  686. }