Thanks for the explanation, this was all super helpful.

On Tue, Oct 13, 2020 at 2:16 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hey Rex,
>
> I agree the documentation might be slightly misleading. To get the full
> picture of that configuration I'd suggest having a look at the DataStream
> Kafka connector page[1]. The Table connector is just a wrapper around the
> DataStream one.
>
> Let me also try to clarify it a bit more. In Flink there are two places
> where the offsets are committed:
>
> 1) Flink's checkpoint/savepoint. Those always take the highest priority.
> Therefore, e.g. when the job is restarted because of a failure, it will
> use the offsets that were stored in the last successful checkpoint.
>
> 2) Upon a checkpoint Flink can also write the offsets back to Kafka. This
> is enabled by default in the DataStream API and is enabled in the Table
> API if you provide properties.group.id[2]. This works only if you have
> checkpointing enabled. If you disable checkpoints, you can still auto
> commit offsets from the underlying Kafka consumer via
> properties.enable.auto.commit / properties.auto.commit.interval.ms (btw,
> you can pass any Kafka options with a properties.* prefix).
>
> Having explained that, if you set scan.startup.mode and you do not restore
> from a checkpoint/savepoint:
>
> * group-offsets -> it will start consuming from the committed offset in
> Kafka for the configured group.id; if there is none, it should use the
> properties.auto.offset.reset option
>
> * earliest-offset -> it will ignore committed offsets in Kafka and start
> from the earliest offset.
>
> Hope it helps.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#properties-group-id
>
> On 13/10/2020 07:43, Rex Fenley wrote:
>
> Hello,
>
> I've been trying to configure the offset start position for a Flink Kafka
> consumer so that, when there is no committed offset, it always starts at
> the beginning.
> It seems like the typical way to do this would be setting
> auto.offset.reset=earliest; however, I don't see that configuration
> property in the documentation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> However, I do see scan.startup.mode = earliest-offset, but from the docs
> it sounds like this would mean it would never commit an offset and Flink
> would always start consuming from the beginning of the Kafka stream, which
> is not what I want.
>
> Is this the case or am I misunderstanding? How can I get the behavior that
> I wish to see, where committed offsets are respected, but no offset means
> start at the beginning of the Kafka log stream?
>
> Thanks!
>
> --
>
> Rex Fenley | Software Engineer - Mobile and Backend
>
> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>

--
Rex Fenley | Software Engineer - Mobile and Backend
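
For anyone finding this thread later, the start-position rules Dawid describes can be summarized as a small decision function. This is only a toy model of the behavior as explained in the thread, not Flink code: `resolve_start_offset`, its parameters, and the `my-group` group id are hypothetical names chosen for illustration. The option keys in `table_options` are the ones mentioned in the thread.

```python
from typing import Optional


def resolve_start_offset(
    startup_mode: str,
    restored_offset: Optional[int],
    committed_offset: Optional[int],
    auto_offset_reset: str,
    earliest: int,
    latest: int,
) -> int:
    """Toy model (hypothetical helper) of where a partition starts reading."""
    # Offsets restored from a checkpoint/savepoint always take priority.
    if restored_offset is not None:
        return restored_offset

    if startup_mode == "earliest-offset":
        # Committed offsets in Kafka are ignored in this mode.
        return earliest

    if startup_mode == "group-offsets":
        # Use the offset committed in Kafka for the configured group.id.
        if committed_offset is not None:
            return committed_offset
        # No committed offset: fall back to auto.offset.reset.
        if auto_offset_reset == "earliest":
            return earliest
        if auto_offset_reset == "latest":
            return latest
        raise ValueError(f"unsupported auto.offset.reset: {auto_offset_reset}")

    raise ValueError(f"mode not covered by this sketch: {startup_mode}")


# Table options that, per the thread, should respect committed offsets and
# start from the beginning when none exist ('my-group' is a placeholder).
table_options = {
    "scan.startup.mode": "group-offsets",
    "properties.group.id": "my-group",
    "properties.auto.offset.reset": "earliest",
}
```

With the options above, the model returns the committed offset when one exists and the earliest offset otherwise, which is the behavior asked for in the original question. Only the two startup modes discussed in the thread are covered.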