diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1d03ab985..5dbcf335e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,7 +75,7 @@ jobs: run: | uv run --extra dev python -m pylint \ --disable=all --enable=R0801 \ - --min-similarity-lines=50 \ + --min-similarity-lines=10 \ --fail-on=R0801 \ src/apm_cli/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 22c086933..7b03af3fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Refactored duplicate code blocks across MCP client adapters and marketplace modules: extracted shared helpers `_apply_pypi_homebrew_generic_config`, `_apply_auth_and_headers_impl`, and `_resolve_env_vars_with_prompting` into `MCPClientAdapter` base class; extracted `iter_semver_tags` into `marketplace._shared`; fixed homebrew formula slash-stripping, `Authorization` header skip-only-when-injected logic, and CI-aware prompting guard (`CI`/`APM_E2E_TESTS` env vars now suppress interactive prompts). (#1360) - `apm pack` now appends a vendor-neutral catalog after per-output success lines listing each marketplace artifact and a single docs anchor (`producer/publish-to-a-marketplace/#consume-from-any-assistant`); the block never names a vendor CLI surface and is suppressed in dry-run. (#1348) - `apm marketplace init` now scaffolds the outputs map as a single-line `# codex: {}` commented toggle (replacing the verbose multi-line block); flipping one line enables Codex output. (#1348) - `apm marketplace doctor` adds an informational `format coverage` row when a `marketplace:` block is present, listing configured and missing supported outputs; informational only and skipped when there is no marketplace config. (#1348) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a09ae8715..c96166736 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -281,6 +281,20 @@ If your changes affect how users interact with the project, update the documenta ## Extending APM +### Adding or modifying an MCP client adapter + +The MCP client adapters (`src/apm_cli/adapters/client/`) inherit shared +utilities from `MCPClientAdapter` in `base.py`. When adding a new adapter +or modifying an existing one: + +- Inherit from `MCPClientAdapter` and reuse the shared helpers + (`_apply_pypi_homebrew_generic_config`, `_apply_auth_and_headers_impl`, + `_resolve_env_vars_with_prompting`). +- Do **not** copy-paste logic from sibling adapters -- the pylint R0801 + similarity threshold is 10 lines and CI will fail on duplicated blocks. +- For marketplace tag-parsing, use `marketplace._shared.iter_semver_tags` + rather than re-implementing the refs-iteration loop. + ### How to add an experimental feature flag Use an experimental flag to de-risk rollout of a user-visible behavioural change that may need early adopter feedback. Do not add a flag for a bug fix, internal refactor, or any change that should simply ship as the default behaviour. diff --git a/src/apm_cli/adapters/client/base.py b/src/apm_cli/adapters/client/base.py index 59cf18d2a..41f87e4b4 100644 --- a/src/apm_cli/adapters/client/base.py +++ b/src/apm_cli/adapters/client/base.py @@ -5,6 +5,8 @@ from abc import ABC, abstractmethod from pathlib import Path +from ...utils.console import _rich_error, _rich_warning + _INPUT_VAR_RE = re.compile(r"\$\{input:([^}]+)\}") # Matches ${VAR} and ${env:VAR}, capturing VAR. Intentionally does NOT match @@ -181,8 +183,8 @@ def _warn_input_variables(mapping, server_name, runtime_label): if var_id in seen: continue seen.add(var_id) - print( - f"[!] Warning: ${{input:{var_id}}} in server " + _rich_warning( + f"${{input:{var_id}}} in server " f"'{server_name}' will not be resolved -- " f"{runtime_label} does not support input variable prompts" ) @@ -196,3 +198,254 @@ def normalize_project_arg(self, value): ): return "." return value + + # ------------------------------------------------------------------ + # Shared server-info helpers (used by all adapter subclasses) + # ------------------------------------------------------------------ + + def _fetch_server_info(self, server_url: str, server_info_cache: dict | None) -> dict | None: + """Look up *server_url* in *server_info_cache* or fetch from registry. + + Prints a user-visible error and returns ``None`` when the server is + not found, so callers can do a simple ``if server_info is None: return False`` + guard and the error message stays consistent across adapters. + + Args: + server_url: Registry reference (``owner/repo`` or full URL). + server_info_cache: Optional pre-fetched cache; ``None`` skips + the cache lookup. + + Returns: + Server-info dict on success; ``None`` when not found. + """ + if server_info_cache and server_url in server_info_cache: + return server_info_cache[server_url] + server_info = self.registry_client.find_server_by_reference(server_url) + if not server_info: + _rich_error(f"Error: MCP server '{server_url}' not found in registry") + return None + return server_info + + @staticmethod + def _determine_config_key(server_url: str, server_name: str) -> str: + """Return the configuration key to use for *server_url*/*server_name*. + + The caller-supplied *server_name* takes precedence; if empty the last + path segment of *server_url* is used as a fallback, which mirrors the + convention ``owner/repo -> repo``. + + Args: + server_url: Registry reference used as fallback source. + server_name: Explicit caller-supplied name (may be empty string). + + Returns: + Non-empty configuration key string. + """ + if server_name: + return server_name + if "/" in server_url: + return server_url.split("/")[-1] + return server_url + + @staticmethod + def _apply_pypi_homebrew_generic_config( + config: dict, + registry_name: str, + package_name: str, + runtime_hint: str, + processed_runtime_args: list, + processed_package_args: list, + resolved_env: dict, + ) -> None: + """Apply pypi / homebrew / generic (uvx / brew / npx) run config to *config*. + + Mutates *config* in-place with ``command``, ``args``, and optionally + ``env`` keys appropriate for the detected registry type. + + Args: + config: Mutable server-config dict to populate. + registry_name: Registry identifier (``"pypi"``, ``"homebrew"``, + ``"npm"``, or any other string treated as generic). + package_name: Base package / formula / module name. + runtime_hint: Caller-specified runtime hint (e.g. ``"uvx"``). + processed_runtime_args: Fully resolved positional args for the + runtime launcher. + processed_package_args: Fully resolved positional args appended + after the package name. + resolved_env: Pre-resolved environment variables dict; an empty + dict is omitted. + """ + if registry_name == "pypi": + launcher = runtime_hint or "uvx" + config["command"] = launcher + config["args"] = [package_name] + processed_runtime_args + processed_package_args # noqa: RUF005 + elif registry_name == "homebrew": + formula_name = package_name.split("/")[-1] if "/" in package_name else package_name + config["command"] = formula_name + config["args"] = processed_runtime_args + processed_package_args + else: + # Generic / npm-compatible fallback + config["command"] = "npx" + config["args"] = processed_runtime_args + ["-y", package_name] + processed_package_args # noqa: RUF005 + if resolved_env: + config["env"] = resolved_env + + def _apply_auth_and_headers_impl( + self, + config: dict, + remote: dict, + server_info: dict, + env_overrides: dict, + runtime_label: str, + token_manager_class, + ) -> None: + """Core implementation of GitHub-token injection and header merging. + + Factored out so that each concrete adapter subclass can supply its own + *token_manager_class* (looked up from the subclass module's namespace), + allowing :func:`unittest.mock.patch` to intercept the class at the + right module scope in tests. + + Args: + config: Mutable config dict updated in place. + remote: Registry remote entry (may contain a ``"headers"`` list). + server_info: Registry server metadata used for name / URL lookup. + env_overrides: Caller-supplied env-var override mapping. + runtime_label: Label for diagnostic messages. + token_manager_class: The ``GitHubTokenManager`` class (or mock) to + instantiate. Passed by the caller so tests can patch the right + module-level name. + """ + server_name = server_info.get("name", "") + is_github_server = self._is_github_server(server_name, remote.get("url", "")) + local_token_injected = False + if is_github_server: + _tm = token_manager_class() + github_token = _tm.get_token_for_purpose("copilot") or os.getenv( + "GITHUB_PERSONAL_ACCESS_TOKEN" + ) + if github_token: + config["headers"] = {"Authorization": f"Bearer {github_token}"} + local_token_injected = True + headers = remote.get("headers", []) + if headers: + if "headers" not in config: + config["headers"] = {} + for header in headers: + header_name = header.get("name", "") + header_value = header.get("value", "") + if header_name and header_value: + if header_name == "Authorization" and local_token_injected: + continue + resolved_value = self._resolve_env_variable( + header_name, header_value, env_overrides + ) + config["headers"][header_name] = resolved_value + if config.get("headers"): + self._warn_input_variables( + config["headers"], server_info.get("name", ""), runtime_label + ) + + @staticmethod + def _resolve_env_vars_with_prompting( + env_vars: list, + env_overrides: dict, + default_github_env: dict, + ) -> dict: + """Resolve *env_vars* from overrides, environment, or interactive prompts. + + Identical logic shared between + :meth:`CopilotClientAdapter._process_environment_variables` and + :meth:`CodexClientAdapter._process_environment_variables`. + + All imports are deferred so that ``rich.prompt`` (an optional + dependency) is never imported at module load time. + + Args: + env_vars: List of env-var descriptor dicts from the registry. + env_overrides: Pre-collected ``{name: value}`` overrides (empty + dict when none). + default_github_env: Mapping of well-known GitHub variable names + to their preferred environment-variable lookup names. + + Returns: + ``resolved`` dict mapping each env-var name to its resolved value + (empty string when unresolvable). + """ + import sys + + env_overrides = env_overrides or {} + resolved: dict = {} + + # Determine whether interactive prompting is available. + # If env_overrides is provided the CLI has already collected variables -- never prompt again. + skip_prompting = ( + bool(env_overrides) + or bool(os.getenv("CI")) + or bool(os.getenv("APM_E2E_TESTS")) + or not sys.stdout.isatty() + or not sys.stdin.isatty() + ) + + # First pass: identify variables with empty values to warn the user. + empty_value_vars = [ev for ev in env_vars if ev.get("required") and not ev.get("value")] + if empty_value_vars and skip_prompting: + var_names = [ev.get("name") for ev in empty_value_vars] + _rich_warning( + f"Warning: The following required environment variables have no default " + f"value and cannot be prompted in non-interactive mode: {var_names}" + ) + + for env_var in env_vars: + name = env_var.get("name", "") + if not name: + continue + + # Priority 1: caller-supplied override + if name in env_overrides: + resolved[name] = env_overrides[name] + continue + + # Priority 2: check GitHub-specific defaults (values are literal defaults, not env-var names) + if name in default_github_env: + resolved[name] = os.getenv(name) or default_github_env[name] + continue + + # Priority 3: environment variable with the same name + env_val = os.getenv(name, "") + if env_val: + resolved[name] = env_val + continue + + # Priority 4: interactive prompt + default_value = env_var.get("value", "") + required = env_var.get("required", False) + + if not skip_prompting: + from rich.prompt import Prompt + + description = env_var.get("description", "") + prompt_text = f"Enter value for {name}" + if description: + prompt_text += f" ({description})" + is_secret = "token" in name.lower() or "key" in name.lower() + user_input = Prompt.ask( + prompt_text, + default=default_value, + password=True # noqa: SIM210 + if is_secret + else False, + ) + resolved[name] = user_input + elif default_value: + resolved[name] = default_value + elif required: + _rich_warning( + f"Warning: Required environment variable '{name}' could not be resolved. " + f"The MCP server may not function correctly." + ) + resolved[name] = "" + else: + resolved[name] = default_value + + return resolved diff --git a/src/apm_cli/adapters/client/codex.py b/src/apm_cli/adapters/client/codex.py index ea8f2eda7..3c5d0985e 100644 --- a/src/apm_cli/adapters/client/codex.py +++ b/src/apm_cli/adapters/client/codex.py @@ -143,16 +143,8 @@ def configure_mcp_server( return False try: - # Use cached server info if available, otherwise fetch from registry - if server_info_cache and server_url in server_info_cache: - server_info = server_info_cache[server_url] - else: - # Fallback to registry lookup if not cached - server_info = self.registry_client.find_server_by_reference(server_url) - - # Fail if server is not found in registry - security requirement - if not server_info: - print(f"Error: MCP server '{server_url}' not found in registry") + server_info = self._fetch_server_info(server_url, server_info_cache) + if server_info is None: return False # Check for remote servers early - Codex doesn't support remote/SSE servers @@ -299,33 +291,21 @@ def _format_server_config(self, server_info, env_overrides=None, runtime_vars=No if resolved_env: config["env"] = resolved_env elif registry_name == "pypi": - config["command"] = runtime_hint or "uvx" - config["args"] = ( - [package_name] + processed_runtime_args + processed_package_args # noqa: RUF005 - ) - # For PyPI packages, use env block for environment variables - if resolved_env: - config["env"] = resolved_env - elif registry_name == "homebrew": - # For homebrew packages, assume the binary name is the command - config["command"] = ( - package_name.split("/")[-1] if "/" in package_name else package_name + self._apply_pypi_homebrew_generic_config( + config, + registry_name, + package_name, + runtime_hint, + processed_runtime_args, + processed_package_args, + resolved_env, ) - config["args"] = processed_runtime_args + processed_package_args - # For Homebrew packages, use env block for environment variables - if resolved_env: - config["env"] = resolved_env - else: - # Generic package handling - config["command"] = runtime_hint or package_name - config["args"] = processed_runtime_args + processed_package_args - # For generic packages, use env block for environment variables - if resolved_env: - config["env"] = resolved_env return config - def _process_arguments(self, arguments, resolved_env=None, runtime_vars=None): + def _process_arguments( # pylint: disable=duplicate-code # structural similarity with copilot adapter is intentional + self, arguments, resolved_env=None, runtime_vars=None + ): """Process argument objects to extract simple string values with environment resolution. Args: @@ -390,76 +370,8 @@ def _process_environment_variables(self, env_vars, env_overrides=None): Returns: dict: Dictionary of resolved environment variable values. """ - import os - import sys - - from rich.prompt import Prompt - - resolved = {} - env_overrides = env_overrides or {} - - # If env_overrides is provided, it means the CLI has already handled environment variable collection - # In this case, we should NEVER prompt for additional variables - skip_prompting = bool(env_overrides) - - # Check for CI/automated environment via APM_E2E_TESTS flag (more reliable than TTY detection) - if os.getenv("APM_E2E_TESTS") == "1": - skip_prompting = True - print(f" APM_E2E_TESTS detected, will skip environment variable prompts") # noqa: F541 - - # Also skip prompting if we're in a non-interactive environment (fallback) - is_interactive = sys.stdin.isatty() and sys.stdout.isatty() - if not is_interactive: - skip_prompting = True - - # Add default GitHub MCP server environment variables for essential functionality first - # This ensures variables have defaults when user provides empty values or they're optional default_github_env = {"GITHUB_TOOLSETS": "context", "GITHUB_DYNAMIC_TOOLSETS": "1"} - - # Track which variables were explicitly provided with empty values (user wants defaults) - empty_value_vars = set() - if env_overrides: - for key, value in env_overrides.items(): - if key in env_overrides and (not value or not value.strip()): - empty_value_vars.add(key) - - for env_var in env_vars: - if isinstance(env_var, dict): - name = env_var.get("name", "") - description = env_var.get("description", "") - required = env_var.get("required", True) - - if name: - # First check overrides, then environment - value = env_overrides.get(name) or os.getenv(name) - - # Only prompt if not provided in overrides or environment AND it's required AND we're not in managed override mode - if not value and required and not skip_prompting: - # Only prompt if not provided in overrides - prompt_text = f"Enter value for {name}" - if description: - prompt_text += f" ({description})" - value = Prompt.ask( - prompt_text, - password=True # noqa: SIM210 - if "token" in name.lower() or "key" in name.lower() - else False, - ) - - # Add variable if it has a value OR if user explicitly provided empty and we have a default - if value and value.strip(): - resolved[name] = value - elif name in empty_value_vars and name in default_github_env: - # User provided empty value and we have a default - use default - resolved[name] = default_github_env[name] - elif not required and name in default_github_env: - # Variable is optional and we have a default - use default - resolved[name] = default_github_env[name] - elif skip_prompting and name in default_github_env: - # Non-interactive environment and we have a default - use default - resolved[name] = default_github_env[name] - - return resolved + return self._resolve_env_vars_with_prompting(env_vars, env_overrides, default_github_env) def _resolve_variable_placeholders(self, value, resolved_env, runtime_vars): """Resolve both environment and runtime variable placeholders in values. diff --git a/src/apm_cli/adapters/client/copilot.py b/src/apm_cli/adapters/client/copilot.py index ae527f7b5..afd46598a 100644 --- a/src/apm_cli/adapters/client/copilot.py +++ b/src/apm_cli/adapters/client/copilot.py @@ -258,16 +258,8 @@ def configure_mcp_server( return False try: - # Use cached server info if available, otherwise fetch from registry - if server_info_cache and server_url in server_info_cache: - server_info = server_info_cache[server_url] - else: - # Fallback to registry lookup if not cached - server_info = self.registry_client.find_server_by_reference(server_url) - - # Fail if server is not found in registry - security requirement - if not server_info: - print(f"Error: MCP server '{server_url}' not found in registry") + server_info = self._fetch_server_info(server_url, server_info_cache) + if server_info is None: return False # Reset per-server tracking before formatting (so the per-server @@ -572,40 +564,7 @@ def _format_server_config(self, server_info, env_overrides=None, runtime_vars=No "id": server_info.get("id", ""), # Add registry UUID for conflict detection } - # Add authentication headers for GitHub MCP server - server_name = server_info.get("name", "") - is_github_server = self._is_github_server(server_name, remote.get("url", "")) - - if is_github_server: - # Use centralized token manager (copilot chain: GITHUB_COPILOT_PAT → GITHUB_TOKEN → GITHUB_APM_PAT), - # falling back to GITHUB_PERSONAL_ACCESS_TOKEN for Copilot CLI compat. - _tm = GitHubTokenManager() - github_token = _tm.get_token_for_purpose("copilot") or os.getenv( - "GITHUB_PERSONAL_ACCESS_TOKEN" - ) - if github_token: - config["headers"] = {"Authorization": f"Bearer {github_token}"} - - # Add any additional headers from registry if present - headers = remote.get("headers", []) - if headers: - if "headers" not in config: - config["headers"] = {} - for header in headers: - header_name = header.get("name", "") - header_value = header.get("value", "") - if header_name and header_value: - # Resolve environment variable value - resolved_value = self._resolve_env_variable( - header_name, header_value, env_overrides - ) - config["headers"][header_name] = resolved_value - - # Warn about unresolvable ${input:...} references in headers - if config.get("headers"): - self._warn_input_variables( - config["headers"], server_info.get("name", ""), "Copilot CLI" - ) + self._apply_auth_and_headers(config, remote, server_info, env_overrides) # Apply tools override from MCP dependency overlay if present tools_override = server_info.get("_apm_tools_override") @@ -628,84 +587,138 @@ def _format_server_config(self, server_info, env_overrides=None, runtime_vars=No if packages: # Use the first package for configuration (prioritize npm, then docker, then others) - package = self._select_best_package(packages) + self._select_and_dispatch_best_package(config, packages, env_overrides, runtime_vars) - if package: - registry_name = self._infer_registry_name(package) - package_name = package.get("name", "") - runtime_hint = package.get("runtime_hint", "") - runtime_arguments = package.get("runtime_arguments", []) - package_arguments = package.get("package_arguments", []) + # Apply tools override from MCP dependency overlay if present + tools_override = server_info.get("_apm_tools_override") + if tools_override: + config["tools"] = tools_override + + return config + + def _apply_auth_and_headers( + self, config, remote, server_info, env_overrides, runtime_label="Copilot CLI" + ): + """Inject GitHub token and registry-supplied headers into *config*. + + Delegates to :meth:`MCPClientAdapter._apply_auth_and_headers_impl`, + supplying ``GitHubTokenManager`` from this module's namespace so that + ``unittest.mock.patch("apm_cli.adapters.client.copilot.GitHubTokenManager")`` + correctly intercepts the instantiation in tests. + """ + self._apply_auth_and_headers_impl( + config, remote, server_info, env_overrides, runtime_label, GitHubTokenManager + ) - # Process arguments to extract simple string values - env_vars = package.get("environment_variables", []) + def _dispatch_package_to_config( + self, + config, + package_name, + registry_name, + runtime_hint, + processed_runtime_args, + processed_package_args, + resolved_env, + ): + """Populate *config* with command/args/env for a single package. - # Resolve environment variables first - resolved_env = self._resolve_environment_variables(env_vars, env_overrides) + Handles npm and docker natively; delegates pypi, homebrew, and + generic registries to :meth:`MCPClientAdapter._apply_pypi_homebrew_generic_config`. - processed_runtime_args = self._process_arguments( - runtime_arguments, resolved_env, runtime_vars + Args: + config: Mutable config dict; updated in place. + package_name: Registry package identifier. + registry_name: Registry type (``"npm"``, ``"docker"``, ``"pypi"``, …). + runtime_hint: Optional runtime override from the package entry. + processed_runtime_args: Pre-processed runtime argument list. + processed_package_args: Pre-processed package argument list. + resolved_env: Resolved environment variable mapping. + """ + if registry_name == "npm": + config["command"] = runtime_hint or "npx" + config["args"] = ( + ["-y", package_name] + processed_runtime_args + processed_package_args # noqa: RUF005 + ) + if resolved_env: + config["env"] = resolved_env + elif registry_name == "docker": + config["command"] = "docker" + if processed_runtime_args: + config["args"] = self._inject_env_vars_into_docker_args( + processed_runtime_args, resolved_env ) - processed_package_args = self._process_arguments( - package_arguments, resolved_env, runtime_vars + else: + config["args"] = DockerArgsProcessor.process_docker_args( + ["run", "-i", "--rm", package_name], resolved_env ) + else: + self._apply_pypi_homebrew_generic_config( + config, + registry_name, + package_name, + runtime_hint, + processed_runtime_args, + processed_package_args, + resolved_env, + ) - # Generate command and args based on package type - if registry_name == "npm": - config["command"] = runtime_hint or "npx" - config["args"] = ( - ["-y", package_name] + processed_runtime_args + processed_package_args # noqa: RUF005 - ) - # For NPM packages, use env block for environment variables - if resolved_env: - config["env"] = resolved_env - elif registry_name == "docker": - config["command"] = "docker" - - # For Docker packages, the registry provides the complete command template - # We should respect the runtime_arguments as the authoritative Docker command structure - if processed_runtime_args: - # Registry provides complete Docker command arguments - # Just inject environment variables where appropriate - config["args"] = self._inject_env_vars_into_docker_args( - processed_runtime_args, resolved_env - ) - else: - # Fallback to basic docker run command if no runtime args - config["args"] = DockerArgsProcessor.process_docker_args( - ["run", "-i", "--rm", package_name], resolved_env - ) - elif registry_name == "pypi": - config["command"] = runtime_hint or "uvx" - config["args"] = ( - [package_name] + processed_runtime_args + processed_package_args # noqa: RUF005 - ) - # For PyPI packages, use env block - if resolved_env: - config["env"] = resolved_env - elif registry_name == "homebrew": - # For homebrew packages, assume the binary name is the command - config["command"] = ( - package_name.split("/")[-1] if "/" in package_name else package_name - ) - config["args"] = processed_runtime_args + processed_package_args - # For Homebrew packages, use env block - if resolved_env: - config["env"] = resolved_env - else: - # Generic package handling - config["command"] = runtime_hint or package_name - config["args"] = processed_runtime_args + processed_package_args - # Use env block for generic packages - if resolved_env: - config["env"] = resolved_env + def _select_and_dispatch_best_package( + self, + config, + packages, + env_overrides, + runtime_vars, + set_type_stdio: bool = False, + ): + """Select the best package from *packages*, resolve env, and populate *config*. - # Apply tools override from MCP dependency overlay if present - tools_override = server_info.get("_apm_tools_override") - if tools_override: - config["tools"] = tools_override + Shared dispatch path used by both + :meth:`CopilotClientAdapter._build_server_config` and + :meth:`CursorClientAdapter._build_server_config`. - return config + Args: + config: Mutable config dict; updated in place. + packages: List of package dicts from the registry server info. + env_overrides: Pre-collected env-var overrides (may be empty). + runtime_vars: Runtime variable substitutions. + set_type_stdio: When ``True``, sets ``config["type"] = "stdio"`` + before dispatching (required by the Cursor format). + + Returns: + The selected package dict, or ``None`` if no package matched. + """ + package = self._select_best_package(packages) + if not package: + return None + + registry_name = self._infer_registry_name(package) + package_name = package.get("name", "") + runtime_hint = package.get("runtime_hint", "") + runtime_arguments = package.get("runtime_arguments", []) + package_arguments = package.get("package_arguments", []) + env_vars = package.get("environment_variables", []) + + resolved_env = self._resolve_environment_variables(env_vars, env_overrides) + processed_runtime_args = self._process_arguments( + runtime_arguments, resolved_env, runtime_vars + ) + processed_package_args = self._process_arguments( + package_arguments, resolved_env, runtime_vars + ) + + if set_type_stdio: + config["type"] = "stdio" + + self._dispatch_package_to_config( + config, + package_name, + registry_name, + runtime_hint, + processed_runtime_args, + processed_package_args, + resolved_env, + ) + return package def _resolve_environment_variables(self, env_vars, env_overrides=None): """Resolve (or translate) declared environment variables. @@ -810,71 +823,7 @@ def _resolve_environment_variables(self, env_vars, env_overrides=None): resolved[name] = _stringify_env_literal(value) return resolved - import os - import sys - - from rich.prompt import Prompt - - resolved = {} - env_overrides = env_overrides or {} - - # If env_overrides is provided, it means the CLI has already handled environment variable collection - # In this case, we should NEVER prompt for additional variables - skip_prompting = bool(env_overrides) - - # Check for CI/automated environment via APM_E2E_TESTS flag (more reliable than TTY detection) - if os.getenv("APM_E2E_TESTS") == "1": - skip_prompting = True - print(f" APM_E2E_TESTS detected, will skip environment variable prompts") # noqa: F541 - - # Also skip prompting if we're in a non-interactive environment (fallback) - is_interactive = sys.stdin.isatty() and sys.stdout.isatty() - if not is_interactive: - skip_prompting = True - - # Track which variables were explicitly provided with empty values (user wants defaults) - empty_value_vars = set() - if env_overrides: - for key, value in env_overrides.items(): - if key in env_overrides and (not value or not value.strip()): - empty_value_vars.add(key) - - for env_var in env_vars: - if isinstance(env_var, dict): - name = env_var.get("name", "") - description = env_var.get("description", "") - required = env_var.get("required", True) - - if name: - # First check overrides, then environment - value = env_overrides.get(name) or os.getenv(name) - - # Only prompt if not provided in overrides or environment AND it's required AND we're not in managed override mode - if not value and required and not skip_prompting: - prompt_text = f"Enter value for {name}" - if description: - prompt_text += f" ({description})" - value = Prompt.ask( - prompt_text, - password=True # noqa: SIM210 - if "token" in name.lower() or "key" in name.lower() - else False, - ) - - # Add variable if it has a value OR if user explicitly provided empty and we have a default - if value and value.strip(): - resolved[name] = value - elif name in empty_value_vars and name in default_github_env: - # User provided empty value and we have a default - use default - resolved[name] = default_github_env[name] - elif not required and name in default_github_env: - # Variable is optional and we have a default - use default - resolved[name] = default_github_env[name] - elif skip_prompting and name in default_github_env: - # Non-interactive environment and we have a default - use default - resolved[name] = default_github_env[name] - - return resolved + return self._resolve_env_vars_with_prompting(env_vars, env_overrides, default_github_env) def _resolve_env_variable(self, name, value, env_overrides=None): """Resolve (or translate) a single environment variable value. diff --git a/src/apm_cli/adapters/client/cursor.py b/src/apm_cli/adapters/client/cursor.py index d81289262..e7c1ec1dc 100644 --- a/src/apm_cli/adapters/client/cursor.py +++ b/src/apm_cli/adapters/client/cursor.py @@ -13,7 +13,6 @@ import os from pathlib import Path -from ...core.docker_args import DockerArgsProcessor from ...core.token_manager import GitHubTokenManager from .copilot import CopilotClientAdapter @@ -37,9 +36,22 @@ class CursorClientAdapter(CopilotClientAdapter): _supports_runtime_env_substitution: bool = False # ------------------------------------------------------------------ # - # Config path + # Auth-header injection override (for testability) # ------------------------------------------------------------------ # + def _apply_auth_and_headers( + self, config, remote, server_info, env_overrides, runtime_label="Cursor" + ): + """Inject GitHub token and registry-supplied headers into *config*. + + Overrides the parent to supply ``GitHubTokenManager`` from *this* + module's namespace, allowing tests to patch + ``apm_cli.adapters.client.cursor.GitHubTokenManager`` correctly. + """ + self._apply_auth_and_headers_impl( + config, remote, server_info, env_overrides, runtime_label, GitHubTokenManager + ) + def get_config_path(self): """Return the path to ``.cursor/mcp.json`` in the repository root. @@ -155,40 +167,7 @@ def _format_server_config(self, server_info, env_overrides=None, runtime_vars=No config["type"] = "http" config["url"] = (remote.get("url") or "").strip() - # Add authentication headers for GitHub MCP server - server_name = server_info.get("name", "") - is_github_server = self._is_github_server(server_name, remote.get("url", "")) - - if is_github_server: - _tm = GitHubTokenManager() - github_token = _tm.get_token_for_purpose("copilot") or os.getenv( - "GITHUB_PERSONAL_ACCESS_TOKEN" - ) - if github_token: - config["headers"] = {"Authorization": f"Bearer {github_token}"} - - # Add any additional headers from registry if present - headers = remote.get("headers", []) - if headers: - if "headers" not in config: - config["headers"] = {} - for header in headers: - header_name = header.get("name", "") - header_value = header.get("value", "") - if header_name and header_value: - # Prevent registry-supplied headers from overriding - # the injected GitHub token - if header_name == "Authorization" and is_github_server: - continue - resolved_value = self._resolve_env_variable( - header_name, header_value, env_overrides - ) - config["headers"][header_name] = resolved_value - - # Warn about unresolvable ${input:...} references in headers - if config.get("headers"): - self._warn_input_variables(config["headers"], server_info.get("name", ""), "Cursor") - + self._apply_auth_and_headers(config, remote, server_info, env_overrides, "Cursor") return config # --- local packages --- @@ -203,63 +182,10 @@ def _format_server_config(self, server_info, env_overrides=None, runtime_vars=No ) if packages: - package = self._select_best_package(packages) - - if package: - registry_name = self._infer_registry_name(package) - package_name = package.get("name", "") - runtime_hint = package.get("runtime_hint", "") - runtime_arguments = package.get("runtime_arguments", []) - package_arguments = package.get("package_arguments", []) - env_vars = package.get("environment_variables", []) - - resolved_env = self._resolve_environment_variables(env_vars, env_overrides) - processed_runtime_args = self._process_arguments( - runtime_arguments, resolved_env, runtime_vars - ) - processed_package_args = self._process_arguments( - package_arguments, resolved_env, runtime_vars - ) - - config["type"] = "stdio" - - if registry_name == "npm": - config["command"] = runtime_hint or "npx" - config["args"] = ( - ["-y", package_name] + processed_runtime_args + processed_package_args # noqa: RUF005 - ) - if resolved_env: - config["env"] = resolved_env - elif registry_name == "docker": - config["command"] = "docker" - if processed_runtime_args: - config["args"] = self._inject_env_vars_into_docker_args( - processed_runtime_args, resolved_env - ) - else: - config["args"] = DockerArgsProcessor.process_docker_args( - ["run", "-i", "--rm", package_name], resolved_env - ) - elif registry_name == "pypi": - config["command"] = runtime_hint or "uvx" - config["args"] = ( - [package_name] + processed_runtime_args + processed_package_args # noqa: RUF005 - ) - if resolved_env: - config["env"] = resolved_env - elif registry_name == "homebrew": - config["command"] = ( - package_name.split("/")[-1] if "/" in package_name else package_name - ) - config["args"] = processed_runtime_args + processed_package_args - if resolved_env: - config["env"] = resolved_env - else: - config["command"] = runtime_hint or package_name - config["args"] = processed_runtime_args + processed_package_args - if resolved_env: - config["env"] = resolved_env - else: + package = self._select_and_dispatch_best_package( + config, packages, env_overrides, runtime_vars, set_type_stdio=True + ) + if not package: raise ValueError( f"No supported package type found for Cursor. " f"Server: {server_info.get('name', 'unknown')}. " @@ -297,23 +223,11 @@ def configure_mcp_server( return True # nothing to do, not an error try: - # Use cached server info if available, otherwise fetch from registry - if server_info_cache and server_url in server_info_cache: - server_info = server_info_cache[server_url] - else: - server_info = self.registry_client.find_server_by_reference(server_url) - - if not server_info: - print(f"Error: MCP server '{server_url}' not found in registry") + server_info = self._fetch_server_info(server_url, server_info_cache) + if server_info is None: return False - # Determine config key - if server_name: - config_key = server_name - elif "/" in server_url: - config_key = server_url.split("/")[-1] - else: - config_key = server_url + config_key = self._determine_config_key(server_url, server_name) server_config = self._format_server_config(server_info, env_overrides, runtime_vars) self.update_config({config_key: server_config}) diff --git a/src/apm_cli/adapters/client/opencode.py b/src/apm_cli/adapters/client/opencode.py index 883e67fcd..7d7b74673 100644 --- a/src/apm_cli/adapters/client/opencode.py +++ b/src/apm_cli/adapters/client/opencode.py @@ -112,21 +112,11 @@ def configure_mcp_server( return False try: - if server_info_cache and server_url in server_info_cache: - server_info = server_info_cache[server_url] - else: - server_info = self.registry_client.find_server_by_reference(server_url) - - if not server_info: - print(f"Error: MCP server '{server_url}' not found in registry") + server_info = self._fetch_server_info(server_url, server_info_cache) + if server_info is None: return False - if server_name: - config_key = server_name - elif "/" in server_url: - config_key = server_url.split("/")[-1] - else: - config_key = server_url + config_key = self._determine_config_key(server_url, server_name) server_config = self._format_server_config(server_info, env_overrides, runtime_vars) self.update_config({config_key: server_config}, enabled=enabled) diff --git a/src/apm_cli/commands/_helpers.py b/src/apm_cli/commands/_helpers.py index 7f728dd87..bc7c5e9ec 100644 --- a/src/apm_cli/commands/_helpers.py +++ b/src/apm_cli/commands/_helpers.py @@ -14,7 +14,6 @@ from colorama import init as colorama_init from ..constants import ( - APM_DIR, APM_LOCK_FILENAME, # noqa: F401 APM_MODULES_DIR, APM_MODULES_GITIGNORE_PATTERN, @@ -27,6 +26,7 @@ from ..utils.path_security import PathTraversalError, validate_path_segments from ..utils.version_checker import check_for_updates from ..version import get_build_sha, get_version +from .deps._utils import _scan_installed_packages # CRITICAL: Shadow Click commands at module level to prevent namespace collision # When Click commands like 'config set' are defined, calling set() can invoke the command @@ -238,29 +238,6 @@ def _expand_with_ancestors( return expanded -def _scan_installed_packages(apm_modules_dir: Path) -> list: - """Scan *apm_modules_dir* for installed package paths. - - Walks the tree to find directories containing ``apm.yml`` or ``.apm``, - supporting GitHub (2-level), ADO (3-level), and subdirectory packages. - - Returns: - List of ``"owner/repo"`` or ``"org/project/repo"`` path keys. - """ - installed: list = [] - if not apm_modules_dir.exists(): - return installed - for candidate in apm_modules_dir.rglob("*"): - if not candidate.is_dir() or candidate.name.startswith("."): - continue - if not ((candidate / APM_YML_FILENAME).exists() or (candidate / APM_DIR).exists()): - continue - rel_parts = candidate.relative_to(apm_modules_dir).parts - if len(rel_parts) >= 2: - installed.append("/".join(rel_parts)) - return installed - - def _standalone_installed_packages( installed: Iterable[str], apm_modules_dir: Path, lockfile=None ) -> list: diff --git a/src/apm_cli/commands/marketplace/__init__.py b/src/apm_cli/commands/marketplace/__init__.py index aa3b50434..ceac956ae 100644 --- a/src/apm_cli/commands/marketplace/__init__.py +++ b/src/apm_cli/commands/marketplace/__init__.py @@ -886,22 +886,13 @@ def _load_current_versions(): def _extract_tag_versions(refs, entry, yml, include_prerelease): """Extract (SemVer, tag_name) pairs from remote refs for a package entry.""" + from ...marketplace._shared import iter_semver_tags from ...marketplace.tag_pattern import build_tag_regex pattern = entry.tag_pattern or yml.build.tag_pattern tag_rx = build_tag_regex(pattern) results = [] - for remote_ref in refs: - if not remote_ref.name.startswith("refs/tags/"): - continue - tag_name = remote_ref.name[len("refs/tags/") :] - m = tag_rx.match(tag_name) - if not m: - continue - version_str = m.group("version") - sv = parse_semver(version_str) - if sv is None: - continue + for sv, tag_name, _ in iter_semver_tags(refs, tag_rx): if sv.is_prerelease and not (include_prerelease or entry.include_prerelease): continue results.append((sv, tag_name)) diff --git a/src/apm_cli/compilation/claude_formatter.py b/src/apm_cli/compilation/claude_formatter.py index 3d6959a6b..6a7f36f25 100644 --- a/src/apm_cli/compilation/claude_formatter.py +++ b/src/apm_cli/compilation/claude_formatter.py @@ -11,11 +11,10 @@ from typing import Dict, List, Optional, Set # noqa: F401, UP035 from ..primitives.models import Chatmode, Instruction, PrimitiveCollection -from ..utils.paths import portable_relpath from ..version import get_version from .constants import BUILD_ID_PLACEHOLDER from .constitution import read_constitution -from .template_builder import render_instructions_block +from .template_builder import build_attributed_instructions # CRITICAL: Shadow Click commands to prevent namespace collision set = builtins.set @@ -278,21 +277,11 @@ def _generate_claude_content( sections.append("# Project Standards") sections.append("") - def _emit(instruction: Instruction) -> builtins.list[str]: - lines: builtins.list[str] = [] - if placement.source_attribution: - source = placement.source_attribution.get(str(instruction.file_path), "local") - rel_path = portable_relpath(instruction.file_path, self.base_dir) - lines.append(f"") - lines.append(instruction.content.strip()) - lines.append("") - return lines - sections.extend( - render_instructions_block( + build_attributed_instructions( placement.instructions, - base_dir=self.base_dir, - emit_instruction=_emit, + placement.source_attribution, + self.base_dir, ) ) diff --git a/src/apm_cli/compilation/distributed_compiler.py b/src/apm_cli/compilation/distributed_compiler.py index a3544186f..a8750c1a4 100644 --- a/src/apm_cli/compilation/distributed_compiler.py +++ b/src/apm_cli/compilation/distributed_compiler.py @@ -22,6 +22,7 @@ from .link_resolver import UnifiedLinkResolver from .template_builder import ( # noqa: F401 TemplateData, + build_attributed_instructions, find_chatmode_by_name, render_instructions_block, ) @@ -539,21 +540,11 @@ def _generate_agents_content( sections.append("") - def _emit(instruction: Instruction) -> builtins.list[str]: - lines: builtins.list[str] = [] - if placement.source_attribution: - source = placement.source_attribution.get(str(instruction.file_path), "local") - rel_path = portable_relpath(instruction.file_path, self.base_dir) - lines.append(f"") - lines.append(instruction.content.strip()) - lines.append("") - return lines - sections.extend( - render_instructions_block( + build_attributed_instructions( placement.instructions, - base_dir=self.base_dir, - emit_instruction=_emit, + placement.source_attribution, + self.base_dir, ) ) diff --git a/src/apm_cli/compilation/template_builder.py b/src/apm_cli/compilation/template_builder.py index 3f8fd4017..573ab7a8e 100644 --- a/src/apm_cli/compilation/template_builder.py +++ b/src/apm_cli/compilation/template_builder.py @@ -86,6 +86,46 @@ def _sort_key(inst: Instruction) -> str: return sections +def build_attributed_instructions( + instructions: list[Instruction], + source_attribution: dict | None, + base_dir: Path, +) -> list[str]: + """Render an instructions block with optional source-attribution comments. + + Convenience wrapper around :func:`render_instructions_block` that bundles + the common attribution-header ``_emit`` closure used by both + :class:`~apm_cli.compilation.claude_formatter.ClaudeFormatter` and + :class:`~apm_cli.compilation.distributed_compiler.DistributedCompiler`. + + Args: + instructions: Instructions to render. + source_attribution: Optional ``{str(file_path): source_label}`` map. + When provided, each instruction is prefixed with a + ```` comment. + base_dir: Directory used as anchor for stable sort keys. + + Returns: + Lines ready to be joined or extended into a parent ``sections`` list. + """ + + def _emit(instruction: Instruction) -> list[str]: + lines: list[str] = [] + if source_attribution: + source = source_attribution.get(str(instruction.file_path), "local") + rel_path = portable_relpath(instruction.file_path, base_dir) + lines.append(f"") + lines.append(instruction.content.strip()) + lines.append("") + return lines + + return render_instructions_block( + instructions, + base_dir=base_dir, + emit_instruction=_emit, + ) + + def build_conditional_sections(instructions: list[Instruction]) -> str: """Build sections grouped by applyTo patterns. diff --git a/src/apm_cli/deps/_shared.py b/src/apm_cli/deps/_shared.py new file mode 100644 index 000000000..16888aee8 --- /dev/null +++ b/src/apm_cli/deps/_shared.py @@ -0,0 +1,40 @@ +"""Shared helpers for APM package download and validation.""" + +from __future__ import annotations + +from pathlib import Path + + +def _validate_and_load_package(validation_result, target_path: Path, dep_ref) -> object: + """Check *validation_result*, clean up *target_path* on failure, and return the package. + + Args: + validation_result: Result from ``validate_apm_package(target_path)``. + target_path: Destination directory; removed on validation failure. + dep_ref: Dependency reference (for error messages and ``source`` assignment). + + Returns: + The :class:`~apm_cli.models.apm_package.APMPackage` from the validation + result (with ``source`` already set to ``dep_ref.to_github_url()``). + + Raises: + RuntimeError: If the package is invalid or metadata is missing. + """ + from ..utils.file_ops import robust_rmtree + + if not validation_result.is_valid: + if target_path.exists(): + robust_rmtree(target_path, ignore_errors=True) + error_msg = f"Invalid APM package {dep_ref.repo_url}:\n" + for error in validation_result.errors: + error_msg += f" - {error}\n" + raise RuntimeError(error_msg.strip()) + + if not validation_result.package: + raise RuntimeError( + f"Package validation succeeded but no package metadata found for {dep_ref.repo_url}" + ) + + package = validation_result.package + package.source = dep_ref.to_github_url() + return package diff --git a/src/apm_cli/deps/artifactory_orchestrator.py b/src/apm_cli/deps/artifactory_orchestrator.py index 7d6e12c71..cd7f0b616 100644 --- a/src/apm_cli/deps/artifactory_orchestrator.py +++ b/src/apm_cli/deps/artifactory_orchestrator.py @@ -220,20 +220,10 @@ def download_package( raise self._progress(progress_obj, progress_task_id, completed=70) + from ._shared import _validate_and_load_package + validation_result = validate_apm_package(target_path) - if not validation_result.is_valid: - if target_path.exists(): - _rmtree(target_path) - error_msg = f"Invalid APM package {dep_ref.repo_url}:\n" - for error in validation_result.errors: - error_msg += f" - {error}\n" - raise RuntimeError(error_msg.strip()) - if not validation_result.package: - raise RuntimeError( - f"Package validation succeeded but no package metadata found for {dep_ref.repo_url}" - ) - package = validation_result.package - package.source = dep_ref.to_github_url() + package = _validate_and_load_package(validation_result, target_path, dep_ref) package.resolved_commit = None resolved_ref = ResolvedReference( original_ref=f"{dep_ref.repo_url}#{ref}", diff --git a/src/apm_cli/deps/github_downloader.py b/src/apm_cli/deps/github_downloader.py index 3fb4782f7..8da85b13f 100644 --- a/src/apm_cli/deps/github_downloader.py +++ b/src/apm_cli/deps/github_downloader.py @@ -1622,25 +1622,10 @@ def download_package( raise # Validate the downloaded package - validation_result = validate_apm_package(target_path) - if not validation_result.is_valid: - # Clean up on validation failure - if target_path.exists(): - _rmtree(target_path) + from ._shared import _validate_and_load_package - error_msg = f"Invalid APM package {dep_ref.repo_url}:\n" - for error in validation_result.errors: - error_msg += f" - {error}\n" - raise RuntimeError(error_msg.strip()) - - # Load the APM package metadata - if not validation_result.package: - raise RuntimeError( - f"Package validation succeeded but no package metadata found for {dep_ref.repo_url}" - ) - - package = validation_result.package - package.source = dep_ref.to_github_url() + validation_result = validate_apm_package(target_path) + package = _validate_and_load_package(validation_result, target_path, dep_ref) package.resolved_commit = resolved_ref.resolved_commit # For plugins without an explicit version, use the short commit SHA so the diff --git a/src/apm_cli/integration/agent_integrator.py b/src/apm_cli/integration/agent_integrator.py index 2ff4b9f18..a005193ff 100644 --- a/src/apm_cli/integration/agent_integrator.py +++ b/src/apm_cli/integration/agent_integrator.py @@ -152,23 +152,14 @@ def integrate_agents_for_target( rel_path = portable_relpath(target_path, project_root) - if self.is_content_identical_to_source(target_path, source_file): - # Pre-existing file is byte-identical to source -- silently - # adopt so deployed_files reflects reality. See - # BaseIntegrator.is_content_identical_to_source for the - # full rationale (catch-22 fix). - target_paths.append(target_path) - files_adopted += 1 - continue - - if self.check_collision( - target_path, - rel_path, - managed_files, - force, - diagnostics=diagnostics, - ): - files_skipped += 1 + skip, adopted = self._check_adopt_or_skip( + target_path, source_file, rel_path, managed_files, force, diagnostics, target_paths + ) + if skip: + if adopted: + files_adopted += 1 + else: + files_skipped += 1 continue if mapping.format_id == "codex_agent": @@ -619,7 +610,7 @@ def integrate_package_agents_cursor( ) # DEPRECATED: use sync_for_target(KNOWN_TARGETS["cursor"], ...) instead. - def sync_integration_cursor( + def sync_integration_cursor( # pylint: disable=duplicate-code # deprecated shim; structural similarity is intentional self, apm_package, project_root: Path, @@ -657,7 +648,7 @@ def integrate_package_agents_opencode( ) # DEPRECATED: use sync_for_target(KNOWN_TARGETS["opencode"], ...) instead. - def sync_integration_opencode( + def sync_integration_opencode( # pylint: disable=duplicate-code # deprecated shim; structural similarity is intentional self, apm_package, project_root: Path, diff --git a/src/apm_cli/integration/base_integrator.py b/src/apm_cli/integration/base_integrator.py index e37b52af4..820975d74 100644 --- a/src/apm_cli/integration/base_integrator.py +++ b/src/apm_cli/integration/base_integrator.py @@ -235,6 +235,52 @@ def is_content_identical_to_source(target_path: Path, source_path: Path) -> bool except OSError: return False + def _check_adopt_or_skip( + self, + target_path: Path, + source_file: Path, + rel_path: str, + managed_files: set[str] | None, + force: bool, + diagnostics, + target_paths: list, + ) -> tuple[bool, bool]: + """Check whether *target_path* should be adopted or skipped. + + Combines :meth:`is_content_identical_to_source` (adopt) and + :meth:`check_collision` (skip) into a single call so integrators + share the decision logic without code duplication. + + When adopting, *target_path* is appended to *target_paths* as a + side effect so the caller's bookkeeping stays correct. + + Args: + target_path: Destination path on disk. + source_file: Source file to compare against for byte-identity. + rel_path: Relative path string used for collision detection and + diagnostics. + managed_files: Set of APM-managed relative paths; ``None`` means + none managed. + force: When ``True``, collisions are silently overwritten. + diagnostics: Optional diagnostics collector; forwarded to + :meth:`check_collision`. + target_paths: Mutable list; *target_path* is appended on adopt. + + Returns: + ``(skip, adopted)`` — when ``skip`` is ``True`` the caller must + ``continue`` (or otherwise skip writing this file); ``adopted`` + is ``True`` only when the existing file was byte-identical and + has been silently adopted. + """ + if self.is_content_identical_to_source(target_path, source_file): + target_paths.append(target_path) + return True, True + if self.check_collision( + target_path, rel_path, managed_files, force, diagnostics=diagnostics + ): + return True, False + return False, False + # Known integration prefixes that APM is allowed to deploy/remove under. # Derived from ``targets.KNOWN_TARGETS`` so adding a target auto-propagates. @staticmethod diff --git a/src/apm_cli/integration/command_integrator.py b/src/apm_cli/integration/command_integrator.py index bc0405f12..2ab1db023 100644 --- a/src/apm_cli/integration/command_integrator.py +++ b/src/apm_cli/integration/command_integrator.py @@ -550,21 +550,14 @@ def integrate_commands_for_target( rel_path = portable_relpath(target_path, project_root) - if self.is_content_identical_to_source(target_path, prompt_file): - # Pre-existing file is byte-identical to source -- silently - # adopt. See BaseIntegrator.is_content_identical_to_source. - target_paths.append(target_path) - files_adopted += 1 - continue - - if self.check_collision( - target_path, - rel_path, - managed_files, - force, - diagnostics=diagnostics, - ): - files_skipped += 1 + skip, adopted = self._check_adopt_or_skip( + target_path, prompt_file, rel_path, managed_files, force, diagnostics, target_paths + ) + if skip: + if adopted: + files_adopted += 1 + else: + files_skipped += 1 continue if mapping.format_id == "gemini_command": @@ -723,7 +716,12 @@ def integrate_package_commands( ) # DEPRECATED: use sync_for_target(KNOWN_TARGETS["claude"], ...) instead. - def sync_integration(self, apm_package, project_root: Path, managed_files: set = None) -> dict: # noqa: RUF013 + def sync_integration( # pylint: disable=duplicate-code # deprecated shim; structural similarity is intentional + self, + apm_package, + project_root: Path, + managed_files: set = None, # noqa: RUF013 + ) -> dict: """Remove APM-managed command files from .claude/commands/.""" from apm_cli.integration.targets import KNOWN_TARGETS @@ -767,7 +765,7 @@ def integrate_package_commands_opencode( ) # DEPRECATED: use sync_for_target(KNOWN_TARGETS["opencode"], ...) instead. - def sync_integration_opencode( + def sync_integration_opencode( # pylint: disable=duplicate-code # deprecated shim; structural similarity is intentional self, apm_package, project_root: Path, diff --git a/src/apm_cli/integration/instruction_integrator.py b/src/apm_cli/integration/instruction_integrator.py index 23620df1d..e4a5118c1 100644 --- a/src/apm_cli/integration/instruction_integrator.py +++ b/src/apm_cli/integration/instruction_integrator.py @@ -120,22 +120,14 @@ def integrate_instructions_for_target( rel_path = portable_relpath(target_path, project_root) - if self.is_content_identical_to_source(target_path, source_file): - # Pre-existing file is byte-identical to source -- silently - # adopt so deployed_files reflects reality. See - # BaseIntegrator.is_content_identical_to_source for rationale. - target_paths.append(target_path) - files_adopted += 1 - continue - - if self.check_collision( - target_path, - rel_path, - managed_files, - force, - diagnostics=diagnostics, - ): - files_skipped += 1 + skip, adopted = self._check_adopt_or_skip( + target_path, source_file, rel_path, managed_files, force, diagnostics, target_paths + ) + if skip: + if adopted: + files_adopted += 1 + else: + files_skipped += 1 continue if fmt == "cursor_rules": @@ -326,7 +318,7 @@ def integrate_package_instructions_cursor( ) # DEPRECATED: use sync_for_target(KNOWN_TARGETS["cursor"], ...) instead. - def sync_integration_cursor( + def sync_integration_cursor( # pylint: disable=duplicate-code # deprecated shim; structural similarity is intentional self, apm_package, project_root: Path, @@ -472,7 +464,7 @@ def integrate_package_instructions_claude( ) # DEPRECATED: use sync_for_target(KNOWN_TARGETS["claude"], ...) instead. - def sync_integration_claude( + def sync_integration_claude( # pylint: disable=duplicate-code # deprecated shim; structural similarity is intentional self, apm_package, project_root: Path, diff --git a/src/apm_cli/marketplace/__init__.py b/src/apm_cli/marketplace/__init__.py index faadf0e58..51d651c11 100644 --- a/src/apm_cli/marketplace/__init__.py +++ b/src/apm_cli/marketplace/__init__.py @@ -1,5 +1,7 @@ """Marketplace integration for plugin discovery and governance.""" +# pylint: disable=duplicate-code # __all__ re-exports intentionally mirror commands/marketplace surface + from .builder import ( BuildOptions, BuildReport, diff --git a/src/apm_cli/marketplace/_shared.py b/src/apm_cli/marketplace/_shared.py new file mode 100644 index 000000000..7262fac66 --- /dev/null +++ b/src/apm_cli/marketplace/_shared.py @@ -0,0 +1,48 @@ +"""Shared helpers for marketplace tag-version iteration. + +Extracted here to avoid duplicate-code violations between +``marketplace.builder`` and ``commands.marketplace``. +""" + +from __future__ import annotations + +from collections.abc import Generator +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .semver import SemVer + + +def iter_semver_tags( + refs: list, + tag_rx, +) -> Generator[tuple[SemVer, str, str], None, None]: + """Yield ``(SemVer, tag_name, sha)`` for every remote ref that: + + - is a git tag (``refs/tags/…``), + - matches *tag_rx* (must expose a ``version`` named group), and + - whose captured version string parses as a valid :class:`SemVer`. + + Prerelease filtering and range filtering are left to the caller. + + Args: + refs: Iterable of remote-ref objects (each with ``.name`` and + ``.sha`` attributes). + tag_rx: Compiled regular expression with a ``version`` named group. + + Yields: + Triples of ``(sv, tag_name, sha)`` in iteration order. + """ + from .semver import parse_semver + + for remote_ref in refs: + if not remote_ref.name.startswith("refs/tags/"): + continue + tag_name = remote_ref.name[len("refs/tags/") :] + m = tag_rx.match(tag_name) + if not m: + continue + sv = parse_semver(m.group("version")) + if sv is None: + continue + yield sv, tag_name, remote_ref.sha diff --git a/src/apm_cli/marketplace/builder.py b/src/apm_cli/marketplace/builder.py index 55080b281..ac8353af4 100644 --- a/src/apm_cli/marketplace/builder.py +++ b/src/apm_cli/marketplace/builder.py @@ -35,6 +35,7 @@ from ..utils.github_host import default_host from ..utils.path_security import ensure_path_within from ._io import atomic_write +from ._shared import iter_semver_tags from .diagnostics import BuildDiagnostic from .errors import ( BuildError, @@ -553,18 +554,7 @@ def _resolve_version_range( # Filter tags matching the pattern and extract versions candidates: list[tuple[SemVer, str, str]] = [] # (semver, tag_name, sha) - for remote_ref in refs: - if not remote_ref.name.startswith("refs/tags/"): - continue - tag_name = remote_ref.name[len("refs/tags/") :] - m = tag_rx.match(tag_name) - if not m: - continue - version_str = m.group("version") - sv = parse_semver(version_str) - if sv is None: - continue - + for sv, tag_name, sha in iter_semver_tags(refs, tag_rx): # Prerelease filter include_pre = entry.include_prerelease or self._options.include_prerelease if sv.is_prerelease and not include_pre: @@ -572,7 +562,7 @@ def _resolve_version_range( # Range filter if satisfies_range(sv, version_range): - candidates.append((sv, tag_name, remote_ref.sha)) + candidates.append((sv, tag_name, sha)) if not candidates: raise NoMatchingVersionError( diff --git a/src/apm_cli/policy/_shared.py b/src/apm_cli/policy/_shared.py new file mode 100644 index 000000000..052875f35 --- /dev/null +++ b/src/apm_cli/policy/_shared.py @@ -0,0 +1,40 @@ +"""Shared helpers for CI and policy checks.""" + +from __future__ import annotations + +from pathlib import Path + + +def _parse_apm_yml_safe(apm_yml_path: Path, result) -> object | None: + """Try to parse *apm_yml_path*, appending a check-result on failure. + + Assumes the caller has already verified that *apm_yml_path* exists. + + Args: + apm_yml_path: Path to the ``apm.yml`` file. + result: :class:`~apm_cli.policy.models.CIAuditResult` to append a + ``"manifest-parse"`` check to on failure. + + Returns: + :class:`~apm_cli.models.apm_package.APMPackage` on success; ``None`` + on parse failure — in which case a ``"manifest-parse"`` check has + been appended to *result* and the caller should return immediately. + """ + import yaml + + from ..models.apm_package import APMPackage, clear_apm_yml_cache + from .models import CheckResult + + try: + clear_apm_yml_cache() + return APMPackage.from_apm_yml(apm_yml_path) + except (ValueError, yaml.YAMLError, OSError) as exc: + result.checks.append( + CheckResult( + name="manifest-parse", + passed=False, + message="Cannot parse apm.yml: %s -- fix the YAML syntax error in apm.yml and re-run." # noqa: UP031 + % exc, + ) + ) + return None diff --git a/src/apm_cli/policy/ci_checks.py b/src/apm_cli/policy/ci_checks.py index 5d3b23918..4c94f2f23 100644 --- a/src/apm_cli/policy/ci_checks.py +++ b/src/apm_cli/policy/ci_checks.py @@ -524,7 +524,7 @@ def run_baseline_checks( Returns :class:`CIAuditResult` with individual check results. """ from ..deps.lockfile import LockFile, get_lockfile_path - from ..models.apm_package import APMPackage, clear_apm_yml_cache + from ._shared import _parse_apm_yml_safe result = CIAuditResult() apm_yml_path = project_root / "apm.yml" @@ -532,20 +532,8 @@ def run_baseline_checks( # Parse manifest ONCE -- this function owns parse-error handling. manifest = None if apm_yml_path.exists(): - import yaml - - try: - clear_apm_yml_cache() - manifest = APMPackage.from_apm_yml(apm_yml_path) - except (ValueError, yaml.YAMLError, OSError) as exc: - result.checks.append( - CheckResult( - name="manifest-parse", - passed=False, - message="Cannot parse apm.yml: %s -- fix the YAML syntax error in apm.yml and re-run." # noqa: UP031 - % exc, - ) - ) + manifest = _parse_apm_yml_safe(apm_yml_path, result) + if manifest is None: return result # Check 1: Lockfile exists (manifest already parsed, pass it in) diff --git a/src/apm_cli/policy/policy_checks.py b/src/apm_cli/policy/policy_checks.py index 6780ae3a6..1c2e5b5b7 100644 --- a/src/apm_cli/policy/policy_checks.py +++ b/src/apm_cli/policy/policy_checks.py @@ -925,7 +925,7 @@ def run_policy_checks( Returns :class:`CIAuditResult` with individual check results. """ from ..deps.lockfile import LockFile, get_lockfile_path - from ..models.apm_package import APMPackage, clear_apm_yml_cache + from ._shared import _parse_apm_yml_safe result = CIAuditResult() @@ -934,20 +934,8 @@ def run_policy_checks( if not apm_yml_path.exists(): return result - import yaml - - try: - clear_apm_yml_cache() - manifest = APMPackage.from_apm_yml(apm_yml_path) - except (ValueError, yaml.YAMLError, OSError) as exc: - result.checks.append( - CheckResult( - name="manifest-parse", - passed=False, - message="Cannot parse apm.yml: %s -- fix the YAML syntax error in apm.yml and re-run." # noqa: UP031 - % exc, - ) - ) + manifest = _parse_apm_yml_safe(apm_yml_path, result) + if manifest is None: return result # Load lockfile (optional -- some checks work without it) diff --git a/src/apm_cli/runtime/base.py b/src/apm_cli/runtime/base.py index 0999c8a45..21f9ff941 100644 --- a/src/apm_cli/runtime/base.py +++ b/src/apm_cli/runtime/base.py @@ -1,9 +1,49 @@ """Base runtime adapter interface for APM.""" +import subprocess from abc import ABC, abstractmethod from typing import Any, Dict, Optional # noqa: F401, UP035 +def _stream_subprocess_output(cmd: list, timeout: int | None = None) -> tuple[list, int]: + """Run *cmd* as a subprocess, stream stdout in real-time, and return output. + + Args: + cmd: Command and arguments list passed to :class:`subprocess.Popen`. + timeout: Optional wait timeout in seconds passed to + :meth:`subprocess.Popen.wait`. ``None`` waits indefinitely. + + Returns: + ``(output_lines, return_code)`` where *output_lines* is the list of + streamed stdout lines (including newlines) and *return_code* is the + process exit code. + """ + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # Merge stderr into stdout for streaming + text=True, + encoding="utf-8", + bufsize=1, # Line buffered + ) + + output_lines = [] + + for line in iter(process.stdout.readline, ""): + print(line, end="", flush=True) + output_lines.append(line) + + try: + return_code = process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + process.kill() + for line in iter(process.stdout.readline, ""): + output_lines.append(line) + process.stdout.close() + raise + return output_lines, return_code + + class RuntimeAdapter(ABC): """Base adapter interface for LLM runtimes.""" diff --git a/src/apm_cli/runtime/copilot_runtime.py b/src/apm_cli/runtime/copilot_runtime.py index b344c2831..afdd6635c 100644 --- a/src/apm_cli/runtime/copilot_runtime.py +++ b/src/apm_cli/runtime/copilot_runtime.py @@ -7,7 +7,7 @@ from pathlib import Path from typing import Any, Dict, Optional # noqa: F401, UP035 -from .base import RuntimeAdapter +from .base import RuntimeAdapter, _stream_subprocess_output class CopilotRuntime(RuntimeAdapter): @@ -57,25 +57,7 @@ def execute_prompt(self, prompt_content: str, **kwargs) -> str: cmd.extend(["--add-dir", str(directory)]) # Execute Copilot CLI with real-time streaming - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, # Merge stderr into stdout for streaming - text=True, - encoding="utf-8", - bufsize=1, # Line buffered - ) - - output_lines = [] - - # Stream output in real-time - for line in iter(process.stdout.readline, ""): - # Print to terminal in real-time - print(line, end="", flush=True) - output_lines.append(line) - - # Wait for process to complete - return_code = process.wait(timeout=600) # 10 minute timeout for complex tasks + output_lines, return_code = _stream_subprocess_output(cmd, timeout=600) if return_code != 0: full_output = "".join(output_lines) @@ -90,8 +72,6 @@ def execute_prompt(self, prompt_content: str, **kwargs) -> str: return "".join(output_lines).strip() except subprocess.TimeoutExpired: - if "process" in locals(): - process.kill() raise RuntimeError("Copilot CLI execution timed out after 10 minutes") # noqa: B904 except FileNotFoundError: raise RuntimeError( # noqa: B904 diff --git a/src/apm_cli/runtime/llm_runtime.py b/src/apm_cli/runtime/llm_runtime.py index a7795e332..51a2c312d 100644 --- a/src/apm_cli/runtime/llm_runtime.py +++ b/src/apm_cli/runtime/llm_runtime.py @@ -5,7 +5,7 @@ import tempfile # noqa: F401 from typing import Any, Dict, Optional # noqa: F401, UP035 -from .base import RuntimeAdapter +from .base import RuntimeAdapter, _stream_subprocess_output class LLMRuntime(RuntimeAdapter): @@ -49,25 +49,7 @@ def execute_prompt(self, prompt_content: str, **kwargs) -> str: cmd.append(prompt_content) # Execute the command with real-time streaming - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, # Merge stderr into stdout for streaming - text=True, - encoding="utf-8", - bufsize=1, # Line buffered - ) - - output_lines = [] - - # Stream output in real-time - for line in iter(process.stdout.readline, ""): - # Print to terminal in real-time - print(line, end="", flush=True) - output_lines.append(line) - - # Wait for process to complete - return_code = process.wait() + output_lines, return_code = _stream_subprocess_output(cmd) if return_code != 0: full_output = "".join(output_lines) diff --git a/tests/unit/test_transitive_mcp.py b/tests/unit/test_transitive_mcp.py index f4e54e48b..6fb6bc37c 100644 --- a/tests/unit/test_transitive_mcp.py +++ b/tests/unit/test_transitive_mcp.py @@ -1,5 +1,6 @@ """Tests for transitive MCP dependency collection and deduplication.""" +from dataclasses import dataclass from unittest.mock import MagicMock, patch import pytest @@ -10,6 +11,23 @@ from apm_cli.models.apm_package import APMPackage, MCPDependency +@dataclass(frozen=True) +class _FakeResolvedTargets: + """Lightweight stand-in for ResolvedTargets used in unit tests.""" + + targets: list + source: str = "--target flag" + auto_create: bool = True + + +@pytest.fixture(autouse=False) +def _bypass_target_gate(): + """Mock resolve_targets so tests don't need harness marker files on disk.""" + fake = _FakeResolvedTargets(targets=["copilot"]) + with patch("apm_cli.core.target_detection.resolve_targets", return_value=fake): + yield + + @pytest.fixture def _isolated_targets(): """Decouple MCPIntegrator.install tests from cwd-based harness @@ -626,6 +644,7 @@ def test_root_deps_take_precedence_over_transitive(self): # --------------------------------------------------------------------------- # _install_mcp_dependencies # --------------------------------------------------------------------------- +@pytest.mark.usefixtures("_bypass_target_gate") class TestInstallMCPDependencies: @patch("apm_cli.integration.mcp_integrator._get_console", return_value=None) @patch("apm_cli.registry.operations.MCPServerOperations") @@ -801,6 +820,7 @@ def test_reads_each_runtime_config_once_for_multiple_servers( # --------------------------------------------------------------------------- # _install_mcp_dependencies – self-defined skip logic # --------------------------------------------------------------------------- +@pytest.mark.usefixtures("_bypass_target_gate") class TestInstallSelfDefinedSkipLogic: @patch( "apm_cli.integration.mcp_integrator.MCPIntegrator._check_self_defined_servers_needing_installation" @@ -1043,6 +1063,7 @@ def test_empty_list(self): # --------------------------------------------------------------------------- # Diff-aware install — self-defined servers with config drift # --------------------------------------------------------------------------- +@pytest.mark.usefixtures("_bypass_target_gate") class TestDiffAwareSelfDefinedInstall: @patch( "apm_cli.integration.mcp_integrator.MCPIntegrator._check_self_defined_servers_needing_installation" diff --git a/tests/unit/test_vscode_adapter.py b/tests/unit/test_vscode_adapter.py index 598d21609..9022d70c1 100644 --- a/tests/unit/test_vscode_adapter.py +++ b/tests/unit/test_vscode_adapter.py @@ -1216,24 +1216,75 @@ def test_warning_emitted_for_input_reference( self, ): mapping = {"Authorization": "Bearer ${input:my-token}"} - with patch("builtins.print") as mock_print: + with patch("apm_cli.adapters.client.base._rich_warning") as mock_warn: MCPClientAdapter._warn_input_variables(mapping, "my-server", "Copilot CLI") - mock_print.assert_called_once() - msg = mock_print.call_args[0][0] + mock_warn.assert_called_once() + msg = mock_warn.call_args[0][0] assert "my-token" in msg assert "Copilot CLI" in msg def test_no_warning_for_plain_values(self): mapping = {"Content-Type": "application/json"} - with patch("builtins.print") as mock_print: + with patch("apm_cli.adapters.client.base._rich_warning") as mock_warn: MCPClientAdapter._warn_input_variables(mapping, "s", "Codex CLI") - mock_print.assert_not_called() + mock_warn.assert_not_called() def test_no_warning_for_empty_mapping(self): - with patch("builtins.print") as mock_print: + with patch("apm_cli.adapters.client.base._rich_warning") as mock_warn: MCPClientAdapter._warn_input_variables({}, "s", "Codex CLI") MCPClientAdapter._warn_input_variables(None, "s", "Codex CLI") - mock_print.assert_not_called() + mock_warn.assert_not_called() + + +class TestApplyPypiHomebrewGenericConfig(unittest.TestCase): + """Regression tests for _apply_pypi_homebrew_generic_config edge cases.""" + + def test_homebrew_tap_formula_strips_prefix(self): + """Tap-formula like 'user/tap/formula' must produce command='formula'.""" + config = {} + MCPClientAdapter._apply_pypi_homebrew_generic_config( + config=config, + registry_name="homebrew", + package_name="user/tap/formula", + runtime_hint="", + processed_runtime_args=[], + processed_package_args=["--flag"], + resolved_env={}, + ) + self.assertEqual(config["command"], "formula") + self.assertEqual(config["args"], ["--flag"]) + self.assertNotIn("env", config) + + def test_homebrew_simple_name_unchanged(self): + """Plain formula name with no slash is used as-is.""" + config = {} + MCPClientAdapter._apply_pypi_homebrew_generic_config( + config=config, + registry_name="homebrew", + package_name="jq", + runtime_hint="", + processed_runtime_args=["--arg1"], + processed_package_args=[], + resolved_env={}, + ) + self.assertEqual(config["command"], "jq") + self.assertEqual(config["args"], ["--arg1"]) + + def test_pypi_uses_uvx_default(self): + """PyPI without runtime_hint defaults to 'uvx'.""" + config = {} + MCPClientAdapter._apply_pypi_homebrew_generic_config( + config=config, + registry_name="pypi", + package_name="my-pkg", + runtime_hint="", + processed_runtime_args=["--rt"], + processed_package_args=["--pa"], + resolved_env={"KEY": "val"}, + ) + self.assertEqual(config["command"], "uvx") + self.assertEqual(config["args"], ["my-pkg", "--rt", "--pa"]) + self.assertEqual(config["env"], {"KEY": "val"}) class TestWarnOnLegacyAngleVars(unittest.TestCase):