Kafka Producer NetworkException and Timeout Exceptions

2017-11-08 Thread Shantanu Deshmukh
We are getting random NetworkExceptions and TimeoutExceptions in our production 
environment:

Brokers: 3 Zookeepers: 3 Servers: 3 Kafka: 0.10.0.1 Zookeeeper: 3.4.3

We are occasionally getting this exception in my producer logs:

Expiring 10 record(s) for TOPIC:XX: 5608 ms has passed since batch creation 
plus linger time.

Number of milliseconds in such error messages keep changing. Sometimes its ~5 
seconds other times it's up to ~13 seconds!

And very rarely we get:

NetworkException: Server disconnected before response received.

Cluster consists of 3 brokers and 3 zookeepers. Producer server and Kafka 
cluster are in samenetwork.

I am making synchronous calls. There's a web service to which multiple user 
requests call to send their data. Kafka web service has one Producer object 
which does all the sending. Producer's Request timeout was 1000ms initially 
that has been changed to 15000ms (15 seconds). Even after increasing timeout 
period TimeoutExceptions are still showing up in error logs.

What can be the reason?



Get Outlook for Android



Frequent consumer rebalances, auto commit failures

2018-05-22 Thread Shantanu Deshmukh
We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics with 10
partitions each. We have an application which spawns threads as consumers.
We spawn 5 consumers for each topic. I am observing that consider group
randomly keeps rebalancing. Then many times we see logs saying "Revoking
partitions for". This happens almost every 10 minutes. Consumption during
this time completely stops.

I have applied this configuration
max.poll.records 20
heartbeat.interval.ms 1
Session.timeout.ms 6000

Still this did not help. Strange thing is I observed consumer writing logs
saying "auto commit failed because poll() loop spent too much time
processing records" even when there was no data in partition to process. We
have polling interval of 500 ms, specified as argument in poll(). Initially
I had set same consumer group for all three topics' consumers. Then I
specified different CGs for different topics' consumers. Even this is not
helping.

I am trying to search over the web, checked my code, tried many
combinations of configuration but still no luck. Please help me.

Thanks & Regards,

Shantanu Deshmukh


Fwd: Frequent consumer rebalances, auto commit failures

2018-05-23 Thread Shantanu Deshmukh
Hello,

We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics with 10
partitions each. We have an application which spawns threads as consumers.
We spawn 5 consumers for each topic. I am observing that consider group
randomly keeps rebalancing. Then many times we see logs saying "Revoking
partitions for". This happens almost every 10 minutes. Consumption during
this time completely stops.

I have applied this configuration
max.poll.records 20
heartbeat.interval.ms 1
Session.timeout.ms 6000

Still this did not help. Strange thing is I observed consumer writing logs
saying "auto commit failed because poll() loop spent too much time
processing records" even when there was no data in partition to process. We
have polling interval of 500 ms, specified as argument in poll(). Initially
I had set same consumer group for all three topics' consumers. Then I
specified different CGs for different topics' consumers. Even this is not
helping.

I am trying to search over the web, checked my code, tried many
combinations of configuration but still no luck. Please help me.

Thanks & Regards,

Shantanu Deshmukh


Frequent consumer rebalance, auto commit failures

2018-05-23 Thread Shantanu Deshmukh
 We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics with 10
partitions each. We have an application which spawns threads as consumers.
We spawn 5 consumers for each topic. I am observing that consider group
randomly keeps rebalancing. Then many times we see logs saying "Revoking
partitions for". This happens almost every 10 minutes. Consumption during
this time completely stops.

I have applied this configuration
max.poll.records 20
heartbeat.interval.ms 1
Session.timeout.ms 6000

Still this did not help. Strange thing is I observed consumer writing logs
saying "auto commit failed because poll() loop spent too much time
processing records" even when there was no data in partition to process. We
have polling interval of 500 ms, specified as argument in poll(). Initially
I had set same consumer group for all three topics' consumers. Then I
specified different CGs for different topics' consumers. Even this is not
helping.

I am trying to search over the web, checked my code, tried many
combinations of configuration but still no luck. Please help me.

*Thanks & Regards,*

*Shantanu Deshmukh*


Re: Frequent consumer rebalance, auto commit failures

2018-05-24 Thread Shantanu Deshmukh
Someone please help me. I am suffering due to this issue since a long time
and not finding any solution.

On Wed, May 23, 2018 at 3:48 PM Shantanu Deshmukh 
wrote:

> We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics with 10
> partitions each. We have an application which spawns threads as consumers.
> We spawn 5 consumers for each topic. I am observing that consider group
> randomly keeps rebalancing. Then many times we see logs saying "Revoking
> partitions for". This happens almost every 10 minutes. Consumption during
> this time completely stops.
>
> I have applied this configuration
> max.poll.records 20
> heartbeat.interval.ms 1
> Session.timeout.ms 6000
>
> Still this did not help. Strange thing is I observed consumer writing logs
> saying "auto commit failed because poll() loop spent too much time
> processing records" even when there was no data in partition to process. We
> have polling interval of 500 ms, specified as argument in poll(). Initially
> I had set same consumer group for all three topics' consumers. Then I
> specified different CGs for different topics' consumers. Even this is not
> helping.
>
> I am trying to search over the web, checked my code, tried many
> combinations of configuration but still no luck. Please help me.
>
> *Thanks & Regards,*
>
> *Shantanu Deshmukh*
>


Re: Frequent consumer rebalance, auto commit failures

2018-05-24 Thread Shantanu Deshmukh
Hello,

There was a type in my first mail. session.timeout.ms is actually 6 not
6000. So it is less than heartbeat.interval.ms.

On Thu, May 24, 2018 at 2:46 PM Manikumar  wrote:

> heartbeat.interval.ms should be lower than session.timeout.ms.
>
> Check here:
> http://kafka.apache.org/0101/documentation.html#newconsumerconfigs
>
>
> On Thu, May 24, 2018 at 2:39 PM, Shantanu Deshmukh 
> wrote:
>
> > Someone please help me. I am suffering due to this issue since a long
> time
> > and not finding any solution.
> >
> > On Wed, May 23, 2018 at 3:48 PM Shantanu Deshmukh  >
> > wrote:
> >
> > > We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics with
> 10
> > > partitions each. We have an application which spawns threads as
> > consumers.
> > > We spawn 5 consumers for each topic. I am observing that consider group
> > > randomly keeps rebalancing. Then many times we see logs saying
> "Revoking
> > > partitions for". This happens almost every 10 minutes. Consumption
> during
> > > this time completely stops.
> > >
> > > I have applied this configuration
> > > max.poll.records 20
> > > heartbeat.interval.ms 1
> > > Session.timeout.ms 6000
> > >
> > > Still this did not help. Strange thing is I observed consumer writing
> > logs
> > > saying "auto commit failed because poll() loop spent too much time
> > > processing records" even when there was no data in partition to
> process.
> > We
> > > have polling interval of 500 ms, specified as argument in poll().
> > Initially
> > > I had set same consumer group for all three topics' consumers. Then I
> > > specified different CGs for different topics' consumers. Even this is
> not
> > > helping.
> > >
> > > I am trying to search over the web, checked my code, tried many
> > > combinations of configuration but still no luck. Please help me.
> > >
> > > *Thanks & Regards,*
> > >
> > > *Shantanu Deshmukh*
> > >
> >
>


kafka manual commit vs auto commit

2018-05-24 Thread Shantanu Deshmukh
Hello everyone,

We have a 3 broker Kafka 0.10.1.0 cluster in production environment. Lately
we are seeing a lot of "auto commit failed because poll() spend too much
time processing" warning messages. Also, due to such events there is
constant fear of duplicate messages and the same does happen. To tackle
this, I have tried a number of configuration combinations. I have set
max.poll.records to as low as 5. Increased max.poll.interval.ms to 10
minutes. increased session.timeout.ms to 5 minutes etc. However, this has
not helped. So, this has got me wondering shall we go after manual commit
of offsets?

Our consumer process has one topic which has produce rate of about 15k
events per second. It takes about 25ms to process one event of that topic
and we have 5 consumers for that topic. If I do manual commit of every
single event that we process what cost will it incur on host machine or
network etc? Is there a better optimized method of manual commit? Or better
yet, how to avoid "auto commit failed" error?

*Thanks & Regards,*
*Shantanu Deshmukh*


Re: Frequent consumer rebalance, auto commit failures

2018-05-24 Thread Shantanu Deshmukh
Hi M. Manna,

Thanks I will try these settings.

On Thu, May 24, 2018 at 5:15 PM M. Manna  wrote:

> Set your rebalance.backoff.ms=1 and zookeeper.session.timeout.ms=3
> in addition to what Manikumar said.
>
>
>
> On 24 May 2018 at 12:41, Shantanu Deshmukh  wrote:
>
> > Hello,
> >
> > There was a type in my first mail. session.timeout.ms is actually 6
> > not
> > 6000. So it is less than heartbeat.interval.ms.
> >
> > On Thu, May 24, 2018 at 2:46 PM Manikumar 
> > wrote:
> >
> > > heartbeat.interval.ms should be lower than session.timeout.ms.
> > >
> > > Check here:
> > > http://kafka.apache.org/0101/documentation.html#newconsumerconfigs
> > >
> > >
> > > On Thu, May 24, 2018 at 2:39 PM, Shantanu Deshmukh <
> > shantanu...@gmail.com>
> > > wrote:
> > >
> > > > Someone please help me. I am suffering due to this issue since a long
> > > time
> > > > and not finding any solution.
> > > >
> > > > On Wed, May 23, 2018 at 3:48 PM Shantanu Deshmukh <
> > shantanu...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics
> > with
> > > 10
> > > > > partitions each. We have an application which spawns threads as
> > > > consumers.
> > > > > We spawn 5 consumers for each topic. I am observing that consider
> > group
> > > > > randomly keeps rebalancing. Then many times we see logs saying
> > > "Revoking
> > > > > partitions for". This happens almost every 10 minutes. Consumption
> > > during
> > > > > this time completely stops.
> > > > >
> > > > > I have applied this configuration
> > > > > max.poll.records 20
> > > > > heartbeat.interval.ms 1
> > > > > Session.timeout.ms 6000
> > > > >
> > > > > Still this did not help. Strange thing is I observed consumer
> writing
> > > > logs
> > > > > saying "auto commit failed because poll() loop spent too much time
> > > > > processing records" even when there was no data in partition to
> > > process.
> > > > We
> > > > > have polling interval of 500 ms, specified as argument in poll().
> > > > Initially
> > > > > I had set same consumer group for all three topics' consumers.
> Then I
> > > > > specified different CGs for different topics' consumers. Even this
> is
> > > not
> > > > > helping.
> > > > >
> > > > > I am trying to search over the web, checked my code, tried many
> > > > > combinations of configuration but still no luck. Please help me.
> > > > >
> > > > > *Thanks & Regards,*
> > > > >
> > > > > *Shantanu Deshmukh*
> > > > >
> > > >
> > >
> >
>


Re: Frequent consumer rebalance, auto commit failures

2018-05-24 Thread Shantanu Deshmukh
Hi Vincent,

Yes I reduced max.poll.records to get that same effect. I reduced it all
the way down to 5 records still I am seeing same error. What else can be
done? For one topic I can see that a single message processing is taking
about 20 seconds. So 5 of them will take 1 minute. So I set
session.timeout.ms to 5 minutes, max.poll.interval.ms to 10 minutes. But it
is not helping still.

On Thu, May 24, 2018 at 6:15 PM Vincent Maurin 
wrote:

> Hello Shantanu,
>
> It is also important to consider your consumer code. You should not spend
> to much time in between two calls to "poll" method. Otherwise, the consumer
> not calling poll will be considered dead by the group, triggering a
> rebalancing.
>
> Best
>
> On Thu, May 24, 2018 at 1:45 PM M. Manna  wrote:
>
> > Set your rebalance.backoff.ms=1 and zookeeper.session.timeout.ms
> =3
> > in addition to what Manikumar said.
> >
> >
> >
> > On 24 May 2018 at 12:41, Shantanu Deshmukh 
> wrote:
> >
> > > Hello,
> > >
> > > There was a type in my first mail. session.timeout.ms is actually
> 6
> > > not
> > > 6000. So it is less than heartbeat.interval.ms.
> > >
> > > On Thu, May 24, 2018 at 2:46 PM Manikumar 
> > > wrote:
> > >
> > > > heartbeat.interval.ms should be lower than session.timeout.ms.
> > > >
> > > > Check here:
> > > > http://kafka.apache.org/0101/documentation.html#newconsumerconfigs
> > > >
> > > >
> > > > On Thu, May 24, 2018 at 2:39 PM, Shantanu Deshmukh <
> > > shantanu...@gmail.com>
> > > > wrote:
> > > >
> > > > > Someone please help me. I am suffering due to this issue since a
> long
> > > > time
> > > > > and not finding any solution.
> > > > >
> > > > > On Wed, May 23, 2018 at 3:48 PM Shantanu Deshmukh <
> > > shantanu...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics
> > > with
> > > > 10
> > > > > > partitions each. We have an application which spawns threads as
> > > > > consumers.
> > > > > > We spawn 5 consumers for each topic. I am observing that consider
> > > group
> > > > > > randomly keeps rebalancing. Then many times we see logs saying
> > > > "Revoking
> > > > > > partitions for". This happens almost every 10 minutes.
> Consumption
> > > > during
> > > > > > this time completely stops.
> > > > > >
> > > > > > I have applied this configuration
> > > > > > max.poll.records 20
> > > > > > heartbeat.interval.ms 1
> > > > > > Session.timeout.ms 6000
> > > > > >
> > > > > > Still this did not help. Strange thing is I observed consumer
> > writing
> > > > > logs
> > > > > > saying "auto commit failed because poll() loop spent too much
> time
> > > > > > processing records" even when there was no data in partition to
> > > > process.
> > > > > We
> > > > > > have polling interval of 500 ms, specified as argument in poll().
> > > > > Initially
> > > > > > I had set same consumer group for all three topics' consumers.
> > Then I
> > > > > > specified different CGs for different topics' consumers. Even
> this
> > is
> > > > not
> > > > > > helping.
> > > > > >
> > > > > > I am trying to search over the web, checked my code, tried many
> > > > > > combinations of configuration but still no luck. Please help me.
> > > > > >
> > > > > > *Thanks & Regards,*
> > > > > >
> > > > > > *Shantanu Deshmukh*
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: kafka manual commit vs auto commit

2018-05-24 Thread Shantanu Deshmukh
So here, when I received a message I run some business logic on it and try
to send some email. Now sometimes we have a promotional campaign running
millions of emails need to be delivered. For such numerous events is manual
commit good? Will it generate too much network activity if I commit a
single message every time?

On Thu, May 24, 2018 at 6:20 PM M. Manna  wrote:

> Manual commit is important where event consumption eventually leads to some
> post-processing/database update/state change for your application. Without
> doing all those, you cannot truly say that you have "Received" the message.
> "Receiving" is interpreted differently and it's up to your target
> application.
>
> Jason Gustafson has written some descriptive post regarding transactions. I
> don't know if you can extrapolate your changes based on those:
>
> 1) https://www.confluent.io/blog/transactions-apache-kafka/
> 2)
>
> https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
>
> ALso, please don't forget to read Javadoc on KafkaConsumer.java
>
> Regards,
>
> On 24 May 2018 at 13:29, Shantanu Deshmukh  wrote:
>
> > Hello everyone,
> >
> > We have a 3 broker Kafka 0.10.1.0 cluster in production environment.
> Lately
> > we are seeing a lot of "auto commit failed because poll() spend too much
> > time processing" warning messages. Also, due to such events there is
> > constant fear of duplicate messages and the same does happen. To tackle
> > this, I have tried a number of configuration combinations. I have set
> > max.poll.records to as low as 5. Increased max.poll.interval.ms to 10
> > minutes. increased session.timeout.ms to 5 minutes etc. However, this
> has
> > not helped. So, this has got me wondering shall we go after manual commit
> > of offsets?
> >
> > Our consumer process has one topic which has produce rate of about 15k
> > events per second. It takes about 25ms to process one event of that topic
> > and we have 5 consumers for that topic. If I do manual commit of every
> > single event that we process what cost will it incur on host machine or
> > network etc? Is there a better optimized method of manual commit? Or
> better
> > yet, how to avoid "auto commit failed" error?
> >
> > *Thanks & Regards,*
> > *Shantanu Deshmukh*
> >
>


Re: Frequent consumer rebalance, auto commit failures

2018-05-24 Thread Shantanu Deshmukh
Another observation is that when I restart my application. Consumption
doesn't start till 5-6 minutes. In kafka consumer logs I see

ConsumerCoordinator.333 - Revoking previously assigned partitions [] for
group notifications-consumer
AbstractCoordinator:381 - (Re-)joining group notifications-consumer

Then nothing. After 5-6 minutes activities start.

On Thu, May 24, 2018 at 6:49 PM Shantanu Deshmukh 
wrote:

> Hi Vincent,
>
> Yes I reduced max.poll.records to get that same effect. I reduced it all
> the way down to 5 records still I am seeing same error. What else can be
> done? For one topic I can see that a single message processing is taking
> about 20 seconds. So 5 of them will take 1 minute. So I set
> session.timeout.ms to 5 minutes, max.poll.interval.ms to 10 minutes. But
> it is not helping still.
>
> On Thu, May 24, 2018 at 6:15 PM Vincent Maurin 
> wrote:
>
>> Hello Shantanu,
>>
>> It is also important to consider your consumer code. You should not spend
>> to much time in between two calls to "poll" method. Otherwise, the
>> consumer
>> not calling poll will be considered dead by the group, triggering a
>> rebalancing.
>>
>> Best
>>
>> On Thu, May 24, 2018 at 1:45 PM M. Manna  wrote:
>>
>> > Set your rebalance.backoff.ms=10000 and zookeeper.session.timeout.ms
>> =3
>> > in addition to what Manikumar said.
>> >
>> >
>> >
>> > On 24 May 2018 at 12:41, Shantanu Deshmukh 
>> wrote:
>> >
>> > > Hello,
>> > >
>> > > There was a type in my first mail. session.timeout.ms is actually
>> 6
>> > > not
>> > > 6000. So it is less than heartbeat.interval.ms.
>> > >
>> > > On Thu, May 24, 2018 at 2:46 PM Manikumar 
>> > > wrote:
>> > >
>> > > > heartbeat.interval.ms should be lower than session.timeout.ms.
>> > > >
>> > > > Check here:
>> > > > http://kafka.apache.org/0101/documentation.html#newconsumerconfigs
>> > > >
>> > > >
>> > > > On Thu, May 24, 2018 at 2:39 PM, Shantanu Deshmukh <
>> > > shantanu...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Someone please help me. I am suffering due to this issue since a
>> long
>> > > > time
>> > > > > and not finding any solution.
>> > > > >
>> > > > > On Wed, May 23, 2018 at 3:48 PM Shantanu Deshmukh <
>> > > shantanu...@gmail.com
>> > > > >
>> > > > > wrote:
>> > > > >
>> > > > > > We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3
>> topics
>> > > with
>> > > > 10
>> > > > > > partitions each. We have an application which spawns threads as
>> > > > > consumers.
>> > > > > > We spawn 5 consumers for each topic. I am observing that
>> consider
>> > > group
>> > > > > > randomly keeps rebalancing. Then many times we see logs saying
>> > > > "Revoking
>> > > > > > partitions for". This happens almost every 10 minutes.
>> Consumption
>> > > > during
>> > > > > > this time completely stops.
>> > > > > >
>> > > > > > I have applied this configuration
>> > > > > > max.poll.records 20
>> > > > > > heartbeat.interval.ms 1
>> > > > > > Session.timeout.ms 6000
>> > > > > >
>> > > > > > Still this did not help. Strange thing is I observed consumer
>> > writing
>> > > > > logs
>> > > > > > saying "auto commit failed because poll() loop spent too much
>> time
>> > > > > > processing records" even when there was no data in partition to
>> > > > process.
>> > > > > We
>> > > > > > have polling interval of 500 ms, specified as argument in
>> poll().
>> > > > > Initially
>> > > > > > I had set same consumer group for all three topics' consumers.
>> > Then I
>> > > > > > specified different CGs for different topics' consumers. Even
>> this
>> > is
>> > > > not
>> > > > > > helping.
>> > > > > >
>> > > > > > I am trying to search over the web, checked my code, tried many
>> > > > > > combinations of configuration but still no luck. Please help me.
>> > > > > >
>> > > > > > *Thanks & Regards,*
>> > > > > >
>> > > > > > *Shantanu Deshmukh*
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>


Re: Frequent consumer rebalance, auto commit failures

2018-05-24 Thread Shantanu Deshmukh
Hey Vincent.
That's exactly how my code is. I am doing processing within that for loop.

