instance.cc 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * DataDifferential Utility Library
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include <config.h>
  38. #include "util/instance.hpp"
  39. #include <cstdio>
  40. #include <sstream>
  41. #include <iostream>
  42. #include <netdb.h>
  43. #include <poll.h>
  44. #include <sys/socket.h>
  45. #include <sys/types.h>
  46. #include <netinet/in.h>
  47. namespace datadifferential {
  48. namespace util {
  49. Instance::Instance(const std::string& hostname_arg, const std::string& service_arg) :
  50. _host(hostname_arg),
  51. _service(service_arg),
  52. _sockfd(INVALID_SOCKET),
  53. state(NOT_WRITING),
  54. _addrinfo(0),
  55. _addrinfo_next(0),
  56. _finish_fn(NULL),
  57. _operations()
  58. {
  59. }
  60. Instance::Instance(const std::string& hostname_arg, const in_port_t port_arg) :
  61. _host(hostname_arg),
  62. _sockfd(INVALID_SOCKET),
  63. state(NOT_WRITING),
  64. _addrinfo(0),
  65. _addrinfo_next(0),
  66. _finish_fn(NULL),
  67. _operations()
  68. {
  69. char tmp[BUFSIZ];
  70. snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned int>(port_arg));
  71. _service= tmp;
  72. }
  73. Instance::~Instance()
  74. {
  75. close_socket();
  76. free_addrinfo();
  77. for (Operation::vector::iterator iter= _operations.begin(); iter != _operations.end(); ++iter)
  78. {
  79. delete *iter;
  80. }
  81. _operations.clear();
  82. delete _finish_fn;
  83. }
  /**
   * Drive the connection state machine until the operation queue is empty.
   *
   * The operation at the back of _operations is processed through these
   * states:
   *
   *   NOT_WRITING            resolve _host/_service with getaddrinfo()
   *   CONNECT                create a socket for the current address and connect
   *   NEXT_CONNECT_ADDRINFO  advance to the next resolved address, then CONNECT
   *   CONNECTING             placeholder for non-blocking connect completion
   *   CONNECTED / WRITING    send the whole packet of the operation
   *   READING                collect the response, if the operation expects one
   *   FINISHED               hand the response to _finish_fn, drop the operation
   *
   * After FINISHED the state returns to CONNECTED, so the next queued
   * operation is sent on the same socket.
   *
   * @return true once every queued operation has been completed. false on the
   *         first failure: name resolution, running out of addresses, a send
   *         or receive error (all of which set _last_error), or _finish_fn
   *         reporting failure (which leaves _last_error untouched). On a false
   *         return the failing operation stays queued.
   */
  bool Instance::run()
  {
    while (not _operations.empty())
    {
      Operation::vector::value_type operation= _operations.back();

      switch (state)
      {
      case NOT_WRITING:
        {
          free_addrinfo();

          struct addrinfo ai;
          memset(&ai, 0, sizeof(struct addrinfo));
          ai.ai_socktype= SOCK_STREAM;
          ai.ai_protocol= IPPROTO_TCP;

          int ret= getaddrinfo(_host.c_str(), _service.c_str(), &ai, &_addrinfo);
          if (ret)
          {
            std::stringstream message;
            message << "Failed to connect on " << _host.c_str() << ":" << _service.c_str() << " with " << gai_strerror(ret);
            _last_error= message.str();
            return false;
          }
        }
        _addrinfo_next= _addrinfo;
        state= CONNECT;
        break;

      case NEXT_CONNECT_ADDRINFO:
        if (_addrinfo_next->ai_next == NULL)
        {
          std::stringstream message;
          message << "Error connecting to " << _host.c_str() << "." << std::endl;
          _last_error= message.str();
          return false;
        }
        _addrinfo_next= _addrinfo_next->ai_next;
        /* fall through: attempt the address that was just selected */

      case CONNECT:
        close_socket();

        _sockfd= socket(_addrinfo_next->ai_family,
                        _addrinfo_next->ai_socktype,
                        _addrinfo_next->ai_protocol);
        if (_sockfd == INVALID_SOCKET)
        {
          perror("socket");
          // Retrying the very same address would loop forever when the
          // failure is persistent, so move on to the next candidate instead.
          state= NEXT_CONNECT_ADDRINFO;
          continue;
        }

        if (connect(_sockfd, _addrinfo_next->ai_addr, _addrinfo_next->ai_addrlen) < 0)
        {
          switch (errno)
          {
          case EAGAIN:
          case EINTR:
            state= CONNECT; // transient: retry the same address
            break;

          case EINPROGRESS:
            state= CONNECTING;
            break;

          case ECONNREFUSED:
          case ENETUNREACH:
          case ETIMEDOUT:
          default:
            state= NEXT_CONNECT_ADDRINFO;
            break;
          }
        }
        else
        {
          state= CONNECTING;
        }
        break;

      case CONNECTING:
        // Add logic for poll() for nonblocking.
        state= CONNECTED;
        break;

      case CONNECTED:
      case WRITING:
        {
          size_t packet_length= operation->size();
          const char *packet= operation->ptr();

          while (packet_length)
          {
            ssize_t write_size= send(_sockfd, packet, packet_length, 0);
            if (write_size < 0)
            {
              // Capture errno before any stream I/O has a chance to change it.
              const int send_errno= errno;
              if (send_errno == EINTR)
              {
                continue; // interrupted before anything was written; retry
              }

              // A negative result must never reach the arithmetic below:
              // cast to size_t it would corrupt both the remaining length
              // and the packet pointer.
              std::stringstream message;
              message << "Failed during send(" << strerror(send_errno) << ")";
              _last_error= message.str();
              std::cerr << _last_error << std::endl;
              return false;
            }

            packet_length-= static_cast<size_t>(write_size);
            packet+= static_cast<size_t>(write_size);
          }
        }
        state= READING;
        break;

      case READING:
        if (operation->has_response())
        {
          ssize_t read_length;

          do
          {
            char buffer[BUFSIZ];

            // Retry reads that were merely interrupted by a signal.
            do
            {
              read_length= recv(_sockfd, buffer, sizeof(buffer), 0);
            } while (read_length < 0 and errno == EINTR);

            if (read_length < 0)
            {
              std::stringstream message;
              message << "Error occured while reading data from " << _host.c_str();
              _last_error= message.str();
              std::cerr << _last_error << std::endl;
              return false;
            }

            if (read_length == 0)
            {
              // The peer closed the connection. poll() keeps reporting the
              // socket as readable in that situation, so leave the loop
              // explicitly rather than spinning on zero-length reads.
              break;
            }

            operation->push(buffer, static_cast<size_t>(read_length));
          } while (more_to_read());
        } // end has_response
        state= FINISHED;
        break;

      case FINISHED:
        {
          std::string response;
          bool success= operation->response(response);
          if (_finish_fn)
          {
            if (not _finish_fn->call(success, response))
            {
              // Error was sent from _finish_fn
              return false;
            }
          }

          if (operation->reconnect())
          {
            // NOTE(review): no reconnect handling is implemented; the socket
            // is reused regardless of what the operation requests. Confirm
            // the intended behaviour before filling this in.
          }

          _operations.pop_back();
          delete operation;

          state= CONNECTED;
        }
        break;
      } // end switch
    }

    return true;
  } // end run()
  226. bool Instance::more_to_read() const
  227. {
  228. struct pollfd fds;
  229. fds.fd= _sockfd;
  230. fds.events = POLLIN;
  231. if (poll(&fds, 1, 5) < 1) // Default timeout is 5
  232. {
  233. return false;
  234. }
  235. return true;
  236. }
  237. void Instance::close_socket()
  238. {
  239. if (_sockfd == INVALID_SOCKET)
  240. return;
  241. /* in case of death shutdown to avoid blocking at close() */
  242. if (shutdown(_sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
  243. {
  244. perror("shutdown");
  245. }
  246. else if (closesocket(_sockfd) == SOCKET_ERROR)
  247. {
  248. perror("close");
  249. }
  250. _sockfd= INVALID_SOCKET;
  251. }
  252. void Instance::free_addrinfo()
  253. {
  254. if (not _addrinfo)
  255. return;
  256. freeaddrinfo(_addrinfo);
  257. _addrinfo= NULL;
  258. _addrinfo_next= NULL;
  259. }
  260. } /* namespace util */
  261. } /* namespace datadifferential */