[jira] [Commented] (KAFKA-1903) Zk Expiration causes controller deadlock

2015-01-29 Thread yufeng.chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14296572#comment-14296572
 ] 

yufeng.chen commented on KAFKA-1903:


kafka_2.11-0.8.2-beta doesn't have the issue,  the deleteTopicThread has not 
use the ReentrantLock whith KafkaController together, the deleteTopicThread has 
it's own ReentrantLock. Thanks !

> Zk Expiration causes controller deadlock
> 
>
> Key: KAFKA-1903
> URL: https://issues.apache.org/jira/browse/KAFKA-1903
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1, 0.8.1.1
> Environment: java version "1.7.0_55"
> Java(TM) SE Runtime Environment (build 1.7.0_55-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode)
> kafka_2.9.2-0.8.1
>Reporter: yufeng.chen
>Assignee: Neha Narkhede
>Priority: Critical
>
> when controller encounter a ZK expired, zookeeper node /broker/ids  lost one 
> kafkk controler. If there has three node, e.g. 1 2, 3; and the 1 start 
> delete-topic-method thread. At this time, node 1 will lost. Why? The reason 
> is that: when ZK expiration happened,  the zk-event-thread will call 
> KafkaController.SessionExpirationListener.handleNewSession method. if the 
> zk-event-thread has the controllerContext.controllerLock, will call 
> onControllerResignation->deleteTopicManager.shutdown()->deleteTopicsThread.shutdown().
>  And the delete-topic-thread is working, and await at 
> awaitTopicDeletionNotification() method。 Zk-event-thread call 
> deleteTopicsThread.shutdown() and wait until the run() method execute 
> compelely. Because the zk-event-thread has the lock,  
> "deleteTopicsCond.await()" whill not be really "interruted " . Then 
> zk-event-thread whill pause,  not execute the 
> kafkaHealthcheck->SessionExpireListener.handleNewSession。 The controller will 
> not register again. The jstack log :
> "delete-topics-thread" prio=10 tid=0x7fb0bc21b000 nid=0x2825 waiting on 
> condition [0x7fb0f534a000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe4952da0> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2047)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> "ZkClient-EventThread-12-10.3.63.8:2181,10.3.63.9:2181" daemon prio=10 
> tid=0x7fb10038e800 nid=0x7d93 waiting on condition [0x7fb0f544a000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe4f4a760> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
>

[jira] [Created] (KAFKA-1907) ZkClient can block controlled shutdown indefinitely

2015-01-29 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-1907:


 Summary: ZkClient can block controlled shutdown indefinitely
 Key: KAFKA-1907
 URL: https://issues.apache.org/jira/browse/KAFKA-1907
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Ewen Cheslack-Postava


There are some calls to ZkClient via ZkUtils in 
KafkaServer.controlledShutdown() that can block indefinitely because they 
internally call waitUntilConnected. The ZkClient API doesn't provide an 
alternative with timeouts, so fixing this will require enforcing timeouts in 
some other way.

This may be a more general issue if there are any non daemon threads that also 
call ZkUtils methods.

Stacktrace showing the issue:

{code}
"Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition [0x6ad69000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x70a93368> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:267)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
at 
kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:194)
at 
kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:269)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:45)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:42)
at kafka.Kafka$$anon$1.run(Kafka.scala:42)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Cannot stop Kafka server if zookeeper is shutdown first

2015-01-29 Thread Ewen Cheslack-Postava
Looks like a bug to me -- the underlying ZK library wraps a lot of blocking
method implementations with waitUntilConnected() calls without any
timeouts. Ideally we could just add a version of ZkUtils.getController()
with a timeout, but I don't see an easy way to accomplish that with
ZkClient.

There's at least one other call to ZkUtils besides the one in the
stacktrace you gave that would cause the same issue, possibly more that
aren't directly called in that method. One ugly solution would be to use an
extra thread during shutdown to trigger timeouts, but I'd imagine we
probably have other threads that could end up blocking in similar ways.

I filed https://issues.apache.org/jira/browse/KAFKA-1907 to track the issue.


On Mon, Jan 26, 2015 at 6:35 AM, Jaikiran Pai 
wrote:

> The main culprit is this thread which goes into "forever retry connection
> to a closed zookeeper" when I shutdown Kafka (via a Ctrl + C) after
> zookeeper has already been shutdown. I have attached the complete thread
> dump, but I don't know if it will be delivered to the mailing list.
>
> "Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition
> [0x6ad69000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x70a93368> (a java.util.concurrent.locks.
> AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkUntil(
> LockSupport.java:267)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
> at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
> at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
> at kafka.server.KafkaServer.kafka$server$KafkaServer$$
> controlledShutdown(KafkaServer.scala:194)
> at kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$
> sp(KafkaServer.scala:269)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:45)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
> at kafka.server.KafkaServerStartable.shutdown(
> KafkaServerStartable.scala:42)
> at kafka.Kafka$$anon$1.run(Kafka.scala:42)
>
> -Jaikiran
>
>
> On Monday 26 January 2015 05:46 AM, Neha Narkhede wrote:
>
>> For a clean shutdown, the broker tries to talk to the controller and also
>> issues reads to zookeeper. Possibly that is where it tries to reconnect to
>> zk. It will help to look at the thread dump.
>>
>> Thanks
>> Neha
>>
>> On Fri, Jan 23, 2015 at 8:53 PM, Jaikiran Pai 
>> wrote:
>>
>>  I was just playing around with the RC2 of 0.8.2 and noticed that if I
>>> shutdown zookeeper first I can't shutdown Kafka server at all since it
>>> goes
>>> into a never ending attempt to reconnect with zookeeper. I had to kill
>>> the
>>> Kafka process to stop it. I tried it against trunk too and there too I
>>> see
>>> the same issue. Should I file a JIRA for this and see if I can come up
>>> with
>>> a patch?
>>>
>>> FWIW, here's the unending (and IMO too frequent) attempts at trying to
>>> reconnect. I've a thread dump too which shows that the other thread which
>>> is trying to complete a controlled shutdown of Kafka is blocked forever
>>> for
>>> the zookeeper to be up. I can attach it to the JIRA.
>>>
>>> 2015-01-24 10:15:46,278] WARN Session 0x14b1a413680 for server null,
>>> unexpected error, closing socket connection and attempting reconnect
>>> (org.apache.zookeeper.ClientCnxn)
>>> java.net.ConnectException: Connection refused
>>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>  at sun.nio.ch.SocketChannelImpl.finishConnect(
>>> SocketChannelImpl.java:739)
>>>  at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(
>>> ClientCnxnSocketNIO.java:361)
>>>  at org.apache.zookeeper.ClientCnxn$SendThread.run(
>>> ClientCnxn.java:1081)
>>> [2015-01-24 10:15:47,437] INFO Opening socket connection to server
>>> localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL
>>> (unknown error) (org.apache.zookeeper.ClientCnxn)
>>> [2015-01-24 10:15:47,438] WARN Session 0x14b1a413680 for server null,
>>> unexpected error, closing socket connection and attempting reconnect
>>> (org.apache.zookeeper.ClientCnxn)
>>> java.net.ConnectException: Connection refused
>>>  at sun.nio.ch.SocketChannelImpl.che

Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-29 Thread Magnus Edenhill
+1 on librdkafka interop

Minor nitpick:
 KAFKA-1781 (state required gradle version in README)  is included in the
Release notes but is not actually fixed


2015-01-29 6:22 GMT+01:00 Jun Rao :

