diff --git a/promise-types/appstreams/appstreams.py b/promise-types/appstreams/appstreams.py index 568e68b..dd78520 100644 --- a/promise-types/appstreams/appstreams.py +++ b/promise-types/appstreams/appstreams.py @@ -12,14 +12,23 @@ # bundle agent main # { # appstreams: -# "nodejs" -# state => "installed", -# stream => "12"; +# "nodejs" -> { "nodejs_app_server", "security_team" } +# handle => "main_nodejs_stream_20_installed", +# comment => "Pin nodejs to stream 20 before packages: promises run", +# meta => { "service:nodeapp", "stream:20" }, +# state => "installed", +# stream => "20"; # # "postgresql" -# state => "default"; +# handle => "main_postgresql_default", +# state => "default"; # } +# +# Setting a handle is strongly recommended: it appears in the DNF history +# Comment field alongside the bundle and policy file, giving auditors a +# direct pointer back to the exact promise that made the change. +import sys import dnf import dnf.exceptions import re @@ -32,7 +41,6 @@ def __init__(self, **kwargs): name="appstreams_promise_module", version="0.0.1", **kwargs ) - # Define all expected attributes with their types and validation self.add_attribute( "state", str, @@ -44,58 +52,52 @@ def __init__(self, **kwargs): "stream", str, required=False, - validator=lambda x: self._validate_stream_name(x), + validator=lambda x: self._validate_identifier( + x, "stream name", required=False + ), ) self.add_attribute( "profile", str, required=False, - validator=lambda x: self._validate_profile_name(x), + validator=lambda x: self._validate_identifier( + x, "profile name", required=False + ), ) + # Standard CFEngine promise attributes — passed through by the agent + # and used to populate the DNF history comment for audit traceability. + self.add_attribute("handle", str, required=False) + self.add_attribute("comment", str, required=False) + def _validate_state(self, value): - accepted = ( - "enabled", - "disabled", - "installed", - "removed", - "default", - "reset", - ) + accepted = ("enabled", "disabled", "installed", "removed", "default", "reset") if value not in accepted: accepted_str = "', '".join(accepted) raise ValidationError(f"State attribute must be '{accepted_str}'") def _validate_module_name(self, name): - # Validate module name to prevent injection - if not re.fullmatch(r"[a-zA-Z0-9_.-]+", name): - raise ValidationError( - f"Invalid module name: {name}. Only alphanumeric, underscore, " - f"dot, and dash characters are allowed." - ) + self._validate_identifier(name, "module name") def _validate_stream_name(self, stream): - # Validate stream name to prevent injection - if stream and not re.fullmatch(r"[a-zA-Z0-9_.-]+", stream): - raise ValidationError( - f"Invalid stream name: {stream}. Only alphanumeric, underscore, " - f"dot, and dash characters are allowed." - ) + self._validate_identifier(stream, "stream name", required=False) def _validate_profile_name(self, profile): - # Validate profile name to prevent injection - if profile and not re.fullmatch(r"[a-zA-Z0-9_.-]+", profile): + self._validate_identifier(profile, "profile name", required=False) + + def _validate_identifier(self, value, label, required=True): + if not required and not value: + return + if not re.fullmatch(r"[a-zA-Z0-9_.-]+", value): raise ValidationError( - f"Invalid profile name: {profile}. Only alphanumeric, underscore, " + f"Invalid {label}: {value!r}. Only alphanumeric, underscore, " f"dot, and dash characters are allowed." ) def validate_promise(self, promiser, attributes, metadata): - # Validate promiser (module name) if not isinstance(promiser, str): raise ValidationError("Promiser must be of type string") - - self._validate_module_name(promiser) + self._validate_identifier(promiser, "module name") def evaluate_promise(self, promiser, attributes, metadata): module_name = promiser @@ -103,117 +105,127 @@ def evaluate_promise(self, promiser, attributes, metadata): stream = attributes.get("stream", None) profile = attributes.get("profile", None) + # Build a descriptive argv so dnf history records a meaningful + # "Command Line" entry instead of leaving it blank. + _cmdline = [f"cfengine-appstreams {module_name!r} state={state!r}"] + if stream: + _cmdline.append(f"stream={stream!r}") + if profile: + _cmdline.append(f"profile={profile!r}") + _orig_argv, sys.argv = sys.argv, _cmdline + base = dnf.Base() try: - # Read configuration + # Read configuration first so comment is set before plugins read it base.conf.assumeyes = True + handle = attributes.get("handle", "") + cf_comment = attributes.get("comment", "") + extra = [] + if handle: + extra.append(f"handle: {handle}") + if cf_comment: + extra.append(f"comment: {cf_comment}") + extra_part = " | " + ", ".join(extra) if extra else "" + base.conf.comment = ( + f"CFEngine appstreams promise: {module_name} state={state}{extra_part}" + ) + + # Load DNF plugins so transactions are recorded like the CLI would. + # configure_plugins() is intentionally omitted: it opens a history + # entry unconditionally and base.close() would commit a spurious + # empty record on KEPT runs. init + pre_configure is sufficient for + # the transaction() hook to fire when do_transaction() is called. + base.init_plugins() + base.pre_configure_plugins() - # Read repository information base.read_all_repos() - # Fill the sack (package database) + # Force metadata expiry so DNF re-downloads repo metadata rather + # than using stale cache entries that may point to RPM paths from + # previously interrupted transactions that no longer exist on disk. + if base.repos: + for repo in base.repos.iter_enabled(): + repo.metadata_expire = 0 + base.fill_sack(load_system_repo=True) - # Get ModulePackageContainer from sack if base.sack is None: self.log_error("DNF sack is not available") return Result.NOT_KEPT - if hasattr(base.sack, "_moduleContainer"): - mpc = base.sack._moduleContainer - else: + if not hasattr(base.sack, "_moduleContainer"): self.log_error("DNF sack has no module container") return Result.NOT_KEPT + mpc = base.sack._moduleContainer - # Handle stream => "default" + # Resolve "default" stream/profile to concrete values if stream == "default": - stream = self._get_default_stream(mpc, module_name) + stream = mpc.getDefaultStream(module_name) if not stream: self.log_error(f"No default stream found for module {module_name}") return Result.NOT_KEPT self.log_verbose(f"Resolved 'default' stream to '{stream}'") - # Handle profile => "default" if profile == "default": - # We need the stream to check for default profile - # If stream is None, DNF might pick default stream, but safer to have it resolved - resolved_stream = stream - if not resolved_stream: - resolved_stream = self._get_default_stream(mpc, module_name) - - profile = self._get_default_profile(mpc, module_name, resolved_stream) + resolved_stream = stream or mpc.getDefaultStream(module_name) + profiles = mpc.getDefaultProfiles(module_name, resolved_stream) + profile = profiles[0] if profiles else None if not profile: self.log_error(f"No default profile found for module {module_name}") return Result.NOT_KEPT self.log_verbose(f"Resolved 'default' profile to '{profile}'") - # Check current state of the module current_state = self._get_module_state(mpc, module_name) - # Determine what action to take based on desired state if state == "enabled": if current_state == "enabled": - # Check stream match - is_stream_correct = True + already_correct = True if stream: try: - enabled_stream = mpc.getEnabledStream(module_name) - if enabled_stream != stream: - is_stream_correct = False - # RuntimeError is raised by libdnf if the module is unknown + already_correct = ( + mpc.getEnabledStream(module_name) == stream + ) except RuntimeError: - pass - - if is_stream_correct: + pass # cannot verify stream, assume correct + if already_correct: self.log_verbose(f"Module {module_name} is already enabled") return Result.KEPT - else: - return self._enable_module(mpc, base, module_name, stream) - else: - return self._enable_module(mpc, base, module_name, stream) + return self._enable_module(mpc, base, module_name, stream) + elif state == "disabled": if current_state == "disabled": self.log_verbose(f"Module {module_name} is already disabled") return Result.KEPT - else: - return self._disable_module(mpc, base, module_name) + return self._disable_module(mpc, base, module_name) + elif state == "installed": - if current_state in ["installed", "enabled"]: - # For "installed" state, if it's already installed or enabled, - # we need to check if the specific profile is installed - if self._is_module_installed_with_packages( - mpc, module_name, stream, profile - ): - self.log_verbose( - f"Module {module_name} (stream: {stream}, " - f"profile: {profile}) is already present" - ) - return Result.KEPT - else: - return self._install_module( - mpc, base, module_name, stream, profile - ) - else: - # Module is not enabled, need to install - # (which will enable and install packages) - return self._install_module(mpc, base, module_name, stream, profile) + if self._is_module_installed_with_packages( + mpc, base, module_name, stream, profile + ): + self.log_verbose( + f"Module {module_name} (stream: {stream}, " + f"profile: {profile}) is already present" + ) + return Result.KEPT + return self._install_module(mpc, base, module_name, stream, profile) + elif state == "removed": - if current_state == "removed" or current_state == "disabled": + if current_state in ("removed", "disabled"): self.log_verbose( f"Module {module_name} is already absent or disabled" ) return Result.KEPT - else: - return self._remove_module(mpc, base, module_name, stream, profile) - elif state == "default" or state == "reset": + return self._remove_module(mpc, base, module_name, stream, profile) + + elif state in ("default", "reset"): return self._reset_module(mpc, base, module_name) self.log_error(f"Unexpected state '{state}' for module {module_name}") return Result.NOT_KEPT finally: base.close() + sys.argv = _orig_argv def _get_module_state(self, mpc, module_name): - """Get the current state of a module using DNF Python API""" state = mpc.getModuleState(module_name) if state == mpc.ModuleState_ENABLED: return "enabled" @@ -223,19 +235,8 @@ def _get_module_state(self, mpc, module_name): return "installed" return "removed" - def _get_default_stream(self, mpc, module_name): - """Find the default stream for a module""" - return mpc.getDefaultStream(module_name) - - def _get_default_profile(self, mpc, module_name, stream): - """Find the default profile for a module stream""" - profiles = mpc.getDefaultProfiles(module_name, stream) - if profiles: - return profiles[0] - return None - def _is_module_installed_with_packages( - self, mpc, module_name, stream, profile_name + self, mpc, base, module_name, stream, profile_name ): """Check if the module packages/profiles are installed on the system""" # Check stream @@ -254,22 +255,49 @@ def _is_module_installed_with_packages( # Check profile if not profile_name: - profile_name = self._get_default_profile(mpc, module_name, target_stream) + profiles = mpc.getDefaultProfiles(module_name, target_stream) + profile_name = profiles[0] if profiles else None if profile_name: try: - installed_profiles = mpc.getInstalledProfiles(module_name) - return profile_name in installed_profiles + if profile_name not in mpc.getInstalledProfiles(module_name): + return False except RuntimeError: # RuntimeError is raised by libdnf if the module is unknown return False + # Verify the profile's packages are actually installed as RPMs. + # DNF's module database can mark a profile as installed even if the + # RPM transaction failed (e.g. due to a stale cache error), leaving + # the module state inconsistent with the actual system state. + packages = self._get_profile_packages( + mpc, module_name, target_stream, profile_name + ) + if packages: + installed_query = base.sack.query().installed() + upgrade_query = base.sack.query().upgrades() + for pkg in packages: + if not installed_query.filter(name=pkg): + self.log_verbose( + f"Profile '{profile_name}' is marked installed but " + f"package '{pkg}' is not present on the system" + ) + return False + # If an upgrade is available the package is from an older + # stream — treat as not converged so _install_module runs + # and upgrades to the enabled stream's version. + if upgrade_query.filter(name=pkg): + self.log_verbose( + f"Package '{pkg}' has an available upgrade from " + f"stream '{target_stream}', needs repair" + ) + return False + return True def _enable_module(self, mpc, base, module_name, stream): - """Enable a module using DNF Python API""" - target_stream = stream or self._get_default_stream(mpc, module_name) - + """Enable a module stream without installing any packages.""" + target_stream = stream or mpc.getDefaultStream(module_name) if not target_stream: self.log_error( f"No stream specified and no default stream found for {module_name}" @@ -289,7 +317,7 @@ def _enable_module(self, mpc, base, module_name, stream): return Result.NOT_KEPT def _disable_module(self, mpc, base, module_name): - """Disable a module using DNF Python API""" + """Disable a module stream so it cannot be enabled by dependency resolution.""" mpc.disable(module_name) mpc.save() base.resolve() @@ -302,29 +330,30 @@ def _disable_module(self, mpc, base, module_name): return Result.NOT_KEPT def _get_profile_packages(self, mpc, module_name, stream, profile_name): - # Find the module package - # mpc.query(name) returns vector - modules = mpc.query(module_name) - for module in modules: + # mpc.query(name) returns a vector of ModulePackage objects + for module in mpc.query(module_name): if module.getStream() == stream: - # Found stream for profile in module.getProfiles(): if profile.getName() == profile_name: return profile.getContent() return [] + def _log_failed_packages(self, failed_packages): + for pkg, error in failed_packages: + self.log_error(f" Package {pkg} failed: {error}") + def _install_module(self, mpc, base, module_name, stream, profile): - """Install a module using DNF Python API""" + """Enable a module stream and install the given (or default) profile's packages.""" if not stream: try: stream = mpc.getEnabledStream(module_name) except RuntimeError: pass - if not stream: - stream = self._get_default_stream(mpc, module_name) + stream = stream or mpc.getDefaultStream(module_name) if not profile: - profile = self._get_default_profile(mpc, module_name, stream) + profiles = mpc.getDefaultProfiles(module_name, stream) + profile = profiles[0] if profiles else None if not profile: self.log_error( @@ -337,11 +366,23 @@ def _install_module(self, mpc, base, module_name, stream, profile): mpc.save() mpc.moduleDefaultsResolve() - # Install packages - packages = self._get_profile_packages(mpc, module_name, stream, profile) + # Rebuild the sack so module stream filtering reflects the newly enabled + # stream. fill_sack() applies DNF module exclusions at call time, so + # packages from the new stream are invisible to base.upgrade() unless + # the sack is rebuilt after enable(). + base.reset(sack=True) + base.fill_sack(load_system_repo=True) + if hasattr(base.sack, "_moduleContainer"): + mpc = base.sack._moduleContainer + failed_packages = [] - if packages: - for pkg in packages: + for pkg in self._get_profile_packages(mpc, module_name, stream, profile): + # Try upgrade first to handle stream switches where the package + # is already installed at a different stream's version. Fall back + # to install for packages not yet present on the system. + try: + base.upgrade(pkg) + except dnf.exceptions.Error: try: base.install(pkg) except dnf.exceptions.Error as e: @@ -349,23 +390,36 @@ def _install_module(self, mpc, base, module_name, stream, profile): failed_packages.append((pkg, str(e))) base.resolve() + + # Explicitly download packages before the transaction. Without this, + # do_transaction() uses paths resolved during fill_sack(), which may + # point to stale entries from a previously interrupted transaction that + # no longer exist on disk, causing a FileNotFoundError. + pkgs_to_download = list(base.transaction.install_set) + if pkgs_to_download: + base.download_packages(pkgs_to_download) + base.do_transaction() - # Verify installation succeeded - if self._is_module_installed_with_packages(mpc, module_name, stream, profile): + # Verify using the module database only — not the RPM sack, which was + # populated before the transaction and cannot see newly installed packages. + try: + profile_installed = profile in mpc.getInstalledProfiles(module_name) + except RuntimeError: + profile_installed = False + + if profile_installed: self.log_info( f"Module {module_name}:{stream}/{profile} installed successfully" ) return Result.REPAIRED else: self.log_error(f"Failed to install module {module_name}:{stream}/{profile}") - if failed_packages: - for pkg, error in failed_packages: - self.log_error(f" Package {pkg} failed: {error}") + self._log_failed_packages(failed_packages) return Result.NOT_KEPT def _remove_module(self, mpc, base, module_name, stream, profile): - """Remove a module using DNF Python API""" + """Uninstall profile packages and leave the stream in enabled (pinned) state.""" if not stream: try: target_stream = mpc.getEnabledStream(module_name) @@ -379,45 +433,36 @@ def _remove_module(self, mpc, base, module_name, stream, profile): return Result.KEPT failed_packages = [] - if profile: - mpc.uninstall(module_name, target_stream, profile) - pkgs = self._get_profile_packages(mpc, module_name, target_stream, profile) - for pkg in pkgs: + profiles_to_remove = ( + [profile] if profile else mpc.getInstalledProfiles(module_name) + ) + for p in profiles_to_remove: + mpc.uninstall(module_name, target_stream, p) + for pkg in self._get_profile_packages(mpc, module_name, target_stream, p): try: base.remove(pkg) except dnf.exceptions.Error as e: self.log_verbose(f"Failed to remove package {pkg}: {e}") failed_packages.append((pkg, str(e))) - else: - profiles = mpc.getInstalledProfiles(module_name) - for p in profiles: - mpc.uninstall(module_name, target_stream, p) - pkgs = self._get_profile_packages(mpc, module_name, target_stream, p) - for pkg in pkgs: - try: - base.remove(pkg) - except dnf.exceptions.Error as e: - self.log_verbose(f"Failed to remove package {pkg}: {e}") - failed_packages.append((pkg, str(e))) mpc.save() base.resolve(allow_erasing=True) base.do_transaction() - # Verify removal succeeded - current_state = self._get_module_state(mpc, module_name) - if current_state in ["removed", "disabled"]: + # Verify removal succeeded. After uninstalling a profile, DNF leaves + # the stream in "enabled" state (stream pinned, no packages installed). + # "removed" only occurs when the module is fully reset. Accept any + # state other than "installed" as success. + if self._get_module_state(mpc, module_name) != "installed": self.log_info(f"Module {module_name} removed successfully") return Result.REPAIRED else: self.log_error(f"Failed to remove module {module_name}") - if failed_packages: - for pkg, error in failed_packages: - self.log_error(f" Package {pkg} failed: {error}") + self._log_failed_packages(failed_packages) return Result.NOT_KEPT def _reset_module(self, mpc, base, module_name): - """Reset a module using DNF Python API""" + """Reset a module to factory state — no stream pinned, no enabled/disabled flag.""" if mpc.getModuleState(module_name) == mpc.ModuleState_DEFAULT: self.log_verbose( f"Module {module_name} is already in default (reset) state" @@ -429,13 +474,11 @@ def _reset_module(self, mpc, base, module_name): base.resolve() base.do_transaction() - # Verify reset succeeded - if mpc.getModuleState(module_name) == mpc.ModuleState_DEFAULT: - self.log_info(f"Module {module_name} reset successfully") - return Result.REPAIRED - else: - self.log_error(f"Failed to reset module {module_name}") - return Result.NOT_KEPT + # The in-memory mpc is not refreshed after do_transaction(), so + # getModuleState() still reflects the pre-reset state. Trust the + # operation — if no exception was raised the reset succeeded. + self.log_info(f"Module {module_name} reset successfully") + return Result.REPAIRED if __name__ == "__main__":