[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377446#comment-14377446
 ] 

Jiangjie Qin commented on KAFKA-2029:
-

[~mstr] I'm not sure adding complexity to an already complicated controller is a 
good idea.
As I mentioned before:
1. In the current design there will be data loss when leader migration happens.
2. The amount of data loss is mostly determined by when the broker handles the 
LeaderAndIsrRequest from the controller.
As you might have already noticed, 2) cannot be solved from the controller side. 
The controller-to-broker sender thread works in a blocking way - it sends a 
message and then waits for the response - so by nature it already waits for the 
previous message to be handled by the broker. Adding a tracker won't reduce the 
difference in when brokers handle those messages. That's why I'm suggesting 
prioritizing the controller messages on the broker side (see the sketch below).
Let's maybe take a look at where prioritizing the controller messages falls 
short, because it looks like a simpler and more effective way to reduce data 
loss.
For the lost high watermark issue, I remember we did something in KAFKA-1647. 
Could you take a look to see whether that fixes the issue you see? Thanks.
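
For illustration only, here is a minimal sketch (in Java, with hypothetical class 
names - Kafka's actual RequestChannel works differently) of the kind of broker-side 
prioritization suggested above: controller requests such as LeaderAndIsrRequest 
drain before data-plane requests, while FIFO order is preserved within each class.

{code}
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sketch, not Kafka's real request queue.
final class QueuedRequest implements Comparable<QueuedRequest> {
    enum Kind { CONTROLLER, DATA }   // CONTROLLER sorts ahead of DATA

    final Kind kind;
    final long sequence;             // preserves FIFO order within a class
    final Object payload;

    QueuedRequest(Kind kind, long sequence, Object payload) {
        this.kind = kind;
        this.sequence = sequence;
        this.payload = payload;
    }

    @Override
    public int compareTo(QueuedRequest other) {
        int byKind = Integer.compare(kind.ordinal(), other.kind.ordinal());
        return byKind != 0 ? byKind : Long.compare(sequence, other.sequence);
    }
}

final class PrioritizedRequestQueue {
    private final PriorityBlockingQueue<QueuedRequest> queue = new PriorityBlockingQueue<>();
    private long nextSequence = 0;

    synchronized void enqueue(QueuedRequest.Kind kind, Object payload) {
        queue.put(new QueuedRequest(kind, nextSequence++, payload));
    }

    QueuedRequest takeNext() throws InterruptedException {
        return queue.take();         // a pending controller request is always returned first
    }
}
{code}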

> Improving controlled shutdown for rolling updates
> -
>
> Key: KAFKA-2029
> URL: https://issues.apache.org/jira/browse/KAFKA-2029
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
>Priority: Critical
>
> Controlled shutdown as currently implemented can cause numerous problems: 
> deadlocks, local and global data loss, partitions without a leader, etc. In 
> some cases the only way to restore the cluster is to stop it completely with 
> kill -9 and start it again.
> Note 1: I'm aware of KAFKA-1305, but the proposed workaround of increasing the 
> queue size makes things much worse (see the discussion there).
> Note 2: The problems described here can occur in any setup, but they are 
> extremely painful in setups with large brokers (36 disks, 1000+ assigned 
> partitions per broker in our case).
> Note 3: These improvements are really workarounds, and it is worth considering 
> a global refactoring of the controller (make it single-threaded, or even get 
> rid of it in favour of ZK leader elections for partitions).
> The problems and improvements are:
> # Controlled shutdown takes a long time (10+ minutes): the broker sends 
> multiple shutdown requests, finally considers the shutdown failed and proceeds 
> to an unclean shutdown, while the controller gets stuck for a while (holding a 
> lock waiting for free space in the controller-to-broker queue). After the 
> broker starts back up it receives follower requests and erases its high 
> watermarks (with a message that the "replica does not exist" - the controller 
> had not yet sent a request with the replica assignment); then, when the 
> controller starts replicas on the broker, it deletes all local data (due to 
> the missing high watermarks). Furthermore, the controller starts processing 
> the pending shutdown request and stops replicas on the broker, leaving it in a 
> non-functional state. A solution might be to increase the time the broker 
> waits for the controller's response to the shutdown request, but this timeout 
> is taken from controller.socket.timeout.ms, which is global for all 
> broker-controller communication, and setting it to 30 minutes is dangerous. 
> *Proposed solution: introduce a dedicated config parameter for this timeout 
> with a high default*.
> # If a broker goes down during controlled shutdown and does not come back, the 
> controller gets stuck in a deadlock (one thread owns the lock and tries to add 
> a message to the dead broker's queue, the send thread is in an infinite loop 
> trying to retry a message to the dead broker, and the broker failure handler 
> is waiting for the lock). There are numerous partitions without a leader and 
> the only way out is to kill -9 the controller. *Proposed solution: add a 
> timeout when adding a message to the broker's queue*. 
> ControllerChannelManager.sendRequest:
> {code}
>   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
>     brokerLock synchronized {
>       val stateInfoOpt = brokerStateInfo.get(brokerId)
>       stateInfoOpt match {
>         case Some(stateInfo) =>
>           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
>           // TODO: Move timeout to config
>           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
>             error("Timed out trying to send message to broker " + brokerId.toString)
>             // Do not throw, as it brings controller into completely non-functional state
>             // "Co

[jira] [Created] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-2042:
---

 Summary: New producer metadata update always get all topics.
 Key: KAFKA-2042
 URL: https://issues.apache.org/jira/browse/KAFKA-2042
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin


The new Java producer's metadata.topics is initially empty, so the producer sends 
a TMR (TopicMetadataRequest) with an empty topic set. The broker interprets an 
empty requested topic set as all topics, so metadata.cluster contains all topic 
metadata. Later, when a new topic is produced to, it gets added to 
metadata.topics. The next metadata update will then only contain the metadata for 
this new topic, so metadata.cluster will only have this topic. Since there are 
still a lot of messages in the accumulator that have no metadata in 
metadata.cluster, if a caller thread does a flush(), it will block forever 
because messages sitting in the accumulator without metadata will never be ready 
to send.
We should add a check for metadata.topics: if it is empty, no TMR should be sent.
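
A minimal sketch of the proposed guard (illustrative Java only; the names 
MetadataUpdater and sendTopicMetadataRequest are hypothetical, not the actual 
NetworkClient/Metadata API):

{code}
import java.util.Set;

// Hypothetical sketch of the check described above.
final class MetadataUpdater {
    private final Set<String> topics;   // topics the producer has actually sent to

    MetadataUpdater(Set<String> topics) {
        this.topics = topics;
    }

    void maybeUpdate() {
        // The broker treats an empty requested topic set as "all topics",
        // so skip the request entirely until at least one topic is known.
        if (topics.isEmpty()) {
            return;
        }
        sendTopicMetadataRequest(topics);
    }

    private void sendTopicMetadataRequest(Set<String> topics) {
        // placeholder for the actual network send
    }
}
{code}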



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377476#comment-14377476
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Created reviewboard https://reviews.apache.org/r/32434/diff/
 against branch origin/trunk

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
> Attachments: KAFKA-2042.patch
>
>
> The new Java producer's metadata.topics is initially empty, so the producer 
> sends a TMR (TopicMetadataRequest) with an empty topic set. The broker 
> interprets an empty requested topic set as all topics, so metadata.cluster 
> contains all topic metadata. Later, when a new topic is produced to, it gets 
> added to metadata.topics. The next metadata update will then only contain the 
> metadata for this new topic, so metadata.cluster will only have this topic. 
> Since there are still a lot of messages in the accumulator that have no 
> metadata in metadata.cluster, if a caller thread does a flush(), it will block 
> forever because messages sitting in the accumulator without metadata will 
> never be ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/
---

Review request for kafka.


Bugs: KAFKA-2042
https://issues.apache.org/jira/browse/KAFKA-2042


Repository: kafka


Description
---

Patch for KAFKA-2042. Do not update metadata for empty topic set in new producer


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
f4295025c28e2842244dc775052b7a3d30fb9d11 

Diff: https://reviews.apache.org/r/32434/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2042:

Attachment: KAFKA-2042.patch

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
> Attachments: KAFKA-2042.patch
>
>
> The new Java producer's metadata.topics is initially empty, so the producer 
> sends a TMR (TopicMetadataRequest) with an empty topic set. The broker 
> interprets an empty requested topic set as all topics, so metadata.cluster 
> contains all topic metadata. Later, when a new topic is produced to, it gets 
> added to metadata.topics. The next metadata update will then only contain the 
> metadata for this new topic, so metadata.cluster will only have this topic. 
> Since there are still a lot of messages in the accumulator that have no 
> metadata in metadata.cluster, if a caller thread does a flush(), it will block 
> forever because messages sitting in the accumulator without metadata will 
> never be ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2042:

Assignee: Jiangjie Qin
  Status: Patch Available  (was: Open)

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2042.patch
>
>
> The new Java producer's metadata.topics is initially empty, so the producer 
> sends a TMR (TopicMetadataRequest) with an empty topic set. The broker 
> interprets an empty requested topic set as all topics, so metadata.cluster 
> contains all topic metadata. Later, when a new topic is produced to, it gets 
> added to metadata.topics. The next metadata update will then only contain the 
> metadata for this new topic, so metadata.cluster will only have this topic. 
> Since there are still a lot of messages in the accumulator that have no 
> metadata in metadata.cluster, if a caller thread does a flush(), it will block 
> forever because messages sitting in the accumulator without metadata will 
> never be ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-24 Thread Grant Henke (JIRA)
Grant Henke created KAFKA-2043:
--

 Summary: CompressionType is passed in each RecordAccumulator append
 Key: KAFKA-2043
 URL: https://issues.apache.org/jira/browse/KAFKA-2043
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke
Priority: Minor


Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator append 
method accepts the CompressionType on a per-record basis. It looks like the code 
can only work on a per-batch basis, because the CompressionType is only used when 
creating a new RecordBatch. My understanding is that this should only support 
setting it per batch at most.

public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, 
CompressionType compression, Callback callback) throws InterruptedException;

The compression type is a producer-level config. Instead of passing it in for each 
append, we should probably just pass it in once during the creation of the 
RecordAccumulator.
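
For illustration, a sketch of the proposed shape (simplified stand-in types, not 
the real client classes): compression is fixed once at construction, so append no 
longer needs a CompressionType parameter.

{code}
// Minimal sketch only; the nested types are stand-ins to keep the example self-contained.
public class RecordAccumulatorSketch {
    public enum CompressionType { NONE, GZIP, SNAPPY }

    public static final class TopicPartition {
        final String topic;
        final int partition;
        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    public interface Callback {
        void onCompletion(Exception exception);
    }

    // Producer-level config, supplied once at construction.
    private final CompressionType compression;

    public RecordAccumulatorSketch(CompressionType compression) {
        this.compression = compression;
    }

    // No CompressionType parameter: the field is consulted only when a new batch
    // is created, matching the per-batch semantics described above.
    public void append(TopicPartition tp, byte[] key, byte[] value, Callback callback) {
        // ... create or reuse a batch for tp, compressing with `compression` ...
    }
}
{code}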



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32440: Patch for KAFKA-2043

2015-03-24 Thread Grant Henke

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/
---

Review request for kafka.


Bugs: KAFKA-2043
https://issues.apache.org/jira/browse/KAFKA-2043


Repository: kafka


Description
---

CompressionType is passed in each RecordAccumulator append


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 24274a64885fadd0e9318de2beb232218ddd52cd 

Diff: https://reviews.apache.org/r/32440/diff/


Testing
---


Thanks,

Grant Henke



[jira] [Commented] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-24 Thread Grant Henke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378046#comment-14378046
 ] 

Grant Henke commented on KAFKA-2043:


Created reviewboard https://reviews.apache.org/r/32440/diff/
 against branch origin/trunk

> CompressionType is passed in each RecordAccumulator append
> --
>
> Key: KAFKA-2043
> URL: https://issues.apache.org/jira/browse/KAFKA-2043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Minor
> Attachments: KAFKA-2043.patch
>
>
> Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator 
> append method accepts the CompressionType on a per-record basis. It looks 
> like the code can only work on a per-batch basis, because the CompressionType 
> is only used when creating a new RecordBatch. My understanding is that this 
> should only support setting it per batch at most.
> public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
> value, CompressionType compression, Callback callback) throws 
> InterruptedException;
> The compression type is a producer-level config. Instead of passing it in for 
> each append, we should probably just pass it in once during the creation of 
> the RecordAccumulator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-24 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-2043:
---
Status: Patch Available  (was: Open)

> CompressionType is passed in each RecordAccumulator append
> --
>
> Key: KAFKA-2043
> URL: https://issues.apache.org/jira/browse/KAFKA-2043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Minor
> Attachments: KAFKA-2043.patch
>
>
> Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator 
> append method accepts the CompressionType on a per-record basis. It looks 
> like the code can only work on a per-batch basis, because the CompressionType 
> is only used when creating a new RecordBatch. My understanding is that this 
> should only support setting it per batch at most.
> public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
> value, CompressionType compression, Callback callback) throws 
> InterruptedException;
> The compression type is a producer-level config. Instead of passing it in for 
> each append, we should probably just pass it in once during the creation of 
> the RecordAccumulator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-24 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-2043:
---
Attachment: KAFKA-2043.patch

> CompressionType is passed in each RecordAccumulator append
> --
>
> Key: KAFKA-2043
> URL: https://issues.apache.org/jira/browse/KAFKA-2043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Minor
> Attachments: KAFKA-2043.patch
>
>
> Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator 
> append method accepts the CompressionType on a per-record basis. It looks 
> like the code can only work on a per-batch basis, because the CompressionType 
> is only used when creating a new RecordBatch. My understanding is that this 
> should only support setting it per batch at most.
> public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
> value, CompressionType compression, Callback callback) throws 
> InterruptedException;
> The compression type is a producer-level config. Instead of passing it in for 
> each append, we should probably just pass it in once during the creation of 
> the RecordAccumulator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: New Producer Questions/Feedback

2015-03-24 Thread Grant Henke
Here is the jira: https://issues.apache.org/jira/browse/KAFKA-2043

Thanks,
Grant

On Mon, Mar 23, 2015 at 11:53 PM, Jun Rao  wrote:

> RecordAccumulator is actually not part of the public api since it's
> internal. The public apis are only those in
>
> http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
>
> Thanks,
>
> Jun
>
> On Mon, Mar 23, 2015 at 9:23 PM, Grant Henke  wrote:
>
> > Thanks for validating that. I was thinking of solving it in the same
> > fashion. Though I was unsure if there was/would be a use case to have
> > multiple CompressionTypes in the same RecordAccumulator since the API was
> > originally created this way.
> >
> > I would be happy to file a jira and can take on making the change too.
> > Since
> > RecordAccumulator is part of the public api, should the KIP process be
> > followed here as well?
> >
> > On Mon, Mar 23, 2015 at 10:58 PM, Jun Rao  wrote:
> >
> > > Hi, Grant,
> > >
> > > The append api seems indeed a bit weird. The compression type is a
> > > producer-level config. Instead of passing it in for each append, we
> > > probably should just pass it in once during the creation of the
> > > RecordAccumulator. Could you file a jira to track this?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Mar 23, 2015 at 7:16 PM, Grant Henke 
> > wrote:
> > >
> > > > I am reading over the new producer code in an effort to understand
> the
> > > > implementation more thoroughly and had some questions/feedback.
> > > >
> > > > Currently
> org.apache.kafka.clients.producer.internals.RecordAccumulator
> > > > append method accepts the compressionType on a per record basis. It
> > looks
> > > > like the code would only work on a per batch basis because the
> > > > CompressionType is only used when creating a new RecordBatch. My
> > > > understanding is this should only support setting per batch at most.
> I
> > > may
> > > > have misread this though. Is there a time where setting per record
> > would
> > > > make sense?
> > > >
> > > > public RecordAppendResult append(TopicPartition tp, byte[] key,
> > > byte[]
> > > > value, CompressionType compression, Callback callback) throws
> > > > InterruptedException;
> > > >
> > > > Why does org.apache.kafka.common.serialization.Serializer Interface
> > > require
> > > > a topic?  Is there a use case where serialization would change based
> on
> > > > topic?
> > > >
> > > >public byte[] serialize(String topic, T data);
> > > >
> > > > Thank you,
> > > > Grant
> > > >
> > > > --
> > > > Grant Henke
> > > > Solutions Consultant | Cloudera
> > > > ghe...@cloudera.com | 920-980-8979
> > > > twitter.com/ghenke  |
> > > > linkedin.com/in/granthenke
> > > >
> > >
> >
> >
> >
> > --
> > Grant Henke
> > Solutions Consultant | Cloudera
> > ghe...@cloudera.com | 920-980-8979
> > twitter.com/ghenke  |
> > linkedin.com/in/granthenke
> >
>



-- 
Grant Henke
Solutions Consultant | Cloudera
ghe...@cloudera.com | 920-980-8979
twitter.com/ghenke  | linkedin.com/in/granthenke
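
On the Serializer question raised earlier in this thread (why serialize() takes a 
topic): one plausible use case is a single serializer instance that picks a 
different wire format per topic. A hedged sketch, using a local stand-in interface 
that mirrors the quoted signature rather than the real 
org.apache.kafka.common.serialization.Serializer:

{code}
import java.nio.charset.StandardCharsets;

// Stand-in for the real Serializer interface; only the quoted signature is mirrored.
interface SimpleSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical convention: topics suffixed "-json" get a JSON envelope, others raw UTF-8.
class TopicAwareStringSerializer implements SimpleSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        if (topic.endsWith("-json")) {
            return ("{\"value\":\"" + data + "\"}").getBytes(StandardCharsets.UTF_8);
        }
        return data.getBytes(StandardCharsets.UTF_8);
    }
}
{code}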


[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-24 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378064#comment-14378064
 ] 

Parth Brahmbhatt commented on KAFKA-1688:
-

[~prasadm] Thanks for taking the time to review. 
* Personally, I don't like the super-user concept, primarily because even though 
it provides convenience it also increases the blast radius of the entire system. 
In the current design, if a user's credentials are compromised, only the topics 
and actions that user can perform on the cluster are compromised. However, I 
think it's fair to provide this feature so users can make that choice on their 
own. I will update the KIP to reflect this as part of the proposal and let 
others vote for it.
Users won't have to grant permissions on non-existing topics in the absence of 
the super-user concept. However, they will have to supply permissions during 
topic creation, and they will be allowed to alter these ACLs via the alter-topic 
command line tool.
* I can add ALL as an item to the Operation enum (a small sketch follows below). 
For now, like most other permissions, it will only be applicable to topics.
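
A small sketch of what that could look like (assumed shape only; the enum in the 
actual KIP may differ): ALL acts as a wildcard that implies every concrete 
operation.

{code}
// Illustrative sketch, not the committed enum.
public enum Operation {
    READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, ALL;

    public boolean implies(Operation requested) {
        return this == ALL || this == requested;
    }
}
{code}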

> Add authorization interface and naive implementation
> 
>
> Key: KAFKA-1688
> URL: https://issues.apache.org/jira/browse/KAFKA-1688
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
> Fix For: 0.8.3
>
>
> Add a PermissionManager interface as described here:
> https://cwiki.apache.org/confluence/display/KAFKA/Security
> (possibly there is a better name?)
> Implement calls to the PermissionsManager in KafkaApis for the main requests 
> (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
> exception to the protocol to indicate "permission denied".
> Add a server configuration to give the class you want to instantiate that 
> implements that interface. That class can define its own configuration 
> properties from the main config file.
> Provide a simple implementation of this interface which just takes a user and 
> ip whitelist and permits those in either of the whitelists to do anything, 
> and denies all others.
> Rather than writing an integration test for this class we can probably just 
> use this class for the TLS and SASL authentication testing.
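
To make the description above concrete, here is a minimal sketch of the interface 
and the naive whitelist implementation (Java; the exact names and signatures are 
assumptions, not the committed Kafka API):

{code}
import java.util.Set;

// Hypothetical shapes for illustration only.
interface PermissionManager {
    boolean isPermitted(String principal, String clientIp, String operation, String resource);
}

final class WhitelistPermissionManager implements PermissionManager {
    private final Set<String> allowedUsers;
    private final Set<String> allowedIps;

    WhitelistPermissionManager(Set<String> allowedUsers, Set<String> allowedIps) {
        this.allowedUsers = allowedUsers;
        this.allowedIps = allowedIps;
    }

    @Override
    public boolean isPermitted(String principal, String clientIp, String operation, String resource) {
        // Anyone on either whitelist may do anything; everyone else is denied.
        return allowedUsers.contains(principal) || allowedIps.contains(clientIp);
    }
}
{code}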



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2042:

Priority: Blocker  (was: Major)

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Attachments: KAFKA-2042.patch
>
>
> The new java producer metadata.topics is initially empty so the producer 
> sends TMR with empty topic set. The broker takes the empty requested topic 
> set as all topics, so metadata.cluster contains all topic metadata. Later on, 
> when a new topic was produced, it gets added into the metadata.topics. The 
> next metadata update will only contain the meta data for this new topic, so 
> the metadata.cluster will only have this topic. Since there are a lot of 
> messages are still in the accumulator but has no metadata in 
> metadata.cluster, if a caller thread do a flush(), the caller thread will 
> block forever because the messages sitting in accumulator without metadata 
> will never be ready to send.
> We should add check for the metadata.topics, if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-24 Thread Don Bosco Durai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378100#comment-14378100
 ] 

Don Bosco Durai commented on KAFKA-1688:


[~prasadm], 

I agree with you, we should support "ALL" or an equivalent keyword like (*) in 
the default implementation. 

I remember the hierarchical-topics subject being brought up during one of the 
Google Hangout discussions around authorization. I don't think there was any 
resolution around it.

Does it make sense to make this a custom-implementation feature? So out of the 
box it would just be the topic name, but anyone who wants to implement 
hierarchical privileges could parse the topic name, use "." or any other 
supported character as a delimiter, and provide namespace/database-like 
permissions (see the sketch below).

FYI, it seems hierarchical topics were discussed back in 2012: 
https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics
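
A minimal sketch of that custom-authorizer idea (purely illustrative, not part of 
the proposed default implementation): treat '.' in the topic name as a namespace 
delimiter so an ACL granted on a prefix covers every topic under it.

{code}
// Hypothetical helper for a custom authorizer.
final class HierarchicalTopicMatcher {
    static boolean aclCoversTopic(String aclResource, String topic) {
        // Exact match, or the ACL names a parent namespace, e.g. an ACL on
        // "payments.orders" covers "payments.orders.refunds".
        return topic.equals(aclResource) || topic.startsWith(aclResource + ".");
    }
}
{code}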


> Add authorization interface and naive implementation
> 
>
> Key: KAFKA-1688
> URL: https://issues.apache.org/jira/browse/KAFKA-1688
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
> Fix For: 0.8.3
>
>
> Add a PermissionManager interface as described here:
> https://cwiki.apache.org/confluence/display/KAFKA/Security
> (possibly there is a better name?)
> Implement calls to the PermissionsManager in KafkaApis for the main requests 
> (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
> exception to the protocol to indicate "permission denied".
> Add a server configuration to give the class you want to instantiate that 
> implements that interface. That class can define its own configuration 
> properties from the main config file.
> Provide a simple implementation of this interface which just takes a user and 
> ip whitelist and permits those in either of the whitelists to do anything, 
> and denies all others.
> Rather than writing an integration test for this class we can probably just 
> use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1973) Remove the accidentally created LogCleanerManager.scala.orig

2015-03-24 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-1973:
--

Assignee: Grant Henke

> Remove the accidentally created LogCleanerManager.scala.orig
> 
>
> Key: KAFKA-1973
> URL: https://issues.apache.org/jira/browse/KAFKA-1973
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Grant Henke
> Attachments: KAFKA-1973.patch
>
>
> It seems there is a LogCleanerManager.scala.orig in the trunk now. Need to 
> remove it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1991) Clean ControllerStats initialization

2015-03-24 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-1991:
--

Assignee: Grant Henke

> Clean ControllerStats initialization
> 
>
> Key: KAFKA-1991
> URL: https://issues.apache.org/jira/browse/KAFKA-1991
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Trivial
> Attachments: KAFKA-1991.patch
>
>
> This is just a trivial clean-up. Values defined in an object are instantiated 
> lazily: they are initialized the first time the object is used, and accessing 
> any one of them runs the whole object body.
> This could cause confusion, and issues down the road, about when and which 
> metrics are initialized in the ControllerStats object. KafkaServer.scala makes 
> a call to each value to initialize it, but Scala does not actually behave that 
> way, as the example below shows.
> The change matches the BrokerTopicStats implementation.
> Example:
> scala> object ControllerStats {
>  |val uncleanLeaderElectionRate = {
>  | println("initializing uncleanLeaderElectionRate")
>  | "uncleanLeaderElectionRate"
>  | }
>  |val leaderElectionTimer = {
>  | println("initializing leaderElectionTimer")
>  | "leaderElectionTimer"
>  |}
>  | }
> defined object ControllerStats
> scala> ControllerStats.uncleanLeaderElectionRate
> initializing uncleanLeaderElectionRate
> initializing leaderElectionTimer
> res7: String = uncleanLeaderElectionRate



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32440: Patch for KAFKA-2043

2015-03-24 Thread Mayuresh Gharat

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/#review77593
---



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java


Since it's a producer-level config, is this change needed? We can keep it as 
an instance variable. Also, since the compression type does not change, 
"private final" makes it clearer. What do you think?


- Mayuresh Gharat


On March 24, 2015, 3:51 p.m., Grant Henke wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32440/
> ---
> 
> (Updated March 24, 2015, 3:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2043
> https://issues.apache.org/jira/browse/KAFKA-2043
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> CompressionType is passed in each RecordAccumulator append
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  24274a64885fadd0e9318de2beb232218ddd52cd 
> 
> Diff: https://reviews.apache.org/r/32440/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Grant Henke
> 
>



[jira] [Created] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-24 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-2044:
---

 Summary: Support requests and responses from o.a.k.common in 
KafkaApis
 Key: KAFKA-2044
 URL: https://issues.apache.org/jira/browse/KAFKA-2044
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Gwen Shapira


As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to support 
handling of requests and responses from o.a.k.common in KafkaApis.

This will allow us to add new API calls just in o.a.k.common and to gradually 
migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32448: Patch for KAFKA-1927

2015-03-24 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32448/
---

Review request for kafka.


Bugs: KAFKA-1927
https://issues.apache.org/jira/browse/KAFKA-1927


Repository: kafka


Description
---

support requests and responses using Common api in core modules (missing files)


Diffs
-

  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32448/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-24 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1927:

Attachment: KAFKA-1927.patch

> Replace requests in kafka.api with requests in 
> org.apache.kafka.common.requests
> ---
>
> Key: KAFKA-1927
> URL: https://issues.apache.org/jira/browse/KAFKA-1927
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1927.patch
>
>
> The common package introduced a better way of defining requests using a new 
> protocol definition DSL and also includes wrapper objects for these.
> We should switch KafkaApis over to use these request definitions and consider 
> the scala classes deprecated (we probably need to retain some of them for a 
> while for the scala clients).
> This will be a big improvement because
> 1. We will have each request now defined in only one place (Protocol.java)
> 2. We will have built-in support for multi-version requests
> 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-24 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1927:

Status: Patch Available  (was: Open)

> Replace requests in kafka.api with requests in 
> org.apache.kafka.common.requests
> ---
>
> Key: KAFKA-1927
> URL: https://issues.apache.org/jira/browse/KAFKA-1927
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1927.patch
>
>
> The common package introduced a better way of defining requests using a new 
> protocol definition DSL and also includes wrapper objects for these.
> We should switch KafkaApis over to use these request definitions and consider 
> the scala classes deprecated (we probably need to retain some of them for a 
> while for the scala clients).
> This will be a big improvement because
> 1. We will have each request now defined in only one place (Protocol.java)
> 2. We will have built-in support for multi-version requests
> 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-24 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378236#comment-14378236
 ] 

Gwen Shapira commented on KAFKA-1927:
-

The uploaded patch is:
1. Preliminary and untested, just to show what I had in mind. 
2. Actually belongs in KAFKA-2044...

> Replace requests in kafka.api with requests in 
> org.apache.kafka.common.requests
> ---
>
> Key: KAFKA-1927
> URL: https://issues.apache.org/jira/browse/KAFKA-1927
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1927.patch
>
>
> The common package introduced a better way of defining requests using a new 
> protocol definition DSL and also includes wrapper objects for these.
> We should switch KafkaApis over to use these request definitions and consider 
> the scala classes deprecated (we probably need to retain some of them for a 
> while for the scala clients).
> This will be a big improvement because
> 1. We will have each request now defined in only one place (Protocol.java)
> 2. We will have built-in support for multi-version requests
> 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-24 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378231#comment-14378231
 ] 

Gwen Shapira commented on KAFKA-1927:
-

Created reviewboard https://reviews.apache.org/r/32448/diff/
 against branch trunk

> Replace requests in kafka.api with requests in 
> org.apache.kafka.common.requests
> ---
>
> Key: KAFKA-1927
> URL: https://issues.apache.org/jira/browse/KAFKA-1927
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1927.patch
>
>
> The common package introduced a better way of defining requests using a new 
> protocol definition DSL and also includes wrapper objects for these.
> We should switch KafkaApis over to use these request definitions and consider 
> the scala classes deprecated (we probably need to retain some of them for a 
> while for the scala clients).
> This will be a big improvement because
> 1. We will have each request now defined in only one place (Protocol.java)
> 2. We will have built-in support for multi-version requests
> 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32440: Patch for KAFKA-2043

2015-03-24 Thread Grant Henke


> On March 24, 2015, 5 p.m., Mayuresh Gharat wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
> > line 134
> > 
> >
> > Since it's a producer-level config, is this change needed? We can keep
> > it as an instance variable. Also, since the compression type does not
> > change, "private final" makes it clearer. What do you think?

I don't mind leaving the instance-level config. However, since it is not used 
anywhere but the constructor, I don't see the value in it. If we want to mark it 
as final we can do that on the constructor parameter and have the same clarity. 
The only reason I didn't initially is that the other code did not seem to follow 
the style of putting final on everything. (Note: I would prefer to put final on 
everything.)


- Grant


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/#review77593
---


On March 24, 2015, 3:51 p.m., Grant Henke wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32440/
> ---
> 
> (Updated March 24, 2015, 3:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2043
> https://issues.apache.org/jira/browse/KAFKA-2043
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> CompressionType is passed in each RecordAccumulator append
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  24274a64885fadd0e9318de2beb232218ddd52cd 
> 
> Diff: https://reviews.apache.org/r/32440/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Grant Henke
> 
>



Re: [Discussion] Using Client Requests and Responses in Server

2015-03-24 Thread Gwen Shapira
Hi,

I uploaded a (very) preliminary patch with my idea.

One thing that's missing:
RequestResponse had a handleError method that all requests implemented, 
typically generating the appropriate error Response for the request and sending 
it along. It's used by KafkaApis to handle all protocol errors for valid 
requests that are not handled elsewhere.
AbstractRequestResponse doesn't have such a method.

I can, of course, add it (one possible shape is sketched below).
But before I jump into this, I'm wondering if there was another plan for 
handling API errors.

Gwen
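
One possible shape for the missing piece, sketched in Java (all class and method 
names below are assumptions for illustration, not the existing 
o.a.k.common.requests API): each request type knows how to render a protocol error 
into its own response format, so KafkaApis can handle errors uniformly.

{code}
// Hypothetical sketch only.
abstract class RequestSketch {
    // Hook equivalent to the old handleError: build the matching error response.
    abstract ResponseSketch errorResponse(short errorCode);
}

abstract class ResponseSketch { }

final class HeartbeatResponseSketch extends ResponseSketch {
    final short errorCode;
    HeartbeatResponseSketch(short errorCode) { this.errorCode = errorCode; }
}

final class HeartbeatRequestSketch extends RequestSketch {
    @Override
    HeartbeatResponseSketch errorResponse(short errorCode) {
        return new HeartbeatResponseSketch(errorCode);
    }
}
{code}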

On Mon, Mar 23, 2015 at 6:16 PM, Jun Rao  wrote:

> I think what you are saying is that in RequestChannel, we can start
> generating header/body for new request types and leave requestObj null. For
> existing requests, header/body will be null initially. Gradually, we can
> migrate each type of requests by populating header/body, instead of
> requestObj. This makes sense to me since it serves two purposes (1) not
> polluting the code base with duplicated request/response objects for new
> types of requests and (2) allowing the refactoring of existing requests to
> be done in smaller pieces.
>
> Could you try that approach and perhaps just migrate one existing request
> type (e.g. HeartBeatRequest) as an example? We probably need to rewind the
> buffer after reading the requestId when deserializing the header (since the
> header includes the request id).
>
> Thanks,
>
> Jun
>
> On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira 
> wrote:
>
> > I'm thinking of a different approach, that will not fix everything, but
> > will allow adding new requests without code duplication (and therefore
> > unblock KIP-4):
> >
> > RequestChannel.request currently takes a buffer and parses it into an "old"
> > request object. Since the objects are byte-compatible, we should be able to
> > parse existing requests into both old and new objects. New requests will
> > only be parsed into new objects.
> >
> > Basically:
> > val requestId = buffer.getShort()
> > if (requestId in keyToNameAndDeserializerMap) {
> >   requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
> >   header: RequestHeader = RequestHeader.parse(buffer)
> >   body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > } else {
> >   requestObj = null
> >   header: RequestHeader = RequestHeader.parse(buffer)
> >   body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > }
> >
> > This way existing KafkaApis will keep working as normal. The new Apis can
> > implement just the new header/body requests.
> > We'll do the same on the send-side: BoundedByteBufferSend can have a
> > constructor that takes header/body instead of just a response object.
> >
> > Does that make sense?
> >
> > Once we have this in, we can move to:
> > * Adding the missing request/response to the client code
> > * Replacing requests that can be replaced
> >
> > It will also make life easier by having us review and test smaller chunks
> > of work (the existing patch is *huge*, touches nearly every core component
> > and I'm not done yet...)
> >
> > Gwen
> >
> >
> >
> >
> > On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps  wrote:
> >
> > > Ack, yeah, forgot about that.
> > >
> > > It's not just a difference of wrappers. The server side actually sends
> > the
> > > bytes lazily using FileChannel.transferTo. We need to make it possible
> to
> > > carry over that optimization. In some sense what we want to be able to
> do
> > > is set a value to a Send instead of a ByteBuffer.
> > >
> > > Let me try to add that support to the protocol definition stuff, will
> > > probably take me a few days to free up time.
> > >
> > > -Jay
> > >
> > > On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira 
> > > wrote:
> > >
> > > > In case anyone is still following this thread, I need a bit of help
> :)
> > > >
> > > > The old FetchResponse.PartitionData included a MessageSet object.
> > > > The new FetchResponse.PartitionData includes a ByteBuffer.
> > > >
> > > > However, when we read from logs, we return a MessageSet, and as far
> as
> > I
> > > > can see, these can't be converted to ByteBuffers (at least not
> without
> > > > copying their data).
> > > >
> > > > Did anyone consider how to reconcile the MessageSets with the new
> > > > FetchResponse objects?
> > > >
> > > > Gwen
> > > >
> > > >
> > > > On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira  >
> > > > wrote:
> > > >
> > > > > Note: I'm also treating ZkUtils as if it were a public API (i.e.
> > > > > converting objects that are returned into o.a.k.common equivalents but
> > > > > not changing ZkUtils itself).
> > > > > I know it's not public, but I suspect I'm not the only developer here
> > > > > who has tons of external code that uses it.
> > > > >
> > > > > Gwen
> > > > >
> > > > > On Wed, Mar 18, 2015 at 5:48 PM, Gwen Shapira <
> gshap...@cloudera.com
> > >
> > > > > wrote:
> > > > >
> > > > >> We can't rip them out completely, unfor

[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication

2015-03-24 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378335#comment-14378335
 ] 

Sriharsha Chintalapani commented on KAFKA-1684:
---

[~gwenshap] If you have a patch available for KAFKA-1928, can you please upload 
it? I can modify my SSL and Kerberos patches according to the new code.

> Implement TLS/SSL authentication
> 
>
> Key: KAFKA-1684
> URL: https://issues.apache.org/jira/browse/KAFKA-1684
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Affects Versions: 0.9.0
>Reporter: Jay Kreps
>Assignee: Sriharsha Chintalapani
> Attachments: KAFKA-1684.patch, KAFKA-1684.patch
>
>
> Add an SSL port to the configuration and advertise this as part of the 
> metadata request.
> If the SSL port is configured the socket server will need to add a second 
> Acceptor thread to listen on it. Connections accepted on this port will need 
> to go through the SSL handshake prior to being registered with a Processor 
> for request processing.
> SSL requests and responses may need to be wrapped or unwrapped using the 
> SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
> is very similar to what will need to be done for SASL-based authentication 
> schemes. We should have a uniform interface that covers both of these and we 
> will need to store the instance in the session with the request. The socket 
> server will have to use this object when reading and writing requests. We 
> will need to take care with the FetchRequests as the current 
> FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
> can only use this optimization for unencrypted sockets that don't require 
> userspace translation (wrapping).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: 0.8.1 migrator

2015-03-24 Thread syyang
GitHub user syyang opened a pull request:

https://github.com/apache/kafka/pull/52

0.8.1 migrator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uber/kafka 0.8.1-migrator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/52.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #52


commit 405a04d958b1e7a8c997610d761e94c52b782c66
Author: Norbert Hu 
Date:   2015-02-11T05:32:45Z

Create debian package for 0.8.1

commit 647fac5824d6e1e5b644770505d6e6f8bc9e5d61
Author: Norbert Hu 
Date:   2015-02-11T05:38:19Z

Use scala 2.8.0

commit a2767a3e12350351bdd45c88e420936c1d2e4ed9
Author: Norbert Hu 
Date:   2015-02-11T19:03:56Z

Install 0.8.1 debian package as "leaf"

commit 0c66bb6d60d7f6a9db8f7866149348766a9e9f7c
Author: Norbert Hu 
Date:   2015-02-15T03:40:51Z

Add jmxtrans-agent to debian package

This makes exporting jmx metrics via the deployed debian package much
easier. See https://github.com/jmxtrans/jmxtrans-agent

NOTE: the inclusion of the jmxtrans-agent jar is mostly for the
migration tool as well as the mirror maker, which doesn't expose an
easy way to export metrics to graphite

commit 505e2d01d96513217c914f34fa0021c64d09d258
Author: Norbert Hu 
Date:   2015-03-12T00:15:17Z

Kafka migrator tool is buggy with JDK7

See {T63657}

commit b45239268609b761e07d994764165307466cf470
Author: Norbert Hu 
Date:   2015-03-14T06:42:12Z

Revert "Kafka migrator tool is buggy with JDK7"

This reverts commit 505e2d01d96513217c914f34fa0021c64d09d258.

commit b96e540a5602adf0b85e83b843d8f850c046bbe5
Author: Norbert Hu 
Date:   2015-03-18T20:07:21Z

Add arcanist config

commit d2d137d320e4776b3a9dbde4b630a4ebc0d5b331
Author: Norbert Hu 
Date:   2015-03-18T21:05:04Z

Add arc land branch

commit 998bc66fa3a368b37aa2e31f5e167b4cfe9c62b1
Author: Norbert Hu 
Date:   2015-03-18T21:12:01Z

Update debian comment

Reviewers: grk

Reviewed By: grk

Differential Revision: https://code.uberinternal.com/D83314

commit b65536b94705b0b890eb510df5610ab0e05df7e9
Author: Seung-Yeoul Yang 
Date:   2015-03-19T00:57:10Z

[kafka] better error recovery in migrator tool (part 1)

Summary:
* Add a new gradle project for the migrator tool
* Update .gitignore
* KafkaMigrationTool is just copy-pasted.

Test Plan:
  ./gradlew test

Reviewers: grk, praveen, vinoth, csoman, norbert

Reviewed By: norbert

Subscribers: bigpunk, nharkins

Maniphest Tasks: T67641

Differential Revision: https://code.uberinternal.com/D83463

commit ce43cf232fa02769ce774c46aa4b391c5224f471
Author: Seung-Yeoul Yang 
Date:   2015-03-19T01:12:08Z

[kafka] better error recovery in migration tool (part 2)

Summary:
* check topic whitelist/blacklist before running the migration tool
* fix corrupted offsets upon detection
* fail fast on errors and sigterm

Test plan:
* Added unit tests
* Manually tested that corrupted offsets are fixed

Reviewers: norbert, grk, praveen, csoman, vinoth

Subscribers: nharkins, bigpunk

Maniphest Tasks: T67641

Differential Revision: https://code.uberinternal.com/D84682

commit d6a25ef7e74f5cf74346700e80e05e6a9f6370c4
Author: Seung-Yeoul Yang 
Date:   2015-03-19T01:12:08Z

[kafka] better error recovery in migration tool (part 2)

Summary:
* check topic whitelist/blacklist before running the migration tool
* fix corrupted offsets upon detection
* fail fast on errors and sigterm

Test plan:
* Added unit tests
* Manually tested that corrupted offsets are fixed

Reviewers: norbert, grk, praveen, csoman, vinoth

Subscribers: nharkins, bigpunk

Maniphest Tasks: T67641

Differential Revision: https://code.uberinternal.com/D84682

commit d257ad6bef663693c3de96562189fdf83a444ed8
Author: Seung-Yeoul Yang 
Date:   2015-03-24T01:13:32Z

foobar

commit ccb63bd8940f9c02416e4a66925d7d048f1fc3a8
Author: Seung-Yeoul Yang 
Date:   2015-03-24T01:13:32Z

foobar




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request: 0.8.1 migrator

2015-03-24 Thread syyang
Github user syyang closed the pull request at:

https://github.com/apache/kafka/pull/52


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Review Request 32440: Patch for KAFKA-2043

2015-03-24 Thread Mayuresh Gharat


> On March 24, 2015, 5 p.m., Mayuresh Gharat wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
> > line 134
> > 
> >
> > Since it's a producer-level config, is this change needed? We can keep
> > it as an instance variable. Also, since the compression type does not
> > change, "private final" makes it clearer. What do you think?
> 
> Grant Henke wrote:
> I don't mind leaving the instance-level config. However, since it is not
> used anywhere but the constructor, I don't see the value in it. If we want to
> mark it as final we can do that on the constructor parameter and have the same
> clarity. The only reason I didn't initially is that the other code did not seem
> to follow the style of putting final on everything. (Note: I would prefer to
> put final on everything.)

Completely agree that it's not used much. Not having it as an instance-level 
config is not going to affect the functionality; it's just that someone might 
prefer the configs to be together.


- Mayuresh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/#review77593
---


On March 24, 2015, 3:51 p.m., Grant Henke wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32440/
> ---
> 
> (Updated March 24, 2015, 3:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2043
> https://issues.apache.org/jira/browse/KAFKA-2043
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> CompressionType is passed in each RecordAccumulator append
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  24274a64885fadd0e9318de2beb232218ddd52cd 
> 
> Diff: https://reviews.apache.org/r/32440/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Grant Henke
> 
>



[jira] [Created] (KAFKA-2045) Memory Management on the consumer

2015-03-24 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-2045:


 Summary: Memory Management on the consumer
 Key: KAFKA-2045
 URL: https://issues.apache.org/jira/browse/KAFKA-2045
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


We need to add memory management to the new consumer, as we did in the new 
producer. This would probably include (a rough sketch follows the list):

1. byte buffer re-use for fetch response partition data.

2. byte buffer re-use for on-the-fly decompression.
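
As a rough illustration of the buffer re-use idea (a minimal sketch assuming a 
single fixed buffer size; a real consumer memory manager would need size classes 
and bounds, along the lines of the producer's BufferPool):

{code}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal free-list sketch, not a proposed implementation.
public class SimpleBufferPool {
    private final int bufferSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    public SimpleBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public synchronized ByteBuffer allocate() {
        ByteBuffer buffer = free.pollFirst();
        return buffer != null ? buffer : ByteBuffer.allocate(bufferSize);
    }

    public synchronized void release(ByteBuffer buffer) {
        buffer.clear();              // reset position/limit before re-use
        free.addFirst(buffer);
    }
}
{code}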



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2045) Memory Management on the consumer

2015-03-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2045:
-
Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-1326

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>
> We need to add memory management to the new consumer, as we did in the new 
> producer. This would probably include:
> 1. byte buffer re-use for fetch response partition data.
> 2. byte buffer re-use for on-the-fly decompression.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-03-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378397#comment-14378397
 ] 

Guozhang Wang commented on KAFKA-1461:
--

Sorry for the delay, I will take a look at 31366 today.

> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.1.1
>Reporter: Sam Meder
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1461.patch, KAFKA-1461.patch, 
> KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, 
> KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch
>
>
> The current replica fetcher thread will retry in a tight loop if any error 
> occurs during the fetch call. For example, we've seen cases where the fetch 
> continuously throws a connection refused exception leading to several replica 
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, 
> although the fact that erroring partitions are removed so a leader can be 
> re-discovered helps some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Fwd: MirrorMaker improvements

2015-03-24 Thread vlad...@gmail.com
Dear all,

I had a short discussion with Jay yesterday at the ACM meetup and he
suggested writing an email regarding a few possible MirrorMaker
improvements.

At Turn, we have been using MirrorMaker for a few months now to
asynchronously replicate our key/value store data between our datacenters.
In a way, our system is similar to Linkedin's Databus, but it uses Kafka
clusters and MirrorMaker as its building blocks. Our overall message rate
peaks at about 650K/sec and, when pushing data over high bandwidth delay
product links, we have found some minor bottlenecks.

The MirrorMaker process uses a standard consumer to pull data from a remote
datacenter. This implies that it opens a single TCP connection to each of
the remote brokers and muxes requests for different topics and partitions
over this connection. While this is a good thing in terms of keeping the
congestion window open, over long RTT lines with a rather high loss rate
the congestion window will cap, in our case at just a few Mbps. While the
overall line bandwidth is much higher, this means that we have to start
multiple MirrorMaker processes (somewhere in the hundreds), in order to
completely use the line capacity. Being able to pool multiple TCP
connections from a single consumer to a broker would solve this
complication.

The standard consumer also uses the remote ZooKeeper in order to manage the
consumer group. While consumer group management is moving closer to the
brokers, it might make sense to move the group management to the local
datacenter, since that would avoid using the long-distance connection for
this purpose.

Another possible improvement assumes a further constraint, namely that the
number of partitions for a topic in both datacenters is the same. In my
opinion, this is a sane constraint, since it preserves the Kafka ordering
guarantees (per partition), instead of a simple guarantee per key. This
kind of guarantee can be for example useful in a system that compares
partition contents to reach eventual consistency using Merkle trees. If the
number of partitions is equal, then offsets have the same meaning for the
same partition in both clusters, since the data for both partitions is
identical before the offset. This allows a simple consumer to just inquire
the local broker and the remote broker for their current offsets and, in
case the remote broker is ahead, copy the extra data to the local cluster.
Since the consumer offsets are no longer bound to the specific partitioning
of a single remote cluster, the consumer could pull from one of any number
of remote clusters, BitTorrent-style, if their offsets are ahead of the
local offset. The group management problem would reduce to assigning local
partitions to different MirrorMaker processes, so the group management
could be done locally also in this situation.
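A rough sketch of the copy loop this would enable (the interfaces below are made up purely for illustration and assume equal partition counts, so offsets line up exactly):

{code}
// Hypothetical interfaces standing in for the real clients; illustration only.
interface ClusterClient {
    long latestOffset(String topic, int partition);
    Iterable<byte[]> fetch(String topic, int partition, long fromOffset, long toOffset);
    void append(String topic, int partition, Iterable<byte[]> messages);
}

class PartitionMirror {
    // Copy whatever the remote cluster has beyond the local log end offset.
    static void syncPartition(ClusterClient local, ClusterClient remote,
                              String topic, int partition) {
        long localEnd = local.latestOffset(topic, partition);
        long remoteEnd = remote.latestOffset(topic, partition);
        if (remoteEnd > localEnd) {
            // Because partition counts (and hence offsets) match, the local log is a
            // strict prefix of the remote log and the delta can be appended verbatim.
            local.append(topic, partition,
                         remote.fetch(topic, partition, localEnd, remoteEnd));
        }
    }
}
{code}

The same loop works against any remote cluster whose offset for the partition is ahead of the local one, which is what makes the BitTorrent-style pulling possible.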

Regards,
Vlad

PS: Sorry if this is a double posting! The original posting did not appear
in the archives for a while.


Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer

2015-03-24 Thread Jiangjie Qin
Push up the thread for voting after discussion on the KIP hangout.

On 3/19/15, 9:03 PM, "Jiangjie Qin"  wrote:

>We had some additional discussions on the discussion thread. Pushing up
>this thread to resume voting.
>
>On 3/11/15, 8:47 PM, "Jay Kreps"  wrote:
>
>>Yeah guys, I'd like to second that. I'd really really love to get the
>>quality of these to the point where we could broadly solicit user input
>>and
>>use them as a permanent document of the alternatives and rationale.
>>
>>I know it is a little painful to have process, but I think we all saw
>>what
>>happened to the previous clients as public interfaces so I really really
>>really want us to just be incredibly thoughtful and disciplined as we
>>make
>>changes. I think we all want to avoid another "client rewrite".
>>
>>To second Joe's question in a more specific way, I think an alternative I
>>don't see considered to give close() a bounded time is just to enforce
>>the
>>request time on the client side, which will cause all requests to be
>>failed
>>after the request timeout expires. This was the same behavior as for
>>flush.
>>In the case where the user just wants to ensure close doesn't block
>>forever
>>I think that may be sufficient?
>>
>>So one alternative might be to just do that request timeout feature and
>>add
>>a new producer config that is something like
>>  abort.on.failure=false
>>which causes the producer to hard exit if it can't send a request. Which
>>I
>>think is closer to what you want, with this just being a way to implement
>>that behavior.
>>
>>I'm not sure if this is better or worse, but we should be sure before we
>>make the change.
>>
>>I also have a concern about
>>  producer.close(0, TimeUnit.MILLISECONDS)
>>not meaning close with a timeout of 0 ms.
>>
>>I realize this exists in other java apis, but it is so confusing it even
>>confused us into having that recent producer bug because of course all
>>the
>>other numbers mean "wait that long".
>>
>>I'd propose
>>  close()--block until all completed
>>  close(0, TimeUnit.MILLISECONDS)--block for 0 ms
>>  close(5, TimeUnit.MILLISECONDS)--block for 5 ms
>>  close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative
>>ms
>>would mean completing in the past :-)
>>
>>-Jay
>>
>>On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein  wrote:
>>
>>> Could the KIP confluence please have updated the discussion thread
>>>link,
>>> thanks... could you also remove the template boilerplate at the top
>>>"*This
>>> page is meant as a template ..*" so we can capture it for the release
>>> cleanly.
>>>
>>> Also I don't really/fully understand how this is different than
>>> flush(time); close() and why close has its own timeout also?
>>>
>>> Lastly, what is the forceClose flag? This isn't documented in the
>>>public
>>> interface so it isn't clear how to completely use the feature just by
>>> reading the KIP.
>>>
>>> ~ Joe Stein
>>> - - - - - - - - - - - - - - - - -
>>>
>>>   http://www.stealth.ly
>>> - - - - - - - - - - - - - - - - -
>>>
>>> On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang 
>>> wrote:
>>>
>>> > +1 (binding)
>>> >
>>> > On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin
> >
>>> > wrote:
>>> >
>>> > >
>>> > >
>>> >
>>> 
>>>https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m
>>>e
>>>thod+with+a+timeout+in+the+producer
>>> > >
>>> > >
>>> >
>>> >
>>> > --
>>> > -- Guozhang
>>> >
>>>
>



Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer

2015-03-24 Thread Joel Koshy
Actually, since there are already votes on this and the KIP has
changed a bit we should cancel this and start a new thread.

On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin
 wrote:
> Push up the thread for voting after discussion on the KIP hangout.
>
> On 3/19/15, 9:03 PM, "Jiangjie Qin"  wrote:
>
>>We had some additional discussions on the discussion thread. Pushing up
>>this thread to resume voting.
>>
>>On 3/11/15, 8:47 PM, "Jay Kreps"  wrote:
>>
>>>Yeah guys, I'd like to second that. I'd really really love to get the
>>>quality of these to the point where we could broadly solicit user input
>>>and
>>>use them as a permanent document of the alternatives and rationale.
>>>
>>>I know it is a little painful to have process, but I think we all saw
>>>what
>>>happened to the previous clients as public interfaces so I really really
>>>really want us to just be incredibly thoughtful and disciplined as we
>>>make
>>>changes. I think we all want to avoid another "client rewrite".
>>>
>>>To second Joe's question in a more specific way, I think an alternative I
>>>don't see considered to give close() a bounded time is just to enforce
>>>the
>>>request time on the client side, which will cause all requests to be
>>>failed
>>>after the request timeout expires. This was the same behavior as for
>>>flush.
>>>In the case where the user just wants to ensure close doesn't block
>>>forever
>>>I think that may be sufficient?
>>>
>>>So one alternative might be to just do that request timeout feature and
>>>add
>>>a new producer config that is something like
>>>  abort.on.failure=false
>>>which causes the producer to hard exit if it can't send a request. Which
>>>I
>>>think is closer to what you want, with this just being a way to implement
>>>that behavior.
>>>
>>>I'm not sure if this is better or worse, but we should be sure before we
>>>make the change.
>>>
>>>I also have a concern about
>>>  producer.close(0, TimeUnit.MILLISECONDS)
>>>not meaning close with a timeout of 0 ms.
>>>
>>>I realize this exists in other java apis, but it is so confusing it even
>>>confused us into having that recent producer bug because of course all
>>>the
>>>other numbers mean "wait that long".
>>>
>>>I'd propose
>>>  close()--block until all completed
>>>  close(0, TimeUnit.MILLISECONDS)--block for 0 ms
>>>  close(5, TimeUnit.MILLISECONDS)--block for 5 ms
>>>  close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative
>>>ms
>>>would mean completing in the past :-)
>>>
>>>-Jay
>>>
>>>On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein  wrote:
>>>
 Could the KIP confluence please have updated the discussion thread
link,
 thanks... could you also remove the template boilerplate at the top
"*This
 page is meant as a template ..*" so we can capture it for the release
 cleanly.

 Also I don't really/fully understand how this is different than
 flush(time); close() and why close has its own timeout also?

 Lastly, what is the forceClose flag? This isn't documented in the
public
 interface so it isn't clear how to completely use the feature just by
 reading the KIP.

 ~ Joe Stein
 - - - - - - - - - - - - - - - - -

   http://www.stealth.ly
 - - - - - - - - - - - - - - - - -

 On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang 
 wrote:

 > +1 (binding)
 >
 > On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin
>>> >
 > wrote:
 >
 > >
 > >
 >

https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m
e
thod+with+a+timeout+in+the+producer
 > >
 > >
 >
 >
 > --
 > -- Guozhang
 >

>>
>


Re: KIP discussion Mar 24 at 11am PST

2015-03-24 Thread Jun Rao
Just to keep everyone posted. The following is a summary of what's being
discussed in the KIP hangout today.

KIP-4 (admin commands):
* Gwen is uploading a patch in KAFKA-1927 (refactoring requests) so that we
can get unblocked on adding new requests.
* We will combine DescribeTopic and TopicMetadata in the future.
* We will leave the admin requests async for now.
* We will not add a VerifyReassignPartitionRequest for now. We can do that
later when we improve the verification process.
* We need to discuss a bit more on how to expose the controller info to the
client.
* Andrii will send out more details on the KIP thread.

KIP-15 (close):
* If close() or close with a non-zero timeout is called from the send
thread, we will log it as an error.
* Jiangjie will follow up on the KIP thread.

KIP-13 (quota):
* Need a separate discussion on whether to use the new metrics package on
the broker on the mailing list.
* There are a few other details being discussed and Aditya will follow up on
the KIP thread.

Thanks,

Jun


On Fri, Mar 20, 2015 at 2:44 PM, Jun Rao  wrote:

> Hi, Everyone,
>
> We plan to have a KIP discussion on Google hangout on Mar 24 at 11am PST.
> If you are interested in participating and have not already received a
> calendar invitation, please let me know. The following is the agenda.
>
> KIP-4 (admin commands): 10 mins
> * status of KAFKA-1927 (refactoring requests). which blocks this KIP
> * status of KAFKA-1634 (improve OffsetCommitRequest), which blocks
> KAFKA-1927
> * any remaining issues for discussion
>
> KIP-15 (close): 10 mins
> * semantics of close() and close(timeout)
>
> KIP-13 (quota):
> * protocol change to reflect the state of throttling
> * dependency on using the new metrics package
> * dependency KIP-5 (broker configuration)
>
> Thanks,
>
> Jun
>


[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-24 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378433#comment-14378433
 ] 

Dmitry Bugaychenko commented on KAFKA-2029:
---

We tried prioritization of controller messages, but it didn't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests at different speeds - as a result, with a large queue, one broker 
can get way behind another while the controller thinks it is doing fine. By 
adding "tracked" requests we ensure that *all* brokers have completed the 
leadership movement, so no broker can get behind the others by more than one 
partition. 

The fix for controller message prioritization was in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
...
  private val requestQueue = new LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId == RequestKeys.StopReplicaKey) {
      // ODKL Patch: prioritize controller requests over data requests.
      requestQueue.putFirst(request)
      info("Escalated controller request: " + request.requestObj.describe(details = true))
    } else {
      requestQueue.putLast(request)
    }
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
    requestQueue.takeFirst()
...
{code}

It increased GC overhead but didn't improve the speed of partition movement - 
it looks like network request processing is not the bottleneck.
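
To make the "tracked" idea concrete, here is a rough sketch (illustration only, not the actual controller code): send the state change for one partition to every broker and wait for all acknowledgements before moving on.

{code}
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration only: move one partition's leadership at a time and wait until
// every broker has acknowledged the state change before touching the next one.
class TrackedStateChange {
    interface BrokerChannel {
        // Hypothetical async send; onAck runs once the broker has applied the request.
        void sendLeaderAndIsr(String topicPartition, Runnable onAck);
    }

    static boolean moveLeadershipTracked(List<BrokerChannel> brokers,
                                         String topicPartition,
                                         long timeoutMs) throws InterruptedException {
        CountDownLatch allApplied = new CountDownLatch(brokers.size());
        for (BrokerChannel broker : brokers) {
            broker.sendLeaderAndIsr(topicPartition, allApplied::countDown);
        }
        // No broker can fall more than one partition behind, because we do not
        // move on until all of them have applied this change (or we time out).
        return allApplied.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
{code}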

> Improving controlled shutdown for rolling updates
> -
>
> Key: KAFKA-2029
> URL: https://issues.apache.org/jira/browse/KAFKA-2029
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
>Priority: Critical
>
> Controlled shutdown as implemented currently can cause numerous problems: 
> deadlocks, local and global datalos, partitions without leader and etc. In 
> some cases the only way to restore cluster is to stop it completelly using 
> kill -9 and start again.
> Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
> queue size makes things much worse (see discussion there).
> Note 2: The problems described here can occure in any setup, but they are 
> extremly painful in setup with large brokers (36 disks, 1000+ assigned 
> partitions per broker in our case).
> Note 3: These improvements are actually workarounds and it is worth to 
> consider global refactoring of the controller (make it single thread, or even 
> get rid of it in the favour of ZK leader elections for partitions).
> The problems and improvements are:
> # Controlled shutdown takes a long time (10+ minutes), broker sends multiple 
> shutdown requests and finally considers it as failed and procedes to unclean 
> shutdow, controller got stuck for a while (holding a lock waiting for free 
> space in controller-to-broker queue). After broker starts back it receives 
> followers request and erases highwatermarks (with a message that "replica 
> does not exists" - controller hadn't yet sent a request with replica 
> assignment), then controller starts replicas on the broker it deletes all 
> local data (due to missing highwatermarks). Furthermore, controller starts 
> processing pending shutdown request and stops replicas on the broker letting 
> it in a non-functional state. Solution to the problem might be to increase 
> time broker waits for controller reponse to shutdown request, but this 
> timeout is taken from controller.socket.timeout.ms which is global for all 
> broker-controller communication and setting it to 30 minutes is dangerous. 
> *Proposed solution: introduce dedicated config parameter for this timeout 
> with a high default*.
> # If a broker gets down during controlled shutdown and did not come back 
> controller got stuck in a deadlock (one thread owns the lock and tries to add 
> message to the dead broker's queue, send thread is a infinite loop trying to 
> retry message to the dead broker, and the broker failure handler is waiting 
> for a lock). There are numerous partitions without a leader and the only way 
> out is to kill -9 the controller. *Proposed solution: add timeout for adding 
> message to broker's queue*. ControllerChannelManager.sendRequest:
> {code}
>   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: 
> (RequestOrResponse) => Unit = null) {
> brokerLock synchronized {
>   val stateInfoOpt = brokerStateInfo.get(brokerId)
>

[jira] [Comment Edited] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-24 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378433#comment-14378433
 ] 

Dmitry Bugaychenko edited comment on KAFKA-2029 at 3/24/15 7:34 PM:


We tried prioritization of controller messages, but it didn't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests at different speeds - as a result, with a large queue, one broker 
can get way behind another while the controller thinks it is doing fine. By 
adding "tracked" requests we ensure that *all* brokers have completed the 
leadership movement, so no broker can get behind the others by more than one 
partition. 

The fix for controller message prioritization was in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
...
  private val requestQueue = new LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId == RequestKeys.StopReplicaKey) {
      // ODKL Patch: prioritize controller requests over data requests.
      requestQueue.putFirst(request)
      info("Escalated controller request: " + request.requestObj.describe(details = true))
    } else {
      requestQueue.putLast(request)
    }
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
    requestQueue.takeFirst()
...
{code}

It increased GC overhead but hasn't improved the speed of partition movement - 
it looks like network request processing is not the bottleneck.


was (Author: dmitrybugaychenko):
We tried prioritization of controller messages, but it din't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests on different speed - as a result with a large queue one broker 
can get way behind another one while controller thinks it is doing fine. 
Addiding "tracked" we ensure taht *all* brokers done the leadership movement, 
thus no one can get behind others to more than one partition. 

The fix for controller messages prioritization were in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends 
KafkaMetricsGroup {
...
  private val requestQueue = new 
LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in 
the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId 
== RequestKeys.StopReplicaKey) {
  // ODKL Patch: prioritize controller requests over data requests.
  requestQueue.putFirst(request)
  info("Escalated controller request: " + 
request.requestObj.describe(details = true))
} else {
  requestQueue.putLast(request)
}
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
requestQueue.takeFirst()
...
{code}

It increased GC overhead but didn't improved the speed of partitions movement - 
it looks like the network request processing is not the botlleneck.

> Improving controlled shutdown for rolling updates
> -
>
> Key: KAFKA-2029
> URL: https://issues.apache.org/jira/browse/KAFKA-2029
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
>Priority: Critical
>
> Controlled shutdown as implemented currently can cause numerous problems: 
> deadlocks, local and global datalos, partitions without leader and etc. In 
> some cases the only way to restore cluster is to stop it completelly using 
> kill -9 and start again.
> Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
> queue size makes things much worse (see discussion there).
> Note 2: The problems described here can occure in any setup, but they are 
> extremly painful in setup with large brokers (36 disks, 1000+ assigned 
> partitions per broker in our case).
> Note 3: These improvements are actually workarounds and it is worth to 
> consider global refactoring of the controller (make it single thread, or even 
> get rid of it in the favour of ZK leader elections for partitions).
> The problems and improvements are:
> # Controlled shutdown takes a long time (10+ minutes), broker sends multiple 
> shutdown requests and finally considers it as failed and procedes to unclean 
> shutdow, controller got stuck for a while (holding a lock waiting 

[jira] [Comment Edited] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-24 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378433#comment-14378433
 ] 

Dmitry Bugaychenko edited comment on KAFKA-2029 at 3/24/15 7:35 PM:


We tried prioritization of controller messages, but it didn't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests at different speeds - as a result, with a large queue, one broker 
can get way behind another while the controller thinks it is doing fine. By 
adding "tracked" requests we ensure that *all* brokers have completed the 
leadership movement, so no broker can get behind the others by more than one 
partition. 

The fix for controller message prioritization was in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
...
  private val requestQueue = new LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId == RequestKeys.StopReplicaKey) {
      // ODKL Patch: prioritize controller requests over data requests.
      requestQueue.putFirst(request)
      info("Escalated controller request: " + request.requestObj.describe(details = true))
    } else {
      requestQueue.putLast(request)
    }
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
    requestQueue.takeFirst()
...
{code}

It increased GC overhead but hasn't improved the speed of partition movement - 
it looks like network request processing is not the bottleneck.


was (Author: dmitrybugaychenko):
We tried prioritization of controller messages, but it din't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests on different speed - as a result with a large queue one broker 
can get way behind another one while controller thinks it is doing fine. 
Addiding "tracked" we ensure taht *all* brokers done the leadership movement, 
thus no one can get behind others to more than one partition. 

The fix for controller messages prioritization were in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends 
KafkaMetricsGroup {
...
  private val requestQueue = new 
LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in 
the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId 
== RequestKeys.StopReplicaKey) {
  // ODKL Patch: prioritize controller requests over data requests.
  requestQueue.putFirst(request)
  info("Escalated controller request: " + 
request.requestObj.describe(details = true))
} else {
  requestQueue.putLast(request)
}
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
requestQueue.takeFirst()
...
{code}

It increased GC overhead but hasn't improved the speed of partitions movement - 
it looks like the network request processing is not the botlleneck.

> Improving controlled shutdown for rolling updates
> -
>
> Key: KAFKA-2029
> URL: https://issues.apache.org/jira/browse/KAFKA-2029
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
>Priority: Critical
>
> Controlled shutdown as implemented currently can cause numerous problems: 
> deadlocks, local and global datalos, partitions without leader and etc. In 
> some cases the only way to restore cluster is to stop it completelly using 
> kill -9 and start again.
> Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
> queue size makes things much worse (see discussion there).
> Note 2: The problems described here can occure in any setup, but they are 
> extremly painful in setup with large brokers (36 disks, 1000+ assigned 
> partitions per broker in our case).
> Note 3: These improvements are actually workarounds and it is worth to 
> consider global refactoring of the controller (make it single thread, or even 
> get rid of it in the favour of ZK leader elections for partitions).
> The problems and improvements are:
> # Controlled shutdown takes a long time (10+ minutes), broker sends multiple 
> shutdown requests and finally considers it as failed and procedes to unclean 
> shutdow, controller got stuck for a while (holding a lock waiting 

MirrorMaker improvements

2015-03-24 Thread vlad...@gmail.com
Dear all,

I had a short discussion with Jay yesterday at the ACM meetup and he
suggested writing an email regarding a few possible MirrorMaker
improvements.

At Turn, we have been using MirrorMaker for a few months now to
asynchronously replicate our key/value store data between our datacenters.
In a way, our system is similar to Linkedin's Databus, but it uses Kafka
clusters and MirrorMaker as its building blocks. Our overall message rate
peaks at about 650K/sec and, when pushing data over high bandwidth delay
product links, we have found some minor bottlenecks.

The MirrorMaker process uses a standard consumer to pull data from a remote
datacenter. This implies that it opens a single TCP connection to each of
the remote brokers and muxes requests for different topics and partitions
over this connection. While this is a good thing in terms of keeping the
congestion window open, over long RTT lines with a rather high loss rate
the congestion window will cap, in our case at just a few Mbps. While the
overall line bandwidth is much higher, this means that we have to start
multiple MirrorMaker processes (somewhere in the hundreds), in order to
completely use the line capacity. Being able to pool multiple TCP
connections from a single consumer to a broker would solve this
complication.

The standard consumer also uses the remote ZooKeeper in order to manage the
consumer group. While consumer group management is moving closer to the
brokers, it might make sense to move the group management to the local
datacenter, since that would avoid using the long-distance connection for
this purpose.

Another possible improvement assumes a further constraint, namely that the
number of partitions for a topic in both datacenters is the same. In my
opinion, this is a sane constraint, since it preserves the Kafka ordering
guarantees (per partition), instead of a simple guarantee per key. This
kind of guarantee can be for example useful in a system that compares
partition contents to reach eventual consistency using Merkle trees. If the
number of partitions is equal, then offsets have the same meaning for the
same partition in both clusters, since the data for both partitions is
identical before the offset. This allows a simple consumer to just inquire
the local broker and the remote broker for their current offsets and, in
case the remote broker is ahead, copy the extra data to the local cluster.
Since the consumer offsets are no longer bound to the specific partitioning
of a single remote cluster, the consumer could pull from one of any number
of remote clusters, BitTorrent-style, if their offsets are ahead of the
local offset. The group management problem would reduce to assigning local
partitions to different MirrorMaker processes, so the group management
could be done locally also in this situation.

Regards,
Vlad


[jira] [Commented] (KAFKA-2045) Memory Management on the consumer

2015-03-24 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378476#comment-14378476
 ] 

Jay Kreps commented on KAFKA-2045:
--

There are really two issues:
1. Bounding fetch size while still guaranteeing that you eventually get data 
from each partition
2. Pooling and reusing byte buffers

I actually think (1) is really pressing, but (2) is just an optimization that 
may or may not have high payoff.

(1) is what leads to the huge memory allocations and sudden OOM when a consumer 
falls behind and then suddenly has lots of data or when partition assignment 
changes.

For (1) I think we need to figure out whether this is (a) some heuristic in the 
consumer which decides to only do fetches for a subset of topic/partitions or 
(b) a new parameter in the fetch request that gives a total bound on the 
request size. I think we discussed this a while back and agreed on (b), but I 
can't remember now. The argument if I recall was that that was the only way for 
the server to monitor all the subscribed topics and avoid blocking on an empty 
topic while non-empty partitions have data.

Bounding the allocations should help performance a lot too.

If we do this bounding then I think reuse will be a lot easier too, since each 
response will use at most that many bytes and you could potentially even just 
statically allocate the byte buffer for each partition and reuse it.
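
As a purely illustrative sketch of the client-side variant (a) - made-up class names, not the actual consumer design - a total byte budget could be divided across the fetched partitions, with one statically allocated buffer reused per partition:

{code}
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: bound total fetch memory by dividing a global budget
// across the partitions fetched in one request, reusing one buffer per partition.
class BoundedFetchPlanner {
    private final Map<String, ByteBuffer> partitionBuffers = new LinkedHashMap<>();
    private final int totalFetchBytes;

    BoundedFetchPlanner(int totalFetchBytes) {
        this.totalFetchBytes = totalFetchBytes;
    }

    // Returns a per-partition buffer to fetch into; a buffer is allocated the
    // first time a partition is seen and reused afterwards, so the worst-case
    // memory of a single fetch stays around totalFetchBytes.
    Map<String, ByteBuffer> planFetch(List<String> partitions) {
        int perPartition = Math.max(1, totalFetchBytes / Math.max(1, partitions.size()));
        Map<String, ByteBuffer> plan = new LinkedHashMap<>();
        for (String partition : partitions) {
            ByteBuffer buffer = partitionBuffers.computeIfAbsent(
                partition, p -> ByteBuffer.allocate(perPartition));
            buffer.clear();
            plan.put(partition, buffer);
        }
        return plan;
    }
}
{code}

Option (b), a total bound carried in the fetch request itself, would instead let the broker do this division while still seeing all subscribed partitions.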

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/#review77634
---


As discussed offline, I think it is better to make the fix at the app level 
(KafkaProducer, Metadata) rather than forbidding sending an empty MetadataRequest 
in NetworkClient.

- Guozhang Wang


On March 24, 2015, 8:17 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32434/
> ---
> 
> (Updated March 24, 2015, 8:17 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2042
> https://issues.apache.org/jira/browse/KAFKA-2042
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Patch for KAFKA-2042. Do not update metadata for empty topic set in new 
> producer
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> f4295025c28e2842244dc775052b7a3d30fb9d11 
> 
> Diff: https://reviews.apache.org/r/32434/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: [VOTE] KIP-15 add a close method with timeout to KafkaProducer

2015-03-24 Thread Guozhang Wang
+1, let's just start a new thread for this.

On Tue, Mar 24, 2015 at 12:23 PM, Joel Koshy  wrote:

> Actually, since there are already votes on this and the KIP has
> changed a bit we should cancel this and start a new thread.
>
> On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin
>  wrote:
> > Push up the thread for voting after discussion on the KIP hangout.
> >
> > On 3/19/15, 9:03 PM, "Jiangjie Qin"  wrote:
> >
> >>We had some additional discussions on the discussion thread. Pushing up
> >>this thread to resume voting.
> >>
> >>On 3/11/15, 8:47 PM, "Jay Kreps"  wrote:
> >>
> >>>Yeah guys, I'd like to second that. I'd really really love to get the
> >>>quality of these to the point where we could broadly solicit user input
> >>>and
> >>>use them as a permanent document of the alternatives and rationale.
> >>>
> >>>I know it is a little painful to have process, but I think we all saw
> >>>what
> >>>happened to the previous clients as public interfaces so I really really
> >>>really want us to just be incredibly thoughtful and disciplined as we
> >>>make
> >>>changes. I think we all want to avoid another "client rewrite".
> >>>
> >>>To second Joe's question in a more specific way, I think an alternative
> I
> >>>don't see considered to give close() a bounded time is just to enforce
> >>>the
> >>>request time on the client side, which will cause all requests to be
> >>>failed
> >>>after the request timeout expires. This was the same behavior as for
> >>>flush.
> >>>In the case where the user just wants to ensure close doesn't block
> >>>forever
> >>>I think that may be sufficient?
> >>>
> >>>So one alternative might be to just do that request timeout feature and
> >>>add
> >>>a new producer config that is something like
> >>>  abort.on.failure=false
> >>>which causes the producer to hard exit if it can't send a request. Which
> >>>I
> >>>think is closer to what you want, with this just being a way to
> implement
> >>>that behavior.
> >>>
> >>>I'm not sure if this is better or worse, but we should be sure before we
> >>>make the change.
> >>>
> >>>I also have a concern about
> >>>  producer.close(0, TimeUnit.MILLISECONDS)
> >>>not meaning close with a timeout of 0 ms.
> >>>
> >>>I realize this exists in other java apis, but it is so confusing it even
> >>>confused us into having that recent producer bug because of course all
> >>>the
> >>>other numbers mean "wait that long".
> >>>
> >>>I'd propose
> >>>  close()--block until all completed
> >>>  close(0, TimeUnit.MILLISECONDS)--block for 0 ms
> >>>  close(5, TimeUnit.MILLISECONDS)--block for 5 ms
> >>>  close(-1, TimeUnit.MILLISECONDS)--error because blocking for negative
> >>>ms
> >>>would mean completing in the past :-)
> >>>
> >>>-Jay
> >>>
> >>>On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein 
> wrote:
> >>>
>  Could the KIP confluence please have updated the discussion thread
> link,
>  thanks... could you also remove the template boilerplate at the top
> "*This
>  page is meant as a template ..*" so we can capture it for the release
>  cleanly.
> 
>  Also I don't really/fully understand how this is different than
>  flush(time); close() and why close has its own timeout also?
> 
>  Lastly, what is the forceClose flag? This isn't documented in the
> public
>  interface so it isn't clear how to completely use the feature just by
>  reading the KIP.
> 
>  ~ Joe Stein
>  - - - - - - - - - - - - - - - - -
> 
>    http://www.stealth.ly
>  - - - - - - - - - - - - - - - - -
> 
>  On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang 
>  wrote:
> 
>  > +1 (binding)
>  >
>  > On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin
>   >
>  > wrote:
>  >
>  > >
>  > >
>  >
> 
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m
> e
> thod+with+a+timeout+in+the+producer
>  > >
>  > >
>  >
>  >
>  > --
>  > -- Guozhang
>  >
> 
> >>
> >
>



-- 
-- Guozhang


[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2042:

Attachment: KAFKA-2042_2015-03-24_13:37:49.patch

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch
>
>
> The new java producer metadata.topics is initially empty so the producer 
> sends TMR with empty topic set. The broker takes the empty requested topic 
> set as all topics, so metadata.cluster contains all topic metadata. Later on, 
> when a new topic was produced, it gets added into the metadata.topics. The 
> next metadata update will only contain the meta data for this new topic, so 
> the metadata.cluster will only have this topic. Since there are a lot of 
> messages are still in the accumulator but has no metadata in 
> metadata.cluster, if a caller thread do a flush(), the caller thread will 
> block forever because the messages sitting in accumulator without metadata 
> will never be ready to send.
> We should add check for the metadata.topics, if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/
---

(Updated March 24, 2015, 8:37 p.m.)


Review request for kafka.


Bugs: KAFKA-2042
https://issues.apache.org/jira/browse/KAFKA-2042


Repository: kafka


Description (updated)
---

Move the change to KafkaProducer after talking to Guozhang offline.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
feda9c922d7dab17e424f8e6f0aa0a3f968e3729 

Diff: https://reviews.apache.org/r/32434/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378551#comment-14378551
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Updated reviewboard https://reviews.apache.org/r/32434/diff/
 against branch origin/trunk

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch
>
>
> The new java producer metadata.topics is initially empty so the producer 
> sends TMR with empty topic set. The broker takes the empty requested topic 
> set as all topics, so metadata.cluster contains all topic metadata. Later on, 
> when a new topic was produced, it gets added into the metadata.topics. The 
> next metadata update will only contain the meta data for this new topic, so 
> the metadata.cluster will only have this topic. Since there are a lot of 
> messages are still in the accumulator but has no metadata in 
> metadata.cluster, if a caller thread do a flush(), the caller thread will 
> block forever because the messages sitting in accumulator without metadata 
> will never be ready to send.
> We should add check for the metadata.topics, if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378586#comment-14378586
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Updated reviewboard https://reviews.apache.org/r/32434/diff/
 against branch origin/trunk

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
> KAFKA-2042_2015-03-24_13:57:23.patch
>
>
> The new java producer metadata.topics is initially empty so the producer 
> sends TMR with empty topic set. The broker takes the empty requested topic 
> set as all topics, so metadata.cluster contains all topic metadata. Later on, 
> when a new topic was produced, it gets added into the metadata.topics. The 
> next metadata update will only contain the meta data for this new topic, so 
> the metadata.cluster will only have this topic. Since there are a lot of 
> messages are still in the accumulator but has no metadata in 
> metadata.cluster, if a caller thread do a flush(), the caller thread will 
> block forever because the messages sitting in accumulator without metadata 
> will never be ready to send.
> We should add check for the metadata.topics, if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2042:

Attachment: KAFKA-2042_2015-03-24_13:57:23.patch

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
> KAFKA-2042_2015-03-24_13:57:23.patch
>
>
> The new java producer metadata.topics is initially empty so the producer 
> sends TMR with empty topic set. The broker takes the empty requested topic 
> set as all topics, so metadata.cluster contains all topic metadata. Later on, 
> when a new topic was produced, it gets added into the metadata.topics. The 
> next metadata update will only contain the meta data for this new topic, so 
> the metadata.cluster will only have this topic. Since there are a lot of 
> messages are still in the accumulator but has no metadata in 
> metadata.cluster, if a caller thread do a flush(), the caller thread will 
> block forever because the messages sitting in accumulator without metadata 
> will never be ready to send.
> We should add check for the metadata.topics, if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/
---

(Updated March 24, 2015, 8:57 p.m.)


Review request for kafka.


Bugs: KAFKA-2042
https://issues.apache.org/jira/browse/KAFKA-2042


Repository: kafka


Description (updated)
---

Move the change to KafkaProducer after talking to Guozhang offline.


A less expensive fix


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
c8bde8b732defa20819730d87303a9a80d01116f 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
feda9c922d7dab17e424f8e6f0aa0a3f968e3729 

Diff: https://reviews.apache.org/r/32434/diff/


Testing
---


Thanks,

Jiangjie Qin



[VOTE] KIP-15 add a close with timeout to new producer

2015-03-24 Thread Jiangjie Qin
https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+method+with+a+timeout+in+the+producer

As a short summary, the new interface will be as follows:
close(long timeout, TimeUnit timeUnit)

  1.  When timeout > 0, it will try to wait up to the timeout for the sender thread 
to complete all the requests, then join the sender thread. If the sender thread 
is not able to finish its work before the timeout, the method force closes the 
producer by failing all the incomplete requests and then joins the sender thread.
  2.  When timeout = 0, it will be a non-blocking call that just initiates the 
force close and DOES NOT join the sender thread.

If close(timeout) is called from a callback, an error message will be logged and 
the producer sender thread will block forever.
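
A usage sketch, assuming the proposed close(long timeout, TimeUnit timeUnit) overload is added (broker address and topic name below are placeholders):

{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Usage sketch only; not part of the KIP itself.
public class GracefulShutdownExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test-topic", "hello".getBytes()));

        // Wait up to 5 seconds for outstanding requests; if the sender thread
        // cannot finish in time, incomplete requests are failed and the sender
        // thread is joined.
        producer.close(5, TimeUnit.SECONDS);

        // A timeout of 0 initiates a force close without joining the sender thread:
        // producer.close(0, TimeUnit.MILLISECONDS);
    }
}
{code}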



Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/#review77653
---



clients/src/main/java/org/apache/kafka/clients/Metadata.java


Rename to containsTopic() for Java naming consistency? If you do not feel 
strongly about this I can do the change upon checkin.


- Guozhang Wang


On March 24, 2015, 8:57 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32434/
> ---
> 
> (Updated March 24, 2015, 8:57 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2042
> https://issues.apache.org/jira/browse/KAFKA-2042
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Move the change to KafkaProducer after talking to Guozhang offline.
> 
> 
> A less expensive fix
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> c8bde8b732defa20819730d87303a9a80d01116f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
> 
> Diff: https://reviews.apache.org/r/32434/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Jiangjie Qin


> On March 24, 2015, 9:21 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/Metadata.java, line 139
> > 
> >
> > Rename to containsTopic() for Java naming consistency? If you do not 
> > feel strongly about this I can do the change upon checkin.

Thanks for the review, Guozhang. Yes, I'm fine with containsTopic().


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/#review77653
---


On March 24, 2015, 8:57 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32434/
> ---
> 
> (Updated March 24, 2015, 8:57 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2042
> https://issues.apache.org/jira/browse/KAFKA-2042
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Move the change to KafkaProducer after talking to Guozhang offline.
> 
> 
> A less expensive fix
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> c8bde8b732defa20819730d87303a9a80d01116f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
> 
> Diff: https://reviews.apache.org/r/32434/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[CANCEL] [VOTE] KIP-15 add a close method with timeout to KafkaProducer

2015-03-24 Thread Jiangjie Qin


On 3/24/15, 1:34 PM, "Guozhang Wang"  wrote:

>+1, let's just start a new thread for this.
>
>On Tue, Mar 24, 2015 at 12:23 PM, Joel Koshy  wrote:
>
>> Actually, since there are already votes on this and the KIP has
>> changed a bit we should cancel this and start a new thread.
>>
>> On Tue, Mar 24, 2015 at 12:19 PM, Jiangjie Qin
>>  wrote:
>> > Push up the thread for voting after discussion on the KIP hangout.
>> >
>> > On 3/19/15, 9:03 PM, "Jiangjie Qin"  wrote:
>> >
>> >>We had some additional discussions on the discussion thread. Pushing
>>up
>> >>this thread to resume voting.
>> >>
>> >>On 3/11/15, 8:47 PM, "Jay Kreps"  wrote:
>> >>
>> >>>Yeah guys, I'd like to second that. I'd really really love to get the
>> >>>quality of these to the point where we could broadly solicit user
>>input
>> >>>and
>> >>>use them as a permanent document of the alternatives and rationale.
>> >>>
>> >>>I know it is a little painful to have process, but I think we all saw
>> >>>what
>> >>>happened to the previous clients as public interfaces so I really
>>really
>> >>>really want us to just be incredibly thoughtful and disciplined as we
>> >>>make
>> >>>changes. I think we all want to avoid another "client rewrite".
>> >>>
>> >>>To second Joe's question in a more specific way, I think an
>>alternative
>> I
>> >>>don't see considered to give close() a bounded time is just to
>>enforce
>> >>>the
>> >>>request time on the client side, which will cause all requests to be
>> >>>failed
>> >>>after the request timeout expires. This was the same behavior as for
>> >>>flush.
>> >>>In the case where the user just wants to ensure close doesn't block
>> >>>forever
>> >>>I think that may be sufficient?
>> >>>
>> >>>So one alternative might be to just do that request timeout feature
>>and
>> >>>add
>> >>>a new producer config that is something like
>> >>>  abort.on.failure=false
>> >>>which causes the producer to hard exit if it can't send a request.
>>Which
>> >>>I
>> >>>think is closer to what you want, with this just being a way to
>> implement
>> >>>that behavior.
>> >>>
>> >>>I'm not sure if this is better or worse, but we should be sure
>>before we
>> >>>make the change.
>> >>>
>> >>>I also have a concern about
>> >>>  producer.close(0, TimeUnit.MILLISECONDS)
>> >>>not meaning close with a timeout of 0 ms.
>> >>>
>> >>>I realize this exists in other java apis, but it is so confusing it
>>even
>> >>>confused us into having that recent producer bug because of course
>>all
>> >>>the
>> >>>other numbers mean "wait that long".
>> >>>
>> >>>I'd propose
>> >>>  close()--block until all completed
>> >>>  close(0, TimeUnit.MILLISECONDS)--block for 0 ms
>> >>>  close(5, TimeUnit.MILLISECONDS)--block for 5 ms
>> >>>  close(-1, TimeUnit.MILLISECONDS)--error because blocking for
>>negative
>> >>>ms
>> >>>would mean completing in the past :-)
>> >>>
>> >>>-Jay
>> >>>
>> >>>On Wed, Mar 11, 2015 at 8:31 PM, Joe Stein 
>> wrote:
>> >>>
>>  Could the KIP confluence please have updated the discussion thread
>> link,
>>  thanks... could you also remove the template boilerplate at the top
>> "*This
>>  page is meant as a template ..*" so we can capture it for the
>>release
>>  cleanly.
>> 
>>  Also I don't really/fully understand how this is different than
>>  flush(time); close() and why close has its own timeout also?
>> 
>>  Lastly, what is the forceClose flag? This isn't documented in the
>> public
>>  interface so it isn't clear how to completely use the feature just
>>by
>>  reading the KIP.
>> 
>>  ~ Joe Stein
>>  - - - - - - - - - - - - - - - - -
>> 
>>    http://www.stealth.ly
>>  - - - - - - - - - - - - - - - - -
>> 
>>  On Wed, Mar 11, 2015 at 11:24 PM, Guozhang Wang
>>
>>  wrote:
>> 
>>  > +1 (binding)
>>  >
>>  > On Wed, Mar 11, 2015 at 8:10 PM, Jiangjie Qin
>> >  >
>>  > wrote:
>>  >
>>  > >
>>  > >
>>  >
>> 
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+m
>> e
>> thod+with+a+timeout+in+the+producer
>>  > >
>>  > >
>>  >
>>  >
>>  > --
>>  > -- Guozhang
>>  >
>> 
>> >>
>> >
>>
>
>
>
>-- 
>-- Guozhang



KAFKA-2042

2015-03-24 Thread Guozhang Wang
Hello,

We found a serious bug while testing flush() calls in the new producer,
which is summarized in KAFKA-2042.

In general, when the producer starts up it will try to refresh metadata
with an empty topic list, and hence get the metadata for all topics. When a
message is later sent to some topic, the topic will not be added to the
metadata's topic list since its metadata is already available. When the
data is still sitting in the accumulator and a new topic is created, that
will cause a metadata refresh with just this single topic, hence losing the
metadata for any other topics. Under usual scenarios the messages will sit
in the accumulator until another send() is triggered with the same topic,
but with flush() as a blocking call the likelihood of this issue being
exposed - messages getting blocked forever inside flush() - is largely
increased.

I am writing to ask if people think this problem is severe enough that
requires another bug-fix release.

-- Guozhang
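
As a simplified illustration of the guard described above (not the actual Metadata/KafkaProducer code):

{code}
import java.util.HashSet;
import java.util.Set;

// Simplified illustration only. Track the topics we actually produce to and
// only trigger a metadata update when a genuinely new topic shows up, never
// with an empty topic set (the broker treats an empty list as "all topics").
class TopicMetadataTracker {
    private final Set<String> topics = new HashSet<>();
    private boolean needsUpdate = false;

    // Called on every send(topic, ...).
    synchronized void ensureTopic(String topic) {
        if (topics.add(topic)) {
            needsUpdate = true;   // new topic: a metadata refresh is required
        }
    }

    // Called by the sender loop before issuing a metadata request.
    synchronized boolean shouldSendMetadataRequest() {
        return needsUpdate && !topics.isEmpty();
    }

    synchronized Set<String> topicsForRequest() {
        needsUpdate = false;
        return new HashSet<>(topics);
    }
}
{code}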


Re: [VOTE] KIP-15 add a close with timeout to new producer

2015-03-24 Thread Guozhang Wang
+1.

On Tue, Mar 24, 2015 at 2:15 PM, Jiangjie Qin 
wrote:

>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+method+with+a+timeout+in+the+producer
>
> As a short summary, the new interface will be as following:
> Close(Long Timeout, TimeUnit timeUnit)
>
>   1.  When timeout > 0, it will try to wait up to timeout for the sender
> thread to complete all the requests, then join the sender thread. If the
> sender thread is not able to finish work before timeout, the method force
> close the producer by fail all the incomplete requests and join the sender
> thread.
>   2.  When timeout = 0, it will be a non-blocking call, just initiate the
> force close and DOES NOT join the sender thread.
>
> If close(timeout) is called from callback, an error message will be logged
> and the producer sender thread will block forever.
>
>


-- 
-- Guozhang


[jira] [Created] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Clark Haskins (JIRA)
Clark Haskins created KAFKA-2046:


 Summary: Delete topic still doesn't work
 Key: KAFKA-2046
 URL: https://issues.apache.org/jira/browse/KAFKA-2046
 Project: Kafka
  Issue Type: Bug
Reporter: Clark Haskins


I just attempted to delete a 128-partition topic with all inbound producers 
stopped.

The result was as follows:
The /admin/delete_topics znode was empty
The topic under /brokers/topics was removed
The Kafka topics command showed that the topic was removed

However, the data on disk on each broker was not deleted and the topic has not 
yet been re-created by starting up the inbound mirror maker.

Let me know what additional information is needed





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1982) change kafka.examples.Producer to use the new java producer

2015-03-24 Thread Pete Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378734#comment-14378734
 ] 

Pete Wright commented on KAFKA-1982:


+1 on committing this as it will speed up adoption of the new producer across 
our organization while also allowing us to track upstream releases without 
patches.

> change kafka.examples.Producer to use the new java producer
> ---
>
> Key: KAFKA-1982
> URL: https://issues.apache.org/jira/browse/KAFKA-1982
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Ashish K Singh
>  Labels: newbie
> Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, 
> KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, 
> KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch
>
>
> We need to change the example to use the new java producer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378741#comment-14378741
 ] 

Sriharsha Chintalapani commented on KAFKA-2046:
---

[~clarkhaskins] can you add details on how big the cluster was, and also 
do you have the state-change.log files on the brokers where the log data was not 
deleted?

> Delete topic still doesn't work
> ---
>
> Key: KAFKA-2046
> URL: https://issues.apache.org/jira/browse/KAFKA-2046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Clark Haskins
>
> I just attempted to delete a 128-partition topic with all inbound producers 
> stopped.
> The result was as follows:
> The /admin/delete_topics znode was empty
> the topic under /brokers/topics was removed
> The Kafka topics command showed that the topic was removed
> However, the data on disk on each broker was not deleted and the topic has 
> not yet been re-created by starting up the inbound mirror maker.
> Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-2046:
-

Assignee: Sriharsha Chintalapani

> Delete topic still doesn't work
> ---
>
> Key: KAFKA-2046
> URL: https://issues.apache.org/jira/browse/KAFKA-2046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Clark Haskins
>Assignee: Sriharsha Chintalapani
>
> I just attempted to delete a 128-partition topic with all inbound producers 
> stopped.
> The result was as follows:
> The /admin/delete_topics znode was empty
> the topic under /brokers/topics was removed
> The Kafka topics command showed that the topic was removed
> However, the data on disk on each broker was not deleted and the topic has 
> not yet been re-created by starting up the inbound mirror maker.
> Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KAFKA-2042

2015-03-24 Thread Jun Rao
Hi, Guozhang,

The flush() was added to the new producer in trunk, not in 0.8.2, right?

Thanks,

Jun

On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang  wrote:

> Hello,
>
> We found a serious bug while testing flush() calls in the new producer,
> which is summarized in KAFKA-2042.
>
> In general, when the producer starts up it will try to refresh metadata
> with an empty topic list, and hence get all the topic metadata. When sending
> the message with some topic later, it will hence not cause the topic to be
> added into metadata's topic list since the metadata is already available. When
> the data is still sitting in the accumulator and a new topic is created, that
> will cause a metadata refresh with just this single topic, hence losing the
> metadata for any other topics. Under usual scenarios the messages will sit
> in the accumulator until another send() is triggered with the same topic,
> but with flush() being a blocking call, the likelihood of this issue being
> exposed, with messages blocked forever inside flush(), is much higher.
>
> I am writing to ask if people think this problem is severe enough to
> require another bug-fix release.
>
> -- Guozhang
>


Re: KAFKA-2042

2015-03-24 Thread Jiangjie Qin
Hi Jun,

This issue does not only affect flush(); it is just that with flush() the
probability is much higher.
It will affect the following scenario:
1. Producer started and refreshed metadata.
2. User calls producer.send() to send message 1 to topic A, but message 1
is still sitting in the accumulator.
3. User calls producer.send() to send message 2 to topic B (topic B is a
new topic, which does not exist on the broker).
4. Message 1 will not get sent out until the user tries to send a message to
topic A again.

If a flush() is called at this point, it will block forever.
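
A rough sketch of that sequence (assuming a trunk build of the new producer,
since flush() is not in 0.8.2; the broker address and topic names are
placeholders, and topic-B is assumed not to exist on the broker yet):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FlushHangSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // Steps 1/2: metadata has already been refreshed (with an empty topic
            // list), so topic-A is known and message 1 just sits in the accumulator.
            producer.send(new ProducerRecord<>("topic-A", "message 1"));

            // Step 3: topic-B is new, so it becomes the only entry in metadata.topics
            // and the next metadata update drops topic-A from metadata.cluster.
            producer.send(new ProducerRecord<>("topic-B", "message 2"));

            // Step 4: with the bug, message 1 never becomes sendable, so flush()
            // blocks forever.
            producer.flush();
        }
    }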

Jiangjie (Becket) Qin

On 3/24/15, 3:09 PM, "Jun Rao"  wrote:

>Hi, Guozhang,
>
>The flush() was added to the new producer in trunk, not in 0.8.2, right?
>
>Thanks,
>
>Jun
>
>On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang  wrote:
>
>> Hello,
>>
>> We found a serious bug while testing flush() calls in the new producer,
>> which is summarized in KAFKA-2042.
>>
>> In general, when the producer starts up it will try to refresh metadata
>> with an empty topic list, and hence get all the topic metadata. When sending
>> the message with some topic later, it will hence not cause the topic to be
>> added into metadata's topic list since the metadata is already available. When
>> the data is still sitting in the accumulator and a new topic is created, that
>> will cause a metadata refresh with just this single topic, hence losing the
>> metadata for any other topics. Under usual scenarios the messages will sit
>> in the accumulator until another send() is triggered with the same topic,
>> but with flush() being a blocking call, the likelihood of this issue being
>> exposed, with messages blocked forever inside flush(), is much higher.
>>
>> I am writing to ask if people think this problem is severe enough to
>> require another bug-fix release.
>>
>> -- Guozhang
>>



[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378783#comment-14378783
 ] 

Neha Narkhede commented on KAFKA-2046:
--

[~clarkhaskins] As discussed previously, the minimum amount of information that 
is needed to troubleshoot any issue, not just delete topic, is:
1. Controller logs (possibly at TRACE)
2. Server logs
3. State change log (DEBUG works)

> Delete topic still doesn't work
> ---
>
> Key: KAFKA-2046
> URL: https://issues.apache.org/jira/browse/KAFKA-2046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Clark Haskins
>Assignee: Sriharsha Chintalapani
>
> I just attempted to delete a 128-partition topic with all inbound producers 
> stopped.
> The result was as follows:
> The /admin/delete_topics znode was empty
> the topic under /brokers/topics was removed
> The Kafka topics command showed that the topic was removed
> However, the data on disk on each broker was not deleted and the topic has 
> not yet been re-created by starting up the inbound mirror maker.
> Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KAFKA-2042

2015-03-24 Thread Guozhang Wang
Ah, that is right. We just need to make sure this ticket goes along with the
flush() call in the next release then.

On Tue, Mar 24, 2015 at 3:09 PM, Jun Rao  wrote:

> Hi, Guozhang,
>
> The flush() was added to the new producer in trunk, not in 0.8.2, right?
>
> Thanks,
>
> Jun
>
> On Tue, Mar 24, 2015 at 2:42 PM, Guozhang Wang  wrote:
>
> > Hello,
> >
> > We found a serious bug while testing flush() calls in the new producer,
> > which is summarized in KAFKA-2042.
> >
> > In general, when the producer starts up it will try to refresh metadata
> > with an empty topic list, and hence get all the topic metadata. When sending
> > the message with some topic later, it will hence not cause the topic to be
> > added into metadata's topic list since the metadata is already available. When
> > the data is still sitting in the accumulator and a new topic is created, that
> > will cause a metadata refresh with just this single topic, hence losing the
> > metadata for any other topics. Under usual scenarios the messages will sit
> > in the accumulator until another send() is triggered with the same topic,
> > but with flush() being a blocking call, the likelihood of this issue being
> > exposed, with messages blocked forever inside flush(), is much higher.
> >
> > I am writing to ask if people think this problem is severe enough to
> > require another bug-fix release.
> >
> > -- Guozhang
> >
>



-- 
-- Guozhang


Review Request 32459: Patch for KAFKA-2044

2015-03-24 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/
---

Review request for kafka.


Bugs: KAFKA-2044
https://issues.apache.org/jira/browse/KAFKA-2044


Repository: kafka


Description
---

support requests and responses using Common api in core modules (missing files)


added error handling and factory method for requests


Diffs
-

  
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
 37aff6c0fd2ec2da8551aa74b166ca49b224ddd3 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
6943878116a97c02b758d273d93976019688830e 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ebc188742fd65e5e744003b4579324874fd81a9 
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32459/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-24 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Attachment: KAFKA-2044.patch

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-24 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Status: Patch Available  (was: Open)

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-24 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378814#comment-14378814
 ] 

Gwen Shapira commented on KAFKA-2044:
-

Created reviewboard https://reviews.apache.org/r/32459/diff/
 against branch trunk

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [Discussion] Using Client Requests and Responses in Server

2015-03-24 Thread Gwen Shapira
OK, I posted a working patch on KAFKA-2044 and
https://reviews.apache.org/r/32459/diff/.

There are a few decisions there that can be up for discussion (the factory
method on AbstractRequestResponse, the new handleErrors in the request API),
but as far as support for o.a.k.common requests in core goes, it does what it
needs to do.

Please review!

Gwen



On Tue, Mar 24, 2015 at 10:59 AM, Gwen Shapira 
wrote:

> Hi,
>
> I uploaded a (very) preliminary patch with my idea.
>
> One thing that's missing:
> RequestResponse had a handleError method that all requests implemented,
> typically generating an appropriate error Response for the request and sending
> it along. It's used by KafkaApis to handle all protocol errors for valid
> requests that are not handled elsewhere.
> AbstractRequestResponse doesn't have such method.
>
> I can, of course, add it.
> But before I jump into this, I'm wondering if there was another plan on
> handling Api errors.
>
> Gwen
>
> On Mon, Mar 23, 2015 at 6:16 PM, Jun Rao  wrote:
>
>> I think what you are saying is that in RequestChannel, we can start
>> generating header/body for new request types and leave requestObj null.
>> For existing requests, header/body will be null initially. Gradually, we can
>> migrate each type of request by populating header/body, instead of
>> requestObj. This makes sense to me since it serves two purposes: (1) not
>> polluting the code base with duplicated request/response objects for new
>> types of requests and (2) allowing the refactoring of existing requests to
>> be done in smaller pieces.
>>
>> Could you try that approach and perhaps just migrate one existing request
>> type (e.g. HeartBeatRequest) as an example? We probably need to rewind the
>> buffer after reading the requestId when deserializing the header (since the
>> header includes the request id).
>>
>> Thanks,
>>
>> Jun
>>
>> On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira 
>> wrote:
>>
>> > I'm thinking of a different approach, that will not fix everything, but
>> > will allow adding new requests without code duplication (and therefore
>> > unblock KIP-4):
>> >
>> > RequestChannel.request currently takes a buffer and parses it into an
>> > "old" request object. Since the objects are byte-compatible, we should
>> > be able to parse existing requests into both old and new objects. New
>> > requests will only be parsed into new objects.
>> >
>> > Basically:
>> > val requestId = buffer.getShort()
>> > if (requestId in keyToNameAndDeserializerMap) {
>> >   requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
>> >   header: RequestHeader = RequestHeader.parse(buffer)
>> >   body: Struct =
>> >     ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
>> > } else {
>> >   requestObj = null
>> >   header: RequestHeader = RequestHeader.parse(buffer)
>> >   body: Struct =
>> >     ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
>> > }
>> >
>> > This way existing KafkaApis will keep working as normal. The new Apis
>> > can implement just the new header/body requests.
>> > We'll do the same on the send-side: BoundedByteBufferSend can have a
>> > constructor that takes header/body instead of just a response object.
>> >
>> > Does that make sense?
>> >
>> > Once we have this in, we can move to:
>> > * Adding the missing request/response to the client code
>> > * Replacing requests that can be replaced
>> >
>> > It will also make life easier by having us review and test smaller
>> > chunks of work (the existing patch is *huge*, touches nearly every core
>> > component, and I'm not done yet...)
>> >
>> > Gwen
>> >
>> >
>> >
>> >
>> > On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps 
>> wrote:
>> >
>> > > Ack, yeah, forgot about that.
>> > >
>> > > It's not just a difference of wrappers. The server side actually sends
>> > > the bytes lazily using FileChannel.transferTo. We need to make it
>> > > possible to carry over that optimization. In some sense what we want to
>> > > be able to do is set a value to a Send instead of a ByteBuffer.
>> > >
>> > > Let me try to add that support to the protocol definition stuff, will
>> > > probably take me a few days to free up time.
>> > >
>> > > -Jay
>> > >
>> > > On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira 
>> > > wrote:
>> > >
>> > > > In case anyone is still following this thread, I need a bit of help :)
>> > > >
>> > > > The old FetchResponse.PartitionData included a MessageSet object.
>> > > > The new FetchResponse.PartitionData includes a ByteBuffer.
>> > > >
>> > > > However, when we read from logs, we return a MessageSet, and as far
>> > > > as I can see, these can't be converted to ByteBuffers (at least not
>> > > > without copying their data).
>> > > >
>> > > > Did anyone consider how to reconcile the MessageSets with the new
>> > > > FetchResponse objects?
>> > > >
>> > > > Gwen
>> > > >
>> > > >
>> > > > On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira <
>> gshap...@c

Re: Review Request 31366: Patch for KAFKA-1461

2015-03-24 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31366/#review77674
---



core/src/main/scala/kafka/server/AbstractFetcherThread.scala


Jun has a comment about the case when all partitions become inactive, which 
is common when the fetched broker has just gone through leader migration.

We can move the foreach statement before the if statement, and after the 
foreach check whether any partitions got added; if not, just back off for 
fetchBackoffMs.


- Guozhang Wang


On March 17, 2015, 11:03 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31366/
> ---
> 
> (Updated March 17, 2015, 11:03 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1461
> https://issues.apache.org/jira/browse/KAFKA-1461
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> e731df4b2a3e44aa3d761713a09b1070aff81430 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
> 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 
> 
> Diff: https://reviews.apache.org/r/31366/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



Re: Review Request 32434: Patch for KAFKA-2042

2015-03-24 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32434/#review77675
---

Ship it!


Ship It!

- Guozhang Wang


On March 24, 2015, 8:57 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32434/
> ---
> 
> (Updated March 24, 2015, 8:57 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2042
> https://issues.apache.org/jira/browse/KAFKA-2042
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Move the change to KafkaProducer after talking to Guozhang offline.
> 
> 
> A less expensive fix
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> c8bde8b732defa20819730d87303a9a80d01116f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> feda9c922d7dab17e424f8e6f0aa0a3f968e3729 
> 
> Diff: https://reviews.apache.org/r/32434/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Updated] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2042:
-
Fix Version/s: 0.8.3

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.8.3
>
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
> KAFKA-2042_2015-03-24_13:57:23.patch
>
>
> The new java producer's metadata.topics is initially empty, so the producer 
> sends a TMR with an empty topic set. The broker takes the empty requested 
> topic set as all topics, so metadata.cluster contains all topic metadata. 
> Later on, when a new topic is produced, it gets added into metadata.topics. 
> The next metadata update will then only contain the metadata for this new 
> topic, so metadata.cluster will only have this topic. Since there are a lot 
> of messages still sitting in the accumulator that have no metadata in 
> metadata.cluster, a caller thread that does a flush() will block forever, 
> because messages sitting in the accumulator without metadata will never be 
> ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378828#comment-14378828
 ] 

Guozhang Wang commented on KAFKA-2042:
--

Thanks for the patch, committed to trunk.

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.8.3
>
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
> KAFKA-2042_2015-03-24_13:57:23.patch
>
>
> The new java producer's metadata.topics is initially empty, so the producer 
> sends a TMR with an empty topic set. The broker takes the empty requested 
> topic set as all topics, so metadata.cluster contains all topic metadata. 
> Later on, when a new topic is produced, it gets added into metadata.topics. 
> The next metadata update will then only contain the metadata for this new 
> topic, so metadata.cluster will only have this topic. Since there are a lot 
> of messages still sitting in the accumulator that have no metadata in 
> metadata.cluster, a caller thread that does a flush() will block forever, 
> because messages sitting in the accumulator without metadata will never be 
> ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32460: Patch for KAFKA-2032

2015-03-24 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32460/
---

Review request for kafka.


Bugs: KAFKA-2032
https://issues.apache.org/jira/browse/KAFKA-2032


Repository: kafka


Description
---

KAFKA-2032: added topic config cache.


Diffs
-

  core/src/main/scala/kafka/server/KafkaServer.scala 
dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/main/scala/kafka/server/TopicConfigCache.scala PRE-CREATION 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 
47295d40131492aaac786273819b7bc6e22e5486 
  core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
93182aeb342729d420d2e7d59a1035994164b7db 
  core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/32460/diff/


Testing
---


Thanks,

Parth Brahmbhatt



[jira] [Updated] (KAFKA-2032) ConsumerConfig doesn't validate partition.assignment.strategy values

2015-03-24 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-2032:

Attachment: KAFKA-2032.patch

> ConsumerConfig doesn't validate partition.assignment.strategy values
> 
>
> Key: KAFKA-2032
> URL: https://issues.apache.org/jira/browse/KAFKA-2032
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1.2
>Reporter: Jason Rosenberg
>Assignee: Parth Brahmbhatt
> Attachments: KAFKA-2032.patch, KAFKA-2032.patch, 
> KAFKA-2032_2015-03-19_11:42:07.patch, KAFKA-2032_2015-03-19_11:44:48.patch, 
> KAFKA-2032_2015-03-19_11:47:24.patch, KAFKA-2032_2015-03-19_12:19:45.patch
>
>
> In the ConsumerConfig class, there are validation checks to make sure that 
> string based configuration properties conform to allowed values.  However, 
> this validation appears to be missing for the partition.assignment.strategy.  
> E.g. there is validation for autooffset.reset and offsets.storage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2032) ConsumerConfig doesn't validate partition.assignment.strategy values

2015-03-24 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378834#comment-14378834
 ] 

Parth Brahmbhatt commented on KAFKA-2032:
-

Created reviewboard https://reviews.apache.org/r/32460/diff/
 against branch origin/trunk

> ConsumerConfig doesn't validate partition.assignment.strategy values
> 
>
> Key: KAFKA-2032
> URL: https://issues.apache.org/jira/browse/KAFKA-2032
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1.2
>Reporter: Jason Rosenberg
>Assignee: Parth Brahmbhatt
> Attachments: KAFKA-2032.patch, KAFKA-2032.patch, 
> KAFKA-2032_2015-03-19_11:42:07.patch, KAFKA-2032_2015-03-19_11:44:48.patch, 
> KAFKA-2032_2015-03-19_11:47:24.patch, KAFKA-2032_2015-03-19_12:19:45.patch
>
>
> In the ConsumerConfig class, there are validation checks to make sure that 
> string based configuration properties conform to allowed values.  However, 
> this validation appears to be missing for the partition.assignment.strategy.  
> E.g. there is validation for autooffset.reset and offsets.storage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-2046:
--
Assignee: Onur Karaman  (was: Sriharsha Chintalapani)

> Delete topic still doesn't work
> ---
>
> Key: KAFKA-2046
> URL: https://issues.apache.org/jira/browse/KAFKA-2046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Clark Haskins
>Assignee: Onur Karaman
>
> I just attempted to delete a 128-partition topic with all inbound producers 
> stopped.
> The result was as follows:
> The /admin/delete_topics znode was empty
> the topic under /brokers/topics was removed
> The Kafka topics command showed that the topic was removed
> However, the data on disk on each broker was not deleted and the topic has 
> not yet been re-created by starting up the inbound mirror maker.
> Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378842#comment-14378842
 ] 

Joel Koshy commented on KAFKA-2046:
---

Hey Clark - Onur and I are looking into this.

> Delete topic still doesn't work
> ---
>
> Key: KAFKA-2046
> URL: https://issues.apache.org/jira/browse/KAFKA-2046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Clark Haskins
>Assignee: Sriharsha Chintalapani
>
> I just attempted to delete a 128-partition topic with all inbound producers 
> stopped.
> The result was as follows:
> The /admin/delete_topics znode was empty
> the topic under /brokers/topics was removed
> The Kafka topics command showed that the topic was removed
> However, the data on disk on each broker was not deleted and the topic has 
> not yet been re-created by starting up the inbound mirror maker.
> Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31366: Patch for KAFKA-1461

2015-03-24 Thread Sriharsha Chintalapani


> On March 24, 2015, 10:46 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, lines 81-86
> > 
> >
> > Jun has a comment about the case when all partitions become inactive, 
> > which is common when the fetched broker has just gone through leader 
> > migration.
> > 
> > We can move the foreach statement before the if statement, and after the 
> > foreach check whether any partitions got added; if not, just back off for 
> > fetchBackoffMs.

Thanks for the review. Are you looking at something like this? This wouldn't 
handle the case where partitionMap is populated but all of the partitions are 
inactive.

  partitionMap.foreach {
    case ((topicAndPartition, partitionFetchState)) =>
      if (partitionFetchState.isActive)
        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
          partitionFetchState.offset, fetchSize)
  }
  if (partitionMap.isEmpty)
    partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)

Or do we want to check whether all the current partitions are inactive and then 
back off? That would be expensive to do in doWork, checking whether all the 
partitions are active or not.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31366/#review77674
---


On March 17, 2015, 11:03 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31366/
> ---
> 
> (Updated March 17, 2015, 11:03 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1461
> https://issues.apache.org/jira/browse/KAFKA-1461
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> e731df4b2a3e44aa3d761713a09b1070aff81430 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
> 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 
> 
> Diff: https://reviews.apache.org/r/31366/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



[jira] [Commented] (KAFKA-2035) Add a topic config cache.

2015-03-24 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378925#comment-14378925
 ] 

Parth Brahmbhatt commented on KAFKA-2035:
-

Posted a review https://reviews.apache.org/r/32460/

> Add a topic config cache.
> -
>
> Key: KAFKA-2035
> URL: https://issues.apache.org/jira/browse/KAFKA-2035
> Project: Kafka
>  Issue Type: Task
>Reporter: Parth Brahmbhatt
>Assignee: Parth Brahmbhatt
>
> Currently the topic config is all about Log configuration, so we have a 
> TopicConfigManager which takes in a Log instance and keeps updating that 
> instance's config as and when the topic config is updated. The topic 
> config update notifications are sent by the Controller using zk watchers.
> I propose to introduce a TopicConfigCache which will be updated by 
> TopicConfigManager on any config changes. The Log instance and any other 
> component (like the authorizer mentioned in KAFKA-1688) will have a reference 
> to the TopicConfigCache, through which they will access the topic configs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : Kafka-trunk #430

2015-03-24 Thread Apache Jenkins Server
See 



[jira] [Created] (KAFKA-2047) Accelarate consumer rebalance in mirror maker.

2015-03-24 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-2047:
---

 Summary: Accelarate consumer rebalance in mirror maker.
 Key: KAFKA-2047
 URL: https://issues.apache.org/jira/browse/KAFKA-2047
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin


In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
longer because there are more zookeeper consumer connectors doing rebalances 
serially. Rebalance would be faster if the bootstrap of the 
ZookeeperConsumerConnectors were parallelized.
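
One possible shape of such a change, sketched with a plain executor and a
hypothetical createAndStartConnector() helper standing in for the per-connector
bootstrap (nothing here is taken from the attached patch):

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelBootstrapSketch {
    // Hypothetical helper standing in for building one ZookeeperConsumerConnector
    // and running its initial (blocking) rebalance.
    static void createAndStartConnector(int id) {
        // ... connector bootstrap would go here ...
    }

    public static void main(String[] args) throws Exception {
        int numConnectors = 4; // placeholder
        ExecutorService pool = Executors.newFixedThreadPool(numConnectors);
        List<Future<?>> bootstraps = new ArrayList<>();
        for (int i = 0; i < numConnectors; i++) {
            final int id = i;
            // Kick off every connector bootstrap concurrently instead of serially.
            bootstraps.add(pool.submit(() -> createAndStartConnector(id)));
        }
        for (Future<?> f : bootstraps) {
            f.get(); // wait until all initial rebalances have completed
        }
        pool.shutdown();
    }
}
{code}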



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32465: Patch for KAFKA-2047

2015-03-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32465/
---

Review request for kafka.


Bugs: KAFKA-2047
https://issues.apache.org/jira/browse/KAFKA-2047


Repository: kafka


Description
---

Fix for KAFKA-2047: accelerate consumer bootstrap rebalance in mirror maker.


Diffs
-

  core/src/main/scala/kafka/tools/MirrorMaker.scala 
4f3c4c872e144195bb4b742b802fa3b931edb534 

Diff: https://reviews.apache.org/r/32465/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-2047) Accelarate consumer rebalance in mirror maker.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2047:

Attachment: KAFKA-2047.patch

> Accelarate consumer rebalance in mirror maker.
> --
>
> Key: KAFKA-2047
> URL: https://issues.apache.org/jira/browse/KAFKA-2047
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2047.patch
>
>
> In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
> longer because there are more zookeeper consumer connectors doing rebalances 
> serially. Rebalance would be faster if the bootstrap of the 
> ZookeeperConsumerConnectors were parallelized.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2047) Accelarate consumer rebalance in mirror maker.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378935#comment-14378935
 ] 

Jiangjie Qin commented on KAFKA-2047:
-

Created reviewboard https://reviews.apache.org/r/32465/diff/
 against branch origin/trunk

> Accelarate consumer rebalance in mirror maker.
> --
>
> Key: KAFKA-2047
> URL: https://issues.apache.org/jira/browse/KAFKA-2047
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2047.patch
>
>
> In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
> longer because there are more zookeeper consumer connectors doing rebalances 
> serially. Rebalance would be faster if the bootstrap of the 
> ZookeeperConsumerConnectors were parallelized.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2047) Accelarate consumer rebalance in mirror maker.

2015-03-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2047:

Status: Patch Available  (was: Open)

> Accelarate consumer rebalance in mirror maker.
> --
>
> Key: KAFKA-2047
> URL: https://issues.apache.org/jira/browse/KAFKA-2047
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2047.patch
>
>
> In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
> longer because there are more zookeeper consumer connectors doing rebalances 
> serially. Rebalance would be faster if the bootstrap of the 
> ZookeeperConsumerConnectors were parallelized.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-24 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378944#comment-14378944
 ] 

Onur Karaman commented on KAFKA-2046:
-

I just tried deleting a topic with 128 partitions and noticed that the delete 
topic node and the topic node were still in zk, and all but one replica on the 
brokers had not been deleted.

grep "handling stop replica (delete=false)" kafka-state-change.log produced 
output for all of the partitions. So the controller was able to send a 
StopReplicaRequest to the brokers to transition from OnlineReplica to 
OfflineReplica.

However, grep "handling stop replica (delete=true)" kafka-state-change.log 
revealed only one replica. This was the replica that I noticed had actually 
been deleted from the filesystem. The other replicas never received the 
delete=true StopReplicaRequest, so the transition from OfflineReplica to 
ReplicaDeletionStarted hangs for all the other replicas. A thread dump on the 
controller indicates that it's getting stuck because of a LinkedBlockingQueue 
in ControllerChannelManager:
{code}
"delete-topics-thread-xyz"...
  java.lang.Thread.State: WAITING (parking)
  ...
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at 
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
at 
kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
...
at 
kafka.controller.KafkaController.sendRequest(KafkaController.scala:670)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$10.apply(ControllerChannelManager.scala:320)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$10.apply(ControllerChannelManager.scala:317)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:317)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:310)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at 
kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:310)
at 
kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:115)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:337)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:327)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at 
kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:327)
{code}
The controller.message.queue.size property is currently set to a very small 
value. I'll try bumping it up and see if that addresses the issue.
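
For illustration, a small standalone sketch of why a bounded LinkedBlockingQueue
makes put() park the calling thread, exactly as in the stack trace above; the
capacity, request names and timeout are arbitrary:

{code}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        // A very small capacity, similar to a low controller.message.queue.size.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        queue.put("StopReplicaRequest-1");      // fills the queue

        // put() parks inside AbstractQueuedSynchronizer until a consumer drains
        // the queue; if the consumer never runs, the caller is stuck forever.
        // queue.put("StopReplicaRequest-2");

        // A bounded alternative: give up after a timeout instead of parking forever.
        boolean accepted = queue.offer("StopReplicaRequest-2", 100, TimeUnit.MILLISECONDS);
        System.out.println("second request accepted: " + accepted);
    }
}
{code}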

> Delete topic still doesn't work
> ---
>
> Key: KAFKA-2046
> URL: https://issues.apache.org/jira/browse/KAFKA-2046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Clark Haskins
>Assignee: Onur Karaman
>
> I just attempted to delete a 128-partition topic with all inbound producers 
> stopped.
> The result was as follows:
> The /admin/delete_topics znode was empty
> the topic under /brokers/topics was removed
> The Kafka topics command showed that the topic was removed
> However, the data on disk on each broker was not deleted and the topic has 
> not yet been re-created by starting up the inbound mirror maker.
> Let me know what additional information is needed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378954#comment-14378954
 ] 

Jun Rao commented on KAFKA-2042:


Could you explain a bit more when the new producer will send a TMR with an 
empty topic list? I can see this happen if after the producer is created, no 
message is sent within the window of metadata age. Is that the only case when 
this can happen?

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.8.3
>
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
> KAFKA-2042_2015-03-24_13:57:23.patch
>
>
> The new java producer's metadata.topics is initially empty, so the producer 
> sends a TMR with an empty topic set. The broker takes the empty requested 
> topic set as all topics, so metadata.cluster contains all topic metadata. 
> Later on, when a new topic is produced, it gets added into metadata.topics. 
> The next metadata update will then only contain the metadata for this new 
> topic, so metadata.cluster will only have this topic. Since there are a lot 
> of messages still sitting in the accumulator that have no metadata in 
> metadata.cluster, a caller thread that does a flush() will block forever, 
> because messages sitting in the accumulator without metadata will never be 
> ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : KafkaPreCommit #42

2015-03-24 Thread Apache Jenkins Server
See 



[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378986#comment-14378986
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Yes, it depends on whether the topic list is empty or not when we send the 
first TMR.
I might be missing something, but I think the TMR will be sent very soon after 
the producer is instantiated.
In the first NetworkClient.poll(), it checks whether metadata needs an update 
by taking the max of
timeToNextMetadataUpdate
timeToNextReconnectAttempt
waitForMetadataFetch
All of them will be 0 on startup. That means the TMR will be sent at the 
first poll().


> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.8.3
>
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
> KAFKA-2042_2015-03-24_13:57:23.patch
>
>
> The new java producer's metadata.topics is initially empty, so the producer 
> sends a TMR with an empty topic set. The broker takes the empty requested 
> topic set as all topics, so metadata.cluster contains all topic metadata. 
> Later on, when a new topic is produced, it gets added into metadata.topics. 
> The next metadata update will then only contain the metadata for this new 
> topic, so metadata.cluster will only have this topic. Since there are a lot 
> of messages still sitting in the accumulator that have no metadata in 
> metadata.cluster, a caller thread that does a flush() will block forever, 
> because messages sitting in the accumulator without metadata will never be 
> ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379015#comment-14379015
 ] 

Jun Rao commented on KAFKA-2042:


If I start a console-producer w/o typing in any message, the producer actually 
doesn't send any metadata request immediately. On initializing the producer, we 
update metadata with the bootstrap broker. This sets lastRefreshMs to the 
current time. So, in NetworkClient.poll(), timeToNextMetadataUpdate will 
actually be the metadata age, which defaults to 300 secs.

In what situation did you discover this problem?

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.8.3
>
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
> KAFKA-2042_2015-03-24_13:57:23.patch
>
>
> The new java producer's metadata.topics is initially empty, so the producer 
> sends a TMR with an empty topic set. The broker takes the empty requested 
> topic set as all topics, so metadata.cluster contains all topic metadata. 
> Later on, when a new topic is produced, it gets added into metadata.topics. 
> The next metadata update will then only contain the metadata for this new 
> topic, so metadata.cluster will only have this topic. Since there are a lot 
> of messages still sitting in the accumulator that have no metadata in 
> metadata.cluster, a caller thread that does a flush() will block forever, 
> because messages sitting in the accumulator without metadata will never be 
> ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1293) Mirror maker housecleaning

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379033#comment-14379033
 ] 

Jiangjie Qin commented on KAFKA-1293:
-

[~mwarhaftig] It looks very closely related to the work in KIP-14. Please feel 
free to own that KIP if you want to.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-14+-+Tools+Standardization
Since this is a public interface change, we need to go through the KIP process. 
You can find the KIP process here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> Mirror maker housecleaning
> --
>
> Key: KAFKA-1293
> URL: https://issues.apache.org/jira/browse/KAFKA-1293
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1
>Reporter: Jay Kreps
>Priority: Minor
>  Labels: usability
> Attachments: KAFKA-1293.patch
>
>
> Mirror maker uses its own convention for command-line arguments, e.g. 
> --num.producers, whereas everywhere else follows the unix convention like 
> --num-producers. This is annoying because when running different tools you 
> have to constantly remember the quirks of whoever wrote each tool.
> Mirror maker should also have a top-level wrapper script in bin/ to make tab 
> completion work and so you don't have to remember the fully qualified class 
> name.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)
TAO XIAO created KAFKA-2048:
---

 Summary: java.lang.IllegalMonitorStateException thrown in 
AbstractFetcherThread when handling error returned from simpleConsumer
 Key: KAFKA-2048
 URL: https://issues.apache.org/jira/browse/KAFKA-2048
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
Reporter: TAO XIAO
Assignee: Neha Narkhede


AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in the 
catch block of processFetchRequest method. This is because partitionMapLock is 
not acquired before calling partitionMapCond.await()
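
The underlying rule, as a standalone sketch: a java.util.concurrent Condition can
only be awaited while its associated lock is held, otherwise await() throws
IllegalMonitorStateException. The lock and condition names below simply mirror
the ones mentioned above; this is not the AbstractFetcherThread code itself.

{code}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitWithoutLockSketch {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock partitionMapLock = new ReentrantLock();
        Condition partitionMapCond = partitionMapLock.newCondition();

        try {
            // Called without holding partitionMapLock: throws
            // IllegalMonitorStateException, which is what the error-handling
            // path in processFetchRequest runs into.
            partitionMapCond.await(10, TimeUnit.MILLISECONDS);
        } catch (IllegalMonitorStateException e) {
            System.out.println("await() without the lock: " + e);
        }

        // Correct pattern: acquire the lock around the await call.
        partitionMapLock.lock();
        try {
            partitionMapCond.await(10, TimeUnit.MILLISECONDS);
        } finally {
            partitionMapLock.unlock();
        }
    }
}
{code}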



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Attachment: KAFKA-2048.patch

> java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
> handling error returned from simpleConsumer
> ---
>
> Key: KAFKA-2048
> URL: https://issues.apache.org/jira/browse/KAFKA-2048
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.1
>Reporter: TAO XIAO
>Assignee: Neha Narkhede
> Attachments: KAFKA-2048.patch
>
>
> AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
> the catch block of processFetchRequest method. This is because 
> partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Status: Patch Available  (was: Open)

> java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
> handling error returned from simpleConsumer
> ---
>
> Key: KAFKA-2048
> URL: https://issues.apache.org/jira/browse/KAFKA-2048
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.1
>Reporter: TAO XIAO
>Assignee: Neha Narkhede
> Attachments: KAFKA-2048.patch
>
>
> AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
> the catch block of processFetchRequest method. This is because 
> partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2042) New producer metadata update always get all topics.

2015-03-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379120#comment-14379120
 ] 

Jiangjie Qin commented on KAFKA-2042:
-

Ah, yes, that's right. I found this issue when starting mirror maker: because a 
large mirror maker cluster might take some time to finish consumer rebalance, 
no producer.send() was called before the first TMR was sent. So this issue 
probably will not cause problems in normal cases under default settings.

> New producer metadata update always get all topics.
> ---
>
> Key: KAFKA-2042
> URL: https://issues.apache.org/jira/browse/KAFKA-2042
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.8.3
>
> Attachments: KAFKA-2042.patch, KAFKA-2042_2015-03-24_13:37:49.patch, 
> KAFKA-2042_2015-03-24_13:57:23.patch
>
>
> The new java producer's metadata.topics is initially empty, so the producer 
> sends a TMR with an empty topic set. The broker takes the empty requested 
> topic set as all topics, so metadata.cluster contains all topic metadata. 
> Later on, when a new topic is produced, it gets added into metadata.topics. 
> The next metadata update will then only contain the metadata for this new 
> topic, so metadata.cluster will only have this topic. Since there are a lot 
> of messages still sitting in the accumulator that have no metadata in 
> metadata.cluster, a caller thread that does a flush() will block forever, 
> because messages sitting in the accumulator without metadata will never be 
> ready to send.
> We should add a check for metadata.topics: if it is empty, no TMR should be 
> sent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Attachment: (was: KAFKA-2048.patch)

> java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
> handling error returned from simpleConsumer
> ---
>
> Key: KAFKA-2048
> URL: https://issues.apache.org/jira/browse/KAFKA-2048
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.1
>Reporter: TAO XIAO
>Assignee: Neha Narkhede
>
> AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
> the catch block of processFetchRequest method. This is because 
> partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2048) java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when handling error returned from simpleConsumer

2015-03-24 Thread TAO XIAO (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TAO XIAO updated KAFKA-2048:

Status: Open  (was: Patch Available)

> java.lang.IllegalMonitorStateException thrown in AbstractFetcherThread when 
> handling error returned from simpleConsumer
> ---
>
> Key: KAFKA-2048
> URL: https://issues.apache.org/jira/browse/KAFKA-2048
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.1
>Reporter: TAO XIAO
>Assignee: Neha Narkhede
>
> AbstractFetcherThread will throw java.lang.IllegalMonitorStateException in 
> the catch block of processFetchRequest method. This is because 
> partitionMapLock is not acquired before calling partitionMapCond.await()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2049) Add thread that detects JVM pauses

2015-03-24 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-2049:
---

 Summary: Add thread that detects JVM pauses
 Key: KAFKA-2049
 URL: https://issues.apache.org/jira/browse/KAFKA-2049
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira
Assignee: Gwen Shapira


Long JVM pauses can cause Kafka malfunctions (especially when interacting with 
ZK) that can be challenging to debug.

I propose implementing HADOOP-9618 in Kafka:
Add a simple thread which loops on 1-second sleeps, and if the sleep ever takes 
significantly longer than 1 second, log a WARN. This will make GC pauses (and 
other pauses) obvious in logs.
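
A minimal sketch of such a watchdog thread (the threshold, naming and logging
here are illustrative and not taken from HADOOP-9618):

{code}
public class JvmPauseMonitorSketch {
    public static void main(String[] args) {
        final long sleepMs = 1000;         // loop on 1-second sleeps
        final long warnThresholdMs = 2000; // warn if the sleep overshoots badly

        Thread monitor = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                long start = System.nanoTime();
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    return;
                }
                long elapsedMs = (System.nanoTime() - start) / 1_000_000;
                if (elapsedMs > warnThresholdMs) {
                    // A real implementation would use the logging framework at WARN level.
                    System.err.println("WARN: JVM paused (or was starved) for roughly "
                            + (elapsedMs - sleepMs) + " ms");
                }
            }
        }, "jvm-pause-monitor");
        monitor.setDaemon(true);
        // In a broker this would be started once at startup and left running.
        monitor.start();
    }
}
{code}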



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-03-24 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379162#comment-14379162
 ] 

Sriharsha Chintalapani commented on KAFKA-1461:
---

[~guozhang] Thanks for the review. Can you please take a look at my reply to 
your comment?

> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.1.1
>Reporter: Sam Meder
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1461.patch, KAFKA-1461.patch, 
> KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, 
> KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch
>
>
> The current replica fetcher thread will retry in a tight loop if any error 
> occurs during the fetch call. For example, we've seen cases where the fetch 
> continuously throws a connection refused exception leading to several replica 
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, 
> although the fact that erroring partitions are removed so a leader can be 
> re-discovered helps some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

