test_xmltodict.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. from xmltodict import parse, ParsingInterrupted
  2. import collections
  3. import unittest
  4. try:
  5. from io import BytesIO as StringIO
  6. except ImportError:
  7. from xmltodict import StringIO
  8. from xml.parsers.expat import ParserCreate
  9. from xml.parsers import expat
  10. def _encode(s):
  11. try:
  12. return bytes(s, 'ascii')
  13. except (NameError, TypeError):
  14. return s
  15. class XMLToDictTestCase(unittest.TestCase):
  def test_string_vs_file(self):
      # Parsing a string and parsing a file-like object that wraps the
      # same document must give equal results.
      document = '<a>data</a>'
      from_string = parse(document)
      from_stream = parse(StringIO(_encode(document)))
      self.assertEqual(from_string, from_stream)
  def test_minimal(self):
      # An empty element maps to None, with or without force_cdata.
      expected = {'a': None}
      self.assertEqual(parse('<a/>'), expected)
      self.assertEqual(parse('<a/>', force_cdata=True), expected)
  def test_simple(self):
      # Element text becomes the plain string value of the element key.
      parsed = parse('<a>data</a>')
      self.assertEqual(parsed, {'a': 'data'})
  def test_force_cdata(self):
      # force_cdata=True wraps element text in a dict under '#text'.
      parsed = parse('<a>data</a>', force_cdata=True)
      self.assertEqual(parsed, {'a': {'#text': 'data'}})
  def test_custom_cdata(self):
      # cdata_key replaces the default '#text' key used for forced text.
      options = {'force_cdata': True, 'cdata_key': '_CDATA_'}
      parsed = parse('<a>data</a>', **options)
      self.assertEqual(parsed, {'a': {'_CDATA_': 'data'}})
  def test_list(self):
      # Repeated sibling elements are collected into a list, in order.
      parsed = parse('<a><b>1</b><b>2</b><b>3</b></a>')
      self.assertEqual(parsed, {'a': {'b': ['1', '2', '3']}})
  def test_attrib(self):
      # Attributes appear as keys prefixed with '@'.
      parsed = parse('<a href="xyz"/>')
      self.assertEqual(parsed, {'a': {'@href': 'xyz'}})
  def test_skip_attrib(self):
      # xml_attribs=False drops attributes, leaving the empty element as None.
      parsed = parse('<a href="xyz"/>', xml_attribs=False)
      self.assertEqual(parsed, {'a': None})
  def test_custom_attrib(self):
      # attr_prefix replaces the default '@' marker on attribute keys.
      parsed = parse('<a href="xyz"/>', attr_prefix='!')
      self.assertEqual(parsed, {'a': {'!href': 'xyz'}})
  def test_attrib_and_cdata(self):
      # An element with both an attribute and text yields '@attr' and '#text'.
      parsed = parse('<a href="xyz">123</a>')
      expected = {'a': {'@href': 'xyz', '#text': '123'}}
      self.assertEqual(parsed, expected)
  def test_semi_structured(self):
      # Text fragments around a child element are joined into one '#text'
      # value; cdata_separator controls what goes between the fragments.
      document = '<a>abc<b/>def</a>'

      joined = parse(document)
      self.assertEqual(joined, {'a': {'b': None, '#text': 'abcdef'}})

      separated = parse(document, cdata_separator='\n')
      self.assertEqual(separated, {'a': {'b': None, '#text': 'abc\ndef'}})
  def test_nested_semi_structured(self):
      # Mixed content is merged per element, at every nesting level.
      inner = {'#text': '123456', 'c': None}
      expected = {'a': {'#text': 'abcdef', 'b': inner}}
      parsed = parse('<a>abc<b>123<c/>456</b>def</a>')
      self.assertEqual(parsed, expected)
  def test_skip_whitespace(self):
      # Whitespace-only element content is discarded by default, so such
      # elements collapse to None or to just their attributes.
      document = (
          '\n'
          '<root>\n'
          '<emptya> </emptya>\n'
          '<emptyb attr="attrvalue">\n'
          '</emptyb>\n'
          '<value>hello</value>\n'
          '</root>\n'
      )
      expected_root = {
          'emptya': None,
          'emptyb': {'@attr': 'attrvalue'},
          'value': 'hello',
      }
      self.assertEqual(parse(document), {'root': expected_root})
  def test_keep_whitespace(self):
      # strip_whitespace=False preserves whitespace-only text that the
      # default settings would turn into None.
      document = "<root> </root>"

      stripped = parse(document)
      self.assertEqual(stripped, {'root': None})

      kept = parse(document, strip_whitespace=False)
      self.assertEqual(kept, {'root': ' '})
  def test_streaming(self):
      # With item_depth=2 the callback must fire once per <b> element, in
      # document order, receiving the expected path and the element text.
      expected_path = [('a', {'x': 'y'}), ('b', None)]
      received = []

      def cb(path, item):
          received.append(item)
          self.assertEqual(path, expected_path)
          # The n-th item delivered carries the text str(n).
          self.assertEqual(item, str(len(received)))
          return True

      parse('<a x="y"><b>1</b><b>2</b><b>3</b></a>',
            item_depth=2, item_callback=cb)
      self.assertEqual(len(received), 3)
  def test_streaming_interrupt(self):
      # A callback that returns False must abort parsing with
      # ParsingInterrupted.
      def cb(path, item):
          return False

      streaming_options = {'item_depth': 1, 'item_callback': cb}
      self.assertRaises(ParsingInterrupted,
                        parse, '<a>x</a>', **streaming_options)
  def test_streaming_generator(self):
      # Same expectations as streaming from a string, but the document is
      # supplied as a generator yielding one character at a time.
      expected_path = [('a', {'x': 'y'}), ('b', None)]
      received = []

      def cb(path, item):
          received.append(item)
          self.assertEqual(path, expected_path)
          # The n-th item delivered carries the text str(n).
          self.assertEqual(item, str(len(received)))
          return True

      def characters(text):
          for char in text:
              yield char

      parse(characters('<a x="y"><b>1</b><b>2</b><b>3</b></a>'),
            item_depth=2, item_callback=cb)
      self.assertEqual(len(received), 3)
  def test_postprocessor(self):
      # The postprocessor may rewrite both key and value: integer-like
      # values are converted and re-keyed with ':int', others pass through.
      def postprocessor(path, key, value):
          try:
              rewritten = (key + ':int', int(value))
          except (ValueError, TypeError):
              rewritten = (key, value)
          return rewritten

      expected = {'a': {'b:int': [1, 2], 'b': 'x'}}
      parsed = parse('<a><b>1</b><b>2</b><b>x</b></a>',
                     postprocessor=postprocessor)
      self.assertEqual(expected, parsed)
  def test_postprocessor_attribute(self):
      # Attribute entries also pass through the postprocessor, which here
      # converts the value to int and appends ':int' to the key.
      def postprocessor(path, key, value):
          try:
              rewritten = (key + ':int', int(value))
          except (ValueError, TypeError):
              rewritten = (key, value)
          return rewritten

      expected = {'a': {'@b:int': 1}}
      parsed = parse('<a b="1"/>', postprocessor=postprocessor)
      self.assertEqual(expected, parsed)
  def test_postprocessor_skip(self):
      # Returning None from the postprocessor drops the item altogether;
      # here the <b> whose value is 3 is removed from the result.
      def postprocessor(path, key, value):
          if key != 'b':
              return key, value
          number = int(value)
          if number == 3:
              return None
          return key, number

      expected = {'a': {'b': [1, 2]}}
      parsed = parse('<a><b>1</b><b>2</b><b>3</b></a>',
                     postprocessor=postprocessor)
      self.assertEqual(expected, parsed)
  def test_unicode(self):
      # A non-ASCII character in element text must come back unchanged.
      try:
          to_char = unichr
      except NameError:
          # No unichr builtin available; chr is used instead.
          to_char = chr
      value = to_char(39321)
      parsed = parse('<a>%s</a>' % value)
      self.assertEqual({'a': value}, parsed)
  def test_encoded_string(self):
      # The text form of a document and its UTF-8 encoded form must parse
      # to the same result.
      try:
          to_char = unichr
      except NameError:
          # No unichr builtin available; chr is used instead.
          to_char = chr
      document = '<a>%s</a>' % to_char(39321)
      from_text = parse(document)
      from_utf8 = parse(document.encode('utf-8'))
      self.assertEqual(from_text, from_utf8)
  def test_namespace_support(self):
      # With process_namespaces=True, element and attribute names are
      # expanded to '<namespace URI>:<local name>' and the declarations are
      # gathered under '@xmlns'.
      document = (
          '\n'
          '<root xmlns="http://defaultns.com/"\n'
          'xmlns:a="http://a.com/"\n'
          'xmlns:b="http://b.com/"\n'
          'version="1.00">\n'
          '<x a:attr="val">1</x>\n'
          '<a:y>2</a:y>\n'
          '<b:z>3</b:z>\n'
          '</root>\n'
      )
      declarations = {
          '': 'http://defaultns.com/',
          'a': 'http://a.com/',
          'b': 'http://b.com/',
      }
      x_element = {
          '@http://a.com/:attr': 'val',
          '#text': '1',
      }
      expected = {
          'http://defaultns.com/:root': {
              '@version': '1.00',
              '@xmlns': declarations,
              'http://defaultns.com/:x': x_element,
              'http://a.com/:y': '2',
              'http://b.com/:z': '3',
          },
      }
      parsed = parse(document, process_namespaces=True)
      self.assertEqual(parsed, expected)
  def test_namespace_collapse(self):
      # A namespaces mapping rewrites the listed URIs to short prefixes
      # ('' removes the prefix); URIs not listed stay fully expanded.
      document = (
          '\n'
          '<root xmlns="http://defaultns.com/"\n'
          'xmlns:a="http://a.com/"\n'
          'xmlns:b="http://b.com/"\n'
          'version="1.00">\n'
          '<x a:attr="val">1</x>\n'
          '<a:y>2</a:y>\n'
          '<b:z>3</b:z>\n'
          '</root>\n'
      )
      collapse_map = {
          'http://defaultns.com/': '',
          'http://a.com/': 'ns_a',
      }
      declarations = {
          '': 'http://defaultns.com/',
          'a': 'http://a.com/',
          'b': 'http://b.com/',
      }
      x_element = {
          '@ns_a:attr': 'val',
          '#text': '1',
      }
      expected = {
          'root': {
              '@version': '1.00',
              '@xmlns': declarations,
              'x': x_element,
              'ns_a:y': '2',
              'http://b.com/:z': '3',
          },
      }
      parsed = parse(document, process_namespaces=True,
                     namespaces=collapse_map)
      self.assertEqual(parsed, expected)
  def test_namespace_collapse_all(self):
      # A defaultdict that answers None for every URI collapses all
      # namespaces, leaving only local names in the result.
      document = (
          '\n'
          '<root xmlns="http://defaultns.com/"\n'
          'xmlns:a="http://a.com/"\n'
          'xmlns:b="http://b.com/"\n'
          'version="1.00">\n'
          '<x a:attr="val">1</x>\n'
          '<a:y>2</a:y>\n'
          '<b:z>3</b:z>\n'
          '</root>\n'
      )
      collapse_everything = collections.defaultdict(lambda: None)
      declarations = {
          '': 'http://defaultns.com/',
          'a': 'http://a.com/',
          'b': 'http://b.com/',
      }
      x_element = {
          '@attr': 'val',
          '#text': '1',
      }
      expected = {
          'root': {
              '@version': '1.00',
              '@xmlns': declarations,
              'x': x_element,
              'y': '2',
              'z': '3',
          },
      }
      parsed = parse(document, process_namespaces=True,
                     namespaces=collapse_everything)
      self.assertEqual(parsed, expected)
  def test_namespace_ignore(self):
      # Without process_namespaces, xmlns declarations are ordinary
      # attributes and prefixed names are kept verbatim.
      document = (
          '\n'
          '<root xmlns="http://defaultns.com/"\n'
          'xmlns:a="http://a.com/"\n'
          'xmlns:b="http://b.com/"\n'
          'version="1.00">\n'
          '<x>1</x>\n'
          '<a:y>2</a:y>\n'
          '<b:z>3</b:z>\n'
          '</root>\n'
      )
      expected_root = {
          '@xmlns': 'http://defaultns.com/',
          '@xmlns:a': 'http://a.com/',
          '@xmlns:b': 'http://b.com/',
          '@version': '1.00',
          'x': '1',
          'a:y': '2',
          'b:z': '3',
      }
      self.assertEqual(parse(document), {'root': expected_root})
  def test_force_list_basic(self):
      # force_list=('server',) makes a lone <server> element come back as
      # a one-item list.
      document = (
          '\n'
          '<servers>\n'
          '<server>\n'
          '<name>server1</name>\n'
          '<os>os1</os>\n'
          '</server>\n'
          '</servers>\n'
      )
      only_server = {
          'name': 'server1',
          'os': 'os1',
      }
      expected = {'servers': {'server': [only_server]}}
      parsed = parse(document, force_list=('server',))
      self.assertEqual(parsed, expected)
  def test_force_list_callable(self):
      # force_list may be a callable deciding per element: <server> is
      # forced into a list under <servers> but not under <skip>.
      document = (
          '\n'
          '<config>\n'
          '<servers>\n'
          '<server>\n'
          '<name>server1</name>\n'
          '<os>os1</os>\n'
          '</server>\n'
          '</servers>\n'
          '<skip>\n'
          '<server></server>\n'
          '</skip>\n'
          '</config>\n'
      )

      def force_list(path, key, value):
          """Only return True for servers/server, but not for skip/server."""
          if key == 'server':
              return path and path[-1][0] == 'servers'
          return False

      only_server = {
          'name': 'server1',
          'os': 'os1',
      }
      expected = {
          'config': {
              'servers': {'server': [only_server]},
              'skip': {'server': None},
          },
      }
      parsed = parse(document, force_list=force_list, dict_constructor=dict)
      self.assertEqual(parsed, expected)
  def test_disable_entities_true_ignores_xmlbomb(self):
      # With disable_entities=True a nested-entity document must not be
      # expanded: either the parser raises ExpatError or <bomb> is empty.
      document = (
          '\n'
          '<!DOCTYPE xmlbomb [\n'
          '<!ENTITY a "1234567890" >\n'
          '<!ENTITY b "&a;&a;&a;&a;&a;&a;&a;&a;">\n'
          '<!ENTITY c "&b;&b;&b;&b;&b;&b;&b;&b;">\n'
          ']>\n'
          '<bomb>&c;</bomb>\n'
      )
      try:
          outcome = parse(document, disable_entities=True)
      except expat.ExpatError:
          # Rejecting the document outright is an accepted outcome.
          return
      self.assertEqual(outcome, {'bomb': None})
  def test_disable_entities_false_returns_xmlbomb(self):
      # With disable_entities=False the nested entities are fully
      # expanded: &c; is 8 x &b;, each 8 x &a;, i.e. 64 copies of the text.
      document = (
          '\n'
          '<!DOCTYPE xmlbomb [\n'
          '<!ENTITY a "1234567890" >\n'
          '<!ENTITY b "&a;&a;&a;&a;&a;&a;&a;&a;">\n'
          '<!ENTITY c "&b;&b;&b;&b;&b;&b;&b;&b;">\n'
          ']>\n'
          '<bomb>&c;</bomb>\n'
      )
      expanded = "1234567890" * 64
      parsed = parse(document, disable_entities=False)
      self.assertEqual(parsed, {'bomb': expanded})
  def test_disable_entities_true_ignores_external_dtd(self):
      # With disable_entities=True an external entity reference must not
      # be resolved: either ExpatError is raised or <root> is empty.
      document = (
          '\n'
          '<!DOCTYPE external [\n'
          '<!ENTITY ee SYSTEM "http://www.python.org/">\n'
          ']>\n'
          '<root>&ee;</root>\n'
      )
      try:
          outcome = parse(document, disable_entities=True)
      except expat.ExpatError:
          # Rejecting the document outright is an accepted outcome.
          return
      self.assertEqual(outcome, {'root': None})
  def test_disable_entities_true_attempts_external_dtd(self):
      # With entity processing left enabled, parsing a document that
      # references an external entity is expected to raise ExpatError once
      # the parser is created through the wrapper below, whose
      # external-entity handler always returns 0.
      # NOTE(review): despite "true" in the name, this test passes
      # disable_entities=False; the name is kept so selection by test name
      # keeps working.
      xml = (
          '\n'
          '<!DOCTYPE external [\n'
          '<!ENTITY ee SYSTEM "http://www.python.org/">\n'
          ']>\n'
          '<root>&ee;</root>\n'
      )

      def raising_external_ref_handler(*args, **kwargs):
          # Build a real parser, then install a handler that returns 0 for
          # every external entity reference.
          parser = ParserCreate(*args, **kwargs)
          parser.ExternalEntityRefHandler = lambda *x: 0
          try:
              feature = "http://apache.org/xml/features/disallow-doctype-decl"
              parser._reader.setFeature(feature, True)
          except AttributeError:
              # This parser object exposes no _reader/setFeature; skip it.
              pass
          return parser

      expat.ParserCreate = raising_external_ref_handler
      try:
          # Using this try/except because a TypeError is thrown before
          # the ExpatError, and Python 2.6 is confused by that.
          try:
              parse(xml, disable_entities=False, expat=expat)
          except expat.ExpatError:
              pass
          else:
              self.fail("parse() did not raise ExpatError for the "
                        "external entity reference")
      finally:
          # Always undo the monkeypatch, even when the check above fails,
          # so later tests see the real ParserCreate again.
          expat.ParserCreate = ParserCreate
  def test_comments(self):
      # process_comments=True exposes XML comments under '#comment' on the
      # element that contains them, alongside that element's text.
      document = (
          '\n'
          '<a>\n'
          '<b>\n'
          '<!-- b comment -->\n'
          '<c>\n'
          '<!-- c comment -->\n'
          '1\n'
          '</c>\n'
          '<d>2</d>\n'
          '</b>\n'
          '</a>\n'
      )
      c_element = {
          '#comment': 'c comment',
          '#text': '1',
      }
      b_element = {
          '#comment': 'b comment',
          'c': c_element,
          'd': '2',
      }
      expected = {'a': {'b': b_element}}
      parsed = parse(document, process_comments=True)
      self.assertEqual(parsed, expected)