test_xmltodict.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. from xmltodict import parse, ParsingInterrupted
  2. import unittest
  3. try:
  4. from io import BytesIO as StringIO
  5. except ImportError:
  6. from xmltodict import StringIO
  7. from xml.parsers.expat import ParserCreate
  8. from xml.parsers import expat
  9. def _encode(s):
  10. try:
  11. return bytes(s, 'ascii')
  12. except (NameError, TypeError):
  13. return s
  14. class XMLToDictTestCase(unittest.TestCase):
  15. def test_string_vs_file(self):
  16. xml = '<a>data</a>'
  17. self.assertEqual(parse(xml),
  18. parse(StringIO(_encode(xml))))
  19. def test_minimal(self):
  20. self.assertEqual(parse('<a/>'),
  21. {'a': None})
  22. self.assertEqual(parse('<a/>', force_cdata=True),
  23. {'a': None})
  24. def test_simple(self):
  25. self.assertEqual(parse('<a>data</a>'),
  26. {'a': 'data'})
  27. def test_force_cdata(self):
  28. self.assertEqual(parse('<a>data</a>', force_cdata=True),
  29. {'a': {'#text': 'data'}})
  30. def test_custom_cdata(self):
  31. self.assertEqual(parse('<a>data</a>',
  32. force_cdata=True,
  33. cdata_key='_CDATA_'),
  34. {'a': {'_CDATA_': 'data'}})
  35. def test_list(self):
  36. self.assertEqual(parse('<a><b>1</b><b>2</b><b>3</b></a>'),
  37. {'a': {'b': ['1', '2', '3']}})
  38. def test_attrib(self):
  39. self.assertEqual(parse('<a href="xyz"/>'),
  40. {'a': {'@href': 'xyz'}})
  41. def test_skip_attrib(self):
  42. self.assertEqual(parse('<a href="xyz"/>', xml_attribs=False),
  43. {'a': None})
  44. def test_custom_attrib(self):
  45. self.assertEqual(parse('<a href="xyz"/>',
  46. attr_prefix='!'),
  47. {'a': {'!href': 'xyz'}})
  48. def test_attrib_and_cdata(self):
  49. self.assertEqual(parse('<a href="xyz">123</a>'),
  50. {'a': {'@href': 'xyz', '#text': '123'}})
  51. def test_semi_structured(self):
  52. self.assertEqual(parse('<a>abc<b/>def</a>'),
  53. {'a': {'b': None, '#text': 'abcdef'}})
  54. self.assertEqual(parse('<a>abc<b/>def</a>',
  55. cdata_separator='\n'),
  56. {'a': {'b': None, '#text': 'abc\ndef'}})
  57. def test_nested_semi_structured(self):
  58. self.assertEqual(parse('<a>abc<b>123<c/>456</b>def</a>'),
  59. {'a': {'#text': 'abcdef', 'b': {
  60. '#text': '123456', 'c': None}}})
  61. def test_skip_whitespace(self):
  62. xml = """
  63. <root>
  64. <emptya> </emptya>
  65. <emptyb attr="attrvalue">
  66. </emptyb>
  67. <value>hello</value>
  68. </root>
  69. """
  70. self.assertEqual(
  71. parse(xml),
  72. {'root': {'emptya': None,
  73. 'emptyb': {'@attr': 'attrvalue'},
  74. 'value': 'hello'}})
  75. def test_keep_whitespace(self):
  76. xml = "<root> </root>"
  77. self.assertEqual(parse(xml), dict(root=None))
  78. self.assertEqual(parse(xml, strip_whitespace=False),
  79. dict(root=' '))
  80. def test_streaming(self):
  81. def cb(path, item):
  82. cb.count += 1
  83. self.assertEqual(path, [('a', {'x': 'y'}), ('b', None)])
  84. self.assertEqual(item, str(cb.count))
  85. return True
  86. cb.count = 0
  87. parse('<a x="y"><b>1</b><b>2</b><b>3</b></a>',
  88. item_depth=2, item_callback=cb)
  89. self.assertEqual(cb.count, 3)
  90. def test_streaming_interrupt(self):
  91. cb = lambda path, item: False
  92. self.assertRaises(ParsingInterrupted,
  93. parse, '<a>x</a>',
  94. item_depth=1, item_callback=cb)
  95. def test_postprocessor(self):
  96. def postprocessor(path, key, value):
  97. try:
  98. return key + ':int', int(value)
  99. except (ValueError, TypeError):
  100. return key, value
  101. self.assertEqual({'a': {'b:int': [1, 2], 'b': 'x'}},
  102. parse('<a><b>1</b><b>2</b><b>x</b></a>',
  103. postprocessor=postprocessor))
  104. def test_postprocessor_attribute(self):
  105. def postprocessor(path, key, value):
  106. try:
  107. return key + ':int', int(value)
  108. except (ValueError, TypeError):
  109. return key, value
  110. self.assertEqual({'a': {'@b:int': 1}},
  111. parse('<a b="1"/>',
  112. postprocessor=postprocessor))
  113. def test_postprocessor_skip(self):
  114. def postprocessor(path, key, value):
  115. if key == 'b':
  116. value = int(value)
  117. if value == 3:
  118. return None
  119. return key, value
  120. self.assertEqual({'a': {'b': [1, 2]}},
  121. parse('<a><b>1</b><b>2</b><b>3</b></a>',
  122. postprocessor=postprocessor))
  123. def test_unicode(self):
  124. try:
  125. value = unichr(39321)
  126. except NameError:
  127. value = chr(39321)
  128. self.assertEqual({'a': value},
  129. parse('<a>%s</a>' % value))
  130. def test_encoded_string(self):
  131. try:
  132. value = unichr(39321)
  133. except NameError:
  134. value = chr(39321)
  135. xml = '<a>%s</a>' % value
  136. self.assertEqual(parse(xml),
  137. parse(xml.encode('utf-8')))
  138. def test_namespace_support(self):
  139. xml = """
  140. <root xmlns="http://defaultns.com/"
  141. xmlns:a="http://a.com/"
  142. xmlns:b="http://b.com/">
  143. <x a:attr="val">1</x>
  144. <a:y>2</a:y>
  145. <b:z>3</b:z>
  146. </root>
  147. """
  148. d = {
  149. 'http://defaultns.com/:root': {
  150. 'http://defaultns.com/:x': {
  151. '@xmlns': {
  152. '': 'http://defaultns.com/',
  153. 'a': 'http://a.com/',
  154. 'b': 'http://b.com/',
  155. },
  156. '@http://a.com/:attr': 'val',
  157. '#text': '1',
  158. },
  159. 'http://a.com/:y': '2',
  160. 'http://b.com/:z': '3',
  161. }
  162. }
  163. res = parse(xml, process_namespaces=True)
  164. self.assertEqual(res, d)
  165. def test_namespace_collapse(self):
  166. xml = """
  167. <root xmlns="http://defaultns.com/"
  168. xmlns:a="http://a.com/"
  169. xmlns:b="http://b.com/">
  170. <x a:attr="val">1</x>
  171. <a:y>2</a:y>
  172. <b:z>3</b:z>
  173. </root>
  174. """
  175. namespaces = {
  176. 'http://defaultns.com/': '',
  177. 'http://a.com/': 'ns_a',
  178. }
  179. d = {
  180. 'root': {
  181. 'x': {
  182. '@xmlns': {
  183. '': 'http://defaultns.com/',
  184. 'a': 'http://a.com/',
  185. 'b': 'http://b.com/',
  186. },
  187. '@ns_a:attr': 'val',
  188. '#text': '1',
  189. },
  190. 'ns_a:y': '2',
  191. 'http://b.com/:z': '3',
  192. },
  193. }
  194. res = parse(xml, process_namespaces=True, namespaces=namespaces)
  195. self.assertEqual(res, d)
  196. def test_namespace_ignore(self):
  197. xml = """
  198. <root xmlns="http://defaultns.com/"
  199. xmlns:a="http://a.com/"
  200. xmlns:b="http://b.com/">
  201. <x>1</x>
  202. <a:y>2</a:y>
  203. <b:z>3</b:z>
  204. </root>
  205. """
  206. d = {
  207. 'root': {
  208. '@xmlns': 'http://defaultns.com/',
  209. '@xmlns:a': 'http://a.com/',
  210. '@xmlns:b': 'http://b.com/',
  211. 'x': '1',
  212. 'a:y': '2',
  213. 'b:z': '3',
  214. },
  215. }
  216. self.assertEqual(parse(xml), d)
  217. def test_force_list_basic(self):
  218. xml = """
  219. <servers>
  220. <server>
  221. <name>server1</name>
  222. <os>os1</os>
  223. </server>
  224. </servers>
  225. """
  226. expectedResult = {
  227. 'servers': {
  228. 'server': [
  229. {
  230. 'name': 'server1',
  231. 'os': 'os1',
  232. },
  233. ],
  234. }
  235. }
  236. self.assertEqual(parse(xml, force_list=('server',)), expectedResult)
  237. def test_force_list_callable(self):
  238. xml = """
  239. <config>
  240. <servers>
  241. <server>
  242. <name>server1</name>
  243. <os>os1</os>
  244. </server>
  245. </servers>
  246. <skip>
  247. <server></server>
  248. </skip>
  249. </config>
  250. """
  251. def force_list(path, key, value):
  252. """Only return True for servers/server, but not for skip/server."""
  253. if key != 'server':
  254. return False
  255. return path and path[-1][0] == 'servers'
  256. expectedResult = {
  257. 'config': {
  258. 'servers': {
  259. 'server': [
  260. {
  261. 'name': 'server1',
  262. 'os': 'os1',
  263. },
  264. ],
  265. },
  266. 'skip': {
  267. 'server': None,
  268. },
  269. },
  270. }
  271. self.assertEqual(parse(xml, force_list=force_list, dict_constructor=dict), expectedResult)
  272. def test_disable_entities_true_ignores_xmlbomb(self):
  273. xml = """
  274. <!DOCTYPE xmlbomb [
  275. <!ENTITY a "1234567890" >
  276. <!ENTITY b "&a;&a;&a;&a;&a;&a;&a;&a;">
  277. <!ENTITY c "&b;&b;&b;&b;&b;&b;&b;&b;">
  278. ]>
  279. <bomb>&c;</bomb>
  280. """
  281. expectedResult = {'bomb': None}
  282. try:
  283. parse_attempt = parse(xml, disable_entities=True)
  284. except expat.ExpatError:
  285. self.assertTrue(True)
  286. else:
  287. self.assertEqual(parse_attempt, expectedResult)
  288. def test_disable_entities_false_returns_xmlbomb(self):
  289. xml = """
  290. <!DOCTYPE xmlbomb [
  291. <!ENTITY a "1234567890" >
  292. <!ENTITY b "&a;&a;&a;&a;&a;&a;&a;&a;">
  293. <!ENTITY c "&b;&b;&b;&b;&b;&b;&b;&b;">
  294. ]>
  295. <bomb>&c;</bomb>
  296. """
  297. bomb = "1234567890" * 64
  298. expectedResult = {'bomb': bomb}
  299. self.assertEqual(parse(xml, disable_entities=False), expectedResult)
  300. def test_disable_entities_true_ignores_external_dtd(self):
  301. xml = """
  302. <!DOCTYPE external [
  303. <!ENTITY ee SYSTEM "http://www.python.org/">
  304. ]>
  305. <root>&ee;</root>
  306. """
  307. expectedResult = {'root': None}
  308. try:
  309. parse_attempt = parse(xml, disable_entities=True)
  310. except expat.ExpatError:
  311. self.assertTrue(True)
  312. else:
  313. self.assertEqual(parse_attempt, expectedResult)
  314. def test_disable_entities_true_attempts_external_dtd(self):
  315. xml = """
  316. <!DOCTYPE external [
  317. <!ENTITY ee SYSTEM "http://www.python.org/">
  318. ]>
  319. <root>&ee;</root>
  320. """
  321. def raising_external_ref_handler(*args, **kwargs):
  322. parser = ParserCreate(*args, **kwargs)
  323. parser.ExternalEntityRefHandler = lambda *x: 0
  324. try:
  325. feature = "http://apache.org/xml/features/disallow-doctype-decl"
  326. parser._reader.setFeature(feature, True)
  327. except AttributeError:
  328. pass
  329. return parser
  330. expat.ParserCreate = raising_external_ref_handler
  331. # Using this try/catch because a TypeError is thrown before
  332. # the ExpatError, and Python 2.6 is confused by that.
  333. try:
  334. parse(xml, disable_entities=False, expat=expat)
  335. except expat.ExpatError:
  336. self.assertTrue(True)
  337. else:
  338. self.assertTrue(False)
  339. expat.ParserCreate = ParserCreate