#!/usr/bin/env python import re from . import unittest, OrderedDict from jmespath import parser from jmespath import visitor from jmespath import ast from jmespath import exceptions class TestParser(unittest.TestCase): def setUp(self): self.parser = parser.Parser() def assert_parsed_ast(self, expression, expected_ast): parsed = self.parser.parse(expression) self.assertEqual(parsed.parsed, expected_ast) def test_parse_empty_string_raises_exception(self): with self.assertRaises(exceptions.EmptyExpressionError): self.parser.parse('') def test_field(self): self.assert_parsed_ast('foo', ast.field('foo')) def test_dot_syntax(self): self.assert_parsed_ast('foo.bar', ast.subexpression([ast.field('foo'), ast.field('bar')])) def test_multiple_dots(self): parsed = self.parser.parse('foo.bar.baz') self.assertEqual( parsed.search({'foo': {'bar': {'baz': 'correct'}}}), 'correct') def test_index(self): parsed = self.parser.parse('foo[1]') self.assertEqual( parsed.search({'foo': ['zero', 'one', 'two']}), 'one') def test_quoted_subexpression(self): self.assert_parsed_ast('"foo"."bar"', ast.subexpression([ ast.field('foo'), ast.field('bar')])) def test_wildcard(self): parsed = self.parser.parse('foo[*]') self.assertEqual( parsed.search({'foo': ['zero', 'one', 'two']}), ['zero', 'one', 'two']) def test_wildcard_with_children(self): parsed = self.parser.parse('foo[*].bar') self.assertEqual( parsed.search({'foo': [{'bar': 'one'}, {'bar': 'two'}]}), ['one', 'two']) def test_or_expression(self): parsed = self.parser.parse('foo || bar') self.assertEqual(parsed.search({'foo': 'foo'}), 'foo') self.assertEqual(parsed.search({'bar': 'bar'}), 'bar') self.assertEqual(parsed.search({'foo': 'foo', 'bar': 'bar'}), 'foo') self.assertEqual(parsed.search({'bad': 'bad'}), None) def test_complex_or_expression(self): parsed = self.parser.parse('foo.foo || foo.bar') self.assertEqual(parsed.search({'foo': {'foo': 'foo'}}), 'foo') self.assertEqual(parsed.search({'foo': {'bar': 'bar'}}), 'bar') self.assertEqual(parsed.search({'foo': {'baz': 'baz'}}), None) def test_or_repr(self): self.assert_parsed_ast('foo || bar', ast.or_expression(ast.field('foo'), ast.field('bar'))) def test_unicode_literals_escaped(self): self.assert_parsed_ast(r'`"\u2713"`', ast.literal(u'\u2713')) def test_multiselect(self): parsed = self.parser.parse('foo.{bar: bar,baz: baz}') self.assertEqual( parsed.search({'foo': {'bar': 'bar', 'baz': 'baz', 'qux': 'qux'}}), {'bar': 'bar', 'baz': 'baz'}) def test_multiselect_subexpressions(self): parsed = self.parser.parse('foo.{"bar.baz": bar.baz, qux: qux}') self.assertEqual( parsed.search({'foo': {'bar': {'baz': 'CORRECT'}, 'qux': 'qux'}}), {'bar.baz': 'CORRECT', 'qux': 'qux'}) def test_multiselect_with_all_quoted_keys(self): parsed = self.parser.parse('foo.{"bar": bar.baz, "qux": qux}') result = parsed.search({'foo': {'bar': {'baz': 'CORRECT'}, 'qux': 'qux'}}) self.assertEqual(result, {"bar": "CORRECT", "qux": "qux"}) def test_function_call_with_and_statement(self): self.assert_parsed_ast( 'f(@ && @)', {'children': [{'children': [{'children': [], 'type': 'current'}, {'children': [], 'type': 'current'}], 'type': 'and_expression'}], 'type': 'function_expression', 'value': 'f'}) class TestErrorMessages(unittest.TestCase): def setUp(self): self.parser = parser.Parser() def assert_error_message(self, expression, error_message, exception=exceptions.ParseError): try: self.parser.parse(expression) except exception as e: self.assertEqual(error_message, str(e)) return except Exception as e: self.fail( "Unexpected error raised (%s: %s) for bad expression: %s" % (e.__class__.__name__, e, expression)) else: self.fail( "ParseError not raised for bad expression: %s" % expression) def test_bad_parse(self): with self.assertRaises(exceptions.ParseError): self.parser.parse('foo]baz') def test_bad_parse_error_message(self): error_message = ( 'Unexpected token: ]: Parse error at column 3, ' 'token "]" (RBRACKET), for expression:\n' '"foo]baz"\n' ' ^') self.assert_error_message('foo]baz', error_message) def test_bad_parse_error_message_with_multiselect(self): error_message = ( 'Invalid jmespath expression: Incomplete expression:\n' '"foo.{bar: baz,bar: bar"\n' ' ^') self.assert_error_message('foo.{bar: baz,bar: bar', error_message) def test_incomplete_expression_with_missing_paren(self): error_message = ( 'Invalid jmespath expression: Incomplete expression:\n' '"length(@,"\n' ' ^') self.assert_error_message('length(@,', error_message) def test_bad_lexer_values(self): error_message = ( 'Bad jmespath expression: ' 'Unclosed " delimiter:\n' 'foo."bar\n' ' ^') self.assert_error_message('foo."bar', error_message, exception=exceptions.LexerError) def test_bad_unicode_string(self): # This error message is straight from the JSON parser # and pypy has a slightly different error message, # so we're not using assert_error_message. error_message = re.compile( r'Bad jmespath expression: ' r'Invalid \\uXXXX escape.*\\uAZ12', re.DOTALL) with self.assertRaisesRegexp(exceptions.LexerError, error_message): self.parser.parse(r'"\uAZ12"') class TestParserWildcards(unittest.TestCase): def setUp(self): self.parser = parser.Parser() self.data = { 'foo': [ {'bar': [{'baz': 'one'}, {'baz': 'two'}]}, {'bar': [{'baz': 'three'}, {'baz': 'four'}, {'baz': 'five'}]}, ] } def test_multiple_index_wildcards(self): parsed = self.parser.parse('foo[*].bar[*].baz') self.assertEqual(parsed.search(self.data), [['one', 'two'], ['three', 'four', 'five']]) def test_wildcard_mix_with_indices(self): parsed = self.parser.parse('foo[*].bar[0].baz') self.assertEqual(parsed.search(self.data), ['one', 'three']) def test_wildcard_mix_last(self): parsed = self.parser.parse('foo[0].bar[*].baz') self.assertEqual(parsed.search(self.data), ['one', 'two']) def test_indices_out_of_bounds(self): parsed = self.parser.parse('foo[*].bar[2].baz') self.assertEqual(parsed.search(self.data), ['five']) def test_root_indices(self): parsed = self.parser.parse('[0]') self.assertEqual(parsed.search(['one', 'two']), 'one') def test_root_wildcard(self): parsed = self.parser.parse('*.foo') data = {'top1': {'foo': 'bar'}, 'top2': {'foo': 'baz'}, 'top3': {'notfoo': 'notfoo'}} # Sorted is being used because the order of the keys are not # required to be in any specific order. self.assertEqual(sorted(parsed.search(data)), sorted(['bar', 'baz'])) self.assertEqual(sorted(self.parser.parse('*.notfoo').search(data)), sorted(['notfoo'])) def test_only_wildcard(self): parsed = self.parser.parse('*') data = {'foo': 'a', 'bar': 'b', 'baz': 'c'} self.assertEqual(sorted(parsed.search(data)), sorted(['a', 'b', 'c'])) def test_escape_sequences(self): self.assertEqual(self.parser.parse(r'"foo\tbar"').search( {'foo\tbar': 'baz'}), 'baz') self.assertEqual(self.parser.parse(r'"foo\nbar"').search( {'foo\nbar': 'baz'}), 'baz') self.assertEqual(self.parser.parse(r'"foo\bbar"').search( {'foo\bbar': 'baz'}), 'baz') self.assertEqual(self.parser.parse(r'"foo\fbar"').search( {'foo\fbar': 'baz'}), 'baz') self.assertEqual(self.parser.parse(r'"foo\rbar"').search( {'foo\rbar': 'baz'}), 'baz') def test_consecutive_escape_sequences(self): parsed = self.parser.parse(r'"foo\\nbar"') self.assertEqual(parsed.search({'foo\\nbar': 'baz'}), 'baz') parsed = self.parser.parse(r'"foo\n\t\rbar"') self.assertEqual(parsed.search({'foo\n\t\rbar': 'baz'}), 'baz') def test_escape_sequence_at_end_of_string_not_allowed(self): with self.assertRaises(ValueError): self.parser.parse('foobar\\') def test_wildcard_with_multiselect(self): parsed = self.parser.parse('foo.*.{a: a, b: b}') data = { 'foo': { 'one': { 'a': {'c': 'CORRECT', 'd': 'other'}, 'b': {'c': 'ALSOCORRECT', 'd': 'other'}, }, 'two': { 'a': {'c': 'CORRECT', 'd': 'other'}, 'c': {'c': 'WRONG', 'd': 'other'}, }, } } match = parsed.search(data) self.assertEqual(len(match), 2) self.assertIn('a', match[0]) self.assertIn('b', match[0]) self.assertIn('a', match[1]) self.assertIn('b', match[1]) class TestMergedLists(unittest.TestCase): def setUp(self): self.parser = parser.Parser() self.data = { "foo": [ [["one", "two"], ["three", "four"]], [["five", "six"], ["seven", "eight"]], [["nine"], ["ten"]] ] } def test_merge_with_indices(self): parsed = self.parser.parse('foo[][0]') match = parsed.search(self.data) self.assertEqual(match, ["one", "three", "five", "seven", "nine", "ten"]) def test_trailing_merged_operator(self): parsed = self.parser.parse('foo[]') match = parsed.search(self.data) self.assertEqual( match, [["one", "two"], ["three", "four"], ["five", "six"], ["seven", "eight"], ["nine"], ["ten"]]) class TestParserCaching(unittest.TestCase): def test_compile_lots_of_expressions(self): # We have to be careful here because this is an implementation detail # that should be abstracted from the user, but we need to make sure we # exercise the code and that it doesn't blow up. p = parser.Parser() compiled = [] compiled2 = [] for i in range(parser.Parser._MAX_SIZE + 1): compiled.append(p.parse('foo%s' % i)) # Rerun the test and half of these entries should be from the # cache but they should still be equal to compiled. for i in range(parser.Parser._MAX_SIZE + 1): compiled2.append(p.parse('foo%s' % i)) self.assertEqual(len(compiled), len(compiled2)) self.assertEqual( [expr.parsed for expr in compiled], [expr.parsed for expr in compiled2]) def test_cache_purge(self): p = parser.Parser() first = p.parse('foo') cached = p.parse('foo') p.purge() second = p.parse('foo') self.assertEqual(first.parsed, second.parsed) self.assertEqual(first.parsed, cached.parsed) class TestParserAddsExpressionAttribute(unittest.TestCase): def test_expression_available_from_parser(self): p = parser.Parser() parsed = p.parse('foo.bar') self.assertEqual(parsed.expression, 'foo.bar') class TestParsedResultAddsOptions(unittest.TestCase): def test_can_have_ordered_dict(self): p = parser.Parser() parsed = p.parse('{a: a, b: b, c: c}') options = visitor.Options(dict_cls=OrderedDict) result = parsed.search( {"c": "c", "b": "b", "a": "a"}, options=options) # The order should be 'a', 'b' because we're using an # OrderedDict self.assertEqual(list(result), ['a', 'b', 'c']) class TestRenderGraphvizFile(unittest.TestCase): def test_dot_file_rendered(self): p = parser.Parser() result = p.parse('foo') dot_contents = result._render_dot_file() self.assertEqual(dot_contents, 'digraph AST {\nfield1 [label="field(foo)"]\n}') def test_dot_file_subexpr(self): p = parser.Parser() result = p.parse('foo.bar') dot_contents = result._render_dot_file() self.assertEqual( dot_contents, 'digraph AST {\n' 'subexpression1 [label="subexpression()"]\n' ' subexpression1 -> field2\n' 'field2 [label="field(foo)"]\n' ' subexpression1 -> field3\n' 'field3 [label="field(bar)"]\n}') if __name__ == '__main__': unittest.main()