The DDS Agent is a core component of the DDS (Dynamic Deployment System) framework responsible for managing user tasks on worker nodes. It provides task execution, shared memory communication, and intercom services for distributed computing environments.
- Overview
- Architecture
- Shared Memory Transport
- User Task Handling
- Process Lifecycle
- Configuration
- Error Handling
The DDS Agent acts as a worker node daemon that:
- Connects to the DDS Commander for task coordination
- Manages multiple task execution slots
- Provides shared memory-based inter-process communication
- Handles task lifecycle (assignment, activation, monitoring, termination)
- Forwards messages between user tasks and the DDS network
- AgentConnectionManager: Main service coordinator and network management
- CommanderChannel: TCP connection to DDS Commander for control messages
- SMIntercomChannel: Shared memory channel for user task communication
- Slot Management: Isolated execution environments for user tasks
```mermaid
graph TB
    subgraph "DDS Agent Process"
        ACM[AgentConnectionManager]
        CC[CommanderChannel]
        SMC[SMIntercomChannel]
        subgraph "Thread Pools"
            MT[Main Threads]
            IT[Intercom Threads]
        end
        subgraph "Slot Management"
            S1[Slot 1]
            S2[Slot 2]
            SN[Slot N]
        end
    end
    subgraph "External Components"
        CMD[DDS Commander]
        UT1[User Task 1]
        UT2[User Task 2]
        UTN[User Task N]
    end
    subgraph "Shared Memory"
        IQ[Input Queues]
        OQ[Output Queues]
    end
    CMD <==> CC
    CC --> ACM
    ACM --> SMC
    SMC <==> IQ
    SMC <==> OQ
    IQ <==> UT1
    IQ <==> UT2
    IQ <==> UTN
    OQ <==> UT1
    OQ <==> UT2
    OQ <==> UTN
    S1 -.-> UT1
    S2 -.-> UT2
    SN -.-> UTN
    ACM --> MT
    ACM --> IT
```
The agent uses a multi-threaded architecture:
- Main Threads (4 threads): Handle network I/O and commander communication
- Intercom Threads (6 + NumLeaderFW): Process shared memory messages and user task communication
- Signal Handler: Manages graceful shutdown on SIGINT/SIGTERM/SIGQUIT
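As a rough sketch of this threading model, the pool sizes and a signal-driven shutdown flag might look like the following. The names `kMainThreads`, `intercomThreadCount`, `installShutdownHandlers` and `shutdownRequested` are hypothetical, and the agent's real signal handling may use a different mechanism (for example an asynchronous signal set integrated with its I/O loop).

```cpp
#include <cassert>
#include <csignal>
#include <cstddef>

// Pool sizes from the description above. How NumLeaderFW is supplied is not
// specified here; it is modelled as a plain parameter (assumption).
constexpr std::size_t kMainThreads = 4;

constexpr std::size_t intercomThreadCount(std::size_t numLeaderFW)
{
    return 6 + numLeaderFW;
}

namespace
{
    // A signal handler may only do async-signal-safe work, so it just records
    // the request; the agent's main loop would poll the flag and shut down.
    volatile std::sig_atomic_t g_shutdownRequested = 0;

    void onShutdownSignal(int /*signum*/)
    {
        g_shutdownRequested = 1;
    }
} // namespace

// Route SIGINT, SIGTERM and SIGQUIT to the same "please shut down" handler.
void installShutdownHandlers()
{
    const int handled[] = { SIGINT, SIGTERM, SIGQUIT };
    for (int sig : handled)
        std::signal(sig, onShutdownSignal);
}

bool shutdownRequested()
{
    return g_shutdownRequested != 0;
}
```

Keeping the handler down to a single flag write means all real shutdown work (stopping tasks, removing queues) happens on a normal thread, where it is safe to allocate, lock and log.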
The DDS Agent implements a shared memory transport for high-performance inter-process communication with the user tasks running on the same host.
```mermaid
graph LR
    subgraph "Agent Process"
        Agent[DDS Agent]
        SMC[SMIntercomChannel]
    end
    subgraph "Shared Memory Queues"
        subgraph "Input Queues"
            IQ1[Leader Input Queue 1]
            IQ2[Leader Input Queue 2]
            IQN[Leader Input Queue N]
        end
        subgraph "Output Queues"
            OQ1[Leader Output Queue Slot1]
            OQ2[Leader Output Queue Slot2]
            OQN[Leader Output Queue SlotN]
        end
    end
    subgraph "User Tasks"
        UT1[User Task Slot 1]
        UT2[User Task Slot 2]
        UTN[User Task Slot N]
    end
    Agent --> SMC
    SMC <==> IQ1
    SMC <==> IQ2
    SMC <==> IQN
    SMC <==> OQ1
    SMC <==> OQ2
    SMC <==> OQN
    UT1 <==> IQ1
    UT1 <==> OQ1
    UT2 <==> IQ2
    UT2 <==> OQ2
    UTN <==> IQN
    UTN <==> OQN
```
The shared memory system uses boost::interprocess::message_queue with the following characteristics:
Queue Types:
- Input Queues: Multiple queues for receiving messages from user tasks
- Output Queues: Per-slot queues for sending messages to specific user tasks
- Message Routing: Agent forwards messages between network and shared memory
Queue Naming Convention:

```cpp
// Input queues (from user tasks to agent)
std::vector<std::string> inputNames = userDefaults.getSMLeaderInputNames();
// Output queue (from agent to a specific user task)
std::string outputName = userDefaults.getSMLeaderOutputName(protocolHeaderID);
```

Message Flow:
- User tasks write to input queues
- Agent reads from all input queues
- Agent processes and routes messages
- Agent writes responses to specific output queues
- User tasks read from their assigned output queue
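The message flow above can be modelled in-process to show the routing rule: many input queues drained by the agent, one output queue per destination. This is a simplified sketch, not the agent's code: `std::deque` stands in for `boost::interprocess::message_queue`, the agent simply echoes each message back instead of forwarding it to the network, and `SMessage`, `CRouterModel` and the use of the protocol header ID as the routing key are illustrative assumptions (the naming code above only shows that the output queue name is derived from `protocolHeaderID`).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message: the header ID selects the destination output queue.
struct SMessage
{
    std::uint64_t m_protocolHeaderID;
    std::string m_payload;
};

// In-memory stand-in for the shared memory queues (not real shared memory).
class CRouterModel
{
  public:
    explicit CRouterModel(std::size_t numInputQueues)
        : m_inputs(numInputQueues)
    {
    }

    // A user task writes to one of the input queues.
    void userTaskSend(std::size_t inputIndex, SMessage msg)
    {
        m_inputs.at(inputIndex).push_back(std::move(msg));
    }

    // The agent drains every input queue, in queue order.
    std::vector<SMessage> agentPollInputs()
    {
        std::vector<SMessage> received;
        for (auto& queue : m_inputs)
        {
            while (!queue.empty())
            {
                received.push_back(std::move(queue.front()));
                queue.pop_front();
            }
        }
        return received;
    }

    // The agent writes a response to the output queue of one specific task.
    void agentReply(const SMessage& msg)
    {
        m_outputs[msg.m_protocolHeaderID].push_back(msg);
    }

    // A user task reads the next message from its own output queue, if any.
    bool userTaskReceive(std::uint64_t protocolHeaderID, SMessage& out)
    {
        auto it = m_outputs.find(protocolHeaderID);
        if (it == m_outputs.end() || it->second.empty())
            return false;
        out = std::move(it->second.front());
        it->second.pop_front();
        return true;
    }

  private:
    std::vector<std::deque<SMessage>> m_inputs;
    std::map<std::uint64_t, std::deque<SMessage>> m_outputs;
};
```

A task only ever reads its own output queue, so a reply addressed to one slot can never be consumed by another.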
```mermaid
sequenceDiagram
    participant Agent
    participant SMChannel
    participant InputQueue
    participant OutputQueue
    participant UserTask
    Agent->>SMChannel: Initialize with queue names
    SMChannel->>InputQueue: Create/Open input queues
    SMChannel->>OutputQueue: Create/Open output queue
    UserTask->>InputQueue: Send message
    SMChannel->>InputQueue: Poll for messages
    InputQueue-->>SMChannel: Deliver message
    SMChannel->>Agent: Process message
    Agent->>SMChannel: Send response
    SMChannel->>OutputQueue: Write response
    UserTask->>OutputQueue: Read response
```
Cleanup Process:
- Agent removes message queues on shutdown
- Per-slot cleanup removes slot-specific output queues
- Graceful cleanup prevents orphaned shared memory objects
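One way to guarantee this cleanup is an RAII guard that removes every tracked queue when it is destroyed, with an explicit call for per-slot removal. In this sketch the remover is injected so it runs without Boost; in real code it could wrap `boost::interprocess::message_queue::remove(name.c_str())`. The class `CQueueCleanupGuard` and the queue names in the test are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Removes every registered queue name when the guard goes out of scope,
// including during stack unwinding after an exception.
class CQueueCleanupGuard
{
  public:
    using Remover = std::function<bool(const std::string&)>;

    explicit CQueueCleanupGuard(Remover remover)
        : m_remover(std::move(remover))
    {
    }

    CQueueCleanupGuard(const CQueueCleanupGuard&) = delete;
    CQueueCleanupGuard& operator=(const CQueueCleanupGuard&) = delete;

    ~CQueueCleanupGuard()
    {
        // Reverse registration order; a failed removal (e.g. the queue is
        // already gone) is ignored so the remaining queues are still removed.
        for (auto it = m_names.rbegin(); it != m_names.rend(); ++it)
            m_remover(*it);
    }

    void track(std::string name)
    {
        m_names.push_back(std::move(name));
    }

    // Per-slot cleanup: remove one slot's queue now and stop tracking it.
    bool removeNow(const std::string& name)
    {
        m_names.erase(std::remove(m_names.begin(), m_names.end(), name), m_names.end());
        return m_remover(name);
    }

  private:
    Remover m_remover;
    std::vector<std::string> m_names;
};
```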
The DDS Agent manages user task execution through slots; each slot moves through the following states:
```mermaid
stateDiagram-v2
    [*] --> SlotCreated: ADD_SLOT command
    SlotCreated --> TaskAssigned: ASSIGN_USER_TASK
    TaskAssigned --> TaskActivated: ACTIVATE_USER_TASK
    TaskActivated --> TaskRunning: Process spawned
    TaskRunning --> TaskCompleted: Normal exit
    TaskRunning --> TaskTerminated: STOP_USER_TASK
    TaskRunning --> TaskFailed: Process error
    TaskCompleted --> SlotAvailable: Clean slot
    TaskTerminated --> SlotAvailable: Clean slot
    TaskFailed --> SlotAvailable: Clean slot
    SlotAvailable --> TaskAssigned: New assignment
    SlotAvailable --> [*]: Agent shutdown
```
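The state diagram translates directly into a transition function that rejects out-of-order commands (for example ACTIVATE_USER_TASK on a slot that has no assigned task). The enums and `nextState` below are a hypothetical encoding of the diagram, not types taken from the DDS sources.

```cpp
#include <cassert>
#include <optional>

enum class ESlotState { Created, TaskAssigned, TaskActivated, TaskRunning,
                        TaskCompleted, TaskTerminated, TaskFailed, Available, Removed };

enum class ESlotEvent { AssignUserTask, ActivateUserTask, ProcessSpawned, NormalExit,
                        StopUserTask, ProcessError, CleanSlot, AgentShutdown };

// Next state for (state, event), or std::nullopt if the diagram has no such edge.
std::optional<ESlotState> nextState(ESlotState state, ESlotEvent event)
{
    using S = ESlotState;
    using E = ESlotEvent;
    switch (state)
    {
        case S::Created:
            if (event == E::AssignUserTask) return S::TaskAssigned;
            break;
        case S::TaskAssigned:
            if (event == E::ActivateUserTask) return S::TaskActivated;
            break;
        case S::TaskActivated:
            if (event == E::ProcessSpawned) return S::TaskRunning;
            break;
        case S::TaskRunning:
            if (event == E::NormalExit) return S::TaskCompleted;
            if (event == E::StopUserTask) return S::TaskTerminated;
            if (event == E::ProcessError) return S::TaskFailed;
            break;
        case S::TaskCompleted:
        case S::TaskTerminated:
        case S::TaskFailed:
            if (event == E::CleanSlot) return S::Available;
            break;
        case S::Available:
            if (event == E::AssignUserTask) return S::TaskAssigned;
            if (event == E::AgentShutdown) return S::Removed;
            break;
        case S::Removed:
            break;
    }
    return std::nullopt;
}
```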
Each task slot maintains the following metadata:

```cpp
#include <cstdint>
#include <string>
#include <sys/types.h> // pid_t

// slotId_t, taskId_t and assets_t are DDS type aliases defined elsewhere.
struct SSlotInfo
{
    slotId_t m_id;                // Unique slot identifier
    std::string m_sUsrExe;        // User executable path
    std::string m_sUsrEnv;        // Environment setup script
    taskId_t m_taskID;            // DDS task ID
    uint32_t m_taskIndex;         // Task index in topology
    uint32_t m_collectionIndex;   // Collection index (if applicable)
    std::string m_taskPath;       // Task path in topology
    std::string m_groupName;      // Task group name
    std::string m_collectionName; // Collection name
    std::string m_taskName;       // Task name
    pid_t m_pid;                  // Process ID of running task
    assets_t m_taskAssets;        // Task-specific file assets
};
```

Task Assignment Flow:

```mermaid
sequenceDiagram
    participant Commander
    participant Agent
    participant Slot
    participant FileSystem
    Commander->>Agent: ASSIGN_USER_TASK
    Agent->>Agent: Validate topology hash
    Agent->>Slot: Create slot info
    Agent->>Agent: Replace path placeholders
    Note over Agent: %taskIndex% → actual index
    Note over Agent: %collectionIndex% → actual index
    Note over Agent: %DDS_DEFAULT_TASK_PATH% → slot directory
    Agent->>FileSystem: Create slot directory
    Agent->>Agent: Enable intercom queue
    Agent->>Commander: Reply SUCCESS/ERROR
```
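The placeholder step can be sketched as plain string substitution of the three tokens listed in the diagram. The helpers `replaceAll` and `expandPlaceholders` are hypothetical; in the agent the values would come from the `SSlotInfo` fields, and exactly which strings (executable path, environment script) are expanded is assumed here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Replace every occurrence of `from` in `text` with `to`.
void replaceAll(std::string& text, const std::string& from, const std::string& to)
{
    if (from.empty())
        return;
    for (std::size_t pos = text.find(from); pos != std::string::npos;
         pos = text.find(from, pos + to.size())) // skip past the inserted text
    {
        text.replace(pos, from.size(), to);
    }
}

// Expand the placeholders named in the assignment flow.
std::string expandPlaceholders(std::string text, std::uint32_t taskIndex,
                               std::uint32_t collectionIndex, const std::string& slotDir)
{
    replaceAll(text, "%taskIndex%", std::to_string(taskIndex));
    replaceAll(text, "%collectionIndex%", std::to_string(collectionIndex));
    replaceAll(text, "%DDS_DEFAULT_TASK_PATH%", slotDir);
    return text;
}
```

Usage: with task index 3, collection index 1 and slot directory `/home/u/slots/2`, the string `%DDS_DEFAULT_TASK_PATH%/worker --id %taskIndex%` becomes `/home/u/slots/2/worker --id 3`.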
Task Activation Flow:

```mermaid
sequenceDiagram
    participant Commander
    participant Agent
    participant TaskWrapper
    participant UserProcess
    participant Environment
    Commander->>Agent: ACTIVATE_USER_TASK
    Agent->>Environment: Set task environment variables
    Note over Environment: DDS_TASK_ID, DDS_TASK_INDEX,<br/>DDS_COLLECTION_INDEX, DDS_TASK_PATH,<br/>DDS_GROUP_NAME, DDS_COLLECTION_NAME,<br/>DDS_TASK_NAME, DDS_SLOT_ID
    Agent->>TaskWrapper: Generate task wrapper script
    Note over TaskWrapper: Source custom environment<br/>Execute user task
    Agent->>TaskWrapper: Apply execute permissions
    Agent->>UserProcess: spawn(bash wrapper_script)
    UserProcess->>Agent: Return PID
    Agent->>Agent: Register task monitoring
    Agent->>Commander: Reply with PID
```
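The "generate wrapper script" and "apply execute permissions" steps can be sketched as below; spawning the process is omitted. `makeWrapperScript` and `writeWrapperScript` are hypothetical helpers, and the `rwxr-x---` mode is an assumed choice, not a documented DDS default.

```cpp
#include <cassert>
#include <fstream>
#include <sstream>
#include <string>
#include <sys/stat.h>

// Build the wrapper text: optionally source a custom environment script,
// then run the user command.
std::string makeWrapperScript(const std::string& customEnv, const std::string& userCommand)
{
    std::ostringstream script;
    script << "#!/bin/bash\n"
           << "# Generated by DDS Agent\n";
    if (!customEnv.empty())
        script << "source " << customEnv << "\n";
    script << userCommand << "\n";
    return script.str();
}

// Write the script, then make it executable (owner rwx, group r-x).
bool writeWrapperScript(const std::string& path, const std::string& content)
{
    {
        std::ofstream file(path);
        if (!file)
            return false;
        file << content;
        if (!file)
            return false;
    } // file is flushed and closed here
    return ::chmod(path.c_str(), S_IRWXU | S_IRGRP | S_IXGRP) == 0;
}
```

Generating a small bash wrapper keeps environment sourcing in the shell, so a user's setup script can use any shell features without the agent interpreting it.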
The agent exports the following environment variables to each user task:
| Variable | Description | Example |
|---|---|---|
| `DDS_TASK_ID` | Unique task identifier | 12345 |
| `DDS_TASK_INDEX` | Task index in topology | 0, 1, 2, ... |
| `DDS_COLLECTION_INDEX` | Collection index (if part of collection) | 0, 1, 2, ... |
| `DDS_TASK_PATH` | Full path in topology | /main/group1/collection1/task1 |
| `DDS_GROUP_NAME` | Task group name | group1 |
| `DDS_COLLECTION_NAME` | Collection name | collection1 |
| `DDS_TASK_NAME` | Task name | task1 |
| `DDS_SLOT_ID` | Slot identifier | 1, 2, 3, ... |
| `DDS_SESSION_ID` | Session identifier | uuid-string |
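A minimal sketch of building and exporting these variables with POSIX `setenv`, assuming a hypothetical `STaskEnv` struct that holds the values. Setting them in the agent's own environment is a simplification for illustration; passing them only to the spawned process would avoid touching the agent's environment (and `setenv` is not thread-safe).

```cpp
#include <cassert>
#include <cstdlib>
#include <map>
#include <string>

// Hypothetical subset of slot metadata needed for the task environment.
struct STaskEnv
{
    std::string m_taskId;
    unsigned m_taskIndex;
    unsigned m_collectionIndex;
    std::string m_taskPath;
    std::string m_groupName;
    std::string m_collectionName;
    std::string m_taskName;
    std::string m_slotId;
    std::string m_sessionId;
};

// Name -> value pairs matching the table above.
std::map<std::string, std::string> buildTaskEnvironment(const STaskEnv& t)
{
    return {
        { "DDS_TASK_ID", t.m_taskId },
        { "DDS_TASK_INDEX", std::to_string(t.m_taskIndex) },
        { "DDS_COLLECTION_INDEX", std::to_string(t.m_collectionIndex) },
        { "DDS_TASK_PATH", t.m_taskPath },
        { "DDS_GROUP_NAME", t.m_groupName },
        { "DDS_COLLECTION_NAME", t.m_collectionName },
        { "DDS_TASK_NAME", t.m_taskName },
        { "DDS_SLOT_ID", t.m_slotId },
        { "DDS_SESSION_ID", t.m_sessionId },
    };
}

// Export into the current process; a child spawned afterwards inherits them.
bool exportTaskEnvironment(const std::map<std::string, std::string>& env)
{
    for (const auto& [name, value] : env)
        if (::setenv(name.c_str(), value.c_str(), 1) != 0)
            return false;
    return true;
}
```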
Slot Directory Structure:

```text
$DDS_USER_HOME/
├── slots/
│   ├── 1/                               # Slot 1 directory
│   │   ├── dds_user_task_wrapper.sh     # Generated wrapper script
│   │   ├── user_assets/                 # Task-specific files
│   │   └── logs/                        # Task output logs
│   ├── 2/                               # Slot 2 directory
│   └── N/                               # Slot N directory
└── log/
    ├── task1_2024-01-01-12-00-00_12345_out.log
    └── task1_2024-01-01-12-00-00_12345_err.log
```
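The paths in this layout can be produced by two small helpers. The log name pattern `<task>_<YYYY-MM-DD-HH-MM-SS>_<taskId>_out.log` is inferred from the example file names above and is an assumption, as are the helper names `slotDirectory` and `taskLogName`.

```cpp
#include <cassert>
#include <cstdint>
#include <ctime>
#include <string>

// "$DDS_USER_HOME/slots/<slotId>" per the layout above.
std::string slotDirectory(const std::string& userHome, std::uint64_t slotId)
{
    return userHome + "/slots/" + std::to_string(slotId);
}

// "<task>_<timestamp>_<taskId>_out.log" or "..._err.log" (pattern inferred).
std::string taskLogName(const std::string& taskName, const std::tm& when,
                        const std::string& taskId, bool stderrLog)
{
    char stamp[32];
    std::strftime(stamp, sizeof(stamp), "%Y-%m-%d-%H-%M-%S", &when);
    return taskName + "_" + stamp + "_" + taskId + (stderrLog ? "_err.log" : "_out.log");
}
```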
The agent generates a wrapper script for each task execution:

```bash
#!/bin/bash
# Generated by DDS Agent

# Source custom environment (if provided)
source /path/to/custom/environment.sh

# Execute user task
/path/to/user/executable args...
```

Task Monitoring:

```mermaid
graph TB
    subgraph "Task Monitoring"
        TM[Task Monitor]
        RM[Resource Monitor]
        PM[Process Monitor]
    end
    subgraph "User Process"
        UP[User Task]
        CP[Child Processes]
        GP[Grandchild Processes]
    end
    subgraph "Monitoring Actions"
        DC[Disk Space Check]
        PC[Process Check]
        TC[Task Completion]
        TT[Task Termination]
    end
    TM --> RM
    TM --> PM
    RM --> DC
    PM --> PC
    PC --> TC
    PC --> TT
    UP --> CP
    CP --> GP
    PM -.-> UP
    PM -.-> CP
    PM -.-> GP
```
The agent terminates a task in two phases:
- SIGTERM phase (5 seconds):
  - Send SIGTERM to the main process
  - Enumerate all child and grandchild processes
  - Send SIGTERM to all descendants
  - Wait for them to shut down gracefully
- SIGKILL phase:
  - If any processes are still running after the timeout, send SIGKILL to all of them
  - This forces termination
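How the agent enumerates descendants is not described here. On Linux, one way is to read each `/proc/<pid>/stat` to build a pid → parent map and then walk it breadth-first. The sketch below takes that approach; `readParentMap` and `collectDescendants` are hypothetical names, and the code is Linux-specific.

```cpp
#include <cassert>
#include <dirent.h>
#include <fstream>
#include <map>
#include <queue>
#include <sstream>
#include <string>
#include <sys/types.h>
#include <unistd.h>
#include <vector>

// pid -> parent pid for every process, read from /proc/<pid>/stat.
std::map<pid_t, pid_t> readParentMap()
{
    std::map<pid_t, pid_t> parentOf;
    DIR* proc = ::opendir("/proc");
    if (proc == nullptr)
        return parentOf;
    while (dirent* entry = ::readdir(proc))
    {
        const std::string name = entry->d_name;
        if (name.empty() || name.find_first_not_of("0123456789") != std::string::npos)
            continue; // not a process directory
        std::ifstream statFile("/proc/" + name + "/stat");
        std::string line;
        if (!std::getline(statFile, line))
            continue; // process exited in the meantime
        // Format: "pid (comm) state ppid ..."; comm may contain spaces or ')',
        // so parse from the last ')'.
        const auto close = line.rfind(')');
        if (close == std::string::npos)
            continue;
        std::istringstream rest(line.substr(close + 1));
        char state = 0;
        long ppid = 0;
        if (rest >> state >> ppid)
            parentOf[static_cast<pid_t>(std::stol(name))] = static_cast<pid_t>(ppid);
    }
    ::closedir(proc);
    return parentOf;
}

// Breadth-first list of all descendants (children, grandchildren, ...) of root.
std::vector<pid_t> collectDescendants(pid_t root, const std::map<pid_t, pid_t>& parentOf)
{
    std::multimap<pid_t, pid_t> childrenOf;
    for (const auto& [pid, ppid] : parentOf)
        childrenOf.emplace(ppid, pid);

    std::vector<pid_t> result;
    std::queue<pid_t> pending;
    pending.push(root);
    while (!pending.empty())
    {
        const pid_t current = pending.front();
        pending.pop();
        auto [first, last] = childrenOf.equal_range(current);
        for (auto it = first; it != last; ++it)
        {
            result.push_back(it->second);
            pending.push(it->second);
        }
    }
    return result;
}
```

The snapshot is taken once, so a process forked after the scan is missed; rescanning before the SIGKILL phase narrows that window.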
```mermaid
sequenceDiagram
    participant Agent
    participant MainProcess
    participant ChildProcess
    participant GrandchildProcess
    Agent->>Agent: Enumerate process tree
    Agent->>MainProcess: SIGTERM
    Agent->>ChildProcess: SIGTERM
    Agent->>GrandchildProcess: SIGTERM
    Note over Agent: Wait 5 seconds
    alt Graceful shutdown
        MainProcess-->>Agent: Process exits
        ChildProcess-->>Agent: Process exits
        GrandchildProcess-->>Agent: Process exits
    else Force termination
        Agent->>MainProcess: SIGKILL
        Agent->>ChildProcess: SIGKILL
        Agent->>GrandchildProcess: SIGKILL
    end
    Agent->>Agent: Cleanup slot resources
```
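The two-phase sequence can be sketched with POSIX `kill` and `waitpid`: send SIGTERM to every pid, poll until they exit or the grace period expires, then SIGKILL whatever is left. `isRunning` and `terminateProcesses` are hypothetical helpers, and the 100 ms polling interval is an assumed value.

```cpp
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <sys/types.h>
#include <sys/wait.h>
#include <thread>
#include <unistd.h>
#include <vector>

// True if pid still exists. Direct children are reaped with WNOHANG so they
// do not linger as zombies; other processes are probed with signal 0
// (EPERM means the process exists but belongs to another user).
bool isRunning(pid_t pid)
{
    int status = 0;
    const pid_t reaped = ::waitpid(pid, &status, WNOHANG);
    if (reaped == pid)
        return false; // our child, now reaped
    if (reaped == 0)
        return true; // our child, still running
    return ::kill(pid, 0) == 0 || errno == EPERM;
}

// Phase 1: SIGTERM, then poll until all exit or `grace` expires.
// Phase 2: SIGKILL the rest. Returns the pids that needed SIGKILL.
std::vector<pid_t> terminateProcesses(const std::vector<pid_t>& pids,
                                      std::chrono::milliseconds grace = std::chrono::seconds(5))
{
    for (pid_t pid : pids)
        ::kill(pid, SIGTERM);

    std::vector<pid_t> alive = pids;
    const auto deadline = std::chrono::steady_clock::now() + grace;
    while (true)
    {
        alive.erase(std::remove_if(alive.begin(), alive.end(),
                                   [](pid_t pid) { return !isRunning(pid); }),
                    alive.end());
        if (alive.empty() || std::chrono::steady_clock::now() >= deadline)
            break;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    for (pid_t pid : alive)
    {
        ::kill(pid, SIGKILL);
        ::waitpid(pid, nullptr, 0); // reaps direct children; fails with ECHILD otherwise
    }
    return alive;
}
```

Polling instead of a single 5-second sleep lets the agent free the slot as soon as a well-behaved task exits.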
The agent continuously monitors system resources:
- Disk Space: Prevents task execution when disk space is low
- Memory Usage: Tracks task memory consumption
- Process Status: Monitors task health and completion
- Queue Status: Manages shared memory queue overflow
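The disk space check can be sketched with POSIX `statvfs`. The threshold passed to `canAcceptTask` is a placeholder; the actual limit the agent uses is not given here, and both helper names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <sys/statvfs.h>

// Bytes available to unprivileged users on the filesystem holding `path`,
// or -1 if the filesystem cannot be queried.
std::int64_t availableBytes(const std::string& path)
{
    struct statvfs info{};
    if (::statvfs(path.c_str(), &info) != 0)
        return -1;
    return static_cast<std::int64_t>(info.f_bavail) * static_cast<std::int64_t>(info.f_frsize);
}

// Admission check: refuse new tasks when free space is below the threshold
// or cannot be determined.
bool canAcceptTask(const std::string& slotsDir, std::int64_t minFreeBytes)
{
    const std::int64_t freeBytes = availableBytes(slotsDir);
    return freeBytes >= 0 && freeBytes >= minFreeBytes;
}
```

`f_bavail` is used rather than `f_bfree` because it excludes blocks reserved for root, which unprivileged user tasks cannot use.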
Agent Options:

```cpp
#include <cstdint>
#include <string>

struct SOptions_t
{
    uint32_t m_slots;        // Number of task slots
    std::string m_groupName; // Agent group name
    enum ECommand
    {
        cmd_start, // Start agent
        cmd_clean  // Clean shared memory
    } m_Command;
};
```

The agent integrates with DDS User Defaults for:
- Session ID management
- Slot directory configuration
- Shared memory queue naming
- Resource limits and timeouts
- Access permissions for output files
The agent connects to the commander using the settings in the server info file:

```ini
[server]
host=localhost
port=20000
```

Connection Error Handling:

```mermaid
graph LR
    subgraph "Connection States"
        C[Connected]
        D[Disconnected]
        R[Reconnecting]
        F[Failed]
    end
    subgraph "Error Responses"
        RT[Retry Timer]
        AC[Attempt Counter]
        SD[Shutdown]
    end
    C -->|Connection Lost| D
    D -->|Retry| R
    R -->|Success| C
    R -->|Failure| F
    F -->|Max Attempts| SD
    D --> RT
    R --> AC
    F --> AC
```
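The `[server]` section of the server info file can be read with a small parser. The sketch below is dependency-free and hand-rolled; it is not the agent's actual parser, and `SServerInfo` and `parseServerInfo` are hypothetical names.

```cpp
#include <cassert>
#include <istream>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>

struct SServerInfo
{
    std::string m_host;
    unsigned short m_port = 0;
};

// Strip spaces, tabs and a trailing '\r' (CRLF files).
std::string trim(const std::string& s)
{
    const auto first = s.find_first_not_of(" \t\r");
    if (first == std::string::npos)
        return {};
    const auto last = s.find_last_not_of(" \t\r");
    return s.substr(first, last - first + 1);
}

// Read host/port from the [server] section; nullopt if either is missing or invalid.
std::optional<SServerInfo> parseServerInfo(std::istream& in)
{
    SServerInfo info;
    bool inServer = false;
    bool havePort = false;
    std::string line;
    while (std::getline(in, line))
    {
        line = trim(line);
        if (line.empty() || line[0] == '#' || line[0] == ';')
            continue; // blank line or comment
        if (line[0] == '[')
        {
            inServer = (line == "[server]");
            continue;
        }
        const auto eq = line.find('=');
        if (!inServer || eq == std::string::npos)
            continue;
        const std::string key = trim(line.substr(0, eq));
        const std::string value = trim(line.substr(eq + 1));
        if (key == "host")
            info.m_host = value;
        else if (key == "port")
        {
            try
            {
                const unsigned long port = std::stoul(value);
                if (port == 0 || port > 65535)
                    return std::nullopt;
                info.m_port = static_cast<unsigned short>(port);
                havePort = true;
            }
            catch (const std::exception&) // invalid_argument / out_of_range
            {
                return std::nullopt;
            }
        }
    }
    if (info.m_host.empty() || !havePort)
        return std::nullopt;
    return info;
}
```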
Reconnection Strategy:
- Maximum 5 connection attempts
- 5-second delay between attempts
- Exponential backoff for shared memory retries
- Automatic shutdown after maximum failures
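The reconnection policy (5 attempts, 5-second delay, then give up) and an exponential backoff for shared memory retries can be sketched as below. The connect and sleep operations are injected so the policy can be tested without a network. The backoff base and cap are placeholder values, not DDS defaults, and all function names are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <vector>

using Delay = std::chrono::milliseconds;

// Up to maxAttempts connection tries with a fixed delay between them.
// Returns false after the last failure; the caller then shuts the agent down.
bool connectWithRetry(const std::function<bool()>& tryConnect,
                      const std::function<void(Delay)>& sleep,
                      int maxAttempts = 5,
                      Delay delay = std::chrono::seconds(5))
{
    for (int attempt = 1; attempt <= maxAttempts; ++attempt)
    {
        if (tryConnect())
            return true;
        if (attempt < maxAttempts)
            sleep(delay); // no wait after the final failure
    }
    return false;
}

// Exponential backoff: base * 2^attempt, capped at `cap`.
Delay backoffDelay(int attempt, Delay base = Delay(10), Delay cap = Delay(1000))
{
    Delay d = base;
    for (int i = 0; i < attempt && d < cap; ++i)
        d *= 2;
    return d < cap ? d : cap;
}
```

Usage: `connectWithRetry(tryOnce, [](Delay d) { std::this_thread::sleep_for(d); })` gives the real blocking behavior, where `tryOnce` is whatever function opens the commander connection.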
| Error Type | Response | Recovery |
|---|---|---|
| Task Assignment Failure | Reply ERROR to commander | Slot remains available |
| Task Activation Failure | Reply ERROR to commander | Clean slot state |
| Process Spawn Failure | Reply ERROR to commander | Reset slot |
| Task Crash | Send TASK_DONE with exit code | Auto-cleanup slot |
| Shared Memory Error | Log error, retry | Attempt queue recreation |
| Disk Space Full | Reject new tasks | Continue monitoring |
Normal Shutdown:
- Stop accepting new connections
- Terminate all running tasks gracefully
- Remove shared memory queues
- Clean slot directories
- Remove agent ID file
Emergency Cleanup:
- Force terminate all processes
- Remove all message queues
- Clean temporary files
- Reset shared memory state
This documentation covers the core functionality of the DDS Agent. For implementation details, refer to the source code in the src/ directory.