#!/usr/bin/env python3
"""
validate-compose.py — Production-readiness validator for Docker Compose files.

Usage:
    python3 validate-compose.py [--strict]

Exit codes:
    0 — passed (no errors; warnings may exist)
    1 — failed (one or more errors found)

On Windows, use `python` instead of `python3` if needed.
"""

import argparse
import re
import sys
from pathlib import Path
from typing import Any

try:
    import yaml  # type: ignore[import-untyped]
except ImportError:
    # PyYAML is a third-party package: stop immediately with an actionable
    # message instead of letting the script die later with a traceback.
    print("❌ ERROR: PyYAML is required. Install with: pip install pyyaml")
    sys.exit(1)


# ---------------------------------------------------------------------------
# Result accumulator
# ---------------------------------------------------------------------------

class ValidationResult:
    """Collect the findings reported by the checks, grouped by severity.

    Findings are kept in three lists (``errors``, ``warnings``, ``infos``) in
    the order they were reported. Only errors make the validation fail.
    """

    def __init__(self) -> None:
        self.errors: list[str] = []
        self.warnings: list[str] = []
        self.infos: list[str] = []

    def error(self, msg: str) -> None:
        """Record a finding that fails the validation."""
        self.errors.append(msg)

    def warn(self, msg: str) -> None:
        """Record a finding that should be fixed but does not fail the run."""
        self.warnings.append(msg)

    def info(self, msg: str) -> None:
        """Record a purely informational finding."""
        self.infos.append(msg)

    @property
    def passed(self) -> bool:
        """True when no error has been recorded."""
        return not self.errors

    def print_report(self) -> None:
        """Print every recorded finding to stdout, followed by a verdict line."""
        error_count = len(self.errors)
        warning_count = len(self.warnings)
        info_count = len(self.infos)

        if error_count + warning_count + info_count == 0:
            print("✅ No issues found.")
            return

        # One section per severity, most severe first; empty sections are skipped.
        sections = (
            ("🔴 ERRORS", "✖", self.errors),
            ("🟡 WARNINGS", "⚠", self.warnings),
            ("🔵 INFO", "ℹ", self.infos),
        )
        for title, bullet, messages in sections:
            if not messages:
                continue
            print(f"\n{title} ({len(messages)})")
            for message in messages:
                print(f" {bullet} {message}")

        print()
        if self.passed:
            print(f"✅ Passed ({warning_count} warning(s), {info_count} info(s))")
        else:
            print(f"❌ Failed ({error_count} error(s), {warning_count} warning(s))")
# ---------------------------------------------------------------------------
# Helper utilities
# ---------------------------------------------------------------------------

# Environment variable names that look like they carry a credential.
_SECRET_PATTERNS = [
    re.compile(
        r"(password|passwd|secret|token|key|api_key|apikey|auth|credential)",
        re.I,
    ),
]

_HARDCODED_VALUE_PATTERN = re.compile(
    r"^(?!.*\$\{)(?!changeme)(?!placeholder)(?!your-).{8,}$"
)

_ENV_VAR_REF_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")

# A volume spec that starts with a Windows drive path such as "C:\data" or "C:/data".
_WINDOWS_DRIVE_PATH = re.compile(r"^[A-Za-z]:[/\\]")

_PREFERRED_PORT_MIN = 50000
_PREFERRED_PORT_MAX = 60000

_DB_CACHE_IMAGES = [
    "postgres", "postgresql", "mariadb", "mysql", "mongo", "mongodb",
    "redis", "valkey", "memcached", "cassandra", "couchdb", "influxdb",
]


def _iter_services(compose: dict[str, Any]):
    """Yield (name, service_dict) for every service in the compose file.

    A service declared with an empty body (``None``) is yielded as ``{}`` so
    callers can always use ``.get``.
    """
    for name, svc in (compose.get("services") or {}).items():
        yield name, (svc or {})


def _get_depends_on_names(depends_on: Any) -> list[str]:
    """Normalise depends_on (list or mapping form) to a list of service names."""
    if isinstance(depends_on, list):
        return depends_on
    if isinstance(depends_on, dict):
        return list(depends_on.keys())
    return []


