universal.cc 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearman State Definitions
  41. */
  42. #include <libgearman/common.h>
  43. #include <cassert>
  44. #include <cerrno>
  45. #include <cstdarg>
  46. #include <cstdio>
  47. #include <cstdlib>
  48. #include <cstring>
  49. #include <unistd.h>
  50. void gearman_universal_initialize(gearman_universal_st &self, const gearman_universal_options_t *options)
  51. {
  52. { // Set defaults on all options.
  53. self.options.dont_track_packets= false;
  54. self.options.non_blocking= false;
  55. self.options.stored_non_blocking= false;
  56. }
  57. if (options)
  58. {
  59. while (*options != GEARMAN_MAX)
  60. {
  61. /**
  62. @note Check for bad value, refactor gearman_add_options().
  63. */
  64. gearman_universal_add_options(self, *options);
  65. options++;
  66. }
  67. }
  68. self.verbose= GEARMAN_VERBOSE_NEVER;
  69. self.con_count= 0;
  70. self.packet_count= 0;
  71. self.pfds_size= 0;
  72. self.sending= 0;
  73. self.timeout= -1;
  74. self.con_list= NULL;
  75. self.packet_list= NULL;
  76. self.pfds= NULL;
  77. self.log_fn= NULL;
  78. self.log_context= NULL;
  79. self.allocator= gearman_default_allocator();
  80. self._namespace= NULL;
  81. self.error.rc= GEARMAN_SUCCESS;
  82. self.error.last_errno= 0;
  83. self.error.last_error[0]= 0;
  84. self.wakeup_fd[0]= INVALID_SOCKET;
  85. self.wakeup_fd[1]= INVALID_SOCKET;
  86. }
  87. void gearman_nap(int arg)
  88. {
  89. if (arg < 1)
  90. { }
  91. else
  92. {
  93. #ifdef WIN32
  94. sleep(arg/1000000);
  95. #else
  96. struct timespec global_sleep_value= { 0, static_cast<long>(arg * 1000)};
  97. nanosleep(&global_sleep_value, NULL);
  98. #endif
  99. }
  100. }
  101. void gearman_nap(gearman_universal_st &self)
  102. {
  103. gearman_nap(self.timeout);
  104. }
  105. void gearman_universal_clone(gearman_universal_st &destination, const gearman_universal_st &source, bool has_wakeup_fd)
  106. {
  107. int wakeup_fd[2];
  108. if (has_wakeup_fd)
  109. {
  110. wakeup_fd[0]= destination.wakeup_fd[0];
  111. wakeup_fd[1]= destination.wakeup_fd[1];
  112. }
  113. gearman_universal_initialize(destination);
  114. if (has_wakeup_fd)
  115. {
  116. destination.wakeup_fd[0]= wakeup_fd[0];
  117. destination.wakeup_fd[1]= wakeup_fd[1];
  118. }
  119. (void)gearman_universal_set_option(destination, GEARMAN_NON_BLOCKING, source.options.non_blocking);
  120. (void)gearman_universal_set_option(destination, GEARMAN_DONT_TRACK_PACKETS, source.options.dont_track_packets);
  121. destination.timeout= source.timeout;
  122. destination._namespace= gearman_string_clone(source._namespace);
  123. for (gearman_connection_st *con= source.con_list; con; con= con->next)
  124. {
  125. if (gearman_connection_copy(destination, *con) == NULL)
  126. {
  127. gearman_universal_free(destination);
  128. return;
  129. }
  130. }
  131. }
  132. void gearman_universal_free(gearman_universal_st &universal)
  133. {
  134. gearman_free_all_cons(universal);
  135. gearman_free_all_packets(universal);
  136. gearman_string_free(universal._namespace);
  137. if (universal.pfds)
  138. {
  139. // created realloc()
  140. free(universal.pfds);
  141. }
  142. }
  143. gearman_return_t gearman_universal_set_option(gearman_universal_st &self, gearman_universal_options_t option, bool value)
  144. {
  145. switch (option)
  146. {
  147. case GEARMAN_NON_BLOCKING:
  148. self.options.non_blocking= value;
  149. break;
  150. case GEARMAN_DONT_TRACK_PACKETS:
  151. self.options.dont_track_packets= value;
  152. break;
  153. case GEARMAN_MAX:
  154. default:
  155. return GEARMAN_INVALID_COMMAND;
  156. }
  157. return GEARMAN_SUCCESS;
  158. }
  159. int gearman_universal_timeout(gearman_universal_st &self)
  160. {
  161. return self.timeout;
  162. }
  163. void gearman_universal_set_timeout(gearman_universal_st &self, int timeout)
  164. {
  165. self.timeout= timeout;
  166. }
  167. void gearman_set_log_fn(gearman_universal_st &self, gearman_log_fn *function,
  168. void *context, gearman_verbose_t verbose)
  169. {
  170. self.log_fn= function;
  171. self.log_context= context;
  172. self.verbose= verbose;
  173. }
  174. void gearman_set_workload_malloc_fn(gearman_universal_st& universal,
  175. gearman_malloc_fn *function,
  176. void *context)
  177. {
  178. universal.allocator.malloc= function;
  179. universal.allocator.context= context;
  180. }
  181. void gearman_set_workload_free_fn(gearman_universal_st& universal,
  182. gearman_free_fn *function,
  183. void *context)
  184. {
  185. universal.allocator.free= function;
  186. universal.allocator.context= context;
  187. }
  188. void gearman_free_all_cons(gearman_universal_st& universal)
  189. {
  190. while (universal.con_list)
  191. {
  192. delete universal.con_list;
  193. }
  194. }
  195. void gearman_reset(gearman_universal_st& universal)
  196. {
  197. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  198. {
  199. con->close_socket();
  200. }
  201. }
  202. gearman_return_t gearman_flush_all(gearman_universal_st& universal)
  203. {
  204. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  205. {
  206. if (con->events & POLLOUT)
  207. {
  208. continue;
  209. }
  210. gearman_return_t ret= con->flush();
  211. if (gearman_failed(ret) and ret != GEARMAN_IO_WAIT)
  212. {
  213. return ret;
  214. }
  215. }
  216. return GEARMAN_SUCCESS;
  217. }
  218. gearman_return_t gearman_wait(gearman_universal_st& universal)
  219. {
  220. struct pollfd *pfds;
  221. bool have_shutdown_pipe= universal.wakeup_fd[0] == INVALID_SOCKET ? false : true;
  222. size_t con_count= universal.con_count +int(have_shutdown_pipe);
  223. if (universal.pfds_size < con_count)
  224. {
  225. pfds= static_cast<pollfd*>(realloc(universal.pfds, con_count * sizeof(struct pollfd)));
  226. if (pfds == NULL)
  227. {
  228. gearman_perror(universal, "pollfd realloc");
  229. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  230. }
  231. universal.pfds= pfds;
  232. universal.pfds_size= con_count;
  233. }
  234. else
  235. {
  236. pfds= universal.pfds;
  237. }
  238. nfds_t x= 0;
  239. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  240. {
  241. if (con->events == 0)
  242. continue;
  243. pfds[x].fd= con->fd;
  244. pfds[x].events= con->events;
  245. pfds[x].revents= 0;
  246. x++;
  247. }
  248. if (x == 0)
  249. {
  250. return gearman_error(universal, GEARMAN_NO_ACTIVE_FDS, "no active file descriptors");
  251. }
  252. // Wakeup handling, we only make use of this if we have active connections
  253. size_t pipe_array_iterator= 0;
  254. if (have_shutdown_pipe)
  255. {
  256. pipe_array_iterator= x;
  257. pfds[x].fd= universal.wakeup_fd[0];
  258. pfds[x].events= POLLIN;
  259. pfds[x].revents= 0;
  260. x++;
  261. }
  262. int ret= 0;
  263. while (universal.timeout)
  264. {
  265. ret= poll(pfds, x, universal.timeout);
  266. if (ret == -1)
  267. {
  268. switch(errno)
  269. {
  270. case EINTR:
  271. continue;
  272. default:
  273. return gearman_perror(universal, "poll");
  274. }
  275. }
  276. break;
  277. }
  278. if (ret == 0)
  279. {
  280. gearman_error(universal, GEARMAN_TIMEOUT, "timeout reached, no servers were available");
  281. return GEARMAN_TIMEOUT;
  282. }
  283. x= 0;
  284. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  285. {
  286. if (con->events == 0)
  287. {
  288. continue;
  289. }
  290. int err;
  291. socklen_t len= sizeof (err);
  292. if (getsockopt(con->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
  293. {
  294. con->cached_errno= err;
  295. }
  296. con->set_revents(pfds[x].revents);
  297. x++;
  298. }
  299. if (have_shutdown_pipe and pfds[pipe_array_iterator].revents)
  300. {
  301. char buffer[1];
  302. ssize_t read_length= read(universal.wakeup_fd[0], buffer, sizeof(buffer));
  303. if (read_length > 0)
  304. {
  305. return GEARMAN_SHUTDOWN_GRACEFUL;
  306. }
  307. if (read_length == 0)
  308. {
  309. return GEARMAN_SHUTDOWN;
  310. }
  311. perror("shudown read");
  312. // @todo figure out what happens in an error
  313. }
  314. return GEARMAN_SUCCESS;
  315. }
  316. gearman_connection_st *gearman_ready(gearman_universal_st& universal)
  317. {
  318. /*
  319. We can't keep universal between calls since connections may be removed during
  320. processing. If this list ever gets big, we may want something faster.
  321. */
  322. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  323. {
  324. if (con->options.ready)
  325. {
  326. con->options.ready= false;
  327. return con;
  328. }
  329. }
  330. return NULL;
  331. }
  332. /**
  333. @note _push_blocking is only used for echo (and should be fixed
  334. when tricky flip/flop in IO is fixed).
  335. */
  336. static inline void _push_blocking(gearman_universal_st& universal, bool &orig_block_universal)
  337. {
  338. orig_block_universal= universal.options.non_blocking;
  339. universal.options.non_blocking= false;
  340. }
  341. static inline void _pop_non_blocking(gearman_universal_st& universal, bool orig_block_universal)
  342. {
  343. universal.options.non_blocking= orig_block_universal;
  344. }
  345. gearman_return_t gearman_echo(gearman_universal_st& universal,
  346. const void *workload,
  347. size_t workload_size)
  348. {
  349. bool orig_block_universal;
  350. if (workload == NULL)
  351. {
  352. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "workload was NULL");
  353. }
  354. if (workload_size == 0)
  355. {
  356. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "workload_size was 0");
  357. }
  358. if (workload_size > GEARMAN_MAX_ECHO_SIZE)
  359. {
  360. return gearman_error(universal, GEARMAN_ARGUMENT_TOO_LARGE, "workload_size was greater then GEARMAN_MAX_ECHO_SIZE");
  361. }
  362. gearman_packet_st packet;
  363. gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
  364. GEARMAN_COMMAND_ECHO_REQ,
  365. &workload, &workload_size, 1);
  366. if (gearman_failed(ret))
  367. {
  368. #if 0
  369. assert_msg(universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  370. #endif
  371. gearman_packet_free(&packet);
  372. return ret;
  373. }
  374. _push_blocking(universal, orig_block_universal);
  375. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  376. {
  377. ret= con->send_packet(packet, true);
  378. if (gearman_failed(ret))
  379. {
  380. #if 0
  381. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  382. #endif
  383. goto exit;
  384. }
  385. con->options.packet_in_use= true;
  386. gearman_packet_st *packet_ptr= con->receiving(con->_packet, ret, true);
  387. if (gearman_failed(ret))
  388. {
  389. #if 0
  390. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  391. #endif
  392. con->free_private_packet();
  393. con->recv_packet= NULL;
  394. goto exit;
  395. }
  396. assert(packet_ptr);
  397. if (con->_packet.data_size != workload_size or
  398. memcmp(workload, con->_packet.data, workload_size))
  399. {
  400. #if 0
  401. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  402. #endif
  403. con->free_private_packet();
  404. con->recv_packet= NULL;
  405. ret= gearman_error(universal, GEARMAN_ECHO_DATA_CORRUPTION, "corruption during echo");
  406. goto exit;
  407. }
  408. con->recv_packet= NULL;
  409. con->free_private_packet();
  410. }
  411. ret= GEARMAN_SUCCESS;
  412. exit:
  413. gearman_packet_free(&packet);
  414. _pop_non_blocking(universal, orig_block_universal);
  415. return ret;
  416. }
  417. bool gearman_request_option(gearman_universal_st &universal,
  418. gearman_string_t &option)
  419. {
  420. bool orig_block_universal;
  421. const void *args[]= { gearman_c_str(option) };
  422. size_t args_size[]= { gearman_size(option) };
  423. gearman_packet_st packet;
  424. gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
  425. GEARMAN_COMMAND_OPTION_REQ,
  426. args, args_size, 1);
  427. if (gearman_failed(ret))
  428. {
  429. gearman_packet_free(&packet);
  430. gearman_error(universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "gearman_packet_create_args()");
  431. return ret;
  432. }
  433. _push_blocking(universal, orig_block_universal);
  434. for (gearman_connection_st *con= universal.con_list; con != NULL; con= con->next)
  435. {
  436. ret= con->send_packet(packet, true);
  437. if (gearman_failed(ret))
  438. {
  439. goto exit;
  440. }
  441. gearman_packet_st recv_packet;
  442. assert(con->recv_state == GEARMAN_CON_RECV_UNIVERSAL_NONE);
  443. gearman_packet_st *packet_ptr= con->receiving(recv_packet, ret, true);
  444. if (ret == GEARMAN_NOT_CONNECTED)
  445. {
  446. goto exit;
  447. }
  448. else if (gearman_failed(ret))
  449. {
  450. con->recv_packet= NULL;
  451. gearman_packet_free(&recv_packet);
  452. goto exit;
  453. }
  454. assert(packet_ptr);
  455. if (packet_ptr->command == GEARMAN_COMMAND_ERROR)
  456. {
  457. con->recv_packet= NULL;
  458. gearman_packet_free(&recv_packet);
  459. ret= gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "invalid server option");
  460. goto exit;
  461. }
  462. gearman_packet_free(&recv_packet);
  463. }
  464. ret= GEARMAN_SUCCESS;
  465. exit:
  466. gearman_packet_free(&packet);
  467. _pop_non_blocking(universal, orig_block_universal);
  468. return gearman_success(ret);
  469. }
  470. void gearman_free_all_packets(gearman_universal_st &universal)
  471. {
  472. while (universal.packet_list)
  473. {
  474. gearman_packet_free(universal.packet_list);
  475. }
  476. }
  477. gearman_id_t gearman_universal_id(gearman_universal_st &universal)
  478. {
  479. gearman_id_t handle= { universal.wakeup_fd[0], universal.wakeup_fd[1] };
  480. return handle;
  481. }
  482. /*
  483. * Local Definitions
  484. */
  485. void gearman_universal_set_namespace(gearman_universal_st& universal, const char *namespace_key, size_t namespace_key_size)
  486. {
  487. gearman_string_free(universal._namespace);
  488. universal._namespace= gearman_string_create(NULL, namespace_key, namespace_key_size);
  489. }