[jira] [Commented] (KAFKA-955) After a leader change, messages sent with ack=0 are lost

2013-08-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13749410#comment-13749410
 ] 

Neha Narkhede commented on KAFKA-955:
-

I like the way the responseCode is generalized. Patch v6 looks good; a few 
minor comments before check-in -

1. Remove unused variable allBrokers from KafkaApis
2. This comment needs to be changed according to the new response code logic - 
// a null response send object indicates

Maybe we should wait for a review from [~jkreps] since he has the most context 
on the socket server.

> After a leader change, messages sent with ack=0 are lost
> 
>
> Key: KAFKA-955
> URL: https://issues.apache.org/jira/browse/KAFKA-955
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>Assignee: Guozhang Wang
> Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, 
> KAFKA-955.v2.patch, KAFKA-955.v3.patch, KAFKA-955.v4.patch, 
> KAFKA-955.v5.patch, KAFKA-955.v6.patch
>
>
> If the leader changes for a partition, and a producer is sending messages 
> with ack=0, then messages will be lost, since the producer has no active way 
> of knowing that the leader has changed until its next metadata refresh 
> update.
> The broker receiving the message, which is no longer the leader, logs a 
> message like this:
> Produce request with correlation id 7136261 from client  on partition 
> [mytopic,0] failed due to Leader not local for partition [mytopic,0] on 
> broker 508818741
> This is exacerbated by the controlled shutdown mechanism, which forces an 
> immediate leader change.
> A possible solution would be for a broker that receives a message for a 
> topic it is no longer the leader for (and where the ack level is 0) to 
> silently forward the message to the current leader.
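The forwarding idea in the last paragraph can be sketched as follows. All 
types and names here are illustrative stand-ins, not the actual broker code 
(the real handling lives in KafkaApis):

```scala
// Sketch of the proposed broker-side handling for a produce request that
// arrives after a leader change. All types here are hypothetical.
sealed trait Outcome
case object Appended extends Outcome
case object ForwardedToLeader extends Outcome
case object ErrorNotLeader extends Outcome

def handleProduce(isLeader: Boolean, requiredAcks: Int): Outcome =
  if (isLeader) Appended
  // ack=0 producers expect no response, so an error would go unseen;
  // forwarding to the current leader avoids silently dropping the message.
  else if (requiredAcks == 0) ForwardedToLeader
  // producers with acks > 0 get NotLeaderForPartition and can retry.
  else ErrorNotLeader
```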

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-952) a broker should unregister certain ZK watchers after it is no longer the controller

2013-08-24 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-952:


Component/s: (was: core)
 controller

> a broker should unregister certain ZK watchers after it is no longer the 
> controller
> --
>
> Key: KAFKA-952
> URL: https://issues.apache.org/jira/browse/KAFKA-952
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>
> It seems that we only register watchers in the controller logic, but never 
> deregister any. Technically, after a broker stops being the controller, the 
> only watcher it needs to keep registered is the one on the controller path. 
> The rest of the watchers can be deregistered.



[jira] [Updated] (KAFKA-952) a broker should unregister certain ZK watchers after it is no longer the controller

2013-08-24 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-952:


Assignee: Neha Narkhede

> a broker should unregister certain ZK watchers after it is no longer the 
> controller
> --
>
> Key: KAFKA-952
> URL: https://issues.apache.org/jira/browse/KAFKA-952
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>Assignee: Neha Narkhede



[jira] [Commented] (KAFKA-881) Kafka broker not respecting log.roll.hours

2013-08-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13749420#comment-13749420
 ] 

Neha Narkhede commented on KAFKA-881:
-

Sorry for reviewing this late. A few questions -

1. Doesn't it suffice to check if(segment.lastAppendTime.isDefined && 
(time.milliseconds - segment.lastAppendTime.get > rollIntervalMs))
2. There are many whitespace changes in LogSegment. Do you mind getting rid of 
those in the next patch?
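The check in question (1) could be sketched roughly as below; LogSegment here 
is a simplified stand-in for the real kafka.log.LogSegment:

```scala
// Sketch of a time-based roll check. lastAppendTime is simplified here;
// in the real code it would live on kafka.log.LogSegment.
case class LogSegment(var lastAppendTime: Option[Long] = None)

def shouldRoll(segment: LogSegment, nowMs: Long, rollIntervalMs: Long): Boolean =
  // Roll only if the segment has ever been appended to and the last
  // append is older than the configured roll interval.
  segment.lastAppendTime.exists(last => nowMs - last > rollIntervalMs)
```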

> Kafka broker not respecting log.roll.hours
> --
>
> Key: KAFKA-881
> URL: https://issues.apache.org/jira/browse/KAFKA-881
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.7.2
>Reporter: Dan F
>Assignee: Jay Kreps
> Attachments: kafka-roll-0.8.patch, kafka-roll.again.patch, 
> kafka_roll.patch
>
>
> We are running Kafka 0.7.2. We set log.roll.hours=1. I hoped that meant logs 
> would be rolled at least every hour. However, sometimes logs that are many 
> hours (sometimes days) old have more data added to them. This perturbs our 
> systems for reasons I won't get into.
> I don't know Scala or Kafka well, but I have a proposal for why this might 
> happen: upon restart, a broker forgets when its log files have been appended 
> to ("firstAppendTime"). Then, a potentially unbounded amount of time later, 
> the restarted broker receives another message for the particular (topic, 
> partition) and starts the clock again. It will then roll over that log after 
> an hour.
> https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/server/KafkaConfig.scala
>  says:
>   /* the maximum time before a new log segment is rolled out */
>   val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, 
> Int.MaxValue))
> https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/log/Log.scala
>  has maybeRoll, which needs segment.firstAppendTime defined. It also has 
> updateFirstAppendTime() which says if it's empty, then set it.
> If my hypothesis is correct about why it is happening, here is a case where 
> rolling is longer than an hour, even on a high volume topic:
> - write to a topic for 20 minutes
> - restart the broker
> - wait for 5 days
> - write to a topic for 20 minutes
> - restart the broker
> - write to a topic for an hour
> The rollover time was now 5 days, 1 hour, 40 minutes. You can make it as long 
> as you want.
> Proposed solution:
> The easiest thing to do would be to have Kafka re-initialize 
> firstAppendTime with the file creation time. Unfortunately, there is no file 
> creation time in UNIX; there is only ctime, the change time, which is updated 
> when a file's inode information is changed.
> One solution is to embed the firstAppendTime in the filename (say, seconds 
> since epoch). Then when you open it you could reset firstAppendTime to 
> exactly what it really was. This ignores clock drift or resetting. One could 
> set firstAppendTime to min(filename-based time, current time).
> A second solution is to make the Kafka log roll over at specific times, 
> regardless of when the file was created. Conceptually, time can be divided 
> into windows of size log.rollover.hours since the epoch (UNIX time 0, 1970). 
> So, when firstAppendTime is empty, compute the next rollover time (say, next 
> = (hours since epoch) - ((hours since epoch) % log.rollover.hours) + 
> log.rollover.hours). If the file mtime (last modified) is before the current 
> rollover window ((next - log.rollover.hours) .. next), roll it over right 
> away. Otherwise, roll over when you cross next, and reset next.
> A third solution (not perfect, but at least an approximation) would be not 
> to write to a segment if firstAppendTime is not defined and the timestamp on 
> the file is more than log.roll.hours old.
> There are probably other solutions.
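The second proposal (fixed rollover windows since the epoch) can be sketched 
as follows, using millisecond arithmetic and illustrative names:

```scala
// Sketch: align log rollover to fixed windows of rollMs since the UNIX epoch.
// All names are illustrative, not the actual Kafka API.
def nextRolloverMs(nowMs: Long, rollMs: Long): Long =
  nowMs - (nowMs % rollMs) + rollMs

def shouldRollNow(fileMtimeMs: Long, nowMs: Long, rollMs: Long): Boolean = {
  // If the file was last modified before the current window began, it
  // belongs to an earlier window and should be rolled over right away.
  val windowStart = nowMs - (nowMs % rollMs)
  fileMtimeMs < windowStart
}
```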



[jira] [Commented] (KAFKA-677) Retention process gives exception if an empty segment is chosen for collection

2013-08-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13749421#comment-13749421
 ] 

Neha Narkhede commented on KAFKA-677:
-

Jun, does this still happen on 0.8?

> Retention process gives exception if an empty segment is chosen for collection
> --
>
> Key: KAFKA-677
> URL: https://issues.apache.org/jira/browse/KAFKA-677
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Fix For: 0.8.1
>
> Attachments: kafka_677-cleanup.patch
>
>
> java.io.FileNotFoundException: 
> /mnt/u001/kafka_08_long_running_test/kafka-logs/NewsActivityEvent-3/.index
>  (No such file or directory)
> at java.io.RandomAccessFile.open(Native Method)
> at java.io.RandomAccessFile.<init>(RandomAccessFile.java:212)
> at kafka.log.OffsetIndex.resize(OffsetIndex.scala:244)
> at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:233)
> at kafka.log.Log.rollToOffset(Log.scala:459)
> at kafka.log.Log.roll(Log.scala:443)
> at kafka.log.Log.markDeletedWhile(Log.scala:395)
> at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:241)
> at 
> kafka.log.LogManager$$anonfun$cleanupLogs$2.apply(LogManager.scala:277)
> at 
> kafka.log.LogManager$$anonfun$cleanupLogs$2.apply(LogManager.scala:275)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at 
> scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
> at kafka.log.LogManager.cleanupLogs(LogManager.scala:275)
> at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:141)
> at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:619)



[jira] [Commented] (KAFKA-881) Kafka broker not respecting log.roll.hours

2013-08-24 Thread Dan F (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13749430#comment-13749430
 ] 

Dan F commented on KAFKA-881:
-

1. No, I believe that is the bug. lastAppendTime does not persist across 
restarts.

2. It might be better for you to do it, since you know which whitespace changes 
are disturbing you.  As far as I know, I made minimal changes that obey the 
whitespace rules (i.e. closing braces matching open braces in indentation, 
etc.).  I might be wrong.

> Kafka broker not respecting log.roll.hours
> --
>
> Key: KAFKA-881
> URL: https://issues.apache.org/jira/browse/KAFKA-881
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.7.2
>Reporter: Dan F
>Assignee: Jay Kreps
> Attachments: kafka-roll-0.8.patch, kafka-roll.again.patch, 
> kafka_roll.patch
>



[jira] [Updated] (KAFKA-1000) Inbuilt consumer offset management feature for Kafka

2013-08-24 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1000:
-

Description: 
Kafka currently stores offsets in ZooKeeper. This is a problem for several 
reasons. First, it means the consumer must embed the ZooKeeper client, which is 
not available in all languages. Second, offset commits are actually quite 
frequent, and ZooKeeper does not scale well under this kind of high-write load. 

This Jira is for tracking the phase #2 of Offset Management [0]. Joel and I 
have been working on this. [1] is the overall design of the feature.

[0] : https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management
[1] : 
https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management


  was:
This Jira is for tracking the phase #2 of Offset Management [0]. Joel and I 
have been working on this. [1] is the overall design of the feature.

[0] : https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management
[1] : 
https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management



> Inbuilt consumer offset management feature for Kafka
> 
>
> Key: KAFKA-1000
> URL: https://issues.apache.org/jira/browse/KAFKA-1000
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Affects Versions: 0.8.1
>Reporter: Tejas Patil
>Assignee: Tejas Patil
>Priority: Minor
>  Labels: features
>
> Kafka currently stores offsets in zookeeper. This is a problem for several 
> reasons. First it means the consumer must embed the zookeeper client which is 
> not available in all languages. Secondly offset commits are actually quite 
> frequent and Zookeeper does not scale this kind of high-write load. 
> This Jira is for tracking the phase #2 of Offset Management [0]. Joel and I 
> have been working on this. [1] is the overall design of the feature.
> [0] : https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management
> [1] : 
> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management



[jira] [Created] (KAFKA-1023) Allow Injectable LogManager

2013-08-24 Thread Micah Whitacre (JIRA)
Micah Whitacre created KAFKA-1023:
-

 Summary: Allow Injectable LogManager
 Key: KAFKA-1023
 URL: https://issues.apache.org/jira/browse/KAFKA-1023
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Micah Whitacre


Currently the LogManager is responsible for deleting and cleaning up messages 
based on time or size. It'd be nice to be able to enhance the LogManager to 
not only perform the cleanup but also back up the messages eligible for 
deletion to a custom location (e.g. HDFS). This would provide a fallback in 
the case where a consumer cannot keep up with the messages and data would 
otherwise be lost due to log rolling.

Currently LogManager is sealed, so no one can extend it; additionally, we'd 
need a way to inject the custom LogManager into the KafkaServer. 



[jira] [Commented] (KAFKA-1023) Allow Injectable LogManager

2013-08-24 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13749519#comment-13749519
 ] 

Neha Narkhede commented on KAFKA-1023:
--

LogManager is an internal component of a Kafka server that manages the 
server's log storage. It may not be a good idea to make it pluggable. For the 
purpose of backing logs up into HDFS, it makes sense to use something like 
https://github.com/linkedin/camus.

> Allow Injectable LogManager
> ---
>
> Key: KAFKA-1023
> URL: https://issues.apache.org/jira/browse/KAFKA-1023
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Micah Whitacre
>
> Currently the LogManager is responsible for deleting and cleaning up messages 
> based on time or size.  It'd be nice to be able enhance the LogManager to not 
> only perform the cleanup but maybe also backup the messages eligible for 
> deletion to a custom location (hdfs).  This would allow a backup plan in the 
> case of a consumer not able to keep up with the messages and data being lost 
> due to log rolling.
> Currently LogManager is sealed so no one can extend it but additionally we'd 
> need a way to inject the custom LogManager into the KafkaServer. 
