Deploy new apps or push updates to existing deployments via Docker Compose + Caddy + Gitea webhooks. Multi-server profiles, auto-detection of deployment status, full infrastructure provisioning. - SKILL.md: 715-line workflow documentation - scripts/detect_deployment.py: deployment status detection - scripts/validate_compose.py: compose file validation - references/: infrastructure, compose patterns, Caddy patterns - assets/: Makefile and compose templates - config.json: mew server profile
725 lines
27 KiB
Python
725 lines
27 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
validate-compose.py — Production-readiness validator for Docker Compose files.
|
||
|
||
Usage:
|
||
python3 validate-compose.py <path/to/docker-compose.yaml> [--strict]
|
||
|
||
Exit codes:
|
||
0 — passed (no errors; warnings may exist)
|
||
1 — failed (one or more errors found)
|
||
|
||
On Windows, use `python` instead of `python3` if needed.
|
||
"""
|
||
|
||
import argparse
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
try:
|
||
import yaml # type: ignore[import-untyped]
|
||
except ImportError:
|
||
    # PyYAML is a third-party package and the standard library has no YAML
    # parser to fall back on, so report the missing dependency clearly and exit.
|
||
print("❌ ERROR: PyYAML is required. Install with: pip install pyyaml")
|
||
sys.exit(1)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Result accumulator
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class ValidationResult:
    """Collect the findings reported by the individual checks.

    Findings are stored in three severity buckets. Only the ``errors``
    bucket decides whether the validation as a whole passed.
    """

    def __init__(self) -> None:
        # One list per severity, in the order the findings were reported.
        self.errors: list[str] = []
        self.warnings: list[str] = []
        self.infos: list[str] = []

    def error(self, msg: str) -> None:
        """Record *msg* as an error."""
        self.errors.append(msg)

    def warn(self, msg: str) -> None:
        """Record *msg* as a warning."""
        self.warnings.append(msg)

    def info(self, msg: str) -> None:
        """Record *msg* as an informational note."""
        self.infos.append(msg)

    @property
    def passed(self) -> bool:
        """Return True when no error has been recorded."""
        return not self.errors

    def print_report(self) -> None:
        """Write the grouped findings and a one-line verdict to stdout."""
        if not (self.errors or self.warnings or self.infos):
            print("✅ No issues found.")
            return

        # Sections appear from most to least severe; empty ones are omitted.
        if self.errors:
            print(f"\n🔴 ERRORS ({len(self.errors)})")
            for e in self.errors:
                print(f" ✖ {e}")

        if self.warnings:
            print(f"\n🟡 WARNINGS ({len(self.warnings)})")
            for w in self.warnings:
                print(f" ⚠ {w}")

        if self.infos:
            print(f"\n🔵 INFO ({len(self.infos)})")
            for i in self.infos:
                print(f" ℹ {i}")

        print()
        if not self.passed:
            print(f"❌ Failed ({len(self.errors)} error(s), {len(self.warnings)} warning(s))")
        else:
            print(f"✅ Passed ({len(self.warnings)} warning(s), {len(self.infos)} info(s))")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helper utilities
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Case-insensitive patterns; an environment variable whose NAME matches any
# of them is treated as likely to carry a secret.
_SECRET_PATTERNS = [
    re.compile(
        r"(password|passwd|secret|token|key|api_key|apikey|auth|credential)",
        re.I,
    ),
]

# Matches a value of at least 8 characters that contains no "${" and does not
# begin with "changeme", "placeholder" or "your-".
# NOTE(review): no check visible in this file references this pattern.
_HARDCODED_VALUE_PATTERN = re.compile(
    r"^(?!.*\$\{)(?!changeme)(?!placeholder)(?!your-).{8,}$"
)

# Matches plain ${NAME} references and captures NAME. Forms carrying a
# modifier, such as ${NAME:-default}, do not match.
_ENV_VAR_REF_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")

# Inclusive bounds of the host-port range the validator prefers.
_PREFERRED_PORT_MIN = 50000
_PREFERRED_PORT_MAX = 60000

# Image base-name prefixes that identify a database or cache service.
_DB_CACHE_IMAGES = [
    "postgres",
    "postgresql",
    "mariadb",
    "mysql",
    "mongo",
    "mongodb",
    "redis",
    "valkey",
    "memcached",
    "cassandra",
    "couchdb",
    "influxdb",
]
|
||
|
||
|
||
def _iter_services(compose: dict[str, Any]):
    """Generate ``(name, definition)`` pairs for each entry under ``services``.

    A missing or empty ``services`` section yields nothing, and a service
    whose definition is empty (for example ``web:`` with no body) is yielded
    as an empty dict so callers can always call ``.get`` on it.
    """
    services = compose.get("services")
    if not services:
        return
    for svc_name, definition in services.items():
        yield svc_name, (definition if definition else {})
|
||
|
||
|
||
def _get_depends_on_names(depends_on: Any) -> list[str]:
    """Return the service names listed in a ``depends_on`` value.

    A list is returned unchanged (the same object), a mapping contributes
    its keys, and any other value produces an empty list.
    """
    if isinstance(depends_on, dict):
        return list(depends_on)
    return depends_on if isinstance(depends_on, list) else []
|
||
|
||
|
||
def _image_name_and_tag(image: str) -> tuple[str, str]:
    """Split an image reference into ``(image_name, tag)``.

    The tag is ``''`` when the reference carries neither a tag nor a digest.

    * ``nginx:1.25``              -> ``("nginx", "1.25")``
    * ``localhost:5000/app``      -> ``("localhost:5000/app", "")``; the colon
      belongs to the registry ``host:port``, not to a tag.
    * ``nginx@sha256:abc``        -> ``("nginx", "sha256:abc")``; a digest-only
      reference is pinned, so the digest is reported in the tag position.
    * ``nginx:1.25@sha256:abc``   -> ``("nginx", "1.25")``; the explicit tag wins.
    """
    # Peel off an optional "@<digest>" suffix first; the digest itself
    # contains a colon that must not be mistaken for the tag separator.
    reference, _, digest = image.partition("@")

    name, colon, tag = reference.rpartition(":")
    if not colon or "/" in tag:
        # Either no colon at all, or the last colon sits before a path
        # separator and is therefore a registry port.
        name, tag = reference, ""

    if not tag and digest:
        tag = digest
    return name, tag
|
||
|
||
|
||
def _is_db_cache_image(image: str) -> bool:
    """Return True when the image's base name starts with a known DB/cache name.

    The base name is the last ``/``-separated component of the image name
    (tag removed), compared case-insensitively against ``_DB_CACHE_IMAGES``.
    """
    repository, _tag = _image_name_and_tag(image)
    short_name = repository.rsplit("/", 1)[-1].lower()
    # A prefix test also covers the exact-match case.
    return short_name.startswith(tuple(_DB_CACHE_IMAGES))
|
||
|
||
|
||
def _collect_all_string_values(obj: Any, result: list[str]) -> None:
    """Append every string leaf found in *obj* to *result*.

    Dict values and list items are walked depth-first in their natural
    order. Dict keys and non-string leaves are ignored.
    """
    pending = [obj]
    while pending:
        current = pending.pop()
        if isinstance(current, str):
            result.append(current)
        elif isinstance(current, dict):
            # Push in reverse so the first child is popped (visited) first.
            pending.extend(reversed(list(current.values())))
        elif isinstance(current, list):
            pending.extend(reversed(current))
|
||
|
||
|
||
def _parse_host_port(port_spec: Any) -> int | None:
|
||
"""
|
||
Extract the host (published) port from a port mapping.
|
||
|
||
Supports:
|
||
- "8080:80"
|
||
- "127.0.0.1:8080:80"
|
||
- {"published": 8080, "target": 80}
|
||
- 8080 (short form — interpreted as host==container)
|
||
"""
|
||
if isinstance(port_spec, dict):
|
||
published = port_spec.get("published")
|
||
if published is not None:
|
||
try:
|
||
return int(published)
|
||
except (ValueError, TypeError):
|
||
pass
|
||
return None
|
||
|
||
spec = str(port_spec)
|
||
parts = spec.split(":")
|
||
# "hostip:hostport:containerport" → parts[-2] is host port
|
||
# "hostport:containerport" → parts[0] is host port
|
||
# "containerport" → no explicit host port mapping
|
||
if len(parts) >= 2:
|
||
try:
|
||
return int(parts[-2].split("/")[0])
|
||
except (ValueError, IndexError):
|
||
pass
|
||
elif len(parts) == 1:
|
||
try:
|
||
return int(parts[0].split("/")[0])
|
||
except (ValueError, IndexError):
|
||
pass
|
||
return None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Individual checks (original set)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def validate_image_tags(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn about services whose image has no tag or uses ``:latest``.

    Services without an ``image`` key are skipped.
    """
    for name, svc in _iter_services(compose):
        image_ref = svc.get("image", "")
        if not image_ref:
            continue
        img_name, tag = _image_name_and_tag(image_ref)
        if tag == "latest":
            result.warn(f"[{name}] Image '{img_name}:latest' — never use :latest in production.")
        elif not tag:
            result.warn(f"[{name}] Image '{img_name}' has no tag — pin to a specific version.")
