Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: actions/setup-java@v4
with:
distribution: "temurin"
java-version: "8"
java-version: "11"
cache: "maven"
- name: Build and test
run: mvn -B -V test
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class NodeInfo
NodeCapabilities nodeCapabilities;
boolean cannotHostActors;
final ConcurrentHashMap<String, Integer> canActivate = new ConcurrentHashMap<>();
final ConcurrentHashMap<String, Task<Void>> canActivate2 = new ConcurrentHashMap<>();
final ConcurrentHashMap<String, Task<Void>> canActivateWaiters = new ConcurrentHashMap<>();
final Set<String> canActivatePending = ConcurrentHashMap.newKeySet();
String nodeName;

Expand Down
135 changes: 114 additions & 21 deletions actors/runtime/src/main/java/cloud/orbit/actors/runtime/Hosting.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import java.lang.reflect.Method;
import java.security.MessageDigest;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -144,6 +145,17 @@ public List<NodeAddress> getServerNodes()

public Task<?> notifyStateChange()
{
final NodeInfo localNode = activeNodes.get(clusterPeer.localAddress());
if (localNode != null)
{
localNode.state = stage.getState();
if (localNode.state != NodeState.RUNNING)
{
localNode.canActivate.clear();
localNode.canActivatePending.clear();
localNode.canActivateWaiters.clear();
}
}
return Task.allOf(activeNodes.values().stream()
.filter(nodeInfo -> !nodeInfo.address.equals(clusterPeer.localAddress()) && nodeInfo.state == NodeState.RUNNING)
.map(info -> info.nodeCapabilities.nodeModeChanged(clusterPeer.localAddress(), stage.getState()).exceptionally(throwable -> null)));
Expand Down Expand Up @@ -172,10 +184,22 @@ public String getNodeName(final NodeAddress nodeAddress)
/**
 * Reports whether this node can activate actors of the given interface.
 *
 * @param interfaceName name of the actor interface being queried
 * @return a completed task holding {@code actorSupported_noneSupported} when this node is a
 *         client, is stopped, or is stopping while {@link #hasOtherRunningServerNodes()} is true;
 *         otherwise {@code actorSupported_yes} or {@code actorSupported_no} according to
 *         {@code stage.canActivateActor(interfaceName)}
 */
@Override
public Task<Integer> canActivate(String interfaceName)
{
    // Client nodes are answered without consulting the stage state at all.
    if (nodeType == NodeTypeEnum.CLIENT)
    {
        return Task.fromValue(actorSupported_noneSupported);
    }

    // Read the state once so every check below sees the same value.
    final NodeState state = stage.getState();
    if (state == NodeState.STOPPED)
    {
        return Task.fromValue(actorSupported_noneSupported);
    }

    // A stopping node only declines when some other server node is listed;
    // with no alternative it still answers based on what the stage supports.
    if (state == NodeState.STOPPING && hasOtherRunningServerNodes())
    {
        return Task.fromValue(actorSupported_noneSupported);
    }

    return Task.fromValue(stage.canActivateActor(interfaceName)
            ? actorSupported_yes
            : actorSupported_no);
}
Expand All @@ -195,9 +219,12 @@ public Task<Void> nodeModeChanged(final NodeAddress nodeAddress, final NodeState
{
// clear list of actors this node can activate
node.canActivate.clear();
node.canActivatePending.clear();
node.canActivateWaiters.clear();
localAddressCache.asMap().values().removeAll(Collections.singleton(node.address));
}
}
updateServerNodes();
return Task.done();
}

Expand All @@ -208,10 +235,7 @@ public Task<Void> moved(RemoteReference remoteReference, NodeAddress oldAddress,
{
logger.debug("Move {} to from {} to {}.", remoteReference, oldAddress, newAddress);
}
if (activeNodes.containsKey(newAddress))
{
localAddressCache.put(remoteReference, newAddress);
}
localAddressCache.put(remoteReference, newAddress);
return Task.done();
}

Expand Down Expand Up @@ -240,7 +264,10 @@ public Task<Void> start()

