[jira] [Commented] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified

2013-01-09 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548436#comment-13548436
 ] 

Maxime Brugidou commented on KAFKA-690:
---

This would resolve KAFKA-653.

> TopicMetadataRequest throws exception when no topics are specified
> --
>
> Key: KAFKA-690
> URL: https://issues.apache.org/jira/browse/KAFKA-690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: David Arthur
> Fix For: 0.8
>
> Attachments: KAFKA-690.patch
>
>
> If no topics are sent in a TopicMetadataRequest, `readFrom` throws an 
> exception when trying to get the head of the topic list for a debug 
> statement.
> java.util.NoSuchElementException: head of empty list
>   at scala.collection.immutable.Nil$.head(List.scala:386)
>   at scala.collection.immutable.Nil$.head(List.scala:383)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at kafka.utils.Logging$class.debug(Logging.scala:51)
>   at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
>   at 
> kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.network.RequestChannel$Request.(RequestChannel.scala:47)
>   at kafka.network.Processor.read(SocketServer.scala:320)
>   at kafka.network.Processor.run(SocketServer.scala:231)
>   at java.lang.Thread.run(Thread.java:680)
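
For reference, a minimal Scala sketch of the failure mode and an obvious guard. The object name and the local debug helper are made up for illustration; this is not the actual TopicMetadataRequest.readFrom code.

    object EmptyTopicListSketch {
      // Stand-in for the debug(...) call from kafka.utils.Logging.
      def debug(msg: => String): Unit = println(msg)

      def main(args: Array[String]): Unit = {
        val topics: List[String] = Nil  // the request carried no topic names

        // Failing pattern: topics.head is evaluated for the log line even when
        // the list is empty, raising NoSuchElementException("head of empty list").
        // debug("first topic: " + topics.head)

        // Guarded alternative: only dereference head when the list is non-empty.
        if (topics.nonEmpty) debug("first topic: " + topics.head)
        else debug("metadata requested for all topics")
      }
    }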

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-634) ConsoleProducer compresses messages and ignores the --compress flag

2013-01-09 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548441#comment-13548441
 ] 

Maxime Brugidou commented on KAFKA-634:
---

KAFKA-506 fixed this (commit f64fd3dcbaace1dba7bbd72398bb3e7d28b41d61 in the 
0.8 branch), so this should be fixed in the 0.8 release.

> ConsoleProducer compresses messages and ignores the --compress flag
> ---
>
> Key: KAFKA-634
> URL: https://issues.apache.org/jira/browse/KAFKA-634
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.7
>Reporter: Anentropic
>  Labels: console, producer
>
> I am using the kafka-producer-shell.sh script without the --compress option,
> however my messages seem to be gzipped.
> The docs say compression is off by default:
> http://incubator.apache.org/kafka/configuration.html
> The only producer.properties file I can find is at:
> /home/ubuntu/kafka-0.7.2-incubating-src/config/producer.properties
> In there is:
> compression.codec=0
> My process looks like:
> root  1748  1746  0 Nov19 ?00:02:37 java -Xmx512M -server 
> -Dlog4j.configuration=file:/usr/local/bin/kafka/../config/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false -cp 
> :/usr/local/bin/kafka/../project/boot/scala-2.8.0/lib/scala-compiler.jar:/usr/local/bin/kafka/../project/boot/scala-2.8.0/lib/scala-library.jar:/usr/local/bin/kafka/../core/target/scala_2.8.0/kafka-0.7.2.jar:/usr/local/bin/kafka/../core/lib/*.jar:/usr/local/bin/kafka/../perf/target/scala_2.8.0/kafka-perf-0.7.2.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar
>  kafka.producer.ConsoleProducer --topic logtail --zookeeper x.x.x.x:2181
> But the messages come out gobbledegook unless I use a client that understands 
> compressed messages, and in that client it identifies the bit as set to 1, 
> gzip compression.
> Jun Rao jun...@gmail.com via incubator.apache.org 
> Nov 26 (1 day ago)
> to kafka-users 
> This seems to be a bug in ConsoleProducer. It also compresses messages and
> ignores the --compress flag. Could you file a jira?
> Thanks,
> Jun
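
For context, a small Scala sketch of the behavior the reporter expected: the --compress flag drives the compression.codec producer property (0 = none, 1 = gzip, per the 0.7 configuration quoted above). The object name and argument handling are illustrative only, not the actual ConsoleProducer fix.

    import java.util.Properties

    object CompressFlagSketch {
      def main(args: Array[String]): Unit = {
        val compress = args.contains("--compress")
        val props = new Properties()
        // Only enable gzip when the flag is actually passed on the command line.
        props.put("compression.codec", if (compress) "1" else "0")
        println("compression.codec=" + props.getProperty("compression.codec"))
      }
    }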



[jira] [Resolved] (KAFKA-634) ConsoleProducer compresses messages and ignores the --compress flag

2013-01-09 Thread Maxime Brugidou (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxime Brugidou resolved KAFKA-634.
---

   Resolution: Fixed
Fix Version/s: 0.8

> ConsoleProducer compresses messages and ignores the --compress flag
> ---
>
> Key: KAFKA-634
> URL: https://issues.apache.org/jira/browse/KAFKA-634
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.7
>Reporter: Anentropic
>  Labels: console, producer
> Fix For: 0.8
>
>
> I am using the kafka-producer-shell.sh script without the --compress option,
> however my messages seem to be gzipped.
> The docs say compression is off by default:
> http://incubator.apache.org/kafka/configuration.html
> The only producer.properties file I can find is at:
> /home/ubuntu/kafka-0.7.2-incubating-src/config/producer.properties
> In there is:
> compression.codec=0
> My process looks like:
> root  1748  1746  0 Nov19 ?00:02:37 java -Xmx512M -server 
> -Dlog4j.configuration=file:/usr/local/bin/kafka/../config/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false -cp 
> :/usr/local/bin/kafka/../project/boot/scala-2.8.0/lib/scala-compiler.jar:/usr/local/bin/kafka/../project/boot/scala-2.8.0/lib/scala-library.jar:/usr/local/bin/kafka/../core/target/scala_2.8.0/kafka-0.7.2.jar:/usr/local/bin/kafka/../core/lib/*.jar:/usr/local/bin/kafka/../perf/target/scala_2.8.0/kafka-perf-0.7.2.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar
>  kafka.producer.ConsoleProducer --topic logtail --zookeeper x.x.x.x:2181
> But the messages come out gobbledegook unless I use a client that understands 
> compressed messages, and in that client it identifies the bit as set to 1, 
> gzip compression.
> Jun Rao jun...@gmail.com via incubator.apache.org 
> Nov 26 (1 day ago)
> to kafka-users 
> This seems to be a bug in ConsoleProducer. It also compresses messages and
> ignores the --compress flag. Could you file a jira?
> Thanks,
> Jun



[jira] [Commented] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified

2013-01-09 Thread David Arthur (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548522#comment-13548522
 ] 

David Arthur commented on KAFKA-690:


Ah, I didn't see that JIRA. I was following 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataRequest
 and read "TopicName -> The topics to produce metadata for. If empty the 
request will yield metadata for all topics", so I assumed this was a bug 
instead of a TODO.

> TopicMetadataRequest throws exception when no topics are specified
> --
>
> Key: KAFKA-690
> URL: https://issues.apache.org/jira/browse/KAFKA-690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: David Arthur
> Fix For: 0.8
>
> Attachments: KAFKA-690.patch
>
>
> If no topics are sent in a TopicMetadataRequest, `readFrom` throws an 
> exception when trying to get the head of the topic list for a debug 
> statement.
> java.util.NoSuchElementException: head of empty list
>   at scala.collection.immutable.Nil$.head(List.scala:386)
>   at scala.collection.immutable.Nil$.head(List.scala:383)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at kafka.utils.Logging$class.debug(Logging.scala:51)
>   at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
>   at 
> kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.network.RequestChannel$Request.(RequestChannel.scala:47)
>   at kafka.network.Processor.read(SocketServer.scala:320)
>   at kafka.network.Processor.run(SocketServer.scala:231)
>   at java.lang.Thread.run(Thread.java:680)



Re: kafka 0.8 producer throughput

2013-01-09 Thread S Ahmed
What's the ack for? If it fails, will it try another broker? Can this be
disabled, or would that be a major design change?


On Wed, Jan 9, 2013 at 12:40 AM, Jun Rao  wrote:

> The 50MB/s number is for 0.7. We haven't carefully measured the performance
> in 0.8 yet. We do expect the throughput that a single producer can drive in
> 0.8 to be less. This is because the 0.8 producer needs to wait for an RPC
> response from the broker, while in 0.7 there is no ack for the producer.
> Nevertheless, 2MB/s seems low. Could you try increasing the flush interval to
> something bigger, like 2?
>
> Thanks,
>
> Jun
>
> On Tue, Jan 8, 2013 at 8:32 PM, Jun Guo -X (jungu - CIIC at Cisco) <
> ju...@cisco.com> wrote:
>
> > According to the official Kafka documentation, the producer throughput is about
> > 50MB/s. But in my tests, the producer throughput is only about 2MB/s.
> > The test environment is the same as the document describes: one producer, one
> > broker, and one ZooKeeper, each on an independent machine. Message size is 100
> > bytes, batch size is 200, and the flush interval is 600 messages. The test
> > environment is the same and the configuration is the same. Why is there
> > such a big gap between my test result and what the document says?
> >
>


[jira] [Commented] (KAFKA-687) Rebalance algorithm should consider partitions from all topics

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548604#comment-13548604
 ] 

Jay Kreps commented on KAFKA-687:
-

This is a very good point, and not one I had considered.

It is probably not a trivial change because right now I think the election is 
done for each topic independently.

We have in mind, in the next major release after 0.8 (0.9, presumably), to move 
this co-ordination to the server, which would be a good time to fix this. We 
could either do this balancing exactly or else just randomize the start index 
(which would be almost as good if you had many topics).

> Rebalance algorithm should consider partitions from all topics
> --
>
> Key: KAFKA-687
> URL: https://issues.apache.org/jira/browse/KAFKA-687
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1
>Reporter: Pablo Barrera
>
> The current rebalance step, as stated in the original Kafka paper [1], splits 
> the partitions per topic between all the consumers. So if you have 100 topics 
> with 2 partitions each and 10 consumers, only two consumers will be used. That 
> is, for each topic all partitions will be listed and shared between the 
> consumers in the consumer group in order (not randomly).
> If the consumer group is reading from several topics at the same time it 
> makes sense to split all the partitions from all topics between all the 
> consumers. Following the example, we will have 200 partitions in total, 20 per 
> consumer, using all 10 consumers.
> The load per topic could be different and the division should consider this. 
> However, even a random division should be better than the current algorithm 
> while reading from several topics and shouldn't harm reading from a few topics 
> with several partitions.
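
To make the difference concrete, an illustrative Scala toy model using the numbers from the description (100 topics with 2 partitions each, 10 consumers). This is not the real rebalance code, just a comparison of per-topic versus global assignment.

    object RebalanceSketch {
      def main(args: Array[String]): Unit = {
        val topics = (1 to 100).map(i => "topic-" + i)
        val partitionsPerTopic = 2
        val consumers = (0 until 10).map(i => "consumer-" + i)

        // Current behavior: each topic's partitions are split over the sorted
        // consumer list independently, so only the first two consumers get work.
        val perTopic = for (t <- topics; p <- 0 until partitionsPerTopic)
          yield (t, p) -> consumers(p % consumers.size)
        println("per-topic assignment uses " + perTopic.map(_._2).toSet.size + " consumers")

        // Proposed behavior: round-robin over all 200 partitions from all topics,
        // which puts 20 partitions on each of the 10 consumers.
        val all = for (t <- topics; p <- 0 until partitionsPerTopic) yield (t, p)
        val global = all.zipWithIndex.map { case (tp, i) => tp -> consumers(i % consumers.size) }
        println("global assignment uses " + global.map(_._2).toSet.size + " consumers")
      }
    }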



[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548613#comment-13548613
 ] 

Jay Kreps commented on KAFKA-689:
-

This is pretty hacky though, no? Fetching metadata should not create 
topics--that is like a getter subtly changing values underneath you. I think 
this is more evidence for needing to expose a proper create_topic api.

> Can't append to a topic/partition that does not already exist
> -
>
> Key: KAFKA-689
> URL: https://issues.apache.org/jira/browse/KAFKA-689
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8
>Reporter: David Arthur
> Attachments: kafka.log, produce-payload.bin
>
>
> With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
> ProduceRequest for a new topic, Kafka responds with 
> "kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
> doesn't exist on 0". This is when sending a ProduceRequest over the network 
> (from Python, in this case).
> If I use the console producer it works fine (topic and partition get 
> created). If I then send the same payload from before over the network, it 
> works.



[jira] [Created] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-691:
---

 Summary: Fault tolerance broken with replication factor 1
 Key: KAFKA-691
 URL: https://issues.apache.org/jira/browse/KAFKA-691
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps


In 0.7 if a partition was down we would just send the message elsewhere. This 
meant that the partitioning was really more of a "stickiness" than a hard 
guarantee. This made it impossible to depend on it for partitioned, stateful 
processing.

In 0.8 when running with replication this should not be a problem generally as 
the partitions are now highly available and fail over to other replicas. 
However, the case of replication factor = 1 no longer really works for most 
cases, as a dead broker will now give errors for partitions on that broker.

I am not sure of the best fix. Intuitively I think this is something that 
should be handled by the Partitioner interface. However currently the 
partitioner has no knowledge of which nodes are available. So you could use a 
random partitioner, but that would keep going back to the down node.





Re: Kafka 0.8 without replication - handle broker failure/availability

2013-01-09 Thread Jay Kreps
This is a good point. We have discussed this a little bit before. The key
constraint is that with replication factor 1 you can have only one of the
following: (1) high availability, or (2) correct semantic partitioning. That
is to say, if a particular partition is unavailable you have no choice but
to give up and throw an error or else send the message elsewhere.

Obviously replication fixes this by just making the partitions highly
available.

It isn't really correct for us to choose one of these for the user. If they
are depending on partitioning, silently sending data elsewhere may be worse
than giving an error. So the user needs to somehow specify which behavior
they want.

Here is a JIRA where we can work out the details. I suspect this is a
blocker for 0.8:
https://issues.apache.org/jira/browse/KAFKA-691

As a workaround in the meantime you can probably run with
replication--although it sounds like you don't really need it, it shouldn't
hurt.

-Jay


On Wed, Jan 9, 2013 at 2:38 AM, Maxime Brugidou
wrote:

> Hello, I am currently testing the 0.8 branch (and it works quite well). We
> plan to not use the replication feature for now since we don't really need
> it, we can afford to lose data in case of unrecoverable failure from a
> broker.
>
> However, we really don't want to have producers/consumers fail if a broker
> is down. The ideal scenario (that was working on 0.7) is that producers
> would just produce to available partitions and consumers would consume from
> available partitions. If the broker comes back online, the consumer will
> catch up, if not we can decide to throw away the data.
>
> Is this feasible with 0.8? Right now if I kill a broker it just makes
> everything fail...
>
> Multiple issues will come up:
> - Since now the partitions are set globally and never change, the
> availability of a topic varies depending on where the partitions are located
> - We would need tools to make sure topics are spread enough and to rebalance
> them accordingly (using the "DDL" I heard about; I'm not sure yet how
> it works. I tried editing the JSON strings in ZK, and it somehow works, and
> there's the reassignment admin command too)
>
> That looks rather complicated, or maybe I'm missing something? The model
> that was used in 0.7 looked much easier to operate (it had drawbacks, and
> couldn't do intra-cluster replication, but at least the availability of the
> cluster was much higher).
>
> Thanks in advance for any help/clues,
>
> Maxime
>


[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread David Arthur (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548635#comment-13548635
 ] 

David Arthur commented on KAFKA-689:


Could the metadata API be modified with an "auto-create" flag?

> Can't append to a topic/partition that does not already exist
> -
>
> Key: KAFKA-689
> URL: https://issues.apache.org/jira/browse/KAFKA-689
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8
>Reporter: David Arthur
> Attachments: kafka.log, produce-payload.bin
>
>
> With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
> ProduceRequest for a new topic, Kafka responds with 
> "kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
> doesn't exist on 0". This is when sending a ProduceRequest over the network 
> (from Python, in this case).
> If I use the console producer it works fine (topic and partition get 
> created). If I then send the same payload from before over the network, it 
> works.



[jira] [Commented] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548638#comment-13548638
 ] 

Jay Kreps commented on KAFKA-690:
-

+1 Thanks David!

Folks, objections to putting this on 0.8?

> TopicMetadataRequest throws exception when no topics are specified
> --
>
> Key: KAFKA-690
> URL: https://issues.apache.org/jira/browse/KAFKA-690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: David Arthur
> Fix For: 0.8
>
> Attachments: KAFKA-690.patch
>
>
> If no topics are sent in a TopicMetadataRequest, `readFrom` throws an 
> exception when trying to get the head of the topic list for a debug 
> statement.
> java.util.NoSuchElementException: head of empty list
>   at scala.collection.immutable.Nil$.head(List.scala:386)
>   at scala.collection.immutable.Nil$.head(List.scala:383)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at kafka.utils.Logging$class.debug(Logging.scala:51)
>   at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
>   at 
> kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.network.RequestChannel$Request.(RequestChannel.scala:47)
>   at kafka.network.Processor.read(SocketServer.scala:320)
>   at kafka.network.Processor.run(SocketServer.scala:231)
>   at java.lang.Thread.run(Thread.java:680)



Re: kafka 0.8 producer throughput

2013-01-09 Thread Jun Rao
In 0.8, an ack is always required. The ack returns an error code that indicates
the reason if a produce request fails (e.g., the request is sent to a
broker that's not a leader). It also returns the offset of the produced
messages. However, the producer can choose when to receive the acks (e.g.,
when data reaches 1 replica or all replicas). If the ack indicates an
error, the client can choose to retry. The retry logic is built into our
high-level producer.

Thanks,

Jun
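
A small Scala sketch of the choice described above; the property names request.required.acks (1 = ack once the leader has the data, -1 = ack once all in-sync replicas have it) and producer.type are assumed 0.8 producer config names, so check the 0.8 configuration docs before relying on them.

    import java.util.Properties

    object AckConfigSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("request.required.acks", "1")  // use "-1" to wait for all in-sync replicas
        props.put("producer.type", "sync")       // sync vs. async send; error-ack retries live in the high-level producer
        println(props)
      }
    }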

On Wed, Jan 9, 2013 at 6:20 AM, S Ahmed  wrote:

> What's the ack for?  If it fails, it will try another broker?  Can this be
> disabled or it's a major design change?
>
>
> On Wed, Jan 9, 2013 at 12:40 AM, Jun Rao  wrote:
>
> > The 50MB/s number is for 0.7. We haven't carefully measured the
> performance
> > in 0.8 yet. We do expect the throughput that a single producer can drive
> in
> > 0.8 to be less. This is because the 0.8 producer needs to wait for an RPC
> > response from the broker while in 0.7, there is no ack for the producer.
> > Nevertheless, 2MB/s seems low. Could you try increasing flush interval to
> > sth bigger, like 2?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Jan 8, 2013 at 8:32 PM, Jun Guo -X (jungu - CIIC at Cisco) <
> > ju...@cisco.com> wrote:
> >
> > > According to Kafka official document, the producer throughput is about
> > > 50MB/S. But I do some test, the producer throughout is only about
> 2MB/S.
> > > The test environment is the same with document says. One producer, One
> > > broker, One Zookeeper are in independent machine. Message size is 100
> > > bytes, batch size is 200, flush interval is 600 messages. The test
> > > environment is the same, the configuration is the same. The why there
> is
> > > such big gap the my test result and the document says?
> > >
> >
>


[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548646#comment-13548646
 ] 

Jay Kreps commented on KAFKA-689:
-

Well I guess what I am saying is that getting metadata is not intuitively 
related to creating topics at all. I had noticed this code before but hadn't 
really thought about it. I assume the reason for this is that, to make a 
correct produce request, you have to know the host, so the old strategy of doing 
auto-create on produce doesn't work in 0.8.

I think there are two sensible strategies for auto-create:
1. Auto create on produce. This is tricky because you have to somehow ensure 
that the local node would hold the partitions used (and how did the client come 
up with those partitions anyway?)
2. Add a public api for creating topics and make the client implement 
auto-create client-side.

I would favor (2).

There is no harm in the current scheme as long as people are warned that we 
intend to change it.

> Can't append to a topic/partition that does not already exist
> -
>
> Key: KAFKA-689
> URL: https://issues.apache.org/jira/browse/KAFKA-689
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8
>Reporter: David Arthur
> Attachments: kafka.log, produce-payload.bin
>
>
> With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
> ProduceRequest for a new topic, Kafka responds with 
> "kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
> doesn't exist on 0". This is when sending a ProduceRequest over the network 
> (from Python, in this case).
> If I use the console producer it works fine (topic and partition get 
> created). If I then send the same payload from before over the network, it 
> works.



[jira] [Updated] (KAFKA-687) Rebalance algorithm should consider partitions from all topics

2013-01-09 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-687:


Affects Version/s: (was: 0.8.1)
   0.9

> Rebalance algorithm should consider partitions from all topics
> --
>
> Key: KAFKA-687
> URL: https://issues.apache.org/jira/browse/KAFKA-687
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Pablo Barrera
>
> The current rebalance step, as stated in the original Kafka paper [1], splits 
> the partitions per topic between all the consumers. So if you have 100 topics 
> with 2 partitions each and 10 consumers, only two consumers will be used. That 
> is, for each topic all partitions will be listed and shared between the 
> consumers in the consumer group in order (not randomly).
> If the consumer group is reading from several topics at the same time it 
> makes sense to split all the partitions from all topics between all the 
> consumers. Following the example, we will have 200 partitions in total, 20 per 
> consumer, using all 10 consumers.
> The load per topic could be different and the division should consider this. 
> However, even a random division should be better than the current algorithm 
> while reading from several topics and shouldn't harm reading from a few topics 
> with several partitions.



Re: kafka 0.8 producer throughput

2013-01-09 Thread Jay Kreps
We haven't done a ton of performance work on 0.8 yet.

Regardless, requiring the ack will certainly reduce per-producer
throughput, but it is too early to say by how much. Obviously this won't
impact broker throughput (so if you have many producers you may not notice).

The plan to fix this is just to make the produce request non-blocking. This
will allow the same kind of throughput we had before but still allow us to
give you back an error response if you want it. The hope would be to make
this change in 0.9.

-Jay


On Wed, Jan 9, 2013 at 8:24 AM, Jun Rao  wrote:

> In 0.8, ack is always required. The ack returns an errorcode that indicates
> the reason if a produce request  fails (e.g., the request is sent to a
> broker that's not a leader). It also returns the offset of the produced
> messages. However, the producer can choose when to receive the acks (e.g.,
> when data reaches 1 replica or all replicas). If the ack indicates an
> error, the client can choose to retry. The retry logic is built into our
> high level producer.
>
> Thanks,
>
> Jun
>
> On Wed, Jan 9, 2013 at 6:20 AM, S Ahmed  wrote:
>
> > What's the ack for?  If it fails, it will try another broker?  Can this
> be
> > disabled or it's a major design change?
> >
> >
> > On Wed, Jan 9, 2013 at 12:40 AM, Jun Rao  wrote:
> >
> > > The 50MB/s number is for 0.7. We haven't carefully measured the
> > performance
> > > in 0.8 yet. We do expect the throughput that a single producer can
> drive
> > in
> > > 0.8 to be less. This is because the 0.8 producer needs to wait for an
> RPC
> > > response from the broker while in 0.7, there is no ack for the
> producer.
> > > Nevertheless, 2MB/s seems low. Could you try increasing flush interval
> to
> > > sth bigger, like 2?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Jan 8, 2013 at 8:32 PM, Jun Guo -X (jungu - CIIC at Cisco) <
> > > ju...@cisco.com> wrote:
> > >
> > > > According to Kafka official document, the producer throughput is
> about
> > > > 50MB/S. But I do some test, the producer throughout is only about
> > 2MB/S.
> > > > The test environment is the same with document says. One producer,
> One
> > > > broker, One Zookeeper are in independent machine. Message size is 100
> > > > bytes, batch size is 200, flush interval is 600 messages. The test
> > > > environment is the same, the configuration is the same. The why there
> > is
> > > > such big gap the my test result and the document says?
> > > >
> > >
> >
>


[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread David Arthur (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548648#comment-13548648
 ] 

David Arthur commented on KAFKA-689:


I don't disagree really, but what I'm saying is: why not piggyback topic 
creation onto the metadata API? 

Instead of: 

MetadataRequest => Topic
Topic => Name [Topic]

you could have:

MetadataRequest => Topic
Topic => Name CreateIfNotExist [Topic]

It would be a lot less code than a whole new API.
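
A Scala sketch of the one extra field this would add. The byte layout below is purely illustrative and does not match the real 0.8 metadata request wire format.

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    object MetadataCreateFlagSketch {
      // Encode (Name, CreateIfNotExist) pairs: a count, then a length-prefixed
      // name followed by a one-byte flag per topic.
      def writeTopics(topics: Seq[(String, Boolean)]): ByteBuffer = {
        val encoded = topics.map { case (name, create) => (name.getBytes(StandardCharsets.UTF_8), create) }
        val size = 4 + encoded.map { case (bytes, _) => 2 + bytes.length + 1 }.sum
        val buf = ByteBuffer.allocate(size)
        buf.putInt(topics.size)
        encoded.foreach { case (bytes, create) =>
          buf.putShort(bytes.length.toShort)
          buf.put(bytes)
          buf.put(if (create) 1.toByte else 0.toByte)  // the proposed CreateIfNotExist flag
        }
        buf.flip()
        buf
      }

      def main(args: Array[String]): Unit =
        println("encoded " + writeTopics(Seq("test" -> true)).remaining() + " bytes")
    }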

> Can't append to a topic/partition that does not already exist
> -
>
> Key: KAFKA-689
> URL: https://issues.apache.org/jira/browse/KAFKA-689
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8
>Reporter: David Arthur
> Attachments: kafka.log, produce-payload.bin
>
>
> With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
> ProduceRequest for a new topic, Kafka responds with 
> "kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
> doesn't exist on 0". This is when sending a ProduceRequest over the network 
> (from Python, in this case).
> If I use the console producer it works fine (topic and partition get 
> created). If I then send the same payload from before over the network, it 
> works.



[jira] [Commented] (KAFKA-648) Use uniform convention for naming properties keys

2013-01-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548650#comment-13548650
 ] 

John Fung commented on KAFKA-648:
-

Hi Sriram,

The change in 
system_test/migration_tool_testsuite/config/migration_consumer.properties is 
not needed because the migration tool consumer uses the 0.7 library and would 
not be aware of this naming convention change.

Other than that, the changes in the System Test part look good.

> Use uniform convention for naming properties keys 
> --
>
> Key: KAFKA-648
> URL: https://issues.apache.org/jira/browse/KAFKA-648
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Sriram Subramanian
>Priority: Blocker
> Fix For: 0.8, 0.8.1
>
> Attachments: configchanges-1.patch, configchanges-v2.patch, 
> configchanges-v3.patch
>
>
> Currently, the convention that we seem to use to get a property value in 
> *Config is as follows:
> val configVal = property.getType("config.val", ...) // dot is used to 
> separate two words in the key, and the first letter of the second word is 
> capitalized in configVal.
> We should use a similar convention for groupId, consumerId, clientId, and 
> correlationId.
> This change will probably be backward-incompatible.



Re: Kafka 0.8 without replication - handle broker failure/availability

2013-01-09 Thread Maxime Brugidou
Thanks for your response. I think the workaround is not really acceptable
for me since it will consume 3x the resources (because a replication factor of 3
is the minimum acceptable) and it will still make the cluster less available
anyway (unless I have only 3 brokers).

The thing is that 0.7 kept the cluster 100% available (for my use
case, accepting data loss) as long as a single broker was alive.

A way to handle this would be to:
1. Have a lot of partitions per topic (more than the # of brokers)
2. Have something that rebalances the partitions and makes sure every broker has
at least a partition for each topic (to make every topic "available")
3. Have a setting in the consumer/producer that says "I don't care about
partitioning, just produce/consume wherever you can"

This is probably not simple to implement, I'll add these ideas in the JIRA
and will pursue the discussion there.

Maxime

On Wed, Jan 9, 2013 at 5:18 PM, Jay Kreps  wrote:

> As a work around in the meantime you can probably run with
> replication--although it sounds like you don't really need it, it shouldn't
> hurt.
>


[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548667#comment-13548667
 ] 

Maxime Brugidou commented on KAFKA-691:
---

I think the workaround is not really acceptable for me since it will consume 
3x the resources (because a replication factor of 3 is the minimum acceptable) and it 
will still make the cluster less available anyway (unless I have only 3 
brokers).

The thing is that 0.7 kept the cluster 100% available (for my use case, 
accepting data loss) as long as a single broker was alive.

A way to handle this would be to:
1. Have a lot of partitions per topic (more than the # of brokers)
2. Have something that rebalances the partitions and makes sure every broker has 
at least a partition for each topic (to make every topic "available")
3. Have a setting in the consumer/producer that says "I don't care about 
partitioning, just produce/consume wherever you can"

> Fault tolerance broken with replication factor 1
> 
>
> Key: KAFKA-691
> URL: https://issues.apache.org/jira/browse/KAFKA-691
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>
> In 0.7 if a partition was down we would just send the message elsewhere. This 
> meant that the partitioning was really more of a "stickiness" than a hard 
> guarantee. This made it impossible to depend on it for partitioned, stateful 
> processing.
> In 0.8 when running with replication this should not be a problem generally 
> as the partitions are now highly available and fail over to other replicas. 
> However, the case of replication factor = 1 no longer really works for most 
> cases, as a dead broker will now give errors for partitions on that broker.
> I am not sure of the best fix. Intuitively I think this is something that 
> should be handled by the Partitioner interface. However currently the 
> partitioner has no knowledge of which nodes are available. So you could use a 
> random partitioner, but that would keep going back to the down node.



[jira] [Commented] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified

2013-01-09 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548673#comment-13548673
 ] 

Neha Narkhede commented on KAFKA-690:
-

The protocol was meant to do this, so I checked it in on the 0.8 branch.

> TopicMetadataRequest throws exception when no topics are specified
> --
>
> Key: KAFKA-690
> URL: https://issues.apache.org/jira/browse/KAFKA-690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: David Arthur
> Fix For: 0.8
>
> Attachments: KAFKA-690.patch
>
>
> If no topics are sent in a TopicMetadataRequest, `readFrom` throws an 
> exception when trying to get the head of the topic list for a debug 
> statement.
> java.util.NoSuchElementException: head of empty list
>   at scala.collection.immutable.Nil$.head(List.scala:386)
>   at scala.collection.immutable.Nil$.head(List.scala:383)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at kafka.utils.Logging$class.debug(Logging.scala:51)
>   at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
>   at 
> kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.network.RequestChannel$Request.(RequestChannel.scala:47)
>   at kafka.network.Processor.read(SocketServer.scala:320)
>   at kafka.network.Processor.run(SocketServer.scala:231)
>   at java.lang.Thread.run(Thread.java:680)



[jira] [Resolved] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified

2013-01-09 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-690.
-

Resolution: Fixed
  Assignee: David Arthur

> TopicMetadataRequest throws exception when no topics are specified
> --
>
> Key: KAFKA-690
> URL: https://issues.apache.org/jira/browse/KAFKA-690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: David Arthur
>Assignee: David Arthur
> Fix For: 0.8
>
> Attachments: KAFKA-690.patch
>
>
> If no topics are sent in a TopicMetadataRequest, `readFrom` throws an 
> exception when trying to get the head of the topic list for a debug 
> statement.
> java.util.NoSuchElementException: head of empty list
>   at scala.collection.immutable.Nil$.head(List.scala:386)
>   at scala.collection.immutable.Nil$.head(List.scala:383)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at 
> kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
>   at kafka.utils.Logging$class.debug(Logging.scala:51)
>   at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
>   at 
> kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
>   at kafka.network.RequestChannel$Request.(RequestChannel.scala:47)
>   at kafka.network.Processor.read(SocketServer.scala:320)
>   at kafka.network.Processor.run(SocketServer.scala:231)
>   at java.lang.Thread.run(Thread.java:680)



[jira] [Commented] (KAFKA-662) Create testcases for unclean shut down

2013-01-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548683#comment-13548683
 ] 

John Fung commented on KAFKA-662:
-

1. testcase_9071 - This testcase is to test the basic behavior of unclean 
shutdown where no log truncation takes place.

B1   B2
===  ===

a. Both brokers are up running

b. Send 5 messages

m0    m0
m1    m1
m2    m2
m3    m3
m4    m4

c. Shut down B2

d. Send 5 messages

m5
m6
m7
m8
m9

e. Shut down B1 (B1 & B2 are both down)

f. Sleep 5 seconds

g. Start B2 (B1 is still down)

h. Send 5 messages

   m10
   m11
   m12
   m13
   m14

i. Start B1 (both B1 & B2 are up running)

Recovering ...

j. Since there is no discrepancy in the number of messages maintained by the internal 
"index", there will not be any log truncation:

m0    m0
m1    m1
m2    m2
m3    m3
m4    m4
m5    m10
m6    m11
m7    m12
m8    m13
m9    m14


$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files 
/tmp/kafka_server_1_logs/t001-0/.log
Dumping /tmp/kafka_server_1_logs/t001-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2307843899 payload: 
Topic:t001:ThreadID:0:MessageID:00
offset: 1 position: 126 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3770868426 payload: 
Topic:t001:ThreadID:0:MessageID:01
offset: 2 position: 252 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1528229081 payload: 
Topic:t001:ThreadID:0:MessageID:02
offset: 3 position: 378 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 844818728 payload: 
Topic:t001:ThreadID:0:MessageID:03
offset: 4 position: 504 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4157601470 payload: 
Topic:t001:ThreadID:0:MessageID:04
offset: 5 position: 630 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2659447631 payload: 
Topic:t001:ThreadID:0:MessageID:05
offset: 6 position: 756 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 626523484 payload: 
Topic:t001:ThreadID:0:MessageID:06
offset: 7 position: 882 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1276901549 payload: 
Topic:t001:ThreadID:0:MessageID:07
offset: 8 position: 1008 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1963770929 payload: 
Topic:t001:ThreadID:0:MessageID:08
offset: 9 position: 1134 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 474005952 payload: 
Topic:t001:ThreadID:0:MessageID:09

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files 
/tmp/kafka_server_2_logs/t001-0/.log
Dumping /tmp/kafka_server_2_logs/t001-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2307843899 payload: 
Topic:t001:ThreadID:0:MessageID:00
offset: 1 position: 126 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3770868426 payload: 
Topic:t001:ThreadID:0:MessageID:01
offset: 2 position: 252 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1528229081 payload: 
Topic:t001:ThreadID:0:MessageID:02
offset: 3 position: 378 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 844818728 payload: 
Topic:t001:ThreadID:0:MessageID:03
offset: 4 position: 504 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4157601470 payload: 
Topic:t001:ThreadID:0:MessageID:04
offset: 5 position: 630 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1130195380 payload: 
Topic:t001:ThreadID:0:MessageID:10
offset: 6 position: 756 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 705784901 payload: 
Topic:t001:ThreadID:0:MessageID:11
offset: 7 position: 882 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2445632086 payload: 
Topic:t001:ThreadID:0:MessageID:12
offset: 8 position: 1008 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4169754535 payload: 
Topic:t001:ThreadID:0:MessageID:13
offset: 9 position: 1134 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1025275953 payload: 
Topic:t001:ThreadID:0:MessageID:14



> Create testcas

[jira] [Commented] (KAFKA-662) Create testcases for unclean shut down

2013-01-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548689#comment-13548689
 ] 

John Fung commented on KAFKA-662:
-

2. testcase_9072 - This testcase is to test the basic behavior of unclean 
shutdown where log truncation takes place.

B1   B2
===  ===

a. Both brokers are up running

b. Send 5 messages

m0    m0
m1    m1
m2    m2
m3    m3
m4    m4

c. Shut down B2

d. Send 10 messages

m5
m6
m7
m8
m9
m10
m11
m12
m13
m14

e. Shut down B1 (B1 & B2 are both down)

f. Sleep 5 seconds

g. Start B2 (B1 is still down)

h. Send 5 messages

   m15
   m16
   m17
   m18
   m19

i. Start B1 (both B1 & B2 are up running)

Recovering ...

j. Since there is a discrepancy in the number of messages maintained by the internal 
"index", there will be log truncation in B1's data log:

m0    m0
m1    m1
m2    m2
m3    m3
m4    m4
m15   m15
m16   m16
m17   m17
m18   m18
m19   m19


$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files 
/tmp/kafka_server_1_logs/t001-0/.log
Dumping /tmp/kafka_server_1_logs/t001-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2307843899 payload: 
Topic:t001:ThreadID:0:MessageID:00
offset: 1 position: 126 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3770868426 payload: 
Topic:t001:ThreadID:0:MessageID:01
offset: 2 position: 252 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1528229081 payload: 
Topic:t001:ThreadID:0:MessageID:02
offset: 3 position: 378 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 844818728 payload: 
Topic:t001:ThreadID:0:MessageID:03
offset: 4 position: 504 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4157601470 payload: 
Topic:t001:ThreadID:0:MessageID:04
offset: 5 position: 630 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1414561216 payload: 
Topic:t001:ThreadID:0:MessageID:15
offset: 6 position: 756 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4018435027 payload: 
Topic:t001:ThreadID:0:MessageID:16
offset: 7 position: 882 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2261284386 payload: 
Topic:t001:ThreadID:0:MessageID:17
offset: 8 position: 1008 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3219081918 payload: 
Topic:t001:ThreadID:0:MessageID:18
offset: 9 position: 1134 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3599978319 payload: 
Topic:t001:ThreadID:0:MessageID:19

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files 
/tmp/kafka_server_2_logs/t001-0/.log
Dumping /tmp/kafka_server_2_logs/t001-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2307843899 payload: 
Topic:t001:ThreadID:0:MessageID:00
offset: 1 position: 126 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3770868426 payload: 
Topic:t001:ThreadID:0:MessageID:01
offset: 2 position: 252 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1528229081 payload: 
Topic:t001:ThreadID:0:MessageID:02
offset: 3 position: 378 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 844818728 payload: 
Topic:t001:ThreadID:0:MessageID:03
offset: 4 position: 504 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4157601470 payload: 
Topic:t001:ThreadID:0:MessageID:04
offset: 5 position: 630 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1414561216 payload: 
Topic:t001:ThreadID:0:MessageID:15
offset: 6 position: 756 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4018435027 payload: 
Topic:t001:ThreadID:0:MessageID:16
offset: 7 position: 882 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2261284386 payload: 
Topic:t001:ThreadID:0:MessageID:17
offset: 8 position: 1008 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3219081918 payload: 
Topic:t001:ThreadID:0:MessageID:18
offset: 9 position: 1134 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3599978319 payload: 
Topic:t001:ThreadID:0:MessageID:19



 

[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548746#comment-13548746
 ] 

Jay Kreps commented on KAFKA-689:
-

Well I think there are actually two problems.

The first is that it is mixing concerns in kind of a messy way. In some sense 
it would be the "get metadata or maybe create a new topic" api. So I would 
rather not enshrine that in the public API. I think it is okay in the kind of 
hacky way it is now, and in the future we can add an api.

The second problem is that we need to move the per-topic config into zookeeper 
so that you can dynamically add topics with their own settings (flush interval, 
retention period, # of partitions, etc...there are like ten of these settings). 
This is discussed in KAFKA-554. Currently these are in the broker config, but 
that means bouncing the broker all the time which is a hassle. So we will need 
to include these in the api that creates topics. That makes it messier still 
since we would have all these properties mixed into the get_metadata call.

I agree that right now it is a lot of overhead to add an api, but I think we 
could fix that directly by making it easier to add apis. KAFKA-643 had one 
proposal for this.
Since you just went through that, it would be good to get your feedback on that 
proposal.

> Can't append to a topic/partition that does not already exist
> -
>
> Key: KAFKA-689
> URL: https://issues.apache.org/jira/browse/KAFKA-689
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8
>Reporter: David Arthur
> Attachments: kafka.log, produce-payload.bin
>
>
> With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
> ProduceRequest for a new topic, Kafka responds with 
> "kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
> doesn't exist on 0". This is when sending a ProduceRequest over the network 
> (from Python, in this case).
> If I use the console producer it works fine (topic and partition get 
> created). If I then send the same payload from before over the network, it 
> works.



[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548748#comment-13548748
 ] 

Jun Rao commented on KAFKA-689:
---

Currently, auto-creation is a broker-side flag. Basically, the broker controls 
whether a topic can be created automatically or not. This is likely useful for 
admins. The getMetadata API implicitly triggers auto-creation, subject to the 
server-side config. This is probably a bit hacky, but it does save one extra RPC. 
We can think a bit more about whether adding a separate create-topic API is a better 
strategy.

> Can't append to a topic/partition that does not already exist
> -
>
> Key: KAFKA-689
> URL: https://issues.apache.org/jira/browse/KAFKA-689
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8
>Reporter: David Arthur
> Attachments: kafka.log, produce-payload.bin
>
>
> With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
> ProduceRequest for a new topic, Kafka responds with 
> "kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
> doesn't exist on 0". This is when sending a ProduceRequest over the network 
> (from Python, in this case).
> If I use the console producer it works fine (topic and partition get 
> created). If I then send the same payload from before over the network, it 
> works.



[jira] [Commented] (KAFKA-643) Refactor api definition layer

2013-01-09 Thread David Arthur (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548797#comment-13548797
 ] 

David Arthur commented on KAFKA-643:


+1 for splitting generic/specific parts of the API (this is basically what I do 
in my Python client).

+1 for specifying the protocol in a ~BNF form. This would require protocols to 
be specified as LL grammars (which they all are), which is required for 
efficient ByteBuffer packing/unpacking anyway. 

However, how would this scheme handle recursive definitions (like MessageSet)? 
I've always felt the depth of this should be limited to one, meaning a single 
Message can contain a compressed MessageSet which can only be composed of 
regular (uncompressed) Messages. In 
https://github.com/mumrah/kafka-python/blob/master/kafka/client.py#L355, I have 
to endlessly recurse to ensure I've fully consumed the messages - kind of a 
pain. If the depth was limited, I could decode it non-recursively. 

+0 for not using Avro et al. I understand the performance implications of using 
one of these frameworks, but it sure does make client development easier. 
However, as long as the protocol spec is clear (and correct) implementing a 
client is not so bad.

What about the Java API? As far as I can tell, the purpose of these classes is 
to delegate to the real APIs and handle Java -> Scala data type conversion. It 
seems like this could be automatic/automagic. Although, I guess for 
the implicits stuff to work the Java classes must be present.

I know it's very new (Scala 2.10) and experimental, but macros might help in 
simplifying the APIs: http://docs.scala-lang.org/overviews/macros/overview.html.
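
A toy Scala model of the depth point: if a compressed message may only wrap plain messages, the decoder needs no recursion. The types are illustrative and unrelated to the actual Kafka wire format or the kafka-python client.

    sealed trait Msg
    case class Plain(payload: String) extends Msg
    case class Wrapped(inner: Seq[Msg]) extends Msg  // stands in for a compressed MessageSet

    object DepthOneDecodeSketch {
      // Unrestricted nesting forces a recursive flatten.
      def flattenRecursive(msgs: Seq[Msg]): Seq[String] = msgs.flatMap {
        case Plain(p)       => Seq(p)
        case Wrapped(inner) => flattenRecursive(inner)
      }

      // With depth limited to one, a single non-recursive pass is enough.
      def flattenDepthOne(msgs: Seq[Msg]): Seq[String] = msgs.flatMap {
        case Plain(p)       => Seq(p)
        case Wrapped(inner) => inner.collect { case Plain(p) => p }
      }

      def main(args: Array[String]): Unit = {
        val batch = Seq(Plain("m0"), Wrapped(Seq(Plain("m1"), Plain("m2"))))
        println(flattenDepthOne(batch))  // List(m0, m1, m2)
      }
    }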

> Refactor api definition layer
> -
>
> Key: KAFKA-643
> URL: https://issues.apache.org/jira/browse/KAFKA-643
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1
>Reporter: Jay Kreps
>Assignee: Jay Kreps
>
> The way we are defining our protocol is really a bit embarrassing. It is full 
> of ad hoc serialization code for each API. This code is very fiddly and 
> opaque and when it has errors they are hard to debug. Since it is all done 
> one-off it is also very easy for it to become inconsistent. This was 
> tolerable when there were only two apis with a few fields each, but now there 
> are a half dozen more complex apis. By my count there are now over 1000 lines 
> of code in kafka.apis.*.
> One option would be to use protocol buffers or thrift or another 
> schema-oriented code gen RPC language. However I think this is probably the 
> wrong direction for a couple reasons. One is that we want something that 
> works well with our I/O model, both network and disk, which is very 
> NIO-centric. So it should work directly with ByteBuffers. Second I feel that 
> these systems complicate the specification of the protocol. They give a 
> schema, which is a great high-level description, but the translation of that 
> to bytes is essentially whatever their code-gen engine chooses to do. These 
> things are a great way to build application services, but not great for 
> something like what we are building.
> Instead I think we should do what we have done, specify the protocol as a 
> wiki. However we should write a little helper code to make our lives easier.
> Here is my recommendation for how this code would work. We add two helper 
> classes: Schema and Record.
> You define messages formats like this:
> import Types._
> val FetchRequestProtocol = 
>   Schema("ReplicaId"->int32, 
>"MaxWaitTime"->int32, 
>"MinBytes"->int32,
>Seq("TopicName"->utf8,
>   Seq("Partition"->int32, 
>  "FetchOffset"->int64, 
>  "MaxBytes"->int32)))
> Note that this almost exactly matches the BNF for the fetch request: 
>   
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> Once defined this schema can be used to parse messages:
>   val record: Record = FetchRequestProtocol.readFrom(buffer)
> A record is just a wrapper around an array. The readFrom method parses out 
> the fields specified in the schema and populates the array. Fields in the 
> record can be accessed by name, e.g. 
>   record("ReplicaId")
> For common access this is probably good enough. However since the position is 
> fixed, it is also possible to get the element by a Field object, which gets 
> rid of the hashmap lookup and goes directly to the right slot. E.g.
>   val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a 
> global variable
>   ...
>   record(ReplicaIdField)
> This will be for cases where we are a bit performance conscious and don't 
> want to do umpteen hashmap lookups to resolve string field names.
> Likew
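
To make the proposal quoted above more concrete, here is a rough, simplified 
sketch of what the Schema/Record helpers could look like, assuming only flat 
int32/int64 fields and no nested sequences; it is illustrative only, not the 
proposed implementation:

import java.nio.ByteBuffer

object SchemaSketch {
  sealed trait FieldType { def read(buf: ByteBuffer): Any }
  case object Int32 extends FieldType { def read(buf: ByteBuffer): Any = buf.getInt }
  case object Int64 extends FieldType { def read(buf: ByteBuffer): Any = buf.getLong }

  case class Field(name: String, index: Int, fieldType: FieldType)

  class Schema(fieldDefs: (String, FieldType)*) {
    private val fields: Seq[Field] =
      fieldDefs.zipWithIndex.map { case ((n, t), i) => Field(n, i, t) }
    private val byName: Map[String, Field] = fields.map(f => f.name -> f).toMap

    // Resolve a Field once (e.g. in a global) so hot paths can skip the map lookup.
    def apply(name: String): Field = byName(name)

    // Parse the fields in schema order out of the buffer into a Record.
    def readFrom(buf: ByteBuffer): Record =
      new Record(this, fields.map(_.fieldType.read(buf)).toArray)
  }

  // A Record is just a wrapper around an array of parsed values.
  class Record(schema: Schema, values: Array[Any]) {
    def apply(name: String): Any = values(schema(name).index)
    def apply(field: Field): Any = values(field.index)
  }

  def main(args: Array[String]): Unit = {
    val FetchHeader = new Schema("ReplicaId" -> Int32, "MaxWaitTime" -> Int32, "MinBytes" -> Int32)
    val buf = ByteBuffer.allocate(12).putInt(1).putInt(100).putInt(4096)
    buf.flip()
    println(FetchHeader.readFrom(buf)("ReplicaId")) // prints 1
  }
}

The Field handle returned by Schema.apply is what would let performance-conscious 
call sites skip the per-access hashmap lookup, as the description suggests.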

[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548847#comment-13548847
 ] 

Jun Rao commented on KAFKA-691:
---

One thing we can do is to change the partitioner api so that it takes the # of 
partitions and, for each partition, an indicator of whether that partition is 
available or not. Then we can change the default partitioner to route a message 
only to the available partitions if a key is not provided.
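
A sketch of what such a partitioner signature could look like (the trait and 
parameter names are illustrative, not the actual 0.8 Partitioner API):

import scala.util.Random

trait AvailabilityAwarePartitioner[K] {
  // # of partitions plus a per-partition availability flag, as suggested above
  def partition(key: Option[K], numPartitions: Int, available: IndexedSeq[Boolean]): Int
}

class DefaultSketchPartitioner[K] extends AvailabilityAwarePartitioner[K] {
  private val rand = new Random

  def partition(key: Option[K], numPartitions: Int, available: IndexedSeq[Boolean]): Int =
    key match {
      case Some(k) =>
        // keyed messages keep the usual deterministic hash-based placement
        (k.hashCode & Integer.MAX_VALUE) % numPartitions
      case None =>
        // unkeyed messages are routed only to partitions currently reported available
        val candidates = (0 until numPartitions).filter(available(_))
        if (candidates.isEmpty) rand.nextInt(numPartitions)
        else candidates(rand.nextInt(candidates.size))
    }
}

In this sketch, when no partition is available the send falls back to a random 
partition so it still fails and the existing refresh-on-error path can kick in.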

> Fault tolerance broken with replication factor 1
> 
>
> Key: KAFKA-691
> URL: https://issues.apache.org/jira/browse/KAFKA-691
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>
> In 0.7 if a partition was down we would just send the message elsewhere. This 
> meant that the partitioning was really more of a "stickiness" than a hard 
> guarantee. This made it impossible to depend on it for partitioned, stateful 
> processing.
> In 0.8 when running with replication this should not be a problem generally 
> as the partitions are now highly available and fail over to other replicas. 
> However, in the case of replication factor = 1 this no longer really works for 
> most cases, as a dead broker will now give errors for that broker's partitions.
> I am not sure of the best fix. Intuitively I think this is something that 
> should be handled by the Partitioner interface. However currently the 
> partitioner has no knowledge of which nodes are available. So you could use a 
> random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention

2013-01-09 Thread John Fung (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Fung updated KAFKA-688:


Attachment: kafka-688-v1.patch

> System Test - Update all testcase_xxxx_properties.json for properties keys 
> uniform naming convention
> 
>
> Key: KAFKA-688
> URL: https://issues.apache.org/jira/browse/KAFKA-688
> Project: Kafka
>  Issue Type: Task
>Reporter: John Fung
>Assignee: John Fung
>  Labels: replication-testing
> Attachments: kafka-688-v1.patch
>
>
> After the changes made in "uniform naming convention of properties keys" 
> (KAFKA-648), all testcase_xxxx_properties.json files need to be updated for 
> the following property key changes:
> brokerid => broker.id
> log.file.size => log.segment.size
> groupid => group.id

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-683) Fix correlation ids in all requests sent to kafka

2013-01-09 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-683:


Attachment: kafka-683-v2.patch

1. DefaultEventHandler: Good suggestion, included it
2. FileMessageSet: Done
3. KafkaMigrationTool: This was accidental, reverted it
4.1 Makes sense, although we don't really need a shouldRoll variable. 
Refactored this code a bit.
4.2 Adding a function name will probably be ugly. I added a statement that 
allows us to understand where those log statements are coming from. Also, we 
already print the names of the log segments followed by whether or not the 
index and data files exist.
5. Good point. One concern with adding a constructor without a correlation id is 
that most users will use it, which will hide the correlation id information, 
and this data is useful for troubleshooting. Although, I wonder how many users 
would find that annoying.
6. I found it useful while troubleshooting since that marks the time we 
received the request from the socket. In KafkaApis, the statement tells us the 
time when it trickles through the queues to actually get handled. This is at 
trace level anyway and will not show up in the server log otherwise.
7. Those changes to log4j.properties were not intended to be checked in. I was 
trying to see if log4j can be configured to delete older log files 
automatically. Turns out it cannot. Will revert those changes. Of course, by 
default, the server logs should go to ConsoleAppender
8. State change log in this patch just means everything from kafka.controller. 
There is another patch to format the state change log correctly


> Fix correlation ids in all requests sent to kafka
> -
>
> Key: KAFKA-683
> URL: https://issues.apache.org/jira/browse/KAFKA-683
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Critical
>  Labels: improvement, replication
> Attachments: kafka-683-v1.patch, kafka-683-v2.patch
>
>
> We should fix the correlation ids in every request sent to Kafka and fix the 
> request log on the broker to specify not only the type of request and who 
> sent it, but also the correlation id. This will be very helpful while 
> troubleshooting problems in production.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention

2013-01-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548886#comment-13548886
 ] 

John Fung commented on KAFKA-688:
-

Uploaded kafka-688-v1.patch which is required after checking in the patch in 
KAFKA-648.

The changes include:

1. Updated all testcase_xxxx_properties.json files for the following property keys:
brokerid => broker.id
log.file.size => log.segment.size
groupid => group.id 

2. Added "07_client" property to 
migration_tool_testsuite/testcase_/testcase__properties.json files 
under those brokers running in 0.7 code

3. Modified system_test/utils/kafka_system_test_utils.py to add "brokerid" key 
to server.properties file if the broker is running under 0.7

> System Test - Update all testcase_xxxx_properties.json for properties keys 
> uniform naming convention
> 
>
> Key: KAFKA-688
> URL: https://issues.apache.org/jira/browse/KAFKA-688
> Project: Kafka
>  Issue Type: Task
>Reporter: John Fung
>Assignee: John Fung
>  Labels: replication-testing
> Attachments: kafka-688-v1.patch
>
>
> After the changes made in "uniform naming convention of properties keys" 
> (KAFKA-648), all testcase_xxxx_properties.json files need to be updated for 
> the following property key changes:
> brokerid => broker.id
> log.file.size => log.segment.size
> groupid => group.id

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention

2013-01-09 Thread John Fung (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Fung updated KAFKA-688:


Status: Patch Available  (was: Open)

> System Test - Update all testcase_xxxx_properties.json for properties keys 
> uniform naming convention
> 
>
> Key: KAFKA-688
> URL: https://issues.apache.org/jira/browse/KAFKA-688
> Project: Kafka
>  Issue Type: Task
>Reporter: John Fung
>Assignee: John Fung
>  Labels: replication-testing
> Attachments: kafka-688-v1.patch
>
>
> After the changes made in "uniform naming convention of properties keys" 
> (KAFKA-648), all testcase_xxxx_properties.json files need to be updated for 
> the following property key changes:
> brokerid => broker.id
> log.file.size => log.segment.size
> groupid => group.id

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-648) Use uniform convention for naming properties keys

2013-01-09 Thread Sriram Subramanian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriram Subramanian updated KAFKA-648:
-

Attachment: configchanges-v4.patch

Reverted the change from migrationtool/consumer.properties

> Use uniform convention for naming properties keys 
> --
>
> Key: KAFKA-648
> URL: https://issues.apache.org/jira/browse/KAFKA-648
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Sriram Subramanian
>Priority: Blocker
> Fix For: 0.8, 0.8.1
>
> Attachments: configchanges-1.patch, configchanges-v2.patch, 
> configchanges-v3.patch, configchanges-v4.patch
>
>
> Currently, the convention that we seem to use to get a property value in 
> *Config is as follows:
> val configVal = property.getType("config.val", ...) // dot is used to 
> separate two words in the key and the first letter of second word is 
> capitalized in configVal.
> We should use similar convention for groupId, consumerId, clientId, 
> correlationId.
> This change will probably not be backward compatible.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-648) Use uniform convention for naming properties keys

2013-01-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548989#comment-13548989
 ] 

John Fung commented on KAFKA-648:
-

Tested the v4 patch together with KAFKA-688-v1.patch and it works fine.

> Use uniform convention for naming properties keys 
> --
>
> Key: KAFKA-648
> URL: https://issues.apache.org/jira/browse/KAFKA-648
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Sriram Subramanian
>Priority: Blocker
> Fix For: 0.8, 0.8.1
>
> Attachments: configchanges-1.patch, configchanges-v2.patch, 
> configchanges-v3.patch, configchanges-v4.patch
>
>
> Currently, the convention that we seem to use to get a property value in 
> *Config is as follows:
> val configVal = property.getType("config.val", ...) // dot is used to 
> separate two words in the key and the first letter of second word is 
> capitalized in configVal.
> We should use similar convention for groupId, consumerId, clientId, 
> correlationId.
> This change will probably not be backward compatible.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-133) Publish kafka jar to a public maven repository

2013-01-09 Thread Evan Pollan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549067#comment-13549067
 ] 

Evan Pollan commented on KAFKA-133:
---

Just to clarify -- this is marked as resolved, but I don't see any artifacts in 
maven central (search.maven.org).  Is there some other repo to which the 
artifacts have been released?

> Publish kafka jar to a public maven repository
> --
>
> Key: KAFKA-133
> URL: https://issues.apache.org/jira/browse/KAFKA-133
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.6, 0.8
>Reporter: Neha Narkhede
>  Labels: patch
> Fix For: 0.8
>
> Attachments: KAFKA-133.patch, pom.xml
>
>
> The released kafka jar must be downloaded manually and then deployed to a 
> private repository before it can be used by a developer using maven2.
> Similar to other Apache projects, it will be nice to have a way to publish 
> Kafka releases to a public maven repo. 
> In the past, we gave it a try using sbt publish to Sonatype Nexus maven repo, 
> but ran into some authentication problems. It will be good to revisit this 
> and get it resolved.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-648) Use uniform convention for naming properties keys

2013-01-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549075#comment-13549075
 ] 

Jun Rao commented on KAFKA-648:
---

Thanks for patch v4. Made another pass. Should we make the following changes?

40. KafkaConfig:
max.message.size => max.message.bytes
socket.send.buffer => socket.send.buffer.bytes
socket.receive.buffer => socket.receive.buffer.bytes
log.segment.size => log.segment.bytes
log.retention.size => log.retention.bytes
log.flush.interval => log.flush.interval.messages
log.default.flush.interval.ms => log.flush.interval.ms
log.flush.intervals.ms.per.topic => log.flush.interval.ms.per.topic
replica.socket.buffersize => replica.socket.receive.buffer.bytes
replica.fetch.size => replica.max.fetch.bytes
fetch.request.purgatory.purge.interval => 
fetch.purgatory.purge.interval.requests
producer.request.purgatory.purge.interval => 
producer.purgatory.purge.interval.requests
replica.max.lag.time.ms => max.replica.lag.time.ms
replica.max.lag.bytes => max.replica.lag.bytes
replica.fetch.max.wait.ms => max.replica.fetch.wait.ms
replica.fetch.min.bytes => min.replica.fetch.bytes

41. ProducerConfig: producer.retry.count. In other configs we have 
num.network.threads. To be consistent, shouldn't we use num.producer.retries?

42. AsyncProducerConfig:
queue.time => queue.time.ms
buffer.size => socket.send.buffer.bytes

43. ConsumerConfig:
socket.buffer.size => socket.receive.buffer.bytes
fetch.size => max.fetch.bytes


> Use uniform convention for naming properties keys 
> --
>
> Key: KAFKA-648
> URL: https://issues.apache.org/jira/browse/KAFKA-648
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Sriram Subramanian
>Priority: Blocker
> Fix For: 0.8, 0.8.1
>
> Attachments: configchanges-1.patch, configchanges-v2.patch, 
> configchanges-v3.patch, configchanges-v4.patch
>
>
> Currently, the convention that we seem to use to get a property value in 
> *Config is as follows:
> val configVal = property.getType("config.val", ...) // dot is used to 
> separate two words in the key and the first letter of second word is 
> capitalized in configVal.
> We should use similar convention for groupId, consumerId, clientId, 
> correlationId.
> This change will probably not be backward compatible.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549105#comment-13549105
 ] 

Maxime Brugidou commented on KAFKA-691:
---

I agree with Jun's solution; this would solve 3 (1 and 2 can already be done 
manually -- just send a ReassignPartition command when you add a broker).

I could probably implement this very quickly. I'm just not sure how you get the 
availability of a partition, but I'll try to figure it out and submit a first 
patch tomorrow.

> Fault tolerance broken with replication factor 1
> 
>
> Key: KAFKA-691
> URL: https://issues.apache.org/jira/browse/KAFKA-691
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>
> In 0.7 if a partition was down we would just send the message elsewhere. This 
> meant that the partitioning was really more of a "stickiness" than a hard 
> guarantee. This made it impossible to depend on it for partitioned, stateful 
> processing.
> In 0.8 when running with replication this should not be a problem generally 
> as the partitions are now highly available and fail over to other replicas. 
> However, in the case of replication factor = 1 this no longer really works for 
> most cases, as a dead broker will now give errors for that broker's partitions.
> I am not sure of the best fix. Intuitively I think this is something that 
> should be handled by the Partitioner interface. However currently the 
> partitioner has no knowledge of which nodes are available. So you could use a 
> random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-133) Publish kafka jar to a public maven repository

2013-01-09 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549232#comment-13549232
 ] 

Joe Stein commented on KAFKA-133:
-

<< Just to clarify -- this is marked as resolved, but I don't see any artifacts 
in maven central (search.maven.org). Is there some other repo to which the 
artifacts have been released?

0.8 is not released yet. Once it is, the artifacts for 0.8 will be published.

> Publish kafka jar to a public maven repository
> --
>
> Key: KAFKA-133
> URL: https://issues.apache.org/jira/browse/KAFKA-133
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.6, 0.8
>Reporter: Neha Narkhede
>  Labels: patch
> Fix For: 0.8
>
> Attachments: KAFKA-133.patch, pom.xml
>
>
> The released kafka jar must be downloaded manually and then deployed to a 
> private repository before it can be used by a developer using maven2.
> Similar to other Apache projects, it will be nice to have a way to publish 
> Kafka releases to a public maven repo. 
> In the past, we gave it a try using sbt publish to Sonatype Nexus maven repo, 
> but ran into some authentication problems. It will be good to revisit this 
> and get it resolved.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: kafka mavenification

2013-01-09 Thread Joe Stein
OK, doing that now.

On Mon, Jan 7, 2013 at 12:24 PM, Jun Rao  wrote:

> Joe,
>
> 0.8 will be released from the 0.8 branch and trunk is for post 0.8. So, you
> will need to commit the maven changes to 0.8 and then merge them to trunk.
>
> Thanks,
>
> Jun
>
> On Sun, Jan 6, 2013 at 10:53 AM, Joe Stein  wrote:
>
> > Ok, I will commit this patch then just to trunk and we can have the 0.8
> > beta mavenized.  That works!
> >
> > /*
> > Joe Stein, Chief Architect
> > http://www.medialets.com
> > Twitter: @allthingshadoop
> > Mobile: 917-597-9771
> > */
> >
> > On Jan 6, 2013, at 1:31 PM, Neha Narkhede 
> wrote:
> >
> > >>> Where do we feel we are with 0.8 ?
> > >
> > > We can release a public beta by the end of the month, but we are still
> > few
> > > weeks away from announcing a release.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Sat, Jan 5, 2013 at 10:50 PM, Joe Stein  wrote:
> > >
> > >> Where do we feel we are with 0.8 ?
> >
>



-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop 
*/


[jira] [Commented] (KAFKA-133) Publish kafka jar to a public maven repository

2013-01-09 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549250#comment-13549250
 ] 

Joe Stein commented on KAFKA-133:
-

Committed to the 0.8 branch as well.

> Publish kafka jar to a public maven repository
> --
>
> Key: KAFKA-133
> URL: https://issues.apache.org/jira/browse/KAFKA-133
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.6, 0.8
>Reporter: Neha Narkhede
>  Labels: patch
> Fix For: 0.8
>
> Attachments: KAFKA-133.patch, pom.xml
>
>
> The released kafka jar must be downloaded manually and then deployed to a 
> private repository before it can be used by a developer using maven2.
> Similar to other Apache projects, it will be nice to have a way to publish 
> Kafka releases to a public maven repo. 
> In the past, we gave it a try using sbt publish to Sonatype Nexus maven repo, 
> but ran into some authentication problems. It will be good to revisit this 
> and get it resolved.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549276#comment-13549276
 ] 

Jay Kreps commented on KAFKA-691:
-

That would be awesome. If you don't mind, just give the proposed set of changes 
on the JIRA first and let's get everyone on board with how it should work, since 
it is a reasonably important change (or, if you don't mind revising your patch, 
we can start with that).

> Fault tolerance broken with replication factor 1
> 
>
> Key: KAFKA-691
> URL: https://issues.apache.org/jira/browse/KAFKA-691
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>
> In 0.7 if a partition was down we would just send the message elsewhere. This 
> meant that the partitioning was really more of a "stickiness" than a hard 
> guarantee. This made it impossible to depend on it for partitioned, stateful 
> processing.
> In 0.8 when running with replication this should not be a problem generally 
> as the partitions are now highly available and fail over to other replicas. 
> However, in the case of replication factor = 1 this no longer really works for 
> most cases, as a dead broker will now give errors for that broker's partitions.
> I am not sure of the best fix. Intuitively I think this is something that 
> should be handled by the Partitioner interface. However currently the 
> partitioner has no knowledge of which nodes are available. So you could use a 
> random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: About Kafka 0.8 producer

2013-01-09 Thread 杨涛
Hi,

 That is to say: for Kafka 0.8, when you add a new broker to the
cluster, you do not need to change the producer's broker setting, is that right?

 

 

Bruce



Re: About Kafka 0.8 producer

2013-01-09 Thread Jay Kreps
Yes, in neither 0.7 nor 0.8 should you need to change the producer, consumer, 
or broker config when adding a new broker.

-jay


On Wed, Jan 9, 2013 at 8:18 PM, 杨涛  wrote:

> Hi,
>
> That is to say: for Kafka 0.8, when you add a new broker to the
> cluster, you do not need to change the producer's broker setting, is that right?
>
>
>
>
>
> Bruce
>
>


Kafka open source projects

2013-01-09 Thread Jay Kreps
I have been noticing a lot of cool Kafka integrations floating around. I took 
some time, went through github and emails, and tried to update some of the 
pointers we have.

Powered By - If you are using Kafka, add yourself here.
  https://cwiki.apache.org/confluence/display/KAFKA/Powered+By

Papers, Presentations, and Blogs - If you have a good write-up or video,
add it here.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations

Clients - If you have a Kafka client, add it here.
  https://cwiki.apache.org/confluence/display/KAFKA/Clients

Integrations - Misc. cool plugins (this was my favorite).
  https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

I don't know if all these things actually work; I haven't tried them. If you 
know that any of them work well (or don't), please add appropriate notes. In any 
case, I thought in the spirit of open source it's best to link them up and 
complain when they break :-)

-Jay


[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549373#comment-13549373
 ] 

Jun Rao commented on KAFKA-691:
---

DefaultEventHandler.getPartitionListForTopic() returns Seq[PartitionAndLeader]. 
If PartitionAndLeader.leaderBrokerIdOpt is None, the partition is not 
available. 

There is another tricky issue. If a partition is not available, when do we 
refresh the metadata to check whether the partition has become available again? 
Currently, we refresh the metadata if we fail to send the data. However, if we 
always route messages only to available partitions, we may never fail to send. 
One possible solution is that if at least one partition in 
Seq[PartitionAndLeader] is not available, we refresh the metadata once a 
configurable amount of time has passed (e.g., 10 mins).
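
A small sketch of that time-based refresh idea (the class and setting names are 
made up for illustration; the real DefaultEventHandler code is organized 
differently):

// Hypothetical helper: refresh metadata at most once per refreshIntervalMs
// while some partition of the topic has no leader.
class MetadataRefreshPolicy(refreshIntervalMs: Long) {
  private var lastRefreshMs: Long = 0L

  // partitionsAvailable would be derived from getPartitionListForTopic, i.e.
  // leaderBrokerIdOpt.isDefined for each PartitionAndLeader.
  def shouldRefresh(partitionsAvailable: Seq[Boolean], nowMs: Long): Boolean =
    partitionsAvailable.exists(!_) && (nowMs - lastRefreshMs >= refreshIntervalMs)

  def markRefreshed(nowMs: Long): Unit = {
    lastRefreshMs = nowMs
  }
}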

> Fault tolerance broken with replication factor 1
> 
>
> Key: KAFKA-691
> URL: https://issues.apache.org/jira/browse/KAFKA-691
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>
> In 0.7 if a partition was down we would just send the message elsewhere. This 
> meant that the partitioning was really more of a "stickiness" than a hard 
> guarantee. This made it impossible to depend on it for partitioned, stateful 
> processing.
> In 0.8 when running with replication this should not be a problem generally 
> as the partitions are now highly available and fail over to other replicas. 
> However, in the case of replication factor = 1 this no longer really works for 
> most cases, as a dead broker will now give errors for that broker's partitions.
> I am not sure of the best fix. Intuitively I think this is something that 
> should be handled by the Partitioner interface. However currently the 
> partitioner has no knowledge of which nodes are available. So you could use a 
> random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira