Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

public class RecentActivity {

public static class View {
public static class Public {
}
}

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record RecentActivityPage(
@JsonView(View.Public.class) int page,
@JsonView(View.Public.class) int size,
@JsonView(View.Public.class) long total,
@JsonView(View.Public.class) List<RecentActivityItem> content) {
}

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record RecentActivityItem(
@JsonView(View.Public.class) ActivityType type,
@JsonView(View.Public.class) UUID id,
@JsonView(View.Public.class) String name,
@JsonView(View.Public.class) UUID resourceId,
@JsonView(View.Public.class) String createdBy,
@JsonView(View.Public.class) Instant createdAt) {
}

@Builder(toBuilder = true)
public record RecentDatasetVersion(@NonNull UUID datasetId, @NonNull String datasetName,
@NonNull String datasetType, @NonNull Instant createdAt, @NonNull String createdBy) {
}

@RequiredArgsConstructor
@Getter
public enum ActivityType {
EXPERIMENT("experiment"),
DATASET_VERSION("dataset_version"),
TEST_SUITE_VERSION("test_suite_version"),
ALERT_EVENT("alert_event"),
OPTIMIZATION("optimization"),
AGENT_CONFIG_VERSION("agent_config_version");

@JsonValue
private final String value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class WebhookEvent<T> {

@NotNull private UUID alertId;

private UUID projectId;

@NotNull private String alertName;

@NotNull Map<String, String> alertMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -69,22 +70,26 @@ public Mono<String> sendWebhook(@NonNull WebhookEvent<?> event) {
throwable)));
}

