123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474 |
- #!/usr/bin/env python3
- # Allow direct execution
- import os
- import sys
- import threading
- import unittest
- import pytest
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import abc
- import contextlib
- import enum
- import functools
- import http.server
- import json
- import random
- import socket
- import struct
- import time
- from socketserver import (
- BaseRequestHandler,
- StreamRequestHandler,
- ThreadingTCPServer,
- )
- from test.helper import http_server_port, verify_address_availability
- from yt_dlp.networking import Request
- from yt_dlp.networking.exceptions import ProxyError, TransportError
- from yt_dlp.socks import (
- SOCKS4_REPLY_VERSION,
- SOCKS4_VERSION,
- SOCKS5_USER_AUTH_SUCCESS,
- SOCKS5_USER_AUTH_VERSION,
- SOCKS5_VERSION,
- Socks5AddressType,
- Socks5Auth,
- )
- SOCKS5_USER_AUTH_FAILURE = 0x1
- class Socks4CD(enum.IntEnum):
- REQUEST_GRANTED = 90
- REQUEST_REJECTED_OR_FAILED = 91
- REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
- REQUEST_REJECTED_DIFFERENT_USERID = 93
- class Socks5Reply(enum.IntEnum):
- SUCCEEDED = 0x0
- GENERAL_FAILURE = 0x1
- CONNECTION_NOT_ALLOWED = 0x2
- NETWORK_UNREACHABLE = 0x3
- HOST_UNREACHABLE = 0x4
- CONNECTION_REFUSED = 0x5
- TTL_EXPIRED = 0x6
- COMMAND_NOT_SUPPORTED = 0x7
- ADDRESS_TYPE_NOT_SUPPORTED = 0x8
- class SocksTestRequestHandler(BaseRequestHandler):
- def __init__(self, *args, socks_info=None, **kwargs):
- self.socks_info = socks_info
- super().__init__(*args, **kwargs)
- class SocksProxyHandler(BaseRequestHandler):
- def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
- self.socks_kwargs = socks_server_kwargs or {}
- self.request_handler_class = request_handler_class
- super().__init__(*args, **kwargs)
- class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
- # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
- # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
- def handle(self):
- sleep = self.socks_kwargs.get('sleep')
- if sleep:
- time.sleep(sleep)
- version, nmethods = self.connection.recv(2)
- assert version == SOCKS5_VERSION
- methods = list(self.connection.recv(nmethods))
- auth = self.socks_kwargs.get('auth')
- if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
- self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
- self.server.close_request(self.request)
- return
- elif Socks5Auth.AUTH_USER_PASS in methods:
- self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))
- _, user_len = struct.unpack('!BB', self.connection.recv(2))
- username = self.connection.recv(user_len).decode()
- pass_len = ord(self.connection.recv(1))
- password = self.connection.recv(pass_len).decode()
- if username == auth[0] and password == auth[1]:
- self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
- else:
- self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
- self.server.close_request(self.request)
- return
- elif Socks5Auth.AUTH_NONE in methods:
- self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
- else:
- self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
- self.server.close_request(self.request)
- return
- version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
- socks_info = {
- 'version': version,
- 'auth_methods': methods,
- 'command': command,
- 'client_address': self.client_address,
- 'ipv4_address': None,
- 'domain_address': None,
- 'ipv6_address': None,
- }
- if address_type == Socks5AddressType.ATYP_IPV4:
- socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
- elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
- socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
- elif address_type == Socks5AddressType.ATYP_IPV6:
- socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
- else:
- self.server.close_request(self.request)
- socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]
- # dummy response, the returned IP is just a placeholder
- self.connection.sendall(struct.pack(
- '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))
- self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
- class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
- # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
- # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
- def _read_until_null(self):
- return b''.join(iter(functools.partial(self.connection.recv, 1), b'\x00'))
- def handle(self):
- sleep = self.socks_kwargs.get('sleep')
- if sleep:
- time.sleep(sleep)
- socks_info = {
- 'version': SOCKS4_VERSION,
- 'command': None,
- 'client_address': self.client_address,
- 'ipv4_address': None,
- 'port': None,
- 'domain_address': None,
- }
- version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
- socks_info['port'] = dest_port
- socks_info['command'] = command
- if version != SOCKS4_VERSION:
- self.server.close_request(self.request)
- return
- use_remote_dns = False
- if 0x0 < dest_ip <= 0xFF:
- use_remote_dns = True
- else:
- socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack('!I', dest_ip))
- user_id = self._read_until_null().decode()
- if user_id != (self.socks_kwargs.get('user_id') or ''):
- self.connection.sendall(struct.pack(
- '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
- self.server.close_request(self.request)
- return
- if use_remote_dns:
- socks_info['domain_address'] = self._read_until_null().decode()
- # dummy response, the returned IP is just a placeholder
- self.connection.sendall(
- struct.pack(
- '!BBHI', SOCKS4_REPLY_VERSION,
- self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))
- self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
- class IPv6ThreadingTCPServer(ThreadingTCPServer):
- address_family = socket.AF_INET6
- class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
- def do_GET(self):
- if self.path == '/socks_info':
- payload = json.dumps(self.socks_info.copy())
- self.send_response(200)
- self.send_header('Content-Type', 'application/json; charset=utf-8')
- self.send_header('Content-Length', str(len(payload)))
- self.end_headers()
- self.wfile.write(payload.encode())
- class SocksWebSocketTestRequestHandler(SocksTestRequestHandler):
- def handle(self):
- import websockets.sync.server
- protocol = websockets.ServerProtocol()
- connection = websockets.sync.server.ServerConnection(socket=self.request, protocol=protocol, close_timeout=0)
- connection.handshake()
- for message in connection:
- if message == 'socks_info':
- connection.send(json.dumps(self.socks_info))
- connection.close()
- @contextlib.contextmanager
- def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
- server = server_thread = None
- try:
- bind_address = bind_ip or '127.0.0.1'
- server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
- server = server_type(
- (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
- server_port = http_server_port(server)
- server_thread = threading.Thread(target=server.serve_forever)
- server_thread.daemon = True
- server_thread.start()
- if '.' not in bind_address:
- yield f'[{bind_address}]:{server_port}'
- else:
- yield f'{bind_address}:{server_port}'
- finally:
- server.shutdown()
- server.server_close()
- server_thread.join(2.0)
- class SocksProxyTestContext(abc.ABC):
- REQUEST_HANDLER_CLASS = None
- def socks_server(self, server_class, *args, **kwargs):
- return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
- @abc.abstractmethod
- def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
- """return a dict of socks_info"""
- class HTTPSocksTestProxyContext(SocksProxyTestContext):
- REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler
- def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
- request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
- handler.validate(request)
- return json.loads(handler.send(request).read().decode())
- class WebSocketSocksTestProxyContext(SocksProxyTestContext):
- REQUEST_HANDLER_CLASS = SocksWebSocketTestRequestHandler
- def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
- request = Request(f'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}', **req_kwargs)
- handler.validate(request)
- ws = handler.send(request)
- ws.send('socks_info')
- socks_info = ws.recv()
- ws.close()
- return json.loads(socks_info)
- CTX_MAP = {
- 'http': HTTPSocksTestProxyContext,
- 'ws': WebSocketSocksTestProxyContext,
- }
- @pytest.fixture(scope='module')
- def ctx(request):
- return CTX_MAP[request.param]()
- @pytest.mark.parametrize(
- 'handler,ctx', [
- ('Urllib', 'http'),
- ('Requests', 'http'),
- ('Websockets', 'ws'),
- ('CurlCFFI', 'http'),
- ], indirect=True)
- class TestSocks4Proxy:
- def test_socks4_no_auth(self, handler, ctx):
- with handler() as rh:
- with ctx.socks_server(Socks4ProxyHandler) as server_address:
- response = ctx.socks_info_request(
- rh, proxies={'all': f'socks4://{server_address}'})
- assert response['version'] == 4
- def test_socks4_auth(self, handler, ctx):
- with handler() as rh:
- with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
- with pytest.raises(ProxyError):
- ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
- response = ctx.socks_info_request(
- rh, proxies={'all': f'socks4://user:@{server_address}'})
- assert response['version'] == 4
- def test_socks4a_ipv4_target(self, handler, ctx):
- with ctx.socks_server(Socks4ProxyHandler) as server_address:
- with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
- assert response['version'] == 4
- assert (response['ipv4_address'] == '127.0.0.1') != (response['domain_address'] == '127.0.0.1')
- def test_socks4a_domain_target(self, handler, ctx):
- with ctx.socks_server(Socks4ProxyHandler) as server_address:
- with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='localhost')
- assert response['version'] == 4
- assert response['ipv4_address'] is None
- assert response['domain_address'] == 'localhost'
- def test_ipv4_client_source_address(self, handler, ctx):
- with ctx.socks_server(Socks4ProxyHandler) as server_address:
- source_address = f'127.0.0.{random.randint(5, 255)}'
- verify_address_availability(source_address)
- with handler(proxies={'all': f'socks4://{server_address}'},
- source_address=source_address) as rh:
- response = ctx.socks_info_request(rh)
- assert response['client_address'][0] == source_address
- assert response['version'] == 4
- @pytest.mark.parametrize('reply_code', [
- Socks4CD.REQUEST_REJECTED_OR_FAILED,
- Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
- Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
- ])
- def test_socks4_errors(self, handler, ctx, reply_code):
- with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
- with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
- with pytest.raises(ProxyError):
- ctx.socks_info_request(rh)
- def test_ipv6_socks4_proxy(self, handler, ctx):
- with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
- with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
- assert response['client_address'][0] == '::1'
- assert response['ipv4_address'] == '127.0.0.1'
- assert response['version'] == 4
- def test_timeout(self, handler, ctx):
- with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
- with handler(proxies={'all': f'socks4://{server_address}'}, timeout=0.5) as rh:
- with pytest.raises(TransportError):
- ctx.socks_info_request(rh)
- @pytest.mark.parametrize(
- 'handler,ctx', [
- ('Urllib', 'http'),
- ('Requests', 'http'),
- ('Websockets', 'ws'),
- ('CurlCFFI', 'http'),
- ], indirect=True)
- class TestSocks5Proxy:
- def test_socks5_no_auth(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler) as server_address:
- with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh)
- assert response['auth_methods'] == [0x0]
- assert response['version'] == 5
- def test_socks5_user_pass(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
- with handler() as rh:
- with pytest.raises(ProxyError):
- ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})
- response = ctx.socks_info_request(
- rh, proxies={'all': f'socks5://test:testpass@{server_address}'})
- assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
- assert response['version'] == 5
- def test_socks5_ipv4_target(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler) as server_address:
- with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
- assert response['ipv4_address'] == '127.0.0.1'
- assert response['version'] == 5
- def test_socks5_domain_target(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler) as server_address:
- with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='localhost')
- assert (response['ipv4_address'] == '127.0.0.1') != (response['ipv6_address'] == '::1')
- assert response['version'] == 5
- def test_socks5h_domain_target(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler) as server_address:
- with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='localhost')
- assert response['ipv4_address'] is None
- assert response['domain_address'] == 'localhost'
- assert response['version'] == 5
- def test_socks5h_ip_target(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler) as server_address:
- with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
- assert response['ipv4_address'] == '127.0.0.1'
- assert response['domain_address'] is None
- assert response['version'] == 5
- def test_socks5_ipv6_destination(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler) as server_address:
- with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='[::1]')
- assert response['ipv6_address'] == '::1'
- assert response['version'] == 5
- def test_ipv6_socks5_proxy(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
- with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
- response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
- assert response['client_address'][0] == '::1'
- assert response['ipv4_address'] == '127.0.0.1'
- assert response['version'] == 5
- # XXX: is there any feasible way of testing IPv6 source addresses?
- # Same would go for non-proxy source_address test...
- def test_ipv4_client_source_address(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler) as server_address:
- source_address = f'127.0.0.{random.randint(5, 255)}'
- verify_address_availability(source_address)
- with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
- response = ctx.socks_info_request(rh)
- assert response['client_address'][0] == source_address
- assert response['version'] == 5
- @pytest.mark.parametrize('reply_code', [
- Socks5Reply.GENERAL_FAILURE,
- Socks5Reply.CONNECTION_NOT_ALLOWED,
- Socks5Reply.NETWORK_UNREACHABLE,
- Socks5Reply.HOST_UNREACHABLE,
- Socks5Reply.CONNECTION_REFUSED,
- Socks5Reply.TTL_EXPIRED,
- Socks5Reply.COMMAND_NOT_SUPPORTED,
- Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
- ])
- def test_socks5_errors(self, handler, ctx, reply_code):
- with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
- with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
- with pytest.raises(ProxyError):
- ctx.socks_info_request(rh)
- def test_timeout(self, handler, ctx):
- with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
- with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
- with pytest.raises(TransportError):
- ctx.socks_info_request(rh)
- if __name__ == '__main__':
- unittest.main()
|