Hi!

What's the value of your config.autowatermarkInterval()? It must be larger
than 0 for table.exec.source.idle-timeout to take effect. More specifically,
the auto watermark interval exists so that Flink does not emit a watermark
for every record (which would hurt performance); instead it emits a
watermark once per interval. When an interval elapses, Flink checks whether
any record arrived during it, and if none did, the idle-source logic kicks
in.
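
For example, the interval just has to be non-zero for periodic watermarks
(and with them the idleness check) to run at all. A minimal sketch; the
200 ms value is only an illustration, not a recommendation:

    // A non-zero interval makes Flink emit watermarks periodically instead
    // of per record; the idle-source check runs on the same schedule.
    env.getConfig().setAutoWatermarkInterval(200L); // milliseconds, must be > 0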

Makhanchan Pandey <makhanchanpan...@gmail.com> wrote on Thursday, October 28, 2021, at 4:01 AM:

> Hi all,
>
> I have a local Flink SQL app with Kafka source running with 3 partitions
> (0,1,2).
> I am running the following code:
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> final EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .build();
> final StreamTableEnvironment tEnv =
>         StreamTableEnvironment.create(env, settings);
>
> env.setMaxParallelism(env.getParallelism() * 8);
> env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());
>
> final TableConfig tConfig = tEnv.getConfig();
> tConfig.setIdleStateRetention(Duration.ofMinutes(60));
>
> tConfig.getConfiguration()
>         .setString("table.exec.source.idle-timeout", "180000 ms");
>
> To test locally (records were produced along the lines of the sketch
> after these steps):
> 1. I sent some data to Partition 0 with timestamp 150.
> 2. Sent some data to Partition 1 with timestamp 155.
> 3. I waited for 3 minutes (in fact, a bit longer).
> 4. This should have marked P2 as idle, so the watermark should advance and
> data from P0 and P1 should be processed.
> 5. However, I did not see any watermarks advance.
> 6. Only when I then sent data to Partition 2 with timestamp 160 did Flink
> advance its watermark to 150 (i.e. from the first data sent and processed
> in P0).
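>
> A minimal sketch of how those records were produced, with an explicit
> partition and timestamp (the topic name, key, and serializer setup here
> are illustrative, not my exact code):
>
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class TimestampedProducer {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         try (KafkaProducer<String, String> producer =
>                 new KafkaProducer<>(props)) {
>             // ProducerRecord(topic, partition, timestamp, key, value) pins
>             // the record to partition 0 with event timestamp 150.
>             producer.send(
>                     new ProducerRecord<>("test-topic", 0, 150L, "k", "v"));
>             producer.flush();
>         }
>     }
> }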
>
> Ideally, I would expect Flink to advance its watermark 3 minutes after I
> send data to P0 and P1, if my understanding of
> "table.exec.source.idle-timeout" is correct.
>
> I also tried populating all three partitions first and then repeating the
> test, sending data to only some of the partitions. This too had no effect.
>
> On one occasion I ran into an exception in the Flink UI, but I never saw
> it again (see the stack trace at the bottom).
> I posted this on SO:
> https://stackoverflow.com/questions/69729366/flink-sql-does-not-honor-table-exec-source-idle-timeout-setting
>
> but did not get any responses. Could someone confirm whether I am setting
> things up correctly and whether the behavior I expect is right?
>
> 2021-10-26 16:38:14
> java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
>     at java.base/java.util.Optional.ifPresent(Unknown Source)
>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
>     at java.base/java.util.HashMap.forEach(Unknown Source)
>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
>     at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
>     at java.base/java.util.HashMap.forEach(Unknown Source)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
>
>
