text.cc 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2012-2013 Data Differential, http://datadifferential.com/ All
  6. * rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include "gear_config.h"
  38. #include "libgearman-server/common.h"
  39. #include "libgearman-server/log.h"
  40. #include "libgearman/command.h"
  41. #include "libgearman/vector.hpp"
  42. #include <cassert>
  43. #include <cerrno>
  44. #include <cstring>
  45. #define TEXT_SUCCESS "OK\r\n"
  46. #define TEXT_ERROR_ARGS "ERR INVALID_ARGUMENTS An+incomplete+set+of+arguments+was+sent+to+this+command+%.*s\r\n"
  47. #define TEXT_ERROR_CREATE_FUNCTION "ERR CREATE_FUNCTION %.*s\r\n"
  48. #define TEXT_ERROR_UNKNOWN_COMMAND "ERR UNKNOWN_COMMAND Unknown+server+command%.*s\r\n"
  49. #define TEXT_ERROR_INTERNAL_ERROR "ERR UNKNOWN_ERROR\r\n"
  50. #define TEXT_ERROR_UNKNOWN_SHOW_ARGUMENTS "ERR UNKNOWN_SHOW_ARGUMENTS\r\n"
  51. #define TEXT_ERROR_UNKNOWN_JOB "ERR UNKNOWN_JOB\r\n"
  52. gearmand_error_t server_run_text(gearman_server_con_st *server_con,
  53. gearmand_packet_st *packet)
  54. {
  55. gearman_vector_st data(GEARMAND_TEXT_RESPONSE_SIZE);
  56. if (packet->argc)
  57. {
  58. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "text command %.*s %d arguments",
  59. (uint32_t)packet->arg_size[0], packet->arg[0],
  60. packet->argc);
  61. }
  62. #if 0
  63. const struct gearman_command_info_st *command= NULL;
  64. #endif
  65. if (packet->argc == 0)
  66. {
  67. data.vec_printf(TEXT_ERROR_UNKNOWN_COMMAND, 4, "NULL");
  68. }
  69. #if 0
  70. else if ((command= gearman_command_lookup((char *)(packet->arg[0]), packet->arg_size[0])))
  71. {
  72. }
  73. #endif
  74. else if (strcasecmp("workers", (char *)(packet->arg[0])) == 0)
  75. {
  76. for (gearman_server_thread_st *thread= Server->thread_list;
  77. thread != NULL;
  78. thread= thread->next)
  79. {
  80. int error;
  81. if ((error= pthread_mutex_lock(&thread->lock)) == 0)
  82. {
  83. for (gearman_server_con_st *con= thread->con_list; con != NULL; con= con->next)
  84. {
  85. if (con->_host == NULL)
  86. {
  87. continue;
  88. }
  89. data.vec_append_printf("%d %s %s :", con->con.fd(), con->_host, con->id);
  90. for (gearman_server_worker_st *worker= con->worker_list; worker != NULL; worker= worker->con_next)
  91. {
  92. data.vec_append_printf(" %.*s",
  93. (int)(worker->function->function_name_size),
  94. worker->function->function_name);
  95. }
  96. data.vec_append_printf("\n");
  97. }
  98. if ((error= (pthread_mutex_unlock(&(thread->lock)))) != 0)
  99. {
  100. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
  101. }
  102. }
  103. else
  104. {
  105. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
  106. }
  107. }
  108. data.vec_append_printf(".\n");
  109. }
  110. else if (strcasecmp("prioritystatus", (char *)(packet->arg[0])) == 0)
  111. {
  112. uint32_t job_queued[GEARMAN_JOB_PRIORITY_MAX];
  113. for (uint32_t function_key= 0;
  114. function_key < GEARMAND_DEFAULT_HASH_SIZE;
  115. function_key++)
  116. {
  117. for (gearman_server_function_st *function= Server->function_hash[function_key];
  118. function != NULL;
  119. function= function->next)
  120. {
  121. for (size_t priority = 0; priority < GEARMAN_JOB_PRIORITY_MAX; priority++)
  122. {
  123. job_queued[priority] = 0;
  124. for (gearman_server_job_st *server_job= function->job_list[priority];
  125. server_job != NULL;
  126. server_job= server_job->function_next)
  127. {
  128. job_queued[priority]++;
  129. }
  130. }
  131. data.vec_append_printf("%.*s\t%u\t%u\t%u\t%u\n",
  132. int(function->function_name_size), function->function_name,
  133. job_queued[GEARMAN_JOB_PRIORITY_HIGH],
  134. job_queued[GEARMAN_JOB_PRIORITY_NORMAL],
  135. job_queued[GEARMAN_JOB_PRIORITY_LOW],
  136. function->worker_count);
  137. }
  138. }
  139. data.vec_append_printf(".\n");
  140. }
  141. else if (strcasecmp("status", (char *)(packet->arg[0])) == 0)
  142. {
  143. for (uint32_t function_key= 0;
  144. function_key < GEARMAND_DEFAULT_HASH_SIZE;
  145. function_key++)
  146. {
  147. for (gearman_server_function_st *function= Server->function_hash[function_key];
  148. function != NULL;
  149. function= function->next)
  150. {
  151. data.vec_append_printf("%.*s\t%u\t%u\t%u\n",
  152. int(function->function_name_size),
  153. function->function_name, function->job_total,
  154. function->job_running, function->worker_count);
  155. }
  156. }
  157. data.vec_append_printf(".\n");
  158. }
  159. else if (packet->argc >= 3
  160. and strcasecmp("cancel", (char *)(packet->arg[0])) == 0)
  161. {
  162. if (packet->argc == 3
  163. and strcasecmp("job", (char *)(packet->arg[1])) == 0)
  164. {
  165. gearmand_error_t ret= gearman_server_job_cancel(Gearmand()->server, packet->arg[2], strlen(packet->arg[2]));
  166. if (ret == GEARMAND_SUCCESS)
  167. {
  168. data.vec_printf(TEXT_SUCCESS);
  169. }
  170. else if (ret != GEARMAND_NO_JOBS)
  171. {
  172. data.vec_printf(TEXT_ERROR_INTERNAL_ERROR);
  173. }
  174. else
  175. {
  176. data.vec_printf(TEXT_ERROR_UNKNOWN_JOB);
  177. }
  178. }
  179. }
  180. else if (packet->argc >= 2 and strcasecmp("show", (char *)(packet->arg[0])) == 0)
  181. {
  182. if (packet->argc == 3
  183. and strcasecmp("unique", (char *)(packet->arg[1])) == 0
  184. and strcasecmp("jobs", (char *)(packet->arg[2])) == 0)
  185. {
  186. for (size_t x= 0; x < Server->hashtable_buckets; x++)
  187. {
  188. for (gearman_server_job_st* server_job= Server->unique_hash[x];
  189. server_job != NULL;
  190. server_job= server_job->unique_next)
  191. {
  192. data.vec_append_printf("%.*s\n", int(server_job->unique_length), server_job->unique);
  193. }
  194. }
  195. data.vec_append_printf(".\n");
  196. }
  197. else if (packet->argc == 2
  198. and strcasecmp("jobs", (char *)(packet->arg[1])) == 0)
  199. {
  200. for (size_t x= 0; x < Server->hashtable_buckets; ++x)
  201. {
  202. for (gearman_server_job_st *server_job= Server->job_hash[x];
  203. server_job != NULL;
  204. server_job= server_job->next)
  205. {
  206. data.vec_append_printf("%s\t%u\t%u\t%u\n", server_job->job_handle, uint32_t(server_job->retries),
  207. uint32_t(server_job->ignore_job), uint32_t(server_job->job_queued));
  208. }
  209. }
  210. data.vec_append_printf(".\n");
  211. }
  212. else
  213. {
  214. data.vec_printf(TEXT_ERROR_UNKNOWN_SHOW_ARGUMENTS);
  215. }
  216. }
  217. else if (strcasecmp("create", (char *)(packet->arg[0])) == 0)
  218. {
  219. if (packet->argc == 3 and strcasecmp("function", (char *)(packet->arg[1])) == 0)
  220. {
  221. gearman_server_function_st* function= gearman_server_function_get(Server, (char *)(packet->arg[2]), packet->arg_size[2] -2);
  222. if (function)
  223. {
  224. data.vec_printf(TEXT_SUCCESS);
  225. }
  226. else
  227. {
  228. data.vec_printf(TEXT_ERROR_CREATE_FUNCTION,
  229. (int)packet->arg_size[2], (char *)(packet->arg[2]));
  230. }
  231. }
  232. else
  233. {
  234. // create
  235. data.vec_printf(TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  236. }
  237. }
  238. else if (strcasecmp("drop", (char *)(packet->arg[0])) == 0)
  239. {
  240. if (packet->argc == 3 and strcasecmp("function", (char *)(packet->arg[1])) == 0)
  241. {
  242. bool success= false;
  243. for (uint32_t function_key= 0; function_key < GEARMAND_DEFAULT_HASH_SIZE;
  244. function_key++)
  245. {
  246. for (gearman_server_function_st *function= Server->function_hash[function_key];
  247. function != NULL;
  248. function= function->next)
  249. {
  250. if (strcasecmp(function->function_name, (char *)(packet->arg[2])) == 0)
  251. {
  252. success= true;
  253. if (function->worker_count == 0 && function->job_running == 0)
  254. {
  255. gearman_server_function_free(Server, function);
  256. data.vec_append_printf(TEXT_SUCCESS);
  257. }
  258. else
  259. {
  260. data.vec_append_printf("ERR there are still connected workers or executing clients\r\n");
  261. }
  262. break;
  263. }
  264. }
  265. }
  266. if (success == false)
  267. {
  268. data.vec_printf("ERR function not found\r\n");
  269. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "%s", data.value());
  270. }
  271. }
  272. else
  273. {
  274. // drop
  275. data.vec_printf(TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  276. }
  277. }
  278. else if (strcasecmp("maxqueue", (char *)(packet->arg[0])) == 0)
  279. {
  280. if (packet->argc == 1)
  281. {
  282. data.vec_append_printf(TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  283. }
  284. else
  285. {
  286. uint32_t max_queue_size[GEARMAN_JOB_PRIORITY_MAX];
  287. for (int priority= 0; priority < GEARMAN_JOB_PRIORITY_MAX; ++priority)
  288. {
  289. const int argc= priority +2;
  290. if (packet->argc > argc)
  291. {
  292. const int parameter= atoi((char *)(packet->arg[argc]));
  293. if (parameter < 0)
  294. {
  295. max_queue_size[priority]= 0;
  296. }
  297. else
  298. {
  299. max_queue_size[priority]= (uint32_t)parameter;
  300. }
  301. }
  302. else
  303. {
  304. max_queue_size[priority]= GEARMAND_DEFAULT_MAX_QUEUE_SIZE;
  305. }
  306. }
  307. /*
  308. To preserve the existing behavior of maxqueue, ensure that the
  309. one-parameter invocation is applied to all priorities.
  310. */
  311. if (packet->argc <= 3)
  312. {
  313. for (int priority= 1; priority < GEARMAN_JOB_PRIORITY_MAX; ++priority)
  314. {
  315. max_queue_size[priority]= max_queue_size[0];
  316. }
  317. }
  318. for (uint32_t function_key= 0; function_key < GEARMAND_DEFAULT_HASH_SIZE;
  319. function_key++)
  320. {
  321. for (gearman_server_function_st *function= Server->function_hash[function_key];
  322. function != NULL;
  323. function= function->next)
  324. {
  325. if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
  326. (memcmp(packet->arg[1], function->function_name, function->function_name_size) == 0))
  327. {
  328. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Applying queue limits to %s", function->function_name);
  329. memcpy(function->max_queue_size, max_queue_size, sizeof(uint32_t) * GEARMAN_JOB_PRIORITY_MAX);
  330. }
  331. }
  332. }
  333. data.vec_append_printf(TEXT_SUCCESS);
  334. }
  335. }
  336. else if (strcasecmp("getpid", (char *)(packet->arg[0])) == 0)
  337. {
  338. data.vec_printf("OK %d\n", (int)getpid());
  339. }
  340. else if (strcasecmp("verbose", (char *)(packet->arg[0])) == 0)
  341. {
  342. data.vec_printf("OK %s\n", gearmand_verbose_name(Gearmand()->verbose));
  343. }
  344. else if (strcasecmp("version", (char *)(packet->arg[0])) == 0)
  345. {
  346. data.vec_printf("OK %s\n", PACKAGE_VERSION);
  347. }
  348. else
  349. {
  350. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Failed to find command %.*s(%zu)",
  351. (uint32_t)packet->arg_size[0], packet->arg[0],
  352. packet->arg_size[0]);
  353. data.vec_printf(TEXT_ERROR_UNKNOWN_COMMAND, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  354. }
  355. gearman_server_packet_st *server_packet= gearman_server_packet_create(server_con->thread, false);
  356. if (server_packet == NULL)
  357. {
  358. return gearmand_gerror("calling gearman_server_packet_create()", GEARMAND_MEMORY_ALLOCATION_FAILURE);
  359. }
  360. server_packet->packet.reset(GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
  361. server_packet->packet.options.complete= true;
  362. server_packet->packet.options.free_data= true;
  363. if (data.size() == 0)
  364. {
  365. data.vec_append_printf(TEXT_ERROR_INTERNAL_ERROR);
  366. }
  367. gearman_string_t taken= data.take();
  368. server_packet->packet.data= gearman_c_str(taken);
  369. server_packet->packet.data_size= gearman_size(taken);
  370. int error;
  371. if ((error= pthread_mutex_lock(&server_con->thread->lock)) == 0)
  372. {
  373. GEARMAND_FIFO__ADD(server_con->io_packet, server_packet);
  374. if ((error= pthread_mutex_unlock(&(server_con->thread->lock))))
  375. {
  376. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
  377. }
  378. }
  379. else
  380. {
  381. gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
  382. }
  383. gearman_server_con_io_add(server_con);
  384. return GEARMAND_SUCCESS;
  385. }