# test_parser.py (14 KB)
#!/usr/bin/env python
import re
import random
import string
import threading

from jmespath import parser
from jmespath import visitor
from jmespath import ast
from jmespath import exceptions

from . import unittest, OrderedDict
  11. class TestParser(unittest.TestCase):
  12. def setUp(self):
  13. self.parser = parser.Parser()
  14. def assert_parsed_ast(self, expression, expected_ast):
  15. parsed = self.parser.parse(expression)
  16. self.assertEqual(parsed.parsed, expected_ast)
  17. def test_parse_empty_string_raises_exception(self):
  18. with self.assertRaises(exceptions.EmptyExpressionError):
  19. self.parser.parse('')
  20. def test_field(self):
  21. self.assert_parsed_ast('foo', ast.field('foo'))
  22. def test_dot_syntax(self):
  23. self.assert_parsed_ast('foo.bar',
  24. ast.subexpression([ast.field('foo'),
  25. ast.field('bar')]))
  26. def test_multiple_dots(self):
  27. parsed = self.parser.parse('foo.bar.baz')
  28. self.assertEqual(
  29. parsed.search({'foo': {'bar': {'baz': 'correct'}}}), 'correct')
  30. def test_index(self):
  31. parsed = self.parser.parse('foo[1]')
  32. self.assertEqual(
  33. parsed.search({'foo': ['zero', 'one', 'two']}),
  34. 'one')
  35. def test_quoted_subexpression(self):
  36. self.assert_parsed_ast('"foo"."bar"',
  37. ast.subexpression([
  38. ast.field('foo'),
  39. ast.field('bar')]))
  40. def test_wildcard(self):
  41. parsed = self.parser.parse('foo[*]')
  42. self.assertEqual(
  43. parsed.search({'foo': ['zero', 'one', 'two']}),
  44. ['zero', 'one', 'two'])
  45. def test_wildcard_with_children(self):
  46. parsed = self.parser.parse('foo[*].bar')
  47. self.assertEqual(
  48. parsed.search({'foo': [{'bar': 'one'}, {'bar': 'two'}]}),
  49. ['one', 'two'])
  50. def test_or_expression(self):
  51. parsed = self.parser.parse('foo || bar')
  52. self.assertEqual(parsed.search({'foo': 'foo'}), 'foo')
  53. self.assertEqual(parsed.search({'bar': 'bar'}), 'bar')
  54. self.assertEqual(parsed.search({'foo': 'foo', 'bar': 'bar'}), 'foo')
  55. self.assertEqual(parsed.search({'bad': 'bad'}), None)
  56. def test_complex_or_expression(self):
  57. parsed = self.parser.parse('foo.foo || foo.bar')
  58. self.assertEqual(parsed.search({'foo': {'foo': 'foo'}}), 'foo')
  59. self.assertEqual(parsed.search({'foo': {'bar': 'bar'}}), 'bar')
  60. self.assertEqual(parsed.search({'foo': {'baz': 'baz'}}), None)
  61. def test_or_repr(self):
  62. self.assert_parsed_ast('foo || bar', ast.or_expression(ast.field('foo'),
  63. ast.field('bar')))
  64. def test_unicode_literals_escaped(self):
  65. self.assert_parsed_ast(r'`"\u2713"`', ast.literal(u'\u2713'))
  66. def test_multiselect(self):
  67. parsed = self.parser.parse('foo.{bar: bar,baz: baz}')
  68. self.assertEqual(
  69. parsed.search({'foo': {'bar': 'bar', 'baz': 'baz', 'qux': 'qux'}}),
  70. {'bar': 'bar', 'baz': 'baz'})
  71. def test_multiselect_subexpressions(self):
  72. parsed = self.parser.parse('foo.{"bar.baz": bar.baz, qux: qux}')
  73. self.assertEqual(
  74. parsed.search({'foo': {'bar': {'baz': 'CORRECT'}, 'qux': 'qux'}}),
  75. {'bar.baz': 'CORRECT', 'qux': 'qux'})
  76. def test_multiselect_with_all_quoted_keys(self):
  77. parsed = self.parser.parse('foo.{"bar": bar.baz, "qux": qux}')
  78. result = parsed.search({'foo': {'bar': {'baz': 'CORRECT'}, 'qux': 'qux'}})
  79. self.assertEqual(result, {"bar": "CORRECT", "qux": "qux"})
  80. def test_function_call_with_and_statement(self):
  81. self.assert_parsed_ast(
  82. 'f(@ && @)',
  83. {'children': [{'children': [{'children': [], 'type': 'current'},
  84. {'children': [], 'type': 'current'}],
  85. 'type': 'and_expression'}],
  86. 'type': 'function_expression',
  87. 'value': 'f'})
  88. class TestErrorMessages(unittest.TestCase):
  89. def setUp(self):
  90. self.parser = parser.Parser()
  91. def assert_error_message(self, expression, error_message,
  92. exception=exceptions.ParseError):
  93. try:
  94. self.parser.parse(expression)
  95. except exception as e:
  96. self.assertEqual(error_message, str(e))
  97. return
  98. except Exception as e:
  99. self.fail(
  100. "Unexpected error raised (%s: %s) for bad expression: %s" %
  101. (e.__class__.__name__, e, expression))
  102. else:
  103. self.fail(
  104. "ParseError not raised for bad expression: %s" % expression)
  105. def test_bad_parse(self):
  106. with self.assertRaises(exceptions.ParseError):
  107. self.parser.parse('foo]baz')
  108. def test_bad_parse_error_message(self):
  109. error_message = (
  110. 'Unexpected token: ]: Parse error at column 3, '
  111. 'token "]" (RBRACKET), for expression:\n'
  112. '"foo]baz"\n'
  113. ' ^')
  114. self.assert_error_message('foo]baz', error_message)
  115. def test_bad_parse_error_message_with_multiselect(self):
  116. error_message = (
  117. 'Invalid jmespath expression: Incomplete expression:\n'
  118. '"foo.{bar: baz,bar: bar"\n'
  119. ' ^')
  120. self.assert_error_message('foo.{bar: baz,bar: bar', error_message)
  121. def test_incomplete_expression_with_missing_paren(self):
  122. error_message = (
  123. 'Invalid jmespath expression: Incomplete expression:\n'
  124. '"length(@,"\n'
  125. ' ^')
  126. self.assert_error_message('length(@,', error_message)
  127. def test_bad_lexer_values(self):
  128. error_message = (
  129. 'Bad jmespath expression: '
  130. 'Unclosed " delimiter:\n'
  131. 'foo."bar\n'
  132. ' ^')
  133. self.assert_error_message('foo."bar', error_message,
  134. exception=exceptions.LexerError)
  135. def test_bad_unicode_string(self):
  136. # This error message is straight from the JSON parser
  137. # and pypy has a slightly different error message,
  138. # so we're not using assert_error_message.
  139. error_message = re.compile(
  140. r'Bad jmespath expression: '
  141. r'Invalid \\uXXXX escape.*\\uAZ12', re.DOTALL)
  142. with self.assertRaisesRegex(exceptions.LexerError, error_message):
  143. self.parser.parse(r'"\uAZ12"')
  144. class TestParserWildcards(unittest.TestCase):
  145. def setUp(self):
  146. self.parser = parser.Parser()
  147. self.data = {
  148. 'foo': [
  149. {'bar': [{'baz': 'one'}, {'baz': 'two'}]},
  150. {'bar': [{'baz': 'three'}, {'baz': 'four'}, {'baz': 'five'}]},
  151. ]
  152. }
  153. def test_multiple_index_wildcards(self):
  154. parsed = self.parser.parse('foo[*].bar[*].baz')
  155. self.assertEqual(parsed.search(self.data),
  156. [['one', 'two'], ['three', 'four', 'five']])
  157. def test_wildcard_mix_with_indices(self):
  158. parsed = self.parser.parse('foo[*].bar[0].baz')
  159. self.assertEqual(parsed.search(self.data),
  160. ['one', 'three'])
  161. def test_wildcard_mix_last(self):
  162. parsed = self.parser.parse('foo[0].bar[*].baz')
  163. self.assertEqual(parsed.search(self.data),
  164. ['one', 'two'])
  165. def test_indices_out_of_bounds(self):
  166. parsed = self.parser.parse('foo[*].bar[2].baz')
  167. self.assertEqual(parsed.search(self.data),
  168. ['five'])
  169. def test_root_indices(self):
  170. parsed = self.parser.parse('[0]')
  171. self.assertEqual(parsed.search(['one', 'two']), 'one')
  172. def test_root_wildcard(self):
  173. parsed = self.parser.parse('*.foo')
  174. data = {'top1': {'foo': 'bar'}, 'top2': {'foo': 'baz'},
  175. 'top3': {'notfoo': 'notfoo'}}
  176. # Sorted is being used because the order of the keys are not
  177. # required to be in any specific order.
  178. self.assertEqual(sorted(parsed.search(data)), sorted(['bar', 'baz']))
  179. self.assertEqual(sorted(self.parser.parse('*.notfoo').search(data)),
  180. sorted(['notfoo']))
  181. def test_only_wildcard(self):
  182. parsed = self.parser.parse('*')
  183. data = {'foo': 'a', 'bar': 'b', 'baz': 'c'}
  184. self.assertEqual(sorted(parsed.search(data)), sorted(['a', 'b', 'c']))
  185. def test_escape_sequences(self):
  186. self.assertEqual(self.parser.parse(r'"foo\tbar"').search(
  187. {'foo\tbar': 'baz'}), 'baz')
  188. self.assertEqual(self.parser.parse(r'"foo\nbar"').search(
  189. {'foo\nbar': 'baz'}), 'baz')
  190. self.assertEqual(self.parser.parse(r'"foo\bbar"').search(
  191. {'foo\bbar': 'baz'}), 'baz')
  192. self.assertEqual(self.parser.parse(r'"foo\fbar"').search(
  193. {'foo\fbar': 'baz'}), 'baz')
  194. self.assertEqual(self.parser.parse(r'"foo\rbar"').search(
  195. {'foo\rbar': 'baz'}), 'baz')
  196. def test_consecutive_escape_sequences(self):
  197. parsed = self.parser.parse(r'"foo\\nbar"')
  198. self.assertEqual(parsed.search({'foo\\nbar': 'baz'}), 'baz')
  199. parsed = self.parser.parse(r'"foo\n\t\rbar"')
  200. self.assertEqual(parsed.search({'foo\n\t\rbar': 'baz'}), 'baz')
  201. def test_escape_sequence_at_end_of_string_not_allowed(self):
  202. with self.assertRaises(ValueError):
  203. self.parser.parse('foobar\\')
  204. def test_wildcard_with_multiselect(self):
  205. parsed = self.parser.parse('foo.*.{a: a, b: b}')
  206. data = {
  207. 'foo': {
  208. 'one': {
  209. 'a': {'c': 'CORRECT', 'd': 'other'},
  210. 'b': {'c': 'ALSOCORRECT', 'd': 'other'},
  211. },
  212. 'two': {
  213. 'a': {'c': 'CORRECT', 'd': 'other'},
  214. 'c': {'c': 'WRONG', 'd': 'other'},
  215. },
  216. }
  217. }
  218. match = parsed.search(data)
  219. self.assertEqual(len(match), 2)
  220. self.assertIn('a', match[0])
  221. self.assertIn('b', match[0])
  222. self.assertIn('a', match[1])
  223. self.assertIn('b', match[1])
  224. class TestMergedLists(unittest.TestCase):
  225. def setUp(self):
  226. self.parser = parser.Parser()
  227. self.data = {
  228. "foo": [
  229. [["one", "two"], ["three", "four"]],
  230. [["five", "six"], ["seven", "eight"]],
  231. [["nine"], ["ten"]]
  232. ]
  233. }
  234. def test_merge_with_indices(self):
  235. parsed = self.parser.parse('foo[][0]')
  236. match = parsed.search(self.data)
  237. self.assertEqual(match, ["one", "three", "five", "seven",
  238. "nine", "ten"])
  239. def test_trailing_merged_operator(self):
  240. parsed = self.parser.parse('foo[]')
  241. match = parsed.search(self.data)
  242. self.assertEqual(
  243. match,
  244. [["one", "two"], ["three", "four"],
  245. ["five", "six"], ["seven", "eight"],
  246. ["nine"], ["ten"]])
  247. class TestParserCaching(unittest.TestCase):
  248. def test_compile_lots_of_expressions(self):
  249. # We have to be careful here because this is an implementation detail
  250. # that should be abstracted from the user, but we need to make sure we
  251. # exercise the code and that it doesn't blow up.
  252. p = parser.Parser()
  253. compiled = []
  254. compiled2 = []
  255. for i in range(parser.Parser._MAX_SIZE + 1):
  256. compiled.append(p.parse('foo%s' % i))
  257. # Rerun the test and half of these entries should be from the
  258. # cache but they should still be equal to compiled.
  259. for i in range(parser.Parser._MAX_SIZE + 1):
  260. compiled2.append(p.parse('foo%s' % i))
  261. self.assertEqual(len(compiled), len(compiled2))
  262. self.assertEqual(
  263. [expr.parsed for expr in compiled],
  264. [expr.parsed for expr in compiled2])
  265. def test_cache_purge(self):
  266. p = parser.Parser()
  267. first = p.parse('foo')
  268. cached = p.parse('foo')
  269. p.purge()
  270. second = p.parse('foo')
  271. self.assertEqual(first.parsed,
  272. second.parsed)
  273. self.assertEqual(first.parsed,
  274. cached.parsed)
  275. def test_thread_safety_of_cache(self):
  276. errors = []
  277. expressions = [
  278. ''.join(random.choice(string.ascii_letters) for _ in range(3))
  279. for _ in range(2000)
  280. ]
  281. def worker():
  282. p = parser.Parser()
  283. for expression in expressions:
  284. try:
  285. p.parse(expression)
  286. except Exception as e:
  287. errors.append(e)
  288. threads = []
  289. for i in range(10):
  290. threads.append(threading.Thread(target=worker))
  291. for thread in threads:
  292. thread.start()
  293. for thread in threads:
  294. thread.join()
  295. self.assertEqual(errors, [])
  296. class TestParserAddsExpressionAttribute(unittest.TestCase):
  297. def test_expression_available_from_parser(self):
  298. p = parser.Parser()
  299. parsed = p.parse('foo.bar')
  300. self.assertEqual(parsed.expression, 'foo.bar')
  301. class TestParsedResultAddsOptions(unittest.TestCase):
  302. def test_can_have_ordered_dict(self):
  303. p = parser.Parser()
  304. parsed = p.parse('{a: a, b: b, c: c}')
  305. options = visitor.Options(dict_cls=OrderedDict)
  306. result = parsed.search(
  307. {"c": "c", "b": "b", "a": "a"}, options=options)
  308. # The order should be 'a', 'b' because we're using an
  309. # OrderedDict
  310. self.assertEqual(list(result), ['a', 'b', 'c'])
  311. class TestRenderGraphvizFile(unittest.TestCase):
  312. def test_dot_file_rendered(self):
  313. p = parser.Parser()
  314. result = p.parse('foo')
  315. dot_contents = result._render_dot_file()
  316. self.assertEqual(dot_contents,
  317. 'digraph AST {\nfield1 [label="field(foo)"]\n}')
  318. def test_dot_file_subexpr(self):
  319. p = parser.Parser()
  320. result = p.parse('foo.bar')
  321. dot_contents = result._render_dot_file()
  322. self.assertEqual(
  323. dot_contents,
  324. 'digraph AST {\n'
  325. 'subexpression1 [label="subexpression()"]\n'
  326. ' subexpression1 -> field2\n'
  327. 'field2 [label="field(foo)"]\n'
  328. ' subexpression1 -> field3\n'
  329. 'field3 [label="field(bar)"]\n}')
  330. if __name__ == '__main__':
  331. unittest.main()