udp.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. # -*- test-case-name: twisted.test.test_udp -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Various asynchronous UDP classes.
  6. Please do not use this module directly.
  7. @var _sockErrReadIgnore: list of symbolic error constants (from the C{errno}
  8. module) representing socket errors where the error is temporary and can be
  9. ignored.
  10. @var _sockErrReadRefuse: list of symbolic error constants (from the C{errno}
  11. module) representing socket errors that indicate connection refused.
  12. """
  13. from __future__ import division, absolute_import
  14. # System Imports
  15. import socket
  16. import operator
  17. import struct
  18. import warnings
  19. from zope.interface import implementer
  20. from twisted.python.runtime import platformType
  21. if platformType == 'win32':
  22. from errno import WSAEWOULDBLOCK
  23. from errno import WSAEINTR, WSAEMSGSIZE, WSAETIMEDOUT
  24. from errno import WSAECONNREFUSED, WSAECONNRESET, WSAENETRESET
  25. from errno import WSAEINPROGRESS
  26. from errno import WSAENOPROTOOPT as ENOPROTOOPT
  27. # Classify read and write errors
  28. _sockErrReadIgnore = [WSAEINTR, WSAEWOULDBLOCK, WSAEMSGSIZE, WSAEINPROGRESS]
  29. _sockErrReadRefuse = [WSAECONNREFUSED, WSAECONNRESET, WSAENETRESET,
  30. WSAETIMEDOUT]
  31. # POSIX-compatible write errors
  32. EMSGSIZE = WSAEMSGSIZE
  33. ECONNREFUSED = WSAECONNREFUSED
  34. EAGAIN = WSAEWOULDBLOCK
  35. EINTR = WSAEINTR
  36. else:
  37. from errno import EWOULDBLOCK, EINTR, EMSGSIZE, ECONNREFUSED, EAGAIN
  38. from errno import ENOPROTOOPT
  39. _sockErrReadIgnore = [EAGAIN, EINTR, EWOULDBLOCK]
  40. _sockErrReadRefuse = [ECONNREFUSED]
  41. # Twisted Imports
  42. from twisted.internet import base, defer, address
  43. from twisted.python import log, failure
  44. from twisted.python._oldstyle import _oldStyle
  45. from twisted.internet import abstract, error, interfaces
  46. @implementer(
  47. interfaces.IListeningPort, interfaces.IUDPTransport,
  48. interfaces.ISystemHandle)
  49. class Port(base.BasePort):
  50. """
  51. UDP port, listening for packets.
  52. @ivar maxThroughput: Maximum number of bytes read in one event
  53. loop iteration.
  54. @ivar addressFamily: L{socket.AF_INET} or L{socket.AF_INET6}, depending on
  55. whether this port is listening on an IPv4 address or an IPv6 address.
  56. @ivar _realPortNumber: Actual port number being listened on. The
  57. value will be L{None} until this L{Port} is listening.
  58. @ivar _preexistingSocket: If not L{None}, a L{socket.socket} instance which
  59. was created and initialized outside of the reactor and will be used to
  60. listen for connections (instead of a new socket being created by this
  61. L{Port}).
  62. """
  63. addressFamily = socket.AF_INET
  64. socketType = socket.SOCK_DGRAM
  65. maxThroughput = 256 * 1024
  66. _realPortNumber = None
  67. _preexistingSocket = None
  68. def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=None):
  69. """
  70. @param port: A port number on which to listen.
  71. @type port: L{int}
  72. @param proto: A C{DatagramProtocol} instance which will be
  73. connected to the given C{port}.
  74. @type proto: L{twisted.internet.protocol.DatagramProtocol}
  75. @param interface: The local IPv4 or IPv6 address to which to bind;
  76. defaults to '', ie all IPv4 addresses.
  77. @type interface: L{str}
  78. @param maxPacketSize: The maximum packet size to accept.
  79. @type maxPacketSize: L{int}
  80. @param reactor: A reactor which will notify this C{Port} when
  81. its socket is ready for reading or writing. Defaults to
  82. L{None}, ie the default global reactor.
  83. @type reactor: L{interfaces.IReactorFDSet}
  84. """
  85. base.BasePort.__init__(self, reactor)
  86. self.port = port
  87. self.protocol = proto
  88. self.maxPacketSize = maxPacketSize
  89. self.interface = interface
  90. self.setLogStr()
  91. self._connectedAddr = None
  92. self._setAddressFamily()
  93. @classmethod
  94. def _fromListeningDescriptor(cls, reactor, fd, addressFamily, protocol,
  95. maxPacketSize):
  96. """
  97. Create a new L{Port} based on an existing listening
  98. I{SOCK_DGRAM} socket.
  99. @param reactor: A reactor which will notify this L{Port} when
  100. its socket is ready for reading or writing. Defaults to
  101. L{None}, ie the default global reactor.
  102. @type reactor: L{interfaces.IReactorFDSet}
  103. @param fd: An integer file descriptor associated with a listening
  104. socket. The socket must be in non-blocking mode. Any additional
  105. attributes desired, such as I{FD_CLOEXEC}, must also be set already.
  106. @type fd: L{int}
  107. @param addressFamily: The address family (sometimes called I{domain}) of
  108. the existing socket. For example, L{socket.AF_INET}.
  109. @param addressFamily: L{int}
  110. @param protocol: A C{DatagramProtocol} instance which will be
  111. connected to the C{port}.
  112. @type proto: L{twisted.internet.protocol.DatagramProtocol}
  113. @param maxPacketSize: The maximum packet size to accept.
  114. @type maxPacketSize: L{int}
  115. @return: A new instance of C{cls} wrapping the socket given by C{fd}.
  116. @rtype: L{Port}
  117. """
  118. port = socket.fromfd(fd, addressFamily, cls.socketType)
  119. interface = port.getsockname()[0]
  120. self = cls(None, protocol, interface=interface, reactor=reactor,
  121. maxPacketSize=maxPacketSize)
  122. self._preexistingSocket = port
  123. return self
  124. def __repr__(self):
  125. if self._realPortNumber is not None:
  126. return "<%s on %s>" % (self.protocol.__class__, self._realPortNumber)
  127. else:
  128. return "<%s not connected>" % (self.protocol.__class__,)
  129. def getHandle(self):
  130. """
  131. Return a socket object.
  132. """
  133. return self.socket
  134. def startListening(self):
  135. """
  136. Create and bind my socket, and begin listening on it.
  137. This is called on unserialization, and must be called after creating a
  138. server to begin listening on the specified port.
  139. """
  140. self._bindSocket()
  141. self._connectToProtocol()
  142. def _bindSocket(self):
  143. """
  144. Prepare and assign a L{socket.socket} instance to
  145. C{self.socket}.
  146. Either creates a new SOCK_DGRAM L{socket.socket} bound to
  147. C{self.interface} and C{self.port} or takes an existing
  148. L{socket.socket} provided via the
  149. L{interfaces.IReactorSocket.adoptDatagramPort} interface.
  150. """
  151. if self._preexistingSocket is None:
  152. # Create a new socket and make it listen
  153. try:
  154. skt = self.createInternetSocket()
  155. skt.bind((self.interface, self.port))
  156. except socket.error as le:
  157. raise error.CannotListenError(self.interface, self.port, le)
  158. else:
  159. # Re-use the externally specified socket
  160. skt = self._preexistingSocket
  161. self._preexistingSocket = None
  162. # Make sure that if we listened on port 0, we update that to
  163. # reflect what the OS actually assigned us.
  164. self._realPortNumber = skt.getsockname()[1]
  165. log.msg("%s starting on %s" % (
  166. self._getLogPrefix(self.protocol), self._realPortNumber))
  167. self.connected = 1
  168. self.socket = skt
  169. self.fileno = self.socket.fileno
  170. def _connectToProtocol(self):
  171. self.protocol.makeConnection(self)
  172. self.startReading()
  173. def doRead(self):
  174. """
  175. Called when my socket is ready for reading.
  176. """
  177. read = 0
  178. while read < self.maxThroughput:
  179. try:
  180. data, addr = self.socket.recvfrom(self.maxPacketSize)
  181. except socket.error as se:
  182. no = se.args[0]
  183. if no in _sockErrReadIgnore:
  184. return
  185. if no in _sockErrReadRefuse:
  186. if self._connectedAddr:
  187. self.protocol.connectionRefused()
  188. return
  189. raise
  190. else:
  191. read += len(data)
  192. if self.addressFamily == socket.AF_INET6:
  193. # Remove the flow and scope ID from the address tuple,
  194. # reducing it to a tuple of just (host, port).
  195. #
  196. # TODO: This should be amended to return an object that can
  197. # unpack to (host, port) but also includes the flow info
  198. # and scope ID. See http://tm.tl/6826
  199. addr = addr[:2]
  200. try:
  201. self.protocol.datagramReceived(data, addr)
  202. except:
  203. log.err()
  204. def write(self, datagram, addr=None):
  205. """
  206. Write a datagram.
  207. @type datagram: L{bytes}
  208. @param datagram: The datagram to be sent.
  209. @type addr: L{tuple} containing L{str} as first element and L{int} as
  210. second element, or L{None}
  211. @param addr: A tuple of (I{stringified IPv4 or IPv6 address},
  212. I{integer port number}); can be L{None} in connected mode.
  213. """
  214. if self._connectedAddr:
  215. assert addr in (None, self._connectedAddr)
  216. try:
  217. return self.socket.send(datagram)
  218. except socket.error as se:
  219. no = se.args[0]
  220. if no == EINTR:
  221. return self.write(datagram)
  222. elif no == EMSGSIZE:
  223. raise error.MessageLengthError("message too long")
  224. elif no == ECONNREFUSED:
  225. self.protocol.connectionRefused()
  226. else:
  227. raise
  228. else:
  229. assert addr != None
  230. if (not abstract.isIPAddress(addr[0])
  231. and not abstract.isIPv6Address(addr[0])
  232. and addr[0] != "<broadcast>"):
  233. raise error.InvalidAddressError(
  234. addr[0],
  235. "write() only accepts IP addresses, not hostnames")
  236. if ((abstract.isIPAddress(addr[0]) or addr[0] == "<broadcast>")
  237. and self.addressFamily == socket.AF_INET6):
  238. raise error.InvalidAddressError(
  239. addr[0],
  240. "IPv6 port write() called with IPv4 or broadcast address")
  241. if (abstract.isIPv6Address(addr[0])
  242. and self.addressFamily == socket.AF_INET):
  243. raise error.InvalidAddressError(
  244. addr[0], "IPv4 port write() called with IPv6 address")
  245. try:
  246. return self.socket.sendto(datagram, addr)
  247. except socket.error as se:
  248. no = se.args[0]
  249. if no == EINTR:
  250. return self.write(datagram, addr)
  251. elif no == EMSGSIZE:
  252. raise error.MessageLengthError("message too long")
  253. elif no == ECONNREFUSED:
  254. # in non-connected UDP ECONNREFUSED is platform dependent, I
  255. # think and the info is not necessarily useful. Nevertheless
  256. # maybe we should call connectionRefused? XXX
  257. return
  258. else:
  259. raise
  260. def writeSequence(self, seq, addr):
  261. """
  262. Write a datagram constructed from an iterable of L{bytes}.
  263. @param seq: The data that will make up the complete datagram to be
  264. written.
  265. @type seq: an iterable of L{bytes}
  266. @type addr: L{tuple} containing L{str} as first element and L{int} as
  267. second element, or L{None}
  268. @param addr: A tuple of (I{stringified IPv4 or IPv6 address},
  269. I{integer port number}); can be L{None} in connected mode.
  270. """
  271. self.write(b"".join(seq), addr)
  272. def connect(self, host, port):
  273. """
  274. 'Connect' to remote server.
  275. """
  276. if self._connectedAddr:
  277. raise RuntimeError("already connected, reconnecting is not currently supported")
  278. if not abstract.isIPAddress(host) and not abstract.isIPv6Address(host):
  279. raise error.InvalidAddressError(
  280. host, 'not an IPv4 or IPv6 address.')
  281. self._connectedAddr = (host, port)
  282. self.socket.connect((host, port))
  283. def _loseConnection(self):
  284. self.stopReading()
  285. if self.connected: # actually means if we are *listening*
  286. self.reactor.callLater(0, self.connectionLost)
  287. def stopListening(self):
  288. if self.connected:
  289. result = self.d = defer.Deferred()
  290. else:
  291. result = None
  292. self._loseConnection()
  293. return result
  294. def loseConnection(self):
  295. warnings.warn("Please use stopListening() to disconnect port", DeprecationWarning, stacklevel=2)
  296. self.stopListening()
  297. def connectionLost(self, reason=None):
  298. """
  299. Cleans up my socket.
  300. """
  301. log.msg('(UDP Port %s Closed)' % self._realPortNumber)
  302. self._realPortNumber = None
  303. self.maxThroughput = -1
  304. base.BasePort.connectionLost(self, reason)
  305. self.protocol.doStop()
  306. self.socket.close()
  307. del self.socket
  308. del self.fileno
  309. if hasattr(self, "d"):
  310. self.d.callback(None)
  311. del self.d
  312. def setLogStr(self):
  313. """
  314. Initialize the C{logstr} attribute to be used by C{logPrefix}.
  315. """
  316. logPrefix = self._getLogPrefix(self.protocol)
  317. self.logstr = "%s (UDP)" % logPrefix
  318. def _setAddressFamily(self):
  319. """
  320. Resolve address family for the socket.
  321. """
  322. if abstract.isIPv6Address(self.interface):
  323. self.addressFamily = socket.AF_INET6
  324. elif abstract.isIPAddress(self.interface):
  325. self.addressFamily = socket.AF_INET
  326. elif self.interface:
  327. raise error.InvalidAddressError(
  328. self.interface, 'not an IPv4 or IPv6 address.')
  329. def logPrefix(self):
  330. """
  331. Return the prefix to log with.
  332. """
  333. return self.logstr
  334. def getHost(self):
  335. """
  336. Return the local address of the UDP connection
  337. @returns: the local address of the UDP connection
  338. @rtype: L{IPv4Address} or L{IPv6Address}
  339. """
  340. addr = self.socket.getsockname()
  341. if self.addressFamily == socket.AF_INET:
  342. return address.IPv4Address('UDP', *addr)
  343. elif self.addressFamily == socket.AF_INET6:
  344. return address.IPv6Address('UDP', *(addr[:2]))
  345. def setBroadcastAllowed(self, enabled):
  346. """
  347. Set whether this port may broadcast. This is disabled by default.
  348. @param enabled: Whether the port may broadcast.
  349. @type enabled: L{bool}
  350. """
  351. self.socket.setsockopt(
  352. socket.SOL_SOCKET, socket.SO_BROADCAST, enabled)
  353. def getBroadcastAllowed(self):
  354. """
  355. Checks if broadcast is currently allowed on this port.
  356. @return: Whether this port may broadcast.
  357. @rtype: L{bool}
  358. """
  359. return operator.truth(
  360. self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST))
  361. @_oldStyle
  362. class MulticastMixin:
  363. """
  364. Implement multicast functionality.
  365. """
  366. def getOutgoingInterface(self):
  367. i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
  368. return socket.inet_ntoa(struct.pack("@i", i))
  369. def setOutgoingInterface(self, addr):
  370. """Returns Deferred of success."""
  371. return self.reactor.resolve(addr).addCallback(self._setInterface)
  372. def _setInterface(self, addr):
  373. i = socket.inet_aton(addr)
  374. self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
  375. return 1
  376. def getLoopbackMode(self):
  377. return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
  378. def setLoopbackMode(self, mode):
  379. mode = struct.pack("b", operator.truth(mode))
  380. self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
  381. def getTTL(self):
  382. return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
  383. def setTTL(self, ttl):
  384. ttl = struct.pack("B", ttl)
  385. self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
  386. def joinGroup(self, addr, interface=""):
  387. """Join a multicast group. Returns Deferred of success."""
  388. return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1)
  389. def _joinAddr1(self, addr, interface, join):
  390. return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join)
  391. def _joinAddr2(self, interface, addr, join):
  392. addr = socket.inet_aton(addr)
  393. interface = socket.inet_aton(interface)
  394. if join:
  395. cmd = socket.IP_ADD_MEMBERSHIP
  396. else:
  397. cmd = socket.IP_DROP_MEMBERSHIP
  398. try:
  399. self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
  400. except socket.error as e:
  401. return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
  402. def leaveGroup(self, addr, interface=""):
  403. """Leave multicast group, return Deferred of success."""
  404. return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0)
  405. @implementer(interfaces.IMulticastTransport)
  406. class MulticastPort(MulticastMixin, Port):
  407. """
  408. UDP Port that supports multicasting.
  409. """
  410. def __init__(self, port, proto, interface='', maxPacketSize=8192,
  411. reactor=None, listenMultiple=False):
  412. """
  413. @see: L{twisted.internet.interfaces.IReactorMulticast.listenMulticast}
  414. """
  415. Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
  416. self.listenMultiple = listenMultiple
  417. def createInternetSocket(self):
  418. skt = Port.createInternetSocket(self)
  419. if self.listenMultiple:
  420. skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  421. if hasattr(socket, "SO_REUSEPORT"):
  422. try:
  423. skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  424. except socket.error as le:
  425. # RHEL6 defines SO_REUSEPORT but it doesn't work
  426. if le.errno == ENOPROTOOPT:
  427. pass
  428. else:
  429. raise
  430. return skt