123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- import argparse
- import json
- import logging
- import posixpath
- import random
- import re
- import requests
- import string
- import sys
- import urllib.parse
- #######################################################################################################################
- # Utilities
def some(s):
    """Return a pseudo-random member of the collection ``s``.

    The members are sorted before choosing so that the pick is reproducible for a given PRNG seed
    regardless of the iteration order of ``s``.

    Raises IndexError (from random.choice) if ``s`` is empty.
    """
    try:
        candidates = sorted(s)
    except TypeError:
        # Mixed value types (e.g. an integer default alongside string enum values) cannot be compared
        # with each other; order them by repr instead so the choice is still deterministic.
        candidates = sorted(s, key=repr)
    return random.choice(candidates)
def not_some(s):
    """Return a random string that does not match any member of ``s``.

    The string is drawn from one of several alphabets (letters, digits, number-like characters, or
    mixed printable characters) and has one of a fixed set of lengths.

    Members of ``s`` are compared by their string form, so a set holding the integer 5 also excludes
    the generated string "5". The alphabet and length are re-drawn on every attempt, so the loop
    cannot get stuck when every string of one particular shape is already taken.
    """
    alphabets = [string.ascii_uppercase + string.ascii_lowercase,
                 string.digits,
                 string.digits + ".E-",
                 '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJK'
                 'LMNOPQRSTUVWXYZ!"#$%\'()*+,-./:;<=>?@[\\]^_`{|}~ ']
    lengths = [1, 2, 3, 37, 61, 121]
    taken = {str(v) for v in s}
    while True:
        test_set = random.choice(alphabets)
        test_len = random.choice(lengths)
        x = ''.join([random.choice(test_set) for _ in range(test_len)])
        if x not in taken:
            return x
def build_url(host_maybe_scheme, base_path):
    """Split a target host and an API base path into a ``(netloc, path)`` pair.

    host_maybe_scheme -- the target host, optionally with a scheme and/or a path prefix
                         (e.g. "localhost:19999" or "https://example.com/proxy").
    base_path         -- the API base path; a single leading '/' is dropped so that it is appended to
                         any path prefix on the host instead of replacing it. May be empty.

    Returns the network location and the joined path. Any failure while decoding the arguments is
    logged and terminates the process with exit status -1.
    """
    try:
        # urlparse only recognises the netloc when the string contains '//'.
        if '//' not in host_maybe_scheme:
            host_maybe_scheme = '//' + host_maybe_scheme
        url_tuple = urllib.parse.urlparse(host_maybe_scheme)
        # startswith (not indexing) so that an empty base path is accepted rather than raising.
        if base_path.startswith('/'):
            base_path = base_path[1:]
        return url_tuple.netloc, posixpath.join(url_tuple.path, base_path)
    except Exception as e:
        L.error(f"Critical failure decoding arguments -> {e}")
        sys.exit(-1)
- #######################################################################################################################
- # Data-model and processing
class Param(object):
    """A request parameter: its name, where it is sent, its declared type and its known values."""

    def __init__(self, name, location, kind):
        # Where the parameter travels and its declared type, as supplied by the caller.
        self.location = location
        self.kind = kind
        self.name = name
        # Values known for this parameter; starts empty and is filled in by the caller.
        self.values = set()

    def dump(self):
        """Print a one-line description of the parameter and its values to stdout."""
        description = f"{self.name} in {self.location} is {self.kind} : {{{self.values}}}"
        print(description)
