Skip to content

Commit 0907586

Browse files
committed
fix(control-plane): delegation mints per-request JWT for caller's tenant
The initial delegation layer used Control Plane's long-lived admin service JWT to authenticate forwarded requests. That's wrong for reads: Core's query_events (and Query Service's auth pipeline) resolves tenant from the JWT claims, so every forwarded query was scoped to tenant="system" (CP's admin identity) and our injected tenant_id query param was ignored. Users got cross-tenant reads — including other users' events and the "default" tenant's seed data. Live repro against the initial deploy: POST /api/v1/events → event lands at tenant="default" (Core v0.18.x hardcoded; since-fix landed via b08aa1a + redeploy) GET /api/v1/events/query → returns 100 events from tenant="default", not the caller's tenant Fix: AuthClient.SignDelegationJWT(userID, tenantID, role) mints a short-lived (60s) HS256 JWT that represents the authenticated caller. delegationClient stores the signer as a func value — not a single static token — and each forward call takes a bearer param. ProxyIngestSingle / ProxyIngestBatch / ProxyEventsQuery now: 1. Pull user_id + tenant_id + role from AuthContext (set by AuthMiddleware after JWT or ask_ validation). 2. Inject tenant into body (writes) or query param (reads). 3. Mint a per-request JWT with the caller's identity. 4. Forward with Authorization: Bearer <per-request JWT>. Backends (Core, Query Service) now see the real caller's tenant and enforce isolation as they always would. No backend changes needed. Post-redeploy smoke test (allsource-control-plane.fly.dev): - register → ask_ key with tenant=final-{ts}-at-test-local - POST /events with body.tenant_id="SPOOFED" → returns event_id - GET /events/query?event_type=final.{ts} → count=1, event has tenant_id="final-{ts}-at-test-local" (spoofed value overwritten, cross-tenant reads impossible) Also requires allsource-core redeploy to pick up the tenant_id-from- body fix that landed in main (commit b08aa1a) but shipped with v0.18.0 on Fly. Done in parallel with this commit. Tests: delegation tests updated to assert the per-request JWT shape (stub signer echoes userID/tenantID/role into the forwarded Authorization header for easy assertion). Full CP suite green.
1 parent e2a4791 commit 0907586

4 files changed

Lines changed: 121 additions & 30 deletions

File tree

apps/control-plane/auth.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,31 @@ func NewAuthClient(jwtSecret, coreURL string) *AuthClient {
101101
}
102102
}
103103

