[ https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13560901#comment-13560901 ]

Neha Narkhede commented on KAFKA-631:
-------------------------------------

Reviewed patch v4 -

1. CleanerConfig
1.1 Typos - enableClenaer, dedupeBufferLoadFactor (probably 
dedupBufferLoadFactor is better?)

2. VerifiableProperties
"If the given key is not present" -> "If the given property is not present"

3. KafkaConfig
3.1 The comment explaining log.cleaner.min.cleanable.ratio is confusing:
"/* the minimum ratio of bytes of log eligible for cleaning to bytes to total 
bytes which a log must contain to be eligible for cleaning */"
3.2 The config "log.retention.check.interval.ms" says the retention check is in 
milliseconds, but the corresponding field is named logCleanupIntervalMinutes and 
we multiply this value by 60K before passing it into LogManager (see the sketch 
after this list)
3.3 Can we document the different values for log.cleanup.policy in the comment?
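
To make 3.2 concrete, here is a minimal, self-contained sketch of the units 
mismatch, reusing the names from the review above on top of a plain Map instead 
of the real config plumbing:

    // The key claims milliseconds, but the value is treated as minutes
    val props = Map("log.retention.check.interval.ms" -> "10")

    val logCleanupIntervalMinutes = props("log.retention.check.interval.ms").toInt
    val intervalPassedToLogManager = logCleanupIntervalMinutes * 60 * 1000L  // the 60K multiplier

    // A consistent alternative: read the value directly as milliseconds
    val logCleanupIntervalMs = props("log.retention.check.interval.ms").toLong

Either the key should end in .mins, or the field and the multiplication should 
be switched to milliseconds.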

4. OffsetMap
4.1 Remove unused import "import java.util.concurrent._"
4.2 entries should be updated in the put() API (see the sketch below)
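
One plausible reading of 4.2, as a minimal sketch (hypothetical class, not the 
patch's actual OffsetMap implementation): bump the counter only for new keys, 
not for overwrites.

    import java.nio.ByteBuffer
    import scala.collection.mutable

    class SimpleOffsetMap {
      private val map = mutable.HashMap[ByteBuffer, Long]()
      private var entries = 0

      def put(key: ByteBuffer, offset: Long): Unit = {
        if (!map.contains(key))
          entries += 1        // the update 4.2 asks for
        map.put(key, offset)
      }

      def size: Int = entries
    }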

5. Log
5.1 Rolling new log segment in %s (log = %d/%d, index = %d/%d, age = %d/%d)
This log statement is a little confusing, if sophisticated. The last part of 
the statement should be index and the one before it should be age (sketch below)
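
A hedged guess at the fix, assuming the values are actually passed in the 
order size, age, index (the argument names below are invented):

    def rollMessage(dir: String,
                    size: Long, maxSize: Long,
                    ageMs: Long, maxAgeMs: Long,
                    indexEntries: Int, maxIndexEntries: Int): String =
      "Rolling new log segment in %s (log = %d/%d, age = %d/%d, index = %d/%d)"
        .format(dir, size, maxSize, ageMs, maxAgeMs, indexEntries, maxIndexEntries)

The point is just that each %d/%d pair should sit next to the label that 
describes it.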

6. LogCleaner
6.1 In the cleanSegments() API, we pass SystemTime to the LogSegment. However, 
we already pass a Time instance to LogCleaner. To test it independently, we can 
pass MockTime to LogCleaner, but then we should pass that same instance to 
LogSegment for it to work correctly (see the first sketch after this list).
6.2 In the cleanInto() API, we log a custom message in the 
IllegalArgumentException. I'm not sure I quite understood that. Aren't the log 
segments to be cleaned a mix of previously cleaned segments and yet to be 
cleaned ones? Why not just use "require" like we did while building the 
offset map?
6.3 Suppose the server crashes in replaceSegments() after addSegment() but 
before asyncDeleteSegment(), and say 2 log segments (xxxx.log, yyyy.log) were 
replaced with one new log segment (xxxx.log). When this server restarts, the 
loadSegments() API will swap in the new xxxx.log.swap as xxxx.log, but it will 
leave yyyy.log behind.
6.4 Do we have a unit test to cover the grouping logic in the 
groupSegmentsBySize() API? It looks correct to me, but I've been bitten by 
several scala collection append nuances before (see the second sketch after 
this list).
6.5 Remove unused import "import java.util.concurrent.locks.ReentrantLock"
6.6 allCleanerCheckpoints() is only called from within LogCleaner. Can we make 
this private?
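
For 6.1, a self-contained sketch of threading one clock through both 
components (the Fake* classes are stand-ins, not the patch's code):

    trait Time { def milliseconds: Long }
    class MockTime(var now: Long) extends Time { def milliseconds: Long = now }

    class FakeSegment(time: Time) { val created = time.milliseconds }

    class FakeCleaner(time: Time) {
      // pass the cleaner's own clock down instead of hardcoding SystemTime
      def cleanSegments(): FakeSegment = new FakeSegment(time)
    }

    val mock = new MockTime(42L)
    assert(new FakeCleaner(mock).cleanSegments().created == 42L)

With SystemTime hardcoded inside cleanSegments(), an assertion like the one 
above cannot pass under MockTime.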
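
For 6.4, a sketch of what such a unit test could look like, modeling a segment 
by its byte size only (groupBySize is a stand-in for the real 
groupSegmentsBySize()):

    def groupBySize(sizes: List[Int], maxGroupBytes: Int): List[List[Int]] = {
      var groups = List[List[Int]]()
      var current = List[Int]()
      var currentBytes = 0
      for (s <- sizes) {
        if (current.nonEmpty && currentBytes + s > maxGroupBytes) {
          groups :+= current.reverse  // :+ on List is O(n), one of those append nuances
          current = Nil
          currentBytes = 0
        }
        current ::= s
        currentBytes += s
      }
      if (current.nonEmpty) groups :+= current.reverse
      groups
    }

    // 60 + 30 fits under the 100-byte cap, 50 spills into a second group
    assert(groupBySize(List(60, 30, 50), 100) == List(List(60, 30), List(50)))
    // a single oversized segment still gets its own group
    assert(groupBySize(List(150), 100) == List(List(150)))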

7. CleanerConfig
7.1 Typo in API doc "enableClenaer" and "clenaer"
7.2 Why 3MB for the minimum buffer space per thread? Can we keep this 
configurable as well?

8. LogManager
8.1 Can we rename configs to topicConfigs or topicOverrides?

9. LogSegment
9.1 Fix the log4j statement for the .log renaming - it says "Failed to change 
the index file suffix" even though it is the .log file being renamed
 
10. ReplicaManager
In checkpointHighWatermarks(), it is better to use fatal("Error writing to 
highwatermark file: ", e)

11. MockScheduler
Even though this is not introduced in this patch, while reading the code I 
realized that the MockScheduler actually executes tasks before their 
nextExecutionTime is reached. This is because we just check whether 
nextExecutionTime <= now and then call task.fun() without waiting until the 
nextExecutionTime (sketch below).
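
A paraphrase of that tick loop, with invented names, showing that everything 
due by the current mock time runs in one pass at that time rather than at its 
own deadline:

    import scala.collection.mutable

    case class MockTask(fun: () => Unit, var nextExecutionTime: Long, period: Long)

    class SketchMockScheduler {
      private val tasks = mutable.ArrayBuffer[MockTask]()
      def schedule(t: MockTask): Unit = tasks += t

      // runs every task whose deadline has passed, without waiting for it
      def tick(now: Long): Unit =
        for (t <- tasks if t.nextExecutionTime <= now) {
          t.fun()
          t.nextExecutionTime = now + t.period
        }
    }

A test that jumps the mock clock far forward will see each task fire once at 
the jump, regardless of where its nextExecutionTime fell inside that window.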

> Implement log compaction
> ------------------------
>
>                 Key: KAFKA-631
>                 URL: https://issues.apache.org/jira/browse/KAFKA-631
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-631-v1.patch, KAFKA-631-v2.patch, 
> KAFKA-631-v3.patch, KAFKA-631-v4.patch
>
>
> Currently Kafka has only one way to bound the space of the log, namely by 
> deleting old segments. The policy that controls which segments are deleted 
> can be configured based either on the number of bytes to retain or the age of 
> the messages. This makes sense for event or log data which has no notion of 
> primary key. However lots of data has a primary key and consists of updates 
> by primary key. For this data it would be nice to be able to ensure that the 
> log contained at least the last version of every key.
> As an example, say that the Kafka topic contains a sequence of User Account 
> messages, each capturing the current state of a given user account. Rather 
> than simply discarding old segments, since the set of user accounts is 
> finite, it might make more sense to delete individual records that have been 
> made obsolete by a more recent update for the same key. This would ensure 
> that the topic contained at least the current state of each record.
