[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192405#comment-16192405 ]

Ivan Babrou commented on KAFKA-3359:
------------------------------------

It's me again. We hit the issue again and I googled my way back to this ticket. 
I wanted to add that Kafka re-reads full partitions to recover, and it takes 20 
minutes even on the smallest ones, which are around 1.5 TB:

{noformat}
Oct 05 01:10:43 mybroker14 kafka[32940]: WARN Found a corrupted index file due 
to requirement failed: Corrupt index found, index file 
(/state/kafka/http/requests-47/00000001246285678992.index) has non-zero size 
but the last offset is 1246285678992 which is no larger than the base offset 
1246285678992.}. deleting 
/state/kafka/http/requests-47/00000001246285678992.timeindex, 
/state/kafka/http/requests-47/00000001246285678992.index, and 
/state/kafka/http/requests-47/00000001246285678992.txnindex and rebuilding 
index... (kafka.log.Log)
Oct 05 01:10:43 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/00000001246285678992.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:10:47 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246283087840 in log requests-47. (kafka.log.Log)
Oct 05 01:31:29 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246284384425 in log requests-47. (kafka.log.Log)
Oct 05 01:31:29 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/00000001246283087840.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:36 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246285678992 in log requests-47. (kafka.log.Log)
Oct 05 01:31:36 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/00000001246284384425.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:42 mybroker14 kafka[32940]: INFO Loading producer state from 
offset 1246286680535 for partition requests-47 with message format version 0 
(kafka.log.Log)
Oct 05 01:31:42 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/00000001246285678992.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:43 mybroker14 kafka[32940]: INFO Completed load of log requests-47 
with 719 log segments, log start offset 1245351135299 and log end offset 
1246286680535 in 1260684 ms (kafka.log.Log)
{noformat}

> Parallel log-recovery of un-flushed segments on startup
> -------------------------------------------------------
>
>                 Key: KAFKA-3359
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3359
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>    Affects Versions: 0.8.2.2, 0.9.0.1
>            Reporter: Vamsi Subhash Achanta
>            Assignee: Jay Kreps
>
> On startup, the log segments within a logDir are currently loaded 
> sequentially after an unclean shutdown. Loading takes a long time because 
> logSegment.recover(..) is called for every segment, and for brokers with many 
> partitions the total recovery time is very high (we have noticed ~40 minutes 
> for 2k partitions).
> https://github.com/apache/kafka/pull/1035
> This pull request makes the log-segment load parallel, with two configurable 
> properties: "log.recovery.threads" and "log.recovery.max.interval.ms".
> Logic:
> 1. Create a thread pool of fixed size (log.recovery.threads).
> 2. Submit each logSegment recovery as a job to the thread pool and add the 
> returned future to a job list.
> 3. Wait until all the jobs are done within the required time 
> (log.recovery.max.interval.ms, default set to Long.MaxValue).
> 4. If all jobs finish and none of the futures reports a failure, recovery is 
> considered done.
> 5. If any recovery job fails, the failure is logged and a 
> LogRecoveryFailedException is thrown.
> 6. If the timeout is reached, a LogRecoveryFailedException is thrown.
> The logic is backward compatible with the current sequential implementation, 
> as the default thread count is set to 1.
> PS: I am new to Scala and the code might look Java-ish, but I will be happy 
> to incorporate review feedback.
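The proposed logic can be sketched as a bounded fan-out over a fixed thread pool. The following is a minimal, hypothetical Java sketch (the actual PR is in Scala); `recoverSegment` is a stand-in for `logSegment.recover(..)`, and the `threads` / `maxIntervalMs` parameters mirror the proposed "log.recovery.threads" and "log.recovery.max.interval.ms" properties:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ParallelRecoverySketch {

    // Hypothetical stand-in for kafka.log.LogSegment.recover(..).
    static void recoverSegment(long baseOffset) {
        // per-segment recovery work would go here
    }

    // Recovers all segments in parallel; returns the number of segments recovered.
    // Throws ExecutionException if any job fails, TimeoutException if the
    // overall deadline (maxIntervalMs) is exceeded.
    static int recoverAll(List<Long> segmentBaseOffsets, int threads, long maxIntervalMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        // Step 1: fixed-size pool, sized by the log.recovery.threads property.
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Step 2: submit each segment recovery and collect the futures.
            List<Future<?>> jobs = new ArrayList<>();
            for (long base : segmentBaseOffsets) {
                jobs.add(pool.submit(() -> recoverSegment(base)));
            }
            // Step 3: wait for all jobs under one shared deadline
            // (log.recovery.max.interval.ms).
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIntervalMs);
            for (Future<?> job : jobs) {
                long remaining = deadline - System.nanoTime();
                // Steps 5-6: get() rethrows a job failure as ExecutionException
                // and raises TimeoutException when the deadline passes.
                job.get(Math.max(remaining, 0), TimeUnit.NANOSECONDS);
            }
            // Step 4: every future completed without error.
            return jobs.size();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        int recovered = recoverAll(List.of(100L, 200L, 300L), 2, 5000L);
        System.out.println("recovered " + recovered + " segments");
    }
}
```

With `threads = 1` this degenerates to the existing sequential behavior, which is why the default keeps the change backward compatible.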



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)