Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,23 @@

@Singleton
@AutoBind(interfaces = JobQueueBackend.class)
@SuppressWarnings("PMD.TooManyMethods")
public class JobQueueBackendRedis extends AbstractJobQueueBackend<String>
implements JobQueueBackend {

private static final Logger LOGGER = LoggerFactory.getLogger(JobQueueBackendRedis.class);
private static final List<Integer> INITIAL_LEVELS =
IntStream.range(0, 24).map(i -> -1).boxed().toList();

// Redis key constants
private static final String REDIS_KEY_PRIORITIES = "xtraplatform:jobs:priorities:";
private static final String REDIS_KEY_QUEUE = "xtraplatform:jobs:queue:";
private static final String REDIS_KEY_JOB = "xtraplatform:jobs:job:";
private static final String REDIS_KEY_SET = "xtraplatform:jobs:set:";
private static final String REDIS_KEY_TAKEN = "xtraplatform:jobs:taken";
private static final String REDIS_KEY_FAILED = "xtraplatform:jobs:failed";
private static final String REDIS_KEY_NOTIFICATIONS = "xtraplatform:jobs:notifications";

private final boolean enabled;
private final Redis redis;
private final ObjectMapper mapper;
Expand All @@ -64,7 +74,8 @@ public class JobQueueBackendRedis extends AbstractJobQueueBackend<String>
AppContext appContext, Jackson jackson, VolatileRegistry volatileRegistry, Redis redis) {
super(volatileRegistry);

// TODO: housekeeping might check taken list using RPOPLPUSH with same source and destination
// NOPMD - TODO: housekeeping might check taken list using RPOPLPUSH with same source and
// destination
// this way it can check for timeouts, then use a transaction with LREM, LPUSH and HMSET to
// retry

Expand Down Expand Up @@ -96,56 +107,49 @@ public void setJobTypes(Function<String, Optional<? extends Class<?>>> jobTypesM

@Override
protected String createQueue(String type, int priority) {
redis.cmd().zadd("xtraplatform:jobs:priorities:" + type, priority, String.valueOf(priority));
redis.cmd().zadd(REDIS_KEY_PRIORITIES + type, priority, String.valueOf(priority));

return "xtraplatform:jobs:queue:" + type + ":" + priority;
return REDIS_KEY_QUEUE + type + ":" + priority;
}

@Override
protected Set<String> getTypes() {
return redis.cmd().keys("xtraplatform:jobs:priorities:*").stream()
.map(key -> key.substring("xtraplatform:jobs:priorities:".length()))
return redis.cmd().keys(REDIS_KEY_PRIORITIES + "*").stream()
.map(key -> key.substring(REDIS_KEY_PRIORITIES.length()))
.collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll);
}

@Override
protected Set<Integer> getPriorities(String type) {
List<String> priorities = redis.cmd().zrevrange("xtraplatform:jobs:priorities:" + type, 0, -1);
List<String> priorities = redis.cmd().zrevrange(REDIS_KEY_PRIORITIES + type, 0, -1);

return new LinkedHashSet<>(priorities.stream().map(Integer::parseInt).toList());
}

@Override
protected void updateJob(Job job) {
try {
redis.json().jsonSet("xtraplatform:jobs:job:" + job.getId(), mapper.writeValueAsString(job));
redis.json().jsonSet(REDIS_KEY_JOB + job.getId(), mapper.writeValueAsString(job));
} catch (Throwable e) {
throw new RuntimeException(e);
throw new IllegalStateException("Failed to serialize job to JSON: " + job.getId(), e);
}
}

@Override
public void updateJob(Job job, int progressDelta) {
redis
.json()
.jsonNumIncrBy(
"xtraplatform:jobs:job:" + job.getId(), Path2.of("$.current"), progressDelta);
redis.json().jsonNumIncrBy(REDIS_KEY_JOB + job.getId(), Path2.of("$.current"), progressDelta);
redis
.json()
.jsonSet(
"xtraplatform:jobs:job:" + job.getId(),
Path2.of("$.updatedAt"),
Instant.now().getEpochSecond());
REDIS_KEY_JOB + job.getId(), Path2.of("$.updatedAt"), Instant.now().getEpochSecond());
}

@Override
protected void updateJobSet(JobSet jobSet) {
try {
redis
.json()
.jsonSet("xtraplatform:jobs:set:" + jobSet.getId(), mapper.writeValueAsString(jobSet));
redis.json().jsonSet(REDIS_KEY_SET + jobSet.getId(), mapper.writeValueAsString(jobSet));
} catch (Throwable e) {
throw new RuntimeException(e);
throw new IllegalStateException("Failed to serialize job set to JSON: " + jobSet.getId(), e);
}
}

