Hello Bill,

Just to clarify your use case: is your "log compaction" executed manually, or is it triggered periodically, the way the current by-key log cleaning is? If it is the latter, how will you advance the "threshold transaction_id" each time it executes?
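For concreteness, if the threshold lived in a topic-level config entry (which would require the arbitrary key-value support proposed further down), the database could advance it once each checkpoint becomes durable. A minimal sketch using the Java admin client, shown only for illustration; the min.retained.transaction.id key is hypothetical:

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class CheckpointThresholdUpdater {

    // Hypothetical topic-level config key; it would only exist if arbitrary
    // key-value pairs were allowed in the topic configuration.
    private static final String THRESHOLD_CONFIG = "min.retained.transaction.id";

    // Advance the threshold to the last transaction_id covered by a durable checkpoint.
    public static void advanceThreshold(String bootstrapServers,
                                        String topic,
                                        long checkpointedTransactionId) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            AlterConfigOp setThreshold = new AlterConfigOp(
                    new ConfigEntry(THRESHOLD_CONFIG, Long.toString(checkpointedTransactionId)),
                    AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                    Collections.singletonMap(resource, Collections.singletonList(setThreshold));
            // Issued only after the checkpoint is durable, so the cleaner can
            // never be told to discard a transaction that is still needed.
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}

Updating the config only after the checkpoint completes keeps the invariant that nothing below the threshold is still required for recovery.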
Guozhang

On Wed, Jan 20, 2016 at 1:50 PM, Bill Warshaw <bill.wars...@appian.com> wrote:

> Damian, I appreciate your quick response.
>
> Our transaction_id increments for each transaction, so we will only ever
> have one message in Kafka with a given transaction_id. We thought about
> using a rolling counter, incremented on each checkpoint, as the key, and
> manually triggering compaction after the checkpoint is complete, but our
> checkpoints are asynchronous. This means that a set of messages appended
> to the log after the checkpoint started, carrying the previous key + 1,
> would also be compacted down to a single entry.
>
> Our particular custom policy would delete all messages whose key is less
> than a given transaction_id that we pass in. I can imagine a wide variety
> of other custom policies for retention based on the key and value of the
> message.
>
> On Wed, Jan 20, 2016 at 1:35 PM, Bill Warshaw <bill.wars...@appian.com> wrote:
>
> > Hello,
> >
> > I'm working on a team that is starting to use Kafka as a distributed
> > transaction log for a set of in-memory databases which can be replicated
> > across nodes. We decided to use Kafka instead of BookKeeper for a variety
> > of reasons, but there are a couple of spots where Kafka is not a perfect
> > fit.
> >
> > The biggest issue facing us is deleting old transactions from the log
> > after checkpointing the database. We can't use any of the built-in size-
> > or time-based deletion mechanisms efficiently, because we could get
> > ourselves into a dangerous state where we're deleting transactions that
> > haven't been checkpointed yet. The approach we're currently looking at is
> > rolling a new topic each time we checkpoint and deleting the old topic
> > once all replicas have consumed everything in it.
> >
> > Another idea we came up with is a pluggable compaction policy: we would
> > set the message key to the offset or transaction id, and the policy would
> > delete all messages with a key smaller than a given id. I took a stab at
> > implementing the hook in Kafka for pluggable compaction policies at
> > https://github.com/apache/kafka/compare/trunk...bill-warshaw:pluggable_compaction_policy
> > (rough implementation), and it seems fairly straightforward. One problem
> > we run into is that the custom policy class can only access information
> > defined in the configuration, and the configuration doesn't allow custom
> > key-value pairs; if we wanted to pass it information dynamically, we'd
> > have to resort to a hack like calling ZooKeeper from within the class.
> > To get around this, my best idea is to add the ability to specify
> > arbitrary key-value pairs in the configuration, which our client could
> > use to pass information to the custom policy. Does this set off any alarm
> > bells for you guys? If so, are there other approaches we could take?
> >
> > Thanks for your time,
> > Bill Warshaw

--
-- Guozhang
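For concreteness, the custom policy described in the thread could look roughly like the sketch below. The class name, the shouldDiscard hook, and the min.retained.transaction.id config key are all hypothetical and are not taken from Kafka or from the linked branch; the sketch assumes the message key is the transaction_id encoded as a big-endian long, and that the threshold arrives through the proposed arbitrary topic-level key-value pairs.

import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical plug-in; neither this class nor the hook method is a real
// Kafka interface. It only illustrates the behavior described in the thread.
public class TransactionIdCompactionPolicy {

    // Hypothetical topic-level config entry holding the last checkpointed id.
    public static final String THRESHOLD_CONFIG = "min.retained.transaction.id";

    private volatile long threshold = Long.MIN_VALUE;

    // Called with the (proposed) arbitrary key-value pairs from the topic config.
    public void configure(Map<String, String> topicConfig) {
        String value = topicConfig.get(THRESHOLD_CONFIG);
        if (value != null) {
            threshold = Long.parseLong(value);
        }
    }

    // Decide whether the cleaner should drop a record. Assumes the message key
    // is the transaction_id encoded as a big-endian long.
    public boolean shouldDiscard(byte[] key) {
        if (key == null || key.length < Long.BYTES) {
            return false; // keep anything we cannot interpret
        }
        long transactionId = ByteBuffer.wrap(key).getLong();
        return transactionId < threshold;
    }
}

With the key set to the transaction_id, each cleaner pass would reclaim everything below the supplied threshold while leaving newer, un-checkpointed transactions untouched, independent of segment size or age.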