external.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. from __future__ import absolute_import
  2. import re
  3. import sys
  4. import copy
  5. import logging
  6. from . import tools
  7. from datetime import date, datetime
  8. import enum
  9. import six
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Prefix that ExternalDataInfo.get_mds_key() strips from an http(s) uri to obtain the key.
MDS_URI_PREFIX = 'https://storage.yandex-team.ru/get-devtools/'
  12. def apply(func, value, apply_to_keys=False):
  13. """
  14. Applies func to every possible member of value
  15. :param value: could be either a primitive object or a complex one (list, dicts)
  16. :param func: func to be applied
  17. :return:
  18. """
  19. def _apply(func, value, value_path):
  20. if value_path is None:
  21. value_path = []
  22. if isinstance(value, list) or isinstance(value, tuple):
  23. res = []
  24. for ind, item in enumerate(value):
  25. path = copy.copy(value_path)
  26. path.append(ind)
  27. res.append(_apply(func, item, path))
  28. elif isinstance(value, dict):
  29. if is_external(value):
  30. # this is a special serialized object pointing to some external place
  31. res = func(value, value_path)
  32. else:
  33. res = {}
  34. for key, val in sorted(value.items(), key=lambda dict_item: dict_item[0]):
  35. path = copy.copy(value_path)
  36. path.append(key)
  37. res[_apply(func, key, path) if apply_to_keys else key] = _apply(func, val, path)
  38. else:
  39. res = func(value, value_path)
  40. return res
  41. return _apply(func, value, None)
  42. def is_coroutine(val):
  43. if sys.version_info[0] < 3:
  44. return False
  45. else:
  46. import asyncio
  47. return asyncio.iscoroutinefunction(val) or asyncio.iscoroutine(val)
  48. def serialize(value):
  49. """
  50. Serialize value to json-convertible object
  51. Ensures that all components of value can be serialized to json
  52. :param value: object to be serialized
  53. """
  54. def _serialize(val, _):
  55. if val is None:
  56. return val
  57. if isinstance(val, six.string_types) or isinstance(val, bytes):
  58. return tools.to_utf8(val)
  59. if isinstance(val, enum.Enum):
  60. return str(val)
  61. if isinstance(val, six.integer_types) or type(val) in [float, bool]:
  62. return val
  63. if is_external(val):
  64. return dict(val)
  65. if isinstance(val, (date, datetime)):
  66. return repr(val)
  67. if is_coroutine(val):
  68. return None
  69. raise ValueError("Cannot serialize value '{}' of type {}".format(val, type(val)))
  70. return apply(_serialize, value, apply_to_keys=True)
  71. def is_external(value):
  72. return isinstance(value, dict) and "uri" in value.keys()
class ExternalSchema(object):
    """Schema names that go before ``://`` in the "uri" of an external-data reference."""
    # Schema written by ExternalDataInfo.serialize_file()
    File = "file"
    # Schema written by ExternalDataInfo.serialize_resource()
    SandboxResource = "sbr"
    # Schema written by ExternalDataInfo.serialize_delayed()
    Delayed = "delayed"
    # Matched as a uri prefix by ExternalDataInfo.is_http, so "https://..." uris match as well
    HTTP = "http"
  78. class CanonicalObject(dict):
  79. def __iter__(self):
  80. raise TypeError("Iterating canonical object is not implemented")
  81. class ExternalDataInfo(object):
  82. def __init__(self, data):
  83. assert is_external(data)
  84. self._data = data
  85. def __str__(self):
  86. type_str = "File" if self.is_file else "Sandbox resource"
  87. return "{}({})".format(type_str, self.path)
  88. def __repr__(self):
  89. return str(self)
  90. @property
  91. def uri(self):
  92. return self._data["uri"]
  93. @property
  94. def checksum(self):
  95. return self._data.get("checksum")
  96. @property
  97. def is_file(self):
  98. return self.uri.startswith(ExternalSchema.File)
  99. @property
  100. def is_sandbox_resource(self):
  101. return self.uri.startswith(ExternalSchema.SandboxResource)
  102. @property
  103. def is_delayed(self):
  104. return self.uri.startswith(ExternalSchema.Delayed)
  105. @property
  106. def is_http(self):
  107. return self.uri.startswith(ExternalSchema.HTTP)
  108. @property
  109. def path(self):
  110. if self.uri.count("://") != 1:
  111. logger.error("Invalid external data uri: '%s'", self.uri)
  112. return self.uri
  113. _, path = self.uri.split("://")
  114. return path
  115. def get_mds_key(self):
  116. assert self.is_http
  117. m = re.match(re.escape(MDS_URI_PREFIX) + r'(.*?)($|#)', self.uri)
  118. if m:
  119. return m.group(1)
  120. raise AssertionError("Failed to extract mds key properly from '{}'".format(self.uri))
  121. @property
  122. def size(self):
  123. return self._data.get("size")
  124. def serialize(self):
  125. return self._data
  126. @classmethod
  127. def _serialize(cls, schema, path, checksum=None, attrs=None):
  128. res = CanonicalObject({"uri": "{}://{}".format(schema, path)})
  129. if checksum:
  130. res["checksum"] = checksum
  131. if attrs:
  132. res.update(attrs)
  133. return res
  134. @classmethod
  135. def serialize_file(cls, path, checksum=None, diff_tool=None, local=False, diff_file_name=None, diff_tool_timeout=None, size=None):
  136. attrs = {}
  137. if diff_tool:
  138. attrs["diff_tool"] = diff_tool
  139. if local:
  140. attrs["local"] = local
  141. if diff_file_name:
  142. attrs["diff_file_name"] = diff_file_name
  143. if diff_tool_timeout:
  144. attrs["diff_tool_timeout"] = diff_tool_timeout
  145. if size is not None:
  146. attrs["size"] = size
  147. return cls._serialize(ExternalSchema.File, path, checksum, attrs=attrs)
  148. @classmethod
  149. def serialize_resource(cls, id, checksum=None):
  150. return cls._serialize(ExternalSchema.SandboxResource, id, checksum)
  151. @classmethod
  152. def serialize_delayed(cls, upload_id, checksum):
  153. return cls._serialize(ExternalSchema.Delayed, upload_id, checksum)
  154. def get(self, key, default=None):
  155. return self._data.get(key, default)