This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 204f20a0d8384e878060ce95ad64c5bfb994f948
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Aug 26 21:56:58 2025 +0800

    [fix][client] Skip schema validation when sending messages to DLQ to avoid 
infinite loop when schema validation fails on an incoming message (#24663)
    
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 829df71901de726a11bb3c8ee934d14b67ee9694)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 60 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 18 +++++--
 .../client/impl/schema/AutoProduceBytesSchema.java |  2 +
 3 files changed, 75 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index dd25c82155f..07cfba61e80 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -1416,4 +1416,64 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         consumer.close();
 
     }
+
+    // reproduce issue reported in 
https://github.com/apache/pulsar/issues/24541
+    @Test
+    public void sendDeadLetterTopicWithMismatchSchemaProducer() throws 
Exception {
+        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+        admin.namespaces().createNamespace(namespace);
+        // don't enforce schema validation
+        admin.namespaces().setSchemaValidationEnforced(namespace, false);
+        // set schema compatibility strategy to always compatible
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace
+                + "/sendDeadLetterTopicWithMismatchSchemaProducer");
+        final String retryTopic = topic + "-RETRY";
+        final String deadLetterTopic = topic + "-DLQ";
+        final Long deadLetterMessageValue = 1234567890L;
+        // create topics
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createNonPartitionedTopic(deadLetterTopic);
+        admin.topics().createNonPartitionedTopic(retryTopic);
+
+        final int maxRedeliverCount = 1;
+        final String subscriptionName = "my-subscription";
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.AVRO(String.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subscriptionName)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .deadLetterTopic(deadLetterTopic)
+                        .retryLetterTopic(retryTopic)
+                        .maxRedeliverCount(maxRedeliverCount)
+                        .build())
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .messageListener((Consumer::negativeAcknowledge))
+                .subscribe();
+
+        Consumer<GenericRecord> deadLetterConsumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(deadLetterTopic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subscriptionName)
+                .subscribe();
+        Producer<Long> producer = 
pulsarClient.newProducer(Schema.AVRO(Long.class)).topic(topic).create();
+        producer.send(deadLetterMessageValue);
+
+        Thread.sleep(3000L);
+
+        assertThat(pulsar.getBrokerService().getTopicReference(topic).get()
+                
.getSubscription(subscriptionName).getConsumers().get(0).getMessageRedeliverCounter())
+                .describedAs("redeliver count of topic %s should be less than 
or equal to 2 because of mismatch schema",
+                        topic)
+                .isLessThanOrEqualTo(maxRedeliverCount + 1);
+
+        Message<GenericRecord> deadLetterMessage = 
deadLetterConsumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(deadLetterMessage);
+        assertTrue(deadLetterMessage.getValue().getNativeObject() instanceof 
Long);
+        assertEquals(deadLetterMessage.getValue().getNativeObject(), 
deadLetterMessageValue);
+
+        producer.close();
+        consumer.close();
+        deadLetterConsumer.close();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4d09decf99e..9e195929af5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -100,6 +100,7 @@ import 
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.client.impl.metrics.Unit;
 import org.apache.pulsar.client.impl.metrics.UpDownCounter;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.RetryMessageUtil;
@@ -1410,7 +1411,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             consumerEpoch = cmdMessage.getConsumerEpoch();
         }
         if (log.isDebugEnabled()) {
-            log.debug("[{}][{}] Received message: {}/{}", topic, subscription, 
messageId.getLedgerId(),
+            log.debug("[{}][{}] Received message: {}:{}", topic, subscription, 
messageId.getLedgerId(),
                     messageId.getEntryId());
         }
 
@@ -2247,8 +2248,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     try {
                         String originMessageIdStr = 
message.getMessageId().toString();
                         String originTopicNameStr = 
getOriginTopicNameStr(message);
+                        AutoProduceBytesSchema<byte[]> deadLetterMessageSchema 
=
+                            (AutoProduceBytesSchema<byte[]>) 
Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get());
+                        
deadLetterMessageSchema.setRequireSchemaValidation(false);
                         TypedMessageBuilder<byte[]> typedMessageBuilderNew =
-                                
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                                producerDLQ.newMessage(deadLetterMessageSchema)
                                         .value(message.getData())
                                         .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
                         copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
@@ -2270,7 +2274,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                 }).exceptionally(ex -> {
                                     if (ex instanceof 
PulsarClientException.ProducerQueueIsFullError) {
                                         log.warn(
-                                                "[{}] [{}] [{}] Failed to send 
DLQ message to {} for message id {}: {}",
+                                                "[{}] [{}] [{}] Failed to send 
DLQ message to {} "
+                                                        + "with 
ProducerQueueIsFullError for message id {}: {}",
                                                 topicName, subscription, 
consumerName,
                                                 
deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage());
                                     } else {
@@ -2282,7 +2287,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                     return null;
                                 });
                     } catch (Exception e) {
-                        log.warn("[{}] [{}] [{}] Failed to send DLQ message to 
{} for message id {}",
+                        log.warn("[{}] [{}] [{}] Failed to process DLQ message 
to {} for message id {}",
                                 topicName, subscription, consumerName, 
deadLetterPolicy.getDeadLetterTopic(), messageId,
                                 e);
                         result.complete(false);
@@ -2307,8 +2312,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 p = deadLetterProducer;
                 if (p == null || p.isCompletedExceptionally()) {
                     p = createProducerWithBackOff(() -> {
+                        AutoProduceBytesSchema<byte[]> 
deadLetterProducerSchema =
+                                (AutoProduceBytesSchema<byte[]>) 
Schema.AUTO_PRODUCE_BYTES(schema);
+                        
deadLetterProducerSchema.setRequireSchemaValidation(false);
                         CompletableFuture<Producer<byte[]>> newProducer =
-                                ((ProducerBuilderImpl<byte[]>) 
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
+                                ((ProducerBuilderImpl<byte[]>) 
client.newProducer(deadLetterProducerSchema))
                                         
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
                                         
.topic(this.deadLetterPolicy.getDeadLetterTopic())
                                         .producerName(
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index f5925f65a60..8641d14d7df 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema;
 
 import static com.google.common.base.Preconditions.checkState;
 import java.util.Optional;
+import lombok.Setter;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.schema.SchemaType;
  */
 public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
+    @Setter
     private boolean requireSchemaValidation = true;
     private Schema<T> schema;
 

Reply via email to