In KIP-62 I read that heartbeat happens via a separate thread
https://github.com/dpkp/kafka-python/issues/948. But you are saying it
happens through polling. What can be considered true?  I have set
session.timeout.ms to 5 minutes. max.poll.records is set to 5. So even if
my message takes 30 seconds to process, it still shouldn't cross this
threshold. Yet I see frequent rebalances. Then there is max.poll.interval.ms
too. Don't exactly know how it affects. But overall I am finding it very
difficult to understand these myriads of settings, also documentation is
not very clear.

On Thu, May 24, 2018 at 8:09 PM Vincent Maurin 
wrote:

> Shantanu, I was more referering to you application code.
> You should have something similar to :
>
> while (true) {
> ConsumerRecords records = consumer.poll(100);
> for (ConsumerRecord record : records) {
>   // Your logic
> }
> }
>
> You should make sure that the code within the loop doesn't take too much
> time (more than session.timeout.ms)
> From the consumer javadoc
> "The consumer will automatically ping the cluster periodically, which lets
> the cluster know that it is alive. Note that the consumer is
> single-threaded, so periodic heartbeats can only be sent when poll(long)
> <
> https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
> >
> is called. As long as the consumer is able to do this it is considered
> alive and retains the right to consume from the partitions assigned to it.
> If it stops heartbeating by failing to call poll(long)
> <
> https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
> >
> for a period of time longer than session.timeout.ms then it will be
> considered dead and its partitions will be assigned to another process."
>
> Best
>
> On Thu, May 24, 2018 at 4:07 PM Shantanu Deshmukh 
> wrote:
>
> > Another observation is that when I restart my application. Consumption
> > doesn't start till 5-6 minutes. In kafka consumer logs I see
> >
> > ConsumerCoordinator.333 - Revoking previously assigned partitions [] for
> > group notifications-consumer
> > AbstractCoordinator:381 - (Re-)joining group notifications-consumer
> >
> > Then nothing. After 5-6 minutes activities start.
> >
> > On Thu, May 24, 2018 at 6:49 PM Shantanu Deshmukh  >
> > wrote:
> >
> > > Hi Vincent,
> > >
> > > Yes I reduced max.poll.records to get that same effect. I reduced it
> all
> > > the way down to 5 records still I am seeing same error. What else can
> be
> > > done? For one topic I can see that a single message processing is
> taking
> > > about 20 seconds. So 5 of them will take 1 minute. So I set
> > > session.timeout.ms to 5 minutes, max.poll.interval.ms to 10 minutes.
> But
> > > it is not helping still.
> > >
> > > On Thu, May 24, 2018 at 6:15 PM Vincent Maurin <
> > vincent.mau...@glispa.com>
> > > wrote:
> > >
> > >> Hello Shantanu,
> > >>
> > >> It is also important to consider your consumer code. You should not
> > spend
> > >> to much time in between two calls to "poll" method. Otherwise, the
> > >> consumer
> > >> not calling poll will be considered dead by the group, triggering a
> > >> rebalancing.
> > >>
> > >> Best
> > >>
> > >> On Thu, May 24, 2018 at 1:45 PM M. Manna  wrote:
> > >>
> > >> > Set your rebalance.backoff.ms=1 and
> zookeeper.session.timeout.ms
> > >> =3
> > >> > in addition to what Manikumar said.
> > >> >
> > >> >
> > >> >
> > >> > On 24 May 2018 at 12:41, Shantanu Deshmukh 
> > >> wrote:
> > >> >
> > >> > > Hello,
> > >> > >
> > >> > > There was a type in my first mail. session.timeout.ms is actually
> > >> 6
> > >> > > not
> > >> > > 6000. So it is less than heartbeat.interval.ms.
> > >> > >
> > >> > > On Thu, May 24, 2018 at 2:46 PM Manikumar <
> > manikumar.re...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > heartbeat.interval.ms should be lower than session.timeout.ms.
> > >> > > >
> > >> > > > Check here:
> > &g

Reliable way to purge data from Kafka topics

2018-05-24 Thread Shantanu Deshmukh
Hello,

We have cross data center replication. Using Kafka mirror maker we are
replicating data from our primary cluster to backup cluster. Problem arises
when we start operating from backup cluster, in case of drill or actual
outage. Data gathered at backup cluster needs to be reverse-replicated to
primary. To do that I can only think of two options. 1) Use a different CG
every time for mirror maker 2) Purge topics so that data sent by primary
doesn't get replicated back to primary again due to reverse replication.

We have opted for purging Kafka topics which are under replication. I use
kafka-topics.sh --alter command to set retention of topic to 5 seconds to
purge data. But this doesn't see to be a fool proof mechanism. Thread
responsible for doing this every minute, and even if it runs it's not sure
to work as there are multiple conditions. That, segment should be full or
certain time should have passed to roll a new segment. It so happened
during one such drill to move to backup cluster, purge command was issued
and we waited for 5 minutes. Still data wasn't purged. Due to this we faced
data duplication when reverse replication started.

Is there a better way to achieve this?


Re: Reliable way to purge data from Kafka topics

2018-05-25 Thread Shantanu Deshmukh
Hi Vincent,

Our producers are consumers are indeed local to Kafka cluster. When we
switch DC everything switches. So when we are on backup producers and
consumers on backup DC are active, everything on primary DC is stopped.

Whatever data gets accumulated on backup DC needs to be reflected in
primary DC. That's when we start reverse replication. And to clean up data
replicated from primary to backup (before switch happened), we have to
purge topics on backup Kafka cluster. And that is the challenge.

On Fri, May 25, 2018 at 12:40 PM Vincent Maurin 
wrote:

> Hi Shantanu
>
> I am not sure the scenario you are describing is the best case. I would
> more consider the problem in term of producers and consumers of the data.
> Usually is a good practice to put your producer local to your kafka
> cluster, so in your case, I would suggest you have producers in the main
> and in the backup data center / region.
> Then the question arise for your consumers and eventually your data storage
> behing. If it is centralized in one place, in could be better to no use
> mirror maker and have duplication of the consumer.
>
> So something looking more like a star schema, let me try some ascii art :
>
> Main DC :Data storage/processing DC :
> Producer --> Kafka   |Consumer >  Data storage
>  |   /->
> Backup DC :  |  /
> Producer --> Kafka   |Consumer /
>
> If you have an outage on the main, the backup can "deplace it" (maybe just
> with a DNS switch or similar)
> If you have an outage on your storage/processing part, messages will just
> be stored in kafka the time your consumers are up again (plan enough disk
> on kafka to conver your SLA)
>
> Best,
>
>
>
>
> On Fri, May 25, 2018 at 9:00 AM Jörn Franke  wrote:
>
> > Purging will never prevent that it does not get replicated for sure.
> There
> > will be always a case (error to purge etc) and then it is still
> replicated.
> > You may reduce the probability but it will never be impossible.
> >
> > Your application should be able to handle duplicated messages.
> >
> > > On 25. May 2018, at 08:54, Shantanu Deshmukh 
> > wrote:
> > >
> > > Hello,
> > >
> > > We have cross data center replication. Using Kafka mirror maker we are
> > > replicating data from our primary cluster to backup cluster. Problem
> > arises
> > > when we start operating from backup cluster, in case of drill or actual
> > > outage. Data gathered at backup cluster needs to be reverse-replicated
> to
> > > primary. To do that I can only think of two options. 1) Use a different
> > CG
> > > every time for mirror maker 2) Purge topics so that data sent by
> primary
> > > doesn't get replicated back to primary again due to reverse
> replication.
> > >
> > > We have opted for purging Kafka topics which are under replication. I
> use
> > > kafka-topics.sh --alter command to set retention of topic to 5 seconds
> to
> > > purge data. But this doesn't see to be a fool proof mechanism. Thread
> > > responsible for doing this every minute, and even if it runs it's not
> > sure
> > > to work as there are multiple conditions. That, segment should be full
> or
> > > certain time should have passed to roll a new segment. It so happened
> > > during one such drill to move to backup cluster, purge command was
> issued
> > > and we waited for 5 minutes. Still data wasn't purged. Due to this we
> > faced
> > > data duplication when reverse replication started.
> > >
> > > Is there a better way to achieve this?
> >
>


Re: Reliable way to purge data from Kafka topics

2018-05-25 Thread Shantanu Deshmukh
Hi Vincent,

We have ELK cluster in both primary and backup DC. So end goal of consumers
(Logstash) is to index logs in Elasticsearch and show them using Kibana. We
are replicating data in ELKs using mirror maker. It's not possible to
consume from both DCs at the same time as components which produce logs are
active only on one of the DCs.

If you say mirror maker is known to generate duplicates then what is other
reliable means of replication? Someone suggested Confluent Replicator.
However, it requires Confluent Kafka distro and we have Apache Kafka. We
can't change this infra at our current stage.

Thanks & Regards,

Shantanu Deshmukh

On Fri 25 May, 2018, 1:30 PM Vincent Maurin, 
wrote:

> What is the end results done by your consumers ?
> From what I understand, having the need for no duplicates means that these
> duplicates can show up somewhere ?
>
> According your needs, you can also have consumers in the two DC consuming
> from both. Then you don't have duplicate because a message is either
> produced on one cluster or the other.
> I would really avoid mirror makers here for this setup (it is the component
> creating the duplicates if you consume from both clusters at the end)
>
>
> On Fri, May 25, 2018 at 9:29 AM Shantanu Deshmukh 
> wrote:
>
> > Hi Vincent,
> >
> > Our producers are consumers are indeed local to Kafka cluster. When we
> > switch DC everything switches. So when we are on backup producers and
> > consumers on backup DC are active, everything on primary DC is stopped.
> >
> > Whatever data gets accumulated on backup DC needs to be reflected in
> > primary DC. That's when we start reverse replication. And to clean up
> data
> > replicated from primary to backup (before switch happened), we have to
> > purge topics on backup Kafka cluster. And that is the challenge.
> >
> > On Fri, May 25, 2018 at 12:40 PM Vincent Maurin <
> vincent.mau...@glispa.com
> > >
> > wrote:
> >
> > > Hi Shantanu
> > >
> > > I am not sure the scenario you are describing is the best case. I would
> > > more consider the problem in term of producers and consumers of the
> data.
> > > Usually is a good practice to put your producer local to your kafka
> > > cluster, so in your case, I would suggest you have producers in the
> main
> > > and in the backup data center / region.
> > > Then the question arise for your consumers and eventually your data
> > storage
> > > behing. If it is centralized in one place, in could be better to no use
> > > mirror maker and have duplication of the consumer.
> > >
> > > So something looking more like a star schema, let me try some ascii
> art :
> > >
> > > Main DC :Data storage/processing DC :
> > > Producer --> Kafka   |Consumer >  Data storage
> > >  |   /->
> > > Backup DC :  |  /
> > > Producer --> Kafka   |Consumer /
> > >
> > > If you have an outage on the main, the backup can "deplace it" (maybe
> > just
> > > with a DNS switch or similar)
> > > If you have an outage on your storage/processing part, messages will
> just
> > > be stored in kafka the time your consumers are up again (plan enough
> disk
> > > on kafka to conver your SLA)
> > >
> > > Best,
> > >
> > >
> > >
> > >
> > > On Fri, May 25, 2018 at 9:00 AM Jörn Franke 
> > wrote:
> > >
> > > > Purging will never prevent that it does not get replicated for sure.
> > > There
> > > > will be always a case (error to purge etc) and then it is still
> > > replicated.
> > > > You may reduce the probability but it will never be impossible.
> > > >
> > > > Your application should be able to handle duplicated messages.
> > > >
> > > > > On 25. May 2018, at 08:54, Shantanu Deshmukh <
> shantanu...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hello,
> > > > >
> > > > > We have cross data center replication. Using Kafka mirror maker we
> > are
> > > > > replicating data from our primary cluster to backup cluster.
> Problem
> > > > arises
> > > > > when we start operating from backup cluster, in case of drill or
> > actual
> > > > > outage. Data gathered at backup cluster needs to be
> > reverse-replicated
> > > to
> > > > > primary. To do that I can only thin

Re: Facing Duplication Issue in kakfa

2018-05-28 Thread Shantanu Deshmukh
Duplication can happen if your producer or consumer are exiting uncleanly.
Like if producer just crashes before it receives ack from broker your logic
will fail to register that message got produced. And when it comes back up
it will try to send that batch again. Same with consumer, if it crashes
before committing a batch of messages and comes back up it will receive
that batch. Only help is to try exiting cleanly as far as possible. Ensure
you catch kill signal then have your producer mark messages delivered to
Kafka broker as processed. In case of consumer commit more often, catch
kill signal and commit remaining. That's how I have done in my application.

On Mon, May 28, 2018 at 12:48 PM Karthick Kumar 
wrote:

> Hi,
>
> Facing Duplication inconsistently while bouncing Kafka producer and
> consumer in tomcat node. any help will be appreciated to find out the root
> cause.
>
> --
> With Regards,
> Karthick.K
>


Effect of settings segment.ms and retention.ms not accurate

2018-05-28 Thread Shantanu Deshmukh
 I have a topic otp-sms. I want that retention of this topic should be 5
minutes as OTPs are invalid post that amount of time. So I set
retention.ms=30.
However, this was not working. So reading more in depth in Kafka
configuration document I found another topic level setting that can be
tuned for topic retention to work properly. So I set segment.ms=30 as
well.

After these changes I saw that old logs go deleted. Still in the topic I
could see one record which is more than 15 minutes old and not getting
deleted. What does one have to do to actually delete messages generated n
minutes ago?


Re: Effect of settings segment.ms and retention.ms not accurate

2018-05-28 Thread Shantanu Deshmukh
Please help.

On Mon, May 28, 2018 at 5:18 PM Shantanu Deshmukh 
wrote:

>  I have a topic otp-sms. I want that retention of this topic should be 5
> minutes as OTPs are invalid post that amount of time. So I set
> retention.ms=30. However, this was not working. So reading more in
> depth in Kafka configuration document I found another topic level setting
> that can be tuned for topic retention to work properly. So I set
> segment.ms=30 as well.
>
> After these changes I saw that old logs go deleted. Still in the topic I
> could see one record which is more than 15 minutes old and not getting
> deleted. What does one have to do to actually delete messages generated n
> minutes ago?
>


Re: Facing Duplication in consumer

2018-05-28 Thread Shantanu Deshmukh
Which Kafka version?

On Mon, May 28, 2018 at 9:09 PM Dinesh Subramanian <
dsubraman...@apptivo.co.in> wrote:

> Hi,
>
> Whenever we bounce the consumer in tomcat node,  I am facing duplication.
> It is consumed from the beginning. I have this property in consumer
> "auto.offset.reset" =  "earliest". if it is new consumer means it will
> consume from the beginning, but it is consumed from the beginning for the
> consumer that we used for last 4 months and consumer offset is committed
> already in partition wise. Any help will be appreciated.
>
>
> *Thanks & Regards,*
>
> *Dinesh S*
>


Re: Frequent consumer rebalances, auto commit failures

2018-05-28 Thread Shantanu Deshmukh
Can anyone here help me please? I am at my wit's end. I now have
max.poll.records set to just 2. Still I am getting Auto offset commit
failed warning. Log file is getting full because of this warning. Session
timeout is 5 minutes, max.poll.interval.ms is 10 minutes.

On Wed, May 23, 2018 at 12:42 PM Shantanu Deshmukh 
wrote:

>
> Hello,
>
> We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics with 10
> partitions each. We have an application which spawns threads as consumers.
> We spawn 5 consumers for each topic. I am observing that consider group
> randomly keeps rebalancing. Then many times we see logs saying "Revoking
> partitions for". This happens almost every 10 minutes. Consumption during
> this time completely stops.
>
> I have applied this configuration
> max.poll.records 20
> heartbeat.interval.ms 1
> Session.timeout.ms 6000
>
> Still this did not help. Strange thing is I observed consumer writing logs
> saying "auto commit failed because poll() loop spent too much time
> processing records" even when there was no data in partition to process. We
> have polling interval of 500 ms, specified as argument in poll(). Initially
> I had set same consumer group for all three topics' consumers. Then I
> specified different CGs for different topics' consumers. Even this is not
> helping.
>
> I am trying to search over the web, checked my code, tried many
> combinations of configuration but still no luck. Please help me.
>
> Thanks & Regards,
>
> Shantanu Deshmukh
>


Re: Effect of settings segment.ms and retention.ms not accurate

2018-05-28 Thread Shantanu Deshmukh
But then I wonder, why such things are not mentioned anywhere in Kafka
configuration document? I relied on that setting and it caused us some
issues. If it is mentioned clearly then everyone will be aware. Could you
please point in right direction about reading timestamp of log message? I
will see about implementing that solution in code.

On Tue, May 29, 2018 at 11:37 AM Matthias J. Sax 
wrote:

> Retention time is a lower bound for how long it is guaranteed that data
> will be stored. This guarantee work "one way" only. There is no
> guarantee when data will be deleted after the bound passed.
>
> However, client side, you can always check the record timestamp and just
> drop older data that is still in the topic.
>
> Hope this helps.
>
>
> -Matthias
>
>
> On 5/28/18 9:18 PM, Shantanu Deshmukh wrote:
> > Please help.
> >
> > On Mon, May 28, 2018 at 5:18 PM Shantanu Deshmukh  >
> > wrote:
> >
> >>  I have a topic otp-sms. I want that retention of this topic should be 5
> >> minutes as OTPs are invalid post that amount of time. So I set
> >> retention.ms=30. However, this was not working. So reading more in
> >> depth in Kafka configuration document I found another topic level
> setting
> >> that can be tuned for topic retention to work properly. So I set
> >> segment.ms=30 as well.
> >>
> >> After these changes I saw that old logs go deleted. Still in the topic I
> >> could see one record which is more than 15 minutes old and not getting
> >> deleted. What does one have to do to actually delete messages generated
> n
> >> minutes ago?
> >>
> >
>
>


Correct usage of consumer groups

2018-05-29 Thread Shantanu Deshmukh
Hello,

Is it wise to use a single consumer group for multiple consumers who
consume from many different topics? Can this lead to frequent rebalance
issues?


Re: Correct usage of consumer groups

2018-05-29 Thread Shantanu Deshmukh
In one of my consumer application, I saw that 3 topics with 10 partitions
each were getting consumed by 5 different consumers having same consumer
group. And this application is seeing a lot of rebalances. Hence, I was
wondering about this.

On Tue, May 29, 2018 at 1:57 PM M. Manna  wrote:

> topic and consumer group have 1-to-many relationship. Each topic partition
> will have the messages guaranteed to be in order. Consumer rebalance issues
> can be adjusted based on the backoff and other params. What is exactly your
> concern regarding consumer group and rebalance?
>
>
>
> On 29 May 2018 at 08:26, Shantanu Deshmukh  wrote:
>
> > Hello,
> >
> > Is it wise to use a single consumer group for multiple consumers who
> > consume from many different topics? Can this lead to frequent rebalance
> > issues?
> >
>


Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
Hello,

We have 3 broker Kafka 0.10.0.1 cluster. We have 5 topics, each with 10
partitions. I have an application which consumes from all these topics by
creating multiple consumer processes. All of these consumers are under a
same consumer group. I am noticing that every time we restart this
application. It takes almost 5 minutes for consumers to start consuming.
What might be going wrong?


Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
No problem, here are consumer properties
-
auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 5
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 30
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = 
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer


On Tue, May 29, 2018 at 5:36 PM M. Manna  wrote:

> Hi,
>
> It's not possible to answer questions based on text. You need to share your
> consumer.properties, and server.properties file, and also, what exactly you
> have changed from default configuration.
>
>
>
> On 29 May 2018 at 12:51, Shantanu Deshmukh  wrote:
>
> > Hello,
> >
> > We have 3 broker Kafka 0.10.0.1 cluster. We have 5 topics, each with 10
> > partitions. I have an application which consumes from all these topics by
> > creating multiple consumer processes. All of these consumers are under a
> > same consumer group. I am noticing that every time we restart this
> > application. It takes almost 5 minutes for consumers to start consuming.
> > What might be going wrong?
> >
>


Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
 eventsProcessed++;
}
}
} catch (Exception ex) {
kafkaLogger.error(ex);
addToFailedQueue(notifJSON, ex.getMessage(),
CODE_PERMANENT_FAILURE);
logger.warn("",ex);
}
}
<<<<<<<<<<

And here are server properties.

broker.id=0
port=9092
delete.topic.enable=true
message.max.bytes=150
listeners=SSL://x.x.x.x:9092
advertised.listeners=SSL://x.x.x.x:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/lotus/kafka-logs
num.partitions=3
auto.topic.creation.enable=false
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
ssl.keystore.location=/opt/kafka/certificates/kafka.keystore.jks
ssl.keystore.password=
ssl.key.password=
ssl.truststore.location=/opt/kafka/certificates/kafka.truststore.jks
ssl.truststore.password=
security.inter.broker.protocol=SSL
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
zookeeper.connection.timeout.ms=6000

On Tue, May 29, 2018 at 5:59 PM M. Manna  wrote:

