[ 
https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Naci Simsek updated FLINK-35565:
--------------------------------
    Description: 
h2. Summary

A Flink batch job enters an infinite fetch loop and cannot finish gracefully 
if the connected Kafka topic is empty and the starting offset configured in the 
Flink job is lower than the current start/end offset of the topic. See below 
for details:
h2. How to reproduce

A Flink +*batch*+ job with a {*}KafkaSource{*} consumes events from a Kafka 
topic.

The Kafka topic is empty (it contains no events), and its current offset is 
*15*, as shown below:

!image-2024-06-11-11-19-09-889.png|width=895,height=256!

 

The Flink job uses a *specific starting offset* that is +*less*+ than the 
current offset of the topic/partition.

In the code below, it is set to “4”:

 
{code:java}
package naci.grpId;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class KafkaSource_Print {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Define the specific offsets for the partitions
        Map<TopicPartition, Long> specificOffsets = new HashMap<>();
        // Start from offset 4 for partition 0
        specificOffsets.put(new TopicPartition("topic_test", 0), 4L);

        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("localhost:9093")  // Make sure the port is correct
                .setTopics("topic_test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
                .setBounded(OffsetsInitializer.latest())
                .build();

        DataStream<String> stream = env.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source"
        );
        stream.print();

        env.execute("Flink KafkaSource test job");
    }
}{code}
 

 

Here are the initial offset-related logs, printed as soon as the job is 
submitted:

 
{code:java}
2024-05-30 12:15:50,010 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]
2024-05-30 12:15:50,069 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,074 TRACE org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - Seeking starting offsets to specified offsets: {topic_test-0=4}
2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer              [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Seeking to offset 4 for partition topic_test-0
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - SplitsChange handling result: [topic_test-0, start:4, stop: 15]
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask{code}
 

Since the starting offset {color:#ff0000}*4*{color} is *out of range* for the 
Kafka topic, KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the 
TaskManager logs:

 
{code:java}
2024-05-30 12:15:50,193 INFO  org.apache.kafka.clients.consumer.internals.Fetcher          [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}} is out of range for partition topic_test-0, resetting offset
2024-05-30 12:15:50,195 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset for partition topic_test-0 to position FetchPosition{offset=15, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}}.{code}
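
The reset visible in the two log lines above can be modelled in a few lines. This is a simplified illustration under stated assumptions, not the Kafka client's actual code: {{OffsetResetSketch}}, {{resolvePosition}} and {{ResetPolicy}} are hypothetical names, and the partition's valid range is assumed to be [logStart, logEnd], with both bounds equal to 15 for the empty topic described in this report.

```java
// Simplified, hypothetical model of an out-of-range offset reset.
// None of these names come from Kafka or Flink.
final class OffsetResetSketch {

    // Assumed reset policies, mirroring the usual "earliest"/"latest" choice.
    enum ResetPolicy { EARLIEST, LATEST }

    // Returns the position the consumer ends up at for a requested offset.
    static long resolvePosition(long requested, long logStart, long logEnd, ResetPolicy policy) {
        boolean inRange = requested >= logStart && requested <= logEnd;
        if (inRange) {
            return requested; // no reset needed
        }
        // Out of range: fall back to one end of the log, depending on the policy.
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Empty topic from the report: start offset == end offset == 15.
        // Requested offset 4 is below the range, so either policy lands on 15.
        System.out.println(resolvePosition(4L, 15L, 15L, ResetPolicy.EARLIEST)); // 15
        System.out.println(resolvePosition(4L, 15L, 15L, ResetPolicy.LATEST));   // 15
    }
}
```

Because the topic is empty, both ends of the log coincide, which is why the position ends up at 15 regardless of the configured policy in this model.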
 

 

Then, an {color:#ff0000}*infinite {{FetchTask}} loop*{color} starts:

 
{code:java}
2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:00,079 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:00,080 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:06,288 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:08,755 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:16,290 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:17,393 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received file upload request for file LOG
2024-05-30 12:16:17,394 DEBUG org.apache.flink.runtime.blob.BlobClient                     [] - PUT BLOB stream to /127.0.0.1:55663.
2024-05-30 12:16:18,757 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:26,293 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:28,761 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:36,296 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:38,762 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:40,087 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:46,297 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:48,765 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:50,089 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask{code}
 

The loop *ends* as soon as I *add* a *new event* to this Kafka topic, which 
is placed at offset 15.

{+}*Expected Result*{+}: Since this is a batch job and there are no events on 
the Kafka topic, the Flink connector should recognize right after the offset 
reset that there are no events to process, and finish the application 
gracefully.

{+}*Actual Result*{+}: The Flink connector tries indefinitely to fetch an event 
from offset 15, which is a valid position but holds no event, so the 
application keeps fetching that same offset.
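
One way to picture the expected behaviour: after the reset, the consumer position (15) already equals the split's stopping offset (15), so a bounded reader could in principle treat the split as finished without waiting for a record to arrive. The following is a minimal sketch of such a position-based check. {{BoundedSplitCheck}} and {{isSplitFinished}} are hypothetical names; this is neither the connector's actual code nor a proposed patch.

```java
// Hypothetical position-based completion check for a bounded split.
final class BoundedSplitCheck {

    // A split is considered finished once the consumer position has reached
    // (or passed) the stopping offset, whether or not any record was read.
    static boolean isSplitFinished(long currentPosition, long stoppingOffset) {
        return currentPosition >= stoppingOffset;
    }

    public static void main(String[] args) {
        long stoppingOffset = 15L; // StoppingOffset from the logs above

        // Before the reset: position 4 is below the stopping offset.
        System.out.println(isSplitFinished(4L, stoppingOffset));  // false

        // After the reset: position 15 equals the stopping offset,
        // so there is nothing left to read for this split.
        System.out.println(isSplitFinished(15L, stoppingOffset)); // true
    }
}
```

In this model the check would have to be evaluated again after the reset, since the position changes from 4 to 15 without any record being delivered.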

 

This issue does +*NOT*+ occur if the above Flink application sets a *starting 
offset* of +*15*+ or {+}*higher*{+}. In that case, no offset reset is 
performed, and the Flink application finishes gracefully.

This was reproduced on *Flink 1.18.1* with the latest Kafka connector, 
3.1.0-1.18, on a session cluster.
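
Based on that observation, one possible stopgap is to clamp each requested starting offset into the partition's current [beginning, end] range before passing it to {{OffsetsInitializer.offsets(...)}}, so that no out-of-range seek occurs. The helper below is a hedged sketch: {{OffsetClamp}} and {{clamp}} are hypothetical names, partitions are keyed by plain strings to keep the example dependency-free, and the beginning/end offsets are assumed to have been looked up beforehand (for example with the Kafka consumer's {{beginningOffsets}} and {{endOffsets}}). It has not been verified against the connector.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: clamps requested starting offsets into the
// valid [beginning, end] range of each partition.
final class OffsetClamp {

    static Map<String, Long> clamp(Map<String, Long> requested,
                                   Map<String, Long> beginning,
                                   Map<String, Long> end) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : requested.entrySet()) {
            String partition = entry.getKey();
            long wanted = entry.getValue();
            // If a bound is unknown for a partition, leave the offset unchanged.
            long low = beginning.getOrDefault(partition, wanted);
            long high = end.getOrDefault(partition, wanted);
            result.put(partition, Math.max(low, Math.min(high, wanted)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the report: requested 4, empty topic with start == end == 15.
        Map<String, Long> clamped = clamp(
                Map.of("topic_test-0", 4L),
                Map.of("topic_test-0", 15L),
                Map.of("topic_test-0", 15L));
        System.out.println(clamped); // {topic_test-0=15}
    }
}
```

With the values from this report, the requested offset 4 becomes 15, which matches the case the report describes as finishing gracefully.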

Logs are attached.


> Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
> --------------------------------------------------------------------------
>
>                 Key: FLINK-35565
>                 URL: https://issues.apache.org/jira/browse/FLINK-35565
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.1.0
>         Environment: This is reproduced on a *Flink 1.18.1* with the latest 
> Kafka connector 3.1.0-1.18 on a session cluster.
>            Reporter: Naci Simsek
>            Priority: Major
>         Attachments: image-2024-06-11-11-19-09-889.png, 
> taskmanager_localhost_54489-ac092a_log.txt
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
