This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 204f20a0d8384e878060ce95ad64c5bfb994f948 Author: Ruimin MA <[email protected]> AuthorDate: Tue Aug 26 21:56:58 2025 +0800 [fix][client] Skip schema validation when sending messages to DLQ to avoid infinite loop when schema validation fails on an incoming message (#24663) Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 829df71901de726a11bb3c8ee934d14b67ee9694) --- .../pulsar/client/api/DeadLetterTopicTest.java | 60 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 18 +++++-- .../client/impl/schema/AutoProduceBytesSchema.java | 2 + 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index dd25c82155f..07cfba61e80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -1416,4 +1416,64 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { consumer.close(); } + + // reproduce issue reported in https://github.com/apache/pulsar/issues/24541 + @Test + public void sendDeadLetterTopicWithMismatchSchemaProducer() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns"); + admin.namespaces().createNamespace(namespace); + // don't enforce schema validation + admin.namespaces().setSchemaValidationEnforced(namespace, false); + // set schema compatibility strategy to always compatible + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + + "/sendDeadLetterTopicWithMismatchSchemaProducer"); + final String retryTopic = topic + "-RETRY"; + final String deadLetterTopic = topic + "-DLQ"; + final Long deadLetterMessageValue = 1234567890L; + // create topics + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createNonPartitionedTopic(deadLetterTopic); + admin.topics().createNonPartitionedTopic(retryTopic); + + final int maxRedeliverCount = 1; + final String subscriptionName = "my-subscription"; + Consumer<String> consumer = pulsarClient.newConsumer(Schema.AVRO(String.class)) + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subscriptionName) + .deadLetterPolicy(DeadLetterPolicy.builder() + .deadLetterTopic(deadLetterTopic) + .retryLetterTopic(retryTopic) + .maxRedeliverCount(maxRedeliverCount) + .build()) + .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) + .messageListener((Consumer::negativeAcknowledge)) + .subscribe(); + + Consumer<GenericRecord> deadLetterConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .topic(deadLetterTopic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subscriptionName) + .subscribe(); + Producer<Long> producer = pulsarClient.newProducer(Schema.AVRO(Long.class)).topic(topic).create(); + producer.send(deadLetterMessageValue); + + Thread.sleep(3000L); + + assertThat(pulsar.getBrokerService().getTopicReference(topic).get() + .getSubscription(subscriptionName).getConsumers().get(0).getMessageRedeliverCounter()) + .describedAs("redeliver count of topic %s should be less than or equal to 2 because of mismatch schema", + topic) + .isLessThanOrEqualTo(maxRedeliverCount + 1); + + Message<GenericRecord> deadLetterMessage = deadLetterConsumer.receive(3, TimeUnit.SECONDS); + assertNotNull(deadLetterMessage); + assertTrue(deadLetterMessage.getValue().getNativeObject() instanceof Long); + assertEquals(deadLetterMessage.getValue().getNativeObject(), deadLetterMessageValue); + + producer.close(); + consumer.close(); + deadLetterConsumer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4d09decf99e..9e195929af5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -100,6 +100,7 @@ import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.metrics.Unit; import org.apache.pulsar.client.impl.metrics.UpDownCounter; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.client.util.RetryMessageUtil; @@ -1410,7 +1411,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle consumerEpoch = cmdMessage.getConsumerEpoch(); } if (log.isDebugEnabled()) { - log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), + log.debug("[{}][{}] Received message: {}:{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); } @@ -2247,8 +2248,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle try { String originMessageIdStr = message.getMessageId().toString(); String originTopicNameStr = getOriginTopicNameStr(message); + AutoProduceBytesSchema<byte[]> deadLetterMessageSchema = + (AutoProduceBytesSchema<byte[]>) Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()); + deadLetterMessageSchema.setRequireSchemaValidation(false); TypedMessageBuilder<byte[]> typedMessageBuilderNew = - producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + producerDLQ.newMessage(deadLetterMessageSchema) .value(message.getData()) .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); copyMessageKeysIfNeeded(message, typedMessageBuilderNew); @@ -2270,7 +2274,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle }).exceptionally(ex -> { if (ex instanceof PulsarClientException.ProducerQueueIsFullError) { log.warn( - "[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}: {}", + "[{}] [{}] [{}] Failed to send DLQ message to {} " + + "with ProducerQueueIsFullError for message id {}: {}", topicName, subscription, consumerName, deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage()); } else { @@ -2282,7 +2287,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return null; }); } catch (Exception e) { - log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", + log.warn("[{}] [{}] [{}] Failed to process DLQ message to {} for message id {}", topicName, subscription, consumerName, deadLetterPolicy.getDeadLetterTopic(), messageId, e); result.complete(false); @@ -2307,8 +2312,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle p = deadLetterProducer; if (p == null || p.isCompletedExceptionally()) { p = createProducerWithBackOff(() -> { + AutoProduceBytesSchema<byte[]> deadLetterProducerSchema = + (AutoProduceBytesSchema<byte[]>) Schema.AUTO_PRODUCE_BYTES(schema); + deadLetterProducerSchema.setRequireSchemaValidation(false); CompletableFuture<Producer<byte[]>> newProducer = - ((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) + ((ProducerBuilderImpl<byte[]>) client.newProducer(deadLetterProducerSchema)) .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) .topic(this.deadLetterPolicy.getDeadLetterTopic()) .producerName( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java index f5925f65a60..8641d14d7df 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema; import static com.google.common.base.Preconditions.checkState; import java.util.Optional; +import lombok.Setter; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -31,6 +32,7 @@ import org.apache.pulsar.common.schema.SchemaType; */ public class AutoProduceBytesSchema<T> implements Schema<byte[]> { + @Setter private boolean requireSchemaValidation = true; private Schema<T> schema;