def _image_name_and_tag(image: str) -> tuple[str, str]:
    """Split 'image:tag' into (image_name, tag). Tag is '' if absent.

    The last colon only separates a tag when nothing after it looks like a
    path: in ``localhost:5000/app`` the colon belongs to the registry
    ``host:port`` prefix, so that image has no tag.
    """
    name, sep, tag = image.rpartition(":")
    if not sep or "/" in tag:
        return image, ""
    return name, tag


def _is_db_cache_image(image: str) -> bool:
    """Return True when the image's base name starts with a known DB/cache name."""
    name, _ = _image_name_and_tag(image)
    base = name.split("/")[-1].lower()
    return base.startswith(tuple(_DB_CACHE_IMAGES))


def _collect_all_string_values(obj: Any, result: list[str]) -> None:
    """Recursively append every string leaf of a nested dict/list to *result*."""
    if isinstance(obj, str):
        result.append(obj)
    elif isinstance(obj, dict):
        for value in obj.values():
            _collect_all_string_values(value, result)
    elif isinstance(obj, list):
        for item in obj:
            _collect_all_string_values(item, result)


def _label_keys(labels: Any) -> list[str]:
    """Return the label names of a service's ``labels`` (mapping or list form)."""
    if isinstance(labels, dict):
        return [str(key) for key in labels]
    if isinstance(labels, list):
        return [str(item).split("=")[0] for item in labels]
    return []


def _parse_host_port(port_spec: Any) -> int | None:
    """
    Extract the host (published) port from a port mapping.

    Supports:
      - "8080:80"
      - "127.0.0.1:8080:80"
      - {"published": 8080, "target": 80}
      - 8080 (short form — interpreted as host==container)

    Returns None when no single integer port can be extracted (port ranges,
    unresolved ``${VAR}`` references, long form without ``published``).
    """
    if isinstance(port_spec, dict):
        published = port_spec.get("published")
        if published is not None:
            try:
                return int(published)
            except (ValueError, TypeError):
                pass
        return None

    parts = str(port_spec).split(":")
    # "hostip:hostport:containerport" and "hostport:containerport": the host
    # port is the second-to-last field. A bare "port" is treated as the host
    # port as well.
    # NOTE(review): Compose publishes a bare container port on an ephemeral
    # host port, so the bare form may produce spurious conflict/range
    # findings — confirm this interpretation is intended.
    candidate = parts[-2] if len(parts) >= 2 else parts[0]
    try:
        return int(candidate.split("/")[0])  # drop a "/tcp" or "/udp" suffix
    except ValueError:
        return None


# ---------------------------------------------------------------------------
# Individual checks (original set)
# ---------------------------------------------------------------------------

