queue.cc 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include "gear_config.h"
  38. #include <iostream>
  39. #include <boost/program_options.hpp>
  40. #include "libgearman-server/common.h"
  41. #include <libgearman-server/queue.h>
  42. #include <libgearman-server/plugins/queue/base.h>
  43. #include <libgearman-server/queue.hpp>
  44. #include <libgearman-server/log.h>
  45. #include <assert.h>
  46. gearmand_error_t gearman_queue_add(gearman_server_st *server,
  47. const char *unique,
  48. size_t unique_size,
  49. const char *function_name,
  50. size_t function_name_size,
  51. const void *data,
  52. size_t data_size,
  53. gearman_job_priority_t priority,
  54. int64_t when)
  55. {
  56. assert(server->state.queue_startup == false);
  57. gearmand_error_t ret;
  58. if (server->queue_version == QUEUE_VERSION_FUNCTION)
  59. {
  60. assert(server->queue.functions->_add_fn);
  61. ret= (*(server->queue.functions->_add_fn))(server,
  62. (void *)server->queue.functions->_context,
  63. unique, unique_size,
  64. function_name,
  65. function_name_size,
  66. data, data_size, priority,
  67. when);
  68. }
  69. else
  70. {
  71. assert(server->queue.object);
  72. ret= server->queue.object->store(server,
  73. unique, unique_size,
  74. function_name,
  75. function_name_size,
  76. data, data_size, priority,
  77. when);
  78. }
  79. if (gearmand_success(ret))
  80. {
  81. ret= gearman_queue_flush(server);
  82. }
  83. return ret;
  84. }
  85. gearmand_error_t gearman_queue_flush(gearman_server_st *server)
  86. {
  87. if (server->queue_version == QUEUE_VERSION_FUNCTION)
  88. {
  89. assert(server->queue.functions->_flush_fn);
  90. return (*(server->queue.functions->_flush_fn))(server, (void *)server->queue.functions->_context);
  91. }
  92. assert(server->queue.object);
  93. return server->queue.object->flush(server);
  94. }
  95. gearmand_error_t gearman_queue_done(gearman_server_st *server,
  96. const char *unique,
  97. size_t unique_size,
  98. const char *function_name,
  99. size_t function_name_size)
  100. {
  101. if (server->queue_version == QUEUE_VERSION_FUNCTION)
  102. {
  103. assert(server->queue.functions->_done_fn);
  104. return (*(server->queue.functions->_done_fn))(server,
  105. (void *)server->queue.functions->_context,
  106. unique, unique_size,
  107. function_name,
  108. function_name_size);
  109. }
  110. assert(server->queue.object);
  111. return server->queue.object->done(server,
  112. unique, unique_size,
  113. function_name,
  114. function_name_size);
  115. }
  116. void gearman_server_save_job(gearman_server_st& server,
  117. const gearman_server_job_st* server_job)
  118. {
  119. if (server.queue_version == QUEUE_VERSION_CLASS)
  120. {
  121. assert(server.queue.object);
  122. server.queue.object->save_job(server, server_job);
  123. }
  124. }
  125. void gearman_server_set_queue(gearman_server_st& server,
  126. void *context,
  127. gearman_queue_add_fn *add,
  128. gearman_queue_flush_fn *flush,
  129. gearman_queue_done_fn *done,
  130. gearman_queue_replay_fn *replay)
  131. {
  132. delete server.queue.functions;
  133. server.queue.functions= NULL;
  134. delete server.queue.object;
  135. server.queue.object= NULL;
  136. server.queue_version= QUEUE_VERSION_FUNCTION;
  137. server.queue.functions= new queue_st();
  138. if (server.queue.functions)
  139. {
  140. server.queue.functions->_context= context;
  141. server.queue.functions->_add_fn= add;
  142. server.queue.functions->_flush_fn= flush;
  143. server.queue.functions->_done_fn= done;
  144. server.queue.functions->_replay_fn= replay;
  145. }
  146. assert(server.queue.functions);
  147. }
  148. void gearman_server_set_queue(gearman_server_st& server,
  149. gearmand::queue::Context* context)
  150. {
  151. delete server.queue.functions;
  152. server.queue.functions= NULL;
  153. delete server.queue.object;
  154. server.queue.object= NULL;
  155. assert(context);
  156. {
  157. server.queue_version= QUEUE_VERSION_CLASS;
  158. server.queue.object= context;
  159. }
  160. }
  161. namespace gearmand {
  162. namespace queue {
  163. plugins::Queue::vector all_queue_modules;
  164. void add(plugins::Queue* arg)
  165. {
  166. all_queue_modules.push_back(arg);
  167. }
  168. void load_options(boost::program_options::options_description &all)
  169. {
  170. for (plugins::Queue::vector::iterator iter= all_queue_modules.begin();
  171. iter != all_queue_modules.end();
  172. ++iter)
  173. {
  174. all.add((*iter)->command_line_options());
  175. }
  176. }
  177. gearmand_error_t initialize(gearmand_st *, std::string name)
  178. {
  179. bool launched= false;
  180. if (name.empty())
  181. {
  182. return GEARMAN_SUCCESS;
  183. }
  184. std::transform(name.begin(), name.end(), name.begin(), ::tolower);
  185. for (plugins::Queue::vector::iterator iter= all_queue_modules.begin();
  186. iter != all_queue_modules.end();
  187. ++iter)
  188. {
  189. if ((*iter)->compare(name) == 0)
  190. {
  191. if (launched)
  192. {
  193. return gearmand_gerror("Attempt to initialize multiple queues", GEARMAN_UNKNOWN_OPTION);
  194. }
  195. gearmand_error_t rc;
  196. if (gearmand_failed(rc= (*iter)->initialize()))
  197. {
  198. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, rc,
  199. "Failed to initialize %s: %s", name.c_str(), (*iter)->error_string().c_str());
  200. }
  201. launched= true;
  202. }
  203. }
  204. if (launched == false)
  205. {
  206. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_UNKNOWN_OPTION, "Unknown queue %s", name.c_str());
  207. }
  208. return GEARMAN_SUCCESS;
  209. }
  210. } // namespace queue
  211. } // namespace gearmand