Sorry for joining this discussion late, but there is already a metric for
the offset lag in our 0.9+ consumers.
It's called "records-lag-max":
https://kafka.apache.org/documentation/#new_consumer_fetch_monitoring and
it's exposed via Flink's metrics system.
The only limitation is that it reports just the maximum lag across all
partitions, not detailed per-partition metrics.
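
To illustrate the difference between a per-partition view and a single
maximum, here is a minimal sketch. It is not Kafka or Flink code: the
offsets are made-up numbers, the function name is hypothetical, and lag is
assumed to mean "log end offset minus the consumer's current position".

```python
# Hypothetical offsets for three partitions (illustrative numbers only).
log_end_offsets = {0: 1500, 1: 980, 2: 2210}     # latest offset per partition
consumer_positions = {0: 1490, 1: 980, 2: 1950}  # next offset the consumer reads


def per_partition_lag(end_offsets, positions):
    """Return the lag in records for each partition, never below zero."""
    return {p: max(end_offsets[p] - positions[p], 0) for p in end_offsets}


lags = per_partition_lag(log_end_offsets, consumer_positions)

# A max-style metric collapses the per-partition values into one number,
# so it cannot tell you which partition is the one falling behind.
max_lag = max(lags.values())

print(lags)
print(max_lag)
```

With only the maximum you can see that some partition is lagging, but you
would need the per-partition values to find out which one.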

On Mon, Mar 20, 2017 at 3:43 PM, Bruno Aranda <brunoara...@gmail.com> wrote:

> Hi,
>
> Thanks! The proposal sounds very good to us too.
>
> Bruno
>
> On Sun, 19 Mar 2017 at 10:57 Florian König <florian.koe...@micardo.com>
> wrote:
>
>> Thanks Gordon for the detailed explanation! That makes sense and explains
>> the expected behaviour.
>>
>> The JIRA for the new metric also sounds very good. Can’t wait to have
>> this in the Flink GUI (KafkaOffsetMonitor has some problems and stops
>> working after 1-2 days, don’t know the reason yet).
>>
>> All the best,
>> Florian
>>
>>
>> > On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>> >
>> > @Florian
>> > the 0.9 / 0.10 version and 0.8 version currently behave a bit
>> > differently for offset committing.
>> >
>> > In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable”
>> > etc. settings will be completely ignored and overwritten before being
>> > used to instantiate the internal Kafka clients, hence committing will
>> > only happen on Flink checkpoints.
>> >
>> > In 0.8, this isn’t the case. Both automatic periodic committing and
>> > committing on checkpoints can take place. That’s perhaps why you’re
>> > observing the 0.8 consumer committing more frequently.
>> >
>> > FYI: This behaviour will be unified in Flink 1.3.0. If you’re
>> > interested, you can take a look at
>> > https://github.com/apache/flink/pull/3527.
>> >
>> > - Gordon
>> >
>> >
>> > On March 17, 2017 at 6:07:38 PM, Florian König
>> > (florian.koe...@micardo.com) wrote:
>> >
>> >> Why is that so? The checkpoint contains the Kafka offset and would be
>> >> able to start reading wherever it left off, regardless of any offset
>> >> stored in Kafka or Zookeeper. Why is the offset not committed regularly,
>> >> independently from the checkpointing? Or did I misconfigure anything?
>>
>>
>>
