Hi Nagireddy,

I'm not sure how you are monitoring Kafka lag. AFAIK, you can check the
metadata of the topic in your Kafka cluster to see the actual lag with
the following command:

./kafka-consumer-groups.sh --bootstrap-server 192.168.0.107:39092 \
    --group <consumer-group-name> --describe


This tool ships with the Kafka distribution. If there is any gap between
the lag reported by the Kafka connector and the lag reported by this tool,
you can open a Jira ticket to report the issue [1].
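
In case it helps with the comparison, here is a minimal sketch (my own, not
part of Kafka or Flink) that recomputes lag per partition from the text the
tool prints, as LOG-END-OFFSET minus CURRENT-OFFSET, and lists the partitions
where the result is negative. SAMPLE_OUTPUT is made-up data, and the function
names parse_lag and negative_lag are hypothetical. A negative value means the
committed offset is ahead of the log end offset the tool saw; as far as I
know this can happen when a topic was deleted and recreated, when offsets
were reset, or when the two numbers were read at slightly different times.

```python
# Sketch only: recompute consumer lag from the text printed by
# `kafka-consumer-groups.sh --describe`. SAMPLE_OUTPUT is made-up data.

SAMPLE_OUTPUT = """\
GROUP     TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-group  events  0          120             150             30   -            -     -
my-group  events  1          200             180             -20  -            -     -
my-group  events  2          -               75              -    -            -     -
"""


def parse_lag(describe_output):
    """Return (topic, partition, committed, log_end, lag) for each partition
    that has a committed offset, with lag = log_end - committed."""
    lines = [line.split() for line in describe_output.splitlines() if line.strip()]
    # Locate the header row; the tool may print notices before it.
    header_pos = next(i for i, cols in enumerate(lines) if "CURRENT-OFFSET" in cols)
    header = lines[header_pos]
    col = {name: header.index(name)
           for name in ("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET")}
    rows = []
    for cols in lines[header_pos + 1:]:
        if len(cols) <= col["LOG-END-OFFSET"]:
            continue  # too short to be a data row
        committed = cols[col["CURRENT-OFFSET"]]
        log_end = cols[col["LOG-END-OFFSET"]]
        if not (committed.isdigit() and log_end.isdigit()):
            continue  # "-" means no committed offset for this partition
        committed, log_end = int(committed), int(log_end)
        rows.append((cols[col["TOPIC"]], int(cols[col["PARTITION"]]),
                     committed, log_end, log_end - committed))
    return rows


def negative_lag(describe_output):
    """Return only the partitions whose committed offset is ahead of the log end."""
    return [row for row in parse_lag(describe_output) if row[4] < 0]


if __name__ == "__main__":
    for topic, partition, committed, log_end, lag in negative_lag(SAMPLE_OUTPUT):
        print(f"{topic}-{partition}: committed={committed} log_end={log_end} lag={lag}")
```

You could pipe the real output of the command above into parse_lag and
compare the numbers with what your monitoring shows for the same partitions.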

Hope this helps!

[1]
https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=582&projectKey=FLINK

Regards,
Xiangyu




Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Mon, Aug 21, 2023 at 18:45:

> Thanks Xiangyu,
>
> I have one issue while running Flink with the Kafka connector. It was
> working fine for a couple of days.
>
> But suddenly the Kafka lag went to a negative value.
>
> I am trying to find the root cause for that. Any suggestions?
>
>
> On Sat, Aug 5, 2023 at 5:57 PM xiangyu feng <xiangyu...@gmail.com> wrote:
>
>> Hi Nagireddy,
>>
>> I'm not particularly familiar with StreamingFileSink, but I checked the
>> implementation of HadoopFsCommitter. AFAIK, when committing files to
>> HDFS, the committer first checks whether the temp file exists.
>>
>> In your case, could you check why the temp file being committed does not
>> exist on HDFS? Were these files deleted by mistake? From what I found,
>> this error can occur when a small-file merge job merges a file that is
>> still being written. You can disable small-file merging when writing files.
>>
>> Hope this helps.
>>
>> Regards,
>> Xiangyu
>>
>>
>> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Sat, Aug 5, 2023 at 18:22:
>>
>>> Hi Xiangyu/Dev,
>>>
>>> Does anyone have a solution for handling the important note below in
>>> StreamingFileSink?
>>>
>>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>>
>>> Important Note 3: Flink and the StreamingFileSink never overwrites
>>> committed data. Given this, when trying to restore from an old
>>> checkpoint/savepoint which assumes an in-progress file which was committed
>>> by subsequent successful checkpoints, *Flink will refuse to resume and
>>> it will throw an exception as it cannot locate the in-progress file*.
>>>
>>> Currently I am facing the same issue in our PROD code.
>>>
>>>
>>>
>>> Regards,
>>> Nagireddy Y.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Aug 4, 2023 at 12:11 PM xiangyu feng <xiangyu...@gmail.com>
>>> wrote:
>>>
>>>> Hi ynagireddy4u,
>>>>
>>>> From the exception info, I think your application has hit an HDFS file
>>>> issue during the commit phase of the checkpoint. Can you check why
>>>> 'Staging file does not exist' in the first place?
>>>>
>>>> Regards,
>>>> Xiangyu
>>>>
>>>> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Fri, Aug 4, 2023 at 12:21:
>>>>
>>>>> Hi Xiangyu/Dev Team,
>>>>>
>>>>> Thanks for the reply.
>>>>>
>>>>> In our Flink job, we increased the *checkpoint timeout to 30 min.*
>>>>> And the *checkpoint interval is 10 min.*
>>>>>
>>>>> But while running the job we got the exception below.
>>>>>
>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:952)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
>>>>>     at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onSuccessfulCompletionOfCheckpoint(Bucket.java:300)
>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.commitUpToCheckpoint(Buckets.java:216)
>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.notifyCheckpointComplete(StreamingFileSink.java:415)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointComplete$8(StreamTask.java:936)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:101)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:930)
>>>>>     ... 12 more
>>>>>
>>>>> It would be great if you have any workaround for this.
>>>>>
>>>>> Regards,
>>>>> Nagireddy Y.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 3, 2023 at 7:24 AM xiangyu feng <xiangyu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi ynagireddy4u,
>>>>>>
>>>>>> We have met this exception before. Usually it is caused by one of the
>>>>>> following reasons:
>>>>>>
>>>>>> 1) The TaskManager is too busy with other work to send the heartbeat to
>>>>>> the JobMaster, or the TaskManager process may have already exited;
>>>>>> 2) There might be network issues between this TaskManager and the
>>>>>> JobMaster;
>>>>>> 3) In certain cases, the JobMaster actor might also be too busy to
>>>>>> process the RPC requests from the TaskManager.
>>>>>>
>>>>>> Please check whether your problem fits any of the above situations.
>>>>>>
>>>>>> Best,
>>>>>> Xiangyu
>>>>>>
>>>>>>
>>>>>> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Mon, Jul 31, 2023 at 20:49:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> Has anyone faced the exception below?
>>>>>>> If yes, please share the resolution.
>>>>>>>
>>>>>>>
>>>>>>> 2023-07-28 22:04:16
>>>>>>> *java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e19_1690528962823_0382_01_000005 timed out.*
>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>
>>>>>>> If you have any suggestions, please share them with me.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Nagireddy Y
>>>>>>>
>>>>>>
