Tzu-Li (Gordon) Tai created FLINK-4280:
------------------------------------------
Summary: New Flink-specific option to set starting position of
Kafka consumer without respecting external offsets in ZK / Broker
Key: FLINK-4280
URL: https://issues.apache.org/jira/browse/FLINK-4280
Project: Flink
Issue Type: New Feature
Components: Kafka Connector
Reporter: Tzu-Li (Gordon) Tai
Fix For: 1.2.0
Currently, to start reading from the "earliest" or "latest" position in topics
with the Flink Kafka consumer, users set the Kafka config {{auto.offset.reset}}
in the provided properties.
However, the way this config actually works can be misleading for users who
are looking for a way to "read topics from a starting position". In the Flink
Kafka consumer, {{auto.offset.reset}} behaves as Kafka originally intended the
setting to: existing offsets committed externally to ZK / brokers are checked
first, and only if none exist is {{auto.offset.reset}} respected.
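To make the current behaviour concrete, here is a minimal Java sketch of the per-partition start-offset decision described above. The class, enum, and method names are hypothetical (not Flink or Kafka APIs), and the sketch leaves out the case where Kafka also falls back to {{auto.offset.reset}} because a committed offset no longer exists on the server.

{code:java}
import java.util.OptionalLong;

// Hypothetical model of the current start-offset decision for one partition.
// Names are illustrative only; they are not Flink or Kafka APIs.
final class CurrentStartOffsetResolution {

    // The two auto.offset.reset values discussed in this issue.
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed offset committed to ZK / brokers for this group.id, if any
     * @param reset     value of auto.offset.reset
     * @param earliest  earliest offset still available in the partition
     * @param latest    log-end offset of the partition
     */
    static long startOffset(OptionalLong committed, ResetPolicy reset,
                            long earliest, long latest) {
        // 1. An externally committed offset always wins.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // 2. Only when nothing is committed is auto.offset.reset consulted.
        return reset == ResetPolicy.EARLIEST ? earliest : latest;
    }

    public static void main(String[] args) {
        // Asking for "earliest" still resumes from the committed offset: prints 500.
        System.out.println(startOffset(OptionalLong.of(500L), ResetPolicy.EARLIEST, 0L, 1000L));
        // No committed offset, so the reset policy applies: prints 0.
        System.out.println(startOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0L, 1000L));
    }
}
{code}

The first call shows the surprise this issue is about: a user who sets "earliest" does not read from the beginning if the group already has a committed offset.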
I propose adding Flink-specific ways to define the starting position that do
not take the external offsets into account. The original behaviour (reference
external offsets first) can become a user option, so that it is retained for
Kafka users who may need to coordinate with existing non-Flink Kafka consumer
applications.
How users would interact with the Flink Kafka consumer after this is added:
{code}
Properties props = new Properties();
props.setProperty("flink.starting-position", "earliest/latest");
props.setProperty("auto.offset.reset", "..."); // ignored (a warning is logged)
props.setProperty("group.id", "..."); // no longer affects the starting position
                                      // (may still be used to commit offsets externally)
...
{code}
Or, to reference external offsets in ZK / brokers:
{code}
Properties props = new Properties();
props.setProperty("flink.starting-position", "external-offsets");
props.setProperty("auto.offset.reset", "earliest/latest"); // default is latest
props.setProperty("group.id", "..."); // used to look up external offsets in ZK / brokers
...
{code}
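A minimal Java sketch of how the proposed {{flink.starting-position}} property could be interpreted, assuming exactly the three values used in the examples above. The class, enum, warning text, and the fallback to {{external-offsets}} when the key is absent (chosen here only because it preserves today's behaviour) are assumptions, not an existing Flink API.

{code:java}
import java.util.Locale;
import java.util.Properties;

// Hypothetical parser for the proposed "flink.starting-position" property.
// The key and its values come from the proposal; everything else is illustrative.
final class StartingPositionConfig {

    static final String STARTING_POSITION_KEY = "flink.starting-position";
    static final String AUTO_OFFSET_RESET_KEY = "auto.offset.reset";

    enum StartingPosition { EARLIEST, LATEST, EXTERNAL_OFFSETS }

    // Assumed default: keeps the current "external offsets first" behaviour.
    static final StartingPosition DEFAULT = StartingPosition.EXTERNAL_OFFSETS;

    static StartingPosition parse(Properties props) {
        String raw = props.getProperty(STARTING_POSITION_KEY);
        if (raw == null) {
            return DEFAULT;
        }
        StartingPosition position = toPosition(raw);
        // auto.offset.reset only matters when external offsets are consulted.
        if (position != StartingPosition.EXTERNAL_OFFSETS
                && props.getProperty(AUTO_OFFSET_RESET_KEY) != null) {
            System.err.println("WARN: '" + AUTO_OFFSET_RESET_KEY + "' is ignored because '"
                    + STARTING_POSITION_KEY + "' is set to '" + raw + "'");
        }
        return position;
    }

    private static StartingPosition toPosition(String raw) {
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "earliest":
                return StartingPosition.EARLIEST;
            case "latest":
                return StartingPosition.LATEST;
            case "external-offsets":
                return StartingPosition.EXTERNAL_OFFSETS;
            default:
                throw new IllegalArgumentException(
                        "Unknown value for " + STARTING_POSITION_KEY + ": " + raw);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(STARTING_POSITION_KEY, "earliest");
        props.setProperty(AUTO_OFFSET_RESET_KEY, "latest"); // triggers the warning on stderr
        System.out.println(parse(props)); // prints EARLIEST
    }
}
{code}

Rejecting unknown values with an exception, rather than silently falling back to the default, is a design choice in this sketch so that typos in the key's value surface immediately.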
One thing we need to decide is what the default value for
{{flink.starting-position}} should be.
Two merits I see in adding this:
1. This matches how users generally interpret "read from a starting
position". Since the Flink Kafka connector is essentially a "high-level"
Kafka consumer for Flink users, I think it is reasonable to add Flink-specific
functionality that users will find useful, even though it was not supported in
Kafka's original consumer design.
2. By adding this, the principle that "the Kafka offset store (ZK / brokers)
is used only to expose progress to the outside world, and not to manipulate
how Kafka topics are read in Flink (unless users opt to do so)" becomes even
more explicit. There was some discussion of this aspect in this PR
(https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this
further decouples Flink's internal offset checkpointing from Kafka's external
offset store.
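The decoupling argued for in point 2 can be sketched as a precedence order for the start offset: an offset restored from Flink's own checkpointed state first, then {{flink.starting-position}}, with the ZK / broker store read only when the user opts into {{external-offsets}}. This is a hypothetical Java model (all names are illustrative, not Flink's implementation); committing offsets back to ZK / brokers to expose progress is unaffected and not modelled.

{code:java}
import java.util.OptionalLong;

// Hypothetical sketch of the proposed, decoupled start-offset precedence.
// Names are illustrative; this is not Flink's implementation.
final class DecoupledStartOffsetResolution {

    enum StartingPosition { EARLIEST, LATEST, EXTERNAL_OFFSETS }
    enum ResetPolicy { EARLIEST, LATEST }

    static long startOffset(OptionalLong restored,          // resume offset restored from a Flink checkpoint
                            StartingPosition position,     // value of flink.starting-position
                            OptionalLong committed,        // offset committed to ZK / brokers, if any
                            ResetPolicy reset,             // auto.offset.reset, used only for EXTERNAL_OFFSETS
                            long earliest,
                            long latest) {
        // 1. On recovery, Flink's own checkpointed state always takes precedence.
        if (restored.isPresent()) {
            return restored.getAsLong();
        }
        switch (position) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            default:
                // 2. The external offset store is read only when the user opts in.
                if (committed.isPresent()) {
                    return committed.getAsLong();
                }
                return reset == ResetPolicy.EARLIEST ? earliest : latest;
        }
    }

    public static void main(String[] args) {
        OptionalLong none = OptionalLong.empty();
        OptionalLong committed500 = OptionalLong.of(500L);
        // Restored state wins over everything: prints 42.
        System.out.println(startOffset(OptionalLong.of(42L), StartingPosition.EARLIEST,
                committed500, ResetPolicy.LATEST, 0L, 1000L));
        // "latest" ignores the committed offset: prints 1000.
        System.out.println(startOffset(none, StartingPosition.LATEST,
                committed500, ResetPolicy.EARLIEST, 0L, 1000L));
        // Opting into external offsets keeps today's behaviour: prints 500.
        System.out.println(startOffset(none, StartingPosition.EXTERNAL_OFFSETS,
                committed500, ResetPolicy.LATEST, 0L, 1000L));
    }
}
{code}

In this sketch {{flink.starting-position}} only affects fresh starts; once a checkpoint exists, recovery never consults ZK / brokers.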
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)