blobslap_client.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Blob slap client utility
  41. */
  #include "gear_config.h"
  #include <benchmark/benchmark.h>
  #include <cstdio>
  #include <cstdlib>
  #include <cstring>
  #include <iostream>
  #include <new>
  #include <unistd.h>
  /* Defaults used when the matching command line option is not given. */
  #define BLOBSLAP_DEFAULT_NUM_TASKS      10    /* -n: tasks queued per pass */
  #define BLOBSLAP_DEFAULT_BLOB_MIN_SIZE  0     /* -m: smallest blob, in bytes */
  #define BLOBSLAP_DEFAULT_BLOB_MAX_SIZE  1024  /* -M: largest blob, in bytes */

  /* Size of the stack buffer the result callbacks read received data into. */
  #define BLOBSLAP_BUFFER_SIZE            8192
  53. static gearman_return_t _created(gearman_task_st *task);
  54. static gearman_return_t _data(gearman_task_st *task);
  55. static gearman_return_t _status(gearman_task_st *task);
  56. static gearman_return_t _complete(gearman_task_st *task);
  57. static gearman_return_t _fail(gearman_task_st *task);
  58. static void _usage(char *name);
  59. int main(int argc, char *argv[])
  60. {
  61. gearman_benchmark_st benchmark;
  62. int c;
  63. char *host= NULL;
  64. in_port_t port= 0;
  65. const char *function= GEARMAN_BENCHMARK_DEFAULT_FUNCTION;
  66. uint32_t num_tasks= BLOBSLAP_DEFAULT_NUM_TASKS;
  67. size_t min_size= BLOBSLAP_DEFAULT_BLOB_MIN_SIZE;
  68. size_t max_size= BLOBSLAP_DEFAULT_BLOB_MAX_SIZE;
  69. unsigned long int count= 1;
  70. gearman_client_st client;
  71. bool shutdown_worker= false;
  72. if (not gearman_client_create(&client))
  73. {
  74. std::cerr << "Failed to allocate memory for client" << std::endl;
  75. return EXIT_FAILURE;
  76. }
  77. gearman_client_add_options(&client, GEARMAN_CLIENT_UNBUFFERED_RESULT);
  78. while ((c= getopt(argc, argv, "bc:f:h:m:M:n:p:s:ve?")) != -1)
  79. {
  80. switch(c)
  81. {
  82. case 'b':
  83. benchmark.background= true;
  84. break;
  85. case 'c':
  86. count= strtoul(optarg, NULL, 10);
  87. break;
  88. case 'f':
  89. function= optarg;
  90. break;
  91. case 'h':
  92. {
  93. if (gearman_failed(gearman_client_add_server(&client, host, port)))
  94. {
  95. std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_client_error(&client) << std::endl;
  96. exit(EXIT_FAILURE);
  97. }
  98. }
  99. break;
  100. case 'm':
  101. min_size= static_cast<size_t>(strtoul(optarg, NULL, 10));
  102. break;
  103. case 'M':
  104. max_size= static_cast<size_t>(strtoul(optarg, NULL, 10));
  105. break;
  106. case 'n':
  107. num_tasks= uint32_t(strtoul(optarg, NULL, 10));
  108. break;
  109. case 'p':
  110. port= in_port_t(atoi(optarg));
  111. break;
  112. case 's':
  113. srand(uint32_t(atoi(optarg)));
  114. break;
  115. case 'e':
  116. shutdown_worker= true;
  117. break;
  118. case 'v':
  119. benchmark.verbose++;
  120. break;
  121. case '?':
  122. gearman_client_free(&client);
  123. _usage(argv[0]);
  124. exit(EXIT_SUCCESS);
  125. break;
  126. default:
  127. gearman_client_free(&client);
  128. _usage(argv[0]);
  129. exit(EXIT_FAILURE);
  130. }
  131. }
  132. if (not host)
  133. {
  134. if (gearman_failed(gearman_client_add_server(&client, NULL, port)))
  135. {
  136. std::cerr << "Failing to add localhost:" << port << " :" << gearman_client_error(&client) << std::endl;
  137. exit(EXIT_FAILURE);
  138. }
  139. }
  140. if (min_size > max_size)
  141. {
  142. std::cerr << "Min data size must be smaller than max data size" << std::endl;
  143. exit(EXIT_FAILURE);
  144. }
  145. if (num_tasks == 0)
  146. {
  147. std::cerr << "Number of tasks must be larger than zero\n" << std::endl;
  148. exit(EXIT_FAILURE);
  149. }
  150. gearman_task_st *tasks= new gearman_task_st[num_tasks];
  151. if (not tasks)
  152. {
  153. std::cerr << "Failed to allocate " << num_tasks << " tasks" << std::endl;
  154. exit(EXIT_FAILURE);
  155. }
  156. char *blob= new char[max_size];
  157. if (not blob)
  158. {
  159. std::cerr << "Failed to allocate blob with length of " << max_size << std::endl;
  160. exit(EXIT_FAILURE);
  161. }
  162. memset(blob, 'x', max_size);
  163. bool error= false;
  164. do
  165. {
  166. for (uint32_t x= 0; x < num_tasks; x++)
  167. {
  168. size_t blob_size;
  169. if (min_size == max_size)
  170. {
  171. blob_size= max_size;
  172. }
  173. else
  174. {
  175. blob_size= size_t(rand());
  176. if (max_size > RAND_MAX)
  177. blob_size*= size_t(rand()) + 1;
  178. blob_size= (blob_size % (max_size - min_size)) + min_size;
  179. }
  180. const char *blob_ptr= blob_size ? blob : NULL;
  181. gearman_return_t ret;
  182. if (benchmark.background)
  183. {
  184. (void)gearman_client_add_task_background(&client, &(tasks[x]),
  185. &benchmark, function, NULL,
  186. blob_ptr, blob_size, &ret);
  187. }
  188. else
  189. {
  190. (void)gearman_client_add_task(&client, &(tasks[x]), &benchmark,
  191. function, NULL, blob_ptr, blob_size,
  192. &ret);
  193. }
  194. if (gearman_failed(ret))
  195. {
  196. if (ret == GEARMAN_LOST_CONNECTION)
  197. continue;
  198. if (benchmark.background)
  199. {
  200. std::cerr << "Task #" << x << " failed during gearman_client_add_task_background(" << gearman_strerror(ret) << " -> " << gearman_client_error(&client) << std::endl ;
  201. }
  202. else
  203. {
  204. std::cerr << "Task #" << x << " failed during gearman_client_add_task(" << gearman_strerror(ret) << " -> " << gearman_client_error(&client) << std::endl ;
  205. }
  206. error= true;
  207. goto exit_immediatly;
  208. }
  209. }
  210. gearman_client_set_created_fn(&client, _created);
  211. gearman_client_set_data_fn(&client, _data);
  212. gearman_client_set_status_fn(&client, _status);
  213. gearman_client_set_complete_fn(&client, _complete);
  214. gearman_client_set_fail_fn(&client, _fail);
  215. gearman_client_set_timeout(&client, 1000);
  216. gearman_return_t ret;
  217. do {
  218. ret= gearman_client_run_tasks(&client);
  219. } while (gearman_continue(ret));
  220. if (ret == GEARMAN_TIMEOUT)
  221. {
  222. error= true;
  223. }
  224. else if (gearman_failed(ret) and ret != GEARMAN_LOST_CONNECTION)
  225. {
  226. std::cerr << "gearman_client_run_tasks(" << gearman_strerror(ret) << ") -> " << gearman_client_error(&client);
  227. for (uint32_t x= 0; x < num_tasks; x++)
  228. {
  229. if (gearman_task_error(&tasks[x]))
  230. {
  231. std::cerr << "\t Task #" << x << " failed with " << gearman_task_error(&tasks[x]) << std::endl;
  232. }
  233. }
  234. error= true;
  235. }
  236. for (uint32_t x= 0; x < num_tasks; x++)
  237. {
  238. gearman_task_free(&(tasks[x]));
  239. }
  240. count--;
  241. } while (count or error);
  242. exit_immediatly:
  243. if (shutdown_worker)
  244. {
  245. gearman_client_do(&client, "shutdown", 0, 0, 0, 0, 0);
  246. }
  247. delete [] blob;
  248. delete [] tasks;
  249. gearman_client_free(&client);
  250. if (benchmark.verbose)
  251. std::cout << "Successfully completed all tasks" << std::endl;
  252. return error ? EXIT_FAILURE : 0;
  253. }
  254. static gearman_return_t _created(gearman_task_st *task)
  255. {
  256. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  257. if (benchmark->background && benchmark->verbose > 0)
  258. benchmark_check_time(benchmark);
  259. if (benchmark->verbose > 2)
  260. {
  261. std::cout << "Created: " << gearman_task_job_handle(task) << std::endl;
  262. }
  263. return GEARMAN_SUCCESS;
  264. }
  265. static gearman_return_t _status(gearman_task_st *task)
  266. {
  267. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  268. if (benchmark->verbose > 2)
  269. {
  270. std::cout << "Status " << gearman_task_job_handle(task) << " " << gearman_task_numerator(task) << " " << gearman_task_denominator(task) << std::endl;
  271. }
  272. return GEARMAN_SUCCESS;
  273. }
  274. static gearman_return_t _data(gearman_task_st *task)
  275. {
  276. char buffer[BLOBSLAP_BUFFER_SIZE];
  277. gearman_return_t ret;
  278. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  279. while (1)
  280. {
  281. size_t size= gearman_task_recv_data(task, buffer, BLOBSLAP_BUFFER_SIZE, &ret);
  282. if (gearman_failed(GEARMAN_SUCCESS))
  283. {
  284. return ret;
  285. }
  286. if (size == 0)
  287. {
  288. break;
  289. }
  290. }
  291. if (benchmark->verbose > 2)
  292. {
  293. std::cerr << "Data: " << gearman_task_job_handle(task) << " " << gearman_task_data_size(task) << std::endl;
  294. }
  295. return GEARMAN_SUCCESS;
  296. }
  297. static gearman_return_t _complete(gearman_task_st *task)
  298. {
  299. char buffer[BLOBSLAP_BUFFER_SIZE];
  300. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  301. while (1)
  302. {
  303. gearman_return_t ret;
  304. size_t size= gearman_task_recv_data(task, buffer, BLOBSLAP_BUFFER_SIZE, &ret);
  305. if (gearman_failed(ret))
  306. {
  307. return ret;
  308. }
  309. if (size == 0)
  310. {
  311. break;
  312. }
  313. }
  314. if (benchmark->verbose > 0)
  315. {
  316. benchmark_check_time(benchmark);
  317. }
  318. if (benchmark->verbose > 1)
  319. {
  320. std::cout << "Completed: " << gearman_task_job_handle(task) << " " << gearman_task_data_size(task) << std::endl;
  321. }
  322. return GEARMAN_SUCCESS;
  323. }
  324. static gearman_return_t _fail(gearman_task_st *task)
  325. {
  326. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  327. if (benchmark->verbose > 0)
  328. benchmark_check_time(benchmark);
  329. if (benchmark->verbose > 1)
  330. {
  331. std::cerr << "Failed " << gearman_task_job_handle(task) << " " << gearman_task_error(task) << std::endl;
  332. }
  333. return GEARMAN_SUCCESS;
  334. }
  335. static void _usage(char *name)
  336. {
  337. printf("\nusage: %s\n"
  338. "\t[-c count] [-f <function>] [-h <host>] [-m <min_size>]\n"
  339. "\t[-M <max_size>] [-n <num_tasks>] [-p <port>] [-s] [-v]\n\n", name);
  340. printf("\t-c <count> - number of times to run all tasks\n");
  341. printf("\t-f <function> - function name for tasks (default %s)\n", GEARMAN_BENCHMARK_DEFAULT_FUNCTION);
  342. printf("\t-h <host> - job server host, can specify many\n");
  343. printf("\t-m <min_size> - minimum blob size (default %d)\n", BLOBSLAP_DEFAULT_BLOB_MIN_SIZE);
  344. printf("\t-M <max_size> - maximum blob size (default %d)\n", BLOBSLAP_DEFAULT_BLOB_MAX_SIZE);
  345. printf("\t-n <num_tasks> - number of tasks to run at once (default %d)\n", BLOBSLAP_DEFAULT_NUM_TASKS);
  346. printf("\t-p <port> - job server port\n");
  347. printf("\t-s <seed> - seed random number for blobsize with <seed>\n");
  348. printf("\t-e - tell worker to shutdown when done\n");
  349. printf("\t-v - increase verbose level\n");
  350. }