123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- # -*- test-case-name: twisted.protocols.haproxy.test.test_v1parser -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- IProxyParser implementation for version one of the PROXY protocol.
- """
- from typing import Tuple, Union
- from zope.interface import implementer
- from twisted.internet import address
- from . import _info, _interfaces
- from ._exceptions import (
- InvalidNetworkProtocol,
- InvalidProxyHeader,
- MissingAddressData,
- convertError,
- )
@implementer(_interfaces.IProxyParser)
class V1Parser:
    """
    PROXY protocol version one header parser.

    Version one of the PROXY protocol is a human readable format represented
    by a single, newline delimited binary string that contains all of the
    relevant source and destination data.

    @ivar buffer: Bytes received so far that do not yet form a complete,
        newline terminated header.
    @type buffer: L{bytes}
    """

    PROXYSTR = b"PROXY"
    UNKNOWN_PROTO = b"UNKNOWN"
    TCP4_PROTO = b"TCP4"
    TCP6_PROTO = b"TCP6"
    ALLOWED_NET_PROTOS = (
        TCP4_PROTO,
        TCP6_PROTO,
        UNKNOWN_PROTO,
    )
    NEWLINE = b"\r\n"

    def __init__(self) -> None:
        self.buffer = b""

    def feed(
        self, data: bytes
    ) -> Union[Tuple[_info.ProxyInfo, bytes], Tuple[None, None]]:
        """
        Consume a chunk of data and attempt to parse it.

        @param data: A bytestring.
        @type data: L{bytes}

        @return: A two-tuple containing, in order, a
            L{_interfaces.IProxyInfo} and any bytes fed to the
            parser that followed the end of the header.  Both of these values
            are None until a complete header is parsed.

        @raises InvalidProxyHeader: If the bytes fed to the parser create an
            invalid PROXY header.
        """
        self.buffer += data
        # 107 bytes is the longest version one header the PROXY protocol
        # specification allows (terminator included); more data than that
        # without a terminator can never become a valid header.
        if len(self.buffer) > 107 and self.NEWLINE not in self.buffer:
            raise InvalidProxyHeader()
        lines = self.buffer.split(self.NEWLINE, 1)
        if len(lines) < 2:
            # No terminator yet: keep buffering.
            return (None, None)
        # Reset before parsing so the buffer is cleared even if the header
        # turns out to be invalid and parse() raises.
        self.buffer = b""
        header, remaining = lines
        info = self.parse(header)
        return (info, remaining)

    @classmethod
    def parse(cls, line: bytes) -> _info.ProxyInfo:
        """
        Parse a bytestring as a full PROXY protocol header line.

        @param line: A bytestring that represents a valid HAProxy PROXY
            protocol header line, without the trailing newline.
        @type line: bytes

        @return: A L{_interfaces.IProxyInfo} containing the parsed data.

        @raises InvalidProxyHeader: If the bytestring does not represent a
            valid PROXY header.  This includes addresses that cannot be
            decoded and ports that are not integers.

        @raises InvalidNetworkProtocol: When no protocol can be parsed or is
            not one of the allowed values.

        @raises MissingAddressData: When the protocol is TCP* but the header
            does not contain a complete set of addresses and ports.
        """
        originalLine = line

        with convertError(ValueError, InvalidProxyHeader):
            proxyStr, line = line.split(b" ", 1)

        if proxyStr != cls.PROXYSTR:
            raise InvalidProxyHeader()

        with convertError(ValueError, InvalidNetworkProtocol):
            networkProtocol, line = line.split(b" ", 1)

        if networkProtocol not in cls.ALLOWED_NET_PROTOS:
            raise InvalidNetworkProtocol()

        if networkProtocol == cls.UNKNOWN_PROTO:
            # Whatever follows UNKNOWN is deliberately ignored.
            return _info.ProxyInfo(originalLine, None, None)

        with convertError(ValueError, MissingAddressData):
            sourceAddr, line = line.split(b" ", 1)

        with convertError(ValueError, MissingAddressData):
            destAddr, line = line.split(b" ", 1)

        with convertError(ValueError, MissingAddressData):
            sourcePort, line = line.split(b" ", 1)

        # Taking element [0] of a split never raises, so an absent
        # destination port (header ending in a trailing space) has to be
        # detected explicitly.
        destPort = line.split(b" ")[0]
        if not destPort:
            raise MissingAddressData()

        if networkProtocol == cls.TCP4_PROTO:
            addressType = address.IPv4Address
        else:
            addressType = address.IPv6Address

        # int() and bytes.decode() raise ValueError (UnicodeDecodeError is a
        # subclass) on malformed input; translate that into the documented
        # parser exception rather than leaking it to the caller.
        with convertError(ValueError, InvalidProxyHeader):
            source = addressType("TCP", sourceAddr.decode(), int(sourcePort))
            destination = addressType("TCP", destAddr.decode(), int(destPort))

        return _info.ProxyInfo(originalLine, source, destination)
|