round_robin.cc 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. #include "config.h"
  9. #if defined(NDEBUG)
  10. # undef NDEBUG
  11. #endif
  12. #include <cassert>
  13. #include <cstdio>
  14. #include <cstdlib>
  15. #include <cstring>
  16. #include <memory>
  17. #include <unistd.h>
  18. #include <libgearman/gearman.h>
  19. #include <libtest/test.h>
  20. #include <libtest/server.h>
  21. #define ROUND_ROBIN_WORKER_TEST_PORT 32124
  22. struct worker_test_st
  23. {
  24. pid_t gearmand_pid;
  25. gearman_worker_st worker;
  26. bool run_worker;
  27. worker_test_st() :
  28. gearmand_pid(-1),
  29. worker(),
  30. run_worker(false)
  31. { }
  32. };
  33. /* Prototypes */
  34. void *world_create(test_return_t *error);
  35. test_return_t world_destroy(void *object);
  36. #ifndef __INTEL_COMPILER
  37. #pragma GCC diagnostic ignored "-Wold-style-cast"
  38. #endif
  39. /* append test for worker */
  40. static void *append_function(gearman_job_st *job __attribute__((unused)),
  41. void *context, size_t *result_size,
  42. gearman_return_t *ret_ptr __attribute__((unused)))
  43. {
  44. /* this will will set the last char in the context (buffer) to the */
  45. /* first char of the work */
  46. char * buf = (char *)context;
  47. char * work = (char *)gearman_job_workload(job);
  48. buf += strlen(buf);
  49. *buf = *work;
  50. *result_size= 0;
  51. return NULL;
  52. }
  53. static test_return_t queue_add(void *object)
  54. {
  55. gearman_return_t rc;
  56. worker_test_st *test= (worker_test_st *)object;
  57. gearman_client_st client;
  58. gearman_client_st *client_ptr;
  59. char job_handle[GEARMAN_JOB_HANDLE_SIZE];
  60. uint8_t *value= (uint8_t *)strdup("0");
  61. size_t value_length= 1;
  62. test->run_worker= false;
  63. client_ptr= gearman_client_create(&client);
  64. test_truth(client_ptr);
  65. rc= gearman_client_add_server(&client, NULL, ROUND_ROBIN_WORKER_TEST_PORT);
  66. test_truth(GEARMAN_SUCCESS == rc);
  67. /* send strings "0", "1" ... "9" to alternating between 2 queues */
  68. /* queue1 = 1,3,5,7,9 */
  69. /* queue2 = 0,2,4,6,8 */
  70. for (uint32_t x= 0; x < 10; x++)
  71. {
  72. rc= gearman_client_do_background(&client, x % 2 ? "queue1" : "queue2", NULL,
  73. value, value_length, job_handle);
  74. test_truth(GEARMAN_SUCCESS == rc);
  75. *value = (uint8_t)(*value + 1);
  76. }
  77. gearman_client_free(&client);
  78. free(value);
  79. test->run_worker= true;
  80. return TEST_SUCCESS;
  81. }
  82. static test_return_t queue_worker(void *object)
  83. {
  84. worker_test_st *test= (worker_test_st *)object;
  85. gearman_worker_st *worker= &(test->worker);
  86. char buffer[11];
  87. memset(buffer, 0, sizeof(buffer));
  88. test_truth(test->run_worker);
  89. if (gearman_worker_add_function(worker, "queue1", 5, append_function, buffer) != GEARMAN_SUCCESS)
  90. {
  91. return TEST_FAILURE;
  92. }
  93. if (gearman_worker_add_function(worker, "queue2", 5, append_function, buffer) != GEARMAN_SUCCESS)
  94. {
  95. return TEST_FAILURE;
  96. }
  97. for (uint32_t x= 0; x < 10; x++)
  98. {
  99. gearman_return_t rc;
  100. test_true_got(gearman_success(rc= gearman_worker_work(worker)), gearman_strerror(rc));
  101. }
  102. // expect buffer to be reassembled in a predictable round robin order
  103. test_strcmp("1032547698", buffer);
  104. return TEST_SUCCESS;
  105. }
  106. void *world_create(test_return_t *error)
  107. {
  108. const char *argv[2]= { "test_gearmand", "--round-robin"};
  109. pid_t gearmand_pid;
  110. gearmand_pid= test_gearmand_start(ROUND_ROBIN_WORKER_TEST_PORT, 2, argv);
  111. if (gearmand_pid == -1)
  112. {
  113. *error= TEST_FAILURE;
  114. return NULL;
  115. }
  116. worker_test_st *test= new (std::nothrow) worker_test_st;;
  117. if (not test)
  118. {
  119. *error= TEST_MEMORY_ALLOCATION_FAILURE;
  120. return NULL;
  121. }
  122. if (gearman_worker_create(&(test->worker)) == NULL)
  123. {
  124. *error= TEST_FAILURE;
  125. return NULL;
  126. }
  127. if (gearman_worker_add_server(&(test->worker), NULL, ROUND_ROBIN_WORKER_TEST_PORT) != GEARMAN_SUCCESS)
  128. {
  129. *error= TEST_FAILURE;
  130. return NULL;
  131. }
  132. test->gearmand_pid= gearmand_pid;
  133. *error= TEST_SUCCESS;
  134. return (void *)test;
  135. }
  136. test_return_t world_destroy(void *object)
  137. {
  138. worker_test_st *test= (worker_test_st *)object;
  139. gearman_worker_free(&(test->worker));
  140. test_gearmand_stop(test->gearmand_pid);
  141. delete test;
  142. return TEST_SUCCESS;
  143. }
  144. test_st tests[] ={
  145. {"add", 0, queue_add },
  146. {"worker", 0, queue_worker },
  147. {0, 0, 0}
  148. };
  149. collection_st collection[] ={
  150. {"round_robin", 0, 0, tests},
  151. {0, 0, 0, 0}
  152. };
  153. void get_world(world_st *world)
  154. {
  155. world->collections= collection;
  156. world->create= world_create;
  157. world->destroy= world_destroy;
  158. }