On Sun, 12 Feb 2023, 04:42 Yubiao Feng <yubiao.f...@streamnative.io.invalid> wrote:
> Hi Enrico Olivelli
>
> > It is good to help users to not fall into bad situations but on the other
> > case we cannot deal with many silly configurations that you could set up,
> > like creating a pipeline of functions that in the end create a cycle.
>
> Sorry, this test just helps to reproduce the problem quickly. The reality
> is that there is only one consumer, but every restart triggers this issue
> and ends up with a topic like this:
> "persistent://public/default/tp1-sub1-RETRY-sub1-RETRY-sub1-RETRY...."
>
> > I wonder if we could simply document this fact instead of adding code
>
> ```java
> Consumer<> consumer = pulsarClient.newConsumer()
>         .topicsPattern("my-property/my-ns/.*").subscriptionName("sub1")
>         .enableRetry(true)
> ```
>
> With the client restarted, the code above will reproduce the problem.

I see the problem now.
We must do something for this case. It must not happen.
We have to fix it.

Thanks for your clarification.

Enrico

> On Sun, Feb 12, 2023 at 3:31 AM Enrico Olivelli <eolive...@gmail.com>
> wrote:
>
> > Yubiao,
> >
> > On Sat, 11 Feb 2023, 19:06 Yubiao Feng <yubiao.f...@streamnative.io.invalid>
> > wrote:
> >
> > > Hi community
> > >
> > > I am starting a DISCUSS for "Retry topic should not create for a retry
> > > topic."
> > >
> > > If we use regex-topic consumer and enable retry, it is possible to create
> > > such a topic
> > > "persistent://public/default/tp1-sub1-RETRY-sub2-RETRY-sub3-RETRY....".
> > > You can reproduce this by using the test below.
> > >
> > > It probably doesn't make sense to create a RETRY/DLQ topic on RETRY/DLQ.
> > > We should avoid this scenario if users use the default configuration
> > > (users can enable it if they need it).
> >
> > I agree that this is a bad case.
> > But should we really care?
> >
> > You must do it very intentionally.
> > It is good to help users to not fall into bad situations but on the other
> > case we cannot deal with many silly configurations that you could set up,
> > like creating a pipeline of functions that in the end create a cycle.
> >
> > I wonder if we could simply document this fact instead of adding code
> >
> > Enrico
> >
> > > ```java
> > > @Test
> > > public void testRetryTopicWillNotCreatedForRetryTopic() throws Exception {
> > >     final String topic = "persistent://my-property/my-ns/tp1";
> > >     Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
> > >             .topic(topic)
> > >             .create();
> > >     for (int i = 0; i < 100; i++) {
> > >         producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
> > >     }
> > >     producer.close();
> > >
> > >     for (int i = 0; i < 10; i++) {
> > >         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
> > >                 .topicsPattern("my-property/my-ns/.*")
> > >                 .subscriptionName("sub" + i)
> > >                 .enableRetry(true)
> > >                 .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build())
> > >                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
> > >                 .subscribe();
> > >         Message<byte[]> message = consumer.receive();
> > >         log.info("consumer received message : {} {}",
> > >                 message.getMessageId(), new String(message.getData()));
> > >         consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
> > >         consumer.close();
> > >     }
> > >
> > >     Set<String> tps =
> > >             pulsar.getBrokerService().getTopics().keys().stream().collect(Collectors.toSet());
> > >     try {
> > >         for (String tp : tps) {
> > >             assertTrue(howManyKeyWordRetryInTopicName(tp, RETRY_GROUP_TOPIC_SUFFIX) <= 1, tp);
> > >             assertTrue(howManyKeyWordRetryInTopicName(tp, DLQ_GROUP_TOPIC_SUFFIX) <= 1, tp);
> > >         }
> > >     } finally {
> > >         // cleanup.
> > >         for (String tp : tps) {
> > >             if (tp.startsWith(topic)) {
> > >                 admin.topics().delete(tp, true);
> > >             }
> > >         }
> > >     }
> > > }
> > >
> > > private int howManyKeyWordRetryInTopicName(String topicName, String keyWord) {
> > >     int retryCountInTopicName = 0;
> > >     String tpCp = topicName;
> > >     while (true) {
> > >         int index = tpCp.indexOf(keyWord);
> > >         if (index < 0) {
> > >             break;
> > >         }
> > >         tpCp = tpCp.substring(index + keyWord.length());
> > >         retryCountInTopicName++;
> > >     }
> > >     return retryCountInTopicName;
> > > }
> > > ```
> > >
> > > Thanks
> > > Yubiao Feng
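
For reference, one possible direction for the fix discussed in this thread is to leave topics that are already retry or DLQ topics out of the set a regex consumer derives new retry topics from. The snippet below is only a rough sketch under assumptions, not the actual Pulsar change: the class `RetryTopicFilter` and both of its methods are hypothetical, and the suffix values "-RETRY" and "-DLQ" are assumed here for the `RETRY_GROUP_TOPIC_SUFFIX` and `DLQ_GROUP_TOPIC_SUFFIX` constants that the test refers to by name only. Partitioned topic names are not handled.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the Pulsar client.
public class RetryTopicFilter {
    // Assumed values; the thread names these constants but does not show them.
    static final String RETRY_GROUP_TOPIC_SUFFIX = "-RETRY";
    static final String DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";

    // True if the topic name already ends with a retry or DLQ suffix.
    static boolean isRetryOrDlqTopic(String topicName) {
        return topicName.endsWith(RETRY_GROUP_TOPIC_SUFFIX)
                || topicName.endsWith(DLQ_GROUP_TOPIC_SUFFIX);
    }

    // Drops retry/DLQ topics from the list of topics matched by a pattern.
    static List<String> withoutRetryAndDlqTopics(List<String> matchedTopics) {
        return matchedTopics.stream()
                .filter(t -> !isRetryOrDlqTopic(t))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> matched = List.of(
                "persistent://public/default/tp1",
                "persistent://public/default/tp1-sub1-RETRY",
                "persistent://public/default/tp1-sub1-DLQ");
        // Only the plain topic should remain eligible for a new retry topic.
        System.out.println(withoutRetryAndDlqTopics(matched));
    }
}
```

With a filter like this applied to the topics matched by `topicsPattern`, a restarted consumer would still see "tp1-sub1-RETRY" but would not build "tp1-sub1-RETRY-sub1-RETRY" on top of it. Whether the check belongs in the client or should be configurable is a separate design question.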