lhotari commented on code in PR #24663:
URL: https://github.com/apache/pulsar/pull/24663#discussion_r2298584661
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java:
##########
@@ -1548,4 +1548,50 @@ public void testDeadLetterTopicWithMaxUnackedMessagesBlocking() throws Exception
consumer.close();
}
+
+ // reproduce issue reported in https://github.com/apache/pulsar/issues/24541
+ @Test
+ public void sendDeadLetterTopicWithMismatchSchemaProducer() throws Exception {
+ String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+ admin.namespaces().createNamespace(namespace);
+ // don't enforce schema validation
+ admin.namespaces().setSchemaValidationEnforced(namespace, false);
+ // set schema compatibility strategy to always compatible
+ admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+ String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ + "/sendDeadLetterTopicWithMismatchSchemaProducer");
+
+ // create topics
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().createNonPartitionedTopic(topic + "-DLQ");
+ admin.topics().createNonPartitionedTopic(topic + "-RETRY");
+
+ final int maxRedeliverCount = 1;
+ final String subscriptionName = "my-subscription";
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.AVRO(String.class))
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(subscriptionName)
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .deadLetterTopic(topic + "-DLQ")
+ .retryLetterTopic(topic + "-RETRY")
+ .maxRedeliverCount(maxRedeliverCount)
+ .build())
+ .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+ .messageListener((Consumer::negativeAcknowledge))
+ .subscribe();
+
+ Producer<Long> producer = pulsarClient.newProducer(Schema.AVRO(Long.class)).topic(topic).create();
+ producer.send(1234567890L);
+
+ Thread.sleep(3000L);
+
+ assertThat(pulsar.getBrokerService().getTopicReference(topic).get()
+ .getSubscription(subscriptionName).getConsumers().get(0).getMessageRedeliverCounter())
+ .describedAs("redeliver count of topic %s should be less than or equal to 2 because of mismatch schema",
+ topic)
+ .isLessThanOrEqualTo(maxRedeliverCount + 1);
+ producer.close();
+ consumer.close();
Review Comment:
At the end of the test, it would be useful to verify that the sent message can
be read from the DLQ using `Schema.AUTO_CONSUME`, since that is how such DLQs
would be consumed when a single topic can contain messages with different
schemas.
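
A minimal sketch of what such a check might look like, assuming the message does end up in `topic + "-DLQ"` once the redelivery limit is exceeded. The class name, helper name, subscription name, and timeout are hypothetical and not part of the PR. The sketch only checks that a message can be received; how the `Long` payload decodes under `AUTO_CONSUME` is deliberately not asserted here.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;

// Hypothetical helper class for illustration; not part of the PR.
public final class DlqAutoConsumeSketch {

    private DlqAutoConsumeSketch() {
    }

    /**
     * Subscribes to the DLQ topic with Schema.AUTO_CONSUME and tries to read one message.
     * Returns null if nothing arrives within the (assumed) 10 second timeout.
     */
    static Message<GenericRecord> receiveOneFromDlq(PulsarClient client, String dlqTopic)
            throws PulsarClientException {
        try (Consumer<GenericRecord> dlqConsumer = client.newConsumer(Schema.AUTO_CONSUME())
                .topic(dlqTopic)
                // hypothetical subscription name
                .subscriptionName("dlq-auto-consume-check")
                // start from the earliest message so one published before subscribing is seen
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            // receive(timeout, unit) returns null when the timeout elapses
            Message<GenericRecord> msg = dlqConsumer.receive(10, TimeUnit.SECONDS);
            if (msg != null) {
                dlqConsumer.acknowledge(msg);
            }
            return msg;
        }
    }
}
```

In the test this could be called as `receiveOneFromDlq(pulsarClient, topic + "-DLQ")` before `producer.close()`, followed by `assertThat(dlqMessage).isNotNull()` on the returned message. A bounded receive like this might also allow dropping the fixed `Thread.sleep(3000L)`, though that is a separate design choice.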