> This is the third candidate for release of Apache Kafka 0.8.2.0.
>
> Release Notes for the 0.8.2.0 release
>
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
>
> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> (SHA256)
> checksum.
>
> * Release artifacts to be voted upon (source and binary):
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/
>
> * scala-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>
> * java-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>
> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>
> /***
>
> Thanks,
>
> Jun
>


Re: Changing the default Kafka data log directory (was Re: What to do when file.rename fails?)

2015-01-29 Thread Darion Yaphet
It's seems a good idea .
use /tmp/kafka-log as default dir maybe delete by other people and very
unsafe .

2015-01-29 14:26 GMT+08:00 Jaikiran Pai :

> I have created a JIRA for this proposed change https://issues.apache.org/
> jira/browse/KAFKA-1906 and uploaded a patch for review
> https://reviews.apache.org/r/30403/
>
> -Jaikiran
>
> On Tuesday 27 January 2015 02:36 AM, Jay Kreps wrote:
>
>> Having a relative path and keeping data under /data in the kafka distro
>> would make sense. This would require some reworking of the shell scripts,
>> though, as I think right now you an actually run Kafka from any directory
>> and the cwd of the process will be whatever directory you start from. If
>> we
>> have a relative path in the config then the working directory will HAVE to
>> be the kafka directory. This works for the simple download case but may
>> making some packaging stuff harder for other use cases.
>>
>> -Jay
>>
>> On Mon, Jan 26, 2015 at 5:54 AM, Jaikiran Pai 
>> wrote:
>>
>>  Having looked at the logs the user posted, I don't think this specific
>>> issue has to do with /tmp path.
>>>
>>> However, now that the /tmp path is being discussed, I think it's a good
>>> idea that we default the Kafka logs to a certain folder. As Jay notes, it
>>> makes it very easy to just download and start the servers without having
>>> to
>>> fiddle with the configs when you are just starting out. Having said that,
>>> when I started out with Kafka, I found /tmp to be a odd place to default
>>> the path to. I expected them to be defaulted to a folder within the Kafka
>>> install. Somewhere like KAFKA_INSTALL_FOLDER/data/kafka-logs/ folder. Is
>>> that something we should do?
>>>
>>> -Jaikiran
>>>
>>> On Monday 26 January 2015 12:23 AM, Jay Kreps wrote:
>>>
>>>  Hmm, but I don't think tmp gets cleaned while the server is running...

 The reason for using tmp was because we don't know which directory they
 will use and we don't want them to have to edit configuration for the
 simple "out of the box" getting started tutorial. I actually do think
 that
 is important. Maybe an intermediate step we could do is just call out
 this
 setting in the quickstart so people know where data is going and know
 they
 need to configure it later...

 -Jay

 On Sun, Jan 25, 2015 at 9:32 AM, Joe Stein 
 wrote:

   This feels like another type of symptom from people using /tmp/ for
 their

> logs.  Perosnally, I would rather use /mnt/data or something and if
> that
> doesn't exist on their machine we can exception, or no default and
> force
> set it.
>
> /***
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop
> /
> On Jan 25, 2015 11:37 AM, "Jay Kreps"  wrote:
>
>   I think you are right, good catch. It could be that this user deleted
>
>> the
>> files manually, but I wonder if there isn't some way that is a Kafka
>> bug--e.g. if multiple types of retention policies kick in at the same
>>
>>  time
>
>  do we synchronize that properly?
>>
>> -Jay
>>
>> On Sat, Jan 24, 2015 at 9:26 PM, Jaikiran Pai <
>> jai.forums2...@gmail.com
>> wrote:
>>
>>   Hi Jay,
>>
>>> I spent some more time over this today and went back to the original
>>> thread which brought up the issue with file leaks [1]. I think that
>>>
>>>  output
>>
>>  of lsof in that logs has a very important hint:
>>>
>>> /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_
>>> topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
>>> java
>>> 8446 root 725u REG 253,2 536910838 26087364
>>>
>>> /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_
>>> topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)
>>> java
>>> 8446 root 726u REG 253,2 536917902 26087368
>>>
>>> Notice the "(deleted)" text in that output. The last time I looked at
>>>
>>>  that
>>
>>  output, I thought it was the user who had added that "deleted" text
>>> to
>>>
>>>  help
>>
>>  us understand that problem. But today I read up on the output format
>>> of
>>> lsof and it turns out that it's lsof which itself adds that hint
>>>
>>>  whenever a
>>
>>  file has already been deleted possibly by a different process but
>>> some
>>> other process is still holding on to open resources of that (deleted)
>>>
>>>  file
>>
>>  [2].
>>>
>>> So in the context of the issue that we are discussing and the way
>>> Kafka
>>> deals with async deletes (i.e. first by attempting a rename of the
>>> log/index files), I think this all makes sense now. So what I think
>>> i

Re: Review Request 30403: Patch for KAFKA-1906

2015-01-29 Thread Jay Kreps


> On Jan. 29, 2015, 6:50 a.m., Gwen Shapira wrote:
> > We added --override option to KafkaServer that allows overriding default 
> > configuration from commandline.
> > I believe that just changing the shell script to include --override 
> > log.dir=${KAFKA_HOME}/data 
> > may be enough?
> > 
> > overriding configuration from server.properties in code can be very 
> > unintuitive.
> 
> Jaikiran Pai wrote:
> That sounds a good idea. I wasn't aware of the --override option. I'll 
> give that a try and if it works then the change will be limited to just the 
> scripts.
> 
> Jaikiran Pai wrote:
> Hi Gwen,
> 
> I had a look at the JIRA https://issues.apache.org/jira/browse/KAFKA-1654 
> which added support for --override and also the purpose of that option. From 
> what I see it won't be useful in this case, because in this current task, we 
> don't want to override a value that has been explicitly set (via 
> server.properties for example). Instead we want to handle a case where no 
> explicit value is specified for the data log directory and in such cases 
> default it to a path which resides under the Kafka install directory.
> 
> If we use the --override option in our (startup) scripts to set 
> log.dir=${KAFKA_HOME}/data, we will end up forcing this value as the log.dir 
> even when the user has intentionally specified a different path for the data 
> logs.
> 
> Let me know if I misunderstood your suggestion.

I think you are right that --override won't work but maybe this is still a good 
suggestion?

Something seems odd about force overriding the working directory of the process 
just to set the log directory.

Another option might be to add --default. This would work like --override but 
would provide a default value only if none is specified. I think this might be 
possible because the java.util.Properties we use for config supports a 
hierarchy of defaults. E.g. you can say new Properties(defaultProperties). Not 
sure if this is better or worse.

Thoughts?


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30403/#review70168
---


On Jan. 29, 2015, 6:24 a.m., Jaikiran Pai wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30403/
> ---
> 
> (Updated Jan. 29, 2015, 6:24 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1906
> https://issues.apache.org/jira/browse/KAFKA-1906
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1906 Default the Kafka data log directory to 
> $KAFKA_HOME/data/kafka-logs directory, where KAFKA_HOME is the Kafka 
> installation directory
> 
> 
> Diffs
> -
> 
>   bin/kafka-run-class.sh 881f578a8f5c796fe23415b978c1ad35869af76e 
>   bin/windows/kafka-run-class.bat 9df3d2b45236b4f06d55a89c84afcf0ab9f5d0f2 
>   config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
> 82dce80d553957d8b5776a9e140c346d4e07f766 
> 
> Diff: https://reviews.apache.org/r/30403/diff/
> 
> 
> Testing
> ---
> 
> The change here involves updating the Kafka scripts (for Windows and * nix) 
> to infer and setup KAFKA_HOME environment variable. This value is then used 
> by the KafkaConfig to decide what path to default to for the Kafka data logs, 
> in the absence of any explicitly set log.dirs (or log.dir) properties.
> 
> Care has been taken to ensure that other mechanism which might presently be 
> bypassing the Kafka scripts, will still continue to function, since in the 
> absence of KAFKA_HOME environment property value, we fall back to 
> /tmp/kafka-logs (the present default) as the default data log directory
> 
> Existing tests have been run to ensure that this change maintains backward 
> compatibility (i.e. doesn't fail when KAFKA_HOME isn't available/set) and 2 
> new test methods have been added to the KafkaConfigTest to ensure that this 
> change works.
> 
> Although the change has been made to both .sh and .bat files, to support 
> this, I haven't actually tested this change on a Windows OS and would 
> appreciate if someone can test this there and let me know if they run into 
> any issues.
> 
> 
> Thanks,
> 
> Jaikiran Pai
> 
>



Re: Review Request 30403: Patch for KAFKA-1906

2015-01-29 Thread Jaikiran Pai


> On Jan. 29, 2015, 6:50 a.m., Gwen Shapira wrote:
> > We added --override option to KafkaServer that allows overriding default 
> > configuration from commandline.
> > I believe that just changing the shell script to include --override 
> > log.dir=${KAFKA_HOME}/data 
> > may be enough?
> > 
> > overriding configuration from server.properties in code can be very 
> > unintuitive.
> 
> Jaikiran Pai wrote:
> That sounds a good idea. I wasn't aware of the --override option. I'll 
> give that a try and if it works then the change will be limited to just the 
> scripts.
> 
> Jaikiran Pai wrote:
> Hi Gwen,
> 
> I had a look at the JIRA https://issues.apache.org/jira/browse/KAFKA-1654 
> which added support for --override and also the purpose of that option. From 
> what I see it won't be useful in this case, because in this current task, we 
> don't want to override a value that has been explicitly set (via 
> server.properties for example). Instead we want to handle a case where no 
> explicit value is specified for the data log directory and in such cases 
> default it to a path which resides under the Kafka install directory.
> 
> If we use the --override option in our (startup) scripts to set 
> log.dir=${KAFKA_HOME}/data, we will end up forcing this value as the log.dir 
> even when the user has intentionally specified a different path for the data 
> logs.
> 
> Let me know if I misunderstood your suggestion.
> 
> Jay Kreps wrote:
> I think you are right that --override won't work but maybe this is still 
> a good suggestion?
> 
> Something seems odd about force overriding the working directory of the 
> process just to set the log directory.
> 
> Another option might be to add --default. This would work like --override 
> but would provide a default value only if none is specified. I think this 
> might be possible because the java.util.Properties we use for config supports 
> a hierarchy of defaults. E.g. you can say new Properties(defaultProperties). 
> Not sure if this is better or worse.
> 
> Thoughts?

Hi Jay,

> Another option might be to add --default. This would work like --override but 
> would provide a default value only if none is specified. I think this might 
> be possible because the java.util.Properties we use for config supports a 
> hierarchy of defaults. E.g. you can say new Properties(defaultProperties). 
> Not sure if this is better or worse.

I think --default sounds like a good idea which could help us use it for other 
properties too (if we need to). It does look better than the current change 
that I have done, because the Java code then doesn't have to worry about how 
that default value is sourced. We can then just update the scripts to set up 
the default for the log.dir appropriately.

I can work towards adding support for it and will update this patch once it's 
ready.

As for:

> Something seems odd about force overriding the working directory of the 
> process just to set the log directory.

Sorry, I don't understand what you meant there. Is it something about the 
change that was done to the scripts?


- Jaikiran


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30403/#review70168
---


On Jan. 29, 2015, 6:24 a.m., Jaikiran Pai wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30403/
> ---
> 
> (Updated Jan. 29, 2015, 6:24 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1906
> https://issues.apache.org/jira/browse/KAFKA-1906
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1906 Default the Kafka data log directory to 
> $KAFKA_HOME/data/kafka-logs directory, where KAFKA_HOME is the Kafka 
> installation directory
> 
> 
> Diffs
> -
> 
>   bin/kafka-run-class.sh 881f578a8f5c796fe23415b978c1ad35869af76e 
>   bin/windows/kafka-run-class.bat 9df3d2b45236b4f06d55a89c84afcf0ab9f5d0f2 
>   config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
> 82dce80d553957d8b5776a9e140c346d4e07f766 
> 
> Diff: https://reviews.apache.org/r/30403/diff/
> 
> 
> Testing
> ---
> 
> The change here involves updating the Kafka scripts (for Windows and * nix) 
> to infer and setup KAFKA_HOME environment variable. This value is then used 
> by the KafkaConfig to decide what path to default to for the Kafka data logs, 
> in the absence of any explicitly set log.dirs (or log.dir) properties.
> 
> Care has been taken to ensure that other mechanism which might presently be 
> bypassing the Kafka scripts, will

Re: Review Request 30403: Patch for KAFKA-1906

2015-01-29 Thread Jay Kreps


> On Jan. 29, 2015, 6:50 a.m., Gwen Shapira wrote:
> > We added --override option to KafkaServer that allows overriding default 
> > configuration from commandline.
> > I believe that just changing the shell script to include --override 
> > log.dir=${KAFKA_HOME}/data 
> > may be enough?
> > 
> > overriding configuration from server.properties in code can be very 
> > unintuitive.
> 
> Jaikiran Pai wrote:
> That sounds a good idea. I wasn't aware of the --override option. I'll 
> give that a try and if it works then the change will be limited to just the 
> scripts.
> 
> Jaikiran Pai wrote:
> Hi Gwen,
> 
> I had a look at the JIRA https://issues.apache.org/jira/browse/KAFKA-1654 
> which added support for --override and also the purpose of that option. From 
> what I see it won't be useful in this case, because in this current task, we 
> don't want to override a value that has been explicitly set (via 
> server.properties for example). Instead we want to handle a case where no 
> explicit value is specified for the data log directory and in such cases 
> default it to a path which resides under the Kafka install directory.
> 
> If we use the --override option in our (startup) scripts to set 
> log.dir=${KAFKA_HOME}/data, we will end up forcing this value as the log.dir 
> even when the user has intentionally specified a different path for the data 
> logs.
> 
> Let me know if I misunderstood your suggestion.
> 
> Jay Kreps wrote:
> I think you are right that --override won't work but maybe this is still 
> a good suggestion?
> 
> Something seems odd about force overriding the working directory of the 
> process just to set the log directory.
> 
> Another option might be to add --default. This would work like --override 
> but would provide a default value only if none is specified. I think this 
> might be possible because the java.util.Properties we use for config supports 
> a hierarchy of defaults. E.g. you can say new Properties(defaultProperties). 
> Not sure if this is better or worse.
> 
> Thoughts?
> 
> Jaikiran Pai wrote:
> Hi Jay,
> 
> > Another option might be to add --default. This would work like 
> --override but would provide a default value only if none is specified. I 
> think this might be possible because the java.util.Properties we use for 
> config supports a hierarchy of defaults. E.g. you can say new 
> Properties(defaultProperties). Not sure if this is better or worse.
> 
> I think --default sounds like a good idea which could help us use it for 
> other properties too (if we need to). It does look better than the current 
> change that I have done, because the Java code then doesn't have to worry 
> about how that default value is sourced. We can then just update the scripts 
> to set up the default for the log.dir appropriately.
> 
> I can work towards adding support for it and will update this patch once 
> it's ready.
> 
> As for:
> 
> > Something seems odd about force overriding the working directory of the 
> process just to set the log directory.
> 
> Sorry, I don't understand what you meant there. Is it something about the 
> change that was done to the scripts?

I guess what I mean is: is there any other reason you might care about the 
working directory of the process? If so we probably don't want to force it to 
be the Kafka directory. If not it may actually be fine and in that case I think 
having relative paths work is nice. I don't personally know the answer to this, 
what is "good practice" for a server process?


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30403/#review70168
---


On Jan. 29, 2015, 6:24 a.m., Jaikiran Pai wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30403/
> ---
> 
> (Updated Jan. 29, 2015, 6:24 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1906
> https://issues.apache.org/jira/browse/KAFKA-1906
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1906 Default the Kafka data log directory to 
> $KAFKA_HOME/data/kafka-logs directory, where KAFKA_HOME is the Kafka 
> installation directory
> 
> 
> Diffs
> -
> 
>   bin/kafka-run-class.sh 881f578a8f5c796fe23415b978c1ad35869af76e 
>   bin/windows/kafka-run-class.bat 9df3d2b45236b4f06d55a89c84afcf0ab9f5d0f2 
>   config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
> 82dce80d553957d8b5776a9e140c346d4e07f766 
> 
> Diff: https://reviews.apache.org

Re: Review Request 27799: New consumer

2015-01-29 Thread Jay Kreps


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java, lines 30-33
> > 
> >
> > Wondering why we make newline for @param but keep the same line for 
> > @return?

Yeah that is annoying auto-formatting from my IDE, will fix.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 167
> > 
> >
> > Is this function really private? If yes we do not need keep the javadoc 
> > for it.

Well we actually have javadoc'd everythin in that class.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java, 
> > lines 37-44
> > 
> >
> > The javadocs here are a little confusing: users looking for its 
> > function APIs need to look into KafkaConsumer, an implementation of the 
> > interface Consumer.

The motivation here was that the description of behavior is actually very 
specific to the KafkaConsumer implementation (e.g. details of communication to 
the server and so on). I didn't want to duplicate the docs either. Actually the 
interface is not really meant for use, we don't want a bunch of 
implementations, people should either use our mock or the real class. Mocks for 
our class will likely break as we add methods.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java,
> >  line 114
> > 
> >
> > Should this inherit from CommonClientConfig?

Well, actually ConsumerConfig and ProducerConfig are public and 
CommonClientConfig is private...I just wanted a way to share those variables.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java,
> >  lines 185-191
> > 
> >
> > Could these two be moved to CommonClientConfig?

Actually it's subtly different since it is key.serializer in the producer and 
key.deserializer in the consumer.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 845
> > 
> >
> > Maybe add some comments on why consumerId / generationId are 
> > initialized as such and when they will be updated and used.

This is actually not handled yet as we need the consumer co-ordinator to 
maintain and update these variables first.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 917
> > 
> >
> > There is a potential risk that a topic is deleted and the consumer 
> > unscribes to it, but not removing it from its metadata topic list, causing 
> > the underlying network client to keep refreshing metadata.

I think you are flagging that we never remove topics from our metadata fetch 
list. This is true. But I don't think this will cause metadata refreshes to 
occur will it?

Another question here is just what the behavior should be for a topic that is 
deleted. There are obviously a bunch of states the consumer could be in when 
the topic gets deleted so we should ideally be sure we handle them all. I 
haven't thought a lot about this.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 968-973
> > 
> >
> > Should the ordering of these two "else if" be swapped?

I don't think so. I think the idea is that if the coordinator has hit the 
failure detection criteria you want to mark it dead and continue. You don't to 
keep trying to heartbeat to it (since you have already failed multiple times).


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1075
> > 
> >
> > Do we need to back off here?

Good call.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1343
> > 
> >
> > Should we change the "this.client.inFlightRequestCount .. " condition 
> > to just "node.ready()"?

Actually the criteria here is stricter. We need to issue successiv

Re: Review Request 27799: New consumer

2015-01-29 Thread Jay Kreps


> On Jan. 22, 2015, 5:35 p.m., Aditya Auradkar wrote:
> > clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java,
> >  line 21
> > 
> >
> > nit. Can we remove the public from the interface methods?
> 
> Jay Kreps wrote:
> Can you explain...?
> 
> Aditya Auradkar wrote:
> I gather all interface methods are implicitly public.. so that should be 
> unnecessary.
> 
> Aditya Auradkar wrote:
> http://docs.oracle.com/javase/specs/jls/se7/html/jls-9.html#jls-9.4

Oh interesting. Did not know that. I suppose we should probably either leave it 
or remove then all then.


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/#review69204
---


On Jan. 23, 2015, 9:15 p.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27799/
> ---
> 
> (Updated Jan. 23, 2015, 9:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1760
> https://issues.apache.org/jira/browse/KAFKA-1760
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> New consumer.
> 
> 
> Diffs
> -
> 
>   build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> d32c319d8ee4c46dad309ea54b136ea9798e2fd7 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> 8aece7e81a804b177a6f2c12e2dc6c89c1613262 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> ab7e3220f9b76b92ef981d695299656f041ad5ed 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 397695568d3fd8e835d8f923a89b3b00c96d0ead 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 
>   clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
> 752a979ea0b8bde7ff6d2e1a23bf54052305d841 
>   clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> c0c636b3e1ba213033db6d23655032c9bbd5e378 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 57c1807ccba9f264186f83e91f37c34b959c8060 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
>  e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
> 16af70a5de52cca786fdea147a6a639b7dc4a311 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 76efc216c9e6c3ab084461d792877092a189ad0f 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
> ea423ad15eebd262d20d5ec05d592cc115229177 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> fc71710dd5997576d3841a1c3b0f7e19a8c9698e 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 904976fadf0610982958628eaee810b60a98d725 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  483899d2e69b33655d0e08949f5f64af2519660a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> ccc03d8447ebba40131a70e16969686ac4aab58a 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> d3299b944062d96852452de455902659ad8af757 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> b15aa2c3ef2d7c4b24618ff42fd4da324237a813 
>   clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
> 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec 
>   clients/src/main/java/o

Re: Review Request 27799: New consumer

2015-01-29 Thread Jay Kreps


> On Jan. 27, 2015, 10:25 a.m., Onur Karaman wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java,
> >  line 205
> > 
> >
> > You left a blah.

This one is actually intentional. We haven't implemented the server assignment 
which would determine the name that the client should send. I wanted to leave 
this something made up so we would remember to change it when we have that in 
place.


> On Jan. 27, 2015, 10:25 a.m., Onur Karaman wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 163-176
> > 
> >
> > Was this switch to html codes intentional?

Yeah my IDE seems to do that. Kind of annoying.


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/#review69737
---


On Jan. 23, 2015, 9:15 p.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27799/
> ---
> 
> (Updated Jan. 23, 2015, 9:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1760
> https://issues.apache.org/jira/browse/KAFKA-1760
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> New consumer.
> 
> 
> Diffs
> -
> 
>   build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> d32c319d8ee4c46dad309ea54b136ea9798e2fd7 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> 8aece7e81a804b177a6f2c12e2dc6c89c1613262 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> ab7e3220f9b76b92ef981d695299656f041ad5ed 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 397695568d3fd8e835d8f923a89b3b00c96d0ead 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 
>   clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
> 752a979ea0b8bde7ff6d2e1a23bf54052305d841 
>   clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> c0c636b3e1ba213033db6d23655032c9bbd5e378 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 57c1807ccba9f264186f83e91f37c34b959c8060 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
>  e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
> 16af70a5de52cca786fdea147a6a639b7dc4a311 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 76efc216c9e6c3ab084461d792877092a189ad0f 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
> ea423ad15eebd262d20d5ec05d592cc115229177 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> fc71710dd5997576d3841a1c3b0f7e19a8c9698e 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 904976fadf0610982958628eaee810b60a98d725 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  483899d2e69b33655d0e08949f5f64af2519660a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> ccc03d8447ebba40131a70e16969686ac4aab58a 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> d3299b944062d96852452de455902659ad8af757 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> b15aa2c3ef2d7c4b24618ff42f

Re: Review Request 27799: New consumer

2015-01-29 Thread Jay Kreps


> On Jan. 28, 2015, 1:34 a.m., Guozhang Wang wrote:
> > "patch -p1 < patch-file" does not do the renaming of 
> > RequestCompletionHandler.java so I have to do that manually (weird), but 
> > other than that, build / test LGTM. 
> > 
> > It seems some of previous comments are not addressed yet. For exmaple ones 
> > from Jan. 20, the METADATA_MAX_AGE configs, etc. Could you double check to 
> > see if they are valid and then I think we can check it in.

Oops, I forgot to hit publish on some of the replies.


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/#review69936
---


On Jan. 23, 2015, 9:15 p.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27799/
> ---
> 
> (Updated Jan. 23, 2015, 9:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1760
> https://issues.apache.org/jira/browse/KAFKA-1760
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> New consumer.
> 
> 
> Diffs
> -
> 
>   build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> d32c319d8ee4c46dad309ea54b136ea9798e2fd7 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> 8aece7e81a804b177a6f2c12e2dc6c89c1613262 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> ab7e3220f9b76b92ef981d695299656f041ad5ed 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 397695568d3fd8e835d8f923a89b3b00c96d0ead 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 
>   clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
> 752a979ea0b8bde7ff6d2e1a23bf54052305d841 
>   clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> c0c636b3e1ba213033db6d23655032c9bbd5e378 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 57c1807ccba9f264186f83e91f37c34b959c8060 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
>  e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
> 16af70a5de52cca786fdea147a6a639b7dc4a311 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 76efc216c9e6c3ab084461d792877092a189ad0f 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
> ea423ad15eebd262d20d5ec05d592cc115229177 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> fc71710dd5997576d3841a1c3b0f7e19a8c9698e 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 904976fadf0610982958628eaee810b60a98d725 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  483899d2e69b33655d0e08949f5f64af2519660a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> ccc03d8447ebba40131a70e16969686ac4aab58a 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> d3299b944062d96852452de455902659ad8af757 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> b15aa2c3ef2d7c4b24618ff42fd4da324237a813 
>   clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
> 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec 
>   clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 
> 7c948b166a8ac07616809f260754116ae7764973 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.jav

Re: Review Request 27799: New consumer

2015-01-29 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/
---

(Updated Jan. 29, 2015, 11:20 a.m.)


Review request for kafka.


Bugs: KAFKA-1760
https://issues.apache.org/jira/browse/KAFKA-1760


Repository: kafka


Description (updated)
---

KAFKA-1760: New consumer.


Diffs (updated)
-

  build.gradle a980f61def59ae59dbd5e58050285a801b21674f 
  clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
d32c319d8ee4c46dad309ea54b136ea9798e2fd7 
  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
8aece7e81a804b177a6f2c12e2dc6c89c1613262 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
ab7e3220f9b76b92ef981d695299656f041ad5ed 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
397695568d3fd8e835d8f923a89b3b00c96d0ead 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
6746275d0b2596cd6ff7ce464a3a8225ad75ef00 
  clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
752a979ea0b8bde7ff6d2e1a23bf54052305d841 
  clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
c0c636b3e1ba213033db6d23655032c9bbd5e378 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
57c1807ccba9f264186f83e91f37c34b959c8060 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
 e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
16af70a5de52cca786fdea147a6a639b7dc4a311 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
76efc216c9e6c3ab084461d792877092a189ad0f 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
  
clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
ea423ad15eebd262d20d5ec05d592cc115229177 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
fc71710dd5997576d3841a1c3b0f7e19a8c9698e 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
904976fadf0610982958628eaee810b60a98d725 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 
dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
 483899d2e69b33655d0e08949f5f64af2519660a 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
ccc03d8447ebba40131a70e16969686ac4aab58a 
  clients/src/main/java/org/apache/kafka/common/Cluster.java 
d3299b944062d96852452de455902659ad8af757 
  clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
b15aa2c3ef2d7c4b24618ff42fd4da324237a813 
  clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
98cb79b701918eca3f6ca9823b6c7b7c97b3ecec 
  clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 
7c948b166a8ac07616809f260754116ae7764973 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b68bbf00ab8eba6c5867d346c91188142593ca6e 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
74d695ba39de44b6a3d15340ec0114bc4fce2ba2 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
3316b6a1098311b8603a4a5893bf57b75d2e43cb 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
121e880a941fcd3e6392859edba11a94236494cc 
  clients/src/main/java/org/apache/kafka/common/record/LogEntry.java 
e4d688cbe0c61b74ea15fc8dd3b634f9e5ee9b83 
  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
  
clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
 99b52c23d639df010bf2affc0f79d1c6e16ed67c 
  
clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
 8b8f591c4b2802a9cbbe34746c0b3ca4a64a8681 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
2fc471f64f435

[jira] [Updated] (KAFKA-1760) Implement new consumer client

2015-01-29 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1760:
-
Attachment: KAFKA-1760_2015-01-29_03:20:20.patch

> Implement new consumer client
> -
>
> Key: KAFKA-1760
> URL: https://issues.apache.org/jira/browse/KAFKA-1760
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Fix For: 0.8.3
>
> Attachments: KAFKA-1760.patch, KAFKA-1760_2015-01-11_16:57:15.patch, 
> KAFKA-1760_2015-01-18_19:10:13.patch, KAFKA-1760_2015-01-21_08:42:20.patch, 
> KAFKA-1760_2015-01-22_10:03:26.patch, KAFKA-1760_2015-01-22_20:21:56.patch, 
> KAFKA-1760_2015-01-23_13:13:00.patch, KAFKA-1760_2015-01-29_03:20:20.patch
>
>
> Implement a consumer client.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1760) Implement new consumer client

2015-01-29 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14296710#comment-14296710
 ] 

Jay Kreps commented on KAFKA-1760:
--

Updated reviewboard https://reviews.apache.org/r/27799/diff/
 against branch trunk

> Implement new consumer client
> -
>
> Key: KAFKA-1760
> URL: https://issues.apache.org/jira/browse/KAFKA-1760
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Fix For: 0.8.3
>
> Attachments: KAFKA-1760.patch, KAFKA-1760_2015-01-11_16:57:15.patch, 
> KAFKA-1760_2015-01-18_19:10:13.patch, KAFKA-1760_2015-01-21_08:42:20.patch, 
> KAFKA-1760_2015-01-22_10:03:26.patch, KAFKA-1760_2015-01-22_20:21:56.patch, 
> KAFKA-1760_2015-01-23_13:13:00.patch, KAFKA-1760_2015-01-29_03:20:20.patch
>
>
> Implement a consumer client.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1908) Split brain

2015-01-29 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-1908:


 Summary: Split brain
 Key: KAFKA-1908
 URL: https://issues.apache.org/jira/browse/KAFKA-1908
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Alexey Ozeritskiy


In some cases, there may be two leaders for one partition.
Steps to reproduce:
# We have 3 brokers, 1 partition with 3 replicas:
{code}
TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
ISR: [1,2,3]
{code} 
# controller works on broker 3
# let the kafka port be 9092. We execute on broker 1:
{code}
iptables -A INPUT -p tcp --dport 9092 -j REJECT
{code}
# Initiate replica election
# As a result:
Broker 1:
{code}
TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
ISR: [1,2,3]
{code}
Broker 2:
{code}
TopicAndPartition: [partition,0]Leader: 2   Replicas: [2,1,3]   
ISR: [1,2,3]
{code}
# Flush the iptables rules on broker 1

Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not 
receive new data. A consumer can read data from replica-1 or replica-2. When it 
reads from replica-1 it resets the offsets and than can read duplicates from 
replica-2.

We saw this situation in our production cluster when it had network problems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28769: Patch for KAFKA-1809

2015-01-29 Thread Jeff Holoman


> On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaConfig.scala, lines 182-183
> > 
> >
> > Since this is also used for communication btw the controller and the 
> > brokers, perhaps it's better named as sth like 
> > "intra.broker.security.protocol"?

Maybe it makese sense to prepend all security related configs with "security, 
eg: "security.intra.broker.protocol", "security.new.param.for.future.jira" With 
all of the upcoming changes it would make security realted configs easy to spot.


- Jeff


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28769/#review69281
---


On Jan. 28, 2015, 6:26 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28769/
> ---
> 
> (Updated Jan. 28, 2015, 6:26 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1809
> https://issues.apache.org/jira/browse/KAFKA-1809
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1890 Fix bug preventing Mirror Maker from successful rebalance; 
> reviewed by Gwen Shapira and Neha Narkhede
> 
> 
> first commit of refactoring.
> 
> 
> changed topicmetadata to include brokerendpoints and fixed few unit tests
> 
> 
> fixing systest and support for binding to default address
> 
> 
> fixed system tests
> 
> 
> fix default address binding and ipv6 support
> 
> 
> fix some issues regarding endpoint parsing. Also, larger segments for systest 
> make the validation much faster
> 
> 
> added link to security wiki in doc
> 
> 
> fixing unit test after rename of ProtocolType to SecurityProtocol
> 
> 
> Following Joe's advice, added security protocol enum on client side, and 
> modified protocol to use ID instead of string.
> 
> 
> validate producer config against enum
> 
> 
> add a second protocol for testing and modify SocketServerTests to check on 
> multi-ports
> 
> 
> Reverted the metadata request changes and removed the explicit security 
> protocol argument. Instead the socketserver will determine the protocol based 
> on the port and add this to the request
> 
> 
> bump version for UpdateMetadataRequest and added support for rolling upgrades 
> with new config
> 
> 
> following tests - fixed LeaderAndISR protocol and ZK registration for 
> backward compatibility
> 
> 
> cleaned up some changes that were actually not necessary. hopefully making 
> this patch slightly easier to review
> 
> 
> undoing some changes that don't belong here
> 
> 
> bring back config lost in cleanup
> 
> 
> fixes neccessary for an all non-plaintext cluster to work
> 
> 
> minor modifications following comments by Jun
> 
> 
> added missing license
> 
> 
> formatting
> 
> 
> clean up imports
> 
> 
> cleaned up V2 to not include host+port field. Using use.new.protocol flag to 
> decide which version to serialize
> 
> 
> change endpoints collection in Broker to Map[protocol, endpoint], mostly to 
> be clear that we intend to have one endpoint per protocol
> 
> 
> validate that listeners and advertised listeners have unique ports and 
> protocols
> 
> 
> support legacy configs
> 
> 
> some fixes following rebase
> 
> 
> Reverted to backward compatible zk registration, changed use.new.protocol to 
> support multiple versions and few minor fixes
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
>   clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 527dd0f9c47fce7310b7a37a9b95bf87f1b9c292 
>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
> a39fab532f73148316a56c0f8e9197f38ea66f79 
>   config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> 28b12c7b89a56c113b665fbde1b95f873f8624a3 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 285c0333ff43543d3e46444c1cd9374bb883bb59 
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
> 84f60178f6ebae735c8aa3e79ed93fe21ac4aea7 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> 4ff7e8f8cc695551dd5d2fe65c74f6b6c571e340 
>   core/src/main/scala/kafka/api/TopicMetadata.scala 
> 0190076df0adf906ecd332284f222ff974b315fc 
>   core/src/main/scala/kafka/api/TopicMetadataResponse.scala 
> 92ac4e687be22e4800199c0666bfac5e0059e5bb 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 530982e36b17934b8cc5fb668075a5342e142c59 
>   core/

[jira] [Resolved] (KAFKA-1903) Zk Expiration causes controller deadlock

2015-01-29 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani resolved KAFKA-1903.
---
Resolution: Won't Fix

Closing this as won't  fix as this is fixed 0.8.2 , re-open if necessary. 

> Zk Expiration causes controller deadlock
> 
>
> Key: KAFKA-1903
> URL: https://issues.apache.org/jira/browse/KAFKA-1903
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1, 0.8.1.1
> Environment: java version "1.7.0_55"
> Java(TM) SE Runtime Environment (build 1.7.0_55-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode)
> kafka_2.9.2-0.8.1
>Reporter: yufeng.chen
>Assignee: Neha Narkhede
>Priority: Critical
>
> when controller encounter a ZK expired, zookeeper node /broker/ids  lost one 
> kafkk controler. If there has three node, e.g. 1 2, 3; and the 1 start 
> delete-topic-method thread. At this time, node 1 will lost. Why? The reason 
> is that: when ZK expiration happened,  the zk-event-thread will call 
> KafkaController.SessionExpirationListener.handleNewSession method. if the 
> zk-event-thread has the controllerContext.controllerLock, will call 
> onControllerResignation->deleteTopicManager.shutdown()->deleteTopicsThread.shutdown().
>  And the delete-topic-thread is working, and await at 
> awaitTopicDeletionNotification() method。 Zk-event-thread call 
> deleteTopicsThread.shutdown() and wait until the run() method execute 
> compelely. Because the zk-event-thread has the lock,  
> "deleteTopicsCond.await()" whill not be really "interruted " . Then 
> zk-event-thread whill pause,  not execute the 
> kafkaHealthcheck->SessionExpireListener.handleNewSession。 The controller will 
> not register again. The jstack log :
> "delete-topics-thread" prio=10 tid=0x7fb0bc21b000 nid=0x2825 waiting on 
> condition [0x7fb0f534a000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe4952da0> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2047)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> "ZkClient-EventThread-12-10.3.63.8:2181,10.3.63.9:2181" daemon prio=10 
> tid=0x7fb10038e800 nid=0x7d93 waiting on condition [0x7fb0f544a000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe4f4a760> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
>   

Re: Review Request 30403: Patch for KAFKA-1906

2015-01-29 Thread Jeff Holoman
Maybe I'm in the minority here, but I actually don't think there should be
a default for this param and you should be required to explicitly set this.

On Thu, Jan 29, 2015 at 5:43 AM, Jay Kreps  wrote:

>
>
> > On Jan. 29, 2015, 6:50 a.m., Gwen Shapira wrote:
> > > We added --override option to KafkaServer that allows overriding
> default configuration from commandline.
> > > I believe that just changing the shell script to include --override
> log.dir=${KAFKA_HOME}/data
> > > may be enough?
> > >
> > > overriding configuration from server.properties in code can be very
> unintuitive.
> >
> > Jaikiran Pai wrote:
> > That sounds a good idea. I wasn't aware of the --override option.
> I'll give that a try and if it works then the change will be limited to
> just the scripts.
> >
> > Jaikiran Pai wrote:
> > Hi Gwen,
> >
> > I had a look at the JIRA
> https://issues.apache.org/jira/browse/KAFKA-1654 which added support for
> --override and also the purpose of that option. From what I see it won't be
> useful in this case, because in this current task, we don't want to
> override a value that has been explicitly set (via server.properties for
> example). Instead we want to handle a case where no explicit value is
> specified for the data log directory and in such cases default it to a path
> which resides under the Kafka install directory.
> >
> > If we use the --override option in our (startup) scripts to set
> log.dir=${KAFKA_HOME}/data, we will end up forcing this value as the
> log.dir even when the user has intentionally specified a different path for
> the data logs.
> >
> > Let me know if I misunderstood your suggestion.
> >
> > Jay Kreps wrote:
> > I think you are right that --override won't work but maybe this is
> still a good suggestion?
> >
> > Something seems odd about force overriding the working directory of
> the process just to set the log directory.
> >
> > Another option might be to add --default. This would work like
> --override but would provide a default value only if none is specified. I
> think this might be possible because the java.util.Properties we use for
> config supports a hierarchy of defaults. E.g. you can say new
> Properties(defaultProperties). Not sure if this is better or worse.
> >
> > Thoughts?
> >
> > Jaikiran Pai wrote:
> > Hi Jay,
> >
> > > Another option might be to add --default. This would work like
> --override but would provide a default value only if none is specified. I
> think this might be possible because the java.util.Properties we use for
> config supports a hierarchy of defaults. E.g. you can say new
> Properties(defaultProperties). Not sure if this is better or worse.
> >
> > I think --default sounds like a good idea which could help us use it
> for other properties too (if we need to). It does look better than the
> current change that I have done, because the Java code then doesn't have to
> worry about how that default value is sourced. We can then just update the
> scripts to set up the default for the log.dir appropriately.
> >
> > I can work towards adding support for it and will update this patch
> once it's ready.
> >
> > As for:
> >
> > > Something seems odd about force overriding the working directory
> of the process just to set the log directory.
> >
> > Sorry, I don't understand what you meant there. Is it something
> about the change that was done to the scripts?
>
> I guess what I mean is: is there any other reason you might care about the
> working directory of the process? If so we probably don't want to force it
> to be the Kafka directory. If not it may actually be fine and in that case
> I think having relative paths work is nice. I don't personally know the
> answer to this, what is "good practice" for a server process?
>
>
> - Jay
>
>
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30403/#review70168
> ---
>
>
> On Jan. 29, 2015, 6:24 a.m., Jaikiran Pai wrote:
> >
> > ---
> > This is an automatically generated e-mail. To reply, visit:
> > https://reviews.apache.org/r/30403/
> > ---
> >
> > (Updated Jan. 29, 2015, 6:24 a.m.)
> >
> >
> > Review request for kafka.
> >
> >
> > Bugs: KAFKA-1906
> > https://issues.apache.org/jira/browse/KAFKA-1906
> >
> >
> > Repository: kafka
> >
> >
> > Description
> > ---
> >
> > KAFKA-1906 Default the Kafka data log directory to
> $KAFKA_HOME/data/kafka-logs directory, where KAFKA_HOME is the Kafka
> installation directory
> >
> >
> > Diffs
> > -
> >
> >   bin/kafka-run-class.sh 881f578a8f5c796fe23415b978c1ad35869af76e
> >   bin/windows/kafka-run-class.bat
> 9df3d2b45236b4f06d55a89c84afcf0ab9f5d0f2
> >   config/server.properties 1614260b71a658b405bb24157c8f

Re: Review Request 27799: New consumer

2015-01-29 Thread Guozhang Wang


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java,
> >  line 114
> > 
> >
> > Should this inherit from CommonClientConfig?
> 
> Jay Kreps wrote:
> Well, actually ConsumerConfig and ProducerConfig are public and 
> CommonClientConfig is private...I just wanted a way to share those variables.

I meant it could be "public static final String METADATA_MAX_AGE_CONFIG = 
CommonClientConfigs.METADATA_MAX_AGE_CONFIG"?


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java,
> >  lines 1-29
> > 
> >
> > This kafka exception could be thrown other places besides committed() 
> > and position(), it could also be thrown in:
> > 
> > private resetOffset() -> 
> > 
> >   private fetchMissingPositionsOrResetThem() ->
> > 
> > public position()
> > public pool()
> > 
> >   private handleFetchResponse() ->
> > 
> > public pool()
> > 
> > 
> > Hence in pool() we need to hanle this exception specifically.
> > 
> > In general, I would suggest we add the @throws label to private 
> > functions also for easy maintainning the throwable exceptions.
> 
> Jay Kreps wrote:
> Yeah the behavior is actually correct, I think. The general philosophy is 
> that if the error is recoverable we should log it and keep trying but if it 
> is fatal we should throw it. This is a fatal case (you are out of range and 
> have no reset policy) so we should just blow up whatever the calling method 
> is. I added the appropriate javadoc on poll() and position().

Yeah I think we were just missing @throws labels here.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/#review68693
---


On Jan. 29, 2015, 11:20 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27799/
> ---
> 
> (Updated Jan. 29, 2015, 11:20 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1760
> https://issues.apache.org/jira/browse/KAFKA-1760
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1760: New consumer.
> 
> 
> Diffs
> -
> 
>   build.gradle a980f61def59ae59dbd5e58050285a801b21674f 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> d32c319d8ee4c46dad309ea54b136ea9798e2fd7 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> 8aece7e81a804b177a6f2c12e2dc6c89c1613262 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> ab7e3220f9b76b92ef981d695299656f041ad5ed 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 397695568d3fd8e835d8f923a89b3b00c96d0ead 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 
>   clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
> 752a979ea0b8bde7ff6d2e1a23bf54052305d841 
>   clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> c0c636b3e1ba213033db6d23655032c9bbd5e378 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 57c1807ccba9f264186f83e91f37c34b959c8060 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
>  e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
> 16af70a5de52cca786fdea147a6a639b7dc4a311 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 76efc216c9e6c3ab084461d792877092a189ad0f 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
> ea423ad15eebd262d20d5ec05d592cc115229177 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.jav

Re: Cannot stop Kafka server if zookeeper is shutdown first

2015-01-29 Thread Neha Narkhede
Ewen is right. ZkClient APIs are blocking and the right fix for this seems
to be patching ZkClient. At some point, if we find ourselves fiddling too
much with ZkClient, it wouldn't hurt to write our own little zookeeper
client wrapper.

On Thu, Jan 29, 2015 at 12:57 AM, Ewen Cheslack-Postava 
wrote:

> Looks like a bug to me -- the underlying ZK library wraps a lot of blocking
> method implementations with waitUntilConnected() calls without any
> timeouts. Ideally we could just add a version of ZkUtils.getController()
> with a timeout, but I don't see an easy way to accomplish that with
> ZkClient.
>
> There's at least one other call to ZkUtils besides the one in the
> stacktrace you gave that would cause the same issue, possibly more that
> aren't directly called in that method. One ugly solution would be to use an
> extra thread during shutdown to trigger timeouts, but I'd imagine we
> probably have other threads that could end up blocking in similar ways.
>
> I filed https://issues.apache.org/jira/browse/KAFKA-1907 to track the
> issue.
>
>
> On Mon, Jan 26, 2015 at 6:35 AM, Jaikiran Pai 
> wrote:
>
> > The main culprit is this thread which goes into "forever retry connection
> > to a closed zookeeper" when I shutdown Kafka (via a Ctrl + C) after
> > zookeeper has already been shutdown. I have attached the complete thread
> > dump, but I don't know if it will be delivered to the mailing list.
> >
> > "Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition
> > [0x6ad69000]
> >java.lang.Thread.State: TIMED_WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x70a93368> (a java.util.concurrent.locks.
> > AbstractQueuedSynchronizer$ConditionObject)
> > at java.util.concurrent.locks.LockSupport.parkUntil(
> > LockSupport.java:267)
> > at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> > ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
> > at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> > at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> > at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> > at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
> > at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
> > at kafka.server.KafkaServer.kafka$server$KafkaServer$$
> > controlledShutdown(KafkaServer.scala:194)
> > at kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$
> > sp(KafkaServer.scala:269)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> > at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
> > at kafka.utils.Logging$class.swallow(Logging.scala:94)
> > at kafka.utils.Utils$.swallow(Utils.scala:45)
> > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
> > at kafka.server.KafkaServerStartable.shutdown(
> > KafkaServerStartable.scala:42)
> > at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> >
> > -Jaikiran
> >
> >
> > On Monday 26 January 2015 05:46 AM, Neha Narkhede wrote:
> >
> >> For a clean shutdown, the broker tries to talk to the controller and
> also
> >> issues reads to zookeeper. Possibly that is where it tries to reconnect
> to
> >> zk. It will help to look at the thread dump.
> >>
> >> Thanks
> >> Neha
> >>
> >> On Fri, Jan 23, 2015 at 8:53 PM, Jaikiran Pai  >
> >> wrote:
> >>
> >>  I was just playing around with the RC2 of 0.8.2 and noticed that if I
> >>> shutdown zookeeper first I can't shutdown Kafka server at all since it
> >>> goes
> >>> into a never ending attempt to reconnect with zookeeper. I had to kill
> >>> the
> >>> Kafka process to stop it. I tried it against trunk too and there too I
> >>> see
> >>> the same issue. Should I file a JIRA for this and see if I can come up
> >>> with
> >>> a patch?
> >>>
> >>> FWIW, here's the unending (and IMO too frequent) attempts at trying to
> >>> reconnect. I've a thread dump too which shows that the other thread
> which
> >>> is trying to complete a controlled shutdown of Kafka is blocked forever
> >>> for
> >>> the zookeeper to be up. I can attach it to the JIRA.
> >>>
> >>> 2015-01-24 10:15:46,278] WARN Session 0x14b1a413680 for server
> null,
> >>> unexpected error, closing socket connection and attempting reconnect
> >>> (org.apache.zookeeper.ClientCnxn)
> >>> java.net.ConnectException: Connection refused
> >>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> >>>  at sun.nio.ch.SocketChannelImpl.finishConnect(
> >>> SocketChannelImpl.java:739)
> >>>  at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(
> >>> ClientCnxnSocketNIO.java:361)
> >>>  at org.apache.zookeeper.ClientCnxn$SendThread.run(
> >

Re: Review Request 30403: Patch for KAFKA-1906

2015-01-29 Thread Gwen Shapira


> On Jan. 29, 2015, 6:50 a.m., Gwen Shapira wrote:
> > We added --override option to KafkaServer that allows overriding default 
> > configuration from commandline.
> > I believe that just changing the shell script to include --override 
> > log.dir=${KAFKA_HOME}/data 
> > may be enough?
> > 
> > overriding configuration from server.properties in code can be very 
> > unintuitive.
> 
> Jaikiran Pai wrote:
> That sounds a good idea. I wasn't aware of the --override option. I'll 
> give that a try and if it works then the change will be limited to just the 
> scripts.
> 
> Jaikiran Pai wrote:
> Hi Gwen,
> 
> I had a look at the JIRA https://issues.apache.org/jira/browse/KAFKA-1654 
> which added support for --override and also the purpose of that option. From 
> what I see it won't be useful in this case, because in this current task, we 
> don't want to override a value that has been explicitly set (via 
> server.properties for example). Instead we want to handle a case where no 
> explicit value is specified for the data log directory and in such cases 
> default it to a path which resides under the Kafka install directory.
> 
> If we use the --override option in our (startup) scripts to set 
> log.dir=${KAFKA_HOME}/data, we will end up forcing this value as the log.dir 
> even when the user has intentionally specified a different path for the data 
> logs.
> 
> Let me know if I misunderstood your suggestion.
> 
> Jay Kreps wrote:
> I think you are right that --override won't work but maybe this is still 
> a good suggestion?
> 
> Something seems odd about force overriding the working directory of the 
> process just to set the log directory.
> 
> Another option might be to add --default. This would work like --override 
> but would provide a default value only if none is specified. I think this 
> might be possible because the java.util.Properties we use for config supports 
> a hierarchy of defaults. E.g. you can say new Properties(defaultProperties). 
> Not sure if this is better or worse.
> 
> Thoughts?
> 
> Jaikiran Pai wrote:
> Hi Jay,
> 
> > Another option might be to add --default. This would work like 
> --override but would provide a default value only if none is specified. I 
> think this might be possible because the java.util.Properties we use for 
> config supports a hierarchy of defaults. E.g. you can say new 
> Properties(defaultProperties). Not sure if this is better or worse.
> 
> I think --default sounds like a good idea which could help us use it for 
> other properties too (if we need to). It does look better than the current 
> change that I have done, because the Java code then doesn't have to worry 
> about how that default value is sourced. We can then just update the scripts 
> to set up the default for the log.dir appropriately.
> 
> I can work towards adding support for it and will update this patch once 
> it's ready.
> 
> As for:
> 
> > Something seems odd about force overriding the working directory of the 
> process just to set the log directory.
> 
> Sorry, I don't understand what you meant there. Is it something about the 
> change that was done to the scripts?
> 
> Jay Kreps wrote:
> I guess what I mean is: is there any other reason you might care about 
> the working directory of the process? If so we probably don't want to force 
> it to be the Kafka directory. If not it may actually be fine and in that case 
> I think having relative paths work is nice. I don't personally know the 
> answer to this, what is "good practice" for a server process?

Looking at my favorite projects:

Zookeeper defaults to /tmp/zookeeper
HDFS defaults to an empty configuration file and will not start without an 
explicit disk location.
MySQL defaults to a data/ or var/ directory under the base_dir (depending on 
whether its a binary package or compiled from source...)

Lots of standards, I think we are free to choose one :)


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30403/#review70168
---


On Jan. 29, 2015, 6:24 a.m., Jaikiran Pai wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30403/
> ---
> 
> (Updated Jan. 29, 2015, 6:24 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1906
> https://issues.apache.org/jira/browse/KAFKA-1906
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1906 Default the Kafka data log directory to 
> $KAFKA_HOME/data/kafka-logs directory, where KAFKA_HOME is the Kafka 
> installation directory
> 
> 
> Diffs
> -
> 
>   bin/kafka-run-class.sh 881f578a8f5c796fe23415b978c1ad35869af

Re: Review Request 28769: Patch for KAFKA-1809

2015-01-29 Thread Don Bosco Durai
+1

I also feel, having security.* would be easy going forward.

Thanks

Bosco


On 1/29/15, 6:08 AM, "Jeff Holoman"  wrote:

>
>
>> On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote:
>> > core/src/main/scala/kafka/server/KafkaConfig.scala, lines 182-183
>> > 
>>>2>
>> >
>> > Since this is also used for communication btw the controller and
>>the brokers, perhaps it's better named as sth like
>>"intra.broker.security.protocol"?
>
>Maybe it makese sense to prepend all security related configs with
>"security, eg: "security.intra.broker.protocol",
>"security.new.param.for.future.jira" With all of the upcoming changes it
>would make security realted configs easy to spot.
>
>
>- Jeff
>
>
>---
>This is an automatically generated e-mail. To reply, visit:
>https://reviews.apache.org/r/28769/#review69281
>---
>
>
>On Jan. 28, 2015, 6:26 p.m., Gwen Shapira wrote:
>> 
>> ---
>> This is an automatically generated e-mail. To reply, visit:
>> https://reviews.apache.org/r/28769/
>> ---
>> 
>> (Updated Jan. 28, 2015, 6:26 p.m.)
>> 
>> 
>> Review request for kafka.
>> 
>> 
>> Bugs: KAFKA-1809
>> https://issues.apache.org/jira/browse/KAFKA-1809
>> 
>> 
>> Repository: kafka
>> 
>> 
>> Description
>> ---
>> 
>> KAFKA-1890 Fix bug preventing Mirror Maker from successful rebalance;
>>reviewed by Gwen Shapira and Neha Narkhede
>> 
>> 
>> first commit of refactoring.
>> 
>> 
>> changed topicmetadata to include brokerendpoints and fixed few unit
>>tests
>> 
>> 
>> fixing systest and support for binding to default address
>> 
>> 
>> fixed system tests
>> 
>> 
>> fix default address binding and ipv6 support
>> 
>> 
>> fix some issues regarding endpoint parsing. Also, larger segments for
>>systest make the validation much faster
>> 
>> 
>> added link to security wiki in doc
>> 
>> 
>> fixing unit test after rename of ProtocolType to SecurityProtocol
>> 
>> 
>> Following Joe's advice, added security protocol enum on client side,
>>and modified protocol to use ID instead of string.
>> 
>> 
>> validate producer config against enum
>> 
>> 
>> add a second protocol for testing and modify SocketServerTests to check
>>on multi-ports
>> 
>> 
>> Reverted the metadata request changes and removed the explicit security
>>protocol argument. Instead the socketserver will determine the protocol
>>based on the port and add this to the request
>> 
>> 
>> bump version for UpdateMetadataRequest and added support for rolling
>>upgrades with new config
>> 
>> 
>> following tests - fixed LeaderAndISR protocol and ZK registration for
>>backward compatibility
>> 
>> 
>> cleaned up some changes that were actually not necessary. hopefully
>>making this patch slightly easier to review
>> 
>> 
>> undoing some changes that don't belong here
>> 
>> 
>> bring back config lost in cleanup
>> 
>> 
>> fixes neccessary for an all non-plaintext cluster to work
>> 
>> 
>> minor modifications following comments by Jun
>> 
>> 
>> added missing license
>> 
>> 
>> formatting
>> 
>> 
>> clean up imports
>> 
>> 
>> cleaned up V2 to not include host+port field. Using use.new.protocol
>>flag to decide which version to serialize
>> 
>> 
>> change endpoints collection in Broker to Map[protocol, endpoint],
>>mostly to be clear that we intend to have one endpoint per protocol
>> 
>> 
>> validate that listeners and advertised listeners have unique ports and
>>protocols
>> 
>> 
>> support legacy configs
>> 
>> 
>> some fixes following rebase
>> 
>> 
>> Reverted to backward compatible zk registration, changed
>>use.new.protocol to support multiple versions and few minor fixes
>> 
>> 
>> Diffs
>> -
>> 
>>   
>>clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.ja
>>va 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75
>>   
>>clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
>>PRE-CREATION 
>>   
>>clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.j
>>ava PRE-CREATION 
>>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java
>>527dd0f9c47fce7310b7a37a9b95bf87f1b9c292
>>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
>>a39fab532f73148316a56c0f8e9197f38ea66f79
>>   config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5
>>   core/src/main/scala/kafka/admin/AdminUtils.scala
>>28b12c7b89a56c113b665fbde1b95f873f8624a3
>>   core/src/main/scala/kafka/admin/TopicCommand.scala
>>285c0333ff43543d3e46444c1cd9374bb883bb59
>>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
>>84f60178f6ebae735c8aa3e79ed93fe21ac4aea7
>>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
>>4ff7e8f8cc695551dd5d2fe65c74f6b6c571e340
>>   core/src/main/scala/kafka/api/TopicMetadata.scala
>>0190076df0adf90

Re: Review Request 28108: Patch for KAFKA-1664

2015-01-29 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28108/
---

(Updated Jan. 29, 2015, 6:26 p.m.)


Review request for kafka.


Summary (updated)
-

Patch for KAFKA-1664


Bugs: KAFKA-1664
https://issues.apache.org/jira/browse/KAFKA-1664


Repository: kafka


Description
---

KAFKA-1664: Kafka does not properly parse multiple ZK nodes with non-root chroot


Diffs (updated)
-

  core/src/main/scala/kafka/utils/ZkUtils.scala 
c14bd455b6642f5e6eb254670bef9f57ae41d6cb 
  core/src/test/scala/unit/kafka/zk/ZKPathTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/28108/diff/


Testing
---

Tested with and without the fix.


Thanks,

Ashish Singh



[jira] [Updated] (KAFKA-1664) Kafka does not properly parse multiple ZK nodes with non-root chroot

2015-01-29 Thread Ashish Kumar Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Kumar Singh updated KAFKA-1664:
--
Attachment: KAFKA-1664_2015-01-29_10:26:20.patch

> Kafka does not properly parse multiple ZK nodes with non-root chroot
> 
>
> Key: KAFKA-1664
> URL: https://issues.apache.org/jira/browse/KAFKA-1664
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Ricky Saltzer
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-1664.1.patch, KAFKA-1664.2.patch, 
> KAFKA-1664.patch, KAFKA-1664_2015-01-29_10:26:20.patch
>
>
> When using a non-root ZK directory for Kafka, if you specify multiple ZK 
> servers, Kafka does not seem to properly parse the connection string. 
> *Error*
> {code}
> [root@hodor-001 bin]# ./kafka-console-consumer.sh --zookeeper 
> baelish-001.edh.cloudera.com:2181/kafka,baelish-002.edh.cloudera.com:2181/kafka,baelish-003.edh.cloudera.com:2181/kafka
>  --topic test-topic
> [2014-10-01 15:31:04,629] ERROR Error processing message, stopping consumer:  
> (kafka.consumer.ConsoleConsumer$)
> java.lang.IllegalArgumentException: Path length must be > 0
>   at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
>   at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
>   at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:766)
>   at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
>   at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
>   at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:245)
>   at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:256)
>   at 
> kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:268)
>   at 
> kafka.utils.ZkUtils$.createEphemeralPathExpectConflictHandleZKBug(ZkUtils.scala:306)
>   at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:226)
>   at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.(ZookeeperConsumerConnector.scala:755)
>   at 
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:145)
>   at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
>   at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
> *Working*
> {code}
> [root@hodor-001 bin]# ./kafka-console-consumer.sh --zookeeper 
> baelish-001.edh.cloudera.com:2181/kafka --topic test-topic
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1664) Kafka does not properly parse multiple ZK nodes with non-root chroot

2015-01-29 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14297291#comment-14297291
 ] 

Ashish Kumar Singh commented on KAFKA-1664:
---

Updated reviewboard https://reviews.apache.org/r/28108/
 against branch trunk

> Kafka does not properly parse multiple ZK nodes with non-root chroot
> 
>
> Key: KAFKA-1664
> URL: https://issues.apache.org/jira/browse/KAFKA-1664
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Ricky Saltzer
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-1664.1.patch, KAFKA-1664.2.patch, 
> KAFKA-1664.patch, KAFKA-1664_2015-01-29_10:26:20.patch
>
>
> When using a non-root ZK directory for Kafka, if you specify multiple ZK 
> servers, Kafka does not seem to properly parse the connection string. 
> *Error*
> {code}
> [root@hodor-001 bin]# ./kafka-console-consumer.sh --zookeeper 
> baelish-001.edh.cloudera.com:2181/kafka,baelish-002.edh.cloudera.com:2181/kafka,baelish-003.edh.cloudera.com:2181/kafka
>  --topic test-topic
> [2014-10-01 15:31:04,629] ERROR Error processing message, stopping consumer:  
> (kafka.consumer.ConsoleConsumer$)
> java.lang.IllegalArgumentException: Path length must be > 0
>   at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
>   at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
>   at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:766)
>   at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
>   at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
>   at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:245)
>   at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:256)
>   at 
> kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:268)
>   at 
> kafka.utils.ZkUtils$.createEphemeralPathExpectConflictHandleZKBug(ZkUtils.scala:306)
>   at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:226)
>   at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.(ZookeeperConsumerConnector.scala:755)
>   at 
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:145)
>   at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
>   at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
> *Working*
> {code}
> [root@hodor-001 bin]# ./kafka-console-consumer.sh --zookeeper 
> baelish-001.edh.cloudera.com:2181/kafka --topic test-topic
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28108: Patch for KAFKA-1664

2015-01-29 Thread Ashish Singh


> On Jan. 26, 2015, 1:54 a.m., Neha Narkhede wrote:
> > Sorry for the late review. Can check this in once you get a chance to 
> > address these review suggestions.

Thanks for the review. Addressed the concerns.


> On Jan. 26, 2015, 1:54 a.m., Neha Narkhede wrote:
> > core/src/test/scala/unit/kafka/zk/ZKPathTest.scala, line 63
> > 
> >
> > typo: zkConnectWithInvaidRoot

Good catch!


- Ashish


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28108/#review69582
---


On Jan. 29, 2015, 6:26 p.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28108/
> ---
> 
> (Updated Jan. 29, 2015, 6:26 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1664
> https://issues.apache.org/jira/browse/KAFKA-1664
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1664: Kafka does not properly parse multiple ZK nodes with non-root 
> chroot
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> c14bd455b6642f5e6eb254670bef9f57ae41d6cb 
>   core/src/test/scala/unit/kafka/zk/ZKPathTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/28108/diff/
> 
> 
> Testing
> ---
> 
> Tested with and without the fix.
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-01-29 Thread Jiangjie Qin
Hi Bhavesh,

I think it is the right discussion to have when we are talking about the
new new design for MM.
Please see the inline comments.

Jiangjie (Becket) Qin

On 1/28/15, 10:48 PM, "Bhavesh Mistry"  wrote:

>Hi Jiangjie,
>
>I just wanted to let you know about our use case and stress the point that
>local data center broker cluster have fewer partitions than the
>destination
>offline broker cluster. Just because we do the batch pull from CAMUS and
>in
>order to drain data faster than the injection rate (from four DCs for same
>topic).
Keeping the same partition number in source and target cluster will be an
option but will not be enforced by default.
>
>We are facing following issues (probably due to configuration):
>
>1)  We occasionally loose data due to message batch size is too large
>(2MB) on target data (we are using old producer but I think new producer
>will solve this problem to some extend).
We do see this issue in LinkedIn as well. New producer also might have
this issue. There are some proposal of solutions, but no real work started
yet. For now, as a workaround, setting a more aggressive batch size on
producer side should work.
>2)  Since only one instance is set to MM data,  we are not able to
>set-up ack per topic instead ack is attached to producer instance.
I don’t quite get the question here.
>3)  How are you going to address two phase commit problem if ack is
>set
>to strongest, but auto commit is on for consumer (meaning producer does
>not
>get ack,  but consumer auto committed offset that message).  Is there
>transactional (Kafka transaction is in process) based ack and commit
>offset
>?
Auto offset commit should be turned off in this case. The offset will only
be committed once by the offset commit thread. So there is no two phase
commit.
>4)  How are you planning to avoid duplicated message?  ( Is
>brokergoing
>have moving window of message collected and de-dupe ?)  Possibly, we get
>this from retry set to 5…?
We are not trying to completely avoid duplicates. The duplicates will
still be there if:
1. Producer retries on failure.
2. Mirror maker is hard killed.
Currently, dedup is expected to be done by user if necessary.
>5)  Last, is there any warning or any thing you can provide insight
>from MM component about data injection rate into destination partitions is
>NOT evenly distributed regardless  of  keyed or non-keyed message (Hence
>there is ripple effect such as data not arriving late, or data is arriving
>out of order in  intern of time stamp  and early some time, and CAMUS
>creates huge number of file count on HDFS due to uneven injection rate .
>Camus Job is  configured to run every 3 minutes.)
I think uneven data distribution is typically caused by server side
unbalance, instead of something mirror maker could control. In new mirror
maker, however, there is a customizable message handler, that might be
able to help a little bit. In message handler, you can explicitly set a
partition that you want to produce the message to. So if you know the
uneven data distribution in target cluster, you may offset it here. But
that probably only works for non-keyed messages.
>
>I am not sure if this is right discussion form to bring these to
>your/kafka
>Dev team attention.  This might be off track,
>
>
>Thanks,
>
>Bhavesh
>
>On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin 
>wrote:
>
>> I’ve updated the KIP page. Feedbacks are welcome.
>>
>> Regarding the simple mirror maker design. I thought over it and have
>>some
>> worries:
>> There are two things that might worth thinking:
>> 1. One of the enhancement to mirror maker is adding a message handler to
>> do things like reformatting. I think we might potentially want to have
>> more threads processing the messages than the number of consumers. If we
>> follow the simple mirror maker solution, we lose this flexibility.
>> 2. This might not matter too much, but creating more consumers means
>>more
>> footprint of TCP connection / memory.
>>
>> Any thoughts on this?
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>> On 1/26/15, 10:35 AM, "Jiangjie Qin"  wrote:
>>
>> >Hi Jay and Neha,
>> >
>> >Thanks a lot for the reply and explanation. I do agree it makes more
>>sense
>> >to avoid duplicate effort and plan based on new consumer. I’ll modify
>>the
>> >KIP.
>> >
>> >To Jay’s question on message ordering - The data channel selection
>>makes
>> >sure that the messages from the same source partition will sent by the
>> >same producer. So the order of the messages is guaranteed with proper
>> >producer settings (MaxInFlightRequests=1,retries=Integer.MaxValue,
>>etc.)
>> >For keyed messages, because they come from the same source partition
>>and
>> >will end up in the same target partition, as long as they are sent by
>>the
>> >same producer, the order is guaranteed.
>> >For non-keyed messages, the messages coming from the same source
>>partition
>> >might go to different target partitions. The order is only guaranteed
>> >withi

Re: Review Request 30259: Add static code coverage reporting capability

2015-01-29 Thread Ashish Singh


> On Jan. 27, 2015, 1:39 a.m., Eric Olander wrote:
> > core/src/main/scala/kafka/utils/ZkUtils.scala, line 17
> > 
> >
> > Are there any open issues against scoverage that would explain why it 
> > can't instrument this class?  If not, it might be worth contacting that 
> > project to see if they have any ideas why it blows up on this class.  
> > Probably would be good to add a TODO explaining that once scoverage can 
> > process this class the $COVERAGE-OFF$ should be removed.

I actually planned to put the todo and completely forgot about it. Thanks for 
pointing this out.

I contacted gradle-scoverage guy to get insight into why is this not working, 
but suggestion was to skip this for now. However, I have not created an issue 
on the project. I will create an issue and put that in the TODO as well.


- Ashish


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30259/#review69727
---


On Jan. 25, 2015, 8:47 p.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30259/
> ---
> 
> (Updated Jan. 25, 2015, 8:47 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1722
> https://issues.apache.org/jira/browse/KAFKA-1722
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1722: Add static code coverage capability
> 
> 
> Diffs
> -
> 
>   build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> c14bd455b6642f5e6eb254670bef9f57ae41d6cb 
> 
> Diff: https://reviews.apache.org/r/30259/diff/
> 
> 
> Testing
> ---
> 
> How to run: ./gradlew sonarRunner -PscalaVersion=2.11
> 
> Note that if you do not have sonarqube running on your system. The 
> sonarRunner task will fail, but it would have generated coverage reports for 
> core and clients at core/build/reports/scoverage/ and 
> clients/build/reports/jacocoHtml respectively. Open index.html in any of 
> those dirs to see the coverage.
> 
> Once gradle-scoverage starts publishing scoverage report, a single report 
> generated from sonar will be available.
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



Re: Review Request 30259: Patch for KAFKA-1722

2015-01-29 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30259/
---

(Updated Jan. 29, 2015, 8:33 p.m.)


Review request for kafka.


Summary (updated)
-

Patch for KAFKA-1722


Bugs: KAFKA-1722
https://issues.apache.org/jira/browse/KAFKA-1722


Repository: kafka


Description
---

KAFKA-1722: Add static code coverage capability


Diffs (updated)
-

  build.gradle a980f61def59ae59dbd5e58050285a801b21674f 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
c14bd455b6642f5e6eb254670bef9f57ae41d6cb 

Diff: https://reviews.apache.org/r/30259/diff/


Testing
---

How to run: ./gradlew sonarRunner -PscalaVersion=2.11

Note that if you do not have sonarqube running on your system. The sonarRunner 
task will fail, but it would have generated coverage reports for core and 
clients at core/build/reports/scoverage/ and clients/build/reports/jacocoHtml 
respectively. Open index.html in any of those dirs to see the coverage.

Once gradle-scoverage starts publishing scoverage report, a single report 
generated from sonar will be available.


Thanks,

Ashish Singh



[jira] [Commented] (KAFKA-1722) static analysis code coverage for pci audit needs

2015-01-29 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14297604#comment-14297604
 ] 

Ashish Kumar Singh commented on KAFKA-1722:
---

Updated reviewboard https://reviews.apache.org/r/30259/
 against branch trunk

> static analysis code coverage for pci audit needs
> -
>
> Key: KAFKA-1722
> URL: https://issues.apache.org/jira/browse/KAFKA-1722
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Joe Stein
>Assignee: Ashish Kumar Singh
> Fix For: 0.9.0
>
> Attachments: KAFKA-1722.patch, KAFKA-1722_2015-01-29_12:33:01.patch, 
> Sonar's summary report.png, clients coverage.png, core coverage.png
>
>
> Code coverage is a measure used to describe the degree to which the source 
> code of a product is tested. A product with high code coverage has been more 
> thoroughly tested and has a lower chance of containing software bugs than a 
> product with low code coverage. Apart from PCI audit needs, increasing user 
> base of Kafka makes it important to increase code coverage of Kafka. 
> Something just can not be improved without being measured.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1722) static analysis code coverage for pci audit needs

2015-01-29 Thread Ashish Kumar Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Kumar Singh updated KAFKA-1722:
--
Attachment: KAFKA-1722_2015-01-29_12:33:01.patch

> static analysis code coverage for pci audit needs
> -
>
> Key: KAFKA-1722
> URL: https://issues.apache.org/jira/browse/KAFKA-1722
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Joe Stein
>Assignee: Ashish Kumar Singh
> Fix For: 0.9.0
>
> Attachments: KAFKA-1722.patch, KAFKA-1722_2015-01-29_12:33:01.patch, 
> Sonar's summary report.png, clients coverage.png, core coverage.png
>
>
> Code coverage is a measure used to describe the degree to which the source 
> code of a product is tested. A product with high code coverage has been more 
> thoroughly tested and has a lower chance of containing software bugs than a 
> product with low code coverage. Apart from PCI audit needs, increasing user 
> base of Kafka makes it important to increase code coverage of Kafka. 
> Something just can not be improved without being measured.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30259: Patch for KAFKA-1722

2015-01-29 Thread Ashish Singh


> On Jan. 27, 2015, 1:39 a.m., Eric Olander wrote:
> > This is a nice improvement to the project.  Thanks!

Thanks for the review. Addressed your concern.


- Ashish


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30259/#review69727
---


On Jan. 29, 2015, 8:33 p.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30259/
> ---
> 
> (Updated Jan. 29, 2015, 8:33 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1722
> https://issues.apache.org/jira/browse/KAFKA-1722
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1722: Add static code coverage capability
> 
> 
> Diffs
> -
> 
>   build.gradle a980f61def59ae59dbd5e58050285a801b21674f 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> c14bd455b6642f5e6eb254670bef9f57ae41d6cb 
> 
> Diff: https://reviews.apache.org/r/30259/diff/
> 
> 
> Testing
> ---
> 
> How to run: ./gradlew sonarRunner -PscalaVersion=2.11
> 
> Note that if you do not have sonarqube running on your system. The 
> sonarRunner task will fail, but it would have generated coverage reports for 
> core and clients at core/build/reports/scoverage/ and 
> clients/build/reports/jacocoHtml respectively. Open index.html in any of 
> those dirs to see the coverage.
> 
> Once gradle-scoverage starts publishing scoverage report, a single report 
> generated from sonar will be available.
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



Re: Review Request 27799: New consumer

2015-01-29 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/#review70291
---

Ship it!


- Guozhang Wang


On Jan. 29, 2015, 11:20 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27799/
> ---
> 
> (Updated Jan. 29, 2015, 11:20 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1760
> https://issues.apache.org/jira/browse/KAFKA-1760
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1760: New consumer.
> 
> 
> Diffs
> -
> 
>   build.gradle a980f61def59ae59dbd5e58050285a801b21674f 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> d32c319d8ee4c46dad309ea54b136ea9798e2fd7 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> 8aece7e81a804b177a6f2c12e2dc6c89c1613262 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> ab7e3220f9b76b92ef981d695299656f041ad5ed 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 397695568d3fd8e835d8f923a89b3b00c96d0ead 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 
>   clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
> 752a979ea0b8bde7ff6d2e1a23bf54052305d841 
>   clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> c0c636b3e1ba213033db6d23655032c9bbd5e378 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 57c1807ccba9f264186f83e91f37c34b959c8060 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
>  e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
> 16af70a5de52cca786fdea147a6a639b7dc4a311 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 76efc216c9e6c3ab084461d792877092a189ad0f 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
> ea423ad15eebd262d20d5ec05d592cc115229177 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> fc71710dd5997576d3841a1c3b0f7e19a8c9698e 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 904976fadf0610982958628eaee810b60a98d725 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  483899d2e69b33655d0e08949f5f64af2519660a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> ccc03d8447ebba40131a70e16969686ac4aab58a 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> d3299b944062d96852452de455902659ad8af757 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> b15aa2c3ef2d7c4b24618ff42fd4da324237a813 
>   clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
> 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec 
>   clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 
> 7c948b166a8ac07616809f260754116ae7764973 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> b68bbf00ab8eba6c5867d346c91188142593ca6e 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 74d695ba39de44b6a3d15340ec0114bc4fce2ba2 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 3316b6a1098311b8603a4a5893bf57b75d2e43cb 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
> 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/record/LogEntry.java 
> e4d68

[jira] [Commented] (KAFKA-1760) Implement new consumer client

2015-01-29 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14297847#comment-14297847
 ] 

Guozhang Wang commented on KAFKA-1760:
--

LGTM, let's commit to trunk now.

> Implement new consumer client
> -
>
> Key: KAFKA-1760
> URL: https://issues.apache.org/jira/browse/KAFKA-1760
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Fix For: 0.8.3
>
> Attachments: KAFKA-1760.patch, KAFKA-1760_2015-01-11_16:57:15.patch, 
> KAFKA-1760_2015-01-18_19:10:13.patch, KAFKA-1760_2015-01-21_08:42:20.patch, 
> KAFKA-1760_2015-01-22_10:03:26.patch, KAFKA-1760_2015-01-22_20:21:56.patch, 
> KAFKA-1760_2015-01-23_13:13:00.patch, KAFKA-1760_2015-01-29_03:20:20.patch
>
>
> Implement a consumer client.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-29 Thread Jiangjie Qin
In meetup we said that KAFKA-1650 and follow up patches is included in
0.8.2, but it seems not on the list.


On 1/29/15, 1:01 AM, "Magnus Edenhill"  wrote:

>+1 on librdkafka interop
>
>Minor nitpick:
> KAFKA-1781 (state required gradle version in README)  is included in the
>Release notes but is not actually fixed
>
>
>2015-01-29 6:22 GMT+01:00 Jun Rao :
>
>> This is the third candidate for release of Apache Kafka 0.8.2.0.
>>
>> Release Notes for the 0.8.2.0 release
>>
>> 
>>https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.
>>html
>>
>> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
>> (SHA256)
>> checksum.
>>
>> * Release artifacts to be voted upon (source and binary):
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>>
>> * Maven artifacts to be voted upon prior to release:
>> https://repository.apache.org/content/groups/staging/
>>
>> * scala-doc
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>>
>> * java-doc
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>>
>> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>>
>> 
>>https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0
>>dab378cc411f4938a9cea1eb7ea
>> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>>
>> /***
>>
>> Thanks,
>>
>> Jun
>>



Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-29 Thread Jun Rao
I think we were just confirming whether that issue was fixed in 0.8.2 not.
Given that this issue only happens in unclean shutdown, I don't think it's
a blocker for 0.8.2. Also, the patch is not trivial and it's better to test
it out a bit longer in trunk.

Thanks,

Jun

On Thu, Jan 29, 2015 at 5:36 PM, Jiangjie Qin 
wrote:

> In meetup we said that KAFKA-1650 and follow up patches is included in
> 0.8.2, but it seems not on the list.
>
>
> On 1/29/15, 1:01 AM, "Magnus Edenhill"  wrote:
>
> >+1 on librdkafka interop
> >
> >Minor nitpick:
> > KAFKA-1781 (state required gradle version in README)  is included in the
> >Release notes but is not actually fixed
> >
> >
> >2015-01-29 6:22 GMT+01:00 Jun Rao :
> >
> >> This is the third candidate for release of Apache Kafka 0.8.2.0.
> >>
> >> Release Notes for the 0.8.2.0 release
> >>
> >>
> >>https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES
> .
> >>html
> >>
> >> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
> >>
> >> Kafka's KEYS file containing PGP keys we use to sign the release:
> >> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> >> (SHA256)
> >> checksum.
> >>
> >> * Release artifacts to be voted upon (source and binary):
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
> >>
> >> * Maven artifacts to be voted upon prior to release:
> >> https://repository.apache.org/content/groups/staging/
> >>
> >> * scala-doc
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
> >>
> >> * java-doc
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
> >>
> >> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
> >>
> >>
> >>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0
> >>dab378cc411f4938a9cea1eb7ea
> >> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
> >>
> >> /***
> >>
> >> Thanks,
> >>
> >> Jun
> >>
>
>


[jira] [Commented] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2

2015-01-29 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14298059#comment-14298059
 ] 

Joel Koshy commented on KAFKA-1729:
---

Thanks Jun - will do. When you get a chance, take a look at the attached doc 
patch as well which incorporates your comments.

> add doc for Kafka-based offset management in 0.8.2
> --
>
> Key: KAFKA-1729
> URL: https://issues.apache.org/jira/browse/KAFKA-1729
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jun Rao
>Assignee: Joel Koshy
> Fix For: 0.8.2
>
> Attachments: KAFKA-1729.patch, KAFKA-1782-doc-v1.patch, 
> KAFKA-1782-doc-v2.patch, KAFKA-1782-doc-v3.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1905) KafkaProducer's performance could be halved when MaxInFlightRequest is set to 1

2015-01-29 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned KAFKA-1905:
---

Assignee: Jiangjie Qin

> KafkaProducer's performance could be halved when MaxInFlightRequest is set to 
> 1
> ---
>
> Key: KAFKA-1905
> URL: https://issues.apache.org/jira/browse/KAFKA-1905
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> In KafkaProducer, the following logic is used in each poll():
> 1. Get a list of nodes who has a batch available for sending
> 2. Filter the list to remove the node which is not ready to receive a new 
> request (MaxInFlightRequests is checked here) 
> 3. Compose the requests for the nodes in the filtered list, i.e. has a batch 
> to send and also ready to receive.
> 4. Increase InFlightRequests, send the requests and get the responses of 
> previous send.
> 5. handle all receives and decrease the inFlightRequests.
> In this case, when MaxInFlightRequest is set to 1, since we are checking the 
> InFlightRequests before each receive, even if we have already received the 
> response, the node will still be considered not ready. So for a sequence of 
> poll, we end up in the PollForSend - PollForReceive - PollForSend... pattern. 
> Which essentially halved the throughput in a fast network. Ideally we should 
> check whether node is ready after we check all the receives.
> Here are the some logs that shows this situation when I run 
> kafka-producer-perf-test locally.
> -1st poll for send, no receive--
> [2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0, 
> jqin-ld1.linkedin.biz, 9092)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
>  recordCount=15)}, 
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance},
>  
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=15780 cap=16384]}]}]}))] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 0 
> (org.apache.kafka.clients.producer.internals.Sender)
> -- 2nd poll for receive, no send--
> [2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 1 
> (org.apache.kafka.clients.producer.internals.Sender)
> -- 3rd poll for send, no receive--
> [2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0, 
> jqin-ld1.linkedin.biz, 9092)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
>  recordCount=15)}, 
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance},
>  
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=15780 cap=16384]}]}]}))] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 0 
> (org.apache.kafka.clients.producer.internals.Sender)
>  4th poll for receive, no send
> [2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 1 
> (org.apache.kafka.clients.producer.internals.Sender)
>  5th poll for send, no receive
> [2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0, 
> jqin-ld1.linkedin.biz, 9092)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
>  recordCount=15)}, 
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance},
>  
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=15780 cap=16384]}]}]}))] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO responses #: 0 
> (org.apache.kafka.clients.producer.internals.Sender)
>  6th poll for receive, no send-
> [2015-01-28 13:54:06,011] INFO No ready nodes, timeout = 9223372036854775807

[jira] [Commented] (KAFKA-1905) KafkaProducer's performance could be halved when MaxInFlightRequest is set to 1

2015-01-29 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14298106#comment-14298106
 ] 

Jay Kreps commented on KAFKA-1905:
--

It's not every day you double performance by moving a few lines of code 
around...

> KafkaProducer's performance could be halved when MaxInFlightRequest is set to 
> 1
> ---
>
> Key: KAFKA-1905
> URL: https://issues.apache.org/jira/browse/KAFKA-1905
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> In KafkaProducer, the following logic is used in each poll():
> 1. Get a list of nodes who has a batch available for sending
> 2. Filter the list to remove the node which is not ready to receive a new 
> request (MaxInFlightRequests is checked here) 
> 3. Compose the requests for the nodes in the filtered list, i.e. has a batch 
> to send and also ready to receive.
> 4. Increase InFlightRequests, send the requests and get the responses of 
> previous send.
> 5. handle all receives and decrease the inFlightRequests.
> In this case, when MaxInFlightRequest is set to 1, since we are checking the 
> InFlightRequests before each receive, even if we have already received the 
> response, the node will still be considered not ready. So for a sequence of 
> poll, we end up in the PollForSend - PollForReceive - PollForSend... pattern. 
> Which essentially halved the throughput in a fast network. Ideally we should 
> check whether node is ready after we check all the receives.
> Here are the some logs that shows this situation when I run 
> kafka-producer-perf-test locally.
> -1st poll for send, no receive--
> [2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0, 
> jqin-ld1.linkedin.biz, 9092)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
>  recordCount=15)}, 
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance},
>  
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=15780 cap=16384]}]}]}))] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 0 
> (org.apache.kafka.clients.producer.internals.Sender)
> -- 2nd poll for receive, no send--
> [2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 1 
> (org.apache.kafka.clients.producer.internals.Sender)
> -- 3rd poll for send, no receive--
> [2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0, 
> jqin-ld1.linkedin.biz, 9092)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
>  recordCount=15)}, 
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance},
>  
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=15780 cap=16384]}]}]}))] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 0 
> (org.apache.kafka.clients.producer.internals.Sender)
>  4th poll for receive, no send
> [2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 1 
> (org.apache.kafka.clients.producer.internals.Sender)
>  5th poll for send, no receive
> [2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0, 
> jqin-ld1.linkedin.biz, 9092)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
>  recordCount=15)}, 
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance},
>  
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=15780 cap=16384]}]}]}))] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO responses #: 0 
> (org.apache.kafka.clients.producer.internals.Sender)
>  6th poll for

Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-01-29 Thread Bhavesh Mistry
Hi Jiangjie,

Thanks for the input.

a) Is MM will  producer ack will be attach to Producer Instance or per
topic.  Use case is that one instance of MM
needs to handle both strong ack and also ack=0 for some topic.  Or it would
be better to set-up another instance of MM.

b) Regarding TCP connections, Why does #producer instance attach to TCP
connection.  Is it possible to use Broker Connection TCP Pool, producer
will just checkout TCP connection  to Broker.  So, # of Producer Instance
does not correlation to Brokers Connection.  Is this possible ?


Thanks,

Bhavesh

On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin 
wrote:

> Hi Bhavesh,
>
> I think it is the right discussion to have when we are talking about the
> new new design for MM.
> Please see the inline comments.
>
> Jiangjie (Becket) Qin
>
> On 1/28/15, 10:48 PM, "Bhavesh Mistry"  wrote:
>
> >Hi Jiangjie,
> >
> >I just wanted to let you know about our use case and stress the point that
> >local data center broker cluster have fewer partitions than the
> >destination
> >offline broker cluster. Just because we do the batch pull from CAMUS and
> >in
> >order to drain data faster than the injection rate (from four DCs for same
> >topic).
> Keeping the same partition number in source and target cluster will be an
> option but will not be enforced by default.
> >
> >We are facing following issues (probably due to configuration):
> >
> >1)  We occasionally loose data due to message batch size is too large
> >(2MB) on target data (we are using old producer but I think new producer
> >will solve this problem to some extend).
> We do see this issue in LinkedIn as well. New producer also might have
> this issue. There are some proposal of solutions, but no real work started
> yet. For now, as a workaround, setting a more aggressive batch size on
> producer side should work.
> >2)  Since only one instance is set to MM data,  we are not able to
> >set-up ack per topic instead ack is attached to producer instance.
> I don’t quite get the question here.
> >3)  How are you going to address two phase commit problem if ack is
> >set
> >to strongest, but auto commit is on for consumer (meaning producer does
> >not
> >get ack,  but consumer auto committed offset that message).  Is there
> >transactional (Kafka transaction is in process) based ack and commit
> >offset
> >?
> Auto offset commit should be turned off in this case. The offset will only
> be committed once by the offset commit thread. So there is no two phase
> commit.
> >4)  How are you planning to avoid duplicated message?  ( Is
> >brokergoing
> >have moving window of message collected and de-dupe ?)  Possibly, we get
> >this from retry set to 5…?
> We are not trying to completely avoid duplicates. The duplicates will
> still be there if:
> 1. Producer retries on failure.
> 2. Mirror maker is hard killed.
> Currently, dedup is expected to be done by user if necessary.
> >5)  Last, is there any warning or any thing you can provide insight
> >from MM component about data injection rate into destination partitions is
> >NOT evenly distributed regardless  of  keyed or non-keyed message (Hence
> >there is ripple effect such as data not arriving late, or data is arriving
> >out of order in  intern of time stamp  and early some time, and CAMUS
> >creates huge number of file count on HDFS due to uneven injection rate .
> >Camus Job is  configured to run every 3 minutes.)
> I think uneven data distribution is typically caused by server side
> unbalance, instead of something mirror maker could control. In new mirror
> maker, however, there is a customizable message handler, that might be
> able to help a little bit. In message handler, you can explicitly set a
> partition that you want to produce the message to. So if you know the
> uneven data distribution in target cluster, you may offset it here. But
> that probably only works for non-keyed messages.
> >
> >I am not sure if this is right discussion form to bring these to
> >your/kafka
> >Dev team attention.  This might be off track,
> >
> >
> >Thanks,
> >
> >Bhavesh
> >
> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin  >
> >wrote:
> >
> >> I’ve updated the KIP page. Feedbacks are welcome.
> >>
> >> Regarding the simple mirror maker design. I thought over it and have
> >>some
> >> worries:
> >> There are two things that might worth thinking:
> >> 1. One of the enhancement to mirror maker is adding a message handler to
> >> do things like reformatting. I think we might potentially want to have
> >> more threads processing the messages than the number of consumers. If we
> >> follow the simple mirror maker solution, we lose this flexibility.
> >> 2. This might not matter too much, but creating more consumers means
> >>more
> >> footprint of TCP connection / memory.
> >>
> >> Any thoughts on this?
> >>
> >> Thanks.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 1/26/15, 10:35 AM, "Jiangjie Qin"  wrote:
> >>
> >> >Hi Jay and Neha,
> >> >
> >> >Than

Build failed in Jenkins: Kafka-trunk #384

2015-01-29 Thread Apache Jenkins Server
See 

Changes:

[jay.kreps] KAFKA-1760: New consumer.

--
[...truncated 1171 lines...]
kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest PASSED

kafka.log.LogManagerTest > testCreateLog PASSED

kafka.log.LogManagerTest > testGetNonExistentLog PASSED

kafka.log.LogManagerTest > testCleanupExpiredSegments PASSED

kafka.log.LogManagerTest > testCleanupSegmentsToMaintainSize PASSED

kafka.log.LogManagerTest > testTimeBasedFlush PASSED

kafka.log.LogManagerTest > testLeastLoadedAssignment PASSED

kafka.log.LogManagerTest > testTwoLogManagersUsingSameDirFails PASSED

kafka.log.LogManagerTest > testCheckpointRecoveryPoints PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithTrailingSlash PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithRelativeDirectory 
PASSED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLog PASSED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest > testReadAtLogGap PASSED

kafka.log.LogTest > testReadOutOfRange PASSED

kafka.log.LogTest > testLogRolls PASSED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest > testMessageSetSizeCheck PASSED

kafka.log.LogTest > testMessageSizeCheck PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testIndexRebuild PASSED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testIndexResizingAtTruncation PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testAsyncDelete PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTest > testCorruptLog PASSED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.LogTest > testParseTopicPartitionName PASSED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest > testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingTopic PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.LogSegmentTest > testTruncate PASSED

kafka.log.LogSegmentTest > testReadOnEmptySegment PASSED

kafka.log.LogSegmentTest > testReadBeforeFirstOffset PASSED

kafka.log.LogSegmentTest > testMaxOffset PASSED

kafka.log.LogSegmentTest > testReadAfterLast PASSED

kafka.log.LogSegmentTest > testReadFromGap PASSED

kafka.log.LogSegmentTest > testTruncateFull PASSED

kafka.log.LogSegmentTest > testNextOffsetCalculation PASSED

kafka.log.LogSegmentTest > testChangeFileSuffixes PASSED

kafka.log.LogSegmentTest > testRecoveryFixesCorruptIndex PASSED

kafka.log.LogSegmentTest > testRecoveryWithCorruptMessage PASSED

kafka.log.CleanerTest > testCleanSegments PASSED

kafka.log.CleanerTest > testCleaningWithDeletes PASSED

kafka.log.CleanerTest > testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest > testSegmentGrouping PASSED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED
:core:copyDependantLibs UP-TO-DATE
:core:jar
:examples:c

Build failed in Jenkins: Kafka-trunk #385

2015-01-29 Thread Apache Jenkins Server
See 

Changes:

[jay.kreps] KAFKA-1760 Follow-up: fix compilation issue with Scala 2.11

--
[...truncated 2177 lines...]
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:40)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:40)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:40)

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:40)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:40)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:40)

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44)
at 
kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46)

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44)
at 
kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46)

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.AutoOffsetReset