Skip to content

Commit fb03ff3

Browse files
authored
Redis jobs (#426)
1 parent 1365f83 commit fb03ff3

File tree

12 files changed

+403
-76
lines changed

12 files changed

+403
-76
lines changed

gradle/layers.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[versions]
2-
xtraplatform-core = '6.6.0-SNAPSHOT'
2+
xtraplatform-core = '6.6.0-redis-jobs-SNAPSHOT'
33
xtraplatform-native = '2.5.0'
44

xtraplatform-tiles/src/main/java/de/ii/xtraplatform/tiles/app/TileSeedingJobCreator.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import de.ii.xtraplatform.entities.domain.EntityRegistry;
1414
import de.ii.xtraplatform.jobs.domain.Job;
1515
import de.ii.xtraplatform.jobs.domain.JobProcessor;
16+
import de.ii.xtraplatform.jobs.domain.JobQueueMin;
1617
import de.ii.xtraplatform.jobs.domain.JobResult;
1718
import de.ii.xtraplatform.jobs.domain.JobSet;
1819
import de.ii.xtraplatform.tiles.domain.TileGenerationParameters;
@@ -25,14 +26,14 @@
2526
import java.io.IOException;
2627
import java.time.Duration;
2728
import java.time.Instant;
29+
import java.util.HashSet;
2830
import java.util.LinkedHashSet;
2931
import java.util.List;
3032
import java.util.Locale;
3133
import java.util.Map;
3234
import java.util.Map.Entry;
3335
import java.util.Optional;
3436
import java.util.Set;
35-
import java.util.function.Consumer;
3637
import java.util.stream.Collectors;
3738
import javax.inject.Inject;
3839
import javax.inject.Singleton;
@@ -51,7 +52,7 @@ public class TileSeedingJobCreator implements JobProcessor<Boolean, TileSeedingJ
5152

5253
@Inject
5354
TileSeedingJobCreator(AppContext appContext, EntityRegistry entityRegistry) {
54-
this.concurrency = appContext.getConfiguration().getBackgroundTasks().getMaxThreads();
55+
this.concurrency = appContext.getConfiguration().getJobConcurrency();
5556
this.entityRegistry = entityRegistry;
5657
}
5758

@@ -72,9 +73,9 @@ public int getConcurrency(JobSet jobSet) {
7273
}
7374

7475
@Override
75-
public JobResult process(Job job, JobSet jobSet, Consumer<Job> pushJob) {
76-
TileSeedingJobSet seedingJobSet = getSetDetails(jobSet);
77-
boolean isCleanup = getDetails(job);
76+
public JobResult process(Job job, JobSet jobSet, JobQueueMin jobQueue) {
77+
TileSeedingJobSet seedingJobSet = getSetDetails(jobSet, jobQueue);
78+
boolean isCleanup = getDetails(job, jobQueue);
7879

7980
Optional<TileProvider> optionalTileProvider = getTileProvider(seedingJobSet.getTileProvider());
8081
if (optionalTileProvider.isPresent()) {
@@ -171,6 +172,8 @@ public JobResult process(Job job, JobSet jobSet, Consumer<Job> pushJob) {
171172

172173
boolean allRaster = true;
173174
boolean someRaster = false;
175+
Set<String> tilesets = new HashSet<>();
176+
Set<Integer> levels = new HashSet<>();
174177

175178
for (String tileSet : seedingJobSet.getTileSets().keySet()) {
176179
Map<String, Set<TileMatrixSetLimits>> tileMatrixSets =
@@ -219,16 +222,30 @@ public JobResult process(Job job, JobSet jobSet, Consumer<Job> pushJob) {
219222
Optional.of(seedingJobSet.getTileSetParameters().get(tileSet)),
220223
jobSet.getId());
221224

222-
pushJob.accept(job2);
223-
jobSet.init(job2.getTotal().get());
224-
seedingJobSet.init(
225-
tileSet, tileMatrixSet, subMatrix.getLevel(), job2.getTotal().get());
225+
jobQueue.initJobSet(
226+
jobSet,
227+
job2.getTotal().get(),
228+
Map.<String, Object>of(
229+
"tileSet",
230+
tileSet,
231+
"tileMatrixSet",
232+
tileMatrixSet,
233+
"level",
234+
subMatrix.getLevel(),
235+
"count",
236+
job2.getTotal().get(),
237+
"isFirstTileset",
238+
tilesets.add(tileSet),
239+
"isFirstLevel",
240+
levels.add(subMatrix.getLevel())));
241+
242+
jobQueue.push(job2);
226243
}
227244
});
228245
}
229246

230247
if (jobSet.isDone()) {
231-
jobSet.getCleanup().ifPresent(pushJob);
248+
jobSet.getCleanup().ifPresent(jobQueue::push);
232249
return JobResult.success(); // early return
233250
}
234251

@@ -264,6 +281,15 @@ public Class<TileSeedingJobSet> getSetDetailsType() {
264281
return TileSeedingJobSet.class;
265282
}
266283

284+
@Override
285+
public Map<String, Class<?>> getJobTypes() {
286+
return Map.of(
287+
TileSeedingJobSet.TYPE,
288+
TileSeedingJobSet.class,
289+
TileSeedingJobSet.TYPE_SETUP,
290+
Boolean.class);
291+
}
292+
267293
private Optional<TileProvider> getTileProvider(String id) {
268294
return entityRegistry.getEntity(TileProvider.class, id);
269295
}

xtraplatform-tiles/src/main/java/de/ii/xtraplatform/tiles/app/VectorSeedingJobProcessor.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
import de.ii.xtraplatform.entities.domain.EntityRegistry;
1515
import de.ii.xtraplatform.jobs.domain.Job;
1616
import de.ii.xtraplatform.jobs.domain.JobProcessor;
17+
import de.ii.xtraplatform.jobs.domain.JobQueueMin;
1718
import de.ii.xtraplatform.jobs.domain.JobResult;
1819
import de.ii.xtraplatform.jobs.domain.JobSet;
1920
import de.ii.xtraplatform.tiles.domain.TileProvider;
2021
import de.ii.xtraplatform.tiles.domain.TileSeedingJob;
2122
import de.ii.xtraplatform.tiles.domain.TileSeedingJobSet;
2223
import java.io.IOException;
24+
import java.util.Map;
2325
import java.util.Optional;
2426
import java.util.concurrent.atomic.AtomicInteger;
2527
import java.util.function.Consumer;
@@ -39,7 +41,7 @@ public class VectorSeedingJobProcessor implements JobProcessor<TileSeedingJob, T
3941

4042
@Inject
4143
VectorSeedingJobProcessor(AppContext appContext, EntityRegistry entityRegistry) {
42-
this.concurrency = appContext.getConfiguration().getBackgroundTasks().getMaxThreads();
44+
this.concurrency = appContext.getConfiguration().getJobConcurrency();
4345
this.entityRegistry = entityRegistry;
4446
}
4547

@@ -59,9 +61,9 @@ public int getConcurrency(JobSet jobSet) {
5961
}
6062

6163
@Override
62-
public JobResult process(Job job, JobSet jobSet, Consumer<Job> pushJob) {
63-
TileSeedingJob seedingJob = getDetails(job);
64-
TileSeedingJobSet seedingJobSet = getSetDetails(jobSet);
64+
public JobResult process(Job job, JobSet jobSet, JobQueueMin jobQueue) {
65+
TileSeedingJob seedingJob = getDetails(job, jobQueue);
66+
TileSeedingJobSet seedingJobSet = getSetDetails(jobSet, jobQueue);
6567

6668
Optional<TileProvider> optionalTileProvider = getTileProvider(seedingJob.getTileProvider());
6769
if (optionalTileProvider.isPresent()) {
@@ -91,7 +93,7 @@ public JobResult process(Job job, JobSet jobSet, Consumer<Job> pushJob) {
9193
tileProvider.getId(),
9294
job.getId());
9395
}
94-
pushJob.accept(job);
96+
jobQueue.push(job);
9597
}
9698
},
9799
true);
@@ -102,14 +104,15 @@ public JobResult process(Job job, JobSet jobSet, Consumer<Job> pushJob) {
102104
Consumer<Integer> updateProgress =
103105
(current) -> {
104106
int delta = current - last.getAndSet(current);
105-
106-
job.update(delta);
107-
jobSet.update(delta);
108-
seedingJobSet.update(
109-
seedingJob.getTileSet(),
110-
seedingJob.getTileMatrixSet(),
111-
seedingJob.getSubMatrices().get(0).getLevel(),
112-
delta);
107+
Map<String, Object> detailParameters =
108+
Map.of(
109+
"tileSet", seedingJob.getTileSet(),
110+
"tileMatrixSet", seedingJob.getTileMatrixSet(),
111+
"level", seedingJob.getSubMatrices().get(0).getLevel(),
112+
"delta", delta);
113+
114+
jobQueue.updateJob(job, delta);
115+
jobQueue.updateJobSet(jobSet, delta, detailParameters);
113116
};
114117

115118
try {
@@ -135,6 +138,15 @@ public Class<TileSeedingJobSet> getSetDetailsType() {
135138
return TileSeedingJobSet.class;
136139
}
137140

141+
@Override
142+
public Map<String, Class<?>> getJobTypes() {
143+
return Map.of(
144+
TileSeedingJob.TYPE_MVT,
145+
TileSeedingJob.class,
146+
TileSeedingJob.TYPE_PNG,
147+
TileSeedingJob.class);
148+
}
149+
138150
private Optional<TileProvider> getTileProvider(String id) {
139151
return entityRegistry.getEntity(TileProvider.class, id);
140152
}

xtraplatform-tiles/src/main/java/de/ii/xtraplatform/tiles/domain/TileGenerationParameters.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
package de.ii.xtraplatform.tiles.domain;
99

1010
import com.fasterxml.jackson.annotation.JsonIgnore;
11+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
1112
import de.ii.xtraplatform.crs.domain.BoundingBox;
1213
import de.ii.xtraplatform.features.domain.transform.PropertyTransformations;
1314
import java.util.Map;
1415
import java.util.Optional;
1516
import org.immutables.value.Value;
1617

1718
@Value.Immutable
19+
@JsonDeserialize(builder = ImmutableTileGenerationParameters.Builder.class)
1820
public interface TileGenerationParameters extends GenerationParameters {
1921

2022
@Override

xtraplatform-tiles/src/main/java/de/ii/xtraplatform/tiles/domain/TileSeedingJob.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
package de.ii.xtraplatform.tiles.domain;
99

1010
import com.fasterxml.jackson.annotation.JsonIgnore;
11+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
1112
import de.ii.xtraplatform.jobs.domain.Job;
13+
import de.ii.xtraplatform.jobs.domain.Job.JobDetails;
1214
import de.ii.xtraplatform.tiles.app.FeatureEncoderMVT;
1315
import de.ii.xtraplatform.tiles.domain.ImmutableTileSeedingJob.Builder;
1416
import java.util.List;
@@ -19,7 +21,8 @@
1921
import org.immutables.value.Value;
2022

2123
@Value.Immutable
22-
public interface TileSeedingJob {
24+
@JsonDeserialize(builder = ImmutableTileSeedingJob.Builder.class)
25+
public interface TileSeedingJob extends JobDetails {
2326

2427
String TYPE_MVT = TileSeedingJobSet.type("vector", "mvt");
2528
String TYPE_PNG = TileSeedingJobSet.type("raster", "png");

0 commit comments

Comments
 (0)