Hey Josh,

NoSQL DBs may actually be easier, because they generally don't have a
global order themselves. For example, I believe Mongo has a
per-partition oplog, is that right? Their partitions would then match
our partitions.
-Jay

On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader <jrader...@gmail.com> wrote:

> Thanks everyone for your responses! These are great. It seems our case
> matches closest to Jay's recommendations.
>
> The one part that sounds a little tricky is point #5, 'Include in each
> message the database's transaction id, scn, or other identifier'. This
> is pretty straightforward in the RDBMS case that I mentioned, but I
> could see wanting to extend this to replicate NoSQL stores (Cassandra,
> Mongo), which might not always have a readily available monotonic id,
> particularly in failover scenarios. I guess in that case we can think
> about creating this id ourselves in the single producer.
>
> Xiao,
>
> I think in the Kafka failover cases you mention, if we also store the
> offset with the replicated data we should be able to pick up where we
> left off, since we are using the low-level consumer. Maybe I am
> missing your point, though...
>
> Guozhang,
>
> Very good point that we didn't think of. We will need to think this
> through; as you say, we must avoid resending the other messages in a
> batch if one fails. I wonder if we might also manage this on the
> consumer side with idempotency. Thanks for raising this!
>
> Josh
>
>
> On Tue, Mar 3, 2015 at 6:08 PM, Xiao <lixiao1...@gmail.com> wrote:
>
>> Hey Josh,
>>
>> Sorry, after reading the code: Kafka does fsync the data, using a
>> separate thread. The recovery point (the oldest transaction
>> timestamp) can be read from the file
>> recovery-point-offset-checkpoint.
>>
>> You can adjust the config logFlushOffsetCheckpointIntervalMs
>> (log.flush.offset.checkpoint.interval.ms) if you think the
>> checkpointing is not frequent enough. When the workload is huge, the
>> bottleneck could be on your target side or your source side; that
>> means your apply process could have enough jobs to do.
>>
>> Basically, you need to keep reading this file to determine the
>> oldest timestamps of all relevant partitions, and then apply the
>> transactions up to that timestamp.
>>
>> Note, this does not protect transaction consistency. It is just for
>> ensuring the data at the target side is consistent at one timestamp
>> when you have multiple channels sending data changes. The
>> implementation should be simple if you understand the concepts. I am
>> unable to find the filed patent application about it, but this is
>> one related paper that covers the main concepts about the issues you
>> are facing: "Inter-Data-Center Large-Scale Database Replication
>> Optimization – A Workload Driven Partitioning Approach".
>>
>> Hopefully, you understood what I explained above.
>>
>> Best wishes,
>>
>> Xiao Li
>>
>> On Mar 3, 2015, at 4:23 PM, Xiao <lixiao1...@gmail.com> wrote:
>>
>>> Hey Josh,
>>>
>>> If you put different tables into different partitions or topics, it
>>> might break transaction ACID at the target side. This is risky for
>>> some use cases. Besides unit-of-work issues, you also need to think
>>> about load balancing.
>>>
>>> For failover, you have to find the timestamp for point-in-time
>>> consistency. This part is tricky: you have to ensure all the
>>> changes before a specific timestamp have been flushed to disk.
>>> Normally, you maintain a bookmark for each partition at the target
>>> side that records the oldest transactions that have been flushed to
>>> disk. Unfortunately, based on my understanding, Kafka is unable to
>>> do this because it does not fsync regularly, in order to achieve
>>> better throughput.
>>>
>>> Best wishes,
>>>
>>> Xiao Li
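A minimal sketch of the per-partition bookmark idea above, combined with
Josh's plan to store the offset with the replicated data: the applier
records the next offset for each partition in the target database, in the
same transaction as the applied change, and seeks back to those bookmarks
on restart. The JDBC URL, table, and column names are hypothetical, and
it uses the current Java consumer API rather than the low-level consumer
mentioned in the thread.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class BookmarkedApplier {
        // Hypothetical bookmark table, seeded with one row per partition:
        //   CREATE TABLE replication_bookmark (
        //     topic VARCHAR NOT NULL, part INT NOT NULL,
        //     next_offset BIGINT NOT NULL, PRIMARY KEY (topic, part));
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("enable.auto.commit", "false"); // offsets live in the target DB, not Kafka
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            Connection db = DriverManager.getConnection("jdbc:postgresql://target/replica");
            db.setAutoCommit(false);

            List<TopicPartition> parts = Arrays.asList(
                new TopicPartition("orders", 0), new TopicPartition("orders", 1));

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(parts);
                for (TopicPartition tp : parts) {
                    consumer.seek(tp, readBookmark(db, tp)); // resume where we left off
                }
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> r : records) {
                        applyChange(db, r);   // upsert the replicated row
                        writeBookmark(db, r); // same DB transaction as the change itself
                        db.commit();          // change + bookmark become visible atomically
                    }
                }
            }
        }

        static long readBookmark(Connection db, TopicPartition tp) throws SQLException {
            try (PreparedStatement ps = db.prepareStatement(
                    "SELECT next_offset FROM replication_bookmark WHERE topic = ? AND part = ?")) {
                ps.setString(1, tp.topic());
                ps.setInt(2, tp.partition());
                ResultSet rs = ps.executeQuery();
                return rs.next() ? rs.getLong(1) : 0L; // no bookmark: start from the beginning
            }
        }

        static void writeBookmark(Connection db, ConsumerRecord<String, String> r)
                throws SQLException {
            try (PreparedStatement ps = db.prepareStatement(
                    "UPDATE replication_bookmark SET next_offset = ? WHERE topic = ? AND part = ?")) {
                ps.setLong(1, r.offset() + 1); // the next offset we still need to read
                ps.setString(2, r.topic());
                ps.setInt(3, r.partition());
                ps.executeUpdate();
            }
        }

        static void applyChange(Connection db, ConsumerRecord<String, String> r)
                throws SQLException {
            // Apply r.value() to the target table. An idempotent upsert makes
            // a replayed record harmless if the process dies between commits.
        }
    }

Committing per record keeps the sketch simple; a real applier would batch
several records per target-side transaction and bookmark the last one.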
>>> On Mar 3, 2015, at 3:45 PM, Xiao <lixiao1...@gmail.com> wrote:
>>>
>>>> Hey Josh,
>>>>
>>>> Transactions can be applied in parallel on the consumer side, based
>>>> on transaction dependency checking.
>>>>
>>>> http://www.google.com.ar/patents/US20080163222
>>>>
>>>> This patent documents how it works. It is easy to understand;
>>>> however, you also need to consider hash collision issues. This has
>>>> been implemented in IBM Q Replication since 2001.
>>>>
>>>> Thanks,
>>>>
>>>> Xiao Li
>>>>
>>>> On Mar 3, 2015, at 3:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>
>>>>> Hey Josh,
>>>>>
>>>>> As you say, ordering is per partition. Technically it is generally
>>>>> possible to publish all changes to a database to a single
>>>>> partition--generally the Kafka partition should be high-throughput
>>>>> enough to keep up. However, there are a couple of downsides to
>>>>> this:
>>>>> 1. Consumer parallelism is limited to one. If you want a total
>>>>> order over the consumption of messages you need to have just one
>>>>> process, but often you would want to parallelize.
>>>>> 2. Often what people want is not a full stream of all changes in
>>>>> all tables in a database, but rather the changes to a particular
>>>>> table.
>>>>>
>>>>> To some extent the best way to do this depends on what you will do
>>>>> with the data.
>>>>>
>>>>> I have seen pretty much every variation on this in the wild, and
>>>>> here is what I would recommend (a sketch follows this message):
>>>>> 1. Have a single publisher process that publishes events into
>>>>> Kafka.
>>>>> 2. If possible, use the database log to get these changes (e.g.
>>>>> MySQL binlog, Oracle XStream, GoldenGate, etc.). This will be more
>>>>> complete and more efficient than polling for changes, though
>>>>> polling can work too.
>>>>> 3. Publish each table to its own topic.
>>>>> 4. Partition each topic by the primary key of the table.
>>>>> 5. Include in each message the database's transaction id, SCN, or
>>>>> other identifier that gives the total order within the record
>>>>> stream. Since there is a single publisher, this id will be
>>>>> monotonic within each partition.
>>>>>
>>>>> This seems to be the best set of tradeoffs for most use cases:
>>>>> - You can have parallel consumers, up to the number of partitions
>>>>> you chose, that still get messages in order per ID'd entity.
>>>>> - You can subscribe to just one table if you like, or to multiple
>>>>> tables.
>>>>> - Consumers who need a total order over all updates can do a
>>>>> "merge" across the partitions to reassemble the fully ordered set
>>>>> of changes across all tables/partitions.
>>>>>
>>>>> One thing to note: the requirement of having a single consumer
>>>>> process/thread to get the total order isn't really so much a Kafka
>>>>> restriction as a restriction about the world, since if you had
>>>>> multiple threads, even if you delivered messages to them in order,
>>>>> their processing might happen out of order (just due to the random
>>>>> timing of the processing).
>>>>>
>>>>> -Jay
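A minimal sketch of points 3-5 above, using the Kafka Java producer: one
topic per table, the primary key as the message key (so all changes to a
row land in one partition, in order), and the transaction id embedded in
the payload. The topic name, key, and payload format here are
hypothetical, and the change-capture side is stubbed out.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ChangePublisher {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all"); // don't acknowledge a change event until replicated
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // One hypothetical change event read from the database log:
                String table = "orders";        // point 3: topic per table
                String primaryKey = "order-42"; // point 4: key = primary key
                long txnId = 1001L;             // point 5: transaction id / SCN

                // The default partitioner hashes the key, so every change to
                // the same row goes to the same partition and is consumed in
                // order. Because there is a single publisher process, txnId
                // is monotonic within each partition.
                String payload = txnId + ",UPDATE,orders,order-42,status=shipped";
                producer.send(new ProducerRecord<>(table, primaryKey, payload));
            }
        }
    }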
>>>>> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader <jrader...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Kafka Experts,
>>>>>>
>>>>>> We have a use case around RDBMS replication where we are
>>>>>> investigating Kafka. In this case ordering is very important. Our
>>>>>> understanding is that ordering is only preserved within a single
>>>>>> partition. This makes sense, as a single thread will consume
>>>>>> these messages, but our question is: can we somehow parallelize
>>>>>> this for better performance? Is there maybe some partition-key
>>>>>> strategy trick to have your cake and eat it too, in terms of
>>>>>> keeping ordering but also being able to parallelize the
>>>>>> processing?
>>>>>>
>>>>>> I am sorry if this has already been asked, but we tried to search
>>>>>> through the archives and couldn't find an answer.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Josh
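Finally, a minimal sketch of the "merge" Jay mentions, for a consumer
that needs the total order back: buffer records from all partitions in a
priority queue ordered by transaction id, and emit everything at or below
the lowest id seen across partitions. This is safe because, per point 5,
ids are monotonic within each partition. The txnId parsing is
hypothetical (it assumes the payload format from the producer sketch),
and an idle partition will stall the merge.

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.PriorityQueue;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class TotalOrderMerge {
        // Buffered records, ordered by the embedded transaction id.
        static final PriorityQueue<ConsumerRecord<String, String>> buffer =
            new PriorityQueue<>(Comparator.comparingLong(TotalOrderMerge::txnId));
        // Highest transaction id seen so far on each partition.
        static final Map<TopicPartition, Long> seen = new HashMap<>();

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("enable.auto.commit", "false"); // offset management omitted here
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            List<TopicPartition> parts = Arrays.asList(
                new TopicPartition("orders", 0), new TopicPartition("orders", 1));

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(parts); // a merge needs every partition in one process
                while (true) {
                    for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                        buffer.add(r);
                        seen.put(new TopicPartition(r.topic(), r.partition()), txnId(r));
                    }
                    if (seen.size() < parts.size()) continue; // wait for every partition
                    // Everything at or below the low-watermark is safe to emit:
                    // ids are monotonic per partition, so nothing earlier can
                    // still arrive on any of them.
                    long watermark = Collections.min(seen.values());
                    while (!buffer.isEmpty() && txnId(buffer.peek()) <= watermark) {
                        emit(buffer.poll());
                    }
                }
            }
        }

        static long txnId(ConsumerRecord<String, String> r) {
            // Hypothetical payload format "txnId,op,table,...".
            return Long.parseLong(r.value().split(",")[0]);
        }

        static void emit(ConsumerRecord<String, String> r) {
            System.out.println(txnId(r) + " " + r.value());
        }
    }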