+1, I agree with you. We should prohibit users from creating Retry Topic and DLQ Topic in a loop. It will make uncontrollable behavior. If we allow that, It may cause great trouble to users.
Thanks, Bo Asaf Mesika <asaf.mes...@gmail.com> 于2023年2月14日周二 03:02写道: > > Sounds like a bug for sure. > How did you plan on solving it? > > > On Mon, Feb 13, 2023 at 12:46 AM Enrico Olivelli <eolive...@gmail.com> > wrote: > > > Il Dom 12 Feb 2023, 04:42 Yubiao Feng <yubiao.f...@streamnative.io > > .invalid> > > ha scritto: > > > > > Hi Enrico Olivelli > > > > > > > It is good to help users to not fall into bad situations but on the > > other > > > case we cannot deal with many silly configurations that you could set up, > > > like creating a pipeline of functions that in the end create a cycle. > > > > > > Sorry, this test just helps to reproduce the problem quickly. The reality > > > is that there is only one consumer, but every restart triggers this issue > > > and ends up with a topic like this: > > > "persistent://public/default/tp1-sub1-RETRY-sub1-RETRY-sub1-RETRY...." > > > > > > > I wonder if we could simply document this fact instead of adding code > > > > > > ```java > > > Consumer<> consumer = pulsarClient.newConsumer() > > > .topicsPattern("my-property/my-ns/.*").subscriptionName("sub1") > > > .enableRetry(true) > > > ``` > > > > > > With the client restarted, the code above will reproduce the problem. > > > > > > > > > I see the problem now. > > > > We must do something for this case. It must not happen. We have to fix it > > > > Thanks for your clarification > > > > Enrico > > > > > > > > On Sun, Feb 12, 2023 at 3:31 AM Enrico Olivelli <eolive...@gmail.com> > > > wrote: > > > > > > > Yubiao, > > > > > > > > Il Sab 11 Feb 2023, 19:06 Yubiao Feng <yubiao.f...@streamnative.io > > > > .invalid> > > > > ha scritto: > > > > > > > > > Hi community > > > > > > > > > > I am starting a DISCUSS for "Retry topic should not create for a > > retry > > > > > topic." > > > > > > > > > > If we use regex-topic consumer and enable retry, it is possible to > > > create > > > > > such a topic > > > > > > > "persistent://public/default/tp1-sub1-RETRY-sub2-RETRY-sub3-RETRY....". > > > > You > > > > > can reproduce this by using the test below. > > > > > > > > > > It probably doesn't make sense to create a RETRY/DLQ topic on > > > RETRY/DLQ. > > > > We > > > > > should avoid this scenario if users use the default configuration > > > (users > > > > > can enable it if they need it). > > > > > > > > > > > > > I agree that this is a bad case. > > > > But should we really care? > > > > > > > > You must do it very intentionally. > > > > It is good to help users to not fall into bad situations but on the > > other > > > > case we cannot deal with many silly configurations that you could set > > up, > > > > like creating a pipeline of functions that in the end create a cycle. > > > > > > > > > > > > I wonder if we could simply document this fact instead of adding code > > > > > > > > > > > > Enrico > > > > > > > > > > > > > > > > > > > > > ```java > > > > > @Test > > > > > public void testRetryTopicWillNotCreatedForRetryTopic() throws > > > > > Exception { > > > > > final String topic = "persistent://my-property/my-ns/tp1"; > > > > > Producer<byte[]> producer = > > > > pulsarClient.newProducer(Schema.BYTES) > > > > > .topic(topic) > > > > > .create(); > > > > > for (int i = 0; i < 100; i++) { > > > > > producer.send(String.format("Hello Pulsar [%d]", > > > > > i).getBytes()); > > > > > } > > > > > producer.close(); > > > > > > > > > > for (int i =0; i< 10; i++) { > > > > > Consumer<byte[]> consumer = > > > > > pulsarClient.newConsumer(Schema.BYTES) > > > > > .topicsPattern("my-property/my-ns/.*") > > > > > .subscriptionName("sub" + i) > > > > > .enableRetry(true) > > > > > > > > > > > > > > > > > > > .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()) > > > > > > > > > > .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) > > > > > .subscribe(); > > > > > Message<byte[]> message = consumer.receive(); > > > > > log.info("consumer received message : {} {}", > > > > > message.getMessageId(), new String(message.getData())); > > > > > consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); > > > > > consumer.close(); > > > > > } > > > > > > > > > > Set<String> tps = > > > > > > > > > > > > > > > > > > > pulsar.getBrokerService().getTopics().keys().stream().collect(Collectors.toSet()); > > > > > try { > > > > > for (String tp : tps) { > > > > > assertTrue(howManyKeyWordRetryInTopicName(tp, > > > > > RETRY_GROUP_TOPIC_SUFFIX) <= 1, tp); > > > > > assertTrue(howManyKeyWordRetryInTopicName(tp, > > > > > DLQ_GROUP_TOPIC_SUFFIX) <= 1, tp); > > > > > } > > > > > } finally { > > > > > // cleanup. > > > > > for (String tp : tps){ > > > > > if (tp.startsWith(topic)) { > > > > > admin.topics().delete(tp ,true); > > > > > } > > > > > } > > > > > } > > > > > } > > > > > > > > > > private int howManyKeyWordRetryInTopicName(String topicName, > > String > > > > > keyWord) { > > > > > int retryCountInTopicName = 0; > > > > > String tpCp = topicName; > > > > > while (true) { > > > > > int index = tpCp.indexOf(keyWord); > > > > > if (index < 0) { > > > > > break; > > > > > } > > > > > tpCp = tpCp.substring(index + keyWord.length()); > > > > > retryCountInTopicName++; > > > > > } > > > > > return retryCountInTopicName; > > > > > } > > > > > ``` > > > > > > > > > > Thanks > > > > > Yubiao Feng > > > > > > > > > > > > > >