Naci Simsek created FLINK-35565:
-----------------------------------
Summary: Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
Key: FLINK-35565
URL: https://issues.apache.org/jira/browse/FLINK-35565
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: kafka-3.1.0
Environment: This is reproduced on *Flink 1.18.1* with the latest Kafka connector 3.1.0-1.18 on a session cluster.
Reporter: Naci Simsek
Attachments: image-2024-06-11-11-19-09-889.png, taskmanager_localhost_54489-ac092a_log.txt
h2. Summary
A Flink batch job gets into an infinite fetch loop and cannot finish gracefully if the Kafka topic it reads from is empty and the starting offset configured in the Flink job is lower than the topic's current start/end offset. See below for details:
h2. How to reproduce
A Flink +*batch*+ job uses a {*}KafkaSource{*} to consume events from a Kafka topic.
The Kafka topic is empty (it contains no events), and its current offset is *15*, as shown below:
!image-2024-06-11-11-19-09-889.png|width=895,height=256!
The Flink job uses a *specific starting offset* that is +*less*+ than the current offset of the topic/partition. In the job below, it is set to *4*:
{code:java}
package naci.grpId;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class KafkaSource_Print {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the job in batch (bounded) execution mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Define the specific offsets for the partitions
        Map<TopicPartition, Long> specificOffsets = new HashMap<>();
        // Start from offset 4 for partition 0 (lower than the topic's current offset, 15)
        specificOffsets.put(new TopicPartition("topic_test", 0), 4L);

        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("localhost:9093") // Make sure the port is correct
                .setTopics("topic_test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
                // Bounded source: stop at the latest offset (StoppingOffset: 15 in the logs)
                .setBounded(OffsetsInitializer.latest())
                .build();

        DataStream<String> stream = env.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source"
        );

        stream.print();
        env.execute("Flink KafkaSource test job");
    }
}{code}
Here are the initial offset-related log lines, printed as soon as the job is submitted:
{code:java}
2024-05-30 12:15:50,010 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]
2024-05-30 12:15:50,069 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,074 TRACE org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - Seeking starting offsets to specified offsets: {topic_test-0=4}
2024-05-30 12:15:50,074 INFO org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Seeking to offset 4 for partition topic_test-0
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - SplitsChange handling result: [topic_test-0, start:4, stop: 15]
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask{code}
Since the starting offset {color:#FF0000}*4*{color} is *out of range* for the Kafka topic, the KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the TaskManager logs:
{code:java}
2024-05-30 12:15:50,193 INFO org.apache.kafka.clients.consumer.internals.Fetcher [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}} is out of range for partition topic_test-0, resetting offset
2024-05-30 12:15:50,195 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset for partition topic_test-0 to position FetchPosition{offset=15, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}}.{code}
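Why offset 4 ends up at 15 can be sketched with a small, hypothetical model of the consumer's out-of-range handling. It assumes that a fetch position is valid only within [logStartOffset, logEndOffset] and that an out-of-range position is reset to the earliest available offset. For this empty topic both bounds are 15 (the reset log above lands on 15), so resetting to earliest or to latest gives the same result. The class and method names are illustrative only and are not part of the Kafka client API.

{code:java}
/**
 * Hypothetical model of how a Kafka consumer resolves a fetch position.
 * Not the Kafka client's code: it only assumes that a position outside
 * [logStartOffset, logEndOffset] is out of range and is reset to the
 * earliest available offset.
 */
final class OffsetResetModel {

    static long resolveFetchPosition(long requested, long logStartOffset, long logEndOffset) {
        // Fetching exactly at logEndOffset is valid; it simply returns no records yet.
        boolean outOfRange = requested < logStartOffset || requested > logEndOffset;
        return outOfRange ? logStartOffset : requested;
    }

    public static void main(String[] args) {
        // Empty topic from this report: earliest == latest == 15.
        System.out.println(resolveFetchPosition(4, 15, 15));  // out of range, reset to 15
        System.out.println(resolveFetchPosition(15, 15, 15)); // in range, stays at 15
    }
}
{code}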
Then an {color:#FF0000}*infinite {{FetchTask}} loop*{color} starts, with a new {{FetchTask}} roughly every 10 seconds:
{code:java}
2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:00,079 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:00,080 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:06,288 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:08,755 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:16,290 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:17,393 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received file upload request for file LOG
2024-05-30 12:16:17,394 DEBUG org.apache.flink.runtime.blob.BlobClient [] - PUT BLOB stream to /127.0.0.1:55663.
2024-05-30 12:16:18,757 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:26,293 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:28,761 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:36,296 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:38,762 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
2024-05-30 12:16:40,087 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
2024-05-30 12:16:46,297 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
2024-05-30 12:16:48,765 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
2024-05-30 12:16:50,089 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask{code}
The loop *ends* as soon as I *add* a *new event* to this Kafka topic; the event is written at offset 15.
{+}*Expected Result*{+}: Since this is a batch job and the Kafka topic contains no events, the Flink connector should recognize right after the offset reset that there are no events to process, and finish the application gracefully.
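{+}*Actual Result*{+}: The Flink connector keeps trying to fetch an event from offset 15 indefinitely. Offset 15 is a valid position (the current end of the partition, and also the split's StoppingOffset), but no event exists at that offset yet, so the application keeps fetching the same offset!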
This issue does +*NOT*+ happen if the Flink application above sets a *starting offset* of +*15*+ or {+}*higher*{+}! With a starting offset of 15 or higher, no offset reset is performed and the Flink application finishes gracefully.
Logs are attached.