Hi,

Why do i receiving an acknowledged messages from PulsarSource?
When i startup the below program (*1), I receiving same messages from 
PulsarSource every time.
On the other hand, programs using PulsarClient will never receive acknowledged 
messages.(*2)

How can i receive only unacknowledged messages with PulsarSource?

---(*1)---
        public static void main(String[] args) throws Exception {
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().setParallelism(1);
                env.enableCheckpointing(1000);
                CheckpointConfig cc = env.getCheckpointConfig();
                boolean ck = cc.isCheckpointingEnabled();

                PulsarSource<Person> source = PulsarSource.builder()
                                .setServiceUrl("pulsar://192.168.1.10:6650")
                                .setStartCursor(StartCursor.earliest())
                                
.setTopics("persistent://public/default/my-topic")
                                .setSubscriptionName("my-subscription")
                                
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
                                
.setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1000)
                                
.setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_TIME, 2000L)
                                
.setDeserializationSchema(Schema.JSON(Person.class), Person.class)
                                .build();

                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar 
Source")
                                .addSink(new SinkFunction<Person>() {
                                        @Override
                                        public void invoke(Person value, 
Context context) throws Exception {
                                                LOG.info(value.Name);

                                        }
                                });

                env.execute("sample");
        }
----------



---(*2)---
                PulsarClient client = PulsarClient
                                .builder()
                                .serviceUrl("pulsar://192.168.1.10:6650")
                                .build();
                Consumer<String> consumer = client
                                .newConsumer(Schema.STRING)
                                .subscriptionName("my-subscription")
                                .topic("persistent://public/default/my-topic")
                                .subscribe();
                while (true) {
                        Message<String> msg = consumer.receive();
                        System.out.println("Received message: " + 
msg.getValue());
                        consumer.acknowledge(msg);
                }
----------


Java18
Run on Eclipse
Flink 1.18.1
flink-connector-pulsar 4.1.0-1.18.0
Pulsar 3.1.2 standalone on Ubuntu VM (192.168.1.10:6650)


Best regards.
Tatsu


Reply via email to