Hello Josh,

We do not have a ticket open for idempotent producer as it is still in the
discussion process, but here is the wiki:

https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer

As for transactional messaging, we have a prototype implementation at
LinkedIn which is not yet incorporated into Apache JIRA, you can find its
design in this wiki:

https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

Guozhang



On Mon, Mar 16, 2015 at 4:21 AM, Josh Rader <jrader...@gmail.com> wrote:

> Thanks Guozhang.  Are there JIRAs created for tracking idempotent producer
> or transactional messaging features?  Maybe we can pick up some of the
> tasks to expedite the release?
>
> On Fri, Mar 6, 2015 at 1:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Josh,
> >
> > Dedupping on the consumer side may be tricky as it requires some sequence
> > number on the messages in order to achieve idempotency. On the other
> hand,
> > we are planning to add idempotent producer or transactional messaging
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >
> > Guozhang
> >
> > On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader <jrader...@gmail.com> wrote:
> >
> > > Thanks everyone for your responses!  These are great.  It seems our
> cases
> > > matches closest to Jay's recommendations.
> > >
> > > The one part that sounds a little tricky is point #5 'Include in each
> > > message the database's transaction id, scn, or other identifier '.
> This
> > is
> > > pretty straightforward with the RDBMS case that I mentioned, but I
> could
> > > see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
> > > which might not always have a readily available monotonic id,
> > particularly
> > > in failover scenarios.  I guess in that case we can think about
> creating
> > > this id ourselves from the single producer.
> > >
> > > Xiao,
> > >
> > > I think in the Kafka failover cases you mention if we also store the
> > offset
> > > with replicated data we should be able to pick up where we left off
> since
> > > we are using the low level consumer.  Maybe I am missing your point
> > > though...
> > >
> > > Guozhang,
> > >
> > > Very good point that we didn't think of.  We will need to think this
> > > through, as you say avoid resending other messages in a batch if one is
> > > failed.  I wonder if we might also manage this on the consumer side too
> > > with idempotency.  Thanks for raising this!
> > >
> > > Josh
> > >
> > >
> > >
> > > On Tue, Mar 3, 2015 at 6:08 PM, Xiao <lixiao1...@gmail.com> wrote:
> > >
> > > > Hey Josh,
> > > >
> > > > Sorry, after reading codes, Kafka did fsync the data using a separate
> > > > thread. The recovery point (oldest transaction timestamp) can be got
> > from
> > > > the file recovery-point-offset-checkpoint.
> > > >
> > > > You can adjust the value config.logFlushOffsetCheckpointIntervalMs,
> if
> > > you
> > > > think the speed is not quick enough. When the workloads is huge, the
> > > > bottleneck could be in your target side or source side. That means,
> > your
> > > > apply could have enough jobs to do.
> > > >
> > > > Basically, you need to keep reading this file for determining the
> > oldest
> > > > timestamps of all relevant partitions. Then, apply the transactions
> > until
> > > > that timestamp.
> > > >
> > > > Note, this does not protect the transaction consistency. This is just
> > for
> > > > ensuring the data at the target side is consistent at one timestamp
> > when
> > > > you have multiple channel to send data changes. The implementation
> > should
> > > > be simple if you can understand the concepts. I am unable to find the
> > > filed
> > > > patent application about it. This is one related paper. It covers the
> > > main
> > > > concepts about the issues you are facing. "Inter-Data-Center
> > Large-Scale
> > > > Database Replication Optimization – A Workload Driven Partitioning
> > > Approach"
> > > >
> > > > Hopefully, you understood what I explained above.
> > > >
> > > > Best wishes,
> > > >
> > > > Xiao Li
> > > >
> > > > Best wishes,
> > > >
> > > > Xiao Li
> > > >
> > > > On Mar 3, 2015, at 4:23 PM, Xiao <lixiao1...@gmail.com> wrote:
> > > >
> > > > > Hey Josh,
> > > > >
> > > > > If you put different tables into different partitions or topics, it
> > > > might break transaction ACID at the target side. This is risky for
> some
> > > use
> > > > cases. Besides unit of work issues, you also need to think about the
> > load
> > > > balancing too.
> > > > >
> > > > > For failover, you have to find the timestamp for point-in-time
> > > > consistency. This part is tricky. You have to ensure all the changes
> > > before
> > > > a specific timestamp have been flushed to the disk. Normally, you can
> > > > maintain a bookmark for different partition at the target side to
> know
> > > what
> > > > is the oldest transactions have been flushed to the disk.
> > Unfortunately,
> > > > based on my understanding, Kafka is unable to do it because it does
> not
> > > do
> > > > fsync regularly for achieving better throughput.
> > > > >
> > > > > Best wishes,
> > > > >
> > > > > Xiao Li
> > > > >
> > > > >
> > > > > On Mar 3, 2015, at 3:45 PM, Xiao <lixiao1...@gmail.com> wrote:
> > > > >
> > > > >> Hey Josh,
> > > > >>
> > > > >> Transactions can be applied in parallel in the consumer side based
> > on
> > > > transaction dependency checking.
> > > > >>
> > > > >> http://www.google.com.ar/patents/US20080163222
> > > > >>
> > > > >> This patent documents how it work. It is easy to understand,
> > however,
> > > > you also need to consider the hash collision issues. This has been
> > > > implemented in IBM Q Replication since 2001.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Xiao Li
> > > > >>
> > > > >>
> > > > >> On Mar 3, 2015, at 3:36 PM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> > > > >>
> > > > >>> Hey Josh,
> > > > >>>
> > > > >>> As you say, ordering is per partition. Technically it is
> generally
> > > > possible
> > > > >>> to publish all changes to a database to a single
> > partition--generally
> > > > the
> > > > >>> kafka partition should be high throughput enough to keep up.
> > However
> > > > there
> > > > >>> are a couple of downsides to this:
> > > > >>> 1. Consumer parallelism is limited to one. If you want a total
> > order
> > > > to the
> > > > >>> consumption of messages you need to have just 1 process, but
> often
> > > you
> > > > >>> would want to parallelize.
> > > > >>> 2. Often what people want is not a full stream of all changes in
> > all
> > > > tables
> > > > >>> in a database but rather the changes to a particular table.
> > > > >>>
> > > > >>> To some extent the best way to do this depends on what you will
> do
> > > > with the
> > > > >>> data. However if you intend to have lots
> > > > >>>
> > > > >>> I have seen pretty much every variation on this in the wild, and
> > here
> > > > is
> > > > >>> what I would recommend:
> > > > >>> 1. Have a single publisher process that publishes events into
> Kafka
> > > > >>> 2. If possible use the database log to get these changes (e.g.
> > mysql
> > > > >>> binlog, Oracle xstreams, golden gate, etc). This will be more
> > > complete
> > > > and
> > > > >>> more efficient than polling for changes, though that can work
> too.
> > > > >>> 3. Publish each table to its own topic.
> > > > >>> 4. Partition each topic by the primary key of the table.
> > > > >>> 5. Include in each message the database's transaction id, scn, or
> > > other
> > > > >>> identifier that gives the total order within the record stream.
> > Since
> > > > there
> > > > >>> is a single publisher this id will be monotonic within each
> > > partition.
> > > > >>>
> > > > >>> This seems to be the best set of tradeoffs for most use cases:
> > > > >>> - You can have parallel consumers up to the number of partitions
> > you
> > > > chose
> > > > >>> that still get messages in order per ID'd entity.
> > > > >>> - You can subscribe to just one table if you like, or to multiple
> > > > tables.
> > > > >>> - Consumers who need a total order over all updates can do a
> > "merge"
> > > > across
> > > > >>> the partitions to reassemble the fully ordered set of changes
> > across
> > > > all
> > > > >>> tables/partitions.
> > > > >>>
> > > > >>> One thing to note is that the requirement of having a single
> > consumer
> > > > >>> process/thread to get the total order isn't really so much a
> Kafka
> > > > >>> restriction as it just is a restriction about the world, since if
> > you
> > > > had
> > > > >>> multiple threads even if you delivered messages to them in order
> > > their
> > > > >>> processing might happen out of order (just do to the random
> timing
> > of
> > > > the
> > > > >>> processing).
> > > > >>>
> > > > >>> -Jay
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader <jrader...@gmail.com>
> > > > wrote:
> > > > >>>
> > > > >>>> Hi Kafka Experts,
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> We have a use case around RDBMS replication where we are
> > > investigating
> > > > >>>> Kafka.  In this case ordering is very important.  Our
> > understanding
> > > is
> > > > >>>> ordering is only preserved within a single partition.  This
> makes
> > > > sense as
> > > > >>>> a single thread will consume these messages, but our question is
> > can
> > > > we
> > > > >>>> somehow parallelize this for better performance?   Is there
> maybe
> > > some
> > > > >>>> partition key strategy trick to have your cake and eat it too in
> > > > terms of
> > > > >>>> keeping ordering, but also able to parallelize the processing?
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> I am sorry if this has already been asked, but we tried to
> search
> > > > through
> > > > >>>> the archives and couldn’t find this response.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>>
> > > > >>>> Josh
> > > > >>>>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to