Hi Enrico Olivelli

> It is good to help users to not fall into bad situations but on the other
> hand we cannot deal with many silly configurations that you could set up,
> like creating a pipeline of functions that in the end create a cycle.

Sorry, this test only helps to reproduce the problem quickly. In practice
there is only one consumer, but every restart of that consumer triggers the
issue, and we end up with a topic like this:
"persistent://public/default/tp1-sub1-RETRY-sub1-RETRY-sub1-RETRY...."

> I wonder if we could simply document this fact instead of adding code

```java
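// Regex consumer with retry enabled; the pattern also matches the retry topics.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topicsPattern("my-property/my-ns/.*")
        .subscriptionName("sub1")
        .enableRetry(true)
        .subscribe();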
```

Restarting a client that runs the code above is enough to reproduce the problem.
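
To illustrate why the name keeps growing, here is a minimal, broker-free sketch. It assumes that a retry topic is named "<topic>-<subscription>-RETRY" (inferred from the topic name quoted above) and that each restart re-evaluates the pattern against every existing topic. The class `RetryTopicChainSketch`, its methods, and the `skipRetryTopics` guard are hypothetical names for illustration only, not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class RetryTopicChainSketch {
    // Assumption: retry topics are named "<topic>-<subscription>-RETRY",
    // as the topic name quoted in this thread suggests.
    static final String RETRY_SUFFIX = "-RETRY";

    static String retryTopicFor(String topic, String subscription) {
        return topic + "-" + subscription + RETRY_SUFFIX;
    }

    // Hypothetical guard: treat any name ending in the retry suffix as a retry topic.
    static boolean isRetryTopic(String topic) {
        return topic.endsWith(RETRY_SUFFIX);
    }

    // Simulates consumer restarts: each restart re-evaluates the pattern against
    // all existing topics and derives a retry topic for every match.
    static Set<String> simulateRestarts(String baseTopic, String subscription,
                                        int restarts, boolean skipRetryTopics) {
        Pattern pattern = Pattern.compile("persistent://public/default/.*");
        Set<String> topics = new TreeSet<>();
        topics.add(baseTopic);
        for (int r = 0; r < restarts; r++) {
            // Iterate over a snapshot so new topics are only seen on the next restart.
            List<String> snapshot = new ArrayList<>(topics);
            for (String t : snapshot) {
                if (!pattern.matcher(t).matches()) {
                    continue;
                }
                if (skipRetryTopics && isRetryTopic(t)) {
                    continue; // do not derive a retry topic for a retry topic
                }
                topics.add(retryTopicFor(t, subscription));
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        String base = "persistent://public/default/tp1";
        simulateRestarts(base, "sub1", 3, false).forEach(System.out::println);
        System.out.println("with guard: " + simulateRestarts(base, "sub1", 3, true));
    }
}
```

Without the guard, three simulated restarts produce a name with three "-sub1-RETRY" segments. With the guard, only the base topic and its single retry topic exist. A real fix would also need to cover DLQ topics and an opt-out for users who want the current behavior.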

On Sun, Feb 12, 2023 at 3:31 AM Enrico Olivelli <eolive...@gmail.com> wrote:

> Yubiao,
>
> On Sat, Feb 11, 2023, 19:06 Yubiao Feng <yubiao.f...@streamnative.io.invalid>
> wrote:
>
> > Hi community
> >
> > I am starting a DISCUSS for "Retry topic should not create for a retry
> > topic."
> >
> > If we use a regex-topic consumer and enable retry, it is possible to
> > create a topic such as
> > "persistent://public/default/tp1-sub1-RETRY-sub2-RETRY-sub3-RETRY....".
> > You can reproduce this by using the test below.
> >
> > It probably doesn't make sense to create a RETRY/DLQ topic on RETRY/DLQ.
> > We should avoid this scenario if users use the default configuration (users
> > can enable it if they need it).
> >
>
> I agree that this is a bad case.
> But should we really care?
>
> You must do it very intentionally.
> It is good to help users to not fall into bad situations but on the other
> hand we cannot deal with many silly configurations that you could set up,
> like creating a pipeline of functions that in the end create a cycle.
>
>
> I wonder if we could simply document this fact instead of adding code
>
>
> Enrico
>
>
>
>
> > ```java
> >     @Test
> >     public void testRetryTopicWillNotCreatedForRetryTopic() throws Exception {
> >         final String topic = "persistent://my-property/my-ns/tp1";
> >         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
> >                 .topic(topic)
> >                 .create();
> >         for (int i = 0; i < 100; i++) {
> >             producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
> >         }
> >         producer.close();
> >
> >         for (int i = 0; i < 10; i++) {
> >             Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
> >                     .topicsPattern("my-property/my-ns/.*")
> >                     .subscriptionName("sub" + i)
> >                     .enableRetry(true)
> >                     .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build())
> >                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
> >                     .subscribe();
> >             Message<byte[]> message = consumer.receive();
> >             log.info("consumer received message : {} {}",
> >                     message.getMessageId(), new String(message.getData()));
> >             consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
> >             consumer.close();
> >         }
> >
> >         Set<String> tps =
> >                 pulsar.getBrokerService().getTopics().keys().stream().collect(Collectors.toSet());
> >         try {
> >             for (String tp : tps) {
> >                 assertTrue(howManyKeyWordRetryInTopicName(tp, RETRY_GROUP_TOPIC_SUFFIX) <= 1, tp);
> >                 assertTrue(howManyKeyWordRetryInTopicName(tp, DLQ_GROUP_TOPIC_SUFFIX) <= 1, tp);
> >             }
> >         } finally {
> >             // cleanup.
> >             for (String tp : tps) {
> >                 if (tp.startsWith(topic)) {
> >                     admin.topics().delete(tp, true);
> >                 }
> >             }
> >         }
> >     }
> >
> >     private int howManyKeyWordRetryInTopicName(String topicName, String keyWord) {
> >         int retryCountInTopicName = 0;
> >         String tpCp = topicName;
> >         while (true) {
> >             int index = tpCp.indexOf(keyWord);
> >             if (index < 0) {
> >                 break;
> >             }
> >             tpCp = tpCp.substring(index + keyWord.length());
> >             retryCountInTopicName++;
> >         }
> >         return retryCountInTopicName;
> >     }
> > ```
> >
> > Thanks
> > Yubiao Feng
> >
>
