Skip to content

Commit 588e451

Browse files
committed
feat: added job.db.fill_jobs_history_summary method
1 parent c148a9b commit 588e451

File tree

2 files changed

+61
-1
lines changed

2 files changed

+61
-1
lines changed

diracx-db/src/diracx/db/sql/job/db.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
from datetime import datetime, timezone
66
from typing import TYPE_CHECKING, Any, Iterable
77

8-
from sqlalchemy import bindparam, case, delete, func, insert, select, update
8+
from sqlalchemy import bindparam, case, delete, func, insert, select, text, update
99

1010
if TYPE_CHECKING:
1111
from sqlalchemy.sql.elements import BindParameter
1212

13+
from DIRAC.WorkloadManagementSystem.Client import JobStatus
14+
1315
from diracx.core.exceptions import InvalidQueryError
1416
from diracx.core.models import JobCommand, SearchSpec, SortSpec
1517

@@ -333,3 +335,60 @@ async def get_job_commands(self, job_ids: Iterable[int]) -> list[JobCommand]:
333335
JobCommand(job_id=cmd.JobID, command=cmd.Command, arguments=cmd.Arguments)
334336
for cmd in commands
335337
]
338+
339+
async def fill_jobs_history_summary(self):
340+
"""Fill the JobsHistorySummary table with the summary of the jobs in a final state."""
341+
# Create the staging table
342+
dialect = self.conn.dialect
343+
if dialect == "mysql":
344+
create_staging_table_sql = "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging LIKE JobsHistorySummary"
345+
elif dialect == "postgresql":
346+
create_staging_table_sql = (
347+
"CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging "
348+
"(LIKE JobsHistorySummary INCLUDING ALL)"
349+
)
350+
elif dialect == "sqlite":
351+
create_staging_table_sql = "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging AS JobsHistorySummary"
352+
await self.conn.execute(text(create_staging_table_sql))
353+
354+
if dialect == "mysql":
355+
current_date_expr = "UTC_DATE()"
356+
elif dialect == "postgresql":
357+
current_date_expr = "CURRENT_DATE"
358+
elif dialect == "sqlite":
359+
current_date_expr = "DATE('now')"
360+
else:
361+
raise ValueError(f"Unsupported DB dialect: {dialect}")
362+
363+
# Columns for grouping
364+
def_columns = "Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus"
365+
agg_columns = "COUNT(JobID), SUM(RescheduleCounter)"
366+
367+
# Final states list
368+
final_states = JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES
369+
final_states_sql = ", ".join(f"'{state}'" for state in final_states)
370+
371+
# Build SQL statement
372+
insert_sql = f"""
373+
INSERT INTO JobsHistorySummary_staging
374+
SELECT {def_columns}, {agg_columns}
375+
FROM Jobs
376+
WHERE Status IN ({final_states_sql})
377+
AND LastUpdateTime < {current_date_expr}
378+
GROUP BY {def_columns}
379+
""" # noqa: S608
380+
await self.conn.execute(text(insert_sql))
381+
382+
stmts = []
383+
384+
if dialect in {"mysql", "sqlite", "postgresql"}:
385+
stmts = [
386+
"ALTER TABLE JobsHistorySummary RENAME TO JobsHistorySummary_old;",
387+
"ALTER TABLE JobsHistorySummary_staging RENAME TO JobsHistorySummary;",
388+
"DROP TABLE JobsHistorySummary_old;",
389+
]
390+
else:
391+
raise ValueError(f"Unsupported DB dialect: {dialect}")
392+
393+
for stmt in stmts:
394+
await self.conn.execute(text(stmt))

docs/CODING_CONVENTION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def my_ficture(): ...
5757

5858
```python
5959
from datetime import datetime, timedelta, timezone
60+
6061
delay = datetime.now(tz=timezone.utc) + timedelta(hours=1)
6162
```
6263

0 commit comments

Comments
 (0)