Kafka 0.8: delete topic with data

2014-02-27 Thread Yury Ruchin
Hi,

I'm using Kafka 0.8 which does not have a command to delete topic. However,
I need the functionality and I'm trying to adopt this approach:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala.
I see it simply deletes the topic node from ZK. My question is what will
happen with the topic data after removing the node? Will Kafka brokers be
able to detect that topic does not exist anymore and purge its data?

Thanks,
Yury


Re: Reg Partition and Replica?

2014-02-27 Thread David Morales de Frías
Maybe these pictures can help you:

https://kafka.apache.org/images/log_anatomy.png
http://www.michael-noll.com/blog/uploads/kafka-topics-partitions-replicas.png
http://www.michael-noll.com/blog/uploads/kafka-cluster-overview.png

And of course, this post might help too:

http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/





2014-02-27 7:17 GMT+01:00 Balasubramanian Jayaraman (Contingent) <
balasubramanian.jayara...@autodesk.com>:

> Hi,
>
> I have a doubt regarding to Partition and replica. What is the difference
> between them?
> I created a topic "test1" with 5 partition and 3 replicas. I am sending a
> message to the topic "test1". I see that different partitions are present
> in the different Kafka brokers.
> Some questions on this topic
>
> 1.  Will that message be sent to all the partitions?
>
> 2.  When it will be replicated to all the partitions?
>
> 3.  When it will be replicated to all the replicas?
>
> Thanks
> Bala
>


Re: Kafka-0.8 Log4j Appender

2014-02-27 Thread Neha Narkhede
You can create a JIRA here - https://issues.apache.org/jira/browse/KAFKA.




On Wed, Feb 26, 2014 at 9:33 PM, 김동경  wrote:

> Actually, I am quite newbie to this.
> What do you exactly want me to do?
> You want me to raise an issue for this?
> Then which JIRA can I access and what I should do?
>
>
> 2014-02-26 20:48 GMT+09:00 Neha Narkhede :
>
> > I think this is a bug with the KafkaLog4jAppender that is triggered when
> > the message send logs an error that in turn tries to resend the message
> and
> > gets into this infinite loop. Could you file a JIRA?
> >
> >
> > On Tue, Feb 25, 2014 at 9:51 PM, 김동경  wrote:
> >
> > > Dear all.
> > >
> > > Are there anyone who tried running Kafka-0.8 Log4j Appender?
> > > I want to send my application log into Kafka via Log4j Appender.
> > >
> > >
> > > Here is my log4j.properties.
> > > I couldn't find any proper encoder, so I just configured it to use the
> default
> > > encoder.
> > > (i.e. I commented out the line.)
> > >
> > >
> > >
> >
> --
> > > log4j.rootLogger=INFO, stdout, KAFKA
> > >
> > > log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> > > log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> > > log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > >
> > >
> > > log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
> > > log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> > > log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
> > > log4j.appender.KAFKA.BrokerList=hnode01:9092
> > > log4j.appender.KAFKA.Topic=DKTestEvent
> > >
> > > #log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder
> > >
> > >
> >
> --
> > >
> > >
> > > And this is my sample application.
> > >
> > >
> >
> --
> > > import org.apache.log4j.Logger;
> > > import org.apache.log4j.BasicConfigurator;
> > > import org.apache.log4j.PropertyConfigurator;
> > >
> > > public class HelloWorld {
> > >
> > > static Logger logger = Logger.getLogger(HelloWorld.class.getName());
> > >
> > > public static void main(String[] args) {
> > > PropertyConfigurator.configure(args[0]);
> > >
> > > logger.info("Entering application.");
> > > logger.debug("Debugging!.");
> > > logger.info("Exiting application.");
> > > }
> > > }
> > >
> > >
> >
> --
> > >
> > > Since my project is maven project, I attached pom.xml also.
> > >
> > >
> > >
> >
> --
> > > <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="
> > > http://www.w3.org/2001/XMLSchema-instance"
> > > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> > > http://maven.apache.org/xsd/maven-4.0.0.xsd">
> > > <modelVersion>4.0.0</modelVersion>
> > > <groupId>com.my.app</groupId>
> > > <artifactId>log4-appender</artifactId>
> > > <version>0.0.1-SNAPSHOT</version>
> > >
> > > <dependencies>
> > > <dependency>
> > > <groupId>org.apache.kafka</groupId>
> > > <artifactId>kafka_2.8.2</artifactId>
> > > <version>0.8.0</version>
> > > </dependency>
> > >
> > > <dependency>
> > > <groupId>log4j</groupId>
> > > <artifactId>log4j</artifactId>
> > > <version>1.2.17</version>
> > > </dependency>
> > > </dependencies>
> > >
> > > </project>
> > >
> >
> --
> > >
> > >
> > > And I am getting these error:
> > >
> > >
> >
> ---
> > > INFO [main] (Logging.scala:67) - Verifying properties
> > >  INFO [main] (Logging.scala:67) - Property metadata.broker.list is
> > > overridden to hnode01:9092
> > >  INFO [main] (Logging.scala:67) - Property serializer.class is
> overridden
> > > to kafka.serializer.StringEncoder
> > >  INFO [main] (HelloWorld.java:14) - Entering application.
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s)
> > > Set(DKTestEvent)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s)
> > > Set(DKTestEvent)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s)
> > > Set(DKTestEvent)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s)
> > > Set(DKTestEvent)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s)
> > > Set(DKTestEvent)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s)
> > > Set(DKTestEvent)
> > > .
> > > .
> > > .
> > > INFO [main] (H
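Neha's diagnosis above — the appender's failed send is itself logged through log4j, which re-enters the appender — can be sketched in miniature. This is a hypothetical standalone model, not the real kafka.producer.KafkaLog4jAppender; the depth guard exists only so the demo terminates:

```java
public class AppenderLoopDemo {
    // Each failed send is logged through the same appender, so a failure
    // re-enters the send path -- without the guard this never returns.
    static int attemptSend(String msg, int depth) {
        if (depth > 3) return 0;       // guard: the real bug recurses forever
        boolean sendFailed = true;     // pretend the broker is unreachable
        int attempts = 1;              // this send
        if (sendFailed) {
            // BUG: the error message goes back through the same logger/appender
            attempts += attemptSend("ERROR sending: " + msg, depth + 1);
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptSend("Entering application.", 0)); // 4 sends for one log line
    }
}
```

The fix in a real appender is to route the appender's own error reporting somewhere that does not go back through Kafka.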

Re: Reg Partition and Replica?

2014-02-27 Thread Martin Kleppmann
Hi Bala,

Partitions are what give Kafka parallelism and allow it to scale. Every message 
exists in exactly one partition.

Replicas are exact copies of partitions on different machines. They allow Kafka 
to be reliable and not lose messages if a machine dies.

So the answers are:

1. No, a message is sent to one partition.

2. Each partition is replicated separately, so it doesn't make sense to 
replicate from one partition to another.

3. Depends whether any parts of the cluster are currently in a failed state, 
see http://kafka.apache.org/documentation.html#replication


Martin
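To illustrate answer 1: with keyed messages, a hash of the key selects exactly one of the topic's partitions. This is a simplified stand-in for Kafka's default partitioning behavior (a real producer also handles null keys and pluggable partitioner classes), not the actual kafka.producer.Partitioner:

```java
import java.util.List;

public class PartitionDemo {
    // hash(key) mod numPartitions: every keyed message lands in exactly
    // one partition; replicas are then copies of that one partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 5; // like topic "test1" in the question
        for (String key : List.of("order-1", "order-2", "order-3")) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```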

On 27 Feb 2014, at 06:17, Balasubramanian Jayaraman (Contingent) 
 wrote:

> Hi,
> 
> I have a doubt regarding to Partition and replica. What is the difference 
> between them?
> I created a topic "test1" with 5 partition and 3 replicas. I am sending a 
> message to the topic "test1". I see that different partitions are present in 
> the different Kafka brokers.
> Some questions on this topic
> 
> 1.  Will that message be sent to all the partitions?
> 
> 2.  When it will be replicated to all the partitions?
> 
> 3.  When it will be replicated to all the replicas?
> 
> Thanks
> Bala



Re: Problems consuming snappy compressed messages via SimpleConsumer

2014-02-27 Thread Neha Narkhede
I was actually referring to kafka-simple-consumer-shell.sh. Please make
sure that you pass the same fetch size that you configured your high level
consumer with, to the kafka-simple-consumer-shell with the --fetchsize
option.

Thanks,
Neha


On Wed, Feb 26, 2014 at 6:26 PM, Dan Hoffman  wrote:

> The kafka-console-consumer is fine.
>
> On Wednesday, February 26, 2014, Neha Narkhede 
> wrote:
>
> > Actually I meant the simple consumer shell that ships with kafka in the
> bin
> > directory.
> >
> >
> > On Wed, Feb 26, 2014 at 6:17 PM, Dan Hoffman 
> wrote:
> >
> > > I haven't tried that yet. But since the high level consumer can consume
> > it,
> > > should it matter who published it?
> > >
> > > On Wednesday, February 26, 2014, Neha Narkhede <
> neha.narkh...@gmail.com>
> > > wrote:
> > >
> > > > Do you see the same issue if you send snappy data using the console
> > > > producer instead of librdkafka?
> > > >
> > > >
> > > > On Wed, Feb 26, 2014 at 5:58 PM, Dan Hoffman  > > >
> > > > wrote:
> > > >
> > > > > Publisher (using librdkafka C api) has sent both gzip and snappy
> > > > compressed
> > > > > messages.  I find that the java Simple Consumer (
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example#
> > > > > )
> > > > > is unable to read the snappy ones, while the High Level one is.
> Is
> > > this
> > > > > expected?  Is there something you have to do in order to handle the
> > > > snappy
> > > > > messages?   There is no error messages provided, it simply acts as
> if
> > > > there
> > > > > are no further messages.
> > > > >
> > > >
> > >
> >
>


Re: Kafka 0.8: delete topic with data

2014-02-27 Thread Neha Narkhede
No, delete topic support doesn't exist in 0.8 and the inclusion of
DeleteTopicCommand was overlooked when we cut the release. So using that
command can cause unexpected issues in the cluster and we don't recommend
you use it. Delete topic is available in beta in the upcoming 0.8.1 release
but I think it will be stable only in 0.8.2.

Thanks,
Neha
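For context on why the command is unsafe: in 0.8, DeleteTopicCommand only deletes the topic's ZooKeeper node, and the brokers are never told, so log segments stay on disk. A sketch of the znode involved (the topic name is an example; this only computes the path and does not touch ZooKeeper):

```java
public class TopicPath {
    // The 0.8 DeleteTopicCommand removes only this znode; the broker-side
    // data under log.dirs is left behind.
    static String brokerTopicPath(String topic) {
        return "/brokers/topics/" + topic;
    }

    public static void main(String[] args) {
        System.out.println(brokerTopicPath("my-topic")); // /brokers/topics/my-topic
    }
}
```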


On Thu, Feb 27, 2014 at 1:16 AM, Yury Ruchin  wrote:

> Hi,
>
> I'm using Kafka 0.8 which does not have a command to delete topic. However,
> I need the functionality and I'm trying to adopt this approach:
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
> .
> I see it simply deletes the topic node from ZK. My question is what will
> happen with the topic data after removing the node? Will Kafka brokers be
> able to detect that topic does not exist anymore and purge its data?
>
> Thanks,
> Yury
>


Re: New Consumer API discussion

2014-02-27 Thread Neha Narkhede
Rob,

The use of the callbacks is explained in the javadoc here -
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html

Let me know if it makes sense. The hope is to improve the javadoc so that
it is self explanatory.

Thanks,
Neha


On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
wrote:

> Neha, what does the use of the RebalanceBeginCallback and
> RebalanceEndCallback look like?
>
> thanks,
> Rob
>
> On Feb 25, 2014, at 3:51 PM, Neha Narkhede 
> wrote:
>
> > How do you know n? The whole point is that you need to be able to fetch
> the
> > end offset. You can't a priori decide you will load 1m messages without
> > knowing what is there.
> >
> > Hmm. I think what you are pointing out is that in the new consumer API,
> we
> > don't have a way to issue the equivalent of the existing
> getOffsetsBefore()
> > API. Agree that is a flaw that we should fix.
> >
> > Will update the docs/wiki with a few use cases that I've collected so far
> > and see if the API covers those.
> >
> > I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> > clearer to me
> >
> > Well the RebalanceBeginCallback interface will have
> onPartitionsAssigned()
> > as the callback. Similarly, the RebalanceEndCallback interface will have
> > onPartitionsRevoked() as the callback. Makes sense?
> >
> > Thanks,
> > Neha
> >
> >
> > On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps  wrote:
> >
> >> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> >> clearer to me.
> >>
> >> -Jay
> >>
> >>
> >> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <
> neha.narkh...@gmail.com
> >>> wrote:
> >>
> >>> Thanks for the reviews so far! There are a few outstanding questions -
> >>>
> >>> 1.  It will be good to make the rebalance callbacks forward compatible
> >> with
> >>> Java 8 capabilities. We can change it to PartitionsAssignedCallback
> >>> and PartitionsRevokedCallback or RebalanceBeginCallback and
> >>> RebalanceEndCallback?
> >>>
> >>> If there are no objections, I will change it to RebalanceBeginCallback
> >> and
> >>> RebalanceEndCallback.
> >>>
> >>> 2.  The return type for committed() is List.
> There
> >>> was a suggestion to change it to either be Map or
> >>> Map
> >>>
> >>> Do people have feedback on this suggestion?
> >>>
> >>>
> >>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <
> neha.narkh...@gmail.com
>  wrote:
> >>>
>  Robert,
> 
>  Are you saying it is possible to get events from the high-level
> >>> consumerregarding various state machine changes?  For instance, can we
> >> get a
>  notification when a rebalance starts and ends, when a partition is
>  assigned/unassigned, when an offset is committed on a partition, when
> a
>  leader changes and so on?  I call this OOB traffic, since they are not
> >>> the
>  core messages streaming, but side-band events, yet they are still
>  potentially useful to consumers.
> 
>  In the current proposal, you get notified when the state machine
> >> changes
>  i.e. before and after a rebalance is triggered. Look at
>  ConsumerRebalanceCallback<
> >>>
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> 
>  .Leader changes do not count as state machine changes for consumer
>  rebalance purposes.
> 
>  Thanks,
>  Neha
> 
> 
>  On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <
> >> neha.narkh...@gmail.com
>  wrote:
> 
> > Jay/Robert -
> >
> >
> > I think what Robert is saying is that we need to think through the
> >>> offset
> > API to enable "batch processing" of topic data. Think of a process
> >> that
> > periodically kicks off to compute a data summary or do a data load or
> > something like that. I think what we need to support this is an api
> to
> > fetch the last offset from the server for a partition. Something like
> >   long lastOffset(TopicPartition tp)
> > and for symmetry
> >   long firstOffset(TopicPartition tp)
> >
> > Likely this would have to be batched.
> >
> > A fixed range of data load can be done using the existing APIs as
> > follows. This assumes you know the endOffset which can be
> >> currentOffset
> >>> + n
> > (number of messages in the load)
> >
> > long startOffset = consumer.position(partition);
> > long endOffset = startOffset + n;
> > while(consumer.position(partition) <= endOffset) {
> > List messages = consumer.poll(timeout,
> > TimeUnit.MILLISECONDS);
> > process(messages, endOffset);  // processes messages
> >> until
> > endOffset
> > }
> >
> > Does that make sense?
> >
> >
> > On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <
> >> neha.narkh...@gmail.com
>  wrote:
> >
> >> Thanks for the review, Jun. Here are some comments -
> >>
> 
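The fixed-range load loop quoted above (its generics were stripped by the archive) can be restated as a runnable toy: an in-memory list stands in for the partition and a plain long for consumer.position(). The proposed 0.9 Consumer API itself is not used here.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchLoadDemo {
    // Load exactly n records starting at startOffset, mirroring the
    // startOffset/endOffset loop in the proposal.
    static List<String> loadRange(List<String> log, long startOffset, long n) {
        List<String> out = new ArrayList<>();
        long position = startOffset;            // consumer.position(partition)
        long endOffset = startOffset + n;
        while (position < endOffset && position < log.size()) {
            out.add(log.get((int) position));   // process(messages, endOffset)
            position++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("a", "b", "c", "d", "e");
        System.out.println(loadRange(partition, 1, 3)); // [b, c, d]
    }
}
```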

Re: Unable to consume Snappy compressed messages with Simple Consumer

2014-02-27 Thread Dan Hoffman
That worked!   My publisher is sending a 1 MB payload and compressing it with
Snappy.  I would have thought that with compression it would have fit
into the 10 bytes default of the sample code.  I guess not!

Thanks.


On Thu, Feb 27, 2014 at 1:37 AM, Jun Rao  wrote:

> Try making the last parameter in the following call larger (say to
> 1,000,000).
>
> .addFetch(a_topic, a_partition, readOffset, 10)
>
> Thanks,
>
> Jun
>
>
> On Wed, Feb 26, 2014 at 9:32 PM, Dan Hoffman  wrote:
>
> > I'm not sure what you mean - could you be more specific in terms what I
> > might need to adjust in the simple consumer example code?
> >
> >
> > On Thu, Feb 27, 2014 at 12:24 AM, Jun Rao  wrote:
> >
> > > Are you using a fetch size larger than the whole compressed unit?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Feb 26, 2014 at 5:40 PM, Dan Hoffman 
> > wrote:
> > >
> > > > Publisher (using librdkafka C api) has sent both gzip and snappy
> > > compressed
> > > > messages.  I find that the java Simple Consumer (
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example#
> > > > )
> > > > is unable to read the snappy ones, while the High Level one is.   Is
> > this
> > > > expected?  Is there something you have to do in order to handle the
> > > snappy
> > > > messages?   There is no error messages provided, it simply acts as if
> > > there
> > > > are no further messages.
> > > >
> > >
> >
>
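The symptom in this thread (no error, just "no further messages") follows from how fetches behave: a compressed message set must fit in the fetch buffer to be decoded, so a fetch size smaller than the set yields zero readable messages. The following is a toy model of that behavior, not the real kafka.api.FetchRequest:

```java
import java.util.Collections;
import java.util.List;

public class FetchSizeDemo {
    // All-or-nothing: if the fetch buffer can't hold the whole compressed
    // message set, the consumer decodes nothing and looks "caught up".
    static List<String> fetch(List<String> messageSet, int setSizeBytes, int fetchSizeBytes) {
        if (fetchSizeBytes < setSizeBytes) {
            return Collections.emptyList();
        }
        return messageSet;
    }

    public static void main(String[] args) {
        List<String> set = List.of("m1", "m2");
        System.out.println(fetch(set, 1_000_000, 100_000).size());   // 0 -> looks like end of log
        System.out.println(fetch(set, 1_000_000, 2_000_000).size()); // 2 -> messages appear
    }
}
```

This is why Jun's suggestion of raising the last addFetch parameter makes the messages appear.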


Re: New Consumer API discussion

2014-02-27 Thread Robert Withers
Neha,

I see how one might wish to implement onPartitionsAssigned and 
onPartitionsRevoked, but I don’t have a sense for how I might supply these 
implementations to a running consumer.  What would the setup code look like to 
start a high-level consumer with these provided implementations?

thanks,
Rob


On Feb 27, 2014, at 3:48 AM, Neha Narkhede  wrote:

> Rob,
> 
> The use of the callbacks is explained in the javadoc here -
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> 
> Let me know if it makes sense. The hope is to improve the javadoc so that
> it is self explanatory.
> 
> Thanks,
> Neha
> 
> 
> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
> wrote:
> 
>> Neha, what does the use of the RebalanceBeginCallback and
>> RebalanceEndCallback look like?
>> 
>> thanks,
>> Rob
>> 
>> On Feb 25, 2014, at 3:51 PM, Neha Narkhede 
>> wrote:
>> 
>>> How do you know n? The whole point is that you need to be able to fetch
>> the
>>> end offset. You can't a priori decide you will load 1m messages without
>>> knowing what is there.
>>> 
>>> Hmm. I think what you are pointing out is that in the new consumer API,
>> we
>>> don't have a way to issue the equivalent of the existing
>> getOffsetsBefore()
>>> API. Agree that is a flaw that we should fix.
>>> 
>>> Will update the docs/wiki with a few use cases that I've collected so far
>>> and see if the API covers those.
>>> 
>>> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
>>> clearer to me
>>> 
>>> Well the RebalanceBeginCallback interface will have
>> onPartitionsAssigned()
>>> as the callback. Similarly, the RebalanceEndCallback interface will have
>>> onPartitionsRevoked() as the callback. Makes sense?
>>> 
>>> Thanks,
>>> Neha
>>> 
>>> 
>>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps  wrote:
>>> 
 1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems
 clearer to me.
 
 -Jay
 
 
 On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <
>> neha.narkh...@gmail.com
> wrote:
 
> Thanks for the reviews so far! There are a few outstanding questions -
> 
> 1.  It will be good to make the rebalance callbacks forward compatible
 with
> Java 8 capabilities. We can change it to PartitionsAssignedCallback
> and PartitionsRevokedCallback or RebalanceBeginCallback and
> RebalanceEndCallback?
> 
> If there are no objections, I will change it to RebalanceBeginCallback
 and
> RebalanceEndCallback.
> 
> 2.  The return type for committed() is List.
>> There
> was a suggestion to change it to either be Map or
> Map
> 
> Do people have feedback on this suggestion?
> 
> 
> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <
>> neha.narkh...@gmail.com
>> wrote:
> 
>> Robert,
>> 
>> Are you saying it is possible to get events from the high-level
> consumerregarding various state machine changes?  For instance, can we
 get a
>> notification when a rebalance starts and ends, when a partition is
>> assigned/unassigned, when an offset is committed on a partition, when
>> a
>> leader changes and so on?  I call this OOB traffic, since they are not
> the
>> core messages streaming, but side-band events, yet they are still
>> potentially useful to consumers.
>> 
>> In the current proposal, you get notified when the state machine
 changes
>> i.e. before and after a rebalance is triggered. Look at
>> ConsumerRebalanceCallback<
> 
 
>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
>> 
>> .Leader changes do not count as state machine changes for consumer
>> rebalance purposes.
>> 
>> Thanks,
>> Neha
>> 
>> 
>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <
 neha.narkh...@gmail.com
>> wrote:
>> 
>>> Jay/Robert -
>>> 
>>> 
>>> I think what Robert is saying is that we need to think through the
> offset
>>> API to enable "batch processing" of topic data. Think of a process
 that
>>> periodically kicks off to compute a data summary or do a data load or
>>> something like that. I think what we need to support this is an api
>> to
>>> fetch the last offset from the server for a partition. Something like
>>>  long lastOffset(TopicPartition tp)
>>> and for symmetry
>>>  long firstOffset(TopicPartition tp)
>>> 
>>> Likely this would have to be batched.
>>> 
>>> A fixed range of data load can be done using the existing APIs as
>>> follows. This assumes you know the endOffset which can be
 currentOffset
> + n
>>> (number of messages in the load)
>>> 
>>> long startOffset = consumer.position(partition);
>>> long endOffset = startOffset + n;
>>> while(consumer.position(partition) <= endOf

can't connect to kafka from a java client.

2014-02-27 Thread rails
Goal: I am trying to send messages to Kafka from a Java client.
And it has been a pain.
Let me describe in brief.

1. I have installed Kafka on a CentOS VM.
2. I ran the ZooKeeper that comes with it, the server, the producer, and
the client with all of the default properties files.
I sent and received messages successfully.
3. I have a telnet connection from my computer to the ZooKeeper (2181) port
and the Kafka server (9092) port on the VM.

Now, I want to write Java code to send messages to the topic.
I use the example from the quick start at the site:

Properties props = new Properties();
props.put("zk.connect", "http://XX.XX.XX.XX:2181"); // XX is the ip
props.put("serializer.class", "kafka.serializer.StringEncoder");
producer = new Producer(new ProducerConfig(props));

and it fails on the fourth line with the following exceptions:

kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries
and
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to
zookeeper server within timeout: 400


Problems.
1. The exception. The bad parameters:
In the Kafka quick start example I see that it needs only zk.connect and
serializer.class. When I run
it, it yells that it needs metadata.broker.list in the constructor of the
Producer. Does it? So I fill in the ip and port of the Kafka server.
And btw - is it zk.connect or zookeeper.connect? (ZkTimeoutException: Unable
to connect to zookeeper server within timeout: 400)

3. maven bad versions

I go to the site, and I see that the latest version is kafka_2.8.0-0.8.0.
problem no 1 - I download it using IntelliJ (I think it is Maven
central) -
I get all related jars - only that the kafka jars are empty (they contain
only a manifest).

problem no 2 - there are later versions than the one on the site. Are
they official?
Anyway, I downloaded org.apache.kafka:kafka_2.10:0.8.0
4. Using Wireshark I see the three-way handshake (SYN & ACK) and then
FIN & ACK
right after. In the logs of the ZooKeeper I see the following
  [2014-02-27 01:43:42,127] WARN EndOfStreamException: Unable to read
additional data from client sessionid 0x0, likely client has closed socket

  (org.apache.zookeeper.server.NIOServerCnxn)
  Which means that my client closes the connection. Why?

Oh, the horror. The horror.


Re: can't connect to kafka from a java client.

2014-02-27 Thread Jun Rao
zk.connect is needed in Kafka 0.7. Since you are using Kafka 0.8, you need
to set metadata.broker.list. ZK is no longer needed in the producer in 0.8.
You can follow http://kafka.apache.org/documentation.html for 0.8
documentation.

The latest release in maven is 0.8.0. We publish different kafka jars for
different versions of scala. That's why you see 2.8.0, 2.10.0, etc. Those
refer to the scala versions. There is a problem with kafka_2.8.0-0.8.0 in
maven (we will fix it in the next release). The rest of the scala versions
are fine.

Let us know if you see any other problems.

Thanks,

Jun
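Putting Jun's answer back into the original snippet: in 0.8 the producer takes metadata.broker.list (the Kafka broker host:port, with no http:// scheme and no ZooKeeper address). The sketch below only builds the Properties so that it runs standalone; constructing the actual Producer (commented out) needs the kafka_2.10 0.8.0 jar on the classpath, and the broker address is a placeholder:

```java
import java.util.Properties;

public class ProducerProps {
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        // 0.8: a list of brokers, not ZooKeeper -- replaces 0.7's zk.connect
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1"); // wait for the leader's ack
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("XX.XX.XX.XX:9092"); // plain host:port
        System.out.println(props.getProperty("metadata.broker.list"));
        // With kafka_2.10:0.8.0 on the classpath, you would then do:
        // Producer<String, String> producer =
        //         new Producer<String, String>(new ProducerConfig(props));
    }
}
```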


On Thu, Feb 27, 2014 at 3:32 AM, rails  wrote:

> Goal: I am trying to send messages to kafka from a java cleint.
> And it has been a pain..
> Let me describe in brief.
>
> 1. I have installed kafka on a centos VM.
> 2. I ran the zookeeper that comes with it, the server , the producer and
> the client with all of the default properties files.
> I sent and received messages successfully.
> 3. I have a telnet connection from my computer to the zookeeper (2181) port
> and the kafka server (9092) port at the VM.
>
> Now, I want write java code to send messages to the topic.
> I use the example from the quick start at the site:
>
> Properties props = new Properties();
> props.put("zk.connect", "http://XX.XX.XX.XX:2181";); // XX is the ip
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> producer = new Producer(new ProducerConfig(props));
>
> and it fails on the fourth line with the following excetptions :
>
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries
> and
> rg.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to
> zookeeper server within timeout: 400
>
>
> Problems.
> 1. The exception.
> The bad parameters:
> In the kafka quick start example I see that it needs only zk.connect,
> serializer.class  . when I run
> it it yells it needs metadata.broker.list in the constructor of the
> Producer. Does it? So I feel the ip and port of the kafka server.
> and btw - is it zk.connect or zookeeper connect?ZkTimeoutException: Unable
> to connect to zookeeper server within timeout: 400
>
> 3. maven bad versions
>
> I go to the site, i see that the latest version is  kafka_2.8.0-0.8.0.
> problem no 1 - I download it using intelij (I think it is maven
> central) -
> I get all related jars - only that the kafka jars are empty (contain
> only manifest<
> http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.kafka%22%20AND%20a%3A%22kafka_2.8.0%22
> >
> ).
>
> problem no 2 - there are later versions then the one in the site. Are
> they official.
> any way, I downloaded org.apache.kafka:kafka_2.10:0.8.0
> 4. Using wireshark I see three SYN & ACK (triple handshake) and than and
> then FYN & ACK
>  right after. in the logs of the zookeeper I see the following
>   [2014-02-27 01:43:42,127] WARN EndOfStreamException: Unable to read
> additional data from client sessionid 0x0, likely client has closed socket
>
>   (org.apache.zookeeper.server.NIOServerCnxn)
>   Which means that I close the connection. Why?
>
> Oh, The horror. The horror. <
> http://en.wikipedia.org/wiki/Heart_of_Darkness>
>


Does high level consumer support setting offset

2014-02-27 Thread Yonghui Zhao
Hi,

In kafka 0.8, does high level consumer support setting offset?

Our service reads Kafka data but doesn't flush the data immediately, so if
it is restarted, the data in memory will be lost. We want to reset the Kafka
consumer offset to an old offset.

If the consumer group has only 1 machine, we can record every partition's
offset for our topic on disk. Does the high level consumer support
setting the offset in ZK to the recorded offset?


Re: New Consumer API discussion

2014-02-27 Thread Neha Narkhede
Is this what
you are looking for? Basically, I think from the overall feedback, it
looks like code snippets don't seem to work for overall understanding of
the APIs. I plan to update the javadoc with more complete examples that
have been discussed so far on this thread and generally on the mailing list.

Thanks,
Neha




On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers
wrote:

> Neha,
>
> I see how one might wish to implement onPartitionsAssigned and
> onPartitionsRevoked, but I don't have a sense for how I might supply these
> implementations to a running consumer.  What would the setup code look like
> to start a high-level consumer with these provided implementations?
>
> thanks,
> Rob
>
>
> On Feb 27, 2014, at 3:48 AM, Neha Narkhede 
> wrote:
>
> > Rob,
> >
> > The use of the callbacks is explained in the javadoc here -
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> >
> > Let me know if it makes sense. The hope is to improve the javadoc so that
> > it is self explanatory.
> >
> > Thanks,
> > Neha
> >
> >
> > On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
> > wrote:
> >
> >> Neha, what does the use of the RebalanceBeginCallback and
> >> RebalanceEndCallback look like?
> >>
> >> thanks,
> >> Rob
> >>
> >> On Feb 25, 2014, at 3:51 PM, Neha Narkhede 
> >> wrote:
> >>
> >>> How do you know n? The whole point is that you need to be able to fetch
> >> the
> >>> end offset. You can't a priori decide you will load 1m messages without
> >>> knowing what is there.
> >>>
> >>> Hmm. I think what you are pointing out is that in the new consumer API,
> >> we
> >>> don't have a way to issue the equivalent of the existing
> >> getOffsetsBefore()
> >>> API. Agree that is a flaw that we should fix.
> >>>
> >>> Will update the docs/wiki with a few use cases that I've collected so
> far
> >>> and see if the API covers those.
> >>>
> >>> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> >>> clearer to me
> >>>
> >>> Well the RebalanceBeginCallback interface will have
> >> onPartitionsAssigned()
> >>> as the callback. Similarly, the RebalanceEndCallback interface will
> have
> >>> onPartitionsRevoked() as the callback. Makes sense?
> >>>
> >>> Thanks,
> >>> Neha
> >>>
> >>>
> >>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps 
> wrote:
> >>>
>  1. I would prefer PartitionsAssigned and PartitionsRevoked as that
> seems
>  clearer to me.
> 
>  -Jay
> 
> 
>  On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <
> >> neha.narkh...@gmail.com
> > wrote:
> 
> > Thanks for the reviews so far! There are a few outstanding questions
> -
> >
> > 1.  It will be good to make the rebalance callbacks forward
> compatible
>  with
> > Java 8 capabilities. We can change it to PartitionsAssignedCallback
> > and PartitionsRevokedCallback or RebalanceBeginCallback and
> > RebalanceEndCallback?
> >
> > If there are no objections, I will change it to
> RebalanceBeginCallback
>  and
> > RebalanceEndCallback.
> >
> > 2.  The return type for committed() is List.
> >> There
> > was a suggestion to change it to either be Map
> or
> > Map
> >
> > Do people have feedback on this suggestion?
> >
> >
> > On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <
> >> neha.narkh...@gmail.com
> >> wrote:
> >
> >> Robert,
> >>
> >> Are you saying it is possible to get events from the high-level
> > consumerregarding various state machine changes?  For instance, can
> we
>  get a
> >> notification when a rebalance starts and ends, when a partition is
> >> assigned/unassigned, when an offset is committed on a partition,
> when
> >> a
> >> leader changes and so on?  I call this OOB traffic, since they are
> not
> > the
> >> core messages streaming, but side-band events, yet they are still
> >> potentially useful to consumers.
> >>
> >> In the current proposal, you get notified when the state machine
>  changes
> >> i.e. before and after a rebalance is triggered. Look at
> >> ConsumerRebalanceCallback<
> >
> 
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> >>
> >> .Leader changes do not count as state machine changes for consumer
> >> rebalance purposes.
> >>
> >> Thanks,
> >> Neha
> >>
> >>
> >> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <
>  neha.narkh...@gmail.com
> >> wrote:
> >>
> >>> Jay/Robert -
> >>>
> >>>
> >>> I think what Robert is saying is that we need to think through the
> > offset
> >>> API to enable "batch processing" of topic data. Think of a process
>

Re: Does high level consumer support setting offset

2014-02-27 Thread Guozhang Wang
Hello Yonghui,

In 0.8, the high-level consumer does not support setting offsets directly.
However, you can use this tool to manually set the offset in ZK before you
restart the consumer, to change the starting offset:

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
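For reference, the 0.8 high-level consumer keeps its committed offsets under a
fixed ZooKeeper path, which is what that tool rewrites.  A minimal sketch of
computing those paths (assuming the default 0.8 layout; the group and topic
names below are illustrative):

```python
# The 0.8 high-level consumer stores committed offsets in ZooKeeper under
# /consumers/<group>/offsets/<topic>/<partition>; this helper just builds
# that path.  Group and topic names here are illustrative.
def offset_znode(group, topic, partition):
    """Return the ZK node that holds the committed offset for one partition."""
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

print(offset_znode("my-group", "test1", 0))
# /consumers/my-group/offsets/test1/0
```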

Guozhang


On Thu, Feb 27, 2014 at 8:25 AM, Yonghui Zhao  wrote:

> Hi,
>
> In kafka 0.8, does high level consumer support setting offset?
>
> Our service reads kafka data but won't  flush the data immediately, so if
> restarted the data in memory will be lost.  We want to reset kafka consumer
> offset to an old offset.
>
> If the consumer group has only 1 machine, we can record all partition
> offsets for our topic on disk. Does the high-level consumer support
> setting the offset in ZK to the recorded offset?
>



-- 
-- Guozhang


Re: Does high level consumer support setting offset

2014-02-27 Thread Neha Narkhede
Currently, the only way to rewind the high level consumer to a different
offset is to first shutdown all instances of the consumer, update zookeeper
with the desired offsets and then restart the consumer. We are addressing
this in the new consumer APIs for Kafka 0.9. If you can review the proposal
on
this thread, that will be appreciated.

Thanks,
Neha


On Thu, Feb 27, 2014 at 8:25 AM, Yonghui Zhao  wrote:

> Hi,
>
> In kafka 0.8, does high level consumer support setting offset?
>
> Our service reads kafka data but won't  flush the data immediately, so if
> restarted the data in memory will be lost.  We want to reset kafka consumer
> offset to an old offset.
>
> If the consumer group has only 1 machine, we can record all partition
> offsets for our topic on disk. Does the high-level consumer support
> setting the offset in ZK to the recorded offset?
>


How does one measure performance of an existing Kafka cluster?

2014-02-27 Thread Dan Hoffman
Let's say I have a running cluster and users/apps are pounding away at it.
 Is there a quick and easy way to measure its current throughput?   I know
there are utilities for generating volume to get stats, but I'd like to
simply get some stats about its current operation.  Is there a good way to
do this?


Re: How does one measure performance of an existing Kafka cluster?

2014-02-27 Thread Jun Rao
You can take a look at the JMX metrics described in
http://kafka.apache.org/documentation.html#monitoring

Thanks,

Jun


On Thu, Feb 27, 2014 at 7:50 PM, Dan Hoffman  wrote:

> Let's say I have a running cluster and users/apps are pounding away at it.
>  Is there a quick and easy way to measure its current throughput?   I know
> there are utilities for generating volume to get stats, but I'd like to
> simply get some stats about its current operation.  Is there a good way to
> do this?
>


Re: How does one measure performance of an existing Kafka cluster?

2014-02-27 Thread Dan Hoffman
Are the numbers for the entire cluster, or just the broker you are connected to?
 (I'm interested in the former)


On Thu, Feb 27, 2014 at 11:26 PM, Jun Rao  wrote:

> You can take a look at the jmx in
> http://kafka.apache.org/documentation.html#monitoring
>
> Thanks,
>
> Jun
>
>
> On Thu, Feb 27, 2014 at 7:50 PM, Dan Hoffman  wrote:
>
> > Let's say I have a running cluster and users/apps are pounding away at
> it.
> >  Is there a quick and easy way to measure its current throughput?   I
> know
> > there are utilities for generating volume to get stats, but I'd like to
> > simply get some stats about its current operation.  Is there a good way
> to
> > do this?
> >
>


Re: New Consumer API discussion

2014-02-27 Thread Robert Withers
Thank you, Neha, that makes it clear.  Really, the aspect of all this that we
could use is a way to do exactly-once processing.  We are looking at more
critical data.  What are the latest thoughts on how to achieve exactly-once
semantics, and how might that affect the consumer API?

Thanks,
Rob
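One pattern often discussed for exactly-once is to record the consumer offset
atomically together with the processed output, so that replayed messages after
a restart can be detected and skipped.  A rough sketch of the idea (the
storage and names here are illustrative, not a Kafka API):

```python
# Sketch of idempotent, exactly-once-style processing: the last applied offset
# is stored alongside the output, and anything at or below it is skipped on
# replay.  "store" stands in for whatever transactional store holds the output.
def process_batch(store, messages):
    """Apply (offset, payload) pairs idempotently against a store."""
    for offset, payload in messages:
        if offset <= store.get("offset", -1):
            continue  # already applied before a crash/restart; skip the replay
        store.setdefault("data", []).append(payload)
        store["offset"] = offset  # offset recorded together with the output

state = {}
process_batch(state, [(0, "a"), (1, "b")])
process_batch(state, [(1, "b"), (2, "c")])  # replay of offset 1 is skipped
print(state["data"])  # ['a', 'b', 'c']
```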

On Feb 27, 2014, at 10:29 AM, Neha Narkhede  wrote:

> Is this what
> you are looking for? Basically, I think from the overall feedback, it
> looks like code snippets don't seem to work for overall understanding of
> the APIs. I plan to update the javadoc with more complete examples that
> have been discussed so far on this thread and generally on the mailing list.
> 
> Thanks,
> Neha
> 
> 
> 
> 
> On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers
> wrote:
> 
>> Neha,
>> 
>> I see how one might wish to implement onPartitionsAssigned and
>> onPartitionsRevoked, but I don't have a sense for how I might supply these
>> implementations to a running consumer.  What would the setup code look like
>> to start a high-level consumer with these provided implementations?
>> 
>> thanks,
>> Rob
>> 
>> 
>> On Feb 27, 2014, at 3:48 AM, Neha Narkhede 
>> wrote:
>> 
>>> Rob,
>>> 
>>> The use of the callbacks is explained in the javadoc here -
>>> 
>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
>>> 
>>> Let me know if it makes sense. The hope is to improve the javadoc so that
>>> it is self explanatory.
>>> 
>>> Thanks,
>>> Neha
>>> 
>>> 
>>> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
>>> wrote:
>>> 
 Neha, what does the use of the RebalanceBeginCallback and
 RebalanceEndCallback look like?
 
 thanks,
 Rob
 
 On Feb 25, 2014, at 3:51 PM, Neha Narkhede 
 wrote:
 
> How do you know n? The whole point is that you need to be able to fetch
 the
> end offset. You can't a priori decide you will load 1m messages without
> knowing what is there.
> 
> Hmm. I think what you are pointing out is that in the new consumer API,
 we
> don't have a way to issue the equivalent of the existing
 getOffsetsBefore()
> API. Agree that is a flaw that we should fix.
> 
> Will update the docs/wiki with a few use cases that I've collected so
>> far
> and see if the API covers those.
> 
> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> clearer to me
> 
> Well the RebalanceBeginCallback interface will have
 onPartitionsAssigned()
> as the callback. Similarly, the RebalanceEndCallback interface will
>> have
> onPartitionsRevoked() as the callback. Makes sense?
> 
> Thanks,
> Neha
> 
> 
> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps 
>> wrote:
> 
>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that
>> seems
>> clearer to me.
>> 
>> -Jay
>> 
>> 
>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <
 neha.narkh...@gmail.com
>>> wrote:
>> 
>>> Thanks for the reviews so far! There are a few outstanding questions
>> -
>>> 
>>> 1.  It will be good to make the rebalance callbacks forward
>> compatible
>> with
>>> Java 8 capabilities. We can change it to PartitionsAssignedCallback
>>> and PartitionsRevokedCallback or RebalanceBeginCallback and
>>> RebalanceEndCallback?
>>> 
>>> If there are no objections, I will change it to
>> RebalanceBeginCallback
>> and
>>> RebalanceEndCallback.
>>> 
>>> 2.  The return type for committed() is List.
 There
>>> was a suggestion to change it to either be Map
>> or
>>> Map
>>> 
>>> Do people have feedback on this suggestion?
>>> 
>>> 
>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <
 neha.narkh...@gmail.com
 wrote:
>>> 
 Robert,
 
 Are you saying it is possible to get events from the high-level
>>> consumer regarding various state machine changes?  For instance, can
>> we
>> get a
 notification when a rebalance starts and ends, when a partition is
 assigned/unassigned, when an offset is committed on a partition,
>> when
 a
 leader changes and so on?  I call this OOB traffic, since they are
>> not
>>> the
 core messages streaming, but side-band events, yet they are still
 potentially useful to consumers.
 
 In the current proposal, you get notified when the state machine
>> changes
 i.e. before and after a rebalance is triggered. Look at
 ConsumerRebalanceCallback<
>>> 
>> 
 
>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
 
 Leader changes do not count as state machine changes for consumer rebalance purposes.

Re: How does one measure performance of an existing Kafka cluster?

2014-02-27 Thread Neha Narkhede
The jmx beans will expose the metrics per broker. You would need some
utility to aggregate across all brokers in a cluster.
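As an illustration, a trivial sketch of such an aggregation, assuming the
per-broker one-minute rates have already been collected over JMX (the broker
names and numbers below are made up):

```python
# Sketch: sum a per-broker rate metric (e.g. a bytes-in one-minute rate read
# over JMX from each broker) into a single cluster-wide figure.  The input
# values here are illustrative, not real measurements.
def aggregate_cluster_rate(per_broker_rates):
    """Sum a per-broker rate metric into a cluster-wide total."""
    return sum(per_broker_rates.values())

per_broker = {"broker-1": 1250000.0, "broker-2": 980000.0, "broker-3": 1100000.0}
print(aggregate_cluster_rate(per_broker))  # 3330000.0
```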

Thanks,
Neha


On Thu, Feb 27, 2014 at 8:31 PM, Dan Hoffman  wrote:

> Are the numbers for the entire cluster or just the broker connected to?
>  (I'm interested in the former)
>
>
> On Thu, Feb 27, 2014 at 11:26 PM, Jun Rao  wrote:
>
> > You can take a look at the jmx in
> > http://kafka.apache.org/documentation.html#monitoring
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Feb 27, 2014 at 7:50 PM, Dan Hoffman 
> wrote:
> >
> > > Let's say I have a running cluster and users/apps are pounding away at
> > it.
> > >  Is there a quick and easy way to measure its current throughput?   I
> > know
> > > there are utilities for generating volume to get stats, but I'd like to
> > > simply get some stats about its current operation.  Is there a good way
> > to
> > > do this?
> > >
> >
>


RE: How does one measure performance of an existing Kafka cluster?

2014-02-27 Thread Dan Hoffman
I'm surprised that doesn't already exist - I would have thought that would be
a common requirement.

-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Thursday, February 27, 2014 11:41 PM
To: users@kafka.apache.org
Subject: Re: How does one measure performance of an existing Kafka cluster?

The jmx beans will expose the metrics per broker. You would need some
utility to aggregate across all brokers in a cluster.

Thanks,
Neha


On Thu, Feb 27, 2014 at 8:31 PM, Dan Hoffman  wrote:

> Are the numbers for the entire cluster or just the broker connected to?
>  (I'm interested in the former)
>
>
> On Thu, Feb 27, 2014 at 11:26 PM, Jun Rao  wrote:
>
> > You can take a look at the jmx in
> > http://kafka.apache.org/documentation.html#monitoring
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Feb 27, 2014 at 7:50 PM, Dan Hoffman 
> wrote:
> >
> > > Let's say I have a running cluster and users/apps are pounding 
> > > away at
> > it.
> > >  Is there a quick and easy way to measure its current throughput?   I
> > know
> > > there are utilities for generating volume to get stats, but I'd 
> > > like to simply get some stats about its current operation.  Is 
> > > there a good way
> > to
> > > do this?
> > >
> >
>



Re: How does one measure performance of an existing Kafka cluster?

2014-02-27 Thread Joe Stein
It is, but everyone has different systems to do that with (Cacti, Graphite,
Ganglia, Riemann, etc.), and there are reporters available for them, even as
a service from Sematext, Boundary, and DataDog.  More of the ecosystem is at
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem, or if you
describe your existing environment you can find out what is available for it
from the list.

In the performance test
https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing there
is a built-in CSV reporter, which you can plot with this R script
https://issues.apache.org/jira/browse/KAFKA-1190; you can also
use Excel to sum/group/filter the CSV as you need.
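As a sketch of that kind of post-processing, assuming a CSV with a per-broker
throughput column (the column name here is made up, not the reporter's actual
layout):

```python
# Sketch: sum a throughput column from a performance-test CSV.  The file
# layout assumed here (a "mb_per_sec" column) is illustrative only.
import csv
import io

data = "broker,mb_per_sec\nbroker-1,52.4\nbroker-2,47.1\n"
rows = csv.DictReader(io.StringIO(data))
total = sum(float(r["mb_per_sec"]) for r in rows)
print(round(total, 1))  # 99.5
```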

On Thu, Feb 27, 2014 at 11:58 PM, Dan Hoffman  wrote:

> I'm surprised that doesn't already exist - I would think that would be a
> common requirement?
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Thursday, February 27, 2014 11:41 PM
> To: users@kafka.apache.org
> Subject: Re: How does one measure performance of an existing Kafka cluster?
>
> The jmx beans will expose the metrics per broker. You would need some
> utility to aggregate across all brokers in a cluster.
>
> Thanks,
> Neha
>
>
> On Thu, Feb 27, 2014 at 8:31 PM, Dan Hoffman  wrote:
>
> > Are the numbers for the entire cluster or just the broker connected to?
> >  (I'm interested in the former)
> >
> >
> > On Thu, Feb 27, 2014 at 11:26 PM, Jun Rao  wrote:
> >
> > > You can take a look at the jmx in
> > > http://kafka.apache.org/documentation.html#monitoring
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Feb 27, 2014 at 7:50 PM, Dan Hoffman 
> > wrote:
> > >
> > > > Let's say I have a running cluster and users/apps are pounding
> > > > away at
> > > it.
> > > >  Is there a quick and easy way to measure its current throughput?   I
> > > know
> > > > there are utilities for generating volume to get stats, but I'd
> > > > like to simply get some stats about its current operation.  Is
> > > > there a good way
> > > to
> > > > do this?
> > > >
> > >
> >
>
>