[ 
https://issues.apache.org/jira/browse/IGNITE-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Petrov updated IGNITE-24224:
------------------------------------
    Description: 
The following code can be used as a reproducer of the problem.
Its main goal is to overload Ignite with communication messages, 
in this particular case with a flood of GridJobExecuteRequest and 
GridJobExecuteResponse messages.

{code:java}
/** */
public class BrokenSerializationTest extends AbstractSecurityTest {
    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        SslContextFactory factory = (SslContextFactory)sslTrustedFactory("server", "trustone");

        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
            .setSslContextFactory(factory)
            .setFailureHandler(new StopNodeOrHaltFailureHandler());

        // Small values of this property cause Ignite to frequently reestablish connections between nodes.
        cfg.setCommunicationSpi(new TcpCommunicationSpi().setUnacknowledgedMessagesBufferSize(160));

        return cfg;
    }

    /** Tests task execution security context in case task was initiated from the {@link IgniteClient}. */
    @Test
    public void test() throws Exception {
        IgniteEx ignite = startGridAllowAll("crd");
        startGridAllowAll("srv1");
        startGridAllowAll("srv2");
        startGridAllowAll("srv3");
        startGridAllowAll("srv4");
        startGridAllowAll("srv5");

        awaitPartitionMapExchange();

        Set<Throwable> errors = ConcurrentHashMap.newKeySet();
        Set<ComputeTaskFuture<Integer>> futs = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(16 * 10000);

        for (int i = 0; i < 16; i++) {
            GridTestUtils.runAsync(() -> {
                try {
                    U.sleep(1000);

                    for (int j = 0; j < 10000; j++) {
                        futs.add(ignite.context().task().<Void, Integer>execute(
                            TestTask.class.getName(),
                            null,
                            TaskExecutionOptions.options().asPublicRequest().withProjection(ignite.cluster().nodes())
                        ).publicFuture());

                        latch.countDown();
                    }
                }
                catch (Throwable e) {
                    errors.add(e);
                }
            });
        }

        latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);

        assertTrue(errors.isEmpty());

        for (ComputeTaskFuture<Integer> fut : futs) {
            assertTrue(null == fut.get(getTestTimeout()));
        }
    }

    /** */
    public static class TestTask implements ComputeTask<Void, Integer> {
        /** {@inheritDoc} */
        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
            List<ClusterNode> subgrid,
            @Nullable Void arg
        ) throws IgniteException {
            Map<ComputeJob, ClusterNode> res = new HashMap<>();

            for (ClusterNode node : subgrid) {
                res.put(new ComputeJob() {
                    @Override public void cancel() {
                        // No-op.
                    }

                    @Override public Object execute() throws IgniteException {
                        return null;
                    }
                }, node);
            }

            return res;
        }

        /** {@inheritDoc} */
        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
            if (res.getException() != null)
                throw new IgniteException(res.getException());

            return ComputeJobResultPolicy.WAIT;
        }

        /** {@inheritDoc} */
        @Override public @Nullable Integer reduce(List<ComputeJobResult> results) throws IgniteException {
            assertEquals(allGrids().size(), results.size());

            return null;
        }
    }
}

{code}


The reproducer does not trigger the problem reliably on every run, because the 
problem occurs only when just part of a message is read into the application 
buffer while reading from the socket.
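
To make the "only part of the message is read" condition concrete, here is a 
minimal standalone sketch of a decoder that tolerates partial reads. It is not 
Ignite code: the class name PartialReadDemo and the frame layout (2-byte type, 
4-byte payload length, payload) are assumptions made up for this illustration. 
The point is that the bytes of an incomplete message must stay in the same 
buffer until the rest arrives.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of partial reads; not part of Ignite. */
public class PartialReadDemo {
    /** Assumed header size: 2-byte message type + 4-byte payload length. */
    static final int HDR = 6;

    /** Accumulates bytes across reads, similar in role to an application buffer. */
    private final ByteBuffer acc = ByteBuffer.allocate(1024);

    /** Feeds one chunk as read from the socket; returns types of the fully decoded frames. */
    List<Short> onRead(byte[] chunk) {
        acc.put(chunk);
        acc.flip(); // switch to reading what has been accumulated so far

        List<Short> types = new ArrayList<>();

        while (acc.remaining() >= HDR) {
            acc.mark();

            short type = acc.getShort();
            int len = acc.getInt();

            if (acc.remaining() < len) {
                acc.reset(); // incomplete frame: keep its bytes for the next read

                break;
            }

            acc.position(acc.position() + len); // skip the payload
            types.add(type);
        }

        acc.compact(); // move the undecoded tail to the start, back to write mode

        return types;
    }

    /** Builds a frame of the given type with a zero-filled payload. */
    static byte[] frame(short type, int len) {
        ByteBuffer b = ByteBuffer.allocate(HDR + len);

        b.putShort(type).putInt(len);

        return b.array();
    }

    public static void main(String[] args) {
        byte[] stream = new byte[32];

        System.arraycopy(frame((short)1, 10), 0, stream, 0, 16);
        System.arraycopy(frame((short)2, 10), 0, stream, 16, 16);

        PartialReadDemo dec = new PartialReadDemo();

        // The first read ends 4 bytes into the second frame.
        System.out.println(dec.onRead(Arrays.copyOfRange(stream, 0, 20)));  // prints [1]
        System.out.println(dec.onRead(Arrays.copyOfRange(stream, 20, 32))); // prints [2]
    }
}
```

The second frame is decoded correctly only because its first 4 bytes were 
retained between the two reads.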

While the reproducer is running, you can observe various errors related to 
communication message handling, e.g.:

{code:java}
[2025-01-16T10:10:11,136][ERROR][grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%][TcpCommunicationSpi]
 Failed to process selector key [ses=GridSelectorNioSessionImpl 
[worker=DirectNioClientWorker [super=AbstractNioClientWorker [idx=3, 
bytesRcvd=10261, bytesSent=0, bytesRcvd0=0, bytesSent0=0, select=true, 
super=GridWorker [name=grid-nio-worker-tcp-comm-3, 
igniteInstanceName=TcpCommunicationSpi, finished=false, 
heartbeatTs=1737011410109, hashCode=520079162, interrupted=false, 
runner=grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%]]], 
writeBuf=java.nio.DirectByteBuffer[pos=0 lim=32768 cap=32768], 
readBuf=java.nio.DirectByteBuffer[pos=2398 lim=2398 cap=32768], 
inRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, 
sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, 
node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, 
consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet 
[/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, 
ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, 
connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], 
outRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, 
sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, 
node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, 
consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet 
[/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, 
ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, 
connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], 
closeSocket=true, 
outboundMessagesQueueSizeMetric=o.a.i.i.processors.metric.impl.LongAdderMetric@69a257d1,
 super=GridNioSessionImpl [locAddr=/127.0.0.1:48568, rmtAddr=/127.0.0.1:47104, 
createTime=1737011410099, closeTime=0, bytesSent=0, bytesRcvd=2398, 
bytesSent0=0, bytesRcvd0=0, sndSchedTime=1737011410099, 
lastSndTime=1737011410109, lastRcvTime=1737011410109, readsPaused=false, 
filterChain=FilterChain[filters=[GridNioCodecFilter 
[parser=o.a.i.i.util.nio.GridDirectParser@49fbc83f, directMode=true], 
GridConnectionBytesVerifyFilter, SSL filter], accepted=false, 
markedForClose=false]]]
 org.apache.ignite.IgniteException: Invalid message type: 512
        at org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl.create(IgniteMessageFactoryImpl.java:104) ~[classes/:?]
        at org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper$2.create(GridNioServerWrapper.java:811) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridDirectParser.decode(GridDirectParser.java:81) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioCodecFilter.onMessageReceived(GridNioCodecFilter.java:113) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter.onMessageReceived(GridConnectionBytesVerifyFilter.java:88) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.onMessageReceived(GridNioSslFilter.java:413) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$HeadFilter.onMessageReceived(GridNioServer.java:3752) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioFilterChain.onMessageReceived(GridNioFilterChain.java:175) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$DirectNioClientWorker.processRead(GridNioServer.java:1379) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.processSelectedKeysOptimized(GridNioServer.java:2526) [classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2281) [classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1910) [classes/:?]
        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125) [classes/:?]
        at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
{code}

Alternatively, you can see NullPointerExceptions, assertion errors and so on. 

In most cases these errors lead only to messages being resent and connections 
between nodes being reestablished. But in some cases the failure handler (FH) 
can be invoked, followed by node termination. 

The main reason for the described behavior:

1. Node0 tries to establish a connection with node1.
2. During the handshake, BlockingSslHandler is used to store, encrypt and 
decrypt data received over the network. It stores the decrypted data in a 
dedicated buffer, BlockingSslHandler#appBuf.
3. Let's assume node1 sends the handshake-finished message and some regular 
communication messages in a single network packet.
4. Node0 decrypts this packet and writes it to the above-mentioned 
BlockingSslHandler#appBuf, which now holds the bytes of the handshake-finished 
message and of some regular messages.
5. Node0 finishes the handshake and creates a GridNioSslHandler to handle 
communication between the nodes in the NIO way. GridNioSslHandler has a 
separate set of buffers to store, encrypt and decrypt data received over the 
network.
6. After the handshake is finished, node0 tries to decode and handle all bytes 
remaining in BlockingSslHandler#appBuf (see 
org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java:262) and then 
discards the BlockingSslHandler.
7. If the packet from step 3 contains only part of the last regular message, 
the bytes of that message are not processed during step 6 and remain in 
BlockingSslHandler#appBuf.
8. When node1 then sends the rest of the partially written message from step 3, 
node0 processes it with GridNioSslHandler from scratch, and the bytes left in 
BlockingSslHandler#appBuf are lost.

This will result in the deserialization errors mentioned above.
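
The handoff described in steps 3 to 8 can be illustrated with a small 
standalone simulation. This is a sketch under stated assumptions, not Ignite 
code and not the actual fix: the class HandoffDemo, the frame layout (2-byte 
type, 4-byte payload length, payload), the message types 1 and 7, and the 
payload bytes are all hypothetical. The payload is chosen so that a decoder 
starting in the middle of the message reads 512 as the message type, similar 
to the error in the log above.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical simulation of losing buffered bytes at a handler handoff; not Ignite code. */
public class HandoffDemo {
    /** Assumed header size: 2-byte message type + 4-byte payload length. */
    static final int HDR = 6;

    /** Decodes one frame; returns its type, or -1 (leaving the bytes in place) if it is incomplete. */
    static int decodeOne(ByteBuffer buf) {
        if (buf.remaining() < HDR)
            return -1;

        buf.mark();

        int type = buf.getShort() & 0xFFFF;
        int len = buf.getInt();

        if (len < 0 || buf.remaining() < len) {
            buf.reset();

            return -1;
        }

        buf.position(buf.position() + len);

        return type;
    }

    /** Returns the message type that the second (post-handshake) handler decodes first. */
    static int receive(byte[] pkt1, byte[] pkt2, boolean carryOver) {
        // Steps 2-4: the handshake handler buffers the whole first packet.
        ByteBuffer handshakeBuf = ByteBuffer.allocate(256);
        handshakeBuf.put(pkt1);
        handshakeBuf.flip();

        // Step 6: decode everything that is complete; a partial frame stays behind.
        while (decodeOne(handshakeBuf) >= 0) {
            // No-op.
        }

        // Step 5: the new handler has its own, initially empty buffer.
        ByteBuffer nioBuf = ByteBuffer.allocate(256);

        if (carryOver)
            nioBuf.put(handshakeBuf); // copy the undecoded tail instead of dropping it

        // Step 8: the rest of the message arrives in the next packet.
        nioBuf.put(pkt2);
        nioBuf.flip();

        return decodeOne(nioBuf);
    }

    /** Builds a frame of the given type and payload. */
    static byte[] frame(int type, byte[] payload) {
        return ByteBuffer.allocate(HDR + payload.length)
            .putShort((short)type).putInt(payload.length).put(payload).array();
    }

    /** Splits "handshake finished" (type 1) + regular message (type 7) one byte into the payload. */
    static byte[][] packets() {
        byte[] hs = frame(1, new byte[0]);
        byte[] msg = frame(7, new byte[] {9, 2, 0, 0, 0, 0, 0, 0});

        byte[] stream = Arrays.copyOf(hs, hs.length + msg.length);
        System.arraycopy(msg, 0, stream, hs.length, msg.length);

        int split = hs.length + HDR + 1;

        return new byte[][] {
            Arrays.copyOfRange(stream, 0, split),
            Arrays.copyOfRange(stream, split, stream.length)
        };
    }

    public static void main(String[] args) {
        byte[][] p = packets();

        System.out.println(receive(p[0], p[1], false)); // prints 512
        System.out.println(receive(p[0], p[1], true));  // prints 7
    }
}
```

With the leftover bytes dropped, the second handler starts decoding in the 
middle of the message and interprets payload bytes as a header. With the 
leftover bytes carried over into the new buffer, the message decodes as 
intended.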

Note that https://issues.apache.org/jira/browse/IGNITE-22375 can greatly 
increase the probability of the described problem.



  was:
The following code can be used as a reproducer of the problem.
Its main goal is to overload Ignite with communication messages, 
in this particular case with a flood of GridJobExecuteRequest and 
GridJobExecuteResponse messages.

{code:java}
/** */
public class BrokenSerializationTest extends AbstractSecurityTest {
    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        SslContextFactory factory = (SslContextFactory)sslTrustedFactory("server", "trustone");

        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
            .setSslContextFactory(factory)
            .setFailureHandler(new StopNodeOrHaltFailureHandler());

        // Small values of this property cause Ignite to frequently reestablish connections between nodes.
        cfg.setCommunicationSpi(new TcpCommunicationSpi().setUnacknowledgedMessagesBufferSize(160));

        return cfg;
    }

    /** Tests task execution security context in case task was initiated from the {@link IgniteClient}. */
    @Test
    public void test() throws Exception {
        IgniteEx ignite = startGridAllowAll("crd");
        startGridAllowAll("srv1");
        startGridAllowAll("srv2");
        startGridAllowAll("srv3");
        startGridAllowAll("srv4");
        startGridAllowAll("srv5");

        awaitPartitionMapExchange();

        Set<Throwable> errors = ConcurrentHashMap.newKeySet();
        Set<ComputeTaskFuture<Integer>> futs = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(16 * 10000);

        for (int i = 0; i < 16; i++) {
            GridTestUtils.runAsync(() -> {
                try {
                    U.sleep(1000);

                    for (int j = 0; j < 10000; j++) {
                        futs.add(ignite.context().task().<Void, Integer>execute(
                            TestTask.class.getName(),
                            null,
                            TaskExecutionOptions.options().asPublicRequest().withProjection(ignite.cluster().nodes())
                        ).publicFuture());

                        latch.countDown();
                    }
                }
                catch (Throwable e) {
                    errors.add(e);
                }
            });
        }

        latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);

        assertTrue(errors.isEmpty());

        for (ComputeTaskFuture<Integer> fut : futs) {
            assertTrue(null == fut.get(getTestTimeout()));
        }
    }

    /** */
    public static class TestTask implements ComputeTask<Void, Integer> {
        /** {@inheritDoc} */
        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
            List<ClusterNode> subgrid,
            @Nullable Void arg
        ) throws IgniteException {
            Map<ComputeJob, ClusterNode> res = new HashMap<>();

            for (ClusterNode node : subgrid) {
                res.put(new ComputeJob() {
                    @Override public void cancel() {
                        // No-op.
                    }

                    @Override public Object execute() throws IgniteException {
                        return null;
                    }
                }, node);
            }

            return res;
        }

        /** {@inheritDoc} */
        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
            if (res.getException() != null)
                throw new IgniteException(res.getException());

            return ComputeJobResultPolicy.WAIT;
        }

        /** {@inheritDoc} */
        @Override public @Nullable Integer reduce(List<ComputeJobResult> results) throws IgniteException {
            assertEquals(allGrids().size(), results.size());

            return null;
        }
    }
}

{code}


The reproducer does not trigger the problem reliably on every run, because the 
problem occurs only when just part of a message is read into the network 
buffer while reading from the socket.

While the reproducer is running, you can observe various errors related to 
communication message handling, e.g.:

{code:java}
[2025-01-16T10:10:11,136][ERROR][grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%][TcpCommunicationSpi]
 Failed to process selector key [ses=GridSelectorNioSessionImpl 
[worker=DirectNioClientWorker [super=AbstractNioClientWorker [idx=3, 
bytesRcvd=10261, bytesSent=0, bytesRcvd0=0, bytesSent0=0, select=true, 
super=GridWorker [name=grid-nio-worker-tcp-comm-3, 
igniteInstanceName=TcpCommunicationSpi, finished=false, 
heartbeatTs=1737011410109, hashCode=520079162, interrupted=false, 
runner=grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%]]], 
writeBuf=java.nio.DirectByteBuffer[pos=0 lim=32768 cap=32768], 
readBuf=java.nio.DirectByteBuffer[pos=2398 lim=2398 cap=32768], 
inRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, 
sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, 
node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, 
consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet 
[/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, 
ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, 
connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], 
outRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, 
sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, 
node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, 
consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet 
[/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, 
ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, 
connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], 
closeSocket=true, 
outboundMessagesQueueSizeMetric=o.a.i.i.processors.metric.impl.LongAdderMetric@69a257d1,
 super=GridNioSessionImpl [locAddr=/127.0.0.1:48568, rmtAddr=/127.0.0.1:47104, 
createTime=1737011410099, closeTime=0, bytesSent=0, bytesRcvd=2398, 
bytesSent0=0, bytesRcvd0=0, sndSchedTime=1737011410099, 
lastSndTime=1737011410109, lastRcvTime=1737011410109, readsPaused=false, 
filterChain=FilterChain[filters=[GridNioCodecFilter 
[parser=o.a.i.i.util.nio.GridDirectParser@49fbc83f, directMode=true], 
GridConnectionBytesVerifyFilter, SSL filter], accepted=false, 
markedForClose=false]]]
 org.apache.ignite.IgniteException: Invalid message type: 512
        at org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl.create(IgniteMessageFactoryImpl.java:104) ~[classes/:?]
        at org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper$2.create(GridNioServerWrapper.java:811) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridDirectParser.decode(GridDirectParser.java:81) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioCodecFilter.onMessageReceived(GridNioCodecFilter.java:113) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter.onMessageReceived(GridConnectionBytesVerifyFilter.java:88) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.onMessageReceived(GridNioSslFilter.java:413) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$HeadFilter.onMessageReceived(GridNioServer.java:3752) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioFilterChain.onMessageReceived(GridNioFilterChain.java:175) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$DirectNioClientWorker.processRead(GridNioServer.java:1379) ~[classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.processSelectedKeysOptimized(GridNioServer.java:2526) [classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2281) [classes/:?]
        at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1910) [classes/:?]
        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125) [classes/:?]
        at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
{code}

Alternatively, you can see NullPointerExceptions, assertion errors and so on. 

In most cases these errors lead only to messages being resent and connections 
between nodes being reestablished. But in some cases the failure handler (FH) 
can be invoked, followed by node termination. 

The main reason for the described behavior:

1. Node0 tries to establish a connection with node1.
2. During the handshake, BlockingSslHandler is used to store, encrypt and 
decrypt data received over the network. It stores the decrypted data in a 
dedicated buffer, BlockingSslHandler#appBuf.
3. Let's assume node1 sends the handshake-finished message and some regular 
communication messages in a single network packet.
4. Node0 decrypts this packet and writes it to the above-mentioned 
BlockingSslHandler#appBuf, which now holds the bytes of the handshake-finished 
message and of some regular messages.
5. Node0 finishes the handshake and creates a GridNioSslHandler to handle 
communication between the nodes in the NIO way. GridNioSslHandler has a 
separate set of buffers to store, encrypt and decrypt data received over the 
network.
6. After the handshake is finished, node0 tries to decode and handle all bytes 
remaining in BlockingSslHandler#appBuf (see 
org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java:262) and then 
discards the BlockingSslHandler.
7. If the packet from step 3 contains only part of the last regular message, 
the bytes of that message are not processed during step 6 and remain in 
BlockingSslHandler#appBuf.
8. When node1 then sends the rest of the partially written message from step 3, 
node0 processes it with GridNioSslHandler from scratch, and the bytes left in 
BlockingSslHandler#appBuf are lost.

This will result in the deserialization errors mentioned above.

Note that https://issues.apache.org/jira/browse/IGNITE-22375 can greatly 
increase the probability of the described problem.




> Broken serialization of communication messages in cluster with SSL enabled.
> ---------------------------------------------------------------------------
>
>                 Key: IGNITE-24224
>                 URL: https://issues.apache.org/jira/browse/IGNITE-24224
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Mikhail Petrov
>            Priority: Blocker



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
