workers.cc 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include <config.h>
  38. #include <libgearman/gearman.h>
  39. #include <cassert>
  40. #include <cstdlib>
  41. #include <cstring>
  42. #include <string>
  43. #include <tests/workers.h>
  44. #ifndef __INTEL_COMPILER
  45. #pragma GCC diagnostic ignored "-Wold-style-cast"
  46. #endif
  47. void *echo_or_react_worker(gearman_job_st *job, void *,
  48. size_t *result_size, gearman_return_t *ret_ptr)
  49. {
  50. const void *workload= gearman_job_workload(job);
  51. *result_size= gearman_job_workload_size(job);
  52. if (workload == NULL or *result_size == 0)
  53. {
  54. assert(workload == NULL and *result_size == 0);
  55. *ret_ptr= GEARMAN_SUCCESS;
  56. return NULL;
  57. }
  58. else if (*result_size == gearman_literal_param_size("fail") and (not memcmp(workload, gearman_literal_param("fail"))))
  59. {
  60. *ret_ptr= GEARMAN_WORK_FAIL;
  61. return NULL;
  62. }
  63. else if (*result_size == gearman_literal_param_size("exception") and (not memcmp(workload, gearman_literal_param("exception"))))
  64. {
  65. gearman_return_t rc= gearman_job_send_exception(job, gearman_literal_param("test exception"));
  66. if (gearman_failed(rc))
  67. {
  68. *ret_ptr= GEARMAN_WORK_FAIL;
  69. return NULL;
  70. }
  71. }
  72. else if (*result_size == gearman_literal_param_size("warning") and (not memcmp(workload, gearman_literal_param("warning"))))
  73. {
  74. gearman_return_t rc= gearman_job_send_warning(job, gearman_literal_param("test warning"));
  75. if (gearman_failed(rc))
  76. {
  77. *ret_ptr= GEARMAN_WORK_FAIL;
  78. return NULL;
  79. }
  80. }
  81. void *result= malloc(*result_size);
  82. assert(result);
  83. memcpy(result, workload, *result_size);
  84. *ret_ptr= GEARMAN_SUCCESS;
  85. return result;
  86. }
  87. void *echo_or_react_chunk_worker(gearman_job_st *job, void *,
  88. size_t *result_size, gearman_return_t *ret_ptr)
  89. {
  90. const char *workload;
  91. workload= (const char *)gearman_job_workload(job);
  92. size_t workload_size= gearman_job_workload_size(job);
  93. bool fail= false;
  94. if (workload_size == gearman_literal_param_size("fail") and (not memcmp(workload, gearman_literal_param("fail"))))
  95. {
  96. fail= true;
  97. }
  98. else if (workload_size == gearman_literal_param_size("exception") and (not memcmp(workload, gearman_literal_param("exception"))))
  99. {
  100. gearman_return_t rc= gearman_job_send_exception(job, gearman_literal_param("test exception"));
  101. if (gearman_failed(rc))
  102. {
  103. *ret_ptr= GEARMAN_WORK_FAIL;
  104. return NULL;
  105. }
  106. }
  107. else if (workload_size == gearman_literal_param_size("warning") and (not memcmp(workload, gearman_literal_param("warning"))))
  108. {
  109. gearman_return_t rc= gearman_job_send_warning(job, gearman_literal_param("test warning"));
  110. if (gearman_failed(rc))
  111. {
  112. *ret_ptr= GEARMAN_WORK_FAIL;
  113. return NULL;
  114. }
  115. }
  116. for (size_t x= 0; x < workload_size; x++)
  117. {
  118. // Chunk
  119. {
  120. *ret_ptr= gearman_job_send_data(job, &workload[x], 1);
  121. if (*ret_ptr != GEARMAN_SUCCESS)
  122. {
  123. return NULL;
  124. }
  125. }
  126. // report status
  127. {
  128. *ret_ptr= gearman_job_send_status(job, (uint32_t)x,
  129. (uint32_t)workload_size);
  130. assert(gearman_success(*ret_ptr));
  131. if (gearman_failed(*ret_ptr))
  132. {
  133. return NULL;
  134. }
  135. if (fail)
  136. {
  137. *ret_ptr= GEARMAN_WORK_FAIL;
  138. return NULL;
  139. }
  140. }
  141. }
  142. *ret_ptr= GEARMAN_SUCCESS;
  143. *result_size= 0;
  144. return NULL;
  145. }
  146. // payload is unique value
  147. void *unique_worker(gearman_job_st *job, void *,
  148. size_t *result_size, gearman_return_t *ret_ptr)
  149. {
  150. const void *workload= gearman_job_workload(job);
  151. assert(job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ);
  152. assert(gearman_job_unique(job));
  153. assert(gearman_job_workload_size(job));
  154. assert(not memcmp(workload, gearman_job_unique(job), gearman_job_workload_size(job)));
  155. if (gearman_job_workload_size(job) == strlen(gearman_job_unique(job)))
  156. {
  157. if (not memcmp(workload, gearman_job_unique(job), gearman_job_workload_size(job)))
  158. {
  159. void *result= malloc(gearman_job_workload_size(job));
  160. assert(result);
  161. memcpy(result, workload, gearman_job_workload_size(job));
  162. *result_size= gearman_job_workload_size(job);
  163. *ret_ptr= GEARMAN_SUCCESS;
  164. return result;
  165. }
  166. }
  167. *result_size= 0;
  168. *ret_ptr= GEARMAN_WORK_FAIL;
  169. return NULL;
  170. }
  171. gearman_return_t cat_aggregator_fn(gearman_aggregator_st *, gearman_task_st *task, gearman_result_st *result)
  172. {
  173. std::string string_value;
  174. do
  175. {
  176. assert(task);
  177. gearman_result_st *result_ptr= gearman_task_result(task);
  178. if (result_ptr)
  179. {
  180. if (not gearman_result_size(result_ptr))
  181. return GEARMAN_WORK_EXCEPTION;
  182. string_value.append(gearman_result_value(result_ptr), gearman_result_size(result_ptr));
  183. }
  184. } while ((task= gearman_next(task)));
  185. gearman_result_store_value(result, string_value.c_str(), string_value.size());
  186. return GEARMAN_SUCCESS;
  187. }
  188. gearman_worker_error_t split_worker(gearman_job_st *job, void *)
  189. {
  190. const char *workload= static_cast<const char *>(gearman_job_workload(job));
  191. size_t workload_size= gearman_job_workload_size(job);
  192. assert(job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL);
  193. const char *chunk_begin= workload;
  194. for (size_t x= 0; x < workload_size; x++)
  195. {
  196. if (int(workload[x]) == 0 or int(workload[x]) == int(' '))
  197. {
  198. if ((workload +x -chunk_begin) == 11 and not memcmp(chunk_begin, gearman_literal_param("mapper_fail")))
  199. {
  200. return GEARMAN_WORKER_FAILED;
  201. }
  202. // NULL Chunk
  203. gearman_return_t rc= gearman_job_send_data(job, chunk_begin, workload +x -chunk_begin);
  204. if (gearman_failed(rc))
  205. {
  206. return GEARMAN_WORKER_FAILED;
  207. }
  208. chunk_begin= workload +x +1;
  209. }
  210. }
  211. if (chunk_begin < workload +workload_size)
  212. {
  213. if ((size_t(workload +workload_size) -size_t(chunk_begin) ) == 11 and not memcmp(chunk_begin, gearman_literal_param("mapper_fail")))
  214. {
  215. return GEARMAN_WORKER_FAILED;
  216. }
  217. gearman_return_t rc= gearman_job_send_data(job, chunk_begin, size_t(workload +workload_size) -size_t(chunk_begin));
  218. if (gearman_failed(rc))
  219. {
  220. return GEARMAN_WORKER_FAILED;
  221. }
  222. }
  223. return GEARMAN_WORKER_SUCCESS;
  224. }