Skip to content
8 changes: 8 additions & 0 deletions api/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

All notable changes to the **Prowler API** are documented in this file.

## [1.23.0] (Prowler UNRELEASED)

### 🔄 Changed

- Attack Paths: Periodic cleanup of stale scans with dead-worker detection via Celery inspect, marking orphaned `EXECUTING` scans as `FAILED` and recovering `graph_data_ready` [(#10387)](https://github.com/prowler-cloud/prowler/pull/10387)

---

## [1.22.0] (Prowler v5.21.0)

### 🚀 Added
Expand Down
21 changes: 20 additions & 1 deletion api/docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,28 @@ start_prod_server() {
poetry run gunicorn -c config/guniconf.py config.wsgi:application
}

resolve_worker_hostname() {
    # Build a unique, stable Celery node name of the form <task-id>@<hostname>.
    # On ECS the task ID is taken from the container metadata endpoint; when
    # that is unavailable (or the lookup fails) a random UUID is used instead.
    TASK_ID=""

    if [ -n "$ECS_CONTAINER_METADATA_URI_V4" ]; then
        TASK_ID=$(wget -qO- --timeout=2 "${ECS_CONTAINER_METADATA_URI_V4}/task" | \
            python3 -c "import sys,json; print(json.load(sys.stdin)['TaskARN'].split('/')[-1])" 2>/dev/null)
    fi

    # Fallback: metadata endpoint absent or returned nothing usable.
    [ -z "$TASK_ID" ] && TASK_ID=$(python3 -c "import uuid; print(uuid.uuid4().hex)")

    echo "${TASK_ID}@$(hostname)"
}

start_worker() {
    # Launch the Celery worker with an explicit, unique node name (-n) so the
    # periodic stale-scan cleanup can target this worker via `celery inspect`.
    #
    # Fix: the previous single-line invocation (without -n) was left in place
    # above the new multi-line one; since the worker runs in the foreground,
    # the first command would block and the corrected invocation would never
    # execute. Only the node-named invocation is kept.
    echo "Starting the worker..."
    poetry run python -m celery -A config.celery worker \
        -n "$(resolve_worker_hostname)" \
        -l "${DJANGO_LOGGING_LEVEL:-info}" \
        -Q celery,scans,scan-reports,deletion,backfill,overview,integrations,compliance,attack-paths-scans \
        -E --max-tasks-per-child 1
}

start_worker_beat() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from django.db import migrations


TASK_NAME = "attack-paths-cleanup-stale-scans"
INTERVAL_HOURS = 1


def create_periodic_task(apps, schema_editor):
    """Register the stale-scan cleanup task with celery-beat (idempotent)."""
    interval_model = apps.get_model("django_celery_beat", "IntervalSchedule")
    task_model = apps.get_model("django_celery_beat", "PeriodicTask")

    # Reuse an existing hourly interval row if one is already present.
    interval, _created = interval_model.objects.get_or_create(
        every=INTERVAL_HOURS,
        period="hours",
    )

    # update_or_create keeps re-running this migration safe.
    task_model.objects.update_or_create(
        name=TASK_NAME,
        defaults={"task": TASK_NAME, "interval": interval, "enabled": True},
    )


def delete_periodic_task(apps, schema_editor):
    """Reverse operation: remove the cleanup task and its orphaned schedule."""
    interval_model = apps.get_model("django_celery_beat", "IntervalSchedule")
    task_model = apps.get_model("django_celery_beat", "PeriodicTask")

    task_model.objects.filter(name=TASK_NAME).delete()

    # Drop the interval schedule only when no other periodic task references it.
    orphan_schedules = interval_model.objects.filter(
        every=INTERVAL_HOURS,
        period="hours",
        periodictask__isnull=True,
    )
    orphan_schedules.delete()


class Migration(migrations.Migration):
    # Depends on django_celery_beat's schema so the beat tables exist before
    # this data migration inserts rows into them.
    dependencies = [
        ("api", "0084_googleworkspace_provider"),
        ("django_celery_beat", "0019_alter_periodictasks_options"),
    ]

    operations = [
        # Reversible data migration: forward registers the periodic cleanup
        # task, backward removes it (and its interval schedule if orphaned).
        migrations.RunPython(create_periodic_task, delete_periodic_task),
    ]
5 changes: 5 additions & 0 deletions api/src/backend/config/django/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,8 @@
# SAML requirement
CSRF_COOKIE_SECURE = True
SESSION_COOKIE_SECURE = True

# Attack Paths
# Age after which an `EXECUTING` attack-paths scan is considered stale and
# eligible for cleanup by the periodic cleanup task. Default 2880 min (48h).
ATTACK_PATHS_SCAN_STALE_THRESHOLD_MINUTES = env.int(
    "ATTACK_PATHS_SCAN_STALE_THRESHOLD_MINUTES", 2880
)  # 48h
156 changes: 156 additions & 0 deletions api/src/backend/tasks/jobs/attack_paths/cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from datetime import datetime, timedelta, timezone

from celery import current_app, states
from celery.utils.log import get_task_logger
from config.django.base import ATTACK_PATHS_SCAN_STALE_THRESHOLD_MINUTES
from tasks.jobs.attack_paths.db_utils import (
finish_attack_paths_scan,
recover_graph_data_ready,
)

from api.attack_paths import database as graph_database
from api.db_router import MainRouter
from api.db_utils import rls_transaction
from api.models import AttackPathsScan, StateChoices

logger = get_task_logger(__name__)


def cleanup_stale_attack_paths_scans() -> dict:
    """
    Find `EXECUTING` `AttackPathsScan` scans whose workers are dead or that have
    exceeded the stale threshold, and mark them as `FAILED`.

    Two-pass detection:
        1. If `TaskResult.worker` exists, ping the worker.
            - Dead worker: cleanup immediately (any age).
            - Alive + past threshold: revoke the task, then cleanup.
            - Alive + within threshold: skip.
        2. If no worker field: fall back to time-based heuristic only.

    Returns:
        A summary dict `{"cleaned_up_count": int, "scan_ids": [str, ...]}`
        listing only the scans that were actually cleaned up.
    """
    threshold = timedelta(minutes=ATTACK_PATHS_SCAN_STALE_THRESHOLD_MINUTES)
    now = datetime.now(tz=timezone.utc)
    # Scans started before this instant have exceeded the stale threshold.
    cutoff = now - threshold

    # NOTE(review): `all_objects` on the admin connection presumably bypasses
    # per-tenant RLS so scans from every tenant are swept — confirm.
    executing_scans = (
        AttackPathsScan.all_objects.using(MainRouter.admin_db)
        .filter(state=StateChoices.EXECUTING)
        .select_related("task__task_runner_task")  # one query for TaskResults
    )

    # Cache worker liveness so each worker is pinged at most once
    executing_scans = list(executing_scans)
    workers = {
        tr.worker
        for scan in executing_scans
        if (tr := getattr(scan.task, "task_runner_task", None) if scan.task else None)
        and tr.worker
    }
    worker_alive = {w: _is_worker_alive(w) for w in workers}

    cleaned_up = []

    for scan in executing_scans:
        task_result = (
            getattr(scan.task, "task_runner_task", None) if scan.task else None
        )
        worker = task_result.worker if task_result else None

        if worker:
            # Default to alive so a missing cache entry never triggers cleanup.
            alive = worker_alive.get(worker, True)

            if alive:
                # A missing `started_at` fails the check and counts as stale.
                if scan.started_at and scan.started_at >= cutoff:
                    continue

                # Alive but stale — revoke before cleanup
                _revoke_task(task_result)
                reason = (
                    "Scan exceeded stale threshold — " "cleaned up by periodic task"
                )
            else:
                reason = "Worker dead — cleaned up by periodic task"
        else:
            # No worker recorded — time-based heuristic only
            if scan.started_at and scan.started_at >= cutoff:
                continue
            reason = (
                "No worker recorded, scan exceeded stale threshold — "
                "cleaned up by periodic task"
            )

        # `_cleanup_scan` re-checks the state under RLS, so scans that finish
        # normally between the query above and here are skipped, not clobbered.
        if _cleanup_scan(scan, task_result, reason):
            cleaned_up.append(str(scan.id))

    logger.info(
        f"Stale `AttackPathsScan` cleanup: {len(cleaned_up)} scan(s) cleaned up"
    )
    return {"cleaned_up_count": len(cleaned_up), "scan_ids": cleaned_up}


def _is_worker_alive(worker: str) -> bool:
    """Ping one Celery worker.

    Returns `True` when the worker answers the ping, and also `True` on any
    inspection error, so transient failures never cause a scan cleanup.
    """
    try:
        inspector = current_app.control.inspect(destination=[worker], timeout=1.0)
        pong = inspector.ping()
        return pong is not None and worker in pong
    except Exception:
        logger.exception(f"Failed to ping worker {worker}, treating as alive")
        return True


def _revoke_task(task_result) -> None:
    """Terminate a hung Celery task with `SIGTERM`; failures are logged only."""
    task_id = task_result.task_id
    try:
        current_app.control.revoke(task_id, terminate=True, signal="SIGTERM")
        logger.info(f"Revoked task {task_id}")
    except Exception:
        logger.exception(f"Failed to revoke task {task_id}")


def _cleanup_scan(scan, task_result, reason: str) -> bool:
    """
    Clean up a single stale `AttackPathsScan`:
    drop temp DB, mark `FAILED`, update `TaskResult`, recover `graph_data_ready`.

    Args:
        scan: Stale scan row (fetched outside RLS; re-fetched under RLS below).
        task_result: The scan's `TaskResult`, or `None` when none was recorded.
        reason: Human-readable explanation stored as the scan's `global_error`.

    Returns `True` if the scan was actually cleaned up, `False` if skipped.
    """
    scan_id_str = str(scan.id)

    # 1. Drop temp Neo4j database (best-effort: failure is logged, not fatal)
    tmp_db_name = graph_database.get_database_name(scan.id, temporary=True)
    try:
        graph_database.drop_database(tmp_db_name)
    except Exception:
        logger.exception(f"Failed to drop temp database {tmp_db_name}")

    # 2. Re-fetch within RLS (race guard against normal completion)
    with rls_transaction(str(scan.tenant_id)):
        try:
            fresh_scan = AttackPathsScan.objects.get(id=scan.id)
        except AttackPathsScan.DoesNotExist:
            logger.warning(f"Scan {scan_id_str} no longer exists, skipping")
            return False

        # The scan may have completed or failed on its own since the initial
        # query — leave it untouched in that case.
        if fresh_scan.state != StateChoices.EXECUTING:
            logger.info(f"Scan {scan_id_str} is now {fresh_scan.state}, skipping")
            return False

        # 3. Mark `AttackPathsScan` as `FAILED`
        finish_attack_paths_scan(
            fresh_scan,
            StateChoices.FAILED,
            {"global_error": reason},
        )

        # 4. Mark `TaskResult` as `FAILURE`
        # NOTE(review): `date_done` appears in `update_fields` but is never
        # assigned here — presumably it is `auto_now` on the model so `save()`
        # refreshes it; confirm, otherwise drop it from `update_fields`.
        if task_result:
            task_result.status = states.FAILURE
            task_result.save(update_fields=["status", "date_done"])

        # 5. Recover graph_data_ready if provider data still exists
        recover_graph_data_ready(fresh_scan)

        logger.info(f"Cleaned up stale scan {scan_id_str}: {reason}")
        return True
6 changes: 6 additions & 0 deletions api/src/backend/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
can_provider_run_attack_paths_scan,
db_utils as attack_paths_db_utils,
)
from tasks.jobs.attack_paths.cleanup import cleanup_stale_attack_paths_scans
from tasks.jobs.backfill import (
backfill_compliance_summaries,
backfill_daily_severity_summaries,
Expand Down Expand Up @@ -406,6 +407,11 @@ def perform_attack_paths_scan_task(self, tenant_id: str, scan_id: str):
)


@shared_task(name="attack-paths-cleanup-stale-scans", queue="attack-paths-scans")
def cleanup_stale_attack_paths_scans_task():
    """Periodic task: mark stale `EXECUTING` attack-paths scans as `FAILED`.

    Scheduled via the celery-beat `PeriodicTask` created in the migration of
    the same name. Returns the cleanup summary dict
    (`cleaned_up_count` / `scan_ids`).
    """
    return cleanup_stale_attack_paths_scans()


@shared_task(name="tenant-deletion", queue="deletion", autoretry_for=(Exception,))
def delete_tenant_task(tenant_id: str):
    """Delete a tenant by primary key; auto-retried on any exception."""
    return delete_tenant(pk=tenant_id)
Expand Down
Loading
Loading