Hi Gordon,

Any further thoughts on this?
I forgot to mention that I am using Flink 1.3.2 and our Kafka is 0.10. We are in the process of upgrading Kafka, but that won’t be in Prod for at least a couple of months.

Thanks, Ashish

> On Nov 8, 2017, at 9:35 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
>
> Hi Gordon,
>
> Thanks for your responses. It definitely makes sense.
>
> I could pull this stack from the logs; the entire log itself is pretty big - let me know if some samples before/after this may help.
>
> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217)
>     ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 18 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 24 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
>
> Also for reference here is my ProducerConfig from logs:
>
> INFO  org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values:
>     acks = 1
>     batch.size = 4096
>     block.on.buffer.full = false
>     bootstrap.servers = [xxxxxxx:xxx, xxxxxx:xxx]
>     buffer.memory = 33554432
>     client.id =
>     compression.type = none
>     connections.max.idle.ms = 540000
>     interceptor.classes = null
>     key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>     linger.ms = 500
>     max.block.ms = 60000
>     max.in.flight.requests.per.connection = 5
>     max.request.size = 1048576
>     metadata.fetch.timeout.ms = 25000
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.sample.window.ms = 30000
>     partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>     receive.buffer.bytes = 32768
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 30000
>     retries = 5
>     retry.backoff.ms = 100
>     sasl.jaas.config = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     send.buffer.bytes = 131072
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.endpoint.identification.algorithm = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     timeout.ms = 30000
>     value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>
> Thanks, Ashish
>
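For context, this is roughly how producer settings like the ones in that dump get handed to FlinkKafkaProducer010 on Flink 1.3 / Kafka 0.10. It is a minimal sketch only; the broker list, topic, schema, class and method names below are illustrative placeholders rather than the actual job:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaSinkSketch {

        // Wires a DataStream<String> to a Kafka 0.10 sink with explicit producer settings.
        public static void addKafkaSink(DataStream<String> stream) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholders

            // Settings mirroring the ProducerConfig dump above. They trade throughput
            // (larger batches, longer linger) against how long records may sit in the
            // send queue before they expire with the TimeoutException seen in the logs.
            props.setProperty("acks", "1");
            props.setProperty("batch.size", "4096");
            props.setProperty("linger.ms", "500");
            props.setProperty("request.timeout.ms", "30000");
            props.setProperty("retries", "5");

            // The properties are passed straight through to the underlying KafkaProducer.
            // checkErroneous() in the stack trace rethrows an earlier async send failure
            // on a later record, which is why the job fails some time after the records
            // actually expired.
            stream.addSink(new FlinkKafkaProducer010<String>(
                    "prod.app.stats.preproc",   // topic from the exception message
                    new SimpleStringSchema(),   // placeholder schema
                    props));
        }
    }

With a lower request.timeout.ms the queued records expire sooner, so the exception surfaces (and the job restarts) faster, which lines up with the observation later in the thread that lower timeouts made the app fail and recover more quickly.
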
>> On Nov 8, 2017, at 5:09 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>> Hi Ashish,
>>
>> From your description I do not yet have much of an idea of what may be happening.
>> However, some of your observations seem reasonable. I’ll go through them one by one:
>>
>>> I did try to modify request.timeout.ms, linger.ms etc. to help with the issue if it were caused by a sudden burst of data or something along those lines. However, it caused the app to build up back pressure and become slower and slower until that timeout was reached.
>>
>> If the client is having trouble writing outstanding records to Kafka, and the timeout is increased, then I think increased back pressure is indeed the expected behavior.
>>
>>> I noticed that consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually starts to show higher response-time and lower rates.
>>
>> Drops in fetch-rate and growth in fetch-size in the Flink Kafka consumer should be a natural consequence of backpressure in the job.
>> The fetch loop in the consumer will be blocked temporarily when backpressure is propagated from downstream operators, resulting in longer fetch intervals and larger batches on each fetch (given that the incoming event rate stays constant).
>> Therefore, I think the root cause is still on the producer side.
>>
>> Would you happen to have any logs that might show useful information on the producer side?
>> I think we have a better chance of finding out what is going on by digging there.
>> Also, which Flink version & Kafka version are you using?
>>
>> Cheers,
>> Gordon
>>
>> On 5 November 2017 at 11:24:49 PM, Ashish Pokharel (ashish...@yahoo.com) wrote:
>>
>>> All,
>>>
>>> I am starting to notice a strange behavior in a particular streaming app. I initially thought it was a producer issue, as I was seeing timeout exceptions (records expiring in the queue). I did try to modify request.timeout.ms, linger.ms etc. to help with the issue if it were caused by a sudden burst of data or something along those lines. However, it caused the app to build up back pressure and become slower and slower until that timeout was reached. With lower timeouts, the app would actually raise the exception and recover faster. I can tell it is not related to connectivity, as other apps running from the same data nodes are just fine around the same time frame, connected to the same brokers (we have at least 10 streaming apps connected to the same list of brokers). We have enabled the Graphite reporter in all of our applications. After deep diving into some of the consumer and producer stats, I noticed that consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually starts to show higher response-time and lower rates. Eventually, I noticed connection resets start to occur and connection counts go up momentarily. After that, things get back to normal. Data producer rates remain constant around that timeframe - we have a Logstash producer sending data over. We checked both Logstash and Kafka metrics and they seem to show the same pattern (sort of a sine wave) throughout.
>>>
>>> It seems to point to a Kafka issue (perhaps some tuning between the Flink app and Kafka), but I wanted to check with the experts before I start knocking down the Kafka admin’s doors. Is there anything else I can look into? There are quite a few default stats in Graphite, but those were the ones that made the most sense.
>>>
>>> Thanks, Ashish
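Since the discussion leans on the Graphite metrics (consumer fetch-rate / fetch-size, producer response times), here is roughly what the reporter setup looks like in flink-conf.yaml on Flink 1.3. The reporter name, host and port are placeholders, and the flink-metrics-graphite jar typically needs to be available in Flink's lib folder:

    metrics.reporters: graphite
    metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
    metrics.reporter.graphite.host: graphite.example.com
    metrics.reporter.graphite.port: 2003
    metrics.reporter.graphite.protocol: TCP

The Kafka connectors forward the Kafka client's own metrics (fetch-rate, fetch-size-avg, request latencies, etc.) through whatever reporter is configured, which is presumably where the numbers quoted in this thread come from.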