Hi Oscar, Thanks for raising this problem! Currently metrics of KafkaConsumer are not registered in Flink as in FlinkKafkaConsumer. A ticket has been created on JIRA, and hopefully we can fix it in next release.
https://issues.apache.org/jira/browse/FLINK-22766 -- Best Regards, Qingsheng Ren Email: renqs...@gmail.com On May 25, 2021, 2:35 PM +0800, 陳樺威 <oscar8492...@gmail.com>, wrote: > Hi Ardhani, > > Thanks for your kindly reply. > > Our team use your provided metrics before, but the metrics disappear after > migrate to new KafkaSource. > > We initialize KafkaSource in following code. > ``` val consumer: KafkaSource[T] = KafkaSource.builder() .setProperties(properties) .setTopics(topic) .setValueOnlyDeserializer(deserializer) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .build() env .fromSource(consumer, WatermarkStrategy.noWatermarks(), uid) .setParallelism(math.min(parallelism, env.getParallelism)) .setMaxParallelism(parallelism) .name(uid).uid(uid) .rebalance > ``` > > Oscar > Ardhani Narasimha <ardhani.narasi...@razorpay.com> 於 2021年5月25日 週二 上午12:08寫道: > Use below respectively > > flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate - > Consumer rate > flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max - Consumer > lag > flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max - commit > latency > > unsure if reactive mode makes any difference. > > On Mon, May 24, 2021 at 7:44 PM 陳樺威 <oscar8492...@gmail.com> wrote: > > > Hello, > > > > > > Our team tries to test reactive mode and replace FlinkKafkaConsumer with > > > the new KafkaSource. > > > But we can’t find the KafkaSource metrics list. Does anyone have any > > > idea? In our case, we want to know the Kafka consume delay and consume > > > rate. > > > > > > Thanks, > > > Oscar > > ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- > IMPORTANT: The contents of this email and any attachments are confidential > and protected by applicable laws. If you have received this email by mistake, > please (i) notify the sender immediately; (ii) delete it from your database; > and (iii) do not disclose the contents to anyone or make copies thereof. > Razorpay accepts no liability caused due to any inadvertent/ unintentional > data transmitted through this email. > -----------------------------------------------------------------------------------------------------------------------------------------------------------------------