"""Helpers for loading, saving and inspecting ``package.json`` files."""

import json
import logging
import os

from six import iteritems

from .utils import build_pj_path

logger = logging.getLogger(__name__)


class PackageJsonWorkspaceError(RuntimeError):
    """Raised when a workspace dependency is declared inconsistently."""


class PackageJson(object):
    """
    In-memory view of a single package.json file.

    The parsed JSON document is kept in ``self.data`` (``None`` until
    :meth:`read` is called); ``self.path`` is the absolute path of the file.
    """

    DEP_KEY = "dependencies"
    DEV_DEP_KEY = "devDependencies"
    PEER_DEP_KEY = "peerDependencies"
    OPT_DEP_KEY = "optionalDependencies"
    # Pseudo-key: the overrides live in the nested ``pnpm.overrides`` object.
    PNPM_OVERRIDES_KEY = "pnpm.overrides"
    DEP_KEYS = (DEP_KEY, DEV_DEP_KEY, PEER_DEP_KEY, OPT_DEP_KEY, PNPM_OVERRIDES_KEY)

    WORKSPACE_SCHEMA = "workspace:"

    @classmethod
    def load(cls, path):
        """
        Creates an instance for the given path and reads the file.

        :param path: package.json path
        :type path: str
        :rtype: PackageJson
        """
        pj = cls(path)
        pj.read()

        return pj

    def __init__(self, path):
        # type: (str) -> None
        """
        :param path: absolute path of the package.json file
        :raises TypeError: if the path is not absolute
        """
        if not os.path.isabs(path):
            raise TypeError("Absolute path required, given: {}".format(path))

        self.path = path
        self.data = None

    def read(self):
        """Parses the file at ``self.path`` and stores the result in ``self.data``."""
        with open(self.path, 'rb') as f:
            self.data = json.load(f)

    def write(self, path=None):
        """
        Serializes ``self.data`` as indented JSON followed by a newline.

        Missing parent directories of the target are created.

        :param path: path to store package.json, defaults to original path
        :type path: str
        """
        if path is None:
            path = self.path

        directory = os.path.dirname(path)
        # dirname is empty for a bare file name: nothing to create in that case.
        # makedirs (unlike mkdir) also handles several missing levels at once.
        if directory and not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)

        # Non-ASCII characters are written verbatim (ensure_ascii=False), so the
        # encoding is pinned instead of depending on the locale default.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.data, f, indent=2, ensure_ascii=False)
            f.write('\n')  # it's better for diff algorithm in arc

        logger.debug("Written %s", path)

    def get_name(self):
        # type: () -> str
        """
        Returns the "name" field; when it is missing or empty, a name is derived
        from the directory of the file by replacing "/" with "-".
        """
        name = self.data.get("name")

        if not name:
            name = os.path.dirname(self.path).replace("/", "-").strip("-")

        return name

    def get_version(self):
        """Returns the "version" field (raises KeyError when it is absent)."""
        return self.data["version"]

    def get_description(self):
        """Returns the "description" field or None."""
        return self.data.get("description")

    def get_use_prebuilder(self):
        """Returns the "usePrebuilder" field, False when it is absent."""
        return self.data.get("usePrebuilder", False)

    def get_nodejs_version(self):
        """Returns the "engines.node" value or None."""
        return self.data.get("engines", {}).get("node")

    def get_dep_specifier(self, dep_name):
        """
        Returns the specifier of the first dependency named ``dep_name`` in the
        order produced by :meth:`dependencies_iter`, or None if there is none.
        """
        for name, spec in self.dependencies_iter():
            if dep_name == name:
                return spec
        return None

    def dependencies_iter(self):
        """
        Yields ``(name, specifier)`` pairs for every section listed in
        ``DEP_KEYS``, in that order; empty or missing sections are skipped.
        """
        for key in self.DEP_KEYS:
            if key == self.PNPM_OVERRIDES_KEY:
                deps = self.data.get("pnpm", {}).get("overrides", {})
            else:
                deps = self.data.get(key)

            if not deps:
                continue

            for name, spec in iteritems(deps):
                yield name, spec

    def has_dependencies(self):
        """Tells whether :meth:`dependencies_iter` yields at least one item."""
        first_dep = next(self.dependencies_iter(), None)
        return first_dep is not None

    def bins_iter(self):
        """
        Yields the paths from the "bin" field: the single string value, or
        every value of the mapping. Yields nothing for other types.
        """
        bins = self.data.get("bin")
        if isinstance(bins, str):
            yield bins
        elif isinstance(bins, dict):
            for bin_path in bins.values():
                yield bin_path

    def get_bin_path(self, bin_name=None):
        # type: (str|None) -> str|None
        """
        Returns the path from the "bin" field.

        When "bin" is a string it is returned as is. When it is a mapping, the
        entry named ``bin_name`` (the package name by default) is returned.

        :param bin_name: name of the bin entry, defaults to the package name
        :return: the bin path, or None if no matching entry exists
        """
        actual_bin_name = bin_name or self.get_name()  # type: str

        bins = self.data.get("bin")

        if isinstance(bins, str):
            if bin_name is not None:
                logger.warning("bin_name is unused, because 'bin' is a string")

            return bins

        if isinstance(bins, dict):
            for name, path in bins.items():
                if name == actual_bin_name:
                    return path

        return None

    # TODO: FBP-1254 - switch to a real return annotation: list[tuple[str, str]]
    def get_workspace_dep_spec_paths(self):
        """
        Returns names and paths from specifiers of the defined workspace dependencies.

        :rtype: list[tuple[str, str]]
        :raises PackageJsonWorkspaceError: if a workspace specifier is not a
            relative path (does not start with ".")
        """
        spec_paths = []
        schema = self.WORKSPACE_SCHEMA
        schema_len = len(schema)

        for name, spec in self.dependencies_iter():
            if not spec.startswith(schema):
                continue

            spec_path = spec[schema_len:]
            # A leading "." covers both "./x" and "../x" forms.
            if not spec_path.startswith("."):
                raise PackageJsonWorkspaceError(
                    "Expected relative path specifier for workspace dependency, but got '{}' for {} in {}".format(
                        spec, name, self.path
                    )
                )

            spec_paths.append((name, spec_path))

        return spec_paths

    def get_workspace_dep_paths(self, base_path=None):
        """
        Returns paths of the defined workspace dependencies.

        :param base_path: base path to resolve relative dep paths,
            defaults to the directory of this package.json
        :type base_path: str
        :rtype: list of str
        """
        if base_path is None:
            base_path = os.path.dirname(self.path)

        return [os.path.normpath(os.path.join(base_path, p)) for _, p in self.get_workspace_dep_spec_paths()]

    def get_workspace_deps(self):
        """
        Loads package.json of every direct workspace dependency.

        :rtype: list of PackageJson
        :raises PackageJsonWorkspaceError: if the name used for a dependency
            differs from the name of the loaded package
        """
        ws_deps = []
        pj_dir = os.path.dirname(self.path)

        for name, rel_path in self.get_workspace_dep_spec_paths():
            dep_path = os.path.normpath(os.path.join(pj_dir, rel_path))
            dep_pj = PackageJson.load(build_pj_path(dep_path))

            if name != dep_pj.get_name():
                raise PackageJsonWorkspaceError(
                    "Workspace dependency name mismatch, found '{}' instead of '{}' in {}".format(
                        name, dep_pj.get_name(), self.path
                    )
                )

            ws_deps.append(dep_pj)

        return ws_deps

    def get_workspace_map(self, ignore_self=False):
        """
        Returns absolute paths of the workspace dependencies (including transitive)
        mapped to package.json and depth.

        The depth is the one at which a package was first reached; the work
        list is processed last-in-first-out, so it is not necessarily the
        smallest possible depth.

        :param ignore_self: whether path of the current module will be excluded
        :type ignore_self: bool
        :rtype: dict of (PackageJson, int)
        """
        ws_deps = {}
        own_dir = os.path.dirname(self.path)
        # Tracked separately from ws_deps so that the current module counts as
        # visited even when it is excluded from the result.
        visited = set()
        # list of (pj, depth)
        pj_queue = [(self, 0)]

        while pj_queue:
            pj, depth = pj_queue.pop()
            pj_dir = os.path.dirname(pj.path)
            if pj_dir in visited:
                continue
            visited.add(pj_dir)

            # Compare by directory, not by object: a cyclic dependency leads
            # back to this module as a freshly loaded PackageJson instance.
            if not (ignore_self and pj_dir == own_dir):
                ws_deps[pj_dir] = (pj, depth)

            for dep_pj in pj.get_workspace_deps():
                pj_queue.append((dep_pj, depth + 1))

        return ws_deps

    def get_dep_paths_by_names(self):
        """
        Returns dict of {dependency_name: dependency_path}
        built from :meth:`get_workspace_map` (the current module included).
        """
        ws_map = self.get_workspace_map()
        return {pj.get_name(): path for path, (pj, _) in ws_map.items()}

    def validate_prebuilds(self, requires_build_packages: list[str]):
        """
        Checks the given packages against the prebuild settings of this file.

        A package is considered covered when its name starts with
        "@yandex-prebuild/" or it is listed in "@yatool/prebuilder.usePrebuild".
        Every package enabled in "usePrebuild" must also be present in
        "pnpm.overrides".

        :param requires_build_packages: names of the packages to check
        :return: tuple ``(ok, messages)``; ``ok`` is True when ``messages`` is empty
        """
        pnpm_overrides: dict[str, str] = self.data.get("pnpm", {}).get("overrides", {})
        use_prebuild_flags: dict[str, bool] = self.data.get("@yatool/prebuilder", {}).get("usePrebuild", {})

        def covered(k: str) -> bool:
            if k.startswith("@yandex-prebuild/"):
                return True
            return k in use_prebuild_flags

        not_covered = [key for key in requires_build_packages if not covered(key)]
        use_prebuild_keys = [key for key in use_prebuild_flags if use_prebuild_flags[key]]
        missing_overrides = [key for key in use_prebuild_keys if key not in pnpm_overrides]

        messages = []

        if not_covered:
            messages.append("These packages possibly have addons but are not checked yet:")
            messages.extend([f"  - {key}" for key in not_covered])

        if missing_overrides:
            messages.append("These packages have addons but overrides are not set:")
            messages.extend([f"  - {key}" for key in missing_overrides])

        return (not messages, messages)

    def get_pnpm_patched_dependencies(self) -> dict[str, str]:
        """Returns the "pnpm.patchedDependencies" mapping, empty when absent."""
        return self.data.get("pnpm", {}).get("patchedDependencies", {})