[ https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13562027#comment-13562027 ]
Jay Kreps commented on KAFKA-631: --------------------------------- Sriram--These are great suggestions. For the most part I am taking these to be "future work" because I don't think they block a "minimally viable product" which is what I hope to get in first. My intuition is to avoid doing anything hard or complicated until we have real operational experience with this functionality because otherwise we end up building a ton of fancy stuff that solves the wrong problems. Patches would be gladly accepted, though. :-) 1. This is a good suggestion. There is an additional assumption which is combining read and write I/O. Read I/O may be coming out of pagecache (shared) or from disks (not shared). Likewise it isn't really the number of disks per se since a RAID setup would effectively pool the I/O of all the disks (making the global throttler correct). We support multiple data directories with the recommendation that each data directory be a disk. We also know the mapping of log->data_directory. If we relied on this assumption we could do the throttling per data directory without too much difficulty. Of course that creates another additional scheduling problem which is that we should ideally choose a cleaning schedule that balances load over data directories. In any case, I think the global throttle, while not as precise as it could be, is pretty good. So I am going to add this to the "future work" page. 2. Yes. In fact the current code can generate segments with size 0. This is okay though. There is nothing too bad about having a few small files. We just can't accumulate an unbounded number of small files that never disappear (some combining must occur). Small files will get cleaned up in the next run. So I knowingly chose this heuristic rather than doing dynamic grouping because it made the code easier and simpler to test (i.e. I can test grouping separate from cleaning). 3. Since you have to size your heap statically in the case of a single thread shrinking the map size doesn't help anyone. Having a very sparse map just makes duplicates unlikely. However in the case where you had two threads it would be possible to schedule cleanings in such a way that you allocated small buffers for small logs and big buffers for big logs instead of medium buffers for both. Since these threads progress independently, though, it would be a bit complicated. Probably the small log would finish soon, so you would have to keep finding more small logs for the duration of the cleaning of the large log. And when the large cleaning did happen, you would probably have a small cleaning in progress so you would have to start another cleaning with the same large buffer size if you wanted memory to remain fixed. However one thing this brings up is that if your logs are non-uniform having non-uniform buffers (even if they are statically sized) could make it so you were able to efficiently clean large logs with less memory provided your scheduling was sophisticated enough. There are a number of gotchas here though. 4. I created a cleaner log and after each cleaning I log the full cleaner stats (time, mb/sec, size reduction, etc). 5. There are three tests in the patch. A simple non-threaded method-by-method unit test. A junit integration test of the full cleaner running as a background thread with concurrent appends. Finally a stand-alone torture test that runs against an arbitrary broker by producing to N topics and recording all its produced messages, then consuming from the broker to a file, then sorting and deduplicating both files by brute force and comparing them exactly. This later test is very comprehensive and runs over many hours and can test any broker configuration. I ran it with multiple threads to validate that case (and found some bugs, that i fixed). I think a third thing that could be done (but which I haven't done) is to build a stand-alone log duplication checker that consumes a topic/partition and estimates the duplication of keys using a bloom filter or something like that. I haven't done the later. 5. Intuitively this should not be true. By definition "independent" means that sequential salt should perform as well as well as any other salt or else that would be an attack on md5, no? > Implement log compaction > ------------------------ > > Key: KAFKA-631 > URL: https://issues.apache.org/jira/browse/KAFKA-631 > Project: Kafka > Issue Type: New Feature > Components: core > Affects Versions: 0.8.1 > Reporter: Jay Kreps > Assignee: Jay Kreps > Attachments: KAFKA-631-v1.patch, KAFKA-631-v2.patch, > KAFKA-631-v3.patch, KAFKA-631-v4.patch, KAFKA-631-v5.patch > > > Currently Kafka has only one way to bound the space of the log, namely by > deleting old segments. The policy that controls which segments are deleted > can be configured based either on the number of bytes to retain or the age of > the messages. This makes sense for event or log data which has no notion of > primary key. However lots of data has a primary key and consists of updates > by primary key. For this data it would be nice to be able to ensure that the > log contained at least the last version of every key. > As an example, say that the Kafka topic contains a sequence of User Account > messages, each capturing the current state of a given user account. Rather > than simply discarding old segments, since the set of user accounts is > finite, it might make more sense to delete individual records that have been > made obsolete by a more recent update for the same key. This would ensure > that the topic contained at least the current state of each record. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira