Jigar Bhati created FLINK-39888:
-----------------------------------

             Summary: [Kafka] Allow configuring offset reset strategy independently of startup offsets
                 Key: FLINK-39888
                 URL: https://issues.apache.org/jira/browse/FLINK-39888
             Project: Flink
          Issue Type: New Feature
          Components: Connectors / Kafka
            Reporter: Jigar Bhati


KafkaSource currently uses OffsetsInitializer for two separate concerns:
1. resolving the initial starting offsets
2. selecting Kafka's auto.offset.reset strategy when an initialized starting 
offset is unavailable

These choices are independent, but the public API does not let users compose 
them for all startup modes.

One concrete failure mode is broker recovery after a log corruption or 
truncation event. A source may have previously checkpointed or otherwise 
resolved a concrete numeric offset while Kafka was healthy. After recovery, 
Kafka can report that this offset is no longer available on the partition 
(for example because of corruption, truncation, or retention). At that 
point, the Kafka consumer applies auto.offset.reset. With the current coupling, 
a source that originally started with earliest-offset or latest-offset also 
inherits earliest/latest as the fallback for that later out-of-range concrete 
offset. The job can therefore silently reset to earliest or latest instead of 
failing with an out-of-range error that lets an operator decide what to do.

The desired behavior is to configure these independently. For example, users 
may want a fresh source to start from earliest or latest, but use 
OffsetResetStrategy.NONE once a concrete initialized offset has been chosen, so 
a later unavailable offset fails explicitly instead of silently resetting.
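
The difference between the fallbacks can be illustrated with a small, 
self-contained model. This is only a sketch: ResetStrategy is a stand-in for 
Kafka's OffsetResetStrategy, resolve() is a hypothetical helper rather than 
connector code, and IllegalStateException stands in for the out-of-range 
error the consumer would raise.

```java
// Stand-in for Kafka's OffsetResetStrategy (assumption: not the real class).
enum ResetStrategy { EARLIEST, LATEST, NONE }

final class OutOfRangeSketch {

    /**
     * Hypothetical model: returns the offset consumption would continue from
     * when resuming at `requested` on a partition whose valid range is
     * [logStart, logEnd].
     */
    static long resolve(long requested, long logStart, long logEnd, ResetStrategy strategy) {
        if (requested >= logStart && requested <= logEnd) {
            return requested; // offset is still available, no reset happens
        }
        switch (strategy) {
            case EARLIEST:
                return logStart; // silent reset to the oldest available record
            case LATEST:
                return logEnd;   // silent reset, skipping everything in between
            default:
                // NONE: surface the problem so an operator can decide what to do.
                throw new IllegalStateException(
                        "offset " + requested + " out of range [" + logStart + ", " + logEnd + "]");
        }
    }

    public static void main(String[] args) {
        // Checkpointed offset 500; after recovery the log only holds [800, 1000].
        System.out.println(resolve(500L, 800L, 1000L, ResetStrategy.EARLIEST));
        System.out.println(resolve(500L, 800L, 1000L, ResetStrategy.LATEST));
        try {
            resolve(500L, 800L, 1000L, ResetStrategy.NONE);
        } catch (IllegalStateException e) {
            System.out.println("failed explicitly: " + e.getMessage());
        }
    }
}
```

With EARLIEST or LATEST the job keeps running from a different position; 
only NONE turns the unavailable offset into a visible failure.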

The SQL Kafka connector has the same coupling today: 
properties.auto.offset.reset is applied only for the group-offsets startup 
mode. For the earliest-offset, latest-offset, specific-offsets, and timestamp 
startup modes, an explicitly configured value is overridden by the strategy 
of the startup initializer.
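
The precedence described above, next to the one this issue proposes, can be 
sketched as follows. All names here (SqlReset, StartupMode, 
SqlResetPrecedenceSketch) are hypothetical stand-ins and not the connector's 
classes. The initializer's own default is passed in as a parameter because 
the concrete default of each startup mode is not stated in this issue.

```java
import java.util.Optional;

// Hypothetical stand-ins, not the connector's real types.
enum SqlReset { EARLIEST, LATEST, NONE }

enum StartupMode { EARLIEST_OFFSET, LATEST_OFFSET, GROUP_OFFSETS, SPECIFIC_OFFSETS, TIMESTAMP }

final class SqlResetPrecedenceSketch {

    /** Behavior as described in this issue: the explicit value only counts for group-offsets. */
    static SqlReset current(StartupMode mode, Optional<SqlReset> explicit, SqlReset initializerDefault) {
        if (mode == StartupMode.GROUP_OFFSETS) {
            return explicit.orElse(initializerDefault);
        }
        return initializerDefault; // explicit properties.auto.offset.reset is ignored
    }

    /** Proposed behavior: an explicit value wins for every startup mode, defaults are preserved. */
    static SqlReset proposed(Optional<SqlReset> explicit, SqlReset initializerDefault) {
        return explicit.orElse(initializerDefault);
    }

    public static void main(String[] args) {
        Optional<SqlReset> explicitNone = Optional.of(SqlReset.NONE);
        // earliest-offset startup with properties.auto.offset.reset = none
        System.out.println(current(StartupMode.EARLIEST_OFFSET, explicitNone, SqlReset.EARLIEST));
        System.out.println(proposed(explicitNone, SqlReset.EARLIEST));
    }
}
```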

Proposed change:
- Add OffsetsInitializer.withOffsetResetStrategy(OffsetsInitializer, 
OffsetResetStrategy).
- The wrapper delegates partition offset initialization and validation, but 
overrides getAutoOffsetResetStrategy().
- Apply explicit properties.auto.offset.reset to the chosen Kafka SQL startup 
initializer for all startup modes.
- Preserve existing defaults when properties.auto.offset.reset is not 
explicitly configured.
- Update docs to clarify that scan.startup.mode chooses the initial position, 
while auto.offset.reset controls fallback when an initialized starting offset 
is unavailable.
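
A minimal sketch of the wrapper idea, under simplifying assumptions: 
SketchOffsetsInitializer is a reduced stand-in for OffsetsInitializer in 
which partitions are plain strings and the offsets retriever argument is 
omitted, FallbackReset stands in for OffsetResetStrategy, EARLIEST_MARKER is 
an arbitrary placeholder sentinel, and validation delegation is left out. 
The final implementation may look different.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Stand-in for Kafka's OffsetResetStrategy (assumption: not the real class).
enum FallbackReset { EARLIEST, LATEST, NONE }

// Reduced stand-in for OffsetsInitializer; the real interface takes more arguments.
interface SketchOffsetsInitializer {
    Map<String, Long> getPartitionOffsets(Collection<String> partitions);

    FallbackReset getAutoOffsetResetStrategy();

    /** Wraps `delegate`, keeping its starting offsets but replacing its reset strategy. */
    static SketchOffsetsInitializer withOffsetResetStrategy(
            SketchOffsetsInitializer delegate, FallbackReset strategy) {
        return new SketchOffsetsInitializer() {
            @Override
            public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
                return delegate.getPartitionOffsets(partitions); // unchanged initial position
            }

            @Override
            public FallbackReset getAutoOffsetResetStrategy() {
                return strategy; // the only overridden concern
            }
        };
    }
}

final class WrapperSketch {
    static final long EARLIEST_MARKER = -2L; // placeholder sentinel for this sketch only

    /** Hypothetical "start from earliest" initializer whose fallback is also EARLIEST. */
    static SketchOffsetsInitializer earliest() {
        return new SketchOffsetsInitializer() {
            @Override
            public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
                Map<String, Long> offsets = new HashMap<>();
                for (String partition : partitions) {
                    offsets.put(partition, EARLIEST_MARKER);
                }
                return offsets;
            }

            @Override
            public FallbackReset getAutoOffsetResetStrategy() {
                return FallbackReset.EARLIEST;
            }
        };
    }
}
```

In this sketch, wrapping earliest() with FallbackReset.NONE keeps the 
starting position of a fresh source while changing only the fallback used 
when an initialized offset later becomes unavailable.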

Related:
FLINK-39382 documents current auto.offset.reset support for group-offsets.

Testing:
- KafkaSourceBuilder reset strategy tests
- DynamicKafkaSourceBuilder reset strategy tests
- initializer wrapper unit test
- static Kafka SQL source tests
- dynamic Kafka SQL source tests



