round_robin.cc 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  /* Gearman server and library
   * Copyright (C) 2008 Brian Aker, Eric Day
   * All rights reserved.
   *
   * Use and distribution licensed under the BSD license. See
   * the COPYING file in the parent directory for full text.
   */
  8. #include "gear_config.h"
  9. #include <libtest/test.hpp>
  10. using namespace libtest;
  11. #include <cassert>
  12. #include <cstdio>
  13. #include <cstdlib>
  14. #include <cstring>
  15. #include <memory>
  16. #include <unistd.h>
  17. #include <libgearman/gearman.h>
  18. #include "libgearman/client.hpp"
  19. #include "libgearman/worker.hpp"
  20. using namespace org::gearmand;
  21. #include "tests/start_worker.h"
  22. struct Context
  23. {
  24. bool run_worker;
  25. in_port_t _port;
  26. uint32_t _retries;
  27. server_startup_st& servers;
  28. Context(server_startup_st& arg, in_port_t port_arg) :
  29. run_worker(false),
  30. _port(port_arg),
  31. _retries(0),
  32. servers(arg)
  33. {
  34. }
  35. uint32_t retries()
  36. {
  37. return _retries;
  38. }
  39. uint32_t retries(const uint32_t arg)
  40. {
  41. return _retries= arg;
  42. }
  43. in_port_t port() const
  44. {
  45. return _port;
  46. }
  47. ~Context()
  48. {
  49. reset();
  50. }
  51. void reset()
  52. {
  53. servers.clear();
  54. _port= libtest::get_free_port();
  55. _retries= 0;
  56. }
  57. };
  58. #ifndef __INTEL_COMPILER
  59. #pragma GCC diagnostic ignored "-Wold-style-cast"
  60. #endif
  61. /* append test for worker */
  62. static gearman_return_t append_function_WORKER(gearman_job_st* job, void *context_arg)
  63. {
  64. /* this will will set the last char in the context (buffer) to the */
  65. /* first char of the work */
  66. char *buf = (char *)context_arg;
  67. assert(buf);
  68. char *work = (char *)gearman_job_workload(job);
  69. buf+= strlen(buf);
  70. *buf= *work;
  71. return GEARMAN_SUCCESS;
  72. }
  73. static test_return_t queue_add(void *object)
  74. {
  75. Context *context= (Context *)object;
  76. ASSERT_TRUE(context);
  77. libgearman::Client client(context->port());
  78. char job_handle[GEARMAN_JOB_HANDLE_SIZE];
  79. uint8_t *value= (uint8_t *)malloc(1);
  80. *value= uint8_t('0');
  81. size_t value_length= 1;
  82. context->run_worker= false;
  83. /* send strings "0", "1" ... "9" to alternating between 2 queues */
  84. /* queue1 = 1,3,5,7,9 */
  85. /* queue2 = 0,2,4,6,8 */
  86. for (uint32_t x= 0; x < 10; x++)
  87. {
  88. ASSERT_EQ(GEARMAN_SUCCESS,
  89. gearman_client_do_background(&client, (x % 2) ? "queue1" : "queue2", NULL,
  90. value, value_length,
  91. job_handle));
  92. *value = (uint8_t)(*value +1);
  93. }
  94. free(value);
  95. context->run_worker= true;
  96. return TEST_SUCCESS;
  97. }
  98. static test_return_t queue_worker(void *object)
  99. {
  100. Context *context= (Context *)object;
  101. ASSERT_TRUE(context);
  102. libgearman::Worker worker(context->port());
  103. char buffer[11];
  104. memset(buffer, 0, sizeof(buffer));
  105. ASSERT_TRUE(context->run_worker);
  106. gearman_function_t append_function_FN= gearman_function_create(append_function_WORKER);
  107. ASSERT_EQ(GEARMAN_SUCCESS, gearman_worker_define_function(&worker,
  108. test_literal_param("queue1"),
  109. append_function_FN,
  110. 0, buffer));
  111. ASSERT_EQ(GEARMAN_SUCCESS, gearman_worker_define_function(&worker,
  112. test_literal_param("queue2"),
  113. append_function_FN,
  114. 0, buffer));
  115. for (uint32_t x= 0; x < 10; x++)
  116. {
  117. ASSERT_EQ(GEARMAN_SUCCESS, gearman_worker_work(&worker));
  118. }
  119. // expect buffer to be reassembled in a predictable round robin order
  120. test_strcmp("1032547698", buffer);
  121. return TEST_SUCCESS;
  122. }
  123. struct Limit
  124. {
  125. uint32_t _count;
  126. uint32_t _expected;
  127. bool _limit;
  128. Limit(uint32_t expected_arg, bool limit_arg= false) :
  129. _count(0),
  130. _expected(expected_arg),
  131. _limit(limit_arg)
  132. { }
  133. void increment()
  134. {
  135. _count++;
  136. }
  137. void reset()
  138. {
  139. _count= 0;
  140. }
  141. uint32_t count()
  142. {
  143. return _count;
  144. }
  145. uint32_t expected()
  146. {
  147. return _expected;
  148. }
  149. gearman_return_t response() const
  150. {
  151. if (_limit)
  152. {
  153. return GEARMAN_SUCCESS;
  154. }
  155. return GEARMAN_FATAL;
  156. }
  157. bool complete()
  158. {
  159. if (_limit and _count == _expected)
  160. {
  161. return true;
  162. }
  163. return false;
  164. }
  165. };
  166. // The idea is to return GEARMAN_ERROR until we hit limit, then return
  167. // GEARMAN_SUCCESS
  168. static gearman_return_t job_retry_WORKER(gearman_job_st* job, void *context_arg)
  169. {
  170. (void)(job);
  171. assert(gearman_job_workload_size(job) == 0);
  172. assert(gearman_job_workload(job) == NULL);
  173. Limit *limit= (Limit*)context_arg;
  174. if (limit->complete())
  175. {
  176. return GEARMAN_SUCCESS;
  177. }
  178. limit->increment();
  179. return GEARMAN_ERROR;
  180. }
  181. static test_return_t _job_retry_TEST(Context *context, Limit& limit)
  182. {
  183. gearman_function_t job_retry_FN= gearman_function_create(job_retry_WORKER);
  184. std::unique_ptr<worker_handle_st> handle(test_worker_start(context->port(),
  185. NULL,
  186. __func__,
  187. job_retry_FN,
  188. &limit,
  189. gearman_worker_options_t(),
  190. 0)); // timeout
  191. libgearman::Client client(context->port());
  192. gearman_return_t rc;
  193. test_null(gearman_client_do(&client,
  194. __func__,
  195. NULL, // unique
  196. NULL, 0, // workload
  197. NULL, // result size
  198. &rc));
  199. ASSERT_EQ(uint32_t(limit.expected()), uint32_t(limit.count()));
  200. ASSERT_EQ(limit.response(), rc);
  201. return TEST_SUCCESS;
  202. }
  203. static test_return_t job_retry_GEARMAN_SUCCESS_TEST(void *object)
  204. {
  205. Context *context= (Context *)object;
  206. Limit limit(context->retries() -1, true);
  207. return _job_retry_TEST(context, limit);
  208. }
  209. static test_return_t job_retry_limit_GEARMAN_SUCCESS_TEST(void *object)
  210. {
  211. Context *context= (Context *)object;
  212. if (context->retries() < 2)
  213. {
  214. return TEST_SKIPPED;
  215. }
  216. for (uint32_t x= 1; x < context->retries(); x++)
  217. {
  218. Limit limit(uint32_t(x -1), true);
  219. ASSERT_EQ(TEST_SUCCESS, _job_retry_TEST(context, limit));
  220. }
  221. return TEST_SUCCESS;
  222. }
  223. static test_return_t job_retry_GEARMAN_FATAL_TEST(void *object)
  224. {
  225. Context *context= (Context *)object;
  226. Limit limit(context->retries());
  227. return _job_retry_TEST(context, limit);
  228. }
  229. static test_return_t round_robin_SETUP(void *object)
  230. {
  231. Context *context= (Context *)object;
  232. const char *argv[]= { "--round-robin", 0 };
  233. if (server_startup(context->servers, "gearmand", context->port(), argv))
  234. {
  235. return TEST_SUCCESS;
  236. }
  237. return TEST_FAILURE;
  238. }
  239. static test_return_t _job_retries_SETUP(Context *context)
  240. {
  241. char buffer[1024];
  242. snprintf(buffer, sizeof(buffer), "--job-retries=%u", uint32_t(context->retries()));
  243. const char *argv[]= { buffer, 0};
  244. if (server_startup(context->servers, "gearmand", context->port(), argv))
  245. {
  246. return TEST_SUCCESS;
  247. }
  248. return TEST_FAILURE;
  249. }
  250. static test_return_t job_retries_once_SETUP(void *object)
  251. {
  252. Context *context= (Context *)object;
  253. context->retries(1);
  254. return _job_retries_SETUP(context);
  255. }
  256. static test_return_t job_retries_twice_SETUP(void *object)
  257. {
  258. Context *context= (Context *)object;
  259. context->retries(2);
  260. return _job_retries_SETUP(context);
  261. }
  262. static test_return_t job_retries_ten_SETUP(void *object)
  263. {
  264. Context *context= (Context *)object;
  265. context->retries(10);
  266. return _job_retries_SETUP(context);
  267. }
  268. static test_return_t _TEARDOWN(void *object)
  269. {
  270. Context *context= (Context *)object;
  271. context->reset();
  272. return TEST_SUCCESS;
  273. }
  274. static void *world_create(server_startup_st& servers, test_return_t&)
  275. {
  276. Context *context= new Context(servers, libtest::get_free_port());
  277. return context;
  278. }
  279. static bool world_destroy(void *object)
  280. {
  281. Context *context= (Context *)object;
  282. delete context;
  283. return TEST_SUCCESS;
  284. }
  285. test_st round_robin_TESTS[] ={
  286. {"add", 0, queue_add },
  287. {"worker", 0, queue_worker },
  288. {0, 0, 0}
  289. };
  290. test_st job_retry_TESTS[] ={
  291. {"GEARMAN_FATAL", 0, job_retry_GEARMAN_FATAL_TEST },
  292. {"GEARMAN_SUCCESS", 0, job_retry_GEARMAN_SUCCESS_TEST },
  293. {"limit", 0, job_retry_limit_GEARMAN_SUCCESS_TEST },
  294. {0, 0, 0}
  295. };
  296. collection_st collection[] ={
  297. {"round_robin", round_robin_SETUP, _TEARDOWN, round_robin_TESTS },
  298. {"--job-retries=1", job_retries_once_SETUP, _TEARDOWN, job_retry_TESTS },
  299. {"--job-retries=2", job_retries_twice_SETUP, _TEARDOWN, job_retry_TESTS },
  300. {"--job-retries=10", job_retries_ten_SETUP, _TEARDOWN, job_retry_TESTS },
  301. {0, 0, 0, 0}
  302. };
  303. void get_world(libtest::Framework *world)
  304. {
  305. world->collections(collection);
  306. world->create(world_create);
  307. world->destroy(world_destroy);
  308. }