Skip to content

Commit 8d6d520

Browse files
committed
Preserve remote execution attribution
1 parent 3205b65 commit 8d6d520

4 files changed

Lines changed: 88 additions & 3 deletions

File tree

codex-rs/analytics/src/analytics_client_tests.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use codex_app_server_protocol::ServerNotification;
7272
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
7373
use codex_app_server_protocol::Thread;
7474
use codex_app_server_protocol::ThreadExecutionEnvironment;
75+
use codex_app_server_protocol::ThreadResumeParams;
7576
use codex_app_server_protocol::ThreadResumeResponse;
7677
use codex_app_server_protocol::ThreadStartParams;
7778
use codex_app_server_protocol::ThreadStartResponse;
@@ -2192,6 +2193,67 @@ async fn thread_execution_environment_flows_to_thread_turn_and_steer_events() {
21922193
);
21932194
out.clear();
21942195

2196+
reducer
2197+
.ingest(
2198+
AnalyticsFact::ClientRequest {
2199+
connection_id: 7,
2200+
request_id: RequestId::Integer(2),
2201+
request: Box::new(ClientRequest::ThreadResume {
2202+
request_id: RequestId::Integer(2),
2203+
params: ThreadResumeParams {
2204+
thread_id: "thread-2".to_string(),
2205+
..Default::default()
2206+
},
2207+
}),
2208+
},
2209+
&mut out,
2210+
)
2211+
.await;
2212+
reducer
2213+
.ingest(
2214+
AnalyticsFact::ClientResponse {
2215+
connection_id: 7,
2216+
response: Box::new(sample_thread_resume_response(
2217+
"thread-2", /*ephemeral*/ false, "gpt-5",
2218+
)),
2219+
},
2220+
&mut out,
2221+
)
2222+
.await;
2223+
2224+
let resume_payload = serde_json::to_value(&out[0]).expect("serialize resume thread event");
2225+
assert_eq!(
2226+
resume_payload["event_params"]["execution_environment"],
2227+
json!("remote")
2228+
);
2229+
out.clear();
2230+
2231+
reducer
2232+
.ingest(
2233+
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
2234+
SubAgentThreadStartedInput {
2235+
thread_id: "thread-subagent".to_string(),
2236+
parent_thread_id: Some("thread-2".to_string()),
2237+
product_client_id: "codex-tui".to_string(),
2238+
client_name: "codex-tui".to_string(),
2239+
client_version: "1.0.0".to_string(),
2240+
model: "gpt-5".to_string(),
2241+
ephemeral: false,
2242+
subagent_source: SubAgentSource::Review,
2243+
created_at: 123,
2244+
},
2245+
)),
2246+
&mut out,
2247+
)
2248+
.await;
2249+
2250+
let subagent_payload = serde_json::to_value(&out[0]).expect("serialize subagent thread event");
2251+
assert_eq!(
2252+
subagent_payload["event_params"]["execution_environment"],
2253+
json!("remote")
2254+
);
2255+
out.clear();
2256+
21952257
ingest_turn_prerequisites(
21962258
&mut reducer,
21972259
&mut out,

codex-rs/analytics/src/reducer.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,17 @@ impl AnalyticsReducer {
278278
input: SubAgentThreadStartedInput,
279279
out: &mut Vec<TrackEventRequest>,
280280
) {
281-
out.push(TrackEventRequest::ThreadInitialized(
282-
subagent_thread_started_event_request(input),
283-
));
281+
let parent_thread_id = input
282+
.parent_thread_id
283+
.clone()
284+
.or_else(|| subagent_parent_thread_id(&input.subagent_source));
285+
let execution_environment = parent_thread_id
286+
.as_ref()
287+
.and_then(|parent_thread_id| self.thread_metadata.get(parent_thread_id))
288+
.and_then(|thread_metadata| thread_metadata.execution_environment);
289+
let mut event = subagent_thread_started_event_request(input);
290+
event.event_params.execution_environment = execution_environment;
291+
out.push(TrackEventRequest::ThreadInitialized(event));
284292
}
285293

286294
fn ingest_guardian_review(
@@ -732,6 +740,11 @@ impl AnalyticsReducer {
732740
let Some(connection_state) = self.connections.get(&connection_id) else {
733741
return;
734742
};
743+
let execution_environment = execution_environment.or_else(|| {
744+
self.thread_metadata
745+
.get(&thread_id)
746+
.and_then(|thread_metadata| thread_metadata.execution_environment)
747+
});
735748
let thread_metadata = ThreadMetadataState::from_thread_metadata(
736749
&thread_source,
737750
initialization_mode,

codex-rs/app-server/src/codex_message_processor.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4581,6 +4581,7 @@ impl CodexMessageProcessor {
45814581
let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse(
45824582
Box::new(crate::thread_state::PendingThreadResumeRequest {
45834583
request_id: request_id.clone(),
4584+
analytics_events_client: self.analytics_events_client.clone(),
45844585
history_items,
45854586
config_snapshot,
45864587
instruction_sources,
@@ -8309,6 +8310,13 @@ async fn handle_pending_thread_resume_request(
83098310
permission_profile,
83108311
reasoning_effort,
83118312
};
8313+
pending.analytics_events_client.track_response(
8314+
connection_id.0,
8315+
ClientResponse::ThreadResume {
8316+
request_id: request_id.request_id.clone(),
8317+
response: response.clone(),
8318+
},
8319+
);
83128320
let token_usage_thread = pending.include_turns.then(|| response.thread.clone());
83138321
outgoing.send_response(request_id, response).await;
83148322
// Match cold resume: metadata-only resume should attach the listener without

codex-rs/app-server/src/thread_state.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::outgoing_message::ConnectionId;
22
use crate::outgoing_message::ConnectionRequestId;
3+
use codex_analytics::AnalyticsEventsClient;
34
use codex_app_server_protocol::RequestId;
45
use codex_app_server_protocol::ThreadGoal;
56
use codex_app_server_protocol::ThreadHistoryBuilder;
@@ -29,6 +30,7 @@ type PendingInterruptQueue = Vec<(
2930

3031
pub(crate) struct PendingThreadResumeRequest {
3132
pub(crate) request_id: ConnectionRequestId,
33+
pub(crate) analytics_events_client: AnalyticsEventsClient,
3234
pub(crate) history_items: Vec<RolloutItem>,
3335
pub(crate) config_snapshot: ThreadConfigSnapshot,
3436
pub(crate) instruction_sources: Vec<AbsolutePathBuf>,

0 commit comments

Comments
 (0)