blobslap_worker.cc 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Blob slap worker utility
  41. */
  42. #include <config.h>
  43. #include <benchmark/benchmark.h>
  44. #include <boost/program_options.hpp>
  45. #include <cerrno>
  46. #include <cstdio>
  47. #include <climits>
  48. #include <iostream>
  49. #include <vector>
  50. #include "util/daemon.hpp"
  51. #include "util/logfile.hpp"
  52. #include "util/pidfile.hpp"
  53. #include "util/signal.hpp"
  54. #include "util/string.hpp"
  55. using namespace datadifferential;
  56. static void *worker_fn(gearman_job_st *job, void *context,
  57. size_t *result_size, gearman_return_t *ret_ptr);
  58. static gearman_return_t shutdown_fn(gearman_job_st*, void* /* context */)
  59. {
  60. return GEARMAN_SHUTDOWN;
  61. }
  62. static gearman_return_t ping_fn(gearman_job_st*, void* /* context */)
  63. {
  64. return GEARMAN_SUCCESS;
  65. }
  66. int main(int args, char *argv[])
  67. {
  68. gearman_benchmark_st benchmark;
  69. bool opt_daemon;
  70. bool opt_chunk;
  71. bool opt_status;
  72. bool opt_unique;
  73. std::string pid_file;
  74. std::string log_file;
  75. int32_t timeout;
  76. uint32_t count= UINT_MAX;
  77. in_port_t port;
  78. std::string host;
  79. std::vector<std::string>* functions= NULL;
  80. std::string verbose_string;
  81. boost::program_options::options_description desc("Options");
  82. desc.add_options()
  83. ("help", "Options related to the program.")
  84. ("host,h", boost::program_options::value<std::string>(&host)->default_value("localhost"),"Connect to the host")
  85. ("port,p", boost::program_options::value<in_port_t>(&port)->default_value(GEARMAN_DEFAULT_TCP_PORT), "Port number use for connection")
  86. ("count,c", boost::program_options::value<uint32_t>(&count)->default_value(0), "Number of jobs to run before exiting")
  87. ("timeout,u", boost::program_options::value<int32_t>(&timeout)->default_value(-1), "Timeout in milliseconds")
  88. ("chunk", boost::program_options::bool_switch(&opt_chunk)->default_value(false), "Send result back in data chunks")
  89. ("status,s", boost::program_options::bool_switch(&opt_status)->default_value(false), "Send status updates and sleep while running job")
  90. ("unique,u", boost::program_options::bool_switch(&opt_unique)->default_value(false), "When grabbing jobs, grab the uniqie id")
  91. ("daemon,d", boost::program_options::bool_switch(&opt_daemon)->default_value(false), "Daemonize")
  92. ("function,f", boost::program_options::value(functions), "Function to use.")
  93. ("verbose,v", boost::program_options::value(&verbose_string)->default_value("v"), "Increase verbosity level by one.")
  94. ("pid-file", boost::program_options::value(&pid_file), "File to write process ID out to.")
  95. ("log-file", boost::program_options::value(&log_file), "Create a log file.")
  96. ;
  97. boost::program_options::variables_map vm;
  98. try
  99. {
  100. boost::program_options::store(boost::program_options::parse_command_line(args, argv, desc), vm);
  101. boost::program_options::notify(vm);
  102. }
  103. catch(std::exception &e)
  104. {
  105. std::cout << e.what() << std::endl;
  106. return EXIT_FAILURE;
  107. }
  108. if (vm.count("help"))
  109. {
  110. std::cout << desc << std::endl;
  111. return EXIT_SUCCESS;
  112. }
  113. if (opt_daemon)
  114. {
  115. util::daemonize(false, true);
  116. }
  117. if (not pid_file.empty())
  118. {
  119. if (access(pid_file.c_str(), F_OK) == 0)
  120. {
  121. std::cerr << "pid_file already exists:" << pid_file << std::endl;
  122. return EXIT_FAILURE;
  123. }
  124. FILE *file= fopen(pid_file.c_str(), "w+");
  125. if (file == NULL)
  126. {
  127. std::cerr << "Unable to open:" << pid_file << "(" << strerror(errno) << ")" << std::endl;
  128. return EXIT_FAILURE;
  129. }
  130. fclose(file);
  131. // We let the error from this happen later (if one was to occur)
  132. unlink(pid_file.c_str());
  133. }
  134. if (not log_file.empty())
  135. {
  136. FILE *file= fopen(log_file.c_str(), "w+");
  137. if (file == NULL)
  138. {
  139. std::cerr << "Unable to open:" << log_file << "(" << strerror(errno) << ")" << std::endl;
  140. return EXIT_FAILURE;
  141. }
  142. fclose(file);
  143. // We let the error from this happen later (if one was to occur)
  144. unlink(log_file.c_str());
  145. }
  146. gearman_worker_st *worker;
  147. if (not (worker= gearman_worker_create(NULL)))
  148. {
  149. std::cerr << "Failed to allocate worker" << std::endl;
  150. return EXIT_FAILURE;
  151. }
  152. if (gearman_failed(gearman_worker_add_server(worker, host.c_str(), port)))
  153. {
  154. std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_worker_error(worker) << std::endl;
  155. return EXIT_FAILURE;
  156. }
  157. benchmark.verbose= static_cast<uint8_t>(verbose_string.length());
  158. if (opt_daemon)
  159. {
  160. util::daemon_is_ready(benchmark.verbose == 0);
  161. }
  162. util::SignalThread signal(true);
  163. util::Logfile log(log_file);
  164. if (not log.open())
  165. {
  166. std::cerr << "Could not open logfile:" << log_file << std::endl;
  167. return EXIT_FAILURE;
  168. }
  169. if (not signal.setup())
  170. {
  171. log.log() << "Failed signal.setup()" << std::endl;
  172. return EXIT_FAILURE;
  173. }
  174. util::Pidfile _pid_file(pid_file);
  175. if (not _pid_file.create())
  176. {
  177. log.log() << _pid_file.error_message() << std::endl;
  178. return EXIT_FAILURE;
  179. }
  180. gearman_function_t shutdown_function= gearman_function_create(shutdown_fn);
  181. if (gearman_failed(gearman_worker_define_function(worker,
  182. util_literal_param("shutdown"),
  183. shutdown_function,
  184. 0, 0)))
  185. {
  186. log.log() << "Failed to add shutdown function: " << gearman_worker_error(worker) << std::endl;
  187. return EXIT_FAILURE;
  188. }
  189. gearman_function_t ping_function= gearman_function_create(ping_fn);
  190. if (gearman_failed(gearman_worker_define_function(worker,
  191. util_literal_param("blobslap_worker_ping"),
  192. ping_function,
  193. 0, 0)))
  194. {
  195. log.log() << "Failed to add blobslap_worker_ping function: " << gearman_worker_error(worker) << std::endl;
  196. return EXIT_FAILURE;
  197. }
  198. if (functions and functions->size())
  199. {
  200. for (std::vector<std::string>::iterator iter= functions->begin(); iter != functions->end(); iter++)
  201. {
  202. if (gearman_failed(gearman_worker_add_function(worker,
  203. (*iter).c_str(), 0,
  204. worker_fn, &benchmark)))
  205. {
  206. log.log() << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
  207. return EXIT_FAILURE;
  208. }
  209. }
  210. }
  211. else
  212. {
  213. if (gearman_failed(gearman_worker_add_function(worker,
  214. GEARMAN_BENCHMARK_DEFAULT_FUNCTION, 0,
  215. worker_fn, &benchmark)))
  216. {
  217. log.log() << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
  218. return EXIT_FAILURE;
  219. }
  220. }
  221. gearman_worker_set_timeout(worker, timeout);
  222. do
  223. {
  224. gearman_return_t rc= gearman_worker_work(worker);
  225. if (rc == GEARMAN_SHUTDOWN)
  226. {
  227. if (benchmark.verbose > 0)
  228. {
  229. log.log() << "shutdown" << std::endl;
  230. }
  231. break;
  232. }
  233. else if (gearman_failed(rc))
  234. {
  235. log.log() << "gearman_worker_work(): " << gearman_worker_error(worker) << std::endl;
  236. break;
  237. }
  238. count--;
  239. } while(count and (not signal.is_shutdown()));
  240. gearman_worker_free(worker);
  241. return EXIT_SUCCESS;
  242. }
  243. static void *worker_fn(gearman_job_st *job, void *context,
  244. size_t *, gearman_return_t *ret_ptr)
  245. {
  246. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(context);
  247. if (benchmark->verbose > 0)
  248. {
  249. benchmark_check_time(benchmark);
  250. }
  251. if (benchmark->verbose > 1)
  252. {
  253. std::cout << "Job=%s (" << gearman_job_workload_size(job) << ")" << std::endl;
  254. }
  255. *ret_ptr= GEARMAN_SUCCESS;
  256. return NULL;
  257. }