> Thanks..
>
> Where is your consumer code that is consuming messages?
>
> On 29 May 2018 at 13:18, Shantanu Deshmukh  wrote:
>
> > No problem, here are consumer properties
> > -
> > auto.commit.interval.ms = 3000
> > auto.offset.reset = latest
> > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> > check.crcs = true
> > client.id =
> > connections.max.idle.ms = 54
> > enable.auto.commit = true
> > exclude.internal.topics = true
> > fetch.max.bytes = 52428800
> > fetch.max.wait.ms = 500
> > fetch.min.bytes = 1
> > group.id = otp-notifications-consumer
> > heartbeat.interval.ms = 3000
> > interceptor.classes = null
> > key.deserializer = class
> > org.apache.kafka.common.serialization.StringDeserializer
> > max.partition.fetch.bytes = 1048576
> > max.poll.interval.ms = 30
> > max.poll.records = 5
> > metadata.max.age.ms = 30
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.sample.window.ms = 3
> > partition.assignment.strategy = [class
> > org.apache.kafka.clients.consumer.RangeAssignor]
> > receive.buffer.bytes = 65536
> > reconnect.backoff.ms = 50
> > request.timeout.ms = 305000
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.min.time.before.relogin = 6
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > sasl.mechanism = GSSAPI
> > security.protocol = SSL
> > send.buffer.bytes = 131072
> > session.timeout.ms = 30
> > ssl.cipher.suites = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.endpoint.identification.algorithm = null
> > ssl.key.password = null
> > ssl.keymanager.algorithm = SunX509
> > ssl.keystore.location = null
> > ssl.keystore.password = null
> > ssl.keystore.type = JKS
> > ssl.protocol = TLS
> > ssl.provider = null
> > ssl.secure.random.implementation = null
> > ssl.trustmanager.algorithm = PKIX
> > ssl.truststore.location = 
> > ssl.truststore.password = [hidden]
> > ssl.truststore.type = JKS
> > value.deserializer = class
> > org.apache.kafka.common.serialization.StringDeserializer
> > 
> >
> > On Tue, May 29, 2018 at 5:36 PM M. Manna  wrote:
> >
> > > Hi,
> > >
> > > It's not possible to answer questions based on text. You need to share
> > your
> > > consumer.properties, and server.properties file, and also, what exactly
> > you
> > > have changed from default configuration.
> > >
> > >
> > >
> > > On 29 May 2018 at 12:51, Shantanu Deshmukh 
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > We have 3 broker Kafka 0.10.0.1 cluster. We have 5 topics, each with
> 10
> > > > partitions. I have an application which consumes from all these
> topics
> > by
> > > > creating multiple consumer processes. All of these consumers are
> under
> > a
> > > > same consumer group. I am noticing that every time we restart this
> > > > application. It takes almost 5 minutes for consumers to start
> > consuming.
> > > > What might be going wrong?
> > > >
> > >
> >
>


Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
I cannot because there are messages which need high priority. Setting poll
interval to 4 second means there might be delay of 4 seconds + regular
processing time, which is not desirable.

Also, will it impact heartbeating?

On Tue, May 29, 2018 at 6:17 PM M. Manna  wrote:

> Have you tried increase the poll time higher, e.g. 4000 and see if that
> helps matters?
>
> On 29 May 2018 at 13:44, Shantanu Deshmukh  wrote:
>
> > Here is the code which consuming messages
> >
> > >>>>>>>>
> > while(true && startShutdown == false) {
> > Context context = new Context();
> > JSONObject notifJSON = new JSONObject();
> > String notificationMsg = "";
> > NotificationEvent notifEvent = null;
> > initializeContext();
> > try {
> > consumerConnect();
> > ConsumerRecords records = consumer.poll(100);
> > if(records.count() == 0) {
> > //logger.trace("No records in topic: "+this.topic);
> > continue;
> > }
> > for(ConsumerRecord record : records) {
> > try {
> > long totalStart = System.currentTimeMillis();
> > notificationMsg = record.value();
> > JSONParser jsonParser = new JSONParser();
> > logger.trace("Kafka-Msg: >>"+notificationMsg);
> > if(notificationMsg.equals("")) {
> > continue;
> > }
> > Profiler.start(workerId, "json-parse");
> > notifJSON   =
> > (JSONObject)jsonParser.parse(notificationMsg);
> > Profiler.end(workerId, "json-parse");
> > notifEvent= new NotificationEvent(notifJSON);
> > if( notifEvent.getTransactionID().equals("") == true ) {
> > notifEvent.generateTransactionID();
> > }
> > context.setEventObject(notifEvent);
> > updateContext(context);
> >
> > // Fetch template ==//
> > Profiler.start(workerId, "tpl-fetch");
> > long start = System.currentTimeMillis();
> > Template template   =
> > notifTplMngr.fetchTemplate(notifEvent);
> >
> > logger.trace("fetch-tpl:"+(System.currentTimeMillis()-start));
> > Profiler.end(workerId, "tpl-fetch");
> >
> > // Personalise template ==//
> > Profiler.start(workerId, "personalisation");
> > start = System.currentTimeMillis();
> > String message  =
> > NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
> >
> > notifEvent.setMaskedMessage(NotificationTemplatePersonalis
> > er.getMaskedContent(template,
> > notifEvent));
> >
> > logger.trace("personalise:"+(System.currentTimeMillis()-start));
> > Profiler.end(workerId, "personalisation");
> >
> > context.setEventObject(notifEvent);
> > updateContext(context);
> >
> > // Send notification==//
> > Profiler.start(workerId, "notif-dispatch");
> > postOffice.sendNotification(message, notifEvent);
> > Profiler.end(workerId, "notif-dispatch");
> >
> > retryCount = 0;
> > logger.debug("Time to complete notification dispatch
> > :"+(System.currentTimeMillis()-totalStart));
> > if(startShutdown == true) {
> > break;
> > }
> > } catch (Exception ex) {
> > if(ex instanceof RetriableException) {
> > kafkaLogger.error(ex);
> > logger.warn("",ex);
> > addToFailedQueue(notifJSON, ex.getMessage(),
> > CODE_RETRIABLE_FAILURE);
> > } else if(ex instanceof InvalidEventException) {
> >
> > JsonLog jsonLog = new JsonLog();
> > jsonLog.setDescription("Invalid event message.
> Reason:
> > "+ex.getMessage());
> > jsonLog.setOriginalPayload(notificationMsg);
> > jsonLog.setEventType("ERROR");
> > jsonLog.setC

Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
No, no dynamic topic creation.

On Tue, May 29, 2018 at 6:38 PM Jaikiran Pai 
wrote:

> Are your topics dynamically created? If so, see this
> threadhttps://www.mail-archive.com/dev@kafka.apache.org/msg67224.html
>
> -Jaikiran
>
>
> On 29/05/18 5:21 PM, Shantanu Deshmukh wrote:
> > Hello,
> >
> > We have 3 broker Kafka 0.10.0.1 cluster. We have 5 topics, each with 10
> > partitions. I have an application which consumes from all these topics by
> > creating multiple consumer processes. All of these consumers are under a
> > same consumer group. I am noticing that every time we restart this
> > application. It takes almost 5 minutes for consumers to start consuming.
> > What might be going wrong?
> >
>
>


Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
Thanks for your suggestion. However, this doesn't seem applicable for our
Kafka version. We are using 0.10.0.1

On Tue, May 29, 2018 at 7:04 PM Manikumar  wrote:

> Pls check "group.initial.rebalance.delay.ms" broker config property.  This
> will be the delay for the initial consumer rebalance.
>
> from docs
>
> "The rebalance will be further delayed by the value of
> group.initial.rebalance.delay.ms as new members join the group,
>  up to a maximum of max.poll.interval.ms"
>
>
> http://kafka.apache.org/documentation/#upgrade_1100_notable
>
> On Tue, May 29, 2018 at 6:51 PM, Shantanu Deshmukh 
> wrote:
>
> > No, no dynamic topic creation.
> >
> > On Tue, May 29, 2018 at 6:38 PM Jaikiran Pai 
> > wrote:
> >
> > > Are your topics dynamically created? If so, see this
> > > threadhttps://www.mail-archive.com/dev@kafka.apache.org/msg67224.html
> > >
> > > -Jaikiran
> > >
> > >
> > > On 29/05/18 5:21 PM, Shantanu Deshmukh wrote:
> > > > Hello,
> > > >
> > > > We have 3 broker Kafka 0.10.0.1 cluster. We have 5 topics, each with
> 10
> > > > partitions. I have an application which consumes from all these
> topics
> > by
> > > > creating multiple consumer processes. All of these consumers are
> under
> > a
> > > > same consumer group. I am noticing that every time we restart this
> > > > application. It takes almost 5 minutes for consumers to start
> > consuming.
> > > > What might be going wrong?
> > > >
> > >
> > >
> >
>


Re: Correct usage of consumer groups

2018-05-29 Thread Shantanu Deshmukh
So can we roll segments more often? If the segments are small enough
probability of messages in a single segment reaching expiry will be higher.
However, will frequent roll-up of segments cause some side effects? Like
increased CPU, memory usage etc?

On Tue, May 29, 2018 at 11:52 PM Matthias J. Sax 
wrote:

> About the docs:
>
> Config `cleanup.policy` states:
>
> > A string that is either "delete" or "compact".
> > This string designates the retention policy to
> > use on old log segments. The default policy> ("delete") will discard old
> segments when their
> > retention time or size limit has been reached.> The "compact" setting
> will enable log
> > compaction on the topic.
>
> Because deletions happens based on segments, it it clear that some
> messages are retained longer, because a segment can only be dropped if
> _all_ messages in a segment passed the retention time.
>
> Does this make sense?
>
> Of course, we are always happy to improve the docs. Feel free to do a PR :)
>
>
> -Matthias
>
>
> On 5/29/18 3:01 AM, Shantanu Deshmukh wrote:
> > In one of my consumer application, I saw that 3 topics with 10 partitions
> > each were getting consumed by 5 different consumers having same consumer
> > group. And this application is seeing a lot of rebalances. Hence, I was
> > wondering about this.
> >
> > On Tue, May 29, 2018 at 1:57 PM M. Manna  wrote:
> >
> >> topic and consumer group have 1-to-many relationship. Each topic
> partition
> >> will have the messages guaranteed to be in order. Consumer rebalance
> issues
> >> can be adjusted based on the backoff and other params. What is exactly
> your
> >> concern regarding consumer group and rebalance?
> >>
> >>
> >>
> >> On 29 May 2018 at 08:26, Shantanu Deshmukh 
> wrote:
> >>
> >>> Hello,
> >>>
> >>> Is it wise to use a single consumer group for multiple consumers who
> >>> consume from many different topics? Can this lead to frequent rebalance
> >>> issues?
> >>>
> >>
> >
>
>


Re: Effect of settings segment.ms and retention.ms not accurate

2018-05-29 Thread Shantanu Deshmukh
This is helpful. Thanks a lot :-)

On Tue, May 29, 2018 at 11:47 PM Matthias J. Sax 
wrote:

> ConsumerRecord#timestamp()
>
> similar to ConsumerRecord#key() and ConsumerRecord#value()
>
>
> -Matthias
>
> On 5/28/18 11:22 PM, Shantanu Deshmukh wrote:
> > But then I wonder, why such things are not mentioned anywhere in Kafka
> > configuration document? I relied on that setting and it caused us some
> > issues. If it is mentioned clearly then everyone will be aware. Could you
> > please point in right direction about reading timestamp of log message? I
> > will see about implementing that solution in code.
> >
> > On Tue, May 29, 2018 at 11:37 AM Matthias J. Sax 
> > wrote:
> >
> >> Retention time is a lower bound for how long it is guaranteed that data
> >> will be stored. This guarantee work "one way" only. There is no
> >> guarantee when data will be deleted after the bound passed.
> >>
> >> However, client side, you can always check the record timestamp and just
> >> drop older data that is still in the topic.
> >>
> >> Hope this helps.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/28/18 9:18 PM, Shantanu Deshmukh wrote:
> >>> Please help.
> >>>
> >>> On Mon, May 28, 2018 at 5:18 PM Shantanu Deshmukh <
> shantanu...@gmail.com
> >>>
> >>> wrote:
> >>>
> >>>>  I have a topic otp-sms. I want that retention of this topic should
> be 5
> >>>> minutes as OTPs are invalid post that amount of time. So I set
> >>>> retention.ms=30. However, this was not working. So reading more
> in
> >>>> depth in Kafka configuration document I found another topic level
> >> setting
> >>>> that can be tuned for topic retention to work properly. So I set
> >>>> segment.ms=30 as well.
> >>>>
> >>>> After these changes I saw that old logs go deleted. Still in the
> topic I
> >>>> could see one record which is more than 15 minutes old and not getting
> >>>> deleted. What does one have to do to actually delete messages
> generated
> >> n
> >>>> minutes ago?
> >>>>
> >>>
> >>
> >>
> >
>
>


Re: retention.ms not honored for topic

2018-05-29 Thread Shantanu Deshmukh
Hey,

You should try setting topic level config by doing kafka-topics.sh --alter
--topic  --config = --zookeeper 

Make sure you also set segment.ms for topics which are not that populous.
This setting specifies amount of time after which a new segment is rolled.
So Kafka deletes only those messages which lie in a segment which is old or
full. Basically Kafka doesn't touch current segment. So if we roll soon
enough changes of messages in it getting eligible for retention.ms setting
increases. I am not fully sure what effect it might have on cluster
resources if segment.ms value is kept  too low, as broker might spend too
much resources just rolling many segments. So keep it some reasonable value.

On Tue, May 29, 2018 at 9:31 PM Thomas Hays  wrote:

> A single topic does not appear to be honoring the retention.ms
> setting. Three other topics (plus __consumer_offsets) on the Kafka
> instance are deleting segments normally.
>
> Kafka version: 2.12-0.10.2.1
> OS: CentOS 7
> Java: openjdk version "1.8.0_161"
> Zookeeper: 3.4.6
>
> Retention settings (from kafka-topics.sh describe): Topic:elk
> PartitionCount:50 ReplicationFactor:2 Configs:retention.ms=720
>
> Other config settings from server.properties
>
> log.retention.hours=48
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
>
> Looking in the data directory, I see multiple segment files older than 48
> hours:
>
> -rw-r--r-- 1 root root 1073676782 May 26 20:16 004713142447.log
> -rw-r--r-- 1 root root 1073105605 May 26 20:18 004715239774.log
> -rw-r--r-- 1 root root 1072907965 May 26 20:20 004717450325.log
>
> Current date/time on server: Tue May 29 10:51:49 CDT 2018
>
> This issue appears on all Kafka brokers and I have tried multiple
> rolling restarts of all Kafka brokers and the issue remains. These
> servers stopped deleting segments for this topic on May 15. This does
> not correlate to any known config change. I have found no
> error/warning messages in the logs to indicate a problem.
>
> What am I missing? Thank you.
>


Re: Best Practice for Consumer Liveliness and avoid frequent rebalancing

2018-05-31 Thread Shantanu Deshmukh
Do you want to avoid rebalancing in such way that if a consumer exits then
its previously owned partition should be left disowned? But then who will
consume from partition that was deserted by a exiting consumer? In such
case you can go for manual partition assignment. Then there is no question
of consumer-group management and subsequently rebalancing.

On Thu, May 31, 2018 at 6:00 PM M. Manna  wrote:

> Hello,
>
> We are trying to move from single partition to multi-partition approach for
> our topics. The purpose is:
>
> 1) Each production/testbed server will have a non-Daemon thread (consumer)
> running.
> 2) It will consume messages, commit offset (manual), and determine next
> steps if commit fails, app fails etc.
> 3) Ideally, 1 partition per server (consumer). If rebalance occurs, first
> (lexi ordered) server will end up having additional partition(s).
>
> As I previously understood, and also read Consumer article by Jason
> Gustafson
> <
> https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
> >,
> we should always close consumers for resource optimisation. But departing
> from a consumer group means that a rebalance will occur. In our case, we
> would like every consumer to be alive (and sleep for a while) but still
> send heartbeat so that rebalancing effort is saved. But we re worried
> whether this might cause memory leak in our application.
>
> In other words, if we don't restart the servers (shutdown hook), we would
> like to avoid invoking KafkaConsumer#close().
>
> Has anyone got similar use case that they can share with us? We are simply
> interested to know whether this is a "use-case" scenario or not a good
> practice to keep consumers alive.
>
> Any suggestion/help is appreciated.
>
> Regards,
>


Re: Frequent consumer rebalances, auto commit failures

2018-06-03 Thread Shantanu Deshmukh
Hi,
I do not have trace level logs as of now.
I am doing very basic operation with messages. The only time consuming part
is sending an e-mail. Our Email servers are very slow so sending one email
is taking upto 20 seconds. That's why I turned max.poll.records to just 2,
keppt session time out at 10 minutes. Still rebalances would happen.

However, there's an update. When I was trying potential config tuning I set
max.poll.interval.ms to 3 minutes. Later on I found that this setting is
not meant for Kafka 0.10.0.1 which we are using. So I removed that setting.
Now after more than a week since that was done, I haven't seen any
rebalance issue. But, still slow consumer startup issue persists.  Whenever
I restart my consumer process for almost 5 minutes there is no activity. I
checked in broker logs at that time I saw message "preparing to stabilise
consumer group", then there is a gap of 5 minutes and message "stabilized
group". What could be happening here?

On Fri, Jun 1, 2018 at 10:40 PM Ken Chen  wrote:

> 1. Any detail logs ?
> 2. How do you process the records after you polled the records?
> 3. How much time does it take for every round of poll ?
>
> Thanks !
>
> --
> Sent from my iPhone
>
> On May 28, 2018, at 10:44 PM, Shantanu Deshmukh 
> wrote:
>
> Can anyone here help me please? I am at my wit's end. I now have
> max.poll.records set to just 2. Still I am getting Auto offset commit
> failed warning. Log file is getting full because of this warning. Session
> timeout is 5 minutes, max.poll.interval.ms is 10 minutes.
>
> On Wed, May 23, 2018 at 12:42 PM Shantanu Deshmukh 
> wrote:
>
> >
> > Hello,
> >
> > We have a 3 broker Kafka 0.10.0.1 cluster. There we have 3 topics with 10
> > partitions each. We have an application which spawns threads as
> consumers.
> > We spawn 5 consumers for each topic. I am observing that consider group
> > randomly keeps rebalancing. Then many times we see logs saying "Revoking
> > partitions for". This happens almost every 10 minutes. Consumption during
> > this time completely stops.
> >
> > I have applied this configuration
> > max.poll.records 20
> > heartbeat.interval.ms 1
> > Session.timeout.ms 6000
> >
> > Still this did not help. Strange thing is I observed consumer writing
> logs
> > saying "auto commit failed because poll() loop spent too much time
> > processing records" even when there was no data in partition to process.
> We
> > have polling interval of 500 ms, specified as argument in poll().
> Initially
> > I had set same consumer group for all three topics' consumers. Then I
> > specified different CGs for different topics' consumers. Even this is not
> > helping.
> >
> > I am trying to search over the web, checked my code, tried many
> > combinations of configuration but still no luck. Please help me.
> >
> > Thanks & Regards,
> >
> > Shantanu Deshmukh
> >
>


Frequent "offset out of range" messages, partitions deserted by consumer

2018-06-14 Thread Shantanu Deshmukh
hanks & Regards,

Shantanu Deshmukh


Re: Frequent "offset out of range" messages, partitions deserted by consumer

2018-06-14 Thread Shantanu Deshmukh
Any help please.

On Thu, Jun 14, 2018 at 2:39 PM Shantanu Deshmukh 
wrote:

> We have a consumer application which has a single consumer group
> connecting to multiple topics. We are seeing strange behaviour in consumer
> logs. With these lines
>
>  Fetch offset 1109143 is out of range for partition otp-email-4, resetting
> offset
>  Fetch offset 952168 is out of range for partition otp-email-7, resetting
> offset
>  Fetch offset 945796 is out of range for partition otp-email-5, resetting
> offset
>  Fetch offset 950900 is out of range for partition otp-email-0, resetting
> offset
>  Fetch offset 953163 is out of range for partition otp-email-3, resetting
> offset
>  Fetch offset 1118389 is out of range for partition otp-email-6, resetting
> offset
>  Fetch offset 1112177 is out of range for partition otp-email-2, resetting
> offset
>  Fetch offset 1109539 is out of range for partition otp-email-1, resetting
> offset
>
> Some time later we saw these logs
>
> [2018-06-08 19:45:28] :: INFO  :: ConsumerCoordinator:333 - Revoking
> previously assigned partitions [bulk-email-4, bulk-email-3, bulk-email-0,
> bulk-email-2, bulk-email-1] for group notifications-consumer
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator:381 - (Re-)joining
> group notifications-consumer
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
> joined group notifications-consumer with generation 3063
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
> joined group notifications-consumer with generation 3063
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
> joined group notifications-consumer with generation 3063
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
> joined group notifications-consumer with generation 3063
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
> joined group notifications-consumer with generation 3063
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
> joined group notifications-consumer with generation 3063
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
> joined group notifications-consumer with generation 3063
> [2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
> joined group notifications-consumer with generation 3063
> [2018-06-08 19:45:28] :: INFO  :: ConsumerCoordinator:225 - Setting newly
> assigned partitions [bulk-email-8, bulk-email-7, bulk-email-9,
> bulk-email-6, bulk-email-5] for group notifications-consumer
> [2018-06-08 19:45:28] :: INFO  :: ConsumerCoordinator:225 - Setting newly
> assigned partitions [transactional-sms-3, transactional-sms-2,
> transactional-sms-1, transactional-sms-0] for group notifications-consumer
> [2018-06-08 19:45:28] :: INFO  :: ConsumerCoordinator:225 - Setting newly
> assigned partitions [transactional-sms-9, transactional-sms-8,
> transactional-sms-7] for group notifications-consumer
>
> I noticed that one of our topics was not seen in the list of *Setting
> newly assigned partitions*. Then that topic had no consumers attached to
> it for 8 hours at least. It's only when someone restarted application it
> started consuming from that topic. What can be going wrong here?
>
> Here is consumer config
>
> auto.commit.interval.ms = 3000
> auto.offset.reset = latest
> bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 54
> enable.auto.commit = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = otp-notifications-consumer
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 30
> max.poll.records = 50
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 3
> partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 30
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.e

Re: Frequent "offset out of range" messages, partitions deserted by consumer

2018-06-19 Thread Shantanu Deshmukh
I desperately need help. Facing this issue on production since a while now.
Someone please help me out.

On Fri, Jun 15, 2018 at 2:02 AM Lawrence Weikum  wrote:

> unsubscribe
>
>


Re: Frequent "offset out of range" messages, partitions deserted by consumer

2018-06-19 Thread Shantanu Deshmukh
It is happening via auto-commit. Frequence is 3000 ms

On Wed, Jun 20, 2018 at 10:31 AM Liam Clarke 
wrote:

> How frequently are your consumers committing offsets?
>
> On Wed, 20 Jun. 2018, 4:52 pm Shantanu Deshmukh, 
> wrote:
>
> > I desperately need help. Facing this issue on production since a while
> now.
> > Someone please help me out.
> >
> > On Fri, Jun 15, 2018 at 2:02 AM Lawrence Weikum 
> > wrote:
> >
> > > unsubscribe
> > >
> > >
> >
>


Re: Frequent "offset out of range" messages, partitions deserted by consumer

2018-06-21 Thread Shantanu Deshmukh
conusmer is always consuming. There's a trickle of messages which always
keep flowing. However, during 1am to 5am there are almost no messages.

On Wed, Jun 20, 2018 at 11:31 AM Liam Clarke 
wrote:

>  How often is the consumer actually consuming? I know there's an issue
> where old committed offsets expire after a period of time.
>
> On Wed, 20 Jun. 2018, 5:46 pm Shantanu Deshmukh, 
> wrote:
>
> > It is happening via auto-commit. Frequence is 3000 ms
> >
> > On Wed, Jun 20, 2018 at 10:31 AM Liam Clarke 
> > wrote:
> >
> > > How frequently are your consumers committing offsets?
> > >
> > > On Wed, 20 Jun. 2018, 4:52 pm Shantanu Deshmukh, <
> shantanu...@gmail.com>
> > > wrote:
> > >
> > > > I desperately need help. Facing this issue on production since a
> while
> > > now.
> > > > Someone please help me out.
> > > >
> > > > On Fri, Jun 15, 2018 at 2:02 AM Lawrence Weikum  >
> > > > wrote:
> > > >
> > > > > unsubscribe
> > > > >
> > > > >
> > > >
> > >
> >
>


Very long consumer rebalances

2018-07-06 Thread Shantanu Deshmukh
Hello everyone,

We are running a 3 broker Kafka 0.10.0.1 cluster. We have a java app which
spawns many consumer threads consuming from different topics. For every
topic we have specified different consumer-group. A lot of times I see that
whenever this application is restarted a CG on one or two topics takes more
than 5 minutes to receive partition assignment. Till that time consumers
for that topic don't consumer anything. If I go to Kafka broker and run
consumer-groups.sh and describe that particular CG I see that it is
rebalancing. There is time critical data stored in that topic and we cannot
tolerate such long delays. What can be the reason for such long rebalances.

Here's our consumer config


auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 50
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 30
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /x/x/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer

Please help.

*Thanks & Regards,*
*Shantanu Deshmukh*


Re: Very long consumer rebalances

2018-07-09 Thread Shantanu Deshmukh
Kind people on this group, please help me!

On Fri, Jul 6, 2018 at 3:24 PM Shantanu Deshmukh 
wrote:

> Hello everyone,
>
> We are running a 3 broker Kafka 0.10.0.1 cluster. We have a java app which
> spawns many consumer threads consuming from different topics. For every
> topic we have specified different consumer-group. A lot of times I see that
> whenever this application is restarted a CG on one or two topics takes more
> than 5 minutes to receive partition assignment. Till that time consumers
> for that topic don't consumer anything. If I go to Kafka broker and run
> consumer-groups.sh and describe that particular CG I see that it is
> rebalancing. There is time critical data stored in that topic and we cannot
> tolerate such long delays. What can be the reason for such long rebalances.
>
> Here's our consumer config
>
>
> auto.commit.interval.ms = 3000
> auto.offset.reset = latest
> bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 54
> enable.auto.commit = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = otp-notifications-consumer
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 30
> max.poll.records = 50
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 3
> partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 30
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = /x/x/client.truststore.jks
> ssl.truststore.password = [hidden]
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
>
> Please help.
>
> *Thanks & Regards,*
> *Shantanu Deshmukh*
>


Re: Very long consumer rebalances

2018-07-12 Thread Shantanu Deshmukh
Hi Steve,

Could you please shed more light on this? What section should I revisit? I
am using high-level consumer. So I am simply calling consumer.close() when
I am shutting down the process. Is there any other method to be called
before calling close()?

On Mon, Jul 9, 2018 at 5:58 PM Steve Tian  wrote:

> Please re-read the javadoc of KafkaConsumer, make sure you know how to
> wakeup/close consumer properly while shutting down your application.  Try
> to understand the motivation of KIP-62 and adjust related timeout.
>
> On Mon, Jul 9, 2018, 8:05 PM harish lohar  wrote:
>
> > Try reducing below timer
> > metadata.max.age.ms = 30
> >
> >
> > On Fri, Jul 6, 2018 at 5:55 AM Shantanu Deshmukh 
> > wrote:
> >
> > > Hello everyone,
> > >
> > > We are running a 3 broker Kafka 0.10.0.1 cluster. We have a java app
> > which
> > > spawns many consumer threads consuming from different topics. For every
> > > topic we have specified different consumer-group. A lot of times I see
> > that
> > > whenever this application is restarted a CG on one or two topics takes
> > more
> > > than 5 minutes to receive partition assignment. Till that time
> consumers
> > > for that topic don't consumer anything. If I go to Kafka broker and run
> > > consumer-groups.sh and describe that particular CG I see that it is
> > > rebalancing. There is time critical data stored in that topic and we
> > cannot
> > > tolerate such long delays. What can be the reason for such long
> > rebalances.
> > >
> > > Here's our consumer config
> > >
> > >
> > > auto.commit.interval.ms = 3000
> > > auto.offset.reset = latest
> > > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> > > check.crcs = true
> > > client.id =
> > > connections.max.idle.ms = 54
> > > enable.auto.commit = true
> > > exclude.internal.topics = true
> > > fetch.max.bytes = 52428800
> > > fetch.max.wait.ms = 500
> > > fetch.min.bytes = 1
> > > group.id = otp-notifications-consumer
> > > heartbeat.interval.ms = 3000
> > > interceptor.classes = null
> > > key.deserializer = class
> > > org.apache.kafka.common.serialization.StringDeserializer
> > > max.partition.fetch.bytes = 1048576
> > > max.poll.interval.ms = 30
> > > max.poll.records = 50
> > > metadata.max.age.ms = 30
> > > metric.reporters = []
> > > metrics.num.samples = 2
> > > metrics.sample.window.ms = 3
> > > partition.assignment.strategy = [class
> > > org.apache.kafka.clients.consumer.RangeAssignor]
> > > receive.buffer.bytes = 65536
> > > reconnect.backoff.ms = 50
> > > request.timeout.ms = 305000
> > > retry.backoff.ms = 100
> > > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > > sasl.kerberos.min.time.before.relogin = 6
> > > sasl.kerberos.service.name = null
> > > sasl.kerberos.ticket.renew.jitter = 0.05
> > > sasl.kerberos.ticket.renew.window.factor = 0.8
> > > sasl.mechanism = GSSAPI
> > > security.protocol = SSL
> > > send.buffer.bytes = 131072
> > > session.timeout.ms = 30
> > > ssl.cipher.suites = null
> > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > > ssl.endpoint.identification.algorithm = null
> > > ssl.key.password = null
> > > ssl.keymanager.algorithm = SunX509
> > > ssl.keystore.location = null
> > > ssl.keystore.password = null
> > > ssl.keystore.type = JKS
> > > ssl.protocol = TLS
> > > ssl.provider = null
> > > ssl.secure.random.implementation = null
> > > ssl.trustmanager.algorithm = PKIX
> > > ssl.truststore.location = /x/x/client.truststore.jks
> > > ssl.truststore.password = [hidden]
> > > ssl.truststore.type = JKS
> > > value.deserializer = class
> > > org.apache.kafka.common.serialization.StringDeserializer
> > >
> > > Please help.
> > >
> > > *Thanks & Regards,*
> > > *Shantanu Deshmukh*
> > >
> >
>


Zookeeper logging “exception causing close of session 0x0” infinitely in logs

2018-08-06 Thread Shantanu Deshmukh
 We have a cluster of 3 kafka+zookeeper. Only on one of our zookeeper
servers we are seeing these logs infinitely getting written in
zookeeper.out log file

WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCxn@1033] -
Exception causing close of session 0x0 due to java.io.Exception
INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCxn@1033] -
Closed socket connection from /10.189.177.31:65429 (no session
established for client)

I have no idea what this server 10.189.177.31 is. No Kafka consumer is
running on this machine.

No changes to zookeeper was made. Due to disk full issue cluster had
crashed. So we started all brokers and zookeepers. Everything went well
except for this one. What can be done for this case? As zk is logging so
heavily it will fill up the disk again with just these logs. Please help


Re: Very long consumer rebalances

2018-08-09 Thread Shantanu Deshmukh
 I am facing too many problems these days. Now one of our consumer groups
is rebalancing every now and then. And rebalance takes very low, more than
5-10 minutes. Even after re-balancing I see that only half of the consumers
are active/receive assignment. Its all going haywire.

I am seeing these logs in kafka consumer logs. Can anyone help me
understand what is going on here? It is a very long piece of log, but
someone please help me. I am desperately looking for any solution since
more than 2 months now. But to no avail.

[2018-08-09 11:39:51] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:53] :: DEBUG ::
ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
bulk-email-consumer committed offset 25465113 for partition bulk-email-8
[2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
autocommit of offsets {bulk-email-8=OffsetAndMetadata{offset=25465113,
metadata=''}} for group bulk-email-consumer
[2018-08-09 11:39:53] :: DEBUG ::
ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
bulk-email-consumer committed offset 25463566 for partition bulk-email-6
[2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
autocommit of offsets {bulk-email-6=OffsetAndMetadata{offset=25463566,
metadata=''}} for group bulk-email-consumer
[2018-08-09 11:39:53] :: DEBUG ::
ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
bulk-email-consumer committed offset 2588 for partition bulk-email-9
[2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
autocommit of offsets {bulk-email-9=OffsetAndMetadata{offset=2588,
metadata=''}} for group bulk-email-consumer
[2018-08-09 11:39:54] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:54] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:54] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:54] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:54] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:54] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:54] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:54] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:56] :: DEBUG ::
ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
bulk-email-consumer committed offset 25463566 for partition bulk-email-6
[2018-08-09 11:39:56] :: DEBUG ::
ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
bulk-email-consumer committed offset 2588 for partition bulk-email-9
[2018-08-09 11:39:56] :: DEBUG ::
ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
bulk-email-consumer committed offset 25465113 for partition bulk-email-8
[2018-08-09 11:39:56] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
autocommit of offsets {bulk-email-6=OffsetAndMetadata{offset=25463566,
metadata=''}} for group bulk-email-consumer
[2018-08-09 11:39:56] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
autocommit of offsets {bulk-email-9=OffsetAndMetadata{offset=2588,
metadata=''}} for group bulk-email-consumer
[2018-08-09 11:39:56] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
autocommit of offsets {bulk-email-8=OffsetAndMetadata{offset=25465113,
metadata=''}} for group bulk-email-consumer
[2018-08-09 11:39:57] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:57] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:57] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:57] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:57] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:57] :: DEBUG ::
AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
heartbeat response for group bulk-email-consumer
[2018-08-09 11:39:57] :: DEBUG ::
A

