Hey Rex,

I agree the documentation might be slightly misleading. To get the full
picture of that configuration I'd suggest having a look at the
DataStream Kafka connector page[1]. The Table connector is just a
wrapper around the DataStream one.

Let me also try to clarify it a bit more. In case of Flink there are two
places where the offsets are committed:

1) Flink's checkpoint/savepoint. Those always take the highest priority.
Therefore e.g. when the job is restarted because of a failure, it will
use offsets that were stored in the last successful checkpoint.

2) Upon a checkpoint Flink can also write the offsets back to Kafka.
This is enabled by default in DataStream API and is enabled in Table API
if you provide properties.group.id[2]. This works only if you have
checkpointing enabled. If you disable checkpoints, you can still auto
commit offsets from the underlying Kafka consumer via
properties.enable.auto.commit / properties.auto.commit.interval.ms (btw,
you can pass any Kafka options with a properties.* prefix).

Having explained that, if you set scan.startup-mode and you do not
restore from a checkpoint/savepoint:

* group-offsets -> it will start consuming from the committed offset in
Kafka for the configured group.id, if there are none it should use
properties.auto.offset.reset option

* earliest-offset -> it will ignore committed offsets in Kafka and start
from earliest-offsets.

Hope it helps.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#properties-group-id

On 13/10/2020 07:43, Rex Fenley wrote:
> Hello,
>
> I've been trying to configure the offset start position for a flink
> kafka consumer. when there is no committed offset, to always start at
> the beginning. It seems like the typical way to do this would be
> setting |auto.offset.reset=earliest| however, I don't see that
> configuration property in the documentation.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> However, I do see |scan.startup.mode = earliest-offset|, but from the
> docs it sounds like this would mean it would never commit an offset
> and flink would always start consuming from the beginning of the kafka
> stream, which is not what I want.
>
> Is this the case or am I misunderstanding? How can I get the behavior
> that I wish to see, where committed offsets are respected, but no
> offset means start at the beginning of the kafka log stream?
>
> Thanks!
> -- 
>
> Rex Fenley | Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>|  BLOG
> <http://blog.remind.com/> |  FOLLOW US
> <https://twitter.com/remindhq> |  LIKE US
> <https://www.facebook.com/remindhq>
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to