Hi Gordon,

Thanks for your responses. It definitely makes sense. 

I was able to pull this stack trace from the logs. The entire log is pretty 
big, so let me know if some samples from before/after this would help.
 
TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
                at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
                at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
                at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
                at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
                at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552)
                at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253)
                at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217)
                ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
                at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
                ... 18 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
                ... 24 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
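
To make the last line of the trace concrete, here is a minimal sketch of the expiry check as I understand it. It assumes (my reading of the 0.10.x producer client, not something confirmed in this thread) that a batch which cannot be sent is expired once the time since its last append exceeds request.timeout.ms. The class and method names are hypothetical; the two numbers are the ones from the exception and from the ProducerConfig.

```java
// Hypothetical helper: a simplified model of batch expiry,
// not the Kafka client's actual implementation.
final class BatchExpirySketch {

    // Assumption: a batch expires once the time since its last append
    // exceeds request.timeout.ms.
    static boolean isExpired(long msSinceLastAppend, long requestTimeoutMs) {
        return msSinceLastAppend > requestTimeoutMs;
    }

    public static void main(String[] args) {
        long requestTimeoutMs = 30000L;   // request.timeout.ms from the ProducerConfig
        long msSinceLastAppend = 33473L;  // value reported in the TimeoutException
        System.out.println(isExpired(msSinceLastAppend, requestTimeoutMs));
    }
}
```

Under that assumption, raising request.timeout.ms only delays the expiry, which would be consistent with the growing back pressure seen with longer timeouts.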

Also, for reference, here is my ProducerConfig from the logs:

INFO  org.apache.kafka.clients.producer.ProducerConfig              - ProducerConfig values:
                acks = 1
                batch.size = 4096
                block.on.buffer.full = false
                bootstrap.servers =[xxxxxxx:xxx,xxxxxx:xxx]
                buffer.memory = 33554432
                client.id = 
                compression.type = none
                connections.max.idle.ms = 540000
                interceptor.classes = null
                key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
                linger.ms = 500
                max.block.ms = 60000
                max.in.flight.requests.per.connection = 5
                max.request.size = 1048576
                metadata.fetch.timeout.ms = 25000
                metadata.max.age.ms = 300000
                metric.reporters = []
                metrics.num.samples = 2
                metrics.sample.window.ms = 30000
                partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
                receive.buffer.bytes = 32768
                reconnect.backoff.ms = 50
                request.timeout.ms = 30000
                retries = 5
                retry.backoff.ms = 100
                sasl.jaas.config = null
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.min.time.before.relogin = 60000
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                sasl.kerberos.ticket.renew.window.factor = 0.8
                sasl.mechanism = GSSAPI
                security.protocol = PLAINTEXT
                send.buffer.bytes = 131072
                ssl.cipher.suites = null
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.endpoint.identification.algorithm = null
                ssl.key.password = null
                ssl.keymanager.algorithm = SunX509
                ssl.keystore.location = null
                ssl.keystore.password = null
                ssl.keystore.type = JKS
                ssl.protocol = TLS
                ssl.provider = null
                ssl.secure.random.implementation = null
                ssl.trustmanager.algorithm = PKIX
                ssl.truststore.location = null
                ssl.truststore.password = null
                ssl.truststore.type = JKS
                timeout.ms = 30000
                value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
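
As a rough illustration, the settings most relevant to this thread could be expressed in Java as a java.util.Properties object, which is the form the Kafka producer takes its configuration in. This is only a sketch: the class name is hypothetical, the values are copied from the dump above, and the broker addresses stay masked as in the log.

```java
import java.util.Properties;

// Hypothetical holder class; values copied from the ProducerConfig dump.
final class ProducerSettingsSketch {

    static Properties build() {
        Properties props = new Properties();
        // Broker list is masked here exactly as it is in the log.
        props.setProperty("bootstrap.servers", "xxxxxxx:xxx,xxxxxx:xxx");
        props.setProperty("acks", "1");
        props.setProperty("batch.size", "4096");
        props.setProperty("linger.ms", "500");
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("retries", "5");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings for a quick visual check.
        build().list(System.out);
    }
}
```

Keeping the values in one place like this makes it easier to try different request.timeout.ms and linger.ms combinations between runs.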

Thanks, Ashish

> On Nov 8, 2017, at 5:09 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> Hi Ashish,
> 
> From your description I do not yet have much of an idea of what may be 
> happening.
> However, some of your observations seem reasonable. I’ll go through them one 
> by one:
> 
>> I did try to modify request.timeout.ms, linger.ms, etc. to help with the issue 
>> if it were caused by a sudden burst of data or something along those lines. 
>> However, that caused the app to increase back pressure and made it 
>> slower and slower until that timeout was reached.
> 
> If the client is experiencing trouble in writing outstanding records to 
> Kafka, and the timeout is increased, then I think increased back pressure is 
> indeed the expected behavior.
> 
>> I noticed that consumer fetch-rate drops tremendously while fetch-size grows 
>> exponentially BEFORE the producer actually starts to show higher 
>> response-time and lower rates.
> 
> 
> Drops in fetch-rate and growth in fetch-size in the Flink Kafka consumer 
> should be a natural consequence of backpressure in the job.
> The fetch loop in the consumer will be blocked temporarily when backpressure 
> is propagated from downstream operators, resulting in longer fetch intervals 
> and larger batches on each fetch (given that the event rate is still constant).
> Therefore, I think the root cause is still on the producer side.
> 
> Would you happen to have any logs that may show useful information on 
> the producer side?
> I think we might have a better chance of finding out what is going on by 
> digging there.
> Also, which Flink version & Kafka version are you using?
> 
> Cheers,
> Gordon
> On 5 November 2017 at 11:24:49 PM, Ashish Pokharel (ashish...@yahoo.com) 
> wrote:
> 
>> All, 
>> 
>> I am starting to notice a strange behavior in a particular streaming app. I 
>> initially thought it was a Producer issue as I was seeing timeout exceptions 
>> (records expiring in queue). I did try to modify request.timeout.ms, 
>> linger.ms, etc. to help with the issue if it were caused by a sudden burst of 
>> data or something along those lines. However, that caused the app to 
>> increase back pressure and made it slower and slower until that timeout was 
>> reached. With lower timeouts, the app would actually raise an exception and 
>> recover faster. I can tell it is not related to connectivity as other apps are 
>> running just fine around the same time frame connected to the same brokers (we 
>> have at least 10 streaming apps connected to the same list of brokers) from the 
>> same data nodes. We have enabled Graphite Reporter in all of our 
>> applications. After deep diving into some of the consumer and producer stats, I 
>> noticed that consumer fetch-rate drops tremendously while fetch-size grows 
>> exponentially BEFORE the producer actually starts to show higher 
>> response-time and lower rates. Eventually, I noticed connection resets start 
>> to occur and connection counts go up momentarily, after which things get 
>> back to normal. Data producer rates remain constant around that timeframe - 
>> we have a Logstash producer sending data over. We checked both Logstash and 
>> Kafka metrics and they seem to be showing the same pattern (sort of a sine wave) 
>> throughout. 
>> 
>> It seems to point to a Kafka issue (perhaps some tuning between the Flink app 
>> and Kafka) but I wanted to check with the experts before I start knocking on 
>> the Kafka admins' doors. Is there anything else I can look into? There are 
>> quite a few default stats in Graphite but those were the ones that made the 
>> most sense.  
>> 
>> Thanks, Ashish
