[ https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jay Kreps updated KAFKA-631:
----------------------------

    Attachment: KAFKA-631-v1.patch

This patch implements more or less what was described above.

Specific Changes:
- OffsetCheckpoint.scala: Generalize HighwaterMarkCheckpoint to OffsetCheckpoint for use in tracking the cleaner point. In the future we could use this for the flush point too, if possible.
- Move the configuration parameters in Log to a single class, LogConfig, to prepare for dynamically changing log configuration (also a nice cleanup).
- Implement a cleaner process in LogCleaner.scala that cleans logs; this is mostly standalone code. It is complicated but doesn't really touch anything else. (A rough sketch of the de-duplication approach appears below.)
- Implement an efficient OffsetMap (and associated tests) for log deduplication.
- Add an API in Log.scala that allows swapping in segments. This API is fairly specific to the cleaner for now and is not a public API.
- Refactor segment delete in Log.scala to allow reuse of the async delete functionality in the segment swap.
- Add logic in log recovery (Log.scala) to handle the case of a crash in the middle of cleaning or file swaps.
- Add a set of unit tests on cleaner logic (CleanerTest.scala), an integration test (LogCleanerIntegrationTest.scala) for the cleaner, and a torture test to run against a standalone server (TestLogCleaning.scala). The torture test produces a bunch of messages to a server over a long period of time and simultaneously logs them out to a text file. Then it uses unix sort to dedupe this text file and compares the result to the result of consuming from the topic (if the unique key set isn't the same for both it throws an error). It also measures the log size reduction.

New configuration parameters:

# should we default to delete or dedupe for the cleanup policy?
log.cleanup.policy = delete/dedupe
# per-topic override for the cleanup policy
topic.log.cleanup.policy = topic:delete/dedupe, …
# number of background threads to use for log cleaning
log.cleaner.threads=1
# maximum I/O the cleaner is allowed to do (read & write combined)
log.cleaner.io.max.bytes.per.second=Double.MaxValue
# the maximum memory the cleaner can use
log.cleaner.buffer.size=100MB
# the amount of time to sleep when there is no cleaning to do
log.cleaner.backoff.ms=30secs
# minimum ratio of new to old messages the log must have for cleaning to proceed
log.cleaner.min.cleanable.ratio=0.5

I also renamed the configuration log.cleanup.interval.mins to log.retention.check.interval.ms because the word "cleanup" is confusing.

New Persistent Data

This patch adds a new persistent data structure: a per-data-directory file 'cleaner-offset-checkpoint'. This is the exact same format and code as the existing 'replication-offset-checkpoint'. The contents of the file are the positions in the logs up to which the cleaner has cleaned.
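For illustration only, here is a minimal sketch of reading and writing a checkpoint file of this kind, assuming the same plain-text layout as the existing replication-offset-checkpoint (a version line, an entry-count line, then one topic/partition/offset entry per line); the object and method names below are made up for the example and are not from the patch.

import java.io._
import scala.io.Source

// Hypothetical sketch only; not the classes from the patch.
object CheckpointSketch {

  // Write a map of (topic, partition) -> offset in the assumed layout:
  // a version line, an entry-count line, then one "topic partition offset" per line.
  def write(file: File, offsets: Map[(String, Int), Long]): Unit = {
    val writer = new BufferedWriter(new FileWriter(file))
    try {
      writer.write("0")                        // format version
      writer.newLine()
      writer.write(offsets.size.toString)      // number of entries
      writer.newLine()
      for (((topic, partition), offset) <- offsets) {
        writer.write(s"$topic $partition $offset")
        writer.newLine()
      }
    } finally {
      writer.close()
    }
  }

  // Read the file back, skipping the version and count lines.
  def read(file: File): Map[(String, Int), Long] = {
    val source = Source.fromFile(file)
    try {
      source.getLines().drop(2).map { line =>
        val Array(topic, partition, offset) = line.split(" ")
        (topic, partition.toInt) -> offset.toLong
      }.toMap
    } finally {
      source.close()
    }
  }
}

Since the patch reuses the same format and code as replication-offset-checkpoint, no new parsing logic is actually needed; the sketch is just to show what the file holds.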
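To make the de-duplication idea above concrete, here is a rough sketch of the two-pass approach the cleaner takes, using an ordinary in-memory map in place of the patch's OffsetMap; Record and all names here are illustrative stand-ins, not code from the patch.

import scala.collection.mutable

// Record stands in for a keyed log entry; the mutable map stands in for OffsetMap.
case class Record(offset: Long, key: Array[Byte], value: Array[Byte])

object DedupeSketch {

  // Pass 1: scan the new ("dirty") section of the log and remember the latest
  // offset seen for each key. Keys are wrapped in a Seq so equality is by content.
  def buildOffsetMap(dirty: Iterable[Record]): Map[Seq[Byte], Long] = {
    val map = mutable.Map[Seq[Byte], Long]()
    for (r <- dirty)
      map(r.key.toSeq) = r.offset // later records overwrite earlier ones
    map.toMap
  }

  // Pass 2: recopy the log, keeping a record only if the map holds no newer
  // offset for its key (keys absent from the map are always kept).
  def clean(log: Iterable[Record], offsetMap: Map[Seq[Byte], Long]): Iterable[Record] =
    log.filter(r => offsetMap.getOrElse(r.key.toSeq, -1L) <= r.offset)
}

The real cleaner of course works segment by segment and swaps rewritten segments in via the new Log API described above, but the retain-only-the-latest-record-per-key rule is the core of it.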
Current State

This patch is mostly functional, with a number of known limitations:
1. It is a lot of code, so there are likely bugs. I think most bugs would only affect log cleaning.
2. The cleaner is somewhat inefficient. Currently it does about 11 MB/sec. I suspect this can be increased to around 70-100 MB/sec by implementing batching of writes. I will do this as a follow-up ticket.
3. I do not properly handle compressed logs. Cleaning will work correctly, but all messages are written uncompressed. The reason for this is that it is logically pretty complex to figure out which codec messages should be written with (since there may be a mixture of compression types in the log). Rather than try to handle this now, I think it makes more sense to implement dynamic config and then add a new config for log compression so that each topic has a single compression type that all messages are written with.
4. It would be nice to seed the hash with a different seed for each run so that collisions would get handled in the next run. This will also be done in a follow-up patch.
5. It would be nice to integrate the torture test into the nightly integration test framework (since it is a pass/fail test). I will work on this as a separate item.

I would like to get this in in its current state and work on making log config dynamic. Without that feature this is not very useful, since you have to bounce the server every time you add a new topic in order to set its cleanup policy. Once that is done we can use it for real features, which will likely uncover more issues than further testing now would.

Status of Testing
- There is reasonable unit test coverage, but I will likely add additional tests as real usage uncovers corner cases.
- I can run the torture test for many hours on a few dozen GB of data and get correct results.

> Implement log compaction
> ------------------------
>
>                 Key: KAFKA-631
>                 URL: https://issues.apache.org/jira/browse/KAFKA-631
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-631-v1.patch
>
>
> Currently Kafka has only one way to bound the space of the log, namely by deleting old segments. The policy that controls which segments are deleted can be configured based either on the number of bytes to retain or the age of the messages. This makes sense for event or log data which has no notion of primary key. However lots of data has a primary key and consists of updates by primary key. For this data it would be nice to be able to ensure that the log contained at least the last version of every key.
> As an example, say that the Kafka topic contains a sequence of User Account messages, each capturing the current state of a given user account. Rather than simply discarding old segments, since the set of user accounts is finite, it might make more sense to delete individual records that have been made obsolete by a more recent update for the same key. This would ensure that the topic contained at least the current state of each record.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira