This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 68f15ce665c [fix][proxy] Fix proxy OOM by replacing TopicName with a
simple conversion method (#24465)
68f15ce665c is described below
commit 68f15ce665c363263a4169bd9ac5bf2c7b5f36be
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jul 2 09:51:16 2025 +0800
[fix][proxy] Fix proxy OOM by replacing TopicName with a simple conversion
method (#24465)
---
.../pulsar/common/api/raw/MessageParser.java | 19 +++++-
.../org/apache/pulsar/common/naming/TopicName.java | 75 ++++++++++++++++++++++
.../apache/pulsar/common/naming/TopicNameTest.java | 34 +++++++++-
.../pulsar/proxy/server/LookupProxyHandler.java | 6 +-
.../pulsar/proxy/server/ParserProxyHandler.java | 14 ++--
5 files changed, 133 insertions(+), 15 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index af516fa7534..0e9aae4603d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -55,11 +55,17 @@ public class MessageParser {
void process(RawMessage message) throws IOException;
}
+ @Deprecated
+ public static void parseMessage(TopicName topicName, long ledgerId, long
entryId, ByteBuf headersAndPayload,
+ MessageProcessor processor, int
maxMessageSize) throws IOException {
+ parseMessage(topicName.toString(), ledgerId, entryId,
headersAndPayload, processor, maxMessageSize);
+ }
+
/**
* Parse a raw Pulsar entry payload and extract all the individual message
that may be included in the batch. The
* provided {@link MessageProcessor} will be invoked for each individual
message.
*/
- public static void parseMessage(TopicName topicName, long ledgerId, long
entryId, ByteBuf headersAndPayload,
+ public static void parseMessage(String topicName, long ledgerId, long
entryId, ByteBuf headersAndPayload,
MessageProcessor processor, int maxMessageSize) throws IOException
{
ByteBuf payload = headersAndPayload;
ByteBuf uncompressedPayload = null;
@@ -117,7 +123,7 @@ public class MessageParser {
}
}
- public static boolean verifyChecksum(TopicName topic, ByteBuf
headersAndPayload, long ledgerId, long entryId) {
+ public static boolean verifyChecksum(String topic, ByteBuf
headersAndPayload, long ledgerId, long entryId) {
if (hasChecksum(headersAndPayload)) {
int checksum = readChecksum(headersAndPayload);
int computedChecksum = computeChecksum(headersAndPayload);
@@ -132,7 +138,14 @@ public class MessageParser {
return true;
}
- public static ByteBuf uncompressPayloadIfNeeded(TopicName topic,
MessageMetadata msgMetadata,
+ @Deprecated
+ public static ByteBuf uncompressPayloadIfNeeded(TopicName topicName,
MessageMetadata msgMetadata,
+ ByteBuf payload, long
ledgerId, long entryId, int maxMessageSize) {
+ return uncompressPayloadIfNeeded(topicName.toString(), msgMetadata,
payload, ledgerId, entryId,
+ maxMessageSize);
+ }
+
+ public static ByteBuf uncompressPayloadIfNeeded(String topic,
MessageMetadata msgMetadata,
ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) {
CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression());
int uncompressedSize = msgMetadata.getUncompressedSize();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index b2f96bfe6e2..4d9b28df91b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.naming;
import com.google.common.base.Splitter;
import com.google.re2j.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -442,4 +443,78 @@ public class TopicName implements ServiceUnitId {
public boolean isV2() {
return cluster == null;
}
+
+ /**
+ * Convert a topic name to a full topic name.
+ * In Pulsar, a full topic name is
"<domain>://<tenant>/<namespace>/<local-topic>" (v2) or
+ * "<domain>://<tenant>/<cluster>/<namespace>/<local-topic>" (v1).
However, for convenient, it's allowed for clients
+ * to pass a short topic name with v2 format:
+ * - "<local-topic>", which represents
"persistent://public/default/<local-topic>"
+ * - "<tenant>/<namespace>/<local-topic>, which represents
"persistent://<tenant>/<namespace>/<local-topic>"
+ *
+ * @param topic the topic name from client
+ * @return the full topic name.
+ */
+ public static String toFullTopicName(String topic) {
+ final int index = topic.indexOf("://");
+ if (index >= 0) {
+ TopicDomain.getEnum(topic.substring(0, index));
+ final List<String> parts = splitBySlash(topic.substring(index +
"://".length()), 4);
+ if (parts.size() != 3 && parts.size() != 4) {
+ throw new IllegalArgumentException(topic + " is invalid");
+ }
+ if (parts.size() == 3) {
+ NamespaceName.validateNamespaceName(parts.get(0),
parts.get(1));
+ if (StringUtils.isBlank(parts.get(2))) {
+ throw new IllegalArgumentException(topic + " has blank
local topic");
+ }
+ } else {
+ NamespaceName.validateNamespaceName(parts.get(0),
parts.get(1), parts.get(2));
+ if (StringUtils.isBlank(parts.get(3))) {
+ throw new IllegalArgumentException(topic + " has blank
local topic");
+ }
+ }
+ return topic; // it's a valid full topic name
+ } else {
+ List<String> parts = splitBySlash(topic, 0);
+ if (parts.size() != 1 && parts.size() != 3) {
+ throw new IllegalArgumentException(topic + " is invalid");
+ }
+ if (parts.size() == 1) {
+ if (StringUtils.isBlank(parts.get(0))) {
+ throw new IllegalArgumentException(topic + " has blank
local topic");
+ }
+ return "persistent://public/default/" + parts.get(0);
+ } else {
+ NamespaceName.validateNamespaceName(parts.get(0),
parts.get(1));
+ if (StringUtils.isBlank(parts.get(2))) {
+ throw new IllegalArgumentException(topic + " has blank
local topic");
+ }
+ return "persistent://" + topic;
+ }
+ }
+ }
+
+ private static List<String> splitBySlash(String topic, int limit) {
+ final List<String> tokens = new ArrayList<>(3);
+ final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1;
+ int beginIndex = 0;
+ for (int i = 0; i < loopCount; i++) {
+ final int endIndex = topic.indexOf('/', beginIndex);
+ if (endIndex < 0) {
+ tokens.add(topic.substring(beginIndex));
+ return tokens;
+ } else if (endIndex > beginIndex) {
+ tokens.add(topic.substring(beginIndex, endIndex));
+ } else {
+ throw new IllegalArgumentException("Invalid topic name " +
topic);
+ }
+ beginIndex = endIndex + 1;
+ }
+ if (beginIndex >= topic.length()) {
+ throw new IllegalArgumentException("Invalid topic name " + topic);
+ }
+ tokens.add(topic.substring(beginIndex));
+ return tokens;
+ }
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 27eb82d15af..bb4798fca46 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.pulsar.common.util.Codec;
@@ -52,9 +53,8 @@ public class TopicNameTest {
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").toString(),
"persistent://tenant/cluster/namespace/topic");
-
-
assertNotEquals(TopicName.get("persistent://tenant/cluster/namespace/topic"),
- "persistent://tenant/cluster/namespace/topic");
+
assertEquals(TopicName.toFullTopicName("persistent://tenant/cluster/namespace/topic"),
+ "persistent://tenant/cluster/namespace/topic");
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").getDomain(),
TopicDomain.persistent);
@@ -103,6 +103,7 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName("://tenant.namespace:my-topic"));
try {
TopicName.get("://tenant.namespace");
@@ -110,6 +111,7 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName("://tenant.namespace"));
try {
TopicName.get("invalid://tenant/cluster/namespace/topic");
@@ -117,6 +119,8 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class,
+ () ->
TopicName.toFullTopicName("invalid://tenant/cluster/namespace/topic"));
try {
TopicName.get("tenant/cluster/namespace/topic");
@@ -124,6 +128,7 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName("tenant/cluster/namespace/topic"));
try {
TopicName.get("persistent:///cluster/namespace/mydest-1");
@@ -131,6 +136,8 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class,
+ () ->
TopicName.toFullTopicName("persistent:///cluster/namespace/mydest-1"));
try {
TopicName.get("persistent://pulsar//namespace/mydest-1");
@@ -138,6 +145,8 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class,
+ () ->
TopicName.toFullTopicName("persistent://pulsar//namespace/mydest-1"));
try {
TopicName.get("persistent://pulsar/cluster//mydest-1");
@@ -145,6 +154,8 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class,
+ () ->
TopicName.toFullTopicName("persistent://pulsar/cluster//mydest-1"));
try {
TopicName.get("persistent://pulsar/cluster/namespace/");
@@ -152,6 +163,8 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class,
+ () ->
TopicName.toFullTopicName("persistent://pulsar/cluster/namespace/"));
try {
TopicName.get("://pulsar/cluster/namespace/");
@@ -159,6 +172,7 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName("://pulsar/cluster/namespace/"));
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic")
.getPersistenceNamingEncoding(),
"tenant/cluster/namespace/persistent/topic");
@@ -169,6 +183,7 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName("://tenant.namespace"));
try {
TopicName.get("://tenant/cluster/namespace");
@@ -176,6 +191,7 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName("://tenant//cluster/namespace"));
try {
TopicName.get(" ");
@@ -183,6 +199,7 @@ public class TopicNameTest {
} catch (IllegalArgumentException e) {
// Ok
}
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName(" "));
TopicName nameWithSlash =
TopicName.get("persistent://tenant/cluster/namespace/ns-abc/table/1");
assertEquals(nameWithSlash.getEncodedLocalName(),
Codec.encode("ns-abc/table/1"));
@@ -344,4 +361,15 @@ public class TopicNameTest {
assertNotEquals(tp2.toString(), tp1.toString());
assertEquals(tp2.toString(),
"persistent://tenant1/namespace1/tp1-partition-0-DLQ-partition-0");
}
+
+ @Test
+ public void testToFullTopicName() {
+ // There is no constraint for local topic name
+ assertEquals("persistent://public/default/tp???xx=",
TopicName.toFullTopicName("tp???xx="));
+ assertEquals("persistent://tenant/ns/tp???xx=",
TopicName.toFullTopicName("tenant/ns/tp???xx="));
+ assertEquals("persistent://tenant/ns/test",
TopicName.toFullTopicName("persistent://tenant/ns/test"));
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName("ns/topic"));
+ // v1 format is not supported when the domain is not included
+ assertThrows(IllegalArgumentException.class, () ->
TopicName.toFullTopicName("tenant/cluster/ns/topic"));
+ }
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 75109883f98..c38e2ba08c1 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -221,7 +221,7 @@ public class LookupProxyHandler {
**/
private void
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata
partitionMetadata,
long clientRequestId) {
- TopicName topicName = TopicName.get(partitionMetadata.getTopic());
+ String topicName =
TopicName.toFullTopicName(partitionMetadata.getTopic());
String serviceUrl = getBrokerServiceUrl(clientRequestId);
if (serviceUrl == null) {
@@ -235,7 +235,7 @@ public class LookupProxyHandler {
if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}' for Looking up topic '{}'
with clientReq Id '{}'", addr,
- topicName.getPartitionedTopicName(), clientRequestId);
+ topicName, clientRequestId);
}
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx ->
{
// Connected to backend broker
@@ -245,7 +245,7 @@ public class LookupProxyHandler {
partitionMetadata.isMetadataAutoCreationEnabled());
clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
if (t != null) {
- log.warn("[{}] failed to get Partitioned metadata : {}",
topicName.toString(),
+ log.warn("[{}] failed to get Partitioned metadata : {}",
topicName,
t.getMessage(), t);
PulsarClientException pce =
PulsarClientException.unwrap(t);
writeAndFlush(Commands.newLookupErrorResponse(clientCnx.revertClientExToErrorCode(pce),
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 22957c9599f..3a98311eb15 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -101,7 +101,8 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
private final BaseCommand cmd = new BaseCommand();
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- TopicName topicName;
+ String key;
+ String topicName;
List<RawMessage> messages = new ArrayList<>();
ByteBuf buffer = (ByteBuf) (msg);
@@ -130,8 +131,8 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
logging(ctx.channel(), cmd.getType(), "", null);
break;
}
- topicName =
TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId()
+ ","
- + ctx.channel().id()));
+ topicName =
TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get(
+ cmd.getSend().getProducerId() + "," +
ctx.channel().id()));
MutableLong msgBytes = new MutableLong(0);
MessageParser.parseMessage(topicName, -1L,
-1L, buffer, (message) -> {
@@ -139,7 +140,7 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
msgBytes.add(message.getData().readableBytes());
}, maxMessageSize);
// update topic stats
- TopicStats topicStats =
this.service.getTopicStats().computeIfAbsent(topicName.toString(),
+ TopicStats topicStats =
this.service.getTopicStats().computeIfAbsent(topicName,
topic -> new TopicStats());
topicStats.getMsgInRate().recordMultipleEvents(messages.size(),
msgBytes.longValue());
logging(ctx.channel(), cmd.getType(), "", messages);
@@ -158,8 +159,9 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
logging(ctx.channel(), cmd.getType(), "", null);
break;
}
- topicName =
TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
- + "," + peerChannelId));
+ topicName =
TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get(
+ cmd.getMessage().getConsumerId() + "," +
peerChannelId));
+
msgBytes = new MutableLong(0);
MessageParser.parseMessage(topicName, -1L,
-1L, buffer, (message) -> {