Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions codex-rs/analytics/src/analytics_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadExecutionEnvironment;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
use codex_app_server_protocol::Turn;
Expand Down Expand Up @@ -818,6 +820,7 @@ fn thread_initialized_event_serializes_expected_shape() {
initialization_mode: ThreadInitializationMode::New,
subagent_source: None,
parent_thread_id: None,
execution_environment: Some(ThreadExecutionEnvironment::Remote),
created_at: 1,
},
});
Expand Down Expand Up @@ -849,6 +852,7 @@ fn thread_initialized_event_serializes_expected_shape() {
"initialization_mode": "new",
"subagent_source": null,
"parent_thread_id": null,
"execution_environment": "remote",
"created_at": 1
}
})
Expand Down Expand Up @@ -1750,6 +1754,7 @@ fn turn_event_serializes_expected_shape() {
initialization_mode: ThreadInitializationMode::New,
subagent_source: None,
parent_thread_id: None,
execution_environment: Some(ThreadExecutionEnvironment::Remote),
model: Some("gpt-5".to_string()),
model_provider: "openai".to_string(),
sandbox_policy: Some("read_only"),
Expand Down Expand Up @@ -1811,6 +1816,7 @@ fn turn_event_serializes_expected_shape() {
"initialization_mode": "new",
"subagent_source": null,
"parent_thread_id": null,
"execution_environment": "remote",
"model": "gpt-5",
"model_provider": "openai",
"sandbox_policy": "read_only",
Expand Down Expand Up @@ -2145,6 +2151,111 @@ async fn turn_lifecycle_emits_turn_event() {
assert_eq!(payload["event_params"]["total_tokens"], json!(321));
}

#[tokio::test]
async fn thread_execution_environment_flows_to_thread_turn_and_steer_events() {
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();

ingest_initialize(&mut reducer, &mut out).await;
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(1),
request: Box::new(ClientRequest::ThreadStart {
request_id: RequestId::Integer(1),
params: ThreadStartParams {
execution_environment: Some(ThreadExecutionEnvironment::Remote),
..Default::default()
},
}),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
&mut out,
)
.await;

let thread_payload = serde_json::to_value(&out[0]).expect("serialize thread event");
assert_eq!(
thread_payload["event_params"]["execution_environment"],
json!("remote")
);
out.clear();

ingest_turn_prerequisites(
&mut reducer,
&mut out,
/*include_initialize*/ false,
/*include_resolved_config*/ true,
/*include_started*/ false,
/*include_token_usage*/ false,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
"thread-2", "turn-2", /*request_id*/ 4,
)),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
"thread-2",
"turn-2",
AppServerTurnStatus::Completed,
/*codex_error_info*/ None,
))),
&mut out,
)
.await;

let turn_steer_event = out
.iter()
.find(|event| matches!(event, TrackEventRequest::TurnSteer(_)))
.expect("turn steer event should be emitted");
let turn_steer_payload =
serde_json::to_value(turn_steer_event).expect("serialize turn steer event");
assert_eq!(
turn_steer_payload["event_params"]["execution_environment"],
json!("remote")
);

let turn_event = out
.iter()
.find(|event| matches!(event, TrackEventRequest::TurnEvent(_)))
.expect("turn event should be emitted");
let turn_payload = serde_json::to_value(turn_event).expect("serialize turn event");
assert_eq!(
turn_payload["event_params"]["execution_environment"],
json!("remote")
);
}

#[tokio::test]
async fn accepted_steers_increment_turn_steer_count() {
let mut reducer = AnalyticsReducer::default();
Expand Down
5 changes: 5 additions & 0 deletions codex-rs/analytics/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::facts::TurnSteerResult;
use crate::facts::TurnSubmissionType;
use crate::now_unix_seconds;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::ThreadExecutionEnvironment;
use codex_login::default_client::originator;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::approvals::NetworkApprovalProtocol;
Expand Down Expand Up @@ -114,6 +115,7 @@ pub(crate) struct ThreadInitializedEventParams {
pub(crate) initialization_mode: ThreadInitializationMode,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) execution_environment: Option<ThreadExecutionEnvironment>,
pub(crate) created_at: u64,
}