Expand All @@ -154,7 +158,7 @@ public void startJobSet(JobSet jobSet) {
redis
.json()
.jsonSet(
"xtraplatform:jobs:set:" + jobSet.getId(),
REDIS_KEY_SET + jobSet.getId(),
Path2.of("$.startedAt"),
Instant.now().getEpochSecond());
}
Expand Down Expand Up @@ -184,8 +188,7 @@ public void updateJobSet(JobSet jobSet, int progressDelta, Map<String, Object> d

@Override
protected Optional<JobSet> getJobSet(String setId) {
String jobSetJson =
redis.json().jsonGetAsPlainString("xtraplatform:jobs:set:" + setId, Path.ROOT_PATH);
String jobSetJson = redis.json().jsonGetAsPlainString(REDIS_KEY_SET + setId, Path.ROOT_PATH);

if (Objects.isNull(jobSetJson)) {
return Optional.empty();
Expand All @@ -196,7 +199,7 @@ protected Optional<JobSet> getJobSet(String setId) {

return Optional.ofNullable(job);
} catch (Throwable e) {
throw new RuntimeException(e);
throw new IllegalStateException("Failed to deserialize job set from JSON: " + setId, e);
}
}

Expand All @@ -206,8 +209,8 @@ protected void queueJob(Job job, boolean untake) {
updateJob(job);

if (untake) {
// TODO: use a transaction here
redis.cmd().lrem("xtraplatform:jobs:taken", 1, job.getId());
// NOPMD - TODO: use a transaction here
redis.cmd().lrem(REDIS_KEY_TAKEN, 1, job.getId());
redis.cmd().rpush(queue, job.getId());
} else {
redis.cmd().lpush(queue, job.getId());
Expand Down Expand Up @@ -241,7 +244,7 @@ protected Job failJob(Job job, String error) {

updateJob(failedJob);

redis.cmd().rpush("xtraplatform:jobs:failed", job.getId());
redis.cmd().rpush(REDIS_KEY_FAILED, job.getId());

return failedJob;
}
Expand All @@ -250,17 +253,15 @@ protected Job failJob(Job job, String error) {
protected Job doneJob(Job job) {
Job doneJob = job.done();

redis.json().jsonDel("xtraplatform:jobs:job:" + doneJob.getId());
redis.json().jsonDel(REDIS_KEY_JOB + doneJob.getId());

return doneJob;
}

@Override
protected Optional<Job> takeJob(String queue) {
String jobId =
redis
.cmd()
.lmove(queue, "xtraplatform:jobs:taken", ListDirection.RIGHT, ListDirection.LEFT);
redis.cmd().lmove(queue, REDIS_KEY_TAKEN, ListDirection.RIGHT, ListDirection.LEFT);

if (Objects.nonNull(jobId)) {
return getJob(jobId);
Expand All @@ -271,7 +272,7 @@ protected Optional<Job> takeJob(String queue) {

@Override
protected Optional<Job> untakeJob(String jobId) {
long count = redis.cmd().lrem("xtraplatform:jobs:taken", 1, jobId);
long count = redis.cmd().lrem(REDIS_KEY_TAKEN, 1, jobId);

if (count > 0) {
return getJob(jobId);
Expand All @@ -284,7 +285,7 @@ protected Optional<Job> untakeJob(String jobId) {
protected List<? extends BaseJob> onJobFinished(Job job, JobSet jobSet) {
List<? extends BaseJob> followUps = jobSet.done(job);

redis.json().jsonDel("xtraplatform:jobs:job:" + job.getId());
redis.json().jsonDel(REDIS_KEY_JOB + job.getId());

return followUps;
}
Expand All @@ -305,33 +306,33 @@ protected List<Job> getJobsInQueue(String queue) {

@Override
protected void notifyObservers(String type) {
redis.pubsub().publish("xtraplatform:jobs:notifications", type);
redis.pubsub().publish(REDIS_KEY_NOTIFICATIONS, type);
}

@Override
public void onPush(Consumer<String> callback) {
redis.pubsub().subscribe("xtraplatform:jobs:notifications", callback);
redis.pubsub().subscribe(REDIS_KEY_NOTIFICATIONS, callback);
}

@Override
public boolean doneSet(String jobSetId) {
long count = redis.json().jsonDel("xtraplatform:jobs:set:" + jobSetId);
long count = redis.json().jsonDel(REDIS_KEY_SET + jobSetId);

return count > 0;
}

@Override
public boolean error(String jobId, String error, boolean retry) {
// TODO: retry logic
// NOPMD - TODO: retry logic
return false;
}

@Override
public Collection<JobSet> getSets() {
Set<String> jobSetIds = redis.cmd().keys("xtraplatform:jobs:set:*");
Set<String> jobSetIds = redis.cmd().keys(REDIS_KEY_SET + "*");

return jobSetIds.stream()
.map(id -> id.substring("xtraplatform:jobs:set:".length()))
.map(id -> id.substring(REDIS_KEY_SET.length()))
.map(this::getJobSet)
.filter(Optional::isPresent)
.map(Optional::get)
Expand All @@ -340,18 +341,17 @@ public Collection<JobSet> getSets() {

@Override
protected List<String> getTakenIds() {
return redis.cmd().lrange("xtraplatform:jobs:taken", 0, -1);
return redis.cmd().lrange(REDIS_KEY_TAKEN, 0, -1);
}

@Override
protected List<String> getFailedIds() {
return redis.cmd().lrange("xtraplatform:jobs:failed", 0, -1);
return redis.cmd().lrange(REDIS_KEY_FAILED, 0, -1);
}

@Override
protected Optional<Job> getJob(String jobId) {
String jobJson =
redis.json().jsonGetAsPlainString("xtraplatform:jobs:job:" + jobId, Path.ROOT_PATH);
String jobJson = redis.json().jsonGetAsPlainString(REDIS_KEY_JOB + jobId, Path.ROOT_PATH);

if (Objects.isNull(jobJson)) {
return Optional.empty();
Expand All @@ -362,7 +362,7 @@ protected Optional<Job> getJob(String jobId) {

return Optional.ofNullable(job);
} catch (Throwable e) {
throw new RuntimeException(e);
throw new IllegalStateException("Failed to deserialize job from JSON: " + jobId, e);
}
}

Expand All @@ -382,7 +382,8 @@ private Object unpackDetails(Job job) {
}

} catch (IOException e) {
throw new RuntimeException(e);
throw new IllegalStateException(
"Failed to convert job details to target type: " + job.getType(), e);
}
}

Expand All @@ -405,7 +406,8 @@ private Object unpackSetDetails(JobSet jobSet) {
}

} catch (IOException e) {
throw new RuntimeException(e);
throw new IllegalStateException(
"Failed to convert job set details to target type: " + jobSet.getType(), e);
}
}

Expand All @@ -428,14 +430,10 @@ private void applyJsonPaths(String jobSetId, Map<String, Object> jsonPathUpdates
redis
.json()
.jsonNumIncrBy(
"xtraplatform:jobs:set:" + jobSetId,
Path2.of(entry.getKey()),
(Integer) entry.getValue());
REDIS_KEY_SET + jobSetId, Path2.of(entry.getKey()), (Integer) entry.getValue());
continue;
}
redis
.json()
.jsonSet("xtraplatform:jobs:set:" + jobSetId, Path2.of(entry.getKey()), entry.getValue());
redis.json().jsonSet(REDIS_KEY_SET + jobSetId, Path2.of(entry.getKey()), entry.getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void onStop() {
AppLifeCycle.super.onStop();
}

@SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel")
@Override
protected synchronized void onVolatileStart() {
super.onVolatileStart();
Expand Down Expand Up @@ -145,7 +146,7 @@ public Tuple<State, String> check() {
connect();

if (Objects.isNull(jedis)) {
// TODO: retry
// NOPMD - TODO: retry
if (Objects.nonNull(connectionError)) {
return Tuple.of(State.UNAVAILABLE, connectionError.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

@Singleton
@AutoBind
@SuppressWarnings({"PMD.GodClass", "PMD.TooManyMethods"})
public class ReactiveRx implements Reactive {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO: the queue was introduced as a mean to protect the connection pool and prevent deadlocks
// because of a bug (running.get() < queueSize instead of running.get() < capacity) it was never
Expand All @@ -35,8 +33,6 @@
// FeatureStreams
public class RunnerRx implements Runner {

private static final Logger LOGGER = LoggerFactory.getLogger(RunnerRx.class);

private final Scheduler scheduler;
private final String name;
private final int capacity;
Expand All @@ -45,11 +41,16 @@ public class RunnerRx implements Runner {
private final AtomicInteger running;

public RunnerRx(String name) {
this(name, Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY);
this(
getConfig(Runner.DYNAMIC_CAPACITY), name, Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY);
}

public RunnerRx(String name, int capacity, int queueSize) {
this(getConfig(name, capacity), name, capacity, queueSize);
this(getConfig(capacity), name, capacity, queueSize);
}

public RunnerRx(int capacity, int queueSize) {
this(getConfig(capacity), "default", capacity, queueSize);
}

RunnerRx(ExecutorService executorService, String name, int capacity, int queueSize) {
Expand All @@ -58,7 +59,6 @@ public RunnerRx(String name, int capacity, int queueSize) {
}

// TODO: thread names
getDispatcherName(name);
this.scheduler = Schedulers.from(executorService);
scheduler.start();

Expand Down Expand Up @@ -153,26 +153,19 @@ public int getActiveStreams() {
return running.get();
}

private static ExecutorService getConfig(String name, int capacity) {
return capacity == Runner.DYNAMIC_CAPACITY
? getDefaultConfig(name)
: getConfig(name, capacity, capacity);
private static ExecutorService getConfig(int capacity) {
return capacity == Runner.DYNAMIC_CAPACITY ? getDefaultConfig() : getConfig(capacity, capacity);
}

private static ExecutorService getDefaultConfig(String name) {
return getConfig(name, 8, 64);
private static ExecutorService getDefaultConfig() {
return getConfig(8, 64);
}

// TODO
private static ExecutorService getConfig(String name, int parallelismMin, int parallelismMax) {
private static ExecutorService getConfig(int parallelismMin, int parallelismMax) {

return Executors.newWorkStealingPool(Math.max(1, parallelismMax));
}

private static String getDispatcherName(String name) {
return String.format("stream.%s", name);
}

@Override
public void close() {
if (Objects.nonNull(scheduler)) {
Expand Down