server.c 44 KB


  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Server Definitions
  11. */
  12. #include <config.h>
  13. #include <libgearman-server/common.h>
  14. #include <assert.h>
  15. #include <errno.h>
  16. #include <string.h>
  17. #include <iso646.h>
  18. /*
  19. * Private declarations
  20. */
  21. #define TEXT_SUCCESS "OK\r\n"
  22. #define TEXT_ERROR_ARGS "ERR INVALID_ARGUMENTS An+incomplete+set+of+arguments+was+sent+to+this+command+%.*s\r\n"
  23. #define TEXT_ERROR_CREATE_FUNCTION "ERR CREATE_FUNCTION %.*s\r\n"
  24. #define TEXT_ERROR_UNKNOWN_COMMAND "ERR UNKNOWN_COMMAND Unknown+server+command%.*s\r\n"
  25. #define TEXT_ERROR_INTERNAL_ERROR "ERR UNKNOWN_ERROR\r\n"
  26. /**
  27. * @addtogroup gearman_server_private Private Server Functions
  28. * @ingroup gearman_server
  29. * @{
  30. */
  31. /**
  32. * Add job to queue wihle replaying queue during startup.
  33. */
  34. static gearmand_error_t _queue_replay_add(gearman_server_st *server, void *context,
  35. const char *unique, size_t unique_size,
  36. const char *function_name, size_t function_name_size,
  37. const void *data, size_t data_size,
  38. gearmand_job_priority_t priority,
  39. int64_t when);
  40. /**
  41. * Queue an error packet.
  42. */
  43. static gearmand_error_t _server_error_packet(gearman_server_con_st *server_con,
  44. const char *error_code,
  45. const char *error_string);
  46. /**
  47. * Process text commands for a connection.
  48. */
  49. static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
  50. gearmand_packet_st *packet);
  51. /**
  52. * Send work result packets with data back to clients.
  53. */
  54. static gearmand_error_t
  55. _server_queue_work_data(gearman_server_job_st *server_job,
  56. gearmand_packet_st *packet, gearman_command_t command);
  57. /** @} */
  58. /*
  59. * Public definitions
  60. */
  61. gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
  62. gearmand_packet_st *packet)
  63. {
  64. gearmand_error_t ret;
  65. gearman_server_job_st *server_job;
  66. char job_handle[GEARMAND_JOB_HANDLE_SIZE];
  67. char option[GEARMAN_OPTION_SIZE];
  68. gearman_server_client_st *server_client= NULL;
  69. char numerator_buffer[11]; /* Max string size to hold a uint32_t. */
  70. char denominator_buffer[11]; /* Max string size to hold a uint32_t. */
  71. gearmand_job_priority_t priority;
  72. int checked_length;
  73. if (packet->magic == GEARMAN_MAGIC_RESPONSE)
  74. {
  75. return _server_error_packet(server_con, "bad_magic", "Request magic expected");
  76. }
  77. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  78. "%15s:%5s packet command %s",
  79. server_con->con.context == NULL ? "-" : server_con->con.context->host,
  80. server_con->con.context == NULL ? "-" : server_con->con.context->port,
  81. gearmand_strcommand(packet));
  82. switch (packet->command)
  83. {
  84. /* Client/worker requests. */
  85. case GEARMAN_COMMAND_ECHO_REQ:
  86. /* Reuse the data buffer and just shove the data back. */
  87. ret= gearman_server_io_packet_add(server_con, true, GEARMAN_MAGIC_RESPONSE,
  88. GEARMAN_COMMAND_ECHO_RES, packet->data,
  89. packet->data_size, NULL);
  90. if (gearmand_failed(ret))
  91. {
  92. return gearmand_gerror("gearman_server_io_packet_add", ret);
  93. }
  94. packet->options.free_data= false;
  95. break;
  96. case GEARMAN_COMMAND_SUBMIT_REDUCE_JOB: // Reduce request
  97. server_client= gearman_server_client_add(server_con);
  98. if (server_client == NULL)
  99. {
  100. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  101. }
  102. case GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND:
  103. {
  104. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  105. "Received reduce submission, Partitioner: %.*s(%lu) Reducer: %.*s(%lu) Unique: %.*s(%lu) with %d arguments",
  106. packet->arg_size[0] -1, packet->arg[0], packet->arg_size[0] -1,
  107. packet->arg_size[2] -1, packet->arg[2], packet->arg_size[2] -1, // reducer
  108. packet->arg_size[1] -1, packet->arg[1], packet->arg_size[1] -1,
  109. (int)packet->argc);
  110. if (packet->arg_size[2] -1 > GEARMAN_UNIQUE_SIZE)
  111. {
  112. gearman_server_client_free(server_client);
  113. gearmand_gerror("unique value too large", GEARMAN_ARGUMENT_TOO_LARGE);
  114. return _server_error_packet(server_con, "job failure", "Unique value too large");
  115. }
  116. gearmand_job_priority_t map_priority= GEARMAND_JOB_PRIORITY_NORMAL;
  117. /* Schedule job. */
  118. server_job= gearman_server_job_add_reducer(Server,
  119. (char *)(packet->arg[0]), packet->arg_size[0] -1, // Function
  120. (char *)(packet->arg[1]), packet->arg_size[1] -1, // unique
  121. (char *)(packet->arg[2]), packet->arg_size[2] -1, // reducer
  122. packet->data, packet->data_size, map_priority,
  123. server_client, &ret, 0);
  124. if (gearmand_success(ret))
  125. {
  126. packet->options.free_data= false;
  127. }
  128. else if (ret == GEARMAN_JOB_QUEUE_FULL)
  129. {
  130. gearman_server_client_free(server_client);
  131. return _server_error_packet(server_con, "queue_full", "Job queue is full");
  132. }
  133. else if (ret != GEARMAN_JOB_EXISTS)
  134. {
  135. gearman_server_client_free(server_client);
  136. gearmand_gerror("gearman_server_job_add", ret);
  137. return ret;
  138. }
  139. /* Queue the job created packet. */
  140. ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
  141. GEARMAN_COMMAND_JOB_CREATED,
  142. server_job->job_handle,
  143. (size_t)strlen(server_job->job_handle),
  144. NULL);
  145. if (gearmand_failed(ret))
  146. {
  147. gearman_server_client_free(server_client);
  148. gearmand_gerror("gearman_server_io_packet_add", ret);
  149. return ret;
  150. }
  151. gearmand_log_notice(GEARMAN_DEFAULT_LOG_PARAM,"accepted,%.*s,%.*s,%.*s",
  152. packet->arg_size[0] -1, packet->arg[0], // Function
  153. packet->arg_size[1] -1, packet->arg[1], // unique
  154. packet->arg_size[2] -1, packet->arg[2]); // reducer
  155. }
  156. break;
  157. /* Client requests. */
  158. case GEARMAN_COMMAND_SUBMIT_JOB:
  159. case GEARMAN_COMMAND_SUBMIT_JOB_BG:
  160. case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
  161. case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:
  162. case GEARMAN_COMMAND_SUBMIT_JOB_LOW:
  163. case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:
  164. case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:
  165. if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB ||
  166. packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
  167. packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH)
  168. {
  169. priority= GEARMAND_JOB_PRIORITY_NORMAL;
  170. }
  171. else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH ||
  172. packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG)
  173. {
  174. priority= GEARMAND_JOB_PRIORITY_HIGH;
  175. }
  176. else
  177. {
  178. priority= GEARMAND_JOB_PRIORITY_LOW;
  179. }
  180. if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
  181. packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
  182. packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG ||
  183. packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH)
  184. {
  185. server_client= NULL;
  186. }
  187. else
  188. {
  189. server_client= gearman_server_client_add(server_con);
  190. if (server_client == NULL)
  191. {
  192. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  193. }
  194. }
  195. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  196. "Received submission, %.*s/%.*s with %d arguments",
  197. packet->arg_size[0], packet->arg[0],
  198. packet->arg_size[1], packet->arg[1],
  199. (int)packet->argc);
  200. int64_t when= 0;
  201. if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH)
  202. {
  203. sscanf((char *)packet->arg[2], "%lld", (long long *)&when);
  204. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  205. "Received EPOCH job submission, %.*s/%.*s, with data for %jd at %jd, args %d",
  206. packet->arg_size[0], packet->arg[0],
  207. packet->arg_size[1], packet->arg[1],
  208. when, time(NULL),
  209. (int)packet->argc);
  210. }
  211. if (packet->arg_size[1] -1 > GEARMAN_UNIQUE_SIZE)
  212. {
  213. gearmand_gerror("unique value too large", GEARMAN_ARGUMENT_TOO_LARGE);
  214. gearman_server_client_free(server_client);
  215. return _server_error_packet(server_con, "job failure", "Unique value too large");
  216. }
  217. /* Schedule job. */
  218. server_job= gearman_server_job_add(Server,
  219. (char *)(packet->arg[0]), packet->arg_size[0] -1, // Function
  220. (char *)(packet->arg[1]), packet->arg_size[1] -1, // unique
  221. packet->data, packet->data_size, priority,
  222. server_client, &ret,
  223. when);
  224. if (gearmand_success(ret))
  225. {
  226. packet->options.free_data= false;
  227. }
  228. else if (ret == GEARMAN_JOB_QUEUE_FULL)
  229. {
  230. gearman_server_client_free(server_client);
  231. return _server_error_packet(server_con, "queue_full", "Job queue is full");
  232. }
  233. else if (ret != GEARMAN_JOB_EXISTS)
  234. {
  235. gearman_server_client_free(server_client);
  236. gearmand_gerror("gearman_server_job_add", ret);
  237. return ret;
  238. }
  239. /* Queue the job created packet. */
  240. ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
  241. GEARMAN_COMMAND_JOB_CREATED,
  242. server_job->job_handle,
  243. (size_t)strlen(server_job->job_handle),
  244. NULL);
  245. if (gearmand_failed(ret))
  246. {
  247. gearman_server_client_free(server_client);
  248. gearmand_gerror("gearman_server_io_packet_add", ret);
  249. return ret;
  250. }
  251. gearmand_log_notice(GEARMAN_DEFAULT_LOG_PARAM,"accepted,%.*s,%.*s,%jd",
  252. packet->arg_size[0], packet->arg[0], // Function
  253. packet->arg_size[1], packet->arg[1], // Unique
  254. when);
  255. break;
  256. case GEARMAN_COMMAND_GET_STATUS:
  257. {
  258. /* This may not be NULL terminated, so copy to make sure it is. */
  259. checked_length= snprintf(job_handle, GEARMAND_JOB_HANDLE_SIZE, "%.*s",
  260. (int)(packet->arg_size[0]), (char *)(packet->arg[0]));
  261. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
  262. {
  263. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "snprintf(%d)", checked_length);
  264. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  265. }
  266. server_job= gearman_server_job_get(Server, job_handle, NULL);
  267. /* Queue status result packet. */
  268. if (server_job == NULL)
  269. {
  270. ret= gearman_server_io_packet_add(server_con, false,
  271. GEARMAN_MAGIC_RESPONSE,
  272. GEARMAN_COMMAND_STATUS_RES, job_handle,
  273. (size_t)(strlen(job_handle) + 1),
  274. "0", (size_t)2, "0", (size_t)2, "0",
  275. (size_t)2, "0", (size_t)1, NULL);
  276. }
  277. else
  278. {
  279. checked_length= snprintf(numerator_buffer, sizeof(numerator_buffer), "%u", server_job->numerator);
  280. if ((size_t)checked_length >= sizeof(numerator_buffer) || checked_length < 0)
  281. {
  282. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "snprintf(%d)", checked_length);
  283. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  284. }
  285. checked_length= snprintf(denominator_buffer, sizeof(denominator_buffer), "%u", server_job->denominator);
  286. if ((size_t)checked_length >= sizeof(denominator_buffer) || checked_length < 0)
  287. {
  288. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "snprintf(%d)", checked_length);
  289. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  290. }
  291. ret= gearman_server_io_packet_add(server_con, false,
  292. GEARMAN_MAGIC_RESPONSE,
  293. GEARMAN_COMMAND_STATUS_RES, job_handle,
  294. (size_t)(strlen(job_handle) + 1),
  295. "1", (size_t)2,
  296. server_job->worker == NULL ? "0" : "1",
  297. (size_t)2, numerator_buffer,
  298. (size_t)(strlen(numerator_buffer) + 1),
  299. denominator_buffer,
  300. (size_t)strlen(denominator_buffer),
  301. NULL);
  302. }
  303. if (ret != GEARMAN_SUCCESS)
  304. {
  305. gearmand_gerror("gearman_server_io_packet_add", ret);
  306. return ret;
  307. }
  308. }
  309. break;
  310. case GEARMAN_COMMAND_OPTION_REQ:
  311. /* This may not be NULL terminated, so copy to make sure it is. */
  312. checked_length= snprintf(option, GEARMAN_OPTION_SIZE, "%.*s",
  313. (int)(packet->arg_size[0]), (char *)(packet->arg[0]));
  314. if (checked_length >= GEARMAN_OPTION_SIZE || checked_length < 0)
  315. {
  316. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "snprintf(%d)", checked_length);
  317. return _server_error_packet(server_con, "unknown_option",
  318. "Server does not recognize given option");
  319. }
  320. if (strcasecmp(option, "exceptions") == 0)
  321. {
  322. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "'exceptions'");
  323. server_con->is_exceptions= true;
  324. }
  325. else
  326. {
  327. return _server_error_packet(server_con, "unknown_option",
  328. "Server does not recognize given option");
  329. }
  330. ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
  331. GEARMAN_COMMAND_OPTION_RES,
  332. packet->arg[0], packet->arg_size[0],
  333. NULL);
  334. if (ret != GEARMAN_SUCCESS)
  335. {
  336. gearmand_gerror("gearman_server_io_packet_add", ret);
  337. return ret;
  338. }
  339. break;
  340. /* Worker requests. */
  341. case GEARMAN_COMMAND_CAN_DO:
  342. if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
  343. packet->arg_size[0], 0) == NULL)
  344. {
  345. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  346. }
  347. break;
  348. case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
  349. if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
  350. packet->arg_size[0] - 1,
  351. (in_port_t)atoi((char *)(packet->arg[1])))
  352. == NULL)
  353. {
  354. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  355. }
  356. break;
  357. case GEARMAN_COMMAND_CANT_DO:
  358. gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
  359. packet->arg_size[0]);
  360. break;
  361. case GEARMAN_COMMAND_RESET_ABILITIES:
  362. gearman_server_con_free_workers(server_con);
  363. break;
  364. case GEARMAN_COMMAND_PRE_SLEEP:
  365. server_job= gearman_server_job_peek(server_con);
  366. if (server_job == NULL)
  367. {
  368. server_con->is_sleeping= true;
  369. /* Remove any timeouts while sleeping */
  370. gearman_server_con_delete_timeout(server_con);
  371. }
  372. else
  373. {
  374. /* If there are jobs that could be run, queue a NOOP packet to wake the
  375. worker up. This could be the result of a race codition. */
  376. ret= gearman_server_io_packet_add(server_con, false,
  377. GEARMAN_MAGIC_RESPONSE,
  378. GEARMAN_COMMAND_NOOP, NULL);
  379. if (ret != GEARMAN_SUCCESS)
  380. {
  381. gearmand_gerror("gearman_server_io_packet_add", ret);
  382. return ret;
  383. }
  384. }
  385. break;
  386. case GEARMAN_COMMAND_GRAB_JOB:
  387. case GEARMAN_COMMAND_GRAB_JOB_UNIQ:
  388. case GEARMAN_COMMAND_GRAB_JOB_ALL:
  389. server_con->is_sleeping= false;
  390. server_con->is_noop_sent= false;
  391. server_job= gearman_server_job_take(server_con);
  392. if (server_job == NULL)
  393. {
  394. /* No jobs found, queue no job packet. */
  395. ret= gearman_server_io_packet_add(server_con, false,
  396. GEARMAN_MAGIC_RESPONSE,
  397. GEARMAN_COMMAND_NO_JOB, NULL);
  398. }
  399. else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
  400. {
  401. /*
  402. We found a runnable job, queue job assigned packet and take the job off the queue.
  403. */
  404. ret= gearman_server_io_packet_add(server_con, false,
  405. GEARMAN_MAGIC_RESPONSE,
  406. GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
  407. server_job->job_handle, (size_t)(strlen(server_job->job_handle) + 1),
  408. server_job->function->function_name, server_job->function->function_name_size + 1,
  409. server_job->unique, (size_t)(strlen(server_job->unique) + 1),
  410. server_job->data, server_job->data_size,
  411. NULL);
  412. }
  413. else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_ALL and server_job->reducer)
  414. {
  415. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  416. "Sending reduce submission, Partitioner: %.*s(%lu) Reducer: %.*s(%lu) Unique: %.*s(%lu) with data sized (%lu)" ,
  417. server_job->function->function_name_size, server_job->function->function_name, server_job->function->function_name_size,
  418. strlen(server_job->reducer), server_job->reducer, strlen(server_job->reducer),
  419. strlen(server_job->unique), server_job->unique, strlen(server_job->unique),
  420. (unsigned long)server_job->data_size);
  421. /*
  422. We found a runnable job, queue job assigned packet and take the job off the queue.
  423. */
  424. ret= gearman_server_io_packet_add(server_con, false,
  425. GEARMAN_MAGIC_RESPONSE,
  426. GEARMAN_COMMAND_JOB_ASSIGN_ALL,
  427. server_job->job_handle, (size_t)(strlen(server_job->job_handle) + 1),
  428. server_job->function->function_name, server_job->function->function_name_size + 1,
  429. server_job->unique, (size_t)(strlen(server_job->unique) + 1),
  430. server_job->reducer, (size_t)(strlen(server_job->reducer) +1),
  431. server_job->data, server_job->data_size,
  432. NULL);
  433. }
  434. else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_ALL)
  435. {
  436. /*
  437. We found a runnable job, queue job assigned packet and take the job off the queue.
  438. */
  439. ret= gearman_server_io_packet_add(server_con, false,
  440. GEARMAN_MAGIC_RESPONSE,
  441. GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
  442. server_job->job_handle, (size_t)(strlen(server_job->job_handle) +1),
  443. server_job->function->function_name, server_job->function->function_name_size +1,
  444. server_job->unique, (size_t)(strlen(server_job->unique) +1),
  445. server_job->data, server_job->data_size,
  446. NULL);
  447. }
  448. else
  449. {
  450. /* Same, but without unique ID. */
  451. ret= gearman_server_io_packet_add(server_con, false,
  452. GEARMAN_MAGIC_RESPONSE,
  453. GEARMAN_COMMAND_JOB_ASSIGN,
  454. server_job->job_handle, (size_t)(strlen(server_job->job_handle) + 1),
  455. server_job->function->function_name, server_job->function->function_name_size + 1,
  456. server_job->data, server_job->data_size,
  457. NULL);
  458. }
  459. if (gearmand_failed(ret))
  460. {
  461. gearmand_gerror("gearman_server_io_packet_add", ret);
  462. if (server_job)
  463. {
  464. return gearman_server_job_queue(server_job);
  465. }
  466. return ret;
  467. }
  468. /* Since job is assigned, we should respect function timeout */
  469. if (server_job != NULL)
  470. {
  471. gearman_server_con_add_job_timeout(server_con, server_job);
  472. }
  473. break;
  474. case GEARMAN_COMMAND_WORK_DATA:
  475. case GEARMAN_COMMAND_WORK_WARNING:
  476. server_job= gearman_server_job_get(Server,
  477. (char *)(packet->arg[0]),
  478. server_con);
  479. if (server_job == NULL)
  480. {
  481. return _server_error_packet(server_con, "job_not_found",
  482. "Job given in work result not found");
  483. }
  484. /* Queue the data/warning packet for all clients. */
  485. ret= _server_queue_work_data(server_job, packet, packet->command);
  486. if (gearmand_failed(ret))
  487. {
  488. gearmand_gerror("_server_queue_work_data", ret);
  489. return ret;
  490. }
  491. break;
  492. case GEARMAN_COMMAND_WORK_STATUS:
  493. server_job= gearman_server_job_get(Server,
  494. (char *)(packet->arg[0]),
  495. server_con);
  496. if (server_job == NULL)
  497. {
  498. return _server_error_packet(server_con, "job_not_found",
  499. "Job given in work result not found");
  500. }
  501. /* Update job status. */
  502. server_job->numerator= (uint32_t)atoi((char *)(packet->arg[1]));
  503. /* This may not be NULL terminated, so copy to make sure it is. */
  504. checked_length= snprintf(denominator_buffer, sizeof(denominator_buffer), "%.*s", (int)(packet->arg_size[2]),
  505. (char *)(packet->arg[2]));
  506. if ((size_t)checked_length > sizeof(denominator_buffer) || checked_length < 0)
  507. {
  508. gearmand_error("snprintf");
  509. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  510. }
  511. server_job->denominator= (uint32_t)atoi(denominator_buffer);
  512. /* Queue the status packet for all clients. */
  513. for (server_client= server_job->client_list; server_client;
  514. server_client= server_client->job_next)
  515. {
  516. ret= gearman_server_io_packet_add(server_client->con, false,
  517. GEARMAN_MAGIC_RESPONSE,
  518. GEARMAN_COMMAND_WORK_STATUS,
  519. packet->arg[0], packet->arg_size[0],
  520. packet->arg[1], packet->arg_size[1],
  521. packet->arg[2], packet->arg_size[2],
  522. NULL);
  523. if (gearmand_failed(ret))
  524. {
  525. gearmand_gerror("gearman_server_io_packet_add", ret);
  526. return ret;
  527. }
  528. }
  529. break;
  530. case GEARMAN_COMMAND_WORK_COMPLETE:
  531. server_job= gearman_server_job_get(Server,
  532. (char *)(packet->arg[0]),
  533. server_con);
  534. if (server_job == NULL)
  535. {
  536. return _server_error_packet(server_con, "job_not_found", "Job given in work result not found");
  537. }
  538. /* Queue the complete packet for all clients. */
  539. ret= _server_queue_work_data(server_job, packet,
  540. GEARMAN_COMMAND_WORK_COMPLETE);
  541. if (gearmand_failed(ret))
  542. {
  543. gearmand_gerror("_server_queue_work_data", ret);
  544. return ret;
  545. }
  546. /* Remove from persistent queue if one exists. */
  547. if (server_job->job_queued && Server->queue._done_fn != NULL)
  548. {
  549. ret= (*(Server->queue._done_fn))(Server, (void *)Server->queue._context,
  550. server_job->unique,
  551. (size_t)strlen(server_job->unique),
  552. server_job->function->function_name,
  553. server_job->function->function_name_size);
  554. if (gearmand_failed(ret))
  555. {
  556. gearmand_gerror("Remove from persistent queue", ret);
  557. return ret;
  558. }
  559. }
  560. /* Job is done, remove it. */
  561. gearman_server_job_free(server_job);
  562. break;
  563. case GEARMAN_COMMAND_WORK_EXCEPTION:
  564. server_job= gearman_server_job_get(Server,
  565. (char *)(packet->arg[0]),
  566. server_con);
  567. if (server_job == NULL)
  568. {
  569. return _server_error_packet(server_con, "job_not_found", "Job given in work result not found");
  570. }
  571. /* Queue the exception packet for all clients. */
  572. ret= _server_queue_work_data(server_job, packet,
  573. GEARMAN_COMMAND_WORK_EXCEPTION);
  574. if (gearmand_failed(ret))
  575. {
  576. gearmand_gerror("_server_queue_work_data", ret);
  577. return ret;
  578. }
  579. break;
  580. case GEARMAN_COMMAND_WORK_FAIL:
  581. /* This may not be NULL terminated, so copy to make sure it is. */
  582. checked_length= snprintf(job_handle, GEARMAND_JOB_HANDLE_SIZE, "%.*s",
  583. (int)(packet->arg_size[0]), (char *)(packet->arg[0]));
  584. if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
  585. {
  586. return _server_error_packet(server_con, "job_name_too_large",
  587. "Error occured due to GEARMAND_JOB_HANDLE_SIZE being too small from snprintf");
  588. }
  589. server_job= gearman_server_job_get(Server, job_handle,
  590. server_con);
  591. if (server_job == NULL)
  592. {
  593. return _server_error_packet(server_con, "job_not_found",
  594. "Job given in work result not found");
  595. }
  596. /* Queue the fail packet for all clients. */
  597. for (server_client= server_job->client_list; server_client;
  598. server_client= server_client->job_next)
  599. {
  600. ret= gearman_server_io_packet_add(server_client->con, false,
  601. GEARMAN_MAGIC_RESPONSE,
  602. GEARMAN_COMMAND_WORK_FAIL,
  603. packet->arg[0], packet->arg_size[0],
  604. NULL);
  605. if (gearmand_failed(ret))
  606. {
  607. gearmand_gerror("gearman_server_io_packet_add", ret);
  608. return ret;
  609. }
  610. }
  611. /* Remove from persistent queue if one exists. */
  612. if (server_job->job_queued && Server->queue._done_fn != NULL)
  613. {
  614. ret= (*(Server->queue._done_fn))(Server, (void *)Server->queue._context,
  615. server_job->unique,
  616. (size_t)strlen(server_job->unique),
  617. server_job->function->function_name,
  618. server_job->function->function_name_size);
  619. if (gearmand_failed(ret))
  620. {
  621. gearmand_gerror("Remove from persistent queue", ret);
  622. return ret;
  623. }
  624. }
  625. /* Job is done, remove it. */
  626. gearman_server_job_free(server_job);
  627. break;
  628. case GEARMAN_COMMAND_SET_CLIENT_ID:
  629. gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
  630. packet->arg_size[0]);
  631. break;
  632. case GEARMAN_COMMAND_TEXT:
  633. return _server_run_text(server_con, packet);
  634. case GEARMAN_COMMAND_UNUSED:
  635. case GEARMAN_COMMAND_NOOP:
  636. case GEARMAN_COMMAND_JOB_CREATED:
  637. case GEARMAN_COMMAND_NO_JOB:
  638. case GEARMAN_COMMAND_JOB_ASSIGN:
  639. case GEARMAN_COMMAND_ECHO_RES:
  640. case GEARMAN_COMMAND_ERROR:
  641. case GEARMAN_COMMAND_STATUS_RES:
  642. case GEARMAN_COMMAND_ALL_YOURS:
  643. case GEARMAN_COMMAND_OPTION_RES:
  644. case GEARMAN_COMMAND_SUBMIT_JOB_SCHED:
  645. case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
  646. case GEARMAN_COMMAND_JOB_ASSIGN_ALL:
  647. case GEARMAN_COMMAND_MAX:
  648. default:
  649. return _server_error_packet(server_con, "bad_command", "Command not expected");
  650. }
  651. return GEARMAN_SUCCESS;
  652. }
  653. gearmand_error_t gearman_server_shutdown_graceful(gearman_server_st *server)
  654. {
  655. server->shutdown_graceful= true;
  656. if (server->job_count == 0)
  657. return GEARMAN_SHUTDOWN;
  658. return GEARMAN_SHUTDOWN_GRACEFUL;
  659. }
  660. gearmand_error_t gearman_server_queue_replay(gearman_server_st *server)
  661. {
  662. gearmand_error_t ret;
  663. if (server->queue._replay_fn == NULL)
  664. return GEARMAN_SUCCESS;
  665. server->state.queue_startup= true;
  666. ret= (*(server->queue._replay_fn))(server, (void *)server->queue._context,
  667. _queue_replay_add, server);
  668. server->state.queue_startup= false;
  669. return ret;
  670. }
  671. void *gearman_server_queue_context(const gearman_server_st *server)
  672. {
  673. return (void *)server->queue._context;
  674. }
  675. void gearman_server_set_queue(gearman_server_st *server,
  676. void *context,
  677. gearman_queue_add_fn *add,
  678. gearman_queue_flush_fn *flush,
  679. gearman_queue_done_fn *done,
  680. gearman_queue_replay_fn *replay)
  681. {
  682. server->queue._context= context;
  683. server->queue._add_fn= add;
  684. server->queue._flush_fn= flush;
  685. server->queue._done_fn= done;
  686. server->queue._replay_fn= replay;
  687. }
  688. /*
  689. * Private definitions
  690. */
  691. gearmand_error_t _queue_replay_add(gearman_server_st *server,
  692. void *context __attribute__ ((unused)),
  693. const char *unique, size_t unique_size,
  694. const char *function_name, size_t function_name_size,
  695. const void *data, size_t data_size,
  696. gearmand_job_priority_t priority,
  697. int64_t when)
  698. {
  699. gearmand_error_t ret= GEARMAN_SUCCESS;
  700. (void)gearman_server_job_add(server,
  701. function_name, function_name_size,
  702. unique, unique_size,
  703. data, data_size, priority, NULL, &ret, when);
  704. if (ret != GEARMAN_SUCCESS)
  705. gearmand_gerror("gearman_server_job_add", ret);
  706. return ret;
  707. }
  708. static gearmand_error_t _server_error_packet(gearman_server_con_st *server_con,
  709. const char *error_code,
  710. const char *error_string)
  711. {
  712. return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
  713. GEARMAN_COMMAND_ERROR, error_code,
  714. (size_t)(strlen(error_code) + 1),
  715. error_string,
  716. (size_t)strlen(error_string), NULL);
  717. }
  718. static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
  719. gearmand_packet_st *packet)
  720. {
  721. size_t total;
  722. char *data= (char *)(char *)malloc(GEARMAN_TEXT_RESPONSE_SIZE);
  723. if (data == NULL)
  724. {
  725. gearmand_perror("malloc");
  726. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  727. }
  728. total= GEARMAN_TEXT_RESPONSE_SIZE;
  729. data[0]= 0;
  730. if (packet->argc)
  731. {
  732. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "text command %.*s", packet->arg_size[0], packet->arg[0]);
  733. }
  734. if (packet->argc == 0)
  735. {
  736. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_UNKNOWN_COMMAND, 4, "NULL");
  737. }
  738. else if (strcasecmp("workers", (char *)(packet->arg[0])) == 0)
  739. {
  740. size_t size= 0;
  741. for (gearman_server_thread_st *thread= Server->thread_list;
  742. thread != NULL;
  743. thread= thread->next)
  744. {
  745. int error;
  746. if (! (error= pthread_mutex_lock(&thread->lock)))
  747. {
  748. for (gearman_server_con_st *con= thread->con_list; con != NULL; con= con->next)
  749. {
  750. if (con->_host == NULL)
  751. {
  752. continue;
  753. }
  754. if (size > total)
  755. {
  756. size= total;
  757. }
  758. /* Make sure we have at least GEARMAN_TEXT_RESPONSE_SIZE bytes. */
  759. if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
  760. {
  761. char *new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
  762. if (new_data == NULL)
  763. {
  764. (void) pthread_mutex_unlock(&thread->lock);
  765. gearmand_perror("realloc");
  766. gearmand_debug("free");
  767. free(data);
  768. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  769. }
  770. data= new_data;
  771. total+= GEARMAN_TEXT_RESPONSE_SIZE;
  772. }
  773. int sn_checked_length= snprintf(data + size, total - size, "%d %s %s :",
  774. con->con.fd, con->_host, con->id);
  775. if ((size_t)sn_checked_length > total - size || sn_checked_length < 0)
  776. {
  777. (void) pthread_mutex_unlock(&thread->lock);
  778. gearmand_debug("free");
  779. free(data);
  780. gearmand_perror("snprintf");
  781. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  782. }
  783. size+= (size_t)sn_checked_length;
  784. if (size > total)
  785. {
  786. continue;
  787. }
  788. for (gearman_server_worker_st *worker= con->worker_list; worker != NULL; worker= worker->con_next)
  789. {
  790. int checked_length= snprintf(data + size, total - size, " %.*s",
  791. (int)(worker->function->function_name_size),
  792. worker->function->function_name);
  793. if ((size_t)checked_length > total - size || checked_length < 0)
  794. {
  795. (void) pthread_mutex_unlock(&thread->lock);
  796. gearmand_debug("free");
  797. free(data);
  798. gearmand_perror("snprintf");
  799. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  800. }
  801. size+= (size_t)checked_length;
  802. if (size > total)
  803. break;
  804. }
  805. if (size > total)
  806. {
  807. continue;
  808. }
  809. int checked_length= snprintf(data + size, total - size, "\n");
  810. if ((size_t)checked_length > total - size || checked_length < 0)
  811. {
  812. (void) pthread_mutex_unlock(&thread->lock);
  813. gearmand_debug("free");
  814. free(data);
  815. gearmand_perror("snprintf");
  816. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  817. }
  818. size+= (size_t)checked_length;
  819. }
  820. (void) pthread_mutex_unlock(&thread->lock);
  821. }
  822. else
  823. {
  824. errno= error;
  825. gearmand_error("pthread_mutex_lock");
  826. assert(! "pthread_mutex_lock");
  827. }
  828. }
  829. if (size < total)
  830. {
  831. int checked_length= snprintf(data + size, total - size, ".\n");
  832. if ((size_t)checked_length > total - size || checked_length < 0)
  833. {
  834. gearmand_perror("snprintf");
  835. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  836. }
  837. }
  838. }
  839. else if (strcasecmp("status", (char *)(packet->arg[0])) == 0)
  840. {
  841. size_t size= 0;
  842. gearman_server_function_st *function;
  843. for (function= Server->function_list; function != NULL;
  844. function= function->next)
  845. {
  846. if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
  847. {
  848. char *new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
  849. if (new_data == NULL)
  850. {
  851. gearmand_perror("realloc");
  852. gearmand_debug("free");
  853. free(data);
  854. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  855. }
  856. data= new_data;
  857. total+= GEARMAN_TEXT_RESPONSE_SIZE;
  858. }
  859. int checked_length= snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
  860. (int)(function->function_name_size),
  861. function->function_name, function->job_total,
  862. function->job_running, function->worker_count);
  863. if ((size_t)checked_length > total - size || checked_length < 0)
  864. {
  865. gearmand_perror("snprintf");
  866. gearmand_debug("free");
  867. free(data);
  868. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  869. }
  870. size+= (size_t)checked_length;
  871. if (size > total)
  872. {
  873. size= total;
  874. }
  875. }
  876. if (size < total)
  877. {
  878. int checked_length= snprintf(data + size, total - size, ".\n");
  879. if ((size_t)checked_length > total - size || checked_length < 0)
  880. {
  881. gearmand_perror("snprintf");
  882. gearmand_debug("free");
  883. free(data);
  884. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  885. }
  886. }
  887. }
  888. else if (strcasecmp("create", (char *)(packet->arg[0])) == 0)
  889. {
  890. if (packet->argc == 3 && !strcasecmp("function", (char *)(packet->arg[1])))
  891. {
  892. gearman_server_function_st *function;
  893. function= gearman_server_function_get(Server, (char *)(packet->arg[2]), packet->arg_size[2] -2);
  894. if (function)
  895. {
  896. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_SUCCESS);
  897. }
  898. else
  899. {
  900. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_CREATE_FUNCTION,
  901. (int)packet->arg_size[2], (char *)(packet->arg[2]));
  902. }
  903. }
  904. else
  905. {
  906. // create
  907. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  908. }
  909. }
  910. else if (strcasecmp("drop", (char *)(packet->arg[0])) == 0)
  911. {
  912. if (packet->argc == 3 && !strcasecmp("function", (char *)(packet->arg[1])))
  913. {
  914. bool success= false;
  915. for (gearman_server_function_st *function= Server->function_list; function != NULL; function= function->next)
  916. {
  917. if (! strcasecmp(function->function_name, (char *)(packet->arg[2])))
  918. {
  919. success++;
  920. if (function->worker_count == 0 && function->job_running == 0)
  921. {
  922. gearman_server_function_free(Server, function);
  923. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_SUCCESS);
  924. }
  925. else
  926. {
  927. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR there are still connected workers or executing clients\r\n");
  928. }
  929. break;
  930. }
  931. }
  932. if (! success)
  933. {
  934. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR function not found\r\n");
  935. gearmand_debug(data);
  936. }
  937. }
  938. else
  939. {
  940. // drop
  941. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  942. }
  943. }
  944. else if (strcasecmp("maxqueue", (char *)(packet->arg[0])) == 0)
  945. {
  946. if (packet->argc == 1)
  947. {
  948. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  949. }
  950. else
  951. {
  952. uint32_t max_queue_size[GEARMAND_JOB_PRIORITY_MAX];
  953. for (int priority= 0; priority < GEARMAND_JOB_PRIORITY_MAX; ++priority)
  954. {
  955. const int argc= priority +2;
  956. if (packet->argc > argc)
  957. {
  958. const int parameter= atoi((char *)(packet->arg[argc]));
  959. if (parameter < 0)
  960. {
  961. max_queue_size[priority]= 0;
  962. }
  963. else
  964. {
  965. max_queue_size[priority]= (uint32_t)parameter;
  966. }
  967. }
  968. else
  969. {
  970. max_queue_size[priority]= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
  971. }
  972. }
  973. /*
  974. To preserve the existing behavior of maxqueue, ensure that the
  975. one-parameter invocation is applied to all priorities.
  976. */
  977. if (packet->argc <= 3)
  978. {
  979. for (int priority= 1; priority < GEARMAND_JOB_PRIORITY_MAX; ++priority)
  980. {
  981. max_queue_size[priority]= max_queue_size[0];
  982. }
  983. }
  984. for (gearman_server_function_st *function= Server->function_list; function != NULL; function= function->next)
  985. {
  986. if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
  987. (memcmp(packet->arg[1], function->function_name, function->function_name_size) == 0))
  988. {
  989. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Applying queue limits to %s", function->function_name);
  990. memcpy(function->max_queue_size, max_queue_size, sizeof(uint32_t) * GEARMAND_JOB_PRIORITY_MAX);
  991. }
  992. }
  993. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_SUCCESS);
  994. }
  995. }
  996. else if (strcasecmp("getpid", (char *)(packet->arg[0])) == 0)
  997. {
  998. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK %d\n", (int)getpid());
  999. }
  1000. else if (strcasecmp("shutdown", (char *)(packet->arg[0])) == 0)
  1001. {
  1002. if (packet->argc == 1)
  1003. {
  1004. Server->shutdown= true;
  1005. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_SUCCESS);
  1006. }
  1007. else if (packet->argc == 2 &&
  1008. strcasecmp("graceful", (char *)(packet->arg[1])) == 0)
  1009. {
  1010. Server->shutdown_graceful= true;
  1011. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_SUCCESS);
  1012. }
  1013. else
  1014. {
  1015. // shutdown
  1016. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  1017. }
  1018. }
  1019. else if (strcasecmp("verbose", (char *)(packet->arg[0])) == 0)
  1020. {
  1021. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK %s\n", gearmand_verbose_name(Gearmand()->verbose));
  1022. }
  1023. else if (strcasecmp("version", (char *)(packet->arg[0])) == 0)
  1024. {
  1025. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK %s\n", PACKAGE_VERSION);
  1026. }
  1027. else
  1028. {
  1029. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Failed to find command %.*s(%llu)", packet->arg_size[0], packet->arg[0],
  1030. (unsigned long long)packet->arg_size[0]);
  1031. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_UNKNOWN_COMMAND, (int)packet->arg_size[0], (char *)(packet->arg[0]));
  1032. }
  1033. gearman_server_packet_st *server_packet= gearman_server_packet_create(server_con->thread, false);
  1034. if (server_packet == NULL)
  1035. {
  1036. gearmand_debug("free");
  1037. free(data);
  1038. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  1039. }
  1040. gearmand_packet_init(&(server_packet->packet), GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
  1041. server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
  1042. server_packet->packet.command= GEARMAN_COMMAND_TEXT;
  1043. server_packet->packet.options.complete= true;
  1044. server_packet->packet.options.free_data= true;
  1045. if (data[0] == 0)
  1046. {
  1047. snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_INTERNAL_ERROR);
  1048. }
  1049. server_packet->packet.data= data;
  1050. server_packet->packet.data_size= strlen(data);
  1051. int error;
  1052. if (! (error= pthread_mutex_lock(&server_con->thread->lock)))
  1053. {
  1054. GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,);
  1055. (void) pthread_mutex_unlock(&server_con->thread->lock);
  1056. }
  1057. else
  1058. {
  1059. errno= error;
  1060. gearmand_perror("pthread_mutex_lock");
  1061. assert(!"pthread_mutex_lock");
  1062. }
  1063. gearman_server_con_io_add(server_con);
  1064. return GEARMAN_SUCCESS;
  1065. }
  1066. static gearmand_error_t
  1067. _server_queue_work_data(gearman_server_job_st *server_job,
  1068. gearmand_packet_st *packet, gearman_command_t command)
  1069. {
  1070. gearman_server_client_st *server_client;
  1071. uint8_t *data;
  1072. gearmand_error_t ret;
  1073. for (server_client= server_job->client_list; server_client;
  1074. server_client= server_client->job_next)
  1075. {
  1076. if (command == GEARMAN_COMMAND_WORK_EXCEPTION && !(server_client->con->is_exceptions))
  1077. {
  1078. gearmand_debug("Dropping GEARMAN_COMMAND_WORK_EXCEPTION");
  1079. continue;
  1080. }
  1081. else if (command == GEARMAN_COMMAND_WORK_EXCEPTION)
  1082. {
  1083. gearmand_debug("Sending GEARMAN_COMMAND_WORK_EXCEPTION");
  1084. }
  1085. if (packet->data_size > 0)
  1086. {
  1087. if (packet->options.free_data &&
  1088. server_client->job_next == NULL)
  1089. {
  1090. data= (uint8_t *)(packet->data);
  1091. packet->options.free_data= false;
  1092. }
  1093. else
  1094. {
  1095. data= (uint8_t *)malloc(packet->data_size);
  1096. if (data == NULL)
  1097. {
  1098. gearmand_perror("malloc");
  1099. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  1100. }
  1101. memcpy(data, packet->data, packet->data_size);
  1102. }
  1103. }
  1104. else
  1105. {
  1106. data= NULL;
  1107. }
  1108. ret= gearman_server_io_packet_add(server_client->con, true,
  1109. GEARMAN_MAGIC_RESPONSE, command,
  1110. packet->arg[0], packet->arg_size[0],
  1111. data, packet->data_size, NULL);
  1112. if (ret != GEARMAN_SUCCESS)
  1113. {
  1114. gearmand_gerror("gearman_server_io_packet_add", ret);
  1115. return ret;
  1116. }
  1117. }
  1118. return GEARMAN_SUCCESS;
  1119. }