Hi Gordon,

Any further thoughts on this?

I forgot to mention I am using Flink 1.3.2 and our Kafka is 0.10. We are in the 
process of upgrading Kafka, but that won’t be in Prod for at least a couple of 
months.
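
In case it helps narrow things down, here is a rough sketch of how I read the 
timeout in the trace quoted below. It assumes (my understanding only, not 
checked against the client source) that the 0.10 producer expires a batch once 
it has waited longer than request.timeout.ms, and it ignores the finer details 
around linger.ms and retries. The class and helper names are made up for 
illustration; the values are copied from the ProducerConfig dump and the 
exception message.

```java
import java.util.Properties;

public class BatchExpirySketch {

    // Hypothetical helper (not a Kafka API): true when a batch has waited
    // longer than the configured timeout. Simplified assumption about how
    // the 0.10 producer decides to expire a batch.
    static boolean wouldExpire(long msSinceLastAppend, long requestTimeoutMs) {
        return msSinceLastAppend > requestTimeoutMs;
    }

    // A few producer settings copied from the ProducerConfig dump in the
    // quoted message. Plain string keys, so no Kafka dependency is needed.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("linger.ms", "500");
        props.setProperty("batch.size", "4096");
        props.setProperty("retries", "5");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        long timeoutMs = Long.parseLong(props.getProperty("request.timeout.ms"));
        long waitedMs = 33473L; // "33473 ms has passed since last append"

        System.out.println("expired=" + wouldExpire(waitedMs, timeoutMs)
                + " overshoot_ms=" + (waitedMs - timeoutMs));
    }
}
```

With these numbers the batch had waited about 3.5 seconds past the 30 second 
timeout, so under that assumption the 7 records sat unsent for the whole 
timeout period rather than being rejected by the broker.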

Thanks, Ashish

> On Nov 8, 2017, at 9:35 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
> 
> Hi Gordon,
> 
> Thanks for your responses. It definitely makes sense. 
> 
> I could pull this stack trace from the logs. The entire log is pretty big, 
> so let me know if some samples from before/after this would help.
>  
> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217)
>     ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 18 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     ... 24 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
> 
> Also for reference here is my ProducerConfig from logs:
> 
> INFO  org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
>                 acks = 1
>                 batch.size = 4096
>                 block.on.buffer.full = false
>                 bootstrap.servers = [xxxxxxx:xxx,xxxxxx:xxx]
>                 buffer.memory = 33554432
>                 client.id = 
>                 compression.type = none
>                 connections.max.idle.ms = 540000
>                 interceptor.classes = null
>                 key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>                 linger.ms = 500
>                 max.block.ms = 60000
>                 max.in.flight.requests.per.connection = 5
>                 max.request.size = 1048576
>                 metadata.fetch.timeout.ms = 25000
>                 metadata.max.age.ms = 300000
>                 metric.reporters = []
>                 metrics.num.samples = 2
>                 metrics.sample.window.ms = 30000
>                 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>                 receive.buffer.bytes = 32768
>                 reconnect.backoff.ms = 50
>                 request.timeout.ms = 30000
>                 retries = 5
>                 retry.backoff.ms = 100
>                 sasl.jaas.config = null
>                 sasl.kerberos.kinit.cmd = /usr/bin/kinit
>                 sasl.kerberos.min.time.before.relogin = 60000
>                 sasl.kerberos.service.name = null
>                 sasl.kerberos.ticket.renew.jitter = 0.05
>                 sasl.kerberos.ticket.renew.window.factor = 0.8
>                 sasl.mechanism = GSSAPI
>                 security.protocol = PLAINTEXT
>                 send.buffer.bytes = 131072
>                 ssl.cipher.suites = null
>                 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>                 ssl.endpoint.identification.algorithm = null
>                 ssl.key.password = null
>                 ssl.keymanager.algorithm = SunX509
>                 ssl.keystore.location = null
>                 ssl.keystore.password = null
>                 ssl.keystore.type = JKS
>                 ssl.protocol = TLS
>                 ssl.provider = null
>                 ssl.secure.random.implementation = null
>                 ssl.trustmanager.algorithm = PKIX
>                 ssl.truststore.location = null
>                 ssl.truststore.password = null
>                 ssl.truststore.type = JKS
>                 timeout.ms = 30000
>                 value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> 
> Thanks, Ashish
> 
>> On Nov 8, 2017, at 5:09 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>> 
>> Hi Ashish,
>> 
>> From your description I do not yet have much of an idea of what may be 
>> happening.
>> However, some of your observations seem reasonable. I’ll go through them 
>> one by one:
>> 
>>> I did try to modify request.timeout.ms, linger.ms etc. to help with the 
>>> issue if it were caused by a sudden burst of data or something along those 
>>> lines. However, that only caused the app’s back pressure to increase and 
>>> made it slower and slower until the timeout was reached.
>> 
>> If the client is experiencing trouble in writing outstanding records to 
>> Kafka, and the timeout is increased, then I think increased back pressure is 
>> indeed the expected behavior.
>> 
>>> I noticed that the consumer fetch-rate drops tremendously while fetch-size 
>>> grows exponentially BEFORE the producer actually starts to show higher 
>>> response-time and lower rates.
>> 
>> 
>> Drops in fetch-rate and growth in fetch-size in the Flink Kafka consumer 
>> should be a natural consequence of backpressure in the job.
>> The fetch loop in the consumer will be blocked temporarily when backpressure 
>> is propagated from downstream operators, resulting in longer fetch intervals 
>> and larger batches on each fetch (given that the event rate is still 
>> constant).
>> Therefore, I think the root cause is still on the producer side.
>> 
>> Would you happen to have any logs that may show useful information on the 
>> producer side?
>> I think we might have a better chance of finding out what is going on by 
>> digging there.
>> Also, which Flink version & Kafka version are you using?
>> 
>> Cheers,
>> Gordon
>> On 5 November 2017 at 11:24:49 PM, Ashish Pokharel (ashish...@yahoo.com) 
>> wrote:
>> 
>>> All, 
>>> 
>>> I am starting to notice strange behavior in a particular streaming app. I 
>>> initially thought it was a Producer issue, as I was seeing timeout 
>>> exceptions (records expiring in queue). I did try to modify 
>>> request.timeout.ms, linger.ms etc. to help with the issue if it were caused 
>>> by a sudden burst of data or something along those lines. However, that 
>>> only caused the app’s back pressure to increase and made it slower and 
>>> slower until the timeout was reached. With lower timeouts, the app would 
>>> actually raise an exception and recover faster.
>>> 
>>> I can tell it is not related to connectivity, as other apps connected to 
>>> the same brokers from the same data nodes are running just fine around the 
>>> same time frame (we have at least 10 streaming apps connected to the same 
>>> list of brokers).
>>> 
>>> We have enabled the Graphite Reporter in all of our applications. After 
>>> deep diving into some of the consumer and producer stats, I noticed that 
>>> the consumer fetch-rate drops tremendously while fetch-size grows 
>>> exponentially BEFORE the producer actually starts to show higher 
>>> response-time and lower rates. Eventually, I noticed connection resets 
>>> start to occur and connection counts go up momentarily, after which things 
>>> get back to normal. Data producer rates remain constant around that 
>>> timeframe - we have a Logstash producer sending data over. We checked both 
>>> Logstash and Kafka metrics and they seem to be showing the same pattern 
>>> (sort of a sine wave) throughout.
>>> 
>>> It seems to point to a Kafka issue (perhaps some tuning between the Flink 
>>> app and Kafka), but I wanted to check with the experts before I start 
>>> knocking down the Kafka admins’ doors. Is there anything else I can look 
>>> into? There are quite a few default stats in Graphite, but those were the 
>>> ones that made the most sense.
>>> 
>>> Thanks, Ashish
> 