formdata.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import io
  2. import warnings
  3. from typing import Any, Iterable, List, Optional
  4. from urllib.parse import urlencode
  5. from multidict import MultiDict, MultiDictProxy
  6. from . import hdrs, multipart, payload
  7. from .helpers import guess_filename
  8. from .payload import Payload
  9. __all__ = ("FormData",)
  10. class FormData:
  11. """Helper class for form body generation.
  12. Supports multipart/form-data and application/x-www-form-urlencoded.
  13. """
  14. def __init__(
  15. self,
  16. fields: Iterable[Any] = (),
  17. quote_fields: bool = True,
  18. charset: Optional[str] = None,
  19. ) -> None:
  20. self._writer = multipart.MultipartWriter("form-data")
  21. self._fields: List[Any] = []
  22. self._is_multipart = False
  23. self._is_processed = False
  24. self._quote_fields = quote_fields
  25. self._charset = charset
  26. if isinstance(fields, dict):
  27. fields = list(fields.items())
  28. elif not isinstance(fields, (list, tuple)):
  29. fields = (fields,)
  30. self.add_fields(*fields)
  31. @property
  32. def is_multipart(self) -> bool:
  33. return self._is_multipart
  34. def add_field(
  35. self,
  36. name: str,
  37. value: Any,
  38. *,
  39. content_type: Optional[str] = None,
  40. filename: Optional[str] = None,
  41. content_transfer_encoding: Optional[str] = None,
  42. ) -> None:
  43. if isinstance(value, io.IOBase):
  44. self._is_multipart = True
  45. elif isinstance(value, (bytes, bytearray, memoryview)):
  46. msg = (
  47. "In v4, passing bytes will no longer create a file field. "
  48. "Please explicitly use the filename parameter or pass a BytesIO object."
  49. )
  50. if filename is None and content_transfer_encoding is None:
  51. warnings.warn(msg, DeprecationWarning)
  52. filename = name
  53. type_options: MultiDict[str] = MultiDict({"name": name})
  54. if filename is not None and not isinstance(filename, str):
  55. raise TypeError(
  56. "filename must be an instance of str. " "Got: %s" % filename
  57. )
  58. if filename is None and isinstance(value, io.IOBase):
  59. filename = guess_filename(value, name)
  60. if filename is not None:
  61. type_options["filename"] = filename
  62. self._is_multipart = True
  63. headers = {}
  64. if content_type is not None:
  65. if not isinstance(content_type, str):
  66. raise TypeError(
  67. "content_type must be an instance of str. " "Got: %s" % content_type
  68. )
  69. headers[hdrs.CONTENT_TYPE] = content_type
  70. self._is_multipart = True
  71. if content_transfer_encoding is not None:
  72. if not isinstance(content_transfer_encoding, str):
  73. raise TypeError(
  74. "content_transfer_encoding must be an instance"
  75. " of str. Got: %s" % content_transfer_encoding
  76. )
  77. msg = (
  78. "content_transfer_encoding is deprecated. "
  79. "To maintain compatibility with v4 please pass a BytesPayload."
  80. )
  81. warnings.warn(msg, DeprecationWarning)
  82. self._is_multipart = True
  83. self._fields.append((type_options, headers, value))
  84. def add_fields(self, *fields: Any) -> None:
  85. to_add = list(fields)
  86. while to_add:
  87. rec = to_add.pop(0)
  88. if isinstance(rec, io.IOBase):
  89. k = guess_filename(rec, "unknown")
  90. self.add_field(k, rec) # type: ignore[arg-type]
  91. elif isinstance(rec, (MultiDictProxy, MultiDict)):
  92. to_add.extend(rec.items())
  93. elif isinstance(rec, (list, tuple)) and len(rec) == 2:
  94. k, fp = rec
  95. self.add_field(k, fp) # type: ignore[arg-type]
  96. else:
  97. raise TypeError(
  98. "Only io.IOBase, multidict and (name, file) "
  99. "pairs allowed, use .add_field() for passing "
  100. "more complex parameters, got {!r}".format(rec)
  101. )
  102. def _gen_form_urlencoded(self) -> payload.BytesPayload:
  103. # form data (x-www-form-urlencoded)
  104. data = []
  105. for type_options, _, value in self._fields:
  106. data.append((type_options["name"], value))
  107. charset = self._charset if self._charset is not None else "utf-8"
  108. if charset == "utf-8":
  109. content_type = "application/x-www-form-urlencoded"
  110. else:
  111. content_type = "application/x-www-form-urlencoded; " "charset=%s" % charset
  112. return payload.BytesPayload(
  113. urlencode(data, doseq=True, encoding=charset).encode(),
  114. content_type=content_type,
  115. )
  116. def _gen_form_data(self) -> multipart.MultipartWriter:
  117. """Encode a list of fields using the multipart/form-data MIME format"""
  118. if self._is_processed:
  119. raise RuntimeError("Form data has been processed already")
  120. for dispparams, headers, value in self._fields:
  121. try:
  122. if hdrs.CONTENT_TYPE in headers:
  123. part = payload.get_payload(
  124. value,
  125. content_type=headers[hdrs.CONTENT_TYPE],
  126. headers=headers,
  127. encoding=self._charset,
  128. )
  129. else:
  130. part = payload.get_payload(
  131. value, headers=headers, encoding=self._charset
  132. )
  133. except Exception as exc:
  134. raise TypeError(
  135. "Can not serialize value type: %r\n "
  136. "headers: %r\n value: %r" % (type(value), headers, value)
  137. ) from exc
  138. if dispparams:
  139. part.set_content_disposition(
  140. "form-data", quote_fields=self._quote_fields, **dispparams
  141. )
  142. # FIXME cgi.FieldStorage doesn't likes body parts with
  143. # Content-Length which were sent via chunked transfer encoding
  144. assert part.headers is not None
  145. part.headers.popall(hdrs.CONTENT_LENGTH, None)
  146. self._writer.append_payload(part)
  147. self._is_processed = True
  148. return self._writer
  149. def __call__(self) -> Payload:
  150. if self._is_multipart:
  151. return self._gen_form_data()
  152. else:
  153. return self._gen_form_urlencoded()