def validate_image_tags(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on :latest or untagged images."""
    for name, svc in _iter_services(compose):
        image = svc.get("image", "")
        if not image:
            continue
        img_name, tag = _image_name_and_tag(image)
        if not tag:
            result.warn(f"[{name}] Image '{img_name}' has no tag — pin to a specific version.")
        elif tag == "latest":
            result.warn(f"[{name}] Image '{img_name}:latest' — never use :latest in production.")


def validate_restart_policy(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn for every service without a ``restart`` key."""
    for name, svc in _iter_services(compose):
        if not svc.get("restart"):
            result.warn(f"[{name}] No restart policy — add 'restart: unless-stopped'.")


def validate_healthchecks(compose: dict[str, Any], result: ValidationResult) -> None:
    """Note services that have no healthcheck or that disable it explicitly."""
    for name, svc in _iter_services(compose):
        hc = svc.get("healthcheck")
        if hc is None:
            result.info(f"[{name}] No healthcheck defined — add one if the image supports it.")
        elif isinstance(hc, dict) and hc.get("disable"):
            result.info(f"[{name}] Healthcheck explicitly disabled.")


def validate_no_hardcoded_secrets(compose: dict[str, Any], result: ValidationResult) -> None:
    """Flag secret-looking environment variables that carry a literal value.

    Only the ``environment`` section (mapping or ``KEY=value`` list form) is
    inspected. A variable is reported when its name matches one of
    ``_SECRET_PATTERNS`` and its value is non-empty and does not start with
    ``${``.
    """
    for name, svc in _iter_services(compose):
        env = svc.get("environment") or {}
        items: list[tuple[str, Any]] = []
        if isinstance(env, dict):
            items = [(str(key), value) for key, value in env.items()]
        elif isinstance(env, list):
            for entry in env:
                key, sep, value = str(entry).partition("=")
                if sep:  # entries without "=" are pass-through variables
                    items.append((key, value))

        for key, value in items:
            if not value or str(value).startswith("${"):
                continue
            if any(pat.search(key) for pat in _SECRET_PATTERNS):
                result.error(
                    f"[{name}] Possible hardcoded secret in env var '{key}' — "
                    "use ${VAR_NAME} references and store values in .env."
                )


def validate_resource_limits(
    compose: dict[str, Any], result: ValidationResult, strict: bool
) -> None:
    """In strict mode, require a memory limit (error) and suggest a CPU limit (warning).

    Does nothing when *strict* is False.
    """
    if not strict:
        return
    for name, svc in _iter_services(compose):
        deploy = svc.get("deploy") or {}
        resources = deploy.get("resources") or {}
        limits = resources.get("limits") or {}
        # Accept both the deploy.resources form and the service-level keys.
        mem = limits.get("memory") or svc.get("mem_limit")
        cpus = limits.get("cpus") or svc.get("cpus")
        if not mem:
            result.error(
                f"[{name}] No memory limit set (strict mode) — add deploy.resources.limits.memory."
            )
        if not cpus:
            result.warn(
                f"[{name}] No CPU limit set — consider adding deploy.resources.limits.cpus."
            )


def validate_logging(compose: dict[str, Any], result: ValidationResult) -> None:
    """Note services that specify no logging config."""
    for name, svc in _iter_services(compose):
        if not svc.get("logging"):
            result.info(
                f"[{name}] No logging config — consider adding logging.driver and options "
                "(e.g. json-file with max-size/max-file)."
            )


def validate_privileged_mode(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on privileged containers."""
    for name, svc in _iter_services(compose):
        if svc.get("privileged"):
            result.warn(f"[{name}] Running in privileged mode — grant only if strictly required.")


def validate_host_network(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on host network mode."""
    for name, svc in _iter_services(compose):
        if svc.get("network_mode", "") == "host":
            result.warn(
                f"[{name}] Using host network mode — isolate with a bridge network if possible."
            )


def validate_sensitive_volumes(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on sensitive host paths mounted into containers."""
    sensitive_paths = ["/etc", "/var/run/docker.sock", "/proc", "/sys", "/root", "/home"]
    for name, svc in _iter_services(compose):
        for vol in svc.get("volumes") or []:
            if isinstance(vol, str):
                host_part = vol.split(":")[0]
            elif isinstance(vol, dict):
                host_part = str(vol.get("source", ""))
            else:
                continue
            for sensitive in sensitive_paths:
                # Match the path itself or anything below it, not mere prefixes
                # such as "/etcetera".
                if host_part == sensitive or host_part.startswith(sensitive + "/"):
                    result.warn(
                        f"[{name}] Sensitive host path mounted: '{host_part}' — "
                        "verify this is intentional."
                    )


def validate_traefik_network_consistency(
    compose: dict[str, Any], result: ValidationResult
) -> None:
    """Ensure services with Traefik labels are joined to a Traefik-facing network.

    Heuristic: top-level networks whose name contains 'traefik' or 'proxy' are
    considered Traefik-facing. Nothing is reported when no such network exists.
    """
    traefik_network_names = {
        net_name
        for net_name in (compose.get("networks") or {})
        if "traefik" in net_name.lower() or "proxy" in net_name.lower()
    }

    for name, svc in _iter_services(compose):
        if not any("traefik" in lbl.lower() for lbl in _label_keys(svc.get("labels"))):
            continue

        svc_networks: set[str] = set()
        svc_net_section = svc.get("networks") or {}
        if isinstance(svc_net_section, list):
            svc_networks = set(svc_net_section)
        elif isinstance(svc_net_section, dict):
            svc_networks = set(svc_net_section.keys())

        if traefik_network_names and not svc_networks.intersection(traefik_network_names):
            # Sorted so the report is identical from run to run.
            result.warn(
                f"[{name}] Has Traefik labels but is not on a Traefik-facing network "
                f"({', '.join(sorted(traefik_network_names))})."
            )


def validate_traefik_router_uniqueness(
    compose: dict[str, Any], result: ValidationResult
) -> None:
    """Error when two different services define the same Traefik router name.

    Router names are compared case-insensitively. A router is reported once per
    offending service, however many labels that service sets on it.
    """
    seen_routers: dict[str, str] = {}  # router name -> first service defining it
    router_pattern = re.compile(r"traefik\.http\.routers\.([^.]+)\.", re.I)

    for name, svc in _iter_services(compose):
        # One router normally carries several labels (.rule, .entrypoints,
        # .tls, ...), so collapse them to unique router names per service.
        router_names = dict.fromkeys(
            match.group(1).lower()
            for key in _label_keys(svc.get("labels"))
            if (match := router_pattern.match(key))
        )
        for router_name in router_names:
            owner = seen_routers.setdefault(router_name, name)
            if owner != name:
                result.error(
                    f"[{name}] Duplicate Traefik router name '{router_name}' "
                    f"(also used in service '{owner}')."
                )


def validate_container_name_uniqueness(
    compose: dict[str, Any], result: ValidationResult
) -> None:
    """Error on duplicate container_name values."""
    seen: dict[str, str] = {}  # container_name -> first service using it
    for name, svc in _iter_services(compose):
        container_name = svc.get("container_name")
        if not container_name:
            continue
        if container_name in seen:
            result.error(
                f"[{name}] Duplicate container_name '{container_name}' "
                f"(also used by service '{seen[container_name]}')."
            )
        else:
            seen[container_name] = name


def validate_depends_on(compose: dict[str, Any], result: ValidationResult) -> None:
    """Check that depends_on references valid service names."""
    service_names = set((compose.get("services") or {}).keys())
    for name, svc in _iter_services(compose):
        for dep in _get_depends_on_names(svc.get("depends_on") or []):
            if dep not in service_names:
                result.error(f"[{name}] depends_on references unknown service '{dep}'.")


def validate_networks(compose: dict[str, Any], result: ValidationResult) -> None:
    """Check that service networks are declared at the top level."""
    declared = set((compose.get("networks") or {}).keys())
    for name, svc in _iter_services(compose):
        svc_nets = svc.get("networks") or {}
        if not isinstance(svc_nets, (list, dict)):
            continue
        # dict.fromkeys de-duplicates while keeping declaration order.
        for net in dict.fromkeys(svc_nets):
            if net not in declared:
                result.error(
                    f"[{name}] Uses network '{net}' which is not declared in the "
                    "top-level 'networks' section."
                )


def validate_volumes(compose: dict[str, Any], result: ValidationResult) -> None:
    """Check for undefined named volumes and unused top-level volume declarations.

    Undefined named volumes are only reported when the file declares at least
    one top-level volume.
    """
    declared_volumes = compose.get("volumes") or {}
    used_volumes: set[str] = set()

    for name, svc in _iter_services(compose):
        for vol in svc.get("volumes") or []:
            if isinstance(vol, str):
                # Test for a drive letter before splitting on ":", otherwise
                # "C:\data:/data" would be read as a named volume called "C".
                if _WINDOWS_DRIVE_PATH.match(vol):
                    continue
                ref = vol.split(":")[0]
            elif isinstance(vol, dict):
                ref = str(vol.get("source", ""))
            else:
                continue

            # Bind mounts start with ".", "/", "~" or a drive letter; anything
            # else is a named volume.
            if not ref or re.match(r"^[./~]", ref) or _WINDOWS_DRIVE_PATH.match(ref):
                continue
            used_volumes.add(ref)
            if declared_volumes and ref not in declared_volumes:
                result.error(
                    f"[{name}] Uses named volume '{ref}' which is not declared "
                    "in the top-level 'volumes' section."
                )

    for vol in declared_volumes:  # declaration order keeps the report stable
        if vol not in used_volumes:
            result.warn(f"Top-level volume '{vol}' is declared but never used by any service.")


def validate_port_conflicts(compose: dict[str, Any], result: ValidationResult) -> None:
    """Error on duplicate host port bindings.

    Ports are compared by number only; host IP and protocol are not considered.
    """
    seen_ports: dict[int, str] = {}  # host port -> first service binding it
    for name, svc in _iter_services(compose):
        for port_spec in svc.get("ports") or []:
            host_port = _parse_host_port(port_spec)
            if host_port is None:
                continue
            if host_port in seen_ports:
                result.error(
                    f"[{name}] Host port {host_port} conflicts with service "
                    f"'{seen_ports[host_port]}'."
                )
            else:
                seen_ports[host_port] = name


# ---------------------------------------------------------------------------
# NEW checks
# ---------------------------------------------------------------------------

def validate_circular_dependencies(compose: dict[str, Any], result: ValidationResult) -> None:
    """Detect circular dependencies in the depends_on graph using DFS.

    The search from a given starting service stops at the first cycle it finds.
    """
    # Adjacency list: service name -> names it depends on.
    graph: dict[str, list[str]] = {
        name: _get_depends_on_names(svc.get("depends_on") or [])
        for name, svc in _iter_services(compose)
    }

    visited: set[str] = set()
    in_stack: set[str] = set()  # nodes on the current DFS path

    def dfs(node: str, path: list[str]) -> bool:
        """Return True if a cycle is reachable from *node*."""
        visited.add(node)
        in_stack.add(node)
        try:
            for neighbour in graph[node]:
                if neighbour not in graph:
                    # Unknown dependency — reported by validate_depends_on.
                    continue
                if neighbour in in_stack:
                    cycle_path = " → ".join(path + [neighbour])
                    result.error(f"Circular dependency detected: {cycle_path}")
                    return True
                if neighbour not in visited and dfs(neighbour, path + [neighbour]):
                    return True
            return False
        finally:
            # Always unwind, including on the early cycle returns; a node left
            # behind here would be mistaken for a back edge by later searches.
            in_stack.discard(node)

    for service_name in graph:
        if service_name not in visited:
            dfs(service_name, [service_name])


def validate_port_range(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn if host ports are outside the preferred 50000-60000 range."""
    for name, svc in _iter_services(compose):
        for port_spec in svc.get("ports") or []:
            host_port = _parse_host_port(port_spec)
            if host_port is None:
                continue
            if not (_PREFERRED_PORT_MIN <= host_port <= _PREFERRED_PORT_MAX):
                result.warn(
                    f"[{name}] Host port {host_port} is outside the preferred range "
                    f"{_PREFERRED_PORT_MIN}-{_PREFERRED_PORT_MAX}."
                )


def validate_network_isolation(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn if database/cache services are attached to external or proxy networks."""
    top_level_networks = compose.get("networks") or {}

    for name, svc in _iter_services(compose):
        image = svc.get("image", "")
        if not image or not _is_db_cache_image(image):
            continue

        svc_nets = svc.get("networks") or {}
        if isinstance(svc_nets, list):
            net_names = svc_nets
        elif isinstance(svc_nets, dict):
            net_names = list(svc_nets.keys())
        else:
            net_names = []

        for net_name in net_names:
            net_config = top_level_networks.get(net_name) or {}
            # A network counts as exposed when it is marked external or when
            # its name suggests it is the proxy/public network.
            is_external = net_config.get("external", False)
            is_proxy_net = any(
                kw in net_name.lower() for kw in ("traefik", "proxy", "public", "frontend")
            )
            if is_external or is_proxy_net:
                result.warn(
                    f"[{name}] Database/cache service is connected to external or proxy "
                    f"network '{net_name}' — use an internal network for isolation."
                )


def validate_version_tags(compose: dict[str, Any], result: ValidationResult) -> None:
    """Check image version tag quality beyond just the :latest check."""
    semver_full = re.compile(r"^\d+\.\d+\.\d+")  # major.minor.patch (suffix allowed)
    semver_minor = re.compile(r"^\d+\.\d+$")     # major.minor only
    semver_major = re.compile(r"^\d+$")          # major only

    for name, svc in _iter_services(compose):
        image = svc.get("image", "")
        if not image:
            continue
        img_name, tag = _image_name_and_tag(image)

        if not tag:
            # Already reported by validate_image_tags — skip to avoid duplicate noise.
            continue

        if tag == "latest":
            # validate_image_tags only warns about :latest; this check is the
            # one that raises it to an error.
            result.error(
                f"[{name}] Image '{img_name}:latest' — :latest is forbidden in production."
            )
            continue

        if semver_full.match(tag):
            continue  # fully pinned
        if semver_minor.match(tag):
            result.warn(
                f"[{name}] Image '{img_name}:{tag}' uses major.minor only — "
                "pin to a full major.minor.patch tag for reproducible builds."
            )
        elif semver_major.match(tag):
            result.warn(
                f"[{name}] Image '{img_name}:{tag}' uses major version only — "
                "pin to at least major.minor.patch."
            )
        else:
            # Non-semver tags (named releases, etc.) are accepted with a note.
            result.info(
                f"[{name}] Image '{img_name}:{tag}' uses a non-semver tag — "
                "verify this is a pinned, stable release."
            )


def validate_env_references(compose: dict[str, Any], result: ValidationResult) -> None:
    """
    Check that ${VAR} references in service configs have matching definitions.

    Scans all string values in each service's config for ${VAR} patterns, then
    checks whether those variables appear in the service's `environment` block
    or whether the service has an `env_file`. The contents of .env files are
    not read — only structural consistency within the compose file itself is
    checked.
    """
    for name, svc in _iter_services(compose):
        all_values: list[str] = []
        _collect_all_string_values(svc, all_values)

        referenced_vars = {
            match.group(1)
            for val in all_values
            for match in _ENV_VAR_REF_PATTERN.finditer(val)
        }
        if not referenced_vars:
            continue

        # Variable names defined inline in the environment block.
        env_section = svc.get("environment") or {}
        defined_vars: set[str] = set()
        if isinstance(env_section, dict):
            defined_vars = set(env_section.keys())
        elif isinstance(env_section, list):
            defined_vars = {str(entry).split("=")[0] for entry in env_section}

        has_env_file = bool(svc.get("env_file"))

        for var in sorted(referenced_vars):
            if var in defined_vars:
                continue  # explicitly defined — fine
            if has_env_file:
                # Probably supplied by the env_file, which is not read here.
                result.info(
                    f"[{name}] ${{{var}}} is referenced but not defined inline — "
                    "ensure it is present in the env_file."
                )
            else:
                result.warn(
                    f"[{name}] ${{{var}}} is referenced but has no inline definition "
                    "and no env_file is configured — ensure it is in your .env file."
                )


# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------

def run_all_checks(compose: dict[str, Any], strict: bool) -> ValidationResult:
    """Run every registered check, in a fixed order, and return the aggregated result.

    *strict* is forwarded to validate_resource_limits, the only check that
    depends on it.
    """
    result = ValidationResult()

    # Original checks
    validate_image_tags(compose, result)
    validate_restart_policy(compose, result)
    validate_healthchecks(compose, result)
    validate_no_hardcoded_secrets(compose, result)
    validate_resource_limits(compose, result, strict)
    validate_logging(compose, result)
    validate_privileged_mode(compose, result)
    validate_host_network(compose, result)
    validate_sensitive_volumes(compose, result)
    validate_traefik_network_consistency(compose, result)
    validate_traefik_router_uniqueness(compose, result)
    validate_container_name_uniqueness(compose, result)
    validate_depends_on(compose, result)
    validate_networks(compose, result)
    validate_volumes(compose, result)
    validate_port_conflicts(compose, result)

    # New checks
    validate_circular_dependencies(compose, result)
    validate_port_range(compose, result)
    validate_network_isolation(compose, result)
    validate_version_tags(compose, result)
    validate_env_references(compose, result)

    return result


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse the command line, validate the compose file and exit with 0 or 1."""
    parser = argparse.ArgumentParser(
        description="Validate a Docker Compose file for production readiness.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("compose_file", help="Path to the docker-compose.yaml file.")
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Enable strict mode: resource limits become errors, not warnings.",
    )
    args = parser.parse_args()

    compose_path = Path(args.compose_file)
    if not compose_path.exists():
        print(f"❌ File not found: {compose_path}")
        sys.exit(1)

    try:
        with compose_path.open(encoding="utf-8") as fh:
            compose = yaml.safe_load(fh)
    except yaml.YAMLError as exc:
        print(f"❌ YAML parse error: {exc}")
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as exc:
        # e.g. the path is a directory, is unreadable, or is not UTF-8 text.
        print(f"❌ Could not read file: {exc}")
        sys.exit(1)

    if not isinstance(compose, dict):
        print("❌ Compose file did not parse to a mapping — is it a valid YAML file?")
        sys.exit(1)

    mode_label = " [STRICT]" if args.strict else ""
    print(f"🐳 Validating: {compose_path}{mode_label}")
    print("-" * 60)

    result = run_all_checks(compose, strict=args.strict)
    result.print_report()

    sys.exit(0 if result.passed else 1)


if __name__ == "__main__":
    main()