[ https://issues.apache.org/jira/browse/IGNITE-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mikhail Petrov reassigned IGNITE-24224:
---------------------------------------

    Assignee: Mikhail Petrov

> Broken serialization of communication messages in cluster with SSL enabled.
> ---------------------------------------------------------------------------
>
> Key: IGNITE-24224
> URL: https://issues.apache.org/jira/browse/IGNITE-24224
> Project: Ignite
> Issue Type: Bug
> Reporter: Mikhail Petrov
> Assignee: Mikhail Petrov
> Priority: Blocker
> Labels: ise
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> The following code can be used as a reproducer of the problem.
> Its main goal is to overload Ignite with communication messages,
> in this particular case by flooding it with GridJobExecuteRequest and
> GridJobExecuteResponse messages.
> {code:java}
> /** */
> public class BrokenSerializationTest extends AbstractSecurityTest {
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
>         SslContextFactory factory = (SslContextFactory)sslTrustedFactory("server", "trustone");
>
>         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
>             .setSslContextFactory(factory)
>             .setFailureHandler(new StopNodeOrHaltFailureHandler());
>
>         // Small values of this property cause Ignite to frequently reestablish connections between nodes.
>         cfg.setCommunicationSpi(new TcpCommunicationSpi().setUnacknowledgedMessagesBufferSize(160));
>
>         return cfg;
>     }
>
>     /** Tests task execution security context in case task was initiated from the {@link IgniteClient}.
>      */
>     @Test
>     public void test() throws Exception {
>         IgniteEx ignite = startGridAllowAll("crd");
>
>         startGridAllowAll("srv1");
>         startGridAllowAll("srv2");
>         startGridAllowAll("srv3");
>         startGridAllowAll("srv4");
>         startGridAllowAll("srv5");
>
>         awaitPartitionMapExchange();
>
>         Set<Throwable> errors = ConcurrentHashMap.newKeySet();
>         Set<ComputeTaskFuture<Integer>> futs = ConcurrentHashMap.newKeySet();
>         CountDownLatch latch = new CountDownLatch(16 * 10000);
>
>         for (int i = 0; i < 16; i++) {
>             GridTestUtils.runAsync(() -> {
>                 try {
>                     U.sleep(1000);
>
>                     for (int j = 0; j < 10000; j++) {
>                         futs.add(ignite.context().task().<Void, Integer>execute(
>                             TestTask.class.getName(),
>                             null,
>                             TaskExecutionOptions.options().asPublicRequest().withProjection(ignite.cluster().nodes())
>                         ).publicFuture());
>
>                         latch.countDown();
>                     }
>                 }
>                 catch (Throwable e) {
>                     errors.add(e);
>                 }
>             });
>         }
>
>         latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
>
>         assertTrue(errors.isEmpty());
>
>         for (ComputeTaskFuture<Integer> fut : futs) {
>             assertTrue(null == fut.get(getTestTimeout()));
>         }
>     }
>
>     /** */
>     public static class TestTask implements ComputeTask<Void, Integer> {
>         /** {@inheritDoc} */
>         @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
>             List<ClusterNode> subgrid,
>             @Nullable Void arg
>         ) throws IgniteException {
>             Map<ComputeJob, ClusterNode> res = new HashMap<>();
>
>             for (ClusterNode node : subgrid) {
>                 res.put(new ComputeJob() {
>                     @Override public void cancel() {
>                         // No-op.
>                     }
>
>                     @Override public Object execute() throws IgniteException {
>                         return null;
>                     }
>                 }, node);
>             }
>
>             return res;
>         }
>
>         /** {@inheritDoc} */
>         @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
>             if (res.getException() != null)
>                 throw new IgniteException(res.getException());
>
>             return ComputeJobResultPolicy.WAIT;
>         }
>
>         /** {@inheritDoc} */
>         @Override public @Nullable Integer reduce(List<ComputeJobResult> results) throws IgniteException {
>             assertEquals(allGrids().size(), results.size());
>
>             return null;
>         }
>     }
> }
> {code}
> It does not reproduce the problem consistently, because the problem only
> occurs when just part of a message is read into the application buffer
> while reading from the socket.
> While the reproducer is running, you can observe various errors related
> to communication message handling, e.g.:
> {code:java}
> [2025-01-16T10:10:11,136][ERROR][grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%][TcpCommunicationSpi]
> Failed to process selector key [ses=GridSelectorNioSessionImpl
> [worker=DirectNioClientWorker [super=AbstractNioClientWorker [idx=3,
> bytesRcvd=10261, bytesSent=0, bytesRcvd0=0, bytesSent0=0, select=true,
> super=GridWorker [name=grid-nio-worker-tcp-comm-3,
> igniteInstanceName=TcpCommunicationSpi, finished=false,
> heartbeatTs=1737011410109, hashCode=520079162, interrupted=false,
> runner=grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%]]],
> writeBuf=java.nio.DirectByteBuffer[pos=0 lim=32768 cap=32768],
> readBuf=java.nio.DirectByteBuffer[pos=2398 lim=2398 cap=32768],
> inRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168,
> sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false,
> node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004,
> consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet
> [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false,
> ver=2.18.0#20250116-sha1:00000000,
> isClient=false], connected=false,
> connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false],
> outRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630,
> rcvCnt=168, sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false,
> node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004,
> consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet
> [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false,
> ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false,
> connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false],
> closeSocket=true,
> outboundMessagesQueueSizeMetric=o.a.i.i.processors.metric.impl.LongAdderMetric@69a257d1,
> super=GridNioSessionImpl [locAddr=/127.0.0.1:48568,
> rmtAddr=/127.0.0.1:47104, createTime=1737011410099, closeTime=0, bytesSent=0,
> bytesRcvd=2398, bytesSent0=0, bytesRcvd0=0, sndSchedTime=1737011410099,
> lastSndTime=1737011410109, lastRcvTime=1737011410109, readsPaused=false,
> filterChain=FilterChain[filters=[GridNioCodecFilter
> [parser=o.a.i.i.util.nio.GridDirectParser@49fbc83f, directMode=true],
> GridConnectionBytesVerifyFilter, SSL filter], accepted=false,
> markedForClose=false]]]
> org.apache.ignite.IgniteException: Invalid message type: 512
>     at org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl.create(IgniteMessageFactoryImpl.java:104) ~[classes/:?]
>     at org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper$2.create(GridNioServerWrapper.java:811) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridDirectParser.decode(GridDirectParser.java:81) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioCodecFilter.onMessageReceived(GridNioCodecFilter.java:113) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter.onMessageReceived(GridConnectionBytesVerifyFilter.java:88) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.onMessageReceived(GridNioSslFilter.java:413) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$HeadFilter.onMessageReceived(GridNioServer.java:3752) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioFilterChain.onMessageReceived(GridNioFilterChain.java:175) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$DirectNioClientWorker.processRead(GridNioServer.java:1379) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.processSelectedKeysOptimized(GridNioServer.java:2526) [classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2281) [classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1910) [classes/:?]
>     at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125) [classes/:?]
>     at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
> {code}
> Alternatively, you can see NullPointerExceptions, assertion errors and so on.
> In most cases these errors only lead to messages being resent and
> connections between nodes being reestablished. But in some cases the
> failure handler (FH) can be invoked, with subsequent node termination.
> The main reason for the described behavior:
> 1. Node0 tries to establish a connection with node1.
> 2. During the handshake, BlockingSslHandler is used to store, encrypt and
>    decrypt data received over the network.
>    It stores the decrypted data in a dedicated buffer,
>    BlockingSslHandler#appBuf.
> 3. Let's assume node1 sends the handshake finish message and some regular
>    communication messages in a single network packet.
> 4. Node0 decrypts this packet and writes it to the above-mentioned
>    BlockingSslHandler#appBuf. The buffer now holds the bytes of the
>    handshake finish message and of some regular messages.
> 5. Node0 finishes the handshake and creates GridNioSslHandler to handle
>    communication between nodes in the NIO way. GridNioSslHandler has a
>    separate set of buffers to store, encrypt and decrypt data received
>    over the network.
> 6. After the handshake is finished, node0 tries to decode and handle all
>    bytes remaining in BlockingSslHandler#appBuf (see
>    org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java:262) and
>    then discards the BlockingSslHandler.
> 7. If node1 sent only part of the last regular message in step 3, some
>    bytes of this message can stay unhandled during step 6 and remain in
>    BlockingSslHandler#appBuf. This can happen when there are not enough
>    bytes to read a single communication message field.
> 8. So when node1 sends the rest of the partially written message from
>    step 3, it is processed on node0 by GridNioSslHandler from scratch, and
>    the bytes left in BlockingSslHandler#appBuf are lost.
> This results in the deserialization errors mentioned above.
> Note: https://issues.apache.org/jira/browse/IGNITE-22375 can greatly increase
> the probability of the described problem.
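
The buffer handoff described in the steps above can be illustrated with a toy model. This is only a sketch and not Ignite code: the HandoffSketch class, the 6-byte frame layout (2-byte type plus 4-byte value) and the carryOver flag are all assumptions made for illustration. The first chunk plays the role of the data decrypted into the handshake-time buffer and ends one byte into the second frame. The second chunk is what the post-handshake handler receives later. The bogus type value printed in the lossy case follows from this toy framing and is not claimed to explain the exact number in the log above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of handing a partially read frame from one buffer to another. */
public class HandoffSketch {
    /** Hypothetical frame size: 2-byte type followed by a 4-byte value. */
    public static final int FRAME_SIZE = 6;

    /** Decodes every complete frame in buf and leaves a trailing partial frame unread. */
    public static List<String> decode(ByteBuffer buf) {
        List<String> msgs = new ArrayList<>();

        while (buf.remaining() >= FRAME_SIZE)
            msgs.add("type=" + buf.getShort() + " val=" + buf.getInt());

        return msgs;
    }

    /** Two-phase read; carryOver decides whether the unread tail survives the handoff. */
    public static List<String> receive(byte[] firstChunk, byte[] secondChunk, boolean carryOver) {
        // Phase 1: the handshake-time buffer (stand-in for the first handler's application buffer).
        ByteBuffer handshakeBuf = ByteBuffer.wrap(firstChunk);
        List<String> msgs = new ArrayList<>(decode(handshakeBuf));

        // Phase 2: a fresh buffer owned by the post-handshake handler.
        ByteBuffer nioBuf = ByteBuffer.allocate(64);

        if (carryOver)
            nioBuf.put(handshakeBuf); // Copy the bytes of the partial frame that were not decoded.

        nioBuf.put(secondChunk);
        nioBuf.flip();

        msgs.addAll(decode(nioBuf));

        return msgs;
    }

    public static void main(String[] args) {
        // Three frames: (type 1, value 42), (type 2, value 7), (type 3, value 9).
        byte[] all = ByteBuffer.allocate(3 * FRAME_SIZE)
            .putShort((short)1).putInt(42)
            .putShort((short)2).putInt(7)
            .putShort((short)3).putInt(9)
            .array();

        // The first chunk ends one byte into the second frame.
        byte[] first = Arrays.copyOfRange(all, 0, 7);
        byte[] second = Arrays.copyOfRange(all, 7, all.length);

        System.out.println(receive(first, second, false)); // [type=1 val=42, type=512 val=1792]
        System.out.println(receive(first, second, true));  // [type=1 val=42, type=2 val=7, type=3 val=9]
    }
}
```

With carryOver set to false, the single leftover byte is dropped, the decoder starts reading in the middle of a frame, and every following field is misaligned. With carryOver set to true, the unread tail is copied into the new buffer before any new data, so the frame boundaries are preserved.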