blobslap_worker.cc 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Blob slap worker utility
  41. */
  42. #include "gear_config.h"
  43. #include <benchmark/benchmark.h>
  44. #include <boost/program_options.hpp>
  45. #include <cerrno>
  46. #include <cstdio>
  47. #include <climits>
  48. #include <iostream>
  49. #include <vector>
  50. #include "util/daemon.hpp"
  51. #include "util/logfile.hpp"
  52. #include "util/pidfile.hpp"
  53. #include "util/signal.hpp"
  54. #include "util/string.hpp"
  55. using namespace datadifferential;
  56. static void *worker_fn(gearman_job_st *job, void *context,
  57. size_t *result_size, gearman_return_t *ret_ptr);
  58. static gearman_return_t shutdown_fn(gearman_job_st*, void* /* context */)
  59. {
  60. return GEARMAN_SHUTDOWN;
  61. }
  62. static gearman_return_t ping_fn(gearman_job_st*, void* /* context */)
  63. {
  64. return GEARMAN_SUCCESS;
  65. }
  66. int main(int args, char *argv[])
  67. {
  68. gearman_benchmark_st benchmark;
  69. bool opt_daemon;
  70. bool opt_chunk;
  71. bool opt_status;
  72. bool opt_unique;
  73. std::string pid_file;
  74. std::string log_file;
  75. int32_t timeout;
  76. uint32_t count= UINT_MAX;
  77. in_port_t port;
  78. std::string host;
  79. std::string identifier;
  80. std::vector<std::string> functions;
  81. std::string verbose_string;
  82. boost::program_options::options_description desc("Options");
  83. desc.add_options()
  84. ("help", "Options related to the program.")
  85. ("host,h", boost::program_options::value<std::string>(&host)->default_value("localhost"),"Connect to the host")
  86. ("identifier", boost::program_options::value<std::string>(&identifier)->default_value("blobslap_worker"), "Worker identifier")
  87. ("port,p", boost::program_options::value<in_port_t>(&port)->default_value(GEARMAN_DEFAULT_TCP_PORT), "Port number use for connection")
  88. ("count,c", boost::program_options::value<uint32_t>(&count)->default_value(0), "Number of jobs to run before exiting")
  89. ("timeout,u", boost::program_options::value<int32_t>(&timeout)->default_value(-1), "Timeout in milliseconds")
  90. ("chunk", boost::program_options::bool_switch(&opt_chunk)->default_value(false), "Send result back in data chunks")
  91. ("status,s", boost::program_options::bool_switch(&opt_status)->default_value(false), "Send status updates and sleep while running job")
  92. ("unique,u", boost::program_options::bool_switch(&opt_unique)->default_value(false), "When grabbing jobs, grab the uniqie id")
  93. ("daemon,d", boost::program_options::bool_switch(&opt_daemon)->default_value(false), "Daemonize")
  94. ("function,f", boost::program_options::value(&functions), "Function to use.")
  95. ("verbose,v", boost::program_options::value(&verbose_string)->default_value("v"), "Increase verbosity level by one.")
  96. ("pid-file", boost::program_options::value(&pid_file), "File to write process ID out to.")
  97. ("log-file", boost::program_options::value(&log_file), "Create a log file.")
  98. ;
  99. boost::program_options::variables_map vm;
  100. try
  101. {
  102. boost::program_options::store(boost::program_options::parse_command_line(args, argv, desc), vm);
  103. boost::program_options::notify(vm);
  104. }
  105. catch(std::exception &e)
  106. {
  107. std::cout << e.what() << std::endl;
  108. return EXIT_FAILURE;
  109. }
  110. if (vm.count("help"))
  111. {
  112. std::cout << desc << std::endl;
  113. return EXIT_SUCCESS;
  114. }
  115. if (opt_daemon)
  116. {
  117. util::daemonize(false, true);
  118. }
  119. util::Pidfile _pid_file(pid_file);
  120. if (pid_file.empty() == false)
  121. {
  122. if (_pid_file.create() == false)
  123. {
  124. std::cerr << _pid_file.error_message().c_str();
  125. return EXIT_FAILURE;
  126. }
  127. }
  128. if (not log_file.empty())
  129. {
  130. FILE *file= fopen(log_file.c_str(), "w+");
  131. if (file == NULL)
  132. {
  133. std::cerr << "Unable to open:" << log_file << "(" << strerror(errno) << ")" << std::endl;
  134. return EXIT_FAILURE;
  135. }
  136. fclose(file);
  137. // We let the error from this happen later (if one was to occur)
  138. unlink(log_file.c_str());
  139. }
  140. gearman_worker_st *worker;
  141. if (not (worker= gearman_worker_create(NULL)))
  142. {
  143. std::cerr << "Failed to allocate worker" << std::endl;
  144. return EXIT_FAILURE;
  145. }
  146. if (getenv("GEARMAN_SERVERS") == NULL)
  147. {
  148. if (gearman_failed(gearman_worker_add_server(worker, host.c_str(), port)))
  149. {
  150. std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_worker_error(worker) << std::endl;
  151. return EXIT_FAILURE;
  152. }
  153. }
  154. if (identifier.size())
  155. {
  156. gearman_worker_set_identifier(worker, identifier.c_str(), identifier.size());
  157. }
  158. benchmark.verbose= static_cast<uint8_t>(verbose_string.length());
  159. if (opt_daemon)
  160. {
  161. util::daemon_is_ready(benchmark.verbose == 0);
  162. }
  163. util::SignalThread signal(true);
  164. util::Logfile log(log_file);
  165. if (not log.open())
  166. {
  167. std::cerr << "Could not open logfile:" << log_file << std::endl;
  168. return EXIT_FAILURE;
  169. }
  170. if (not signal.setup())
  171. {
  172. log.log() << "Failed signal.setup()" << std::endl;
  173. return EXIT_FAILURE;
  174. }
  175. gearman_function_t shutdown_function= gearman_function_create(shutdown_fn);
  176. if (gearman_failed(gearman_worker_define_function(worker,
  177. util_literal_param("shutdown"),
  178. shutdown_function,
  179. 0, 0)))
  180. {
  181. log.log() << "Failed to add shutdown function: " << gearman_worker_error(worker) << std::endl;
  182. return EXIT_FAILURE;
  183. }
  184. gearman_function_t ping_function= gearman_function_create(ping_fn);
  185. if (gearman_failed(gearman_worker_define_function(worker,
  186. util_literal_param("blobslap_worker_ping"),
  187. ping_function,
  188. 0, 0)))
  189. {
  190. log.log() << "Failed to add blobslap_worker_ping function: " << gearman_worker_error(worker) << std::endl;
  191. return EXIT_FAILURE;
  192. }
  193. if (functions.empty() == false)
  194. {
  195. for (std::vector<std::string>::iterator iter= functions.begin(); iter != functions.end(); ++iter)
  196. {
  197. if (gearman_failed(gearman_worker_add_function(worker,
  198. (*iter).c_str(), 0,
  199. worker_fn, &benchmark)))
  200. {
  201. log.log() << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
  202. return EXIT_FAILURE;
  203. }
  204. }
  205. }
  206. else
  207. {
  208. if (gearman_failed(gearman_worker_add_function(worker,
  209. GEARMAN_BENCHMARK_DEFAULT_FUNCTION, 0,
  210. worker_fn, &benchmark)))
  211. {
  212. log.log() << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
  213. return EXIT_FAILURE;
  214. }
  215. }
  216. gearman_worker_set_timeout(worker, timeout);
  217. do
  218. {
  219. gearman_return_t rc= gearman_worker_work(worker);
  220. if (rc == GEARMAN_SHUTDOWN)
  221. {
  222. if (benchmark.verbose > 0)
  223. {
  224. log.log() << "shutdown" << std::endl;
  225. }
  226. break;
  227. }
  228. else if (gearman_failed(rc))
  229. {
  230. log.log() << "gearman_worker_work(): " << gearman_worker_error(worker) << std::endl;
  231. break;
  232. }
  233. count--;
  234. } while(count and (not signal.is_shutdown()));
  235. gearman_worker_free(worker);
  236. return EXIT_SUCCESS;
  237. }
  238. static void *worker_fn(gearman_job_st *job, void *context,
  239. size_t *, gearman_return_t *ret_ptr)
  240. {
  241. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(context);
  242. if (benchmark->verbose > 0)
  243. {
  244. benchmark_check_time(benchmark);
  245. }
  246. if (benchmark->verbose > 1)
  247. {
  248. std::cout << "Job=" << gearman_job_handle(job) << " (" << gearman_job_workload_size(job) << ")" << std::endl;
  249. }
  250. *ret_ptr= GEARMAN_SUCCESS;
  251. return NULL;
  252. }