test_util.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. from __future__ import annotations
  2. from wsgiref.util import is_hop_by_hop
  3. from django.test import override_settings
  4. from requests.structures import CaseInsensitiveDict
  5. from sentry.silo.util import (
  6. INVALID_OUTBOUND_HEADERS,
  7. INVALID_PROXY_HEADERS,
  8. PROXY_BASE_URL_HEADER,
  9. PROXY_OI_HEADER,
  10. PROXY_PATH,
  11. PROXY_SIGNATURE_HEADER,
  12. clean_headers,
  13. clean_outbound_headers,
  14. clean_proxy_headers,
  15. encode_subnet_signature,
  16. trim_leading_slashes,
  17. verify_subnet_signature,
  18. )
  19. from sentry.testutils.cases import TestCase
  20. class SiloUtilityTest(TestCase):
  21. headers = CaseInsensitiveDict(
  22. data={
  23. "User-Agent": "Chrome",
  24. "Authorization": "Bearer tkn",
  25. "Host": "sentry.io",
  26. "Content-Length": "27",
  27. "Content-Encoding": "deflate, gzip",
  28. PROXY_OI_HEADER: "12",
  29. PROXY_SIGNATURE_HEADER: "-leander(but-in-cursive)",
  30. PROXY_BASE_URL_HEADER: "https://api.integration.com/",
  31. PROXY_PATH: "additional/path/params",
  32. "X-Test-Header-1": "One",
  33. "X-Test-Header-2": "Two",
  34. "X-Test-Header-3": "Three",
  35. "X-Forwarded-Proto": "https",
  36. }
  37. )
  38. secret = "hush-hush-im-invisible"
  39. def test_trim_leading_slashes(self):
  40. assert trim_leading_slashes("/happy-path") == "happy-path"
  41. assert trim_leading_slashes("/a/bit/nested") == "a/bit/nested"
  42. assert trim_leading_slashes("/////way-nested") == "way-nested"
  43. assert trim_leading_slashes("/") == ""
  44. assert trim_leading_slashes("not-nested-at-all") == "not-nested-at-all"
  45. assert trim_leading_slashes("/url-safe?query=h%20c%20") == "url-safe?query=h%20c%20"
  46. def test_clean_headers(self):
  47. assert clean_headers(self.headers, []) == self.headers
  48. assert "X-Test-Header-4" not in self.headers
  49. assert clean_headers(self.headers, ["X-Test-Header-4"]) == self.headers
  50. assert "X-Test-Header-1" in self.headers
  51. cleaned = clean_headers(self.headers, ["X-Test-Header-1"])
  52. assert "X-Test-Header-1" not in cleaned
  53. assert len(cleaned) == len(self.headers) - 1
  54. def test_clean_proxy_headers(self):
  55. cleaned = clean_proxy_headers(self.headers)
  56. for header in INVALID_PROXY_HEADERS:
  57. assert header in self.headers
  58. assert header not in cleaned
  59. retained_headers = filter(lambda k: k not in INVALID_PROXY_HEADERS, self.headers.keys())
  60. for header in retained_headers:
  61. assert self.headers[header] == cleaned[header]
  62. def test_clean_outbound_headers(self):
  63. cleaned = clean_outbound_headers(self.headers)
  64. for header in INVALID_OUTBOUND_HEADERS:
  65. assert header in self.headers
  66. assert header not in cleaned
  67. retained_headers = filter(lambda k: k not in INVALID_OUTBOUND_HEADERS, self.headers.keys())
  68. for header in retained_headers:
  69. assert self.headers[header] == cleaned[header]
  70. def test_clean_hop_by_hop_headers(self):
  71. headers = {
  72. **self.headers,
  73. "Keep-Alive": "timeout=5",
  74. "Transfer-Encoding": "chunked",
  75. "Proxy-Authenticate": "Basic",
  76. }
  77. hop_by_hop_headers = ["Keep-Alive", "Transfer-Encoding", "Proxy-Authenticate"]
  78. cleaned = clean_headers(headers, invalid_headers=[])
  79. for header in hop_by_hop_headers:
  80. assert header in headers
  81. assert header not in cleaned
  82. retained_headers = filter(lambda k: not is_hop_by_hop(k), headers.keys())
  83. for header in retained_headers:
  84. assert headers[header] == cleaned[header]
  85. @override_settings(SENTRY_SUBNET_SECRET=secret)
  86. def test_subnet_signature(self):
  87. signature = "v0=687940c95f7ec16fa8eb1641ac601bebfdeebf5eeaa698d3b6669077dde818ba"
  88. def _encode(
  89. secret: str = self.secret,
  90. base_url: str = "http://controlserver/api/0/internal/integration-proxy/",
  91. path: str = "/chat.postMessage",
  92. identifier: str = "21",
  93. request_body: bytes | None = b'{"some": "payload"}',
  94. ) -> str:
  95. return encode_subnet_signature(
  96. secret=secret,
  97. base_url=base_url,
  98. path=path,
  99. identifier=identifier,
  100. request_body=request_body,
  101. )
  102. def _verify(
  103. base_url: str = "http://controlserver/api/0/internal/integration-proxy/",
  104. path: str = "/chat.postMessage",
  105. identifier: str = "21",
  106. request_body: bytes | None = b'{"some": "payload"}',
  107. provided_signature: str = signature,
  108. ) -> bool:
  109. return verify_subnet_signature(
  110. base_url=base_url,
  111. path=path,
  112. identifier=identifier,
  113. request_body=request_body,
  114. provided_signature=provided_signature,
  115. )
  116. assert _encode() == signature
  117. assert _verify()
  118. # We trim slashes prior to encoding/verifying
  119. assert _encode(path="chat.postMessage") == signature
  120. assert _verify(path="chat.postMessage")
  121. assert (
  122. _encode(base_url="http://controlserver/api/0/internal/integration-proxy") == signature
  123. )
  124. assert _verify(base_url="http://controlserver/api/0/internal/integration-proxy")
  125. # Wrong secrets not be verifiable
  126. wrong_secret_signature = _encode(secret="wrong-secret")
  127. assert wrong_secret_signature != signature
  128. assert not _verify(provided_signature=wrong_secret_signature)
  129. # Any mishandled field should not be verifiable
  130. assert _encode(path="incorrect-data") != signature
  131. assert not _verify(path="incorrect-data")
  132. assert _encode(identifier="incorrect-data") != signature
  133. assert not _verify(identifier="incorrect-data")
  134. assert _encode(request_body=b"incorrect-data") != signature
  135. assert not _verify(request_body=b"incorrect-data")