GitHub user robshep added a comment to the discussion: Cannot parse <null> 
schema

Hi, thanks for taking a look. 
There are 3 topics in use in this deployment. 
All have schemas. 
None of this code, or the Payload objects have changed for many months.

I notice in App1 there is another topic `-RETRY` created presumably to handle 
redelivery.  I don't maintain a schema here. 

The log above is from restarting `App1` below but I have seen the same issue 
with the "Null Schema" also when restarting `App3` also. 

**App1:**
_1 x consumer, 1 x producer_

```
@Bean(destroyMethod = "close")
@Autowired
public Consumer<UpdateMessageV1> updateListener(PulsarUpdateListener 
pulsarUpdateListener) throws PulsarClientException {
        return pulsarClient().newConsumer(AvroSchema.of(UpdateMessageV1.class))
                .messageListener(pulsarUpdateListener)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionName("app-tp-updater")
                .negativeAckRedeliveryDelay(20, TimeUnit.SECONDS) // try again 
after 20 seconds
                .topic("persistent://myorg/myappB/update")
                .enableRetry(true)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
    }

    @Bean(destroyMethod = "close")
    public Producer notifyExpiryProducer() throws PulsarClientException {
        return 
pulsarClient().newProducer(AvroSchema.of(ExpiryNotificationV1.class))
                .topic("persistent://myorg/myappB/notify:expiry")
                .create();
    }
```

**App 2:**
_1 x producer_
```
final Producer<IngressMessageV1> producer = 
pulsarClient.newProducer(AvroSchema.of(IngressMessageV1.class))
    .topic("persistent://myorg/myappA/ingress")
    .create();
```


**App3:**
_1 x producer, 2 x consumer_

```
producer = pulsarClient.newProducer(AvroSchema.of(UpdateMessageV1.class))
                        .producerName("myapp-tp-update-consumer")
                        .topic("persistent://myorg/myappB/update")
                        .create();

consumerTPNotifsV1 = 
pulsarClient.newConsumer(AvroSchema.of(ExpiryNotificationV1.class))
                .subscriptionName("app-tp-expiry")
                .topic( "persistent://myorg/appB/notify:expiry")
                .subscribe();

consumerIngressV1 = 
pulsarClient.newConsumer(AvroSchema.of(IngressMessageV1.class))
                .subscriptionName("myapp-ingress")
                .topic("persistent://myorg/myappA/ingress")
                .subscribe();

```


Thanks

Rob


GitHub link: 
https://github.com/apache/pulsar/discussions/18948#discussioncomment-4414960

----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscr...@pulsar.apache.org

Reply via email to