[jira] [Commented] (KAFKA-1328) Add new consumer APIs

2014-05-13 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13994082#comment-13994082
 ] 

Neha Narkhede commented on KAFKA-1328:
--

Updated reviewboard https://reviews.apache.org/r/19731/
 against branch trunk

> Add new consumer APIs
> -
>
> Key: KAFKA-1328
> URL: https://issues.apache.org/jira/browse/KAFKA-1328
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
> Attachments: KAFKA-1328.patch, KAFKA-1328_2014-04-10_17:13:24.patch, 
> KAFKA-1328_2014-04-10_18:30:48.patch, KAFKA-1328_2014-04-11_10:54:19.patch, 
> KAFKA-1328_2014-04-11_11:16:44.patch, KAFKA-1328_2014-04-12_18:30:22.patch, 
> KAFKA-1328_2014-04-12_19:12:12.patch, KAFKA-1328_2014-05-05_11:35:07.patch, 
> KAFKA-1328_2014-05-05_11:35:41.patch, KAFKA-1328_2014-05-09_17:18:55.patch
>
>
> New consumer API discussion is here - 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201402.mbox/%3CCAOG_4QYBHwyi0xN=hl1fpnrtkvfjzx14ujfntft3nn_mw3+...@mail.gmail.com%3E
> This JIRA includes reviewing and checking in the new consumer APIs



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 21243: Patch for KAFKA-1442

2014-05-13 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21243/#review42551
---

Ship it!


Ship It!

- Neha Narkhede


On May 8, 2014, 10:36 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21243/
> ---
> 
> (Updated May 8, 2014, 10:36 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1442
> https://issues.apache.org/jira/browse/KAFKA-1442
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1442. RBTools post-review is deprecated.
> 
> 
> Diffs
> -
> 
>   kafka-patch-review.py dc6664d22a0b18f96d7d3513a6d1fe739d45c000 
> 
> Diff: https://reviews.apache.org/r/21243/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



Review Request 21238: Patch for KAFKA-1431

2014-05-13 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21238/
---

Review request for kafka.


Bugs: KAFKA-1431
https://issues.apache.org/jira/browse/KAFKA-1431


Repository: kafka


Description
---

KAFKA-1431. ConsoleConsumer - Option to clean zk consumer path.


Diffs
-

  core/src/main/scala/kafka/consumer/ConsoleConsumer.scala 
0f62819be0562f62c0f778bd20ead053f01a6f2f 

Diff: https://reviews.apache.org/r/21238/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-1447) Controlled shutdown deadlock when trying to send state updates

2014-05-13 Thread Sam Meder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13996532#comment-13996532
 ] 

Sam Meder commented on KAFKA-1447:
--

0.8.0

> Controlled shutdown deadlock when trying to send state updates
> --
>
> Key: KAFKA-1447
> URL: https://issues.apache.org/jira/browse/KAFKA-1447
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Neha Narkhede
>
> We're seeing controlled shutdown indefinitely stuck on trying to send out 
> state change messages to the other brokers:
> [2014-05-03 04:01:30,580] INFO [Socket Server on Broker 4], Shutdown 
> completed (kafka.network.SocketServer)
> [2014-05-03 04:01:30,581] INFO [Kafka Request Handler on Broker 4], shutting 
> down (kafka.server.KafkaRequestHandlerPool)
> and stuck on:
> "kafka-request-handler-12" daemon prio=10 tid=0x7f1f04a66800 nid=0x6e79 
> waiting on condition [0x7f1ad5767000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> parking to wait for <0x00078e91dc20> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> at 
> kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
> locked <0x00078e91dc38> (a java.lang.Object)
> at kafka.controller.KafkaController.sendRequest(KafkaController.scala:655)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:298)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> at 
> kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:290)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:97)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:269)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
> at scala.Option.foreach(Option.scala:197)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
> at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
> locked <0x00078b495af0> (a java.lang.Object)
> at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:722)
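The pattern visible in the trace — an unconditional put() on a bounded LinkedBlockingQueue while the thread still holds the controller lock — is what parks the request-handler thread forever once the queue fills. A minimal sketch (hypothetical names, not the actual ControllerChannelManager API) of the timeout-based alternative, where the sender backs off instead of blocking indefinitely:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: an unconditional put() on a bounded queue
// can park forever while the caller still holds a lock (the situation in the
// trace above). offer() with a timeout lets the caller back off instead.
public class BoundedSendQueue {
    private final LinkedBlockingQueue<String> queue;

    public BoundedSendQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Returns false instead of blocking indefinitely when the queue stays full. */
    public boolean trySend(String request, long timeoutMs) {
        try {
            return queue.offer(request, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedSendQueue q = new BoundedSendQueue(1);
        System.out.println(q.trySend("leaderAndIsr-1", 10)); // true: queue has room
        System.out.println(q.trySend("leaderAndIsr-2", 10)); // false: full, no consumer
    }
}
```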





[jira] [Commented] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready

2014-05-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13995240#comment-13995240
 ] 

Guozhang Wang commented on KAFKA-1445:
--

[~jkreps] Could you give this a look?

> New Producer should send all partitions that have non-empty batches when one 
> of them is ready
> 
>
> Key: KAFKA-1445
> URL: https://issues.apache.org/jira/browse/KAFKA-1445
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1445.patch
>
>
> One difference between the new producer and the old producer is that in the 
> new producer the linger time is per partition, instead of global. Therefore, 
> when traffic is low, the sender will likely expire partitions one by one 
> and send lots of small requests, each containing only a few partitions with 
> little data, resulting in a largely increased request rate.
> One solution would be to let the sender select all partitions that have 
> non-empty batches when one of them is ready.
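The proposed fix amounts to grouping every non-empty batch by its leader broker once any single batch becomes ready, so each node gets one larger request instead of many small ones. A rough sketch under assumed names (not the real RecordAccumulator/Sender API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed behavior: when any partition's batch is
// ready, select *all* partitions with non-empty batches and group them by
// leader node, so each node receives one larger request per send cycle.
public class ReadyByNode {
    static class Batch {
        final String partition; final String leaderNode; final int records; final boolean ready;
        Batch(String p, String n, int r, boolean ready) {
            this.partition = p; this.leaderNode = n; this.records = r; this.ready = ready;
        }
    }

    /** If any batch is ready, return every non-empty batch grouped by leader node. */
    static Map<String, List<Batch>> drainableByNode(List<Batch> batches) {
        Map<String, List<Batch>> byNode = new HashMap<>();
        boolean anyReady = batches.stream().anyMatch(b -> b.ready);
        if (!anyReady) return byNode; // nothing expired yet: send nothing
        for (Batch b : batches)
            if (b.records > 0)
                byNode.computeIfAbsent(b.leaderNode, k -> new ArrayList<>()).add(b);
        return byNode;
    }

    public static void main(String[] args) {
        List<Batch> batches = List.of(
            new Batch("t-0", "broker1", 5, true),   // linger expired
            new Batch("t-1", "broker1", 2, false),  // not expired, but non-empty
            new Batch("t-2", "broker2", 0, false)); // empty: skipped
        System.out.println(drainableByNode(batches).get("broker1").size()); // 2
    }
}
```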





[jira] [Updated] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-13 Thread Albert Strasheim (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Albert Strasheim updated KAFKA-1449:


Description: 
Howdy

We are currently building out a number of Kafka consumers in Go, based on a 
patched version of the Sarama library that Shopify released a while back.

We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
and lots of cores. We have various consumers computing all kinds of aggregates 
on a reasonably high volume access log stream (1.1e6 messages/sec peak, about 
500-600 bytes per message uncompressed).

When profiling our consumer, our single hottest function (until we disabled 
it), was the CRC32 checksum validation, since the deserialization and 
aggregation in these consumers is pretty cheap.

We believe things could be improved by extending the wire protocol to support 
CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
calculation.

https://en.wikipedia.org/wiki/SSE4#SSE4.2

It might be hard to use from Java, but consumers written in most other 
languages will benefit a lot.

To give you an idea, here are some benchmarks for the Go CRC32 functions 
running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:

BenchmarkCrc32KB 90196 ns/op 363.30 MB/s
BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s

I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
CRC32-C speed should be close to what one achieves in Go.

(Met Todd and Clark at the meetup last night. Thanks for the great 
presentation!)

  was:
Howdy

We are currently building out a number of Kafka consumers in Go, based on a 
patched version of the Sarama library that Shopify released a while back.

We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
and lots of cores. We have various consumers computing all kinds of aggregates 
on a reasonably high volume access log stream (1e6 messages/sec peak, about 
500-600 bytes per message uncompressed).

When profiling our consumer, our single hottest function (until we disabled 
it), was the CRC32 checksum validation, since the deserialization and 
aggregation in these consumers is pretty cheap.

We believe things could be improved by extending the wire protocol to support 
CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
calculation.

https://en.wikipedia.org/wiki/SSE4#SSE4.2

It might be hard to use from Java, but consumers written in most other 
languages will benefit a lot.

To give you an idea, here are some benchmarks for the Go CRC32 functions 
running on a Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:

BenchmarkCrc32KB 90196 ns/op 363.30 MB/s
BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s

I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
CRC32-C speed should be close to what one achieves in Go.

(Met Todd and Clark at the meetup last night. Thanks for the great 
presentation!)


> Extend wire protocol to allow CRC32C
> 
>
> Key: KAFKA-1449
> URL: https://issues.apache.org/jira/browse/KAFKA-1449
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Albert Strasheim
>Assignee: Neha Narkhede
> Fix For: 0.9.0
>
>
> Howdy
> We are currently building out a number of Kafka consumers in Go, based on a 
> patched version of the Sarama library that Shopify released a while back.
> We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
> and lots of cores. We have various consumers computing all kinds of 
> aggregates on a reasonably high volume access log stream (1.1e6 messages/sec 
> peak, about 500-600 bytes per message uncompressed).
> When profiling our consumer, our single hottest function (until we disabled 
> it), was the CRC32 checksum validation, since the deserialization and 
> aggregation in these consumers is pretty cheap.
> We believe things could be improved by extending the wire protocol to support 
> CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
> calculation.
> https://en.wikipedia.org/wiki/SSE4#SSE4.2
> It might be hard to use from Java, but consumers written in most other 
> languages will benefit a lot.
> To give you an idea, here are some benchmarks for the Go CRC32 functions 
> running on a Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:
> BenchmarkCrc32KB   90196 ns/op 363.30 MB/s
> BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s
> I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
> CRC32-C speed should be close to what one achieves in Go.
> (Met Todd and Clark at the meetup last night. Thanks for the great 
> presentation!)
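As a point of reference: CRC-32C later became directly usable from Java via java.util.zip.CRC32C (JDK 9+), which HotSpot intrinsifies to the SSE 4.2 instruction — the class did not exist when this email was written. The sketch below compares both polynomials on the standard "123456789" check input:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.Checksum;

// Computes both checksums over the standard "123456789" check input.
// CRC32C (java.util.zip.CRC32C, JDK 9+) uses the Castagnoli polynomial that
// SSE 4.2's crc32 instruction accelerates.
public class CrcCompare {

    static long checksum(Checksum c, byte[] data) {
        c.update(data, 0, data.length);
        return c.getValue();
    }

    public static void main(String[] args) {
        byte[] check = "123456789".getBytes(StandardCharsets.US_ASCII);
        // Standard catalog check values for the two polynomials:
        System.out.printf("CRC-32  : %08X%n", checksum(new CRC32(), check));  // CBF43926
        System.out.printf("CRC-32C : %08X%n", checksum(new CRC32C(), check)); // E3069283
    }
}
```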





Re: Review Request 21398: Fix KAFKA-1445 v2

2014-05-13 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21398/
---

(Updated May 13, 2014, 6:25 p.m.)


Review request for kafka.


Bugs: KAFKA-1445
https://issues.apache.org/jira/browse/KAFKA-1445


Repository: kafka


Description (updated)
---

0. Add the partitionsForNode index in Cluster;
1. Ready would return a list of ready nodes instead of partitions;
2. Ready would also check if there is any ready partition with unknown leader; 
if yes, indicate the processReadyNode to force a metadata refresh;
3. Drain would take a list of nodes and drain the batches per node until the 
max request size is reached;
4. Collocate would now just transform batches per node into a producer request;
5. Corresponding unit test changes;
6. One minor compilation warning fix


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
 fbb732a57522109ac0e0eaf5c87b50cbd3a7f767 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 2d7e52d430fa267ee3689a06f8a621ce5dfd1e33 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
f0152fabbdd44e9f1a24291e84c17edf8379f4fc 
  clients/src/main/java/org/apache/kafka/common/Cluster.java 
426bd1eec708979149cbd6fa3959e6f9e73c7e0e 
  clients/src/main/java/org/apache/kafka/common/Node.java 
0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 f37ab770b1794830154f9908a0156e7e99b4a458 
  clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
1df226606fad29da47d81d0b8ff36209c3536c06 

Diff: https://reviews.apache.org/r/21398/diff/


Testing
---

unit tests


Thanks,

Guozhang Wang



Re: Review Request 21398: Fix KAFKA-1445 v2

2014-05-13 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21398/
---

(Updated May 13, 2014, 6:19 p.m.)


Review request for kafka.


Bugs: KAFKA-1445
https://issues.apache.org/jira/browse/KAFKA-1445


Repository: kafka


Description (updated)
---

0. Add the partitionsForNode index in Cluster;
1. Ready would return a list of ready nodes instead of partitions;
2. Ready would also check if there is any ready partition with unknown leader; 
if yes, indicate the processReadyNode to force a metadata refresh;
3. Drain would take a list of nodes and drain the batches per node until the 
max request size is reached;
4. Collocate would now just transform batches per node into a producer 
request;
5. Corresponding unit test changes;
6. One minor compilation warning fix
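Step 3 (drain per node until the max request size is reached) can be sketched roughly as follows; the method name and the use of plain byte counts are illustrative, not the actual RecordAccumulator.drain signature:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of step 3: accumulate batches for one node until adding
// the next batch would exceed the maximum request size. Names are hypothetical.
public class DrainSketch {

    /** Drain batch sizes for one node, stopping before maxRequestSize is exceeded. */
    static List<Integer> drainForNode(List<Integer> batchSizes, int maxRequestSize) {
        List<Integer> drained = new ArrayList<>();
        int total = 0;
        for (int size : batchSizes) {
            if (total + size > maxRequestSize) break; // request full: leave the rest queued
            drained.add(size);
            total += size;
        }
        return drained;
    }

    public static void main(String[] args) {
        // 400 + 300 fits under 1000; adding 500 would overflow, so it stays queued.
        System.out.println(drainForNode(List.of(400, 300, 500), 1000)); // [400, 300]
    }
}
```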


Diffs
-

  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
 fbb732a57522109ac0e0eaf5c87b50cbd3a7f767 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 2d7e52d430fa267ee3689a06f8a621ce5dfd1e33 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
f0152fabbdd44e9f1a24291e84c17edf8379f4fc 
  clients/src/main/java/org/apache/kafka/common/Cluster.java 
426bd1eec708979149cbd6fa3959e6f9e73c7e0e 
  clients/src/main/java/org/apache/kafka/common/Node.java 
0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 f37ab770b1794830154f9908a0156e7e99b4a458 
  clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
1df226606fad29da47d81d0b8ff36209c3536c06 

Diff: https://reviews.apache.org/r/21398/diff/


Testing (updated)
---

unit tests


Thanks,

Guozhang Wang



[jira] [Commented] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready

2014-05-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13996734#comment-13996734
 ] 

Guozhang Wang commented on KAFKA-1445:
--

Updated reviewboard https://reviews.apache.org/r/21398/
 against branch origin/trunk

> New Producer should send all partitions that have non-empty batches when one 
> of them is ready
> 
>
> Key: KAFKA-1445
> URL: https://issues.apache.org/jira/browse/KAFKA-1445
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1445.patch, KAFKA-1445.patch, 
> KAFKA-1445_2014-05-13_11:25:13.patch
>
>
> One difference between the new producer and the old producer is that in the 
> new producer the linger time is per partition, instead of global. Therefore, 
> when traffic is low, the sender will likely expire partitions one by one 
> and send lots of small requests, each containing only a few partitions with 
> little data, resulting in a largely increased request rate.
> One solution would be to let the sender select all partitions that have 
> non-empty batches when one of them is ready.





[jira] [Commented] (KAFKA-1447) Controlled shutdown deadlock when trying to send state updates

2014-05-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13996503#comment-13996503
 ] 

Guozhang Wang commented on KAFKA-1447:
--

Hi Sam, which version are you using?

> Controlled shutdown deadlock when trying to send state updates
> --
>
> Key: KAFKA-1447
> URL: https://issues.apache.org/jira/browse/KAFKA-1447
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Neha Narkhede
>
> We're seeing controlled shutdown indefinitely stuck on trying to send out 
> state change messages to the other brokers:
> [2014-05-03 04:01:30,580] INFO [Socket Server on Broker 4], Shutdown 
> completed (kafka.network.SocketServer)
> [2014-05-03 04:01:30,581] INFO [Kafka Request Handler on Broker 4], shutting 
> down (kafka.server.KafkaRequestHandlerPool)
> and stuck on:
> "kafka-request-handler-12" daemon prio=10 tid=0x7f1f04a66800 nid=0x6e79 
> waiting on condition [0x7f1ad5767000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> parking to wait for <0x00078e91dc20> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> at 
> kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
> locked <0x00078e91dc38> (a java.lang.Object)
> at kafka.controller.KafkaController.sendRequest(KafkaController.scala:655)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:298)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> at 
> kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:290)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:97)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:269)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
> at scala.Option.foreach(Option.scala:197)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
> at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
> locked <0x00078b495af0> (a java.lang.Object)
> at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:722)





[jira] [Commented] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready

2014-05-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13996725#comment-13996725
 ] 

Guozhang Wang commented on KAFKA-1445:
--

Created reviewboard https://reviews.apache.org/r/21398/
 against branch origin/trunk

> New Producer should send all partitions that have non-empty batches when one 
> of them is ready
> 
>
> Key: KAFKA-1445
> URL: https://issues.apache.org/jira/browse/KAFKA-1445
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1445.patch, KAFKA-1445.patch
>
>
> One difference between the new producer and the old producer is that in the 
> new producer the linger time is per partition, instead of global. Therefore, 
> when traffic is low, the sender will likely expire partitions one by one 
> and send lots of small requests, each containing only a few partitions with 
> little data, resulting in a largely increased request rate.
> One solution would be to let the sender select all partitions that have 
> non-empty batches when one of them is ready.





Re: Review Request 21398: Fix KAFKA-1445 v2

2014-05-13 Thread Timothy Chen

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21398/#review42869
---



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java


Should we only call forceUpdate once if we get multiple unknown nodes?


- Timothy Chen


On May 13, 2014, 6:25 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21398/
> ---
> 
> (Updated May 13, 2014, 6:25 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1445
> https://issues.apache.org/jira/browse/KAFKA-1445
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> 0. Add the partitionsForNode index in Cluster;
> 1. Ready would return a list of ready nodes instead of partitions;
> 2. Ready would also check if there is any ready partition with unknown 
> leader; if yes, indicate the processReadyNode to force a metadata refresh;
> 3. Drain would take a list of nodes and drain the batches per node until the 
> max request size is reached;
> 4. Collocate would now just transform batches per node into a producer request;
> 5. Corresponding unit test changes;
> 6. One minor compilation warning fix
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  fbb732a57522109ac0e0eaf5c87b50cbd3a7f767 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  2d7e52d430fa267ee3689a06f8a621ce5dfd1e33 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> f0152fabbdd44e9f1a24291e84c17edf8379f4fc 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> 426bd1eec708979149cbd6fa3959e6f9e73c7e0e 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  f37ab770b1794830154f9908a0156e7e99b4a458 
>   
> clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
> 1df226606fad29da47d81d0b8ff36209c3536c06 
> 
> Diff: https://reviews.apache.org/r/21398/diff/
> 
> 
> Testing
> ---
> 
> unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Review Request 21398: Fix KAFKA-1445 v2

2014-05-13 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21398/
---

Review request for kafka.


Bugs: KAFKA-1445
https://issues.apache.org/jira/browse/KAFKA-1445


Repository: kafka


Description
---

0. Add the partitionsForNode index in Cluster;
1. Ready would return a list of ready nodes instead of partitions;
2. Ready would also check if there is any ready partition with unknown leader; 
if yes, indicate the processReadyNode to force a metadata refresh;
3. Drain would take a list of nodes and drain the batches per node until the 
max request size is reached;
4. Collocate would now just transform batches per node into a producer request;
5. Corresponding unit test changes;
6. One minor compilation warning fix


Diffs
-

  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
 fbb732a57522109ac0e0eaf5c87b50cbd3a7f767 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 2d7e52d430fa267ee3689a06f8a621ce5dfd1e33 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
f0152fabbdd44e9f1a24291e84c17edf8379f4fc 
  clients/src/main/java/org/apache/kafka/common/Cluster.java 
426bd1eec708979149cbd6fa3959e6f9e73c7e0e 
  clients/src/main/java/org/apache/kafka/common/Node.java 
0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 f37ab770b1794830154f9908a0156e7e99b4a458 
  clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
1df226606fad29da47d81d0b8ff36209c3536c06 

Diff: https://reviews.apache.org/r/21398/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Created] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-13 Thread Albert Strasheim (JIRA)
Albert Strasheim created KAFKA-1449:
---

 Summary: Extend wire protocol to allow CRC32C
 Key: KAFKA-1449
 URL: https://issues.apache.org/jira/browse/KAFKA-1449
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Albert Strasheim
Assignee: Neha Narkhede
 Fix For: 0.9.0


Howdy

We are currently building out a number of Kafka consumers in Go, based on a 
patched version of the Sarama library that Shopify released a while back.

We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
and lots of cores. We have various consumers computing all kinds of aggregates 
on a reasonably high volume access log stream (1e6 messages/sec peak, about 
500-600 bytes per message uncompressed).

When profiling our consumer, our single hottest function (until we disabled 
it), was the CRC32 checksum validation, since the deserialization and 
aggregation in these consumers is pretty cheap.

We believe things could be improved by extending the wire protocol to support 
CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
calculation.

https://en.wikipedia.org/wiki/SSE4#SSE4.2

It might be hard to use from Java, but consumers written in most other 
languages will benefit a lot.

To give you an idea, here are some benchmarks for the Go CRC32 functions 
running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:

BenchmarkCrc32KB 90196 ns/op 363.30 MB/s
BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s

I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
CRC32-C speed should be close to what one achieves in Go.

(Met Todd and Clark at the meetup last night. Thanks for the great 
presentation!)





[jira] [Commented] (KAFKA-1431) ConsoleConsumer - Option to clean zk consumer path

2014-05-13 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13993146#comment-13993146
 ] 

Sriharsha Chintalapani commented on KAFKA-1431:
---

Created reviewboard  against branch origin/trunk

> ConsoleConsumer - Option to clean zk consumer path
> --
>
> Key: KAFKA-1431
> URL: https://issues.apache.org/jira/browse/KAFKA-1431
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1
> Environment: All
>Reporter: Jeremy A Laycock
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-1431.patch
>
>
> Raised in response to KAFKA-1426. Currently the "from-beginning" option 
> auto-deletes the zk consumer path. This is confusing and unexpected 
> behaviour. Suggest a separate option to clean the console consumer path.





Build failed in Jenkins: Kafka-trunk #184

2014-05-13 Thread Apache Jenkins Server
See 

Changes:

[neha.narkhede] KAFKA-1432 followup - Fixing the shutdown sequence furthermore; 
reviewed by Neha Narkhede

[junrao] kafka-1432; followup patch to enable new producer in system test;  
patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao

--
[...truncated 581 lines...]
:perf:compileScala
:perf:processResources UP-TO-DATE
:perf:classes
:perf:compileTestJava UP-TO-DATE
:perf:compileTestScala UP-TO-DATE
:perf:processTestResources UP-TO-DATE
:perf:testClasses UP-TO-DATE
:perf:test
:contrib:hadoop-consumer:compileJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: 

 uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:contrib:hadoop-consumer:processResources UP-TO-DATE
:contrib:hadoop-consumer:classes
:contrib:hadoop-consumer:compileTestJava UP-TO-DATE
:contrib:hadoop-consumer:processTestResources UP-TO-DATE
:contrib:hadoop-consumer:testClasses UP-TO-DATE
:contrib:hadoop-consumer:test
:contrib:hadoop-producer:compileJava
:contrib:hadoop-producer:processResources UP-TO-DATE
:contrib:hadoop-producer:classes
:contrib:hadoop-producer:compileTestJava UP-TO-DATE
:contrib:hadoop-producer:processTestResources UP-TO-DATE
:contrib:hadoop-producer:testClasses UP-TO-DATE
:contrib:hadoop-producer:test

BUILD SUCCESSFUL

Total time: 5 mins 56.861 secs
[Kafka-trunk] $ /bin/bash -xe /tmp/hudson9098167881436304610.sh
+ ./gradlew -PscalaVersion=2.8.0 test
The TaskContainer.add() method has been deprecated and is scheduled to be 
removed in Gradle 2.0. Please use the create() method instead.
Building project 'core' with Scala version 2.8.0
Building project 'perf' with Scala version 2.8.0
:clients:compileJava UP-TO-DATE
:clients:processResources UP-TO-DATE
:clients:classes UP-TO-DATE
:clients:compileTestJava UP-TO-DATE
:clients:processTestResources UP-TO-DATE
:clients:testClasses UP-TO-DATE
:clients:test UP-TO-DATE
:contrib:compileJava UP-TO-DATE
:contrib:processResources UP-TO-DATE
:contrib:classes UP-TO-DATE
:contrib:compileTestJava UP-TO-DATE
:contrib:processTestResources UP-TO-DATE
:contrib:testClasses UP-TO-DATE
:contrib:test UP-TO-DATE
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala
:253:
 non variable type-argument String in type pattern 
scala.collection.Map[String,_] is unchecked since it is eliminated by erasure
case Some(map: Map[String, _]) => 
   ^
:256:
 non variable type-argument String in type pattern 
scala.collection.Map[String,String] is unchecked since it is eliminated by 
erasure
case Some(config: Map[String, String]) =>
  ^
:66:
 non variable type-argument String in type pattern (String, Int) is unchecked 
since it is eliminated by erasure
for ((key:(String, Int), value) <- responseMap) {
  ^
:368:
 non variable type-argument V in type pattern List[V] is unchecked since it is 
eliminated by erasure
case Some(l: List[V]) => m.put(k, v :: l)
 ^
four warnings found
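All four "unchecked since it is eliminated by erasure" warnings are the same JVM limitation: generic type arguments do not exist at runtime, so a pattern match on Map[String, _] can only verify the raw Map type. The equivalent restriction in Java (a sketch, not Kafka code):

```java
import java.util.Map;

public class ErasureDemo {
    /**
     * After erasure only the raw type survives at runtime, so we can ask
     * "is it a Map" but never "is it a Map<String, Integer>".
     */
    static boolean isMap(Object o) {
        // boolean bad = o instanceof Map<String, Integer>;  // would not compile
        return o instanceof Map;
    }

    public static void main(String[] args) {
        System.out.println(isMap(Map.of("key", 1)));  // true
    }
}
```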
:core:processResources UP-TO-DATE
:core:classes
:core:compileTestJava UP-TO-DATE
:core:compileTestScala
:core:processTestResources UP-TO-DATE
:core:testClasses
:core:test

unit.kafka.common.ConfigTest > testInvalidClientIds PASSED

unit.kafka.common.ConfigTest > testInvalidGroupIds PASSED

unit.kafka.common.TopicTest > testInvalidTopicNames PASSED

kafka.message.MessageTest > testFieldValues PASSED

kafka.message.MessageTest > testChecksum PASSED

kafka.message.MessageTest > testEquality PASSED

kafka.message.MessageTest > testIsHashable PASSED

kafka.message.MessageCompressionTest > testSimpleCompressDecompress PASSED

kafka.message.MessageCompressionTest > testComplexCompressDecompress PASSED

kafka.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.message.ByteBufferMessageSetTest > testWriteTo PASSED

kafka.message.ByteBufferMessageSetTest > testValidBytes PASSED

kafka.message.ByteBufferMessageSetTest > testValidBytesWithCompression PASSED

kafka.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.message.ByteBufferMessageSetTest > testIterator PASSED

kafka.message.ByteBufferMessageSetTest > testOffsetAssignment PASSED

kafka.zk.ZKEphe

[jira] [Commented] (KAFKA-1447) Controlled shutdown deadlock when trying to send state updates

2014-05-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13996876#comment-13996876
 ] 

Guozhang Wang commented on KAFKA-1447:
--

This may be due to a known issue in 0.8.0; could you try with 0.8.1.1?

> Controlled shutdown deadlock when trying to send state updates
> --
>
> Key: KAFKA-1447
> URL: https://issues.apache.org/jira/browse/KAFKA-1447
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Neha Narkhede
>
> We're seeing controlled shutdown indefinitely stuck on trying to send out 
> state change messages to the other brokers:
> [2014-05-03 04:01:30,580] INFO [Socket Server on Broker 4], Shutdown 
> completed (kafka.network.SocketServer)
> [2014-05-03 04:01:30,581] INFO [Kafka Request Handler on Broker 4], shutting 
> down (kafka.server.KafkaRequestHandlerPool)
> and stuck on:
> "kafka-request-handler-12" daemon prio=10 tid=0x7f1f04a66800 nid=0x6e79 
> waiting on condition [0x7f1ad5767000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> parking to wait for <0x00078e91dc20> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> at 
> kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
> locked <0x00078e91dc38> (a java.lang.Object)
> at kafka.controller.KafkaController.sendRequest(KafkaController.scala:655)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:298)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> at 
> kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:290)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:97)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:269)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
> at scala.Option.foreach(Option.scala:197)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
> at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
> locked <0x00078b495af0> (a java.lang.Object)
> at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:722)
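The key frame is LinkedBlockingQueue.put: the request-handler thread is parked waiting for space in a bounded controller send queue while still holding the controller lock. A standalone sketch (not Kafka code) of how a full bounded queue stalls a producer thread:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueBlock {
    /** Fill a capacity-1 queue, then try to enqueue a second element. */
    static boolean tryEnqueueSecond() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        try {
            queue.put("first");  // fills the queue
            // A plain put("second") would park this thread indefinitely if
            // nothing ever drains the queue -- the state in the stack trace.
            // offer(..., timeout) makes the stall observable instead.
            return queue.offer("second", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("second message accepted: " + tryEnqueueSecond());  // false
    }
}
```

In the real deadlock nothing can drain the queue because the draining side needs a lock the parked thread holds, so the put never returns.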





Review Request 21406: Fix KAFKA-1396.v3

2014-05-13 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21406/
---

Review request for kafka.


Bugs: KAFKA-1396
https://issues.apache.org/jira/browse/KAFKA-1396


Repository: kafka


Description
---

1. Let waitUntilMetadataIsPropagated return the leader id; 2. Check if the 
producer did not encounter any exceptions during the test and after the 
scheduler is shutdown. 3. Reduce the test iteration to 2


Diffs
-

  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
a993e8c7531abca6b82d1718ac57ba6a295c1fc7 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
af11a4983599a20a33b6f1fc05738a59abfede63 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
034f36130a1d0669dda5ed53e9cee328806c811e 

Diff: https://reviews.apache.org/r/21406/diff/


Testing
---


Thanks,

Guozhang Wang



kafka performance question

2014-05-13 Thread Zhujie (zhujie, Smartcare)
our version is kafka_2.10-0.8.1

From: Zhujie (zhujie, Smartcare)
Sent: May 14, 2014 8:56
To: 'us...@kafka.apache.org'; 'dev@kafka.apache.org'
Subject: kafka performance question

Dear all,

We want to use Kafka to collect and dispatch data files, but the performance is 
lower than we expected.

In our cluster there is one producer and one broker. We use a single thread to 
read a file from the producer's local disk and send it to the broker. The 
average throughput is only 3 MB/s to 4 MB/s.
But if we just use the Java NIO API to send the file, the throughput can exceed 
200 MB/s.
Why is the Kafka performance so poor in our test? Are we missing something?



Our server:
Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4
Mem:300G
Disk:600G 15K RPM SAS*8

Configuration of the producer:
props.put("serializer.class", "kafka.serializer.NullEncoder");
props.put("metadata.broker.list", "169.10.35.57:9092");
props.put("request.required.acks", "0");
props.put("producer.type", "async"); // async
props.put("queue.buffering.max.ms","500");
props.put("queue.buffering.max.messages","10");
props.put("batch.num.messages", "1200");
props.put("queue.enqueue.timeout.ms", "-1");
props.put("send.buffer.bytes", "10240");
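One thing worth checking in the producer config above: send.buffer.bytes=10240 sets SO_SNDBUF to only 10 KB. TCP can keep at most roughly one socket buffer in flight per round trip, so on anything but a near-zero-latency link this alone caps throughput. A back-of-the-envelope sketch (the 1 ms RTT is an assumption, not a figure from this thread):

```java
public class SocketBufferCeiling {
    /** Bandwidth-delay bound: at most one socket buffer in flight per RTT. */
    static double ceilingMBPerSec(int sendBufferBytes, double rttSeconds) {
        double maxBytesPerSec = sendBufferBytes / rttSeconds;
        return maxBytesPerSec / (1024 * 1024);
    }

    public static void main(String[] args) {
        int sendBufferBytes = 10_240;  // send.buffer.bytes from the config above
        double rttSeconds = 0.001;     // assumed 1 ms round-trip time
        System.out.printf("ceiling ~= %.1f MB/s%n",
                ceilingMBPerSec(sendBufferBytes, rttSeconds));  // ~9.8 MB/s
    }
}
```

A ceiling of ~10 MB/s is the same order of magnitude as the observed 3-4 MB/s, so raising send.buffer.bytes (the NIO test presumably used the OS default, which is much larger) may be worth trying before tuning anything else.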

Configuration of broker:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

# Server Basics #

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Socket Server Settings 
#

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it 
uses the
# value for "host.name" if configured.  Otherwise, it will use the value 
returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=

# The number of threads handling network requests
#num.network.threads=2

# The number of threads doing disk I/O
#num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
#socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
#socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection 
against OOM)
#socket.request.max.bytes=104857600


# Log Basics #

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#num.partitions=2

# Log Flush Policy #

# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
# the OS cache lazily. The following configurations control the flush of data 
to disk.
# There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using replication.
#2. Latency: Very large flush intervals may lead to latency spikes when the 
flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data 
after a period of time or
# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

# Log Retention Policy #

# The following configurations control the disposal of log segments. The policy 
can
# be set to delete segments after a period of time, or after a given size has 
a

Re: Review Request 21398: Fix KAFKA-1445 v2

2014-05-13 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21398/#review42921
---


Looks good to me. Some minor comments.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java


Could the two loops be merged into a single loop?



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java


Need to fix the comment.



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java


Could we name this generateProduceRequests?



clients/src/main/java/org/apache/kafka/common/Cluster.java


handle them out => hand them out



clients/src/main/java/org/apache/kafka/common/Cluster.java


Could we just use Utils.notNull()?


- Jun Rao


On May 13, 2014, 6:25 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21398/
> ---
> 
> (Updated May 13, 2014, 6:25 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1445
> https://issues.apache.org/jira/browse/KAFKA-1445
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> 0. Add the partitionsForNode index in Cluster;
> 1. Ready would return a list of ready nodes instead of partitions;
> 2. Ready would also check if there are any ready partitions with unknown
> leader; if yes, indicate to processReadyNode to force a metadata refresh;
> 3. Drain would take a list of nodes and drain the batches per node until
> the max request size is reached;
> 4. Collocate would now just transform batches per node into a producer
> request;
> 5. Corresponding unit test changes;
> 6. One minor compilation warning fix
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  fbb732a57522109ac0e0eaf5c87b50cbd3a7f767 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  2d7e52d430fa267ee3689a06f8a621ce5dfd1e33 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> f0152fabbdd44e9f1a24291e84c17edf8379f4fc 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> 426bd1eec708979149cbd6fa3959e6f9e73c7e0e 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  f37ab770b1794830154f9908a0156e7e99b4a458 
>   
> clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
> 1df226606fad29da47d81d0b8ff36209c3536c06 
> 
> Diff: https://reviews.apache.org/r/21398/diff/
> 
> 
> Testing
> ---
> 
> unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
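The "drain the batches per node until the max request size is reached" behavior described in the quoted review can be sketched roughly as follows (the names and the take-the-first-batch-regardless rule are illustrative, not the actual client code):

```java
import java.util.ArrayList;
import java.util.List;

public class DrainSketch {
    /**
     * Take batch sizes destined for one node until adding another batch
     * would push the request past maxRequestSize. The first batch is always
     * taken so an oversized batch can still make progress.
     */
    static List<Integer> drain(List<Integer> batchSizes, int maxRequestSize) {
        List<Integer> drained = new ArrayList<>();
        int total = 0;
        for (int size : batchSizes) {
            if (!drained.isEmpty() && total + size > maxRequestSize)
                break;  // request full; remaining batches wait for the next send
            drained.add(size);
            total += size;
        }
        return drained;
    }

    public static void main(String[] args) {
        // 400 + 500 fit under 1000; adding 300 would exceed it.
        System.out.println(drain(List.of(400, 500, 300), 1000));  // [400, 500]
    }
}
```

Grouping by node rather than by partition is what lets the sender emit one produce request per broker instead of one per partition.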



[jira] [Resolved] (KAFKA-1396) fix transient unit test ProducerFailureHandlingTest.testBrokerFailure

2014-05-13 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1396.


   Resolution: Fixed
Fix Version/s: 0.8.2

Thanks for the patch. +1 and committed to trunk.

> fix transient unit test ProducerFailureHandlingTest.testBrokerFailure
> -
>
> Key: KAFKA-1396
> URL: https://issues.apache.org/jira/browse/KAFKA-1396
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Jun Rao
>Assignee: Guozhang Wang
> Fix For: 0.8.2
>
> Attachments: KAFKA-1396.patch, KAFKA-1396.patch, 
> KAFKA-1396_2014-05-07_14:55:09.patch, KAFKA-1396_2014-05-08_15:57:11.patch, 
> KAFKA-1396_2014-05-09_10:35:07.patch
>
>
> Currently disabled after kafka-1390.





How can I step through the Kafka code using a debugger

2014-05-13 Thread Sheng Wang
Hi,

I want to learn more about the Kafka code base. One of the easiest ways
that I can think of is to walk through the code with a debugger. Could
anyone tell how I can do that? Can I do it using any IDE?

Thanks!

-Sheng