def does_response_fit_schema(schema_path, schema, resp):
    '''Check whether a (sub-tree of a) json response is accepted by a (sub-tree of a) swagger schema.

    The schema_path argument tells us where we are (globally) in the schema. The schema argument is the
    sub-tree within the schema json that we are validating against. The resp is the json subtree from the
    target host's response.

    The basic idea is this: swagger defines a model of valid json trees. In this sense it is a formal
    language and we can validate a given server response by checking if the language accepts a particular
    server response. This is basically a parser, but instead of strings we are operating on languages
    of trees.

    This could probably be extended to arbitrary swagger definitions - but the amount of work increases
    rapidly as we attempt to cover the full semantics of languages of trees defined in swagger. Instead
    we have some special cases that describe the parts of the semantics that we've used to describe the
    netdata API.

    If we hit an error (in the schema) that prevents further checks then we return early, otherwise we
    try to collect as many errors as possible.

    Returns True when the response fits the schema and False otherwise. Problems are reported through
    the module logger L, and the offending schema/response is printed for schema-level errors.
    '''
    success = True
    if "type" not in schema:
        L.error(f"Cannot progress past {schema_path} -> no type specified in dictionary")
        print(json.dumps(schema, indent=2))
        return False

    schema_type = schema["type"]
    # Python types accepted for each scalar swagger type.
    scalar_types = {"string": str, "boolean": bool, "number": (int, float), "integer": int}

    if schema_type == "object":
        resp_is_dict = isinstance(resp, dict)
        if resp_is_dict and isinstance(schema.get("properties"), dict):
            L.debug(f"Validate properties against dictionary at {schema_path}")
            for k, v in schema["properties"].items():
                L.debug(f"Validate {k} received with {v}")
                if v.get("required", False) and k not in resp:
                    L.error(f"Missing {k} in response at {schema_path}")
                    print(json.dumps(resp, indent=2))
                    return False
                if k in resp:
                    if not does_response_fit_schema(posixpath.join(schema_path, k), v, resp[k]):
                        success = False
        elif resp_is_dict and isinstance(schema.get("additionalProperties"), dict):
            kv_schema = schema["additionalProperties"]
            L.debug(f"Validate additionalProperties against every value in dictionary at {schema_path}")
            if kv_schema.get("type") == "object":
                for k, v in resp.items():
                    if not does_response_fit_schema(posixpath.join(schema_path, k), kv_schema, v):
                        success = False
            else:
                L.error("Don't understand what the additionalProperties means (it has no type?)")
                return False
        else:
            L.error(f"Can't understand schema at {schema_path}")
            print(json.dumps(schema, indent=2))
            return False
    elif schema_type in scalar_types:
        # Only the numeric types honour nullable, and only when it is actually set to a true value.
        if schema_type in ("number", "integer") and schema.get("nullable") and resp is None:
            L.debug(f"{repr(resp)} matches {repr(schema)} at {schema_path} (because nullable)")
            return True
        # bool is a subclass of int in Python, so a json true/false must be rejected explicitly when
        # the schema asks for a number or an integer.
        wrong_bool = isinstance(resp, bool) and schema_type != "boolean"
        if isinstance(resp, scalar_types[schema_type]) and not wrong_bool:
            L.debug(f"{repr(resp)} matches {repr(schema)} at {schema_path}")
            return True
        L.error(f"{repr(resp)} does not match schema {repr(schema)} at {schema_path}")
        return False
    elif schema_type == "array":
        if "items" not in schema:
            L.error(f"Schema for array at {schema_path} does not specify items!")
            return False
        item_schema = schema["items"]
        if not isinstance(resp, list):
            L.error(f"Server did not return a list for {schema_path} (typed as array in schema)")
            return False
        for i, item in enumerate(resp):
            if not does_response_fit_schema(posixpath.join(schema_path, str(i)), item_schema, item):
                success = False
    else:
        L.error(f"Invalid swagger type {schema['type']} for {type(resp)} at {schema_path}")
        print(json.dumps(schema, indent=2))
        return False
    return success
