[ 
https://issues.apache.org/jira/browse/IGNITE-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Petrov reassigned IGNITE-24224:
---------------------------------------

    Assignee: Mikhail Petrov

> Broken serialization of communication messages in cluster with SSL enabled.
> ---------------------------------------------------------------------------
>
>                 Key: IGNITE-24224
>                 URL: https://issues.apache.org/jira/browse/IGNITE-24224
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Mikhail Petrov
>            Assignee: Mikhail Petrov
>            Priority: Blocker
>              Labels: ise
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The following code can be used as a reproducer of the problem.
> Its main goal is to overload Ignite with communication messages. In this 
> particular case it floods the cluster with GridJobExecuteRequest and 
> GridJobExecuteResponse messages.
> {code:java}
> /** */
> public class BrokenSerializationTest extends AbstractSecurityTest {
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String 
> igniteInstanceName) throws Exception {
>         SslContextFactory factory = 
> (SslContextFactory)sslTrustedFactory("server", "trustone");
>         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
>             .setSslContextFactory(factory)
>             .setFailureHandler(new StopNodeOrHaltFailureHandler());
>         // A small value of this property causes Ignite to frequently
>         // reestablish connections between nodes.
>         cfg.setCommunicationSpi(new TcpCommunicationSpi().setUnacknowledgedMessagesBufferSize(160));
>         return cfg;
>     }
>     /** Tests task execution security context in case task was initiated from 
> the {@link IgniteClient}. */
>     @Test
>     public void test() throws Exception {
>         IgniteEx ignite = startGridAllowAll("crd");
>         startGridAllowAll("srv1");
>         startGridAllowAll("srv2");
>         startGridAllowAll("srv3");
>         startGridAllowAll("srv4");
>         startGridAllowAll("srv5");
>         awaitPartitionMapExchange();
>         Set<Throwable> errors = ConcurrentHashMap.newKeySet();
>         Set<ComputeTaskFuture<Integer>> futs = ConcurrentHashMap.newKeySet();
>         CountDownLatch latch = new CountDownLatch(16 * 10000);
>         for (int i = 0; i < 16; i++) {
>             GridTestUtils.runAsync(() -> {
>                 try {
>                     U.sleep(1000);
>                     for (int j = 0; j < 10000; j++) {
>                         futs.add(ignite.context().task().<Void, 
> Integer>execute(
>                             TestTask.class.getName(),
>                             null,
>                             
> TaskExecutionOptions.options().asPublicRequest().withProjection(ignite.cluster().nodes())
>                         ).publicFuture());
>                         latch.countDown();
>                     }
>                 }
>                 catch (Throwable e) {
>                     errors.add(e);
>                 }
>             });
>         }
>         latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
>         assertTrue(errors.isEmpty());
>         for (ComputeTaskFuture<Integer> fut : futs) {
>             assertTrue(null == fut.get(getTestTimeout()));
>         }
>     }
>     /** */
>     public static class TestTask implements ComputeTask<Void, Integer> {
>         /** {@inheritDoc} */
>         @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
>             List<ClusterNode> subgrid,
>             @Nullable Void arg
>         ) throws IgniteException {
>             Map<ComputeJob, ClusterNode> res = new HashMap<>();
>             for (ClusterNode node : subgrid) {
>                 res.put(new ComputeJob() {
>                     @Override public void cancel() {
>                         // No-op.
>                     }
>                     @Override public Object execute() throws IgniteException {
>                         return null;
>                     }
>                 }, node);
>             }
>             return res;
>         }
>         /** {@inheritDoc} */
>         @Override public ComputeJobResultPolicy result(ComputeJobResult res, 
> List<ComputeJobResult> rcvd) {
>             if (res.getException() != null)
>                 throw new IgniteException(res.getException());
>             return ComputeJobResultPolicy.WAIT;
>         }
>         /** {@inheritDoc} */
>         @Override public @Nullable Integer reduce(List<ComputeJobResult> 
> results) throws IgniteException {
>             assertEquals(allGrids().size(), results.size());
>             return null;
>         }
>     }
> }
> {code}
> The reproducer does not fail deterministically, because the problem only 
> shows up when just part of a message is read into the application buffer 
> while reading from the socket.
> While this reproducer is running you can observe different errors related 
> to communication message handling, e.g.
> {code:java}
> [2025-01-16T10:10:11,136][ERROR][grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%][TcpCommunicationSpi]
>  Failed to process selector key [ses=GridSelectorNioSessionImpl 
> [worker=DirectNioClientWorker [super=AbstractNioClientWorker [idx=3, 
> bytesRcvd=10261, bytesSent=0, bytesRcvd0=0, bytesSent0=0, select=true, 
> super=GridWorker [name=grid-nio-worker-tcp-comm-3, 
> igniteInstanceName=TcpCommunicationSpi, finished=false, 
> heartbeatTs=1737011410109, hashCode=520079162, interrupted=false, 
> runner=grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%]]], 
> writeBuf=java.nio.DirectByteBuffer[pos=0 lim=32768 cap=32768], 
> readBuf=java.nio.DirectByteBuffer[pos=2398 lim=2398 cap=32768], 
> inRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168, 
> sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, 
> node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, 
> consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet 
> [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, 
> ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, 
> connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], 
> outRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, 
> rcvCnt=168, sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false, 
> node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004, 
> consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet 
> [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false, 
> ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false, 
> connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false], 
> closeSocket=true, 
> outboundMessagesQueueSizeMetric=o.a.i.i.processors.metric.impl.LongAdderMetric@69a257d1,
>  super=GridNioSessionImpl [locAddr=/127.0.0.1:48568, 
> rmtAddr=/127.0.0.1:47104, createTime=1737011410099, closeTime=0, bytesSent=0, 
> bytesRcvd=2398, bytesSent0=0, bytesRcvd0=0, sndSchedTime=1737011410099, 
> lastSndTime=1737011410109, lastRcvTime=1737011410109, readsPaused=false, 
> filterChain=FilterChain[filters=[GridNioCodecFilter 
> [parser=o.a.i.i.util.nio.GridDirectParser@49fbc83f, directMode=true], 
> GridConnectionBytesVerifyFilter, SSL filter], accepted=false, 
> markedForClose=false]]]
>  org.apache.ignite.IgniteException: Invalid message type: 512
>       at 
> org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl.create(IgniteMessageFactoryImpl.java:104)
>  ~[classes/:?]
>       at 
> org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper$2.create(GridNioServerWrapper.java:811)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridDirectParser.decode(GridDirectParser.java:81)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioCodecFilter.onMessageReceived(GridNioCodecFilter.java:113)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter.onMessageReceived(GridConnectionBytesVerifyFilter.java:88)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.onMessageReceived(GridNioSslFilter.java:413)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioServer$HeadFilter.onMessageReceived(GridNioServer.java:3752)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioFilterChain.onMessageReceived(GridNioFilterChain.java:175)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioServer$DirectNioClientWorker.processRead(GridNioServer.java:1379)
>  ~[classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.processSelectedKeysOptimized(GridNioServer.java:2526)
>  [classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2281)
>  [classes/:?]
>       at 
> org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1910)
>  [classes/:?]
>       at 
> org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125) 
> [classes/:?]
>       at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
> {code}
> Alternatively, you can see NullPointerExceptions, assertion errors and so on. 
> In most cases these errors only lead to messages being resent and 
> connections between nodes being reestablished. But in some cases the failure 
> handler (FH) can be invoked, with subsequent node termination.
> The root cause of the described behavior:
> 1. Node0 tries to establish a connection with node1.
> 2. During the handshake, BlockingSslHandler is used to store, encrypt and 
> decrypt data received over the network. It stores the decrypted data in a 
> dedicated buffer, BlockingSslHandler#appBuf.
> 3. Let's assume node1 sends the handshake finished message and some regular 
> communication messages in a single network packet.
> 4. Node0 decrypts this packet and writes it to the above-mentioned 
> BlockingSslHandler#appBuf. The buffer now holds the bytes of the handshake 
> finished message and of some regular messages.
> 5. Node0 finishes the handshake and creates a GridNioSslHandler to handle 
> communication between the nodes in the NIO way. GridNioSslHandler has a 
> separate set of buffers to store, encrypt and decrypt data received over the 
> network.
> 6. After the handshake is finished, node0 tries to decode and handle all 
> bytes remaining in BlockingSslHandler#appBuf (see 
> org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java:262) and then 
> discards the BlockingSslHandler.
> 7. If node1 sent only part of the last regular message in step 3, some bytes 
> of this message can stay unhandled during step 6 and remain in 
> BlockingSslHandler#appBuf. This happens when there are not enough bytes to 
> read a single communication message field.
> 8. When node1 then sends the rest of the partially written message from step 
> 3, node0 processes it with GridNioSslHandler from scratch, and the bytes 
> left in BlockingSslHandler#appBuf are lost.
> This results in the deserialization errors mentioned above.
> Note that https://issues.apache.org/jira/browse/IGNITE-22375 can greatly 
> increase the probability of the described problem.
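
The buffer handover described in steps 3-8 can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not Ignite code and not the actual fix: the class HandoverSketch, the methods decodeAvailable and receive, and the carryOverTail flag are hypothetical names, and a "message field" is simplified to a 4-byte big-endian int. The first buffer plays the role of BlockingSslHandler#appBuf and the second one plays the role of the application buffer of the handler created after the handshake.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of handing decrypted bytes over from one buffer to another. Hypothetical, not Ignite code. */
final class HandoverSketch {
    /** Decodes every complete 4-byte field and leaves an incomplete tail unread in the buffer. */
    static List<Integer> decodeAvailable(ByteBuffer buf) {
        List<Integer> out = new ArrayList<>();

        while (buf.remaining() >= Integer.BYTES)
            out.add(buf.getInt());

        return out;
    }

    /**
     * Receives a stream that arrives in two chunks, with the buffer switch in between.
     *
     * @param first Bytes that arrived before the switch (the role of BlockingSslHandler#appBuf).
     * @param second Bytes that arrived after the switch.
     * @param carryOverTail Whether the undecoded tail of the first buffer is copied to the second one.
     */
    static List<Integer> receive(byte[] first, byte[] second, boolean carryOverTail) {
        ByteBuffer handshakeBuf = ByteBuffer.wrap(first);

        // Step 6: decode whatever is complete in the old buffer.
        List<Integer> decoded = new ArrayList<>(decodeAvailable(handshakeBuf));

        // Step 5: the new handler owns a separate buffer.
        ByteBuffer nioBuf = ByteBuffer.allocate(first.length + second.length);

        // Assumed remedy for illustration: keep the tail instead of dropping it with the old buffer.
        if (carryOverTail)
            nioBuf.put(handshakeBuf);

        // Step 8: the rest of the partially sent field arrives in the new buffer.
        nioBuf.put(second);
        nioBuf.flip();

        decoded.addAll(decodeAvailable(nioBuf));

        return decoded;
    }

    public static void main(String[] args) {
        // Three fields (1, 2, 3), split in the middle of the second field.
        byte[] all = ByteBuffer.allocate(12).putInt(1).putInt(2).putInt(3).array();
        byte[] first = Arrays.copyOfRange(all, 0, 6);
        byte[] second = Arrays.copyOfRange(all, 6, 12);

        System.out.println("tail dropped: " + receive(first, second, false));
        System.out.println("tail carried: " + receive(first, second, true));
    }
}
```

With the tail dropped, the second chunk is decoded starting from the middle of a field, so the model yields [1, 131072] instead of [1, 2, 3]. The real failure is of the same kind: the decoder resumes at the wrong offset and reads values that were never sent, which is consistent with errors such as an invalid message type.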



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
