start_worker.cc 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include <config.h>
  38. #include <libtest/test.hpp>
  39. #include <cassert>
  40. #include <cstring>
  41. #include <memory>
  42. #include <signal.h>
  43. #include <stdlib.h>
  44. #include <sys/wait.h>
  45. #include <unistd.h>
  46. #include <cstdio>
  47. #include <tests/start_worker.h>
  48. #include <util/instance.hpp>
  49. using namespace libtest;
  50. using namespace datadifferential;
  51. #include <tests/worker.h>
  52. #ifndef __INTEL_COMPILER
  53. #pragma GCC diagnostic ignored "-Wold-style-cast"
  54. #endif
  55. #define CONTEXT_MAGIC_MARKER 45
  56. struct context_st {
  57. struct worker_handle_st *handle;
  58. in_port_t port;
  59. gearman_worker_options_t options;
  60. gearman_function_t worker_fn;
  61. std::string namespace_key;
  62. std::string function_name;
  63. void *context;
  64. int magic;
  65. int _timeout;
  66. boost::barrier* _sync_point;
  67. context_st(worker_handle_st* handle_arg,
  68. gearman_function_t& arg,
  69. in_port_t port_arg,
  70. const std::string& namespace_key_arg,
  71. const std::string& function_name_arg,
  72. void *context_arg,
  73. gearman_worker_options_t& options_arg,
  74. int timeout_arg) :
  75. port(port_arg),
  76. handle(handle_arg),
  77. options(options_arg),
  78. worker_fn(arg),
  79. namespace_key(namespace_key_arg),
  80. function_name(function_name_arg),
  81. context(context_arg),
  82. _sync_point(handle_arg->sync_point()),
  83. magic(CONTEXT_MAGIC_MARKER),
  84. _timeout(timeout_arg)
  85. {
  86. }
  87. void wait(void)
  88. {
  89. if (_sync_point)
  90. {
  91. _sync_point->wait();
  92. _sync_point= NULL;
  93. }
  94. }
  95. void fail(void)
  96. {
  97. handle->failed_startup= true;
  98. handle->wait();
  99. }
  100. ~context_st()
  101. {
  102. }
  103. };
  104. static void thread_runner(context_st* con)
  105. {
  106. std::auto_ptr<context_st> context(con);
  107. assert(context.get());
  108. if (context.get() == NULL)
  109. {
  110. Error << "context_st passed to function was NULL";
  111. return;
  112. }
  113. assert (context->magic == CONTEXT_MAGIC_MARKER);
  114. if (context->magic != CONTEXT_MAGIC_MARKER)
  115. {
  116. Error << "context_st had bad magic";
  117. return;
  118. }
  119. Worker worker;
  120. if (&worker == NULL)
  121. {
  122. Error << "Failed to create Worker";
  123. return;
  124. }
  125. assert(context->handle);
  126. if (context->handle == NULL)
  127. {
  128. Error << "Progammer error, no handle found";
  129. return;
  130. }
  131. context->handle->set_worker_id(&worker);
  132. if (context->namespace_key.empty() == false)
  133. {
  134. gearman_worker_set_namespace(&worker, context->namespace_key.c_str(), context->namespace_key.length());
  135. }
  136. if (gearman_failed(gearman_worker_add_server(&worker, NULL, context->port)))
  137. {
  138. Error << "gearman_worker_add_server()";
  139. return;
  140. }
  141. if (context->_timeout)
  142. {
  143. gearman_worker_set_timeout(&worker, context->_timeout);
  144. }
  145. // Check for a working server by "asking" it for an option
  146. {
  147. size_t count= 5;
  148. bool success= false;
  149. while (--count and success == false)
  150. {
  151. success= gearman_worker_set_server_option(&worker, test_literal_param("exceptions"));
  152. }
  153. if (success == false)
  154. {
  155. Error << "gearman_worker_set_server_option() failed";
  156. return;
  157. }
  158. }
  159. if (gearman_failed(gearman_worker_define_function(&worker,
  160. context->function_name.c_str(), context->function_name.length(),
  161. context->worker_fn,
  162. 0,
  163. context->context)))
  164. {
  165. Error << "Failed to add function " << context->function_name << "(" << gearman_worker_error(&worker) << ")";
  166. return;
  167. }
  168. if (context->options != gearman_worker_options_t())
  169. {
  170. gearman_worker_add_options(&worker, context->options);
  171. }
  172. context->handle->wait();
  173. gearman_return_t ret= GEARMAN_SUCCESS;
  174. while (context->handle->is_shutdown() == false or ret != GEARMAN_SHUTDOWN)
  175. {
  176. ret= gearman_worker_work(&worker);
  177. }
  178. }
  179. worker_handle_st *test_worker_start(in_port_t port,
  180. const char *namespace_key,
  181. const char *function_name,
  182. gearman_function_t &worker_fn,
  183. void *context_arg,
  184. gearman_worker_options_t options,
  185. int timeout)
  186. {
  187. worker_handle_st *handle= new worker_handle_st();
  188. fatal_assert(handle);
  189. context_st *context= new context_st(handle, worker_fn, port,
  190. namespace_key ? namespace_key : "",
  191. function_name,
  192. context_arg, options, timeout);
  193. fatal_assert(context);
  194. handle->_thread= new boost::thread(thread_runner, context);
  195. if (handle->_thread == NULL)
  196. {
  197. delete context;
  198. delete handle;
  199. return NULL;
  200. }
  201. handle->wait();
  202. return handle;
  203. }
  204. boost::barrier* worker_handle_st::sync_point()
  205. {
  206. return &_sync_point;
  207. }
  208. void worker_handle_st::set_worker_id(gearman_worker_st* worker)
  209. {
  210. _worker_id= gearman_worker_id(worker);
  211. }
  212. worker_handle_st::worker_handle_st() :
  213. failed_startup(false),
  214. _shutdown(false),
  215. _worker_id(gearman_id_t()),
  216. _sync_point(2)
  217. {
  218. }
  219. worker_handle_st::~worker_handle_st()
  220. {
  221. shutdown();
  222. }
  223. void worker_handle_st::wait()
  224. {
  225. _sync_point.wait();
  226. }
  227. void worker_handle_st::set_shutdown()
  228. {
  229. boost::mutex::scoped_lock(_shutdown_lock);
  230. _shutdown= true;
  231. }
  232. bool worker_handle_st::is_shutdown()
  233. {
  234. boost::mutex::scoped_lock(_shutdown_lock);
  235. return _shutdown;
  236. }
  237. bool worker_handle_st::shutdown()
  238. {
  239. if (is_shutdown())
  240. {
  241. return true;
  242. }
  243. set_shutdown();
  244. gearman_return_t rc;
  245. if (gearman_failed(rc= gearman_kill(_worker_id, GEARMAN_KILL)))
  246. {
  247. Error << "failed to shutdown " << rc;
  248. return false;
  249. }
  250. _thread->join();
  251. delete _thread;
  252. return true;
  253. }