Files
skill-deploy-app/scripts/validate_compose.py
Darren Neese 994332a3f0 Initial commit: _deploy_app skill
Deploy new apps or push updates to existing deployments via
Docker Compose + Caddy + Gitea webhooks. Multi-server profiles,
auto-detection of deployment status, full infrastructure provisioning.

- SKILL.md: 715-line workflow documentation
- scripts/detect_deployment.py: deployment status detection
- scripts/validate_compose.py: compose file validation
- references/: infrastructure, compose patterns, Caddy patterns
- assets/: Makefile and compose templates
- config.json: mew server profile
2026-03-25 21:12:30 -04:00

725 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
validate_compose.py — Production-readiness validator for Docker Compose files.

Usage:
    python3 validate_compose.py <path/to/docker-compose.yaml> [--strict]

Exit codes:
    0 — passed (no errors; warnings may exist)
    1 — failed (one or more errors found)

On Windows, use `python` instead of `python3` if needed.
"""
import argparse
import re
import sys
from pathlib import Path
from typing import Any

try:
    import yaml  # type: ignore[import-untyped]
except ImportError:
    # PyYAML is the script's only third-party dependency. Fail fast with an
    # actionable install hint instead of a raw ImportError traceback.
    print("❌ ERROR: PyYAML is required. Install with: pip install pyyaml")
    sys.exit(1)
# ---------------------------------------------------------------------------
# Result accumulator
# ---------------------------------------------------------------------------
class ValidationResult:
    """Collects errors, warnings, and informational notes from every check."""

    def __init__(self) -> None:
        # Three severity buckets, each a list of preformatted message strings.
        self.errors: list[str] = []
        self.warnings: list[str] = []
        self.infos: list[str] = []

    def error(self, msg: str) -> None:
        self.errors.append(msg)

    def warn(self, msg: str) -> None:
        self.warnings.append(msg)

    def info(self, msg: str) -> None:
        self.infos.append(msg)

    @property
    def passed(self) -> bool:
        # Only errors fail a run; warnings and infos are advisory.
        return not self.errors

    def print_report(self) -> None:
        """Print a formatted validation report to stdout."""
        if not (self.errors or self.warnings or self.infos):
            print("✅ No issues found.")
            return
        sections = (
            ("🔴 ERRORS", self.errors, ""),
            ("🟡 WARNINGS", self.warnings, ""),
            ("🔵 INFO", self.infos, " "),
        )
        for title, messages, indent in sections:
            if not messages:
                continue
            print(f"\n{title} ({len(messages)})")
            for msg in messages:
                print(f"{indent}{msg}")
        print()
        if self.passed:
            print(f"✅ Passed ({len(self.warnings)} warning(s), {len(self.infos)} info(s))")
        else:
            print(f"❌ Failed ({len(self.errors)} error(s), {len(self.warnings)} warning(s))")
# ---------------------------------------------------------------------------
# Helper utilities
# ---------------------------------------------------------------------------
# Key-name patterns that suggest an env var holds a secret (case-insensitive).
_SECRET_PATTERNS = [
    re.compile(r"(password|passwd|secret|token|key|api_key|apikey|auth|credential)", re.I),
]
# Matches values of 8+ chars that are neither ${VAR} references nor obvious
# placeholders. NOTE(review): not referenced by any check visible in this
# file — confirm it is used elsewhere before removing.
_HARDCODED_VALUE_PATTERN = re.compile(
    r"^(?!.*\$\{)(?!changeme)(?!placeholder)(?!your-).{8,}$"
)
# Captures the variable name inside a plain ${VAR} reference. Does NOT match
# compose default/error forms such as ${VAR:-default} or ${VAR:?msg}.
_ENV_VAR_REF_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")
# Preferred host-port publishing range (inclusive) enforced by
# validate_port_range.
_PREFERRED_PORT_MIN = 50000
_PREFERRED_PORT_MAX = 60000
# Base image names (registry/namespace stripped) treated as database/cache
# services by the network-isolation check. Matched by prefix.
_DB_CACHE_IMAGES = [
    "postgres", "postgresql",
    "mariadb", "mysql",
    "mongo", "mongodb",
    "redis", "valkey",
    "memcached",
    "cassandra",
    "couchdb",
    "influxdb",
]
def _iter_services(compose: dict[str, Any]):
"""Yield (name, service_dict) for every service in the compose file."""
for name, svc in (compose.get("services") or {}).items():
yield name, (svc or {})
def _get_depends_on_names(depends_on: Any) -> list[str]:
"""Normalise depends_on to a flat list of service name strings."""
if isinstance(depends_on, list):
return depends_on
if isinstance(depends_on, dict):
return list(depends_on.keys())
return []
def _image_name_and_tag(image: str) -> tuple[str, str]:
"""Split 'image:tag' into (image_name, tag). Tag defaults to '' if absent."""
if ":" in image:
parts = image.rsplit(":", 1)
return parts[0], parts[1]
return image, ""
def _is_db_cache_image(image: str) -> bool:
    """True when the image's base name (registry/namespace stripped) looks
    like a known database or cache engine."""
    repo, _tag = _image_name_and_tag(image)
    base = repo.rsplit("/", 1)[-1].lower()
    # startswith() already covers exact equality, so one test per name suffices.
    return any(base.startswith(db) for db in _DB_CACHE_IMAGES)
def _collect_all_string_values(obj: Any, result: list[str]) -> None:
"""Recursively collect all string leaf values from a nested structure."""
if isinstance(obj, str):
result.append(obj)
elif isinstance(obj, dict):
for v in obj.values():
_collect_all_string_values(v, result)
elif isinstance(obj, list):
for item in obj:
_collect_all_string_values(item, result)
def _parse_host_port(port_spec: Any) -> int | None:
"""
Extract the host (published) port from a port mapping.
Supports:
- "8080:80"
- "127.0.0.1:8080:80"
- {"published": 8080, "target": 80}
- 8080 (short form — interpreted as host==container)
"""
if isinstance(port_spec, dict):
published = port_spec.get("published")
if published is not None:
try:
return int(published)
except (ValueError, TypeError):
pass
return None
spec = str(port_spec)
parts = spec.split(":")
# "hostip:hostport:containerport" → parts[-2] is host port
# "hostport:containerport" → parts[0] is host port
# "containerport" → no explicit host port mapping
if len(parts) >= 2:
try:
return int(parts[-2].split("/")[0])
except (ValueError, IndexError):
pass
elif len(parts) == 1:
try:
return int(parts[0].split("/")[0])
except (ValueError, IndexError):
pass
return None
# ---------------------------------------------------------------------------
# Individual checks (original set)
# ---------------------------------------------------------------------------
def validate_image_tags(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on :latest or untagged images."""
    for svc_name, svc in _iter_services(compose):
        image = svc.get("image", "")
        if not image:
            continue  # built from a Dockerfile, or no image — nothing to check
        repo, tag = _image_name_and_tag(image)
        if tag == "latest":
            result.warn(f"[{svc_name}] Image '{repo}:latest' — never use :latest in production.")
        elif not tag:
            result.warn(f"[{svc_name}] Image '{repo}' has no tag — pin to a specific version.")