class GetPath(object):
    """A GET endpoint taken from the swagger spec, together with the logic to fuzz it.

    The constructor records the endpoint's required and optional parameters (with the values that the
    spec declares through 'default' and 'enum') and the response schemas per status code. The generate_*
    methods issue requests and validate the answers against those schemas.

    The request methods rely on the module-level globals url_filter, args and L that the script
    initialization sets up.
    """

    def __init__(self, url, spec):
        self.url = url
        self.req_params = {}    # name -> Param, parameters marked required in the spec
        self.opt_params = {}    # name -> Param, all other parameters
        self.success = None     # schema of the 2xx response (if the spec provides one)
        self.failures = {}      # status code (string) -> response description for non-2xx codes
        for p in spec.get('parameters', []):
            name = p['name']
            req = p.get('required', False)
            target = self.req_params if req else self.opt_params
            param = Param(name, p['in'], p['type'])
            target[name] = param
            if 'default' in p:
                defs = p['default']
                if isinstance(defs, list):
                    param.values.update(defs)
                else:
                    param.values.add(defs)
            if 'enum' in p:
                param.values.update(p['enum'])
            if req and len(param.values) == 0:
                print(f"FAIL: No default values in swagger for required parameter {name} in {self.url}")
        for code, schema in spec['responses'].items():
            is_2xx = code.startswith("2")
            if is_2xx and 'schema' in schema:
                self.success = schema['schema']
            elif is_2xx:
                L.error(f"2xx response with no schema in {self.url}")
            else:
                self.failures[code] = schema

    def _run_test(self, host, url_args, expect_success):
        """Issue one GET for this path with the given query string and validate the response.

        The request is skipped (with a debug message) when url_filter does not match the test URL.
        """
        base_url = urllib.parse.urljoin(host, self.url)
        test_url = f"{base_url}?{url_args}"
        if not url_filter.match(test_url):
            L.debug(f"url_filter skips {test_url}")
            return
        try:
            resp = requests.get(url=test_url, verify=(not args.tls_no_verify))
            self.validate(test_url, resp, expect_success)
        except Exception as e:
            L.error(f"Network failure in test {e}")

    def generate_success(self, host):
        """Request the path with a known-good value for every required parameter; expect a 2xx."""
        url_args = "&".join([f"{p.name}={some(p.values)}" for p in self.req_params.values()])
        self._run_test(host, url_args, True)

    def generate_failure(self, host):
        """Request the path with an invalid value for every parameter plus one unknown parameter.

        A non-2xx response is expected.
        """
        all_params = list(self.req_params.values()) + list(self.opt_params.values())
        # The invented parameter must not collide with a real one, so compare against the names
        # (all_params itself holds Param objects, which never compare equal to a string).
        known_names = {p.name for p in all_params}
        bad_param = ''.join([random.choice(string.ascii_lowercase) for _ in range(5)])
        while bad_param in known_names:
            bad_param = ''.join([random.choice(string.ascii_lowercase) for _ in range(5)])
        all_params.append(Param(bad_param, "query", "string"))
        url_args = "&".join([f"{p.name}={not_some(p.values)}" for p in all_params])
        self._run_test(host, url_args, False)

    def validate(self, test_url, resp, expect_success):
        """Check the status class of ``resp`` against the expectation and its body against the schema.

        test_url       -- the URL that was requested (used in log messages only).
        resp           -- the response object; its .text and .status_code are read.
        expect_success -- True when a 2xx status was expected, False when a failure was expected.

        Results are reported through the logger: info for a pass, error for any kind of failure.
        """
        try:
            resp_json = json.loads(resp.text)
        except json.decoder.JSONDecodeError:
            L.error(f"Non-json response from {test_url}")
            return
        success_code = 200 <= resp.status_code < 300
        if success_code != bool(expect_success):
            L.error(f"Received incorrect status code {resp.status_code} against {test_url}")
            return
        if success_code:
            schema = self.success
            if schema is None:
                L.error(f"Missing schema {test_url}")
                return
        else:
            schema = self.failures.get(str(resp.status_code), None)
            if schema is None:
                L.error(f"Missing schema for {resp.status_code} from {test_url}")
                return
        if does_response_fit_schema(posixpath.join(self.url, str(resp.status_code)), schema, resp_json):
            L.info(f"tested {test_url}")
        else:
            L.error(f"tested {test_url}")
def get_the_spec(url):
    """Return the text of the swagger definition found at ``url``.

    A url starting with "file://" is read from the local filesystem (the remainder of the url is the
    file path); anything else is fetched with an HTTP GET.
    """
    file_scheme = "file://"
    if url.startswith(file_scheme):
        # The spec is json, which is UTF-8 encoded; do not depend on the locale's default encoding.
        with open(url[len(file_scheme):], encoding="utf-8") as f:
            return f.read()
    return requests.get(url=url).text
# Swagger paths look absolute but they are relative to the base.
def not_absolute(path):
    """Return ``path`` without its leading '/', if it has one. An empty path is returned unchanged."""
    return path[1:] if path.startswith('/') else path
def find_ref(spec, path):
    """Follow the components of ``path`` down through the nested mapping ``spec``.

    ``path`` is a sequence of keys (a "$ref" string already split on '/'). A '#' component at the
    front of the remaining path is skipped. The value reached with the last key is returned.
    """
    node = spec
    remaining = path
    while True:
        if len(remaining) > 0 and remaining[0] == '#':
            # '#' stands for the current root; drop it and carry on with the rest.
            remaining = remaining[1:]
            continue
        if len(remaining) == 1:
            return node[remaining[0]]
        node = node[remaining[0]]
        remaining = remaining[1:]