Re: Very long consumer rebalances

2018-08-09 Thread Shantanu Deshmukh
Hi,

Yes my consumer application works like below

   1. Reads how many workers are required to process each topics from
   properties file
   2. As many threads are spawned as there are workers mentioned in
   properties file, topic name is passed to this thread. FixedThreadPool
   implementation is used.
   3. Each worker thread initializes one consumer object and subscribes to
   given topic. Consumer group is simply -consumer. So if my topic
   bulk-email, then consumer group for all those threads is bulk-email-consumer
   4. Once this is done, inside an infinite while loop consumer.poll(100)
   method keeps running. So this application is a daemon. Shuts down only when
   server shuts down or in case of kill command.

I have configured session.timeout.ms in consumer properties. I haven't done
anything about zookeeper timeout. Is it required now? Since consumer
accesses only the brokers.

On Thu, Aug 9, 2018 at 3:03 PM M. Manna  wrote:

> In the simplest way, how have you implemented your consumer?
>
> 1) Does your consumers join a designated group, process messages, and then
> closes all connection? Or does it stay open perpetually until server
> shutdown?
> 2) Have you configured the session timeouts for client and zookeeper
> accordingly?
>
> Regards,
>
> On 9 August 2018 at 08:00, Shantanu Deshmukh 
> wrote:
>
> >  I am facing too many problems these days. Now one of our consumer groups
> > is rebalancing every now and then. And rebalance takes very low, more
> than
> > 5-10 minutes. Even after re-balancing I see that only half of the
> consumers
> > are active/receive assignment. Its all going haywire.
> >
> > I am seeing these logs in kafka consumer logs. Can anyone help me
> > understand what is going on here? It is a very long piece of log, but
> > someone please help me. I am desperately looking for any solution since
> > more than 2 months now. But to no avail.
> >
> > [2018-08-09 11:39:51] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:53] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 25465113 for partition bulk-email-8
> > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
> > autocommit of offsets {bulk-email-8=OffsetAndMetadata{offset=25465113,
> > metadata=''}} for group bulk-email-consumer
> > [2018-08-09 11:39:53] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 25463566 for partition bulk-email-6
> > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
> > autocommit of offsets {bulk-email-6=OffsetAndMetadata{offset=25463566,
> > metadata=''}} for group bulk-email-consumer
> > [2018-08-09 11:39:53] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 2588 for partition bulk-email-9
> > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
> > autocommit of offsets {bulk-email-9=OffsetAndMetadata{offset=2588,
> > metadata=''}} for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:56] :: DEBUG 

Re: Looking for help with a question on the consumer API

2018-08-09 Thread Shantanu Deshmukh
Consumer gets kicked out if it fails to send heart beat in designated time
period. Every call to poll sends one heart beat to consumer group
coordinator.

You need to look at *how much time is it taking to process your single
record*. *Maybe it is exceeding session.timeout.ms
 value* which you have set as 30s. Try
increasing that. Also *keep max.poll.records to a lower value*. This
setting determines how many records are fetched after the call to poll
method. If you fetch too many records then even if you keep
session.timeout.ms to a large value your consumer might still get kicked
out and group will enter rebalancing stage.

On Thu, Aug 9, 2018 at 2:00 AM Moiz Raja (moraja) 
wrote:

> Hi All,
>
> I have an issue with the consumer getting kicked out of the group possibly
> due to other issues going on in the system. The issue is detailed here
> https://stackoverflow.com/questions/51754794/how-to-reinstate-a-kafka-consumer-which-has-been-kicked-out-of-the-group
>
> Any help with this issue would be appreciated.
>
> Regards,
> -Moiz
>


Re: Very long consumer rebalances

2018-08-10 Thread Shantanu Deshmukh
All this is fine. But what I do not understand is, why only some of the
consumer groups start very late. We have 7 topics and their consumers
belong to 7 different CGs. Whenever I want to restart my application Only
one or two of them will start very late. It takes almost 5-10 minutes
before their consumers start receiving data.
When that delay occurs there is absolutely no logs generated by consumer
library.

On Fri, Aug 10, 2018 at 2:55 PM M. Manna  wrote:

