workers_v1.cc 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include <config.h>
  38. #include <libtest/test.hpp>
  39. using namespace libtest;
  40. #include <libgearman/gearman.h>
  41. #include <cassert>
  42. #include <cerrno>
  43. #include <cstdlib>
  44. #include <cstring>
  45. #include <climits>
  46. #include <cstdio>
  47. #include <string>
  48. #include <iostream>
  49. #include <tests/workers_v1.h>
  50. #ifndef __INTEL_COMPILER
  51. #pragma GCC diagnostic ignored "-Wold-style-cast"
  52. #endif
  53. void *echo_or_react_worker(gearman_job_st *job, void *,
  54. size_t *result_size, gearman_return_t *ret_ptr)
  55. {
  56. const void *workload= gearman_job_workload(job);
  57. *result_size= gearman_job_workload_size(job);
  58. if (workload == NULL or *result_size == 0)
  59. {
  60. assert(workload == NULL and *result_size == 0);
  61. *ret_ptr= GEARMAN_SUCCESS;
  62. return NULL;
  63. }
  64. else if (*result_size == test_literal_param_size("fail") and (not memcmp(workload, test_literal_param("fail"))))
  65. {
  66. *ret_ptr= GEARMAN_WORK_FAIL;
  67. return NULL;
  68. }
  69. else if (*result_size == test_literal_param_size("exception") and (not memcmp(workload, test_literal_param("exception"))))
  70. {
  71. gearman_return_t rc= gearman_job_send_exception(job, test_literal_param("test exception"));
  72. if (gearman_failed(rc))
  73. {
  74. *ret_ptr= GEARMAN_WORK_FAIL;
  75. return NULL;
  76. }
  77. }
  78. else if (*result_size == test_literal_param_size("warning") and (not memcmp(workload, test_literal_param("warning"))))
  79. {
  80. gearman_return_t rc= gearman_job_send_warning(job, test_literal_param("test warning"));
  81. if (gearman_failed(rc))
  82. {
  83. *ret_ptr= GEARMAN_WORK_FAIL;
  84. return NULL;
  85. }
  86. }
  87. void *result= malloc(*result_size);
  88. assert(result);
  89. memcpy(result, workload, *result_size);
  90. *ret_ptr= GEARMAN_SUCCESS;
  91. return result;
  92. }
  93. void *echo_or_react_chunk_worker(gearman_job_st *job, void *,
  94. size_t *result_size, gearman_return_t *ret_ptr)
  95. {
  96. const char *workload;
  97. workload= (const char *)gearman_job_workload(job);
  98. size_t workload_size= gearman_job_workload_size(job);
  99. bool fail= false;
  100. if (workload_size == test_literal_param_size("fail") and (not memcmp(workload, test_literal_param("fail"))))
  101. {
  102. fail= true;
  103. }
  104. else if (workload_size == test_literal_param_size("exception") and (not memcmp(workload, test_literal_param("exception"))))
  105. {
  106. gearman_return_t rc= gearman_job_send_exception(job, test_literal_param("test exception"));
  107. if (gearman_failed(rc))
  108. {
  109. *ret_ptr= GEARMAN_WORK_FAIL;
  110. return NULL;
  111. }
  112. }
  113. else if (workload_size == test_literal_param_size("warning") and (not memcmp(workload, test_literal_param("warning"))))
  114. {
  115. gearman_return_t rc= gearman_job_send_warning(job, test_literal_param("test warning"));
  116. if (gearman_failed(rc))
  117. {
  118. *ret_ptr= GEARMAN_WORK_FAIL;
  119. return NULL;
  120. }
  121. }
  122. for (size_t x= 0; x < workload_size; x++)
  123. {
  124. // Chunk
  125. {
  126. *ret_ptr= gearman_job_send_data(job, &workload[x], 1);
  127. if (*ret_ptr != GEARMAN_SUCCESS)
  128. {
  129. return NULL;
  130. }
  131. }
  132. // report status
  133. {
  134. *ret_ptr= gearman_job_send_status(job, (uint32_t)x,
  135. (uint32_t)workload_size);
  136. assert(gearman_success(*ret_ptr));
  137. if (gearman_failed(*ret_ptr))
  138. {
  139. return NULL;
  140. }
  141. if (fail)
  142. {
  143. *ret_ptr= GEARMAN_WORK_FAIL;
  144. return NULL;
  145. }
  146. }
  147. }
  148. *ret_ptr= GEARMAN_SUCCESS;
  149. *result_size= 0;
  150. return NULL;
  151. }
  152. // payload is unique value
  153. void *unique_worker(gearman_job_st *job, void *,
  154. size_t *result_size, gearman_return_t *ret_ptr)
  155. {
  156. const char *workload= static_cast<const char *>(gearman_job_workload(job));
  157. assert(job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ);
  158. assert(gearman_job_unique(job));
  159. assert(strlen(gearman_job_unique(job)));
  160. assert(gearman_job_workload_size(job));
  161. assert(strlen(gearman_job_unique(job)) == gearman_job_workload_size(job));
  162. assert(not memcmp(workload, gearman_job_unique(job), gearman_job_workload_size(job)));
  163. if (gearman_job_workload_size(job) == strlen(gearman_job_unique(job)))
  164. {
  165. if (not memcmp(workload, gearman_job_unique(job), gearman_job_workload_size(job)))
  166. {
  167. void *result= malloc(gearman_job_workload_size(job));
  168. assert(result);
  169. memcpy(result, workload, gearman_job_workload_size(job));
  170. *result_size= gearman_job_workload_size(job);
  171. *ret_ptr= GEARMAN_SUCCESS;
  172. return result;
  173. }
  174. }
  175. *result_size= 0;
  176. *ret_ptr= GEARMAN_WORK_FAIL;
  177. return NULL;
  178. }
  179. gearman_return_t cat_aggregator_fn(gearman_aggregator_st *, gearman_task_st *task, gearman_result_st *result)
  180. {
  181. std::string string_value;
  182. do
  183. {
  184. assert(task);
  185. gearman_result_st *result_ptr= gearman_task_result(task);
  186. if (result_ptr)
  187. {
  188. if (not gearman_result_size(result_ptr))
  189. return GEARMAN_WORK_EXCEPTION;
  190. string_value.append(gearman_result_value(result_ptr), gearman_result_size(result_ptr));
  191. }
  192. } while ((task= gearman_next(task)));
  193. gearman_result_store_value(result, string_value.c_str(), string_value.size());
  194. return GEARMAN_SUCCESS;
  195. }
  196. gearman_return_t split_worker(gearman_job_st *job, void* /* context */)
  197. {
  198. const char *workload= static_cast<const char *>(gearman_job_workload(job));
  199. size_t workload_size= gearman_job_workload_size(job);
  200. assert(job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL);
  201. const char *chunk_begin= workload;
  202. for (size_t x= 0; x < workload_size; x++)
  203. {
  204. if (int(workload[x]) == 0 or int(workload[x]) == int(' '))
  205. {
  206. if ((workload +x -chunk_begin) == 11 and not memcmp(chunk_begin, test_literal_param("mapper_fail")))
  207. {
  208. return GEARMAN_FATAL;
  209. }
  210. // NULL Chunk
  211. gearman_return_t rc= gearman_job_send_data(job, chunk_begin, workload +x -chunk_begin);
  212. if (gearman_failed(rc))
  213. {
  214. return GEARMAN_FATAL;
  215. }
  216. chunk_begin= workload +x +1;
  217. }
  218. }
  219. if (chunk_begin < workload +workload_size)
  220. {
  221. if ((size_t(workload +workload_size) -size_t(chunk_begin) ) == 11 and not memcmp(chunk_begin, test_literal_param("mapper_fail")))
  222. {
  223. return GEARMAN_FATAL;
  224. }
  225. gearman_return_t rc= gearman_job_send_data(job, chunk_begin, size_t(workload +workload_size) -size_t(chunk_begin));
  226. if (gearman_failed(rc))
  227. {
  228. return GEARMAN_FATAL;
  229. }
  230. }
  231. return GEARMAN_SUCCESS;
  232. }
  233. static pthread_mutex_t increment_reset_worker_mutex= PTHREAD_MUTEX_INITIALIZER;
  234. void *increment_reset_worker(gearman_job_st *job, void *,
  235. size_t *result_size, gearman_return_t *ret_ptr)
  236. {
  237. static long counter= 0;
  238. long change= 0;
  239. const char *workload= (const char*)gearman_job_workload(job);
  240. if (gearman_job_workload_size(job) == test_literal_param_size("reset") and (not memcmp(workload, test_literal_param("reset"))))
  241. {
  242. pthread_mutex_lock(&increment_reset_worker_mutex);
  243. counter= 0;
  244. pthread_mutex_unlock(&increment_reset_worker_mutex);
  245. *ret_ptr= GEARMAN_SUCCESS;
  246. return NULL;
  247. }
  248. else if (workload and gearman_job_workload_size(job))
  249. {
  250. char *temp= static_cast<char *>(malloc(gearman_job_workload_size(job) +1));
  251. assert(temp);
  252. memcpy(temp, workload, gearman_job_workload_size(job));
  253. temp[gearman_job_workload_size(job)]= 0;
  254. change= strtol(temp, (char **)NULL, 10);
  255. free(temp);
  256. if (change == LONG_MIN or change == LONG_MAX or ( change == 0 and errno < 0))
  257. {
  258. gearman_job_send_warning(job, test_literal_param("strtol() failed"));
  259. *ret_ptr= GEARMAN_WORK_FAIL;
  260. return NULL;
  261. }
  262. }
  263. char *result;
  264. {
  265. pthread_mutex_lock(&increment_reset_worker_mutex);
  266. counter= counter +change;
  267. result= (char *)malloc(40);
  268. if (not result)
  269. {
  270. gearman_job_send_warning(job, test_literal_param("malloc() failed"));
  271. *ret_ptr= GEARMAN_WORK_FAIL;
  272. return NULL;
  273. }
  274. *result_size= size_t(snprintf(result, 40, "%ld", counter));
  275. pthread_mutex_unlock(&increment_reset_worker_mutex);
  276. }
  277. *ret_ptr= GEARMAN_SUCCESS;
  278. return result;
  279. }