Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .mvn/wrapper/maven-wrapper.jar
Binary file not shown.
19 changes: 2 additions & 17 deletions .mvn/wrapper/maven-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,18 +1,3 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
wrapperVersion=3.3.4
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.7/apache-maven-3.8.7-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.1/maven-wrapper-3.1.1.jar
4 changes: 4 additions & 0 deletions mutiny-zero-flow-adapters/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>org.jspecify</groupId>
<artifactId>jspecify</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions mutiny-zero-flow-adapters/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
@org.jspecify.annotations.NullMarked
module io.smallrye.mutiny.zero.flow.adapters {
requires transitive org.reactivestreams;
requires transitive org.jspecify;
exports mutiny.zero.flow.adapters;
}
4 changes: 4 additions & 0 deletions mutiny-zero-vertx-publishers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>org.jspecify</groupId>
<artifactId>jspecify</artifactId>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
Expand Down
6 changes: 4 additions & 2 deletions mutiny-zero-vertx-publishers/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
@org.jspecify.annotations.NullMarked
module io.smallrye.mutiny.zero.vertxpublishers {
exports mutiny.zero.vertxpublishers;
requires io.smallrye.mutiny.zero;
requires io.vertx.core;
requires transitive io.smallrye.mutiny.zero;
requires transitive org.jspecify;
requires transitive io.vertx.core;
}
5 changes: 5 additions & 0 deletions mutiny-zero/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
<description>Mutiny Zero is a minimal API for creating reactive-streams compliant publishers</description>

