Hi Nagireddy,

I'm not sure how you are monitoring Kafka lag. AFAIK, you can see the actual lag of your consumer group in the Kafka cluster with the following command:

    ./kafka-consumer-groups.sh --bootstrap-server 192.168.0.107:39092 --group <consumer-group-name> --describe

This tool ships with the Kafka distribution. If there is any gap between the lag reported by the Kafka connector and the lag reported by this tool, you can open a JIRA issue to report it [1].

Hope this helps!

[1] https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=582&projectKey=FLINK

Regards,
Xiangyu

On Mon, Aug 21, 2023 at 18:45, Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote:

> Thanks Xiangyu,
>
> I have one issue while running Flink with the Kafka connector. It was
> working fine for a couple of days.
>
> But suddenly the Kafka lag went to a negative value.
>
> I am trying to find the root cause. Any suggestions?
>
>
> On Sat, Aug 5, 2023 at 5:57 PM xiangyu feng <xiangyu...@gmail.com> wrote:
>
>> Hi Nagireddy,
>>
>> I'm not particularly familiar with StreamingFileSink, but I checked the
>> implementation of HadoopFsCommitter. AFAIK, when committing files to
>> HDFS, the committer first checks whether the temp file exists.
>> [image: image.png]
>>
>> In your case, could you check why the temp file being committed does not
>> exist on HDFS? Were these files deleted by mistake? From what I found,
>> this error may occur when a small-file merge process merges a file that
>> is still being written. You can disable small-file merging while the
>> files are being written.
>>
>> Hope this helps.
>>
>> Regards,
>> Xiangyu
>>
>>
>> On Sat, Aug 5, 2023 at 18:22, Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote:
>>
>>> Hi Xiangyu/Dev,
>>>
>>> Does anyone have a solution for handling the important note below in
>>> StreamingFileSink?
>>>
>>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>>
>>> Important Note 3: Flink and the StreamingFileSink never overwrites
>>> committed data.
>>> Given this, when trying to restore from an old checkpoint/savepoint which
>>> assumes an in-progress file which was committed by subsequent successful
>>> checkpoints, *Flink will refuse to resume and it will throw an exception as
>>> it cannot locate the in-progress file*.
>>>
>>> Currently I am facing the same issue in the PROD code.
>>>
>>> Regards,
>>> Nagireddy Y.
>>>
>>>
>>> On Fri, Aug 4, 2023 at 12:11 PM xiangyu feng <xiangyu...@gmail.com> wrote:
>>>
>>>> Hi ynagireddy4u,
>>>>
>>>> From the exception info, I think your application hit an HDFS file
>>>> issue during the commit phase of the checkpoint. Can you first check
>>>> why the staging file does not exist?
>>>>
>>>> Regards,
>>>> Xiangyu
>>>>
>>>> On Fri, Aug 4, 2023 at 12:21, Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote:
>>>>
>>>>> Hi Xiangyu/Dev Team,
>>>>>
>>>>> Thanks for the reply.
>>>>>
>>>>> In our Flink job, we increased the *checkpoint timeout to 30 min*,
>>>>> and the *checkpoint interval is 10 min*.
>>>>>
>>>>> But while running the job, we got the exception below.
>>>>>
>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:952)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
>>>>>     at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onSuccessfulCompletionOfCheckpoint(Bucket.java:300)
>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.commitUpToCheckpoint(Buckets.java:216)
>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.notifyCheckpointComplete(StreamingFileSink.java:415)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointComplete$8(StreamTask.java:936)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:101)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:930)
>>>>>     ... 12 more
>>>>>
>>>>> It would be great if you have any workaround for that.
>>>>>
>>>>> Regards,
>>>>> Nagireddy Y.
>>>>>
>>>>>
>>>>> On Thu, Aug 3, 2023 at 7:24 AM xiangyu feng <xiangyu...@gmail.com> wrote:
>>>>>
>>>>>> Hi ynagireddy4u,
>>>>>>
>>>>>> We have met this exception before.
>>>>>> Usually it is caused by one of the following:
>>>>>>
>>>>>> 1) The TaskManager is too busy with other work to send the heartbeat to
>>>>>>    the JobMaster, or the TaskManager process might have already exited;
>>>>>> 2) There might be network issues between this TaskManager and the
>>>>>>    JobMaster;
>>>>>> 3) In certain cases, the JobMaster actor might also be too busy to
>>>>>>    process the RPC requests from the TaskManager.
>>>>>>
>>>>>> Please check whether your problem fits one of the situations above.
>>>>>>
>>>>>> Best,
>>>>>> Xiangyu
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 31, 2023 at 20:49, Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> Has anyone faced the exception below?
>>>>>>> If yes, please share the resolution.
>>>>>>>
>>>>>>>
>>>>>>> 2023-07-28 22:04:16
>>>>>>> *java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e19_1690528962823_0382_01_000005 timed out.*
>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>
>>>>>>> Please share any suggestions.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Nagireddy Y
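On the negative-lag question: per partition, the LAG column printed by `kafka-consumer-groups.sh --describe` is the log-end offset minus the group's committed offset, so a negative value means the committed offset is ahead of the log-end offset that was read. A minimal sketch of that arithmetic, assuming both offset maps have already been fetched; the class name, topic-partition names and offsets are made up for illustration, and a stale log-end read is only one hypothetical way to end up with a negative number:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: per-partition consumer lag = log-end offset - committed offset.
// Partition names and offsets in main() are made-up illustration values.
final class LagSketch {

    // Lag of one partition; negative when the committed offset is ahead of
    // the log-end offset that was read.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    // Lag for every partition that has both a committed offset and a
    // log-end offset; partitions without a log-end offset are skipped.
    static Map<String, Long> lagPerPartition(Map<String, Long> logEndOffsets,
                                             Map<String, Long> committedOffsets) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : committedOffsets.entrySet()) {
            Long logEnd = logEndOffsets.get(e.getKey());
            if (logEnd != null) {
                result.put(e.getKey(), lag(logEnd, e.getValue()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical snapshot: the log-end offset of my-topic-1 was read
        // earlier than the committed offset, so it is already stale.
        Map<String, Long> logEnd = new LinkedHashMap<>();
        logEnd.put("my-topic-0", 1_000L);
        logEnd.put("my-topic-1", 2_000L);

        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("my-topic-0", 990L);
        committed.put("my-topic-1", 2_005L);

        lagPerPartition(logEnd, committed).forEach((partition, value) ->
                System.out.println(partition + " lag=" + value
                        + (value < 0 ? " (negative)" : "")));
    }
}
```

Running `main` prints a lag of 10 for `my-topic-0` and -5 for `my-topic-1`. Comparing the CURRENT-OFFSET and LOG-END-OFFSET columns of the real command output in the same way shows which side of the subtraction is off.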
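On "Cannot clean commit: Staging file does not exist.": as noted in the thread, the committer expects the in-progress (staging) file to still be on the filesystem when the checkpoint completes, and then renames it to its final name. The sketch below is a simplified local-filesystem analogue using `java.nio.file` instead of the Hadoop `FileSystem` API; the class name, file names and the length check are illustrative assumptions, not Flink's code. It shows why anything that removes or moves the staging file in between (a small-file merge job, or committing a file that an earlier checkpoint already committed) produces the same failure:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Simplified local-filesystem analogue of committing an in-progress file:
// the staging file must still exist (with the expected length) and is then
// renamed to its final name. Illustrative only; not Flink's implementation.
final class StagingCommitSketch {

    static void commit(Path staging, Path target, long expectedLength) throws IOException {
        if (!Files.exists(staging)) {
            // Same message as in the stack trace: nothing left to commit.
            throw new IOException("Cannot clean commit: Staging file does not exist.");
        }
        if (Files.size(staging) != expectedLength) {
            // Illustrative check: the file changed since the checkpoint recorded it.
            throw new IOException("Cannot clean commit: unexpected staging file length.");
        }
        // Rename to the final name; ATOMIC_MOVE makes it all-or-nothing
        // (the call fails if the filesystem cannot move atomically).
        Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("commit-sketch");
        Path staging = dir.resolve(".part-0-0.inprogress"); // hypothetical names
        Path target = dir.resolve("part-0-0");
        Files.write(staging, "hello".getBytes(StandardCharsets.UTF_8));

        commit(staging, target, 5);
        System.out.println("committed: " + Files.exists(target));

        // Simulates the staging file being gone (already committed, or moved
        // away by another process) when the commit runs again.
        try {
            commit(staging, target, 5);
        } catch (IOException e) {
            System.out.println("second commit failed: " + e.getMessage());
        }
    }
}
```

The first commit succeeds and the second fails with the staging-file message, which is the same shape as restoring from an old checkpoint whose in-progress file was already committed.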
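On the heartbeat timeout: the JobMaster only observes that no heartbeat arrived from a TaskManager within the timeout, which is why the three causes listed above (a busy or exited TaskManager, network trouble, a busy JobMaster) all surface as the same `TimeoutException`. A simplified sketch of that detection logic, not Flink's `HeartbeatMonitorImpl`; the class name, TaskManager ids and timestamps are hypothetical, and 50 s is assumed to match Flink's default `heartbeat.timeout` (the sending side is governed by `heartbeat.interval`):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified heartbeat-timeout detection: a TaskManager is reported as timed
// out when its last heartbeat is older than the timeout. Not Flink's
// HeartbeatMonitorImpl; timestamps are passed in to keep the demo deterministic.
final class HeartbeatTimeoutSketch {

    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();

    HeartbeatTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record a heartbeat from the given TaskManager at time nowMillis.
    void receiveHeartbeat(String taskManagerId, long nowMillis) {
        lastHeartbeatMillis.put(taskManagerId, nowMillis);
    }

    // TaskManagers whose last heartbeat is strictly older than the timeout.
    List<String> timedOut(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeatMillis.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 50 s, assumed here to match Flink's default heartbeat.timeout.
        HeartbeatTimeoutSketch monitor = new HeartbeatTimeoutSketch(50_000);
        monitor.receiveHeartbeat("tm-1", 0);
        monitor.receiveHeartbeat("tm-2", 0);
        // tm-1 keeps reporting; tm-2 goes silent (busy, dead, or unreachable:
        // the monitor cannot tell which).
        monitor.receiveHeartbeat("tm-1", 40_000);
        System.out.println(monitor.timedOut(60_000)); // prints [tm-2]
    }
}
```

Timestamps are passed in rather than read from a clock so the outcome is deterministic; the point is that the monitor sees only silence, so the TaskManager logs (GC pauses, OOM, container kills) are what distinguish the causes.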