[ https://issues.apache.org/jira/browse/IGNITE-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mikhail Petrov updated IGNITE-24224: ------------------------------------ Description: The following code can be considered as a reproducer of the problem. Its main goal is to cause Ignite to be overloaded with communication messages. In this particular case it will be GridJobExecuteRequest and GridJobExecuteResponce spamming. {code:java} /** */ public class BrokenSerializationTest extends AbstractSecurityTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { SslContextFactory factory = (SslContextFactory)sslTrustedFactory("server", "trustone"); IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName) .setSslContextFactory(factory) .setFailureHandler(new StopNodeOrHaltFailureHandler()); cfg.setCommunicationSpi(new TcpCommunicationSpi().setUnacknowledgedMessagesBufferSize(160)); // Small values of this property causes Ignite to frequently reestablish connection between nodes. return cfg; } /** Tests task execution security context in case task was initiated from the {@link IgniteClient}. 
*/ @Test public void test() throws Exception { IgniteEx ignite = startGridAllowAll("crd"); startGridAllowAll("srv1"); startGridAllowAll("srv2"); startGridAllowAll("srv3"); startGridAllowAll("srv4"); startGridAllowAll("srv5"); awaitPartitionMapExchange(); Set<Throwable> errors = ConcurrentHashMap.newKeySet(); Set<ComputeTaskFuture<Integer>> futs = ConcurrentHashMap.newKeySet(); CountDownLatch latch = new CountDownLatch(16 * 10000); for (int i = 0; i < 16; i++) { GridTestUtils.runAsync(() -> { try { U.sleep(1000); for (int j = 0; j < 10000; j++) { futs.add(ignite.context().task().<Void, Integer>execute( TestTask.class.getName(), null, TaskExecutionOptions.options().asPublicRequest().withProjection(ignite.cluster().nodes()) ).publicFuture()); latch.countDown(); } } catch (Throwable e) { errors.add(e); } }); } latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); assertTrue(errors.isEmpty()); for (ComputeTaskFuture<Integer> fut : futs) { assertTrue(null == fut.get(getTestTimeout())); } } /** */ public static class TestTask implements ComputeTask<Void, Integer> { /** {@inheritDoc} */ @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map( List<ClusterNode> subgrid, @Nullable Void arg ) throws IgniteException { Map<ComputeJob, ClusterNode> res = new HashMap<>(); for (ClusterNode node : subgrid) { res.put(new ComputeJob() { @Override public void cancel() { // No-op. 
} @Override public Object execute() throws IgniteException { return null; } }, node); } return res; } /** {@inheritDoc} */ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { if (res.getException() != null) throw new IgniteException(res.getException()); return ComputeJobResultPolicy.WAIT; } /** {@inheritDoc} */ @Override public @Nullable Integer reduce(List<ComputeJobResult> results) throws IgniteException { assertEquals(allGrids().size(), results.size()); return null; } } } {code} It does not reproduce the problem sequentially because the problem is related to cases when only part of the message is read into the application buffer while reading from the socket. While this reproducer is in progress you can witness different errors related to communication message handling e.g. {code:java} [2025-01-16T10:10:11,136][ERROR][grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%][TcpCommunicationSpi] Failed to process selector key [ses=GridSelectorNioSessionImpl [worker=DirectNioClientWorker [super=AbstractNioClientWorker [idx=3, bytesRcvd=10261, bytesSent=0, bytesRcvd0=0, bytesSent0=0, select=true, super=GridWorker [name=grid-nio-worker-tcp-comm-3, igniteInstanceName=TcpCommunicationSpi, finished=false, heartbeatTs=1737011410109, hashCode=520079162, interrupted=false, runner=grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%]]], writeBuf=java.nio.DirectByteBuffer[pos=0 lim=32768 cap=32768], readBuf=java.nio.DirectByteBuffer[pos=2398 lim=2398 cap=32768], inRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, connectCnt=1, queueLimit=160, reserveCnt=2, 
pairedConnections=false], outRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], closeSocket=true, outboundMessagesQueueSizeMetric=o.a.i.i.processors.metric.impl.LongAdderMetric@69a257d1, super=GridNioSessionImpl [locAddr=/127.0.0.1:48568, rmtAddr=/127.0.0.1:47104, createTime=1737011410099, closeTime=0, bytesSent=0, bytesRcvd=2398, bytesSent0=0, bytesRcvd0=0, sndSchedTime=1737011410099, lastSndTime=1737011410109, lastRcvTime=1737011410109, readsPaused=false, filterChain=FilterChain[filters=[GridNioCodecFilter [parser=o.a.i.i.util.nio.GridDirectParser@49fbc83f, directMode=true], GridConnectionBytesVerifyFilter, SSL filter], accepted=false, markedForClose=false]]] org.apache.ignite.IgniteException: Invalid message type: 512 at org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl.create(IgniteMessageFactoryImpl.java:104) ~[classes/:?] at org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper$2.create(GridNioServerWrapper.java:811) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridDirectParser.decode(GridDirectParser.java:81) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioCodecFilter.onMessageReceived(GridNioCodecFilter.java:113) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter.onMessageReceived(GridConnectionBytesVerifyFilter.java:88) ~[classes/:?] 
at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?] at org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.onMessageReceived(GridNioSslFilter.java:413) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$HeadFilter.onMessageReceived(GridNioServer.java:3752) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioFilterChain.onMessageReceived(GridNioFilterChain.java:175) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$DirectNioClientWorker.processRead(GridNioServer.java:1379) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.processSelectedKeysOptimized(GridNioServer.java:2526) [classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2281) [classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1910) [classes/:?] at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125) [classes/:?] at java.base/java.lang.Thread.run(Thread.java:829) [?:?] {code} Alternatively you can see NullPointerExceptions, assertion errors and so on. In most cases these errors will lead only to message resend and reestablishing connections between nodes. But in some cases the failure handler (FH) can be invoked with subsequent node termination. The main reason for the described behavior: 1. Node0 tries to establish a connection with node1. 2. During the handshake, BlockingSslHandler is used to store, encrypt and decrypt data received over the network. It stores the decrypted data in a dedicated buffer - BlockingSslHandler#appBuf. 3. Let's assume node1 sends a handshake finish message and some regular communication messages in a single network packet. 4. 
Node0 decrypts this packet and writes it to the above-mentioned BlockingSslHandler#appBuf. So now it stores the bytes related to the handshake finished message and some regular messages. 5. Node0 finishes the handshake and creates GridNioSslHandler to handle communication between nodes in a NIO way. GridNioSslHandler has a separate set of buffers to store, encrypt and decrypt data received over the network. 6. After the handshake is finished, node0 tries to decode and handle all bytes remaining in BlockingSslHandler#appBuf (see org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java:262) and ditches the BlockingSslHandler. 7. If node1 sends only part of the last regular message in step 3, the bytes related to this message will not be processed during step 6 and will remain in BlockingSslHandler#appBuf. 8. So when node1 sends the last part of the partially written message from step 3, it will be processed on node0 by GridNioSslHandler from scratch and the bytes left in BlockingSslHandler#appBuf will be lost. This will result in the deserialization errors mentioned above. Note, https://issues.apache.org/jira/browse/IGNITE-22375 can greatly increase the probability of the described problem. was: The following code can be considered as a reproducer of the problem. Its main goal is to cause Ignite to be overloaded with communication messages. In this particular case it will be GridJobExecuteRequest and GridJobExecuteResponse spamming. 
{code:java} /** */ public class BrokenSerializationTest extends AbstractSecurityTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { SslContextFactory factory = (SslContextFactory)sslTrustedFactory("server", "trustone"); IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName) .setSslContextFactory(factory) .setFailureHandler(new StopNodeOrHaltFailureHandler()); cfg.setCommunicationSpi(new TcpCommunicationSpi().setUnacknowledgedMessagesBufferSize(160)); // Small values of this property causes Ignite to frequently reestablish connection between nodes. return cfg; } /** Tests task execution security context in case task was initiated from the {@link IgniteClient}. */ @Test public void test() throws Exception { IgniteEx ignite = startGridAllowAll("crd"); startGridAllowAll("srv1"); startGridAllowAll("srv2"); startGridAllowAll("srv3"); startGridAllowAll("srv4"); startGridAllowAll("srv5"); awaitPartitionMapExchange(); Set<Throwable> errors = ConcurrentHashMap.newKeySet(); Set<ComputeTaskFuture<Integer>> futs = ConcurrentHashMap.newKeySet(); CountDownLatch latch = new CountDownLatch(16 * 10000); for (int i = 0; i < 16; i++) { GridTestUtils.runAsync(() -> { try { U.sleep(1000); for (int j = 0; j < 10000; j++) { futs.add(ignite.context().task().<Void, Integer>execute( TestTask.class.getName(), null, TaskExecutionOptions.options().asPublicRequest().withProjection(ignite.cluster().nodes()) ).publicFuture()); latch.countDown(); } } catch (Throwable e) { errors.add(e); } }); } latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); assertTrue(errors.isEmpty()); for (ComputeTaskFuture<Integer> fut : futs) { assertTrue(null == fut.get(getTestTimeout())); } } /** */ public static class TestTask implements ComputeTask<Void, Integer> { /** {@inheritDoc} */ @Override public @NotNull Map<? 
extends ComputeJob, ClusterNode> map( List<ClusterNode> subgrid, @Nullable Void arg ) throws IgniteException { Map<ComputeJob, ClusterNode> res = new HashMap<>(); for (ClusterNode node : subgrid) { res.put(new ComputeJob() { @Override public void cancel() { // No-op. } @Override public Object execute() throws IgniteException { return null; } }, node); } return res; } /** {@inheritDoc} */ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { if (res.getException() != null) throw new IgniteException(res.getException()); return ComputeJobResultPolicy.WAIT; } /** {@inheritDoc} */ @Override public @Nullable Integer reduce(List<ComputeJobResult> results) throws IgniteException { assertEquals(allGrids().size(), results.size()); return null; } } } {code} It does not reproduce the problem sequentially because the problem is related to cases when only part of the message is read into the network buffer while reading from the socket. While this reproducer is in progress you can witness different errors related to communication message handling e.g. 
{code:java} [2025-01-16T10:10:11,136][ERROR][grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%][TcpCommunicationSpi] Failed to process selector key [ses=GridSelectorNioSessionImpl [worker=DirectNioClientWorker [super=AbstractNioClientWorker [idx=3, bytesRcvd=10261, bytesSent=0, bytesRcvd0=0, bytesSent0=0, select=true, super=GridWorker [name=grid-nio-worker-tcp-comm-3, igniteInstanceName=TcpCommunicationSpi, finished=false, heartbeatTs=1737011410109, hashCode=520079162, interrupted=false, runner=grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%]]], writeBuf=java.nio.DirectByteBuffer[pos=0 lim=32768 cap=32768], readBuf=java.nio.DirectByteBuffer[pos=2398 lim=2398 cap=32768], inRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], outRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], closeSocket=true, outboundMessagesQueueSizeMetric=o.a.i.i.processors.metric.impl.LongAdderMetric@69a257d1, super=GridNioSessionImpl [locAddr=/127.0.0.1:48568, rmtAddr=/127.0.0.1:47104, createTime=1737011410099, closeTime=0, bytesSent=0, bytesRcvd=2398, bytesSent0=0, bytesRcvd0=0, sndSchedTime=1737011410099, lastSndTime=1737011410109, 
lastRcvTime=1737011410109, readsPaused=false, filterChain=FilterChain[filters=[GridNioCodecFilter [parser=o.a.i.i.util.nio.GridDirectParser@49fbc83f, directMode=true], GridConnectionBytesVerifyFilter, SSL filter], accepted=false, markedForClose=false]]] org.apache.ignite.IgniteException: Invalid message type: 512 at org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl.create(IgniteMessageFactoryImpl.java:104) ~[classes/:?] at org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper$2.create(GridNioServerWrapper.java:811) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridDirectParser.decode(GridDirectParser.java:81) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioCodecFilter.onMessageReceived(GridNioCodecFilter.java:113) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter.onMessageReceived(GridConnectionBytesVerifyFilter.java:88) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?] at org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.onMessageReceived(GridNioSslFilter.java:413) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$HeadFilter.onMessageReceived(GridNioServer.java:3752) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioFilterChain.onMessageReceived(GridNioFilterChain.java:175) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$DirectNioClientWorker.processRead(GridNioServer.java:1379) ~[classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.processSelectedKeysOptimized(GridNioServer.java:2526) [classes/:?] 
at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2281) [classes/:?] at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1910) [classes/:?] at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125) [classes/:?] at java.base/java.lang.Thread.run(Thread.java:829) [?:?] {code} Alternatively you can see NullPointerExceptions, assertions errors and so on. In most cases this errors will lead only to message resend and reestablishing connections between nodes. But in some cases FH can be invoked with subsequent node termination. The main reason for the described behavior: 1.Node0 tries to establish connection with node1. 2. During the handshake, BlockingSslHandler is used to store, encrypt and decrypt data received over the network. It stores the decrypted data in a dedicated buffer - BlockingSslHandler#appBuf. 3. Lets assume node1 sends handshake finish message and some regular communication messages in a single network packet. 4. Node0 decrypts this packet and writes it to the mentioned above BlockingSslHandlerappBuf. So now it stores the bytes related to the handshake finished message and some regular messages. 5. Node0 finishes handshake and creates GridNioSslHandler to handle communication between nodes in NIO way. GridNioSslHandler have a separate set of buffers to store, encrypt and decrypt data received over the network. 6. After hanshake is finished, node0 tries to decode and handle all bytes remaining in BlockingSslHandler#appBuf. See org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java:262 and ditches the BlockingSslHandler 7. If node1 sends only part of the last regular message in step 3, the bytes related to this messages will not be precessed during step 6 and will remain in BlockingSslHandler#appBuf. 8. So when node 1 sends the last part of partially written message from step 3. 
It will be processed on node 0 by GridNioSslHandler from scratch and bytes left in BlockingSslHandler#appBuf will be lost. This will result in the deserialization errors mentioned above. Note, https://issues.apache.org/jira/browse/IGNITE-22375 can greatly increase the probability of the described problem. > Broken serialization of communication messages in cluster with SSL enabled. > --------------------------------------------------------------------------- > > Key: IGNITE-24224 > URL: https://issues.apache.org/jira/browse/IGNITE-24224 > Project: Ignite > Issue Type: Bug > Reporter: Mikhail Petrov > Priority: Blocker > > The following code can be considered as a reproducer of the problem. > Its main goal is to cause Ignite to be overloaded with communication > messages. In this particular case it will be GridJobExecuteRequest and > GridJobExecuteResponce spamming. > {code:java} > /** */ > public class BrokenSerializationTest extends AbstractSecurityTest { > /** {@inheritDoc} */ > @Override protected IgniteConfiguration getConfiguration(String > igniteInstanceName) throws Exception { > SslContextFactory factory = > (SslContextFactory)sslTrustedFactory("server", "trustone"); > IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName) > .setSslContextFactory(factory) > .setFailureHandler(new StopNodeOrHaltFailureHandler()); > cfg.setCommunicationSpi(new > TcpCommunicationSpi().setUnacknowledgedMessagesBufferSize(160)); // Small > values of this property causes Ignite to frequently reestablish connection > between nodes. > return cfg; > } > /** Tests task execution security context in case task was initiated from > the {@link IgniteClient}. 
*/ > @Test > public void test() throws Exception { > IgniteEx ignite = startGridAllowAll("crd"); > startGridAllowAll("srv1"); > startGridAllowAll("srv2"); > startGridAllowAll("srv3"); > startGridAllowAll("srv4"); > startGridAllowAll("srv5"); > awaitPartitionMapExchange(); > Set<Throwable> errors = ConcurrentHashMap.newKeySet(); > Set<ComputeTaskFuture<Integer>> futs = ConcurrentHashMap.newKeySet(); > CountDownLatch latch = new CountDownLatch(16 * 10000); > for (int i = 0; i < 16; i++) { > GridTestUtils.runAsync(() -> { > try { > U.sleep(1000); > for (int j = 0; j < 10000; j++) { > futs.add(ignite.context().task().<Void, > Integer>execute( > TestTask.class.getName(), > null, > > TaskExecutionOptions.options().asPublicRequest().withProjection(ignite.cluster().nodes()) > ).publicFuture()); > latch.countDown(); > } > } > catch (Throwable e) { > errors.add(e); > } > }); > } > latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); > assertTrue(errors.isEmpty()); > for (ComputeTaskFuture<Integer> fut : futs) { > assertTrue(null == fut.get(getTestTimeout())); > } > } > /** */ > public static class TestTask implements ComputeTask<Void, Integer> { > /** {@inheritDoc} */ > @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map( > List<ClusterNode> subgrid, > @Nullable Void arg > ) throws IgniteException { > Map<ComputeJob, ClusterNode> res = new HashMap<>(); > for (ClusterNode node : subgrid) { > res.put(new ComputeJob() { > @Override public void cancel() { > // No-op. 
> } > @Override public Object execute() throws IgniteException { > return null; > } > }, node); > } > return res; > } > /** {@inheritDoc} */ > @Override public ComputeJobResultPolicy result(ComputeJobResult res, > List<ComputeJobResult> rcvd) { > if (res.getException() != null) > throw new IgniteException(res.getException()); > return ComputeJobResultPolicy.WAIT; > } > /** {@inheritDoc} */ > @Override public @Nullable Integer reduce(List<ComputeJobResult> > results) throws IgniteException { > assertEquals(allGrids().size(), results.size()); > return null; > } > } > } > {code} > It does not reproduce the problem sequentially because the problem is related > to cases when only part of the message is read into the application buffer > while reading from the socket. > While this reproducer is in progress you can witness different errors related > to communication message handling e.g. > {code:java} > [2025-01-16T10:10:11,136][ERROR][grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%][TcpCommunicationSpi] > Failed to process selector key [ses=GridSelectorNioSessionImpl > [worker=DirectNioClientWorker [super=AbstractNioClientWorker [idx=3, > bytesRcvd=10261, bytesSent=0, bytesRcvd0=0, bytesSent0=0, select=true, > super=GridWorker [name=grid-nio-worker-tcp-comm-3, > igniteInstanceName=TcpCommunicationSpi, finished=false, > heartbeatTs=1737011410109, hashCode=520079162, interrupted=false, > runner=grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%]]], > writeBuf=java.nio.DirectByteBuffer[pos=0 lim=32768 cap=32768], > readBuf=java.nio.DirectByteBuffer[pos=2398 lim=2398 cap=32768], > inRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, > sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, > node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, > consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet > [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, > ver=2.18.0#20250116-sha1:00000000, 
isClient=false], connected=false, > connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], > outRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, > rcvCnt=168, sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, > node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, > consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet > [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, > ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, > connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], > closeSocket=true, > outboundMessagesQueueSizeMetric=o.a.i.i.processors.metric.impl.LongAdderMetric@69a257d1, > super=GridNioSessionImpl [locAddr=/127.0.0.1:48568, > rmtAddr=/127.0.0.1:47104, createTime=1737011410099, closeTime=0, bytesSent=0, > bytesRcvd=2398, bytesSent0=0, bytesRcvd0=0, sndSchedTime=1737011410099, > lastSndTime=1737011410109, lastRcvTime=1737011410109, readsPaused=false, > filterChain=FilterChain[filters=[GridNioCodecFilter > [parser=o.a.i.i.util.nio.GridDirectParser@49fbc83f, directMode=true], > GridConnectionBytesVerifyFilter, SSL filter], accepted=false, > markedForClose=false]]] > org.apache.ignite.IgniteException: Invalid message type: 512 > at > org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl.create(IgniteMessageFactoryImpl.java:104) > ~[classes/:?] > at > org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper$2.create(GridNioServerWrapper.java:811) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridDirectParser.decode(GridDirectParser.java:81) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioCodecFilter.onMessageReceived(GridNioCodecFilter.java:113) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) > ~[classes/:?] 
> at > org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter.onMessageReceived(GridConnectionBytesVerifyFilter.java:88) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.onMessageReceived(GridNioSslFilter.java:413) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioServer$HeadFilter.onMessageReceived(GridNioServer.java:3752) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioFilterChain.onMessageReceived(GridNioFilterChain.java:175) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioServer$DirectNioClientWorker.processRead(GridNioServer.java:1379) > ~[classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.processSelectedKeysOptimized(GridNioServer.java:2526) > [classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2281) > [classes/:?] > at > org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1910) > [classes/:?] > at > org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125) > [classes/:?] > at java.base/java.lang.Thread.run(Thread.java:829) [?:?] > {code} > Alternatively you can see NullPointerExceptions, assertions errors and so on. > In most cases this errors will lead only to message resend and reestablishing > connections between nodes. But in some cases FH can be invoked with > subsequent node termination. > The main reason for the described behavior: > 1.Node0 tries to establish connection with node1. > 2. During the handshake, BlockingSslHandler is used to store, encrypt and > decrypt data received over the network. 
It stores the decrypted data in a > dedicated buffer - BlockingSslHandler#appBuf. > 3. Let's assume node1 sends a handshake finish message and some regular > communication messages in a single network packet. > 4. Node0 decrypts this packet and writes it to the above-mentioned > BlockingSslHandler#appBuf. So now it stores the bytes related to the > handshake finished message and some regular messages. > 5. Node0 finishes the handshake and creates GridNioSslHandler to handle > communication between nodes in a NIO way. GridNioSslHandler has a separate set > of buffers to store, encrypt and decrypt data received over the network. > 6. After the handshake is finished, node0 tries to decode and handle all bytes > remaining in BlockingSslHandler#appBuf. See > org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java:262 and ditches > the BlockingSslHandler. > 7. If node1 sends only part of the last regular message in step 3, the bytes > related to this message will not be processed during step 6 and will remain > in BlockingSslHandler#appBuf. > 8. So when node1 sends the last part of the partially written message from step > 3, it will be processed on node0 by GridNioSslHandler from scratch and the bytes > left in BlockingSslHandler#appBuf will be lost. > This will result in the deserialization errors mentioned above. > Note, https://issues.apache.org/jira/browse/IGNITE-22375 can greatly increase > the probability of the described problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)