> if you can upgrade, I would say upgrading to 0.10.2.x would be better for
> you (or even higher, 2.0.0). Otherwise you have to play around with
> max.poll.records and session.timeout.ms.
>
> As the doc says (or newer versions), the adjustment should be such that
> request.timeout.ms >= max.poll.interval.ms. Also, heartbeat.interval.ms
> should be curbed at (rule of thumb) 30% of session.timeout.ms.
>
> Lastly, all these have to be within the bounds of
> group.min.session.timeout.ms and group.max.session.timeout.ms.
>
> You can check all these, tune them as necessary, and retry. Some of these
> configs may or may not be applicable at runtime. so a rolling restart may
> be required before all changes take place.
>
> On 9 August 2018 at 13:48, Shantanu Deshmukh 
> wrote:
>
> > Hi,
> >
> > Yes my consumer application works like below
> >
> >1. Reads how many workers are required to process each topics from
> >properties file
> >2. As many threads are spawned as there are workers mentioned in
> >properties file, topic name is passed to this thread. FixedThreadPool
> >implementation is used.
> >3. Each worker thread initializes one consumer object and subscribes
> to
> >given topic. Consumer group is simply -consumer. So if my
> > topic
> >bulk-email, then consumer group for all those threads is
> > bulk-email-consumer
> >4. Once this is done, inside an infinite while loop consumer.poll(100)
> >method keeps running. So this application is a daemon. Shuts down only
> > when
> >server shuts down or in case of kill command.
> >
> > I have configured session.timeout.ms in consumer properties. I haven't
> > done
> > anything about zookeeper timeout. Is it required now? Since consumer
> > accesses only the brokers.
> >
> > On Thu, Aug 9, 2018 at 3:03 PM M. Manna  wrote:
> >
> > > In the simplest way, how have you implemented your consumer?
> > >
> > > 1) Does your consumers join a designated group, process messages, and
> > then
> > > closes all connection? Or does it stay open perpetually until server
> > > shutdown?
> > > 2) Have you configured the session timeouts for client and zookeeper
> > > accordingly?
> > >
> > > Regards,
> > >
> > > On 9 August 2018 at 08:00, Shantanu Deshmukh 
> > > wrote:
> > >
> > > >  I am facing too many problems these days. Now one of our consumer
> > groups
> > > > is rebalancing every now and then. And rebalance takes very low, more
> > > than
> > > > 5-10 minutes. Even after re-balancing I see that only half of the
> > > consumers
> > > > are active/receive assignment. Its all going haywire.
> > > >
> > > > I am seeing these logs in kafka consumer logs. Can anyone help me
> > > > understand what is going on here? It is a very long piece of log, but
> > > > someone please help me. I am desperately looking for any solution
> since
> > > > more than 2 months now. But to no avail.
> > > >
> > > > [2018-08-09 11:39:51] :: DEBUG ::
> > > > AbstractCoordinator$HeartbeatResponseHandler:694 - Received
> successful
> > > > heartbeat response for group bulk-email-consumer
> > > > [2018-08-09 11:39:53] :: DEBUG ::
> > > > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > > > bulk-email-consumer committed offset 25465113 for partition
> > bulk-email-8
> > > > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 -
> Completed
> > > > autocommit of offsets
> {bulk-email-8=OffsetAndMetadata{offset=25465113,
> > > > metadata=''}} for group bulk-email-consumer
> > > > [2018-08-09 11:39:53] :: DEBUG ::
> > > > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > > > bulk-email-consumer committed offset 25463566 for partition
> > bulk-email-6
> > > > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 -
> Completed
> > > > autocommit of offsets
> {bulk-email-6=OffsetAndMetadata{o

Re: How to reduce kafka's rebalance time ?

2018-08-15 Thread Shantanu Deshmukh
I am also facing the same issue. Whenever I am restarting my consumers it
is taking upto 10 minutes to start consumption. Also some of the consumers
randomly rebalance and it again takes same amount of time to complete
rebalance.
I haven't been able to figure out any solution for this issue, nor have I
received any help from here.

On Thu, Aug 16, 2018 at 9:56 AM 堅強de泡沫  wrote:

> hello:
> How to reduce kafka's rebalance time ?
> It takes a lot of time to rebalance each time. Why?


Re: How to reduce kafka's rebalance time ?

2018-08-16 Thread Shantanu Deshmukh
Hi Manna,

I meant no offense. Simply meant to say that haven't found solution to my
problem from here.
Apologies, if my sentence was off the line.

On Thu, Aug 16, 2018 at 4:05 PM M. Manna  wrote:

> You have been recommended to upgrade to a newer version of Kafka, or tune
> timeout params. Adhering to a older version is more of the users’ decision.
> Perhaps, we should simply put older versions as “End of Life”.
>
> As part of open source initiative, you are always welcome to debug and
> demonstrate how your use case is different, and raise a KIP.
>
> Not sure what you mean by “*no have I received any help from here.” *
>
> We are always actively trying to contribute as much as we can, and
> sometimes the answers may not be according to your expectations or
> timeline. Hence, the open source initiative.
>
> Hope this makes sense.
>
> Regards,
>
> Regards,
> On Thu, 16 Aug 2018 at 06:55, Shantanu Deshmukh 
> wrote:
>
> > I am also facing the same issue. Whenever I am restarting my consumers it
> > is taking upto 10 minutes to start consumption. Also some of the
> consumers
> > randomly rebalance and it again takes same amount of time to complete
> > rebalance.
> > I haven't been able to figure out any solution for this issue, nor have I
> > received any help from here.
> >
> > On Thu, Aug 16, 2018 at 9:56 AM 堅強de泡沫  wrote:
> >
> > > hello:
> > > How to reduce kafka's rebalance time ?
> > > It takes a lot of time to rebalance each time. Why?
> >
>


Re: Very long consumer rebalances

2018-08-16 Thread Shantanu Deshmukh
I saw a few topics with segment.ms and retention.ms property set. Can that
be causing any issue? I remember that this is the only change I carried out
to the cluster in last couple of months after which the problem started.

On Fri, Aug 10, 2018 at 2:55 PM M. Manna  wrote:

> if you can upgrade, I would say upgrading to 0.10.2.x would be better for
> you (or even higher, 2.0.0). Otherwise you have to play around with
> max.poll.records and session.timeout.ms.
>
> As the doc says (or newer versions), the adjustment should be such that
> request.timeout.ms >= max.poll.interval.ms. Also, heartbeat.interval.ms
> should be curbed at (rule of thumb) 30% of session.timeout.ms.
>
> Lastly, all these have to be within the bounds of
> group.min.session.timeout.ms and group.max.session.timeout.ms.
>
> You can check all these, tune them as necessary, and retry. Some of these
> configs may or may not be applicable at runtime. so a rolling restart may
> be required before all changes take place.
>
> On 9 August 2018 at 13:48, Shantanu Deshmukh 
> wrote:
>
> > Hi,
> >
> > Yes my consumer application works like below
> >
> >1. Reads how many workers are required to process each topics from
> >properties file
> >2. As many threads are spawned as there are workers mentioned in
> >properties file, topic name is passed to this thread. FixedThreadPool
> >implementation is used.
> >3. Each worker thread initializes one consumer object and subscribes
> to
> >given topic. Consumer group is simply -consumer. So if my
> > topic
> >bulk-email, then consumer group for all those threads is
> > bulk-email-consumer
> >4. Once this is done, inside an infinite while loop consumer.poll(100)
> >method keeps running. So this application is a daemon. Shuts down only
> > when
> >server shuts down or in case of kill command.
> >
> > I have configured session.timeout.ms in consumer properties. I haven't
> > done
> > anything about zookeeper timeout. Is it required now? Since consumer
> > accesses only the brokers.
> >
> > On Thu, Aug 9, 2018 at 3:03 PM M. Manna  wrote:
> >
> > > In the simplest way, how have you implemented your consumer?
> > >
> > > 1) Does your consumers join a designated group, process messages, and
> > then
> > > closes all connection? Or does it stay open perpetually until server
> > > shutdown?
> > > 2) Have you configured the session timeouts for client and zookeeper
> > > accordingly?
> > >
> > > Regards,
> > >
> > > On 9 August 2018 at 08:00, Shantanu Deshmukh 
> > > wrote:
> > >
> > > >  I am facing too many problems these days. Now one of our consumer
> > groups
> > > > is rebalancing every now and then. And rebalance takes very low, more
> > > than
> > > > 5-10 minutes. Even after re-balancing I see that only half of the
> > > consumers
> > > > are active/receive assignment. Its all going haywire.
> > > >
> > > > I am seeing these logs in kafka consumer logs. Can anyone help me
> > > > understand what is going on here? It is a very long piece of log, but
> > > > someone please help me. I am desperately looking for any solution
> since
> > > > more than 2 months now. But to no avail.
> > > >
> > > > [2018-08-09 11:39:51] :: DEBUG ::
> > > > AbstractCoordinator$HeartbeatResponseHandler:694 - Received
> successful
> > > > heartbeat response for group bulk-email-consumer
> > > > [2018-08-09 11:39:53] :: DEBUG ::
> > > > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > > > bulk-email-consumer committed offset 25465113 for partition
> > bulk-email-8
> > > > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 -
> Completed
> > > > autocommit of offsets
> {bulk-email-8=OffsetAndMetadata{offset=25465113,
> > > > metadata=''}} for group bulk-email-consumer
> > > > [2018-08-09 11:39:53] :: DEBUG ::
> > > > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > > > bulk-email-consumer committed offset 25463566 for partition
> > bulk-email-6
> > > > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 -
> Completed
> > > > autocommit of offsets
> {bulk-email-6=OffsetAndMetadata{offset=25463566,
> > > > metadata=''}} for group bulk-email-consumer
> > > > [2018-08-09 11:39:53] :: DEBUG ::
> > > > ConsumerCoordina

Re: Kafka issue

2018-08-19 Thread Shantanu Deshmukh
How many brokers are there in your cluster? This error usually comes when
one of the brokers who is leader for a partition dies and you are trying to
access it.

On Fri, Aug 17, 2018 at 9:23 PM Harish K  wrote:

> Hi,
>I have installed Kafka and created topic but while data ingestion i get
> some errors as follows.Any help would be really appreciated
>
>
> [2018-08-17 06:12:49,838] WARN Error while fetching metadata with
> correlation id 24 :
> {wikipedia=LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient)
>
> *server log:*
>
> [2018-08-17 06:06:00,719] INFO Creating /controller (is it secure? false)
> (kafka.utils.ZKCheckedEphemeral)
> [2018-08-17 06:06:00,720] INFO Result of znode creation is: OK
> (kafka.utils.ZKCheckedEphemeral)
> [2018-08-17 06:06:00,720] INFO 0 successfully elected as leader
> (kafka.server.ZookeeperLeaderElector)
> [2018-08-17 06:06:00,736] ERROR Error while electing or becoming leader on
> broker 0 (kafka.server.ZookeeperLeaderElector)
> kafka.common.KafkaException: Can't parse json string: null
> at kafka.utils.Json$.liftedTree1$1(Json.scala:40)
> at kafka.utils.Json$.parseFull(Json.scala:36)
> at
>
> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:660)
> at
>
> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:656)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.utils.ZkUtils.getReplicaAssignmentForTopics(ZkUtils.scala:656)
> at
>
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> at
>
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
> at
>
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:160)
> at
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:85)
> at
>
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:154)
> at
>
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:154)
> at
>
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:154)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at
>
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:153)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:825)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:72)
> Caused by: java.lang.NullPointerException
> at
>
> scala.util.parsing.combinator.lexical.Scanners$Scanner.(Scanners.scala:44)
> at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:51)
> at scala.util.parsing.json.JSON$.parseFull(JSON.scala:65)
> at kafka.utils.Json$.liftedTree1$1(Json.scala:37)
> ... 17 more
>
>
> *Controller Log:*
>
> [2018-08-17 06:05:54,644] INFO [Controller 0]: Controller starting up
> (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,659] INFO [Controller 0]: Broker 0 starting become
> controller state transition (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,661] INFO [Controller 0]: Initialized controller epoch
> to 2294948 and zk version 2294947 (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,664] INFO [Controller 0]: Controller 0 incremented
> epoch to 2294949 (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,665] DEBUG [Controller 0]: Registering
> IsrChangeNotificationListener (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,705] INFO [Controller 0]: Controller startup complete
> (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,715] DEBUG [Controller 0]: Controller resigning,
> broker id 0 (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,715] DEBUG [Controller 0]: De-registering
> IsrChangeNotificationListener (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,717] INFO [Partition state machine on Controller 0]:
> Stopped partition state machine (kafka.controller.PartitionStateMachine)
> [2018-08-17 06:05:54,718] INFO [Replica state machine on controller 0]:
> Stopped replica state machine (kafka.controller.ReplicaStateMachine)
> [2018-08-17 06:05:54,718] INFO [Controller 0]: Broker 0 resigned as the
> controller (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,727] INFO [Controller 0]: Broker 0 starting become
> controller state transition (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,728] INFO [Controller 0]: Initialized controller epoch
> to 2294949 and zk version 2294948 (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,730] INFO [Controller 0]: Controller 0 incremented
> epoch to 2294950 (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,730] DEBUG [Controller 0]: Registering
> IsrChangeNotificationListener (kafka.controller.KafkaController)
> [2018-08-17 06:05:54,777] I

Re: NetworkException exception while send/publishing records(Producer)

2018-08-19 Thread Shantanu Deshmukh
Firstly, record size of 150mb is too big. I am quite sure your timeout
exceptions are due to such a large record. There is a setting in producer
and broker config which allows you to specify max message size in bytes.
But still records each of size 150mb might lead to problems with increasing
volume. You need to look at how you can reduce your message size.

Kafka producer is thread safe and according to documentation you will get
best performance if you share producer with multiple threads. Don't
initiate a new kafka producer for each of your thread.

On Fri, Aug 17, 2018 at 9:26 PM Pulkit Manchanda 
wrote:

> Hi All,
>
> I am sending the multiple records to the same topic.
> I have the two approaches
> 1)Sharing the producer with all the threads
> 2) creating a new producer for every thread.
>
> I am sending the records of size ~150Mb on multiple request.
> I am running the cluster and app on my local machine with 3 brokers and
> max.request .size 1Gb.
>
> While sending the records using the following code with approach 2)
> creating a new producer I am getting the network exception
> and when I use the approach 1) sharing the producer. I get the same network
> exception and sometimes Timeout too.
> I looked onto google and StackOverflow but didn't find any solution to the
> Network Exception.
>
> val metadata = producer.send(record).get()
>
>
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.NetworkException: The server disconnected
> before a response was received.
> at
>
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
> at
>
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
> at
>
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
> at service.KafkaService.sendRecordToKafka(KafkaService.scala:65)
>
>
> Any help will be appreciated.
>
> Thanks
> Pulkit
>


Re: Very long consumer rebalances

2018-08-22 Thread Shantanu Deshmukh
Can anyone help me understand how to debug this issue? I tried setting log
level to trace in consumer logback configuration. But at such times nothing
appears in log, even in trace level. It is like entire code is frozen.

On Thu, Aug 16, 2018 at 6:32 PM Shantanu Deshmukh 
wrote:

