Proper way of handling connectivity issues

2016-03-27 Thread Anas Mosaad
Hi All,

I have a Kafka client (0.8.1) connecting to a server running 0.8.0. After a
while, we see many log messages and the log grows to ~50 GB, even though the
server is not under load. The exception is:

ERROR Closing socket for /<> because of error (kafka.network.Processor)

kafka.common.KafkaException: Wrong request type 10
    at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:53)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:353)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:745)

I looked at this thread:
http://mail-archives.apache.org/mod_mbox/kafka-dev/201501.mbox/%3ccaoejijgnidpnvr-tpcpkzqfkuwdj+rtymvgkkkdwmkczfqm...@mail.gmail.com%3E
but I don't believe it's an API issue, since everything had been working for
a while and no code changes were made to the client.

I suspect it could be due to a connectivity issue. If my assumption is
correct, what would be the correct way to handle such a failure?
What else could cause this exception?
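For context on where the exception is raised: each request frame starts with a 16-bit api key, and the stack trace shows the broker looking that key up in `RequestKeys.deserializerForKey` before it parses anything else. The Python sketch below models only that step, under stated assumptions: `SUPPORTED_API_KEYS` is a hypothetical placeholder, not the real key table of a 0.8.0 broker, and the frame is assumed to have its 4-byte size prefix already removed. It shows why a frame whose first two bytes decode to 10 is rejected outright, regardless of how those bytes reached the broker.

```python
import struct

# Hypothetical placeholder: api keys this (older) broker can deserialize.
# The real set depends on the broker version.
SUPPORTED_API_KEYS = {0, 1, 2, 3, 4, 5, 6, 7}


class UnknownRequestType(Exception):
    """Raised when the api key has no known deserializer."""


def parse_request_header(frame: bytes):
    """Parse a request header from a frame whose size prefix was stripped.

    Header layout: api_key int16, api_version int16, correlation_id int32,
    client_id string (int16 length followed by UTF-8 bytes).
    """
    api_key, api_version, correlation_id, client_len = struct.unpack_from(">hhih", frame, 0)
    if api_key not in SUPPORTED_API_KEYS:
        # Same idea as "Wrong request type N": nothing can parse the body.
        raise UnknownRequestType(f"Wrong request type {api_key}")
    client_id = frame[10:10 + client_len].decode("utf-8") if client_len >= 0 else None
    return api_key, api_version, correlation_id, client_id


if __name__ == "__main__":
    ok = struct.pack(">hhih", 3, 0, 42, 3) + b"app"
    print(parse_request_header(ok))  # (3, 0, 42, 'app')
    bad = struct.pack(">hhih", 10, 0, 43, 3) + b"app"
    try:
        parse_request_header(bad)
    except UnknownRequestType as exc:
        print(exc)  # Wrong request type 10
```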

Thanks a lot for your kind support.

-- 

*Best Regards/Best Wishes,*

*Anas Mosaad*
*Incorta Inc.*
*+20-100-743-4510*


Re: Unexpected response on SyncGroup call

2016-03-27 Thread Cees de Groot
Ah. That makes sense. That also means that on the outgoing request, this
part should be coded as ? These parts of
the documentation aren't entirely clear on what is part of the
client/server protocol and what is part of the (client-side only) consumer
group implementation.

I'll put my request and response side by side. I do send some assignments
in the SyncGroup request, but it may be that, because I misinterpreted the
docs, I accidentally drop four zero bytes in a spot where the server expects
a length marker.

On Sat, Mar 26, 2016 at 6:05 PM, Dana Powers  wrote:

> The MemberAssignment bytes returned in SyncResponse should be the bytes
> that your leader sent in its SyncRequest. <<0, 0, 0, 0>> is simply an empty
> Bytes array (32 bit length of 0). The broker does not alter those bytes as
> far as I know, so despite the protocol doc describing what a
> MemberAssignment struct *should* look like, it really is up to your client
> to encode and decode that struct. Can you check what bytes your leader code
> is sending?
>
> I assume you have this covered, but for completeness: each consumer should
> check its JoinResponse to determine whether it has been selected as the
> group leader. If the consumer is the leader, it needs to do the assignments
> and send those back in its SyncRequest. All other consumers just send an
> empty SyncRequest. SyncResponse for all consumers then includes the
> assignments sent by the leader.
>
> -Dana
>
> On Sat, Mar 26, 2016 at 2:39 PM, Cees de Groot  wrote:
>
> > I'm helping out on adding 0.9 stuff to the Elixir Kafka driver (
> > https://github.com/kafkaex/kafka_ex), and I'm getting an unexpected
> > response on an integration test that makes a simple SyncGroup call.
> >
> > In Elixir terms, I'm getting <<0, 0, 0, 3, 0, 0, 0, 0, 0, 0>> back. My
> > interpretation:
> >
> > 32 bits correlation id = 0,0,0,3
> > 16 bits error code = 0,0
> >
> > There's now 32 bits left. However, I'm expecting (following
> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-SyncGroupResponse)
> > a MemberAssignment structure here, which is
> >
> > 16 bits of version (0)
> > 32 bits of length (it's a simple integration test, so for now I'm happy to
> > accept the answer "0 assignments for you, dude!" ;-)).
> >
> > Which clearly is more data than left in the response.
> >
> > Am I misinterpreting the documentation here?
> >
>
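Read the way Dana describes it, the last four bytes of the 10-byte response are the int32 length prefix of an empty Bytes field, not the start of a MemberAssignment struct. A minimal Python sketch of that reading, assuming the 4-byte size prefix has already been stripped (as in the bytes quoted above) and a v0 SyncGroup response; the function name is hypothetical.

```python
import struct


def decode_sync_group_response(buf: bytes):
    """Split a SyncGroup v0 response whose 4-byte size prefix was stripped.

    Layout: correlation_id int32, error_code int16, then member_assignment
    as a Kafka Bytes field (int32 length followed by that many raw bytes).
    The assignment bytes are returned undecoded, since the broker treats them
    as opaque.
    """
    correlation_id, error_code, length = struct.unpack_from(">ihi", buf, 0)
    if length < 0:
        # A length of -1 encodes a null Bytes field.
        return correlation_id, error_code, None
    assignment = buf[10:10 + length]
    if len(assignment) != length:
        raise ValueError("response truncated inside member_assignment")
    return correlation_id, error_code, assignment


response = bytes([0, 0, 0, 3, 0, 0, 0, 0, 0, 0])
print(decode_sync_group_response(response))  # (3, 0, b'')
```

An empty `b''` here means the leader sent zero assignment bytes for this member, which is why Dana suggests checking what the leader encodes.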



-- 

*Cees de Groot*
PRINCIPAL SOFTWARE ENGINEER
pagerduty.com
c...@pagerduty.com 
+1(416)435-4085




Re: Unexpected response on SyncGroup call

2016-03-27 Thread Dana Powers
Agree, it's a bit confusing based on wiki alone right now. I think of
MemberAssignment as a double-serialized field: first serialize to bytes
using the MemberAssignment definition (version, topics, partitions, etc);
then serialize the bytes into a request. It's the second serialization step
that adds the 32bit length. Repeat backwards for deserializing.
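The two serialization steps can be sketched in Python as follows. The struct layout (version int16, array of topic/partition lists, user data bytes) follows the protocol guide's MemberAssignment definition; the helper names are hypothetical, and a real client would still need the other SyncGroup request fields around the wrapped bytes.

```python
import struct


def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data  # int16 length + UTF-8 bytes


def encode_bytes(b: bytes) -> bytes:
    return struct.pack(">i", len(b)) + b  # int32 length + raw bytes


def encode_member_assignment(version: int, topics: dict, user_data: bytes = b"") -> bytes:
    """Step 1: serialize the MemberAssignment struct (topic -> partition list)."""
    out = struct.pack(">hi", version, len(topics))
    for topic, partitions in topics.items():
        out += encode_string(topic)
        # int32 array length followed by one int32 per partition id
        out += struct.pack(">i%di" % len(partitions), len(partitions), *partitions)
    return out + encode_bytes(user_data)


def decode_member_assignment(data: bytes):
    """Reverse of step 1: parse the struct back out of the opaque bytes."""
    version, n_topics = struct.unpack_from(">hi", data, 0)
    offset = 6
    topics = {}
    for _ in range(n_topics):
        (name_len,) = struct.unpack_from(">h", data, offset)
        offset += 2
        name = data[offset:offset + name_len].decode("utf-8")
        offset += name_len
        (n_parts,) = struct.unpack_from(">i", data, offset)
        offset += 4
        topics[name] = list(struct.unpack_from(">%di" % n_parts, data, offset))
        offset += 4 * n_parts
    (ud_len,) = struct.unpack_from(">i", data, offset)
    offset += 4
    user_data = data[offset:offset + ud_len] if ud_len >= 0 else None
    return version, topics, user_data


inner = encode_member_assignment(0, {"events": [0, 1]})  # step 1
field = encode_bytes(inner)                               # step 2 adds the int32 length
(length,) = struct.unpack_from(">i", field, 0)            # reverse step 2
print(decode_member_assignment(field[4:4 + length]))      # reverse step 1
```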

-Dana
On Mar 27, 2016 6:01 AM, "Cees de Groot"  wrote:

> Ah. That makes sense. That also means that on the outgoing request, this
> part should be coded as ? These parts of
> the documentation aren't entirely clear on what is part of the
> client/server protocol and what is part of the (client-side only) consumer
> group implementation.
>
> I'll put my request and response next to each other, I do send some
> assignments in the SyncGroup request but it may be that because of me
> misinterpreting the docs I accidently drop four zeroes in a bit that the
> server expects as a length marker.


Altered retention.ms not working

2016-03-27 Thread Datta, Saurav
Hi all,

We had configured our topics with an incorrect retention.ms of 864. With
this, we observed that the consumers were unable to receive any data after
approximately 2.4 hours; this ties in to 864/1000/60/60.

Later we altered retention.ms to 8640 (an extra 0), but the consumers are
still unable to receive any data after approximately 2.4 hours.

Here is the command we used:
bin/kafka-topics.sh --alter --zookeeper  --topic  --config retention.ms=8640

I realize that there may be multiple reasons, outside of retention.ms, why
the consumers are not receiving data, but I am interested in knowing:

  1.  Is there anywhere else that the retention.ms configuration has to be
altered?
  2.  Does altering retention.ms not work, so that the topics have to be
deleted and re-created with the correct configuration?
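The hours figure can be sanity-checked directly from retention.ms (milliseconds / 1000 / 60 / 60). As written, 864 ms is under one second; a 2.4-hour retention corresponds to 8,640,000 ms, and one extra 0 gives 86,400,000 ms, or 24 hours. A minimal Python sketch of the conversion (the function name is hypothetical):

```python
MS_PER_HOUR = 1000 * 60 * 60  # ms -> s -> min -> h


def retention_hours(retention_ms: int) -> float:
    """Convert a retention.ms value to hours."""
    return retention_ms / MS_PER_HOUR


for ms in (864, 8_640_000, 86_400_000):
    print(ms, "ms =", retention_hours(ms), "hours")
```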

Appreciate your feedback on this.

Regards,
Saurav


Re: Altered retention.ms not working

2016-03-27 Thread Datta, Saurav
Added the Kafka version and updated the command used.


Regards,
Saurav


From: "Datta, Saurav" <sda...@paypal.com>
Date: Sunday, March 27, 2016 at 10:22 AM
To: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Altered retention.ms not working

Here is the command we used:
bin/kafka-topics.sh --alter --zookeeper  --topic  
--config retention.ms=8640
Version -> kafka_2.10-0.9.0.0
Command -> bin/kafka-configs.sh --alter --zookeeper  --add-config retention.ms=8640 --entity-name  --entity-type topics



Regards,
Saurav


Re: Unexpected response on SyncGroup call

2016-03-27 Thread Cees de Groot
Makes sense. Thanks!

(and now how to find out how to turn off that signature for mailing lists
;-))

On Sun, Mar 27, 2016 at 10:54 AM, Dana Powers  wrote:

> Agree, it's a bit confusing based on wiki alone right now. I think of
> MemberAssignment as a double-serialized field: first serialize to bytes
> using the MemberAssignment definition (version, topics, partitions, etc);
> then serialize the bytes into a request. It's the second serialization step
> that adds the 32bit length. Repeat backwards for deserializing.
>
> -Dana
>
>


Offset after message deletion

2016-03-27 Thread Imre Nagi
Hi All,

I'm new to Kafka, and I have a question about Kafka offsets.

From the Kafka documentation, it says:

> The Kafka cluster retains all published messages—whether or not they have
> been consumed—for a configurable period of time. For example if the log
> retention is set to two days, then for the two days after a message is
> published it is available for consumption, after which it will be discarded
> to free up space.
>
After the message has been discarded, let's say that we receive some new
messages. What is the offset of a new message? Will it restart from 0 or
continue from the latest offset?

Thanks,
Imre


Re: Offset after message deletion

2016-03-27 Thread Manikumar Reddy
It will continue from the latest offset. The offset is an increasing,
contiguous sequence number per partition.
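A toy Python model of that behavior, assuming a single partition and ignoring that real retention deletes whole log segments rather than individual messages: retention only advances the log start offset, while the next offset to assign keeps counting up. The class and method names are hypothetical.

```python
from collections import deque


class PartitionLog:
    """Toy model of one partition: offsets only grow and are never reused."""

    def __init__(self):
        self.log_start_offset = 0  # oldest offset still retained
        self.next_offset = 0       # offset the next append will receive
        self.messages = deque()    # (offset, value) pairs

    def append(self, value):
        offset = self.next_offset
        self.messages.append((offset, value))
        self.next_offset += 1
        return offset

    def delete_up_to(self, offset):
        """Drop messages with offset < `offset`, as retention would."""
        while self.messages and self.messages[0][0] < offset:
            self.messages.popleft()
        self.log_start_offset = max(self.log_start_offset, min(offset, self.next_offset))


log = PartitionLog()
for value in ("a", "b", "c"):
    log.append(value)      # offsets 0, 1, 2
log.delete_up_to(3)        # retention discards everything
print(log.append("d"))     # 3, not 0
```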

On Mon, Mar 28, 2016 at 9:11 AM, Imre Nagi  wrote:

> Hi All,
>
> I'm new in kafka. So, I have a question related to kafka offset.
>
> From the kafka documentation in here
> , it said :
>
> > The Kafka cluster retains all published messages—whether or not they have
> > been consumed—for a configurable period of time. For example if the log
> > retention is set to two days, then for the two days after a message is
> > published it is available for consumption, after which it will be
> discarded
> > to free up space.
> >
> After the message has been discarded, let say that we receive some new
> message. What is the offset of the new message? Will it restart from 0 or
> continue the latest offset?
>
> Thanks,
> Imre
>


Re: Leader was set to -1 for some topic partitions

2016-03-27 Thread Raju Bairishetti
After enabling unclean leader election, all topic partitions have leaders.
Enabling unclean leader election may cause slight data loss, but it saves us
from lots of offline partitions.

My assumptions about the cause of the issue:
Earlier, we had enabled controlled shutdown and disabled unclean leader
election. It seems that when we shut down a broker (say *broker1*),
leadership was transferred to another broker (*broker2*) because controlled
shutdown was enabled. Data was then sent to the new leader (*broker2*) of
that topic partition. After a restart of the whole cluster, the controller
tried to choose the preferred replica (*broker1*) as the leader, but it was
far behind the actual leader (*broker2*) from before the restart. Because
unclean leader election was disabled, the controller was not able to elect
the preferred replica as leader.

After enabling unclean leader election, all partitions came back online.

*It seems that enabling controlled shutdown and disabling unclean leader
election together is causing multiple offline partitions in the cluster.*
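A simplified Python model of the election trade-off described above, assuming the controller first looks for a live replica that is in the ISR (in assignment order) and falls back to any live replica only when unclean leader election is enabled. This is an illustration, not the controller's actual code, and the function name is hypothetical. With an empty `Isr:` list, as for partition 1 of `click_json` in the output quoted in this thread, only the unclean path can produce a leader.

```python
from typing import Iterable, List, Set

NO_LEADER = -1  # shown as "Leader: -1" by --describe


def elect_leader(replicas: List[int], isr: Iterable[int], live: Set[int],
                 unclean_enabled: bool) -> int:
    """Simplified model of choosing a leader for an offline partition."""
    in_sync = set(isr)
    # Clean election: first live replica that is also in the ISR.
    for broker in replicas:
        if broker in live and broker in in_sync:
            return broker
    # Unclean election: any live replica, possibly missing committed messages.
    if unclean_enabled:
        for broker in replicas:
            if broker in live:
                return broker
    return NO_LEADER


replicas = [5, 1, 2]
print(elect_leader(replicas, isr=[], live={1, 2, 5}, unclean_enabled=False))  # -1
print(elect_leader(replicas, isr=[], live={1, 2, 5}, unclean_enabled=True))   # 5
```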



On Thu, Feb 25, 2016 at 11:46 AM, Raju Bairishetti 
wrote:

> Any thoughts on this?
>
> On Mon, Feb 22, 2016 at 1:24 PM, Raju Bairishetti 
> wrote:
>
>>
>>
>> On Mon, Feb 22, 2016 at 1:13 PM, Salman Ahmed 
>> wrote:
>>
>>> We saw a similar issue a while back. If leader is -1, I believe you won't
>>> have ingestion work for that partition. Was there any data ingestion dip?
>>>
>>
>>
>> *No, I am not seeing any data dip for the topic, but there is no data for
>> the partition whose leader was set to -1.*
>>
>> On Sun, Feb 21, 2016 at 7:44 PM Raju Bairishetti  wrote:
>>>
>>> > Hello,
>>> > We are using Kafka 0.8.2. We are running 5 brokers in the prod
>>> > cluster. Each topic has two partitions. We are seeing some issues
>>> > with some topic partitions.
>>> >
>>> > For some topic partitions, the leader was set to -1. I am not seeing any
>>> > errors in the controller and server logs. After a server restart, leaders
>>> > were assigned to those topic partitions. Will there be data loss for that
>>> > topic partition? There seems to be no data loss according to my
>>> > application metrics, but I do not have any server logs to prove it from
>>> > the Kafka side.
>>> >
>>> > *kafka-topics --zookeeper localhost:2181 --describe --topic click_json*
>>> >
>>> > Topic: click_json PartitionCount:2 ReplicationFactor:3
>>> > Configs:retention.bytes=42949672960
>>> >
>>> > Topic: click_json Partition: 0 Leader: 4 Replicas: 4,5,1 Isr: 4,1,5
>>> >
>>> > Topic: click_json *Partition: 1 Leader: -1 *Replicas: 5,1,2 *Isr: *
>>> >
>>> >
>>> > *Why was the leader set to -1?*
>>> >
>>> > *What is the impact if the leader is set to -1?*
>>> >
>>> > *How do we recover from this error? Which option would be better:
>>> > restarting the broker, or choosing the leader by running the preferred
>>> > leader election script?*
>>> >
>>> >
>>> > FYI, we have set unclean.leader.election.enable to false on 3 machines
>>> > and to true on 2 machines.
>>> >
>>> >
>>> > Thanks in advance!!!
>>> >
>>> > --
>>> > Thanks,
>>> > Raju Bairishetti,
>>> > www.lazada.com
>>> >
>>>
>>
>


-- 
Thanks,
Raju Bairishetti,

www.lazada.com


Multiple Topics and Consumer Groups

2016-03-27 Thread Vinod Kakad
Hi,

I wanted to know if the same consumer can be in two consumer groups.

OR

How does multiple-topic subscription work for a consumer group?


Thanks & Regards,
Vinod Kakad.


Re: Multiple Topics and Consumer Groups

2016-03-27 Thread Manikumar Reddy
A consumer can belong to only one consumer group.
https://kafka.apache.org/documentation.html#intro_consumers
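For the second question, a simplified Python model of how one group can subscribe to several topics: the partitions of all subscribed topics are pooled, and each partition is handed to exactly one member of the group. It is loosely in the spirit of Kafka's round-robin assignment strategy (`RoundRobinAssignor`), not its actual implementation, and it assumes every member subscribes to the same topics; the function name is hypothetical. An application that needs to be in two groups would typically run two consumer instances, each configured with its own `group.id`.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def round_robin_assign(members: List[str],
                       topic_partitions: Dict[str, int]) -> Dict[str, List[Tuple[str, int]]]:
    """Spread the partitions of all subscribed topics over one group's members.

    Each (topic, partition) goes to exactly one member; a member may end up
    with partitions from several topics.
    """
    if not members:
        return {}
    partitions = sorted(
        (topic, p) for topic, count in topic_partitions.items() for p in range(count)
    )
    ordered = sorted(members)
    assignment: Dict[str, List[Tuple[str, int]]] = defaultdict(list)
    for i, tp in enumerate(partitions):
        assignment[ordered[i % len(ordered)]].append(tp)
    return dict(assignment)


print(round_robin_assign(["c2", "c1"], {"b": 1, "a": 2}))
```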

On Mon, Mar 28, 2016 at 11:01 AM, Vinod Kakad  wrote:

> Hi,
>
> I wanted to know if same consumer can be in two consumer groups.
>
> OR
>
> How the multiple topic subscription for consumer group works.
>
>
> Thanks & Regards,
> Vinod Kakad.
>