start_worker.cc 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include <config.h>
  38. #include <cassert>
  39. #include <cstring>
  40. #include <signal.h>
  41. #include <stdlib.h>
  42. #include <sys/wait.h>
  43. #include <unistd.h>
  44. #include <pthread.h>
  45. #include <semaphore.h>
  46. #ifdef HAVE_UUID_UUID_H
  47. #include <uuid/uuid.h>
  48. #endif
  49. #include <cstdio>
  50. #include <libtest/test.hpp>
  51. #include <tests/start_worker.h>
  52. #include <util/instance.hpp>
  53. using namespace libtest;
  54. using namespace datadifferential;
  55. #ifndef __INTEL_COMPILER
  56. #pragma GCC diagnostic ignored "-Wold-style-cast"
  57. #endif
  58. /*
  59. Print to debug the output of what workers a server might have.
  60. */
  61. class Finish : public util::Instance::Finish
  62. {
  63. public:
  64. bool call(bool success, const std::string &response)
  65. {
  66. if (success)
  67. {
  68. if (response.empty())
  69. {
  70. std::cout << "OK" << std::endl;
  71. }
  72. else
  73. {
  74. std::cout << response;
  75. }
  76. }
  77. else if (not response.empty())
  78. {
  79. std::cerr << "Error: " << response;
  80. }
  81. else
  82. {
  83. std::cerr << "Error" << std::endl;
  84. }
  85. return true;
  86. }
  87. };
  88. class Status : public util::Instance::Finish
  89. {
  90. bool _dropped;
  91. public:
  92. Status() :
  93. _dropped(false)
  94. { }
  95. bool call(bool success, const std::string &)
  96. {
  97. _dropped= success;
  98. return true;
  99. }
  100. bool dropped() const
  101. {
  102. return _dropped;
  103. }
  104. };
  105. static gearman_return_t shutdown_fn(gearman_job_st*, void* /* context */)
  106. {
  107. return GEARMAN_SHUTDOWN;
  108. }
  109. struct context_st {
  110. in_port_t port;
  111. const char *function_name;
  112. struct worker_handle_st *handle;
  113. gearman_worker_options_t options;
  114. gearman_function_t& worker_fn;
  115. const char *namespace_key;
  116. std::string _shutdown_function;
  117. void *context;
  118. sem_t lock;
  119. context_st(worker_handle_st* handle_arg, gearman_function_t& arg) :
  120. port(handle_arg->port()),
  121. handle(handle_arg),
  122. options(gearman_worker_options_t()),
  123. worker_fn(arg),
  124. namespace_key(NULL),
  125. _shutdown_function(handle_arg->shutdown_function()),
  126. context(0)
  127. {
  128. sem_init(&lock, 0, 0);
  129. }
  130. const std::string& shutdown_function() const
  131. {
  132. return _shutdown_function;
  133. }
  134. ~context_st()
  135. {
  136. sem_destroy(&lock);
  137. }
  138. };
  139. static void *thread_runner(void *con)
  140. {
  141. context_st *context= (context_st *)con;
  142. assert(context);
  143. gearman_worker_st *worker= gearman_worker_create(NULL);
  144. assert(worker);
  145. context->handle->_worker_ptr= worker;
  146. if (context->namespace_key)
  147. {
  148. gearman_worker_set_namespace(worker, context->namespace_key, strlen(context->namespace_key));
  149. }
  150. gearman_return_t rc= gearman_worker_add_server(worker, NULL, context->port);
  151. assert(rc == GEARMAN_SUCCESS);
  152. bool success= gearman_worker_set_server_option(worker, test_literal_param("exceptions"));
  153. assert(success);
  154. if (gearman_failed(gearman_worker_define_function(worker,
  155. context->function_name, strlen(context->function_name),
  156. context->worker_fn,
  157. 0,
  158. context->context)))
  159. {
  160. Error << "Failed to add function " << context->function_name << "(" << gearman_worker_error(worker) << ")";
  161. pthread_exit(0);
  162. }
  163. gearman_function_t shutdown_function= gearman_function_create(shutdown_fn);
  164. if (gearman_failed(gearman_worker_define_function(worker,
  165. context->shutdown_function().c_str(), context->shutdown_function().size(),
  166. shutdown_function,
  167. 0, 0)))
  168. {
  169. Error << "Failed to add function shutdown(" << gearman_worker_error(worker) << ")";
  170. pthread_exit(0);
  171. }
  172. if (context->options != gearman_worker_options_t())
  173. {
  174. gearman_worker_add_options(worker, context->options);
  175. }
  176. assert(context->handle);
  177. sem_post(&context->lock);
  178. while (context->handle->is_shutdown() == false)
  179. {
  180. gearman_return_t ret= gearman_worker_work(worker);
  181. (void)ret;
  182. }
  183. gearman_worker_free(worker);
  184. delete context;
  185. pthread_exit(0);
  186. }
  187. struct worker_handle_st *test_worker_start(in_port_t port,
  188. const char *namespace_key,
  189. const char *function_name,
  190. gearman_function_t &worker_fn,
  191. void *context_arg,
  192. gearman_worker_options_t options)
  193. {
  194. worker_handle_st *handle= new worker_handle_st(namespace_key, function_name, port);
  195. assert(handle);
  196. context_st *context= new context_st(handle, worker_fn);
  197. context->port= port;
  198. context->function_name= function_name;
  199. context->context= context_arg;
  200. context->handle= handle;
  201. context->options= options;
  202. context->namespace_key= namespace_key;
  203. test_assert_errno(pthread_create(&handle->thread, NULL, thread_runner, context));
  204. sem_wait(&context->lock);
  205. return handle;
  206. }
  207. worker_handle_st::worker_handle_st(const char *namespace_key, const std::string& name_arg, in_port_t port_arg) :
  208. _shutdown(false),
  209. _name(name_arg),
  210. _port(port_arg),
  211. _worker_ptr(0)
  212. {
  213. pthread_mutex_init(&_shutdown_lock, NULL);
  214. uuid_t uuid;
  215. char uuid_string[37];
  216. uuid_generate(uuid);
  217. uuid_unparse(uuid, uuid_string);
  218. uuid_string[36]= 0;
  219. _shutdown_function.append(uuid_string);
  220. _shutdown_function.append("_SHUTDOWN");
  221. if (namespace_key)
  222. {
  223. _fully_shutdown_function.append(namespace_key);
  224. }
  225. _fully_shutdown_function+= _shutdown_function;
  226. }
  227. bool worker_handle_st::is_shutdown()
  228. {
  229. bool tmp;
  230. pthread_mutex_lock(&_shutdown_lock);
  231. tmp= _shutdown;
  232. pthread_mutex_unlock(&_shutdown_lock);
  233. return tmp;
  234. }
  235. bool worker_handle_st::shutdown()
  236. {
  237. if (is_shutdown())
  238. return true;
  239. set_shutdown();
  240. gearman_client_st *client= gearman_client_create(NULL);
  241. if (client == NULL)
  242. {
  243. Error << "gearman_client_create(" << gearman_client_error(client) << ")";
  244. gearman_client_free(client);
  245. return false;
  246. }
  247. if (gearman_failed(gearman_client_add_server(client, NULL, port())))
  248. {
  249. Error << "gearman_client_add_server(" << gearman_client_error(client) << ")";
  250. gearman_client_free(client);
  251. return false;
  252. }
  253. // If the worker is non-responsive this will allow us to not get stuck in
  254. // gearman_wait().
  255. gearman_client_set_timeout(client, 1000);
  256. gearman_return_t rc;
  257. (void)gearman_client_do(client, shutdown_function(true).c_str(), NULL, NULL, 0, 0, &rc);
  258. gearman_client_free(client);
  259. if (gearman_failed(rc))
  260. {
  261. Error << "Trying to see what workers are registered:" << port();
  262. util::Instance instance("localhost", port());
  263. instance.set_finish(new Finish);
  264. instance.push(new util::Operation(test_literal_param("workers\r\n")));
  265. instance.run();
  266. pthread_cancel(thread);
  267. return false;
  268. }
  269. else
  270. {
  271. Status *status;
  272. util::Instance instance("localhost", port());
  273. instance.set_finish(status= new Status);
  274. std::string execute(test_literal_param("drop function "));
  275. execute.append(shutdown_function(true));
  276. execute.append("\r\n");
  277. instance.push(new util::Operation(execute.c_str(), execute.size()));
  278. instance.run();
  279. if (not status->dropped())
  280. {
  281. Error << "Was unable to drop function " << shutdown_function(true);
  282. }
  283. }
  284. void *unused;
  285. pthread_join(thread, &unused);
  286. return true;
  287. }
  288. worker_handle_st::~worker_handle_st()
  289. {
  290. shutdown();
  291. }