Hello Tony,

You mentioned in 0.11.0.0 the
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
1) while in 0.11.0.1 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2).
But from your logs it seems you set this config as 2 in both versions.
Right?

Anyways, I took a look into your logs and I think you are hitting a known
issue (https://issues.apache.org/jira/browse/KAFKA-5152) that has been
fixed in 0.11.0.0; that is why you only see the WARN log entry in 0.11.0.1
but the app is not dying out. The running out of memory issues seems not
related to the CommitFailed error. Do you have any stateful operations in
your app that use an iterator? Did you close the iterator after complete
using it?


Guozhang


On Tue, Nov 7, 2017 at 12:42 AM, Tony John <tonyjohnant...@gmail.com> wrote:

> Hi Guozang,
>
> Thanks for looking into this. I was using 0.11.0.0 version of the library
> earlier when I was getting the CommitFailed exception and the tasks were
> terminating. The application config then was Replication Factor = 2, Num
> Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll
> Interval = 2147483647. The streams config code (*Streams Config While
> Using
> 0.11.0.0*) is given below and the logs of the application while using
> 0.11.0.0 can be downloaded from
> https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0
>
> I have upgraded the libraries to 0.11.0.1 and ran into some other issues.
> Though the CommitFailed error logs are still showing up with 0.11.0.1 the
> tasks are not getting terminated, but the app quickly runs out of memory
> (GC overhead limit exceeded) and the CPU is choked, which was not the case
> earlier. The logs are available @
> https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and
> application config is also given below (*Streams Config While Using
> 0.11.0.1*).  Since I am not sure what configuration helps reduce the
> CommitFailed error, which I think could be one of the reasons for the CPU
> choke and eventually cause an OOM, I have gone ahead and used all possible
> configuration parameters, but still no luck.
>
> It would be great if you could shed some light on this as to what could be
> causing this problem.
>
> *Streams Config While Using 0.11.0.0 *
>
> val props = Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> EngineConfig.APPLICATION_ID)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> EngineConfig.KAFKA_SERVERS)
> props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_
> POLL_RECORDS_CONFIG),
> 1000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_
> POLL_INTERVAL_MS_CONFIG),
> Int.MAX_VALUE)
>
> streams = KafkaStreams(builder, StreamsConfig(props))
> streams.start()
>
>
> *Streams Config While Using 0.11.0.1*
>
> val props = Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> EngineConfig.APPLICATION_ID)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> EngineConfig.KAFKA_SERVERS)
> props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
>
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTB
> EAT_INTERVAL_MS_CONFIG),
> 10000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_
> COMMIT_INTERVAL_MS_CONFIG),
> 10000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE
> _AUTO_COMMIT_CONFIG),
> true)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_
> POLL_RECORDS_CONFIG),
> 10000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_
> POLL_INTERVAL_MS_CONFIG),
> Int.MAX_VALUE)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSIO
> N_TIMEOUT_MS_CONFIG),
> 30000)
>
> streams = KafkaStreams(builder, StreamsConfig(props))
> streams.start()
>
>
> Thanks,
> Tony
>
> On Thu, Nov 2, 2017 at 4:39 PM, Tony John <tonyjohnant...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I am facing CommitFailedException in my streams application. As per the
> > log I tried changing the max.poll.interval.ms and max.poll.records. But
> > both didn't help. PFA the full stack trace of the exception and below is
> > the streams configuration used. What else could be wrong?
> >
> > val props = Properties()
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> EngineConfig.APPLICATION_ID)
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> EngineConfig.KAFKA_SERVERS)
> > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
> 1000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
> Int.MAX_VALUE)
> >
> > streams = KafkaStreams(builder, StreamsConfig(props))
> > streams.start()
> >
> >
> > Thanks,
> > Tony
> >
>



-- 
-- Guozhang

Reply via email to