Expand Down Expand Up @@ -466,6 +468,7 @@ pub(crate) struct CodexTurnEventParams {
pub(crate) initialization_mode: ThreadInitializationMode,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) execution_environment: Option<ThreadExecutionEnvironment>,
pub(crate) model: Option<String>,
pub(crate) model_provider: String,
pub(crate) sandbox_policy: Option<&'static str>,
Expand Down Expand Up @@ -518,6 +521,7 @@ pub(crate) struct CodexTurnSteerEventParams {
pub(crate) thread_source: Option<String>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) execution_environment: Option<ThreadExecutionEnvironment>,
pub(crate) num_input_images: usize,
pub(crate) result: TurnSteerResult,
pub(crate) rejection_reason: Option<TurnSteerRejectionReason>,
Expand Down Expand Up @@ -722,6 +726,7 @@ pub(crate) fn subagent_thread_started_event_request(
parent_thread_id: input
.parent_thread_id
.or_else(|| subagent_parent_thread_id(&input.subagent_source)),
execution_environment: None,
created_at: input.created_at,
};
ThreadInitializedEvent {
Expand Down
80 changes: 75 additions & 5 deletions codex-rs/analytics/src/reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadExecutionEnvironment;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
use codex_git_utils::collect_git_info;
Expand Down Expand Up @@ -89,12 +90,14 @@ struct ThreadMetadataState {
initialization_mode: ThreadInitializationMode,
subagent_source: Option<String>,
parent_thread_id: Option<String>,
execution_environment: Option<ThreadExecutionEnvironment>,
}

impl ThreadMetadataState {
fn from_thread_metadata(
session_source: &SessionSource,
initialization_mode: ThreadInitializationMode,
execution_environment: Option<ThreadExecutionEnvironment>,
) -> Self {
let (subagent_source, parent_thread_id) = match session_source {
SessionSource::SubAgent(subagent_source) => (
Expand All @@ -114,11 +117,15 @@ impl ThreadMetadataState {
initialization_mode,
subagent_source,
parent_thread_id,
execution_environment,
}
}
}

enum RequestState {
ThreadInitialized {
execution_environment: Option<ThreadExecutionEnvironment>,
},
TurnStart(PendingTurnStartState),
TurnSteer(PendingTurnSteerState),
}
Expand Down Expand Up @@ -319,6 +326,30 @@ impl AnalyticsReducer {
request: ClientRequest,
) {
match request {
ClientRequest::ThreadStart { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::ThreadInitialized {
execution_environment: params.execution_environment,
},
);
}
ClientRequest::ThreadResume { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::ThreadInitialized {
execution_environment: params.execution_environment,
},
);
}
ClientRequest::ThreadFork { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::ThreadInitialized {
execution_environment: params.execution_environment,
},
);
}
ClientRequest::TurnStart { params, .. } => {
self.requests.insert(
(connection_id, request_id),
Expand Down Expand Up @@ -497,30 +528,48 @@ impl AnalyticsReducer {
out: &mut Vec<TrackEventRequest>,
) {
match response {
ClientResponse::ThreadStart { response, .. } => {
ClientResponse::ThreadStart {
request_id,
response,
} => {
let execution_environment =
self.thread_initialized_execution_environment(connection_id, request_id);
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::New,
execution_environment,
out,
);
}
ClientResponse::ThreadResume { response, .. } => {
ClientResponse::ThreadResume {
request_id,
response,
} => {
let execution_environment =
self.thread_initialized_execution_environment(connection_id, request_id);
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::Resumed,
execution_environment,
out,
);
}
ClientResponse::ThreadFork { response, .. } => {
ClientResponse::ThreadFork {
request_id,
response,
} => {
let execution_environment =
self.thread_initialized_execution_environment(connection_id, request_id);
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::Forked,
execution_environment,
out,
);
}
Expand Down Expand Up @@ -580,6 +629,7 @@ impl AnalyticsReducer {
out: &mut Vec<TrackEventRequest>,
) {
match request {
RequestState::ThreadInitialized { .. } => {}
RequestState::TurnStart(_) => {}
RequestState::TurnSteer(pending_request) => {
self.ingest_turn_steer_error_response(
Expand Down Expand Up @@ -674,15 +724,19 @@ impl AnalyticsReducer {
thread: codex_app_server_protocol::Thread,
model: String,
initialization_mode: ThreadInitializationMode,
execution_environment: Option<ThreadExecutionEnvironment>,
out: &mut Vec<TrackEventRequest>,
) {
let thread_source: SessionSource = thread.source.into();
let thread_id = thread.id;
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let thread_metadata =
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
let thread_metadata = ThreadMetadataState::from_thread_metadata(
&thread_source,
initialization_mode,
execution_environment,
);
self.thread_connections
.insert(thread_id.clone(), connection_id);
self.thread_metadata
Expand All @@ -700,12 +754,26 @@ impl AnalyticsReducer {
initialization_mode,
subagent_source: thread_metadata.subagent_source,
parent_thread_id: thread_metadata.parent_thread_id,
execution_environment,
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
},
},
));
}

fn thread_initialized_execution_environment(
&mut self,
connection_id: u64,
request_id: RequestId,
) -> Option<ThreadExecutionEnvironment> {
match self.requests.remove(&(connection_id, request_id)) {
Some(RequestState::ThreadInitialized {
execution_environment,
}) => execution_environment,
Some(RequestState::TurnStart(_)) | Some(RequestState::TurnSteer(_)) | None => None,
}
}

fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
tracing::warn!(
Expand Down Expand Up @@ -802,6 +870,7 @@ impl AnalyticsReducer {
thread_source: thread_metadata.thread_source.map(str::to_string),
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
execution_environment: thread_metadata.execution_environment,
num_input_images: pending_request.num_input_images,
result,
rejection_reason,
Expand Down Expand Up @@ -916,6 +985,7 @@ fn codex_turn_event_params(
initialization_mode: thread_metadata.initialization_mode,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
execution_environment: thread_metadata.execution_environment,
model: Some(model),
model_provider,
sandbox_policy: Some(sandbox_policy_mode(
Expand Down
Loading
Loading