diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 477c2143..263269e6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ jobs: uses: actions/setup-java@v4 with: distribution: "temurin" - java-version: "8" + java-version: "11" cache: "maven" - name: Build and test run: mvn -B -V test diff --git a/actors/core/src/main/java/cloud/orbit/actors/runtime/NodeInfo.java b/actors/core/src/main/java/cloud/orbit/actors/runtime/NodeInfo.java index 9ec3d185..14fa3c52 100644 --- a/actors/core/src/main/java/cloud/orbit/actors/runtime/NodeInfo.java +++ b/actors/core/src/main/java/cloud/orbit/actors/runtime/NodeInfo.java @@ -15,7 +15,7 @@ public class NodeInfo NodeCapabilities nodeCapabilities; boolean cannotHostActors; final ConcurrentHashMap canActivate = new ConcurrentHashMap<>(); - final ConcurrentHashMap> canActivate2 = new ConcurrentHashMap<>(); + final ConcurrentHashMap> canActivateWaiters = new ConcurrentHashMap<>(); final Set canActivatePending = ConcurrentHashMap.newKeySet(); String nodeName; diff --git a/actors/runtime/src/main/java/cloud/orbit/actors/runtime/Hosting.java b/actors/runtime/src/main/java/cloud/orbit/actors/runtime/Hosting.java index cf79fb0a..84fbeeee 100644 --- a/actors/runtime/src/main/java/cloud/orbit/actors/runtime/Hosting.java +++ b/actors/runtime/src/main/java/cloud/orbit/actors/runtime/Hosting.java @@ -52,6 +52,7 @@ import java.lang.reflect.Method; import java.security.MessageDigest; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -144,6 +145,17 @@ public List getServerNodes() public Task notifyStateChange() { + final NodeInfo localNode = activeNodes.get(clusterPeer.localAddress()); + if (localNode != null) + { + localNode.state = stage.getState(); + if (localNode.state != NodeState.RUNNING) + { + localNode.canActivate.clear(); + localNode.canActivatePending.clear(); + localNode.canActivateWaiters.clear(); + } + } return Task.allOf(activeNodes.values().stream() .filter(nodeInfo -> !nodeInfo.address.equals(clusterPeer.localAddress()) && nodeInfo.state == NodeState.RUNNING) .map(info -> info.nodeCapabilities.nodeModeChanged(clusterPeer.localAddress(), stage.getState()).exceptionally(throwable -> null))); @@ -172,10 +184,22 @@ public String getNodeName(final NodeAddress nodeAddress) @Override public Task canActivate(String interfaceName) { - if (nodeType == NodeTypeEnum.CLIENT || stage.getState() == NodeState.STOPPED) + if (nodeType == NodeTypeEnum.CLIENT) + { + return Task.fromValue(actorSupported_noneSupported); + } + final NodeState state = stage.getState(); + if (state == NodeState.STOPPED) { return Task.fromValue(actorSupported_noneSupported); } + if (state == NodeState.STOPPING) + { + if (hasOtherRunningServerNodes()) + { + return Task.fromValue(actorSupported_noneSupported); + } + } return Task.fromValue(stage.canActivateActor(interfaceName) ? actorSupported_yes : actorSupported_no); } @@ -195,9 +219,12 @@ public Task nodeModeChanged(final NodeAddress nodeAddress, final NodeState { // clear list of actors this node can activate node.canActivate.clear(); + node.canActivatePending.clear(); + node.canActivateWaiters.clear(); localAddressCache.asMap().values().removeAll(Collections.singleton(node.address)); } } + updateServerNodes(); return Task.done(); } @@ -208,10 +235,7 @@ public Task moved(RemoteReference remoteReference, NodeAddress oldAddress, { logger.debug("Move {} to from {} to {}.", remoteReference, oldAddress, newAddress); } - if (activeNodes.containsKey(newAddress)) - { - localAddressCache.put(remoteReference, newAddress); - } + localAddressCache.put(remoteReference, newAddress); return Task.done(); } @@ -240,7 +264,10 @@ public Task start() private void onClusterViewChanged(final Collection nodes) { - logger.info("Cluster view changed " + nodes + " on " + Thread.currentThread().getName()); + if (logger.isDebugEnabled()) + { + logger.debug("Cluster view changed " + nodes); + } final HashMap oldNodes = new HashMap<>(activeNodes); final HashMap newNodes = new HashMap<>(nodes.size()); @@ -252,10 +279,9 @@ private void onClusterViewChanged(final Collection nodes) NodeInfo nodeInfo = oldNodes.remove(a); if (nodeInfo == null) { - final NodeInfo newNodeInfo = new NodeInfo(a); - newNodeInfo.nodeCapabilities = stage.getRemoteObserverReference(a, NodeCapabilities.class, ""); - newNodeInfo.active = true; - nodeInfo = newNodeInfo; + nodeInfo = new NodeInfo(a); + nodeInfo.nodeCapabilities = stage.getRemoteObserverReference(a, NodeCapabilities.class, ""); + nodeInfo.active = true; } newNodes.put(a, nodeInfo); @@ -274,9 +300,7 @@ private void onClusterViewChanged(final Collection nodes) activeNodes = newNodes; consistentHashNodeTree = newHashes; - - this.serverNodes = activeNodes.values().stream().filter( - nodeInfo -> nodeInfo.active && !nodeInfo.cannotHostActors).collect(Collectors.toList()); + updateServerNodes(); stage.serverNodesUpdated(); @@ -294,7 +318,21 @@ public Task locateActor(final RemoteReference reference, final bool { // don't need to call the node call the node to check. // checks should be explicit. - return activeNodes.containsKey(address) ? Task.fromValue(address) : Task.fromValue(null); + if (activeNodes.containsKey(address)) + { + if (address.equals(clusterPeer.localAddress()) && stage.getState() != NodeState.RUNNING) + { + // node is draining, avoid routing new work to itself + } + else + { + return Task.fromValue(address); + } + } + else + { + return Task.fromValue(null); + } } return (forceActivation) ? locateAndActivateActor(reference) : locateActiveActor(reference); } @@ -305,7 +343,14 @@ private Task locateActiveActor(final RemoteReference actorRefere final NodeAddress address = localAddressCache.getIfPresent(actorReference); if (address != null && activeNodes.containsKey(address)) { - return Task.fromValue(address); + if (address.equals(clusterPeer.localAddress()) && stage.getState() != NodeState.RUNNING) + { + // node is draining, ignore local cache to encourage relocation + } + else + { + return Task.fromValue(address); + } } // try to locate the actor in the distributed directory // this can be expensive, less that activating the actor, though @@ -362,7 +407,14 @@ private Task locateAndActivateActor(final RemoteReference actorR // Is this actor already activated and in the local cache? If so, we're done if (address != null && activeNodes.containsKey(address)) { - return Task.fromValue(address); + if (address.equals(clusterPeer.localAddress()) && stage.getState() != NodeState.RUNNING) + { + // node is draining, ignore local cache to encourage relocation + } + else + { + return Task.fromValue(address); + } } // Get the distributed cache if needed @@ -443,6 +495,13 @@ private RemoteKey createRemoteKey(final RemoteReference actorReference) private Task selectNode(final String interfaceClassName) { final List currentServerNodes = serverNodes; + if (currentServerNodes.isEmpty() + && nodeType == NodeTypeEnum.SERVER + && stage.getState() == NodeState.RUNNING + && stage.canActivateActor(interfaceClassName)) + { + return Task.fromValue(clusterPeer.localAddress()); + } final List suitableNodes = currentServerNodes.stream() .filter(n -> (!n.cannotHostActors && n.state == NodeState.RUNNING) @@ -463,7 +522,7 @@ private Task selectNode(final String interfaceClassName) for (NodeInfo potentialNode : potentialNodes) { - potentialNode.canActivate2.putIfAbsent(interfaceClassName, new Task<>()); + potentialNode.canActivateWaiters.putIfAbsent(interfaceClassName, new Task<>()); // loop over the potential nodes, add the interface to a concurrent hashset, if add returns true there is no activation pending if (potentialNode.canActivatePending.add(interfaceClassName)) { @@ -483,13 +542,19 @@ private Task selectNode(final String interfaceClassName) potentialNode.cannotHostActors = true; // jic potentialNode.canActivate.put(interfaceClassName, actorSupported_no); + updateServerNodes(); } else { // found suitable host potentialNode.canActivate.put(interfaceClassName, result); } - potentialNode.canActivate2.get(interfaceClassName).complete(null); + } + potentialNode.canActivatePending.remove(interfaceClassName); + Task pending = potentialNode.canActivateWaiters.get(interfaceClassName); + if (pending != null) + { + pending.complete(null); } return null; }); @@ -497,7 +562,7 @@ private Task selectNode(final String interfaceClassName) } return Task.allOf(potentialNodes.stream().map(nodeInfo -> - nodeInfo.canActivate2.get(interfaceClassName) + nodeInfo.canActivateWaiters.get(interfaceClassName) .failAfter(timeToWaitForServersMillis, TimeUnit.MILLISECONDS)) .collect(Collectors.toList())).handle((aVoid, throwable) -> { final List suitableNodes2 = potentialNodes.stream() @@ -521,7 +586,8 @@ private boolean shouldPlaceLocally(final Class interfaceClass) { final String interfaceClassName = interfaceClass.getName(); - if (interfaceClass.isAnnotationPresent(PreferLocalPlacement.class) && + if (stage.getState() == NodeState.RUNNING && + interfaceClass.isAnnotationPresent(PreferLocalPlacement.class) && nodeType == NodeTypeEnum.SERVER && stage.canActivateActor(interfaceClassName)) { final int percentile = interfaceClass.getAnnotation(PreferLocalPlacement.class).percentile(); @@ -549,6 +615,20 @@ private String getHash(final String key) } } + private void updateServerNodes() + { + this.serverNodes = activeNodes.values().stream() + .filter(nodeInfo -> nodeInfo.active && nodeInfo.state == NodeState.RUNNING && !nodeInfo.cannotHostActors) + .collect(Collectors.toList()); + } + + private boolean hasOtherRunningServerNodes() + { + final List currentServerNodes = serverNodes; + return currentServerNodes.stream() + .anyMatch(nodeInfo -> !nodeInfo.address.equals(clusterPeer.localAddress())); + } + /** * Uses consistent hashing to determine the "owner" of a certain key. * @@ -611,9 +691,19 @@ else if (msg instanceof HostingInit) entry.getKey().equals(hostingInit.getNodeAddress())) { entry.getValue().nodeName = hostingInit.getNodeName(); + entry.getValue().cannotHostActors = hostingInit.getNodeType() == NodeTypeEnum.CLIENT; entry.getValue().canActivate.putAll(hostingInit.getSupportedActorInterfaces()); + for (String actorInterface : hostingInit.getSupportedActorInterfaces().keySet()) + { + Task waiter = entry.getValue().canActivateWaiters.remove(actorInterface); + if (waiter != null) + { + waiter.complete(null); + } + } } } + updateServerNodes(); } else { @@ -736,6 +826,9 @@ else if (msg instanceof NodeJoinedEvent) { supportedActivations.put(actorInterface, actorSupported_yes); } + final NodeTypeEnum localNodeType = stage.getMode() == Stage.StageMode.HOST + ? NodeTypeEnum.SERVER + : NodeTypeEnum.CLIENT; Map nodes = activeNodes; for (NodeAddress nodeAddress : nodes.keySet()) @@ -748,7 +841,7 @@ else if (msg instanceof NodeJoinedEvent) .withInterfaceId(0) .withMethodId(0) .withMessageType(MessageDefinitions.HOSTING_INIT) - .withPayload(new HostingInitPayload(stage.getNodeName(), supportedActivations))); + .withPayload(new HostingInitPayload(stage.getNodeName(), supportedActivations, localNodeType))); } return Task.done(); } diff --git a/actors/runtime/src/main/java/cloud/orbit/actors/runtime/HostingInit.java b/actors/runtime/src/main/java/cloud/orbit/actors/runtime/HostingInit.java index ab707681..ce9fcd91 100644 --- a/actors/runtime/src/main/java/cloud/orbit/actors/runtime/HostingInit.java +++ b/actors/runtime/src/main/java/cloud/orbit/actors/runtime/HostingInit.java @@ -29,6 +29,7 @@ package cloud.orbit.actors.runtime; import cloud.orbit.actors.cluster.NodeAddress; +import cloud.orbit.actors.runtime.NodeCapabilities.NodeTypeEnum; import java.io.Serializable; import java.util.Map; @@ -40,13 +41,16 @@ public class HostingInit implements Serializable private final String nodeName; private final NodeAddress nodeAddress; private final Map supportedActorInterfaces; + private final NodeTypeEnum nodeType; public HostingInit(final String nodeName, final NodeAddress nodeAddress, - final Map supportedActorInterfaces) + final Map supportedActorInterfaces, + final NodeTypeEnum nodeType) { this.nodeName = nodeName; this.nodeAddress = nodeAddress; this.supportedActorInterfaces = supportedActorInterfaces; + this.nodeType = nodeType; } public String getNodeName() @@ -64,6 +68,11 @@ public Map getSupportedActorInterfaces() return supportedActorInterfaces; } + public NodeTypeEnum getNodeType() + { + return nodeType; + } + @Override public String toString() { @@ -71,6 +80,7 @@ public String toString() "nodeName=" + nodeName + ", nodeAddress=" + nodeAddress + ", supportedActorInterfaces=" + supportedActorInterfaces.keySet() + + ", nodeType=" + nodeType + '}'; } } diff --git a/actors/runtime/src/main/java/cloud/orbit/actors/runtime/HostingInitPayload.java b/actors/runtime/src/main/java/cloud/orbit/actors/runtime/HostingInitPayload.java index ba3f66db..57ecac9b 100644 --- a/actors/runtime/src/main/java/cloud/orbit/actors/runtime/HostingInitPayload.java +++ b/actors/runtime/src/main/java/cloud/orbit/actors/runtime/HostingInitPayload.java @@ -31,12 +31,15 @@ import java.io.Serializable; import java.util.Map; +import cloud.orbit.actors.runtime.NodeCapabilities.NodeTypeEnum; + public class HostingInitPayload implements Serializable { private static final long serialVersionUID = 1L; private String nodeName; private Map supportedActivations; + private NodeTypeEnum nodeType; public HostingInitPayload() { @@ -48,6 +51,14 @@ public HostingInitPayload(final String nodeName, final Map supp this.supportedActivations = supportedActivations; } + public HostingInitPayload(final String nodeName, final Map supportedActivations, + final NodeTypeEnum nodeType) + { + this.nodeName = nodeName; + this.supportedActivations = supportedActivations; + this.nodeType = nodeType; + } + public String getNodeName() { return nodeName; @@ -57,4 +68,9 @@ public Map getSupportedActivations() { return supportedActivations; } + + public NodeTypeEnum getNodeType() + { + return nodeType; + } } diff --git a/actors/runtime/src/main/java/cloud/orbit/actors/runtime/Messaging.java b/actors/runtime/src/main/java/cloud/orbit/actors/runtime/Messaging.java index dc3f106f..10a7996e 100644 --- a/actors/runtime/src/main/java/cloud/orbit/actors/runtime/Messaging.java +++ b/actors/runtime/src/main/java/cloud/orbit/actors/runtime/Messaging.java @@ -278,7 +278,7 @@ protected void onMessageReceived(HandlerContext ctx, Message message) case MessageDefinitions.HOSTING_INIT: HostingInitPayload hostingInitPayload = (HostingInitPayload) message.getPayload(); ctx.fireRead(new HostingInit(hostingInitPayload.getNodeName(), message.getFromNode(), - hostingInitPayload.getSupportedActivations())); + hostingInitPayload.getSupportedActivations(), hostingInitPayload.getNodeType())); break; default: logger.error("Illegal protocol, invalid message type: {}", messageType); diff --git a/actors/test/actor-tests/src/test/java/cloud/orbit/actors/test/ShutdownTest.java b/actors/test/actor-tests/src/test/java/cloud/orbit/actors/test/ShutdownTest.java index 3c3ffec6..19548754 100644 --- a/actors/test/actor-tests/src/test/java/cloud/orbit/actors/test/ShutdownTest.java +++ b/actors/test/actor-tests/src/test/java/cloud/orbit/actors/test/ShutdownTest.java @@ -122,5 +122,52 @@ public void asyncShutdownTest() throws ExecutionException, InterruptedException } + @Test(timeout = 10_000L) + public void stoppingNodeDeclinesActivationWhenOtherServersPresent() throws ExecutionException, InterruptedException + { + Stage stage1 = createStage(); + Stage stage2 = createStage(); + Stage client = createClient(); + eventuallyTrue(() -> stage1.getHosting().getServerNodes().size() >= 2); + + Task methodCall = Actor.getReference(Shut.class, "0").doSomethingBlocking(); + fakeSync.semaphore("executing").acquire(); + + final Task stopFuture = Task.runAsync(() -> stage1.stop().join()); + waitFor(() -> stage1.getState() == NodeCapabilities.NodeState.STOPPING); + + int canActivate = stage1.getHosting().canActivate(Shut.class.getName()).join(); + assertEquals(NodeCapabilities.actorSupported_noneSupported, canActivate); + + fakeSync.semaphore("canFinish").release(); + stopFuture.join(); + eventuallyTrue(methodCall::isDone); + + stage2.stop().join(); + client.stop().join(); + } + + @Test(timeout = 10_000L) + public void stoppingNodeAllowsActivationWhenAlone() throws ExecutionException, InterruptedException + { + Stage stage1 = createStage(); + Stage client = createClient(); + + Task methodCall = Actor.getReference(Shut.class, "0").doSomethingBlocking(); + fakeSync.semaphore("executing").acquire(); + + final Task stopFuture = Task.runAsync(() -> stage1.stop().join()); + waitFor(() -> stage1.getState() == NodeCapabilities.NodeState.STOPPING); + + int canActivate = stage1.getHosting().canActivate(Shut.class.getName()).join(); + assertEquals(NodeCapabilities.actorSupported_yes, canActivate); + + fakeSync.semaphore("canFinish").release(); + stopFuture.join(); + eventuallyTrue(methodCall::isDone); + + client.stop().join(); + } + }