Hi Jonathan,

TCP will take care of re-ordering the packets: the producer talks to each broker over a single TCP connection, so the network itself won't deliver a single producer's messages out of order.
On Wed, Mar 4, 2015 at 6:05 PM, Jonathan Hodges <hodg...@gmail.com> wrote:

> Thanks James.  This is really helpful.  Another extreme edge case might be
> that the single producer is sending the database log changes and the
> network causes them to reach Kafka out of order.  How do you prevent
> something like this? I guess by relying on the scn on the consumer side?
>
>
> On Wed, Mar 4, 2015 at 5:59 PM, James Cheng <jch...@tivo.com> wrote:
>
> > Another thing to think about is delivery guarantees. Exactly once, at
> > least once, etc.
> >
> > If you have a publisher that consumes from the database log and pushes
> > out to Kafka, and then the publisher crashes, what happens when it
> > starts back up? Depending on how you keep track of the database's
> > transaction id/scn/offset, you may end up re-publishing events that you
> > already published to the Kafka topic.
> >
> > I am also working on database replication, namely from MySQL to Kafka.
> > I'm using some of the ideas from
> > http://ben.kirw.in/2014/11/28/kafka-patterns/ in order to get
> > exactly-once processing, so that I don't have any duplicates in my
> > Kafka stream.
> >
> > Specifically, I have the publisher write messages to a single topic (I
> > think/hope that Kafka's throughput is high enough). I include MySQL's
> > binary log coordinates in my output messages. Upon startup, I read back
> > the "end" of my topic to find out what messages I published. This gives
> > me 2 pieces of information:
> > 1) The MySQL binary log coordinates, so I know where to start again.
> > 2) The messages that I last published, to make sure that I don't
> > re-publish them.
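> >
> > In code, that startup read-back looks roughly like the sketch below.
> > This uses the Java consumer API just for illustration; the topic name,
> > the BinlogPosition type, and the parseCoordinates() helper are
> > placeholders for whatever you actually use:
> >
> >   import java.time.Duration;
> >   import java.util.Collections;
> >   import java.util.Properties;
> >   import org.apache.kafka.clients.consumer.*;
> >   import org.apache.kafka.common.TopicPartition;
> >
> >   Properties props = new Properties();
> >   props.put("bootstrap.servers", "localhost:9092");
> >   props.put("key.deserializer",
> >       "org.apache.kafka.common.serialization.StringDeserializer");
> >   props.put("value.deserializer",
> >       "org.apache.kafka.common.serialization.StringDeserializer");
> >
> >   // The whole stream lives in one partition of one topic.
> >   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> >   TopicPartition tp = new TopicPartition("mysql-replication", 0);
> >   consumer.assign(Collections.singletonList(tp));
> >
> >   // Jump to the end and read back the last message I published.
> >   consumer.seekToEnd(Collections.singletonList(tp));
> >   long end = consumer.position(tp);
> >   if (end > 0) {
> >       consumer.seek(tp, end - 1);
> >       for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(5))) {
> >           // The value carries the binlog coordinates stamped into it,
> >           // e.g. "mysql-bin.000123:4567"; parseCoordinates is my own code.
> >           BinlogPosition resume = parseCoordinates(r.value());
> >           // Restart the binlog reader from `resume`, skipping any event
> >           // at or before that position so it is not re-published.
> >       }
> >   }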
> >
> > That does mean that all data from all tables is in a single topic. I
> > will probably have a consumer that reads that "all tables" topic and
> > splits the data out into separate topics, for consumers who just want a
> > subset of the data.
> >
> > -James
> >
> > On Mar 4, 2015, at 9:28 AM, Jonathan Hodges <hodg...@gmail.com> wrote:
> >
> > > Yes, you are right on the oplog per partition, as well as that mapping
> > > well to the Kafka partitions.  I think we are making this harder than
> > > it is based on previous attempts to leverage something like Databus for
> > > propagating log changes from MongoDB and Cassandra, since it requires a
> > > scn.  Sounds like direct Kafka makes more sense for these use cases.
> > > Thanks again!
> > >
> > >
> > > On Wed, Mar 4, 2015 at 8:56 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > >> Hey Josh,
> > >>
> > >> NoSQL DBs may actually be easier because they themselves generally
> > >> don't have a global order. I.e. I believe Mongo has a per-partition
> > >> oplog, is that right? Their partitions would match our partitions.
> > >>
> > >> -Jay
> > >>
> > >>> On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader <jrader...@gmail.com> wrote:
> > >>
> > >>> Thanks everyone for your responses!  These are great.  It seems our
> > >>> case matches closest to Jay's recommendations.
> > >>>
> > >>> The one part that sounds a little tricky is point #5, 'Include in each
> > >>> message the database's transaction id, scn, or other identifier'.
> > >>> This is pretty straightforward with the RDBMS case that I mentioned,
> > >>> but I could see wanting to extend this to replicate NoSQL stores
> > >>> (Cassandra, Mongo) which might not always have a readily available
> > >>> monotonic id, particularly in failover scenarios.  I guess in that
> > >>> case we can think about creating this id ourselves from the single
> > >>> producer.
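> > >>>
> > >>> Since there is only one producer, something as simple as a sequence
> > >>> number it stamps into every message might be enough.  Just a sketch,
> > >>> with the envelope format and variable names as placeholders:
> > >>>
> > >>>   // Single producer: stamp a monotonically increasing sequence number
> > >>>   // into each message (java.util.concurrent.atomic.AtomicLong); the
> > >>>   // starting value is recovered on restart by reading back the tail
> > >>>   // of the topic, as James described.
> > >>>   AtomicLong sequence = new AtomicLong(lastSequenceReadBackFromTopic);
> > >>>   long seq = sequence.incrementAndGet();
> > >>>   producer.send(new ProducerRecord<>(topic, primaryKey,
> > >>>       seq + "|" + changePayload));   // seq acts as our stand-in scn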
> > >>>
> > >>> Xiao,
> > >>>
> > >>> I think in the Kafka failover cases you mention, if we also store the
> > >>> offset with the replicated data we should be able to pick up where we
> > >>> left off, since we are using the low-level consumer.  Maybe I am
> > >>> missing your point though...
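> > >>>
> > >>> Roughly what we had in mind is the sketch below (shown with the Java
> > >>> consumer for readability, though the same idea applies to the
> > >>> low-level consumer; the target-side helpers are placeholders for our
> > >>> apply code, and setup is omitted):
> > >>>
> > >>>   // Resume from the offset stored alongside the replicated data, and
> > >>>   // record each new offset in the same target-side transaction as the
> > >>>   // applied change, so a crash can't lose or double-apply anything.
> > >>>   long lastApplied = readLastAppliedOffset(targetDb);
> > >>>   consumer.assign(Collections.singletonList(tp));
> > >>>   consumer.seek(tp, lastApplied + 1);
> > >>>   while (true) {
> > >>>       for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
> > >>>           applyChangeWithOffset(targetDb, r.value(), r.offset());
> > >>>       }
> > >>>   }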
> > >>>
> > >>> Guozhang,
> > >>>
> > >>> Very good point that we didn't think of.  We will need to think this
> > >>> through: as you say, avoid resending the other messages in a batch if
> > >>> one fails.  I wonder if we might also manage this on the consumer side
> > >>> with idempotency.  Thanks for raising this!
> > >>>
> > >>> Josh
> > >>>
> > >>>
> > >>>
> > >>> On Tue, Mar 3, 2015 at 6:08 PM, Xiao <lixiao1...@gmail.com> wrote:
> > >>>
> > >>>> Hey Josh,
> > >>>>
> > >>>> Sorry, after reading the code I see that Kafka does fsync the data
> > >>>> using a separate thread. The recovery point (oldest transaction
> > >>>> timestamp) can be read from the file
> > >>>> recovery-point-offset-checkpoint.
> > >>>>
> > >>>> You can adjust the value of config.logFlushOffsetCheckpointIntervalMs
> > >>>> if you think the checkpointing is not frequent enough. When the
> > >>>> workload is heavy, the bottleneck could be on your target side or
> > >>>> your source side; in that case your apply process should already have
> > >>>> plenty of work to do.
> > >>>>
> > >>>> Basically, you need to keep reading this file to determine the oldest
> > >>>> flushed timestamp across all relevant partitions, and then apply the
> > >>>> transactions up to that timestamp.
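> > >>>>
> > >>>> Reading that file is straightforward. As far as I can tell the format
> > >>>> is a version line, an entry-count line, and then one
> > >>>> "topic partition offset" line per partition; the log directory path
> > >>>> below is just an example:
> > >>>>
> > >>>>   // Sketch: collect the recovery points (flushed offsets) per partition.
> > >>>>   // Needs java.io.BufferedReader, java.nio.file.{Files,Paths},
> > >>>>   // java.util.HashMap and org.apache.kafka.common.TopicPartition.
> > >>>>   Map<TopicPartition, Long> recoveryPoints = new HashMap<>();
> > >>>>   try (BufferedReader in = Files.newBufferedReader(
> > >>>>           Paths.get("/var/kafka-logs/recovery-point-offset-checkpoint"))) {
> > >>>>       in.readLine();                      // version
> > >>>>       in.readLine();                      // number of entries
> > >>>>       String line;
> > >>>>       while ((line = in.readLine()) != null) {
> > >>>>           String[] f = line.split(" ");   // topic, partition, offset
> > >>>>           recoveryPoints.put(new TopicPartition(f[0], Integer.parseInt(f[1])),
> > >>>>                              Long.parseLong(f[2]));
> > >>>>       }
> > >>>>   }
> > >>>>   // Apply only the transactions at or below the minimum recovery
> > >>>>   // point of the partitions that belong to one unit of work.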
> > >>>>
> > >>>> Note, this does not protect transaction consistency by itself. It is
> > >>>> just for ensuring the data at the target side is consistent as of one
> > >>>> timestamp when you have multiple channels sending data changes. The
> > >>>> implementation should be simple once you understand the concepts. I
> > >>>> am unable to find the filed patent application about it, but this is
> > >>>> one related paper that covers the main concepts behind the issues you
> > >>>> are facing: "Inter-Data-Center Large-Scale Database Replication
> > >>>> Optimization - A Workload Driven Partitioning Approach"
> > >>>>
> > >>>> Hopefully that explains what I described above.
> > >>>>
> > >>>> Best wishes,
> > >>>>
> > >>>> Xiao Li
> > >>>>
> > >>>> On Mar 3, 2015, at 4:23 PM, Xiao <lixiao1...@gmail.com> wrote:
> > >>>>
> > >>>>> Hey Josh,
> > >>>>>
> > >>>>> If you put different tables into different partitions or topics, it
> > >>>>> might break the transaction's ACID guarantees at the target side.
> > >>>>> This is risky for some use cases. Besides unit-of-work issues, you
> > >>>>> also need to think about load balancing.
> > >>>>>
> > >>>>> For failover, you have to find the timestamp for point-in-time
> > >>>>> consistency. This part is tricky. You have to ensure all the changes
> > >>>>> before a specific timestamp have been flushed to disk. Normally, you
> > >>>>> can maintain a bookmark for each partition at the target side to
> > >>>>> know the oldest transactions that have been flushed to disk.
> > >>>>> Unfortunately, based on my understanding, Kafka is unable to do it
> > >>>>> because it does not fsync regularly, in order to achieve better
> > >>>>> throughput.
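> > >>>>>
> > >>>>> The bookmark idea itself is small: conceptually it is just a minimum
> > >>>>> watermark over the per-partition flush positions. A sketch, with the
> > >>>>> position type and update path left up to you:
> > >>>>>
> > >>>>>   // One bookmark per partition: the newest position known to be
> > >>>>>   // flushed at the target. The consistent point-in-time you can
> > >>>>>   // recover to is the minimum bookmark across all partitions.
> > >>>>>   Map<Integer, Long> bookmarks = new ConcurrentHashMap<>();
> > >>>>>   bookmarks.put(partition, flushedUpTo);   // updated as data is flushed
> > >>>>>   long safePoint = bookmarks.values().stream()
> > >>>>>                             .mapToLong(Long::longValue).min().orElse(0L);
> > >>>>>   // Only expose/apply transactions with position <= safePoint.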
> > >>>>>
> > >>>>> Best wishes,
> > >>>>>
> > >>>>> Xiao Li
> > >>>>>
> > >>>>>
> > >>>>> On Mar 3, 2015, at 3:45 PM, Xiao <lixiao1...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hey Josh,
> > >>>>>>
> > >>>>>> Transactions can be applied in parallel on the consumer side based
> > >>>>>> on transaction dependency checking.
> > >>>>>>
> > >>>>>> http://www.google.com.ar/patents/US20080163222
> > >>>>>>
> > >>>>>> This patent documents how it works. It is easy to understand;
> > >>>>>> however, you also need to consider hash collision issues. This has
> > >>>>>> been implemented in IBM Q Replication since 2001.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>>
> > >>>>>> Xiao Li
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mar 3, 2015, at 3:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hey Josh,
> > >>>>>>>
> > >>>>>>> As you say, ordering is per partition. Technically it is generally
> > >>>>>>> possible to publish all changes to a database to a single
> > >>>>>>> partition--generally the kafka partition should be high throughput
> > >>>>>>> enough to keep up. However there are a couple of downsides to this:
> > >>>>>>> 1. Consumer parallelism is limited to one. If you want a total
> > >>>>>>> order to the consumption of messages you need to have just 1
> > >>>>>>> process, but often you would want to parallelize.
> > >>>>>>> 2. Often what people want is not a full stream of all changes in
> > >>>>>>> all tables in a database but rather the changes to a particular
> > >>>>>>> table.
> > >>>>>>>
> > >>>>>>> To some extent the best way to do this depends on what you will do
> > >>>>>>> with the data. However if you intend to have lots
> > >>>>>>>
> > >>>>>>> I have seen pretty much every variation on this in the wild, and
> > >>>>>>> here is what I would recommend:
> > >>>>>>> 1. Have a single publisher process that publishes events into Kafka.
> > >>>>>>> 2. If possible use the database log to get these changes (e.g.
> > >>>>>>> mysql binlog, Oracle xstreams, golden gate, etc). This will be more
> > >>>>>>> complete and more efficient than polling for changes, though that
> > >>>>>>> can work too.
> > >>>>>>> 3. Publish each table to its own topic.
> > >>>>>>> 4. Partition each topic by the primary key of the table.
> > >>>>>>> 5. Include in each message the database's transaction id, scn, or
> > >>>>>>> other identifier that gives the total order within the record
> > >>>>>>> stream. Since there is a single publisher this id will be monotonic
> > >>>>>>> within each partition (see the sketch below).
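> > >>>>>>>
> > >>>>>>> Concretely, the publish call for points 3-5 might look something
> > >>>>>>> like this; just a sketch, with the topic naming, the Change type,
> > >>>>>>> and the serialization all placeholders:
> > >>>>>>>
> > >>>>>>>   // One topic per table, partitioned by the row's primary key,
> > >>>>>>>   // with the source transaction id / scn carried in the message.
> > >>>>>>>   ProducerRecord<String, String> record = new ProducerRecord<>(
> > >>>>>>>       "db.orders",                            // 3. topic per table
> > >>>>>>>       change.primaryKey(),                    // 4. partition by PK
> > >>>>>>>       change.scn() + "|" + change.payload()); // 5. include the scn
> > >>>>>>>   producer.send(record);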
> > >>>>>>>
> > >>>>>>> This seems to be the best set of tradeoffs for most use cases:
> > >>>>>>> - You can have parallel consumers up to the number of partitions
> > >>>>>>> you chose that still get messages in order per ID'd entity.
> > >>>>>>> - You can subscribe to just one table if you like, or to multiple
> > >>>>>>> tables.
> > >>>>>>> - Consumers who need a total order over all updates can do a
> > >>>>>>> "merge" across the partitions to reassemble the fully ordered set
> > >>>>>>> of changes across all tables/partitions (rough sketch below).
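> > >>>>>>>
> > >>>>>>> That merge is just "always take the record with the smallest id
> > >>>>>>> next", something like this sketch, where scnOf() and apply() stand
> > >>>>>>> in for your own id extraction and processing:
> > >>>>>>>
> > >>>>>>>   // Each partition's buffered records are already in scn order, so
> > >>>>>>>   // repeatedly pick the partition whose head record has the
> > >>>>>>>   // smallest scn and consume from it.
> > >>>>>>>   static void mergeByScn(List<Deque<ConsumerRecord<String, String>>> buffers) {
> > >>>>>>>       Comparator<Deque<ConsumerRecord<String, String>>> byHeadScn =
> > >>>>>>>           Comparator.comparingLong(q -> scnOf(q.peekFirst().value()));
> > >>>>>>>       while (true) {
> > >>>>>>>           List<Deque<ConsumerRecord<String, String>>> nonEmpty = new ArrayList<>();
> > >>>>>>>           for (Deque<ConsumerRecord<String, String>> q : buffers)
> > >>>>>>>               if (!q.isEmpty()) nonEmpty.add(q);
> > >>>>>>>           if (nonEmpty.isEmpty()) break;          // everything drained
> > >>>>>>>           apply(Collections.min(nonEmpty, byHeadScn).pollFirst());
> > >>>>>>>       }
> > >>>>>>>   }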
> > >>>>>>>
> > >>>>>>> One thing to note is that the requirement of having a single
> > >>>>>>> consumer process/thread to get the total order isn't really so much
> > >>>>>>> a Kafka restriction as a restriction about the world: if you had
> > >>>>>>> multiple threads, then even if you delivered messages to them in
> > >>>>>>> order, their processing might happen out of order (just due to the
> > >>>>>>> random timing of the processing).
> > >>>>>>>
> > >>>>>>> -Jay
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader <jrader...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Kafka Experts,
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> We have a use case around RDBMS replication where we are
> > >>>>>>>> investigating Kafka.  In this case ordering is very important.
> > >>>>>>>> Our understanding is that ordering is only preserved within a
> > >>>>>>>> single partition.  This makes sense, as a single thread will
> > >>>>>>>> consume these messages, but our question is: can we somehow
> > >>>>>>>> parallelize this for better performance?  Is there maybe some
> > >>>>>>>> partition key strategy trick to have your cake and eat it too, in
> > >>>>>>>> terms of keeping ordering but also being able to parallelize the
> > >>>>>>>> processing?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> I am sorry if this has already been asked, but we tried to search
> > >>>>>>>> through the archives and couldn't find an answer.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>>
> > >>>>>>>> Josh
> > >>>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> >
> >
>