private void onClusterViewChanged(final Collection<NodeAddress> nodes)
{
logger.info("Cluster view changed " + nodes + " on " + Thread.currentThread().getName());
if (logger.isDebugEnabled())
{
logger.debug("Cluster view changed " + nodes);
}

final HashMap<NodeAddress, NodeInfo> oldNodes = new HashMap<>(activeNodes);
final HashMap<NodeAddress, NodeInfo> newNodes = new HashMap<>(nodes.size());
Expand All @@ -252,10 +279,9 @@ private void onClusterViewChanged(final Collection<NodeAddress> nodes)
NodeInfo nodeInfo = oldNodes.remove(a);
if (nodeInfo == null)
{
final NodeInfo newNodeInfo = new NodeInfo(a);
newNodeInfo.nodeCapabilities = stage.getRemoteObserverReference(a, NodeCapabilities.class, "");
newNodeInfo.active = true;
nodeInfo = newNodeInfo;
nodeInfo = new NodeInfo(a);
nodeInfo.nodeCapabilities = stage.getRemoteObserverReference(a, NodeCapabilities.class, "");
nodeInfo.active = true;
}
newNodes.put(a, nodeInfo);

Expand All @@ -274,9 +300,7 @@ private void onClusterViewChanged(final Collection<NodeAddress> nodes)
activeNodes = newNodes;

consistentHashNodeTree = newHashes;

this.serverNodes = activeNodes.values().stream().filter(
nodeInfo -> nodeInfo.active && !nodeInfo.cannotHostActors).collect(Collectors.toList());
updateServerNodes();

stage.serverNodesUpdated();

Expand All @@ -294,7 +318,21 @@ public Task<NodeAddress> locateActor(final RemoteReference reference, final bool
{
// don't need to call the node to check.
// checks should be explicit.
return activeNodes.containsKey(address) ? Task.fromValue(address) : Task.fromValue(null);
if (activeNodes.containsKey(address))
{
if (address.equals(clusterPeer.localAddress()) && stage.getState() != NodeState.RUNNING)
{
// node is draining, avoid routing new work to itself
}
else
{
return Task.fromValue(address);
}
}
else
{
return Task.fromValue(null);
}
}
return (forceActivation) ? locateAndActivateActor(reference) : locateActiveActor(reference);
}
Expand All @@ -305,7 +343,14 @@ private Task<NodeAddress> locateActiveActor(final RemoteReference<?> actorRefere
final NodeAddress address = localAddressCache.getIfPresent(actorReference);
if (address != null && activeNodes.containsKey(address))
{
return Task.fromValue(address);
if (address.equals(clusterPeer.localAddress()) && stage.getState() != NodeState.RUNNING)
{
// node is draining, ignore local cache to encourage relocation
}
else
{
return Task.fromValue(address);
}
}
// try to locate the actor in the distributed directory
// this can be expensive, less than activating the actor, though
Expand Down Expand Up @@ -362,7 +407,14 @@ private Task<NodeAddress> locateAndActivateActor(final RemoteReference<?> actorR
// Is this actor already activated and in the local cache? If so, we're done
if (address != null && activeNodes.containsKey(address))
{
return Task.fromValue(address);
if (address.equals(clusterPeer.localAddress()) && stage.getState() != NodeState.RUNNING)
{
// node is draining, ignore local cache to encourage relocation
}
else
{
return Task.fromValue(address);
}
}

// Get the distributed cache if needed
Expand Down Expand Up @@ -443,6 +495,13 @@ private RemoteKey createRemoteKey(final RemoteReference actorReference)
private Task<NodeAddress> selectNode(final String interfaceClassName)
{
final List<NodeInfo> currentServerNodes = serverNodes;
if (currentServerNodes.isEmpty()
&& nodeType == NodeTypeEnum.SERVER
&& stage.getState() == NodeState.RUNNING
&& stage.canActivateActor(interfaceClassName))
{
return Task.fromValue(clusterPeer.localAddress());
}

final List<NodeInfo> suitableNodes = currentServerNodes.stream()
.filter(n -> (!n.cannotHostActors && n.state == NodeState.RUNNING)
Expand All @@ -463,7 +522,7 @@ private Task<NodeAddress> selectNode(final String interfaceClassName)

for (NodeInfo potentialNode : potentialNodes)
{
potentialNode.canActivate2.putIfAbsent(interfaceClassName, new Task<>());
potentialNode.canActivateWaiters.putIfAbsent(interfaceClassName, new Task<>());
// loop over the potential nodes, add the interface to a concurrent hashset, if add returns true there is no activation pending
if (potentialNode.canActivatePending.add(interfaceClassName))
{
Expand All @@ -483,21 +542,27 @@ private Task<NodeAddress> selectNode(final String interfaceClassName)
potentialNode.cannotHostActors = true;
// jic
potentialNode.canActivate.put(interfaceClassName, actorSupported_no);
updateServerNodes();
}
else
{
// found suitable host
potentialNode.canActivate.put(interfaceClassName, result);
}
potentialNode.canActivate2.get(interfaceClassName).complete(null);
}
potentialNode.canActivatePending.remove(interfaceClassName);
Task<Void> pending = potentialNode.canActivateWaiters.get(interfaceClassName);
if (pending != null)
{
pending.complete(null);
}
return null;
});
}
}

return Task.allOf(potentialNodes.stream().map(nodeInfo ->
nodeInfo.canActivate2.get(interfaceClassName)
nodeInfo.canActivateWaiters.get(interfaceClassName)
.failAfter(timeToWaitForServersMillis, TimeUnit.MILLISECONDS))
.collect(Collectors.toList())).handle((aVoid, throwable) -> {
final List<NodeInfo> suitableNodes2 = potentialNodes.stream()
Expand All @@ -521,7 +586,8 @@ private boolean shouldPlaceLocally(final Class<?> interfaceClass)
{
final String interfaceClassName = interfaceClass.getName();

if (interfaceClass.isAnnotationPresent(PreferLocalPlacement.class) &&
if (stage.getState() == NodeState.RUNNING &&
interfaceClass.isAnnotationPresent(PreferLocalPlacement.class) &&
nodeType == NodeTypeEnum.SERVER && stage.canActivateActor(interfaceClassName))
{
final int percentile = interfaceClass.getAnnotation(PreferLocalPlacement.class).percentile();
Expand Down Expand Up @@ -549,6 +615,20 @@ private String getHash(final String key)
}
}

/**
 * Rebuilds {@code serverNodes} from the current {@code activeNodes} map.
 * A node is kept only when it is marked active, its state is
 * {@code NodeState.RUNNING}, and it is not flagged as unable to host actors.
 * The field is replaced with a freshly built list on every call.
 */
private void updateServerNodes()
{
    final List<NodeInfo> eligible = new ArrayList<>();
    for (final NodeInfo candidate : activeNodes.values())
    {
        if (!candidate.active || candidate.state != NodeState.RUNNING || candidate.cannotHostActors)
        {
            continue;
        }
        eligible.add(candidate);
    }
    this.serverNodes = eligible;
}

/**
 * Checks whether {@code serverNodes} lists any node other than the local one.
 * The field is read once, so the scan runs over a single snapshot of the list.
 *
 * @return {@code true} if at least one entry has an address different from
 *         {@code clusterPeer.localAddress()}, {@code false} otherwise (including an empty list)
 */
private boolean hasOtherRunningServerNodes()
{
    final List<NodeInfo> snapshot = serverNodes;
    for (final NodeInfo candidate : snapshot)
    {
        final boolean isLocal = candidate.address.equals(clusterPeer.localAddress());
        if (!isLocal)
        {
            return true;
        }
    }
    return false;
}

/**
* Uses consistent hashing to determine the "owner" of a certain key.
*
Expand Down Expand Up @@ -611,9 +691,19 @@ else if (msg instanceof HostingInit)
entry.getKey().equals(hostingInit.getNodeAddress()))
{
entry.getValue().nodeName = hostingInit.getNodeName();
entry.getValue().cannotHostActors = hostingInit.getNodeType() == NodeTypeEnum.CLIENT;
entry.getValue().canActivate.putAll(hostingInit.getSupportedActorInterfaces());
for (String actorInterface : hostingInit.getSupportedActorInterfaces().keySet())
{
Task<Void> waiter = entry.getValue().canActivateWaiters.remove(actorInterface);
if (waiter != null)
{
waiter.complete(null);
}
}
}
}
updateServerNodes();
}
else
{
Expand Down Expand Up @@ -736,6 +826,9 @@ else if (msg instanceof NodeJoinedEvent)
{
supportedActivations.put(actorInterface, actorSupported_yes);
}
final NodeTypeEnum localNodeType = stage.getMode() == Stage.StageMode.HOST
? NodeTypeEnum.SERVER
: NodeTypeEnum.CLIENT;

Map<NodeAddress, NodeInfo> nodes = activeNodes;
for (NodeAddress nodeAddress : nodes.keySet())
Expand All @@ -748,7 +841,7 @@ else if (msg instanceof NodeJoinedEvent)
.withInterfaceId(0)
.withMethodId(0)
.withMessageType(MessageDefinitions.HOSTING_INIT)
.withPayload(new HostingInitPayload(stage.getNodeName(), supportedActivations)));
.withPayload(new HostingInitPayload(stage.getNodeName(), supportedActivations, localNodeType)));
}
return Task.done();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
package cloud.orbit.actors.runtime;

import cloud.orbit.actors.cluster.NodeAddress;
import cloud.orbit.actors.runtime.NodeCapabilities.NodeTypeEnum;

import java.io.Serializable;
import java.util.Map;
Expand All @@ -40,13 +41,16 @@ public class HostingInit implements Serializable
private final String nodeName;
private final NodeAddress nodeAddress;
private final Map<String, Integer> supportedActorInterfaces;
private final NodeTypeEnum nodeType;

/**
 * Creates a hosting-init message describing a node.
 *
 * @param nodeName                 name of the node
 * @param nodeAddress              cluster address of the node
 * @param supportedActorInterfaces actor interface names mapped to their support code;
 *                                 stored as given, without copying or validation
 * @param nodeType                 type of the node; stored as given
 */
public HostingInit(final String nodeName, final NodeAddress nodeAddress,
        final Map<String, Integer> supportedActorInterfaces,
        final NodeTypeEnum nodeType)
{
    this.nodeName = nodeName;
    this.nodeAddress = nodeAddress;
    this.supportedActorInterfaces = supportedActorInterfaces;
    this.nodeType = nodeType;
}

public String getNodeName()
Expand All @@ -64,13 +68,19 @@ public Map<String, Integer> getSupportedActorInterfaces()
return supportedActorInterfaces;
}

/**
 * @return the node type this message was constructed with
 */
public NodeTypeEnum getNodeType()
{
    return this.nodeType;
}

/**
 * Renders the message for logging. Only the keys of
 * {@code supportedActorInterfaces} are printed, not the support codes.
 *
 * @return a string of the form
 *         {@code HostingInit{nodeName=..., nodeAddress=..., supportedActorInterfaces=[...], nodeType=...}}
 */
@Override
public String toString()
{
    // The constructor stores the map without a null check, so guard here:
    // building a log line must not fail with a NullPointerException.
    final Object interfaceNames = supportedActorInterfaces == null
            ? null
            : supportedActorInterfaces.keySet();
    return "HostingInit{" +
            "nodeName=" + nodeName +
            ", nodeAddress=" + nodeAddress +
            ", supportedActorInterfaces=" + interfaceNames +
            ", nodeType=" + nodeType +
            '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import java.io.Serializable;
import java.util.Map;

import cloud.orbit.actors.runtime.NodeCapabilities.NodeTypeEnum;

public class HostingInitPayload implements Serializable
{
private static final long serialVersionUID = 1L;

private String nodeName;
private Map<String, Integer> supportedActivations;
private NodeTypeEnum nodeType;

public HostingInitPayload()
{
Expand All @@ -48,6 +51,14 @@ public HostingInitPayload(final String nodeName, final Map<String, Integer> supp
this.supportedActivations = supportedActivations;
}

/**
 * Creates a payload that also carries the sender's node type.
 *
 * @param nodeName             name of the node
 * @param supportedActivations actor interface names mapped to their support code; stored as given
 * @param nodeType             type of the node; stored as given
 */
public HostingInitPayload(final String nodeName,
        final Map<String, Integer> supportedActivations,
        final NodeTypeEnum nodeType)
{
    this.nodeType = nodeType;
    this.supportedActivations = supportedActivations;
    this.nodeName = nodeName;
}

public String getNodeName()
{
return nodeName;
Expand All @@ -57,4 +68,9 @@ public Map<String, Integer> getSupportedActivations()
{
return supportedActivations;
}

/**
 * @return the node type stored in this payload; {@code null} if it was never assigned
 */
public NodeTypeEnum getNodeType()
{
    return this.nodeType;
}
}
Loading