Hi all,

I have a local Flink SQL app with Kafka source running with 3 partitions
(0,1,2).
I am running the following code:

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv =
        StreamTableEnvironment.create(env, settings);

env.setMaxParallelism(env.getParallelism() * 8);
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());

final TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetention(Duration.ofMinutes(60));

tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");
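As an aside, in case the duration string is the problem: the same option can also be set in its typed form, which sidesteps any duration-format parsing ambiguity ("180000 ms" vs "3 min"). This assumes `ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT` is available in your Flink version; this fragment continues the snippet above and reuses its `tConfig` variable:

```java
import java.time.Duration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

// Typed equivalent of the string-based setting above (assumed to be
// available in recent Flink versions; continues the snippet above).
tConfig.getConfiguration().set(
        ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,
        Duration.ofMinutes(3));
```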

To test locally:
1. I sent some data to Partition 0 with timestamp 150.
2. Sent some data to Partition 1 with timestamp 155.
3. I waited for 3 minutes (in fact, waited a bit longer).
4. This should have marked P2 as idle, so watermarks should
advance and data from P0 and P1 should be processed.
5. However, I did not see any watermarks advance.
6. Only when I then sent data to Partition 2 with timestamp 160 did
Flink advance its watermark to 150 (i.e., from the first data sent)
and process the data in P0.
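For context on why I think the watermark is stuck: with a multi-partition Kafka source, the emitted watermark is the minimum of the per-partition watermarks, and an idle partition should be excluded from that minimum once the idle timeout fires. A minimal plain-Java sketch of that combination logic (the class and method names here are illustrative, not Flink's internals):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of multi-partition watermark combination.
// Names are hypothetical; Flink's real logic lives in its source and
// watermark machinery.
public class WatermarkCombiner {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    static class Partition {
        long watermark = NO_WATERMARK;
        boolean idle = false;
    }

    // Combined watermark = min over partitions that are NOT idle.
    // A silent but non-idle partition pins the result at the floor.
    static long combine(List<Partition> partitions) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Partition p : partitions) {
            if (!p.idle) {
                anyActive = true;
                min = Math.min(min, p.watermark);
            }
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        List<Partition> parts = new ArrayList<>();
        for (int i = 0; i < 3; i++) parts.add(new Partition());

        // Data arrives on P0 (ts 150) and P1 (ts 155); P2 stays empty.
        parts.get(0).watermark = 150;
        parts.get(1).watermark = 155;

        // P2 still counted as active with no watermark: the combined
        // result is stuck at the floor -- the "no advance" symptom.
        System.out.println(combine(parts)); // prints Long.MIN_VALUE

        // Once the idle timeout marks P2 idle, the expectation is that
        // the combined watermark advances to min(150, 155) = 150.
        parts.get(2).idle = true;
        System.out.println(combine(parts)); // prints 150
    }
}
```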

Ideally, I would expect Flink to advance its watermark 3 minutes
after sending data to P0 and P1, if my understanding of
"table.exec.source.idle-timeout" is correct.

I also tried populating all three partitions first and then repeating
the test, sending data to only some of them. This too had no effect.

On one occasion, I ran into an exception in the Flink UI, but I never
saw it again (stack trace at the bottom).
I also posted this on SO:
https://stackoverflow.com/questions/69729366/flink-sql-does-not-honor-table-exec-source-idle-timeout-setting

But I did not get any responses there. Could someone confirm whether
my setup is correct and whether the behavior I expect is the intended one?

2021-10-26 16:38:14
java.lang.NoClassDefFoundError:
org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
    at java.base/java.util.Optional.ifPresent(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
    at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
    at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
