Hey Josh,

NoSQL DBs may actually be easier because they themselves generally don't
have a global order. I.e. I believe Mongo has a per-partition oplog, is
that right? Their partitions would match our partitions.

-Jay

On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader <jrader...@gmail.com> wrote:

> Thanks everyone for your responses!  These are great.  It seems our case
> matches Jay's recommendations most closely.
>
> The one part that sounds a little tricky is point #5 'Include in each
> message the database's transaction id, scn, or other identifier '.  This is
> pretty straightforward with the RDBMS case that I mentioned, but I could
> see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
> which might not always have a readily available monotonic id, particularly
> in failover scenarios.  I guess in that case we can think about creating
> this id ourselves from the single producer.
>
> Xiao,
>
> I think in the Kafka failover cases you mention if we also store the offset
> with replicated data we should be able to pick up where we left off since
> we are using the low level consumer.  Maybe I am missing your point
> though...
>
> Guozhang,
>
> Very good point that we didn't think of.  We will need to think this
> through and, as you say, avoid resending other messages in a batch if one
> fails.  I wonder if we might also manage this on the consumer side
> with idempotency.  Thanks for raising this!
>
> Josh
>
>
>
> On Tue, Mar 3, 2015 at 6:08 PM, Xiao <lixiao1...@gmail.com> wrote:
>
> > Hey Josh,
> >
> > Sorry, after reading the code, I see that Kafka does fsync the data using a
> > separate thread. The recovery point (oldest transaction timestamp) can be
> > read from the file recovery-point-offset-checkpoint.
> >
> > You can adjust the value of config.logFlushOffsetCheckpointIntervalMs if you
> > think it is not updated quickly enough. When the workload is huge, the
> > bottleneck could be on your target side or source side. That means your
> > apply process may already have plenty of work to do.
> >
> > Basically, you need to keep reading this file to determine the oldest
> > timestamps of all relevant partitions. Then, apply the transactions up to
> > that timestamp.
> >
> > Note, this does not protect transaction consistency. It only ensures that
> > the data on the target side is consistent as of one timestamp when you
> > have multiple channels sending data changes. The implementation should
> > be simple once you understand the concepts. I am unable to find the filed
> > patent application about it. This is one related paper that covers the main
> > concepts behind the issues you are facing: "Inter-Data-Center Large-Scale
> > Database Replication Optimization – A Workload Driven Partitioning
> > Approach"
> >
> > Hopefully, you understood what I explained above.
> >
> > Best wishes,
> >
> > Xiao Li
> >
> > On Mar 3, 2015, at 4:23 PM, Xiao <lixiao1...@gmail.com> wrote:
> >
> > > Hey Josh,
> > >
> > > If you put different tables into different partitions or topics, it
> > > might break transactional ACID guarantees on the target side. This is
> > > risky for some use cases. Besides unit-of-work issues, you also need to
> > > think about load balancing.
> > >
> > > For failover, you have to find the timestamp for point-in-time
> > > consistency. This part is tricky. You have to ensure all the changes
> > > before a specific timestamp have been flushed to disk. Normally, you can
> > > maintain a bookmark for each partition on the target side to know which
> > > is the oldest transaction that has been flushed to disk. Unfortunately,
> > > based on my understanding, Kafka is unable to do this because it does not
> > > fsync regularly, in order to achieve better throughput.
> > >
> > > Best wishes,
> > >
> > > Xiao Li
> > >
> > >
> > > On Mar 3, 2015, at 3:45 PM, Xiao <lixiao1...@gmail.com> wrote:
> > >
> > >> Hey Josh,
> > >>
> > > >> Transactions can be applied in parallel on the consumer side based on
> > > >> transaction dependency checking.
> > >>
> > >> http://www.google.com.ar/patents/US20080163222
> > >>
> > > >> This patent documents how it works. It is easy to understand; however,
> > > >> you also need to consider hash collision issues. This has been
> > > >> implemented in IBM Q Replication since 2001.
> > >>
> > >> Thanks,
> > >>
> > >> Xiao Li
> > >>
> > >>
> > >> On Mar 3, 2015, at 3:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >>
> > >>> Hey Josh,
> > >>>
> > > >>> As you say, ordering is per partition. Technically it is generally
> > > >>> possible to publish all changes to a database to a single
> > > >>> partition--generally the kafka partition should be high throughput
> > > >>> enough to keep up. However there are a couple of downsides to this:
> > > >>> 1. Consumer parallelism is limited to one. If you want a total order
> > > >>> to the consumption of messages you need to have just 1 process, but
> > > >>> often you would want to parallelize.
> > > >>> 2. Often what people want is not a full stream of all changes in all
> > > >>> tables in a database but rather the changes to a particular table.
> > >>>
> > > >>> To some extent the best way to do this depends on what you will do
> > > >>> with the data. However if you intend to have lots
> > >>>
> > > >>> I have seen pretty much every variation on this in the wild, and here
> > > >>> is what I would recommend:
> > > >>> 1. Have a single publisher process that publishes events into Kafka.
> > > >>> 2. If possible use the database log to get these changes (e.g. MySQL
> > > >>> binlog, Oracle XStream, GoldenGate, etc). This will be more complete
> > > >>> and more efficient than polling for changes, though that can work too.
> > > >>> 3. Publish each table to its own topic.
> > > >>> 4. Partition each topic by the primary key of the table.
> > > >>> 5. Include in each message the database's transaction id, scn, or
> > > >>> other identifier that gives the total order within the record stream.
> > > >>> Since there is a single publisher this id will be monotonic within
> > > >>> each partition.
> > > >>> This seems to be the best set of tradeoffs for most use cases:
> > > >>> - You can have parallel consumers up to the number of partitions you
> > > >>> chose that still get messages in order per ID'd entity.
> > > >>> - You can subscribe to just one table if you like, or to multiple
> > > >>> tables.
> > > >>> - Consumers who need a total order over all updates can do a "merge"
> > > >>> across the partitions to reassemble the fully ordered set of changes
> > > >>> across all tables/partitions.
> > >>>
> > > >>> One thing to note is that the requirement of having a single consumer
> > > >>> process/thread to get the total order isn't really so much a Kafka
> > > >>> restriction as it just is a restriction about the world, since if you
> > > >>> had multiple threads even if you delivered messages to them in order
> > > >>> their processing might happen out of order (just due to the random
> > > >>> timing of the processing).
> > >>>
> > >>> -Jay
> > >>>
> > >>>
> > >>>
> > > >>> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader <jrader...@gmail.com> wrote:
> > >>>
> > >>>> Hi Kafka Experts,
> > >>>>
> > >>>>
> > >>>>
> > > >>>> We have a use case around RDBMS replication where we are
> > > >>>> investigating Kafka.  In this case ordering is very important.  Our
> > > >>>> understanding is ordering is only preserved within a single
> > > >>>> partition.  This makes sense as a single thread will consume these
> > > >>>> messages, but our question is can we somehow parallelize this for
> > > >>>> better performance?   Is there maybe some partition key strategy
> > > >>>> trick to have your cake and eat it too in terms of keeping ordering,
> > > >>>> but also able to parallelize the processing?
> > >>>>
> > >>>>
> > >>>>
> > > >>>> I am sorry if this has already been asked, but we tried to search
> > > >>>> through the archives and couldn’t find this response.
> > >>>>
> > >>>>
> > >>>>
> > >>>> Thanks,
> > >>>>
> > >>>> Josh
> > >>>>
> > >>
> > >
> >
> >
>
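
The scheme discussed in this thread (a single publisher, one topic per table, partitioning by primary key, a transaction id in every message, a merge for consumers that need a total order, and idempotent apply on the consumer side) can be sketched roughly as below. This is only an illustration under stated assumptions: topics are simulated as in-memory lists rather than through a Kafka client, and every name here (`Change`, `partition_for`, `publish`, `merged`, `apply_idempotent`) as well as the CRC32-based partitioner is hypothetical, not a Kafka API or Kafka's default partitioner.

```python
import heapq
import zlib
from dataclasses import dataclass


@dataclass(frozen=True)
class Change:
    txn_id: int  # database transaction id / SCN, increasing in commit order
    table: str
    pk: str      # primary key of the changed row
    value: str


def partition_for(pk: str, num_partitions: int) -> int:
    """Stable hash of the primary key (illustrative partitioner only)."""
    return zlib.crc32(pk.encode("utf-8")) % num_partitions


def publish(changes, num_partitions):
    """Simulate the single publisher: one 'topic' per table, keyed by pk.

    Because changes arrive in commit order from one process, txn_id is
    monotonic within every partition.
    """
    topics = {}
    for change in changes:
        parts = topics.setdefault(
            change.table, [[] for _ in range(num_partitions)]
        )
        parts[partition_for(change.pk, num_partitions)].append(change)
    return topics


def merged(topics):
    """Reassemble a total order by merging all partitions on txn_id."""
    streams = [part for parts in topics.values() for part in parts]
    return list(heapq.merge(*streams, key=lambda c: c.txn_id))


def apply_idempotent(target, last_txn, change):
    """Apply a change unless the row already reflects this txn_id or later.

    Returns True if the change was applied, False if it was skipped as a
    redelivery.
    """
    row = (change.table, change.pk)
    if last_txn.get(row, -1) >= change.txn_id:
        return False
    target[row] = change.value
    last_txn[row] = change.txn_id
    return True
```

A consumer that only cares about one table can read that table's partitions in parallel and call `apply_idempotent` per record; a consumer that needs the full order across tables would iterate over `merged(topics)` in a single thread instead.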
