lockfile.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import base64
  2. import yaml
  3. import os
  4. import io
  5. import re
  6. from six.moves.urllib import parse as urlparse
  7. from six import iteritems
  8. from ..base import PackageJson, BaseLockfile, LockfilePackageMeta, LockfilePackageMetaInvalidError
  9. LOCKFILE_VERSION = "lockfileVersion"
  10. class PnpmLockfile(BaseLockfile):
  11. IMPORTER_KEYS = PackageJson.DEP_KEYS + ("specifiers",)
  12. def read(self):
  13. with io.open(self.path, "rb") as f:
  14. self.data = yaml.load(f, Loader=yaml.CSafeLoader) or {LOCKFILE_VERSION: "6.0"}
  15. version_in_data = LOCKFILE_VERSION in self.data
  16. r = re.compile('^[56]\\.\\d$')
  17. if not version_in_data or not r.match(str(self.data[LOCKFILE_VERSION])):
  18. raise Exception(
  19. 'Error of project configuration: {} has lockfileVersion: {}. '.format(
  20. self.path, self.data[LOCKFILE_VERSION] if version_in_data else "<no-version>"
  21. )
  22. + 'This version is not supported. Please, delete pnpm-lock.yaml and regenerate it using "ya tool nots --clean update-lockfile"'
  23. )
  24. def write(self, path=None):
  25. """
  26. :param path: path to store lockfile, defaults to original path
  27. :type path: str
  28. """
  29. if path is None:
  30. path = self.path
  31. with open(path, "w") as f:
  32. yaml.dump(self.data, f, Dumper=yaml.CSafeDumper)
  33. def get_packages_meta(self):
  34. """
  35. Extracts packages meta from lockfile.
  36. :rtype: list of LockfilePackageMeta
  37. """
  38. packages = self.data.get("packages", {})
  39. return map(lambda x: _parse_package_meta(*x), iteritems(packages))
  40. def update_tarball_resolutions(self, fn):
  41. """
  42. :param fn: maps `LockfilePackageMeta` instance to new `resolution.tarball` value
  43. :type fn: lambda
  44. """
  45. packages = self.data.get("packages", {})
  46. for key, meta in iteritems(packages):
  47. meta["resolution"]["tarball"] = fn(_parse_package_meta(key, meta, allow_file_protocol=True))
  48. packages[key] = meta
  49. def get_importers(self):
  50. """
  51. Returns "importers" section from the lockfile or creates similar structure from "dependencies" and "specifiers".
  52. :rtype: dict of dict of dict of str
  53. """
  54. importers = self.data.get("importers")
  55. if importers is not None:
  56. return importers
  57. importer = {k: self.data[k] for k in self.IMPORTER_KEYS if k in self.data}
  58. return {".": importer} if importer else {}
  59. def merge(self, lf):
  60. """
  61. Merges two lockfiles:
  62. 1. Converts the lockfile to monorepo-like lockfile with "importers" section instead of "dependencies" and "specifiers".
  63. 2. Merges `lf`'s dependencies and specifiers to importers.
  64. 3. Merges `lf`'s packages to the lockfile.
  65. :param lf: lockfile to merge
  66. :type lf: PnpmLockfile
  67. """
  68. importers = self.get_importers()
  69. build_path = os.path.dirname(self.path)
  70. for [importer, imports] in iteritems(lf.get_importers()):
  71. importer_path = os.path.normpath(os.path.join(os.path.dirname(lf.path), importer))
  72. importer_rel_path = os.path.relpath(importer_path, build_path)
  73. importers[importer_rel_path] = imports
  74. self.data["importers"] = importers
  75. for k in self.IMPORTER_KEYS:
  76. self.data.pop(k, None)
  77. packages = self.data.get("packages", {})
  78. for k, v in iteritems(lf.data.get("packages", {})):
  79. if k not in packages:
  80. packages[k] = v
  81. self.data["packages"] = packages
  82. def validate_has_addons_flags(self):
  83. packages = self.data.get("packages", {})
  84. invalid_keys = []
  85. for key, meta in iteritems(packages):
  86. if meta.get("requiresBuild") and "hasAddons" not in meta:
  87. invalid_keys.append(key)
  88. return (not invalid_keys, invalid_keys)
  89. def get_requires_build_packages(self):
  90. packages = self.data.get("packages", {})
  91. requires_build_packages = []
  92. for key, meta in iteritems(packages):
  93. if meta.get("requiresBuild"):
  94. # /expect-webdriverio@4.1.3(typescript@5.1.6)
  95. pkg = key[1:].split("(")[0]
  96. requires_build_packages.append(pkg)
  97. return requires_build_packages
  98. def _parse_package_meta(key, meta, allow_file_protocol=False):
  99. """
  100. :param key: uniq package key from lockfile
  101. :type key: string
  102. :param meta: package meta dict from lockfile
  103. :type meta: dict
  104. :rtype: LockfilePackageMetaInvalidError
  105. """
  106. try:
  107. tarball_url = _parse_tarball_url(meta["resolution"]["tarball"], allow_file_protocol)
  108. sky_id = _parse_sky_id_from_tarball_url(meta["resolution"]["tarball"])
  109. integrity_algorithm, integrity = _parse_package_integrity(meta["resolution"]["integrity"])
  110. except KeyError as e:
  111. raise TypeError("Invalid package meta for key {}, missing {} key".format(key, e))
  112. except LockfilePackageMetaInvalidError as e:
  113. raise TypeError("Invalid package meta for key {}, parse error: {}".format(key, e))
  114. return LockfilePackageMeta(key, tarball_url, sky_id, integrity, integrity_algorithm)
  115. def _parse_tarball_url(tarball_url, allow_file_protocol):
  116. if tarball_url.startswith("file:") and not allow_file_protocol:
  117. raise LockfilePackageMetaInvalidError("tarball cannot point to a file, got {}".format(tarball_url))
  118. return tarball_url.split("?")[0]
  119. def _parse_sky_id_from_tarball_url(tarball_url):
  120. """
  121. :param tarball_url: tarball url
  122. :type tarball_url: string
  123. :rtype: string
  124. """
  125. if tarball_url.startswith("file:"):
  126. return ""
  127. rbtorrent_param = urlparse.parse_qs(urlparse.urlparse(tarball_url).query).get("rbtorrent")
  128. if rbtorrent_param is None:
  129. return ""
  130. return "rbtorrent:{}".format(rbtorrent_param[0])
  131. def _parse_package_integrity(integrity):
  132. """
  133. Returns tuple of algorithm and hash (hex).
  134. :param integrity: package integrity in format "{algo}-{base64_of_hash}"
  135. :type integrity: string
  136. :rtype: (str, str)
  137. """
  138. algo, hash_b64 = integrity.split("-", 1)
  139. if algo not in ("sha1", "sha512"):
  140. raise LockfilePackageMetaInvalidError(
  141. f"Invalid package integrity algorithm, expected one of ('sha1', 'sha512'), got '{algo}'"
  142. )
  143. try:
  144. base64.b64decode(hash_b64)
  145. except TypeError as e:
  146. raise LockfilePackageMetaInvalidError(
  147. "Invalid package integrity encoding, integrity: {}, error: {}".format(integrity, e)
  148. )
  149. return (algo, hash_b64)