|
||
|
||
|
||
def validate_restart_policy(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn about every service that has no (or an empty) ``restart`` value."""
    for name, svc in _iter_services(compose):
        if svc.get("restart"):
            continue
        result.warn(f"[{name}] No restart policy — add 'restart: unless-stopped'.")
|
||
|
||
|
||
def validate_healthchecks(compose: dict[str, Any], result: ValidationResult) -> None:
    """Note services without a healthcheck or with one explicitly disabled.

    Both situations are reported at info level only.
    """
    for name, svc in _iter_services(compose):
        healthcheck = svc.get("healthcheck")
        if healthcheck is None:
            result.info(f"[{name}] No healthcheck defined — add one if the image supports it.")
            continue
        if isinstance(healthcheck, dict) and healthcheck.get("disable"):
            result.info(f"[{name}] Healthcheck explicitly disabled.")
|
||
|
||
|
||
def validate_no_hardcoded_secrets(compose: dict[str, Any], result: ValidationResult) -> None:
    """Report secret-looking environment variables that carry a literal value.

    Only each service's ``environment`` block is inspected (mapping or
    ``KEY=value`` list form). A variable is reported as an error when its
    name matches ``_SECRET_PATTERNS`` and its value is non-empty and does not
    start with ``${``.
    """
    for name, svc in _iter_services(compose):
        env = svc.get("environment") or {}
        if isinstance(env, dict):
            pairs = list(env.items())
        elif isinstance(env, list):
            # List entries without "=" have no inline value and are skipped.
            pairs = [
                tuple(str(entry).split("=", 1))
                for entry in env
                if "=" in str(entry)
            ]
        else:
            pairs = []

        for key, value in pairs:
            if not value or str(value).startswith("${"):
                continue
            # At most one error per variable, however many patterns match.
            if any(pat.search(key) for pat in _SECRET_PATTERNS):
                result.error(
                    f"[{name}] Possible hardcoded secret in env var '{key}' — "
                    "use ${VAR_NAME} references and store values in .env."
                )
|
||
|
||
|
||
def validate_resource_limits(compose: dict[str, Any], result: ValidationResult, strict: bool) -> None:
    """Check memory and CPU limits; does nothing unless *strict* is true.

    A limit may come from ``deploy.resources.limits`` or from the
    service-level ``mem_limit`` / ``cpus`` keys. A missing memory limit is
    an error, a missing CPU limit only a warning.
    """
    if strict:
        for name, svc in _iter_services(compose):
            deploy_section = svc.get("deploy") or {}
            resource_section = deploy_section.get("resources") or {}
            limit_section = resource_section.get("limits") or {}

            has_memory = limit_section.get("memory") or svc.get("mem_limit")
            has_cpus = limit_section.get("cpus") or svc.get("cpus")

            if not has_memory:
                result.error(f"[{name}] No memory limit set (strict mode) — add deploy.resources.limits.memory.")
            if not has_cpus:
                result.warn(f"[{name}] No CPU limit set — consider adding deploy.resources.limits.cpus.")
|
||
|
||
|
||
def validate_logging(compose: dict[str, Any], result: ValidationResult) -> None:
    """Add an info note for each service without a ``logging`` section."""
    for name, svc in _iter_services(compose):
        if svc.get("logging"):
            continue
        result.info(
            f"[{name}] No logging config — consider adding logging.driver and options "
            "(e.g. json-file with max-size/max-file)."
        )
|
||
|
||
|
||
def validate_privileged_mode(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn about every service whose ``privileged`` value is truthy."""
    for name, svc in _iter_services(compose):
        if not svc.get("privileged"):
            continue
        result.warn(f"[{name}] Running in privileged mode — grant only if strictly required.")
|
||
|
||
|
||
def validate_host_network(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn about every service configured with ``network_mode: host``."""
    for name, svc in _iter_services(compose):
        if svc.get("network_mode", "") != "host":
            continue
        result.warn(f"[{name}] Using host network mode — isolate with a bridge network if possible.")
|
||
|
||
|
||
def validate_sensitive_volumes(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn when a volume source is a sensitive host path or lies beneath one.

    For short-syntax strings the text before the first ``:`` is examined;
    for long-syntax mappings the ``source`` value is. Other entry types are
    ignored.
    """
    sensitive_paths = ["/etc", "/var/run/docker.sock", "/proc", "/sys", "/root", "/home"]

    for name, svc in _iter_services(compose):
        for vol in svc.get("volumes") or []:
            if isinstance(vol, dict):
                host_part = str(vol.get("source", ""))
            elif isinstance(vol, str):
                host_part = vol.partition(":")[0]
            else:
                continue

            # Match the path itself or anything under it, so "/etcetera"
            # is not mistaken for "/etc".
            matches = [
                sensitive
                for sensitive in sensitive_paths
                if host_part == sensitive or host_part.startswith(sensitive + "/")
            ]
            for _ in matches:
                result.warn(
                    f"[{name}] Sensitive host path mounted: '{host_part}' — "
                    "verify this is intentional."
                )
|
||
|
||
|
||
def validate_traefik_network_consistency(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn when a Traefik-labelled service is on no Traefik-facing network.

    A top-level network counts as Traefik-facing when its name contains
    "traefik" or "proxy" (case-insensitive). If the file declares no such
    network, nothing is reported.
    """
    # Heuristic based purely on the network name.
    traefik_network_names: set[str] = {
        net_name
        for net_name in (compose.get("networks") or {})
        if "traefik" in net_name.lower() or "proxy" in net_name.lower()
    }

    for name, svc in _iter_services(compose):
        labels = svc.get("labels") or {}
        if isinstance(labels, dict):
            label_names = list(labels)
        elif isinstance(labels, list):
            # List form is "key=value"; only the key part is of interest.
            label_names = [str(entry).split("=")[0] for entry in labels]
        else:
            label_names = []

        if not any("traefik" in label.lower() for label in label_names):
            continue

        attached = svc.get("networks") or {}
        if isinstance(attached, list):
            svc_networks = set(attached)
        elif isinstance(attached, dict):
            svc_networks = set(attached.keys())
        else:
            svc_networks = set()

        if traefik_network_names and not (svc_networks & traefik_network_names):
            result.warn(
                f"[{name}] Has Traefik labels but is not on a Traefik-facing network "
                f"({', '.join(traefik_network_names)})."
            )
|
||
|
||
|
||
def validate_traefik_router_uniqueness(compose: dict[str, Any], result: ValidationResult) -> None:
    """Report Traefik router names that are defined by more than one service.

    Router names are taken from label keys of the form
    ``traefik.http.routers.<name>.<option>`` and compared case-insensitively.
    A single service normally carries several labels for the same router
    (``.rule``, ``.entrypoints``, ...); those are not duplicates. Each
    offending ``(service, router)`` pair is reported once.
    """
    router_pattern = re.compile(r"traefik\.http\.routers\.([^.]+)\.", re.I)
    # router name -> first service that defined it
    first_owner: dict[str, str] = {}

    for name, svc in _iter_services(compose):
        labels = svc.get("labels") or {}
        if isinstance(labels, dict):
            label_keys = [str(key) for key in labels]
        elif isinstance(labels, list):
            # List form is "key=value"; keep only the key.
            label_keys = [str(entry).split("=")[0] for entry in labels]
        else:
            label_keys = []

        already_reported: set[str] = set()
        for key in label_keys:
            m = router_pattern.match(key)
            if not m:
                continue
            router_name = m.group(1).lower()
            owner = first_owner.setdefault(router_name, name)
            # Same service re-using its own router is expected; only a
            # different owner is a conflict.
            if owner == name or router_name in already_reported:
                continue
            already_reported.add(router_name)
            result.error(
                f"[{name}] Duplicate Traefik router name '{router_name}' "
                f"(also used in service '{owner}')."
            )
|
||
|
||
|
||
def validate_container_name_uniqueness(compose: dict[str, Any], result: ValidationResult) -> None:
    """Report an error for each ``container_name`` already claimed by an earlier service.

    Services without a ``container_name`` are ignored.
    """
    # container_name -> first service that used it
    seen: dict[str, str] = {}
    for name, svc in _iter_services(compose):
        container_name = svc.get("container_name")
        if not container_name:
            continue
        if container_name not in seen:
            seen[container_name] = name
            continue
        result.error(
            f"[{name}] Duplicate container_name '{container_name}' "
            f"(also used by service '{seen[container_name]}')."
        )
|
||
|
||
|
||
def validate_depends_on(compose: dict[str, Any], result: ValidationResult) -> None:
    """Report every ``depends_on`` entry that names a service not in the file."""
    service_names = set(compose.get("services") or {})
    for name, svc in _iter_services(compose):
        for dep in _get_depends_on_names(svc.get("depends_on") or []):
            if dep in service_names:
                continue
            result.error(
                f"[{name}] depends_on references unknown service '{dep}'."
            )
|
||
|
||
|
||
def validate_networks(compose: dict[str, Any], result: ValidationResult) -> None:
    """Report service networks that are missing from the top-level ``networks``.

    Both the list form and the mapping form of a service's ``networks`` are
    understood. The network named ``default`` is always accepted: Compose
    creates it implicitly for every project, so a service may reference it
    without a top-level declaration.
    """
    declared = set(compose.get("networks") or {})
    declared.add("default")  # implicit project network

    for name, svc in _iter_services(compose):
        svc_nets = svc.get("networks") or {}
        if isinstance(svc_nets, list):
            used = set(svc_nets)
        elif isinstance(svc_nets, dict):
            used = set(svc_nets)
        else:
            used = set()

        for net in used - declared:
            result.error(
                f"[{name}] Uses network '{net}' which is not declared in the "
                "top-level 'networks' section."
            )
|
||
|
||
|
||
def validate_volumes(compose: dict[str, Any], result: ValidationResult) -> None:
    """Check named-volume usage against the top-level ``volumes`` section.

    Reports:
      * an error for every service reference to a named volume that is not
        declared at the top level, including when the file has no top-level
        ``volumes`` section at all;
      * a warning for every declared volume that no service uses.

    A source is treated as a bind mount or unknown (and skipped) when it
    starts with ``.``, ``/`` or ``~``, is a Windows drive path such as
    ``C:\\data``, contains a ``$`` interpolation, or (long syntax) has a
    ``type`` other than ``volume``.
    """
    windows_path = re.compile(r"^[A-Za-z]:[/\\]")
    top_level = compose.get("volumes") or {}
    declared_volumes = set(top_level)
    used_volumes: set[str] = set()

    for name, svc in _iter_services(compose):
        for vol in svc.get("volumes") or []:
            if isinstance(vol, str):
                # Test for a drive letter BEFORE splitting on ":"; afterwards
                # only the bare letter would be left and never match.
                if windows_path.match(vol):
                    continue
                ref = vol.split(":", 1)[0]
            elif isinstance(vol, dict):
                # bind / tmpfs / npipe sources are not named volumes.
                if vol.get("type") not in (None, "volume"):
                    continue
                ref = str(vol.get("source") or "")
                if windows_path.match(ref):
                    continue
            else:
                continue

            # Paths and interpolated values cannot be resolved to a volume
            # name from the file alone.
            if not ref or ref[0] in "./~" or "$" in ref:
                continue

            used_volumes.add(ref)
            if ref not in declared_volumes:
                result.error(
                    f"[{name}] Uses named volume '{ref}' which is not declared "
                    "in the top-level 'volumes' section."
                )

    # Walk the mapping rather than the set so warnings follow file order.
    for vol in top_level:
        if vol not in used_volumes:
            result.warn(f"Top-level volume '{vol}' is declared but never used by any service.")
|
||
|
||
|
||
def validate_port_conflicts(compose: dict[str, Any], result: ValidationResult) -> None:
    """Report host ports that are published more than once for the same protocol.

    Two mappings conflict when they name the same host port and the same
    protocol (``tcp`` unless the mapping says otherwise), so publishing
    ``53:53/tcp`` next to ``53:53/udp`` is accepted. Mappings for which
    ``_parse_host_port`` returns None are ignored.

    NOTE(review): the host IP of a mapping is not taken into account, so the
    same port bound on two different addresses is still reported.
    """
    # (host port, protocol) -> first service that published it
    seen_ports: dict[tuple[int, str], str] = {}

    for name, svc in _iter_services(compose):
        for port_spec in svc.get("ports") or []:
            host_port = _parse_host_port(port_spec)
            if host_port is None:
                continue

            if isinstance(port_spec, dict):
                protocol = str(port_spec.get("protocol") or "tcp").lower()
            else:
                # Short syntax carries the protocol as a "/proto" suffix.
                _, slash, suffix = str(port_spec).rpartition("/")
                protocol = suffix.lower() if slash and suffix else "tcp"

            owner = seen_ports.setdefault((host_port, protocol), name)
            if owner is name and seen_ports[(host_port, protocol)] is name and \
                    (host_port, protocol, id(port_spec)) not in _first_claims(seen_ports, host_port, protocol, port_spec):
                continue
            result.error(
                f"[{name}] Host port {host_port} conflicts with service "
                f"'{owner}'."
            )


def _first_claims(
    seen_ports: dict[tuple[int, str], str],
    host_port: int,
    protocol: str,
    port_spec: Any,
    _claimed: dict[int, dict[tuple[int, str], int]] = {},
) -> set[tuple[int, str, int]]:
    """Return the set of (port, protocol, spec id) triples that are NOT the first claim.

    The first mapping to claim a (port, protocol) pair in a given
    ``seen_ports`` table is remembered by the identity of its spec object;
    every later mapping for the same pair is a conflict.
    """
    table = _claimed.setdefault(id(seen_ports), {})
    if len(table) > len(seen_ports):
        # A recycled id() from an earlier run: start afresh for this table.
        table.clear()
    first = table.setdefault((host_port, protocol), id(port_spec))
    if first == id(port_spec):
        return set()
    return {(host_port, protocol, id(port_spec))}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# NEW checks
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def validate_circular_dependencies(compose: dict[str, Any], result: ValidationResult) -> None:
    """Detect cycles in the ``depends_on`` graph with a depth-first search.

    Each detected cycle is reported as one error listing only the services
    on the cycle, e.g. ``a → b → a``. Dependencies on services that do not
    exist are ignored here. After reporting a cycle the search from the
    current root stops and continues with the next unvisited service.
    """
    services = compose.get("services") or {}

    # service name -> names it depends on
    graph: dict[str, list[str]] = {
        name: _get_depends_on_names((svc or {}).get("depends_on") or [])
        for name, svc in services.items()
    }

    visited: set[str] = set()
    in_stack: set[str] = set()  # nodes on the current DFS path

    def dfs(node: str, path: list[str]) -> bool:
        """Explore *node*; return True as soon as a cycle has been reported."""
        visited.add(node)
        in_stack.add(node)
        try:
            for neighbour in graph[node]:
                if neighbour not in graph:
                    # Unknown service: not part of the graph.
                    continue
                if neighbour in in_stack:
                    # Trim the path so only the cycle itself is shown, not
                    # the route that led into it.
                    cycle = path[path.index(neighbour):] + [neighbour]
                    cycle_path = " → ".join(cycle)
                    result.error(
                        f"Circular dependency detected: {cycle_path}"
                    )
                    return True
                if neighbour not in visited and dfs(neighbour, path + [neighbour]):
                    return True
            return False
        finally:
            # Always unwind, including on the early "cycle found" return.
            # Otherwise stale entries make later roots that merely depend
            # on these nodes look like cycles.
            in_stack.discard(node)

    for service_name in graph:
        if service_name not in visited:
            dfs(service_name, [service_name])
|
||
|
||
|
||
def validate_port_range(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn about host ports outside the preferred inclusive range.

    The bounds are ``_PREFERRED_PORT_MIN`` and ``_PREFERRED_PORT_MAX``.
    Mappings for which ``_parse_host_port`` returns None are skipped.
    """
    for name, svc in _iter_services(compose):
        for port_spec in svc.get("ports") or []:
            host_port = _parse_host_port(port_spec)
            if host_port is None:
                continue
            if host_port < _PREFERRED_PORT_MIN or host_port > _PREFERRED_PORT_MAX:
                result.warn(
                    f"[{name}] Host port {host_port} is outside the preferred range "
                    f"{_PREFERRED_PORT_MIN}-{_PREFERRED_PORT_MAX}."
                )
|
||
|
||
|
||
def validate_network_isolation(compose: dict[str, Any], result: ValidationResult) -> None:
    """Warn when a database/cache service is attached to an exposed network.

    Only services whose image satisfies ``_is_db_cache_image`` are examined.
    A network counts as exposed when its top-level definition has a truthy
    ``external`` value, or when its name contains "traefik", "proxy",
    "public" or "frontend" (case-insensitive).
    """
    top_level_networks = compose.get("networks") or {}
    proxy_keywords = ("traefik", "proxy", "public", "frontend")

    for name, svc in _iter_services(compose):
        image = svc.get("image", "")
        if not (image and _is_db_cache_image(image)):
            continue

        attached = svc.get("networks") or {}
        if isinstance(attached, dict):
            net_names = list(attached.keys())
        elif isinstance(attached, list):
            net_names = attached
        else:
            net_names = []

        for net_name in net_names:
            net_config = top_level_networks.get(net_name) or {}
            is_external = net_config.get("external", False)
            is_proxy_net = any(kw in net_name.lower() for kw in proxy_keywords)
            if not (is_external or is_proxy_net):
                continue
            result.warn(
                f"[{name}] Database/cache service is connected to external or proxy "
                f"network '{net_name}' — use an internal network for isolation."
            )
|
||
|
||
|
||
def validate_version_tags(compose: dict[str, Any], result: ValidationResult) -> None:
    """Grade how precisely each image tag pins a version.

    * no tag                      -> nothing reported here
    * ``latest``                  -> error
    * starts with ``X.Y.Z``       -> accepted silently (suffixes allowed)
    * exactly ``X.Y``             -> warning
    * exactly ``X``               -> warning
    * anything else               -> info
    """
    semver_full = re.compile(r"^\d+\.\d+\.\d+")  # major.minor.patch
    semver_minor = re.compile(r"^\d+\.\d+$")  # major.minor only
    semver_major = re.compile(r"^\d+$")  # major only

    for name, svc in _iter_services(compose):
        image = svc.get("image", "")
        if not image:
            continue

        img_name, tag = _image_name_and_tag(image)
        if not tag:
            # Untagged images are left to validate_image_tags.
            continue

        if tag == "latest":
            # Reported at error level here, in addition to any warning
            # another check may emit for the same image.
            result.error(
                f"[{name}] Image '{img_name}:latest' — :latest is forbidden in production."
            )
            continue

        if semver_full.match(tag):
            # Fully pinned: nothing to report.
            continue

        if semver_minor.match(tag):
            result.warn(
                f"[{name}] Image '{img_name}:{tag}' uses major.minor only — "
                "pin to a full major.minor.patch tag for reproducible builds."
            )
        elif semver_major.match(tag):
            result.warn(
                f"[{name}] Image '{img_name}:{tag}' uses major version only — "
                "pin to at least major.minor.patch."
            )
        else:
            # Named releases, digests and similar tags are accepted with a note.
            result.info(
                f"[{name}] Image '{img_name}:{tag}' uses a non-semver tag — "
                "verify this is a pinned, stable release."
            )
|
||
|
||
|
||
def validate_env_references(compose: dict[str, Any], result: ValidationResult) -> None:
    """Cross-check ``${VAR}`` references against each service's own config.

    Every string value inside a service definition is scanned for plain
    ``${VAR}`` references. A referenced name that is also a key of the
    service's ``environment`` block is accepted. Otherwise an info note is
    added when the service has an ``env_file``, and a warning when it has
    none. The contents of env files are never read, so this checks only
    the structure of the compose file itself.
    """
    for name, svc in _iter_services(compose):
        strings: list[str] = []
        _collect_all_string_values(svc, strings)
        referenced_vars = {
            match.group(1)
            for text in strings
            for match in _ENV_VAR_REF_PATTERN.finditer(text)
        }
        if not referenced_vars:
            continue

        # Names defined inline, from either the mapping or the list form.
        env_section = svc.get("environment") or {}
        if isinstance(env_section, dict):
            defined_vars = set(env_section.keys())
        elif isinstance(env_section, list):
            defined_vars = {str(entry).split("=")[0] for entry in env_section}
        else:
            defined_vars = set()

        has_env_file = bool(svc.get("env_file"))

        # Sorted so the report order is stable.
        for var in sorted(referenced_vars):
            if var in defined_vars:
                continue
            if not has_env_file:
                result.warn(
                    f"[{name}] ${{{var}}} is referenced but has no inline definition "
                    "and no env_file is configured — ensure it is in your .env file."
                )
            else:
                # Possibly provided by the env_file, which cannot be verified here.
                result.info(
                    f"[{name}] ${{{var}}} is referenced but not defined inline — "
                    "ensure it is present in the env_file."
                )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Orchestrator
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def run_all_checks(compose: dict[str, Any], strict: bool) -> ValidationResult:
    """Run all checks against *compose* and return the collected findings.

    The checks run in a fixed order, which determines the order of messages
    within each severity section of the report. *strict* is forwarded to
    ``validate_resource_limits`` only.
    """
    result = ValidationResult()

    checks = (
        # Original checks
        validate_image_tags,
        validate_restart_policy,
        validate_healthchecks,
        validate_no_hardcoded_secrets,
        lambda c, r: validate_resource_limits(c, r, strict),
        validate_logging,
        validate_privileged_mode,
        validate_host_network,
        validate_sensitive_volumes,
        validate_traefik_network_consistency,
        validate_traefik_router_uniqueness,
        validate_container_name_uniqueness,
        validate_depends_on,
        validate_networks,
        validate_volumes,
        validate_port_conflicts,
        # New checks
        validate_circular_dependencies,
        validate_port_range,
        validate_network_isolation,
        validate_version_tags,
        validate_env_references,
    )
    for check in checks:
        check(compose, result)

    return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Entry point
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Command-line entry point: parse arguments, validate, print, exit.

    Exits with status 0 when the validation recorded no errors, and with
    status 1 when it recorded errors or the file could not be read or parsed
    (missing file, directory, permission or encoding problem, invalid YAML,
    or a top-level value that is not a mapping).
    """
    parser = argparse.ArgumentParser(
        description="Validate a Docker Compose file for production readiness.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("compose_file", help="Path to the docker-compose.yaml file.")
    parser.add_argument(
        "--strict",
        action="store_true",
        help=(
            "Enable strict mode: also check resource limits "
            "(a missing memory limit is an error, a missing CPU limit a warning)."
        ),
    )
    args = parser.parse_args()

    compose_path = Path(args.compose_file)

    # Open directly instead of testing exists() first: that avoids a race
    # and lets directories / unreadable files be reported cleanly.
    try:
        with compose_path.open(encoding="utf-8") as fh:
            compose = yaml.safe_load(fh)
    except FileNotFoundError:
        print(f"❌ File not found: {compose_path}")
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as exc:
        print(f"❌ Could not read {compose_path}: {exc}")
        sys.exit(1)
    except yaml.YAMLError as exc:
        print(f"❌ YAML parse error: {exc}")
        sys.exit(1)

    if not isinstance(compose, dict):
        print("❌ Compose file did not parse to a mapping — is it a valid YAML file?")
        sys.exit(1)

    mode_label = " [STRICT]" if args.strict else ""
    print(f"🐳 Validating: {compose_path}{mode_label}")
    print("-" * 60)

    result = run_all_checks(compose, strict=args.strict)
    result.print_report()

    sys.exit(0 if result.passed else 1)
|
||
|
||
|
||
# Run the validator only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|