Damian, I appreciate your quick response. Our transaction_id increments with each transaction, so we will only ever have one message in Kafka with a given transaction_id. We thought about using a rolling counter, incremented on each checkpoint, as the key, and manually triggering compaction after the checkpoint is complete, but our checkpoints are asynchronous. This means that the messages appended to the log after the checkpoint started would all carry the previous counter value + 1 as their key, and so they too would be compacted down to a single entry.
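To make the rolling-counter idea concrete, here is a rough sketch of the producer-side scheme we were considering; the class and names (TransactionLogWriter, checkpointEpoch, "tx-log") are purely illustrative, not our actual code:

import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionLogWriter {
    // Topic would be configured with cleanup.policy=compact.
    private static final String TX_TOPIC = "tx-log";
    private final KafkaProducer<Long, byte[]> producer;
    // Bumped once per checkpoint; every transaction written while an epoch
    // is current shares the same key, so compaction keeps only the last one.
    private final AtomicLong checkpointEpoch = new AtomicLong(0);

    public TransactionLogWriter(Properties props) {
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = new KafkaProducer<>(props);
    }

    public void append(byte[] serializedTransaction) {
        producer.send(new ProducerRecord<>(TX_TOPIC, checkpointEpoch.get(),
            serializedTransaction));
    }

    public void onCheckpointStarted() {
        // This is where the asynchrony bites: transactions appended between
        // this bump and checkpoint completion all carry epoch + 1, and
        // compaction would collapse them to a single surviving record even
        // though none of them have been checkpointed yet.
        checkpointEpoch.incrementAndGet();
    }
}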
Our particular custom policy would delete all messages whose key is less than a given transaction_id that we pass in. I can imagine a wide variety of other custom policies that could be used for retention based on the key and value of the message; a rough sketch of the shape we have in mind is below the quoted message.

On Wed, Jan 20, 2016 at 1:35 PM, Bill Warshaw <bill.wars...@appian.com> wrote:

> Hello,
>
> I'm working on a team that is starting to use Kafka as a distributed
> transaction log for a set of in-memory databases which can be replicated
> across nodes. We decided to use Kafka instead of Bookkeeper for a variety
> of reasons, but there are a couple spots where Kafka is not a perfect fit.
>
> The biggest issue facing us is deleting old transactions from the log
> after checkpointing the database. We can't use any of the built-in size or
> time-based deletion mechanisms efficiently, because we could get ourselves
> into a dangerous state where we're deleting transactions that haven't been
> checkpointed yet. The current approach we're looking at is rolling a new
> topic each time we checkpoint, and deleting the old topic once all replicas
> have consumed everything in it.
>
> Another idea we came up with is using a pluggable compaction policy; we
> would set the message key as the offset or transaction id, and the policy
> would delete all messages with a key smaller than that id.
> I took a stab at implementing the hook in Kafka for pluggable compaction
> policies at
> https://github.com/apache/kafka/compare/trunk...bill-warshaw:pluggable_compaction_policy
> (rough implementation), and it seems fairly straightforward. One problem
> that we run into is that the custom policy class can only access
> information that is defined in the configuration, and the configuration
> doesn't allow custom key-value pairs; if we wanted to pass it information
> dynamically, we'd have to use some hack like calling Zookeeper from within
> the class.
> To get around this, my best idea is to add the ability to specify
> arbitrary key-value pairs in the configuration, that our client could use
> to pass information to the custom policy. Does this set off any alarm
> bells for you guys? If so, are there other approaches we could take that
> come to mind?
>
> Thanks for your time,
> Bill Warshaw
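The shape of the policy we have in mind is roughly the following. This is only a sketch: the configure/shouldDiscard interface and the "compaction.policy.min.transaction.id" property are hypothetical placeholders, not anything in Kafka or in the linked branch.

import java.util.Map;

public class TransactionIdCompactionPolicy {
    private long minRetainedTransactionId = 0L;

    // Read the cutoff from the topic-level configuration. This is where the
    // arbitrary key-value pairs mentioned above would come in: after each
    // checkpoint completes, our client would update this property to the
    // first transaction_id that has not yet been checkpointed.
    public void configure(Map<String, ?> configs) {
        Object raw = configs.get("compaction.policy.min.transaction.id");
        if (raw != null) {
            minRetainedTransactionId = Long.parseLong(raw.toString());
        }
    }

    // During compaction, discard any message whose key (a transaction_id
    // encoded as a long) falls below the cutoff; everything at or above it
    // is retained.
    public boolean shouldDiscard(long transactionId) {
        return transactionId < minRetainedTransactionId;
    }
}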