> I saw a few topics with segment.ms and retention.ms property set. Can
> that be causing any issue? I remember that this is the only change I
> carried out to the cluster in last couple of months after which the problem
> started.
>
> On Fri, Aug 10, 2018 at 2:55 PM M. Manna  wrote:
>
>> if you can upgrade, I would say upgrading to 0.10.2.x would be better for
>> you (or even higher, 2.0.0). Otherwise you have to play around with
>> max.poll.records and session.timeout.ms.
>>
>> As the doc says (or newer versions), the adjustment should be such that
>> request.timeout.ms >= max.poll.interval.ms. Also, heartbeat.interval.ms
>> should be curbed at (rule of thumb) 30% of session.timeout.ms.
>>
>> Lastly, all these have to be within the bounds of
>> group.min.session.timeout.ms and group.max.session.timeout.ms.
>>
>> You can check all these, tune them as necessary, and retry. Some of these
>> configs may or may not be applicable at runtime. so a rolling restart may
>> be required before all changes take place.
>>
>> On 9 August 2018 at 13:48, Shantanu Deshmukh 
>> wrote:
>>
>> > Hi,
>> >
>> > Yes my consumer application works like below
>> >
>> >1. Reads how many workers are required to process each topics from
>> >properties file
>> >2. As many threads are spawned as there are workers mentioned in
>> >properties file, topic name is passed to this thread. FixedThreadPool
>> >implementation is used.
>> >3. Each worker thread initializes one consumer object and subscribes
>> to
>> >given topic. Consumer group is simply -consumer. So if my
>> > topic
>> >bulk-email, then consumer group for all those threads is
>> > bulk-email-consumer
>> >4. Once this is done, inside an infinite while loop
>> consumer.poll(100)
>> >method keeps running. So this application is a daemon. Shuts down
>> only
>> > when
>> >server shuts down or in case of kill command.
>> >
>> > I have configured session.timeout.ms in consumer properties. I haven't
>> > done
>> > anything about zookeeper timeout. Is it required now? Since consumer
>> > accesses only the brokers.
>> >
>> > On Thu, Aug 9, 2018 at 3:03 PM M. Manna  wrote:
>> >
>> > > In the simplest way, how have you implemented your consumer?
>> > >
>> > > 1) Does your consumers join a designated group, process messages, and
>> > then
>> > > closes all connection? Or does it stay open perpetually until server
>> > > shutdown?
>> > > 2) Have you configured the session timeouts for client and zookeeper
>> > > accordingly?
>> > >
>> > > Regards,
>> > >
>> > > On 9 August 2018 at 08:00, Shantanu Deshmukh 
>> > > wrote:
>> > >
>> > > >  I am facing too many problems these days. Now one of our consumer
>> > groups
>> > > > is rebalancing every now and then. And rebalance takes very low,
>> more
>> > > than
>> > > > 5-10 minutes. Even after re-balancing I see that only half of the
>> > > consumers
>> > > > are active/receive assignment. Its all going haywire.
>> > > >
>> > > > I am seeing these logs in kafka consumer logs. Can anyone help me
>> > > > understand what is going on here? It is a very long piece of log,
>> but
>> > > > someone please help me. I am desperately looking for any solution
>> since
>> > > > more than 2 months now. But to no avail.
>> > > >
>> > > > [2018-08-09 11:39:51] :: DEBUG ::
>> > > > AbstractCoordinator$HeartbeatResponseHandler:694 - Received
>> successful
>> > > > heartbeat response for group bulk-email-consumer
>> > > > [2018-08-09 11:39:53] :: DEBUG ::
>> > > > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
>> > > > bulk-email-consumer committed offset 25465113 for partition
>> > bulk-email-8
>> > > > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 -
>> Completed
>> > > > autocommit of offsets
>> {bulk-email-8=OffsetAndMetad

Frequent appearance of "Marking the coordinator dead" message in consumer log

2018-08-22 Thread Shantanu Deshmukh
Hello,

We have Kafka 0.10.0.1 running on a 3 broker cluster. We have an
application which consumes from a topic having 10 partitions. 10 consumers
are spawned from this process, they belong to one consumer group.

What we have observed is that very frequently we are observing such
messages in consumer logs

[2018-08-21 11:12:46] :: WARN  :: ConsumerCoordinator:554 - Auto offset
commit failed for group otp-email-consumer: Commit cannot be completed
since the group has already rebalanced and assigned the partitions to
another member. This means that the time between subsequent calls to poll()
was longer than the configured max.poll.interval.ms, which typically
implies that the poll loop is spending too much time message processing.
You can address this either by increasing the session timeout or by
reducing the maximum size of batches returned in poll() with
max.poll.records.
[2018-08-21 11:12:46] :: INFO  :: ConsumerCoordinator:333 - Revoking
previously assigned partitions [otp-email-1, otp-email-0, otp-email-3,
otp-email-2] for group otp-email-consumer
[2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 - (Re-)joining
group otp-email-consumer
[2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:600 - *Marking the
coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for group
otp-email-consumer*
[2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:600 - *Marking the
coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for group
otp-email-consumer*
[2018-08-21 11:12:46] :: INFO  ::
AbstractCoordinator$GroupCoordinatorResponseHandler:555 - Discovered
coordinator 10.189.179.117:9092 (id: 2147483646 rack: null) for group
otp-email-consumer.
[2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 - (Re-)joining
group otp-email-consumer

After this, the group enters rebalancing phase and it takes about 5-10
minutes to start consuming messages again.
What does this message mean? The actual broker doesn't  go down as per our
monitoring tools. So how come it is declared dead? Please help, I am stuck
on this issue since 2 months now.

Here's our consumer configuration
auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.
StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 50
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partition.assignment.strategy = [class org.apache.kafka.clients.
consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 30
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /x/x/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.
StringDeserializer


Re: Frequent appearance of "Marking the coordinator dead" message in consumer log

2018-08-22 Thread Shantanu Deshmukh
Hi Steve,

Application is just sending mails. Every record is just a email request
with very basic business logic. Generally it doesn't take more than 200ms
to process a single mail. Currently it is averaging out at 70-80 ms.

On Wed, Aug 22, 2018 at 3:06 PM Steve Tian  wrote:

> How long did it take to process 50 `ConsumerRecord`s?
>
> On Wed, Aug 22, 2018, 5:16 PM Shantanu Deshmukh 
> wrote:
>
> > Hello,
> >
> > We have Kafka 0.10.0.1 running on a 3 broker cluster. We have an
> > application which consumes from a topic having 10 partitions. 10
> consumers
> > are spawned from this process, they belong to one consumer group.
> >
> > What we have observed is that very frequently we are observing such
> > messages in consumer logs
> >
> > [2018-08-21 11:12:46] :: WARN  :: ConsumerCoordinator:554 - Auto offset
> > commit failed for group otp-email-consumer: Commit cannot be completed
> > since the group has already rebalanced and assigned the partitions to
> > another member. This means that the time between subsequent calls to
> poll()
> > was longer than the configured max.poll.interval.ms, which typically
> > implies that the poll loop is spending too much time message processing.
> > You can address this either by increasing the session timeout or by
> > reducing the maximum size of batches returned in poll() with
> > max.poll.records.
> > [2018-08-21 11:12:46] :: INFO  :: ConsumerCoordinator:333 - Revoking
> > previously assigned partitions [otp-email-1, otp-email-0, otp-email-3,
> > otp-email-2] for group otp-email-consumer
> > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 - (Re-)joining
> > group otp-email-consumer
> > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:600 - *Marking the
> > coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for group
> > otp-email-consumer*
> > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:600 - *Marking the
> > coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for group
> > otp-email-consumer*
> > [2018-08-21 11:12:46] :: INFO  ::
> > AbstractCoordinator$GroupCoordinatorResponseHandler:555 - Discovered
> > coordinator 10.189.179.117:9092 (id: 2147483646 rack: null) for group
> > otp-email-consumer.
> > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 - (Re-)joining
> > group otp-email-consumer
> >
> > After this, the group enters rebalancing phase and it takes about 5-10
> > minutes to start consuming messages again.
> > What does this message mean? The actual broker doesn't  go down as per
> our
> > monitoring tools. So how come it is declared dead? Please help, I am
> stuck
> > on this issue since 2 months now.
> >
> > Here's our consumer configuration
> > auto.commit.interval.ms = 3000
> > auto.offset.reset = latest
> > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> > check.crcs = true
> > client.id =
> > connections.max.idle.ms = 54
> > enable.auto.commit = true
> > exclude.internal.topics = true
> > fetch.max.bytes = 52428800
> > fetch.max.wait.ms = 500
> > fetch.min.bytes = 1
> > group.id = otp-notifications-consumer
> > heartbeat.interval.ms = 3000
> > interceptor.classes = null
> > key.deserializer = class org.apache.kafka.common.serialization.
> > StringDeserializer
> > max.partition.fetch.bytes = 1048576
> > max.poll.interval.ms = 30
> > max.poll.records = 50
> > metadata.max.age.ms = 30
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.sample.window.ms = 3
> > partition.assignment.strategy = [class org.apache.kafka.clients.
> > consumer.RangeAssignor]
> > receive.buffer.bytes = 65536
> > reconnect.backoff.ms = 50
> > request.timeout.ms = 305000
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.min.time.before.relogin = 6
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > sasl.mechanism = GSSAPI
> > security.protocol = SSL
> > send.buffer.bytes = 131072
> > session.timeout.ms = 30
> > ssl.cipher.suites = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.endpoint.identification.algorithm = null
> > ssl.key.password = null
> > ssl.keymanager.algorithm = SunX509
> > ssl.keystore.location = null
> > ssl.keystore.password = null
> > ssl.keystore.type = JKS
> > ssl.protocol = TLS
> > ssl.provider = null
> > ssl.secure.random.implementation = null
> > ssl.trustmanager.algorithm = PKIX
> > ssl.truststore.location = /x/x/client.truststore.jks
> > ssl.truststore.password = [hidden]
> > ssl.truststore.type = JKS
> > value.deserializer = class org.apache.kafka.common.serialization.
> > StringDeserializer
> >
>


Re: Frequent appearance of "Marking the coordinator dead" message in consumer log

2018-08-22 Thread Shantanu Deshmukh
How do I check for GC pausing?

On Wed, Aug 22, 2018 at 4:12 PM Steve Tian  wrote:

> Did you observed any GC-pausing?
>
> On Wed, Aug 22, 2018, 6:38 PM Shantanu Deshmukh 
> wrote:
>
> > Hi Steve,
> >
> > Application is just sending mails. Every record is just a email request
> > with very basic business logic. Generally it doesn't take more than 200ms
> > to process a single mail. Currently it is averaging out at 70-80 ms.
> >
> > On Wed, Aug 22, 2018 at 3:06 PM Steve Tian 
> > wrote:
> >
> > > How long did it take to process 50 `ConsumerRecord`s?
> > >
> > > On Wed, Aug 22, 2018, 5:16 PM Shantanu Deshmukh  >
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > We have Kafka 0.10.0.1 running on a 3 broker cluster. We have an
> > > > application which consumes from a topic having 10 partitions. 10
> > > consumers
> > > > are spawned from this process, they belong to one consumer group.
> > > >
> > > > What we have observed is that very frequently we are observing such
> > > > messages in consumer logs
> > > >
> > > > [2018-08-21 11:12:46] :: WARN  :: ConsumerCoordinator:554 - Auto
> offset
> > > > commit failed for group otp-email-consumer: Commit cannot be
> completed
> > > > since the group has already rebalanced and assigned the partitions to
> > > > another member. This means that the time between subsequent calls to
> > > poll()
> > > > was longer than the configured max.poll.interval.ms, which typically
> > > > implies that the poll loop is spending too much time message
> > processing.
> > > > You can address this either by increasing the session timeout or by
> > > > reducing the maximum size of batches returned in poll() with
> > > > max.poll.records.
> > > > [2018-08-21 11:12:46] :: INFO  :: ConsumerCoordinator:333 - Revoking
> > > > previously assigned partitions [otp-email-1, otp-email-0,
> otp-email-3,
> > > > otp-email-2] for group otp-email-consumer
> > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 -
> > (Re-)joining
> > > > group otp-email-consumer
> > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:600 - *Marking
> > the
> > > > coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for group
> > > > otp-email-consumer*
> > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:600 - *Marking
> > the
> > > > coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for group
> > > > otp-email-consumer*
> > > > [2018-08-21 11:12:46] :: INFO  ::
> > > > AbstractCoordinator$GroupCoordinatorResponseHandler:555 - Discovered
> > > > coordinator 10.189.179.117:9092 (id: 2147483646 rack: null) for
> group
> > > > otp-email-consumer.
> > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 -
> > (Re-)joining
> > > > group otp-email-consumer
> > > >
> > > > After this, the group enters rebalancing phase and it takes about
> 5-10
> > > > minutes to start consuming messages again.
> > > > What does this message mean? The actual broker doesn't  go down as
> per
> > > our
> > > > monitoring tools. So how come it is declared dead? Please help, I am
> > > stuck
> > > > on this issue since 2 months now.
> > > >
> > > > Here's our consumer configuration
> > > > auto.commit.interval.ms = 3000
> > > > auto.offset.reset = latest
> > > > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> > > > check.crcs = true
> > > > client.id =
> > > > connections.max.idle.ms = 54
> > > > enable.auto.commit = true
> > > > exclude.internal.topics = true
> > > > fetch.max.bytes = 52428800
> > > > fetch.max.wait.ms = 500
> > > > fetch.min.bytes = 1
> > > > group.id = otp-notifications-consumer
> > > > heartbeat.interval.ms = 3000
> > > > interceptor.classes = null
> > > > key.deserializer = class org.apache.kafka.common.serialization.
> > > > StringDeserializer
> > > > max.partition.fetch.bytes = 1048576
> > > > max.poll.interval.ms = 30
> > > > max.poll.records = 50
> > > > metadata.max.age.ms = 30
> > > > metric.reporters = []
> > > > metrics.num.samples = 2
> > > > met

Re: Frequent appearance of "Marking the coordinator dead" message in consumer log

2018-08-22 Thread Shantanu Deshmukh
Ohh sorry, my bad. Kafka version is 0.10.1.0 indeed and so is the client.

On Wed, Aug 22, 2018 at 4:26 PM Steve Tian  wrote:

> NVM.  What's your client version?  I'm asking as max.poll.interval.ms
> should be introduced since 0.10.1.0, which is not the version you mentioned
> in the email thread.
>
> On Wed, Aug 22, 2018, 6:51 PM Shantanu Deshmukh 
> wrote:
>
> > How do I check for GC pausing?
> >
> > On Wed, Aug 22, 2018 at 4:12 PM Steve Tian 
> > wrote:
> >
> > > Did you observed any GC-pausing?
> > >
> > > On Wed, Aug 22, 2018, 6:38 PM Shantanu Deshmukh  >
> > > wrote:
> > >
> > > > Hi Steve,
> > > >
> > > > Application is just sending mails. Every record is just a email
> request
> > > > with very basic business logic. Generally it doesn't take more than
> > 200ms
> > > > to process a single mail. Currently it is averaging out at 70-80 ms.
> > > >
> > > > On Wed, Aug 22, 2018 at 3:06 PM Steve Tian 
> > > > wrote:
> > > >
> > > > > How long did it take to process 50 `ConsumerRecord`s?
> > > > >
> > > > > On Wed, Aug 22, 2018, 5:16 PM Shantanu Deshmukh <
> > shantanu...@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > We have Kafka 0.10.0.1 running on a 3 broker cluster. We have an
> > > > > > application which consumes from a topic having 10 partitions. 10
> > > > > consumers
> > > > > > are spawned from this process, they belong to one consumer group.
> > > > > >
> > > > > > What we have observed is that very frequently we are observing
> such
> > > > > > messages in consumer logs
> > > > > >
> > > > > > [2018-08-21 11:12:46] :: WARN  :: ConsumerCoordinator:554 - Auto
> > > offset
> > > > > > commit failed for group otp-email-consumer: Commit cannot be
> > > completed
> > > > > > since the group has already rebalanced and assigned the
> partitions
> > to
> > > > > > another member. This means that the time between subsequent calls
> > to
> > > > > poll()
> > > > > > was longer than the configured max.poll.interval.ms, which
> > typically
> > > > > > implies that the poll loop is spending too much time message
> > > > processing.
> > > > > > You can address this either by increasing the session timeout or
> by
> > > > > > reducing the maximum size of batches returned in poll() with
> > > > > > max.poll.records.
> > > > > > [2018-08-21 11:12:46] :: INFO  :: ConsumerCoordinator:333 -
> > Revoking
> > > > > > previously assigned partitions [otp-email-1, otp-email-0,
> > > otp-email-3,
> > > > > > otp-email-2] for group otp-email-consumer
> > > > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 -
> > > > (Re-)joining
> > > > > > group otp-email-consumer
> > > > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:600 -
> > *Marking
> > > > the
> > > > > > coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for
> group
> > > > > > otp-email-consumer*
> > > > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:600 -
> > *Marking
> > > > the
> > > > > > coordinator x.x.x.x:9092 (id: 2147483646 rack: null) dead for
> group
> > > > > > otp-email-consumer*
> > > > > > [2018-08-21 11:12:46] :: INFO  ::
> > > > > > AbstractCoordinator$GroupCoordinatorResponseHandler:555 -
> > Discovered
> > > > > > coordinator 10.189.179.117:9092 (id: 2147483646 rack: null) for
> > > group
> > > > > > otp-email-consumer.
> > > > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 -
> > > > (Re-)joining
> > > > > > group otp-email-consumer
> > > > > >
> > > > > > After this, the group enters rebalancing phase and it takes about
> > > 5-10
> > > > > > minutes to start consuming messages again.
> > > > > > What does this message mean? The actual broker doesn't  go down
> as
> > > per
> > > > > our
> > > > >

Re: Frequent appearance of "Marking the coordinator dead" message in consumer log

2018-08-22 Thread Shantanu Deshmukh
I know average time of processing one record, it is about 70-80ms. I have
set session.timeout.ms so high total processing time for one poll
invocation should be well within it.

On Wed, Aug 22, 2018 at 5:04 PM Steve Tian  wrote:

> Have you measured the duration between two `poll` invocations and the size
> of returned `ConsumrRecords`?
>
> On Wed, Aug 22, 2018, 7:00 PM Shantanu Deshmukh 
> wrote:
>
> > Ohh sorry, my bad. Kafka version is 0.10.1.0 indeed and so is the client.
> >
> > On Wed, Aug 22, 2018 at 4:26 PM Steve Tian 
> > wrote:
> >
> > > NVM.  What's your client version?  I'm asking as max.poll.interval.ms
> > > should be introduced since 0.10.1.0, which is not the version you
> > mentioned
> > > in the email thread.
> > >
> > > On Wed, Aug 22, 2018, 6:51 PM Shantanu Deshmukh  >
> > > wrote:
> > >
> > > > How do I check for GC pausing?
> > > >
> > > > On Wed, Aug 22, 2018 at 4:12 PM Steve Tian 
> > > > wrote:
> > > >
> > > > > Did you observed any GC-pausing?
> > > > >
> > > > > On Wed, Aug 22, 2018, 6:38 PM Shantanu Deshmukh <
> > shantanu...@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Steve,
> > > > > >
> > > > > > Application is just sending mails. Every record is just a email
> > > request
> > > > > > with very basic business logic. Generally it doesn't take more
> than
> > > > 200ms
> > > > > > to process a single mail. Currently it is averaging out at 70-80
> > ms.
> > > > > >
> > > > > > On Wed, Aug 22, 2018 at 3:06 PM Steve Tian <
> > steve.cs.t...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > How long did it take to process 50 `ConsumerRecord`s?
> > > > > > >
> > > > > > > On Wed, Aug 22, 2018, 5:16 PM Shantanu Deshmukh <
> > > > shantanu...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello,
> > > > > > > >
> > > > > > > > We have Kafka 0.10.0.1 running on a 3 broker cluster. We have
> > an
> > > > > > > > application which consumes from a topic having 10 partitions.
> > 10
> > > > > > > consumers
> > > > > > > > are spawned from this process, they belong to one consumer
> > group.
> > > > > > > >
> > > > > > > > What we have observed is that very frequently we are
> observing
> > > such
> > > > > > > > messages in consumer logs
> > > > > > > >
> > > > > > > > [2018-08-21 11:12:46] :: WARN  :: ConsumerCoordinator:554 -
> > Auto
> > > > > offset
> > > > > > > > commit failed for group otp-email-consumer: Commit cannot be
> > > > > completed
> > > > > > > > since the group has already rebalanced and assigned the
> > > partitions
> > > > to
> > > > > > > > another member. This means that the time between subsequent
> > calls
> > > > to
> > > > > > > poll()
> > > > > > > > was longer than the configured max.poll.interval.ms, which
> > > > typically
> > > > > > > > implies that the poll loop is spending too much time message
> > > > > > processing.
> > > > > > > > You can address this either by increasing the session timeout
> > or
> > > by
> > > > > > > > reducing the maximum size of batches returned in poll() with
> > > > > > > > max.poll.records.
> > > > > > > > [2018-08-21 11:12:46] :: INFO  :: ConsumerCoordinator:333 -
> > > > Revoking
> > > > > > > > previously assigned partitions [otp-email-1, otp-email-0,
> > > > > otp-email-3,
> > > > > > > > otp-email-2] for group otp-email-consumer
> > > > > > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordinator:381 -
> > > > > > (Re-)joining
> > > > > > > > group otp-email-consumer
> > > > > > > > [2018-08-21 11:12:46] :: INFO  :: AbstractCoordina

Re: Frequent appearance of "Marking the coordinator dead" message in consumer log

2018-08-28 Thread Shantanu Deshmukh
Someone, please help me. Only 1 or 2 out of 7 consumer groups keep
rebalancing every 5-10mins. One topic is constantly receiving 10-20
msg/sec. The other one receives a bulk load after many hours of inactivity.
CGs for both these topics are different. So, I see no observable pattern
here.

On Wed, Aug 22, 2018 at 5:47 PM Shantanu Deshmukh 
wrote:

> I know average time of processing one record, it is about 70-80ms. I have
> set session.timeout.ms so high total processing time for one poll
> invocation should be well within it.
>
> On Wed, Aug 22, 2018 at 5:04 PM Steve Tian 
> wrote:
>
>> Have you measured the duration between two `poll` invocations and the size
>> of returned `ConsumrRecords`?
>>
>> On Wed, Aug 22, 2018, 7:00 PM Shantanu Deshmukh 
>> wrote:
>>
>> > Ohh sorry, my bad. Kafka version is 0.10.1.0 indeed and so is the
>> client.
>> >
>> > On Wed, Aug 22, 2018 at 4:26 PM Steve Tian 
>> > wrote:
>> >
>> > > NVM.  What's your client version?  I'm asking as max.poll.interval.ms
>> > > should be introduced since 0.10.1.0, which is not the version you
>> > mentioned
>> > > in the email thread.
>> > >
>> > > On Wed, Aug 22, 2018, 6:51 PM Shantanu Deshmukh <
>> shantanu...@gmail.com>
>> > > wrote:
>> > >
>> > > > How do I check for GC pausing?
>> > > >
>> > > > On Wed, Aug 22, 2018 at 4:12 PM Steve Tian > >
>> > > > wrote:
>> > > >
>> > > > > Did you observed any GC-pausing?
>> > > > >
>> > > > > On Wed, Aug 22, 2018, 6:38 PM Shantanu Deshmukh <
>> > shantanu...@gmail.com
>> > > >
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Steve,
>> > > > > >
>> > > > > > Application is just sending mails. Every record is just a email
>> > > request
>> > > > > > with very basic business logic. Generally it doesn't take more
>> than
>> > > > 200ms
>> > > > > > to process a single mail. Currently it is averaging out at 70-80
>> > ms.
>> > > > > >
>> > > > > > On Wed, Aug 22, 2018 at 3:06 PM Steve Tian <
>> > steve.cs.t...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > How long did it take to process 50 `ConsumerRecord`s?
>> > > > > > >
>> > > > > > > On Wed, Aug 22, 2018, 5:16 PM Shantanu Deshmukh <
>> > > > shantanu...@gmail.com
>> > > > > >
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hello,
>> > > > > > > >
>> > > > > > > > We have Kafka 0.10.0.1 running on a 3 broker cluster. We
>> have
>> > an
>> > > > > > > > application which consumes from a topic having 10
>> partitions.
>> > 10
>> > > > > > > consumers
>> > > > > > > > are spawned from this process, they belong to one consumer
>> > group.
>> > > > > > > >
>> > > > > > > > What we have observed is that very frequently we are
>> observing
>> > > such
>> > > > > > > > messages in consumer logs
>> > > > > > > >
>> > > > > > > > [2018-08-21 11:12:46] :: WARN  :: ConsumerCoordinator:554 -
>> > Auto
>> > > > > offset
>> > > > > > > > commit failed for group otp-email-consumer: Commit cannot be
>> > > > > completed
>> > > > > > > > since the group has already rebalanced and assigned the
>> > > partitions
>> > > > to
>> > > > > > > > another member. This means that the time between subsequent
>> > calls
>> > > > to
>> > > > > > > poll()
>> > > > > > > > was longer than the configured max.poll.interval.ms, which
>> > > > typically
>> > > > > > > > implies that the poll loop is spending too much time message
>> > > > > > processing.
>> > > > > > > > You can address this either by increasing the session
>> timeout
>> > or
>> > > by
>> > > &

Re: Frequent appearance of "Marking the coordinator dead" message in consumer log

2018-08-28 Thread Shantanu Deshmukh
Hi Ryanne,

Thanks for your response. I had even tried with 5 records and session
timeout as big as 10 minutes. Logs still showed that consumer group
rebalanced many times.
Also there is another mystery, some CGs take upto 10 minutes to subscribe
to topic and start consumption. Why might that be happening, any idea?

On Tue, Aug 28, 2018 at 8:44 PM Ryanne Dolan  wrote:

> Shantanu,
>
> Sounds like your consumers are processing too many records between poll()s.
> Notice that max.poll.records is 50. If your consumer is taking up to 200ms
> to process each record, then you'd see up to 10 seconds between poll()s.
>
> If a consumer doesn't call poll() frequently enough, Kafka will consider
> the consumer to be dead and will rebalance away from it. Since all your
> consumers are in this state, your consumer group is constantly rebalancing.
>
> Fix is easy: reduce max.poll.records.
>
> Ryanne
>
> On Tue, Aug 28, 2018 at 6:34 AM Shantanu Deshmukh 
> wrote:
>
> > Someone, please help me. Only 1 or 2 out of 7 consumer groups keep
> > rebalancing every 5-10mins. One topic is constantly receiving 10-20
> > msg/sec. The other one receives a bulk load after many hours of
> inactivity.
> > CGs for both these topics are different. So, I see no observable pattern
> > here.
> >
> > On Wed, Aug 22, 2018 at 5:47 PM Shantanu Deshmukh  >
> > wrote:
> >
> > > I know average time of processing one record, it is about 70-80ms. I
> have
> > > set session.timeout.ms so high total processing time for one poll
> > > invocation should be well within it.
> > >
> > > On Wed, Aug 22, 2018 at 5:04 PM Steve Tian 
> > > wrote:
> > >
> > >> Have you measured the duration between two `poll` invocations and the
> > size
> > >> of returned `ConsumrRecords`?
> > >>
> > >> On Wed, Aug 22, 2018, 7:00 PM Shantanu Deshmukh <
> shantanu...@gmail.com>
> > >> wrote:
> > >>
> > >> > Ohh sorry, my bad. Kafka version is 0.10.1.0 indeed and so is the
> > >> client.
> > >> >
> > >> > On Wed, Aug 22, 2018 at 4:26 PM Steve Tian  >
> > >> > wrote:
> > >> >
> > >> > > NVM.  What's your client version?  I'm asking as
> > max.poll.interval.ms
> > >> > > should be introduced since 0.10.1.0, which is not the version you
> > >> > mentioned
> > >> > > in the email thread.
> > >> > >
> > >> > > On Wed, Aug 22, 2018, 6:51 PM Shantanu Deshmukh <
> > >> shantanu...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > How do I check for GC pausing?
> > >> > > >
> > >> > > > On Wed, Aug 22, 2018 at 4:12 PM Steve Tian <
> > steve.cs.t...@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Did you observed any GC-pausing?
> > >> > > > >
> > >> > > > > On Wed, Aug 22, 2018, 6:38 PM Shantanu Deshmukh <
> > >> > shantanu...@gmail.com
> > >> > > >
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Hi Steve,
> > >> > > > > >
> > >> > > > > > Application is just sending mails. Every record is just a
> > email
> > >> > > request
> > >> > > > > > with very basic business logic. Generally it doesn't take
> more
> > >> than
> > >> > > > 200ms
> > >> > > > > > to process a single mail. Currently it is averaging out at
> > 70-80
> > >> > ms.
> > >> > > > > >
> > >> > > > > > On Wed, Aug 22, 2018 at 3:06 PM Steve Tian <
> > >> > steve.cs.t...@gmail.com>
> > >> > > > > > wrote:
> > >> > > > > >
> > >> > > > > > > How long did it take to process 50 `ConsumerRecord`s?
> > >> > > > > > >
> > >> > > > > > > On Wed, Aug 22, 2018, 5:16 PM Shantanu Deshmukh <
> > >> > > > shantanu...@gmail.com
> > >> > > > > >
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Hello,
> >

Re: Frequent appearance of "Marking the coordinator dead" message in consumer log

2018-08-30 Thread Shantanu Deshmukh
I have noticed a very strange behaviour in case of session.timeout.ms
setting
There are 2 topics for which message processing takes long time. So I had
increased session time out there to 5 mins. max.poll.records was kept at
10. Consumers for these topics would start consuming after 5-10 minutes. I
reset session.timeout.ms to default value and now consumers subscribe and
start consuming immediately. Also rebalances have also reduced.

Now what is this? When rebalance occurs message in log is reads that you
need to increase session.timeout.ms or reduce max.poll.ms. Now if I
increase session.timeout.ms to any value above default consumers start very
slow. Has anyone seen such behaviour or explain me why this is hapening?

On Wed, Aug 29, 2018 at 12:04 PM Shantanu Deshmukh 
wrote:

> Hi Ryanne,
>
> Thanks for your response. I had even tried with 5 records and session
> timeout as big as 10 minutes. Logs still showed that consumer group
> rebalanced many times.
> Also there is another mystery, some CGs take upto 10 minutes to subscribe
> to topic and start consumption. Why might that be happening, any idea?
>
> On Tue, Aug 28, 2018 at 8:44 PM Ryanne Dolan 
> wrote:
>
>> Shantanu,
>>
>> Sounds like your consumers are processing too many records between
>> poll()s.
>> Notice that max.poll.records is 50. If your consumer is taking up to 200ms
>> to process each record, then you'd see up to 10 seconds between poll()s.
>>
>> If a consumer doesn't call poll() frequently enough, Kafka will consider
>> the consumer to be dead and will rebalance away from it. Since all your
>> consumers are in this state, your consumer group is constantly
>> rebalancing.
>>
>> Fix is easy: reduce max.poll.records.
>>
>> Ryanne
>>
>> On Tue, Aug 28, 2018 at 6:34 AM Shantanu Deshmukh 
>> wrote:
>>
>> > Someone, please help me. Only 1 or 2 out of 7 consumer groups keep
>> > rebalancing every 5-10mins. One topic is constantly receiving 10-20
>> > msg/sec. The other one receives a bulk load after many hours of
>> inactivity.
>> > CGs for both these topics are different. So, I see no observable pattern
>> > here.
>> >
>> > On Wed, Aug 22, 2018 at 5:47 PM Shantanu Deshmukh <
>> shantanu...@gmail.com>
>> > wrote:
>> >
>> > > I know average time of processing one record, it is about 70-80ms. I
>> have
>> > > set session.timeout.ms so high total processing time for one poll
>> > > invocation should be well within it.
>> > >
>> > > On Wed, Aug 22, 2018 at 5:04 PM Steve Tian 
>> > > wrote:
>> > >
>> > >> Have you measured the duration between two `poll` invocations and the
>> > size
>> > >> of returned `ConsumrRecords`?
>> > >>
>> > >> On Wed, Aug 22, 2018, 7:00 PM Shantanu Deshmukh <
>> shantanu...@gmail.com>
>> > >> wrote:
>> > >>
>> > >> > Ohh sorry, my bad. Kafka version is 0.10.1.0 indeed and so is the
>> > >> client.
>> > >> >
>> > >> > On Wed, Aug 22, 2018 at 4:26 PM Steve Tian <
>> steve.cs.t...@gmail.com>
>> > >> > wrote:
>> > >> >
>> > >> > > NVM.  What's your client version?  I'm asking as
>> > max.poll.interval.ms
>> > >> > > should be introduced since 0.10.1.0, which is not the version you
>> > >> > mentioned
>> > >> > > in the email thread.
>> > >> > >
>> > >> > > On Wed, Aug 22, 2018, 6:51 PM Shantanu Deshmukh <
>> > >> shantanu...@gmail.com>
>> > >> > > wrote:
>> > >> > >
>> > >> > > > How do I check for GC pausing?
>> > >> > > >
>> > >> > > > On Wed, Aug 22, 2018 at 4:12 PM Steve Tian <
>> > steve.cs.t...@gmail.com
>> > >> >
>> > >> > > > wrote:
>> > >> > > >
>> > >> > > > > Did you observed any GC-pausing?
>> > >> > > > >
>> > >> > > > > On Wed, Aug 22, 2018, 6:38 PM Shantanu Deshmukh <
>> > >> > shantanu...@gmail.com
>> > >> > > >
>> > >> > > > > wrote:
>> > >> > > > >
>> > >> > > > > > Hi Steve,
>> > >> > > > > >
>> > >> > > > > > Application is just 

Kafka producer huge memory usage (leak?)

2018-09-18 Thread Shantanu Deshmukh
Hello,

We have a 3 broker Kafka 0.10.1.0 deployment in production. There are some
applications which have Kafka Producers embedded in them which send
application logs to a topic. This topic has 10 partitions with replication
factor of 3.

We are observing that memory usage on some of these application servers
keep shooting through the roof intermittently. After taking heapdump we
found out that top suspects were:
*-*


*org.apache.kafka.common.network.Selector -*

occupies *352,519,104 (24.96%)* bytes. The memory is accumulated in one
instance of *"byte[]"* loaded by *""*.

*org.apache.kafka.common.network.KafkaChannel -*

occupies *352,527,424 (24.96%)* bytes. The memory is accumulated in one
instance of *"byte[]"* loaded by *""*

* - *

Both of these were holding about 352MB of space. 3 such instances, so they
were consuming about 1.2GB of memory.

Now regarding usage of producers. Not a huge amount of logs are being sent
to Kafka cluster. It is about 200 msgs/sec. Only one producer object is
being used throughout application. Async send function is used.

What could be the cause of such huge memory usage? Is this some sort of
memory leak in this specific Kafka version?


Re: Kafka producer huge memory usage (leak?)

2018-09-18 Thread Shantanu Deshmukh
Additionally, here's the producer config

kafka.bootstrap.servers=x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092
kafka.acks=0
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.max.block.ms=1000
kafka.request.timeout.ms=1000
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.compression.type=gzip
kafka.security.protocol=SSL
kafka.ssl.truststore.location=/data/kafka/kafka-server-truststore.jks
kafka.ssl.truststore.password=XX
kafka.linger.ms=300
logger.level=INFO

On Tue, Sep 18, 2018 at 5:36 PM Shantanu Deshmukh 
wrote:

> Hello,
>
> We have a 3 broker Kafka 0.10.1.0 deployment in production. There are some
> applications which have Kafka Producers embedded in them which send
> application logs to a topic. This topic has 10 partitions with replication
> factor of 3.
>
> We are observing that memory usage on some of these application servers
> keep shooting through the roof intermittently. After taking heapdump we
> found out that top suspects were:
> *-*
>
>
> *org.apache.kafka.common.network.Selector -*
>
> occupies *352,519,104 (24.96%)* bytes. The memory is accumulated in one
> instance of *"byte[]"* loaded by *""*.
>
> *org.apache.kafka.common.network.KafkaChannel -*
>
> occupies *352,527,424 (24.96%)* bytes. The memory is accumulated in one
> instance of *"byte[]"* loaded by *""*
>
> * - *
>
> Both of these were holding about 352MB of space. 3 such instances, so they
> were consuming about 1.2GB of memory.
>
> Now regarding usage of producers. Not a huge amount of logs are being sent
> to Kafka cluster. It is about 200 msgs/sec. Only one producer object is
> being used throughout application. Async send function is used.
>
> What could be the cause of such huge memory usage? Is this some sort of
> memory leak in this specific Kafka version?
>
>


Re: Kafka producer huge memory usage (leak?)

2018-09-18 Thread Shantanu Deshmukh
Any thoughts on this matter? Someone, please help.

On Tue, Sep 18, 2018 at 6:05 PM Shantanu Deshmukh 
wrote:

> Additionally, here's the producer config
>
> kafka.bootstrap.servers=x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092
> kafka.acks=0
> kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
>
> kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
> kafka.max.block.ms=1000
> kafka.request.timeout.ms=1000
> kafka.max.in.flight.requests.per.connection=1
> kafka.retries=0
> kafka.compression.type=gzip
> kafka.security.protocol=SSL
> kafka.ssl.truststore.location=/data/kafka/kafka-server-truststore.jks
> kafka.ssl.truststore.password=XX
> kafka.linger.ms=300
> logger.level=INFO
>
> On Tue, Sep 18, 2018 at 5:36 PM Shantanu Deshmukh 
> wrote:
>
>> Hello,
>>
>> We have a 3 broker Kafka 0.10.1.0 deployment in production. There are
>> some applications which have Kafka Producers embedded in them which send
>> application logs to a topic. This topic has 10 partitions with replication
>> factor of 3.
>>
>> We are observing that memory usage on some of these application servers
>> keep shooting through the roof intermittently. After taking heapdump we
>> found out that top suspects were:
>> *-*
>>
>>
>> *org.apache.kafka.common.network.Selector -*
>>
>> occupies *352,519,104 (24.96%)* bytes. The memory is accumulated in one
>> instance of *"byte[]"* loaded by *""*.
>>
>> *org.apache.kafka.common.network.KafkaChannel -*
>>
>> occupies *352,527,424 (24.96%)* bytes. The memory is accumulated in one
>> instance of *"byte[]"* loaded by *""*
>>
>> * - *
>>
>> Both of these were holding about 352MB of space. 3 such instances, so
>> they were consuming about 1.2GB of memory.
>>
>> Now regarding usage of producers. Not a huge amount of logs are being
>> sent to Kafka cluster. It is about 200 msgs/sec. Only one producer object
>> is being used throughout application. Async send function is used.
>>
>> What could be the cause of such huge memory usage? Is this some sort of
>> memory leak in this specific Kafka version?
>>
>>


Re: Kafka producer huge memory usage (leak?)

2018-09-21 Thread Shantanu Deshmukh
Hi Manikumar,

I checked this issue. There is this patch available.
https://github.com/apache/kafka/pull/2408.patch

I pulled Kafka 0.10.1.0 from github. Then tried applying this patch. But
several places I am getting error that patch doesn't apply.
I am new to git and patching process. Can you guide me here?

On Wed, Sep 19, 2018 at 1:02 PM Manikumar  wrote:

> Similar issue reported here:KAFKA-7304, but on broker side.  maybe you can
> create a JIRA and upload the heap dump for analysis.
>
> On Wed, Sep 19, 2018 at 11:59 AM Shantanu Deshmukh 
> wrote:
>
> > Any thoughts on this matter? Someone, please help.
> >
> > On Tue, Sep 18, 2018 at 6:05 PM Shantanu Deshmukh  >
> > wrote:
> >
> > > Additionally, here's the producer config
> > >
> > > kafka.bootstrap.servers=x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092
> > > kafka.acks=0
> > >
> >
> kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
> > >
> > >
> >
> kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
> > > kafka.max.block.ms=1000
> > > kafka.request.timeout.ms=1000
> > > kafka.max.in.flight.requests.per.connection=1
> > > kafka.retries=0
> > > kafka.compression.type=gzip
> > > kafka.security.protocol=SSL
> > > kafka.ssl.truststore.location=/data/kafka/kafka-server-truststore.jks
> > > kafka.ssl.truststore.password=XX
> > > kafka.linger.ms=300
> > > logger.level=INFO
> > >
> > > On Tue, Sep 18, 2018 at 5:36 PM Shantanu Deshmukh <
> shantanu...@gmail.com
> > >
> > > wrote:
> > >
> > >> Hello,
> > >>
> > >> We have a 3 broker Kafka 0.10.1.0 deployment in production. There are
> > >> some applications which have Kafka Producers embedded in them which
> send
> > >> application logs to a topic. This topic has 10 partitions with
> > replication
> > >> factor of 3.
> > >>
> > >> We are observing that memory usage on some of these application
> servers
> > >> keep shooting through the roof intermittently. After taking heapdump
> we
> > >> found out that top suspects were:
> > >> *-*
> > >>
> > >>
> > >> *org.apache.kafka.common.network.Selector -*
> > >>
> > >> occupies *352,519,104 (24.96%)* bytes. The memory is accumulated in
> one
> > >> instance of *"byte[]"* loaded by *""*.
> > >>
> > >> *org.apache.kafka.common.network.KafkaChannel -*
> > >>
> > >> occupies *352,527,424 (24.96%)* bytes. The memory is accumulated in
> one
> > >> instance of *"byte[]"* loaded by *""*
> > >>
> > >> * - *
> > >>
> > >> Both of these were holding about 352MB of space. 3 such instances, so
> > >> they were consuming about 1.2GB of memory.
> > >>
> > >> Now regarding usage of producers. Not a huge amount of logs are being
> > >> sent to Kafka cluster. It is about 200 msgs/sec. Only one producer
> > object
> > >> is being used throughout application. Async send function is used.
> > >>
> > >> What could be the cause of such huge memory usage? Is this some sort
> of
> > >> memory leak in this specific Kafka version?
> > >>
> > >>
> >
>


Re: Kafka producer huge memory usage (leak?)

2018-09-21 Thread Shantanu Deshmukh
Hi Manikumar,
I am using correct protocol. SSL and truststore is also correctly
configured and there is a single port which is for SSL. Otherwise it simply
wouldn't work. Data is getting produced all fine, it's just that producer
object is consuming massive amount of memory.

On Fri, Sep 21, 2018 at 2:36 PM Manikumar  wrote:

> Hi,
> Instead trying the PR,  make sure you are setting valid security protocol
> and connecting to valid broker port.
> also looks for any errors in producer logs.
>
> Thanks,
>
>
>
>
>
> On Fri, Sep 21, 2018 at 12:35 PM Shantanu Deshmukh 
> wrote:
>
> > Hi Manikumar,
> >
> > I checked this issue. There is this patch available.
> > https://github.com/apache/kafka/pull/2408.patch
> >
> > I pulled Kafka 0.10.1.0 from github. Then tried applying this patch. But
> > several places I am getting error that patch doesn't apply.
> > I am new to git and patching process. Can you guide me here?
> >
> > On Wed, Sep 19, 2018 at 1:02 PM Manikumar 
> > wrote:
> >
> > > Similar issue reported here:KAFKA-7304, but on broker side.  maybe you
> > can
> > > create a JIRA and upload the heap dump for analysis.
> > >
> > > On Wed, Sep 19, 2018 at 11:59 AM Shantanu Deshmukh <
> > shantanu...@gmail.com>
> > > wrote:
> > >
> > > > Any thoughts on this matter? Someone, please help.
> > > >
> > > > On Tue, Sep 18, 2018 at 6:05 PM Shantanu Deshmukh <
> > shantanu...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Additionally, here's the producer config
> > > > >
> > > > > kafka.bootstrap.servers=x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092
> > > > > kafka.acks=0
> > > > >
> > > >
> > >
> >
> kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
> > > > > kafka.max.block.ms=1000
> > > > > kafka.request.timeout.ms=1000
> > > > > kafka.max.in.flight.requests.per.connection=1
> > > > > kafka.retries=0
> > > > > kafka.compression.type=gzip
> > > > > kafka.security.protocol=SSL
> > > > >
> kafka.ssl.truststore.location=/data/kafka/kafka-server-truststore.jks
> > > > > kafka.ssl.truststore.password=XX
> > > > > kafka.linger.ms=300
> > > > > logger.level=INFO
> > > > >
> > > > > On Tue, Sep 18, 2018 at 5:36 PM Shantanu Deshmukh <
> > > shantanu...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > >> Hello,
> > > > >>
> > > > >> We have a 3 broker Kafka 0.10.1.0 deployment in production. There
> > are
> > > > >> some applications which have Kafka Producers embedded in them
> which
> > > send
> > > > >> application logs to a topic. This topic has 10 partitions with
> > > > replication
> > > > >> factor of 3.
> > > > >>
> > > > >> We are observing that memory usage on some of these application
> > > servers
> > > > >> keep shooting through the roof intermittently. After taking
> heapdump
> > > we
> > > > >> found out that top suspects were:
> > > > >> *-*
> > > > >>
> > > > >>
> > > > >> *org.apache.kafka.common.network.Selector -*
> > > > >>
> > > > >> occupies *352,519,104 (24.96%)* bytes. The memory is accumulated
> in
> > > one
> > > > >> instance of *"byte[]"* loaded by *""*.
> > > > >>
> > > > >> *org.apache.kafka.common.network.KafkaChannel -*
> > > > >>
> > > > >> occupies *352,527,424 (24.96%)* bytes. The memory is accumulated
> in
> > > one
> > > > >> instance of *"byte[]"* loaded by *""*
> > > > >>
> > > > >> * - *
> > > > >>
> > > > >> Both of these were holding about 352MB of space. 3 such instances,
> > so
> > > > >> they were consuming about 1.2GB of memory.
> > > > >>
> > > > >> Now regarding usage of producers. Not a huge amount of logs are
> > being
> > > > >> sent to Kafka cluster. It is about 200 msgs/sec. Only one producer
> > > > object
> > > > >> is being used throughout application. Async send function is used.
> > > > >>
> > > > >> What could be the cause of such huge memory usage? Is this some
> sort
> > > of
> > > > >> memory leak in this specific Kafka version?
> > > > >>
> > > > >>
> > > >
> > >
> >
>


Kafka SASL auth setup error: Connection to node 0 (localhost/127.0.0.1:9092) terminated during authentication

2019-04-03 Thread Shantanu Deshmukh
Hello everyone,

I am trying to setup Kafka SASL authentication on my single node Kafka on
my local machine. version 2.

Here's my Kafka broker JAAS file:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin"
   user_admin="admin"
   user_dip="dip";
};
Client {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret";
};

Zookeeper JAAS file:

Server {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret";
};

Kafka broker properties

listeners=SASL_PLAINTEXT://localhost:9092authroizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAINTEXT
sasl.enabled.mechanisms=PLAINTEXT

Zookeeper properties:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=360

When I try to start Kafka server I am continuously getting this error.

[2019-04-03 16:32:31,267] DEBUG Accepted connection from
/127.0.0.1:45794 on /127.0.0.1:9092 and assigned it to processor 1,
sendBufferSize [actual|requested]: [102400|102400] recvBufferSize
[actual|requested]: [102400|102400] (kafka.network.Acceptor)
[2019-04-03 16:32:31,267] DEBUG Processor 1 listening to new
connection from /127.0.0.1:45794 (kafka.network.Processor)
[2019-04-03 16:32:31,268] WARN [Controller id=0, targetBrokerId=0]
Unexpected error from localhost/127.0.0.1; closing connection
(org.apache.kafka.common.network.Selector)
java.lang.NullPointerException
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:266)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:204)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:141)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019-04-03 16:32:31,268] WARN [Controller id=0, targetBrokerId=0]
Connection to node 0 (localhost/127.0.0.1:9092) terminated during
authentication. This may indicate that authentication failed due to
invalid credentials. (org.apache.kafka.clients.NetworkClient)

Please help. Unable to understand this problem.


Thanks & Regards,

Shantanu Deshmukh


Re: Kafka SASL auth setup error: Connection to node 0 (localhost/127.0.0.1:9092) terminated during authentication

2019-04-09 Thread Shantanu Deshmukh
That was a blooper. But even after correcting, it still isn't working.
Still getting the same error.
Here are the configs again:

*Kafka config: *

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret"
   user_dip="dip";
};
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="dip"
   password="dip-secret";
};