def validate_restart_policy(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on services missing a restart policy."""
    for svc_name, svc in _iter_services(compose):
        if not svc.get("restart"):
            result.warn(f"[{svc_name}] No restart policy — add 'restart: unless-stopped'.")
def validate_healthchecks(compose: dict[str, Any], result: ValidationResult) -> None:
    """Note services that define no healthcheck, or explicitly disable one."""
    for svc_name, svc in _iter_services(compose):
        healthcheck = svc.get("healthcheck")
        if healthcheck is None:
            result.info(f"[{svc_name}] No healthcheck defined — add one if the image supports it.")
            continue
        if isinstance(healthcheck, dict) and healthcheck.get("disable"):
            result.info(f"[{svc_name}] Healthcheck explicitly disabled.")
def validate_no_hardcoded_secrets(compose: dict[str, Any], result: ValidationResult) -> None:
    """Detect hardcoded secrets in service environment variables.

    A value is flagged when its key matches a secret-like pattern and the
    value is neither empty nor a ${VAR} reference.
    """
    for svc_name, svc in _iter_services(compose):
        env = svc.get("environment") or {}
        items: list[tuple[str, str]] = []
        if isinstance(env, dict):
            items = list(env.items())
        elif isinstance(env, list):
            # List form: entries look like "KEY=value"; bare "KEY" entries
            # (inherited from the host) carry no value and are skipped.
            for entry in env:
                if "=" in str(entry):
                    k, v = str(entry).split("=", 1)
                    items.append((k, v))
        for key, value in items:
            if not value or str(value).startswith("${"):
                continue  # empty or an env-file reference — not hardcoded
            for pat in _SECRET_PATTERNS:
                if pat.search(key):
                    # BUG FIX: the two adjacent string literals were implicitly
                    # concatenated with no separator, producing the garbled
                    # message "...'KEY'use ${VAR_NAME}...".
                    result.error(
                        f"[{svc_name}] Possible hardcoded secret in env var '{key}' — "
                        "use ${VAR_NAME} references and store values in .env."
                    )
                    break  # one error per variable is enough
def validate_resource_limits(compose: dict[str, Any], result: ValidationResult, strict: bool) -> None:
    """In strict mode, require resource limits on all services."""
    if not strict:
        return  # resource limits are only enforced under --strict
    for svc_name, svc in _iter_services(compose):
        limits = ((svc.get("deploy") or {}).get("resources") or {}).get("limits") or {}
        # Accept either the deploy-section form or the legacy top-level keys.
        memory_limit = limits.get("memory") or svc.get("mem_limit")
        cpu_limit = limits.get("cpus") or svc.get("cpus")
        if not memory_limit:
            result.error(f"[{svc_name}] No memory limit set (strict mode) — add deploy.resources.limits.memory.")
        if not cpu_limit:
            result.warn(f"[{svc_name}] No CPU limit set — consider adding deploy.resources.limits.cpus.")
def validate_logging(compose: dict[str, Any], result: ValidationResult) -> None:
    """Emit an info note for services with no explicit logging configuration."""
    for svc_name, svc in _iter_services(compose):
        if svc.get("logging"):
            continue
        result.info(
            f"[{svc_name}] No logging config — consider adding logging.driver and options "
            "(e.g. json-file with max-size/max-file)."
        )
def validate_privileged_mode(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on privileged containers."""
    for svc_name, svc in _iter_services(compose):
        if not svc.get("privileged"):
            continue
        result.warn(f"[{svc_name}] Running in privileged mode — grant only if strictly required.")
def validate_host_network(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on host network mode."""
    for svc_name, svc in _iter_services(compose):
        if svc.get("network_mode", "") == "host":
            result.warn(f"[{svc_name}] Using host network mode — isolate with a bridge network if possible.")
def validate_sensitive_volumes(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn on sensitive host paths mounted into containers.

    Flags a mount when its host side is one of the sensitive paths or lives
    underneath one (prefix + '/').
    """
    sensitive_paths = ["/etc", "/var/run/docker.sock", "/proc", "/sys", "/root", "/home"]
    for svc_name, svc in _iter_services(compose):
        for vol in (svc.get("volumes") or []):
            if isinstance(vol, str):
                host_part = vol.split(":")[0]  # short syntax: host:container[:mode]
            elif isinstance(vol, dict):
                host_part = str(vol.get("source", ""))  # long syntax
            else:
                continue
            for sensitive in sensitive_paths:
                if host_part == sensitive or host_part.startswith(sensitive + "/"):
                    # BUG FIX: adjacent string literals were implicitly joined
                    # with no separator, producing the garbled message
                    # "...'{path}'verify this is intentional."
                    result.warn(
                        f"[{svc_name}] Sensitive host path mounted: '{host_part}' — "
                        "verify this is intentional."
                    )
                    break  # one warning per volume is enough
def validate_traefik_network_consistency(compose: dict[str, Any], result: ValidationResult) -> None:
    """Ensure services carrying Traefik labels join a Traefik-facing network."""
    # Heuristic: top-level networks named 'traefik*' or 'proxy*' face Traefik.
    traefik_nets = {
        net_name
        for net_name in (compose.get("networks") or {})
        if "traefik" in net_name.lower() or "proxy" in net_name.lower()
    }
    for svc_name, svc in _iter_services(compose):
        labels = svc.get("labels") or {}
        if isinstance(labels, dict):
            label_keys = list(labels.keys())
        elif isinstance(labels, list):
            label_keys = [str(item).split("=")[0] for item in labels]
        else:
            label_keys = []
        if not any("traefik" in key.lower() for key in label_keys):
            continue  # service is not routed by Traefik
        nets = svc.get("networks") or {}
        attached = set(nets) if isinstance(nets, (list, dict)) else set()
        if traefik_nets and not (attached & traefik_nets):
            result.warn(
                f"[{svc_name}] Has Traefik labels but is not on a Traefik-facing network "
                f"({', '.join(traefik_nets)})."
            )
def validate_traefik_router_uniqueness(compose: dict[str, Any], result: ValidationResult) -> None:
    """Error when two *different* services reuse the same Traefik router name.

    BUG FIX: the original flagged any second occurrence of a router name —
    but one router is normally configured through several labels on the same
    service (.rule, .entrypoints, .tls, ...), so every realistic Traefik
    setup produced false "duplicate" errors against itself. Only reuse
    across distinct services is an actual conflict.
    """
    router_pattern = re.compile(r"traefik\.http\.routers\.([^.]+)\.", re.I)
    first_owner: dict[str, str] = {}  # router name (lowercased) -> owning service
    for svc_name, svc in _iter_services(compose):
        labels = svc.get("labels") or {}
        if isinstance(labels, dict):
            label_keys = list(labels.keys())
        elif isinstance(labels, list):
            label_keys = [str(item).split("=")[0] for item in labels]
        else:
            label_keys = []
        for key in label_keys:
            m = router_pattern.match(key)
            if not m:
                continue
            router_name = m.group(1).lower()
            owner = first_owner.setdefault(router_name, svc_name)
            if owner != svc_name:
                result.error(
                    f"[{svc_name}] Duplicate Traefik router name '{router_name}' "
                    f"(also used in service '{owner}')."
                )
def validate_container_name_uniqueness(compose: dict[str, Any], result: ValidationResult) -> None:
    """Error on duplicate container_name values."""
    owners: dict[str, str] = {}  # container_name -> first service that used it
    for svc_name, svc in _iter_services(compose):
        cname = svc.get("container_name")
        if not cname:
            continue
        previous = owners.setdefault(cname, svc_name)
        if previous != svc_name:
            result.error(
                f"[{svc_name}] Duplicate container_name '{cname}' "
                f"(also used by service '{previous}')."
            )
def validate_depends_on(compose: dict[str, Any], result: ValidationResult) -> None:
    """Check that depends_on entries reference services defined in this file."""
    known_services = set(compose.get("services") or {})
    for svc_name, svc in _iter_services(compose):
        for dep in _get_depends_on_names(svc.get("depends_on") or []):
            if dep not in known_services:
                result.error(
                    f"[{svc_name}] depends_on references unknown service '{dep}'."
                )
def validate_networks(compose: dict[str, Any], result: ValidationResult) -> None:
    """Check that every network a service joins is declared at the top level."""
    declared = set(compose.get("networks") or {})
    for svc_name, svc in _iter_services(compose):
        nets = svc.get("networks") or {}
        used = set(nets) if isinstance(nets, (list, dict)) else set()
        for net in used - declared:
            result.error(
                f"[{svc_name}] Uses network '{net}' which is not declared in the "
                "top-level 'networks' section."
            )
def validate_volumes(compose: dict[str, Any], result: ValidationResult) -> None:
    """Check for undefined named volumes and orphaned top-level declarations.

    Bind mounts (paths starting with '.', '/', '~', or a Windows drive
    letter) are skipped; only named volumes are compared against the
    top-level 'volumes' section.

    BUG FIX: the undeclared-volume error was previously skipped whenever the
    file had no top-level 'volumes' section at all — exactly the situation
    in which every named volume is undeclared and Compose itself would
    refuse the file.
    """
    declared_volumes = set((compose.get("volumes") or {}).keys())
    used_volumes: set[str] = set()
    for name, svc in _iter_services(compose):
        for vol in (svc.get("volumes") or []):
            if isinstance(vol, str):
                ref = vol.split(":")[0]  # short syntax: source:target[:mode]
            elif isinstance(vol, dict):
                ref = str(vol.get("source", ""))  # long syntax
            else:
                continue
            # Named volumes don't start with . / ~ or a drive-letter pattern.
            if not ref or re.match(r"^[./~]|^[A-Za-z]:[/\\]", ref):
                continue
            used_volumes.add(ref)
            if ref not in declared_volumes:
                result.error(
                    f"[{name}] Uses named volume '{ref}' which is not declared "
                    "in the top-level 'volumes' section."
                )
    for vol in declared_volumes - used_volumes:
        result.warn(f"Top-level volume '{vol}' is declared but never used by any service.")
def validate_port_conflicts(compose: dict[str, Any], result: ValidationResult) -> None:
    """Error on duplicate host port bindings across services."""
    owners: dict[int, str] = {}  # host port -> first service that bound it
    for svc_name, svc in _iter_services(compose):
        for spec in (svc.get("ports") or []):
            host_port = _parse_host_port(spec)
            if host_port is None:
                continue  # no published host port in this mapping
            if host_port in owners:
                result.error(
                    f"[{svc_name}] Host port {host_port} conflicts with service "
                    f"'{owners[host_port]}'."
                )
            else:
                owners[host_port] = svc_name
# ---------------------------------------------------------------------------
# NEW checks
# ---------------------------------------------------------------------------
def validate_circular_dependencies(compose: dict[str, Any], result: ValidationResult) -> None:
    """Detect circular dependencies in the depends_on graph using DFS.

    Dependencies on unknown service names are ignored here — they are
    reported separately by validate_depends_on.
    """
    services = compose.get("services") or {}
    # Adjacency list: service name -> list of its declared dependencies.
    graph: dict[str, list[str]] = {
        name: _get_depends_on_names((svc or {}).get("depends_on") or [])
        for name, svc in services.items()
    }
    visited: set[str] = set()
    in_stack: set[str] = set()  # nodes on the current DFS path

    def dfs(node: str, path: list[str]) -> bool:
        """Return True if a cycle is detected below `node`."""
        visited.add(node)
        in_stack.add(node)
        for neighbour in graph.get(node, []):
            if neighbour not in graph:
                # Unknown dependency — already caught by validate_depends_on.
                continue
            if neighbour not in visited:
                if dfs(neighbour, path + [neighbour]):
                    return True
            elif neighbour in in_stack:
                # BUG FIX: the path was joined with "" which rendered a cycle
                # like a->b->c->a as the unreadable blob "abca"; join with an
                # arrow separator instead.
                cycle_path = " → ".join(path + [neighbour])
                result.error(
                    f"Circular dependency detected: {cycle_path}"
                )
                return True
        in_stack.discard(node)
        return False

    for service_name in graph:
        if service_name not in visited:
            dfs(service_name, [service_name])
def validate_port_range(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn if host ports fall outside the preferred 50000-60000 range."""
    for svc_name, svc in _iter_services(compose):
        for spec in (svc.get("ports") or []):
            host_port = _parse_host_port(spec)
            if host_port is None:
                continue
            if host_port < _PREFERRED_PORT_MIN or host_port > _PREFERRED_PORT_MAX:
                result.warn(
                    f"[{svc_name}] Host port {host_port} is outside the preferred range "
                    f"{_PREFERRED_PORT_MIN}-{_PREFERRED_PORT_MAX}."
                )
def validate_network_isolation(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn when database/cache services sit on external or proxy-facing networks."""
    top_level_networks = compose.get("networks") or {}
    proxy_keywords = ("traefik", "proxy", "public", "frontend")
    for svc_name, svc in _iter_services(compose):
        image = svc.get("image", "")
        if not (image and _is_db_cache_image(image)):
            continue  # only database/cache images need isolation checks
        nets = svc.get("networks") or {}
        net_names = list(nets) if isinstance(nets, (list, dict)) else []
        for net_name in net_names:
            net_config = top_level_networks.get(net_name) or {}
            # "External" means declared with external: true; proxy-facing is
            # inferred from keywords in the network's name.
            lowered = net_name.lower()
            exposed = net_config.get("external", False) or any(
                kw in lowered for kw in proxy_keywords
            )
            if exposed:
                result.warn(
                    f"[{svc_name}] Database/cache service is connected to external or proxy "
                    f"network '{net_name}' — use an internal network for isolation."
                )
def validate_version_tags(compose: dict[str, Any], result: ValidationResult) -> None:
    """Assess image tag quality beyond the basic :latest check."""
    pin_full = re.compile(r"^\d+\.\d+\.\d+")   # major.minor.patch (suffixes allowed)
    pin_minor = re.compile(r"^\d+\.\d+$")      # major.minor only
    pin_major = re.compile(r"^\d+$")           # major only
    for svc_name, svc in _iter_services(compose):
        image = svc.get("image", "")
        if not image:
            continue
        repo, tag = _image_name_and_tag(image)
        if not tag:
            # Untagged images are already flagged by validate_image_tags.
            continue
        if tag == "latest":
            # Escalated to error level here; validate_image_tags warns too.
            result.error(
                f"[{svc_name}] Image '{repo}:latest' — :latest is forbidden in production."
            )
        elif pin_full.match(tag):
            pass  # fully pinned — nothing to report
        elif pin_minor.match(tag):
            result.warn(
                f"[{svc_name}] Image '{repo}:{tag}' uses major.minor only — "
                "pin to a full major.minor.patch tag for reproducible builds."
            )
        elif pin_major.match(tag):
            result.warn(
                f"[{svc_name}] Image '{repo}:{tag}' uses major version only — "
                "pin to at least major.minor.patch."
            )
        else:
            # Non-semver tags (digests, named releases, ...) — accept as info.
            result.info(
                f"[{svc_name}] Image '{repo}:{tag}' uses a non-semver tag — "
                "verify this is a pinned, stable release."
            )
def validate_env_references(compose: dict[str, Any], result: ValidationResult) -> None:
    """
    Check that ${VAR} references in service configs have matching definitions.

    All string values in each service are scanned for ${VAR} patterns; each
    referenced variable should appear in the service's `environment` block or
    be plausibly supplied via `env_file`. The contents of .env files cannot be
    validated from here — only structural consistency within the compose file
    itself is checked.
    """
    for svc_name, svc in _iter_services(compose):
        strings: list[str] = []
        _collect_all_string_values(svc, strings)
        referenced = {
            m.group(1)
            for value in strings
            for m in _ENV_VAR_REF_PATTERN.finditer(value)
        }
        if not referenced:
            continue
        env_section = svc.get("environment") or {}
        if isinstance(env_section, dict):
            defined = set(env_section)
        elif isinstance(env_section, list):
            # List form "KEY=value" (or bare "KEY") — the key is before '='.
            defined = {str(entry).split("=")[0] for entry in env_section}
        else:
            defined = set()
        has_env_file = bool(svc.get("env_file"))
        for var in sorted(referenced - defined):
            if has_env_file:
                # Likely supplied by the env_file — unverifiable, so just note it.
                result.info(
                    f"[{svc_name}] ${{{var}}} is referenced but not defined inline — "
                    "ensure it is present in the env_file."
                )
            else:
                result.warn(
                    f"[{svc_name}] ${{{var}}} is referenced but has no inline definition "
                    "and no env_file is configured — ensure it is in your .env file."
                )
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
def run_all_checks(compose: dict[str, Any], strict: bool) -> ValidationResult:
    """Run every registered check, in report order, and return the result."""
    result = ValidationResult()
    checks = (
        # Original checks
        validate_image_tags,
        validate_restart_policy,
        validate_healthchecks,
        validate_no_hardcoded_secrets,
        lambda c, r: validate_resource_limits(c, r, strict),  # strict-aware
        validate_logging,
        validate_privileged_mode,
        validate_host_network,
        validate_sensitive_volumes,
        validate_traefik_network_consistency,
        validate_traefik_router_uniqueness,
        validate_container_name_uniqueness,
        validate_depends_on,
        validate_networks,
        validate_volumes,
        validate_port_conflicts,
        # New checks
        validate_circular_dependencies,
        validate_port_range,
        validate_network_isolation,
        validate_version_tags,
        validate_env_references,
    )
    for check in checks:
        check(compose, result)
    return result
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: parse arguments, load the YAML, run checks, report."""
    parser = argparse.ArgumentParser(
        description="Validate a Docker Compose file for production readiness.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("compose_file", help="Path to the docker-compose.yaml file.")
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Enable strict mode: resource limits become errors, not warnings.",
    )
    args = parser.parse_args()

    compose_path = Path(args.compose_file)
    if not compose_path.exists():
        print(f"❌ File not found: {compose_path}")
        sys.exit(1)

    try:
        with compose_path.open(encoding="utf-8") as handle:
            compose = yaml.safe_load(handle)
    except yaml.YAMLError as err:
        print(f"❌ YAML parse error: {err}")
        sys.exit(1)

    # safe_load can return scalars/lists for degenerate input — require a mapping.
    if not isinstance(compose, dict):
        print("❌ Compose file did not parse to a mapping — is it a valid YAML file?")
        sys.exit(1)

    mode_label = " [STRICT]" if args.strict else ""
    print(f"🐳 Validating: {compose_path}{mode_label}")
    print("-" * 60)
    report = run_all_checks(compose, strict=args.strict)
    report.print_report()
    sys.exit(0 if report.passed else 1)


if __name__ == "__main__":
    main()