Hello Josh,

We do not have a ticket open for the idempotent producer, as it is still under discussion, but here is the wiki:

https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer

As for transactional messaging, we have a prototype implementation at LinkedIn which has not yet been incorporated into an Apache JIRA; you can find its design in this wiki:

https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

Guozhang
On Mon, Mar 16, 2015 at 4:21 AM, Josh Rader <jrader...@gmail.com> wrote:

Thanks Guozhang. Are there JIRAs created for tracking the idempotent producer or transactional messaging features? Maybe we can pick up some of the tasks to expedite the release?

On Fri, Mar 6, 2015 at 1:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Josh,

Deduping on the consumer side may be tricky, as it requires some sequence number on the messages in order to achieve idempotency. On the other hand, we are planning to add an idempotent producer or transactional messaging:

https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer

https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

Guozhang
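To make Guozhang's point concrete, here is a minimal sketch of consumer-side dedup, assuming each message carries a producer-assigned, per-key monotonic sequence number; the SequenceDeduper class and that convention are hypothetical illustrations, not an existing Kafka facility.

import java.util.HashMap;
import java.util.Map;

// Drops any message whose sequence number is not strictly greater than
// the last one seen for its key. Assumes the producer stamps each
// message with a per-key monotonically increasing sequence number
// (a hypothetical convention; Kafka did not provide this at the time).
public class SequenceDeduper {
    private final Map<String, Long> lastSeenSeq = new HashMap<>();

    // Returns true if the message is new and should be applied.
    public boolean shouldApply(String key, long seq) {
        Long last = lastSeenSeq.get(key);
        if (last != null && seq <= last) {
            return false; // duplicate or replayed message: skip it
        }
        lastSeenSeq.put(key, seq);
        return true;
    }
}

The catch is durability: lastSeenSeq lives in memory here, so a real consumer would need to checkpoint it atomically with its offsets for the dedup to survive restarts, which is exactly what makes consumer-side idempotency tricky.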
On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader <jrader...@gmail.com> wrote:

Thanks everyone for your responses! These are great. It seems our case matches Jay's recommendations most closely.

The one part that sounds a little tricky is point #5, 'Include in each message the database's transaction id, scn, or other identifier'. This is pretty straightforward in the RDBMS case that I mentioned, but I could see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo), which might not always have a readily available monotonic id, particularly in failover scenarios. I guess in that case we can think about creating this id ourselves from the single producer.

Xiao,

I think in the Kafka failover cases you mention, if we also store the offset with the replicated data, we should be able to pick up where we left off, since we are using the low-level consumer. Maybe I am missing your point though...

Guozhang,

Very good point that we didn't think of. We will need to think this through, as you say, to avoid resending other messages in a batch if one fails. I wonder if we might also manage this on the consumer side with idempotency. Thanks for raising this!

Josh

On Tue, Mar 3, 2015 at 6:08 PM, Xiao <lixiao1...@gmail.com> wrote:

Hey Josh,

Sorry, after reading the code: Kafka does fsync the data using a separate thread. The recovery point (the oldest transaction timestamp) can be read from the file recovery-point-offset-checkpoint.

You can adjust the value config.logFlushOffsetCheckpointIntervalMs if you think that is not quick enough. When the workload is huge, the bottleneck could be on your target side or your source side; that is, your apply process may already have enough work to do.

Basically, you need to keep reading this file to determine the oldest timestamps of all relevant partitions, and then apply the transactions up to that timestamp.

Note that this does not protect transaction consistency. It just ensures the data at the target side is consistent as of one timestamp when you have multiple channels sending data changes. The implementation should be simple if you understand the concepts. I am unable to find the filed patent application about it, but here is one related paper that covers the main concepts behind the issues you are facing: "Inter-Data-Center Large-Scale Database Replication Optimization – A Workload Driven Partitioning Approach".

Hopefully you understood what I explained above.

Best wishes,

Xiao Li

On Mar 3, 2015, at 4:23 PM, Xiao <lixiao1...@gmail.com> wrote:

Hey Josh,

If you put different tables into different partitions or topics, it might break transactional ACID at the target side. This is risky for some use cases. Besides unit-of-work issues, you also need to think about load balancing.

For failover, you have to find the timestamp for point-in-time consistency. This part is tricky. You have to ensure all the changes before a specific timestamp have been flushed to disk. Normally, you can maintain a bookmark for each partition at the target side to track the oldest transactions that have been flushed to disk. Unfortunately, based on my understanding, Kafka is unable to do this because it does not fsync regularly, in order to achieve better throughput.

Best wishes,

Xiao Li

On Mar 3, 2015, at 3:45 PM, Xiao <lixiao1...@gmail.com> wrote:

Hey Josh,

Transactions can be applied in parallel on the consumer side based on transaction dependency checking.

http://www.google.com.ar/patents/US20080163222

This patent documents how it works. It is easy to understand; however, you also need to consider hash-collision issues. This has been implemented in IBM Q Replication since 2001.

Thanks,

Xiao Li

On Mar 3, 2015, at 3:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey Josh,

As you say, ordering is per partition. Technically it is generally possible to publish all changes to a database to a single partition; generally a Kafka partition should be high-throughput enough to keep up. However, there are a couple of downsides to this:

1. Consumer parallelism is limited to one. If you want a total order on the consumption of messages you need to have just one process, but often you would want to parallelize.
2. Often what people want is not a full stream of all changes to all tables in a database, but rather the changes to a particular table.

To some extent the best way to do this depends on what you will do with the data. I have seen pretty much every variation on this in the wild, and here is what I would recommend:

1. Have a single publisher process that publishes events into Kafka.
2. If possible, use the database log to get these changes (e.g. mysql binlog, Oracle xstreams, golden gate, etc). This will be more complete and more efficient than polling for changes, though that can work too.
3. Publish each table to its own topic.
4. Partition each topic by the primary key of the table.
5. Include in each message the database's transaction id, scn, or other identifier that gives the total order within the record stream. Since there is a single publisher, this id will be monotonic within each partition.

This seems to be the best set of tradeoffs for most use cases:

- You can have parallel consumers, up to the number of partitions you chose, that still get messages in order per ID'd entity.
- You can subscribe to just one table if you like, or to multiple tables.
- Consumers who need a total order over all updates can do a "merge" across the partitions to reassemble the fully ordered set of changes across all tables/partitions.

One thing to note is that the requirement of having a single consumer process/thread to get the total order isn't really so much a Kafka restriction as a restriction about the world: if you had multiple threads, even if you delivered messages to them in order, their processing might happen out of order (just due to the random timing of the processing).

-Jay
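Here is a minimal sketch of recommendations 3-5 using the Java producer client: one topic per table, messages keyed by primary key (the default partitioner then keeps each row's changes in a single partition, in order), and the source transaction id/scn embedded in each payload. The "db-changes." topic-naming scheme and the JSON layout are illustrative assumptions, not anything prescribed in this thread.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ChangePublisher implements AutoCloseable {
    private final Producer<String, String> producer;

    public ChangePublisher(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    // Publish one row change: topic derived from the table name (rec. 3),
    // keyed by primary key so all changes to a row stay ordered within one
    // partition (rec. 4), with the transaction id/scn in the payload (rec. 5).
    public void publish(String table, String primaryKey, long scn, String rowJson) {
        String topic = "db-changes." + table;
        String value = "{\"scn\":" + scn + ",\"row\":" + rowJson + "}";
        producer.send(new ProducerRecord<>(topic, primaryKey, value));
    }

    @Override
    public void close() {
        producer.close();
    }
}

Because there is a single publisher process, the scn is monotonic within each partition, which is what gives downstream consumers enough information to dedup or to merge partitions back into a total order.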
On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader <jrader...@gmail.com> wrote:

Hi Kafka Experts,

We have a use case around RDBMS replication where we are investigating Kafka. In this case ordering is very important. Our understanding is that ordering is only preserved within a single partition. This makes sense, as a single thread will consume these messages, but our question is: can we somehow parallelize this for better performance? Is there maybe some partition-key strategy trick to have your cake and eat it too, in terms of keeping ordering but also being able to parallelize the processing?

I am sorry if this has already been asked, but we tried to search through the archives and couldn't find an answer.

Thanks,

Josh
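For the last bullet in Jay's list, here is a rough sketch of the "merge" a total-order consumer could do across partitions, assuming every message carries the monotonic scn from recommendation 5 and each partition's stream is therefore already sorted by scn. The Change type and the iterator-based framing are assumptions for illustration, not part of any Kafka API.

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

public class TotalOrderMerge {

    // A change record carrying the monotonic scn from the source database.
    public static class Change {
        public final long scn;
        public final String payload;
        public Change(long scn, String payload) {
            this.scn = scn;
            this.payload = payload;
        }
    }

    // Pairs a buffered head record with the partition stream it came from.
    private static class Head {
        final Change change;
        final Iterator<Change> source;
        Head(Change change, Iterator<Change> source) {
            this.change = change;
            this.source = source;
        }
    }

    // K-way merge: repeatedly emit the smallest buffered scn across all
    // partition streams, then refill from the stream it came from.
    public static void merge(List<Iterator<Change>> partitions, Consumer<Change> apply) {
        PriorityQueue<Head> heads =
                new PriorityQueue<>(Comparator.comparingLong((Head h) -> h.change.scn));
        for (Iterator<Change> p : partitions) {
            if (p.hasNext()) {
                heads.add(new Head(p.next(), p));
            }
        }
        while (!heads.isEmpty()) {
            Head min = heads.poll();
            apply.accept(min.change); // apply changes in global scn order
            if (min.source.hasNext()) {
                heads.add(new Head(min.source.next(), min.source));
            }
        }
    }
}

In a live system each iterator would wrap one partition's consumer and block for its next record, so the merge can only advance once every partition has a buffered head; as Jay notes, a total order ultimately serializes the final apply step no matter how it is arranged.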