connection.py 40 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177
  1. #
  2. # A higher level module for using sockets (or Windows named pipes)
  3. #
  4. # multiprocessing/connection.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
  10. import errno
  11. import io
  12. import os
  13. import sys
  14. import socket
  15. import struct
  16. import time
  17. import tempfile
  18. import itertools
  19. import _multiprocessing
  20. from . import util
  21. from . import AuthenticationError, BufferTooShort
  22. from .context import reduction
  23. _ForkingPickler = reduction.ForkingPickler
  24. try:
  25. import _winapi
  26. from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
  27. except ImportError:
  28. if sys.platform == 'win32':
  29. raise
  30. _winapi = None
  31. #
  32. #
  33. #
  34. BUFSIZE = 8192
  35. # A very generous timeout when it comes to local connections...
  36. CONNECTION_TIMEOUT = 20.
  37. _mmap_counter = itertools.count()
  38. default_family = 'AF_INET'
  39. families = ['AF_INET']
  40. if hasattr(socket, 'AF_UNIX'):
  41. default_family = 'AF_UNIX'
  42. families += ['AF_UNIX']
  43. if sys.platform == 'win32':
  44. default_family = 'AF_PIPE'
  45. families += ['AF_PIPE']
  46. def _init_timeout(timeout=CONNECTION_TIMEOUT):
  47. return time.monotonic() + timeout
  48. def _check_timeout(t):
  49. return time.monotonic() > t
  50. #
  51. #
  52. #
  53. def arbitrary_address(family):
  54. '''
  55. Return an arbitrary free address for the given family
  56. '''
  57. if family == 'AF_INET':
  58. return ('localhost', 0)
  59. elif family == 'AF_UNIX':
  60. return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
  61. elif family == 'AF_PIPE':
  62. return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
  63. (os.getpid(), next(_mmap_counter)), dir="")
  64. else:
  65. raise ValueError('unrecognized family')
  66. def _validate_family(family):
  67. '''
  68. Checks if the family is valid for the current environment.
  69. '''
  70. if sys.platform != 'win32' and family == 'AF_PIPE':
  71. raise ValueError('Family %s is not recognized.' % family)
  72. if sys.platform == 'win32' and family == 'AF_UNIX':
  73. # double check
  74. if not hasattr(socket, family):
  75. raise ValueError('Family %s is not recognized.' % family)
  76. def address_type(address):
  77. '''
  78. Return the types of the address
  79. This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
  80. '''
  81. if type(address) == tuple:
  82. return 'AF_INET'
  83. elif type(address) is str and address.startswith('\\\\'):
  84. return 'AF_PIPE'
  85. elif type(address) is str or util.is_abstract_socket_namespace(address):
  86. return 'AF_UNIX'
  87. else:
  88. raise ValueError('address type of %r unrecognized' % address)
  89. #
  90. # Connection classes
  91. #
  92. class _ConnectionBase:
  93. _handle = None
  94. def __init__(self, handle, readable=True, writable=True):
  95. handle = handle.__index__()
  96. if handle < 0:
  97. raise ValueError("invalid handle")
  98. if not readable and not writable:
  99. raise ValueError(
  100. "at least one of `readable` and `writable` must be True")
  101. self._handle = handle
  102. self._readable = readable
  103. self._writable = writable
  104. # XXX should we use util.Finalize instead of a __del__?
  105. def __del__(self):
  106. if self._handle is not None:
  107. self._close()
  108. def _check_closed(self):
  109. if self._handle is None:
  110. raise OSError("handle is closed")
  111. def _check_readable(self):
  112. if not self._readable:
  113. raise OSError("connection is write-only")
  114. def _check_writable(self):
  115. if not self._writable:
  116. raise OSError("connection is read-only")
  117. def _bad_message_length(self):
  118. if self._writable:
  119. self._readable = False
  120. else:
  121. self.close()
  122. raise OSError("bad message length")
  123. @property
  124. def closed(self):
  125. """True if the connection is closed"""
  126. return self._handle is None
  127. @property
  128. def readable(self):
  129. """True if the connection is readable"""
  130. return self._readable
  131. @property
  132. def writable(self):
  133. """True if the connection is writable"""
  134. return self._writable
  135. def fileno(self):
  136. """File descriptor or handle of the connection"""
  137. self._check_closed()
  138. return self._handle
  139. def close(self):
  140. """Close the connection"""
  141. if self._handle is not None:
  142. try:
  143. self._close()
  144. finally:
  145. self._handle = None
  146. def send_bytes(self, buf, offset=0, size=None):
  147. """Send the bytes data from a bytes-like object"""
  148. self._check_closed()
  149. self._check_writable()
  150. m = memoryview(buf)
  151. if m.itemsize > 1:
  152. m = m.cast('B')
  153. n = m.nbytes
  154. if offset < 0:
  155. raise ValueError("offset is negative")
  156. if n < offset:
  157. raise ValueError("buffer length < offset")
  158. if size is None:
  159. size = n - offset
  160. elif size < 0:
  161. raise ValueError("size is negative")
  162. elif offset + size > n:
  163. raise ValueError("buffer length < offset + size")
  164. self._send_bytes(m[offset:offset + size])
  165. def send(self, obj):
  166. """Send a (picklable) object"""
  167. self._check_closed()
  168. self._check_writable()
  169. self._send_bytes(_ForkingPickler.dumps(obj))
  170. def recv_bytes(self, maxlength=None):
  171. """
  172. Receive bytes data as a bytes object.
  173. """
  174. self._check_closed()
  175. self._check_readable()
  176. if maxlength is not None and maxlength < 0:
  177. raise ValueError("negative maxlength")
  178. buf = self._recv_bytes(maxlength)
  179. if buf is None:
  180. self._bad_message_length()
  181. return buf.getvalue()
  182. def recv_bytes_into(self, buf, offset=0):
  183. """
  184. Receive bytes data into a writeable bytes-like object.
  185. Return the number of bytes read.
  186. """
  187. self._check_closed()
  188. self._check_readable()
  189. with memoryview(buf) as m:
  190. # Get bytesize of arbitrary buffer
  191. itemsize = m.itemsize
  192. bytesize = itemsize * len(m)
  193. if offset < 0:
  194. raise ValueError("negative offset")
  195. elif offset > bytesize:
  196. raise ValueError("offset too large")
  197. result = self._recv_bytes()
  198. size = result.tell()
  199. if bytesize < offset + size:
  200. raise BufferTooShort(result.getvalue())
  201. # Message can fit in dest
  202. result.seek(0)
  203. result.readinto(m[offset // itemsize :
  204. (offset + size) // itemsize])
  205. return size
  206. def recv(self):
  207. """Receive a (picklable) object"""
  208. self._check_closed()
  209. self._check_readable()
  210. buf = self._recv_bytes()
  211. return _ForkingPickler.loads(buf.getbuffer())
  212. def poll(self, timeout=0.0):
  213. """Whether there is any input available to be read"""
  214. self._check_closed()
  215. self._check_readable()
  216. return self._poll(timeout)
  217. def __enter__(self):
  218. return self
  219. def __exit__(self, exc_type, exc_value, exc_tb):
  220. self.close()
  221. if _winapi:
  222. class PipeConnection(_ConnectionBase):
  223. """
  224. Connection class based on a Windows named pipe.
  225. Overlapped I/O is used, so the handles must have been created
  226. with FILE_FLAG_OVERLAPPED.
  227. """
  228. _got_empty_message = False
  229. _send_ov = None
  230. def _close(self, _CloseHandle=_winapi.CloseHandle):
  231. ov = self._send_ov
  232. if ov is not None:
  233. # Interrupt WaitForMultipleObjects() in _send_bytes()
  234. ov.cancel()
  235. _CloseHandle(self._handle)
  236. def _send_bytes(self, buf):
  237. if self._send_ov is not None:
  238. # A connection should only be used by a single thread
  239. raise ValueError("concurrent send_bytes() calls "
  240. "are not supported")
  241. ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
  242. self._send_ov = ov
  243. try:
  244. if err == _winapi.ERROR_IO_PENDING:
  245. waitres = _winapi.WaitForMultipleObjects(
  246. [ov.event], False, INFINITE)
  247. assert waitres == WAIT_OBJECT_0
  248. except:
  249. ov.cancel()
  250. raise
  251. finally:
  252. self._send_ov = None
  253. nwritten, err = ov.GetOverlappedResult(True)
  254. if err == _winapi.ERROR_OPERATION_ABORTED:
  255. # close() was called by another thread while
  256. # WaitForMultipleObjects() was waiting for the overlapped
  257. # operation.
  258. raise OSError(errno.EPIPE, "handle is closed")
  259. assert err == 0
  260. assert nwritten == len(buf)
  261. def _recv_bytes(self, maxsize=None):
  262. if self._got_empty_message:
  263. self._got_empty_message = False
  264. return io.BytesIO()
  265. else:
  266. bsize = 128 if maxsize is None else min(maxsize, 128)
  267. try:
  268. ov, err = _winapi.ReadFile(self._handle, bsize,
  269. overlapped=True)
  270. try:
  271. if err == _winapi.ERROR_IO_PENDING:
  272. waitres = _winapi.WaitForMultipleObjects(
  273. [ov.event], False, INFINITE)
  274. assert waitres == WAIT_OBJECT_0
  275. except:
  276. ov.cancel()
  277. raise
  278. finally:
  279. nread, err = ov.GetOverlappedResult(True)
  280. if err == 0:
  281. f = io.BytesIO()
  282. f.write(ov.getbuffer())
  283. return f
  284. elif err == _winapi.ERROR_MORE_DATA:
  285. return self._get_more_data(ov, maxsize)
  286. except OSError as e:
  287. if e.winerror == _winapi.ERROR_BROKEN_PIPE:
  288. raise EOFError
  289. else:
  290. raise
  291. raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
  292. def _poll(self, timeout):
  293. if (self._got_empty_message or
  294. _winapi.PeekNamedPipe(self._handle)[0] != 0):
  295. return True
  296. return bool(wait([self], timeout))
  297. def _get_more_data(self, ov, maxsize):
  298. buf = ov.getbuffer()
  299. f = io.BytesIO()
  300. f.write(buf)
  301. left = _winapi.PeekNamedPipe(self._handle)[1]
  302. assert left > 0
  303. if maxsize is not None and len(buf) + left > maxsize:
  304. self._bad_message_length()
  305. ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
  306. rbytes, err = ov.GetOverlappedResult(True)
  307. assert err == 0
  308. assert rbytes == left
  309. f.write(ov.getbuffer())
  310. return f
  311. class Connection(_ConnectionBase):
  312. """
  313. Connection class based on an arbitrary file descriptor (Unix only), or
  314. a socket handle (Windows).
  315. """
  316. if _winapi:
  317. def _close(self, _close=_multiprocessing.closesocket):
  318. _close(self._handle)
  319. _write = _multiprocessing.send
  320. _read = _multiprocessing.recv
  321. else:
  322. def _close(self, _close=os.close):
  323. _close(self._handle)
  324. _write = os.write
  325. _read = os.read
  326. def _send(self, buf, write=_write):
  327. remaining = len(buf)
  328. while True:
  329. n = write(self._handle, buf)
  330. remaining -= n
  331. if remaining == 0:
  332. break
  333. buf = buf[n:]
  334. def _recv(self, size, read=_read):
  335. buf = io.BytesIO()
  336. handle = self._handle
  337. remaining = size
  338. while remaining > 0:
  339. chunk = read(handle, remaining)
  340. n = len(chunk)
  341. if n == 0:
  342. if remaining == size:
  343. raise EOFError
  344. else:
  345. raise OSError("got end of file during message")
  346. buf.write(chunk)
  347. remaining -= n
  348. return buf
  349. def _send_bytes(self, buf):
  350. n = len(buf)
  351. if n > 0x7fffffff:
  352. pre_header = struct.pack("!i", -1)
  353. header = struct.pack("!Q", n)
  354. self._send(pre_header)
  355. self._send(header)
  356. self._send(buf)
  357. else:
  358. # For wire compatibility with 3.7 and lower
  359. header = struct.pack("!i", n)
  360. if n > 16384:
  361. # The payload is large so Nagle's algorithm won't be triggered
  362. # and we'd better avoid the cost of concatenation.
  363. self._send(header)
  364. self._send(buf)
  365. else:
  366. # Issue #20540: concatenate before sending, to avoid delays due
  367. # to Nagle's algorithm on a TCP socket.
  368. # Also note we want to avoid sending a 0-length buffer separately,
  369. # to avoid "broken pipe" errors if the other end closed the pipe.
  370. self._send(header + buf)
  371. def _recv_bytes(self, maxsize=None):
  372. buf = self._recv(4)
  373. size, = struct.unpack("!i", buf.getvalue())
  374. if size == -1:
  375. buf = self._recv(8)
  376. size, = struct.unpack("!Q", buf.getvalue())
  377. if maxsize is not None and size > maxsize:
  378. return None
  379. return self._recv(size)
  380. def _poll(self, timeout):
  381. r = wait([self], timeout)
  382. return bool(r)
  383. #
  384. # Public functions
  385. #
  386. class Listener(object):
  387. '''
  388. Returns a listener object.
  389. This is a wrapper for a bound socket which is 'listening' for
  390. connections, or for a Windows named pipe.
  391. '''
  392. def __init__(self, address=None, family=None, backlog=1, authkey=None):
  393. family = family or (address and address_type(address)) \
  394. or default_family
  395. address = address or arbitrary_address(family)
  396. _validate_family(family)
  397. if family == 'AF_PIPE':
  398. self._listener = PipeListener(address, backlog)
  399. else:
  400. self._listener = SocketListener(address, family, backlog)
  401. if authkey is not None and not isinstance(authkey, bytes):
  402. raise TypeError('authkey should be a byte string')
  403. self._authkey = authkey
  404. def accept(self):
  405. '''
  406. Accept a connection on the bound socket or named pipe of `self`.
  407. Returns a `Connection` object.
  408. '''
  409. if self._listener is None:
  410. raise OSError('listener is closed')
  411. c = self._listener.accept()
  412. if self._authkey:
  413. deliver_challenge(c, self._authkey)
  414. answer_challenge(c, self._authkey)
  415. return c
  416. def close(self):
  417. '''
  418. Close the bound socket or named pipe of `self`.
  419. '''
  420. listener = self._listener
  421. if listener is not None:
  422. self._listener = None
  423. listener.close()
  424. @property
  425. def address(self):
  426. return self._listener._address
  427. @property
  428. def last_accepted(self):
  429. return self._listener._last_accepted
  430. def __enter__(self):
  431. return self
  432. def __exit__(self, exc_type, exc_value, exc_tb):
  433. self.close()
  434. def Client(address, family=None, authkey=None):
  435. '''
  436. Returns a connection to the address of a `Listener`
  437. '''
  438. family = family or address_type(address)
  439. _validate_family(family)
  440. if family == 'AF_PIPE':
  441. c = PipeClient(address)
  442. else:
  443. c = SocketClient(address)
  444. if authkey is not None and not isinstance(authkey, bytes):
  445. raise TypeError('authkey should be a byte string')
  446. if authkey is not None:
  447. answer_challenge(c, authkey)
  448. deliver_challenge(c, authkey)
  449. return c
  450. if sys.platform != 'win32':
  451. def Pipe(duplex=True):
  452. '''
  453. Returns pair of connection objects at either end of a pipe
  454. '''
  455. if duplex:
  456. s1, s2 = socket.socketpair()
  457. s1.setblocking(True)
  458. s2.setblocking(True)
  459. c1 = Connection(s1.detach())
  460. c2 = Connection(s2.detach())
  461. else:
  462. fd1, fd2 = os.pipe()
  463. c1 = Connection(fd1, writable=False)
  464. c2 = Connection(fd2, readable=False)
  465. return c1, c2
  466. else:
  467. def Pipe(duplex=True):
  468. '''
  469. Returns pair of connection objects at either end of a pipe
  470. '''
  471. address = arbitrary_address('AF_PIPE')
  472. if duplex:
  473. openmode = _winapi.PIPE_ACCESS_DUPLEX
  474. access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
  475. obsize, ibsize = BUFSIZE, BUFSIZE
  476. else:
  477. openmode = _winapi.PIPE_ACCESS_INBOUND
  478. access = _winapi.GENERIC_WRITE
  479. obsize, ibsize = 0, BUFSIZE
  480. h1 = _winapi.CreateNamedPipe(
  481. address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
  482. _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
  483. _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
  484. _winapi.PIPE_WAIT,
  485. 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
  486. # default security descriptor: the handle cannot be inherited
  487. _winapi.NULL
  488. )
  489. h2 = _winapi.CreateFile(
  490. address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
  491. _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
  492. )
  493. _winapi.SetNamedPipeHandleState(
  494. h2, _winapi.PIPE_READMODE_MESSAGE, None, None
  495. )
  496. overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
  497. _, err = overlapped.GetOverlappedResult(True)
  498. assert err == 0
  499. c1 = PipeConnection(h1, writable=duplex)
  500. c2 = PipeConnection(h2, readable=duplex)
  501. return c1, c2
  502. #
  503. # Definitions for connections based on sockets
  504. #
  505. class SocketListener(object):
  506. '''
  507. Representation of a socket which is bound to an address and listening
  508. '''
  509. def __init__(self, address, family, backlog=1):
  510. self._socket = socket.socket(getattr(socket, family))
  511. try:
  512. # SO_REUSEADDR has different semantics on Windows (issue #2550).
  513. if os.name == 'posix':
  514. self._socket.setsockopt(socket.SOL_SOCKET,
  515. socket.SO_REUSEADDR, 1)
  516. self._socket.setblocking(True)
  517. self._socket.bind(address)
  518. self._socket.listen(backlog)
  519. self._address = self._socket.getsockname()
  520. except OSError:
  521. self._socket.close()
  522. raise
  523. self._family = family
  524. self._last_accepted = None
  525. if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
  526. # Linux abstract socket namespaces do not need to be explicitly unlinked
  527. self._unlink = util.Finalize(
  528. self, os.unlink, args=(address,), exitpriority=0
  529. )
  530. else:
  531. self._unlink = None
  532. def accept(self):
  533. s, self._last_accepted = self._socket.accept()
  534. s.setblocking(True)
  535. return Connection(s.detach())
  536. def close(self):
  537. try:
  538. self._socket.close()
  539. finally:
  540. unlink = self._unlink
  541. if unlink is not None:
  542. self._unlink = None
  543. unlink()
  544. def SocketClient(address):
  545. '''
  546. Return a connection object connected to the socket given by `address`
  547. '''
  548. family = address_type(address)
  549. with socket.socket( getattr(socket, family) ) as s:
  550. s.setblocking(True)
  551. s.connect(address)
  552. return Connection(s.detach())
  553. #
  554. # Definitions for connections based on named pipes
  555. #
  556. if sys.platform == 'win32':
  557. class PipeListener(object):
  558. '''
  559. Representation of a named pipe
  560. '''
  561. def __init__(self, address, backlog=None):
  562. self._address = address
  563. self._handle_queue = [self._new_handle(first=True)]
  564. self._last_accepted = None
  565. util.sub_debug('listener created with address=%r', self._address)
  566. self.close = util.Finalize(
  567. self, PipeListener._finalize_pipe_listener,
  568. args=(self._handle_queue, self._address), exitpriority=0
  569. )
  570. def _new_handle(self, first=False):
  571. flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
  572. if first:
  573. flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
  574. return _winapi.CreateNamedPipe(
  575. self._address, flags,
  576. _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
  577. _winapi.PIPE_WAIT,
  578. _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
  579. _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
  580. )
  581. def accept(self):
  582. self._handle_queue.append(self._new_handle())
  583. handle = self._handle_queue.pop(0)
  584. try:
  585. ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
  586. except OSError as e:
  587. if e.winerror != _winapi.ERROR_NO_DATA:
  588. raise
  589. # ERROR_NO_DATA can occur if a client has already connected,
  590. # written data and then disconnected -- see Issue 14725.
  591. else:
  592. try:
  593. res = _winapi.WaitForMultipleObjects(
  594. [ov.event], False, INFINITE)
  595. except:
  596. ov.cancel()
  597. _winapi.CloseHandle(handle)
  598. raise
  599. finally:
  600. _, err = ov.GetOverlappedResult(True)
  601. assert err == 0
  602. return PipeConnection(handle)
  603. @staticmethod
  604. def _finalize_pipe_listener(queue, address):
  605. util.sub_debug('closing listener with address=%r', address)
  606. for handle in queue:
  607. _winapi.CloseHandle(handle)
  608. def PipeClient(address):
  609. '''
  610. Return a connection object connected to the pipe given by `address`
  611. '''
  612. t = _init_timeout()
  613. while 1:
  614. try:
  615. _winapi.WaitNamedPipe(address, 1000)
  616. h = _winapi.CreateFile(
  617. address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
  618. 0, _winapi.NULL, _winapi.OPEN_EXISTING,
  619. _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
  620. )
  621. except OSError as e:
  622. if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
  623. _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
  624. raise
  625. else:
  626. break
  627. else:
  628. raise
  629. _winapi.SetNamedPipeHandleState(
  630. h, _winapi.PIPE_READMODE_MESSAGE, None, None
  631. )
  632. return PipeConnection(h)
  633. #
  634. # Authentication stuff
  635. #
  636. MESSAGE_LENGTH = 40 # MUST be > 20
  637. _CHALLENGE = b'#CHALLENGE#'
  638. _WELCOME = b'#WELCOME#'
  639. _FAILURE = b'#FAILURE#'
  640. # multiprocessing.connection Authentication Handshake Protocol Description
  641. # (as documented for reference after reading the existing code)
  642. # =============================================================================
  643. #
  644. # On Windows: native pipes with "overlapped IO" are used to send the bytes,
  645. # instead of the length prefix SIZE scheme described below. (ie: the OS deals
  646. # with message sizes for us)
  647. #
  648. # Protocol error behaviors:
  649. #
  650. # On POSIX, any failure to receive the length prefix into SIZE, for SIZE greater
  651. # than the requested maxsize to receive, or receiving fewer than SIZE bytes
  652. # results in the connection being closed and auth to fail.
  653. #
  654. # On Windows, receiving too few bytes is never a low level _recv_bytes read
  655. # error, receiving too many will trigger an error only if receive maxsize
  656. # value was larger than 128 OR the if the data arrived in smaller pieces.
  657. #
  658. # Serving side Client side
  659. # ------------------------------ ---------------------------------------
  660. # 0. Open a connection on the pipe.
  661. # 1. Accept connection.
  662. # 2. Random 20+ bytes -> MESSAGE
  663. # Modern servers always send
  664. # more than 20 bytes and include
  665. # a {digest} prefix on it with
  666. # their preferred HMAC digest.
  667. # Legacy ones send ==20 bytes.
  668. # 3. send 4 byte length (net order)
  669. # prefix followed by:
  670. # b'#CHALLENGE#' + MESSAGE
  671. # 4. Receive 4 bytes, parse as network byte
  672. # order integer. If it is -1, receive an
  673. # additional 8 bytes, parse that as network
  674. # byte order. The result is the length of
  675. # the data that follows -> SIZE.
  676. # 5. Receive min(SIZE, 256) bytes -> M1
  677. # 6. Assert that M1 starts with:
  678. # b'#CHALLENGE#'
  679. # 7. Strip that prefix from M1 into -> M2
  680. # 7.1. Parse M2: if it is exactly 20 bytes in
  681. # length this indicates a legacy server
  682. # supporting only HMAC-MD5. Otherwise the
  683. # 7.2. preferred digest is looked up from an
  684. # expected "{digest}" prefix on M2. No prefix
  685. # or unsupported digest? <- AuthenticationError
  686. # 7.3. Put divined algorithm name in -> D_NAME
  687. # 8. Compute HMAC-D_NAME of AUTHKEY, M2 -> C_DIGEST
  688. # 9. Send 4 byte length prefix (net order)
  689. # followed by C_DIGEST bytes.
  690. # 10. Receive 4 or 4+8 byte length
  691. # prefix (#4 dance) -> SIZE.
  692. # 11. Receive min(SIZE, 256) -> C_D.
  693. # 11.1. Parse C_D: legacy servers
  694. # accept it as is, "md5" -> D_NAME
  695. # 11.2. modern servers check the length
  696. # of C_D, IF it is 16 bytes?
  697. # 11.2.1. "md5" -> D_NAME
  698. # and skip to step 12.
  699. # 11.3. longer? expect and parse a "{digest}"
  700. # prefix into -> D_NAME.
  701. # Strip the prefix and store remaining
  702. # bytes in -> C_D.
  703. # 11.4. Don't like D_NAME? <- AuthenticationError
  704. # 12. Compute HMAC-D_NAME of AUTHKEY,
  705. # MESSAGE into -> M_DIGEST.
  706. # 13. Compare M_DIGEST == C_D:
  707. # 14a: Match? Send length prefix &
  708. # b'#WELCOME#'
  709. # <- RETURN
  710. # 14b: Mismatch? Send len prefix &
  711. # b'#FAILURE#'
  712. # <- CLOSE & AuthenticationError
  713. # 15. Receive 4 or 4+8 byte length prefix (net
  714. # order) again as in #4 into -> SIZE.
  715. # 16. Receive min(SIZE, 256) bytes -> M3.
  716. # 17. Compare M3 == b'#WELCOME#':
  717. # 17a. Match? <- RETURN
  718. # 17b. Mismatch? <- CLOSE & AuthenticationError
  719. #
  720. # If this RETURNed, the connection remains open: it has been authenticated.
  721. #
  722. # Length prefixes are used consistently. Even on the legacy protocol, this
  723. # was good fortune and allowed us to evolve the protocol by using the length
  724. # of the opening challenge or length of the returned digest as a signal as
  725. # to which protocol the other end supports.
  726. _ALLOWED_DIGESTS = frozenset(
  727. {b'md5', b'sha256', b'sha384', b'sha3_256', b'sha3_384'})
  728. _MAX_DIGEST_LEN = max(len(_) for _ in _ALLOWED_DIGESTS)
  729. # Old hmac-md5 only server versions from Python <=3.11 sent a message of this
  730. # length. It happens to not match the length of any supported digest so we can
  731. # use a message of this length to indicate that we should work in backwards
  732. # compatible md5-only mode without a {digest_name} prefix on our response.
  733. _MD5ONLY_MESSAGE_LENGTH = 20
  734. _MD5_DIGEST_LEN = 16
  735. _LEGACY_LENGTHS = (_MD5ONLY_MESSAGE_LENGTH, _MD5_DIGEST_LEN)
  736. def _get_digest_name_and_payload(message: bytes) -> (str, bytes):
  737. """Returns a digest name and the payload for a response hash.
  738. If a legacy protocol is detected based on the message length
  739. or contents the digest name returned will be empty to indicate
  740. legacy mode where MD5 and no digest prefix should be sent.
  741. """
  742. # modern message format: b"{digest}payload" longer than 20 bytes
  743. # legacy message format: 16 or 20 byte b"payload"
  744. if len(message) in _LEGACY_LENGTHS:
  745. # Either this was a legacy server challenge, or we're processing
  746. # a reply from a legacy client that sent an unprefixed 16-byte
  747. # HMAC-MD5 response. All messages using the modern protocol will
  748. # be longer than either of these lengths.
  749. return '', message
  750. if (message.startswith(b'{') and
  751. (curly := message.find(b'}', 1, _MAX_DIGEST_LEN+2)) > 0):
  752. digest = message[1:curly]
  753. if digest in _ALLOWED_DIGESTS:
  754. payload = message[curly+1:]
  755. return digest.decode('ascii'), payload
  756. raise AuthenticationError(
  757. 'unsupported message length, missing digest prefix, '
  758. f'or unsupported digest: {message=}')
  759. def _create_response(authkey, message):
  760. """Create a MAC based on authkey and message
  761. The MAC algorithm defaults to HMAC-MD5, unless MD5 is not available or
  762. the message has a '{digest_name}' prefix. For legacy HMAC-MD5, the response
  763. is the raw MAC, otherwise the response is prefixed with '{digest_name}',
  764. e.g. b'{sha256}abcdefg...'
  765. Note: The MAC protects the entire message including the digest_name prefix.
  766. """
  767. import hmac
  768. digest_name = _get_digest_name_and_payload(message)[0]
  769. # The MAC protects the entire message: digest header and payload.
  770. if not digest_name:
  771. # Legacy server without a {digest} prefix on message.
  772. # Generate a legacy non-prefixed HMAC-MD5 reply.
  773. try:
  774. return hmac.new(authkey, message, 'md5').digest()
  775. except ValueError:
  776. # HMAC-MD5 is not available (FIPS mode?), fall back to
  777. # HMAC-SHA2-256 modern protocol. The legacy server probably
  778. # doesn't support it and will reject us anyways. :shrug:
  779. digest_name = 'sha256'
  780. # Modern protocol, indicate the digest used in the reply.
  781. response = hmac.new(authkey, message, digest_name).digest()
  782. return b'{%s}%s' % (digest_name.encode('ascii'), response)
  783. def _verify_challenge(authkey, message, response):
  784. """Verify MAC challenge
  785. If our message did not include a digest_name prefix, the client is allowed
  786. to select a stronger digest_name from _ALLOWED_DIGESTS.
  787. In case our message is prefixed, a client cannot downgrade to a weaker
  788. algorithm, because the MAC is calculated over the entire message
  789. including the '{digest_name}' prefix.
  790. """
  791. import hmac
  792. response_digest, response_mac = _get_digest_name_and_payload(response)
  793. response_digest = response_digest or 'md5'
  794. try:
  795. expected = hmac.new(authkey, message, response_digest).digest()
  796. except ValueError:
  797. raise AuthenticationError(f'{response_digest=} unsupported')
  798. if len(expected) != len(response_mac):
  799. raise AuthenticationError(
  800. f'expected {response_digest!r} of length {len(expected)} '
  801. f'got {len(response_mac)}')
  802. if not hmac.compare_digest(expected, response_mac):
  803. raise AuthenticationError('digest received was wrong')
  804. def deliver_challenge(connection, authkey: bytes, digest_name='sha256'):
  805. if not isinstance(authkey, bytes):
  806. raise ValueError(
  807. "Authkey must be bytes, not {0!s}".format(type(authkey)))
  808. assert MESSAGE_LENGTH > _MD5ONLY_MESSAGE_LENGTH, "protocol constraint"
  809. message = os.urandom(MESSAGE_LENGTH)
  810. message = b'{%s}%s' % (digest_name.encode('ascii'), message)
  811. # Even when sending a challenge to a legacy client that does not support
  812. # digest prefixes, they'll take the entire thing as a challenge and
  813. # respond to it with a raw HMAC-MD5.
  814. connection.send_bytes(_CHALLENGE + message)
  815. response = connection.recv_bytes(256) # reject large message
  816. try:
  817. _verify_challenge(authkey, message, response)
  818. except AuthenticationError:
  819. connection.send_bytes(_FAILURE)
  820. raise
  821. else:
  822. connection.send_bytes(_WELCOME)
  823. def answer_challenge(connection, authkey: bytes):
  824. if not isinstance(authkey, bytes):
  825. raise ValueError(
  826. "Authkey must be bytes, not {0!s}".format(type(authkey)))
  827. message = connection.recv_bytes(256) # reject large message
  828. if not message.startswith(_CHALLENGE):
  829. raise AuthenticationError(
  830. f'Protocol error, expected challenge: {message=}')
  831. message = message[len(_CHALLENGE):]
  832. if len(message) < _MD5ONLY_MESSAGE_LENGTH:
  833. raise AuthenticationError('challenge too short: {len(message)} bytes')
  834. digest = _create_response(authkey, message)
  835. connection.send_bytes(digest)
  836. response = connection.recv_bytes(256) # reject large message
  837. if response != _WELCOME:
  838. raise AuthenticationError('digest sent was rejected')
  839. #
  840. # Support for using xmlrpclib for serialization
  841. #
  842. class ConnectionWrapper(object):
  843. def __init__(self, conn, dumps, loads):
  844. self._conn = conn
  845. self._dumps = dumps
  846. self._loads = loads
  847. for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
  848. obj = getattr(conn, attr)
  849. setattr(self, attr, obj)
  850. def send(self, obj):
  851. s = self._dumps(obj)
  852. self._conn.send_bytes(s)
  853. def recv(self):
  854. s = self._conn.recv_bytes()
  855. return self._loads(s)
  856. def _xml_dumps(obj):
  857. return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
  858. def _xml_loads(s):
  859. (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
  860. return obj
  861. class XmlListener(Listener):
  862. def accept(self):
  863. global xmlrpclib
  864. import xmlrpc.client as xmlrpclib
  865. obj = Listener.accept(self)
  866. return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
  867. def XmlClient(*args, **kwds):
  868. global xmlrpclib
  869. import xmlrpc.client as xmlrpclib
  870. return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
  871. #
  872. # Wait
  873. #
  874. if sys.platform == 'win32':
  875. def _exhaustive_wait(handles, timeout):
  876. # Return ALL handles which are currently signalled. (Only
  877. # returning the first signalled might create starvation issues.)
  878. L = list(handles)
  879. ready = []
  880. while L:
  881. res = _winapi.WaitForMultipleObjects(L, False, timeout)
  882. if res == WAIT_TIMEOUT:
  883. break
  884. elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
  885. res -= WAIT_OBJECT_0
  886. elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
  887. res -= WAIT_ABANDONED_0
  888. else:
  889. raise RuntimeError('Should not get here')
  890. ready.append(L[res])
  891. L = L[res+1:]
  892. timeout = 0
  893. return ready
  894. _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
  895. def wait(object_list, timeout=None):
  896. '''
  897. Wait till an object in object_list is ready/readable.
  898. Returns list of those objects in object_list which are ready/readable.
  899. '''
  900. if timeout is None:
  901. timeout = INFINITE
  902. elif timeout < 0:
  903. timeout = 0
  904. else:
  905. timeout = int(timeout * 1000 + 0.5)
  906. object_list = list(object_list)
  907. waithandle_to_obj = {}
  908. ov_list = []
  909. ready_objects = set()
  910. ready_handles = set()
  911. try:
  912. for o in object_list:
  913. try:
  914. fileno = getattr(o, 'fileno')
  915. except AttributeError:
  916. waithandle_to_obj[o.__index__()] = o
  917. else:
  918. # start an overlapped read of length zero
  919. try:
  920. ov, err = _winapi.ReadFile(fileno(), 0, True)
  921. except OSError as e:
  922. ov, err = None, e.winerror
  923. if err not in _ready_errors:
  924. raise
  925. if err == _winapi.ERROR_IO_PENDING:
  926. ov_list.append(ov)
  927. waithandle_to_obj[ov.event] = o
  928. else:
  929. # If o.fileno() is an overlapped pipe handle and
  930. # err == 0 then there is a zero length message
  931. # in the pipe, but it HAS NOT been consumed...
  932. if ov and sys.getwindowsversion()[:2] >= (6, 2):
  933. # ... except on Windows 8 and later, where
  934. # the message HAS been consumed.
  935. try:
  936. _, err = ov.GetOverlappedResult(False)
  937. except OSError as e:
  938. err = e.winerror
  939. if not err and hasattr(o, '_got_empty_message'):
  940. o._got_empty_message = True
  941. ready_objects.add(o)
  942. timeout = 0
  943. ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
  944. finally:
  945. # request that overlapped reads stop
  946. for ov in ov_list:
  947. ov.cancel()
  948. # wait for all overlapped reads to stop
  949. for ov in ov_list:
  950. try:
  951. _, err = ov.GetOverlappedResult(True)
  952. except OSError as e:
  953. err = e.winerror
  954. if err not in _ready_errors:
  955. raise
  956. if err != _winapi.ERROR_OPERATION_ABORTED:
  957. o = waithandle_to_obj[ov.event]
  958. ready_objects.add(o)
  959. if err == 0:
  960. # If o.fileno() is an overlapped pipe handle then
  961. # a zero length message HAS been consumed.
  962. if hasattr(o, '_got_empty_message'):
  963. o._got_empty_message = True
  964. ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
  965. return [o for o in object_list if o in ready_objects]
  966. else:
  967. import selectors
  968. # poll/select have the advantage of not requiring any extra file
  969. # descriptor, contrarily to epoll/kqueue (also, they require a single
  970. # syscall).
  971. if hasattr(selectors, 'PollSelector'):
  972. _WaitSelector = selectors.PollSelector
  973. else:
  974. _WaitSelector = selectors.SelectSelector
  975. def wait(object_list, timeout=None):
  976. '''
  977. Wait till an object in object_list is ready/readable.
  978. Returns list of those objects in object_list which are ready/readable.
  979. '''
  980. with _WaitSelector() as selector:
  981. for obj in object_list:
  982. selector.register(obj, selectors.EVENT_READ)
  983. if timeout is not None:
  984. deadline = time.monotonic() + timeout
  985. while True:
  986. ready = selector.select(timeout)
  987. if ready:
  988. return [key.fileobj for (key, events) in ready]
  989. else:
  990. if timeout is not None:
  991. timeout = deadline - time.monotonic()
  992. if timeout < 0:
  993. return ready
  994. #
  995. # Make connection and socket objects shareable if possible
  996. #
  997. if sys.platform == 'win32':
  998. def reduce_connection(conn):
  999. handle = conn.fileno()
  1000. with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
  1001. from . import resource_sharer
  1002. ds = resource_sharer.DupSocket(s)
  1003. return rebuild_connection, (ds, conn.readable, conn.writable)
  1004. def rebuild_connection(ds, readable, writable):
  1005. sock = ds.detach()
  1006. return Connection(sock.detach(), readable, writable)
  1007. reduction.register(Connection, reduce_connection)
  1008. def reduce_pipe_connection(conn):
  1009. access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
  1010. (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
  1011. dh = reduction.DupHandle(conn.fileno(), access)
  1012. return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
  1013. def rebuild_pipe_connection(dh, readable, writable):
  1014. handle = dh.detach()
  1015. return PipeConnection(handle, readable, writable)
  1016. reduction.register(PipeConnection, reduce_pipe_connection)
  1017. else:
  1018. def reduce_connection(conn):
  1019. df = reduction.DupFd(conn.fileno())
  1020. return rebuild_connection, (df, conn.readable, conn.writable)
  1021. def rebuild_connection(df, readable, writable):
  1022. fd = df.detach()
  1023. return Connection(fd, readable, writable)
  1024. reduction.register(Connection, reduce_connection)