Hi all, I have a local Flink SQL app with a Kafka source reading from 3 partitions (0, 1, 2). I am running the following code:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

    env.setMaxParallelism(env.getParallelism() * 8);
    env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());

    final TableConfig tConfig = tEnv.getConfig();
    tConfig.setIdleStateRetention(Duration.ofMinutes(60));
    // Intended to mark a source partition as idle after 3 minutes without data.
    tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");

To test locally:

1. I sent some data to partition 0 with timestamp 150.
2. I sent some data to partition 1 with timestamp 155.
3. I waited for 3 minutes (in fact, a bit longer).
4. This should have marked P2 as idle, so the watermark should advance and the data from P0 and P1 should be processed.
5. However, I did not see the watermark advance.
6. When I then sent data to partition 2 with timestamp 160, Flink advanced its watermark to 150 (i.e. the timestamp of the first data sent) and processed the data in P0.

If my understanding of "table.exec.source.idle-timeout" is correct, I would expect Flink to advance its watermark 3 minutes after I sent data to P0 and P1, without needing any data in P2.

I also tried populating all three partitions first and then sending data to only some of them. This had no effect either.

On one occasion I ran into an exception in the Flink UI, but I never saw it again (see bottom).

I posted this on SO but did not get any responses:
https://stackoverflow.com/questions/69729366/flink-sql-does-not-honor-table-exec-source-idle-timeout-setting

Could someone confirm whether my setup is right and whether the behavior I expect is correct?
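
To make the expectation concrete, here is a minimal sketch in plain Java of how I understand idle partitions to affect the combined watermark. It is not Flink's implementation: the class IdleWatermarkSketch and its methods are hypothetical, it assumes each partition's watermark is simply the largest timestamp seen so far (no out-of-orderness delay), and it assumes a partition counts as idle once it has gone the full timeout without a record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-partition watermarks with an idle timeout.
final class IdleWatermarkSketch {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long idleTimeoutMs;
    // Largest event timestamp seen per partition (NO_WATERMARK if none yet).
    private final Map<Integer, Long> maxTimestamp = new HashMap<>();
    // Wall-clock time of the last record (or of startup) per partition.
    private final Map<Integer, Long> lastActivityMs = new HashMap<>();

    IdleWatermarkSketch(int partitions, long idleTimeoutMs, long startMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        for (int p = 0; p < partitions; p++) {
            maxTimestamp.put(p, NO_WATERMARK);
            lastActivityMs.put(p, startMs);
        }
    }

    // Record an event on a partition at wall-clock time nowMs.
    void onRecord(int partition, long eventTimestamp, long nowMs) {
        maxTimestamp.merge(partition, eventTimestamp, Math::max);
        lastActivityMs.put(partition, nowMs);
    }

    // Minimum watermark over all partitions that are not idle at nowMs.
    // Returns NO_WATERMARK if an active partition has no data yet,
    // or if every partition is idle.
    long combinedWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : maxTimestamp.entrySet()) {
            boolean idle = nowMs - lastActivityMs.get(e.getKey()) >= idleTimeoutMs;
            if (idle) {
                continue; // idle partitions do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, e.getValue());
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        IdleWatermarkSketch s = new IdleWatermarkSketch(3, 180_000L, 0L);
        s.onRecord(0, 150L, 170_000L); // P0, timestamp 150
        s.onRecord(1, 155L, 175_000L); // P1, timestamp 155
        // P2 is still considered active here, so nothing can advance.
        System.out.println(s.combinedWatermark(179_999L) == NO_WATERMARK);
        // P2 has now been silent for the full timeout and is skipped.
        System.out.println(s.combinedWatermark(180_000L));
    }
}
```

In this model the watermark reaches 150 as soon as P2 has been silent for 180000 ms, provided P0 and P1 are still active. If P0 and P1 also stay silent for the full timeout, every partition is idle and the sketch reports no watermark at all; whether Flink behaves the same way in that case is an assumption I have not verified.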

Exception seen once in the Flink UI:

2021-10-26 16:38:14
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
    at java.base/java.util.Optional.ifPresent(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
    at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
    at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)