Hi Gordon,

Thanks for your response - that definitely makes sense.
I was able to pull this stack trace from the logs; the entire log is pretty big, so let me know if some samples from before/after this would help.

TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 18 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 24 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append

Also, for reference, here is my ProducerConfig from the logs:

INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    acks = 1
    batch.size = 4096
    block.on.buffer.full = false
    bootstrap.servers = [xxxxxxx:xxx, xxxxxx:xxx]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 500
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 25000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
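In case it helps, below is roughly how the producer is wired into the job. This is a simplified sketch rather than the actual code: the class and method names, broker addresses and the String serialization schema are stand-ins, while the topic name and property values mirror the stack trace and ProducerConfig dump above.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class StatsPreprocSink {

    // Hypothetical helper (not the real job code): attaches the Kafka 0.10
    // producer sink with the tuning knobs that appear in the config dump above.
    public static void attachKafkaSink(DataStream<String> windowedStats) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        producerProps.setProperty("acks", "1");
        producerProps.setProperty("batch.size", "4096");
        producerProps.setProperty("linger.ms", "500");
        producerProps.setProperty("retries", "5");
        // Batches sitting in the producer's accumulator are expired once this
        // timeout passes - that is the "Expiring 7 record(s) ... ms has passed
        // since last append" error in the stack trace. Raising it only delays
        // the failure when the sink cannot drain fast enough.
        producerProps.setProperty("request.timeout.ms", "30000");

        FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                windowedStats,
                "prod.app.stats.preproc",  // topic from the stack trace
                new SimpleStringSchema(),  // stand-in; the real job may use a different schema
                producerProps);
    }
}

As the stack trace shows, the stream feeding this sink comes out of a window operator followed by a map before it reaches the producer.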
Thanks,
Ashish

> On Nov 8, 2017, at 5:09 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Hi Ashish,
>
> From your description I do not yet have much of an idea of what may be
> happening. However, some of your observations seem reasonable. I’ll go
> through them one by one:
>
>> I did try to modify request.timeout.ms, linger.ms etc to help with the issue
>> if it were caused by a sudden burst of data or something along those lines.
>> However, what it did was cause the app to build up back pressure and become
>> slower and slower until that timeout was reached.
>
> If the client is experiencing trouble in writing outstanding records to
> Kafka, and the timeout is increased, then I think increased back pressure is
> indeed the expected behavior.
>
>> I noticed that consumer fetch-rate drops tremendously while fetch-size grows
>> exponentially BEFORE the producer actually starts to show higher
>> response-time and lower rates.
>
> Drops in fetch-rate and growth in fetch-size in the Flink Kafka consumer
> should be a natural consequence of backpressure in the job.
> The fetch loop in the consumer will be blocked temporarily when backpressure
> is propagated from downstream operators, resulting in longer fetch intervals
> and larger batches on each fetch (given that event rates are still constant).
> Therefore, I think the root cause is still along the lines of the producer
> side.
>
> Would you happen to have any logs that might show any useful information on
> the producer side?
> I think we might have a better chance of finding out what is going on by
> digging there.
> Also, which Flink version & Kafka version are you using?
>
> Cheers,
> Gordon
>
> On 5 November 2017 at 11:24:49 PM, Ashish Pokharel (ashish...@yahoo.com
> <mailto:ashish...@yahoo.com>) wrote:
>
>> All,
>>
>> I am starting to notice a strange behavior in a particular streaming app. I
>> initially thought it was a Producer issue, as I was seeing timeout exceptions
>> (records expiring in queue). I did try to modify request.timeout.ms,
>> linger.ms etc to help with the issue if it were caused by a sudden burst of
>> data or something along those lines. However, what it did was cause the app
>> to build up back pressure and become slower and slower until that timeout
>> was reached. With lower timeouts, the app would actually raise an exception
>> and recover faster. I can tell it is not related to connectivity, as other
>> apps are running just fine around the same time frame connected to the same
>> brokers (we have at least 10 streaming apps connected to the same list of
>> brokers) from the same data nodes. We have enabled the Graphite Reporter in
>> all of our applications. After deep diving into some of the consumer and
>> producer stats, I noticed that consumer fetch-rate drops tremendously while
>> fetch-size grows exponentially BEFORE the producer actually starts to show
>> higher response-time and lower rates. Eventually, I noticed connection
>> resets start to occur and connection counts go up momentarily. After that,
>> things get back to normal. Data producer rates remain constant around that
>> timeframe - we have a Logstash producer sending data over. We checked both
>> Logstash and Kafka metrics and they seem to show the same pattern (sort of a
>> sine wave) throughout.
>>
>> It seems to point to a Kafka issue (perhaps some tuning between the Flink
>> app and Kafka), but I wanted to check with the experts before I start
>> knocking on the Kafka admins' doors. Is there anything else I can look into?
>> There are quite a few default stats in Graphite, but those were the ones
>> that made the most sense.
>>
>> Thanks, Ashish