Hi guys, I have another question related to the KPL problem: I wonder what the consequences of overwhelming the KPL's internal queue (Kinesis) can be.
From my observation experimenting with 1.4.2 (which does not yet have the backpressure support from the open PR mentioned below), when the Flink cluster processes records faster than the sink Kinesis stream's limited throughput, i.e., once throughput-exceeded exceptions start to be thrown, we quite often get the exception below (full trace pasted further down) very soon afterwards, all the subtasks switch to cancelling, and the job is restarted. From the stack trace I can see that YARN got shut down and all the task managers were terminated.

I suspect it is a memory issue. Whenever the throughput-exceeded exception is thrown, it implies that the KPL's internal unbounded queue may be growing rapidly. We set recordTtl = 60s, and we still see record-expiration exceptions alongside the throughput-exceeded exceptions. This leads me to wonder whether the internal unbounded queue grows so large that it exhausts all the memory on the node, eventually crashing YARN and the job manager. This is just my hypothesis. I wonder if someone has already encountered or investigated similar issues and could shed some light on it.

java.lang.Exception: TaskManager was lost/killed: container_1529095945616_0009_01_000004 @ ip-172-31-64-249.ec2.internal (dataPort=44591)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:426)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
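To make the hypothesis more concrete, here is a minimal standalone sketch of what I believe happens inside the connector's KPL client (the stream name, region, and record count are made up for illustration; this is not our production code): addUserRecord() never blocks, so when the sink stream is throttled the outstanding count just keeps climbing and the buffered records keep consuming memory on the node.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class KplQueueGrowthSketch {
    public static void main(String[] args) throws Exception {
        // RecordTtl = 60s, matching the setting described above; region and
        // stream name are placeholders.
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRegion("us-east-1")
                .setRecordTtl(60_000);
        KinesisProducer producer = new KinesisProducer(config);

        // addUserRecord() only enqueues the record into the KPL's internal buffer
        // and returns a future; nothing here ever blocks, no matter how hard the
        // sink stream is being throttled.
        for (int i = 0; i < 1_000_000; i++) {
            ByteBuffer data = ByteBuffer.wrap(("record-" + i).getBytes(StandardCharsets.UTF_8));
            producer.addUserRecord("my-sink-stream", Integer.toString(i), data);

            if (i % 100_000 == 0) {
                // Under sustained throttling this number grows without bound.
                System.out.println("outstanding records: " + producer.getOutstandingRecordsCount());
            }
        }

        producer.flushSync();
        producer.destroy();
    }
}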
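And for reference, my understanding of what the open PR does is roughly the following; this is only a sketch of the idea with a made-up limit, not the actual code in the PR: bound the number of records sitting in the KPL and block the sink until it drains, so that backpressure reaches the upstream operators instead of the queue growing forever.

import com.amazonaws.services.kinesis.producer.KinesisProducer;

public class QueueLimitSketch {
    // Made-up limit, purely for illustration.
    private static final int QUEUE_LIMIT = 100_000;

    // Called from the sink before handing a new record to the KPL: as long as too
    // many records are outstanding, flush and wait. Blocking here blocks the sink's
    // invoke(), which is what propagates backpressure upstream.
    static void enforceQueueLimit(KinesisProducer producer) throws InterruptedException {
        while (producer.getOutstandingRecordsCount() >= QUEUE_LIMIT) {
            producer.flush();
            Thread.sleep(50);
        }
    }
}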
From: "Liu, Gavin (CAI - Atlanta)" <gavin....@coxautoinc.com>
Date: Wednesday, June 20, 2018 at 12:11 PM
To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

Thanks, Gordon. You are quick, and it is very helpful to me. I tried some other alternatives to resolve this and finally thought about rewriting the FlinkKinesisProducer class for our needs. Glad that I asked before I started. Really appreciate the quick response.

From: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Date: Wednesday, June 20, 2018 at 12:05 PM
To: "Liu, Gavin (CAI - Atlanta)" <gavin....@coxautoinc.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

Hi Gavin,

The problem is that the Kinesis producer currently does not propagate backpressure properly. Records are added to the internally used KPL client's queue without any queue size limit. This is considered a bug and already has a pull request for it [1], which we should probably push towards being merged soon.

What the pull request essentially does is add an upper bound to the number of pending records in the KPL producer queue. Once the upper bound is hit, input to the Kinesis producer sink is blocked, therefore propagating backpressure further upstream.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/6021

On 20 June 2018 at 6:00:30 PM, Liu, Gavin (CAI - Atlanta) (gavin....@coxautoinc.com) wrote:

Hi guys,

I am new to the Flink framework, and we are building an application that uses a Kinesis stream for both the Flink source and the sink. The Flink version we are using is 1.4.2, which is also the version of flink-connector-kinesis. We built the flink-connector-kinesis jar explicitly with KPL version 0.12.6 due to the existing problems with the default 0.12.5.

I got a rough idea of how backpressure works in Flink from reading http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3cf8dd76c0-9de0-412a-8c24-b72af0d42...@data-artisans.com%3E

From my experiments with Flink and flink-connector-kinesis, backpressure only happens within the Flink processing operators, i.e., not in the Flink producer to Kinesis. More specifically, when the throughput from the KPL exceeds the Kinesis throughput limits, Flink does not slow down at all, i.e., it does not apply pressure back along the processing chain up to the Flink consumer. Correct me if I misunderstood this. It looks like the Flink producer (in flink-connector-kinesis) is a standalone component: once a record is collected and handed to the producer, Flink core has finished all its processing and no longer cares about the fate of the record; it is the responsibility of the connector to continue the job.

I was expecting backpressure to propagate from the sink Kinesis stream back to the source Kinesis stream: whenever the sink Kinesis stream cannot handle the volume, it should add backpressure. Could someone illustrate a bit more why the Flink connector is designed this way? Also correct me if I stated anything wrong.

Gavin Liu