Skip to content

Commit 9239b6e

Browse files
Refactor existing BulkOperations to new support model
1 parent 05f4308 commit 9239b6e

File tree

7 files changed

+492
-190
lines changed

7 files changed

+492
-190
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.stream.Collectors;
21+
22+
import org.springframework.lang.CheckReturnValue;
23+
24+
/**
25+
* Shared support for bulk operation pipelines. A pipeline holds a sequence of
26+
* {@link BulkOperationPipelineItem pipeline items}. Each item is
27+
* <ul>
28+
* <li>{@link BulkOperationPipelineItem#map(Object) mapped} when appended (e.g. domain → BSON),</li>
29+
* <li>{@link BulkOperationPipelineItem#prepareForWrite(Object) prepared} when building write models for execution,</li>
30+
* <li>{@link BulkOperationPipelineItem#finish(Object) finished} after execution (e.g. after-save events).</li>
31+
* </ul>
32+
* This allows both collection-bound bulk operations ({@link DefaultBulkOperations}) and namespaced bulk operations
33+
* ({@link NamespacedBulkOperationSupport}) to share the same pipeline lifecycle.
34+
*
35+
* @param <C> context type used for mapping and lifecycle (e.g. {@link DefaultBulkOperations.BulkOperationContext} or
36+
* {@link NamespacedBulkOperationSupport.NamespacedBulkOperationContext}).
37+
* @param <M> the write model type produced by items (e.g. {@link com.mongodb.client.model.WriteModel WriteModel}&lt;Document&gt;
38+
* or {@link com.mongodb.client.model.bulk.ClientNamespacedWriteModel}).
39+
* @author Christoph Strobl
40+
*/
41+
final class BulkOperationPipelineSupport<C, M> {
42+
43+
private final List<BulkOperationPipelineItem<C, M>> pipeline = new ArrayList<>();
44+
private final C context;
45+
46+
BulkOperationPipelineSupport(C context) {
47+
this.context = context;
48+
}
49+
50+
/**
51+
* Append an item to the pipeline. The item is {@link BulkOperationPipelineItem#map(Object) mapped} immediately
52+
* with the pipeline context.
53+
*
54+
* @param item the pipeline item to append (must not be {@literal null}).
55+
*/
56+
void append(BulkOperationPipelineItem<C, M> item) {
57+
pipeline.add(item.map(context));
58+
}
59+
60+
/**
61+
* Build the list of write models by calling {@link BulkOperationPipelineItem#prepareForWrite(Object)} on each
62+
* pipeline item.
63+
*
64+
* @return the list of write models to pass to the driver.
65+
*/
66+
List<M> models() {
67+
return pipeline.stream().map(it -> it.prepareForWrite(context)).collect(Collectors.toList());
68+
}
69+
70+
/**
71+
* Run post-processing (e.g. after-save events and callbacks) by calling
72+
* {@link BulkOperationPipelineItem#finish(Object)} on each pipeline item.
73+
*/
74+
void postProcess() {
75+
pipeline.forEach(it -> it.finish(context));
76+
}
77+
78+
/**
79+
* A single item in a bulk operation pipeline. Implementations are responsible for mapping domain data to the
80+
* driver representation, building the write model, and running lifecycle hooks.
81+
*
82+
* @param <C> context type.
83+
* @param <M> write model type.
84+
*/
85+
interface BulkOperationPipelineItem<C, M> {
86+
87+
/**
88+
* Map this item using the given context (e.g. map domain objects to BSON). Called when the item is appended
89+
* to the pipeline. May return {@code this} if already mapped or a new instance with mapped state.
90+
*
91+
* @param context the bulk operation context (must not be {@literal null}).
92+
* @return this item or a mapped copy (never {@literal null}).
93+
*/
94+
@CheckReturnValue
95+
BulkOperationPipelineItem<C, M> map(C context);
96+
97+
/**
98+
* Build the write model for the driver. Lifecycle (e.g. before-save events) may be run here. Called when
99+
* collecting models for execution.
100+
*
101+
* @param context the bulk operation context (must not be {@literal null}).
102+
* @return the write model (never {@literal null}).
103+
*/
104+
M prepareForWrite(C context);
105+
106+
/**
107+
* Run post-write lifecycle (e.g. after-save events and callbacks). Called after execution. Default is no-op.
108+
*
109+
* @param context the bulk operation context (must not be {@literal null}).
110+
*/
111+
default void finish(C context) {}
112+
}
113+
}

0 commit comments

Comments
 (0)