gearmand.cc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include "gear_config.h"
  39. #include "configmake.h"
  40. #include <cerrno>
  41. #include <cstdio>
  42. #include <cstdlib>
  43. #include <cstring>
  44. #include <fcntl.h>
  45. #include <fstream>
  46. #include <pwd.h>
  47. #include <signal.h>
  48. #include <sys/resource.h>
  49. #include <sys/stat.h>
  50. #include <sys/types.h>
  51. #include <syslog.h>
  52. #include <unistd.h>
  53. #include <grp.h>
  54. #include <time.h>
  55. #ifdef HAVE_SYS_TIME_H
  56. #include <sys/time.h>
  57. #endif
  58. #include "libgearman-server/gearmand.h"
  59. #include "libgearman-server/plugins.h"
  60. #include "libgearman-server/queue.hpp"
  61. #define GEARMAND_LOG_REOPEN_TIME 60
  62. #include "util/daemon.hpp"
  63. #include "util/pidfile.hpp"
  64. #include <boost/program_options.hpp>
  65. #include <boost/program_options/options_description.hpp>
  66. #include <boost/program_options/parsers.hpp>
  67. #include <boost/program_options/variables_map.hpp>
  68. #include <boost/token_functions.hpp>
  69. #include <boost/tokenizer.hpp>
  70. #include <iostream>
  71. #include "libtest/cpu.hpp"
  72. #include "gearmand/error.hpp"
  73. #include "gearmand/log.hpp"
  74. #include "libgearman/backtrace.hpp"
  75. using namespace datadifferential;
  76. using namespace gearmand;
  77. static bool _set_fdlimit(rlim_t fds);
  78. static bool _switch_user(const char *user);
  79. extern "C" {
  80. static bool _set_signals(bool core_dump= false);
  81. }
  82. static void _log(const char *line, gearmand_verbose_t verbose, void *context);
  83. int main(int argc, char *argv[])
  84. {
  85. gearmand::error::init(argv[0]);
  86. int backlog;
  87. rlim_t fds= 0;
  88. uint32_t job_retries;
  89. uint32_t worker_wakeup;
  90. std::string host;
  91. std::string user;
  92. std::string log_file;
  93. std::string pid_file;
  94. std::string protocol;
  95. std::string queue_type;
  96. std::string job_handle_prefix;
  97. std::string verbose_string;
  98. std::string config_file;
  99. uint32_t threads;
  100. bool opt_exceptions;
  101. bool opt_round_robin;
  102. bool opt_daemon;
  103. bool opt_check_args;
  104. bool opt_syslog;
  105. bool opt_coredump;
  106. uint32_t hashtable_buckets;
  107. bool opt_keepalive;
  108. int opt_keepalive_idle;
  109. int opt_keepalive_interval;
  110. int opt_keepalive_count;
  111. boost::program_options::options_description general("General options");
  112. general.add_options()
  113. ("backlog,b", boost::program_options::value(&backlog)->default_value(32),
  114. "Number of backlog connections for listen.")
  115. ("daemon,d", boost::program_options::bool_switch(&opt_daemon)->default_value(false),
  116. "Daemon, detach and run in the background.")
  117. ("exceptions", boost::program_options::bool_switch(&opt_exceptions)->default_value(false),
  118. "Enable protocol exceptions by default.")
  119. ("file-descriptors,f", boost::program_options::value(&fds),
  120. "Number of file descriptors to allow for the process (total connections will be slightly less). Default is max allowed for user.")
  121. ("help,h", "Print this help menu.")
  122. ("job-retries,j", boost::program_options::value(&job_retries)->default_value(0),
  123. "Number of attempts to run the job before the job server removes it. This is helpful to ensure a bad job does not crash all available workers. Default is no limit.")
  124. ("job-handle-prefix", boost::program_options::value(&job_handle_prefix),
  125. "Prefix used to generate a job handle string. If not provided, the default \"H:<host_name>\" is used.")
  126. ("hashtable-buckets", boost::program_options::value(&hashtable_buckets)->default_value(GEARMAND_DEFAULT_HASH_SIZE),
  127. "Number of buckets in the internal job hash tables. The default of 991 works well for about three million jobs in queue. If the number of jobs in the queue at any time will exceed three million, use proportionally larger values (991 * # of jobs / 3M). For example, to accommodate 2^32 jobs, use 1733003. This will consume ~26MB of extra memory. Gearmand cannot support more than 2^32 jobs in queue at this time.")
  128. ("keepalive", boost::program_options::bool_switch(&opt_keepalive)->default_value(false),
  129. "Enable keepalive on sockets.")
  130. ("keepalive-idle", boost::program_options::value(&opt_keepalive_idle)->default_value(-1),
  131. "If keepalive is enabled, set the value for TCP_KEEPIDLE for systems that support it. A value of -1 means that either the system does not support it or an error occurred when trying to retrieve the default value.")
  132. ("keepalive-interval", boost::program_options::value(&opt_keepalive_interval)->default_value(-1),
  133. "If keepalive is enabled, set the value for TCP_KEEPINTVL for systems that support it. A value of -1 means that either the system does not support it or an error occurred when trying to retrieve the default value.")
  134. ("keepalive-count", boost::program_options::value(&opt_keepalive_count)->default_value(-1),
  135. "If keepalive is enabled, set the value for TCP_KEEPCNT for systems that support it. A value of -1 means that either the system does not support it or an error occurred when trying to retrieve the default value.")
  136. ("log-file,l", boost::program_options::value(&log_file)->default_value(LOCALSTATEDIR"/log/gearmand.log"),
  137. "Log file to write errors and information to. If the log-file parameter is specified as 'stderr', then output will go to stderr. If 'none', then no logfile will be generated.")
  138. ("listen,L", boost::program_options::value(&host),
  139. "Address the server should listen on. Default is INADDR_ANY.")
  140. ("pid-file,P", boost::program_options::value(&pid_file)->default_value(GEARMAND_PID),
  141. "File to write process ID out to.")
  142. ("protocol,r", boost::program_options::value(&protocol),
  143. "Load protocol module.")
  144. ("round-robin,R", boost::program_options::bool_switch(&opt_round_robin)->default_value(false),
  145. "Assign work in round-robin order per worker connection. The default is to assign work in the order of functions added by the worker.")
  146. ("queue-type,q", boost::program_options::value(&queue_type)->default_value("builtin"),
  147. "Persistent queue type to use.")
  148. ("config-file", boost::program_options::value(&config_file)->default_value(GEARMAND_CONFIG),
  149. "Can be specified with '@name', too")
  150. ("syslog", boost::program_options::bool_switch(&opt_syslog)->default_value(false),
  151. "Use syslog.")
  152. ("coredump", boost::program_options::bool_switch(&opt_coredump)->default_value(false),
  153. "Whether to create a core dump for uncaught signals.")
  154. ("threads,t", boost::program_options::value(&threads)->default_value(4),
  155. "Number of I/O threads to use, 0 means that gearmand will try to guess the maximum number it can use. Default=4.")
  156. ("user,u", boost::program_options::value(&user),
  157. "Switch to given user after startup.")
  158. ("verbose", boost::program_options::value(&verbose_string)->default_value("ERROR"),
  159. "Set verbose level (FATAL, ALERT, CRITICAL, ERROR, WARNING, NOTICE, INFO, DEBUG).")
  160. ("version,V", "Display the version of gearmand and exit.")
  161. ("worker-wakeup,w", boost::program_options::value(&worker_wakeup)->default_value(0),
  162. "Number of workers to wakeup for each job received. The default is to wakeup all available workers.")
  163. ;
  164. boost::program_options::options_description all("Allowed options");
  165. all.add(general);
  166. gearmand::protocol::HTTP http;
  167. all.add(http.command_line_options());
  168. gearmand::protocol::Gear gear;
  169. all.add(gear.command_line_options());
  170. gearmand::plugins::initialize(all);
  171. boost::program_options::positional_options_description positional;
  172. positional.add("provided", -1);
  173. // Now insert all options that we want to make visible to the user
  174. boost::program_options::options_description visible("Allowed options");
  175. visible.add(all);
  176. boost::program_options::options_description hidden("Hidden options");
  177. hidden.add_options()
  178. ("check-args", boost::program_options::bool_switch(&opt_check_args)->default_value(false),
  179. "Check command line and configuration file arguments and then exit.");
  180. all.add(hidden);
  181. boost::program_options::variables_map vm;
  182. try {
  183. // Disable allow_guessing
  184. int style= boost::program_options::command_line_style::default_style ^ boost::program_options::command_line_style::allow_guessing;
  185. boost::program_options::parsed_options parsed= boost::program_options::command_line_parser(argc, argv)
  186. .options(all)
  187. .positional(positional)
  188. .style(style)
  189. .run();
  190. store(parsed, vm);
  191. notify(vm);
  192. if (config_file.empty() == false)
  193. {
  194. // Load the file and tokenize it
  195. std::ifstream ifs(config_file.c_str());
  196. if (ifs)
  197. {
  198. // Read the whole file into a string
  199. std::stringstream ss;
  200. ss << ifs.rdbuf();
  201. // Split the file content
  202. boost::char_separator<char> sep(" \n\r");
  203. std::string sstr= ss.str();
  204. boost::tokenizer<boost::char_separator<char> > tok(sstr, sep);
  205. std::vector<std::string> args;
  206. std::copy(tok.begin(), tok.end(), back_inserter(args));
  207. #if defined(DEBUG) && DEBUG
  208. for (std::vector<std::string>::iterator iter= args.begin();
  209. iter != args.end();
  210. ++iter)
  211. {
  212. std::cerr << *iter << std::endl;
  213. }
  214. #endif
  215. // Parse the file and store the options
  216. store(boost::program_options::command_line_parser(args).options(visible).run(), vm);
  217. }
  218. else if (config_file.compare(GEARMAND_CONFIG))
  219. {
  220. error::message("Could not open configuration file.");
  221. return EXIT_FAILURE;
  222. }
  223. }
  224. notify(vm);
  225. }
  226. catch(boost::program_options::validation_error &e)
  227. {
  228. error::message(e.what());
  229. return EXIT_FAILURE;
  230. }
  231. catch(std::exception &e)
  232. {
  233. if (e.what() and strncmp("-v", e.what(), 2) == 0)
  234. {
  235. error::message("Option -v has been deprecated, please use --verbose");
  236. }
  237. else
  238. {
  239. error::message(e.what());
  240. }
  241. return EXIT_FAILURE;
  242. }
  243. gearmand_verbose_t verbose= GEARMAND_VERBOSE_ERROR;
  244. if (gearmand_verbose_check(verbose_string.c_str(), verbose) == false)
  245. {
  246. error::message("Invalid value for --verbose supplied");
  247. return EXIT_FAILURE;
  248. }
  249. if (hashtable_buckets <= 0)
  250. {
  251. error::message("hashtable-buckets has to be greater than 0");
  252. return EXIT_FAILURE;
  253. }
  254. if (opt_check_args)
  255. {
  256. return EXIT_SUCCESS;
  257. }
  258. if (vm.count("help"))
  259. {
  260. std::cout << visible << std::endl;
  261. return EXIT_SUCCESS;
  262. }
  263. if (vm.count("version"))
  264. {
  265. std::cout << std::endl << "gearmand " << gearmand_version() << " - " << gearmand_bugreport() << std::endl;
  266. return EXIT_SUCCESS;
  267. }
  268. if (fds > 0 and _set_fdlimit(fds))
  269. {
  270. return EXIT_FAILURE;
  271. }
  272. if (not user.empty() and _switch_user(user.c_str()))
  273. {
  274. return EXIT_FAILURE;
  275. }
  276. if (opt_daemon)
  277. {
  278. util::daemonize(false, true);
  279. }
  280. if (_set_signals(opt_coredump))
  281. {
  282. return EXIT_FAILURE;
  283. }
  284. util::Pidfile _pid_file(pid_file);
  285. if (_pid_file.create() == false and pid_file.compare(GEARMAND_PID))
  286. {
  287. error::perror(_pid_file.error_message().c_str());
  288. return EXIT_FAILURE;
  289. }
  290. gearmand::gearmand_log_info_st log_info(log_file, opt_syslog);
  291. if (log_info.initialized() == false)
  292. {
  293. return EXIT_FAILURE;
  294. }
  295. if (threads == 0)
  296. {
  297. uint32_t number_of_threads= libtest::number_of_cpus();
  298. if (number_of_threads > 4)
  299. {
  300. threads= number_of_threads;
  301. }
  302. }
  303. gearmand_config_st *gearmand_config= gearmand_config_create();
  304. if (gearmand_config == NULL)
  305. {
  306. return EXIT_FAILURE;
  307. }
  308. gearmand_config_sockopt_keepalive(gearmand_config, opt_keepalive);
  309. gearmand_config_sockopt_keepalive_count(gearmand_config, opt_keepalive_count);
  310. gearmand_config_sockopt_keepalive_idle(gearmand_config, opt_keepalive_idle);
  311. gearmand_config_sockopt_keepalive_interval(gearmand_config, opt_keepalive_interval);
  312. gearmand_st *_gearmand= gearmand_create(gearmand_config,
  313. host.empty() ? NULL : host.c_str(),
  314. threads, backlog,
  315. static_cast<uint8_t>(job_retries),
  316. job_handle_prefix.empty() ? NULL : job_handle_prefix.c_str(),
  317. static_cast<uint8_t>(worker_wakeup),
  318. _log, &log_info, verbose,
  319. opt_round_robin, opt_exceptions,
  320. hashtable_buckets);
  321. if (_gearmand == NULL)
  322. {
  323. error::message("Could not create gearmand library instance.");
  324. return EXIT_FAILURE;
  325. }
  326. gearmand_config_free(gearmand_config);
  327. assert(queue_type.size());
  328. if (queue_type.empty() == false)
  329. {
  330. gearmand_error_t rc;
  331. if ((rc= gearmand::queue::initialize(_gearmand, queue_type.c_str())) != GEARMAND_SUCCESS)
  332. {
  333. error::message("Error while initializing the queue", queue_type.c_str());
  334. gearmand_free(_gearmand);
  335. return EXIT_FAILURE;
  336. }
  337. }
  338. if (gear.start(_gearmand) != GEARMAND_SUCCESS)
  339. {
  340. error::message("Error while enabling Gear protocol module");
  341. gearmand_free(_gearmand);
  342. return EXIT_FAILURE;
  343. }
  344. if (protocol.compare("http") == 0)
  345. {
  346. if (http.start(_gearmand) != GEARMAND_SUCCESS)
  347. {
  348. error::message("Error while enabling protocol module", protocol.c_str());
  349. gearmand_free(_gearmand);
  350. return EXIT_FAILURE;
  351. }
  352. }
  353. else if (protocol.empty() == false)
  354. {
  355. error::message("Unknown protocol module", protocol.c_str());
  356. gearmand_free(_gearmand);
  357. return EXIT_FAILURE;
  358. }
  359. if (opt_daemon)
  360. {
  361. if (util::daemon_is_ready(true) == false)
  362. {
  363. return EXIT_FAILURE;
  364. }
  365. }
  366. gearmand_error_t ret= gearmand_run(_gearmand);
  367. gearmand_free(_gearmand);
  368. _gearmand= NULL;
  369. return (ret == GEARMAND_SUCCESS || ret == GEARMAND_SHUTDOWN) ? 0 : 1;
  370. }
  371. static bool _set_fdlimit(rlim_t fds)
  372. {
  373. struct rlimit rl;
  374. if (getrlimit(RLIMIT_NOFILE, &rl) == -1)
  375. {
  376. error::perror("Could not get file descriptor limit");
  377. return true;
  378. }
  379. rl.rlim_cur= fds;
  380. if (rl.rlim_max < rl.rlim_cur)
  381. {
  382. rl.rlim_max= rl.rlim_cur;
  383. }
  384. if (setrlimit(RLIMIT_NOFILE, &rl) == -1)
  385. {
  386. error::perror("Failed to set limit for the number of file "
  387. "descriptors. Try running as root or giving a "
  388. "smaller value to -f.");
  389. return true;
  390. }
  391. return false;
  392. }
  393. static bool _switch_user(const char *user)
  394. {
  395. if (getuid() == 0 or geteuid() == 0)
  396. {
  397. struct passwd *pw= getpwnam(user);
  398. if (not pw)
  399. {
  400. error::message("Could not find user", user);
  401. return EXIT_FAILURE;
  402. }
  403. if (setgroups(0, NULL) == -1 ||
  404. setgid(pw->pw_gid) == -1 ||
  405. setuid(pw->pw_uid) == -1)
  406. {
  407. error::message("Could not switch to user", user);
  408. return EXIT_FAILURE;
  409. }
  410. }
  411. else
  412. {
  413. error::message("Must be root to switch users.");
  414. return true;
  415. }
  416. return false;
  417. }
  418. extern "C" void _shutdown_handler(int signal_, siginfo_t*, void*)
  419. {
  420. if (signal_== SIGUSR1)
  421. {
  422. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
  423. }
  424. else
  425. {
  426. gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
  427. }
  428. }
  429. extern "C" void _reset_log_handler(int, siginfo_t*, void*) // signal_arg
  430. {
  431. gearmand_log_info_st *log_info= static_cast<gearmand_log_info_st *>(Gearmand()->log_context);
  432. log_info->write(GEARMAND_VERBOSE_NOTICE, "SIGHUP, reopening log file");
  433. log_info->reset();
  434. }
  435. static bool segfaulted= false;
  436. extern "C" void _crash_handler(int signal_, siginfo_t*, void*)
  437. {
  438. if (segfaulted)
  439. {
  440. error::message("\nFatal crash while backtrace from signal:", strsignal(signal_));
  441. _exit(EXIT_FAILURE); /* Quit without running destructors */
  442. }
  443. segfaulted= true;
  444. custom_backtrace();
  445. _exit(EXIT_FAILURE); /* Quit without running destructors */
  446. }
  447. extern "C" {
  448. static bool _set_signals(bool core_dump)
  449. {
  450. struct sigaction sa;
  451. memset(&sa, 0, sizeof(struct sigaction));
  452. sa.sa_handler= SIG_IGN;
  453. if (sigemptyset(&sa.sa_mask) == -1 or
  454. sigaction(SIGPIPE, &sa, 0) == -1)
  455. {
  456. error::perror("Could not set SIGPIPE handler.");
  457. return true;
  458. }
  459. sa.sa_sigaction= _shutdown_handler;
  460. sa.sa_flags= SA_SIGINFO;
  461. if (sigaction(SIGTERM, &sa, 0) == -1)
  462. {
  463. error::perror("Could not set SIGTERM handler.");
  464. return true;
  465. }
  466. if (sigaction(SIGINT, &sa, 0) == -1)
  467. {
  468. error::perror("Could not set SIGINT handler.");
  469. return true;
  470. }
  471. if (sigaction(SIGUSR1, &sa, 0) == -1)
  472. {
  473. error::perror("Could not set SIGUSR1 handler.");
  474. return true;
  475. }
  476. sa.sa_sigaction= _reset_log_handler;
  477. if (sigaction(SIGHUP, &sa, 0) == -1)
  478. {
  479. error::perror("Could not set SIGHUP handler.");
  480. return true;
  481. }
  482. bool in_gdb_libtest= bool(getenv("LIBTEST_IN_GDB"));
  483. if ((in_gdb_libtest == false) and (core_dump == false))
  484. {
  485. sa.sa_sigaction= _crash_handler;
  486. if (sigaction(SIGSEGV, &sa, NULL) == -1)
  487. {
  488. error::perror("Could not set SIGSEGV handler.");
  489. return true;
  490. }
  491. if (sigaction(SIGABRT, &sa, NULL) == -1)
  492. {
  493. error::perror("Could not set SIGABRT handler.");
  494. return true;
  495. }
  496. #ifdef SIGBUS
  497. if (sigaction(SIGBUS, &sa, NULL) == -1)
  498. {
  499. error::perror("Could not set SIGBUS handler.");
  500. return true;
  501. }
  502. #endif
  503. if (sigaction(SIGILL, &sa, NULL) == -1)
  504. {
  505. error::perror("Could not set SIGILL handler.");
  506. return true;
  507. }
  508. if (sigaction(SIGFPE, &sa, NULL) == -1)
  509. {
  510. error::perror("Could not set SIGFPE handler.");
  511. return true;
  512. }
  513. }
  514. return false;
  515. }
  516. }
  517. static void _log(const char *mesg, gearmand_verbose_t verbose, void *context)
  518. {
  519. gearmand_log_info_st *log_info= static_cast<gearmand_log_info_st *>(context);
  520. log_info->write(verbose, mesg);
  521. }