Event parsing and normalization pipeline for LLM provider streams.
Transforms raw streaming events from 8+ LLM providers (Anthropic, OpenAI, Gemini, XAI, Claude Code, Cline, Codex, OpenCode) into a canonical ParsedStreamEvent format. Operates as both a NATS consumer for continuous stream processing and an HTTP service for direct ingestion.
Multi-Stage Pipeline:
- Provider parsing via registry (converts provider-specific formats to unified
StreamDelta) - Thinking extraction (separates
<thinking>tags from visible content) - Diff extraction (detects unified diff hunks in code blocks)
- PII redaction (removes emails, phone numbers, etc.)
- Metadata enrichment (session context + provider metrics)
- Schema validation (validates against
ParsedStreamEventbefore publishing)
Stateful Extraction:
- Per-session thinking/diff extractors with 30-minute TTL
- Automatic cleanup every 5 minutes
- Mutex-protected memory management
Authentication:
- OAuth token support (
engram_oauth_<32 hex chars>) - Dev tokens for local development (
engram_dev_<name>) - Scope-based authorization (
memory:write,ingest:write) - PostgreSQL-backed token validation
Event Types: content, thought, tool_call, usage (+ provider-specific types)
Metadata Captured: session_id, working_dir, git_remote, agent_type, cost_usd, duration_ms, model, cache_read_tokens, cache_write_tokens, reasoning_tokens
Data Flow:
Raw Event → Parser Registry → Thinking Extractor → Diff Extractor → Redactor → Schema Validation → NATS
NATS Consumer:
- Topic:
raw_events(consumer group:ingestion-group) - Publishes
consumer_readystatus on startup - Heartbeat every 10 seconds
HTTP Server:
- Port: 6175 (configurable)
- Max body size: 50MB
- Endpoints:
/health,/ingest
Dead Letter Queue: Failed events sent to ingestion.dead_letter with error details
# From monorepo root
bun run infra:up # Start NATS, FalkorDB, PostgreSQL
bun run dev --filter=@engram/ingestion # Start service
# Or from apps/ingestion
bun run devbun run build # No-op (returns success)
bun run dev # Start service| Variable | Default |
|---|---|
PORT |
6175 |
NATS_URL |
nats://localhost:4222 |
AUTH_ENABLED |
true |
AUTH_DATABASE_URL |
postgresql://postgres:postgres@localhost:6183/engram |
Authentication: Required (Bearer token with memory:write or ingest:write scope)
Request:
{
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"ingest_timestamp": "2025-01-15T10:30:00.000Z",
"provider": "anthropic",
"payload": {
"type": "content_block_delta",
"index": 0,
"delta": { "type": "text_delta", "text": "Hello!" }
},
"headers": {
"x-session-id": "sess-123",
"x-working-dir": "/Users/user/project",
"x-git-remote": "[email protected]:user/repo.git",
"x-agent-type": "claude-code"
}
}Response:
200 {"status": "processed"}- Success400 {"error": "..."}- Validation failure or unknown provider401- Missing or invalid authentication403- Insufficient scopes413- Request exceeds 50MB
Returns 200 OK (no authentication required)
bun run test # All tests
bun run test -- src/index.test.ts # Specific file
bun run test -- --watch # Watch mode| Topic | Direction | Key |
|---|---|---|
raw_events |
Consumer | event_id |
parsed_events |
Producer | session_id |
ingestion.dead_letter |
Producer | event_id |
Consumer not starting:
docker exec -it engram-nats-1 nats stream lsCheck DLQ messages:
docker exec -it engram-nats-1 nats consumer info EVENTS dead_letterSupported providers:
anthropic, openai, gemini, xai, claude_code, cline, codex, opencode
Aliases: claude → anthropic, gpt/gpt-4 → openai, grok → xai, claude-code → claude_code
bun run lint # Biome linting
bun run typecheck # TypeScript validation
bun run format # Format code