This is somewhat implied in https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#consumer-offset-committing:

> Note that Kafka source does *NOT* rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring.

and https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version:

> Set setStartFromGroupOffsets(true) on the consumer so that we get read offsets from Kafka. This will only take effect when there is no read offset in Flink state, which is why the next step is very important.

Dynamic partition discovery shouldn't have an effect because you are not creating partitions/topics.
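For the new KafkaSource, the committed-offsets behavior is opt-in via the starting-offsets initializer. A minimal sketch of what I assume the job in question is doing (broker address is a placeholder; topic/group names are taken from the command further down the thread):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // placeholder
        .setTopics("our-topic")
        .setGroupId("our-group")
        // Start from the group's committed offsets in Kafka, falling back
        // to LATEST if none exist. This is only honored on a fresh start:
        // once the job restores from a checkpoint/savepoint, the offsets
        // stored in Flink state take precedence, and external offset
        // changes made via Kafka's admin tools are ignored.
        .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
```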

On 21/07/2022 12:14, Alexis Sarda-Espinosa wrote:
I would suggest updating the documentation to include that statement.

I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

Am Do., 21. Juli 2022 um 10:03 Uhr schrieb Chesnay Schepler <ches...@apache.org>:

    Flink only reads the offsets from Kafka when the job is initially
    started from a clear slate.
    Once checkpoints are involved it only relies on offsets stored in
    the state.

    On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
    Hello again,

    I just performed a few tests
    using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST),
    in the following order, and I noticed a few weird things. Note
    that our job uses processing-time windows, so watermarks are
    irrelevant.

    1. After the job had been running for a while, we manually moved
    the consumer group's offset to 12 hours in the past [1] (without
    restarting the job).
      - After this, the consumer simply stopped reading messages -
    the consumer lag in Kafka stayed at around 150k (no new data arrived)

    2. We restarted the job with a checkpoint.
      - The consumer lag in Kafka dropped down to 0, but no data was
    emitted from the windows.

    3. We stopped the job, moved the offset again, and restarted
    without any checkpoint/savepoint.
      - This time the consumer correctly processed the backlog and
    emitted events from the windows.

    This was done with Flink 1.15.0.

    Is this expected? In other words, if there's a mismatch between
    Flink's state's offset and Kafka's offset, will the job be unable
    to run?



    [1] The command to move the offset was:

    kafka-consumer-groups.sh \
      --bootstrap-server ... \
      --topic our-topic \
      --group our-group \
      --command-config kafka-preprod.properties \
      --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
      --execute
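    Side note for anyone reproducing this: running the same command
    without --execute (or with --dry-run explicitly) only prints the
    offsets the reset would produce, and --describe shows the group's
    committed offsets and lag afterwards. The bootstrap server is
    elided as in the command above:

```shell
# Dry run: print the offsets the reset would land on, without applying it
kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --dry-run

# Afterwards, verify the group's committed offsets and lag
kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --group our-group \
  --command-config kafka-preprod.properties \
  --describe
```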

    Regards,
    Alexis.

    Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa
    <sarda.espin...@gmail.com>:

        Hi Yaroslav,

        The test I did was just using earliest, I'll test with
        committed offset again, thanks.

        Regards,
        Alexis.

        On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko,
        <yaros...@goldsky.io> wrote:

            Hi Alexis,

            Do you use OffsetsInitializer.committedOffsets() to
            specify your Kafka consumer offsets? In this case, it
            should get the offsets from Kafka and not the state.

            On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa
            <sarda.espin...@gmail.com> wrote:

                Hello,

                Regarding the new Kafka source (configure with a
                consumer group), I found out that if I manually
                change the group's offset with Kafka's admin API
                independently of Flink (while the job is running),
                the Flink source will ignore that and reset it to
                whatever it stored internally. Is there any way to
                prevent this?

                Regards,
                Alexis.

