Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Ewen Cheslack-Postava


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java,
> >  line 16
> > 
> >
> > Do we need NONE?
> 
> Jason Gustafson wrote:
> It was there before, but I don't think it's actually used. I'd be fine 
> removing it.

You need it to properly parse the "none" config value and so there is an 
OffsetResetStrategy value to indicate that option. It's just not currently used 
because you handle it with an else rather than an else if.
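A minimal, self-contained sketch of that point (the enum constants mirror the config values discussed here, but the exact shape of OffsetResetStrategy in the patch is an assumption): NONE is needed both so that the "none" config value parses, and so the reset handling can be an explicit else-if chain instead of a catch-all else:

```java
public class OffsetResetDemo {
    // Mirrors the strategies discussed in the review; the enum shape is assumed.
    enum OffsetResetStrategy { LATEST, EARLIEST, NONE }

    static OffsetResetStrategy parse(String config) {
        // Uppercasing lets valueOf handle "latest", "earliest", and "none" alike.
        return OffsetResetStrategy.valueOf(config.toUpperCase());
    }

    static String handle(OffsetResetStrategy strategy) {
        // An else-if chain makes the NONE case explicit instead of a catch-all else.
        if (strategy == OffsetResetStrategy.EARLIEST)
            return "seek to beginning";
        else if (strategy == OffsetResetStrategy.LATEST)
            return "seek to end";
        else if (strategy == OffsetResetStrategy.NONE)
            return "raise no-offset error";
        else
            throw new IllegalArgumentException("unknown strategy: " + strategy);
    }

    public static void main(String[] args) {
        System.out.println(parse("none"));          // NONE
        System.out.println(handle(parse("none")));  // raise no-offset error
    }
}
```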


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java,
> >  line 196
> > 
> >
> > To be consistent with the naming convention of the rest of the 
> > methods, should we just name it offsetResetNeeded()?
> 
> Jason Gustafson wrote:
> Haha, I actually used that convention initially, but it was a little 
> confusing at times which method should be used. I can change it back, or we 
> can add the "is" prefix to the other usages. Preferences?

I think I've seen a number of other places where we're not exactly consistent 
with this (grep for "[.]is" for lots of examples). Naming conventions seem to 
be quite mixed between this type, get/set style, and just bare names. Not sure 
it's worth worrying about beyond readability issues.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> ---
> 
> (Updated June 5, 2015, 7:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
> https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> KAFKA-2168; address review comments
> 
> 
> KAFKA-2168; fix rebase error and checkstyle issue
> 
> 
> KAFKA-2168; address review comments and add docs
> 
> 
> KAFKA-2168; handle polling with timeout 0
> 
> 
> KAFKA-2168; timeout=0 means return immediately
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d1d1ec178f60dc47d408f52a89e52886c1a093a2 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  cee75410127dd1b86c1156563003216d93a086b3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
> 677edd385f35d4262342b567262c0b874876d25b 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  419541011d652becf0cda7a5e62ce813cddb1732 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
>  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>



Re: KIP Call Notes

2015-06-10 Thread Jayesh Thakrar
Our company (Conversant Media) became a Kafka user pretty recently (about 3 
months ago) and I have been following the user and dev groups for the past 
month, so I could have missed the discussions on KIP-4.

In looking up the current command line capabilities in Kafka 0.8.2.1 (not the 
latest release), it seems we are missing a couple of pieces. Ideally, I am 
looking at getting the following kinds of information.

Describe a "Kafka cluster" (example):

kafka cluster metadata = {
  "brokers" : [
    { "id" : 0, attributes from broker JSON schema },
    { "id" : 1, attributes from broker JSON schema },
    ...
  ],
  "topics" : [
    { "name" : "topic1", attributes from topic, partition, ISR, and leader information },
    { "name" : "topic2", attributes from topic, partition, ISR, and leader information }
  ]
}

Topic metrics = at least some kind of message high-water mark from the leaders 
for each topic partition

Describe consumers (example):

consumer metadata = {
  [ "group" : "group_id1", "client" : "client_id1", [ list of subscribed topics ] ], ...
}

Having such information will make it easy to build monitoring tools and will 
also aid troubleshooting and sizing/configuration.

Is discussing these things as part of a KIP the right process, or is there some 
other forum?

Thanks,
Jayesh Thakrar


  From: Gwen Shapira 
 To: "dev@kafka.apache.org"  
 Sent: Tuesday, June 9, 2015 1:19 PM
 Subject: KIP Call Notes
   
Shortest KIP discussion ever :)

Here are my notes. Feel free to correct or expand:

SSL: Harsh is rebasing after KAFKA-1928 and resolving an issue with
support for a specific API.

KIP-4  - ISR in Metadata request: We prefer to preserve this information.

KIP-25 - System Tests: Geoffrey posted an update with discussion
summary, there were no objections to the proposal. We will wait a day
before starting a vote.

New consumer - Separate the API changes for metadata to a new JIRA.
Since the new consumer is still work in progress, there's no need for
KIP for every modification.


  

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jun Rao


> On June 9, 2015, 6:49 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 322
> > 
> >
> > Do we need this? There is no real guarantee on the poll time, so it 
> > seems that we could just return when wakeup is called.
> 
> Jason Gustafson wrote:
> You might be waking up from a synchronous commit, for example. In that 
> case, all we can do is raise an exception. We could alternatively say that 
> wakeup only applies to the poll() method and cannot be used to interrupt the 
> other calls.

If poll just returns on wakeup, how would the caller know if there is an 
intention to close the consumer?


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87257
---





Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jun Rao


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1212
> > 
> >
> > -1 makes the pollClient block forever. So, we don't get a chance to do 
> > the wakeup check.
> 
> Jason Gustafson wrote:
> I might be wrong, but I think we can still use NetworkClient.wakeup to 
> interrupt a poll call which is waiting forever.

Yes, you are right. I missed that.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 797-798
> > 
> >
> > Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at 
> > the end of the call, we expect the fetch offset to be set to the beginning. 
> > This is now changed to async, which doesn't match the intended behavior. We 
> > need to think through if this matters or not.
> > 
> > Ditto for seekToEnd().
> 
> Jason Gustafson wrote:
> Since we always update fetch positions before a new fetch and in 
> position(), it didn't seem necessary to make it synchronous. I thought this 
> handling might be more consistent with how new subscriptions are handled 
> (which are asynchronous and defer the initial offset fetch until the next 
> poll or position). That being said, I don't have a strong feeling about it, 
> so we could return to the blocking version.

Making this async may be fine. One implication is that if we call position() 
immediately after seekToBeginning(), we may not be able to get the correct 
offset.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jun Rao


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1212
> > 
> >
> > -1 makes the pollClient block forever. So, we don't get a chance to do 
> > the wakeup check.
> 
> Jason Gustafson wrote:
> I might be wrong, but I think we can still use NetworkClient.wakeup to 
> interrupt a poll call which is waiting forever.
> 
> Jun Rao wrote:
> Yes, you are right. I missed that.

Actually, this still seems to be a problem. The issue is that when 
NetworkClient.wakeup is called, NetworkClient.poll may not be in progress. In 
that case, the next NetworkClient.poll may still block for the full timeout.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---





Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jason Gustafson


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1212
> > 
> >
> > -1 makes the pollClient block forever. So, we don't get a chance to do 
> > the wakeup check.
> 
> Jason Gustafson wrote:
> I might be wrong, but I think we can still use NetworkClient.wakeup to 
> interrupt a poll call which is waiting forever.
> 
> Jun Rao wrote:
> Yes, you are right. I missed that.
> 
> Jun Rao wrote:
> Actually, this still seems to be a problem. The issue is that when 
> NetworkClient.wakeup is called, NetworkClient.poll may not be in progress. In 
> that case, the next NetworkClient.poll may still block for the full timeout.

From the javadocs for Selector, the wakeup will apply to the next poll if one 
is not in progress. But perhaps we should just check the wakeup flag before 
entering the poll to be safe.
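That flag check can be sketched in a self-contained way (FakeNetworkClient and its methods are hypothetical stand-ins for illustration, not the actual NetworkClient internals): record the wakeup in a flag, and consult the flag before blocking so a wakeup delivered between polls is not lost:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupDemo {
    // Stand-in for the selector-backed client; names here are hypothetical.
    static class FakeNetworkClient {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);

        void wakeup() {
            // Remember the wakeup even if no poll is currently in progress.
            wakeupPending.set(true);
        }

        int poll(long timeoutMs) throws InterruptedException {
            // Check the flag before blocking, so a wakeup that arrived between
            // polls applies to this poll instead of being swallowed.
            if (wakeupPending.getAndSet(false))
                return 0; // return immediately, no blocking
            Thread.sleep(timeoutMs); // stands in for selector.select(timeout)
            return 1;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FakeNetworkClient client = new FakeNetworkClient();
        client.wakeup();               // wakeup arrives before the poll starts
        long start = System.nanoTime();
        client.poll(5000);             // returns immediately thanks to the flag check
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("poll returned in ~" + elapsedMs + " ms");
    }
}
```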


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 797-798
> > 
> >
> > Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at 
> > the end of the call, we expect the fetch offset to be set to the beginning. 
> > This is now changed to async, which doesn't match the intended behavior. We 
> > need to think through if this matters or not.
> > 
> > Ditto for seekToEnd().
> 
> Jason Gustafson wrote:
> Since we always update fetch positions before a new fetch and in 
> position(), it didn't seem necessary to make it synchronous. I thought this 
> handling might be more consistent with how new subscriptions are handled 
> (which are asynchronous and defer the initial offset fetch until the next 
> poll or position). That being said, I don't have a strong feeling about it, 
> so we could return to the blocking version.
> 
> Jun Rao wrote:
> Making this async may be fine. One implication is that if we call 
> position() immediately after seekToBeginning(), we may not be able to get the 
> correct offset.

We should be able to get the right offset since we always update offsets before 
returning the current position, but we might have to block for it. It's similar 
to calling subscribe(topic) and then trying to get its position immediately.
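A toy model of this deferred-reset behavior (all names here are illustrative, not the actual SubscriptionState/Fetcher code): seekToBeginning() only records the intent, and position() resolves the pending reset before answering, which is where a real call could block:

```java
public class LazySeekDemo {
    // Toy model of the deferred reset discussed above; names are illustrative.
    static class Position {
        private Long offset = null;            // null = no position assigned yet
        private boolean resetToBeginning = false;

        void seekToBeginning() {
            // Async style: only record the intent, perform no offset lookup yet.
            offset = null;
            resetToBeginning = true;
        }

        long position() {
            // position() resolves any pending reset before answering, so the
            // caller still sees the right offset; in the real consumer this is
            // where the call may block on an offset request.
            if (offset == null && resetToBeginning) {
                offset = fetchLogStartOffset();
                resetToBeginning = false;
            }
            if (offset == null)
                throw new IllegalStateException("no position and no reset pending");
            return offset;
        }

        private long fetchLogStartOffset() {
            return 0L; // stands in for a blocking offset request to the leader
        }
    }

    public static void main(String[] args) {
        Position p = new Position();
        p.seekToBeginning();              // returns immediately, nothing fetched yet
        System.out.println(p.position()); // lazily resolves the reset
    }
}
```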


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jason Gustafson


> On June 9, 2015, 6:49 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 322
> > 
> >
> > Do we need this? There is no real guarantee on the poll time, so it 
> > seems that we could just return when wakeup is called.
> 
> Jason Gustafson wrote:
> You might be waking up from a synchronous commit, for example. In that 
> case, all we can do is raise an exception. We could alternatively say that 
> wakeup only applies to the poll() method and cannot be used to interrupt the 
> other calls.
> 
> Jun Rao wrote:
> If poll just returns on wakeup, how would the caller know if there is an 
> intention to close the consumer?

I think the user would have to use a separate flag to indicate their intention 
to close. Then their shutdown hook would first set the flag, then call 
wakeup(). The polling thread then might look like this:


```
KafkaConsumer consumer = new KafkaConsumer(config);
try {
  consumer.subscribe("foo");

  while (!closed) {  // 'closed' is the volatile shutdown flag set by the hook
    ConsumerRecords records = consumer.poll(5000);

    // Do something with the records
  }
} finally {
  consumer.close();
}
```
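The shutdown side of this pattern can be sketched self-contained (everything here is a stand-in: the sleep models a blocking poll, interrupt() models consumer.wakeup(), and the real code would catch ConsumerWakeupException rather than InterruptedException):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownDemo {
    // Returns true once the polling thread has shut down and "closed" the consumer.
    static boolean shutdownCleanly() throws InterruptedException {
        AtomicBoolean closed = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);

        Thread poller = new Thread(() -> {
            try {
                while (!closed.get()) {
                    // consumer.poll(timeout) and record processing would go here;
                    // a ConsumerWakeupException would also break out of the loop.
                    Thread.sleep(10);
                }
            } catch (InterruptedException ignored) {
                // woken up during a blocking call, analogous to ConsumerWakeupException
            } finally {
                // consumer.close() would go here
                done.countDown();
            }
        });
        poller.start();

        // Shutdown order matters: set the flag first, then wake the poller,
        // so the loop condition is already false when the poll returns.
        closed.set(true);
        poller.interrupt(); // stands in for consumer.wakeup()

        return done.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(shutdownCleanly() ? "consumer closed cleanly" : "timed out");
    }
}
```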


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87257
---





Re: [VOTE] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-06-10 Thread Jay Kreps
+1

On Tue, Jun 9, 2015 at 11:24 PM, Honghai Chen 
wrote:

>  Hi Kafka,
>
> After a long discussion, please help vote again for the
> KIP. Thanks.
>
>
>
> I wrote a KIP for this after some discussion on KAFKA-1646.
> https://issues.apache.org/jira/browse/KAFKA-1646
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
>
> The RB is here: https://reviews.apache.org/r/33204/diff/4/
>
>
>
>
>
> Thanks,
>
> Honghai Chen
>
>
>


Re: [VOTE] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-06-10 Thread Sriram Subramanian
+1


> On Jun 10, 2015, at 9:38 AM, Jay Kreps  wrote:
> 
> +1
> 
> On Tue, Jun 9, 2015 at 11:24 PM, Honghai Chen 
> wrote:
> 
>> Hi Kafka,
>> 
>> After a long discussion, please help vote again for the
>> KIP. Thanks.
>> 
>> 
>> 
>> I wrote a KIP for this after some discussion on KAFKA-1646.
>> https://issues.apache.org/jira/browse/KAFKA-1646
>> 
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
>> 
>> The RB is here: https://reviews.apache.org/r/33204/diff/4/
>> 
>> 
>> 
>> 
>> 
>> Thanks,
>> 
>> Honghai Chen
>> 
>> 
>> 


Re: [VOTE] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-06-10 Thread Gwen Shapira
+1 (non-binding)

On Tue, Jun 9, 2015 at 11:24 PM, Honghai Chen
 wrote:
> Hi Kafka,
>
> After a long discussion, please help vote again for the KIP.
> Thanks.
>
>
>
> I wrote a KIP for this after some discussion on KAFKA-1646.
> https://issues.apache.org/jira/browse/KAFKA-1646
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
>
> The RB is here: https://reviews.apache.org/r/33204/diff/4/
>
>
>
>
>
> Thanks,
>
> Honghai Chen
>
>


[jira] [Commented] (KAFKA-2253) Deadlock in delayed operation purgatory

2015-06-10 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14580837#comment-14580837
 ] 

Mayuresh Gharat commented on KAFKA-2253:


Thanks [~guozhang] for this patch. I tested this patch (it is still running on 
our test cluster) and I can see that mirror makers are able to produce to 
target clusters just fine. So we should be good. 

Thanks,

Mayuresh

> Deadlock in delayed operation purgatory
> ---
>
> Key: KAFKA-2253
> URL: https://issues.apache.org/jira/browse/KAFKA-2253
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>Assignee: Guozhang Wang
> Fix For: 0.8.3
>
> Attachments: KAFKA-2253.patch, KAFKA-2253.patch, 
> KAFKA-2253_2015-06-08_11:47:40.patch
>
>
> We hit a deadlock while running brokers with git hash: 
> 9e894aa0173b14d64a900bcf780d6b7809368384
> There's a circular wait between the removeWatchersLock and an operation's 
> intrinsic lock.
> {code}
> Found one Java-level deadlock:
> =
> "kafka-request-handler-a":
>   waiting for ownable synchronizer 0x0006da08f9e0, (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
>   which is held by "ExpirationReaper-xyz"
> "ExpirationReaper-xyz":
>   waiting to lock monitor 0x7f454e18 (object 0x0006b0563fe8, a 
> java.util.LinkedList),
>   which is held by "kafka-request-handler-b"
> "kafka-request-handler-b":
>   waiting for ownable synchronizer 0x0006da08f9e0, (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
>   which is held by "ExpirationReaper-xyz"
> "kafka-request-handler-a":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0006da08f9e0> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
> at 
> java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304)
> at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224)
> at 
> kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166)
> at 
> kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358)
> at 
> kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288)
> at 
> kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
> at 
> kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
> at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268)
> at 
> kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244)
> at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790)
> at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787)
> at scala.collection.immutable.Map$Map4.foreach(Map.scala:181)
> at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> "ExpirationReaper-xyz":
> at 
> kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278)
> - waiting to lock <0x0006b0563fe8> (a java.util.LinkedList)
> at 
> kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306)
> at 
> kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256)
> at 
> kafka.server.DelayedOperationPu
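The circular wait in the trace above is a classic lock-ordering deadlock: one path takes the purgatory's read/write lock and then a watcher list's monitor, while the other takes a list monitor and then the lock. A minimal sketch of the pattern and the usual remedy, a single global acquisition order, using simplified stand-in names (not Kafka's actual classes):

```java
import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockOrdering {
    // Stand-ins for the purgatory's removeWatchersLock and a watcher list.
    static final ReentrantReadWriteLock removeWatchersLock = new ReentrantReadWriteLock();
    static final LinkedList<Object> watchers = new LinkedList<>();

    // Path 1: outer lock first, then the list monitor.
    static int checkAndComplete() {
        removeWatchersLock.readLock().lock();
        try {
            synchronized (watchers) { return watchers.size(); }
        } finally {
            removeWatchersLock.readLock().unlock();
        }
    }

    // Path 2 deadlocks if it takes the list monitor first and then the lock.
    // The fix is to use the same order everywhere: removeWatchersLock, then monitor.
    static void removeKeyIfEmpty() {
        removeWatchersLock.writeLock().lock();   // outer lock first
        try {
            synchronized (watchers) {            // inner monitor second, same order as path 1
                if (watchers.isEmpty()) { /* remove the watcher key */ }
            }
        } finally {
            removeWatchersLock.writeLock().unlock();
        }
    }
}
```

With both paths acquiring the locks in the same order, the circular wait shown in the thread dump cannot form.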

Re: [VOTE] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-06-10 Thread JG Homan

+1

On Wed, Jun 10, 2015 at 10:12 AM, Gwen Shapira  
wrote:

+1 (non-binding)


[jira] [Created] (KAFKA-2261) Provide a configurable timeout for NetworkClient.send

2015-06-10 Thread Glenn Sontheimer (JIRA)
Glenn Sontheimer created KAFKA-2261:
---

 Summary: Provide a configurable timeout for NetworkClient.send
 Key: KAFKA-2261
 URL: https://issues.apache.org/jira/browse/KAFKA-2261
 Project: Kafka
  Issue Type: New Feature
  Components: clients
Affects Versions: 0.8.2.0
Reporter: Glenn Sontheimer


Currently, once a message has been submitted asynchronously (the only option for 
0.8.2), there is a possibility that the message could remain in the submission 
state and never initiate the callback.  There have been several iterations 
of the code (in previous versions) to help address this issue.  However, these 
changes handle specific scenarios known as of each point in time, e.g. 
successful sends and node disconnects.  Additional failure scenarios may exist 
and/or be introduced in future iterations of the code base.  A fail-safe 
mechanism seems appropriate in this situation while work continues to cover 
known and discovered scenarios.

Adding a configuration to allow the client application to specify a timeout for 
the message send provides the following advantages:
1.  The client application will be guaranteed that a callback will be performed 
for every message.
2.  The interaction with the Kafka queue can be better tuned to the 
application's needs.  In some cases a shorter timeout will be necessary to 
ensure data does not become too stale.
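The fail-safe described here can be sketched as a callback wrapper: schedule a timer alongside each send, and guarantee that exactly one of the real completion or a timeout fires the application callback. This is an illustrative sketch, not Kafka client API; all names below are assumptions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class GuaranteedCallback {
    // Daemon timer thread so the wrapper never keeps the JVM alive.
    private static final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "send-timeout");
            t.setDaemon(true);
            return t;
        });

    /** Returns a completion handler that fires `callback` exactly once: either
     *  with the real result (null = success) or, if the send never completes,
     *  with a TimeoutException after timeoutMs. */
    static Consumer<Exception> guard(long timeoutMs, Consumer<Exception> callback) {
        AtomicBoolean fired = new AtomicBoolean(false);
        timer.schedule(() -> {
            if (fired.compareAndSet(false, true))
                callback.accept(new TimeoutException("send timed out after " + timeoutMs + " ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return e -> {
            if (fired.compareAndSet(false, true)) callback.accept(e);
        };
    }
}
```

The atomic flag gives the guarantee in advantage 1: whichever of the two events happens first wins, and the loser is suppressed, so the application sees exactly one callback per message.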

 







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2253) Deadlock in delayed operation purgatory

2015-06-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14580921#comment-14580921
 ] 

Guozhang Wang commented on KAFKA-2253:
--

Thanks for all the reviews! Committed to trunk.

> Deadlock in delayed operation purgatory
> ---
>
> Key: KAFKA-2253
> URL: https://issues.apache.org/jira/browse/KAFKA-2253
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>Assignee: Guozhang Wang
> Fix For: 0.8.3
>
> Attachments: KAFKA-2253.patch, KAFKA-2253.patch, 
> KAFKA-2253_2015-06-08_11:47:40.patch
>

[jira] [Updated] (KAFKA-2253) Deadlock in delayed operation purgatory

2015-06-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2253:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Deadlock in delayed operation purgatory
> ---
>
> Key: KAFKA-2253
> URL: https://issues.apache.org/jira/browse/KAFKA-2253
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>Assignee: Guozhang Wang
> Fix For: 0.8.3
>
> Attachments: KAFKA-2253.patch, KAFKA-2253.patch, 
> KAFKA-2253_2015-06-08_11:47:40.patch
>

Re: [VOTE] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-06-10 Thread Jakob Homan
+1



> On Jun 10, 2015, at 10:36 AM, JG Homan  wrote:
> 
> +1 
> 
> On Wed, Jun 10, 2015 at 10:12 AM, Gwen Shapira  wrote:
> +1 (non-binding)
> 


Build failed in Jenkins: Kafka-trunk #506

2015-06-10 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-2253; fix deadlock between removeWatchersLock and watcher 
operations list lock; reviewed by Onur Karaman and Jiangjie Qin

--
[...truncated 419 lines...]
org.apache.kafka.common.record.RecordTest > testFields[60] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[60] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[60] PASSED

org.apache.kafka.common.record.RecordTest > testFields[61] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[61] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[61] PASSED

org.apache.kafka.common.record.RecordTest > testFields[62] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[62] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[62] PASSED

org.apache.kafka.common.record.RecordTest > testFields[63] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[63] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[63] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[0] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[1] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[2] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[3] PASSED

org.apache.kafka.common.protocol.types.ProtocolSerializationTest > testSimple 
PASSED

org.apache.kafka.common.protocol.types.ProtocolSerializationTest > testNulls 
PASSED

org.apache.kafka.common.protocol.types.ProtocolSerializationTest > testDefault 
PASSED

org.apache.kafka.common.metrics.MetricsTest > testMetricName PASSED

org.apache.kafka.common.metrics.MetricsTest > testSimpleStats PASSED

org.apache.kafka.common.metrics.MetricsTest > testHierarchicalSensors PASSED

org.apache.kafka.common.metrics.MetricsTest > testBadSensorHiearchy PASSED

org.apache.kafka.common.metrics.MetricsTest > testEventWindowing PASSED

org.apache.kafka.common.metrics.MetricsTest > testTimeWindowing PASSED

org.apache.kafka.common.metrics.MetricsTest > testOldDataHasNoEffect PASSED

org.apache.kafka.common.metrics.MetricsTest > testDuplicateMetricName PASSED

org.apache.kafka.common.metrics.MetricsTest > testQuotas PASSED

org.apache.kafka.common.metrics.MetricsTest > testPercentiles PASSED

org.apache.kafka.common.metrics.JmxReporterTest > testJmxRegistration PASSED

org.apache.kafka.common.metrics.stats.HistogramTest > testHistogram PASSED

org.apache.kafka.common.metrics.stats.HistogramTest > testConstantBinScheme 
PASSED

org.apache.kafka.common.metrics.stats.HistogramTest > testLinearBinScheme PASSED

org.apache.kafka.common.config.ConfigDefTest > testBasicTypes PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefault PASSED

org.apache.kafka.common.config.ConfigDefTest > testNullDefault PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired PASSED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice PASSED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefaultRange PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefaultString PASSED

org.apache.kafka.common.config.ConfigDefTest > testValidators PASSED

org.apache.kafka.common.config.AbstractConfigTest > testConfiguredInstances 
PASSED

org.apache.kafka.common.serialization.SerializationTest > testStringSerializer 
PASSED

org.apache.kafka.common.serialization.SerializationTest > testIntegerSerializer 
PASSED

org.apache.kafka.common.network.SelectorTest > testServerDisconnect PASSED

org.apache.kafka.common.network.SelectorTest > testClientDisconnect PASSED

org.apache.kafka.common.network.SelectorTest > testCantSendWithInProgress PASSED

org.apache.kafka.common.network.SelectorTest > testCantSendWithoutConnecting 
PASSED

org.apache.kafka.common.network.SelectorTest > testNoRouteToHost PASSED

org.apache.kafka.common.network.SelectorTest > testConnectionRefused PASSED

org.apache.kafka.common.network.SelectorTest > testNormalOperation PASSED

org.apache.kafka.common.network.SelectorTest > testSendLargeRequest PASSED

org.apache.kafka.common.network.SelectorTest > testEmptyRequest PASSED

org.apache.kafka.common.network.SelectorTest > testExistingConnectionId PASSED

org.apache.kafka.common.network.SelectorTest > testMute PASSED

org.apache.kafka.common.requests.RequestResponseTest > testSerialization PASSED

org.apache.kafka.clients.NetworkClientTest > testReadyAndDisconnect PASSED

org.apache.kafka.clients.NetworkClientTest > testSendToUnreadyNode PASSED

org.apache.kafka.clients.NetworkClientTest > testSimpleRequestResponse PASSED

org.apache.kafka.clients.MetadataTest > testMetadata FAILED
java.lang.AssertionError: 
at org.junit.Assert.fail(Assert.java:91)
at org.junit.Assert.assertTrue(Assert.java:43)
at org.junit.

Re: [VOTE] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-06-10 Thread Neha Narkhede
+1

On Wed, Jun 10, 2015 at 11:45 AM, Jakob Homan  wrote:

> +1
>
>
>
> > On Jun 10, 2015, at 10:36 AM, JG Homan  wrote:
> >
> > +1
> >
> > On Wed, Jun 10, 2015 at 10:12 AM, Gwen Shapira 
> wrote:
> > +1 (non-binding)
> >



-- 
Thanks,
Neha


Re: KIP Call Notes

2015-06-10 Thread Jun Rao
Jayesh,

Most of the metadata you mentioned already exist. For example, ISR is
already in the TopicMetadataResponse and the high water mark can be
returned from the OffsetRequest. The details of the current wire protocol
can be found at
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

We have another KIP (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-24+-+Remove+ISR+information+from+TopicMetadataRequest+and+add+broker+level+metadata+request)
to add a new request for collecting some broker level info. There will be a
discussion thread on this at some point that you can chime in.

Thanks,

Jun


On Wed, Jun 10, 2015 at 8:42 AM, Jayesh Thakrar  wrote:

> Our company (Conversant Media) became a Kafka user pretty recently (about
> 3 months ago) and I have been following the user and dev group for the past
> month, so could have missed the discussions on KIP-4.
> In looking up the current command line capabilities in Kafka 0.8.2.1 (not
> the latest release), it seems we are missing a couple of pieces.
> Ideally, I am looking at getting the following kinds of information.
> Describe a "Kafka cluster" (example):
> kafka cluster metadata = {
>   "brokers" : [ { "id" : 0, attributes from broker JSON schema },
>                 { "id" : 1, attributes from broker JSON schema }, ... ],
>   "topics"  : [ { "name" : "topic1", attributes from topic, partition, ISR and leader information },
>                 { "name" : "topic2", attributes from topic, partition, ISR and leader information } ]
> }
>
> Topic metrics = at least some kind of message high water mark from the
> leaders for each topic partition.
> Describe consumers (example):
> consumer metadata = { [ "group" : "group_id1", "client" : "client_id1",
>   [ list of subscribed topics ] ], ... }
>
> Having such information will make it easy to build monitoring tools and
> also aid troubleshooting and sizing/configuration.
> Is discussing these things as part of a KIP the right process, or is there
> some other forum?
> Thanks,
> Jayesh Thakrar
>
>
>   From: Gwen Shapira 
>  To: "dev@kafka.apache.org" 
>  Sent: Tuesday, June 9, 2015 1:19 PM
>  Subject: KIP Call Notes
>
> Shortest KIP discussion ever :)
>
> Here are my notes. Feel free to correct or expand:
>
> SSL: Harsh is rebasing after Kafka-1928, and resolving an issue with
> support for specific API
>
> KIP-4  - ISR in Metadata request: We prefer to preserve this information.
>
> KIP-25 - System Tests: Geoffrey posted an update with discussion
> summary, there were no objections to the proposal. We will wait a day
> before starting a vote.
>
> New consumer - Separate the API changes for metadata to a new JIRA.
> Since the new consumer is still work in progress, there's no need for
> KIP for every modification.
>
>
>
>


Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jun Rao


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1212
> > 
> >
> > -1 makes the pollClient block forever. So, we don't get a chance to do 
> > the wakeup check.
> 
> Jason Gustafson wrote:
> I might be wrong, but I think we can still use NetworkClient.wakeup to 
> interrupt a poll call which is waiting forever.
> 
> Jun Rao wrote:
> Yes, you are right. I missed that.
> 
> Jun Rao wrote:
> Actually, this still seems to be a problem. The issue is that when 
> NetworkClient.wakeup is called, NetworkClient.poll may not be in progress. In 
> that case, the next NetworkClient.poll may still block for the timeout.
> 
> Jason Gustafson wrote:
> From the javadocs for Selector, the wakeup will apply to the next poll if 
> one is not in progress. But perhaps we should just check the wakeup flag 
> before entering the poll to be safe.

Yes, in that case, this is not an issue. We probably don't have to check the 
wakeup flag before the poll call since the flag could change immediately after 
the check.
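The Selector semantics Jason cites can be demonstrated directly with `java.nio.channels.Selector`: a `wakeup()` issued while no select is in progress causes the *next* `select()` to return immediately, so a wakeup delivered between polls is not lost.

```java
import java.nio.channels.Selector;

public class WakeupDemo {
    /** Calls wakeup() before select(5000) and returns the elapsed time in ms.
     *  Because the wakeup applies to the next selection operation, the select
     *  returns right away instead of blocking for the full timeout. */
    public static long wakeupThenSelect() throws Exception {
        Selector selector = Selector.open();
        selector.wakeup();                 // no select in progress yet
        long start = System.nanoTime();
        selector.select(5000);             // returns immediately, not after 5 s
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        selector.close();
        return elapsedMs;
    }
}
```

This is why the pre-poll flag check is optional: the wakeup is remembered by the selector itself rather than depending on the caller observing a flag at the right moment.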


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 797-798
> > 
> >
> > Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at 
> > the end of the call, we expect the fetch offset to be set to the beginning. 
> > This is now changed to async, which doesn't match the intended behavior. We 
> > need to think through if this matters or not.
> > 
> > Ditto for seekToEnd().
> 
> Jason Gustafson wrote:
> Since we always update fetch positions before a new fetch and in 
> position(), it didn't seem necessary to make it synchronous. I thought this 
> handling might be more consistent with how new subscriptions are handled 
> (which are asynchronous and defer the initial offset fetch until the next 
> poll or position). That being said, I don't have a strong feeling about it, 
> so we could return to the blocking version.
> 
> Jun Rao wrote:
> Making this async may be fine. One implication is that if we call position() 
> immediately after seekToBeginning(), we may not be able to get the correct 
> offset.
> 
> Jason Gustafson wrote:
> We should be able to get the right offset since we always update offsets 
> before returning the current position, but we might have to block for it. 
> It's similar to if you call subscribe(topic) and then try to get its position 
> immediately.

That may work. However, if one calls seekToBeginning() followed by seekToEnd(), 
will we guarantee that position() returns the end offset?
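The async-seek semantics under discussion can be sketched as "record a pending reset, resolve it lazily in position()". With last-request-wins recording, seekToBeginning() followed by seekToEnd() does yield the end offset. The names below are illustrative, not the actual SubscriptionState API.

```java
import java.util.HashMap;
import java.util.Map;

public class LazySeek {
    enum Reset { EARLIEST, LATEST }

    private final Map<String, Reset> pending = new HashMap<>();
    private final Map<String, Long> positions = new HashMap<>();
    private final long beginningOffset, endOffset;  // stand-ins for offsets fetched from the broker

    LazySeek(long beginningOffset, long endOffset) {
        this.beginningOffset = beginningOffset;
        this.endOffset = endOffset;
    }

    // Seeks are async: they only record the most recent reset request.
    void seekToBeginning(String tp) { pending.put(tp, Reset.EARLIEST); }
    void seekToEnd(String tp)       { pending.put(tp, Reset.LATEST); }

    /** position() resolves any pending reset before returning (in the real
     *  client this is where the call would block on an offset lookup). */
    long position(String tp) {
        Reset r = pending.remove(tp);
        if (r != null)
            positions.put(tp, r == Reset.EARLIEST ? beginningOffset : endOffset);
        return positions.get(tp);
    }
}
```

Because `pending.put` overwrites, only the last seek before position() matters, which answers the ordering question for this design.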


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> ---
> 
> (Updated June 5, 2015, 7:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
> https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> KAFKA-2168; address review comments
> 
> 
> KAFKA-2168; fix rebase error and checkstyle issue
> 
> 
> KAFKA-2168; address review comments and add docs
> 
> 
> KAFKA-2168; handle polling with timeout 0
> 
> 
> KAFKA-2168; timeout=0 means return immediately
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d1d1ec178f60dc47d408f52a89e52886c1a093a2 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/a

Re: Review Request 33204: Patch for KAFKA-1646 merge to latest trunk

2015-06-10 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33204/#review87450
---

Ship it!


Thanks for the patch. +1. Just some minor comments below.


core/src/main/scala/kafka/log/FileMessageSet.scala


Need space after if to be consistent with the rest of the code. There are a 
few other places like that.



core/src/main/scala/kafka/log/FileMessageSet.scala


It's probably clearer if we wrap this in {}.



core/src/main/scala/kafka/log/Log.scala


This check is not necessary since we already verify that the segment size 
is larger than 0 when parsing the configuration.



core/src/main/scala/kafka/server/KafkaConfig.scala


need set => need to set



core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala


FileMessageSEt => FileMessageSet



core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala


clear shutdown => cleanly shut down



core/src/test/scala/unit/kafka/log/LogSegmentTest.scala


clear shutdown => clean shutdown


- Jun Rao


On June 9, 2015, 3:02 a.m., Honghai Chen wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33204/
> ---
> 
> (Updated June 9, 2015, 3:02 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1646
> https://issues.apache.org/jira/browse/KAFKA-1646
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Patch for KAFKA-1646 merge to latest trunk
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/FileMessageSet.scala 
> 2522604bd985c513527fa0c863a7df677ff7a503 
>   core/src/main/scala/kafka/log/Log.scala 
> 84e7b8fe9dd014884b60c4fbe13c835cf02a40e4 
>   core/src/main/scala/kafka/log/LogConfig.scala 
> a907da09e1ccede3b446459225e407cd1ae6d8b3 
>   core/src/main/scala/kafka/log/LogSegment.scala 
> ed039539ac18ea4d65144073915cf112f7374631 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 2d75186a110075e0c322db4b9f7a8c964a7a3e88 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> b320ce9f6a12c0ee392e91beb82e8804d167f9f4 
>   core/src/main/scala/kafka/utils/CoreUtils.scala 
> d0a8fa701564b4c13b3cd6501e1b6218d77e8e06 
>   core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala 
> cec1caecc51507ae339ebf8f3b8a028b12a1a056 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
> 03fb3512c4a4450eac83d4cd4b0919baeaa22942 
> 
> Diff: https://reviews.apache.org/r/33204/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Honghai Chen
> 
>



Re: [VOTE] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-06-10 Thread Jun Rao
+1

Thanks,

Jun



[jira] [Created] (KAFKA-2262) LogSegmentSize validation should be consistent

2015-06-10 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-2262:
--

 Summary: LogSegmentSize validation should be consistent
 Key: KAFKA-2262
 URL: https://issues.apache.org/jira/browse/KAFKA-2262
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2.1
Reporter: Jun Rao
Priority: Trivial


In KafkaConfig, we have the following constraint on LogSegmentBytes
  .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, 
atLeast(Message.MinHeaderSize), HIGH, LogSegmentBytesDoc)

However, at the LogConfig level, the constraint is a bit different:
  .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(0), MEDIUM, 
SegmentSizeDoc)

We should make it the same as in KafkaConfig.
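The gap between the two validators can be sketched with a toy stand-in (this is not Kafka's actual ConfigDef; MIN_HEADER_SIZE below is an assumed placeholder for Message.MinHeaderSize):

```java
import java.util.function.IntPredicate;

public class SegmentSizeValidation {
    // Assumed placeholder; the real constant lives in kafka.message.Message.
    static final int MIN_HEADER_SIZE = 14;

    // Mimics ConfigDef's atLeast(min): accept only values >= min.
    static IntPredicate atLeast(int min) {
        return value -> value >= min;
    }

    public static void main(String[] args) {
        IntPredicate brokerLevel = atLeast(MIN_HEADER_SIZE); // KafkaConfig's constraint
        IntPredicate topicLevel  = atLeast(0);               // LogConfig's looser constraint

        int segmentBytes = 5; // smaller than any message header
        System.out.println(topicLevel.test(segmentBytes));   // true
        System.out.println(brokerLevel.test(segmentBytes));  // false
    }
}
```

A segment size this small passes the topic-level check yet would be rejected at the broker level, which is the inconsistency the ticket asks to remove.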




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2015-06-10 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14581292#comment-14581292
 ] 

Ashish K Singh commented on KAFKA-1367:
---

[~junrao] and [~jjkoshy], correct me if my understanding is wrong, but I think 
we agreed on keeping the ISR info in the TopicMetadataRequest (TMR), and the 
approach mentioned below is our preference.

{quote}
When the leader changes the isr, in addition to writing the new isr in the 
partition state in ZK, it also writes the change as a sequential node under a 
new isrChangeNotification path in ZK. The controller listens to child changes 
in the isrChangeNotification path. On child change, the controller reads the 
new isr and broadcasts it through an UpdateMetadataRequest to every broker.
{quote}

Now that we want to keep ISR as part of TMR, do we still need a new 
BrokerMetadataRequest?

> Broker topic metadata not kept in sync with ZooKeeper
> -
>
> Key: KAFKA-1367
> URL: https://issues.apache.org/jira/browse/KAFKA-1367
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.0, 0.8.1
>Reporter: Ryan Berdeen
>Assignee: Ashish K Singh
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1367.txt
>
>
> When a broker is restarted, the topic metadata responses from the brokers 
> will be incorrect (different from ZooKeeper) until a preferred replica leader 
> election.
> In the metadata, it looks like leaders are correctly removed from the ISR 
> when a broker disappears, but followers are not. Then, when a broker 
> reappears, the ISR is never updated.
> I used a variation of the Vagrant setup created by Joe Stein to reproduce 
> this with latest from the 0.8.1 branch: 
> https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2261) Provide a configurable timeout for NetworkClient.send

2015-06-10 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved KAFKA-2261.
-
Resolution: Duplicate

This is a duplicate of KAFKA-2120, which KIP-19 addressed.

> Provide a configurable timeout for NetworkClient.send
> -
>
> Key: KAFKA-2261
> URL: https://issues.apache.org/jira/browse/KAFKA-2261
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 0.8.2.0
>Reporter: Glenn Sontheimer
>
> Currently, once a message has been submitted asynchronously (the only option 
> for 0.8.2), there is a possibility that the message could remain in the 
> submission state and never initiate the callback. There have been several 
> iterations of the code (in previous versions) to help address this issue. 
> However, these changes handle specific scenarios known as of each point in 
> time, e.g. successful sends and node disconnects. Additional failure 
> scenarios may exist and/or be introduced in future iterations of the code 
> base. A fail-safe mechanism seems appropriate in this situation while work 
> continues to cover known and discovered scenarios.
> Adding a configuration to allow the client application to specify a timeout 
> for the message send provides the following advantages:
> 1.  The client application will be guaranteed that a callback will be 
> performed for every message.
> 2.  The interaction with the Kafka Queue can be better tuned to the 
> application's needs.  In some cases a shorter timeout will be necessary to 
> ensure data does not become too stale.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 35201: Fix KAFKA-2253

2015-06-10 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35201/#review87495
---



core/src/main/scala/kafka/server/DelayedOperation.scala


Not sure if we need this check. Since all writes to watchersForKey are 
sync-ed, it's ok to remove a watcher as long as its count is 0.

I am a bit concerned about the overhead of the removeWatchersLock, which is 
global. For example, if you have 1000 requests/sec and each request has 1000 
partitions, that lock is going to be accessed 1 million times per second. Could 
you do some tests/profiling before and after introducing the global lock to see 
if this could be an issue?
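One way to get a first-order number for this concern is a single-threaded micro-benchmark of the lock itself. The sketch below only bounds uncontended lock/unlock cost; the real risk is contention across request handler threads, which it does not measure:

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockOverheadSketch {
    // Acquire and release one lock n times, doing trivial work inside.
    public static long lockedIncrements(int n) {
        ReentrantLock lock = new ReentrantLock();
        long counter = 0;
        for (int i = 0; i < n; i++) {
            lock.lock();
            try {
                counter++;
            } finally {
                lock.unlock();
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        // 1000 requests/sec x 1000 partitions = the 1M/sec rate estimated above.
        int ops = 1_000_000;
        long start = System.nanoTime();
        lockedIncrements(ops);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(ops + " uncontended lock/unlock pairs took ~" + elapsedMs + " ms");
    }
}
```

A before/after comparison of the patch under a multi-threaded load test would still be needed to see the contended behavior.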


- Jun Rao


On June 8, 2015, 6:47 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35201/
> ---
> 
> (Updated June 8, 2015, 6:47 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2253
> https://issues.apache.org/jira/browse/KAFKA-2253
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Incorporated Jiangjie and Onur's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> 123078d97a7bfe2121655c00f3b2c6af21c53015 
> 
> Diff: https://reviews.apache.org/r/35201/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[VOTE] KIP-25 System test improvements

2015-06-10 Thread Geoffrey Anderson
Hi Kafka,

After a few rounds of discussion on KIP-25, there doesn't seem to be
opposition, so I'd like to propose a vote.

Thanks,
Geoff

On Mon, Jun 8, 2015 at 10:56 PM, Geoffrey Anderson 
wrote:

> Hi KIP-25 thread,
>
> I consolidated some of the questions from this thread and elsewhere.
>
> Q: Can we see a map of what system-test currently tests, which ones we
> want to replace and JIRAs for replacing?
> A: Initial draft here:
> https://cwiki.apache.org/confluence/display/KAFKA/Roadmap+-+port+existing+system+tests
>
> Q: Will ducktape be maintained separately as a github repo?
> A: Yes https://github.com/confluentinc/ducktape
>
> Q: How easy is viewing the test results and logs, how will test output be
> structured?
> A: Hierarchical structure as outlined here:
> https://github.com/confluentinc/ducktape/wiki/Design-overview#output
>
> Q: Does it support code coverage? If not, how easy/ difficult would it be
> to support?
> A: It does not, and we have no immediate plans to support this. Difficulty
> unclear.
>
> Q: It would be nice if each Kafka version that we release will also
> have a separate "tests" artifact that users can download, untar and easily
> run against a Kafka cluster of the same version.
> A: This seems reasonable and not too much extra work. Definitely open to
> discussion on this.
>
> Q: Why not share running services across multiple tests?
> A: Prefer to optimize for simplicity and correctness over what might be a
> questionable improvement in run-time.
>
> Q: Are regressions - in the road map?
> A: yes
>
> Q: Are Jepsen style tests involving network failures in the road map?
> A: yes
>
> Thanks much,
> Geoff
>
>
>


Re: [VOTE] KIP-25 System test improvements

2015-06-10 Thread Gwen Shapira
+1 (non-binding. Actually, since this is non-binding anyway, lets make
it +100. I'm so so excited about having a usable testing framework)

On Wed, Jun 10, 2015 at 6:10 PM, Geoffrey Anderson  wrote:
> Hi Kafka,
>
> After a few rounds of discussion on KIP-25, there doesn't seem to be
> opposition, so I'd like to propose a vote.
>
> Thanks,
> Geoff
>


Re: [VOTE] KIP-25 System test improvements

2015-06-10 Thread Neha Narkhede
+1. Thanks Geoff!

On Wed, Jun 10, 2015 at 6:20 PM -0700, "Gwen Shapira"  
wrote:

+1 (non-binding. Actually, since this is non-binding anyway, lets make
it +100. I'm so so excited about having a usable testing framework)

On Wed, Jun 10, 2015 at 6:10 PM, Geoffrey Anderson  wrote:
> Hi Kafka,
>
> After a few rounds of discussion on KIP-25, there doesn't seem to be
> opposition, so I'd like to propose a vote.
>
> Thanks,
> Geoff
>
>>

[jira] [Commented] (KAFKA-1792) change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments

2015-06-10 Thread manpreet singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14581419#comment-14581419
 ] 

manpreet singh commented on KAFKA-1792:
---

Integrated the patch and tested the new optimized reassignment of partitions. 
Working great!
I was wondering whether, when generating or executing the plan, there's a way 
to fairly allocate leaders for each partition over the given broker list.
I think that as part of fair replica distribution over brokers we would also 
like to balance the count of leaders on each broker, to avoid overloading one 
broker over another.
As of now, the leader distribution doesn't seem to be balanced after 
re-partitioning with the above patch.
Any suggestions on this?

> change behavior of --generate to produce assignment config with fair replica 
> distribution and minimal number of reassignments
> -
>
> Key: KAFKA-1792
> URL: https://issues.apache.org/jira/browse/KAFKA-1792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: tools
>Reporter: Dmitry Pekar
>Assignee: Dmitry Pekar
> Fix For: 0.8.3
>
> Attachments: KAFKA-1792.patch, KAFKA-1792_2014-12-03_19:24:56.patch, 
> KAFKA-1792_2014-12-08_13:42:43.patch, KAFKA-1792_2014-12-19_16:48:12.patch, 
> KAFKA-1792_2015-01-14_12:54:52.patch, KAFKA-1792_2015-01-27_19:09:27.patch, 
> KAFKA-1792_2015-02-13_21:07:06.patch, KAFKA-1792_2015-02-26_16:58:23.patch, 
> generate_alg_tests.txt, rebalance_use_cases.txt
>
>
> The current implementation produces a fair replica distribution across the 
> specified list of brokers. Unfortunately, it doesn't take the current 
> replica assignment into account.
> So if we have, for instance, 3 brokers id=[0..2] and are going to add a 
> fourth broker id=3, generate will create an assignment config which 
> redistributes replicas fairly across brokers [0..3], as if those partitions 
> were created from scratch. It will not take the current replica assignment 
> into consideration and accordingly will not try to minimize the number of 
> replica moves between brokers.
> As proposed by [~charmalloc] this should be improved. New output of improved 
> --generate algorithm should suite following requirements:
> - fairness of replica distribution - every broker will have R or R+1 replicas 
> assigned;
> - minimum of reassignments - number of replica moves between brokers will be 
> minimal;
> Example.
> Consider following replica distribution per brokers [0..3] (we just added 
> brokers 2 and 3):
> - broker - 0, 1, 2, 3 
> - replicas - 7, 6, 0, 0
> The new algorithm will produce following assignment:
> - broker - 0, 1, 2, 3 
> - replicas - 4, 3, 3, 3
> - moves - -3, -3, +3, +3
> It will be fair and number of moves will be 6, which is minimal for specified 
> initial distribution.
> The scope of this issue is:
> - design an algorithm matching the above requirements;
> - implement this algorithm and unit tests;
> - test it manually using different initial assignments;
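The fairness arithmetic in the example above (every broker ends with R or R+1 replicas, and each move is counted once) can be sketched as follows. This illustrates the requirement only, not the patch's actual algorithm; in particular it hands the extra replicas to the lowest-indexed brokers instead of choosing them to minimize moves:

```java
import java.util.Arrays;

public class FairAssignmentSketch {
    // Given current replica counts per broker, compute the fair target:
    // every broker gets R = total/brokers replicas, and total%brokers
    // brokers get one extra (R+1).
    public static int[] fairTargets(int[] current) {
        int total = Arrays.stream(current).sum();
        int brokers = current.length;
        int base = total / brokers;   // R
        int extra = total % brokers;  // how many brokers get R+1
        int[] target = new int[brokers];
        for (int i = 0; i < brokers; i++) {
            target[i] = base + (i < extra ? 1 : 0);
        }
        return target;
    }

    // Each replica move is one addition on some broker paired with one
    // removal elsewhere, so counting only the additions counts the moves.
    public static int minimalMoves(int[] current, int[] target) {
        int moves = 0;
        for (int i = 0; i < current.length; i++) {
            moves += Math.max(0, target[i] - current[i]);
        }
        return moves;
    }

    public static void main(String[] args) {
        int[] current = {7, 6, 0, 0};  // the example from the ticket
        int[] target = fairTargets(current);
        System.out.println(Arrays.toString(target));        // [4, 3, 3, 3]
        System.out.println(minimalMoves(current, target));  // 6
    }
}
```

For the ticket's example of {7, 6, 0, 0}, this yields targets of {4, 3, 3, 3} and 6 moves, matching the distribution given above.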



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2022) simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: null exception when the original leader fails instead of being trapped in the fetchResponse

2015-06-10 Thread Jinder Aujla (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14581473#comment-14581473
 ] 

Jinder Aujla commented on KAFKA-2022:
-

Hi

I also noticed this. Here is part of the stack trace; is there a workaround or 
something I can do to prevent this from happening?

java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:147)
at 
kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99)


thanks

> simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: 
> null exception when the original leader fails instead of being trapped in the 
> fetchResponse api while consuming messages
> -
>
> Key: KAFKA-2022
> URL: https://issues.apache.org/jira/browse/KAFKA-2022
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.1
> Environment: 3 linux nodes with both zookeepr & brokers running under 
> respective users on each..
>Reporter: Muqeet Mohammed Ali
>Assignee: Neha Narkhede
>
> simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: 
> null exception when the original leader fails, instead of being trapped in 
> the fetchResponse api while consuming messages. My understanding was that 
> any fetch failure could be detected via the fetchResponse.hasError() call 
> and then handled by fetching the new leader. Below is the relevant code 
> snippet from the simple consumer, with comments marking the line causing 
> the exception. Can you please comment on this?
> if (simpleconsumer == null) {
>   simpleconsumer = new SimpleConsumer(leaderAddress.getHostName(), 
>       leaderAddress.getPort(), consumerTimeout, consumerBufferSize, consumerId);
> }
> FetchRequest req = new FetchRequestBuilder().clientId(getConsumerId())
>     .addFetch(topic, partition, offsetManager.getTempOffset(), consumerBufferSize)
>     // Note: the fetchSize might need to be increased
>     // if large batches are written to Kafka
>     .build();
> // exception is thrown at the below line
> FetchResponse fetchResponse = simpleconsumer.fetch(req);
> if (fetchResponse.hasError()) {
>   numErrors++;
> etc...
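Since the exception escapes fetch() directly rather than being surfaced through fetchResponse.hasError(), one workaround is to wrap the call and retry against a freshly resolved leader. The sketch below uses a hypothetical Fetcher stand-in, not the real SimpleConsumer API, and elides the actual leader lookup:

```java
import java.nio.channels.ClosedChannelException;

public class FetchRetrySketch {
    // Hypothetical stand-in for SimpleConsumer.fetch(); the real call is to
    // the Kafka 0.8 SimpleConsumer API.
    interface Fetcher {
        String fetch() throws ClosedChannelException;
    }

    // Wrap the fetch: ClosedChannelException usually means the leader moved,
    // so close the old consumer, look up the new leader (elided here) and
    // retry once with a fresh connection.
    public static String fetchWithRetry(Fetcher consumer, Fetcher freshConsumer) {
        try {
            return consumer.fetch();
        } catch (ClosedChannelException e) {
            try {
                return freshConsumer.fetch();
            } catch (ClosedChannelException e2) {
                throw new RuntimeException("fetch failed after leader refresh", e2);
            }
        }
    }
}
```

In real code the second Fetcher would be a new SimpleConsumer built from the leader returned by a fresh metadata request, and the retry would typically be bounded with backoff.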



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-25 System test improvements

2015-06-10 Thread Joe Stein
+1

~ Joestein
On Jun 10, 2015 10:21 PM, "Neha Narkhede"  wrote:

> +1. Thanks Geoff!
>
> On Wed, Jun 10, 2015 at 6:20 PM -0700, "Gwen Shapira" <
> gshap...@cloudera.com> wrote:
>
> +1 (non-binding. Actually, since this is non-binding anyway, lets make
> it +100. I'm so so excited about having a usable testing framework)
>
> On Wed, Jun 10, 2015 at 6:10 PM, Geoffrey Anderson  wrote:
> > Hi Kafka,
> >
> > After a few rounds of discussion on KIP-25, there doesn't seem to be
> > opposition, so I'd like to propose a vote.
> >
> > Thanks,
> > Geoff
> >


[jira] [Resolved] (KAFKA-1882) Create extendable channel interface and default implementations

2015-06-10 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira resolved KAFKA-1882.
-
Resolution: Duplicate

I believe this is already handled in the SSL patches

> Create extendable channel interface and default implementations
> ---
>
> Key: KAFKA-1882
> URL: https://issues.apache.org/jira/browse/KAFKA-1882
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Blocker
> Fix For: 0.8.3
>
>
> For the security infrastructure, we need an extendible interface to replace 
> SocketChannel.
> KAFKA-1684 suggests extending SocketChannel itself, but since SocketChannel 
> is part of Java's standard library, the interface changes between different 
> Java versions, so extending it directly can become a compatibility issue.
> Instead, we can implement a KafkaChannel interface, which will implement 
> connect(), read(), write() and possibly other methods we use. 
> We will replace direct use of SocketChannel in our code with use of 
> KafkaChannel.
> Different implementations of KafkaChannel will be instantiated based on the 
> port/SecurityProtocol configuration. 
> This patch will provide at least the PLAINTEXT implementation for 
> KafkaChannel.
> I will validate that the SSL implementation in KAFKA-1684 can be refactored 
> to use a KafkaChannel interface rather than extend SocketChannel directly. 
> However, the patch will not include the SSL channel itself.
> The interface should also include setters/getters for principal and remote 
> IP, which will be used for the authentication code.
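The interface shape being proposed could look roughly like the following sketch (names and signatures are illustrative, not the eventual Kafka code); the toy in-memory implementation just shows that different transports can sit behind the same interface:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

// Illustrative stand-in for the KafkaChannel interface described in the
// ticket: code talks to this interface instead of SocketChannel directly.
interface KafkaChannel {
    void connect() throws IOException;
    int read(ByteBuffer dst) throws IOException;
    int write(ByteBuffer src) throws IOException;
    String principal(); // for the authentication code
}

// Toy in-memory "PLAINTEXT" implementation; an SSL implementation would
// plug in behind the same interface, selected by port/SecurityProtocol.
class InMemoryPlaintextChannel implements KafkaChannel {
    private final ByteBuffer wire = ByteBuffer.allocate(1024);

    public void connect() { /* nothing to do for the in-memory transport */ }

    public int write(ByteBuffer src) {
        int n = src.remaining();
        wire.put(src);
        return n;
    }

    public int read(ByteBuffer dst) {
        wire.flip();
        int n = Math.min(wire.remaining(), dst.remaining());
        for (int i = 0; i < n; i++) dst.put(wire.get());
        wire.compact();
        return n;
    }

    public String principal() { return "ANONYMOUS"; }
}
```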



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1882) Create extendable channel interface and default implementations

2015-06-10 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1882:

Fix Version/s: (was: 0.8.3)

> Create extendable channel interface and default implementations
> ---
>
> Key: KAFKA-1882
> URL: https://issues.apache.org/jira/browse/KAFKA-1882
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Blocker
>
> For the security infrastructure, we need an extendible interface to replace 
> SocketChannel.
> KAFKA-1684 suggests extending SocketChannel itself, but since SocketChannel 
> is part of Java's standard library, the interface changes between different 
> Java versions, so extending it directly can become a compatibility issue.
> Instead, we can implement a KafkaChannel interface, which will implement 
> connect(), read(), write() and possibly other methods we use. 
> We will replace direct use of SocketChannel in our code with use of 
> KafkaChannel.
> Different implementations of KafkaChannel will be instantiated based on the 
> port/SecurityProtocol configuration. 
> This patch will provide at least the PLAINTEXT implementation for 
> KafkaChannel.
> I will validate that the SSL implementation in KAFKA-1684 can be refactored 
> to use a KafkaChannel interface rather than extend SocketChannel directly. 
> However, the patch will not include the SSL channel itself.
> The interface should also include setters/getters for principal and remote 
> IP, which will be used for the authentication code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2249) KafkaConfig does not preserve original Properties

2015-06-10 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2249:

Assignee: Gwen Shapira
  Status: Patch Available  (was: Open)

> KafkaConfig does not preserve original Properties
> -
>
> Key: KAFKA-2249
> URL: https://issues.apache.org/jira/browse/KAFKA-2249
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2249.patch
>
>
> We typically generate configuration from properties objects (or maps).
> The old KafkaConfig, and the new ProducerConfig and ConsumerConfig all retain 
> the original Properties object, which means that if the user specified 
> properties that are not part of ConfigDef definitions, they are still 
> accessible.
> This is important especially for MetricReporters where we want to allow users 
> to pass arbitrary properties for the reporter.
> One way to support this is by having KafkaConfig implement AbstractConfig, 
> which will give us other nice functionality too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2249) KafkaConfig does not preserve original Properties

2015-06-10 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2249:

Attachment: KAFKA-2249.patch

> KafkaConfig does not preserve original Properties
> -
>
> Key: KAFKA-2249
> URL: https://issues.apache.org/jira/browse/KAFKA-2249
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
> Attachments: KAFKA-2249.patch
>
>
> We typically generate configuration from properties objects (or maps).
> The old KafkaConfig, and the new ProducerConfig and ConsumerConfig all retain 
> the original Properties object, which means that if the user specified 
> properties that are not part of ConfigDef definitions, they are still 
> accessible.
> This is important especially for MetricReporters where we want to allow users 
> to pass arbitrary properties for the reporter.
> One way to support this is by having KafkaConfig implement AbstractConfig, 
> which will give us other nice functionality too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 35347: Patch for KAFKA-2249

2015-06-10 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35347/
---

Review request for kafka.


Bugs: KAFKA-2249
https://issues.apache.org/jira/browse/KAFKA-2249


Repository: kafka


Description
---

modified KafkaConfig to implement AbstractConfig. This resulted in somewhat 
cleaner code, and we preserve the original Properties for use by MetricReporter
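The property-preservation that AbstractConfig gives can be sketched minimally as follows (a toy class, not Kafka's AbstractConfig; the originals() name mirrors the real accessor but everything else is illustrative):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class OriginalsPreservingConfig {
    // Parsed/validated values would normally live alongside this; the key
    // point is that the raw user-supplied map is kept verbatim.
    private final Map<String, Object> originals;

    public OriginalsPreservingConfig(Map<String, ?> props) {
        this.originals = Collections.unmodifiableMap(new HashMap<>(props));
    }

    // Keys that no ConfigDef entry defines (e.g. custom MetricReporter
    // settings) remain reachable here.
    public Object get(String key) {
        return originals.get(key);
    }

    public Map<String, Object> originals() {
        return originals;
    }
}
```

This is why reporters configured via arbitrary user properties keep working: the unrecognized keys survive parsing instead of being dropped.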


Diffs
-

  clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
c4fa058692f50abb4f47bd344119d805c60123f5 
  core/src/main/scala/kafka/controller/KafkaController.scala 
69bba243a9a511cc5292b43da0cc48e421a428b0 
  core/src/main/scala/kafka/server/KafkaApis.scala 
d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
2d75186a110075e0c322db4b9f7a8c964a7a3e88 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
ace6321b36d809946554d205bc926c9c76a43bd6 

Diff: https://reviews.apache.org/r/35347/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Commented] (KAFKA-2249) KafkaConfig does not preserve original Properties

2015-06-10 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14581505#comment-14581505
 ] 

Gwen Shapira commented on KAFKA-2249:
-

Created reviewboard https://reviews.apache.org/r/35347/diff/
 against branch trunk

> KafkaConfig does not preserve original Properties
> -
>
> Key: KAFKA-2249
> URL: https://issues.apache.org/jira/browse/KAFKA-2249
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
> Attachments: KAFKA-2249.patch
>
>
> We typically generate configuration from properties objects (or maps).
> The old KafkaConfig, and the new ProducerConfig and ConsumerConfig all retain 
> the original Properties object, which means that if the user specified 
> properties that are not part of ConfigDef definitions, they are still 
> accessible.
> This is important especially for MetricReporters where we want to allow users 
> to pass arbitrary properties for the reporter.
> One way to support this is by having KafkaConfig implement AbstractConfig, 
> which will give us other nice functionality too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)