This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cde49480447 [fix][proxy] Fix TooLongFrameException with Pulsar Proxy
(#24626)
cde49480447 is described below
commit cde4948044767033e40bc7d4eff98a4c25fd3aff
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 14 17:11:45 2025 +0300
[fix][proxy] Fix TooLongFrameException with Pulsar Proxy (#24626)
---
.../broker/service/PulsarChannelInitializer.java | 6 +-
.../pulsar/client/api/MockBrokerService.java | 4 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 5 +-
.../client/impl/PulsarChannelInitializer.java | 8 +--
.../pulsar/common/protocol/FrameDecoderUtil.java | 67 ++++++++++++++++++++++
.../pulsar/proxy/server/DirectProxyHandler.java | 25 +++-----
.../proxy/server/ServiceChannelInitializer.java | 7 +--
7 files changed, 86 insertions(+), 36 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 3b78d593159..68da1083d22 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.flow.FlowControlHandler;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslHandler;
@@ -33,7 +32,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
@@ -91,8 +90,7 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new
OptionalProxyProtocolDecoder());
}
- ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
- brokerConf.getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ FrameDecoderUtil.addFrameDecoder(ch.pipeline(),
brokerConf.getMaxMessageSize());
//
https://stackoverflow.com/questions/37535482/netty-disabling-auto-read-doesnt-work-for-bytetomessagedecoder
// Classes such as {@link ByteToMessageDecoder} or {@link
MessageToByteEncoder} are free to emit as many events
// as they like for any given input. so, disabling auto-read on
`ByteToMessageDecoder` doesn't work properly and
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index 7d4c9b2955d..e66880738cf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -27,7 +27,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory;
@@ -65,6 +64,7 @@ import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -335,7 +335,7 @@ public class MockBrokerService {
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
+ FrameDecoderUtil.addFrameDecoder(ch.pipeline(),
maxMessageSize);
ch.pipeline().addLast("handler", new MockServerCnx());
}
});
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 2b1109e179d..43be0072a8c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -29,7 +29,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.Errors.NativeIoException;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.concurrent.Promise;
import io.opentelemetry.api.common.Attributes;
@@ -102,6 +101,7 @@ import
org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -399,8 +399,7 @@ public class ClientCnx extends PulsarHandler {
+ "server frame size {}", ctx.channel(),
connected.getMaxMessageSize());
}
maxMessageSize = connected.getMaxMessageSize();
- ctx.pipeline().replace("frameDecoder", "newFrameDecoder", new
LengthFieldBasedFrameDecoder(
- connected.getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ FrameDecoderUtil.replaceFrameDecoder(ctx.pipeline(),
connected.getMaxMessageSize());
}
if (log.isDebugEnabled()) {
log.debug("{} Connection is ready", ctx.channel());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index b20833e46a2..0d3f8a4c619 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
@@ -40,6 +39,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.common.util.SecurityUtility;
@@ -91,14 +91,12 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
// Setup channel except for the SsHandler for TLS enabled connections
ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.getEncoder(tlsEnabled));
-
- ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
- Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ FrameDecoderUtil.addFrameDecoder(ch.pipeline(),
Commands.DEFAULT_MAX_MESSAGE_SIZE);
ChannelHandler clientCnx = clientCnxSupplier.get();
ch.pipeline().addLast("handler", clientCnx);
}
- /**
+ /**
* Initialize TLS for a channel. Should be invoked before the channel is
connected to the remote address.
*
* @param ch the channel
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java
new file mode 100644
index 00000000000..aee9f2c39bd
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import lombok.experimental.UtilityClass;
+
+/**
+ * Utility class for managing Netty LenghtFieldBasedFrameDecoder instances in
a Netty ChannelPipeline
+ * for the Pulsar binary protocol.
+ */
+@UtilityClass
+public class FrameDecoderUtil {
+ public static final String FRAME_DECODER_HANDLER = "frameDecoder";
+
+ /**
+ * Adds a LengthFieldBasedFrameDecoder to the given ChannelPipeline.
+ *
+ * @param pipeline the ChannelPipeline to which the decoder will be added
+ * @param maxMessageSize the maximum size of messages that can be decoded
+ */
+ public static void addFrameDecoder(ChannelPipeline pipeline, int
maxMessageSize) {
+ pipeline.addLast(FRAME_DECODER_HANDLER,
createFrameDecoder(maxMessageSize));
+ }
+
+ /**
+ * Replaces the existing LengthFieldBasedFrameDecoder in the given
ChannelPipeline with a new one.
+ *
+ * @param pipeline the ChannelPipeline in which the decoder will be
replaced
+ * @param maxMessageSize the maximum size of messages that can be decoded
+ */
+ public static void replaceFrameDecoder(ChannelPipeline pipeline, int
maxMessageSize) {
+ pipeline.replace(FRAME_DECODER_HANDLER, FRAME_DECODER_HANDLER,
createFrameDecoder(maxMessageSize));
+ }
+
+ /**
+ * Removes the LengthFieldBasedFrameDecoder from the given ChannelPipeline.
+ * This is useful in the Pulsar Proxy to remove the decoder before direct
proxying of messages without decoding.
+ *
+ * @param pipeline the ChannelPipeline from which the decoder will be
removed
+ */
+ public static void removeFrameDecoder(ChannelPipeline pipeline) {
+ pipeline.remove(FRAME_DECODER_HANDLER);
+ }
+
+ private static LengthFieldBasedFrameDecoder createFrameDecoder(int
maxMessageSize) {
+ return new LengthFieldBasedFrameDecoder(
+ maxMessageSize + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
4);
+ }
+}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index a0b699ff0c1..775108a75e6 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -34,7 +34,6 @@ import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.haproxy.HAProxyCommand;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
@@ -60,6 +59,7 @@ import
org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
@@ -180,9 +180,7 @@ public class DirectProxyHandler {
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(brokerProxyReadTimeoutMs,
TimeUnit.MILLISECONDS));
}
- ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(
- service.getConfiguration().getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
- 4));
+ FrameDecoderUtil.addFrameDecoder(ch.pipeline(),
service.getConfiguration().getMaxMessageSize());
ch.pipeline().addLast("proxyOutboundHandler",
(ChannelHandler) new ProxyBackendHandler(config,
protocolVersion, remoteHost, featureFlags));
}
@@ -422,23 +420,16 @@ public class DirectProxyHandler {
log.debug("[{}] [{}] Removing decoder from pipeline",
inboundChannel, outboundChannel);
}
// direct tcp proxy
- inboundChannel.pipeline().remove("frameDecoder");
- outboundChannel.pipeline().remove("frameDecoder");
+ FrameDecoderUtil.removeFrameDecoder(inboundChannel.pipeline());
+
FrameDecoderUtil.removeFrameDecoder(outboundChannel.pipeline());
} else {
// Enable parsing feature, proxyLogLevel(1 or 2)
// Add parser handler
if (connected.hasMaxMessageSize()) {
- inboundChannel.pipeline()
- .replace("frameDecoder", "newFrameDecoder",
- new
LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
- +
Commands.MESSAGE_SIZE_FRAME_PADDING,
- 0, 4, 0, 4));
- outboundChannel.pipeline().replace("frameDecoder",
"newFrameDecoder",
- new LengthFieldBasedFrameDecoder(
- connected.getMaxMessageSize()
- +
Commands.MESSAGE_SIZE_FRAME_PADDING,
- 0, 4, 0, 4));
-
+
FrameDecoderUtil.replaceFrameDecoder(inboundChannel.pipeline(),
+ connected.getMaxMessageSize());
+
FrameDecoderUtil.replaceFrameDecoder(outboundChannel.pipeline(),
+ connected.getMaxMessageSize());
inboundChannel.pipeline().addBefore("handler",
"inboundParser",
new ParserProxyHandler(service,
ParserProxyHandler.FRONTEND_CONN,
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 728d27c815f..5d2ecf368d2 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -20,13 +20,12 @@ package org.apache.pulsar.proxy.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
@@ -87,9 +86,7 @@ public class ServiceChannelInitializer extends
ChannelInitializer<SocketChannel>
if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new
OptionalProxyProtocolDecoder());
}
- ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
- this.maxMessageSize + Commands.MESSAGE_SIZE_FRAME_PADDING, 0,
4, 0, 4));
-
+ FrameDecoderUtil.addFrameDecoder(ch.pipeline(), maxMessageSize);
ch.pipeline().addLast("handler", new ProxyConnection(proxyService,
proxyService.getDnsAddressResolverGroup()));
}