lhotari commented on code in PR #24663:
URL: https://github.com/apache/pulsar/pull/24663#discussion_r2298584661
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java:
##########
@@ -1548,4 +1548,50 @@ public void testDeadLetterTopicWithMaxUnackedMessagesBlocking() throws Exception
consumer.close();
}
+
+ // reproduce issue reported in https://github.com/apache/pulsar/issues/24541
+ @Test
+ public void sendDeadLetterTopicWithMismatchSchemaProducer() throws Exception {
+ String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+ admin.namespaces().createNamespace(namespace);
+ // don't enforce schema validation
+ admin.namespaces().setSchemaValidationEnforced(namespace, false);
+ // set schema compatibility strategy to always compatible
+ admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+ String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ + "/sendDeadLetterTopicWithMismatchSchemaProducer");
+
+ // create topics
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().createNonPartitionedTopic(topic + "-DLQ");
+ admin.topics().createNonPartitionedTopic(topic + "-RETRY");
+
+ final int maxRedeliverCount = 1;
+ final String subscriptionName = "my-subscription";
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.AVRO(String.class))
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(subscriptionName)
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .deadLetterTopic(topic + "-DLQ")
+ .retryLetterTopic(topic + "-RETRY")
+ .maxRedeliverCount(maxRedeliverCount)
+ .build())
+ .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+ .messageListener((Consumer::negativeAcknowledge))
+ .subscribe();
+
+ Producer<Long> producer = pulsarClient.newProducer(Schema.AVRO(Long.class)).topic(topic).create();
+ producer.send(1234567890L);
+
+ Thread.sleep(3000L);
+
+ assertThat(pulsar.getBrokerService().getTopicReference(topic).get()
+ .getSubscription(subscriptionName).getConsumers().get(0).getMessageRedeliverCounter())
+ .describedAs("redeliver count of topic %s should be less than or equal to 2 because of mismatch schema",
+ topic)
+ .isLessThanOrEqualTo(maxRedeliverCount + 1);
+ producer.close();
+ consumer.close();
Review Comment:
It would be useful to also test that the sent message can be read from the
DLQ using `Schema.AUTO_CONSUME`, since that is how such DLQs would be handled:
the same DLQ topic can contain messages produced with different schemas.
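
A minimal sketch of what such a check could look like, assuming a running broker and the `pulsarClient` and `topic + "-DLQ"` names from the test above. The class `DlqAutoConsumeSketch`, the method `receiveFromDlq`, the subscription name `dlq-reader`, and the 10 second timeout are hypothetical and not part of the PR. The sketch only checks that a message arrives; it does not decode the value with `getValue()`.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;

// Hypothetical helper for illustration only; not code from the PR.
final class DlqAutoConsumeSketch {

    private DlqAutoConsumeSketch() {
    }

    /**
     * Subscribes to the DLQ topic with Schema.AUTO_CONSUME and returns the first
     * message received within the timeout, or null if none arrives in time.
     */
    static Message<GenericRecord> receiveFromDlq(PulsarClient client, String dlqTopic)
            throws PulsarClientException {
        Consumer<GenericRecord> dlqConsumer = client.newConsumer(Schema.AUTO_CONSUME())
                .topic(dlqTopic)
                // assumed subscription name, chosen for this sketch
                .subscriptionName("dlq-reader")
                // start from the earliest message so a message that was
                // dead-lettered before this subscription existed is still read
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        try {
            // receive(timeout, unit) returns null when the timeout elapses
            Message<GenericRecord> message = dlqConsumer.receive(10, TimeUnit.SECONDS);
            if (message != null) {
                dlqConsumer.acknowledge(message);
            }
            return message;
        } finally {
            dlqConsumer.close();
        }
    }
}
```

In the test this could be used as `assertThat(DlqAutoConsumeSketch.receiveFromDlq(pulsarClient, topic + "-DLQ")).isNotNull();` before closing the producer and consumer. Whether the payload also decodes cleanly through `AUTO_CONSUME` for a non-record Avro schema such as `Schema.AVRO(Long.class)` is not something this sketch verifies.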