Hi,

Why am I receiving already-acknowledged messages from PulsarSource? Every time I start the program below (*1), I receive the same messages from PulsarSource again. In contrast, a program that uses PulsarClient directly (*2) never receives acknowledged messages.
How can I receive only unacknowledged messages with PulsarSource?

---(*1)---
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setParallelism(1);
    // Checkpoint every 1000 ms.
    env.enableCheckpointing(1000);
    CheckpointConfig cc = env.getCheckpointConfig();
    boolean ck = cc.isCheckpointingEnabled();

    PulsarSource<Person> source = PulsarSource.builder()
        .setServiceUrl("pulsar://192.168.1.10:6650")
        .setStartCursor(StartCursor.earliest())
        .setTopics("persistent://public/default/my-topic")
        .setSubscriptionName("my-subscription")
        .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
        .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1000)
        .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_TIME, 2000L)
        .setDeserializationSchema(Schema.JSON(Person.class), Person.class)
        .build();

    // Log the name of every Person read from the topic.
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source")
        .addSink(new SinkFunction<Person>() {
            @Override
            public void invoke(Person value, Context context) throws Exception {
                LOG.info(value.Name);
            }
        });

    env.execute("sample");
}
----------

---(*2)---
PulsarClient client = PulsarClient
    .builder()
    .serviceUrl("pulsar://192.168.1.10:6650")
    .build();

Consumer<String> consumer = client
    .newConsumer(Schema.STRING)
    .subscriptionName("my-subscription")
    .topic("persistent://public/default/my-topic")
    .subscribe();

while (true) {
    Message<String> msg = consumer.receive();
    System.out.println("Received message: " + msg.getValue());
    // Acknowledge each message right after it is received.
    consumer.acknowledge(msg);
}
----------

Java 18, run from Eclipse
Flink 1.18.1
flink-connector-pulsar 4.1.0-1.18.0
Pulsar 3.1.2 standalone on an Ubuntu VM (192.168.1.10:6650)

Best regards,
Tatsu