instance.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * DataDifferential Utility Library
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include "gear_config.h"
  38. #include "util/instance.hpp"
  39. #include "libgearman/ssl.h"
  40. #include <cstdio>
  41. #include <iostream>
  42. #include <netdb.h>
  43. #include <netinet/in.h>
  44. #include <poll.h>
  45. #include <sstream>
  46. #include <sys/socket.h>
  47. #include <sys/types.h>
  48. #ifdef HAVE_UNISTD_H
  49. #include <unistd.h>
  50. #endif
  51. namespace datadifferential {
  52. namespace util {
  53. Instance::Instance(const std::string& hostname_arg, const std::string& service_arg) :
  54. _host(hostname_arg),
  55. _service(service_arg),
  56. _sockfd(INVALID_SOCKET),
  57. _use_ssl(false),
  58. state(NOT_WRITING),
  59. _addrinfo(0),
  60. _addrinfo_next(0),
  61. _finish_fn(NULL),
  62. _operations(),
  63. _ctx_ssl(NULL),
  64. _ssl(NULL)
  65. {
  66. }
  67. Instance::Instance(const std::string& hostname_arg, const in_port_t port_arg) :
  68. _host(hostname_arg),
  69. _sockfd(INVALID_SOCKET),
  70. _use_ssl(false),
  71. state(NOT_WRITING),
  72. _addrinfo(0),
  73. _addrinfo_next(0),
  74. _finish_fn(NULL),
  75. _operations(),
  76. _ctx_ssl(NULL),
  77. _ssl(NULL)
  78. {
  79. char tmp[BUFSIZ];
  80. snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned int>(port_arg));
  81. _service= tmp;
  82. }
  83. Instance::~Instance()
  84. {
  85. close_socket();
  86. free_addrinfo();
  87. for (Operation::vector::iterator iter= _operations.begin(); iter != _operations.end(); ++iter)
  88. {
  89. delete *iter;
  90. }
  91. _operations.clear();
  92. delete _finish_fn;
  93. #if defined(HAVE_SSL) && HAVE_SSL
  94. if (_ssl)
  95. {
  96. SSL_shutdown(_ssl);
  97. SSL_free(_ssl);
  98. }
  99. if (_ctx_ssl)
  100. {
  101. SSL_CTX_free(_ctx_ssl);
  102. }
  103. # if defined(HAVE_WOLFSSL) && HAVE_WOLFSSL
  104. wolfSSL_Cleanup();
  105. # endif // defined(HAVE_WOLFSSL)
  106. #endif
  107. }
  108. bool Instance::init_ssl()
  109. {
  110. #if defined(HAVE_SSL) && HAVE_SSL
  111. SSL_load_error_strings();
  112. SSL_library_init();
  113. #if (OPENSSL_VERSION_NUMBER < 0x10100000L)
  114. if ((_ctx_ssl= SSL_CTX_new(TLSv1_2_client_method())) == NULL)
  115. #else
  116. if ((_ctx_ssl= SSL_CTX_new(TLS_client_method())) == NULL)
  117. #endif
  118. {
  119. _last_error= "SSL_CTX_new error";
  120. return false;
  121. }
  122. if (SSL_CTX_load_verify_locations(_ctx_ssl, ssl_ca_file(), 0) != SSL_SUCCESS)
  123. {
  124. std::stringstream message;
  125. message << "Error loading CA file " << ssl_ca_file();
  126. _last_error= message.str();
  127. return false;
  128. }
  129. if (SSL_CTX_use_certificate_file(_ctx_ssl, ssl_certificate(), SSL_FILETYPE_PEM) != SSL_SUCCESS)
  130. {
  131. std::stringstream message;
  132. message << "Error loading certificate file " << ssl_certificate();
  133. _last_error= message.str();
  134. return false;
  135. }
  136. if (SSL_CTX_use_PrivateKey_file(_ctx_ssl, ssl_key(), SSL_FILETYPE_PEM) != SSL_SUCCESS)
  137. {
  138. std::stringstream message;
  139. message << "Error loading private key file " << ssl_key();
  140. _last_error= message.str();
  141. return false;
  142. }
  143. if (SSL_CTX_check_private_key(_ctx_ssl) != SSL_SUCCESS)
  144. {
  145. std::stringstream message;
  146. message << "Error checking private key";
  147. _last_error = message.str();
  148. return false;
  149. }
  150. #endif // defined(HAVE_SSL) && HAVE_SSL
  151. return true;
  152. }
  153. #if __GNUC__ >= 7
  154. #pragma GCC diagnostic warning "-Wimplicit-fallthrough"
  155. #endif
  156. bool Instance::run()
  157. {
  158. if (_use_ssl)
  159. {
  160. if (not init_ssl())
  161. {
  162. return false;
  163. }
  164. }
  165. while (not _operations.empty())
  166. {
  167. Operation::vector::value_type operation= _operations.back();
  168. switch (state)
  169. {
  170. case NOT_WRITING:
  171. {
  172. free_addrinfo();
  173. struct addrinfo ai;
  174. memset(&ai, 0, sizeof(struct addrinfo));
  175. ai.ai_socktype= SOCK_STREAM;
  176. ai.ai_protocol= IPPROTO_TCP;
  177. int ret= getaddrinfo(_host.c_str(), _service.c_str(), &ai, &_addrinfo);
  178. if (ret)
  179. {
  180. std::stringstream message;
  181. message << "Failed to connect on " << _host.c_str() << ":" << _service.c_str() << " with " << gai_strerror(ret);
  182. _last_error= message.str();
  183. return false;
  184. }
  185. }
  186. _addrinfo_next= _addrinfo;
  187. state= CONNECT;
  188. break;
  189. case NEXT_CONNECT_ADDRINFO:
  190. if (_addrinfo_next->ai_next == NULL)
  191. {
  192. std::stringstream message;
  193. message << "Error connecting to " << _host.c_str() << "." << std::endl;
  194. _last_error= message.str();
  195. return false;
  196. }
  197. _addrinfo_next= _addrinfo_next->ai_next;
  198. /* fall-thru */
  199. case CONNECT:
  200. close_socket();
  201. _sockfd= socket(_addrinfo_next->ai_family,
  202. _addrinfo_next->ai_socktype,
  203. _addrinfo_next->ai_protocol);
  204. if (_sockfd == INVALID_SOCKET)
  205. {
  206. perror("socket");
  207. continue;
  208. }
  209. if (connect(_sockfd, _addrinfo_next->ai_addr, _addrinfo_next->ai_addrlen) < 0)
  210. {
  211. switch(errno)
  212. {
  213. case EAGAIN:
  214. case EINTR:
  215. state= CONNECT;
  216. break;
  217. case EINPROGRESS:
  218. state= CONNECTING;
  219. break;
  220. case ECONNREFUSED:
  221. case ENETUNREACH:
  222. case ETIMEDOUT:
  223. default:
  224. state= NEXT_CONNECT_ADDRINFO;
  225. break;
  226. }
  227. }
  228. else
  229. {
  230. state= CONNECTING;
  231. }
  232. break;
  233. case CONNECTING:
  234. // Add logic for poll() for nonblocking.
  235. state= CONNECTED;
  236. break;
  237. case CONNECTED:
  238. case WRITING:
  239. {
  240. size_t packet_length= operation->size();
  241. const char *packet= operation->ptr();
  242. #if defined(HAVE_SSL) && HAVE_SSL
  243. if (_ctx_ssl and not _ssl)
  244. {
  245. _ssl= SSL_new(_ctx_ssl);
  246. if (_ssl == NULL)
  247. {
  248. _last_error= "SSL_new() failed";
  249. return false;
  250. }
  251. int ssl_error;
  252. if ((ssl_error= SSL_set_fd(_ssl, _sockfd)) != SSL_SUCCESS)
  253. {
  254. _last_error= "SSL_set_fd() failed";
  255. return false;
  256. }
  257. SSL_set_connect_state(_ssl);
  258. }
  259. #endif
  260. while(packet_length)
  261. {
  262. ssize_t write_size;
  263. #if defined(HAVE_SSL) && HAVE_SSL
  264. if (_ssl)
  265. {
  266. write_size= SSL_write(_ssl, (const void*)packet, int(packet_length));
  267. int ssl_error;
  268. switch ((ssl_error= SSL_get_error(_ssl, int(write_size))))
  269. {
  270. case SSL_ERROR_NONE:
  271. break;
  272. case SSL_ERROR_ZERO_RETURN:
  273. errno= ECONNRESET;
  274. write_size= SOCKET_ERROR;
  275. break;
  276. case SSL_ERROR_WANT_ACCEPT:
  277. case SSL_ERROR_WANT_CONNECT:
  278. case SSL_ERROR_WANT_READ:
  279. case SSL_ERROR_WANT_WRITE:
  280. case SSL_ERROR_WANT_X509_LOOKUP:
  281. errno= EAGAIN;
  282. write_size= SOCKET_ERROR;
  283. break;
  284. case SSL_ERROR_SYSCALL:
  285. {
  286. if (errno)
  287. {
  288. write_size= SOCKET_ERROR;
  289. break;
  290. }
  291. }
  292. /* fall-thru */
  293. case SSL_ERROR_SSL:
  294. default:
  295. {
  296. char ssl_error_buffer[SSL_ERROR_SIZE]= { 0 };
  297. ERR_error_string_n(ssl_error, ssl_error_buffer, sizeof(ssl_error_buffer));
  298. _last_error= ssl_error_buffer;
  299. errno= ECONNRESET;
  300. write_size= SOCKET_ERROR;
  301. break;
  302. }
  303. }
  304. }
  305. else
  306. #endif
  307. {
  308. write_size= send(_sockfd, packet, packet_length, 0);
  309. }
  310. if (write_size == SOCKET_ERROR)
  311. {
  312. if (_last_error.empty())
  313. {
  314. std::stringstream msg;
  315. msg << "Failed during send(" << strerror(errno) << ")";
  316. _last_error= msg.str();
  317. }
  318. return false;
  319. }
  320. packet_length-= static_cast<size_t>(write_size);
  321. packet+= static_cast<size_t>(write_size);
  322. }
  323. }
  324. state= READING;
  325. break;
  326. case READING:
  327. if (operation->has_response())
  328. {
  329. ssize_t read_size;
  330. do
  331. {
  332. char buffer[BUFSIZ];
  333. #if defined(HAVE_SSL) && HAVE_SSL
  334. if (_ssl)
  335. {
  336. {
  337. read_size= SSL_read(_ssl, (void *)buffer, sizeof(buffer));
  338. int ssl_error;
  339. switch ((ssl_error= SSL_get_error(_ssl, int(read_size))))
  340. {
  341. case SSL_ERROR_NONE:
  342. break;
  343. case SSL_ERROR_ZERO_RETURN:
  344. read_size= 0;
  345. break;
  346. case SSL_ERROR_WANT_READ:
  347. case SSL_ERROR_WANT_WRITE:
  348. case SSL_ERROR_WANT_ACCEPT:
  349. case SSL_ERROR_WANT_CONNECT:
  350. case SSL_ERROR_WANT_X509_LOOKUP:
  351. read_size= SOCKET_ERROR;
  352. errno= EAGAIN;
  353. break;
  354. case SSL_ERROR_SYSCALL:
  355. {
  356. if (errno)
  357. {
  358. std::stringstream msg;
  359. msg << "Error occurred on SSL_acceptsend(" << strerror(errno) << ")";
  360. _last_error= msg.str();
  361. read_size= SOCKET_ERROR;
  362. break;
  363. }
  364. }
  365. /* fall-thru */
  366. case SSL_ERROR_SSL:
  367. default:
  368. {
  369. char ssl_error_buffer[SSL_ERROR_SIZE]= { 0 };
  370. ERR_error_string_n(ssl_error, ssl_error_buffer, sizeof(ssl_error_buffer));
  371. _last_error= ssl_error_buffer;
  372. read_size= SOCKET_ERROR;
  373. break;
  374. }
  375. }
  376. }
  377. }
  378. else
  379. #endif
  380. {
  381. read_size= ::recv(_sockfd, buffer, sizeof(buffer), 0);
  382. }
  383. if (read_size == 0)
  384. {
  385. _last_error.clear();
  386. _last_error+= "Socket was shutdown while reading from ";
  387. _last_error+= _host;
  388. return false;
  389. }
  390. else if (read_size == SOCKET_ERROR)
  391. {
  392. if (_last_error.empty())
  393. {
  394. _last_error.clear();
  395. _last_error+= "Error occurred while reading data from ";
  396. _last_error+= _host;
  397. }
  398. return false;
  399. }
  400. operation->push(buffer, static_cast<size_t>(read_size));
  401. } while (more_to_read());
  402. } // end has_response
  403. state= FINISHED;
  404. break;
  405. case FINISHED:
  406. std::string response;
  407. bool success= operation->response(response);
  408. if (_finish_fn)
  409. {
  410. if (not _finish_fn->call(success, response))
  411. {
  412. // Error was sent from _finish_fn
  413. return false;
  414. }
  415. }
  416. if (operation->reconnect())
  417. {
  418. }
  419. _operations.pop_back();
  420. delete operation;
  421. state= CONNECTED;
  422. break;
  423. } // end switch
  424. }
  425. return true;
  426. } // end run()
  427. bool Instance::more_to_read() const
  428. {
  429. struct pollfd fds;
  430. fds.fd= _sockfd;
  431. fds.events = POLLIN;
  432. if (poll(&fds, 1, 5) < 1) // Default timeout is 5
  433. {
  434. return false;
  435. }
  436. return true;
  437. }
  438. void Instance::close_socket()
  439. {
  440. if (_sockfd != INVALID_SOCKET)
  441. {
  442. /* in case of death shutdown to avoid blocking at close() */
  443. if (shutdown(_sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
  444. {
  445. perror("shutdown");
  446. }
  447. else if (closesocket(_sockfd) == SOCKET_ERROR)
  448. {
  449. perror("close");
  450. }
  451. _sockfd= INVALID_SOCKET;
  452. }
  453. }
  454. void Instance::free_addrinfo()
  455. {
  456. if (_addrinfo)
  457. {
  458. freeaddrinfo(_addrinfo);
  459. _addrinfo= NULL;
  460. _addrinfo_next= NULL;
  461. }
  462. }
  463. } /* namespace util */
  464. } /* namespace datadifferential */