Jigar Bhati created FLINK-39888:
-----------------------------------
Summary: [Kafka] Allow configuring offset reset strategy
independently of startup offsets
Key: FLINK-39888
URL: https://issues.apache.org/jira/browse/FLINK-39888
Project: Flink
Issue Type: New Feature
Components: Connectors / Kafka
Reporter: Jigar Bhati
KafkaSource currently uses OffsetsInitializer for two separate concerns:
1. resolving the initial starting offsets
2. selecting Kafka's auto.offset.reset strategy when an initialized starting
offset is unavailable
These choices are independent, but the public API does not let users compose
them for all startup modes.
One concrete failure mode is broker recovery after a log corruption or
truncation event. A source may have previously checkpointed or otherwise
resolved a concrete numeric offset while Kafka was healthy. After recovery,
Kafka can report that this offset is no longer available on the partition
(for example, because of corruption, truncation, or retention). At that
point, the Kafka consumer applies auto.offset.reset. With the current coupling,
a source that originally started with earliest-offset or latest-offset also
inherits earliest/latest as the fallback for that later out-of-range concrete
offset. The job can therefore silently reset to earliest or latest instead of
failing with an out-of-range error that lets an operator decide what to do.
The desired behavior is to configure these independently. For example, users
may want a fresh source to start from earliest or latest, but use
OffsetResetStrategy.NONE once a concrete initialized offset has been chosen, so
a later unavailable offset fails explicitly instead of silently resetting.
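The difference between the fallbacks can be illustrated with a small,
self-contained model. This is only a sketch: ResetStrategy, resolve, and the
log bounds below are hypothetical stand-ins for Kafka's OffsetResetStrategy and
the consumer's out-of-range handling. It is not Flink or Kafka code.

```java
public class OffsetResetModel {
    /** Hypothetical stand-in for Kafka's OffsetResetStrategy. */
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /**
     * Returns the position a consumer would read from for a requested offset,
     * given the partition's currently valid range [logStart, logEnd].
     */
    static long resolve(long requested, long logStart, long logEnd, ResetStrategy reset) {
        if (requested >= logStart && requested <= logEnd) {
            return requested; // offset is still available, no reset involved
        }
        if (reset == ResetStrategy.EARLIEST) {
            return logStart; // silently jump back to the start of the log
        }
        if (reset == ResetStrategy.LATEST) {
            return logEnd; // silently skip ahead to the end of the log
        }
        // NONE: surface the problem instead of picking a position
        throw new IllegalStateException(
                "Offset " + requested + " is out of range [" + logStart + ", " + logEnd + "]");
    }

    public static void main(String[] args) {
        long checkpointed = 5_000L; // concrete offset resolved while Kafka was healthy
        long logStart = 0L;         // assumed partition bounds after truncation
        long logEnd = 1_200L;

        System.out.println("EARLIEST -> "
                + resolve(checkpointed, logStart, logEnd, ResetStrategy.EARLIEST));
        System.out.println("LATEST -> "
                + resolve(checkpointed, logStart, logEnd, ResetStrategy.LATEST));
        try {
            resolve(checkpointed, logStart, logEnd, ResetStrategy.NONE);
        } catch (IllegalStateException e) {
            System.out.println("NONE -> fails: " + e.getMessage());
        }
    }
}
```

In this model, only NONE makes the out-of-range condition visible to an
operator. The other two strategies pick a new position without any error.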
The SQL Kafka connector has the same coupling today:
properties.auto.offset.reset is honored only for group-offsets startup. For the
earliest-offset, latest-offset, specific-offsets, and timestamp startup modes,
an explicitly configured value is overridden by the startup initializer.
Proposed change:
- Add OffsetsInitializer.withOffsetResetStrategy(OffsetsInitializer,
OffsetResetStrategy).
- The wrapper delegates partition offset initialization and validation, but
overrides getAutoOffsetResetStrategy().
- Apply explicit properties.auto.offset.reset to the chosen Kafka SQL startup
initializer for all startup modes.
- Preserve existing defaults when properties.auto.offset.reset is not
explicitly configured.
- Update docs to clarify that scan.startup.mode chooses the initial position,
while properties.auto.offset.reset controls the fallback when an initialized
starting offset is unavailable.
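The wrapper and the "only when explicitly configured" rule could look roughly
like the sketch below. It uses simplified stand-in types so that it compiles
without Flink or Kafka on the classpath: Initializer and ResetStrategy are
hypothetical substitutes for OffsetsInitializer and OffsetResetStrategy,
partitions are plain strings, and applyExplicitReset and earliest are
illustrative helpers. Delegation of validation is omitted here for brevity.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ResetStrategyWrapperSketch {
    /** Hypothetical stand-in for Kafka's OffsetResetStrategy. */
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /** Simplified stand-in for Flink's OffsetsInitializer. */
    interface Initializer {
        Map<String, Long> getPartitionOffsets(Collection<String> partitions);

        ResetStrategy getAutoOffsetResetStrategy();
    }

    /** Sketch of the proposed wrapper: same starting offsets, different fallback. */
    static Initializer withOffsetResetStrategy(Initializer delegate, ResetStrategy strategy) {
        return new Initializer() {
            @Override
            public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
                return delegate.getPartitionOffsets(partitions); // initial position unchanged
            }

            @Override
            public ResetStrategy getAutoOffsetResetStrategy() {
                return strategy; // only the fallback is overridden
            }
        };
    }

    /** Wraps only when a reset strategy was explicitly configured; otherwise keeps defaults. */
    static Initializer applyExplicitReset(Initializer startup, Optional<ResetStrategy> explicit) {
        return explicit.map(s -> withOffsetResetStrategy(startup, s)).orElse(startup);
    }

    /** Simplified "earliest" initializer: offset 0 for every partition, EARLIEST fallback. */
    static Initializer earliest() {
        return new Initializer() {
            @Override
            public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
                Map<String, Long> offsets = new HashMap<>();
                for (String partition : partitions) {
                    offsets.put(partition, 0L);
                }
                return offsets;
            }

            @Override
            public ResetStrategy getAutoOffsetResetStrategy() {
                return ResetStrategy.EARLIEST;
            }
        };
    }
}
```

With this shape, a fresh source still starts from earliest, while
applyExplicitReset(earliest(), Optional.of(ResetStrategy.NONE)) changes only
the fallback. Passing Optional.empty() returns the original initializer
unchanged, which corresponds to preserving existing defaults.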
Related:
FLINK-39382 documents current auto.offset.reset support for group-offsets.
Testing:
- KafkaSourceBuilder reset strategy tests
- DynamicKafkaSourceBuilder reset strategy tests
- initializer wrapper unit test
- static Kafka SQL source tests
- dynamic Kafka SQL source tests