Re: Review Request 27391: Fix KAFKA-1634

2014-12-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review65462
---



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java


Can you add a comment: // only v0, v1 of OffsetCommitRequest



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java


Not introduced by your patch, but it is odd that these are named 
topicResponseObj and partitionResponse below - probably an artifact of 
copy/paste. Can you do a rename here before checking in?



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala


I think we discussed before that timestamp does not need to be a var. We 
can use the case class copy method to make a copy + edit.
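A minimal sketch of that idiom (the field set here is illustrative, not the
exact OffsetMetadataAndError definition):

case class OffsetAndMetadata(offset: Long, metadata: String = "", timestamp: Long = -1L)

val original = OffsetAndMetadata(42L)
val stamped = original.copy(timestamp = System.currentTimeMillis()) // copy + edit, no var needed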



core/src/main/scala/kafka/server/KafkaServer.scala


Thanks for fixing this



core/src/main/scala/kafka/server/OffsetManager.scala


I think it would be better to move this to just before the call to 
offsetCommitValue in the loop in line 228. This method should only be 
responsible for taking the offsetAndMetadata and converting that into the 
on-disk bytes and should not concern itself with setting a critical field like 
the expiration timestamp. I was actually looking for where this happens (i.e., 
setting the expiration time) and it took me a while to realize it was hidden in 
here.
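In code form, roughly (the method and field names here are assumptions based on
this discussion, not the actual patch):

// just before serializing each offset in the loop around line 228:
val withExpiration = offsetAndMetadata.copy(expireTimestamp = now + config.offsetsRetentionMs)
val valueBytes = offsetCommitValue(withExpiration)
// offsetCommitValue itself then only maps offsetAndMetadata -> on-disk bytes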



core/src/main/scala/kafka/server/OffsetManager.scala


I think we can make this and some other methods here private.



core/src/main/scala/kafka/server/OffsetManager.scala


private



core/src/main/scala/kafka/server/OffsetManager.scala


Also, let us use a case class instead of a tuple
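E.g. (hypothetical name), a small case class makes call sites self-describing
where a tuple would not:

case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition)
// key.group / key.topicPartition instead of key._1 / key._2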



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala


Rather than sleep, we should improve OffsetManager to take in a 
MockScheduler instance - we can pass through the time instance from KafkaServer 
to offsetManager as we do for LogManager and replicaManager. That way we can 
advance time with MockTime. This test will need to change from OffsetCommitTest 
to OffsetManagerTest and we will just test the OffsetManager. Can you file a 
jira for that? Although that would make sense only after you check this in.
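A rough sketch of what that test could look like (the constructor wiring below
is the *proposed* shape, with replicaManager/zkClient coming from the test
fixture; none of this is the current API):

val time = new MockTime()
val offsetManager = new OffsetManager(offsetManagerConfig, replicaManager,
                                      zkClient, time.scheduler, time)
// commit an offset with a short retention, then jump past it:
time.sleep(offsetManagerConfig.offsetsRetentionMs + 1) // also ticks the MockScheduler
// the expired offset should now be gone from the cache, with no real sleeping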



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala


If the offset did in fact expire, the assertion itself won't be what fails - 
i.e., you will get a NoSuchElementException instead.

Same comments apply to checks below.
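To illustrate with hypothetical test names - the map lookup throws before the
assertion runs, whereas asserting on the Option fails cleanly:

assertEquals(expected, fetchedOffsets(topicAndPartition).offset)                   // NoSuchElementException if expired
assertEquals(Some(expected), fetchedOffsets.get(topicAndPartition).map(_.offset)) // plain assertion failure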


- Joel Koshy


On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> ---
> 
> (Updated Dec. 2, 2014, 2:03 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
> https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Add another API in the offset manager to return the struct; the cache layer 
> will only read its expiration timestamp while the offset formatter will read 
> the struct as a whole
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
> 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
> 121e880a941fcd3e6392859edba11a94236494cc 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   
> clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
>  df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
> 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
> c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
> 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 3c79428962604800983415f6f705e04f52acb8fb 
>  

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Joel Koshy


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
> > 338
> > 
> >
> > Can we make this a "reliable" commit - i.e., with retries up to the 
> > configured retry count? The policy is retry on commit errors during 
> > rebalance or shutdown, no need to retry on commit errors during 
> > auto-commits.
> > 
> > So for e.g., if a mirror maker rebalances and there is simultaneously 
> > offset manager movement we would need to retry the commit.
> > 
> > This is the motivation for the isAutoCommit flag - however, there seems 
> > to be a bug right now which maybe you can fix: i.e., if this is not an 
> > auto-commit then set retries to the configured retries, else no retries.
> 
> Jiangjie Qin wrote:
> Changed the code based on your suggestion. My original thinking was that 
> in mirror maker one commit failure actually does not matter too much because 
> the next commit will succeed if the failure is due to offset topic leader 
> migration, etc. But for a more general purpose, it probably should retry if 
> it is not an auto-commit.

I was thinking more about the shutdown and rebalance cases. We ideally want the 
commits to be reliable for those cases.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 192
> > 
> >
> > Why do you need a dummy param?
> 
> Jiangjie Qin wrote:
> Because Utils.createObject needs an args parameter, and if we pass in a 
> null it will give an NPE...
> I've changed the code in Utils to allow us to pass in a null, which uses 
> the no-arg constructor.

See comment in latest RB.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
> > 
> >
> > Why not use KafkaScheduler for the offset commit task?
> 
> Jiangjie Qin wrote:
> Hadn't thought of that before... But it looks like we need to do some more 
> handling when something goes wrong in the offset commit threads. The 
> KafkaScheduler code does not seem to do so.

You can make the task itself catch throwables. It would look something 
like this:

scheduler.schedule("mirrorMakerOffsetsCommiter", commitTask, ...)

And in commitTask:
try {
  commitOffsets()
}
catch {
  case t: Throwable =>
    // handle
}

That said, I don't think connector.commitOffsets will throw anything - since we 
catch all throwables there.

The only additional detail is that after you shutdown the scheduler you will 
need to call commitOffsets() manually one last time.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> > 
> >
> > There might be a small memory leak here: if there was an error though 
> > (in the branch above) it seems no one removes the offset from the list.
> 
> Jiangjie Qin wrote:
> Yes, that is a memory leak, but in this case we should not commit the 
> offset of the message that was not sent successfully either. If any exception 
> occurs then the offsets will not advance anymore. We probably should have an 
> alert on the mirror consumer lags.

Hmm.. not sure if I'm on the same page here. See comments on new RB.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 705
> > 
> >
> > You need to also set tail.next = null (or None)
> 
> Jiangjie Qin wrote:
> Setting tail.next = null is handled by the previous if/else: the old 
> tail.prev.next becomes the new tail.next, which will be null.

Makes sense.
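In code form (a sketch of the unlink, not the actual patch): when the removed
node is the tail, node.next is null, so the general case already nulls out the
new tail's next pointer.

if (node.prev != null) node.prev.next = node.next else head = node.next
if (node.next != null) node.next.prev = node.prev else tail = node.prev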


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 666
> > 
> >
> > Should we expose a size() method - i.e., increment on add and decrement 
> > on remove. We can aggregate the size of all the offset lists outside and 
> > emit a gauge. That will give us some assurance that there are no 
> > "forgotten" offsets. Re: the potential leak mentioned above.
> > 
> > In fact, I'm a bit nervous about correctness since this is a custom 
> > implementation of a semi-non-trivial data structure. We should probably 
> > even assert that it is empty when numMessageUnacked goes to zero as part of 
> > the rebalance.
> > 
> > Ideally, these custom implementations need a full-fledged unit test.
> 
> Jiangjie Qin wrote:
> That's a good point, we probably need a metric to see if we have some 
> stuck offsets. But those stuck offsets should also 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala


This should be like the previous code - i.e., 1 + ...

E.g., if config.offsetsCommitMaxRetries is one, then we can have two 
attempts. In this version at most one attempt will be made.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala


Actually we should fix this. If this is the final commit during shutdown 
(and NOT an auto-commit) then we need to retry on error, up to the retries 
remaining.
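A minimal sketch of the policy these two comments describe (names assumed):
offsetsCommitMaxRetries counts retries rather than attempts, and auto-commits
should not retry at all.

val retriesAllowed = if (isAutoCommit) 0 else config.offsetsCommitMaxRetries
var attemptsLeft = 1 + retriesAllowed // offsetsCommitMaxRetries = 1 => 2 attempts
var committed = false
while (!committed && attemptsLeft > 0) {
  committed = tryCommitOffsets() // hypothetical single-attempt helper
  attemptsLeft -= 1
}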



core/src/main/scala/kafka/tools/MirrorMaker.scala


Can you use a named parameter here? i.e., `valueFactory = Some(...`



core/src/main/scala/kafka/tools/MirrorMaker.scala


Should this be fatal? i.e., fatal is normally used before exiting 
(abnormally). WARN would be more suitable.

I don't think it makes sense to "not advance" the offset here, especially if 
you will still keep sending messages. I think you need to remove it from the 
unacked offset list. E.g., you may configure your mirror maker producer with 
only a few retries (in which case you are okay with data loss). In this 
scenario you should just let the error go and allow the mirror maker to 
proceed normally.

If someone wants zero data loss, the MM should be configured with required 
acks -1 and infinite retries.

Maybe I'm misunderstanding what zero data loss really means - can you 
clarify? (Especially if someone configures the producer with acks (say) one and 
limited retries)
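For concreteness, a sketch of the old-producer settings that last paragraph
implies (0.8 producer config keys; not a recommendation made in this thread):

val props = new java.util.Properties()
props.put("request.required.acks", "-1")                     // wait for all in-sync replicas
props.put("message.send.max.retries", Int.MaxValue.toString) // effectively infinite retries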



core/src/main/scala/kafka/tools/MirrorMaker.scala


Should we make this a generic DoublyLinkedList data structure in utils or 
some other suitable place and unit test it as well?
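A skeleton of the generic structure being proposed, including the size counter
suggested earlier (all names hypothetical):

class DoublyLinkedList[T] {
  class Node(val item: T) { var prev: Node = null; var next: Node = null }
  private var head: Node = null
  private var tail: Node = null
  private var count = 0
  def size: Int = count // gauge material for spotting "forgotten" offsets
  def add(node: Node): Unit = {
    node.prev = tail
    if (tail != null) tail.next = node else head = node
    tail = node
    count += 1
  }
  def remove(node: Node): Unit = {
    if (node.prev != null) node.prev.next = node.next else head = node.next
    if (node.next != null) node.next.prev = node.prev else tail = node.prev
    node.prev = null; node.next = null
    count -= 1
  }
}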



core/src/main/scala/kafka/tools/MirrorMaker.scala


Does not seem to be used. (but it probably should be used)



core/src/main/scala/kafka/utils/Utils.scala


I don't think this is necessary, right? i.e., args.map won't throw an NPE if 
you don't provide any additional arguments.

scala> def f(args: Int*) {println(args.size)}
f: (args: Int*)Unit

scala> f(1,2)
2

scala> f()
0


- Joel Koshy


On Dec. 17, 2014, 8:29 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> ---
> 
> (Updated Dec. 17, 2014, 8:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter the send was successful or 
> not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel'

[jira] [Commented] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14251549#comment-14251549
 ] 

Joel Koshy commented on KAFKA-1823:
---

This was caused by KAFKA-1684 - it can be reproduced by setting MaxTopicCount 
to 0. Will upload a patch in a minute.
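For reference, the failure mode is Scala's max on an empty collection, which
matches the stack trace quoted below:

scala> List.empty[Int].max
java.lang.UnsupportedOperationException: empty.max
  at scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)

so the fix needs to guard the empty case, e.g. if (counts.isEmpty) 0 else
counts.max (counts being a hypothetical name here).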

> transient unit test failure in PartitionAssignorTest
> 
>
> Key: KAFKA-1823
> URL: https://issues.apache.org/jira/browse/KAFKA-1823
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>
> Saw the following transient unit test failure.
> unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor 
> FAILED
> java.lang.UnsupportedOperationException: empty.max
> at 
> scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
> at scala.collection.AbstractIterator.max(Iterator.scala:1157)
> at 
> unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
> at 
> unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 29203: Patch for KAFKA-1823

2014-12-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29203/
---

Review request for kafka.


Bugs: KAFKA-1823
https://issues.apache.org/jira/browse/KAFKA-1823


Repository: kafka


Description
---

KAFKA-1823; Fix transient PartitionAssignorTest failure (triggered when there 
are no topics)


Diffs
-

  core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala 
24954de66ccc5158696166b7e2aabad0f1b1f287 

Diff: https://reviews.apache.org/r/29203/diff/


Testing
---


Thanks,

Joel Koshy



[jira] [Updated] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1823:
--
Attachment: KAFKA-1823.patch

> transient unit test failure in PartitionAssignorTest
> 
>
> Key: KAFKA-1823
> URL: https://issues.apache.org/jira/browse/KAFKA-1823
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
> Attachments: KAFKA-1823.patch
>
>
> Saw the following transient unit test failure.
> unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor 
> FAILED
> java.lang.UnsupportedOperationException: empty.max
> at 
> scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
> at scala.collection.AbstractIterator.max(Iterator.scala:1157)
> at 
> unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
> at 
> unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1823:
--
Assignee: Joel Koshy
  Status: Patch Available  (was: Open)

> transient unit test failure in PartitionAssignorTest
> 
>
> Key: KAFKA-1823
> URL: https://issues.apache.org/jira/browse/KAFKA-1823
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Joel Koshy
> Attachments: KAFKA-1823.patch
>
>
> Saw the following transient unit test failure.
> unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor 
> FAILED
> java.lang.UnsupportedOperationException: empty.max
> at 
> scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
> at scala.collection.AbstractIterator.max(Iterator.scala:1157)
> at 
> unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
> at 
> unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14251551#comment-14251551
 ] 

Joel Koshy commented on KAFKA-1823:
---

Created reviewboard https://reviews.apache.org/r/29203/
 against branch origin/trunk

> transient unit test failure in PartitionAssignorTest
> 
>
> Key: KAFKA-1823
> URL: https://issues.apache.org/jira/browse/KAFKA-1823
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
> Attachments: KAFKA-1823.patch
>
>
> Saw the following transient unit test failure.
> unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor 
> FAILED
> java.lang.UnsupportedOperationException: empty.max
> at 
> scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
> at scala.collection.AbstractIterator.max(Iterator.scala:1157)
> at 
> unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
> at 
> unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14251549#comment-14251549
 ] 

Joel Koshy edited comment on KAFKA-1823 at 12/18/14 11:43 AM:
--

This was caused by KAFKA-1648 - it can be reproduced by setting MaxTopicCount 
to 0. Will upload a patch in a minute.


was (Author: jjkoshy):
This was caused by KAFKA-1684 - it can be reproduced by setting MaxTopicCount 
to 0. Will upload a patch in a minute.

> transient unit test failure in PartitionAssignorTest
> 
>
> Key: KAFKA-1823
> URL: https://issues.apache.org/jira/browse/KAFKA-1823
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Joel Koshy
> Attachments: KAFKA-1823.patch
>
>
> Saw the following transient unit test failure.
> unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor 
> FAILED
> java.lang.UnsupportedOperationException: empty.max
> at 
> scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
> at scala.collection.AbstractIterator.max(Iterator.scala:1157)
> at 
> unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
> at 
> unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-1824:
---

 Summary: in ConsoleProducer - properties key.separator and 
parse.key no longer work
 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira


Looks like the change in kafka-1711 breaks them accidentally.

reader.init is called with readerProps which is initialized with commandline 
properties as defaults.

the problem is that reader.init checks:
if(props.containsKey("parse.key"))
and defaults don't return true in this case.
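For reference, the underlying java.util.Properties behavior - getProperty
consults the defaults but containsKey does not:

val defaults = new java.util.Properties()
defaults.setProperty("parse.key", "true")
val props = new java.util.Properties(defaults) // command-line props as defaults
props.containsKey("parse.key")                 // false: defaults are ignored
props.getProperty("parse.key")                 // "true": defaults are consulted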



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer

2014-12-18 Thread lokesh Birla (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14251940#comment-14251940
 ] 

lokesh Birla commented on KAFKA-1806:
-

Hi Neha,

What is the status of fixing this issue? It happens on every run. I have seen 
that if I use num.replica.fetchers=1, the issue sometimes goes away; however, I 
then see another problem of very frequent leadership changes even when all 
brokers are running.

If I set num.replica.fetchers=4, I can reproduce this issue on every run.

Please let me or Evan (from sarama) know if you need any help to fix this.

> broker can still expose uncommitted data to a consumer
> --
>
> Key: KAFKA-1806
> URL: https://issues.apache.org/jira/browse/KAFKA-1806
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: lokesh Birla
>Assignee: Neha Narkhede
>
> Although the following issue: https://issues.apache.org/jira/browse/KAFKA-727
> is marked fixed, I still see this issue in 0.8.1.1. I am able to 
> reproduce the issue consistently. 
> [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch 
> request for partition [mmetopic4,2] offset 1940029 from consumer with 
> correlation id 21 (kafka.server.Kaf
> kaApis)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
> (1818353) less than the start offset (1940029).
> at kafka.log.LogSegment.read(LogSegment.scala:136)
> at kafka.log.Log.read(Log.scala:386)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.immutable.Map$Map1.map(Map.scala:107)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> at 
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> at 
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> at 
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 29210: Patch for KAFKA-1819

2014-12-18 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29210/
---

Review request for kafka.


Bugs: KAFKA-1819
https://issues.apache.org/jira/browse/KAFKA-1819


Repository: kafka


Description
---

added locking


Diffs
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
f8fcb843c80eec3cf3c931df6bb472c019305253 
  core/src/main/scala/kafka/log/LogCleanerManager.scala 
bcfef77ed53f94017c06a884e4db14531774a0a2 
  core/src/main/scala/kafka/log/LogManager.scala 
4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
29cc01bcef9cacd8dec1f5d662644fc6fe4994bc 

Diff: https://reviews.apache.org/r/29210/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1819:

Attachment: KAFKA-1819.patch

> Cleaner gets confused about deleted and re-created topics
> -
>
> Key: KAFKA-1819
> URL: https://issues.apache.org/jira/browse/KAFKA-1819
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gian Merlino
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1819.patch
>
>
> I get an error like this after deleting a compacted topic and re-creating it. 
> I think it's because the brokers don't remove cleaning checkpoints from the 
> cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
> java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
> 587607 but segment base offset is 0 for log foo-6.
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
> at kafka.log.Cleaner.clean(LogCleaner.scala:300)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252039#comment-14252039
 ] 

Gwen Shapira commented on KAFKA-1819:
-

Created reviewboard https://reviews.apache.org/r/29210/diff/
 against branch trunk

> Cleaner gets confused about deleted and re-created topics
> -
>
> Key: KAFKA-1819
> URL: https://issues.apache.org/jira/browse/KAFKA-1819
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gian Merlino
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1819.patch
>
>
> I get an error like this after deleting a compacted topic and re-creating it. 
> I think it's because the brokers don't remove cleaning checkpoints from the 
> cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
> java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
> 587607 but segment base offset is 0 for log foo-6.
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
> at kafka.log.Cleaner.clean(LogCleaner.scala:300)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1819:

Assignee: Gwen Shapira
  Status: Patch Available  (was: Open)

> Cleaner gets confused about deleted and re-created topics
> -
>
> Key: KAFKA-1819
> URL: https://issues.apache.org/jira/browse/KAFKA-1819
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gian Merlino
>Assignee: Gwen Shapira
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1819.patch
>
>
> I get an error like this after deleting a compacted topic and re-creating it. 
> I think it's because the brokers don't remove cleaning checkpoints from the 
> cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
> java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
> 587607 but segment base offset is 0 for log foo-6.
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
> at kafka.log.Cleaner.clean(LogCleaner.scala:300)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 29211: Patch for KAFKA-1824

2014-12-18 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29211/
---

Review request for kafka.


Bugs: KAFKA-1824
https://issues.apache.org/jira/browse/KAFKA-1824


Repository: kafka


Description
---

KAFKA-1824 - fix ConsoleProducer so parse.key and key.separator will work again


Diffs
-

  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
1061cc74fac69693836f1e75add06b09d459a764 

Diff: https://reviews.apache.org/r/29211/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252061#comment-14252061
 ] 

Gwen Shapira commented on KAFKA-1824:
-

Created reviewboard https://reviews.apache.org/r/29211/diff/
 against branch trunk

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
> Attachments: KAFKA-1824.patch
>
>
> Looks like the change in kafka-1711 breaks them accidentally.
> reader.init is called with readerProps which is initialized with commandline 
> properties as defaults.
> the problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1824:

Attachment: KAFKA-1824.patch

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
> Attachments: KAFKA-1824.patch
>
>
> Looks like the change in kafka-1711 breaks them accidentally.
> reader.init is called with readerProps which is initialized with commandline 
> properties as defaults.
> the problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1824:

Assignee: Gwen Shapira
  Status: Patch Available  (was: Open)

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-1824.patch
>
>
> Looks like the change in kafka-1711 breaks them accidentally.
> reader.init is called with readerProps which is initialized with commandline 
> properties as defaults.
> the problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: API Annotations

2014-12-18 Thread Gwen Shapira
Thanks for the comments, Joe and Jay.

Doing the public / private designation at package level is definitely
easier than going method-by-method :)

If I get your comments right, the idea is to publish java docs for the
public apis on our website (or archive.apache.org) and not publish
java docs for the private apis.

The pro of that approach is that it doesn't require going through
every single public method in our code-base and marking it private or
public. I'm definitely not looking forward to that patch.
The con is that open source developers don't necessarily assume that
a lack of java docs implies that a method is not public... they usually
assume that the project was sloppy about its docs.

I think the pros win here :)

Regarding the location of the javadocs - the website, via SVN, is
traditional, and probably more google-able. Updating the SVN shouldn't
be too painful to do as part of the release process? Right now
googling for "kafka javadoc" does not lead to the latest 0.8.2 docs.
I'm getting github, stackoverflow and Neha's private apache page.

Then there's the other attribute - which API is stable.
Currently Kafka pretty much assumes an API is stable from the moment
it's committed to the trunk, I believe. It may make the development
process slightly easier if we can mark new APIs as "evolving" until we
are certain we are happy with them. It will allow us to iterate faster
and let users try out newer APIs.

Do you think adding this will be helpful?



On Tue, Dec 16, 2014 at 11:24 AM, Jay Kreps  wrote:
> Hey Gwen,
>
> We discussed this a bit when starting on the new clients.
>
> We were super sloppy about this in initial Kafka development--single jar,
> no real differentiation between public and private apis.
>
> The plan was something like the following:
> 1. Start to consider this with the new clients.
> 2. Do the public/private designation at the package level. The public
> packages are o.a.k.common, o.a.k.errors, o.a.k.producer, o.a.k.consumer,
> o.a.k.tools. This makes javadoc and things like that easier, and it makes
> it easy to see at a glance all the public classes. It would be even better
> to enforce this in the build if that is possible (i.e. no class from a
> non-public package is leaked) but we haven't done this. This approach
> obviously wasn't possible in Hadoop since they started without a clear
> delineation as we did in the original scala code.
>
> Thoughts?
>
> -Jay
>
>
> On Tue, Dec 16, 2014 at 10:04 AM, Gwen Shapira 
> wrote:
>>
>> Hi,
>>
>> Kafka has public APIs in Java and Scala, intended for use by external
>> developers.
>> In addition, Kafka also exposes many public methods that are intended
>> to be used within Kafka but are not intended to be called by external
>> developers.
>> Also, some of the external APIs are less stable than others (the new
>> producer for example).
>>
>> In Hadoop we have a similar situation, and to avoid misunderstandings
>> or miscommunications on which APIs are external and which are stable,
>> we use annotations to communicate this information.
>> We find it very useful in preventing our customers from accidentally
>> getting into trouble by using internal methods or unstable APIs.
>>
>> Here are the annotations Hadoop uses:
>>
>> https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html
>>
>> https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html
>>
>> I'm wondering what others think about using something similar in Kafka.
>>
>> Gwen
>>


Re: Review Request 29211: Patch for KAFKA-1824

2014-12-18 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29211/#review65574
---

Ship it!


Ship It!

- Neha Narkhede


On Dec. 18, 2014, 7:08 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29211/
> ---
> 
> (Updated Dec. 18, 2014, 7:08 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1824
> https://issues.apache.org/jira/browse/KAFKA-1824
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1824 - fix ConsoleProducer so parse.key and key.separator will work 
> again
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
> 1061cc74fac69693836f1e75add06b09d459a764 
> 
> Diff: https://reviews.apache.org/r/29211/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1824:
-
Fix Version/s: 0.8.3

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1824.patch
>
>
> Looks like the change in kafka-1711 breaks them accidentally.
> reader.init is called with readerProps which is initialized with commandline 
> properties as defaults.
> the problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1824:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patch. Pushed to trunk.

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1824.patch
>
>
> Looks like the change in kafka-1711 breaks them accidentally.
> reader.init is called with readerProps which is initialized with commandline 
> properties as defaults.
> the problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252706#comment-14252706
 ] 

Gwen Shapira commented on KAFKA-1824:
-

Thanks for the quick review [~nehanarkhede]!

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1824.patch
>
>
> Looks like the change in kafka-1711 breaks them accidentally.
> reader.init is called with readerProps which is initialized with commandline 
> properties as defaults.
> the problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29210: Patch for KAFKA-1819

2014-12-18 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29210/#review65578
---


Overall, looks good. Have one suggestion below.


core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala


Since the bug is about entries related to deleted topics, it will be good 
to add that verification step to all tests in DeleteTopicTest.


- Neha Narkhede


On Dec. 18, 2014, 6:59 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29210/
> ---
> 
> (Updated Dec. 18, 2014, 6:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1819
> https://issues.apache.org/jira/browse/KAFKA-1819
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> added locking
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8fcb843c80eec3cf3c931df6bb472c019305253 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> bcfef77ed53f94017c06a884e4db14531774a0a2 
>   core/src/main/scala/kafka/log/LogManager.scala 
> 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
> 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc 
> 
> Diff: https://reviews.apache.org/r/29210/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



Build failed in Jenkins: Kafka-trunk #357

2014-12-18 Thread Apache Jenkins Server
See 

Changes:

[neha.narkhede] KAFKA-1824 - fix ConsoleProducer so parse.key and key.separator 
will work again; reviewed by Neha Narkhede

--
[...truncated 1707 lines...]
kafka.admin.AdminTest > testShutdownBroker PASSED

kafka.admin.AdminTest > testTopicConfigChange PASSED

kafka.admin.AddPartitionsTest > testTopicDoesNotExist PASSED

kafka.admin.AddPartitionsTest > testWrongReplicaCount PASSED

kafka.admin.AddPartitionsTest > testIncrementPartitions PASSED

kafka.admin.AddPartitionsTest > testManualAssignmentOfReplicas PASSED

kafka.admin.AddPartitionsTest > testReplicaPlacement PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicWithAllAliveReplicas PASSED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicWithRecoveredFollower PASSED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicOnControllerFailover PASSED

kafka.admin.DeleteTopicTest > testPartitionReassignmentDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicDuringAddPartition PASSED

kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testRecreateTopicAfterDeletion PASSED

kafka.admin.DeleteTopicTest > testDeleteNonExistingTopic PASSED

kafka.api.RequestResponseSerializationTest > 
testSerializationAndDeserialization PASSED

kafka.api.ApiUtilsTest > testShortStringNonASCII PASSED

kafka.api.ApiUtilsTest > testShortStringASCII PASSED

kafka.api.test.ProducerSendTest > testSendOffset PASSED

kafka.api.test.ProducerSendTest > testClose PASSED

kafka.api.test.ProducerSendTest > testSendToPartition PASSED

kafka.api.test.ProducerSendTest > testAutoCreateTopic PASSED

kafka.api.test.ProducerFailureHandlingTest > testNotEnoughReplicas PASSED

kafka.api.test.ProducerFailureHandlingTest > testInvalidPartition PASSED

kafka.api.test.ProducerFailureHandlingTest > testTooLargeRecordWithAckZero 
PASSED

kafka.api.test.ProducerFailureHandlingTest > testTooLargeRecordWithAckOne PASSED

kafka.api.test.ProducerFailureHandlingTest > testNonExistentTopic PASSED

kafka.api.test.ProducerFailureHandlingTest > testWrongBrokerList PASSED

kafka.api.test.ProducerFailureHandlingTest > testNoResponse PASSED

kafka.api.test.ProducerFailureHandlingTest > testSendAfterClosed PASSED

kafka.api.test.ProducerFailureHandlingTest > testBrokerFailure PASSED

kafka.api.test.ProducerFailureHandlingTest > testCannotSendToInternalTopic 
PASSED

kafka.api.test.ProducerFailureHandlingTest > 
testNotEnoughReplicasAfterBrokerShutdown PASSED

kafka.api.test.ProducerCompressionTest > testCompression[0] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[1] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[2] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[3] PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled 
PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled 
FAILED
kafka.common.KafkaException: Socket server failed to bind to 
localhost:53018: Address already in use.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:260)
at kafka.network.Acceptor.(SocketServer.scala:205)
at kafka.network.SocketServer.startup(SocketServer.scala:86)
at kafka.server.KafkaServer.startup(KafkaServer.scala:98)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:134)
at 
kafka.integration.UncleanLeaderElectionTest$$anonfun$startBrokers$1.apply(UncleanLeaderElectionTest.scala:95)
at 
kafka.integration.UncleanLeaderElectionTest$$anonfun$startBrokers$1.apply(UncleanLeaderElectionTest.scala:93)
at scala.collection.immutable.List.foreach(List.scala:383)
at 
kafka.integration.UncleanLeaderElectionTest.startBrokers(UncleanLeaderElectionTest.scala:93)
at 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:115)

Caused by:
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bi

Re: API Annotations

2014-12-18 Thread Jay Kreps
Yes, exactly--the goal is to just publish the javadoc on the public classes
and all methods on a public class are considered public. Right now we
really haven't done proper docs for the new producer--no javadoc and no
examples either. This needs to get done before 0.8.2.

The idea of an @Experimental annotation sounds like a good way to introduce
features we aren't sure about. I wouldn't want us to abuse it since in
practice I suspect people expect binary compatibility and often breaks are
transitive (i.e. some common library in your stack uses an experimental
feature, then you can't upgrade Kafka, and the person impacted won't know
that the api had an annotation).
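For illustration, such an annotation could be as simple as the following
(hypothetical - nothing like this existed in Kafka at the time):

package org.apache.kafka.common.annotation

import scala.annotation.StaticAnnotation

/** Marks an API that may change incompatibly, or be removed, in any release. */
class Experimental extends StaticAnnotation

// usage: @Experimental class NewConsumerApi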

-Jay

On Thu, Dec 18, 2014 at 4:43 PM, Gwen Shapira  wrote:
>
> Thanks for the comments, Joe and Jay.
>
> Doing the public / private designation at package level is definitely
> easier than going method-by-method :)
>
> If I get your comments right, the idea is to publish java docs for the
> public apis on our website (or archive.apache.org) and not publish
> java docs for the private apis.
>
> The pro of that approach is that it doesn't require going through
> every single public method in our code-base and marking it private or
> public. I'm definitely not looking forward to that patch.
> The con is that open source developers don't necessarily assume that
> a lack of java docs implies that a method is not public... they usually
> assume that the project was sloppy about its docs.
>
> I think the pros win here :)
>
> Regarding the location of the javadocs - the website, via SVN, is
> traditional, and probably more google-able. Updating the SVN shouldn't
> be too painful to do as part of the release process? Right now
> googling for "kafka javadoc" does not lead to the latest 0.8.2 docs.
> I'm getting github, stackoverflow and Neha's private apache page.
>
> Then there's the other attribute - which API is stable.
> Currently Kafka pretty much assumes an API is stable from the moment
> it's committed to the trunk, I believe. It may make the development
> process slightly easier if we can mark new APIs as "evolving" until we
> are certain we are happy with them. It will allow us to iterate faster
> and let users try out newer APIs.
>
> Do you think adding this will be helpful?
>
>
>
> On Tue, Dec 16, 2014 at 11:24 AM, Jay Kreps  wrote:
> > Hey Gwen,
> >
> > We discussed this a bit when starting on the new clients.
> >
> > We were super sloppy about this in initial Kafka development--single jar,
> > no real differentiation between public and private apis.
> >
> > The plan was something like the following:
> > 1. Start to consider this with the new clients.
> > 2. Do the public/private designation at the package level. The public
> > packages are o.a.k.common, o.a.k.errors, o.a.k.producer, o.a.k.consumer,
> > o.a.k.tools. This makes javadoc and things like that easier, and it makes
> > it easy to see at a glance all the public classes. It would be even
> better
> > to enforce this in the build if that is possible (i.e. no class from a
> > non-public package is leaked) but we haven't done this. This approach
> > obviously wasn't possible in Hadoop since they started without a clear
> > delineation as we did in the original scala code.
> >
> > Thoughts?
> >
> > -Jay
> >
> >
> > On Tue, Dec 16, 2014 at 10:04 AM, Gwen Shapira 
> > wrote:
> >>
> >> Hi,
> >>
> >> Kafka has public APIs in Java and Scala, intended for use by external
> >> developers.
> >> In addition, Kafka also exposes many public methods that are intended
> >> to be used within Kafka but are not intended to be called by external
> >> developers.
> >> Also, some of the external APIs are less stable than others (the new
> >> producer for example).
> >>
> >> In Hadoop we have a similar situation, and to avoid misunderstandings
> >> or miscommunications on which APIs are external and which are stable,
> >> we use annotations to communicate this information.
> >> We find it very useful in preventing our customers from accidentally
> >> getting into trouble by using internal methods or unstable APIs.
> >>
> >> Here are the annotations Hadoop uses:
> >>
> >>
> https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html
> >>
> >>
> https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html
> >>
> >> I'm wondering what others think about using something similar in Kafka.
> >>
> >> Gwen
> >>
>


Review Request 29231: Patch for KAFKA-1824

2014-12-18 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29231/
---

Review request for kafka.


Bugs: KAFKA-1824
https://issues.apache.org/jira/browse/KAFKA-1824


Repository: kafka


Description
---

fixing accidental return of "WARN Property topic is not valid"


Diffs
-

  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
1061cc74fac69693836f1e75add06b09d459a764 

Diff: https://reviews.apache.org/r/29231/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1824:

Attachment: KAFKA-1824.patch

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1824.patch, KAFKA-1824.patch
>
>
> Looks like the change in kafka-1711 breaks them accidentally.
> reader.init is called with readerProps which is initialized with commandline 
> properties as defaults.
> the problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-766) Isr shrink/expand check is fragile

2014-12-18 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-766:

Fix Version/s: 0.8.3

> Isr shrink/expand check is fragile
> --
>
> Key: KAFKA-766
> URL: https://issues.apache.org/jira/browse/KAFKA-766
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.0
>Reporter: Sriram Subramanian
>Assignee: Neha Narkhede
> Fix For: 0.8.3
>
>
> Currently the isr check is coupled tightly with the produce batch size. For 
> example, if the producer batch size is 1 messages and isr check is 4000 
> messages, we continuously oscillate between shrinking isr and expanding isr 
> every second. This is because a single produce request throws the replica out 
> of the isr. This results in hundreds of calls to ZK (we still dont have multi 
> write). This can be alleviated by making the producer batch size smaller than 
> the isr check size. 
> Going forward, we should try to not have this coupling. It is worth 
> investigating if we can make the check more robust under such scenarios. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252774#comment-14252774
 ] 

Gwen Shapira commented on KAFKA-1824:
-

Created reviewboard https://reviews.apache.org/r/29231/diff/
 against branch trunk

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1824.patch, KAFKA-1824.patch
>
>
> Looks like the change in kafka-1711 breaks them accidentally.
> reader.init is called with readerProps which is initialized with commandline 
> properties as defaults.
> the problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252777#comment-14252777
 ] 

Gwen Shapira commented on KAFKA-1824:
-

My apologies! 

An additional round of tests revealed that my first patch accidentally breaks 
what was fixed in KAFKA-1711 - i.e. the "WARN Property topic is not valid" 
message returned. Those VerifiableProperties are tricky!

I added a new patch, on top of the one already committed, that removes the 
extra properties before creating the producer and eliminates the WARN messages.

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1824.patch, KAFKA-1824.patch
>
>
> Looks like the change in KAFKA-1711 breaks them accidentally.
> reader.init is called with readerProps, which is initialized with command-line 
> properties as defaults.
> The problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and containsKey does not return true for keys that only exist in the defaults.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252805#comment-14252805
 ] 

Neha Narkhede commented on KAFKA-1824:
--

[~gwenshap] Good catch. I wonder if we should just add tests given the tricky 
logic involved.

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1824.patch, KAFKA-1824.patch
>
>
> Looks like the change in KAFKA-1711 breaks them accidentally.
> reader.init is called with readerProps, which is initialized with command-line 
> properties as defaults.
> The problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and containsKey does not return true for keys that only exist in the defaults.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer

2014-12-18 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252807#comment-14252807
 ] 

Neha Narkhede commented on KAFKA-1806:
--

[~lokeshbirla] I was looking for steps to reproduce this. So if I download 
0.8.2-beta and go through your steps, I should be able to see the same error 
you see.

> broker can still expose uncommitted data to a consumer
> --
>
> Key: KAFKA-1806
> URL: https://issues.apache.org/jira/browse/KAFKA-1806
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: lokesh Birla
>Assignee: Neha Narkhede
>
> Although the following issue: https://issues.apache.org/jira/browse/KAFKA-727
> is marked fixed, I still see this issue in 0.8.1.1. I am able to 
> reproduce the issue consistently. 
> [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch 
> request for partition [mmetopic4,2] offset 1940029 from consumer with 
> correlation id 21 (kafka.server.Kaf
> kaApis)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
> (1818353) less than the start offset (1940029).
> at kafka.log.LogSegment.read(LogSegment.scala:136)
> at kafka.log.Log.read(Log.scala:386)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.immutable.Map$Map1.map(Map.scala:107)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> at 
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> at 
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> at 
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29210: Patch for KAFKA-1819

2014-12-18 Thread Gwen Shapira


> On Dec. 19, 2014, 1:23 a.m., Neha Narkhede wrote:
> > core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, line 231
> > 
> >
> > Since the bug is about entries related to deleted topics, it will be 
> > good to add that verification step to all tests in DeleteTopicTest.

I added the "delete.topic.enable" to createTestTopicAndCluster(), which is used 
by all tests in DeleteTopicTest. This exercises the part of the code-path where 
we abort and checkpoint the cleaner.

However, this does not provide any verification that the cleaner checkpoint 
file was correctly updated. I wanted to add that, but it looks like getting the 
content of the checkpoint file from the information available at the delete 
topic tests will require quite intrusive modification to unrelated parts of the 
code (the cleaner being private to the log, for example). So I left this part 
out, and did some manual testing instead.
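
For reference, a hedged sketch of the change described above, assuming the 
0.8-era TestUtils.createBrokerConfigs(numConfigs, enableControlledShutdown) 
helper (exact signatures may differ):

    import kafka.utils.TestUtils

    // Inside createTestTopicAndCluster(): enable topic deletion on every
    // broker so all DeleteTopicTest cases exercise the abort-and-checkpoint
    // path in the cleaner.
    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
    brokerConfigs.foreach(_.setProperty("delete.topic.enable", "true"))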


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29210/#review65578
---


On Dec. 18, 2014, 6:59 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29210/
> ---
> 
> (Updated Dec. 18, 2014, 6:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1819
> https://issues.apache.org/jira/browse/KAFKA-1819
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> added locking
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8fcb843c80eec3cf3c931df6bb472c019305253 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> bcfef77ed53f94017c06a884e4db14531774a0a2 
>   core/src/main/scala/kafka/log/LogManager.scala 
> 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
> 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc 
> 
> Diff: https://reviews.apache.org/r/29210/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252809#comment-14252809
 ] 

Gwen Shapira commented on KAFKA-1824:
-

Yes, let's do that. It will help us avoid another round of the break-and-fix 
cycle.

Hold off on this patch and I'll provide tests in a day or two.

> in ConsoleProducer - properties key.separator and parse.key no longer work
> --
>
> Key: KAFKA-1824
> URL: https://issues.apache.org/jira/browse/KAFKA-1824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
> Attachments: KAFKA-1824.patch, KAFKA-1824.patch
>
>
> Looks like the change in KAFKA-1711 breaks them accidentally.
> reader.init is called with readerProps, which is initialized with command-line 
> properties as defaults.
> The problem is that reader.init checks:
> if(props.containsKey("parse.key"))
> and containsKey does not return true for keys that only exist in the defaults.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29231: Patch for KAFKA-1824

2014-12-18 Thread Eric Olander

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29231/#review65582
---



core/src/main/scala/kafka/tools/ConsoleProducer.scala


remove() returns the value assigned to the key being removed, so you could 
simply do:

topic = props.remove("topic")

instead of the separate getProperty() and remove() calls.
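
A small sketch of that suggestion; note that on the Scala side a cast is 
needed, since Properties.remove returns AnyRef (this detail is an editorial 
addition, not part of the review comment):

    import java.util.Properties

    val props = new Properties()
    props.setProperty("topic", "test-topic")

    // remove() returns the previous value for the key (or null if absent),
    // so the separate getProperty() lookup can be dropped.
    val topic = props.remove("topic").asInstanceOf[String]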


- Eric Olander


On Dec. 19, 2014, 1:56 a.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29231/
> ---
> 
> (Updated Dec. 19, 2014, 1:56 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1824
> https://issues.apache.org/jira/browse/KAFKA-1824
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> fixing accidental return of "WARN Property topic is not valid"
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
> 1061cc74fac69693836f1e75add06b09d459a764 
> 
> Diff: https://reviews.apache.org/r/29231/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 2:48 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter whether the send was 
successful or not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-18_18:48:18.patch

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
> KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
> KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252817#comment-14252817
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
> KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
> KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin


> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> > 
> >
> > Should this be fatal? i.e., fatal is normally used before exiting 
> > (abnormally). WARN would be more suitable.
> > 
> > I don't think it makes sense to "not advance" the offset here 
> > especially if you will still keep sending messages. I think you need to 
> > remove it from the unacked offset list. E.g., you may configure your mirror 
> > maker producer to only few retries (in which case you are okay with data 
> > loss). In this scenario you should just let the error go and allow the 
> > mirror maker to proceed normally.
> > 
> > If someone wants zero data loss the MM should be configured with 
> > required acks -1 and infinite retries.
> > 
> > Maybe I'm misunderstanding what zero data loss really means - can you 
> > clarify? (Especially if someone configures the producer with acks (say) one 
> > and limited retries)

That makes sense. So I've changed the code to work in the following way:

1. If retries are set to infinite, the producer will keep retrying and the 
entire pipeline will eventually be blocked. (This is strictly data-loss free.)
2. If retries are not set to infinite, then after the retries are exhausted, the 
offset is removed from the unacked list and recorded as a skippedUnackedMessage, 
which is an exposed metric.
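
A hypothetical sketch of those two behaviors as a producer callback. All names 
here (UnackedOffsets, skippedUnackedMessages, MirrorMakerCallback) are invented 
for illustration and are not the patch's actual code:

    import java.util.concurrent.atomic.AtomicLong
    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

    // Invented stand-in for the mirror maker's unacked-offset bookkeeping.
    class UnackedOffsets {
      private val offsets = scala.collection.mutable.TreeSet.empty[Long]
      def add(offset: Long): Unit = synchronized { offsets += offset }
      def remove(offset: Long): Unit = synchronized { offsets -= offset }
    }

    val skippedUnackedMessages = new AtomicLong(0) // exposed as a metric

    class MirrorMakerCallback(offset: Long, unacked: UnackedOffsets) extends Callback {
      override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
        if (e == null) {
          unacked.remove(offset) // acked: the commit point may advance past it
        } else {
          // Finite retries exhausted: drop the offset so the pipeline is not
          // blocked forever, but surface the loss as a metric. (With infinite
          // retries this branch is never reached; the producer keeps retrying
          // and the pipeline eventually blocks instead.)
          unacked.remove(offset)
          skippedUnackedMessages.incrementAndGet()
        }
      }
    }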


> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 668
> > 
> >
> > Should we make this a generic DoublyLinkedList data structure in utils 
> > or some other suitable place and unit test it as well?

I'm not sure this is generic enough to put into utils. This raw linked 
list only serves the purpose of removing/inserting a node in the middle in 
O(1), which cannot be achieved with java.util.LinkedList. Maybe we can keep it 
here for now, and if other use cases come up later, we can refactor the code 
to create a raw LinkedList in utils and use that one. What do you think?
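
To make the trade-off concrete, here is a minimal sketch of such a raw list 
(hypothetical names): holding a reference to the node gives O(1) removal from 
the middle, which java.util.LinkedList does not expose:

    // Doubly linked list whose append returns the node, so callers can later
    // remove it from the middle in O(1).
    class DListNode[T](val value: T) {
      var prev: DListNode[T] = _
      var next: DListNode[T] = _
    }

    class DList[T] {
      private var head: DListNode[T] = _
      private var tail: DListNode[T] = _

      def append(value: T): DListNode[T] = {
        val node = new DListNode(value)
        if (tail == null) { head = node; tail = node }
        else { tail.next = node; node.prev = tail; tail = node }
        node
      }

      def remove(node: DListNode[T]): Unit = {
        if (node.prev != null) node.prev.next = node.next else head = node.next
        if (node.next != null) node.next.prev = node.prev else tail = node.prev
        node.prev = null; node.next = null
      }
    }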


> On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/utils/Utils.scala, line 441
> > 
> >
> > I don't think this is necessary right? i.e., args.map won't throw an 
> > NPE if you don't provide any additional arguments.
> > 
> > scala> def f(args: Int*) {println(args.size)}
> > f: (args: Int*)Unit
> > 
> > scala> f(1,2)
> > 2
> > 
> > scala> f()
> > 0

Yes, it seems to be working.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
---


On Dec. 19, 2014, 2:48 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> ---
> 
> (Updated Dec. 19, 2014, 2:48 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be 

Re: Review Request 29231: Patch for KAFKA-1824

2014-12-18 Thread Gwen Shapira


> On Dec. 19, 2014, 2:36 a.m., Eric Olander wrote:
> > core/src/main/scala/kafka/tools/ConsoleProducer.scala, line 269
> > 
> >
> > remove() returns the value assigned to the key being removed, so you 
> > could simply do:
> > 
> > topic = props.remove("topic")
> > 
> > instead of the separate getProperty() and remove() calls.

Will do. Thanks for the tip, Eric :)


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29231/#review65582
---


On Dec. 19, 2014, 1:56 a.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29231/
> ---
> 
> (Updated Dec. 19, 2014, 1:56 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1824
> https://issues.apache.org/jira/browse/KAFKA-1824
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> fixing accidental return of "WARN Property topic is not valid"
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
> 1061cc74fac69693836f1e75add06b09d459a764 
> 
> Diff: https://reviews.apache.org/r/29231/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 6:17 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter whether the send was 
successful or not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-18_22:17:08.patch

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
> KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
> KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
> KAFKA-1650_2014-12-18_22:17:08.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14253019#comment-14253019
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
> KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
> KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
> KAFKA-1650_2014-12-18_22:17:08.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 6:53 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter whether the send was 
successful or not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Missed some changes in the previous patch submission; submitting the patch again.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14253071#comment-14253071
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
> KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
> KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
> KAFKA-1650_2014-12-18_22:17:08.patch, KAFKA-1650_2014-12-18_22:53:26.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-18_22:53:26.patch

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
> KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
> KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
> KAFKA-1650_2014-12-18_22:17:08.patch, KAFKA-1650_2014-12-18_22:53:26.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 7:41 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter whether the send was 
successful or not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Missed some changes in the previous patch submission; submitting the patch again.


change offset commit thread to use scheduler.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
> > 
> >
> > Why not use KafkaScheduler for the offset commit task?
> 
> Jiangjie Qin wrote:
> Hadn't thought of that before... but it looks like we need to do some more 
> handling when something goes wrong in the offset commit threads. The 
> KafkaScheduler code doesn't seem to do so.
> 
> Joel Koshy wrote:
> So you can make the task itself catch throwables. So it would look 
> something like this:
> 
> scheduler.schedule("mirrorMakerOffsetsCommiter", commitTask, ...)
> 
> And in commitTask:
> try {
>   commitOffsets()
> }
> catch {
>   case t: Throwable =>
> // handle
> }
> 
> That said, I don't think connector.commitOffsets will throw anything - 
> since we catch all throwables there.
> 
> The only additional detail is that after you shutdown the scheduler you 
> will need to call commitOffsets() manually one last time.

I changed the code to use the scheduler. It seems that the try/catch block only 
handles the Kafka-based offset commit and does not cover 
ensureOffsetManagerConnected(). Also, an OOM could theoretically be thrown when 
creating a very large offset map, so I kept the catch block to make the mirror 
maker exit in that case.
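
Putting this thread together, a hedged sketch of the scheduler-based commit 
task; commitOffsets() and offsetCommitIntervalMs are stand-ins for the 
connector call and the mirror maker config, not the actual patch:

    import java.util.concurrent.TimeUnit
    import kafka.utils.KafkaScheduler

    val offsetCommitIntervalMs = 60 * 1000L  // stand-in config value
    def commitOffsets(): Unit = { /* connector.commitOffsets in the real code */ }

    def commitTask(): Unit = {
      try {
        commitOffsets()
      } catch {
        case t: Throwable =>
          // e.g. an OOM while building a very large offset map: fail fast
          System.err.println("Offset commit failed, exiting: " + t)
          System.exit(-1)
      }
    }

    val scheduler = new KafkaScheduler(threads = 1)
    scheduler.startup()
    scheduler.schedule("mirrorMakerOffsetCommitter", commitTask,
      delay = 0, period = offsetCommitIntervalMs, unit = TimeUnit.MILLISECONDS)

    // On shutdown: stop the scheduler first, then commit one final time by hand.
    scheduler.shutdown()
    commitOffsets()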


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
---


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> ---
> 
> (Updated Dec. 19, 2014, 7:41 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter whether the send was 
> successful or not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> Incorporated Joel's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Joel's comments
> 
> 
> Fix a bug in metric.
> 
> 
> Missed some changes in the previous patch submission; submitting the patch again.
> 
> 
> change offset commit thread to use scheduler.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-18_23:41:16.patch

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
> KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
> KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
> KAFKA-1650_2014-12-18_22:17:08.patch, KAFKA-1650_2014-12-18_22:53:26.patch, 
> KAFKA-1650_2014-12-18_23:41:16.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14253103#comment-14253103
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
> KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
> KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
> KAFKA-1650_2014-12-18_22:17:08.patch, KAFKA-1650_2014-12-18_22:53:26.patch, 
> KAFKA-1650_2014-12-18_23:41:16.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)