[jira] [Updated] (KAFKA-1215) Rack-Aware replica assignment option

2014-01-23 Thread Joris Van Remoortere (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joris Van Remoortere updated KAFKA-1215:


Attachment: rack_aware_replica_assignment_v1.patch

> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1215
> URL: https://issues.apache.org/jira/browse/KAFKA-1215
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.0, 0.8.1
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
> Fix For: 0.8.0, 0.8.1
>
> Attachments: rack_aware_replica_assignment_v1.patch
>
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.
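
For illustration only, here is a minimal sketch of the rack-capping idea (hypothetical
names, not the attached patch): pick replicas round-robin over the broker list, but skip
brokers whose rack has already reached max-rack-replication; -1 disables the cap and
falls back to the original assignment behavior.

import java.util.*;

// Hypothetical sketch of rack-capped replica assignment -- not the attached patch.
public class RackAwareAssignmentSketch {

    // brokerRacks: brokerId -> rack-id; returns the brokers chosen for one partition.
    static List<Integer> assignReplicas(List<Integer> brokers, Map<Integer, String> brokerRacks,
                                        int replicationFactor, int maxRackReplication, int startIndex) {
        List<Integer> assigned = new ArrayList<>();
        Map<String, Integer> perRack = new HashMap<>();
        for (int i = 0; i < brokers.size() && assigned.size() < replicationFactor; i++) {
            int broker = brokers.get((startIndex + i) % brokers.size());
            String rack = brokerRacks.get(broker);
            int inRack = perRack.getOrDefault(rack, 0);
            // maxRackReplication == -1 means no rack cap at all (the original algorithm's behavior).
            if (maxRackReplication > -1 && inRack >= maxRackReplication)
                continue;
            assigned.add(broker);
            perRack.put(rack, inRack + 1);
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<Integer> brokers = Arrays.asList(0, 1, 2, 3);
        Map<Integer, String> racks = Map.of(0, "rackA", 1, "rackA", 2, "rackB", 3, "rackB");
        System.out.println(assignReplicas(brokers, racks, 3, 2, 0)); // cap 2 per rack -> [0, 1, 2]
        System.out.println(assignReplicas(brokers, racks, 3, 1, 0)); // cap 1 per rack -> [0, 2]
    }
}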



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1216) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1216:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1216
> URL: https://issues.apache.org/jira/browse/KAFKA-1216
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Assigned] (KAFKA-1217) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein reassigned KAFKA-1217:


Assignee: Joe Stein  (was: Neha Narkhede)

> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1217
> URL: https://issues.apache.org/jira/browse/KAFKA-1217
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.0, 0.8.1
>Reporter: Joris Van Remoortere
>Assignee: Joe Stein
> Fix For: 0.8.0, 0.8.1
>
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1217) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1217:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1217
> URL: https://issues.apache.org/jira/browse/KAFKA-1217
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Joe Stein
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1218) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1218:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1218
> URL: https://issues.apache.org/jira/browse/KAFKA-1218
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1219) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1219:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1219
> URL: https://issues.apache.org/jira/browse/KAFKA-1219
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1225) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1225:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1225
> URL: https://issues.apache.org/jira/browse/KAFKA-1225
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1223) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1223:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1223
> URL: https://issues.apache.org/jira/browse/KAFKA-1223
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1224) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1224:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1224
> URL: https://issues.apache.org/jira/browse/KAFKA-1224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1220) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1220:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1220
> URL: https://issues.apache.org/jira/browse/KAFKA-1220
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1222) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1222:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1222
> URL: https://issues.apache.org/jira/browse/KAFKA-1222
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Deleted] (KAFKA-1221) Rack-Aware replica assignment option

2014-01-23 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein deleted KAFKA-1221:
-


> Rack-Aware replica assignment option
> 
>
> Key: KAFKA-1221
> URL: https://issues.apache.org/jira/browse/KAFKA-1221
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joris Van Remoortere
>Assignee: Neha Narkhede
>
> Adding a rack-id to kafka config. This rack-id can be used during replica 
> assignment by using the max-rack-replication argument in the admin scripts 
> (create topic, etc.). By default the original replication assignment 
> algorithm is used because max-rack-replication defaults to -1. 
> max-rack-replication > -1 is not honored if you are doing manual replica 
> assignment (preferred).
> If this looks good I can add some test cases specific to the rack-aware 
> assignment.
> I can also port this to trunk. We are currently running 0.8.0 in production 
> and need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


Re: Review Request 17006: Keep track of local pid and kill it on stopping all entities as discussed with John

2014-01-23 Thread John Fung

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17006/#review32632
---


There will be only 1 producer_performance running per entity_id. Therefore, 
entity_id can be used as a key in the dictionary to keep track of multiple 
running producer_performance threads:

+os.tcaseEnv.producerHostParentPidDict[entityId] = os.getpid()

- John Fung


On Jan. 22, 2014, 7:51 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17006/
> ---
> 
> (Updated Jan. 22, 2014, 7:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1212
> https://issues.apache.org/jira/browse/KAFKA-1212
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1212.v2
> 
> 
> Diffs
> -
> 
>   system_test/utils/kafka_system_test_utils.py 
> fb4a9c05bf6f39a7abf41126325ed5ca26bcc246 
> 
> Diff: https://reviews.apache.org/r/17006/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[jira] [Created] (KAFKA-1226) Rack-Aware replica assignment option

2014-01-23 Thread Joris Van Remoortere (JIRA)
Joris Van Remoortere created KAFKA-1226:
---

 Summary: Rack-Aware replica assignment option
 Key: KAFKA-1226
 URL: https://issues.apache.org/jira/browse/KAFKA-1226
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.0, 0.8.1
Reporter: Joris Van Remoortere
Assignee: Neha Narkhede
 Fix For: 0.8.0


Adding a rack-id to kafka config. This rack-id can be used during replica 
assignment by using the max-rack-replication argument in the admin scripts 
(create topic, etc.). By default the original replication assignment algorithm 
is used because max-rack-replication defaults to -1. max-rack-replication > -1 
is not honored if you are doing manual replica assignment (preferred). 

If this looks good I can add some test cases specific to the rack-aware 
assignment. 

I can also port this to trunk. We are currently running 0.8.0 in production and 
need this, so I wrote the patch against that.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


Review Request 17248: Patch for KAFKA-1226

2014-01-23 Thread Joris Van Remoortere

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17248/
---

Review request for kafka.


Summary (updated)
-

Patch for KAFKA-1226


Bugs: KAFKA-1226
https://issues.apache.org/jira/browse/KAFKA-1226


Repository: kafka


Description (updated)
---

KAFKA-1226


Diffs
-

  core/src/main/scala/kafka/admin/AddPartitionsCommand.scala 7f03708 
  core/src/main/scala/kafka/admin/AdminUtils.scala d6ab275 
  core/src/main/scala/kafka/admin/CreateTopicCommand.scala 84c2095 
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 2f706c9 
  core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b 
  core/src/main/scala/kafka/cluster/Broker.scala 9407ed2 
  core/src/main/scala/kafka/server/KafkaConfig.scala 41c9626 
  core/src/main/scala/kafka/server/KafkaZooKeeper.scala 553640f 
  core/src/main/scala/kafka/utils/ZkUtils.scala c21bc60 
  core/src/test/scala/other/kafka/TestLogPerformance.scala 9f3bb40 
  core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 2436289 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala a480881 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
f43ac8f 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 1ee34b9 
  core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala 2317760 
  core/src/test/scala/unit/kafka/integration/FetcherTest.scala c5cddea 
  core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala c3c7631 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala f764151 
  core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala 26e9bd6 
  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala edf8555 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala ce893bf 
  core/src/test/scala/unit/kafka/log/LogOffsetTest.scala 1a9cc01 
  core/src/test/scala/unit/kafka/log/LogTest.scala df90695 
  core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala 67497dd 
  core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 69c88c7 
  core/src/test/scala/unit/kafka/producer/ProducerTest.scala 2cabfbb 
  core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 70e4b51 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 947e795 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala a06cfff 

Diff: https://reviews.apache.org/r/17248/diff/


Testing
---


File Attachments (updated)


rack_aware_replica_assignment_v1.patch
  
https://reviews.apache.org/media/uploaded/files/2014/01/23/394cef99-f800-4d94-bc59-fdb6c68b53f5__rack_aware_replica_assignment_v1.patch


Thanks,

Joris Van Remoortere



Re: Review Request 17248: Patch for KAFKA-1226

2014-01-23 Thread Joris Van Remoortere

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17248/
---

(Updated Jan. 23, 2014, 6:17 p.m.)


Review request for kafka.


Bugs: KAFKA-1226
https://issues.apache.org/jira/browse/KAFKA-1226


Repository: kafka


Description
---

KAFKA-1226


Diffs (updated)
-

  core/src/main/scala/kafka/admin/AddPartitionsCommand.scala 7f03708 
  core/src/main/scala/kafka/admin/AdminUtils.scala d6ab275 
  core/src/main/scala/kafka/admin/CreateTopicCommand.scala 84c2095 
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 2f706c9 
  core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b 
  core/src/main/scala/kafka/cluster/Broker.scala 9407ed2 
  core/src/main/scala/kafka/server/KafkaConfig.scala 41c9626 
  core/src/main/scala/kafka/server/KafkaZooKeeper.scala 553640f 
  core/src/main/scala/kafka/utils/ZkUtils.scala c21bc60 
  core/src/test/scala/other/kafka/TestLogPerformance.scala 9f3bb40 
  core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 2436289 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala a480881 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
f43ac8f 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 1ee34b9 
  core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala 2317760 
  core/src/test/scala/unit/kafka/integration/FetcherTest.scala c5cddea 
  core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala c3c7631 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala f764151 
  core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala 26e9bd6 
  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala edf8555 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala ce893bf 
  core/src/test/scala/unit/kafka/log/LogOffsetTest.scala 1a9cc01 
  core/src/test/scala/unit/kafka/log/LogTest.scala df90695 
  core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala 67497dd 
  core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 69c88c7 
  core/src/test/scala/unit/kafka/producer/ProducerTest.scala 2cabfbb 
  core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 70e4b51 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 947e795 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala a06cfff 

Diff: https://reviews.apache.org/r/17248/diff/


Testing
---


File Attachments


rack_aware_replica_assignment_v1.patch
  
https://reviews.apache.org/media/uploaded/files/2014/01/23/394cef99-f800-4d94-bc59-fdb6c68b53f5__rack_aware_replica_assignment_v1.patch


Thanks,

Joris Van Remoortere



producer rewrite

2014-01-23 Thread Jay Kreps
Hey all,

I have been working on a rewrite of the producer as described in the wiki
below and discussed in a few previous threads:
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

My code still has some bugs and is a bit rough in parts, but it
functions in the basic cases. I did some basic performance tests over
localhost, and the new approach has paid off quite significantly--for small
(10 byte) messages a single thread on my laptop can send over 1m
messages/second, and with larger messages easily maxes out the server.

The difference between "sync" and "async" producer largely disappears--all
requests immediately return a future response which can be used to get the
behavior of either sync or async usage and we batch whenever the producer
is under load using a "group commit"-like approach. You can encourage
additional batching by incurring a small amount of latency (as before).
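
To make the sync/async point concrete, here is a tiny, purely illustrative sketch (these
names are invented here, not the patch's API): once send() hands the record to a
background I/O thread and returns a future, "sync" is just send() followed by get(), and
"async" is send() with the future ignored or handled later.

import java.util.concurrent.*;

// Illustrative only -- not the API from the patch.
public class FutureSendSketch {
    private static final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    // Stand-in for the producer's send(): hand off to the I/O thread and return immediately.
    static Future<Long> send(String message) {
        return ioThread.submit(() -> {
            Thread.sleep(1);                 // pretend network round trip
            return (long) message.length();  // pretend "offset" in the ack
        });
    }

    public static void main(String[] args) throws Exception {
        long offset = send("sync: wait for the ack").get();     // block on the future -> synchronous
        Future<Long> pending = send("async: fire and forget");  // ignore the future -> asynchronous
        System.out.println("sync send acked with offset " + offset
                + "; async send still pending = " + !pending.isDone());
        ioThread.shutdown();
    }
}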

Let's talk about how to integrate this code.

This is a from-scratch rewrite of the producer code. As such it is a pretty
major change. So far I have mostly been working on my own. I'd like to
start getting feedback before I get too far along--no point in my polishing
things that are going to be significantly revised in review, after all.

As such here is what I would propose:

1. I'll put up a preliminary patch. Since this code is a completely
standalone module it will not destabilize the existing server or existing
producer (in fact there is no change to those). I will avoid including
build support in this patch until we get the gradle stuff worked out so as
to not break that patch (hopefully that moves along). Let's take this patch
"as is" but with no expectation that the code is complete or that checkin
implies everyone agrees with every design decision. I will follow up with
subsequent patches as we do reviews and discussions.

2. I'll send out a few higher-level topics for discussion threads. Let's
get to consensus on these. I think micro-reviewing minor correctness issues
won't be productive until we make higher-level decisions. The topics I'd
like to discuss include
a. The producer code:
 - The public API
 - The configurations: their names, and the general knobs we are
 - Client message serialization
 - The instrumentation to have
 - The blocking and batching behavior
b. The common code and a few other cross-cutting policy things
 - The approach to protocol definition and request serialization
 - The config definition helper code
 - The metrics package
 - The project layout
 - The java coding style and the use of java
 - The approach to logging

This is somewhat backwards, but I think it will be easier to handle changes
that fall out of these discussions against an existing code base that is
checked in; otherwise each revision will be a brand new, very large patch.

If no objections I will toss up this code and kick off some of these
discussions.

-Jay


[jira] Subscription: outstanding kafka patches

2014-01-23 Thread jira
Issue Subscription
Filter: outstanding kafka patches (84 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key Summary
KAFKA-1215  Rack-Aware replica assignment option
https://issues.apache.org/jira/browse/KAFKA-1215
KAFKA-1214  Support arguments to zookeeper-shell.sh script
https://issues.apache.org/jira/browse/KAFKA-1214
KAFKA-1212  System test exception handling does not stop background producer 
threads
https://issues.apache.org/jira/browse/KAFKA-1212
KAFKA-1210  Windows Bat files are not working properly
https://issues.apache.org/jira/browse/KAFKA-1210
KAFKA-1207  Launch Kafka from within Apache Mesos
https://issues.apache.org/jira/browse/KAFKA-1207
KAFKA-1206  allow Kafka to start from a resource negotiator system
https://issues.apache.org/jira/browse/KAFKA-1206
KAFKA-1194  The kafka broker cannot delete the old log files after the 
configured time
https://issues.apache.org/jira/browse/KAFKA-1194
KAFKA-1190  create a draw performance graph script
https://issues.apache.org/jira/browse/KAFKA-1190
KAFKA-1188  Stale LeaderAndIsr request could be handled by the broker on 
Controller failover
https://issues.apache.org/jira/browse/KAFKA-1188
KAFKA-1180  WhiteList topic filter gets a NullPointerException on complex Regex
https://issues.apache.org/jira/browse/KAFKA-1180
KAFKA-1173  Using Vagrant to get up and running with Apache Kafka
https://issues.apache.org/jira/browse/KAFKA-1173
KAFKA-1171  Gradle build for Kafka
https://issues.apache.org/jira/browse/KAFKA-1171
KAFKA-1147  Consumer socket timeout should be greater than fetch max wait
https://issues.apache.org/jira/browse/KAFKA-1147
KAFKA-1145  Broker fail to sync after restart
https://issues.apache.org/jira/browse/KAFKA-1145
KAFKA-1144  commitOffsets can be passed the offsets to commit
https://issues.apache.org/jira/browse/KAFKA-1144
KAFKA-1130  "log.dirs" is a confusing property name
https://issues.apache.org/jira/browse/KAFKA-1130
KAFKA-1116  Need to upgrade sbt-assembly to compile on scala 2.10.2
https://issues.apache.org/jira/browse/KAFKA-1116
KAFKA-1109  Need to fix GC log configuration code, not able to override 
KAFKA_GC_LOG_OPTS
https://issues.apache.org/jira/browse/KAFKA-1109
KAFKA-1106  HighwaterMarkCheckpoint failure puting broker into a bad state
https://issues.apache.org/jira/browse/KAFKA-1106
KAFKA-1093  Log.getOffsetsBefore(t, …) does not return the last confirmed 
offset before t
https://issues.apache.org/jira/browse/KAFKA-1093
KAFKA-1086  Improve GetOffsetShell to find metadata automatically
https://issues.apache.org/jira/browse/KAFKA-1086
KAFKA-1082  zkclient dies after UnknownHostException in zk reconnect
https://issues.apache.org/jira/browse/KAFKA-1082
KAFKA-1079  Liars in PrimitiveApiTest that promise to test api in compression 
mode, but don't do this actually
https://issues.apache.org/jira/browse/KAFKA-1079
KAFKA-1055  BrokerTopicStats is updated before checking for MessageSizeTooLarge
https://issues.apache.org/jira/browse/KAFKA-1055
KAFKA-1049  Encoder implementations are required to provide an undocumented 
constructor.
https://issues.apache.org/jira/browse/KAFKA-1049
KAFKA-1032  Messages sent to the old leader will be lost on broker GC resulted 
failure
https://issues.apache.org/jira/browse/KAFKA-1032
KAFKA-1025  Producer.send should provide recoverability info on failure
https://issues.apache.org/jira/browse/KAFKA-1025
KAFKA-1012  Implement an Offset Manager and hook offset requests to it
https://issues.apache.org/jira/browse/KAFKA-1012
KAFKA-1011  Decompression and re-compression on MirrorMaker could result in 
messages being dropped in the pipeline
https://issues.apache.org/jira/browse/KAFKA-1011
KAFKA-1005  kafka.perf.ConsumerPerformance not shutting down consumer
https://issues.apache.org/jira/browse/KAFKA-1005
KAFKA-998   Producer should not retry on non-recoverable error codes
https://issues.apache.org/jira/browse/KAFKA-998
KAFKA-997   Provide a strict verification mode when reading configuration 
properties
https://issues.apache.org/jira/browse/KAFKA-997
KAFKA-996   Capitalize first letter for log entries
https://issues.apache.org/jira/browse/KAFKA-996
KAFKA-984   Avoid a full rebalance in cases when a new topic is discovered but 
container/broker set stay the same
https://issues.apache.org/jira/browse/KAFKA-984
KAFKA-976   Order-Preserving Mirror Maker Testcase
https://issues.apache.org/jira/browse/KAFKA-976
KAFKA-967   Use key range in ProducerPerformance
https://issues.apache.org/jira/browse/KAFKA-967
KAFKA-917   Expose zk.session.timeout.ms in console consumer

Simple Auth and Auth

2014-01-23 Thread Joe Brown
Hi All

I’ve been looking at the wiki proposal to add Auth and Auth to kafka 
https://cwiki.apache.org/confluence/display/KAFKA/Security 

In the meantime I’ve had a recent immediate requirement to implement something 
similar - my solution is detailed below - fairly quick and dirty but achieved 
the desired results.  At the moment the code is branched off the 0.8-beta - 
though to move to 0.8.0 is trivial - the question is when will the 
https://cwiki.apache.org/confluence/display/KAFKA/Security be looked at, if 
this is a ways off then I could consider creating a patch and pull request for 
my code.

FYI my requirements were very specific - for example no encryption of the data, 
changes minimised - easy to re-apply to future releases.
Authentication

Authentication is achieved by passing the client’s public certificate along 
with a message, signed by the client’s private key, every time the client opens 
a connection to a broker. The underlying Apache Kafka BlockingChannel was 
altered to ensure the connection is only established on a successful 
authentication response from the broker. 

The broker, to authenticate the client, first verifies the client’s public 
certificate against the issuing CA certificate held in the broker truststore. 
The broker then uses the verified client’s certificate to verify the signed 
message from the client. On successful authentication the broker sends a success 
response to the client, and the client DN is then registered against the client 
connection as trusted by the broker.

- Standard java.security.* and java.security.cert.* classes were used for 
authentication
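
As a rough illustration of the mechanism (my sketch using only standard JDK classes, not
the code from the branch), the broker-side check boils down to: verify the client
certificate against the CA, then verify the client's signed message with the certificate's
public key. The generated key pair below merely stands in for keys that would really come
from the client keystore.

import java.security.*;

// Rough illustration of the signed-handshake check, java.security.* only.
// In the scheme described above, the public key would come from the client's X.509
// certificate, itself verified against the CA cert in the broker truststore
// (cert.verify(caCert.getPublicKey())).
public class AuthHandshakeSketch {
    public static void main(String[] args) throws Exception {
        KeyPairGenerator gen = KeyPairGenerator.getInstance("RSA");
        gen.initialize(2048);
        KeyPair clientKeys = gen.generateKeyPair();   // stand-in for the client keystore

        byte[] handshake = "connection-handshake-message".getBytes("UTF-8");

        // Client side: sign the handshake message with the private key.
        Signature signer = Signature.getInstance("SHA256withRSA");
        signer.initSign(clientKeys.getPrivate());
        signer.update(handshake);
        byte[] signature = signer.sign();

        // Broker side: verify the signature with the (certificate-verified) public key.
        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(clientKeys.getPublic());
        verifier.update(handshake);
        System.out.println("client authenticated: " + verifier.verify(signature));
    }
}
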
Authorization

Authorization is controlled by the file referenced by the property 
“auth.config.path” configured on the broker. On broker initialization the file 
is read; changes to this file are reloaded at intervals defined by the 
“auth.file.watch.seconds” property.

Both Producer (produce) and Consumer (fetch) Requests are intercepted on the 
broker to check whether the client connection is authenticated and then whether 
the associated client DN has the permission to read (fetch) or write (produce) 
from/to the given topic.

- Uses simple json config file for authorization
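
Again just a sketch of the shape of the thing (the file format and every name below are
invented here, not taken from the branch): an in-memory DN -> topic -> operations map that
answers the produce/fetch check and re-reads its config file on a fixed schedule.

import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;

// Sketch only: DN -> topic -> allowed operations, reloaded on a timer.
// A one-rule-per-line format is used here as a stand-in for the JSON file described above.
public class SimpleAuthorizerSketch {
    private volatile Map<String, Map<String, Set<String>>> acls = new HashMap<>();

    // Each line: "<client DN>|<topic>|<comma-separated ops>", e.g. "CN=reporting|clicks|fetch"
    void load(Path configPath) throws Exception {
        Map<String, Map<String, Set<String>>> fresh = new HashMap<>();
        if (Files.exists(configPath)) {
            for (String line : Files.readAllLines(configPath)) {
                String[] parts = line.split("\\|");
                if (parts.length != 3) continue;
                fresh.computeIfAbsent(parts[0].trim(), dn -> new HashMap<>())
                     .put(parts[1].trim(), new HashSet<>(Arrays.asList(parts[2].trim().split(","))));
            }
        }
        acls = fresh; // swap the whole map so readers never see a half-built one
    }

    // The check applied when a produce ("produce") or fetch ("fetch") request arrives.
    boolean isAllowed(String clientDn, String topic, String operation) {
        return acls.getOrDefault(clientDn, Collections.emptyMap())
                   .getOrDefault(topic, Collections.emptySet())
                   .contains(operation);
    }

    public static void main(String[] args) throws Exception {
        SimpleAuthorizerSketch auth = new SimpleAuthorizerSketch();
        Path config = Paths.get("auth-config.txt");   // stand-in for auth.config.path
        auth.load(config);

        // Periodic re-read, in the spirit of auth.file.watch.seconds.
        ScheduledExecutorService watcher = Executors.newSingleThreadScheduledExecutor();
        watcher.scheduleAtFixedRate(() -> {
            try { auth.load(config); } catch (Exception e) { e.printStackTrace(); }
        }, 30, 30, TimeUnit.SECONDS);

        System.out.println("CN=reporting may fetch clicks: " + auth.isAllowed("CN=reporting", "clicks", "fetch"));
        watcher.shutdownNow();
    }
}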

Thoughts please?

Cheers

Joe

Re: Review Request 17006: Keep track of local pid and kill it on stopping all entities as discussed with John

2014-01-23 Thread Guozhang Wang


> On Jan. 23, 2014, 5:51 p.m., John Fung wrote:
> > There will be only 1 producer_performance running per entity_id. Therefore, 
> > entity_id can be used as a key in the dictionary to keep track of multiple 
> > running producer_performance threads:
> > 
> > +os.tcaseEnv.producerHostParentPidDict[entityId] = os.getpid()

Hi John, thanks for the comments. Not sure I get it clearly, though. What do you 
suggest changing in the above line?


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17006/#review32632
---


On Jan. 22, 2014, 7:51 p.m., Guozhang Wang wrote:
> [...]



Re: producer rewrite

2014-01-23 Thread Neha Narkhede
+1 for checking this in as is. For a from-scratch rewrite like this, I
prefer to do incremental reviews on a standalone subproject until it is
complete and stable enough to be merged into the main codebase. Looking forward to
the patch!

Thanks,
Neha


On Thu, Jan 23, 2014 at 10:23 AM, Jay Kreps  wrote:

> [...]


Re: producer rewrite

2014-01-23 Thread Jun Rao
This approach sounds reasonable to me. Since the new code will not be
used in the current kafka jar, we can still release 0.8.1 off trunk when
it's ready.

Thanks,

Jun


On Thu, Jan 23, 2014 at 10:23 AM, Jay Kreps  wrote:

> [...]


Re: producer rewrite

2014-01-23 Thread Joe Stein
awesome! +1 for checking this in as is as you suggest

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/


On Thu, Jan 23, 2014 at 2:37 PM, Jun Rao  wrote:

> This approach sounds reasonable to me. Since the new code will not be
> used in the current kafka jar, we can still release 0.8.1 off trunk when
> it's ready.
>
> [...]
>


[jira] [Created] (KAFKA-1227) Code dump of new producer

2014-01-23 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1227:


 Summary: Code dump of new producer
 Key: KAFKA-1227
 URL: https://issues.apache.org/jira/browse/KAFKA-1227
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Jay Kreps


The plan is to take a dump of the producer code "as is" and then do a series of 
post-commit reviews to get it into shape. This bug tracks just the code dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


Review Request 17261: New producer for Kafka.

2014-01-23 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17261/
---

Review request for kafka.


Bugs: KAFKA-1227
https://issues.apache.org/jira/browse/KAFKA-1227


Repository: kafka


Description
---

KAFKA-1227 New producer!


Diffs
-

  clients/build.sbt PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/BufferPool.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/Metadata.java 
PRE-CREATION 
  
clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
 PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/Sender.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/tools/ProducerPerformance.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
  clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
  clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
  clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
  clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
  clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
  clients/src/main/java/kafka/common/Node.java PRE-CREATION 
  clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
  clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
  clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
  clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/CorruptMessageException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/MessageTooLargeException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/RetryableException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/UnknownServerException.java 
PRE-CREATION 
  
clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/QuotaViolationException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/s

Review Request 17263: New producer for Kafka.

2014-01-23 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/
---

Review request for kafka.


Bugs: KAFKA-1227
https://issues.apache.org/jira/browse/KAFKA-1227


Repository: kafka


Description
---

KAFKA-1227 New producer!


Diffs
-

  clients/build.sbt PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/BufferPool.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/Metadata.java 
PRE-CREATION 
  
clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
 PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/Sender.java 
PRE-CREATION 
  clients/src/main/java/kafka/clients/tools/ProducerPerformance.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
  clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
  clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
  clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
  clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
  clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
  clients/src/main/java/kafka/common/Node.java PRE-CREATION 
  clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
  clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
  clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
  clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/CorruptMessageException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/MessageTooLargeException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/RetryableException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/UnknownServerException.java 
PRE-CREATION 
  
clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/QuotaViolationException.java 
PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/s

[jira] [Commented] (KAFKA-1227) Code dump of new producer

2014-01-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880343#comment-13880343
 ] 

Jay Kreps commented on KAFKA-1227:
--

Created reviewboard https://reviews.apache.org/r/17263/
 against branch trunk

> Code dump of new producer
> -
>
> Key: KAFKA-1227
> URL: https://issues.apache.org/jira/browse/KAFKA-1227
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series 
> of post-commit reviews to get it into shape. This bug tracks just the code 
> dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Updated] (KAFKA-1227) Code dump of new producer

2014-01-23 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1227:
-

Attachment: KAFKA-1227.patch

> Code dump of new producer
> -
>
> Key: KAFKA-1227
> URL: https://issues.apache.org/jira/browse/KAFKA-1227
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series 
> of post-commit reviews to get it into shape. This bug tracks just the code 
> dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


Re: producer rewrite

2014-01-23 Thread Jay Kreps
Cool, I've uploaded a patch and rb here:
https://issues.apache.org/jira/browse/KAFKA-1227

-Jay


On Thu, Jan 23, 2014 at 12:00 PM, Joe Stein  wrote:

> awesome! +1 for checking this in as is as you suggest
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>
>
> On Thu, Jan 23, 2014 at 2:37 PM, Jun Rao  wrote:
>
> > This approach sounds reasonable to me. Since the new code will be not be
> > used in the current kafka jar, we can still release 0.8.1 off trunk when
> > it's ready.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Jan 23, 2014 at 10:23 AM, Jay Kreps  wrote:
> >
> > > Hey all,
> > >
> > > I have been working on a rewrite of the producer as described in the
> wiki
> > > below and discussed in a few previous threads:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > >
> > > My code is still has some bugs and is a bit rough in parts, but it
> > > functions in the basic cases. I did some basic performance tests over
> > > localhost, and the new approach has paid off quite significantly--for
> > small
> > > (10 byte) messages a single thread on my laptop can send over 1m
> > > messages/second, and with larger messages easily maxes out the server.
> > >
> > > The difference between "sync" and "async" producer largely disappears--all
> > > requests immediately return a future response which can be used to get the
> > > behavior of either sync or async usage, and we batch whenever the producer
> > > is under load using a "group commit"-like approach. You can encourage
> > > additional batching by incurring a small amount of latency (as before). (A
> > > rough sketch of this sync/async usage follows this message.)
> > >
> > > Let's talk about how to integrate this code.
> > >
> > > This is a from-scratch rewrite of the producer code. As such it is a
> > pretty
> > > major change. So far I have mostly been working on my own. I'd like to
> > > start getting feedback before I get too far along--no point in my
> > polishing
> > > things that are going to be significantly revised in review, after all.
> > >
> > > As such here is what I would propose:
> > >
> > > 1. I'll put up a preliminary patch. Since this code is a completely
> > > standalone module it will not destabilize the existing server or
> existing
> > > producer (in fact there is no change to those). I will avoid including
> > > build support in this patch until we get the gradle stuff worked out so
> > as
> > > to not break that patch (hopefully that moves along). Let's take this
> > patch
> > > "as is" but with no expectation that the code is complete or that
> checkin
> > > implies everyone agrees with every design decision. I will follow-up
> with
> > > subsequent patches as we do reviews and discussions.
> > >
> > > 2. I'll send out a few higher-level topics for discussion threads.
> Let's
> > > get to consensus on these. I think micro-reviewing minor correctness issues
> > > won't be productive until we make higher level decisions. The topics I'd
> > > like to discuss include
> > > a. The producer code:
> > >  - The public API
> > >  - The configurations: their names, and the general knobs we are
> > >  - Client message serialization
> > >  - The instrumentation to have
> > >  - The blocking and batching behavior
> > > b. The common code and a few other cross-cutting policy things
> > >  - The approach to protocol definition and request serialization
> > >  - The config definition helper code
> > >  - The metrics package
> > >  - The project layout
> > >  - The java coding style and the use of java
> > >  - The approach to logging
> > >
> > > This is somewhat backwards, but I think it will be easier to handle changes
> > > that fall out of these discussions against an existing code base that is
> > > checked in; otherwise each revision will be a brand new very large patch.
> > >
> > > If no objections I will toss up this code and kick off some of these
> > > discussions.
> > >
> > > -Jay
> > >
> >
>
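
Below is a minimal, self-contained sketch of the "every send returns a future" idea from the thread above. It uses the JDK's CompletableFuture rather than the patch's RecordSend/await() API (whose exact signatures are not reproduced here); the point is only that a single return type can serve both sync and async callers.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    // Generic illustration only: the patch defines its own RecordSend type
    // rather than returning a JDK future, but the usage pattern is similar.
    public class FutureSendSketch {

        // Hypothetical send(): completes asynchronously on a pool thread and
        // "returns" the offset the record was assigned.
        static CompletableFuture<Long> send(byte[] payload) {
            return CompletableFuture.supplyAsync(() -> (long) payload.length);
        }

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // "Sync" usage: block on the future immediately.
            long offset = send("hello".getBytes()).get();
            System.out.println("sync offset: " + offset);

            // "Async" usage: register a callback and carry on; errors are
            // handled in the callback instead of at the call site.
            CompletableFuture<Long> done = send("world".getBytes())
                .whenComplete((off, err) -> {
                    if (err != null)
                        err.printStackTrace();
                    else
                        System.out.println("async offset: " + off);
                });
            done.join();  // only so this demo doesn't exit before the callback runs
        }
    }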


[jira] [Commented] (KAFKA-1212) System test exception handling does not stop background producer threads

2014-01-23 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880370#comment-13880370
 ] 

John Fung commented on KAFKA-1212:
--

Commented in the rb. thanks.

> System test exception handling does not stop background producer threads
> 
>
> Key: KAFKA-1212
> URL: https://issues.apache.org/jira/browse/KAFKA-1212
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Attachments: KAFKA-1212.patch, KAFKA-1212_2014-01-21_13:20:59.patch, 
> KAFKA-1212_2014-01-22_11:47:22.patch, KAFKA-1212_2014-01-22_11:51:46.patch
>
>
> When exception is thrown, the system test script stops all known entities. 
> However, the background producer thread cannot be stopped since it does not 
> register its pid in the testcase environment. We need to specifically stop 
> them.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Commented] (KAFKA-1227) Code dump of new producer

2014-01-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880474#comment-13880474
 ] 

Jun Rao commented on KAFKA-1227:


I made a pass of the producer client code. The following are my comments.

1. Selector: It seems that the selector never closes an existing socket on its 
own (other than when the selector itself is closed). For example, no existing 
sockets are closed after a metadata refresh. This has the implication that it may 
increase the # of socket connections that a client has to maintain. For 
example, if every client uses all brokers as the metadata broker list, it means 
that every client will maintain a socket connection to every broker, which 
doesn't seem to be very scalable. Also, if a partition is moved to some new 
brokers, the client will still be maintaining the socket connections to the old 
brokers. In 0.8, we close all existing sockets every time the metadata is 
refreshed.

2. Metadata: We need to think through the case when the clients use a VIP in 
the metadata broker list. In this patch, it seems that we only use the VIP once 
and then switch to the actual broker list after the first metadata update. This 
means that the producer can only issue metadata requests to brokers to which 
replicas are assigned. In 0.8, we always fetch metadata using the metadata 
broker list. Another thing that we do in 0.8 is to close the socket connection 
after each metadata request. When using a VIP, an idle socket connection can be 
killed by the load balancer. If the VIP is not configured properly, it may take 
a long time (e.g., 8 minutes) to detect that the socket is already killed, 
which will slow down the fetching of metadata.

3. DefaultPartitioner:
3.1 This has the issue that every producer instance always starts with 
partition 0, which could create imbalanced load if multiple producers are 
created at the same time.
3.2 Also, a better default partitioner when no partition key is provided is 
probably to select a random "available" partition (i.e., one whose leader node 
exists), instead of just a random partition (a rough sketch of this appears 
after this list).

4. Partitioner.partition(): From cluster, we can get the partition list for a 
topic. Is the passed-in numPartitions redundant?

5. Sender:
5.1 run(): It seems that it's possible for a produce request and a metadata 
request to be sent to the same node in one iteration. This will cause 
selector.poll() to fail since we can't send more than 1 request to the same 
node per poll.
5.2 produceRequest(): topicDatas is weird since data is already the plural form 
of datum.

6. TopicPartition: How do we prevent the computed hash code from being exactly 0?

7. BufferExhaustedException: It's probably useful to include the requested size 
in the exception.

8. RecordAccumulator:
8.1 Should we rename free to bufferPool?
8.2 ready(): Should a partition also be considered ready if it has only 1 
RecordBatch whose size is exactly batchSize?

9. RecordBatch.done(): Should we unblock RecordSend after registered callbacks 
are called?

10. RecordSend: We should include at least the partition number and probably 
the topic itself.

11. Various mis-spellings:
11.1 ProducerRecord: chosing
11.2 KafkaProducer:
11.2.1 comments above send(): messaging waiting => messages waiting
11.2.2 {@link kafka.clients.producer.RecordSend.await() await()}: 2 await()
11.3 RecordBatch: sufficent

12. Formatting: Should we use 4-space indentation vs 2-space? The latter is 
what we have been using in scala.

The following can be added to the TODO list:

13. BufferPool: When we add jmx, it's probably useful to have one metric on the 
size of the waiter list, and another on the available memory.

14. logging: It seems that there are no logging messages and we use 
e.printStackTrace() in a few places. Should we use log4j?

15. Configs: It would be useful to log every overridden value and unused 
property name.
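
Relating to points 3.1 and 3.2, here is a rough, self-contained sketch of a default partitioner that starts from a random offset and round-robins over partitions that currently have a leader. The PartitionMeta type and the partition() signature are simplified stand-ins for illustration, not the Partitioner interface in the patch.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicInteger;

    // Simplified stand-in for the patch's partition metadata -- illustration only.
    class PartitionMeta {
        final int id;
        final boolean hasLeader;
        PartitionMeta(int id, boolean hasLeader) { this.id = id; this.hasLeader = hasLeader; }
    }

    public class RandomStartRoundRobinPartitioner {
        // Start from a random value so producers created at the same time do
        // not all begin with partition 0 (point 3.1).
        private final AtomicInteger counter =
            new AtomicInteger(ThreadLocalRandom.current().nextInt(1 << 20));

        public int partition(List<PartitionMeta> partitions) {
            // Prefer partitions that currently have a leader (point 3.2).
            List<PartitionMeta> candidates = new ArrayList<PartitionMeta>();
            for (PartitionMeta p : partitions)
                if (p.hasLeader)
                    candidates.add(p);
            if (candidates.isEmpty())
                candidates = partitions;
            int index = Math.abs(counter.getAndIncrement() % candidates.size());
            return candidates.get(index).id;
        }

        public static void main(String[] args) {
            List<PartitionMeta> parts = Arrays.asList(
                new PartitionMeta(0, true), new PartitionMeta(1, true),
                new PartitionMeta(2, false), new PartitionMeta(3, true));
            RandomStartRoundRobinPartitioner p = new RandomStartRoundRobinPartitioner();
            for (int i = 0; i < 6; i++)
                System.out.println("chose partition " + p.partition(parts));
        }
    }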


> Code dump of new producer
> -
>
> Key: KAFKA-1227
> URL: https://issues.apache.org/jira/browse/KAFKA-1227
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series 
> of post-commit reviews to get it into shape. This bug tracks just the code 
> dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Commented] (KAFKA-1227) Code dump of new producer

2014-01-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880519#comment-13880519
 ] 

Guozhang Wang commented on KAFKA-1227:
--

Some more comments:

--- General

1. How do we decide where to put Exception definitions? Currently we have an 
errors folder in kafka.comm and some folders also have their own exceptions.

2. Shall we merge the kafka.comm.protocol and kafka.comm.request folders since the 
request definitions are highly dependent on the protocol class?

3. Shall we put Node, Cluster, Partition, TopicPartition in kafka.comm into one 
sub-folder, for example, called kafka.comm.metadata?

4. Shall we put the Serializer classes into the protocol folder?

5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?


--- kafka.common.Cluster

1. Since nextNode uses a global round robin, we need to make sure that no more 
than one object accesses a single Cluster’s nextNode (see the round-robin 
sketch after this list).

--- kafka.common.StringSerialization

1. Shall we put config names such as ENCODING_CONFIG all in a single file?

--- kafka.common.AbstractIterator

1. makeNext is not supposed to be left in states other than DONE and READY?

--- kafka.common.protocol.Schema

1. Will a difference in Field order lead to different schemas?

--- kafka.common.protocol.ProtoUtil

1. parseMetadataResponse: after reading the function, I feel that using 
TopicInfo/PartitionInfo objects for parsing might be preferable. We can put 
these objects in the Protocol.java file so any protocol change would only 
require one file edit.

--- kafka.common.record.LogEntry

1. Maybe we can rename to OffsetRecord?

--- kafka.common.record.Record

1. Do we expect MIN_HEADER_SIZE and RECORD_OVERHEAD to be different in the 
future? Currently their values are the same and the way they are computed is 
also identical.

--- kafka.common.request.RequestHeader

1. Is it better to define "client_id" strings as static fields in 
Protocol.java?

2. Does REQUEST/RESPONSE_HEADER also need to be versioned?

--- kafka.client.common.NetworkReceive

1. In the first constructor, why not also initialize the size buffer to 
ByteBuffer.allocate(4)?

2. Why does NetworkReceive not extend ByteBufferReceive?

--- kafka.client.common.Selector

1. “transmissions.send.remaining() <= 0”, under what condition can remaining() 
be < 0?

2. “if (trans != null) this.disconnected.add(trans.id); “, should it be trans 
== null?

--- kafka.client.producer.internals.BufferPool:

1. In the freeUp() function, should we use this.free.pollLast().capacity() instead 
of limit()?

2. What is the rationale for having just one poolable size?

--- kafka.clients.producer.internals.Metadata

1. After configs are added, we need to remove the hard-coded default values. So 
for all of these places we could leave a TODO mark for now.

--- kafka.clients.producer.internals.ProduceRequestResult

1. Its member fields are dependent on Protocol.java, so once we change the 
protocol we would probably also need to change this file.

--- kafka.clients.producer.internals.RecordAccumulator

1. Typo: “Get a list of topic-partitions which are ready to be send.”

--- kafka.clients.producer.internals.Sender

1. One corner case we may need to consider is the following: if a partition 
becomes unavailable and the producer keeps sending data to it, that partition 
could later exhaust the memory, leaving other partitions unable to take more 
messages and blocked waiting.

2. In handling disconnection, the ProduceRequestResult will set the exception, 
and if await() is called this exception will be thrown and the callback will not 
be executed. Since this exception is already stored in the RecordSend, I think a 
better way is to not throw the exception on await() but to let the callback 
function handle it. That would make the application code cleaner since otherwise 
the application needs to try-catch the await() call.

3. In closing the producer, there is another corner case where the io thread can 
keep trying to send the rest of the data and failing. We could probably add 
another option to drop whatever is in the buffer and let the application's 
callback functions handle it.
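
On the kafka.common.Cluster point above: if nextNode() is meant for load balancing across threads rather than for iteration, an atomic counter makes the round robin safe for concurrent callers. A minimal, self-contained sketch follows; the Cluster and Node classes in the patch differ, and this only illustrates the counter idea.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Minimal illustration of a thread-safe round-robin node chooser. The real
    // Cluster/Node classes in the patch look different; this only shows the
    // concurrency-safe counter idea.
    public class RoundRobinNodes {
        private final List<String> nodes;
        private final AtomicInteger counter = new AtomicInteger(0);

        public RoundRobinNodes(List<String> nodes) {
            this.nodes = nodes;
        }

        // Safe to call from many threads: each caller gets the "next" node
        // without two callers racing on a shared, unsynchronized index.
        public String nextNode() {
            int index = Math.abs(counter.getAndIncrement() % nodes.size());
            return nodes.get(index);
        }

        public static void main(String[] args) {
            RoundRobinNodes rr = new RoundRobinNodes(Arrays.asList("broker-0", "broker-1", "broker-2"));
            for (int i = 0; i < 5; i++)
                System.out.println(rr.nextNode());
        }
    }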

> Code dump of new producer
> -
>
> Key: KAFKA-1227
> URL: https://issues.apache.org/jira/browse/KAFKA-1227
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series 
> of post-commit reviews to get it into shape. This bug tracks just the code 
> dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


Re: Review Request 17006: Keep track of local pid and kill it on stopping all entities as discussed with John

2014-01-23 Thread John Fung


> On Jan. 23, 2014, 5:51 p.m., John Fung wrote:
> > There will be only 1 producer_performance running per entity_id. Therefore, 
> > entity_id can be used as a key in the dictionary to keep track of multiple 
> > running producer_performance threads:
> > 
> > +os.tcaseEnv.producerHostParentPidDict[entityId] = os.getpid()
> 
> Guozhang Wang wrote:
> Hi John, thanks for the comments. I'm not sure I follow, though. What do 
> you suggest changing in the above line?

Guozhang, sorry about the confusion. I meant that using entityId as a key in the 
dictionary is the proper way to track producer pids. I wanted to give a "+1" 
for it.


- John


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17006/#review32632
---


On Jan. 22, 2014, 7:51 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17006/
> ---
> 
> (Updated Jan. 22, 2014, 7:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1212
> https://issues.apache.org/jira/browse/KAFKA-1212
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1212.v2
> 
> 
> Diffs
> -
> 
>   system_test/utils/kafka_system_test_utils.py 
> fb4a9c05bf6f39a7abf41126325ed5ca26bcc246 
> 
> Diff: https://reviews.apache.org/r/17006/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: producer rewrite

2014-01-23 Thread S Ahmed
Sorry I'm new to o/s, how can I look at this patch?

Would it be mirrored here? https://github.com/apache/kafka



On Thu, Jan 23, 2014 at 3:56 PM, Jay Kreps  wrote:

> Cool, I've uploaded a patch and rb here:
> https://issues.apache.org/jira/browse/KAFKA-1227
>
> -Jay
>
>
> On Thu, Jan 23, 2014 at 12:00 PM, Joe Stein  wrote:
>
> > awesome! +1 for checking this in as is as you suggest
> >
> > /***
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop 
> > /
> >
> >
> > On Thu, Jan 23, 2014 at 2:37 PM, Jun Rao  wrote:
> >
> > > This approach sounds reasonable to me. Since the new code will not
> be
> > > used in the current kafka jar, we can still release 0.8.1 off trunk
> when
> > > it's ready.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Jan 23, 2014 at 10:23 AM, Jay Kreps 
> wrote:
> > >
> > > > Hey all,
> > > >
> > > > I have been working on a rewrite of the producer as described in the
> > wiki
> > > > below and discussed in a few previous threads:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > >
> > > > My code still has some bugs and is a bit rough in parts, but it
> > > > functions in the basic cases. I did some basic performance tests over
> > > > localhost, and the new approach has paid off quite significantly--for
> > > small
> > > > (10 byte) messages a single thread on my laptop can send over 1m
> > > > messages/second, and with larger messages easily maxes out the
> server.
> > > >
> > > > The difference between "sync" and "async" producer largely
> > > disappears--all
> > > > requests immediately return a future response which can be used to
> get
> > > the
> > > > behavior of either sync or async usage and we batch whenever the
> > producer
> > > > is under load using a "group commit"-like approach. You can encourage
> > > > additional batching by incurring a small amount of latency (as
> before).
> > > >
> > > > Let's talk about how to integrate this code.
> > > >
> > > > This is a from-scratch rewrite of the producer code. As such it is a
> > > pretty
> > > > major change. So far I have mostly been working on my own. I'd like
> to
> > > > start getting feedback before I get too far along--no point in my
> > > polishing
> > > > things that are going to be significantly revised in review, after
> all.
> > > >
> > > > As such here is what I would propose:
> > > >
> > > > 1. I'll put up a preliminary patch. Since this code is a completely
> > > > standalone module it will not destabilize the existing server or
> > existing
> > > > producer (in fact there is no change to those). I will avoid
> including
> > > > build support in this patch until we get the gradle stuff worked out
> so
> > > as
> > > > to not break that patch (hopefully that moves along). Let's take this
> > > patch
> > > > "as is" but with no expectation that the code is complete or that
> > checkin
> > > > implies everyone agrees with every design decision. I will follow-up
> > with
> > > > subsequent patches as we do reviews and discussions.
> > > >
> > > > 2. I'll send out a few higher-level topics for discussion threads.
> > Let's
> > > > get to consensus on these. I think micro-reviewing minor correctness
> > > issues
> > > > won't be productive until we make higher level decisions. The topics
> > I'd
> > > > like to discuss include
> > > > a. The producer code:
> > > >  - The public API
> > > >  - The configurations: their names, and the general knobs we are
> > > >  - Client message serialization
> > > >  - The instrumentation to have
> > > >  - The blocking and batching behavior
> > > > b. The common code and a few other cross-cutting policy things
> > > >  - The approach to protocol definition and request serialization
> > > >  - The config definition helper code
> > > >  - The metrics package
> > > >  - The project layout
> > > >  - The java coding style and the use of java
> > > >  - The approach to logging
> > > >
> > > > This is somewhat backwards, but I think it will be easier to handle
> > > changes
> > > > that fall out of these discussions against an existing code base that
> > is
> > > > checked in otherwise each revision will be a brand new very large
> > patch.
> > > >
> > > > If no objections I will toss up this code and kick off some of these
> > > > discussions.
> > > >
> > > > -Jay
> > > >
> > >
> >
>


Re: producer rewrite

2014-01-23 Thread Neha Narkhede
There is a link to the reviewboard url on the JIRA mentioned above.
That will take you to the patch.


On Thu, Jan 23, 2014 at 4:49 PM, S Ahmed  wrote:

> Sorry I'm new to o/s, how can I look at this patch?
>
> Would it be mirrored here? https://github.com/apache/kafka
>
>
>
> On Thu, Jan 23, 2014 at 3:56 PM, Jay Kreps  wrote:
>
> > Cool, I've uploaded a patch and rb here:
> > https://issues.apache.org/jira/browse/KAFKA-1227
> >
> > -Jay
> >
> >
> > On Thu, Jan 23, 2014 at 12:00 PM, Joe Stein 
> wrote:
> >
> > > awesome! +1 for checking this in as is as you suggest
> > >
> > > /***
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop 
> > > /
> > >
> > >
> > > On Thu, Jan 23, 2014 at 2:37 PM, Jun Rao  wrote:
> > >
> > > > This approach sounds reasonable to me. Since the new code will not
> > be
> > > > used in the current kafka jar, we can still release 0.8.1 off trunk
> > when
> > > > it's ready.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Jan 23, 2014 at 10:23 AM, Jay Kreps 
> > wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > I have been working on a rewrite of the producer as described in
> the
> > > wiki
> > > > > below and discussed in a few previous threads:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > > >
> > > > > My code still has some bugs and is a bit rough in parts, but it
> > > > > functions in the basic cases. I did some basic performance tests
> over
> > > > > localhost, and the new approach has paid off quite
> significantly--for
> > > > small
> > > > > (10 byte) messages a single thread on my laptop can send over 1m
> > > > > messages/second, and with larger messages easily maxes out the
> > server.
> > > > >
> > > > > The difference between "sync" and "async" producer largely
> > > > disappears--all
> > > > > requests immediately return a future response which can be used to
> > get
> > > > the
> > > > > behavior of either sync or async usage and we batch whenever the
> > > producer
> > > > > is under load using a "group commit"-like approach. You can
> encourage
> > > > > additional batching by incurring a small amount of latency (as
> > before).
> > > > >
> > > > > Let's talk about how to integrate this code.
> > > > >
> > > > > This is a from-scratch rewrite of the producer code. As such it is
> a
> > > > pretty
> > > > > major change. So far I have mostly been working on my own. I'd like
> > to
> > > > > start getting feedback before I get too far along--no point in my
> > > > polishing
> > > > > things that are going to be significantly revised in review, after
> > all.
> > > > >
> > > > > As such here is what I would propose:
> > > > >
> > > > > 1. I'll put up a preliminary patch. Since this code is a completely
> > > > > standalone module it will not destabilize the existing server or
> > > existing
> > > > > producer (in fact there is no change to those). I will avoid
> > including
> > > > > build support in this patch until we get the gradle stuff worked
> out
> > so
> > > > as
> > > > > to not break that patch (hopefully that moves along). Let's take
> this
> > > > patch
> > > > > "as is" but with no expectation that the code is complete or that
> > > checkin
> > > > > implies everyone agrees with every design decision. I will
> follow-up
> > > with
> > > > > subsequent patches as we do reviews and discussions.
> > > > >
> > > > > 2. I'll send out a few higher-level topics for discussion threads.
> > > Let's
> > > > > get to consensus on these. I think micro-reviewing minor
> correctness
> > > > issues
> > > > > won't be productive until we make higher level decisions. The
> topics
> > > I'd
> > > > > like to discuss include
> > > > > a. The producer code:
> > > > >  - The public API
> > > > >  - The configurations: their names, and the general knobs we
> are
> > > > >  - Client message serialization
> > > > >  - The instrumentation to have
> > > > >  - The blocking and batching behavior
> > > > > b. The common code and a few other cross-cutting policy things
> > > > >  - The approach to protocol definition and request
> serialization
> > > > >  - The config definition helper code
> > > > >  - The metrics package
> > > > >  - The project layout
> > > > >  - The java coding style and the use of java
> > > > >  - The approach to logging
> > > > >
> > > > > This is somewhat backwards, but I think it will be easier to handle
> > > > changes
> > > > > that fall out of these discussions against an existing code base
> that
> > > is
> > > > > checked in otherwise each revision will be a brand new very large
> > > patch.
> > > > >
> > > > > If no objections I will toss up this code and kick off

[jira] [Commented] (KAFKA-1227) Code dump of new producer

2014-01-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880717#comment-13880717
 ] 

Jay Kreps commented on KAFKA-1227:
--

Hey Jun, quick responses. I'll try to get a patch up with some of the minor 
things, though a few of the others I'd like to do post-commit.

1. WRT closing connections. Yes, this is true. I agree it is needed but decided 
to punt on it for the first pass. It is an important follow-up item. There are 
two cases to handle: metadata fetches and leadership handoffs. 
Obviously the Selector will not handle these special cases which are specific 
to this use case. Theoretically this could all be done in the Sender logic but 
it would be a bit complex. I think the best solution is just to have us time 
out idle connections after some configurable period of disuse (30 seconds, 
say). 

2. I think the VIP problem can be handled by just timing out idle connections. 
Special cases related to metadata won't help because non-metadata related 
connections can also be idle. Not retaining the bootstrap urls is 
intentional--future metadata requests should use the full broker set the 
producer is connecting to. You mention that this will cause the producer to 
prefer to fetch metadata from a broker to which it already has a connection for 
subsequent metadata fetches after the initial bootstrap, but this was the 
idea--no need to set up and then time out another connection if we already have 
one.

3. WRT initializing the partitioner to 0: Yeah we can initialize to something 
random. This problem would be a bit pathological as you would have to start all 
your producers the same instant and send exactly the same number of messages 
through them for this to persist.

4. I included the numPartitions even though it is easily computable from 
cluster as all partitioners will need to mod by number of partitions, but the 
vast majority won't need to use the cluster. So it just seemed more intuitive 
rather than the user having to figure out that they can get it by calling into 
cluster and worrying about the underlying performance of that just to give it 
to them.

5.1 Yes, that is a bug.
5.2 It is a bit slangy :-)

6. I don't prevent this: a zero hash code will be recomputed each time, but 
this is an unlikely case, and recomputing is what would happen in all cases if 
we didn't cache (a sketch of this caching pattern appears after this list).

7. Good point, I'll improve the error message.

8.1 I'll try to think of a better name.
8.2 Yes, we can do that. I think that would be good for latency in the case 
where we had to allocate a non-standard size.

9. I think you could argue either way in terms of the preferable sequencing. 
However I wanted to reuse the RecordSend object as the argument to the callback 
rather than introduce another object. However this means I do need to complete 
the record send first otherwise the callback will block trying to access fields 
in the send.

10. Ah, very good point.

11. Thanks

12. I am not too picky. 2 spaces is the recommended style in scala and 4 spaces 
is the classic "sun java style". I would like to get our style formally 
specified in an IDE formatter. That is what I am using for eclipse and it is 
very nice, it does all formatting for you and ensures a very consistent style. 
I will start a thread on this one as likely everyone has an opinion.

13. I had the same thought. I'm not sure if it is better to give the current 
value or the average over the window. Thoughts? Since we mostly look at graphs 
polled every 30 seconds if we do the instantaneous measurement it amounts to 
just a single data point for the whole 30 seconds but that may be okay...

14. I figured we would need to discuss logging so I just punted for now. The 
standard insofar as there is one is really slf4j, which I consider kind of 
silly. There are really just a couple of places that need logging so maybe it 
would be fine to just use java.util.logging which comes with the jvm. I'll 
start a thread on this.

15. In a client I think this is something we should leave to the client. 
Printing lots of messages in their logs is a bit rude. I think it is better to 
give an API to get information about the configs.
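
On point 6 above, the caching pattern being described (0 doubles as the "not yet computed" marker, much like String.hashCode) looks roughly like the following. This is a generic, self-contained illustration, not the TopicPartition class from the patch.

    import java.util.Objects;

    // Generic illustration of lazy hash-code caching with 0 as the sentinel:
    // a value whose real hash happens to be 0 is simply recomputed on every
    // call, which is rare and no worse than not caching at all.
    public final class CachedHashKey {
        private final String topic;
        private final int partition;
        private int hash = 0;   // 0 means "not cached yet"

        public CachedHashKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public int hashCode() {
            int h = hash;
            if (h == 0) {                        // either never computed, or really 0
                h = Objects.hash(topic, partition);
                hash = h;                        // benign race: worst case we recompute
            }
            return h;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof CachedHashKey)) return false;
            CachedHashKey that = (CachedHashKey) o;
            return partition == that.partition && topic.equals(that.topic);
        }
    }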


> Code dump of new producer
> -
>
> Key: KAFKA-1227
> URL: https://issues.apache.org/jira/browse/KAFKA-1227
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series 
> of post-commit reviews to get it into shape. This bug tracks just the code 
> dump.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Comment Edited] (KAFKA-1227) Code dump of new producer

2014-01-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880717#comment-13880717
 ] 

Jay Kreps edited comment on KAFKA-1227 at 1/24/14 4:54 AM:
---

Hey Jun, quick responses. I'll try to get a patch up with some of the minor 
things, though a few of the others I'd like to do post-commit.

1. WRT closing connections. Yes, this is true. I agree it is needed but decided 
to punt on it for the first pass. It is an important follow-up item. There are 
two cases to handle: metadata fetches and leadership handoffs. 
Obviously the Selector will not handle these special cases which are specific 
to this use case. Theoretically this could all be done in the Sender logic but 
it would be a bit complex. I think the best solution is just to have us time 
out idle connections after some configurable period of disuse (30 seconds, 
say). 

2. I think the VIP problem can be handled by just timing out idle connections. 
Special cases related to metadata won't help because non-metadata related 
connections can also be idle. Not retaining the bootstrap urls is intentional: 
future metadata requests should use the full broker set the producer is 
connecting to. You mention that this will cause the producer to prefer to fetch 
metadata from a broker to which it already has a connection for subsequent 
metadata fetches after the initial bootstrap, but this was the idea--no need to 
set up and then time out another connection if we already have one.

3. WRT initializing the partitioner to 0: Yeah we can initialize to something 
random. This problem would be a bit pathological as you would have to start all 
your producers the same instant and send exactly the same number of messages 
through them for this to persist.

4. I included the numPartitions even though it is easily computable from 
cluster as all partitioners will need to mod by number of partitions, but the 
vast majority won't need to use the cluster. So it just seemed more intuitive 
rather than the user having to figure out that they can get it by calling into 
cluster and worrying about the underlying performance of that just to give it 
to them.

5.1 Yes, that is a bug.
5.2 It is a bit slangy :-)

6. I don't prevent this: a zero hash code will be recomputed each time, but 
this is an unlikely case and recomputing is what would happen in all cases if 
we didn't cache.

7. Good point, I'll improve the error message.

8.1 I'll try to think of a better name.
8.2 Yes, we can do that. I think that would be good for latency in the case 
where we had to allocate a non-standard size.

9. I think you could argue either way in terms of the preferable sequencing. 
However I wanted to reuse the RecordSend object as the argument to the callback 
rather than introduce another object. However this means I do need to complete 
the record send first otherwise the callback will block trying to access fields 
in the send.

10. Ah, very good point.

11. Thanks

12. I am not too picky. 2 spaces is the recommended style in scala and 4 spaces 
is the classic "sun java style". I would like to get our style formally 
specified in an IDE formatter. That is what I am using for eclipse and it is 
very nice, it does all formatting for you and ensures a very consistent style. 
I will start a thread on this one as likely everyone has an opinion.

13. I had the same thought. I'm not sure if it is better to give the current 
value or the average over the window. Thoughts? Since we mostly look at graphs 
polled every 30 seconds if we do the instantaneous measurement it amounts to 
just a single data point for the whole 30 seconds but that may be okay...

14. I figured we would need to discuss logging so I just punted for now. The 
standard insofar as there is one is really slf4j, which I consider kind of 
silly. There are really just a couple of places that need logging so maybe it 
would be fine to just use java.util.logging which comes with the jvm. I'll 
start a thread on this.

15. In a client I think this is something we should leave to the client. 
Printing lots of messages in their logs is a bit rude. I think it is better to 
give an API to get information about the configs.



was (Author: jkreps):
Hey Jun, quick responses. I'll try to get a patch up with some of the minor 
things, though a few of the others I'd like to do post-commit.

1. WRT closing connections. Yes, this is true. I agree it is needed but decided 
to punt on it for the first pass. It is an important follow-up item. There are 
two cases to handle: metadata fetches and leadership handoffs. 
Obviously the Selector will not handle these special cases which are specific 
to this use case. Theoretically this could all be done in the Sender logic but 
it would be a bit complex. I think the best solution is just to have us time 
out idle connections after some configurable period of disuse

[jira] [Commented] (KAFKA-1227) Code dump of new producer

2014-01-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880752#comment-13880752
 ] 

Jay Kreps commented on KAFKA-1227:
--

Hey Guozhang, thanks for the detailed stylistic questions. I think these are 
important to discuss. Quick responses inline:

_1. How do we decide where to put Exception definitions? Currently we have an 
errors folder in kafka.comm and some folders also have their own exceptions._

That package was meant explicitly for API errors, i.e. those errors which are 
defined in ErrorKeys.java with a registered error code and have a bidirectional 
mapping to this code. These represent communication between the client and 
server so I wanted to put them in a special place. In general exceptions should 
be kept with the package with which they most naturally fit (ConfigException 
goes with config, etc).

The most important code organization principle I had in mind was that each 
package should be either public or private. Only public packages will be 
javadoc'd. All classes in a public package are exposed to the user and are part 
of the public interface. The idea is that we would be very careful with these 
public packages. I considered actually differentiating these as something like 
kafka.clients.producer.pub or something but I thought that was a bit 
ugly--maybe there is another way to differentiate or annotate classes so that 
we are very explicit about public or private. Essentially any change to 
interfaces in these packages is breaking all our users so we have to think very 
carefully about API design and change. The rest of the classes (most of them 
actually) are really just an implementation detail and we can change them at 
will.

Currently the public packages are just:
  kafka.clients.producer
  kafka.common
  kafka.common.errors

One item I wanted to document and discuss as a separate thread was code 
organization as I think these kinds of conventions only work if they are 
documented and broadly understood.

2. Shall we merge the kafka.comm.protocol and kafka.comm.request folders since the 
request definitions are highly dependent on the protocol class?

I would rather not. The kafka.common.network package defines a low-level 
network framing based on size delimited messages. This is fully generic--the 
unit test tests an "echo server". It is not tied to any details of our protocol 
and it is really important that people not leak details of our protocol into 
it :-)

The protocol is just a bunch of message definitions and isn't tied to the 
network transport or framing at all. It is just a way of laying out bytes.

The request package combines the protocol definition and network framing.

I am hoping to keep these things orthogonal.

Once we get our build fixed (ahem), I'd really like us to get checkstyle 
integrated so we can enforce these kinds of package dependencies and keep some 
kind of logical coherence. It has been a struggle otherwise.

3. Shall we put Node, Cluster, Partition, TopicPartition in kafka.comm into one 
sub-folder, for example, called kafka.comm.metadata?

I'm open to that. I liked having the current flat package for simplicity for 
the user (fewer things to import). Basically I am trying to make the javadoc 
for the producer as simple and flat as possible.

4. Shall we put the Serializer classes into the protocol folder?

The Serializer is the PUBLIC interface for users to serialize their messages. 
It actually isn't related to our protocol definition.

5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?

Yeah, I think that is actually how it is. I previously had it separate and the 
rationale was that many of the classes, e.g. the Selector, were really written 
with the clients in mind. Theoretically the same Selector class could be the 
basis for the socket server but I didn't really think those use cases through.

1. Since nextNode uses a global round robin, we need to make sure that no more 
than one object accesses a single Cluster’s nextNode.

That may just be a bad name. The goal of that method was load balancing not 
iterating over the nodes. So actually the intention was to give a different 
node to each thread in the multithreaded case. 

1. Shall we put config names such as ENCODING_CONFIG all in a single file?

I planned to do a discussion on config. The way it works is that configs are 
defined by the ConfigDef. However we allow plug-in interfaces (Serializer, 
Partitioner, etc). These may need configs too, but these are (generally 
speaking) user classes. So we allow including user-defined configs. So 
StringSerializer is essentially a user plug-in that seemed useful enough to 
include in the main code. I think it makes more sense to document it's configs 
with the class rather than elsewhere.

— kafka.common.AbstractIterator
1. makeNext is not supposed to left in other states other