gearman.cc 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Gearman Command Line Tool
  11. */
  12. #include <config.h>
  13. #include <assert.h>
  14. #include <errno.h>
  15. #include <fcntl.h>
  16. #include <signal.h>
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <string.h>
  20. #include <sys/stat.h>
  21. #include <sys/types.h>
  22. #include <sys/wait.h>
  23. #include <unistd.h>
  24. #include <iostream>
  25. #include <vector>
  26. #include <libgearman/gearman.h>
  27. #include "bin/arguments.h"
  28. #include "bin/client.h"
  29. #include "bin/worker.h"
  30. #include "util/pidfile.hpp"
  31. #include "bin/error.h"
  32. using namespace gearman_client;
  33. using namespace datadifferential;
  34. #define GEARMAN_INITIAL_WORKLOAD_SIZE 8192
  35. struct worker_argument_t
  36. {
  37. Args &args;
  38. Function &function;
  39. worker_argument_t(Args &args_arg, Function &function_arg) :
  40. args(args_arg),
  41. function(function_arg)
  42. {
  43. }
  44. };
  45. /**
  46. * Function to run in client mode.
  47. */
  48. static void _client(Args &args);
  49. /**
  50. * Run client jobs.
  51. */
  52. static void _client_run(gearman_client_st& client, Args &args,
  53. const void *workload, size_t workload_size);
  54. /**
  55. * Client data/complete callback function.
  56. */
  57. static gearman_return_t _client_data(gearman_task_st *task);
  58. /**
  59. * Client warning/exception callback function.
  60. */
  61. static gearman_return_t _client_warning(gearman_task_st *task);
  62. /**
  63. * Client status callback function.
  64. */
  65. static gearman_return_t _client_status(gearman_task_st *task);
  66. /**
  67. * Client fail callback function.
  68. */
  69. static gearman_return_t _client_fail(gearman_task_st *task);
  70. /**
  71. * Function to run in worker mode.
  72. */
  73. static void _worker(Args &args);
  74. /**
  75. * Callback function when worker gets a job.
  76. */
  77. static void *_worker_cb(gearman_job_st *job, void *context,
  78. size_t *result_size, gearman_return_t *ret_ptr);
  79. /**
  80. * Read workload chunk from a file descriptor and put into allocated memory.
  81. */
  82. static void _read_workload(int fd, Bytes& workload);
  83. /**
  84. * Print usage information.
  85. */
  86. static void usage(char *name);
  87. extern "C"
  88. {
  89. static void signal_setup()
  90. {
  91. if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
  92. {
  93. error::perror("signal");
  94. }
  95. }
  96. }
  97. int main(int argc, char *argv[])
  98. {
  99. Args args(argc, argv);
  100. bool close_stdio= false;
  101. if (args.usage())
  102. {
  103. usage(argv[0]);
  104. return args.is_error() ? EXIT_FAILURE : EXIT_SUCCESS;
  105. }
  106. signal_setup();
  107. if (args.daemon())
  108. {
  109. switch (fork())
  110. {
  111. case -1:
  112. error::perror("fork");
  113. return EXIT_FAILURE;
  114. case 0:
  115. break;
  116. default:
  117. return EXIT_SUCCESS;
  118. }
  119. if (setsid() == -1)
  120. {
  121. error::perror("setsid");
  122. return EXIT_FAILURE;
  123. }
  124. close_stdio= true;
  125. }
  126. if (close_stdio)
  127. {
  128. /* If we can't remap stdio, it should not a fatal error. */
  129. int fd= open("/dev/null", O_RDWR, 0);
  130. if (fd != -1)
  131. {
  132. if (dup2(fd, STDIN_FILENO) == -1)
  133. {
  134. error::perror("dup2");
  135. return EXIT_FAILURE;
  136. }
  137. if (dup2(fd, STDOUT_FILENO) == -1)
  138. {
  139. error::perror("dup2");
  140. return EXIT_FAILURE;
  141. }
  142. if (dup2(fd, STDERR_FILENO) == -1)
  143. {
  144. error::perror("dup2");
  145. return EXIT_FAILURE;
  146. }
  147. close(fd);
  148. }
  149. }
  150. util::Pidfile _pid_file(args.pid_file());
  151. if (not _pid_file.create())
  152. {
  153. error::perror(_pid_file.error_message().c_str());
  154. return EXIT_FAILURE;
  155. }
  156. if (args.worker())
  157. {
  158. _worker(args);
  159. }
  160. else
  161. {
  162. _client(args);
  163. }
  164. return args.error();
  165. }
  166. void _client(Args &args)
  167. {
  168. Client local_client;
  169. gearman_client_st &client= local_client.client();
  170. Bytes workload;
  171. if (args.timeout() >= 0)
  172. {
  173. gearman_client_set_timeout(&client, args.timeout());
  174. }
  175. if (gearman_failed(gearman_client_add_server(&client, args.host(), args.port())))
  176. {
  177. error::message("gearman_client_add_server", client);
  178. return;
  179. }
  180. gearman_client_set_data_fn(&client, _client_data);
  181. gearman_client_set_warning_fn(&client, _client_warning);
  182. gearman_client_set_status_fn(&client, _client_status);
  183. gearman_client_set_complete_fn(&client, _client_data);
  184. gearman_client_set_exception_fn(&client, _client_warning);
  185. gearman_client_set_fail_fn(&client, _client_fail);
  186. if (not args.arguments())
  187. {
  188. if (args.suppress_input())
  189. {
  190. _client_run(client, args, NULL, 0);
  191. }
  192. else if (args.job_per_newline())
  193. {
  194. workload.resize(GEARMAN_INITIAL_WORKLOAD_SIZE);
  195. while (1)
  196. {
  197. if (fgets(&workload[0], static_cast<int>(workload.size()), stdin) == NULL)
  198. {
  199. break;
  200. }
  201. if (args.strip_newline())
  202. {
  203. _client_run(client, args, &workload[0], strlen(&workload[0]) - 1);
  204. }
  205. else
  206. {
  207. _client_run(client, args, &workload[0], strlen(&workload[0]));
  208. }
  209. }
  210. }
  211. else
  212. {
  213. _read_workload(STDIN_FILENO, workload);
  214. _client_run(client, args, &workload[0], workload.size());
  215. }
  216. }
  217. else
  218. {
  219. for (size_t x= 0; args.argument(x) != NULL; x++)
  220. {
  221. _client_run(client, args, args.argument(x), strlen(args.argument(x)));
  222. }
  223. }
  224. }
  225. void _client_run(gearman_client_st& client, Args &args,
  226. const void *workload, size_t workload_size)
  227. {
  228. gearman_return_t ret;
  229. for (Function::vector::iterator iter= args.begin();
  230. iter != args.end();
  231. iter++)
  232. {
  233. Function &function= *iter;
  234. /* This is a bit nasty, but all we have currently is multiple function
  235. calls. */
  236. if (args.background())
  237. {
  238. switch (args.priority())
  239. {
  240. case GEARMAN_JOB_PRIORITY_HIGH:
  241. (void)gearman_client_add_task_high_background(&client,
  242. NULL,
  243. &args,
  244. function.name(),
  245. args.unique(),
  246. workload,
  247. workload_size, &ret);
  248. break;
  249. case GEARMAN_JOB_PRIORITY_NORMAL:
  250. (void)gearman_client_add_task_background(&client,
  251. NULL,
  252. &args,
  253. function.name(),
  254. args.unique(),
  255. workload,
  256. workload_size, &ret);
  257. break;
  258. case GEARMAN_JOB_PRIORITY_LOW:
  259. (void)gearman_client_add_task_low_background(&client,
  260. NULL,
  261. &args,
  262. function.name(),
  263. args.unique(),
  264. workload,
  265. workload_size, &ret);
  266. break;
  267. case GEARMAN_JOB_PRIORITY_MAX:
  268. default:
  269. /* This should never happen. */
  270. ret= GEARMAN_UNKNOWN_STATE;
  271. break;
  272. }
  273. }
  274. else
  275. {
  276. switch (args.priority())
  277. {
  278. case GEARMAN_JOB_PRIORITY_HIGH:
  279. (void)gearman_client_add_task_high(&client,
  280. NULL,
  281. &args,
  282. function.name(),
  283. args.unique(),
  284. workload, workload_size, &ret);
  285. break;
  286. case GEARMAN_JOB_PRIORITY_NORMAL:
  287. (void)gearman_client_add_task(&client,
  288. NULL,
  289. &args,
  290. function.name(),
  291. args.unique(),
  292. workload,
  293. workload_size, &ret);
  294. break;
  295. case GEARMAN_JOB_PRIORITY_LOW:
  296. (void)gearman_client_add_task_low(&client,
  297. NULL,
  298. &args,
  299. function.name(),
  300. args.unique(),
  301. workload, workload_size, &ret);
  302. break;
  303. case GEARMAN_JOB_PRIORITY_MAX:
  304. default:
  305. /* This should never happen. */
  306. ret= GEARMAN_UNKNOWN_STATE;
  307. break;
  308. }
  309. }
  310. if (gearman_failed(ret))
  311. {
  312. error::message("gearman_client_add_task", client);
  313. }
  314. }
  315. if (gearman_failed(gearman_client_run_tasks(&client)))
  316. {
  317. error::message("gearman_client_run_tasks", client);
  318. }
  319. }
  320. static gearman_return_t _client_data(gearman_task_st *task)
  321. {
  322. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  323. if (args->prefix())
  324. {
  325. fprintf(stdout, "%s: ", gearman_task_function_name(task));
  326. fflush(stdout);
  327. }
  328. if (write(fileno(stdout), gearman_task_data(task), gearman_task_data_size(task)) == -1)
  329. {
  330. error::perror("write");
  331. return GEARMAN_ERRNO;
  332. }
  333. return GEARMAN_SUCCESS;
  334. }
  335. static gearman_return_t _client_warning(gearman_task_st *task)
  336. {
  337. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  338. if (args->prefix())
  339. {
  340. fprintf(stderr, "%s: ", gearman_task_function_name(task));
  341. fflush(stderr);
  342. }
  343. if (write(fileno(stderr), gearman_task_data(task), gearman_task_data_size(task)) == -1)
  344. {
  345. error::perror("write");
  346. }
  347. return GEARMAN_SUCCESS;
  348. }
  349. static gearman_return_t _client_status(gearman_task_st *task)
  350. {
  351. const Args *args= static_cast<const Args*>(gearman_task_context(task));
  352. if (args->prefix())
  353. printf("%s: ", gearman_task_function_name(task));
  354. printf("%u%% Complete\n", (gearman_task_numerator(task) * 100) /
  355. gearman_task_denominator(task));
  356. return GEARMAN_SUCCESS;
  357. }
  358. static gearman_return_t _client_fail(gearman_task_st *task)
  359. {
  360. const Args *args= static_cast<const Args *>(gearman_task_context(task));
  361. if (args->prefix())
  362. fprintf(stderr, "%s: ", gearman_task_function_name(task));
  363. fprintf(stderr, "Job failed\n");
  364. args->set_error();
  365. return GEARMAN_SUCCESS;
  366. }
  367. static void _worker_free(void *, void *)
  368. {
  369. }
  370. void _worker(Args &args)
  371. {
  372. Worker local_worker;
  373. gearman_worker_st &worker= local_worker.worker();
  374. if (args.timeout() >= 0)
  375. {
  376. gearman_worker_set_timeout(&worker, args.timeout());
  377. }
  378. if (gearman_failed(gearman_worker_add_server(&worker, args.host(), args.port())))
  379. {
  380. error::message("gearman_worker_add_server", worker);
  381. _exit(EXIT_FAILURE);
  382. }
  383. gearman_worker_set_workload_free_fn(&worker, _worker_free, NULL);
  384. for (Function::vector::iterator iter= args.begin();
  385. iter != args.end();
  386. iter++)
  387. {
  388. Function &function= *iter;
  389. worker_argument_t pass(args, *iter);
  390. if (gearman_failed(gearman_worker_add_function(&worker, function.name(), 0, _worker_cb, &pass)))
  391. {
  392. error::message("gearman_worker_add_function", worker);
  393. _exit(EXIT_FAILURE);
  394. }
  395. }
  396. while (1)
  397. {
  398. if (gearman_failed(gearman_worker_work(&worker)))
  399. {
  400. error::message("gearman_worker_work", worker);
  401. }
  402. if (args.count() > 0)
  403. {
  404. --args.count();
  405. if (args.count() == 0)
  406. break;
  407. }
  408. }
  409. }
  410. extern "C" {
  411. static bool local_wexitstatus(int status)
  412. {
  413. if (WEXITSTATUS(status) != 0)
  414. return true;
  415. return false;
  416. }
  417. }
  418. static void *_worker_cb(gearman_job_st *job, void *context,
  419. size_t *result_size, gearman_return_t *ret_ptr)
  420. {
  421. worker_argument_t *arguments= static_cast<worker_argument_t *>(context);
  422. int in_fds[2];
  423. int out_fds[2];
  424. int status;
  425. Args &args= arguments->args;
  426. Function &function= arguments->function;
  427. function.buffer().clear();
  428. *ret_ptr= GEARMAN_SUCCESS;
  429. if (not args.arguments())
  430. {
  431. if (write(STDOUT_FILENO, gearman_job_workload(job),
  432. gearman_job_workload_size(job)) == -1)
  433. {
  434. error::perror("write");
  435. }
  436. }
  437. else
  438. {
  439. if (pipe(in_fds) == -1 || pipe(out_fds) == -1)
  440. {
  441. error::perror("pipe");
  442. }
  443. pid_t pid;
  444. switch ((pid= fork()))
  445. {
  446. case -1:
  447. error::perror("fork");
  448. return NULL;
  449. case 0:
  450. if (dup2(in_fds[0], 0) == -1)
  451. {
  452. error::perror("dup2");
  453. return NULL;
  454. }
  455. if (close(in_fds[1]) < 0)
  456. {
  457. error::perror("close");
  458. return NULL;
  459. }
  460. if (dup2(out_fds[1], 1) == -1)
  461. {
  462. error::perror("dup2");
  463. return NULL;
  464. }
  465. if (close(out_fds[0]) < 0)
  466. {
  467. error::perror("close");
  468. return NULL;
  469. }
  470. if (execvp(args.argument(0), args.argumentv()) < 0)
  471. {
  472. error::perror("execvp");
  473. return NULL;
  474. }
  475. default:
  476. break;
  477. }
  478. if (close(in_fds[0]) < 0)
  479. {
  480. error::perror("close");
  481. }
  482. if (close(out_fds[1]) < 0)
  483. {
  484. error::perror("close");
  485. }
  486. if (gearman_job_workload_size(job) > 0)
  487. {
  488. if (write(in_fds[1], gearman_job_workload(job),
  489. gearman_job_workload_size(job)) == -1)
  490. {
  491. error::perror("write");
  492. }
  493. }
  494. if (close(in_fds[1]) < 0)
  495. {
  496. error::perror("close");
  497. }
  498. if (args.job_per_newline())
  499. {
  500. FILE *f= fdopen(out_fds[0], "r");
  501. if (f == NULL)
  502. {
  503. error::perror("fdopen");
  504. }
  505. function.buffer().clear();
  506. while (1)
  507. {
  508. char buffer[1024];
  509. if (fgets(buffer, sizeof(buffer), f) == NULL)
  510. {
  511. break;
  512. }
  513. size_t length= strlen(buffer);
  514. for (size_t x= 0; x < length ; x++)
  515. {
  516. function.buffer().push_back(buffer[x]);
  517. }
  518. if (args.strip_newline())
  519. {
  520. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size() - 1);
  521. }
  522. else
  523. {
  524. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size());
  525. }
  526. if (*ret_ptr != GEARMAN_SUCCESS)
  527. {
  528. error::message("gearman_job_send_data() failed with", *ret_ptr);
  529. break;
  530. }
  531. }
  532. function.buffer().clear();
  533. fclose(f);
  534. }
  535. else
  536. {
  537. _read_workload(out_fds[0], function.buffer());
  538. if (close(out_fds[0]) < 0)
  539. {
  540. error::perror("close");
  541. }
  542. *result_size= function.buffer().size();
  543. }
  544. if (wait(&status) == -1)
  545. {
  546. error::perror("wait");
  547. }
  548. if (local_wexitstatus(status))
  549. {
  550. if (not function.buffer().empty())
  551. {
  552. *ret_ptr= gearman_job_send_data(job, function.buffer_ptr(), function.buffer().size());
  553. if (*ret_ptr != GEARMAN_SUCCESS)
  554. return NULL;
  555. }
  556. *ret_ptr= GEARMAN_WORK_FAIL;
  557. return NULL;
  558. }
  559. }
  560. return function.buffer_ptr();
  561. }
  562. void _read_workload(int fd, Bytes& workload)
  563. {
  564. while (1)
  565. {
  566. char buffer[1024];
  567. ssize_t read_ret= read(fd, buffer, sizeof(buffer));
  568. if (read_ret == -1)
  569. {
  570. error::perror("read");
  571. }
  572. else if (read_ret == 0)
  573. {
  574. break;
  575. }
  576. workload.reserve(workload.size() + static_cast<size_t>(read_ret));
  577. for (size_t x= 0; x < static_cast<size_t>(read_ret); x++)
  578. {
  579. workload.push_back(buffer[x]);
  580. }
  581. }
  582. }
  583. static void usage(char *name)
  584. {
  585. printf("Client mode: %s [options] [<data>]\n", name);
  586. printf("Worker mode: %s -w [options] [<command> [<args> ...]]\n", name);
  587. printf("\nCommon options to both client and worker modes.\n");
  588. printf("\t-f <function> - Function name to use for jobs (can give many)\n");
  589. printf("\t-h <host> - Job server host\n");
  590. printf("\t-H - Print this help menu\n");
  591. printf("\t-p <port> - Job server port\n");
  592. printf("\t-t <timeout> - Timeout in milliseconds\n");
  593. printf("\t-i <pidfile> - Create a pidfile for the process\n");
  594. printf("\nClient options:\n");
  595. printf("\t-b - Run jobs in the background\n");
  596. printf("\t-I - Run jobs as high priority\n");
  597. printf("\t-L - Run jobs as low priority\n");
  598. printf("\t-n - Run one job per line\n");
  599. printf("\t-N - Same as -n, but strip off the newline\n");
  600. printf("\t-P - Prefix all output lines with functions names\n");
  601. printf("\t-s - Send job without reading from standard input\n");
  602. printf("\t-u <unique> - Unique key to use for job\n");
  603. printf("\nWorker options:\n");
  604. printf("\t-c <count> - Number of jobs for worker to run before exiting\n");
  605. printf("\t-n - Send data packet for each line\n");
  606. printf("\t-N - Same as -n, but strip off the newline\n");
  607. printf("\t-w - Run in worker mode\n");
  608. }