Sounds like a bug for sure.
How did you plan on solving it?

On Mon, Feb 13, 2023 at 12:46 AM Enrico Olivelli <eolive...@gmail.com>
wrote:

> On Sun, Feb 12, 2023, 04:42 Yubiao Feng <yubiao.f...@streamnative.io.invalid>
> wrote:
>
> > Hi Enrico Olivelli
> >
> > > It is good to help users to not fall into bad situations but on the other
> > > hand we cannot deal with many silly configurations that you could set up,
> > > like creating a pipeline of functions that in the end create a cycle.
> >
> > Sorry, this test just helps to reproduce the problem quickly. The reality
> > is that there is only one consumer, but every restart triggers this issue
> > and ends up with a topic like this:
> > "persistent://public/default/tp1-sub1-RETRY-sub1-RETRY-sub1-RETRY...."
> >
> > > I wonder if we could simply document this fact instead of adding code
> >
> > ```java
> > Consumer<byte[]> consumer = pulsarClient.newConsumer()
> >         .topicsPattern("my-property/my-ns/.*")
> >         .subscriptionName("sub1")
> >         .enableRetry(true)
> >         .subscribe();
> > ```
> >
> > With the client restarted, the code above will reproduce the problem.
> >
>
>
> I see the problem now.
>
> We must do something for this case. It must not happen. We have to fix it.
>
> Thanks for your clarification
>
> Enrico
>
> >
> > On Sun, Feb 12, 2023 at 3:31 AM Enrico Olivelli <eolive...@gmail.com>
> > wrote:
> >
> > > Yubiao,
> > >
> > > On Sat, Feb 11, 2023, 19:06 Yubiao Feng <yubiao.f...@streamnative.io.invalid>
> > > wrote:
> > >
> > > > Hi community
> > > >
> > > > I am starting a DISCUSS for "Retry topic should not create for a retry
> > > > topic."
> > > >
> > > > If we use a regex-topic consumer and enable retry, it is possible to
> > > > create a topic such as
> > > > "persistent://public/default/tp1-sub1-RETRY-sub2-RETRY-sub3-RETRY....".
> > > > You can reproduce this by using the test below.
> > > >
> > > > It probably doesn't make sense to create a RETRY/DLQ topic on a RETRY/DLQ
> > > > topic. We should avoid this scenario if users use the default
> > > > configuration (users can enable it if they need it).
> > > >
> > >
> > > I agree that this is a bad case.
> > > But should we really care?
> > >
> > > You must do it very intentionally.
> > > It is good to help users to not fall into bad situations but on the other
> > > hand we cannot deal with many silly configurations that you could set up,
> > > like creating a pipeline of functions that in the end create a cycle.
> > >
> > >
> > > I wonder if we could simply document this fact instead of adding code
> > >
> > >
> > > Enrico
> > >
> > >
> > >
> > >
> > > > ```java
> > > >     @Test
> > > >     public void testRetryTopicWillNotCreatedForRetryTopic() throws Exception {
> > > >         final String topic = "persistent://my-property/my-ns/tp1";
> > > >         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
> > > >                 .topic(topic)
> > > >                 .create();
> > > >         for (int i = 0; i < 100; i++) {
> > > >             producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
> > > >         }
> > > >         producer.close();
> > > >
> > > >         for (int i = 0; i < 10; i++) {
> > > >             Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
> > > >                     .topicsPattern("my-property/my-ns/.*")
> > > >                     .subscriptionName("sub" + i)
> > > >                     .enableRetry(true)
> > > >                     .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build())
> > > >                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
> > > >                     .subscribe();
> > > >             Message<byte[]> message = consumer.receive();
> > > >             log.info("consumer received message : {} {}",
> > > >                     message.getMessageId(), new String(message.getData()));
> > > >             consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
> > > >             consumer.close();
> > > >         }
> > > >
> > > >         Set<String> tps =
> > > >                 pulsar.getBrokerService().getTopics().keys().stream().collect(Collectors.toSet());
> > > >         try {
> > > >             for (String tp : tps) {
> > > >                 assertTrue(howManyKeyWordRetryInTopicName(tp, RETRY_GROUP_TOPIC_SUFFIX) <= 1, tp);
> > > >                 assertTrue(howManyKeyWordRetryInTopicName(tp, DLQ_GROUP_TOPIC_SUFFIX) <= 1, tp);
> > > >             }
> > > >         } finally {
> > > >             // cleanup.
> > > >             for (String tp : tps) {
> > > >                 if (tp.startsWith(topic)) {
> > > >                     admin.topics().delete(tp, true);
> > > >                 }
> > > >             }
> > > >         }
> > > >     }
> > > >
> > > >     private int howManyKeyWordRetryInTopicName(String topicName, String keyWord) {
> > > >         int retryCountInTopicName = 0;
> > > >         String tpCp = topicName;
> > > >         while (true) {
> > > >             int index = tpCp.indexOf(keyWord);
> > > >             if (index < 0) {
> > > >                 break;
> > > >             }
> > > >             tpCp = tpCp.substring(index + keyWord.length());
> > > >             retryCountInTopicName++;
> > > >         }
> > > >         return retryCountInTopicName;
> > > >     }
> > > > ```
> > > >
> > > > Thanks
> > > > Yubiao Feng
> > > >
> > >
> >
>
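
For readers following the thread, here is a minimal sketch of one possible guard against the nesting described above: drop topics that already look like retry or dead-letter topics before a regex consumer subscribes to them. This is not the fix that was agreed on or implemented. The class `RetryTopicFilter`, its methods, and the two suffix constants are hypothetical names for illustration; the suffix values are assumptions based on the "-RETRY" topic names quoted in the thread. It also assumes non-partitioned topic names, where the suffix is at the very end.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public final class RetryTopicFilter {
    // Assumed suffixes, mirroring the "...-sub1-RETRY" names seen in this thread.
    static final String RETRY_SUFFIX = "-RETRY";
    static final String DLQ_SUFFIX = "-DLQ";

    private RetryTopicFilter() {
    }

    // True if the topic name already ends with a retry or dead-letter suffix.
    static boolean isRetryOrDlqTopic(String topicName) {
        return topicName.endsWith(RETRY_SUFFIX) || topicName.endsWith(DLQ_SUFFIX);
    }

    // Keeps only the topics a regex consumer should attach retry handling to.
    static List<String> withoutRetryAndDlqTopics(List<String> matchedTopics) {
        return matchedTopics.stream()
                .filter(t -> !isRetryOrDlqTopic(t))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> matched = Arrays.asList(
                "persistent://public/default/tp1",
                "persistent://public/default/tp1-sub1-RETRY",
                "persistent://public/default/tp1-sub1-DLQ");
        // Only the plain topic survives the filter.
        System.out.println(withoutRetryAndDlqTopics(matched));
    }
}
```

With a filter like this applied to the pattern matches, a restarted consumer would not pick up "tp1-sub1-RETRY" as a source topic, so no "tp1-sub1-RETRY-sub1-RETRY" topic would be derived from it. Whether such filtering belongs in the client, the broker, or behind a configuration switch is the open question in this discussion.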
