Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ docs/source/_build
# pixi environments
.pixi
pixi.lock

# run_local.sh state
.run-local-env
*.egg-info
docs/templates/_builtin_markdown.jinja

Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ repos:
- types-cachetools
- types-requests
- types-python-dateutil
- types-croniter
- types-aiobotocore[essential]
- boto3-stubs[essential]
exclude: ^(diracx-client/src/diracx/client/_generated|diracx-[a-z]+/tests/|diracx-testing/|build|extensions/gubbins/gubbins-client/src/gubbins/client/_generated)
Expand Down
1 change: 1 addition & 0 deletions diracx-core/src/diracx/core/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DiracEntryPoint(StrEnum):
MIN_CLIENT_VERSION = "diracx.min_client_version"
RESOURCES = "diracx.resources"
SERVICES = "diracx.services"
LOCK_OBJECT_TYPES = "diracx.lock_object_types"


@cached(cache=LRUCache(maxsize=1))
Expand Down
55 changes: 46 additions & 9 deletions diracx-db/src/diracx/db/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,28 @@

def parse_args():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True)
subparsers = parser.add_subparsers(required=True, dest="command")

init_sql_parser = subparsers.add_parser(
"init-sql", help="Initialise schema for SQL databases"
)
init_sql_parser.set_defaults(func=init_sql)
subparsers.add_parser("init-sql", help="Initialise schema for SQL databases")
subparsers.add_parser("init-os", help="Initialise schema for OpenSearch databases")

