monitor rebalance events

2016-03-01 Thread craig w
Is there some way to monitor when a rebalance occurs other than at the
consumer level?


KafkaConsumer poll() problems

2016-03-01 Thread Péricé Robin
Hello everybody,

I'm having trouble using the KafkaConsumer 0.9.0.0 API. My Consumer class
doesn't consume messages properly.

-
| *Consumer*.java |
-

public final void run() {
    try {
        consumer.subscribe(Collections.singletonList(topicName));
        boolean end = false;
        while (!closed.get()) {
            while (!end) {
                final ConsumerRecords records = consumer.poll(1000);
                if (records == null || records.count() == 0) {
                    System.err.println("In ConsumerThread consumer.poll received nothing on " + topicName);
                } else {
                    /* some processing on records here */
                    end = true;
                }
            }
        }
    } catch (final WakeupException e) {
        if (!closed.get()) throw e;
    } finally {
        consumer.close();
    }
}


| *Producer*.java |

public final void run() {
    while (true) {
        producer.send(new ProducerRecord(topicName, timems, tuple), callback);
    }
}

--

I run this exactly the same way as in the GitHub example:
https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java

But I only get "In ConsumerThread consumer.poll received nothing". poll()
never returns messages, but when I use the command-line tools I can see my
messages on the topic.

When I run the basic example from GitHub everything works fine, so it
seems like I'm missing something.






*CONSOLE*

[2016-03-01 14:42:14,280] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-03-01 14:42:14,281] INFO [GroupCoordinator 0]: Group KafkaChannelBasicTestConsumer generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
[2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Stabilized group KafkaChannelBasicTestConsumer generation 1 (kafka.coordinator.GroupCoordinator)
[2016-03-01 14:42:22,797] INFO [GroupCoordinator 0]: Assignment received from leader for group KafkaChannelBasicTestConsumer for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-03-01 14:42:55,808] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-03-01 14:42:55,809] INFO [GroupCoordinator 0]: Group KafkaChannelBasicTestConsumer generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)


I really need help on this!

Regards,

Robin


Re: KafkaConsumer poll() problems

2016-03-01 Thread craig w
Can you try posting your code into a Gist (gist.github.com) or Pastebin, so
it's formatted and easier to read?




-- 

https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links


Re: KafkaConsumer poll() problems

2016-03-01 Thread Péricé Robin
Producer : https://gist.github.com/r0perice/9ce2bece76dd4113a44a
Consumer : https://gist.github.com/r0perice/8dcee160017ccd779d59
Console : https://gist.github.com/r0perice/5a8e2b2939651b1ac893



Turn off retention.ms

2016-03-01 Thread Roshan Punnoose
Hi,

Is it possible to set up a topic with only a size limit (retention.bytes)
and not a time limit (retention.ms)?

Roshan


Re: monitor rebalance events

2016-03-01 Thread Christian Posta
Is this for the new consumer, or the old high-level consumer?

For the old high-level consumer, rebalance happens on the consumer side for
the following events:

* Opening new streams
* ZK Session expiration
* When partition information changes or consumer information changes

I don't think there is a place within the broker that broadcasts when a
rebalance happens. There are listeners in the Consumer you can set to get
that information, but as you pointed out in your question, this happens on
the consumer side. I guess you could set watches on the following paths and
expect a rebalance when those watches get triggered, but you'd miss out on
ZK session expirations etc.

* a ZK watch for /consumers//ids
* a ZK data watch for /brokers/topics/
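For the new (0.9) consumer, the consumer-side hook mentioned above is a ConsumerRebalanceListener passed to subscribe(). A minimal sketch; the broker address, group id, and topic name are placeholders:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceMonitor {
    public static void main(String[] args) {
        // Placeholder connection/group settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        System.out.println("Rebalance starting, revoked: " + partitions);
                    }
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        System.out.println("Rebalance done, assigned: " + partitions);
                    }
                });
            while (true) {
                consumer.poll(1000); // the callbacks fire from within poll()
            }
        }
    }
}
```

Note that, as said above, this only observes rebalances from one consumer's point of view, not cluster-wide.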







-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Is Apache Kafka ready for deployment in IPv6/IPv4 or IPv6-only networks?

2016-03-01 Thread Marcos Luis Ortiz Valmaseda
Regards to all on the list.

I asked this on Twitter yesterday, and Neha Narkhede from Confluent told
me the best place to ask is here, so here are my questions:
- Is Kafka ready to be deployed in dual-stack networks, where you could
create a Kafka architecture using IPv6 and IPv4 interfaces?

- If this is possible, could you provide an example?

I'm working on an article supporting the use of Apache Kafka for
Industrial Internet Cloud-based Analytics, but without good support for
IPv6-based networks it could be a challenge, and we couldn't use the
latest features of Kafka 0.9, like Kafka Connect.

Thanks for your time


Re: Turn off retention.ms

2016-03-01 Thread Olson,Andrew
A very high retention time (e.g. 50 years in milliseconds) would effectively 
accomplish this.
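As a sketch of that workaround (topic name, ZK address, and the sizes are placeholders; kafka-configs.sh is the 0.9 tool for per-topic overrides):

```shell
# ~50 years in ms (1576800000000) plus a 1 GB size limit, so
# retention.bytes is effectively the only limit that ever triggers.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config retention.ms=1576800000000,retention.bytes=1073741824
```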






Kafka Rest Proxy

2016-03-01 Thread Jan Omar
Hey guys, 

Is someone using the kafka rest proxy from confluent? 

We have an issue where all messages for a certain topic end up in the same
partition. Has anyone faced this before? We're not using a custom
partitioner class, so it's using the default partitioner. We're sending
messages without a specific partition and without a key, like this:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"value":{"foo":"bar"}}]}' "http://x.x.x.x:8082/topics/testme"


and yet for some reason every message ends up in partition 11... 
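For comparison, the v1 JSON format also accepts per-record "partition" and "key" fields, which takes the default partitioner out of the picture; a sketch of that variant against the same placeholder endpoint:

```shell
# Pin one record to partition 3, and give another a key; partition 3 and
# "some-key" are illustrative values only.
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
  --data '{"records":[{"partition":3,"value":{"foo":"bar"}},
                      {"key":"some-key","value":{"foo":"baz"}}]}' \
  "http://x.x.x.x:8082/topics/testme"
```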

It's a test topic with 30 partitions on 3 brokers and our rest (producer) 
config is very simple:

id=1
zookeeper.connect=zookeeper-four.acc:2181...etc

Any help would be appreciated.

Thanks!




Spark Streaming + Kafka 0.9

2016-03-01 Thread Adam Kunicki
Has anyone built the Spark Streaming kafka receiver for Kafka 0.9?
Asking since spark master branch is still using 0.8.2.1

-- 
Adam Kunicki
StreamSets | Field Engineer
mobile: 415.890.DATA (3282) | linkedin 


Re: Spark Streaming + Kafka 0.9

2016-03-01 Thread Marko Bonaći
These two pull requests track the progress of Kafka 0.9 consumer support:
https://github.com/apache/spark/pull/10953
https://github.com/apache/spark/pull/11143

Marko Bonaći
Monitoring | Alerting | Anomaly Detection | Centralized Log Management
Solr & Elasticsearch Support
Sematext  | Contact




Re: Need to understand the graph

2016-03-01 Thread Alexis Midon
"request queue time" is the time it takes for IO threads to pick up the
request. As you increase the load on your broker, it makes sense to see
higher queue time.
Here are more details on the request/response model in a Kafka broker
(0.8.2).

All your requests and responses are belong to RequestChannel (
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala
).

The center piece of all Kafka Request/Response handling is the
`kafka.network.RequestChannel`. RequestChannel is a container for all Kafka
Requests waiting to be handled, and the Kafka Responses ready to be sent
back to the clients.
Requests are queued in a single bounded queue. Up to `queued.max.requests`
requests will be stored. The response queues are not bounded.

## First, let's see how the Requests are created.
A SocketServer (created by the main class KafkaServer) starts up multiple
threads to handle network connections in a non-blocking fashion:
. a single Acceptor thread
. `network.threads` Processor threads

The Acceptor  thread handles OP_ACCEPT events and passes new socket channel
to the Processor threads (with a simple round-robin algorithm).

A Processor thread has 3 responsibilities:
1. register for monitoring the newly created socket channel passed by the
Acceptor thread
2. read data from the socket channels until a complete Kafka request is
deserialized. The Request is then handed off to the RequestChannel. The
hand-off might block if the request queue is full.
3. poll the Kafka Responses queued in the RequestChannel, and write the
data on the corresponding socket channel. Each Processor has its own queue
in the RequestChannel so that a response is processed by the same thread
which read the request from the connection.

## Second, let's see how the Responses are built.
During start up KafkaServer creates an instance of
KafkaRequestHandlerPool.  KafkaRequestHandlerPool is a thread pool of
`io.thread` threads of KafkaRequestHandler (named "kafka-request-handler-"
).
A KafkaRequestHandler has a fairly simple job: it polls Requests from the
RequestChannel and passes them to KafkaApis (`KafkaApis.handle()`).
KafkaApis dispatches the request and eventually a Response is pushed to the
RequestChannel. The response will then be picked up by a Processor thread
as described earlier.
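The hand-off described above can be sketched with plain JDK queues (a toy model for intuition only; the class and method names are mine, not Kafka's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of RequestChannel: one bounded request queue shared by all
// handler threads, and one unbounded response queue per Processor thread.
class ToyRequestChannel {
    final BlockingQueue<String> requestQueue;
    final List<BlockingQueue<String>> responseQueues;

    ToyRequestChannel(int queuedMaxRequests, int numProcessors) {
        // Bounded by queued.max.requests: Processors block when it is full.
        requestQueue = new ArrayBlockingQueue<>(queuedMaxRequests);
        responseQueues = new ArrayList<>();
        for (int i = 0; i < numProcessors; i++) {
            responseQueues.add(new LinkedBlockingQueue<>()); // unbounded
        }
    }

    // Called by a Processor thread after deserializing a complete request;
    // blocks if the request queue is full (back-pressure on the socket).
    void sendRequest(String request) throws InterruptedException {
        requestQueue.put(request);
    }

    // Called by a KafkaRequestHandler thread.
    String receiveRequest() throws InterruptedException {
        return requestQueue.take();
    }

    // The response goes back to the queue of the Processor that read the
    // request, so the same thread writes the response to the socket.
    void sendResponse(int processorId, String response) {
        responseQueues.get(processorId).add(response);
    }

    String receiveResponse(int processorId) throws InterruptedException {
        return responseQueues.get(processorId).take();
    }
}
```

The asymmetry (bounded in, unbounded out) is what makes RequestQueueTimeMs grow under load while responses are never dropped.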


A diagram:
http://postimg.org/image/6gadny6px/

The different stages of the request/response handling are nicely measured
with the following metrics:
kafka.network.RequestMetrics.RequestQueueTimeMs
kafka.network.RequestMetrics.LocalTimeMs
kafka.network.RequestMetrics.RemoteTimeMs
kafka.network.RequestMetrics.ResponseQueueTimeMs
kafka.network.RequestMetrics.ResponseSendTimeMs
kafka.network.RequestMetrics.TotalTimeMs



On Mon, Feb 29, 2016 at 2:30 PM awa...@pch.com 
wrote:

> Hello,
>
> I have an issue with the timing of Kafka. As and when the time increases
> with load testing, we see the increase in "request queue time". What is the
> "request queue time" ?
>
> Some basic config we are using
>
> 2 Kafka Nodes(CPU - 8 CPU's [Thread(s) per core:2], Mem - 32 GB Ram)
> pkafkaapp01.pchoso.com
> pkafkaapp02.pchoso.com
>
> Tests duration: 13:05 - 14:05
> Messages published - 869156
>
> Load Average, CPU, memory is all under control so not sure what the issue
> is.
>
> Below are some SPM graphs showing the state of my system.
> Here's the 'Requests' graph:
>   https://apps.sematext.com/spm-reports/s/lCOJULIKuJ


Re: Need to understand the graph

2016-03-01 Thread Alexis Midon
Also the metrics kafka.network.RequestChannel.RequestQueueSize and
ResponseQueueSize
will give you the saturation of the network and IO threads.



Large Size Error even when the message is small

2016-03-01 Thread Fang Wong
[2016-02-26 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,025] INFO Closing socket connection to /10.224.146.62 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,047] INFO Closing socket connection to /10.224.146.63 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,049] INFO Closing socket connection to /10.224.146.61 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,055] INFO Closing socket connection to /10.224.146.60 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,112] INFO Closing socket connection to /10.224.146.59 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
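One way to interpret an implausible length like 1937006964: the broker reads the first four bytes of a connection as a big-endian request size, so decoding the rejected length back to ASCII shows what was actually sent. A small self-contained sketch (the class name is mine):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DecodeLength {
    // Re-interpret the rejected "request length" as the 4 ASCII bytes
    // the broker actually read off the socket.
    public static String decode(int length) {
        byte[] bytes = ByteBuffer.allocate(4).putInt(length).array();
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(decode(1937006964)); // prints "stat"
    }
}
```

Here the bytes spell "stat", which suggests a non-Kafka client (for example, something issuing a ZooKeeper-style four-letter command) connected to the Kafka port, rather than an oversized Kafka message.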




Kafka 0.9 Spout

2016-03-01 Thread Ankit Jain
Hi All,

We need to use the SSL feature of Kafka, which requires upgrading the
Kafka Spout from version 0.8.x to 0.9.x, as SSL is only supported by the
new consumer API.

Please share any URL for the same.

-- 
Thanks,
Ankit


Re: Kafka Rest Proxy

2016-03-01 Thread James Cheng
Jan,

I don't use the rest proxy, but Confluent has a mailing list where you can 
probably get more info:

Here's the direct link: 
https://groups.google.com/forum/#!forum/confluent-platform

And it is linked off of here: http://www.confluent.io/developer#documentation

-James







Re: Need to understand the graph

2016-03-01 Thread Wagle, Arun
Hi Alexis,

Thanks for the detailed response.

A few questions

 1.  What is the response that is sent back by Kafka? Does it block the client
till the message is successfully published/replicated to the servers? The
client here is a REST service which just pushes data to a Kafka topic.
 2.  Is RequestQueueTime the time a request waits before it is picked up for
processing?
 3.  What are ResponseQueueTime and ResponseSendTime? Is this the time after
which the data is finally available to consumers?
 4.  What is RemoteTimeMs? Per the docs it is "Time the request waits for the
follower". What's a follower in this case?

Thanks in advance

Regards,
Arun Wagle



Re: Kafka 0.9 Spout

2016-03-01 Thread Harsha
Ankit,
We've one in the making and getting to ready to be merged in .
You can follow the PR here
https://github.com/apache/storm/pull/1131 . Any storm related
questions please send an email at these lists
http://storm.apache.org/getting-help.html .

Thanks,
Harsha



Unavailable partitions (Leader: -1 and ISR is empty) and we can't figure out how to get them back online

2016-03-01 Thread James Cheng
Hi,

We have 44 partitions in our cluster that are unavailable. kafka-topics.sh is 
reporting them with Leader: -1, and with no brokers in the ISR. Zookeeper says 
that broker 5 should be the partition leader for this topic partition. These 
are topics with replication-factor 1. Most of the topics have little to no data 
in them, so they are low-traffic topics. We currently cannot produce to them. 
And I can't find anything in the logs that seems to explain why the broker is 
not taking the partitions back online. Can anyone help?

Relevant log lines are attached below.

Questions:
* What does Leader: -1 mean?
* Why doesn't the broker take the partition back online?
* Is there more debugging/logging that I can turn on?
* unclean.leader.election.enable=false right now, although during a previous 
boot of the broker, we set it to true to get some partitions back online. These 
ones never came back online.

Thanks,
-James

in zookeeper
---
$ get /brokers/topics/the.topic.name
{"version":1,"partitions":{"0":[5]}}

server.log

[2016-03-01 06:29:13,869] WARN Found an corrupted index file, 
/TivoData/kafka/the.topic.name-0/.index, deleting and 
rebuilding index... (kafka.log.Log)
[2016-03-01 06:29:13,870] INFO Recovering unflushed segment 0 in log 
the.topic.name-0. (kafka.log.Log)
[2016-03-01 06:29:13,870] INFO Completed load of log the.topic.name-0 with log 
end offset 0 (kafka.log.Log)


state-change.log
-
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,498] TRACE Broker 5 cached 
leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 0 (state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,695] TRACE Broker 5 
received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 correlation id 1 from controller 2 epoch 20 for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,957] TRACE Broker 5 
handling LeaderAndIsr request correlationId 1 from controller 2 epoch 20 
starting the become-follower transition for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:23,531] ERROR Broker 5 
received LeaderAndIsrRequest with correlation id 1 from controller 2 epoch 20 
for partition [the.topic.name,0] but cannot become follower since the new 
leader -1 is unavailable. (state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:30,075] TRACE Broker 5 
completed LeaderAndIsr request correlationId 1 from controller 2 epoch 20 for 
the become-follower transition for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:30,458] TRACE Broker 5 cached 
leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 2 (state.change.logger)

state-change.log on the controller:
[2016-03-01 06:34:15,077] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 2 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,145] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 5 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,200] TRACE Broker 2 cached leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 9144 (state.change.logger)
[2016-03-01 06:34:15,276] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 4 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,418] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 1 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,484] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 3 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,585] TRACE Controller 2 epoch 20 changed state of replica 
5 for partition [the.topic.name,0] from OfflineReplica to OnlineReplica 
(state.change.logger)
[2016-03-01 06:34:15,606] TRACE Controller 2 epoch 20 sending become-follower 
LeaderAndIsr request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to 
broker 5 for partition [the.topic.name,0] (state.change.logger)
[2016-03-01 06

About the number of partitions

2016-03-01 Thread BYEONG-GI KIM
Hello.

I have a question about how many partitions are optimal when using Kafka.
As far as I know, even if there are multiple consumers belonging to a
consumer group, say *group_A*, only one consumer can receive a Kafka
message produced by a producer if the topic has a single partition. So, as
a result, multiple partitions are required in order to distribute messages
across all the consumers in group_A.

Is that right?

I'm considering developing several Kafka consumer applications, e.g., a
message saver, a message analyzer, etc., so a message from a producer must
be consumed by each of those applications.
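The distinction that matters here: *within* one consumer group, each partition is consumed by exactly one group member, while *separate* groups (separate `group.id`s) each independently receive every message. A minimal sketch of the within-group idea — the `assign` helper is hypothetical, illustrating a round-robin partition assignment, not a Kafka API:

```java
import java.util.*;

// Illustration of how partitions are divided among consumers *within*
// one group: each partition goes to exactly one consumer, so with a
// single partition only one member of group_A ever receives data.
public class GroupAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String c : consumers) out.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) {
            // round-robin: partition p goes to consumer (p mod N)
            out.get(consumers.get(p % consumers.size())).add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        // 3 consumers in group_A, but only 1 partition: two members sit idle.
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 1));
        // {c1=[0], c2=[], c3=[]}
        // With 3 partitions, all three members receive messages.
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 3));
        // {c1=[0], c2=[1], c3=[2]}
    }
}
```

So for a message saver and a message analyzer that must each see every message, give each application its own `group.id`; the partition count then only determines the parallelism available *inside* each group.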

Any advice and help would be really appreciated.

Thanks in advance!

Best regards

Kim


due to invalid request: Request of length

2016-03-01 Thread Fang Wong
[2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
invalid request: Request of length 1937006964 is not valid, it is larger
than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to
invalid request: Request of length 1937006964 is not valid, it is larger
than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to
invalid request: Request of length 1937006964 is not valid, it is larger
than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to
invalid request: Request of length 1937006964 is not valid, it is larger
than the maximum size of 104857600 bytes. (kafka.network.Processor)
[2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to
invalid request: Request of length 1937006964 is not valid, it is larger
than the maximum size of 104857600 bytes. (kafka.network.Processor)


larger than the maximum size

2016-03-01 Thread Fang Wong
Hi,

Does anybody know how to fix the following error? I didn't send any
large message; it seems the system was sending a large message itself:

[2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
invalid request: Request of length 1937006964 is not valid, it is larger
than the maximum size of 104857600 bytes. (kafka.network.Processor)

Thanks,
Fang


Re: due to invalid request: Request of length

2016-03-01 Thread Anirudh P
Hello,

What is the size of the message you are sending? If it really is 1937006964
bytes, then you should set message.max.bytes and replica.fetch.max.bytes to
values larger than the size of the message you are trying to send.

If it isn't, then it's most likely an encoding issue on your side. The Kafka
wire protocol is size-delimited.

Make sure you are not sending a partial request (or a request with an
invalid size). Otherwise the server may miscalculate the size of the next
request, and you can see such an error.
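The size-delimited framing mentioned above can be sketched concretely (an illustration of the framing idea, not the broker's actual code). Each Kafka request on the wire starts with a 4-byte big-endian int32 length, so a non-Kafka client sending plaintext gets its first four bytes misread as a length. Notably, 1937006964 from the log above is exactly the big-endian encoding of the ASCII bytes "stat" — which hints that something was sending a plaintext command (perhaps a stats or health probe) to the Kafka port:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Kafka requests are size-delimited: the first 4 bytes on the wire are a
// big-endian int32 giving the length of the rest of the request. Anything
// else sent to the port gets its leading bytes interpreted as a length.
public class SizeDelimitedFrame {
    static int readFrameLength(byte[] wireBytes) {
        // Read the 4-byte big-endian length prefix.
        return ByteBuffer.wrap(wireBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] plaintext = "stats\r\n".getBytes(StandardCharsets.US_ASCII);
        // The broker would interpret the first 4 bytes ("stat") as the frame length:
        System.out.println(readFrameLength(plaintext)); // 1937006964
    }
}
```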

Thank you,
Anirudh

On Wed, Mar 2, 2016 at 6:18 AM, Fang Wong  wrote:

> [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
>


About bootstrap.servers

2016-03-01 Thread 田守枝
Hi All:
I want to know why a Producer or Consumer uses "bootstrap.servers" to
establish its initial connection to the Kafka cluster. Why not let the
producer or consumer connect to ZooKeeper to get the brokers' IPs and
ports? I think that would be one way to decouple the clients from the
brokers!
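For context, here is a sketch of how bootstrap.servers is typically supplied (host names are placeholders). The list is only used for the first metadata request; the client then discovers the full broker set from the metadata response. One stated motivation for the 0.9 clients was precisely to drop the clients' ZooKeeper dependency, so ZooKeeper does not have to be exposed to, and secured against, every client.

```java
import java.util.Properties;

// Minimal producer config sketch: bootstrap.servers only needs to name
// *some* of the brokers -- the client fetches cluster metadata from
// whichever one answers first and discovers the rest from the response.
public class BootstrapConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        // Two entries are enough even for a large cluster; they are only
        // used for the initial metadata request.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("bootstrap.servers"));
    }
}
```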


  


Tian Shouzhi


2016/3/2




kafka snappy compression

2016-03-01 Thread wanzao
hi, all:
Why is a message set of 872 bytes of snappy-compressed data that I send to
Kafka stored in Kafka with a payload size of 3940?
I used the sarama AsyncProducer with snappy compression to send message
sets (5 messages per message set). I found that the original data was 3940
bytes and the compressed data was 832 bytes, which indicates that snappy
compression worked. But when I checked the Kafka log data, it showed
"offset: 1047825 position: 484851876 isvalid: true payloadsize: 3607
magic: 0 compresscodec: SnappyCompressionCodec crc: 2412106187", and it
seems that the Kafka broker decoded the data and then encoded it again.
How can I store compressed data in Kafka, which should be 832 bytes per
message set in this case? My Kafka version is kafka_2.9.1-0.8.2.1.

Thanks!

Re: Kafka node liveness check

2016-03-01 Thread tao xiao
Thanks Elias for sharing

On Mon, 29 Feb 2016 at 22:23 Elias Abacioglu <
elias.abacio...@deltaprojects.com> wrote:

> Crap, forgot to remove my signature.. I guess my e-mail will now get
> spammed forever :(
>
>
>
>
>
> On Mon, Feb 29, 2016 at 3:14 PM, Elias Abacioglu <
> elias.abacio...@deltaprojects.com> wrote:
>
> > We've setup jmxtrans and use it to check these two values.
> > UncleanLeaderElectionsPerSec
> > UnderReplicatedPartitions
> >
> > Here is our shinken/nagios configuration:
> >
> > define command {
> >   command_name check_kafka_underreplicated
> >   command_line $USER1$/check_jmx -U
> > service:jmx:rmi:///jndi/rmi://$HOSTADDRESS$:/jmxrmi -O
> > "kafka.server":type="ReplicaManager",name="UnderReplicatedPartitions" -A
> > Value -w $ARG1$ -c $ARG2$
> > }
> >
> > define command {
> >   command_name check_kafka_uncleanleader
> >   command_line $USER1$/check_jmx -U
> > service:jmx:rmi:///jndi/rmi://$HOSTADDRESS$:/jmxrmi -O
> >
> "kafka.controller":type="ControllerStats",name="UncleanLeaderElectionsPerSec"
> > -A Count -w $ARG1$ -c $ARG2$
> > }
> >
> > define service {
> >   hostgroup_name KafkaBroker
> >   use generic-service
> >   service_description Kafka Unclean Leader Elections per sec
> >   check_command check_kafka_uncleanleader!1!10
> >   check_interval 15
> >   retry_interval 5
> > }
> > define service {
> >   hostgroup_name KafkaBroker
> >   use generic-service
> >   service_description Kafka Under Replicated Partitions
> >   check_command check_kafka_underreplicated!1!10
> >   check_interval 15
> >   retry_interval 5
> > }
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Feb 29, 2016 at 12:41 PM, tao xiao  wrote:
> >
> >> Thanks Jens. What I want to achieve is to check that every broker within
> >> a cluster functions properly. The way you suggest can identify the
> liveness
> >> of a cluster but it doesn't necessarily mean every broker in the cluster
> >> is
> >> alive. In order to achieve that I can either create a topic with number
> of
> >> partitions being same as the number of brokers and min.insync.isr=number
> >> of
> >> brokers or one topic per broker and then send ping message to broker.
> But
> >> this approach is definitely not scalable as we expand the cluster.
> >> Therefore I am looking for a way to achieve this.
> >>
> >> On Mon, 29 Feb 2016 at 16:54 Jens Rantil  wrote:
> >>
> >> > Hi,
> >> >
> >> > I assume you first want to ask yourself what liveness you would like
> to
> >> > check for. I guess the most realistic check is to put a "ping" message
> >> on
> >> > the broker and make sure that you can consume it.
> >> >
> >> > Cheers,
> >> > Jens
> >> >
> >> > On Fri, Feb 26, 2016 at 12:38 PM, tao xiao 
> >> wrote:
> >> >
> >> > > Hi team,
> >> > >
> >> > > What is the best way to verify a specific Kafka node functions
> >> properly?
> >> > > Telnet the port is one of the approach but I don't think it tells me
> >> > > whether or not the broker can still receive/send traffics. I am
> >> thinking
> >> > to
> >> > > ask for metadata from the broker using consumer.partitionsFor. If it
> >> can
> >> > > return partitioninfo it is considered live. Is this a good approach?
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > Jens Rantil
> >> > Backend engineer
> >> > Tink AB
> >> >
> >> > Email: jens.ran...@tink.se
> >> > Phone: +46 708 84 18 32
> >> > Web: www.tink.se
> >> >
> >>
> >
> >
>
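The metadata-based probe discussed in this thread can be sketched as follows. The idea: fetch partition metadata (e.g. via `consumer.partitionsFor(topic)` in the real client; the network call is omitted here so the check logic stays self-contained and testable) and verify that every expected broker id appears in at least one ISR. A broker missing from all ISRs is suspect even when the cluster as a whole responds. The `missingBrokers` helper is hypothetical, not a Kafka API:

```java
import java.util.*;

// Hypothetical helper: given the ISR lists reported by cluster metadata,
// find expected broker ids that appear in none of them.
public class BrokerLivenessSketch {
    static Set<Integer> missingBrokers(Set<Integer> expected, List<int[]> isrLists) {
        Set<Integer> seen = new HashSet<>();
        for (int[] isr : isrLists)
            for (int id : isr) seen.add(id);
        Set<Integer> missing = new TreeSet<>(expected);
        missing.removeAll(seen);
        return missing;
    }

    public static void main(String[] args) {
        Set<Integer> expected = new TreeSet<>(Arrays.asList(1, 2, 3));
        // ISRs reported by metadata: broker 3 is in no ISR -> likely down or lagging.
        List<int[]> isrs = Arrays.asList(new int[]{1, 2}, new int[]{2, 1});
        System.out.println(missingBrokers(expected, isrs)); // [3]
    }
}
```

This scales with cluster size without needing one ping topic (or partition) per broker, though it only detects brokers that have fallen out of replication, not ones that accept metadata requests while failing produce/fetch traffic.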