Review Request 30477: Patch for KAFKA-1907

2015-01-31 Thread Jaikiran Pai

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30477/
---

Review request for kafka.


Bugs: KAFKA-1907
https://issues.apache.org/jira/browse/KAFKA-1907


Repository: kafka


Description
---

KAFKA-1907 Introduce a custom ZkClient for Kafka which allows timeouts on 
operations. Also mark certain Kafka threads as daemon to allow proper JVM 
shutdown
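
For illustration only (a generic sketch, not text from the diff), marking a thread as daemon works like this:

{code}
// A daemon thread cannot keep the JVM alive, so marking long-running background
// threads as daemon lets the process exit cleanly once shutdown begins.
object DaemonThreadExample extends App {
  val worker = new Thread(new Runnable {
    override def run(): Unit = while (true) Thread.sleep(1000) // stand-in for background work
  }, "example-background-worker")
  worker.setDaemon(true)
  worker.start()
}
{code}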


Diffs
-

  core/src/main/scala/kafka/network/SocketServer.scala 
39b1651b680b2995cedfde95d74c086d9c6219ef 
  core/src/main/scala/kafka/server/DelayedOperation.scala 
fc06b01cad3a0497800df727fa2abf60772694f2 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
4acdd70fe9c1ee78d6510741006c2ece65450671 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/utils/KafkaZkClient.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/Utils.scala 
738c1af9ef5de16fdf5130daab69757a14c48b5c 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
c14bd455b6642f5e6eb254670bef9f57ae41d6cb 

Diff: https://reviews.apache.org/r/30477/diff/


Testing
---


Thanks,

Jaikiran Pai



[jira] [Commented] (KAFKA-1907) ZkClient can block controlled shutdown indefinitely

2015-01-31 Thread jaikiran pai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14299832#comment-14299832
 ] 

jaikiran pai commented on KAFKA-1907:
-

Created reviewboard https://reviews.apache.org/r/30477/diff/
 against branch origin/trunk

> ZkClient can block controlled shutdown indefinitely
> ---
>
> Key: KAFKA-1907
> URL: https://issues.apache.org/jira/browse/KAFKA-1907
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Ewen Cheslack-Postava
> Attachments: KAFKA-1907.patch
>
>
> There are some calls to ZkClient via ZkUtils in 
> KafkaServer.controlledShutdown() that can block indefinitely because they 
> internally call waitUntilConnected. The ZkClient API doesn't provide an 
> alternative with timeouts, so fixing this will require enforcing timeouts in 
> some other way.
> This may be a more general issue if there are any non daemon threads that 
> also call ZkUtils methods.
> Stacktrace showing the issue:
> {code}
> "Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition [0x6ad69000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x70a93368> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:267)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
> at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
> at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
> at 
> kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:194)
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:269)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:45)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
> at 
> kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:42)
> at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1907) ZkClient can block controlled shutdown indefinitely

2015-01-31 Thread jaikiran pai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jaikiran pai updated KAFKA-1907:

Attachment: KAFKA-1907.patch

> ZkClient can block controlled shutdown indefinitely
> ---
>
> Key: KAFKA-1907
> URL: https://issues.apache.org/jira/browse/KAFKA-1907
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Ewen Cheslack-Postava
> Attachments: KAFKA-1907.patch
>
>
> There are some calls to ZkClient via ZkUtils in 
> KafkaServer.controlledShutdown() that can block indefinitely because they 
> internally call waitUntilConnected. The ZkClient API doesn't provide an 
> alternative with timeouts, so fixing this will require enforcing timeouts in 
> some other way.
> This may be a more general issue if there are any non daemon threads that 
> also call ZkUtils methods.
> Stacktrace showing the issue:
> {code}
> "Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition [0x6ad69000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x70a93368> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:267)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
> at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
> at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
> at 
> kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:194)
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:269)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:45)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
> at 
> kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:42)
> at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1907) ZkClient can block controlled shutdown indefinitely

2015-01-31 Thread jaikiran pai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jaikiran pai updated KAFKA-1907:

Assignee: jaikiran pai
  Status: Patch Available  (was: Open)

> ZkClient can block controlled shutdown indefinitely
> ---
>
> Key: KAFKA-1907
> URL: https://issues.apache.org/jira/browse/KAFKA-1907
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Ewen Cheslack-Postava
>Assignee: jaikiran pai
> Attachments: KAFKA-1907.patch
>
>
> There are some calls to ZkClient via ZkUtils in 
> KafkaServer.controlledShutdown() that can block indefinitely because they 
> internally call waitUntilConnected. The ZkClient API doesn't provide an 
> alternative with timeouts, so fixing this will require enforcing timeouts in 
> some other way.
> This may be a more general issue if there are any non daemon threads that 
> also call ZkUtils methods.
> Stacktrace showing the issue:
> {code}
> "Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition [0x6ad69000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x70a93368> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:267)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
> at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
> at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
> at 
> kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:194)
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:269)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:45)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
> at 
> kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:42)
> at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Cannot stop Kafka server if zookeeper is shutdown first

2015-01-31 Thread Jaikiran Pai
Neha, Ewen (and others), my initial attempt to solve this is uploaded 
here https://reviews.apache.org/r/30477/. It solves the shutdown problem 
and now the server shuts down even when Zookeeper has gone down before 
the Kafka server.


I went with the approach of introducing a custom (enhanced) ZkClient 
which for now allows timeouts to be optionally specified for certain 
operations. I intentionally haven't forced the use of this new 
KafkaZkClient all over the code and instead for now have just used it in 
the KafkaServer.
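
To make that concrete, here is a minimal sketch of such a wrapper (class and 
method names are illustrative; the actual patch at the review link may differ):

{code}
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}
import org.I0Itec.zkclient.ZkClient

// Sketch: a ZkClient whose reads can be given an optional timeout, so callers
// (e.g. controlled shutdown) cannot block forever inside waitUntilConnected().
class KafkaZkClient(zkConnect: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int)
  extends ZkClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs) {

  private val executor = Executors.newSingleThreadExecutor()

  // Returns None if the underlying (blocking) read does not complete within timeoutMs.
  def readDataMaybeNull(path: String, timeoutMs: Long): Option[String] = {
    val future = executor.submit(new Callable[String] {
      override def call(): String = readData[String](path, true)
    })
    try Option(future.get(timeoutMs, TimeUnit.MILLISECONDS))
    catch { case _: TimeoutException => future.cancel(true); None }
  }

  override def close(): Unit = {
    executor.shutdownNow()
    super.close()
  }
}
{code}

The key point is only that the blocking read runs on a separate thread, so the 
caller decides how long it is willing to wait.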


Does this patch look like something worth using?

-Jaikiran

On Thursday 29 January 2015 10:41 PM, Neha Narkhede wrote:

Ewen is right. ZkClient APIs are blocking and the right fix for this seems
to be patching ZkClient. At some point, if we find ourselves fiddling too
much with ZkClient, it wouldn't hurt to write our own little zookeeper
client wrapper.

On Thu, Jan 29, 2015 at 12:57 AM, Ewen Cheslack-Postava wrote:


Looks like a bug to me -- the underlying ZK library wraps a lot of blocking
method implementations with waitUntilConnected() calls without any
timeouts. Ideally we could just add a version of ZkUtils.getController()
with a timeout, but I don't see an easy way to accomplish that with
ZkClient.

There's at least one other call to ZkUtils besides the one in the
stacktrace you gave that would cause the same issue, possibly more that
aren't directly called in that method. One ugly solution would be to use an
extra thread during shutdown to trigger timeouts, but I'd imagine we
probably have other threads that could end up blocking in similar ways.
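
For illustration, that "extra thread with a timeout" idea could look roughly like 
this at the shutdown call site (a sketch only; the blocking read stands in for 
the ZkUtils call):

{code}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ShutdownZkTimeout {
  // One extra thread dedicated to ZK reads issued during shutdown.
  private val executor = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Runs a blocking read (e.g. the controller lookup) with a bounded wait. On timeout
  // the worker thread may stay blocked, but the shutdown path itself can proceed.
  def withTimeout[T](timeout: FiniteDuration)(blockingRead: => T): Option[T] = {
    val f = Future(blockingRead)
    try Some(Await.result(f, timeout))
    catch { case _: java.util.concurrent.TimeoutException => None }
  }
}
{code}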

I filed https://issues.apache.org/jira/browse/KAFKA-1907 to track the
issue.


On Mon, Jan 26, 2015 at 6:35 AM, Jaikiran Pai wrote:


The main culprit is this thread which goes into "forever retry connection
to a closed zookeeper" when I shutdown Kafka (via a Ctrl + C) after
zookeeper has already been shutdown. I have attached the complete thread
dump, but I don't know if it will be delivered to the mailing list.

"Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition
[0x6ad69000]
java.lang.Thread.State: TIMED_WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0x70a93368> (a java.util.concurrent.locks.
AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.parkUntil(
LockSupport.java:267)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$
ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
 at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
 at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
 at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
 at

org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)

 at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
 at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
 at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
 at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
 at kafka.server.KafkaServer.kafka$server$KafkaServer$$
controlledShutdown(KafkaServer.scala:194)
 at kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$
sp(KafkaServer.scala:269)
 at kafka.utils.Utils$.swallow(Utils.scala:172)
 at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
 at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
 at kafka.utils.Logging$class.swallow(Logging.scala:94)
 at kafka.utils.Utils$.swallow(Utils.scala:45)
 at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
 at kafka.server.KafkaServerStartable.shutdown(
KafkaServerStartable.scala:42)
 at kafka.Kafka$$anon$1.run(Kafka.scala:42)

-Jaikiran


On Monday 26 January 2015 05:46 AM, Neha Narkhede wrote:


For a clean shutdown, the broker tries to talk to the controller and also
issues reads to zookeeper. Possibly that is where it tries to reconnect to zk.
It will help to look at the thread dump.

Thanks
Neha

On Fri, Jan 23, 2015 at 8:53 PM, Jaikiran Pai wrote:
... [if I] shutdown zookeeper first I can't shutdown Kafka server at all since it
goes into a never ending attempt to reconnect with zookeeper. I had to kill the
Kafka process to stop it. I tried it against trunk too and there too I see the
same issue. Should I file a JIRA for this and see if I can come up with a patch?

FWIW, here are the unending (and IMO too frequent) attempts at trying to
reconnect. I've a thread dump too which shows that the other thread which is
trying to complete a controlled shutdown of Kafka is blocked forever waiting for
the zookeeper to be up. I can attach it to the JIRA.

[2015-01-24 10:15:46,278] WARN Session 0x14b1a413680 for server null,
unexpected error, closing socket connection and attempting reconnect
(org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at sun.nio.ch.SocketChann

Re: Review Request 30403: Patch for KAFKA-1906

2015-01-31 Thread Jaikiran Pai

Hi Jeff,

I guess you are :)

Personally, whenever I download and try a new project *in a development 
environment* I always just want to get it up and running without having 
to fiddle with configurations. Of course, I do a bit of reading of the 
docs before trying it out, but I like the install and run to be 
straightforward without having to change or add configuration. Having 
sensible defaults helps in development environments and in getting 
started. IMO, this param belongs to that category.


-Jaikiran

On Thursday 29 January 2015 08:00 PM, Jeff Holoman wrote:
Maybe I'm in the minority here, but I actually don't think there 
should be a default for this param and you should be required to 
explicitly set this.


On Thu, Jan 29, 2015 at 5:43 AM, Jay Kreps wrote:




> On Jan. 29, 2015, 6:50 a.m., Gwen Shapira wrote:
> > We added --override option to KafkaServer that allows overriding 
default configuration from
commandline.
> > I believe that just changing the shell script to include
--override log.dir=${KAFKA_HOME}/data
> > may be enough?
> >
> > overriding configuration from server.properties in code can be
very unintuitive.
>
> Jaikiran Pai wrote:
> That sounds a good idea. I wasn't aware of the --override
option. I'll give that a try and if it works then the change will
be limited to just the scripts.
>
> Jaikiran Pai wrote:
> Hi Gwen,
>
> I had a look at the JIRA
https://issues.apache.org/jira/browse/KAFKA-1654 which added
support for --override and also the purpose of that option. From
what I see it won't be useful in this case, because in this
current task, we don't want to override a value that has been
explicitly set (via server.properties for example). Instead we
want to handle a case where no explicit value is specified for the
data log directory and in such cases default it to a path which
resides under the Kafka install directory.
>
> If we use the --override option in our (startup) scripts to
set log.dir=${KAFKA_HOME}/data, we will end up forcing this value
as the log.dir even when the user has intentionally specified a
different path for the data logs.
>
> Let me know if I misunderstood your suggestion.
>
> Jay Kreps wrote:
> I think you are right that --override won't work but maybe
this is still a good suggestion?
>
> Something seems odd about force overriding the working
directory of the process just to set the log directory.
>
> Another option might be to add --default. This would work
like --override but would provide a default value only if none is
specified. I think this might be possible because the
java.util.Properties we use for config supports a hierarchy of
defaults. E.g. you can say new Properties(defaultProperties). Not
sure if this is better or worse.
>
> Thoughts?
>
> Jaikiran Pai wrote:
> Hi Jay,
>
> > Another option might be to add --default. This would work
like --override but would provide a default value only if none is
specified. I think this might be possible because the
java.util.Properties we use for config supports a hierarchy of
defaults. E.g. you can say new Properties(defaultProperties). Not
sure if this is better or worse.
>
> I think --default sounds like a good idea which could help
us use it for other properties too (if we need to). It does look
better than the current change that I have done, because the Java
code then doesn't have to worry about how that default value is
sourced. We can then just update the scripts to set up the default
for the log.dir appropriately.
>
> I can work towards adding support for it and will update
this patch once it's ready.
>
> As for:
>
> > Something seems odd about force overriding the working
directory of the process just to set the log directory.
>
> Sorry, I don't understand what you meant there. Is it
something about the change that was done to the scripts?

I guess what I mean is: is there any other reason you might care
about the working directory of the process? If so we probably
don't want to force it to be the Kafka directory. If not it may
actually be fine and in that case I think having relative paths
work is nice. I don't personally know the answer to this, what is
"good practice" for a server process?


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30403/#review70168
---


On Jan. 29, 2015, 6:24 a.m., Jaikiran Pai wrote:
>
> ---

Re: Review Request 30063: Patch for KAFKA-1840

2015-01-31 Thread Eric Olander

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review70508
---



core/src/main/scala/kafka/tools/MirrorMaker.scala


Similar to below comment, lines 220-226 are equivalent to:

val customRebalanceListener =
  Option(options.valueOf(consumerRebalanceListenerOpt)).map { customRebalanceListenerClass =>
    Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)
  }



core/src/main/scala/kafka/tools/MirrorMaker.scala


Something to consider:

mirrorMakerMessageHandler =
  Option(options.valueOf(mirrorMakerMessageHandlerOpt)).map { mirrorMakerMessageHandlerClass =>
    Utils.createObject[MirrorMakerMessageHandler](mirrorMakerMessageHandlerClass)
  }.getOrElse(new defaultMirrorMakerMessageHandler)


- Eric Olander


On Jan. 31, 2015, 2:25 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> ---
> 
> (Updated Jan. 31, 2015, 2:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
> https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: Review Request 30063: Patch for KAFKA-1840

2015-01-31 Thread Eric Olander

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review70509
---



core/src/main/scala/kafka/tools/MirrorMaker.scala


Class name should be capitalized.  Can this be an object instead of a class?


- Eric Olander


On Jan. 31, 2015, 2:25 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> ---
> 
> (Updated Jan. 31, 2015, 2:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
> https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Updated] (KAFKA-1633) Data loss if broker is killed

2015-01-31 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1633:
-
Assignee: Guozhang Wang  (was: Jun Rao)
  Status: Patch Available  (was: Open)

> Data loss if broker is killed
> -
>
> Key: KAFKA-1633
> URL: https://issues.apache.org/jira/browse/KAFKA-1633
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1
> Environment: centos 6.3, open jdk 7
>Reporter: gautham varada
>Assignee: Guozhang Wang
> Attachments: KAFKA-1633.patch
>
>
> We have a 2 node kafka cluster, we experienced data loss when we did a kill 
> -9 on the brokers.  We also found a work around to prevent this loss.
> Replication factor :2, 4 partitions
> Steps to reproduce
> 1. Create a 2 node cluster with replication factor 2, num partitions 4
> 2. We used Jmeter to pump events
> 3. We used kafka web console to inspect the log size after the test
> During the test, we simultaneously killed the brokers using kill -9 and we 
> tallied the metrics reported by jmeter and the size we observed in the web 
> console, we lost tons of messages.
> We went back and set the Producer retry to 1 instead of the default 3 and 
> repeated the above tests and we did not lose a single message.
> We repeated the above tests with the Producer retry set to 3 and 1 with a 
> single broker and we observed data loss when the retry was 3 and no loss when 
> the retry was 1
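
For context, the retry setting referred to above is presumably the old producer's
message.send.max.retries (default 3). A minimal 0.8.x producer configuration along
those lines, with placeholder broker addresses and topic, would be:

{code}
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Illustrative settings only; the broker list and topic are placeholders.
val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("request.required.acks", "-1")       // wait for all in-sync replicas
props.put("message.send.max.retries", "1")     // the "retry 1" workaround described above

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("test-topic", "key", "value"))
producer.close()
{code}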



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1633) Data loss if broker is killed

2015-01-31 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1633:
-
Attachment: KAFKA-1633.patch

> Data loss if broker is killed
> -
>
> Key: KAFKA-1633
> URL: https://issues.apache.org/jira/browse/KAFKA-1633
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1
> Environment: centos 6.3, open jdk 7
>Reporter: gautham varada
>Assignee: Jun Rao
> Attachments: KAFKA-1633.patch
>
>
> We have a 2 node kafka cluster, we experienced data loss when we did a kill 
> -9 on the brokers.  We also found a work around to prevent this loss.
> Replication factor :2, 4 partitions
> Steps to reproduce
> 1. Create a 2 node cluster with replication factor 2, num partitions 4
> 2. We used Jmeter to pump events
> 3. We used kafka web console to inspect the log size after the test
> During the test, we simultaneously killed the brokers using kill -9 and we 
> tallied the metrics reported by jmeter and the size we observed in the web 
> console, we lost tons of messages.
> We went back and set the Producer retry to 1 instead of the default 3 and 
> repeated the above tests and we did not lose a single message.
> We repeated the above tests with the Producer retry set to 3 and 1 with a 
> single broker and we observed data loss when the retry was 3 and no loss when 
> the retry was 1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1633) Data loss if broker is killed

2015-01-31 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300047#comment-14300047
 ] 

Guozhang Wang commented on KAFKA-1633:
--

Created reviewboard https://reviews.apache.org/r/30482/diff/
 against branch origin/trunk

> Data loss if broker is killed
> -
>
> Key: KAFKA-1633
> URL: https://issues.apache.org/jira/browse/KAFKA-1633
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1
> Environment: centos 6.3, open jdk 7
>Reporter: gautham varada
>Assignee: Jun Rao
> Attachments: KAFKA-1633.patch
>
>
> We have a 2 node kafka cluster, we experienced data loss when we did a kill 
> -9 on the brokers.  We also found a work around to prevent this loss.
> Replication factor :2, 4 partitions
> Steps to reproduce
> 1. Create a 2 node cluster with replication factor 2, num partitions 4
> 2. We used Jmeter to pump events
> 3. We used kafka web console to inspect the log size after the test
> During the test, we simultaneously killed the brokers using kill -9 and we 
> tallied the metrics reported by jmeter and the size we observed in the web 
> console, we lost tons of messages.
> We went back and set the Producer retry to 1 instead of the default 3 and 
> repeated the above tests and we did not lose a single message.
> We repeated the above tests with the Producer retry set to 3 and 1 with a 
> single broker and we observed data loss when the retry was 3 and no loss when 
> the retry was 1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 30482: Add the coordinator to server

2015-01-31 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
---

Review request for kafka.


Bugs: KAFKA-1633
https://issues.apache.org/jira/browse/KAFKA-1633


Repository: kafka


Description
---

TBD


Diffs
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala 
fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala 
f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Issue Comment Deleted] (KAFKA-1633) Data loss if broker is killed

2015-01-31 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1633:
-
Comment: was deleted

(was: Created reviewboard https://reviews.apache.org/r/30482/diff/
 against branch origin/trunk)

> Data loss if broker is killed
> -
>
> Key: KAFKA-1633
> URL: https://issues.apache.org/jira/browse/KAFKA-1633
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1
> Environment: centos 6.3, open jdk 7
>Reporter: gautham varada
>Assignee: Guozhang Wang
>
> We have a 2 node kafka cluster, we experienced data loss when we did a kill 
> -9 on the brokers.  We also found a work around to prevent this loss.
> Replication factor :2, 4 partitions
> Steps to reproduce
> 1. Create a 2 node cluster with replication factor 2, num partitions 4
> 2. We used Jmeter to pump events
> 3. We used kafka web console to inspect the log size after the test
> During the test, we simultaneously killed the brokers using kill -9 and we 
> tallied the metrics reported by jmeter and the size we observed in the web 
> console, we lost tons of messages.
> We went back and set the Producer retry to 1 instead of the default 3 and 
> repeated the above tests and we did not lose a single message.
> We repeated the above tests with the Producer retry set to 3 and 1 with a 
> single broker and we observed data loss when the retry was 3 and no loss when 
> the retry was 1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30482: Add the coordinator to server

2015-01-31 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
---

(Updated Feb. 1, 2015, 1:40 a.m.)


Review request for kafka.


Bugs: KAFKA-1333 and KAFKA-1633
https://issues.apache.org/jira/browse/KAFKA-1333
https://issues.apache.org/jira/browse/KAFKA-1633


Repository: kafka


Description
---

TBD


Diffs (updated)
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala 
fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala 
f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Updated] (KAFKA-1333) Add consumer co-ordinator module to the server

2015-01-31 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1333:
-
Attachment: KAFKA-1333_2015-01-31_17:40:51.patch

> Add consumer co-ordinator module to the server
> --
>
> Key: KAFKA-1333
> URL: https://issues.apache.org/jira/browse/KAFKA-1333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-1333_2015-01-31_17:40:51.patch
>
>
> Scope of this JIRA is to just add a consumer co-ordinator module that does the 
> following:
> 1) coordinator start-up, metadata initialization
> 2) simple join group handling (just updating metadata, no failure detection / 
> rebalancing): this should be sufficient for consumers doing self offset / 
> partition management.
> Offset manager will still run side-by-side with the coordinator in this JIRA, 
> and we will merge it in KAFKA-1740.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1633) Data loss if broker is killed

2015-01-31 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1633:
-
Attachment: (was: KAFKA-1633.patch)

> Data loss if broker is killed
> -
>
> Key: KAFKA-1633
> URL: https://issues.apache.org/jira/browse/KAFKA-1633
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1
> Environment: centos 6.3, open jdk 7
>Reporter: gautham varada
>Assignee: Guozhang Wang
>
> We have a 2 node kafka cluster, we experienced data loss when we did a kill 
> -9 on the brokers.  We also found a work around to prevent this loss.
> Replication factor :2, 4 partitions
> Steps to reproduce
> 1. Create a 2 node cluster with replication factor 2, num partitions 4
> 2. We used Jmeter to pump events
> 3. We used kafka web console to inspect the log size after the test
> During the test, we simultaneously killed the brokers using kill -9 and we 
> tallied the metrics reported by jmeter and the size we observed in the web 
> console, we lost tons of messages.
> We went back and set the Producer retry to 1 instead of the default 3 and 
> repeated the above tests and we did not lose a single message.
> We repeated the above tests with the Producer retry set to 3 and 1 with a 
> single broker and we observed data loss when the retry was 3 and no loss when 
> the retry was 1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1333) Add consumer co-ordinator module to the server

2015-01-31 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1333:
-
Status: Patch Available  (was: Open)

> Add consumer co-ordinator module to the server
> --
>
> Key: KAFKA-1333
> URL: https://issues.apache.org/jira/browse/KAFKA-1333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-1333_2015-01-31_17:40:51.patch
>
>
> Scope of this JIRA is to just add a consumer co-ordinator module that does the 
> following:
> 1) coordinator start-up, metadata initialization
> 2) simple join group handling (just updating metadata, no failure detection / 
> rebalancing): this should be sufficient for consumers doing self offset / 
> partition management.
> Offset manager will still run side-by-side with the coordinator in this JIRA, 
> and we will merge it in KAFKA-1740.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1333) Add consumer co-ordinator module to the server

2015-01-31 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300048#comment-14300048
 ] 

Guozhang Wang commented on KAFKA-1333:
--

Updated reviewboard https://reviews.apache.org/r/30482/diff/
 against branch origin/trunk

> Add consumer co-ordinator module to the server
> --
>
> Key: KAFKA-1333
> URL: https://issues.apache.org/jira/browse/KAFKA-1333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-1333_2015-01-31_17:40:51.patch
>
>
> Scope of this JIRA is to just add a consumer co-ordinator module that does the 
> following:
> 1) coordinator start-up, metadata initialization
> 2) simple join group handling (just updating metadata, no failure detection / 
> rebalancing): this should be sufficient for consumers doing self offset / 
> partition management.
> Offset manager will still run side-by-side with the coordinator in this JIRA, 
> and we will merge it in KAFKA-1740.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1910) Refactor KafkaConsumer

2015-01-31 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1910:


 Summary: Refactor KafkaConsumer
 Key: KAFKA-1910
 URL: https://issues.apache.org/jira/browse/KAFKA-1910
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Guozhang Wang


KafkaConsumer now contains all the logic on the consumer side, making it a very 
large class file; it would be better to refactor it into multiple layers on top of 
KafkaClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30482: Add the coordinator to server

2015-01-31 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
---

(Updated Feb. 1, 2015, 1:47 a.m.)


Review request for kafka.


Bugs: KAFKA-1333 and KAFKA-1633
https://issues.apache.org/jira/browse/KAFKA-1333
https://issues.apache.org/jira/browse/KAFKA-1633


Repository: kafka


Description (updated)
---

1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, 
and ZK listeners.
2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire 
heartbeat requests.
3. Add a delayed rebalance purgatory for preparing rebalance.
4. Add a join-group purgatory for sending back responses with assigned 
partitions.
5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / 
join-group / rebalance purgatories.
6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with 
coordinator, and sending responses via callbacks.
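
A rough skeleton of how the pieces above could fit together, purely for orientation
(names and signatures are guesses, not the contents of the new files in this diff):

{code}
import scala.collection.mutable

// Hypothetical shapes only -- not the actual ConsumerCoordinator.scala from this review.
case class ConsumerRegistry(consumerId: String, subscribedTopics: Set[String])
case class GroupRegistry(groupId: String,
                         consumers: mutable.Map[String, ConsumerRegistry] = mutable.Map.empty)

class ConsumerCoordinator {
  private val groups = mutable.Map[String, GroupRegistry]()

  def startup(): Unit = {
    // load existing group metadata, register ZK listeners for topic / partition changes
  }

  def handleJoinGroup(groupId: String, consumerId: String, topics: Set[String])
                     (responseCallback: Set[Int] => Unit): Unit = {
    val group = groups.getOrElseUpdate(groupId, GroupRegistry(groupId))
    group.consumers(consumerId) = ConsumerRegistry(consumerId, topics)
    // park responseCallback in a join-group purgatory; complete it once partitions are assigned
  }

  def handleHeartbeat(groupId: String, consumerId: String): Unit = {
    // refresh the consumer's entry in the delayed-heartbeat purgatory
  }

  def shutdown(): Unit = groups.clear()
}
{code}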


Diffs
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala 
fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala 
f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Assigned] (KAFKA-1892) System tests for the new consumer and co-ordinator

2015-01-31 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-1892:


Assignee: Guozhang Wang

> System tests for the new consumer and co-ordinator
> --
>
> Key: KAFKA-1892
> URL: https://issues.apache.org/jira/browse/KAFKA-1892
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Guozhang Wang
>
> We need to get system test coverage for the new consumer implementation and 
> the co-ordinator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1335) Add rebalancing logic to the coordinator / consumer

2015-01-31 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-1335:


Assignee: Guozhang Wang

> Add rebalancing logic to the coordinator / consumer
> ---
>
> Key: KAFKA-1335
> URL: https://issues.apache.org/jira/browse/KAFKA-1335
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
>
> This implements the group management protocol. This will be a tricky and 
> potentially large change since it will involve implementing the group 
> management protocol, which include:
> 1) Adding the rebalance logic on the coordinator that can be triggered from 
> membership change (either through failure detector or join group requests) 
> and topic / partition ZK listener fires.
> 2) Adding the rebalance logic on the consumer upon topic change / error code 
> from coordinator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1633) Data loss if broker is killed

2015-01-31 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300051#comment-14300051
 ] 

Guozhang Wang commented on KAFKA-1633:
--

Mistakenly submitted a patch which is for KAFKA-1333. Please ignore.

> Data loss if broker is killed
> -
>
> Key: KAFKA-1633
> URL: https://issues.apache.org/jira/browse/KAFKA-1633
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1
> Environment: centos 6.3, open jdk 7
>Reporter: gautham varada
>Assignee: Guozhang Wang
>
> We have a 2 node kafka cluster, we experienced data loss when we did a kill 
> -9 on the brokers.  We also found a work around to prevent this loss.
> Replication factor :2, 4 partitions
> Steps to reproduce
> 1. Create a 2 node cluster with replication factor 2, num partitions 4
> 2. We used Jmeter to pump events
> 3. We used kafka web console to inspect the log size after the test
> During the test, we simultaneously killed the brokers using kill -9 and we 
> tallied the metrics reported by jmeter and the size we observed in the web 
> console, we lost tons of messages.
> We went back and set the Producer retry to 1 instead of the default 3 and 
> repeated the above tests and we did not lose a single message.
> We repeated the above tests with the Producer retry set to 3 and 1 with a 
> single broker and we observed data loss when the retry was 3 and no loss when 
> the retry was 1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-31 Thread Jun Rao
Hi, Alex,

Thanks for testing RC3.

Broker.connectionString() is actually not part of the public api for the
producer. Is there a particular reason that you need to use this api?

Thanks,

Jun

On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker 
wrote:

> Hello,
>
> I have read Broker.scala source code, and I found the answer:
>  - With Kafka 0.8.1.1 we used Broker.getConnectionString() in our Java
> code.
>  - With Kafka 0.8.2.0, this method has been replaced by a 0-arity method
> without the "get" prefix, so we have to change our Java code to call
> Broker.connectionString()
>
> So although binary compatibility is broken, we have a workaround.
> I hope this will help other people relying on this API...
>
> and I'm going to continue tests with 0.8.2 rc3..
>
> Alex
>
> 2015-01-31 21:23 GMT+01:00 Alex The Rocker :
>
> > Hello,
> >
> > I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with our
> > application:
> >
> > 1st test:
> > ==
> >   replace all kafka .jar files in our application on the consuming side
> >   (without recompiling anything)
> >   => tests passed, OK
> >
> > 2nd test:
> > ===
> >   replace all kafka .jar files in our application on the producing side
> >   (without recompiling anything)
> >   => KO, we get this error:
> >
> > 2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
> > Exception in thread "Timer-2"
> > 2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
> > java.lang.NoSuchMethodError:
> > kafka.cluster.Broker.getConnectionString()Ljava/lang/String;
> >
> > Which means that binary compatibility with 0.8.1.1 version has been
> broken.
> > We use getConnectionString() to get the Broker's zookeeper addresses, see
> this
> > answer from Neha:
> >
> >
> >
> http://mail-archives.apache.org/mod_mbox/kafka-users/201404.mbox/%3CCAOG_4QYnWrB=tmrtcryf8-pdagy_cgfe_cxotqbclrkj2+x...@mail.gmail.com%3E
> >
> > If the kafka.cluster.Broker.getConnectionString() method has been removed
> > with Kafka 0.8.2.0, then what is the suitable replacement for it ?
> >
> > Thanks
> > Alex
> >
> >
> >> -Original Message-
> >> From: Jun Rao [mailto:j...@confluent.io]
> >> Sent: Thursday, January 29, 2015 6:22
> >> To: dev@kafka.apache.org; us...@kafka.apache.org;
> >> kafka-clie...@googlegroups.com
> >> Subject: [VOTE] 0.8.2.0 Candidate 3
> >>
> >> This is the third candidate for release of Apache Kafka 0.8.2.0.
> >>
> >> Release Notes for the 0.8.2.0 release
> >>
> >>
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
> >>
> >> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
> >>
> >> Kafka's KEYS file containing PGP keys we use to sign the release:
> >> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> >> (SHA256) checksum.
> >>
> >> * Release artifacts to be voted upon (source and binary):
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
> >>
> >> * Maven artifacts to be voted upon prior to release:
> >> https://repository.apache.org/content/groups/staging/
> >>
> >> * scala-doc
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
> >>
> >> * java-doc
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
> >>
> >> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
> >>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
> >> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
> >>
> >> /***
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >
> >
>


Re: Review Request 30482: Add the coordinator to server

2015-01-31 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
---

(Updated Feb. 1, 2015, 2:45 a.m.)


Review request for kafka.


Bugs: KAFKA-1333
https://issues.apache.org/jira/browse/KAFKA-1333


Repository: kafka


Description
---

1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, 
and ZK listeners.
2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire 
heartbeat requests.
3. Add a delayed rebalance purgatory for preparing rebalance.
4. Add a join-group purgatory for sending back responses with assigned 
partitions.
5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / 
join-group / rebalance purgatories.
6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with 
coordinator, and sending responses via callbacks.


Diffs
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala 
fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala 
f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
---


Thanks,

Guozhang Wang



Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-31 Thread Jun Rao
+1 (binding). Verified quickstart and unit tests.

Thanks,

Jun

On Wed, Jan 28, 2015 at 11:22 PM, Jun Rao  wrote:

> This is the third candidate for release of Apache Kafka 0.8.2.0.
>
> Release Notes for the 0.8.2.0 release
>
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
>
> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> (SHA256) checksum.
>
> * Release artifacts to be voted upon (source and binary):
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/
>
> * scala-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>
> * java-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>
> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>
> /***
>
> Thanks,
>
> Jun
>
>


Re: Cannot stop Kafka server if zookeeper is shutdown first

2015-01-31 Thread Gwen Shapira
It looks like the new KafkaZkClient is a wrapper around ZkClient, but
not a replacement. Did I get it right?

I think a wrapper for ZkClient can be useful - for example KAFKA-1664
can also use one.

However, I'm wondering why not contribute the fix directly to ZKClient
project and ask for a release that contains the fix?
This will benefit other users of the project who may also need a
timeout (that's pretty basic...)

As an alternative, if we don't want to collaborate with ZKClient for
some reason, forking the project into Kafka will probably give us more
control than wrappers and without much downside.

Just a thought.

Gwen





On Sat, Jan 31, 2015 at 6:32 AM, Jaikiran Pai  wrote:
> Neha, Ewen (and others), my initial attempt to solve this is uploaded here
> https://reviews.apache.org/r/30477/. It solves the shutdown problem and now
> the server shuts down even when Zookeeper has gone down before the Kafka
> server.
>
> I went with the approach of introducing a custom (enhanced) ZkClient which
> for now allows time outs to be optionally specified for certain operations.
> I intentionally haven't forced the use of this new KafkaZkClient all over
> the code and instead for now have just used it in the KafkaServer.
>
> Does this patch look like something worth using?
>
> -Jaikiran
>
>
> On Thursday 29 January 2015 10:41 PM, Neha Narkhede wrote:
>>
>> Ewen is right. ZkClient APIs are blocking and the right fix for this seems
>> to be patching ZkClient. At some point, if we find ourselves fiddling too
>> much with ZkClient, it wouldn't hurt to write our own little zookeeper
>> client wrapper.
>>
>> On Thu, Jan 29, 2015 at 12:57 AM, Ewen Cheslack-Postava wrote:
>>
>>> Looks like a bug to me -- the underlying ZK library wraps a lot of
>>> blocking
>>> method implementations with waitUntilConnected() calls without any
>>> timeouts. Ideally we could just add a version of ZkUtils.getController()
>>> with a timeout, but I don't see an easy way to accomplish that with
>>> ZkClient.
>>>
>>> There's at least one other call to ZkUtils besides the one in the
>>> stacktrace you gave that would cause the same issue, possibly more that
>>> aren't directly called in that method. One ugly solution would be to use
>>> an
>>> extra thread during shutdown to trigger timeouts, but I'd imagine we
>>> probably have other threads that could end up blocking in similar ways.
>>>
>>> I filed https://issues.apache.org/jira/browse/KAFKA-1907 to track the
>>> issue.
>>>
>>>
>>> On Mon, Jan 26, 2015 at 6:35 AM, Jaikiran Pai 
>>> wrote:
>>>
 The main culprit is this thread which goes into "forever retry
 connection
 to a closed zookeeper" when I shutdown Kafka (via a Ctrl + C) after
 zookeeper has already been shutdown. I have attached the complete thread
 dump, but I don't know if it will be delivered to the mailing list.

 "Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition
 [0x6ad69000]
 java.lang.Thread.State: TIMED_WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x70a93368> (a java.util.concurrent.locks.
 AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.parkUntil(
 LockSupport.java:267)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$
 ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
  at
 org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
  at
 org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
  at
 org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
  at
>>>
>>> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)

  at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
  at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
  at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
  at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
  at kafka.server.KafkaServer.kafka$server$KafkaServer$$
 controlledShutdown(KafkaServer.scala:194)
  at kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$
 sp(KafkaServer.scala:269)
  at kafka.utils.Utils$.swallow(Utils.scala:172)
  at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
  at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
  at kafka.utils.Logging$class.swallow(Logging.scala:94)
  at kafka.utils.Utils$.swallow(Utils.scala:45)
  at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
  at kafka.server.KafkaServerStartable.shutdown(
 KafkaServerStartable.scala:42)
  at kafka.Kafka$$anon$1.run(Kafka.scala:42)

 -Jaikiran


 On Monday 26 January 2015 05:46 AM, Neha Narkhede wrote:

> For a clean shutdown, the broker tries to talk to the controller and
>>>
>>> also
>
> issues reads to zoo

Re: Review Request 30063: Patch for KAFKA-1840

2015-01-31 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review70523
---


I like the idea of a message handler in MirrorMaker, but I think we can do 
better. Let me know if you think I'm taking it far beyond your original 
scope... I can add it as a follow up jira.

1. I think we need to let users pass parameters to the handlers. We need a 
"configure" or "init" method in the handler, which MirrorMaker will call once 
with the right properties and the handler can use them for basic setup. For 
example, imagine a "regexp filter" handler - I get a regexp from the 
commandline and filter messages that don't match. My "init" method will set up 
the regexp so it will be available for all handle() calls.
2. I think the handle() method should take a List of records as input, not just a 
single Record. MirrorMaker will be able to consume until it fills a batch (or until we 
waited too long), "handle" a batch - which will be able to use Scala's Sequence 
operators - Filter, Map, etc, and then produce an entire list of records. This 
sounds more efficient to me. 

Another comment - I think the addition of "destination partition" is unrelated 
to this change and Jira?
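
A sketch of what such a handler interface could look like (hypothetical trait and
names, not the code in this patch):

{code}
import java.util.Properties

// Hypothetical interface reflecting the two suggestions above: an init hook that
// receives command-line properties, and a batch-oriented handle().
trait MirrorMakerMessageHandler[R] {
  def configure(props: Properties): Unit          // called once at startup
  def handle(records: List[R]): List[R]           // filter / map / re-partition a batch
}

// Example: drop records whose (String) key does not match a configured regex.
class RegexFilterHandler extends MirrorMakerMessageHandler[(String, Array[Byte])] {
  private var pattern: scala.util.matching.Regex = ".*".r

  override def configure(props: Properties): Unit =
    pattern = props.getProperty("filter.regex", ".*").r

  override def handle(records: List[(String, Array[Byte])]): List[(String, Array[Byte])] =
    records.filter { case (key, _) => key != null && pattern.pattern.matcher(key).matches() }
}
{code}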

- Gwen Shapira


On Jan. 31, 2015, 2:25 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> ---
> 
> (Updated Jan. 31, 2015, 2:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
> https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Created] (KAFKA-1911) Log deletion on stopping replicas should be async

2015-01-31 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-1911:
-

 Summary: Log deletion on stopping replicas should be async
 Key: KAFKA-1911
 URL: https://issues.apache.org/jira/browse/KAFKA-1911
 Project: Kafka
  Issue Type: Bug
  Components: log, replication
Reporter: Joel Koshy
Assignee: Jay Kreps
 Fix For: 0.8.3


If a StopReplicaRequest sets delete=true then we do a file.delete on the file 
message sets. I was under the impression that this is fast but it does not seem 
to be the case.

On a partition reassignment in our cluster the local time for stop replica took 
nearly 30 seconds.

{noformat}
Completed request:Name: StopReplicaRequest; Version: 0; CorrelationId: 467; 
ClientId: ;DeletePartitions: true; ControllerId: 1212; ControllerEpoch: 53 
from 
client/...:45964;totalTime:29191,requestQueueTime:1,localTime:29190,remoteTime:0,responseQueueTime:0,sendTime:0
{noformat}

This ties up one API thread for the duration of the request.

Specifically in our case, the queue times for other requests also went up and 
producers to the partition that was just deleted on the old leader took a while 
to refresh their metadata (see KAFKA-1303) and eventually ran out of retries on 
some messages leading to data loss.

I think the log deletion in this case should be fully asynchronous although we 
need to handle the case when a broker may respond immediately to the 
stop-replica-request but then go down after deleting only some of the log 
segments.
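
One possible shape for the asynchronous deletion, assuming a rename-then-delete-in-background 
scheme (a sketch, not a patch):

{code}
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

object AsyncLogDeletion {
  // A single background thread so the request-handler thread never blocks on file I/O.
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Rename first (cheap, and restart-safe: directories with the suffix can be swept on
  // startup if the broker dies mid-delete), then remove the contents asynchronously.
  def deleteLogAsync(logDir: File): Unit = {
    val marked = new File(logDir.getParent, logDir.getName + ".deleted")
    logDir.renameTo(marked)
    scheduler.schedule(new Runnable {
      override def run(): Unit = deleteRecursively(marked)
    }, 0, TimeUnit.MILLISECONDS)
  }

  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
{code}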



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30063: Patch for KAFKA-1840

2015-01-31 Thread Jiangjie Qin


> On Feb. 1, 2015, 6:34 a.m., Gwen Shapira wrote:
> > I like the idea of a message handler in MirrorMaker, but I think we can do 
> > better. Let me know if you think I'm taking it far beyond your original 
> > scope... I can add it as a follow up jira.
> > 
> > 1. I think we need to let users pass parameters to the handlers. We need a 
> > "configure" or "init" method in the handler, which MirrorMaker will call 
> > once with the right properties and the handler can use them for basic 
> > setup. For example, imagine a "regexp filter" handler - I get a regexp from 
> > the commandline and filter messages that don't match. My "init" method will 
> > set up the regexp so it will be available for all handle() calls.
> > 2. I think the handle() method should take a List of records as input, not just 
> > a single Record. MirrorMaker will be able to consume until it fills a batch (or 
> > until we waited too long), "handle" a batch - which will be able to use 
> > Scala's Sequence operators - Filter, Map, etc, and then produce an entire 
> > list of records. This sounds more efficient to me. 
> > 
> > Another comment - I think the addition of "destination partition" is 
> > unrelated to this change and Jira?

I think it's a great idea to extend the message handler to take commandline 
arguments. I'll add that to this patch.
I'm not sure I understand why it will improve performance by passing in a list 
of records. Shouldn't the message handler decide whether it wants to buffer some 
records or not? Some worries about buffering messages outside the message handler 
are:
1. It is usually very difficult to decide how many messages to buffer because 
it likely depends on the business logic and could vary from time to time.
2. It might further increase the delay.

The addition of the destination partition is actually mainly to serve 
partition-to-partition copy, which is something related to the message handler. 
I feel it could be put into either KAFKA-1840 or KAFKA-1839. I put it here as it 
seems to be more related to the message handler because the handler has to assign 
the partition explicitly.

Maybe as a follow up patch, I'm thinking that we can add a commandline option 
of --mirror-cluster so it does the following:
1. Automatically create same number of partitions in target cluster when new 
topic is created in source cluster. (Based on KAFKA-1839)
2. Produce the messages to the destination partition same as the source 
partition. (Follow-up patch for KAFKA-1840)


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review70523
---


On Jan. 31, 2015, 2:25 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> ---
> 
> (Updated Jan. 31, 2015, 2:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
> https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>