<dependencies>
<dependency>
<groupId>org.jspecify</groupId>
<artifactId>jspecify</artifactId>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions mutiny-zero/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
@org.jspecify.annotations.NullMarked
module io.smallrye.mutiny.zero {
requires transitive org.jspecify;
exports mutiny.zero;
exports mutiny.zero.operators;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.jspecify.annotations.Nullable;

public class CompletionStagePublisher<T> implements Publisher<T> {

private final Supplier<CompletionStage<T>> completionStageSupplier;
Expand All @@ -29,7 +31,7 @@ private static class CompletionStageSubscription<T> implements Flow.Subscription
private final Supplier<CompletionStage<T>> completionStageSupplier;
private final Subscriber<? super T> subscriber;
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
private CompletableFuture<T> completableFuture;
private @Nullable CompletableFuture<T> completableFuture;

enum State {
INIT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicBoolean;

import org.jspecify.annotations.Nullable;

public class PublisherToCompletionStageSubscriber<T> implements Subscriber<T> {

private final CompletableFuture<Optional<T>> future;
private final AtomicBoolean completed = new AtomicBoolean();
private Flow.Subscription subscription;
private Flow.@Nullable Subscription subscription;

public PublisherToCompletionStageSubscriber(CompletableFuture<Optional<T>> future) {
this.future = future;
Expand All @@ -25,6 +27,7 @@ public void onSubscribe(Flow.Subscription subscription) {
@Override
public void onNext(T value) {
if (completed.compareAndSet(false, true)) {
assert subscription != null;
subscription.cancel();
future.complete(Optional.of(value));
}
Expand All @@ -33,6 +36,7 @@ public void onNext(T value) {
@Override
public void onError(Throwable throwable) {
if (completed.compareAndSet(false, true)) {
assert subscription != null;
subscription.cancel();
future.completeExceptionally(throwable);
}
Expand Down
4 changes: 3 additions & 1 deletion mutiny-zero/src/main/java/mutiny/zero/internal/TubeBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

import org.jspecify.annotations.Nullable;

import mutiny.zero.Tube;

public abstract class TubeBase<T> implements Tube<T>, Subscription {
Expand All @@ -21,7 +23,7 @@ public abstract class TubeBase<T> implements Tube<T>, Subscription {
protected final AtomicLong requested = new AtomicLong();
protected final ConcurrentLinkedQueue<T> dispatchQueue = new ConcurrentLinkedQueue<>();

protected volatile Throwable failure;
protected volatile @Nullable Throwable failure;
protected volatile boolean completed = false;

protected Runnable terminationAction = () -> {
Expand Down
12 changes: 10 additions & 2 deletions mutiny-zero/src/main/java/mutiny/zero/operators/Concatenate.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.jspecify.annotations.Nullable;

import mutiny.zero.internal.Helper;

/**
Expand Down Expand Up @@ -44,8 +46,8 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
}

private class Processor implements Flow.Processor<T, T>, Flow.Subscription {
private Flow.Subscriber<? super T> downstream;
private Flow.Subscription upstreamSubscription;
private Flow.@Nullable Subscriber<? super T> downstream;
private Flow.@Nullable Subscription upstreamSubscription;

private final AtomicBoolean cancelled = new AtomicBoolean();
private final AtomicLong demand = new AtomicLong();
Expand All @@ -64,6 +66,7 @@ private void subscribeNext() {
Flow.Publisher<T> publisher = publisherIterator.next();
publisher.subscribe(this);
} else {
assert downstream != null;
downstream.onComplete();
}
}
Expand All @@ -81,6 +84,7 @@ public void onSubscribe(Flow.Subscription subscription) {
}
} else {
downstreamIsReady = true;
assert downstream != null;
downstream.onSubscribe(this);
}
}
Expand All @@ -91,6 +95,7 @@ public void onNext(T item) {
if (!unboundedDemand) {
demand.decrementAndGet();
}
assert downstream != null;
downstream.onNext(item);
}
}
Expand All @@ -99,6 +104,7 @@ public void onNext(T item) {
public void onError(Throwable throwable) {
if (!cancelled.get()) {
cancel();
assert downstream != null;
downstream.onError(throwable);
}
}
Expand All @@ -122,13 +128,15 @@ public void request(long n) {
if (n == Long.MAX_VALUE) {
unboundedDemand = true;
}
assert upstreamSubscription != null;
upstreamSubscription.request(n);
}
}

@Override
public void cancel() {
if (cancelled.compareAndSet(false, true)) {
assert upstreamSubscription != null;
upstreamSubscription.cancel();
upstreamSubscription = null;
}
Expand Down
13 changes: 11 additions & 2 deletions mutiny-zero/src/main/java/mutiny/zero/operators/ProcessorBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

import org.jspecify.annotations.Nullable;

abstract class ProcessorBase<I, O> implements Flow.Processor<I, O>, Flow.Subscription {

private Flow.Subscriber<? super O> downstream;
private Flow.Subscription upstreamSubscription;
private Flow.@Nullable Subscriber<? super O> downstream;
private Flow.@Nullable Subscription upstreamSubscription;

private final AtomicBoolean cancelled = new AtomicBoolean();

Expand All @@ -15,10 +17,12 @@ protected boolean cancelled() {
}

protected Flow.Subscription upstreamSubscription() {
assert upstreamSubscription != null;
return upstreamSubscription;
}

protected Flow.Subscriber<? super O> downstream() {
assert downstream != null;
return downstream;
}

Expand All @@ -34,20 +38,23 @@ public void subscribe(Flow.Subscriber<? super O> subscriber) {
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.upstreamSubscription = subscription;
assert downstream != null;
downstream.onSubscribe(this);
}

@Override
public void onError(Throwable throwable) {
if (!cancelled()) {
cancel();
assert downstream != null;
downstream.onError(throwable);
}
}

@Override
public void onComplete() {
if (!cancelled()) {
assert downstream != null;
downstream.onComplete();
}
}
Expand All @@ -57,13 +64,15 @@ public void onComplete() {
@Override
public void request(long n) {
if (!cancelled()) {
assert upstreamSubscription != null;
upstreamSubscription.request(n);
}
}

@Override
public void cancel() {
if (cancelled.compareAndSet(false, true)) {
assert upstreamSubscription != null;
upstreamSubscription.cancel();
upstreamSubscription = null;
}
Expand Down
8 changes: 5 additions & 3 deletions mutiny-zero/src/main/java/mutiny/zero/operators/Recover.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.concurrent.Flow;
import java.util.function.Function;

import org.jspecify.annotations.Nullable;

/**
* A {@link java.util.concurrent.Flow.Publisher} that recovers from failure using a {@link Function}.
* <p>
Expand All @@ -22,15 +24,15 @@
public class Recover<T> implements Flow.Publisher<T> {

private final Flow.Publisher<T> upstream;
private final Function<Throwable, T> function;
private final Function<Throwable, @Nullable T> function;

/**
* Build a new recovery publisher.
*
* @param upstream the upstream publisher
* @param function the recovery function, must not return {@code null} values
* @param function the recovery function where returning {@code null} implies completion
*/
public Recover(Flow.Publisher<T> upstream, Function<Throwable, T> function) {
public Recover(Flow.Publisher<T> upstream, Function<Throwable, @Nullable T> function) {
this.upstream = requireNonNull(upstream, "The upstream cannot be null");
this.function = requireNonNull(function, "The function cannot be null");
}
Expand Down
Loading