Thanks for the explanation, this was all super helpful.

On Tue, Oct 13, 2020 at 2:16 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hey Rex,
>
> I agree the documentation might be slightly misleading. To get the full
> picture of that configuration I'd suggest having a look at the DataStream
> Kafka connector page[1]. The Table connector is just a wrapper around the
> DataStream one.
>
> Let me also try to clarify it a bit more. In case of Flink there are two
> places where the offsets are committed:
>
> 1) Flink's checkpoint/savepoint. Those always take the highest priority.
> Therefore e.g. when the job is restarted because of a failure, it will use
> offsets that were stored in the last successful checkpoint.
>
> 2) Upon a checkpoint Flink can also write the offsets back to Kafka. This
> is enabled by default in DataStream API and is enabled in Table API if you
> provide properties.group.id[2]. This works only if you have checkpointing
> enabled. If you disable checkpoints, you can still auto commit offsets from
> the underlying Kafka consumer via properties.enable.auto.commit /
> properties.auto.commit.interval.ms (btw, you can pass any Kafka options
> with a properties.* prefix).
>
> Having explained that, if you set scan.startup-mode and you do not restore
> from a checkpoint/savepoint:
>
> * group-offsets -> it will start consuming from the committed offset in
> Kafka for the configured group.id, if there are none it should use
> properties.auto.offset.reset option
>
> * earliest-offset -> it will ignore committed offsets in Kafka and start
> from earliest-offsets.
>
> Hope it helps.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#properties-group-id
> On 13/10/2020 07:43, Rex Fenley wrote:
>
> Hello,
>
> I've been trying to configure the offset start position for a flink kafka
> consumer. when there is no committed offset, to always start at the
> beginning. It seems like the typical way to do this would be setting
> auto.offset.reset=earliest however, I don't see that configuration
> property in the documentation.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> However, I do see scan.startup.mode = earliest-offset, but from the docs
> it sounds like this would mean it would never commit an offset and flink
> would always start consuming from the beginning of the kafka stream, which
> is not what I want.
>
> Is this the case or am I misunderstanding? How can I get the behavior that
> I wish to see, where committed offsets are respected, but no offset means
> start at the beginning of the kafka log stream?
>
> Thanks!
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Reply via email to