[ 
https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-631:
----------------------------

    Attachment: KAFKA-631-v1.patch

This patch implements more or less what was described above.

Specific Changes:
- OffsetCheckpoint.scala: Generalize HighwaterMarkCheckpoint to 
OffsetCheckpoint for use in tracking the cleaner point. In the future we could 
use this for the flush point too, if possible.
- Move configuration parameters in Log to a single class, LogConfig, to prepare 
for dynamically changing log configuration (also a nice cleanup)
- Implement a cleaner process in LogCleaner.scala that cleans logs; this is 
mostly standalone code. It is complicated but doesn't really touch anything 
else. (A simplified sketch of the dedupe pass appears after this list.)
- Implement an efficient OffsetMap (and associated tests) for log deduplication
- Add an API in Log.scala that allows swapping in segments. This api is fairly 
specific to the cleaner for now and is not a public api.
- Refactor segment delete in Log.scala to allow reuse of the async delete 
functionality in segment swap
- Add logic in log recovery (Log.scala) to handle the case of a crash in the 
middle of cleaning or file swaps.
- Add a set of unit tests on cleaner logic (CleanerTest.scala), an integration 
test (LogCleanerIntegrationTest.scala) for the cleaner, and a torture test to 
run against a standalone server (TestLogCleaning.scala). The torture test 
produces a bunch of messages to a server over a long period of time and 
simultaneously logs them out to a text file. Then it uses unix sort to dedupe 
this text file and compares the result to the result of consuming from the 
topic (if the unique key-set isn't the same for both it throws an error). It 
also measures the log size reduction.
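
For readers skimming the patch, here is a minimal sketch of the two-phase dedupe 
pass: scan the dirty section of the log to build a map from key to the latest 
offset for that key, then recopy the segments keeping only the most recent 
record for each key. The Record class, the plain in-memory map, and the method 
names below are simplifications for illustration only, not the patch's actual 
OffsetMap or Log APIs.

import scala.collection.mutable

case class Record(offset: Long, key: String, value: String)

object CleanerSketch {

  // Phase 1: scan the dirty section and remember the latest offset per key
  // (later records simply overwrite earlier entries).
  def buildOffsetMap(dirty: Iterable[Record]): mutable.Map[String, Long] = {
    val latest = mutable.Map.empty[String, Long]
    for (r <- dirty) latest(r.key) = r.offset
    latest
  }

  // Phase 2: recopy a segment, keeping a record only if no newer record with
  // the same key exists in the map.
  def cleanSegment(segment: Iterable[Record],
                   latest: mutable.Map[String, Long]): Seq[Record] =
    segment.filter(r => latest.get(r.key).forall(r.offset >= _)).toSeq
}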

New configuration parameters:

# should we default to delete or dedupe for the cleanup policy?
log.cleanup.policy = delete/dedupe

# per-topic override for cleanup policy
topic.log.cleanup.policy = topic:delete/dedupe, … 

# number of background threads to use for log cleaning
log.cleaner.threads=1

# maximum I/O the cleaner is allowed to do (read & write combined)
log.cleaner.io.max.bytes.per.second=Double.MaxValue

# the maximum memory the cleaner can use
log.cleaner.buffer.size=100MB

# the amount of time to sleep when there is no cleaning to do
log.cleaner.backoff.ms=30secs

# minimum ratio of new to old messages the log must have for cleaning to proceed
log.cleaner.min.cleanable.ratio=0.5
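
As a concrete example (the topic name is hypothetical), a broker that keeps 
delete as the default policy but dedupes a single topic would set:

log.cleanup.policy=delete
topic.log.cleanup.policy=user-updates:dedupe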

I also changed the configuration log.cleanup.interval.mins to 
log.retention.check.interval.ms because the word "cleanup" is confusing.

New Persistent Data

This patch adds a new persistent data structure, a per-data directory file 
'cleaner-offset-checkpoint'. This is the exact same format and code as the 
existing 'replication-offset-checkpoint'. The file records the position in the 
log up to which the cleaner has cleaned.
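
The checkpoint format is, roughly, a version line, an entry count, and one 
"topic partition offset" entry per line; so a cleaner-offset-checkpoint covering 
two partitions of a hypothetical topic might look like:

0
2
user-updates 0 1423628
user-updates 1 1398001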

Current State

This patch is mostly functional with a number of known limitations:
1. It is a lot of code, so there are likely bugs. I think most bugs would only 
affect log cleaning.
2. The cleaner is somewhat inefficient. Currently it does about 11MB/sec. I 
suspect this can be increased to around 70-100MB/sec by implementing batching 
of writes. I will do this as a follow-up ticket.
3. I do not properly handle compressed logs. Cleaning will work correctly but 
all messages are written uncompressed. The reason for this is that logically it 
is pretty complex to figure out what codec messages should be written with 
(since there may be a mixture of compression types in the log). Rather than try 
to handle this now, I think it makes more sense to implement dynamic config and 
then add a new config for log compression so that each topic has a single 
compression type that all messages are written with.
4. It would be nice to seed the hash with a different seed for each run so that 
collisions would get handled in the next run (see the sketch after this list). 
This will also be done in a follow-up patch.
5. It would be nice to integrate the torture test into the nightly integration 
test framework (since it is a pass/fail test). I will work to do this as a 
separate item.
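
One possible approach to item 4, sketched below, is to mix a random per-run seed 
into the key hash so that a pair of keys that collides in one cleaning run is 
unlikely to collide again in the next. The class name and the use of MD5 here 
are illustrative, not the OffsetMap internals in the patch.

import java.security.MessageDigest
import scala.util.Random

class SaltedKeyHasher(seed: Long = Random.nextLong()) {
  private val digest = MessageDigest.getInstance("MD5")

  // Hash the per-run seed together with the key so that hash collisions
  // change from run to run.
  def hash(key: Array[Byte]): Array[Byte] = {
    digest.reset()
    digest.update(BigInt(seed).toByteArray)
    digest.digest(key)
  }
}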

I would like to get this in in its current state and work on making log config 
dynamic. Without that feature this is not very useful, since you have to bounce 
the server every time you add a new topic to set the cleanup policy. Once that 
is done we can use it for real features, which will likely uncover more issues 
than further testing now would.

Status of Testing

- There is reasonable unit test coverage but I will likely add additional tests 
as real usage uncovers corner cases
- I can run the torture test for many hours on a few dozen gb of data and get 
correct results.

                
> Implement log compaction
> ------------------------
>
>                 Key: KAFKA-631
>                 URL: https://issues.apache.org/jira/browse/KAFKA-631
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-631-v1.patch
>
>
> Currently Kafka has only one way to bound the space of the log, namely by 
> deleting old segments. The policy that controls which segments are deleted 
> can be configured based either on the number of bytes to retain or the age of 
> the messages. This makes sense for event or log data which has no notion of 
> primary key. However, lots of data has a primary key and consists of updates 
> by primary key. For this data it would be nice to be able to ensure that the 
> log contained at least the last version of every key.
> As an example, say that the Kafka topic contains a sequence of User Account 
> messages, each capturing the current state of a given user account. Rather 
> than simply discarding old segments, since the set of user accounts is 
> finite, it might make more sense to delete individual records that have been 
> made obsolete by a more recent update for the same key. This would ensure 
> that the topic contained at least the current state of each record.