*Zookeeper config:*

Server {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret";
};

On Mon, Apr 8, 2019 at 2:11 PM 1095193...@qq.com <1095193...@qq.com> wrote:

>
>
> On 2019/04/03 13:08:45, Shantanu Deshmukh  wrote:
> > Hello everyone,
> >
> > I am trying to setup Kafka SASL authentication on my single node Kafka on
> > my local machine. version 2.
> >
> > Here's my Kafka broker JAAS file:
> >
> > KafkaServer {
> >org.apache.kafka.common.security.plain.PlainLoginModule required
> >username="admin"
> >password="admin"
> >user_admin="admin"
> >user_dip="dip";
> > };
> > Client {
> >org.apache.kafka.common.security.plain.PlainLoginModule required
> >username="admin"
> >password="admin-secret";
> > };
> >
> > Zookeeper JAAS file:
> >
> > Server {
> >org.apache.kafka.common.security.plain.PlainLoginModule required
> >username="admin"
> >password="admin-secret"
> >user_admin="admin-secret";
> > };
> >
> > Kafka broker properties
> >
> > listeners=SASL_PLAINTEXT://localhost:9092authroizer.class.name
> =kafka.security.auth.SimpleAclAuthorizer
> > security.inter.broker.protocol=SASL_PLAINTEXT
> > sasl.mechanism.inter.broker.protocol=PLAINTEXT
> > sasl.enabled.mechanisms=PLAINTEXT
> >
> > Zookeeper properties:
> >
> >
> authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
> > requireClientAuthScheme=sasl
> > jaasLoginRenew=360
> >
> > When I try to start Kafka server I am continuously getting this error.
> >
> > [2019-04-03 16:32:31,267] DEBUG Accepted connection from
> > /127.0.0.1:45794 on /127.0.0.1:9092 and assigned it to processor 1,
> > sendBufferSize [actual|requested]: [102400|102400] recvBufferSize
> > [actual|requested]: [102400|102400] (kafka.network.Acceptor)
> > [2019-04-03 16:32:31,267] DEBUG Processor 1 listening to new
> > connection from /127.0.0.1:45794 (kafka.network.Processor)
> > [2019-04-03 16:32:31,268] WARN [Controller id=0, targetBrokerId=0]
> > Unexpected error from localhost/127.0.0.1; closing connection
> > (org.apache.kafka.common.network.Selector)
> > java.lang.NullPointerException
> > at
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:266)
> > at
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:204)
> > at
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:141)
> > at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
> > at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
> > at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
> > at
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
> > at
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
> > at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> > [2019-04-03 16:32:31,268] WARN [Controller id=0, targetBrokerId=0]
> > Connection to node 0 (localhost/127.0.0.1:9092) terminated during
> > authentication. This may indicate that authentication failed due to
> > invalid credentials. (org.apache.kafka.clients.NetworkClient)
> >
> > Please help. Unable to understand this problem.
> >
> >
> > Thanks & Regards,
> >
> > Shantanu Deshmukh
> >
> user_admin="admin" in KafkaServer is not consistent with
> password="admin-secret" in Client
>


Re: How to handle kafka large messages

2019-04-09 Thread Shantanu Deshmukh
Well,
from your own synopsis it is clear that message you want to send it much
larger than max.message.bytes setting on broker. You can modify it.
However, do keep in mind that if you seem to be constantly increasing this
limit then you have to look at your message itself. Does it really need to
be that large? Large messages can stress out the cluster.