private void logInfo(WebhookEvent<?> event, String workspaceId, String message, Object... args) {
try (var logContext = wrapWithMdc(Map.of(
private Map<String, String> buildMdcMap(WebhookEvent<?> event, String workspaceId) {
var mdc = new HashMap<>(Map.of(
UserLog.MARKER, UserLog.ALERT_EVENT.name(),
UserLog.WORKSPACE_ID, workspaceId,
UserLog.EVENT_ID, event.getId(),
UserLog.ALERT_ID, event.getAlertId().toString()))) {
UserLog.ALERT_ID, event.getAlertId().toString()));
if (event.getProjectId() != null) {
mdc.put(UserLog.PROJECT_ID, event.getProjectId().toString());
Comment thread
miguelgrc marked this conversation as resolved.
}
return mdc;
}

private void logInfo(WebhookEvent<?> event, String workspaceId, String message, Object... args) {
try (var logContext = wrapWithMdc(buildMdcMap(event, workspaceId))) {
userFacingLog.info(message, args);
}
}

private void logError(WebhookEvent<?> event, String workspaceId, String message, Throwable throwable) {
try (var logContext = wrapWithMdc(Map.of(
UserLog.MARKER, UserLog.ALERT_EVENT.name(),
UserLog.WORKSPACE_ID, workspaceId,
UserLog.EVENT_ID, event.getId(),
UserLog.ALERT_ID, event.getAlertId().toString()))) {
try (var logContext = wrapWithMdc(buildMdcMap(event, workspaceId))) {
userFacingLog.error(message, throwable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public <T> Mono<String> publishWebhookEvent(@NonNull AlertEventType eventType,
.eventType(eventType)
.alertType(Optional.ofNullable(alert.alertType()).orElse(AlertType.GENERAL))
.alertId(alert.id())
.projectId(alert.projectId())
.alertName(alert.name())
.alertMetadata(Optional.ofNullable(alert.metadata()).orElse(Map.of()))
.payload(payload)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.comet.opik.api.resources.v1.priv;

import com.codahale.metrics.annotation.Timed;
import com.comet.opik.api.RecentActivity;
import com.comet.opik.domain.RecentActivityService;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.auth.RequiredPermissions;
import com.comet.opik.infrastructure.auth.WorkspaceUserPermission;
import com.fasterxml.jackson.annotation.JsonView;
import io.dropwizard.jersey.errors.ErrorMessage;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;

import static com.comet.opik.utils.AsyncUtils.setRequestContext;

@Path("/v1/private/projects/{projectId}/activities")
@Produces(MediaType.APPLICATION_JSON)
Comment thread
miguelgrc marked this conversation as resolved.
@Consumes(MediaType.APPLICATION_JSON)
@Timed
@Slf4j
@RequiredArgsConstructor(onConstructor_ = @Inject)
@Tag(name = "Projects", description = "Project recent activity")
public class RecentActivityResource {

private final @NonNull RecentActivityService recentActivityService;
private final @NonNull Provider<RequestContext> requestContext;

@GET
@Operation(operationId = "getRecentActivity", summary = "Get recent activity for a project", description = "Returns the most recent activity items across all entity types for a project, sorted by date descending.", responses = {
@ApiResponse(responseCode = "200", description = "Recent activity page", content = @Content(schema = @Schema(implementation = RecentActivity.RecentActivityPage.class))),
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "500", description = "Internal Server Error", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))
})
@RequiredPermissions(WorkspaceUserPermission.PROJECT_DATA_VIEW)
@JsonView(RecentActivity.View.Public.class)
public Response getRecentActivity(
Comment thread
miguelgrc marked this conversation as resolved.
@PathParam("projectId") UUID projectId,
@QueryParam("page") @Min(1) @DefaultValue("1") int page,
@QueryParam("size") @Min(1) @Max(100) @DefaultValue("10") int size) {
Comment thread
andrescrz marked this conversation as resolved.

var activity = recentActivityService.getRecentActivity(projectId, page, size)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
Comment thread
miguelgrc marked this conversation as resolved.
.block();

return Response.ok(activity).build();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.comet.opik.domain;

import com.comet.opik.api.DatasetVersion;
import com.comet.opik.api.RecentActivity;
import com.comet.opik.infrastructure.db.EvaluatorItemListColumnMapper;
import com.comet.opik.infrastructure.db.ExecutionPolicyColumnMapper;
import com.comet.opik.infrastructure.db.MapFlatArgumentFactory;
Expand Down Expand Up @@ -446,6 +447,35 @@ WHERE dv.id IN (<version_ids>)
List<DatasetVersion> findByIds(@BindList("version_ids") Collection<UUID> versionIds,
@Bind("workspace_id") String workspaceId);

@SqlQuery("""
SELECT dataset_id, dataset_name, dataset_type, created_at, created_by FROM (
SELECT id AS dataset_id, name AS dataset_name, type AS dataset_type, created_at, created_by
FROM datasets
WHERE workspace_id = :workspace_id
Comment thread
miguelgrc marked this conversation as resolved.
AND project_id = :project_id
AND id >= :min_id

UNION ALL

SELECT d.id, d.name, d.type, dv.created_at, dv.created_by
FROM dataset_versions dv
INNER JOIN datasets d ON dv.dataset_id = d.id AND dv.workspace_id = d.workspace_id
Comment thread
miguelgrc marked this conversation as resolved.
WHERE d.workspace_id = :workspace_id
Comment thread
miguelgrc marked this conversation as resolved.
AND d.project_id = :project_id
AND dv.id >= :min_id
AND dv.created_at > d.created_at
Comment thread
andrescrz marked this conversation as resolved.
AND dv.items_total > 0
Comment thread
andrescrz marked this conversation as resolved.
) combined
ORDER BY created_at DESC
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to similar questions: is this created_at sorting over the whole union performant? Do we lack or need index here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UNION result is at most 2 × limit rows — each half is filtered by workspace+project index and min_id primary key, then limited. MySQL filesorts ~20 rows, which is trivial. An index can't be applied across a UNION result.

LIMIT :limit
""")
@RegisterConstructorMapper(value = RecentActivity.RecentDatasetVersion.class)
List<RecentActivity.RecentDatasetVersion> findRecentActivityByProjectId(
@Bind("workspace_id") String workspaceId,
@Bind("project_id") UUID projectId,
@Bind("min_id") UUID minId,
@Bind("limit") int limit);

@SqlUpdate("DELETE FROM dataset_version_tags WHERE dataset_id IN (<dataset_ids>) AND workspace_id = :workspace_id")
void deleteAllTagsByDatasetIds(@BindList("dataset_ids") Collection<UUID> datasetIds,
@Bind("workspace_id") String workspaceId);
Expand Down
Loading
Loading