init_os_parser = subparsers.add_parser(
"init-os", help="Initialise schema for OpenSearch databases"
local_urls_parser = subparsers.add_parser(
"generate-local-urls",
help="Print shell exports for all registered DB URLs using sqlite",
)
local_urls_parser.add_argument(
"tmp_dir", help="Temporary directory for database files"
)
init_os_parser.set_defaults(func=init_os)

args = parser.parse_args()
logger.setLevel(logging.INFO)
asyncio.run(args.func())

if args.command == "init-sql":
asyncio.run(init_sql())
elif args.command == "init-os":
asyncio.run(init_os())
elif args.command == "generate-local-urls":
generate_local_urls(args.tmp_dir)


async def init_sql():
Expand Down Expand Up @@ -53,5 +60,35 @@ async def init_os():
await db.create_index_template()


def generate_local_urls(tmp_dir: str) -> None:
"""Print shell export statements for all registered DB URLs using sqlite.

Intended for use with eval in shell scripts::

eval "$(python -m diracx.db generate-local-urls /tmp/dir)"
"""
import json

from diracx.core.extensions import DiracEntryPoint, select_from_extension

seen: set[str] = set()
for ep in select_from_extension(group=DiracEntryPoint.SQL_DB):
if ep.name in seen:
continue
seen.add(ep.name)
url = f"sqlite+aiosqlite:///{tmp_dir}/{ep.name.lower()}.db"
print(f'export DIRACX_DB_URL_{ep.name.upper()}="{url}"')

seen.clear()
for ep in select_from_extension(group=DiracEntryPoint.OS_DB):
if ep.name in seen:
continue
seen.add(ep.name)
v = json.dumps(
{"sqlalchemy_dsn": f"sqlite+aiosqlite:///{tmp_dir}/{ep.name.lower()}.db"}
)
print(f"export DIRACX_OS_DB_{ep.name.upper()}='{v}'")


if __name__ == "__main__":
parse_args()
1 change: 1 addition & 0 deletions diracx-routers/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies = [
"diracx-core",
"diracx-logic",
"diracx-db",
"diracx-tasks",
"python-dotenv", # TODO: We might not need this
"python-multipart",
"fastapi>=0.121.0",
Expand Down
75 changes: 5 additions & 70 deletions diracx-routers/src/diracx/routers/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,7 @@
from __future__ import annotations

__all__ = (
"Config",
"AuthDB",
"JobDB",
"JobLoggingDB",
"SandboxMetadataDB",
"TaskQueueDB",
"PilotAgentsDB",
"add_settings_annotation",
"AvailableSecurityProperties",
)

from functools import partial
from typing import Annotated, TypeVar

from fastapi import Depends

from diracx.core.config import Config as _Config
from diracx.core.config import ConfigSource
from diracx.core.properties import SecurityProperty
from diracx.core.settings import AuthSettings as _AuthSettings
from diracx.core.settings import DevelopmentSettings as _DevelopmentSettings
from diracx.core.settings import SandboxStoreSettings as _SandboxStoreSettings
from diracx.db.os import JobParametersDB as _JobParametersDB
from diracx.db.sql import AuthDB as _AuthDB
from diracx.db.sql import JobDB as _JobDB
from diracx.db.sql import JobLoggingDB as _JobLoggingDB
from diracx.db.sql import PilotAgentsDB as _PilotAgentsDB
from diracx.db.sql import SandboxMetadataDB as _SandboxMetadataDB
from diracx.db.sql import TaskQueueDB as _TaskQueueDB

T = TypeVar("T")

# Use scope="function" to ensure DB commits happen before sending HTTP responses
# This prevents race conditions when DIRAC immediately queries data after DiracX writes it
DBDepends = partial(Depends, scope="function")
"""Router DI types — re-exported from the canonical definitions in diracx-tasks."""

from __future__ import annotations

def add_settings_annotation(cls: T) -> T:
"""Add a `Depends` annotation to a class that has a `create` classmethod."""
return Annotated[cls, Depends(cls.create)] # type: ignore


# Databases
AuthDB = Annotated[_AuthDB, DBDepends(_AuthDB.transaction)]
JobDB = Annotated[_JobDB, DBDepends(_JobDB.transaction)]
JobLoggingDB = Annotated[_JobLoggingDB, DBDepends(_JobLoggingDB.transaction)]
PilotAgentsDB = Annotated[_PilotAgentsDB, DBDepends(_PilotAgentsDB.transaction)]
SandboxMetadataDB = Annotated[
_SandboxMetadataDB, DBDepends(_SandboxMetadataDB.transaction)
]
TaskQueueDB = Annotated[_TaskQueueDB, DBDepends(_TaskQueueDB.transaction)]

# Opensearch databases
JobParametersDB = Annotated[_JobParametersDB, DBDepends(_JobParametersDB.session)]


# Miscellaneous
Config = Annotated[_Config, Depends(ConfigSource.create)]
AvailableSecurityProperties = Annotated[
set[SecurityProperty], Depends(SecurityProperty.available_properties)
]

AuthSettings = Annotated[_AuthSettings, Depends(_AuthSettings.create)]
DevelopmentSettings = Annotated[
_DevelopmentSettings, Depends(_DevelopmentSettings.create)
]
SandboxStoreSettings = Annotated[
_SandboxStoreSettings, Depends(_SandboxStoreSettings.create)
]
# Re-export everything from the canonical location
from diracx.tasks.plumbing.depends import * # noqa: F401, F403
from diracx.tasks.plumbing.depends import __all__ # noqa: F401
3 changes: 3 additions & 0 deletions diracx-tasks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# diracx-tasks

Asynchronous task system for DiracX.
62 changes: 62 additions & 0 deletions diracx-tasks/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
[project]
name = "diracx-tasks"
description = "Asynchronous task system for DiracX"
readme = "README.md"
requires-python = ">=3.11"
keywords = []
license = {text = "GPL-3.0-only"}
classifiers = [
"Intended Audience :: Science/Research",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
"Topic :: System :: Distributed Computing",
]
dependencies = [
"diracx-core",
"diracx-db",
"diracx-logic",
"fastapi>=0.121.0",
"redis[hiredis]",
"msgpack",
"opentelemetry-api",
"python-dateutil",
"croniter",
"pydantic >=2.10",
]
dynamic = ["version"]

[project.optional-dependencies]
testing = ["diracx-testing", "fakeredis"]

[project.scripts]
diracx-task-run = "diracx.tasks.task_run:main"

[project.entry-points."diracx.dbs.sql"]
TaskDB = "diracx.tasks.plumbing.persistence:TaskDB"

[build-system]
requires = ["hatchling", "hatch-vcs"]
build-backend = "hatchling.build"

[tool.hatch.version]
source = "vcs"

[tool.hatch.version.raw-options]
root = ".."

[tool.hatch.build.targets.sdist.force-include]
"../LICENSE" = "LICENSE"

[tool.hatch.build.targets.wheel]
packages = ["src/diracx"]

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = [
"-v",
"--cov=diracx.tasks", "--cov-report=term-missing",
"-pdiracx.testing",
"--import-mode=importlib",
]
asyncio_mode = "auto"
1 change: 1 addition & 0 deletions diracx-tasks/src/diracx/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from __future__ import annotations
1 change: 1 addition & 0 deletions diracx-tasks/src/diracx/tasks/plumbing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from __future__ import annotations
21 changes: 21 additions & 0 deletions diracx-tasks/src/diracx/tasks/plumbing/_redis_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Role-specific Redis type aliases.

Zero runtime cost — purely for readability and grep-ability.
Each alias documents *why* a function needs a Redis connection:

- ``LockCoordinator`` — acquiring / releasing / extending locks
- ``MessageTransport`` — enqueuing, reading, or promoting task messages
- ``ResultCache`` — storing / retrieving task results
- ``CallbackRegistry`` — tracking callback groups and firing callbacks
"""

from __future__ import annotations

from typing import TypeAlias

from redis.asyncio import Redis

LockCoordinator: TypeAlias = Redis
MessageTransport: TypeAlias = Redis
ResultCache: TypeAlias = Redis
CallbackRegistry: TypeAlias = Redis
Loading
Loading