104+
// SignDelegationJWT mints a short-lived JWT scoped to a single data-plane
105+
// request. The delegation layer uses this so backends (Core, Query Service)
106+
// see the authenticated caller's tenant + role, not Control Plane's admin
107+
// service identity — tenant isolation works end-to-end without every
108+
// backend needing to validate ask_ keys directly.
109+
//
110+
// 60 second TTL is comfortably longer than any single forwarded request but
111+
// short enough that a leaked token is near-useless.
112+
func (a *AuthClient) SignDelegationJWT(userID, tenantID string, role entities.Role) (string, error) {
113+
now := time.Now()
114+
claims := &Claims{
115+
UserID: userID,
116+
TenantID: tenantID,
117+
Role: role,
118+
IsAPIKey: true,
119+
StandardClaims: jwt.StandardClaims{
120+
ExpiresAt: now.Add(60 * time.Second).Unix(),
121+
IssuedAt: now.Unix(),
122+
Issuer: "allsource",
123+
Subject: userID,
124+
},
125+
}
126+
return jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString([]byte(a.jwtSecret))
127+
}
128+
104129
// SignAPIKey generates a long-lived JWT API key for a given tenant and role.
105130
// Used by agent registration, onboarding, and demo flows.
106131
func (a *AuthClient) SignAPIKey(tenantID, username string, role entities.Role) (string, error) {

apps/control-plane/delegation.go

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,24 @@ import (
1919
"strings"
2020

2121
"github.com/gin-gonic/gin"
22+
23+
"github.com/allsource/control-plane/internal/domain/entities"
2224
)
2325

2426
// delegationClient holds the resolved backend URLs and the shared HTTP
25-
// client used to forward requests. serviceToken is Control Plane's long-lived
26-
// admin JWT — backends trust it and authorize the request on CP's behalf.
27+
// client used to forward requests. signToken mints a per-request JWT for
28+
// the authenticated caller — NOT a Control Plane admin token — so backends
29+
// see the real tenant and role and enforce isolation accordingly.
2730
type delegationClient struct {
2831
core *url.URL
2932
queryService *url.URL
30-
serviceToken string
33+
signToken func(userID, tenantID string, role entities.Role) (string, error)
3134
http *http.Client
3235
}
3336

34-
func newDelegationClient(coreURL, queryURL, serviceToken string, httpClient *http.Client) (*delegationClient, error) {
35-
if serviceToken == "" {
36-
return nil, errors.New("delegation requires serviceToken")
37+
func newDelegationClient(coreURL, queryURL string, signToken func(userID, tenantID string, role entities.Role) (string, error), httpClient *http.Client) (*delegationClient, error) {
38+
if signToken == nil {
39+
return nil, errors.New("delegation requires signToken")
3740
}
3841
core, err := url.Parse(strings.TrimRight(coreURL, "/"))
3942
if err != nil {
@@ -49,7 +52,7 @@ func newDelegationClient(coreURL, queryURL, serviceToken string, httpClient *htt
4952
return &delegationClient{
5053
core: core,
5154
queryService: qs,
52-
serviceToken: serviceToken,
55+
signToken: signToken,
5356
http: httpClient,
5457
}, nil
5558
}
@@ -70,6 +73,26 @@ func authTenantFromContext(c *gin.Context) (string, bool) {
7073
return s, true
7174
}
7275

76+
// authIdentityFromContext returns the three identity fields delegation needs
77+
// to mint a per-request JWT: user ID, tenant ID, and role. AuthMiddleware
78+
// sets these after validating either a JWT or an ask_ API key, so a
79+
// successful context lookup means the caller is authenticated.
80+
func authIdentityFromContext(c *gin.Context) (userID, tenantID string, role entities.Role, ok bool) {
81+
tenantID, ok = authTenantFromContext(c)
82+
if !ok {
83+
return
84+
}
85+
uid, uidOk := c.Get("auth_user_id")
86+
r, rOk := c.Get("auth_role")
87+
if !uidOk || !rOk {
88+
ok = false
89+
return
90+
}
91+
uidStr, _ := uid.(string)
92+
roleVal, _ := r.(entities.Role)
93+
return uidStr, tenantID, roleVal, true
94+
}
95+
7396
// injectTenantIntoObject overwrites the top-level tenant_id field in a JSON
7497
// object. Used for single-event ingest where the Core handler reads
7598
// req.tenant_id.
@@ -109,10 +132,11 @@ func injectTenantIntoBatch(body []byte, tenantID string) ([]byte, error) {
109132
}
110133

111134
// forwardRequest builds a new outgoing request against backend, copies the
112-
// body, attaches the service JWT, and copies the backend's response to the
113-
// client. It does NOT forward any inbound auth headers — the backend trusts
114-
// Control Plane, not the caller.
115-
func (d *delegationClient) forwardRequest(c *gin.Context, method string, backend *url.URL, path string, query url.Values, body []byte) {
135+
// body, attaches the per-request bearer JWT (minted from the authenticated
136+
// caller's identity), and copies the backend's response to the client. It
137+
// does NOT forward the inbound Authorization header — the backend trusts
138+
// the JWT we signed, not any ask_ key the client presented.
139+
func (d *delegationClient) forwardRequest(c *gin.Context, method string, backend *url.URL, path string, query url.Values, body []byte, bearerToken string) {
116140
target := *backend
117141
target.Path = strings.TrimRight(backend.Path, "/") + "/" + strings.TrimLeft(path, "/")
118142
if query != nil {
@@ -129,7 +153,7 @@ func (d *delegationClient) forwardRequest(c *gin.Context, method string, backend
129153
c.JSON(http.StatusInternalServerError, gin.H{"error": "delegation build request", "message": err.Error()})
130154
return
131155
}
132-
req.Header.Set("Authorization", "Bearer "+d.serviceToken)
156+
req.Header.Set("Authorization", "Bearer "+bearerToken)
133157
if body != nil {
134158
req.Header.Set("Content-Type", "application/json")
135159
}
@@ -153,12 +177,30 @@ func (d *delegationClient) forwardRequest(c *gin.Context, method string, backend
153177
}
154178
}
155179

180+
// mintBearer builds a per-request JWT that represents the authenticated
181+
// caller. Backends validate this JWT against the shared JWT_SECRET and get
182+
// the right tenant + role — which is how tenant isolation survives the hop
183+
// through Control Plane.
184+
func (cp *ControlPlane) mintBearer(c *gin.Context) (string, bool) {
185+
userID, tenantID, role, ok := authIdentityFromContext(c)
186+
if !ok {
187+
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized", "message": "no auth context"})
188+
return "", false
189+
}
190+
token, err := cp.delegation.signToken(userID, tenantID, role)
191+
if err != nil {
192+
c.JSON(http.StatusInternalServerError, gin.H{"error": "delegation sign", "message": err.Error()})
193+
return "", false
194+
}
195+
return token, true
196+
}
197+
156198
// ProxyIngestSingle forwards POST /api/v1/events to Core with tenant_id
157199
// injected into the JSON body from the authenticated caller.
158200
func (cp *ControlPlane) ProxyIngestSingle(c *gin.Context) {
159-
tenantID, ok := authTenantFromContext(c)
201+
_, tenantID, _, ok := authIdentityFromContext(c)
160202
if !ok {
161-
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized", "message": "no tenant context"})
203+
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized", "message": "no auth context"})
162204
return
163205
}
164206
body, err := io.ReadAll(c.Request.Body)
@@ -171,15 +213,19 @@ func (cp *ControlPlane) ProxyIngestSingle(c *gin.Context) {
171213
c.JSON(http.StatusBadRequest, gin.H{"error": "bad request", "message": err.Error()})
172214
return
173215
}
174-
cp.delegation.forwardRequest(c, http.MethodPost, cp.delegation.core, "/api/v1/events", nil, rewritten)
216+
bearer, ok := cp.mintBearer(c)
217+
if !ok {
218+
return
219+
}
220+
cp.delegation.forwardRequest(c, http.MethodPost, cp.delegation.core, "/api/v1/events", nil, rewritten, bearer)
175221
}
176222

177223
// ProxyIngestBatch forwards POST /api/v1/events/batch to Core with tenant_id
178224
// injected onto every event in the batch.
179225
func (cp *ControlPlane) ProxyIngestBatch(c *gin.Context) {
180-
tenantID, ok := authTenantFromContext(c)
226+
_, tenantID, _, ok := authIdentityFromContext(c)
181227
if !ok {
182-
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized", "message": "no tenant context"})
228+
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized", "message": "no auth context"})
183229
return
184230
}
185231
body, err := io.ReadAll(c.Request.Body)
@@ -192,19 +238,30 @@ func (cp *ControlPlane) ProxyIngestBatch(c *gin.Context) {
192238
c.JSON(http.StatusBadRequest, gin.H{"error": "bad request", "message": err.Error()})
193239
return
194240
}
195-
cp.delegation.forwardRequest(c, http.MethodPost, cp.delegation.core, "/api/v1/events/batch", nil, rewritten)
241+
bearer, ok := cp.mintBearer(c)
242+
if !ok {
243+
return
244+
}
245+
cp.delegation.forwardRequest(c, http.MethodPost, cp.delegation.core, "/api/v1/events/batch", nil, rewritten, bearer)
196246
}
197247

198-
// ProxyEventsQuery forwards GET /api/v1/events/query to Query Service with
199-
// tenant_id injected as a query param. Any client-supplied tenant_id is
200-
// overwritten — the authenticated caller's tenant is authoritative.
248+
// ProxyEventsQuery forwards GET /api/v1/events/query to Core with tenant_id
249+
// injected as a query param AND a per-request JWT scoped to the caller's
250+
// tenant + role. Core's query_events enforces tenant from auth, which with
251+
// the user-scoped JWT matches what we're asking for. Any client-supplied
252+
// tenant_id in the URL is overwritten — the authenticated caller's tenant
253+
// is authoritative.
201254
func (cp *ControlPlane) ProxyEventsQuery(c *gin.Context) {
202-
tenantID, ok := authTenantFromContext(c)
255+
_, tenantID, _, ok := authIdentityFromContext(c)
203256
if !ok {
204-
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized", "message": "no tenant context"})
257+
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized", "message": "no auth context"})
205258
return
206259
}
207260
q := c.Request.URL.Query()
208261
q.Set("tenant_id", tenantID)
209-
cp.delegation.forwardRequest(c, http.MethodGet, cp.delegation.queryService, "/api/v1/events/query", q, nil)
262+
bearer, ok := cp.mintBearer(c)
263+
if !ok {
264+
return
265+
}
266+
cp.delegation.forwardRequest(c, http.MethodGet, cp.delegation.core, "/api/v1/events/query", q, nil, bearer)
210267
}

apps/control-plane/delegation_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,16 @@ func newFakeBackend(respBy func(w http.ResponseWriter, r *http.Request)) (*fakeB
4848
// newTestCP builds a minimal ControlPlane with only the pieces the delegation
4949
// handlers use. It skips AuthMiddleware; tests call handlers directly with a
5050
// pre-populated gin.Context to isolate delegation behaviour.
51+
//
52+
// The token signer is deterministic for test assertions — it echoes the
53+
// inputs so tests can read tenant/user/role out of the forwarded Authorization
54+
// header without JWT parsing.
5155
func newTestCP(t *testing.T, coreURL, qsURL string) *ControlPlane {
5256
t.Helper()
53-
d, err := newDelegationClient(coreURL, qsURL, "test-service-token", http.DefaultClient)
57+
signer := func(userID, tenantID string, role entities.Role) (string, error) {
58+
return "test-token:" + userID + ":" + tenantID + ":" + string(role), nil
59+
}
60+
d, err := newDelegationClient(coreURL, qsURL, signer, http.DefaultClient)
5461
if err != nil {
5562
t.Fatalf("newDelegationClient: %v", err)
5663
}
@@ -63,6 +70,8 @@ func callHandler(h gin.HandlerFunc, req *http.Request, tenantID string) *httptes
6370
c.Request = req
6471
if tenantID != "" {
6572
c.Set("auth_tenant_id", tenantID)
73+
c.Set("auth_user_id", "user-"+tenantID)
74+
c.Set("auth_role", entities.RoleDeveloper)
6675
}
6776
h(c)
6877
return w
@@ -87,8 +96,8 @@ func TestProxyIngestSingle_InjectsTenantAndForwardsToCore(t *testing.T) {
8796
if core.path != "/api/v1/events" {
8897
t.Errorf("upstream path: got %q, want /api/v1/events", core.path)
8998
}
90-
if core.auth != "Bearer test-service-token" {
91-
t.Errorf("upstream auth: got %q, want service JWT", core.auth)
99+
if core.auth != "Bearer test-token:user-tenant-real:tenant-real:developer" {
100+
t.Errorf("upstream auth: got %q, want per-request JWT scoped to caller", core.auth)
92101
}
93102

94103
var forwarded map[string]any
@@ -178,8 +187,8 @@ func TestProxyEventsQuery_InjectsTenantAsQueryParam(t *testing.T) {
178187
if got := qs.query.Get("since"); got != "2026-04-17T00:00:00Z" {
179188
t.Errorf("upstream since: got %q, want original value", got)
180189
}
181-
if qs.auth != "Bearer test-service-token" {
182-
t.Errorf("upstream auth: got %q, want service JWT", qs.auth)
190+
if qs.auth != "Bearer test-token:user-tenant-real:tenant-real:developer" {
191+
t.Errorf("upstream auth: got %q, want per-request JWT scoped to caller", qs.auth)
183192
}
184193
}
185194

apps/control-plane/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func NewControlPlane(ctx context.Context) (*ControlPlane, error) {
261261
if queryURL == "" {
262262
queryURL = "http://allsource-query.internal:3902"
263263
}
264-
delegation, err := newDelegationClient(coreURL, queryURL, serviceToken, NewPooledHTTPClient())
264+
delegation, err := newDelegationClient(coreURL, queryURL, authClient.SignDelegationJWT, NewPooledHTTPClient())
265265
if err != nil {
266266
return nil, fmt.Errorf("init delegation client: %w", err)
267267
}

0 commit comments

Comments
 (0)