On Tue, Apr 9, 2019 at 4:50 PM Rammohan Vanteru 
wrote:

> Hi Users,
>
> Let me know if any one faced this issue.
>
> I have went through multiple articles but has different answers. Just want
> to check with kafka users.
>
> Below are the setting i have on kafka cluster. What are the tuning
> parameters to overcome this large message size issue.
>
>
> Kafka version: 0.11
> Number of nodes in a kafka cluster: 3 nodes
> Number topic and partitions: 1topic and 10 partitions.
> Message size: upto 5mb
> Max.messages.bytes on topic is 2mb
>
> Error message:
>
> 201904-09 00:00:02.469 ERROR 35301 --- [ad | producer-1]
> c.b.a.s.p.KafkaTelemetryConsumer : Failed to send TelemetryHarvesterServer
> with data size 1090517 to kafka.
>
> org.springframework.kafka.core.KafkaProducerException: Failed to send;
> nested exception is org.apache.kafka.common.errors.RecordTooLargeException:
> The request included a message larger than the max message size the server
> will accept.
> ```
>


Re: Kafka SASL auth setup error: Connection to node 0 (localhost/127.0.0.1:9092) terminated during authentication

2019-04-10 Thread Shantanu Deshmukh
So you mean three sections, namely KafkaServer, KafkaClient and Client
needs to be kept inside a file and passed to broker. And section Server
needs to be passed to zookeeper? I did exactly that, even copied your
config. It still isn't working.

On Wed, Apr 10, 2019 at 7:07 AM 1095193...@qq.com <1095193...@qq.com> wrote:

>
>
> On 2019/04/09 11:21:10, Shantanu Deshmukh  wrote:
> > That was a blooper. But even after correcting, it still isn't working.
> > Still getting the same error.
> > Here are the configs again:
> >
> > *Kafka config: *
> >
> > KafkaServer {
> >org.apache.kafka.common.security.plain.PlainLoginModule required
> >username="admin"
> >password="admin-secret"
> >user_admin="admin-secret"
> >user_dip="dip";
> > };
> > KafkaClient {
> >org.apache.kafka.common.security.plain.PlainLoginModule required
> >username="dip"
> >password="dip-secret";
> > };
> >
> > *Zookeeper config:*
> >
> > Server {
> >org.apache.kafka.common.security.plain.PlainLoginModule required
> >username="admin"
> >password="admin-secret"
> >user_admin="admin-secret";
> > };
> >
> > On Mon, Apr 8, 2019 at 2:11 PM 1095193...@qq.com <1095193...@qq.com>
> wrote:
> >
> > >
> > >
> > > On 2019/04/03 13:08:45, Shantanu Deshmukh 
> wrote:
> > > > Hello everyone,
> > > >
> > > > I am trying to setup Kafka SASL authentication on my single node
> Kafka on
> > > > my local machine. version 2.
> > > >
> > > > Here's my Kafka broker JAAS file:
> > > >
> > > > KafkaServer {
> > > >org.apache.kafka.common.security.plain.PlainLoginModule required
> > > >username="admin"
> > > >password="admin"
> > > >user_admin="admin"
> > > >user_dip="dip";
> > > > };
> > > > Client {
> > > >org.apache.kafka.common.security.plain.PlainLoginModule required
> > > >username="admin"
> > > >password="admin-secret";
> > > > };
> > > >
> > > > Zookeeper JAAS file:
> > > >
> > > > Server {
> > > >org.apache.kafka.common.security.plain.PlainLoginModule required
> > > >username="admin"
> > > >password="admin-secret"
> > > >user_admin="admin-secret";
> > > > };
> > > >
> > > > Kafka broker properties
> > > >
> > > > listeners=SASL_PLAINTEXT://localhost:9092authroizer.class.name
> > > =kafka.security.auth.SimpleAclAuthorizer
> > > > security.inter.broker.protocol=SASL_PLAINTEXT
> > > > sasl.mechanism.inter.broker.protocol=PLAINTEXT
> > > > sasl.enabled.mechanisms=PLAINTEXT
> > > >
> > > > Zookeeper properties:
> > > >
> > > >
> > >
> authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
> > > > requireClientAuthScheme=sasl
> > > > jaasLoginRenew=360
> > > >
> > > > When I try to start Kafka server I am continuously getting this
> error.
> > > >
> > > > [2019-04-03 16:32:31,267] DEBUG Accepted connection from
> > > > /127.0.0.1:45794 on /127.0.0.1:9092 and assigned it to processor 1,
> > > > sendBufferSize [actual|requested]: [102400|102400] recvBufferSize
> > > > [actual|requested]: [102400|102400] (kafka.network.Acceptor)
> > > > [2019-04-03 16:32:31,267] DEBUG Processor 1 listening to new
> > > > connection from /127.0.0.1:45794 (kafka.network.Processor)
> > > > [2019-04-03 16:32:31,268] WARN [Controller id=0, targetBrokerId=0]
> > > > Unexpected error from localhost/127.0.0.1; closing connection
> > > > (org.apache.kafka.common.network.Selector)
> > > > java.lang.NullPointerException
> > > > at
> > >
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:266)
> > > > at
> > >
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:204)
> > > > at
> > >
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:141)
> > > > at
&

Kafka 2.0.0 - How to verify if Kafka compression is working

2021-05-11 Thread Shantanu Deshmukh
I am trying snappy compression on my producer. Here's my setup

Kafka - 2.0.0
Spring-Kafka - 2.1.2

Here's my producer config

compressed producer ==

configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);

config of un-compressed producer 

configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);

My payload is almost 1mb worth of string. After sending 1000 compressed and
1000 uncompressed such messages this is the result
===
[shantanu@oc0148610736 uncompressed-string-test-0]$ du -hsc
/data/compressed-string-test-0/*
8.0K /data/compressed-string-test-0/.index
990M /data/compressed-string-test-0/.log
12K /data/compressed-string-test-0/.timeindex
4.0K /data/compressed-string-test-0/leader-epoch-checkpoint
990M total

[shantanu@oc0148610736 uncompressed-string-test-0]$ du -shc
/data/uncompressed-string-test-0/*
8.0K/data/uncompressed-string-test-0/.index
992M/data/uncompressed-string-test-0/.log
12K /data/uncompressed-string-test-0/.timeindex
4.0K/data/uncompressed-string-test-0/leader-epoch-checkpoint
992Mtotal
===

Here we can see the difference is merely 2MB. Is compression even working?
I used dump-log-segment tool
===
[shantanu@oc0148610736 kafka_2.11-2.0.0]$ sh bin/kafka-run-class.sh
kafka.tools.DumpLogSegments --files
/data/compressed-string-test-0/.log --print-data-log |
head | grep compresscodec

offset: 0 position: 0 CreateTime: 1620744081357 isvalid: true keysize:
-1 valuesize: 103 magic: 2 compresscodec: SNAPPY producerId: -1
producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []
payload: 
klxhbpyxmcazvhekqnltuenwhsewjjfmctcqyrppellyfqglfnvhqctlfplslhpuulknsncbgzzndizwmlnelotcbniyprdgihdazwn
===

I can see SNAPPY is mentioned as compression codec. But the difference
between compressed and uncompressed disk size is negligible.

I tried gzip later on. And results are
===
[shantanu@oc0148610736 uncompressed-string-test-0]$ du -hsc
/data/compressed-string-test-0/*
8.0K /data/compressed-string-test-0/.index
640M /data/compressed-string-test-0/.log
12K /data/compressed-string-test-0/.timeindex
4.0K /data/compressed-string-test-0/leader-epoch-checkpoint
640M total
===

So gzip seems to have worked somehow. I tried lz4 compression as well.
Results were same as that of snappy.

Is snappy/lz4 compression really working here? Gzip seems to be working but
I have read a lot that snappy gives best CPU usage to compression ratio
balance. So we want to go ahead with snappy.

Please help

*Thanks & Regards,*
*Shantanu*


Re: Kafka 2.0.0 - How to verify if Kafka compression is working

2021-05-11 Thread Shantanu Deshmukh
Hey Nitin,

I have already done that. I used dump-log-segments option. And I can see
the codec used is snappy/gzip/lz4. My question is, only gzip is giving me
compression. Rest are equivalent to uncompressed storage,

On Wed, May 12, 2021 at 11:16 AM nitin agarwal 
wrote:

> You can read the data from the disk and see compression type.
> https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026
>
> Thanks,
> Nitin
>
> On Wed, May 12, 2021 at 11:10 AM Shantanu Deshmukh 
> wrote:
>
> > I am trying snappy compression on my producer. Here's my setup
> >
> > Kafka - 2.0.0
> > Spring-Kafka - 2.1.2
> >
> > Here's my producer config
> >
> > compressed producer ==
> >
> > configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > bootstrapServer);
> > configProps.put(
> > ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > StringSerializer.class);
> > configProps.put(
> > ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > StringSerializer.class);
> > configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> > configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);
> >
> > config of un-compressed producer 
> >
> > configProps.put(
> > ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > bootstrapServer);
> > configProps.put(
> > ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > StringSerializer.class);
> > configProps.put(
> > ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > StringSerializer.class);
> >
> > My payload is almost 1mb worth of string. After sending 1000 compressed
> and
> > 1000 uncompressed such messages this is the result
> > ===
> > [shantanu@oc0148610736 uncompressed-string-test-0]$ du -hsc
> > /data/compressed-string-test-0/*
> > 8.0K /data/compressed-string-test-0/.index
> > 990M /data/compressed-string-test-0/.log
> > 12K /data/compressed-string-test-0/.timeindex
> > 4.0K /data/compressed-string-test-0/leader-epoch-checkpoint
> > 990M total
> >
> > [shantanu@oc0148610736 uncompressed-string-test-0]$ du -shc
> > /data/uncompressed-string-test-0/*
> > 8.0K/data/uncompressed-string-test-0/.index
> > 992M/data/uncompressed-string-test-0/.log
> > 12K /data/uncompressed-string-test-0/.timeindex
> > 4.0K/data/uncompressed-string-test-0/leader-epoch-checkpoint
> > 992Mtotal
> > ===
> >
> > Here we can see the difference is merely 2MB. Is compression even
> working?
> > I used dump-log-segment tool
> > ===
> > [shantanu@oc0148610736 kafka_2.11-2.0.0]$ sh bin/kafka-run-class.sh
> > kafka.tools.DumpLogSegments --files
> > /data/compressed-string-test-0/.log --print-data-log
> |
> > head | grep compresscodec
> >
> > offset: 0 position: 0 CreateTime: 1620744081357 isvalid: true keysize:
> > -1 valuesize: 103 magic: 2 compresscodec: SNAPPY producerId: -1
> > producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []
> > payload:
> >
> klxhbpyxmcazvhekqnltuenwhsewjjfmctcqyrppellyfqglfnvhqctlfplslhpuulknsncbgzzndizwmlnelotcbniyprdgihdazwn
> > ===
> >
> > I can see SNAPPY is mentioned as compression codec. But the difference
> > between compressed and uncompressed disk size is negligible.
> >
> > I tried gzip later on. And results are
> > ===
> > [shantanu@oc0148610736 uncompressed-string-test-0]$ du -hsc
> > /data/compressed-string-test-0/*
> > 8.0K /data/compressed-string-test-0/.index
> > 640M /data/compressed-string-test-0/.log
> > 12K /data/compressed-string-test-0/.timeindex
> > 4.0K /data/compressed-string-test-0/leader-epoch-checkpoint
> > 640M total
> > ===
> >
> > So gzip seems to have worked somehow. I tried lz4 compression as well.
> > Results were same as that of snappy.
> >
> > Is snappy/lz4 compression really working here? Gzip seems to be working
> but
> > I have read a lot that snappy gives best CPU usage to compression ratio
> > balance. So we want to go ahead with snappy.
> >
> > Please help
> >
> > *Thanks & Regards,*
> > *Shantanu*
> >
>


Re: Kafka 2.0.0 - How to verify if Kafka compression is working

2021-05-12 Thread Shantanu Deshmukh
I have some updates on this.
I tried this on latest kafka 2.8. Ran my application. Results are same,
snappy and lz4 dont seem to be working as uncompressed and compressed
storage both measure the same.

*I even tried kafka-producer-perf-test tool*. Below are the results

Without any compression:
==>>
sh bin/kafka-producer-perf-test.sh --num-records 10 --throughput 1
--record-size 102400 --topic perf-test-uncompressed --producer-props
*compression.type=none* bootstrap.servers=localhost:9092 --print-metrics

10 records sent, *862.113558 records/sec (84.19 MB/sec)*, 376.08 ms avg
latency, 1083.00 ms max latency, 371 ms 50th, 610 ms 95th, 778 ms 99th,
1061 ms 99.9th.
...
producer-topic-metrics:*compression-rate*:{client-id=producer-1,
topic=perf-test-uncompressed}   : *1.000*

With snappy compression:
==>>
sh bin/kafka-producer-perf-test.sh --num-records 10 --throughput 1
--record-size 102400 --topic perf-test-uncompressed --producer-props
*compression.type=snappy
batch.size=10 linger.ms <http://linger.ms>=5
*bootstrap.servers=localhost:9092
--print-metrics

10 records sent, 599.905215 *records/sec (58.58 MB/sec)*, 540.79 ms avg
latency, 1395.00 ms max latency, 521 ms 50th, 816 ms 95th, 1016 ms 99th,
1171 ms 99.9th.
...
producer-topic-metrics:*compression-rate*:{client-id=producer-1,
topic=perf-test-uncompressed}   : *1.001*

<<=====
Above mentioned compression-rate didnt change even with

With  Gzip compression
*==>>*
sh bin/kafka-producer-perf-test.sh --num-records 10 --throughput 1
--record-size 102400 --topic perf-test-compressed --producer-props
*compression.type=gzip* bootstrap.servers=localhost:9092 *batch.size=10
linger.ms <http://linger.ms>=5* --print-metrics

10 records sent, *200.760078 records/sec (19.61 MB/sec)*, 1531.40 ms
avg latency, 2744.00 ms max latency, 1514 ms 50th, 1897 ms 95th, 2123 ms
99th, 2610 ms 99.9th.
...
producer-topic-metrics:*compression-rate*:{client-id=producer-1,
topic=perf-test-compressed}   : *0.635*

*<<*

To summarise*:*
compression type
messages sent
avg latency/throughput
effective compression-rate
none
10
862.113558 records/sec (84.19 MB/sec)
1.000
snappy
10
599.905215 records/sec (58.58 MB/sec),
1.001
gzip
10
200.760078 records/sec (19.61 MB/sec)
0.635

In short snappy = uncompressed !! Why is this happening?

On Wed, May 12, 2021 at 11:40 AM Shantanu Deshmukh 
wrote:

> Hey Nitin,
>
> I have already done that. I used dump-log-segments option. And I can see
> the codec used is snappy/gzip/lz4. My question is, only gzip is giving me
> compression. Rest are equivalent to uncompressed storage,
>
> On Wed, May 12, 2021 at 11:16 AM nitin agarwal 
> wrote:
>
>> You can read the data from the disk and see compression type.
>> https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026
>>
>> Thanks,
>> Nitin
>>
>> On Wed, May 12, 2021 at 11:10 AM Shantanu Deshmukh > >
>> wrote:
>>
>> > I am trying snappy compression on my producer. Here's my setup
>> >
>> > Kafka - 2.0.0
>> > Spring-Kafka - 2.1.2
>> >
>> > Here's my producer config
>> >
>> > compressed producer ==
>> >
>> > configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>> > bootstrapServer);
>> > configProps.put(
>> > ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>> > StringSerializer.class);
>> > configProps.put(
>> > ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>> > StringSerializer.class);
>> > configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
>> > configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);
>> >
>> > config of un-compressed producer 
>> >
>> > configProps.put(
>> > ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>> > bootstrapServer);
>> > configProps.put(
>> > ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>> > StringSerializer.class);
>> > configProps.put(
>> > ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>> > StringSerializer.class);
>> >
>> > My payload is almost 1mb worth of string. After sending 1000 compressed
>> and
>> > 1000 uncompressed such messages this is the result
>> > ===
>> > [shantanu@oc0148610736 uncompressed-string-test-0]$ du -hsc
>> > /data/compressed-string-test-0/*
>> > 8.0K /data/compressed-string-test-0/0

Re: Kafka 2.0.0 - How to verify if Kafka compression is working

2021-05-12 Thread Shantanu Deshmukh
Hi Scott,
Thanks for responding.

As part of my test. I have also run my own program wherein I used both
random strings and json messages.
I was aware that snappy may not perform good on random strings. So I used a
10kb json message and sent 10,000 messages of that same json.
So with this test payload the entire message is same, repeating over and
over again.

At least with that it could have compressed. But to no avail!

I also applied batch.size=1 and linger.ms=10 still it didnt work.

On Thu, May 13, 2021 at 11:32 AM Scott Carey  wrote:

> Snappy and lz4 do not have entropy encoders (
> https://en.wikipedia.org/wiki/Entropy_encoding).  If your data is random
> text they will not compress.  If your text is a string of all zeros or any
> repeating pattern, it will compress significantly.  If its something like
> JSON, or XML it will compress.
>
> I suspect you either aren't using real world data, or haven't compared the
> compression with different types of data (json?  web pages?   numbers?).
> No compression test is of much use unless you specify _what_ you are trying
> to compress and either construct a realistic corpus for your use case, or
> test with a few well defined types of real data that might be similar to
> your expected use case.
>
>   Gzip and zstandard have entropy encoding (Huffman for gzip, and a
> combination of Huffman and ANS for zstandard).  With these, even if your
> text is purely random _text_, it will compress somewhat since text doesn't
> use all 256 possible byte values and so it can use less than 8 bits per
> character in the encoding.
>
>
>
> On Wed, May 12, 2021, 22:35 Shantanu Deshmukh 
> wrote:
>
> > I have some updates on this.
> > I tried this on latest kafka 2.8. Ran my application. Results are same,
> > snappy and lz4 dont seem to be working as uncompressed and compressed
> > storage both measure the same.
> >
> > *I even tried kafka-producer-perf-test tool*. Below are the results
> >
> > Without any compression:
> > ==>>
> > sh bin/kafka-producer-perf-test.sh --num-records 10 --throughput
> 1
> > --record-size 102400 --topic perf-test-uncompressed --producer-props
> > *compression.type=none* bootstrap.servers=localhost:9092 --print-metrics
> >
> > 10 records sent, *862.113558 records/sec (84.19 MB/sec)*, 376.08 ms
> avg
> > latency, 1083.00 ms max latency, 371 ms 50th, 610 ms 95th, 778 ms 99th,
> > 1061 ms 99.9th.
> > ...
> > producer-topic-metrics:*compression-rate*:{client-id=producer-1,
> > topic=perf-test-uncompressed}   : *1.000*
> >
> > With snappy compression:
> > ==>>
> > sh bin/kafka-producer-perf-test.sh --num-records 10 --throughput
> 1
> > --record-size 102400 --topic perf-test-uncompressed --producer-props
> > *compression.type=snappy
> > batch.size=10 linger.ms <http://linger.ms>=5
> > *bootstrap.servers=localhost:9092
> > --print-metrics
> >
> > 10 records sent, 599.905215 *records/sec (58.58 MB/sec)*, 540.79 ms
> avg
> > latency, 1395.00 ms max latency, 521 ms 50th, 816 ms 95th, 1016 ms 99th,
> > 1171 ms 99.9th.
> > ...
> > producer-topic-metrics:*compression-rate*:{client-id=producer-1,
> > topic=perf-test-uncompressed}   : *1.001*
> >
> > <<=====
> > Above mentioned compression-rate didnt change even with
> >
> > With  Gzip compression
> > *==>>*
> > sh bin/kafka-producer-perf-test.sh --num-records 10 --throughput
> 1
> > --record-size 102400 --topic perf-test-compressed --producer-props
> > *compression.type=gzip* bootstrap.servers=localhost:9092
> *batch.size=10
> > linger.ms <http://linger.ms>=5* --print-metrics
> >
> > 10 records sent, *200.760078 records/sec (19.61 MB/sec)*, 1531.40 ms
> > avg latency, 2744.00 ms max latency, 1514 ms 50th, 1897 ms 95th, 2123 ms
> > 99th, 2610 ms 99.9th.
> > ...
> > producer-topic-metrics:*compression-rate*:{client-id=producer-1,
> > topic=perf-test-compressed}   : *0.635*
> >
> > *<<====*
> >
> > To summarise*:*
> > compression type
> > messages sent
> > avg latency/throughput
> > effective compression-rate
> > none
> > 10
> > 862.113558 records/sec (84.19 MB/sec)
> > 1.000
> > snappy
> > 10
> > 599.905215 records/sec (58.58 MB/sec),
> > 1.001
> > gzip
> > 10
> > 200.760078 records/sec (19.61 MB/sec)
> > 0.635
> >
> > In short snappy = uncompressed !! Why is