Skip to content

Commit 7d85f56

Browse files
Decouple Bulk from BulkOperations
1 parent 01eb995 commit 7d85f56

File tree

15 files changed

+1765
-21
lines changed

15 files changed

+1765
-21
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb;
17+
18+
/**
19+
* @author Christoph Strobl
20+
*/
21+
public interface MongoClientAware<T> {
22+
23+
T getMongoClient();
24+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
21+
import org.bson.Document;
22+
import org.springframework.dao.DataAccessException;
23+
import org.springframework.data.mongodb.core.MongoTemplate.SourceAwareDocument;
24+
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
25+
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
26+
import org.springframework.data.mongodb.core.bulk.Bulk;
27+
import org.springframework.data.mongodb.core.bulk.Bulk.Order;
28+
import org.springframework.data.mongodb.core.bulk.BulkOperation;
29+
import org.springframework.data.mongodb.core.bulk.BulkOperation.Insert;
30+
import org.springframework.data.mongodb.core.bulk.BulkOperation.Remove;
31+
import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace;
32+
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update;
33+
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateSingle;
34+
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
35+
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
36+
37+
import com.mongodb.MongoBulkWriteException;
38+
import com.mongodb.MongoNamespace;
39+
import com.mongodb.client.model.DeleteOptions;
40+
import com.mongodb.client.model.UpdateOptions;
41+
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
42+
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
43+
import com.mongodb.client.model.bulk.ClientDeleteManyOptions;
44+
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
45+
import com.mongodb.client.model.bulk.ClientReplaceOneOptions;
46+
import com.mongodb.client.model.bulk.ClientUpdateManyOptions;
47+
import com.mongodb.client.model.bulk.ClientUpdateOneOptions;
48+
49+
/**
50+
* Internal API wrapping a {@link MongoTemplate} to encapsulate {@link Bulk} handling.
51+
*
52+
* @author Christoph Strobl
53+
* @since 2026/02
54+
*/
55+
class BulkWriter {
56+
57+
MongoTemplate template;
58+
59+
BulkWriter(MongoTemplate template) {
60+
this.template = template;
61+
}
62+
63+
public ClientBulkWriteResult write(String defaultDatabase, Order order, Bulk bulk) {
64+
65+
List<ClientNamespacedWriteModel> writeModels = new ArrayList<>();
66+
List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
67+
68+
for (BulkOperation bulkOp : bulk.operations()) {
69+
70+
String collectionName = bulkOp.context().namespace().collection() != null
71+
? bulkOp.context().namespace().collection()
72+
: template.getCollectionName(bulkOp.context().namespace().type());
73+
74+
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, collectionName);
75+
if (bulkOp instanceof Insert insert) {
76+
77+
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(collectionName, insert.value(),
78+
template.getConverter());
79+
writeModels.add(ClientNamespacedWriteModel.insertOne(mongoNamespace, sourceAwareDocument.document()));
80+
afterSaveCallables.add(sourceAwareDocument);
81+
} else if (bulkOp instanceof Update update) {
82+
83+
Class<?> domainType = update.context().namespace().type();
84+
boolean multi = !(bulkOp instanceof UpdateSingle);
85+
86+
UpdateContext updateContext = template.getQueryOperations().updateContext(update.update(), update.query(),
87+
update.upsert());
88+
MongoPersistentEntity<?> entity = template.getPersistentEntity(domainType);
89+
90+
Document mappedQuery = updateContext.getMappedQuery(entity);
91+
Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(domainType)
92+
: updateContext.getMappedUpdate(entity);
93+
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query());
94+
95+
if (multi) {
96+
97+
ClientUpdateManyOptions updateManyOptions = ClientUpdateManyOptions.clientUpdateManyOptions();
98+
updateManyOptions.arrayFilters(updateOptions.getArrayFilters());
99+
updateManyOptions.collation(updateOptions.getCollation());
100+
updateManyOptions.upsert(updateOptions.isUpsert());
101+
updateManyOptions.hint(updateOptions.getHint());
102+
updateManyOptions.hintString(updateOptions.getHintString());
103+
104+
if (mappedUpdate instanceof List<?> pipeline) {
105+
writeModels.add(ClientNamespacedWriteModel.updateMany(mongoNamespace, mappedQuery,
106+
(List<Document>) pipeline, updateManyOptions));
107+
} else {
108+
writeModels.add(ClientNamespacedWriteModel.updateMany(mongoNamespace, mappedQuery, (Document) mappedUpdate,
109+
updateManyOptions));
110+
}
111+
} else {
112+
113+
ClientUpdateOneOptions updateOneOptions = ClientUpdateOneOptions.clientUpdateOneOptions();
114+
updateOneOptions.arrayFilters(updateOptions.getArrayFilters());
115+
updateOneOptions.collation(updateOptions.getCollation());
116+
updateOneOptions.upsert(updateOptions.isUpsert());
117+
updateOneOptions.hint(updateOptions.getHint());
118+
updateOneOptions.hintString(updateOptions.getHintString());
119+
120+
if (mappedUpdate instanceof List<?> pipeline) {
121+
writeModels.add(ClientNamespacedWriteModel.updateOne(mongoNamespace, mappedQuery, (List<Document>) pipeline,
122+
updateOneOptions));
123+
} else {
124+
writeModels.add(ClientNamespacedWriteModel.updateOne(mongoNamespace, mappedQuery, (Document) mappedUpdate,
125+
updateOneOptions));
126+
}
127+
}
128+
} else if (bulkOp instanceof Remove remove) {
129+
130+
Class<?> domainType = remove.context().namespace().type();
131+
DeleteContext deleteContext = template.getQueryOperations().deleteQueryContext(remove.query());
132+
133+
Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType));
134+
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType);
135+
ClientDeleteManyOptions clientDeleteManyOptions = ClientDeleteManyOptions.clientDeleteManyOptions();
136+
clientDeleteManyOptions.collation(deleteOptions.getCollation());
137+
clientDeleteManyOptions.hint(deleteOptions.getHint());
138+
clientDeleteManyOptions.hintString(deleteOptions.getHintString());
139+
140+
writeModels.add(ClientNamespacedWriteModel.deleteMany(mongoNamespace, mappedQuery, clientDeleteManyOptions));
141+
} else if (bulkOp instanceof Replace replace) {
142+
143+
Class<?> domainType = replace.context().namespace().type();
144+
145+
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(collectionName,
146+
replace.replacement(), template.getConverter());
147+
148+
UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(),
149+
MappedDocument.of(sourceAwareDocument.document()), replace.upsert());
150+
151+
Document mappedQuery = updateContext.getMappedQuery(template.getPersistentEntity(domainType));
152+
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, replace.query());
153+
154+
ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions();
155+
replaceOptions.upsert(updateOptions.isUpsert());
156+
replaceOptions.sort(updateOptions.getSort());
157+
replaceOptions.hint(updateOptions.getHint());
158+
replaceOptions.hintString(updateOptions.getHintString());
159+
replaceOptions.collation(updateOptions.getCollation());
160+
161+
writeModels.add(ClientNamespacedWriteModel.replaceOne(mongoNamespace, mappedQuery,
162+
sourceAwareDocument.document(), replaceOptions));
163+
afterSaveCallables.add(sourceAwareDocument);
164+
}
165+
}
166+
167+
try {
168+
169+
ClientBulkWriteResult clientBulkWriteResult = template.doWithClient(client -> client.bulkWrite(writeModels,
170+
ClientBulkWriteOptions.clientBulkWriteOptions().ordered(order.equals(Order.SEQUENTIAL))));
171+
172+
afterSaveCallables.forEach(callable -> {
173+
template.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName()));
174+
template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName());
175+
});
176+
return clientBulkWriteResult;
177+
} catch (MongoBulkWriteException e) {
178+
DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e);
179+
if (dataAccessException != null) {
180+
throw dataAccessException;
181+
}
182+
throw e;
183+
}
184+
}
185+
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/EntityOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
* @see MongoTemplate
9393
* @see ReactiveMongoTemplate
9494
*/
95-
class EntityOperations {
95+
public class EntityOperations {
9696

9797
private static final String ID_FIELD = FieldName.ID.name();
9898

@@ -109,7 +109,7 @@ class EntityOperations {
109109
this(converter, new QueryMapper(converter));
110110
}
111111

112-
EntityOperations(MongoConverter converter, QueryMapper queryMapper) {
112+
public EntityOperations(MongoConverter converter, QueryMapper queryMapper) {
113113
this(converter, converter.getMappingContext(), converter.getCustomConversions(), converter.getProjectionFactory(),
114114
queryMapper);
115115
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoDatabaseFactorySupport.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.springframework.aop.framework.ProxyFactory;
2121
import org.springframework.dao.DataAccessException;
2222
import org.springframework.dao.support.PersistenceExceptionTranslator;
23+
import org.springframework.data.mongodb.MongoClientAware;
2324
import org.springframework.data.mongodb.MongoDatabaseFactory;
2425
import org.springframework.data.mongodb.SessionAwareMethodInterceptor;
2526
import org.springframework.lang.Contract;
@@ -41,7 +42,7 @@
4142
* @since 3.0
4243
* @see SimpleMongoClientDatabaseFactory
4344
*/
44-
public abstract class MongoDatabaseFactorySupport<C> implements MongoDatabaseFactory {
45+
public abstract class MongoDatabaseFactorySupport<C> implements MongoDatabaseFactory, MongoClientAware<C> {
4546

4647
private final C mongoClient;
4748
private final String databaseName;
@@ -146,7 +147,7 @@ public MongoDatabaseFactory withSession(ClientSession session) {
146147
/**
147148
* @return the Mongo client object.
148149
*/
149-
protected C getMongoClient() {
150+
public C getMongoClient() {
150151
return mongoClient;
151152
}
152153

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
3939
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
4040
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
41+
import org.springframework.data.mongodb.core.bulk.Bulk;
42+
import org.springframework.data.mongodb.core.bulk.Bulk.Order;
43+
import org.springframework.data.mongodb.core.bulk.BulkOperationResult;
4144
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
4245
import org.springframework.data.mongodb.core.convert.MongoConverter;
4346
import org.springframework.data.mongodb.core.index.IndexOperations;
@@ -168,6 +171,8 @@ public interface MongoOperations extends FluentMongoOperations {
168171
@Nullable
169172
<T> T execute(String collectionName, CollectionCallback<T> action);
170173

174+
BulkOperationResult bulkWrite(Order order, Bulk bulk);
175+
171176
/**
172177
* Obtain a {@link ClientSession session} bound instance of {@link SessionScoped} binding a new {@link ClientSession}
173178
* with given {@literal sessionOptions} to each and every command issued against MongoDB.

0 commit comments

Comments
 (0)