Hi Jonathan, TCP will take care of re-ordering the packets.
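That covers reordering on the wire; duplicates from a publisher crash and retry are the other thing to guard against, and a consumer-side check keyed on the scn is cheap insurance. A minimal sketch in Java follows; the ScnGuard name and the assumption that every message carries its scn are illustrative only, not something the thread prescribes:

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Tracks the highest scn applied per key; anything at or below that is
     * a duplicate (or stale) event and can be skipped.
     */
    public class ScnGuard {

        private final Map<String, Long> lastScnByKey = new HashMap<>();

        /** Returns true if the event is new and should be applied. */
        public boolean shouldApply(String key, long scn) {
            Long last = lastScnByKey.get(key);
            if (last != null && scn <= last) {
                return false; // already applied
            }
            lastScnByKey.put(key, scn);
            return true;
        }
    }

A durable variant would persist the high-water marks alongside the applied data so the guard survives consumer restarts.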
On Wed, Mar 4, 2015 at 6:05 PM, Jonathan Hodges <hodg...@gmail.com> wrote:

> Thanks James. This is really helpful. Another extreme edge case might be that the single producer is sending the database log changes and the network causes them to reach Kafka out of order. How do you prevent something like this? I guess by relying on the scn on the consumer side?
>
> On Wed, Mar 4, 2015 at 5:59 PM, James Cheng <jch...@tivo.com> wrote:
>
> > Another thing to think about is delivery guarantees: exactly once, at least once, etc.
> >
> > If you have a publisher that consumes from the database log and pushes out to Kafka, and then the publisher crashes, what happens when it starts back up? Depending on how you keep track of the database's transaction id/scn/offset, you may end up re-publishing events that you already published out to the kafka topic.
> >
> > I am also working on database replication, namely from MySQL to Kafka. I'm using some of the ideas from http://ben.kirw.in/2014/11/28/kafka-patterns/ in order to get exactly-once processing, so that I don't have any duplicates in my kafka stream.
> >
> > Specifically, I have the publisher write messages to a single topic (I think/hope that Kafka's throughput is high enough). I include MySQL's binary log coordinates in my output messages. Upon startup, I read back the "end" of my topic to find out what messages I published. This gives me 2 pieces of information:
> > 1) The MySQL binary log coordinates, so I know where to start again.
> > 2) The messages that I last published, to make sure that I don't re-publish them.
> >
> > That does mean that all data from all tables is in a single topic. I will probably have a consumer that will read that "all tables" topic and split the data out into separate topics, for consumers who just want a subset of the data.
> >
> > -James
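James's read-back-the-tail recovery might look roughly like the sketch below. It uses today's Java consumer API (the clients available in 2015 exposed this differently), and the single-partition topic plus whatever encoding carries the binlog coordinates in the record value are assumptions:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PublisherRecovery {

        /**
         * Reads the last record of the single-partition output topic so the
         * publisher can recover the MySQL binlog coordinates embedded in it.
         * Returns null if nothing has been published yet.
         */
        static ConsumerRecord<String, String> lastPublished(String bootstrap, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap);
            props.put("enable.auto.commit", "false"); // we only peek, never commit
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition(topic, 0);
                consumer.assign(Collections.singletonList(tp));
                consumer.seekToEnd(Collections.singletonList(tp));
                long end = consumer.position(tp);
                if (end == 0) return null;   // empty topic: start from the oldest binlog
                consumer.seek(tp, end - 1);  // step back to the last published record
                // A real implementation would poll in a loop until data arrives.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                ConsumerRecord<String, String> last = null;
                for (ConsumerRecord<String, String> r : records) {
                    last = r; // keep the final record in the batch
                }
                return last; // caller parses binlog file/position out of the value
            }
        }
    }

The same read also hands back the last messages actually published, which the publisher can compare against the database log to skip anything it already wrote.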
> > On Mar 4, 2015, at 9:28 AM, Jonathan Hodges <hodg...@gmail.com> wrote:
> >
> > > Yes, you are right on the oplog per partition, as well as that mapping well to the Kafka partitions. I think we are making this harder than it is, based on previous attempts, by trying to leverage something like Databus for propagating log changes from MongoDB and Cassandra even though it requires a scn. Sounds like direct Kafka makes more sense for these use cases. Thanks again!
> > >
> > > On Wed, Mar 4, 2015 at 8:56 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > Hey Josh,
> > > >
> > > > NoSQL DBs may actually be easier because they themselves generally don't have a global order. I.e. I believe Mongo has a per-partition oplog, is that right? Their partitions would match our partitions.
> > > >
> > > > -Jay
> > > >
> > > > On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader <jrader...@gmail.com> wrote:
> > > >
> > > > > Thanks everyone for your responses! These are great. It seems our case matches closest to Jay's recommendations.
> > > > >
> > > > > The one part that sounds a little tricky is point #5, 'Include in each message the database's transaction id, scn, or other identifier'. This is pretty straightforward in the RDBMS case that I mentioned, but I could see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo), which might not always have a readily available monotonic id, particularly in failover scenarios. I guess in that case we can think about creating this id ourselves from the single producer.
> > > > >
> > > > > Xiao,
> > > > >
> > > > > I think in the Kafka failover cases you mention, if we also store the offset with the replicated data, we should be able to pick up where we left off, since we are using the low-level consumer. Maybe I am missing your point though...
> > > > >
> > > > > Guozhang,
> > > > >
> > > > > Very good point that we didn't think of. We will need to think this through: as you say, avoiding resending the other messages in a batch if one fails. I wonder if we might also manage this on the consumer side with idempotency. Thanks for raising this!
> > > > >
> > > > > Josh
> > > > >
> > > > > On Tue, Mar 3, 2015 at 6:08 PM, Xiao <lixiao1...@gmail.com> wrote:
> > > > >
> > > > > > Hey Josh,
> > > > > >
> > > > > > Sorry, after reading the code I see that Kafka does fsync the data, using a separate thread. The recovery point (oldest transaction timestamp) can be read from the file recovery-point-offset-checkpoint.
> > > > > >
> > > > > > You can adjust the value config.logFlushOffsetCheckpointIntervalMs if you think that is not frequent enough. When the workload is huge, the bottleneck could be on your target side or source side; that is, your apply side could have plenty of work to do.
> > > > > >
> > > > > > Basically, you need to keep reading this file to determine the oldest timestamps of all relevant partitions, then apply the transactions up to that timestamp.
> > > > > >
> > > > > > Note, this does not protect transaction consistency. It is just for ensuring that the data at the target side is consistent as of one timestamp when you have multiple channels sending data changes. The implementation should be simple if you understand the concepts. I am unable to find the filed patent application about it, but this is one related paper that covers the main concepts behind the issues you are facing: "Inter-Data-Center Large-Scale Database Replication Optimization - A Workload Driven Partitioning Approach".
> > > > > >
> > > > > > Hopefully, you understood what I explained above.
> > > > > >
> > > > > > Best wishes,
> > > > > >
> > > > > > Xiao Li
> > > > > >
> > > > > > On Mar 3, 2015, at 4:23 PM, Xiao <lixiao1...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Josh,
> > > > > > >
> > > > > > > If you put different tables into different partitions or topics, it might break transaction ACID at the target side. This is risky for some use cases. Besides unit-of-work issues, you also need to think about load balancing.
> > > > > > >
> > > > > > > For failover, you have to find the timestamp for point-in-time consistency. This part is tricky. You have to ensure all the changes before a specific timestamp have been flushed to disk. Normally, you would maintain a bookmark for each partition at the target side recording the oldest transactions that have been flushed to disk. Unfortunately, based on my understanding, Kafka is unable to do this because it does not fsync regularly, in order to achieve better throughput.
> > > > > > >
> > > > > > > Best wishes,
> > > > > > >
> > > > > > > Xiao Li
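As an aside, reading the recovery-point-offset-checkpoint file Xiao mentions could be sketched as below. The layout assumed here (a version line, an entry count, then one "topic partition offset" row per line) matches the broker source of that era, but the file is broker-internal, so treat this as illustrative rather than a stable contract:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /**
     * Parses a broker-local recovery-point-offset-checkpoint file into a
     * map of "topic-partition" -> recovery point offset.
     */
    public class RecoveryPointReader {

        static Map<String, Long> read(String logDir) throws IOException {
            Path file = Paths.get(logDir, "recovery-point-offset-checkpoint");
            List<String> lines = Files.readAllLines(file);
            // Line 0 is a format version; line 1 is the number of entries.
            int count = Integer.parseInt(lines.get(1).trim());
            Map<String, Long> recoveryPoints = new HashMap<>();
            for (int i = 2; i < 2 + count; i++) {
                String[] parts = lines.get(i).trim().split("\\s+");
                recoveryPoints.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
            }
            return recoveryPoints;
        }
    }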
> > > > > > > On Mar 3, 2015, at 3:45 PM, Xiao <lixiao1...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey Josh,
> > > > > > > >
> > > > > > > > Transactions can be applied in parallel on the consumer side based on transaction-dependency checking.
> > > > > > > >
> > > > > > > > http://www.google.com.ar/patents/US20080163222
> > > > > > > >
> > > > > > > > This patent documents how it works. It is easy to understand; however, you also need to consider hash-collision issues. This has been implemented in IBM Q Replication since 2001.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Xiao Li
> > > > > > > >
> > > > > > > > On Mar 3, 2015, at 3:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hey Josh,
> > > > > > > > >
> > > > > > > > > As you say, ordering is per partition. Technically it is generally possible to publish all changes to a database to a single partition--generally the kafka partition should be high-throughput enough to keep up. However, there are a couple of downsides to this:
> > > > > > > > > 1. Consumer parallelism is limited to one. If you want a total order to the consumption of messages you need to have just 1 process, but often you would want to parallelize.
> > > > > > > > > 2. Often what people want is not a full stream of all changes in all tables in a database but rather the changes to a particular table.
> > > > > > > > >
> > > > > > > > > To some extent the best way to do this depends on what you will do with the data. However if you intend to have lots
> > > > > > > > >
> > > > > > > > > I have seen pretty much every variation on this in the wild, and here is what I would recommend:
> > > > > > > > > 1. Have a single publisher process that publishes events into Kafka.
> > > > > > > > > 2. If possible, use the database log to get these changes (e.g. mysql binlog, Oracle xstreams, golden gate, etc). This will be more complete and more efficient than polling for changes, though that can work too.
> > > > > > > > > 3. Publish each table to its own topic.
> > > > > > > > > 4. Partition each topic by the primary key of the table.
> > > > > > > > > 5. Include in each message the database's transaction id, scn, or other identifier that gives the total order within the record stream. Since there is a single publisher, this id will be monotonic within each partition.
> > > > > > > > >
> > > > > > > > > This seems to be the best set of tradeoffs for most use cases:
> > > > > > > > > - You can have parallel consumers, up to the number of partitions you chose, that still get messages in order per ID'd entity.
> > > > > > > > > - You can subscribe to just one table if you like, or to multiple tables.
> > > > > > > > > - Consumers who need a total order over all updates can do a "merge" across the partitions to reassemble the fully ordered set of changes across all tables/partitions.
> > > > > > > > >
> > > > > > > > > One thing to note is that the requirement of having a single consumer process/thread to get the total order isn't really so much a Kafka restriction as it is a restriction about the world: if you had multiple threads, even if you delivered messages to them in order, their processing might happen out of order (just due to the random timing of the processing).
> > > > > > > > >
> > > > > > > > > -Jay
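Points 3-5 of this recommendation might look like the following on the publisher side. The topic naming scheme and the scn-prefixed value encoding are invented for illustration; only the key-by-primary-key and carry-the-scn parts come from the thread:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ChangePublisher {

        private final KafkaProducer<String, String> producer;

        public ChangePublisher(String bootstrap) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap);
            props.put("acks", "all"); // don't lose acknowledged changes
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            this.producer = new KafkaProducer<>(props);
        }

        /**
         * Point 3: one topic per table. Point 4: key by primary key, so the
         * default partitioner keeps every change for a given row in one
         * partition, in order. Point 5: carry the scn in the payload.
         */
        public void publish(String table, String primaryKey, long scn, String changeJson) {
            String topic = "db-changes." + table;  // hypothetical naming scheme
            String value = scn + ":" + changeJson; // hypothetical encoding
            producer.send(new ProducerRecord<>(topic, primaryKey, value));
        }
    }

Because the default partitioner hashes the record key, parallel consumers still see each row's history in order, which is exactly the cake-and-eat-it-too property asked about below.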
> > > > > > > > > On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader <jrader...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Kafka Experts,
> > > > > > > > > >
> > > > > > > > > > We have a use case around RDBMS replication where we are investigating Kafka. In this case ordering is very important. Our understanding is that ordering is only preserved within a single partition. This makes sense, as a single thread will consume these messages, but our question is: can we somehow parallelize this for better performance? Is there maybe some partition-key strategy trick to have your cake and eat it too, in terms of keeping ordering but also being able to parallelize the processing?
> > > > > > > > > >
> > > > > > > > > > I am sorry if this has already been asked, but we tried to search through the archives and couldn't find this response.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Josh
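Finally, the "merge" Jay describes for consumers that do need a total order can be sketched as a k-way merge on the scn. Everything here (the Event shape, the per-partition buffers) is hypothetical; the only property relied on is that each partition's events arrive in ascending scn order, which the single publisher guarantees:

    import java.util.Comparator;
    import java.util.Deque;
    import java.util.Map;
    import java.util.PriorityQueue;
    import java.util.function.Consumer;

    /**
     * K-way merge of per-partition change streams back into a total order,
     * using the scn embedded in each event.
     */
    public class TotalOrderMerge {

        public record Event(int partition, long scn, String payload) {}

        /**
         * Emits buffered events in global scn order. It is only safe to emit
         * while every partition has at least one buffered event; otherwise a
         * lagging partition could still deliver a smaller scn.
         */
        public static void drain(Map<Integer, Deque<Event>> buffers, Consumer<Event> apply) {
            PriorityQueue<Event> heads = new PriorityQueue<>(Comparator.comparingLong(Event::scn));
            for (Deque<Event> q : buffers.values()) {
                if (q.isEmpty()) return; // can't establish a total order yet
                heads.add(q.peekFirst());
            }
            while (heads.size() == buffers.size()) { // every partition represented
                Event next = heads.poll();
                buffers.get(next.partition()).pollFirst(); // consume it
                apply.accept(next);
                Event replacement = buffers.get(next.partition()).peekFirst();
                if (replacement == null) break; // wait for more data on that partition
                heads.add(replacement);
            }
        }
    }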