blobslap_client.cc 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Blob slap client utility
  11. */
#include <benchmark/benchmark.h>
#include <iostream>
#include <new>
/* Initial value of num_tasks in main(); overridden by the -n option. */
#define BLOBSLAP_DEFAULT_NUM_TASKS 10
/* Initial value of min_size in main(); overridden by the -m option. */
#define BLOBSLAP_DEFAULT_BLOB_MIN_SIZE 0
/* Initial value of max_size in main(); overridden by the -M option. */
#define BLOBSLAP_DEFAULT_BLOB_MAX_SIZE 1024
/* Size of the stack buffer used by _data() and _complete() when receiving task data. */
#define BLOBSLAP_BUFFER_SIZE 8192
/* Task callbacks; main() registers each of these on the client. */
static gearman_return_t _created(gearman_task_st *task);
static gearman_return_t _data(gearman_task_st *task);
static gearman_return_t _status(gearman_task_st *task);
static gearman_return_t _complete(gearman_task_st *task);
static gearman_return_t _fail(gearman_task_st *task);
/* Prints the command line help text; name is the program name shown in the synopsis. */
static void _usage(char *name);
  24. int main(int argc, char *argv[])
  25. {
  26. gearman_benchmark_st benchmark;
  27. int c;
  28. char *host= NULL;
  29. in_port_t port= 0;
  30. const char *function= GEARMAN_BENCHMARK_DEFAULT_FUNCTION;
  31. uint32_t num_tasks= BLOBSLAP_DEFAULT_NUM_TASKS;
  32. size_t min_size= BLOBSLAP_DEFAULT_BLOB_MIN_SIZE;
  33. size_t max_size= BLOBSLAP_DEFAULT_BLOB_MAX_SIZE;
  34. unsigned long int count= 1;
  35. gearman_client_st client;
  36. bool shutdown_worker= false;
  37. if (not gearman_client_create(&client))
  38. {
  39. std::cerr << "Failed to allocate memory for client" << std::endl;
  40. exit(EXIT_FAILURE);
  41. }
  42. gearman_client_add_options(&client, GEARMAN_CLIENT_UNBUFFERED_RESULT);
  43. while ((c= getopt(argc, argv, "bc:f:h:m:M:n:p:s:ve?")) != -1)
  44. {
  45. switch(c)
  46. {
  47. case 'b':
  48. benchmark.background= true;
  49. break;
  50. case 'c':
  51. count= strtoul(optarg, NULL, 10);
  52. break;
  53. case 'f':
  54. function= optarg;
  55. break;
  56. case 'h':
  57. {
  58. if (gearman_failed(gearman_client_add_server(&client, host, port)))
  59. {
  60. std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_client_error(&client) << std::endl;
  61. exit(EXIT_FAILURE);
  62. }
  63. }
  64. break;
  65. case 'm':
  66. min_size= static_cast<size_t>(strtoul(optarg, NULL, 10));
  67. break;
  68. case 'M':
  69. max_size= static_cast<size_t>(strtoul(optarg, NULL, 10));
  70. break;
  71. case 'n':
  72. num_tasks= uint32_t(strtoul(optarg, NULL, 10));
  73. break;
  74. case 'p':
  75. port= in_port_t(atoi(optarg));
  76. break;
  77. case 's':
  78. srand(uint32_t(atoi(optarg)));
  79. break;
  80. case 'e':
  81. shutdown_worker= true;
  82. break;
  83. case 'v':
  84. benchmark.verbose++;
  85. break;
  86. case '?':
  87. gearman_client_free(&client);
  88. _usage(argv[0]);
  89. exit(EXIT_SUCCESS);
  90. break;
  91. default:
  92. gearman_client_free(&client);
  93. _usage(argv[0]);
  94. exit(EXIT_FAILURE);
  95. }
  96. }
  97. if (not host)
  98. {
  99. if (gearman_failed(gearman_client_add_server(&client, NULL, port)))
  100. {
  101. std::cerr << "Failing to add localhost:" << port << " :" << gearman_client_error(&client) << std::endl;
  102. exit(EXIT_FAILURE);
  103. }
  104. }
  105. if (min_size > max_size)
  106. {
  107. std::cerr << "Min data size must be smaller than max data size" << std::endl;
  108. exit(EXIT_FAILURE);
  109. }
  110. if (num_tasks == 0)
  111. {
  112. std::cerr << "Number of tasks must be larger than zero\n" << std::endl;
  113. exit(EXIT_FAILURE);
  114. }
  115. gearman_task_st *tasks= new gearman_task_st[num_tasks];
  116. if (not tasks)
  117. {
  118. std::cerr << "Failed to allocate " << num_tasks << " tasks" << std::endl;
  119. exit(EXIT_FAILURE);
  120. }
  121. char *blob= new char[max_size];
  122. if (not blob)
  123. {
  124. std::cerr << "Failed to allocate blob with length of " << max_size << std::endl;
  125. exit(EXIT_FAILURE);
  126. }
  127. memset(blob, 'x', max_size);
  128. bool error= false;
  129. do
  130. {
  131. for (uint32_t x= 0; x < num_tasks; x++)
  132. {
  133. size_t blob_size;
  134. if (min_size == max_size)
  135. {
  136. blob_size= max_size;
  137. }
  138. else
  139. {
  140. blob_size= size_t(rand());
  141. if (max_size > RAND_MAX)
  142. blob_size*= size_t(rand()) + 1;
  143. blob_size= (blob_size % (max_size - min_size)) + min_size;
  144. }
  145. const char *blob_ptr= blob_size ? blob : NULL;
  146. gearman_return_t ret;
  147. if (benchmark.background)
  148. {
  149. (void)gearman_client_add_task_background(&client, &(tasks[x]),
  150. &benchmark, function, NULL,
  151. blob_ptr, blob_size, &ret);
  152. }
  153. else
  154. {
  155. (void)gearman_client_add_task(&client, &(tasks[x]), &benchmark,
  156. function, NULL, blob_ptr, blob_size,
  157. &ret);
  158. }
  159. if (gearman_failed(ret))
  160. {
  161. if (ret == GEARMAN_LOST_CONNECTION)
  162. continue;
  163. if (benchmark.background)
  164. {
  165. std::cerr << "Task #" << x << " failed during gearman_client_add_task_background(" << gearman_strerror(ret) << " -> " << gearman_client_error(&client) << std::endl ;
  166. }
  167. else
  168. {
  169. std::cerr << "Task #" << x << " failed during gearman_client_add_task(" << gearman_strerror(ret) << " -> " << gearman_client_error(&client) << std::endl ;
  170. }
  171. error= true;
  172. goto exit_immediatly;
  173. }
  174. }
  175. gearman_client_set_created_fn(&client, _created);
  176. gearman_client_set_data_fn(&client, _data);
  177. gearman_client_set_status_fn(&client, _status);
  178. gearman_client_set_complete_fn(&client, _complete);
  179. gearman_client_set_fail_fn(&client, _fail);
  180. gearman_client_set_timeout(&client, 1000);
  181. gearman_return_t ret;
  182. do {
  183. ret= gearman_client_run_tasks(&client);
  184. } while (gearman_continue(ret));
  185. if (ret == GEARMAN_TIMEOUT)
  186. {
  187. error= true;
  188. }
  189. else if (gearman_failed(ret) and ret != GEARMAN_LOST_CONNECTION)
  190. {
  191. std::cerr << "gearman_client_run_tasks(" << gearman_strerror(ret) << ") -> " << gearman_client_error(&client);
  192. for (uint32_t x= 0; x < num_tasks; x++)
  193. {
  194. if (gearman_task_error(&tasks[x]))
  195. {
  196. std::cerr << "\t Task #" << x << " failed with " << gearman_task_error(&tasks[x]) << std::endl;
  197. }
  198. }
  199. error= true;
  200. }
  201. for (uint32_t x= 0; x < num_tasks; x++)
  202. {
  203. gearman_task_free(&(tasks[x]));
  204. }
  205. count--;
  206. } while (count or error);
  207. exit_immediatly:
  208. if (shutdown_worker)
  209. {
  210. gearman_client_do(&client, "shutdown", 0, 0, 0, 0, 0);
  211. }
  212. delete [] blob;
  213. delete [] tasks;
  214. gearman_client_free(&client);
  215. if (benchmark.verbose)
  216. std::cout << "Successfully completed all tasks" << std::endl;
  217. return error ? EXIT_FAILURE : 0;
  218. }
  219. static gearman_return_t _created(gearman_task_st *task)
  220. {
  221. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  222. if (benchmark->background && benchmark->verbose > 0)
  223. benchmark_check_time(benchmark);
  224. if (benchmark->verbose > 2)
  225. {
  226. std::cout << "Created: " << gearman_task_job_handle(task) << std::endl;
  227. }
  228. return GEARMAN_SUCCESS;
  229. }
  230. static gearman_return_t _status(gearman_task_st *task)
  231. {
  232. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  233. if (benchmark->verbose > 2)
  234. {
  235. std::cout << "Status " << gearman_task_job_handle(task) << " " << gearman_task_numerator(task) << " " << gearman_task_denominator(task) << std::endl;
  236. }
  237. return GEARMAN_SUCCESS;
  238. }
  239. static gearman_return_t _data(gearman_task_st *task)
  240. {
  241. char buffer[BLOBSLAP_BUFFER_SIZE];
  242. gearman_return_t ret;
  243. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  244. while (1)
  245. {
  246. size_t size= gearman_task_recv_data(task, buffer, BLOBSLAP_BUFFER_SIZE, &ret);
  247. if (gearman_failed(GEARMAN_SUCCESS))
  248. return ret;
  249. if (size == 0)
  250. break;
  251. }
  252. if (benchmark->verbose > 2)
  253. {
  254. std::cerr << "Data: " << gearman_task_job_handle(task) << " " << gearman_task_data_size(task) << std::endl;
  255. }
  256. return GEARMAN_SUCCESS;
  257. }
  258. static gearman_return_t _complete(gearman_task_st *task)
  259. {
  260. char buffer[BLOBSLAP_BUFFER_SIZE];
  261. gearman_return_t ret;
  262. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  263. while (1)
  264. {
  265. size_t size= gearman_task_recv_data(task, buffer, BLOBSLAP_BUFFER_SIZE, &ret);
  266. if (gearman_failed(ret))
  267. return ret;
  268. if (size == 0)
  269. break;
  270. }
  271. if (benchmark->verbose > 0)
  272. {
  273. benchmark_check_time(benchmark);
  274. }
  275. if (benchmark->verbose > 1)
  276. {
  277. std::cout << "Completed: " << gearman_task_job_handle(task) << " " << gearman_task_data_size(task) << std::endl;
  278. }
  279. return GEARMAN_SUCCESS;
  280. }
  281. static gearman_return_t _fail(gearman_task_st *task)
  282. {
  283. gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(gearman_task_context(task));
  284. if (benchmark->verbose > 0)
  285. benchmark_check_time(benchmark);
  286. if (benchmark->verbose > 1)
  287. {
  288. std::cerr << "Failed " << gearman_task_job_handle(task) << " " << gearman_task_error(task) << std::endl;
  289. }
  290. return GEARMAN_SUCCESS;
  291. }
  292. static void _usage(char *name)
  293. {
  294. printf("\nusage: %s\n"
  295. "\t[-c count] [-f <function>] [-h <host>] [-m <min_size>]\n"
  296. "\t[-M <max_size>] [-n <num_tasks>] [-p <port>] [-s] [-v]\n\n", name);
  297. printf("\t-c <count> - number of times to run all tasks\n");
  298. printf("\t-f <function> - function name for tasks (default %s)\n", GEARMAN_BENCHMARK_DEFAULT_FUNCTION);
  299. printf("\t-h <host> - job server host, can specify many\n");
  300. printf("\t-m <min_size> - minimum blob size (default %d)\n", BLOBSLAP_DEFAULT_BLOB_MIN_SIZE);
  301. printf("\t-M <max_size> - maximum blob size (default %d)\n", BLOBSLAP_DEFAULT_BLOB_MAX_SIZE);
  302. printf("\t-n <num_tasks> - number of tasks to run at once (default %d)\n", BLOBSLAP_DEFAULT_NUM_TASKS);
  303. printf("\t-p <port> - job server port\n");
  304. printf("\t-s <seed> - seed random number for blobsize with <seed>\n");
  305. printf("\t-e - tell worker to shutdown when done\n");
  306. printf("\t-v - increase verbose level\n");
  307. }