def resolve_refs(spec, spec_root=None):
    '''Find all "$ref" keys in the swagger spec and inline their target schemas.

    spec      -- the (sub-tree of the) swagger definition to process; it is not modified.
    spec_root -- the whole definition, used to look up "$ref" targets. Defaults to spec itself.

    Returns a new dictionary with the references inlined. Dictionaries nested inside lists (such as
    the entries of a "parameters" or "allOf" list) are processed as well.

    As with all inliners this will break if a definition recursively links to itself, but this should not
    happen in swagger as embedding a structure inside itself would produce a record of infinite size.'''
    if spec_root is None:
        spec_root = spec

    def resolve_value(value):
        # Recurse into containers; scalars are returned untouched.
        if isinstance(value, dict):
            return resolve_refs(value, spec_root)
        if isinstance(value, list):
            return [resolve_value(item) for item in value]
        return value

    newspec = {}
    for k, v in spec.items():
        if k == "$ref":
            target = find_ref(spec_root, v.split('/'))
            # Unfold one level of the tree and erase the $ref if possible.
            if isinstance(target, dict):
                newspec.update(resolve_refs(target, spec_root))
            else:
                newspec[k] = target
        else:
            newspec[k] = resolve_value(v)
    # This is an artifact of inlining the $refs when they are inside a properties key as their children
    # should be pushed up into the parent dictionary. They must be merged (union) rather than replaced as
    # we use this to implement polymorphism in the data-model.
    props = newspec.get('properties')
    if isinstance(props, dict) and 'properties' in props:
        sub = props.pop('properties')
        props.pop('type', None)
        props.update(sub)
    return newspec
- #######################################################################################################################
- # Initialization
random.seed(7)      # Default is reproducible sequences

parser = argparse.ArgumentParser()
parser.add_argument('--url', type=str,
                    default='https://raw.githubusercontent.com/netdata/netdata/master/web/api/netdata-swagger.json',
                    help='The URL of the API definition in swagger. The default will pull the latest version '
                         'from the main branch.')
parser.add_argument('--host', type=str,
                    help='The URL of the target host to fuzz. The default will read the host from the swagger '
                         'definition.')
parser.add_argument('--reseed', action='store_true',
                    help="Pick a random seed for the PRNG. The default uses a constant seed for reproducibility.")
parser.add_argument('--passes', action='store_true',
                    help="Log information about tests that pass")
parser.add_argument('--detail', action='store_true',
                    help="Log information about the response/schema comparisons during each test")
parser.add_argument('--filter', type=str,
                    default=".*",
                    help="Supply a regex used to filter the testing URLs generated")
parser.add_argument('--tls-no-verify', action='store_true',
                    help="Disable TLS certification verification to allow connection to hosts that use "
                         "self-signed certificates")
parser.add_argument('--dump-inlined', action='store_true',
                    help='Dump the inlined swagger spec instead of fuzzing. For "reasons".')

# args is read as a module-level global by GetPath when it issues requests.
args = parser.parse_args()
if args.reseed:
    random.seed()

spec = json.loads(get_the_spec(args.url))
inlined_spec = resolve_refs(spec)
if args.dump_inlined:
    print(json.dumps(inlined_spec, indent=2))
    # Dumping is the requested action, so finishing it is a success.
    sys.exit(0)

# Relabel the standard levels so that the output reads as test results:
# ERROR (40) -> FAIL, INFO (20) -> PASS, DEBUG (10) -> DETAIL.
logging.addLevelName(40, "FAIL")
logging.addLevelName(20, "PASS")
logging.addLevelName(10, "DETAIL")

# L is the logger used by every function and class in this file.
L = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
if args.detail:
    L.setLevel(logging.DEBUG)
elif args.passes:
    L.setLevel(logging.INFO)
else:
    L.setLevel(logging.ERROR)
handler.setFormatter(logging.Formatter(fmt="%(levelname)s %(message)s"))
L.addHandler(handler)

# url_filter is read as a module-level global by GetPath to decide which test URLs to request.
url_filter = re.compile(args.filter)

# Use .get so that a document without a 'swagger' key is reported instead of raising KeyError.
if spec.get('swagger') != '2.0':
    L.error("Unexpected swagger version")
    sys.exit(-1)
L.info(f"Fuzzing {spec['info']['title']} / {spec['info']['version']}")

host, base_url = build_url(args.host or spec['host'], inlined_spec['basePath'])
L.info(f"Target host is {base_url}")

paths = []
for name, p in inlined_spec['paths'].items():
    if 'get' in p:
        name = not_absolute(name)
        paths.append(GetPath(posixpath.join(base_url, name), p['get']))
    elif 'put' in p:
        L.error(f"Generation of PUT methods (for {name}) is unimplemented")

# Run one passing and one failing test per path, for every scheme listed in the spec.
for s in inlined_spec['schemes']:
    for p in paths:
        p.generate_success(s + "://" + host)
        p.generate_failure(s + "://" + host)