diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 5c639d08b2596..4dfdb6b35997d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; @@ -747,329 +748,275 @@ private void processFailedMessage(UUID nodeId, throws IgniteCheckedException { assert msg != null; - switch (msg.directType()) { - case 10022: { - GridDhtLockRequest req = (GridDhtLockRequest)msg; + if (msg instanceof GridDhtLockRequest) { + GridDhtLockRequest req = (GridDhtLockRequest)msg; - GridDhtLockResponse res = new GridDhtLockResponse( - req.cacheId(), - req.version(), - req.futureId(), - req.miniId(), - 0); + GridDhtLockResponse res = new GridDhtLockResponse( + req.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + 0); - sendResponseOnFailedMessage(nodeId, res, cctx, plc); - } - - break; + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridDhtTxPrepareRequest) { + GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg; - case 10016: { - GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg; + GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId(), + req.deployInfo() != null); - GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse( - req.partition(), - req.version(), - req.futureId(), - req.miniId(), - req.deployInfo() != null); + res.error(req.classError()); - res.error(req.classError()); + sendResponseOnFailedMessage(nodeId, res, cctx, req.policy()); + } + else if (msg instanceof GridDhtAtomicUpdateRequest) { + GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg; - sendResponseOnFailedMessage(nodeId, res, cctx, req.policy()); - } + GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( + req.cacheId(), + req.partition(), + req.futureId()); - break; + res.onError(req.classError()); - case 10303: { - GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg; + sendResponseOnFailedMessage(nodeId, res, cctx, plc); - GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( - req.cacheId(), + if (req.nearNodeId() != null) { + GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), req.partition(), - req.futureId()); - - res.onError(req.classError()); - - sendResponseOnFailedMessage(nodeId, res, cctx, plc); - - if (req.nearNodeId() != null) { - GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), - req.partition(), - req.nearFutureId(), - nodeId, - req.flags()); - - nearRes.errors(new UpdateErrors(req.classError())); - - sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc); - } - } - - break; - - case 10305: { - GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg; - - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( - req.cacheId(), + req.nearFutureId(), nodeId, - req.futureId(), - req.partition(), - false); - - res.error(req.classError()); - - sendResponseOnFailedMessage(nodeId, res, cctx, plc); - } - - break; + req.flags()); - case 10300: { - GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg; - - GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( - req.cacheId(), - req.futureId(), - req.miniId(), - req.classError() - ); + nearRes.errors(new UpdateErrors(req.classError())); - sendResponseOnFailedMessage(nodeId, res, cctx, plc); + sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc); } + } + else if (msg instanceof GridNearAtomicFullUpdateRequest) { + GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg; - break; + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + req.cacheId(), + nodeId, + req.futureId(), + req.partition(), + false); - case 10313: { - GridNearGetRequest req = (GridNearGetRequest)msg; + res.error(req.classError()); - GridNearGetResponse res = new GridNearGetResponse( - req.cacheId(), - req.futureId(), - req.miniId(), - req.version(), - req.deployInfo() != null); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridDhtForceKeysRequest) { + GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg; - res.error(req.classError()); + GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( + req.cacheId(), + req.futureId(), + req.miniId(), + req.classError() + ); - sendResponseOnFailedMessage(nodeId, res, cctx, plc); - } - - break; + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridNearGetRequest) { + GridNearGetRequest req = (GridNearGetRequest)msg; - case 10314: { - GridNearGetResponse res = (GridNearGetResponse)msg; + GridNearGetResponse res = new GridNearGetResponse( + req.cacheId(), + req.futureId(), + req.miniId(), + req.version(), + req.deployInfo() != null); - CacheGetFuture fut = (CacheGetFuture)cctx.mvcc().future(res.futureId()); + res.error(req.classError()); - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridNearGetResponse) { + GridNearGetResponse res = (GridNearGetResponse)msg; - return; - } + CacheGetFuture fut = (CacheGetFuture)cctx.mvcc().future(res.futureId()); - res.error(res.classError()); + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); - fut.onResult(nodeId, res); + return; } - break; - - case 10025: { - GridNearLockRequest req = (GridNearLockRequest)msg; - - GridNearLockResponse res = new GridNearLockResponse( - req.cacheId(), - req.version(), - req.futureId(), - req.miniId(), - false, - 0, - req.classError(), - null, - false); + res.error(res.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, plc); - } + fut.onResult(nodeId, res); + } + else if (msg instanceof GridNearLockRequest) { + GridNearLockRequest req = (GridNearLockRequest)msg; + + GridNearLockResponse res = new GridNearLockResponse( + req.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + 0, + req.classError(), + null, + false); + + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridNearTxPrepareRequest) { + GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg; + + GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId(), + req.version(), + req.version(), + null, + null, + null, + false, + req.deployInfo() != null); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, req.policy()); + } + else if (msg instanceof GridCacheQueryRequest) { + GridCacheQueryRequest req = (GridCacheQueryRequest)msg; - break; + GridCacheQueryResponse res = new GridCacheQueryResponse( + req.cacheId(), + req.id(), + req.classError(), + cctx.deploymentEnabled()); - case 10020: { - GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg; + ClusterNode node = cctx.node(nodeId); - GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( - req.partition(), - req.version(), - req.futureId(), - req.miniId(), - req.version(), - req.version(), - null, - null, - null, - false, - req.deployInfo() != null); - - res.error(req.classError()); - - sendResponseOnFailedMessage(nodeId, res, cctx, req.policy()); + if (node == null) { + U.error(log, "Failed to send message because node left grid [nodeId=" + nodeId + + ", msg=" + msg + ']'); } - - break; - - case 10911: { - GridCacheQueryRequest req = (GridCacheQueryRequest)msg; - - GridCacheQueryResponse res = new GridCacheQueryResponse( - req.cacheId(), - req.id(), - req.classError(), - cctx.deploymentEnabled()); - - ClusterNode node = cctx.node(nodeId); - - if (node == null) { - U.error(log, "Failed to send message because node left grid [nodeId=" + nodeId + - ", msg=" + msg + ']'); - } - else { - cctx.io().sendOrderedMessage( - node, - TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()), - res, - plc, - Long.MAX_VALUE); - } + else { + cctx.io().sendOrderedMessage( + node, + TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()), + res, + plc, + Long.MAX_VALUE); } + } + else if (msg instanceof GridNearSingleGetRequest) { + GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg; - break; + GridNearSingleGetResponse res = new GridNearSingleGetResponse( + req.cacheId(), + req.futureId(), + req.topologyVersion(), + null, + false, + req.deployInfo() != null); - case 10615: - case 120: { - processMessage(nodeId, msg, c); // Will be handled by Rebalance Demander. - } + res.error(req.classError()); - break; - - case 10315: { - GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg; + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridNearSingleGetResponse) { + GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg; - GridNearSingleGetResponse res = new GridNearSingleGetResponse( - req.cacheId(), - req.futureId(), - req.topologyVersion(), - null, - false, - req.deployInfo() != null); + GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)cctx.mvcc() + .future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId())); - res.error(req.classError()); + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); - sendResponseOnFailedMessage(nodeId, res, cctx, plc); + return; } - break; + res.error(res.classError()); - case 10316: { - GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg; + fut.onResult(nodeId, res); + } + else if (msg instanceof GridNearAtomicSingleUpdateRequest) { + GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg; - GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)cctx.mvcc() - .future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId())); + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + req.cacheId(), + nodeId, + req.futureId(), + req.partition(), + false); - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); + res.error(req.classError()); - return; - } + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridNearAtomicSingleUpdateInvokeRequest) { + GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg; - res.error(res.classError()); + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + req.cacheId(), + nodeId, + req.futureId(), + req.partition(), + false); - fut.onResult(nodeId, res); - } + res.error(req.classError()); - break; + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridNearAtomicSingleUpdateFilterRequest) { + GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg; - case 10308: { - GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg; + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + req.cacheId(), + nodeId, + req.futureId(), + req.partition(), + false); - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( - req.cacheId(), - nodeId, - req.futureId(), - req.partition(), - false); + res.error(req.classError()); - res.error(req.classError()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); + } + else if (msg instanceof GridDhtAtomicSingleUpdateRequest) { + GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg; - sendResponseOnFailedMessage(nodeId, res, cctx, plc); - } + GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( + req.cacheId(), + req.partition(), + req.futureId()); - break; + res.onError(req.classError()); - case 10309: { - GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg; + sendResponseOnFailedMessage(nodeId, res, cctx, plc); - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( - req.cacheId(), - nodeId, - req.futureId(), + if (req.nearNodeId() != null) { + GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), req.partition(), - false); - - res.error(req.classError()); - - sendResponseOnFailedMessage(nodeId, res, cctx, plc); - } - - break; - - case 10310: { - GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg; - - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( - req.cacheId(), + req.nearFutureId(), nodeId, - req.futureId(), - req.partition(), - false); - - res.error(req.classError()); - - sendResponseOnFailedMessage(nodeId, res, cctx, plc); - } - - break; - - case 10306: { - GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg; - - GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( - req.cacheId(), - req.partition(), - req.futureId()); - - res.onError(req.classError()); + req.flags()); - sendResponseOnFailedMessage(nodeId, res, cctx, plc); + nearRes.errors(new UpdateErrors(req.classError())); - if (req.nearNodeId() != null) { - GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), - req.partition(), - req.nearFutureId(), - nodeId, - req.flags()); - - nearRes.errors(new UpdateErrors(req.classError())); - - sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc); - } + sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc); } - - break; - - default: - throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" - + msg + "]", msg.classError()); + } + else if (msg instanceof GridDhtPartitionSupplyMessage) + processMessage(nodeId, msg, c); // Will be handled by Rebalance Demander. + else { + throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + + msg + "]", msg.classError()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 29c5283c0af13..cd2704daf9007 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -3395,54 +3395,46 @@ else if (msg instanceof TxLocksResponse) { * @param msg Message. */ private void processFailedMessage(UUID nodeId, GridCacheMessage msg, Throwable err) throws IgniteCheckedException { - switch (msg.directType()) { - case 10003: { - TxLocksRequest req = (TxLocksRequest)msg; + if (msg instanceof TxLocksRequest) { + TxLocksRequest req = (TxLocksRequest)msg; - TxLocksResponse res = new TxLocksResponse(); + TxLocksResponse res = new TxLocksResponse(); - res.futureId(req.futureId()); + res.futureId(req.futureId()); - try { - cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send response, node failed: " + nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId + - ", res=" + res + ']', e); - } + try { + cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send response, node failed: " + nodeId); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId + + ", res=" + res + ']', e); + } + } + else if (msg instanceof TxLocksResponse) { + TxLocksResponse res = (TxLocksResponse)msg; - break; - - case 10004: { - TxLocksResponse res = (TxLocksResponse)msg; - - TxDeadlockFuture fut = future(res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for response [sender=" + nodeId + ", res=" + res + ']'); + TxDeadlockFuture fut = future(res.futureId()); - return; - } + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find future for response [sender=" + nodeId + ", res=" + res + ']'); - if (err == null) - fut.onResult(nodeId, res); - else - fut.onDone(null, err); + return; } - break; - - default: - throw new IgniteCheckedException("Failed to process message. Unsupported direct type [msg=" + - msg + ']', msg.classError()); + if (err == null) + fut.onResult(nodeId, res); + else + fut.onDone(null, err); + } + else { + throw new IgniteCheckedException("Failed to process message. Unsupported direct type [msg=" + + msg + ']', msg.classError()); } - } /**