a few more points:

1. "we can make that efficient by storing only the 'start offsets' of
transactions and then replaying the log once we hit the corresponding
commit/abort markers"

this is efficient in terms of memory consumption on the consumer. it is
definitely not efficient when i consider what this means in a large
datacenter with lots and lots of consumers - you've double reads on the
network (and broker disks, potentially). your other alternative is to spool
to client disk (again not efficient across the whole datacenter). this is
why i think more of TX support belongs on the broker.

2. while a commit is in progress (partition leader is still appending tx
msgs onto the partition and replicating to followers) any new msgs sent to
the partition (or committed from another TX) will be queued behind them.
there is a downside to this - watermarks will not advance until a whole TX
is appended. on the other hand, actual writes continue at full possible
speed, so this is not a real performance degradation, just a potential
latency spike (head of queue blocking on appending TXs). logically any
consumer that "opts in" under the current proposal already suffers it.

On Fri, Dec 16, 2016 at 9:24 AM, radai <radai.rosenbl...@gmail.com> wrote:

> Hi Apurva,
>
> here's an outline of what I had in mind:
>
> 1. TXs in progress are written "sideways" on the partition leader. this
> can be (in order of increasing reliability and decreasing performance):
>      1.1 an im-mem buffer on the partition leader (TX lost on broker
> crash, memory limits need to be in place, but disk and network efficient.
>      1.2 a file on the partition leader (still not replicated)
>      1.3 a full blown "ad-hoc partition" - so file on the leader
> replicated to followers, open TX will survive a crash.
> 2. as long as a TX is in progress its not written to the "real"/"target"
> partition at all. this means nothing downstream sees it and that any
> offsets returned to client at this point are "virtual".
> 3. when a client commits, the partition leader will append all the msgs
> accumulated in the TX store (#1 above) to the real partition as if theyve
> just been written:
>     3.1 this is the point where msgs get "real" offsets
>     3.2 since TX size may be larger than what followers pull in a single
> go, and any broker can still crash at this point, the appended TX msgs can
> only be "seen" by clients once everything has been distributed to
> followers. this would mean some marker on the 1st msg appended indicating
> how many more are expected (known at this point) so that watermarks will
> not advance until everything is appended and new leaders can resume/abort
> on crash.
>     3.3 only after commit step in #3.2 above has completed can watermarks
> advance and msgs become visible. this is also the point where the client
> can consider the TX committed.
> 4. depending on the choice made in #1, there are several alternatives
> w.r.t complexity and disk efficiency:
>     4.1 append contents of TX file to partition segment file. this means
> that any TX msg incurs 2 disk writes and 1 disk read (written to TX file,
> read from TX file, written to partition segment).
>     4.2 make the partition log a new segment file. this is efficient in
> terms of disk writes (msgs were already written to disk, just reuse the
> file and make it a segment) but implemented naively would lead to
> fragmentation. could be consolidated in the background (similar in concept
> to compaction) to amortize the cost.
>     4.3 offer lower resiliency guarantees for ongoing TXs compared to
> normal msg delivery (this is common in DBs - if your sql connection drops,
> your TX is gone. its accepted by users), spool them in memory with
> disk-overflow and so given proper sizing incur no extra disk costs at all
> (overflow to disk when out of memory).
>
> this design puts more burden server-side but presents a cleaner view
> downstream and allows for simpler, smaller, more efficient clients. given
> the kafka ecosystem includes one server implementation and multiple client
> implementations across various languages owned and maintained by different
> people i think this is a better solution in terms of overall complexity,
> overall network utilization (io/disk across the whole datacenter, not just
> the broker) and in terms of eco system adoption (less burden on client
> writers, as we want to make it easier to have lots of client impls).
>
>
> On Thu, Dec 15, 2016 at 11:59 PM, Apurva Mehta <apu...@confluent.io>
> wrote:
>
>> Hi Radai,
>>
>> Thanks for your email. You raise some good points.
>>
>> Your larger point centers on your --correct!-- observation that persisting
>> messages belonging to aborted transactions causes complications for
>> consumers.
>>
>> You are right in observing that the current proposal does require
>> consumers
>> to buffer, though we can make that efficient by storing only the 'start
>> offsets' of transactions and then replaying the log once we hit the
>> corresponding commit/abort markers. And yes, it does mean that in some
>> consumer modes, we will deliver messages belonging to aborted
>> transactions.
>>
>> As such, I would really like to understand the details of your alternative
>> to see if we can address these issues efficiently.
>>
>> When you say that you will write to the main partition from the 'sideways'
>> transaction partition atomically, do you literally mean that a sequence of
>> individual messages from the 'sideways' partition will be written to the
>> main partition atomically?
>>
>> If the 'sideways partition' is written to disk, you have two copies of
>> data. I would like to know how you would ensure that you do not create
>> duplicates in the event of crashes. And if it is in-mem, you are now
>> buffering anyway, on all replicas.
>>
>> And, in the absence of any markers in the log, how would you ensure that
>> the batch of messages in a committed transaction is delivered to the
>> consumer all together? In the current proposal, the return of a 'poll'
>> would include all the messages of a transaction, or none of them.
>>
>> * *
>>
>> Re: 3::  you ask about the migration plan. This has been detailed in the
>> design
>> doc
>> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7
>> CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.3s6zgsaq9cep>.
>> In particular, the recommended sequence is to bump all your clients before
>> bumping up the message format. Further, since transactions are entirely
>> opt-in, it is not unreasonable to ensure that the ecosystem is ready for
>> them before starting to use them.
>>
>> * *
>>
>> Thanks,
>> Apurva
>>
>>
>> On Thu, Dec 15, 2016 at 3:07 PM, radai <radai.rosenbl...@gmail.com>
>> wrote:
>>
>> > some clarifications on my alternative proposal:
>> >
>> > TX msgs are written "sideways" to a transaction (ad-hoc) partition. this
>> > partition can be replicated to followers, or can be an in-mem buffer -
>> > depends on the resilience guarantees you want to provide for TXs in
>> case of
>> > broker crash.
>> > on "commit" the partition leader broker (being the single point of
>> > synchronization for the partition anyway) can atomically append the
>> > contents of this TX "partition" onto the real target partition. this is
>> the
>> > point where the msgs get "real" offsets. there's some trickiness around
>> how
>> > not to expose these offsets to any consumers until everything's been
>> > replicated to followers, but we believe its possible.
>> >
>> >
>> >
>> > On Thu, Dec 15, 2016 at 2:31 PM, radai <radai.rosenbl...@gmail.com>
>> wrote:
>> >
>> > > I can see several issues with the current proposal.
>> > >
>> > > messages, even if sent under a TX, are delivered directly to their
>> > > destination partitions, downstream consumers need to be TX-aware. they
>> > can
>> > > either:
>> > >    1. be oblivious to TXs. that means they will deliver "garbage" -
>> msgs
>> > > sent during eventually-aborted TXs.
>> > >    2. "opt-in" - they would have to not deliver _ANY_ msg until they
>> know
>> > > the fate of all outstanding overlapping TXs - if i see msg A1 (in a
>> TX),
>> > > followed by B, which is not under any TX, i cannot deliver B until i
>> know
>> > > if A1 was committed or not (or I violate ordering). this would require
>> > some
>> > > sort of buffering on consumers. with a naive buffering impl i could
>> DOS
>> > > everyone on a topic - just start a TX on a very busy topic and keep it
>> > open
>> > > as long as I can ....
>> > >    3. explode if youre an old consumer that sees a control msg (whats
>> > your
>> > > migration plan?)
>> > >    4. cross-cluster replication mechanisms either replicate the
>> garbage
>> > or
>> > > need to clean it up. there are >1 such different mechanism (almost one
>> > per
>> > > company really :-) ) so lots of adjustments.
>> > >
>> > > I think the end result could be better if ongoing TXs are treated as
>> > > logically separate topic partitions, and only atomically appended onto
>> > the
>> > > target partitions on commit (meaning they are written to separate
>> journal
>> > > file(s) on the broker).
>> > >
>> > > such a design would present a "clean" view to any downstream
>> consumers -
>> > > anything not committed wont even show up. old consumers wont need to
>> know
>> > > about control msgs, no issues with unbounded msg buffering, generally
>> > > cleaner overall?
>> > >
>> > > there would need to be adjustments made to watermark and follower
>> fetch
>> > > logic but some of us here have discussed this over lunch and we think
>> its
>> > > doable.
>> > >
>> > >
>> > > On Thu, Dec 15, 2016 at 1:08 AM, Rajini Sivaram <rsiva...@pivotal.io>
>> > > wrote:
>> > >
>> > >> Hi Apurva,
>> > >>
>> > >> Thank you, makes sense.
>> > >>
>> > >> Rajini
>> > >>
>> > >> On Wed, Dec 14, 2016 at 7:36 PM, Apurva Mehta <apu...@confluent.io>
>> > >> wrote:
>> > >>
>> > >> > Hi Rajini,
>> > >> >
>> > >> > I think my original response to your point 15 was not accurate. The
>> > >> regular
>> > >> > definition of durability is that data once committed would never be
>> > >> lost.
>> > >> > So it is not enough for only the control messages to be flushed
>> before
>> > >> > being acknowledged -- all the messages (and offset commits) which
>> are
>> > >> part
>> > >> > of the transaction would need to be flushed before being
>> acknowledged
>> > as
>> > >> > well.
>> > >> >
>> > >> > Otherwise, it is possible that if all replicas of a topic partition
>> > >> crash
>> > >> > before the transactional messages are flushed, those messages will
>> be
>> > >> lost
>> > >> > even if the commit marker exists in the log. In this case, the
>> > >> transaction
>> > >> > would be 'committed' with incomplete data.
>> > >> >
>> > >> > Right now, there isn't any config which will ensure that the flush
>> to
>> > >> disk
>> > >> > happens before the acknowledgement. We could add it in the future,
>> and
>> > >> get
>> > >> > durability guarantees for kafka transactions.
>> > >> >
>> > >> > I hope this clarifies the situation. The present KIP does not
>> intend
>> > to
>> > >> add
>> > >> > the aforementioned config, so even the control messages are
>> > susceptible
>> > >> to
>> > >> > being lost if there is a simultaneous crash across all replicas. So
>> > >> > transactions are only as durable as existing Kafka messages. We
>> don't
>> > >> > strengthen any durability guarantees as part of this KIP.
>> > >> >
>> > >> > Thanks,
>> > >> > Apurva
>> > >> >
>> > >> >
>> > >> > On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram <
>> rsiva...@pivotal.io>
>> > >> > wrote:
>> > >> >
>> > >> > > Hi Apurva,
>> > >> > >
>> > >> > > Thank you for the answers. Just one follow-on.
>> > >> > >
>> > >> > > 15. Let me rephrase my original question. If all control messages
>> > >> > (messages
>> > >> > > to transaction logs and markers on user logs) were acknowledged
>> only
>> > >> > after
>> > >> > > flushing the log segment, will transactions become durable in the
>> > >> > > traditional sense (i.e. not restricted to min.insync.replicas
>> > >> failures) ?
>> > >> > > This is not a suggestion to update the KIP. It seems to me that
>> the
>> > >> > design
>> > >> > > enables full durability if required in the future with a rather
>> > >> > > non-intrusive change. I just wanted to make sure I haven't missed
>> > >> > anything
>> > >> > > fundamental that prevents Kafka from doing this.
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > > On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin <b...@kirw.in> wrote:
>> > >> > >
>> > >> > > > Hi Apurva,
>> > >> > > >
>> > >> > > > Thanks for the detailed answers... and sorry for the late
>> reply!
>> > >> > > >
>> > >> > > > It does sound like, if the input-partitions-to-app-id mapping
>> > never
>> > >> > > > changes, the existing fencing mechanisms should prevent
>> > duplicates.
>> > >> > > Great!
>> > >> > > > I'm a bit concerned the proposed API will be delicate to
>> program
>> > >> > against
>> > >> > > > successfully -- even in the simple case, we need to create a
>> new
>> > >> > producer
>> > >> > > > instance per input partition, and anything fancier is going to
>> > need
>> > >> its
>> > >> > > own
>> > >> > > > implementation of the Streams/Samza-style 'task' idea -- but
>> that
>> > >> may
>> > >> > be
>> > >> > > > fine for this sort of advanced feature.
>> > >> > > >
>> > >> > > > For the second question, I notice that Jason also elaborated on
>> > this
>> > >> > > > downthread:
>> > >> > > >
>> > >> > > > > We also looked at removing the producer ID.
>> > >> > > > > This was discussed somewhere above, but basically the idea
>> is to
>> > >> > store
>> > >> > > > the
>> > >> > > > > AppID in the message set header directly and avoid the
>> mapping
>> > to
>> > >> > > > producer
>> > >> > > > > ID altogether. As long as batching isn't too bad, the impact
>> on
>> > >> total
>> > >> > > > size
>> > >> > > > > may not be too bad, but we were ultimately more comfortable
>> > with a
>> > >> > > fixed
>> > >> > > > > size ID.
>> > >> > > >
>> > >> > > > ...which suggests that the distinction is useful for
>> performance,
>> > >> but
>> > >> > not
>> > >> > > > necessary for correctness, which makes good sense to me.
>> (Would a
>> > >> > 128-bid
>> > >> > > > ID be a reasonable compromise? That's enough room for a UUID,
>> or a
>> > >> > > > reasonable hash of an arbitrary string, and has only a marginal
>> > >> > increase
>> > >> > > on
>> > >> > > > the message size.)
>> > >> > > >
>> > >> > > > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <
>> > apu...@confluent.io>
>> > >> > > wrote:
>> > >> > > >
>> > >> > > > > Hi Ben,
>> > >> > > > >
>> > >> > > > > Now, on to your first question of how deal with consumer
>> > >> rebalances.
>> > >> > > The
>> > >> > > > > short answer is that the application needs to ensure that the
>> > the
>> > >> > > > > assignment of input partitions to appId is consistent across
>> > >> > > rebalances.
>> > >> > > > >
>> > >> > > > > For Kafka streams, they already ensure that the mapping of
>> input
>> > >> > > > partitions
>> > >> > > > > to task Id is invariant across rebalances by implementing a
>> > custom
>> > >> > > sticky
>> > >> > > > > assignor. Other non-streams apps can trivially have one
>> producer
>> > >> per
>> > >> > > > input
>> > >> > > > > partition and have the appId be the same as the partition
>> number
>> > >> to
>> > >> > > > achieve
>> > >> > > > > the same effect.
>> > >> > > > >
>> > >> > > > > With this precondition in place, we can maintain transactions
>> > >> across
>> > >> > > > > rebalances.
>> > >> > > > >
>> > >> > > > > Hope this answers your question.
>> > >> > > > >
>> > >> > > > > Thanks,
>> > >> > > > > Apurva
>> > >> > > > >
>> > >> > > > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <b...@kirw.in>
>> wrote:
>> > >> > > > >
>> > >> > > > > > Thanks for this! I'm looking forward to going through the
>> full
>> > >> > > proposal
>> > >> > > > > in
>> > >> > > > > > detail soon; a few early questions:
>> > >> > > > > >
>> > >> > > > > > First: what happens when a consumer rebalances in the
>> middle
>> > of
>> > >> a
>> > >> > > > > > transaction? The full documentation suggests that such a
>> > >> > transaction
>> > >> > > > > ought
>> > >> > > > > > to be rejected:
>> > >> > > > > >
>> > >> > > > > > > [...] if a rebalance has happened and this consumer
>> > >> > > > > > > instance becomes a zombie, even if this offset message is
>> > >> > appended
>> > >> > > in
>> > >> > > > > the
>> > >> > > > > > > offset topic, the transaction will be rejected later on
>> when
>> > >> it
>> > >> > > tries
>> > >> > > > > to
>> > >> > > > > > > commit the transaction via the EndTxnRequest.
>> > >> > > > > >
>> > >> > > > > > ...but it's unclear to me how we ensure that a transaction
>> > can't
>> > >> > > > complete
>> > >> > > > > > if a rebalance has happened. (It's quite possible I'm
>> missing
>> > >> > > something
>> > >> > > > > > obvious!)
>> > >> > > > > >
>> > >> > > > > > As a concrete example: suppose a process with PID 1 adds
>> > offsets
>> > >> > for
>> > >> > > > some
>> > >> > > > > > partition to a transaction; a consumer rebalance happens
>> that
>> > >> > assigns
>> > >> > > > the
>> > >> > > > > > partition to a process with PID 2, which adds some offsets
>> to
>> > >> its
>> > >> > > > current
>> > >> > > > > > transaction; both processes try and commit. Allowing both
>> > >> commits
>> > >> > > would
>> > >> > > > > > cause the messages to be processed twice -- how is that
>> > avoided?
>> > >> > > > > >
>> > >> > > > > > Second: App IDs normally map to a single PID. It seems like
>> > one
>> > >> > could
>> > >> > > > do
>> > >> > > > > > away with the PID concept entirely, and just use App IDs in
>> > most
>> > >> > > places
>> > >> > > > > > that require a PID. This feels like it would be
>> significantly
>> > >> > > simpler,
>> > >> > > > > > though it does increase the message size. Are there other
>> > >> reasons
>> > >> > why
>> > >> > > > the
>> > >> > > > > > App ID / PID split is necessary?
>> > >> > > > > >
>> > >> > > > > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <
>> > >> wangg...@gmail.com
>> > >> > >
>> > >> > > > > wrote:
>> > >> > > > > >
>> > >> > > > > > > Hi all,
>> > >> > > > > > >
>> > >> > > > > > > I have just created KIP-98 to enhance Kafka with exactly
>> > once
>> > >> > > > delivery
>> > >> > > > > > > semantics:
>> > >> > > > > > >
>> > >> > > > > > > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > >> > > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>> > >> > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > >> > > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
>> > >> > > > > > >
>> > >> > > > > > > This KIP adds a transactional messaging mechanism along
>> with
>> > >> an
>> > >> > > > > > idempotent
>> > >> > > > > > > producer implementation to make sure that 1) duplicated
>> > >> messages
>> > >> > > sent
>> > >> > > > > > from
>> > >> > > > > > > the same identified producer can be detected on the
>> broker
>> > >> side,
>> > >> > > and
>> > >> > > > > 2) a
>> > >> > > > > > > group of messages sent within a transaction will
>> atomically
>> > be
>> > >> > > either
>> > >> > > > > > > reflected and fetchable to consumers or not as a whole.
>> > >> > > > > > >
>> > >> > > > > > > The above wiki page provides a high-level view of the
>> > proposed
>> > >> > > > changes
>> > >> > > > > as
>> > >> > > > > > > well as summarized guarantees. Initial draft of the
>> detailed
>> > >> > > > > > implementation
>> > >> > > > > > > design is described in this Google doc:
>> > >> > > > > > >
>> > >> > > > > > > https://docs.google.com/document/d/11Jqy_
>> > >> > > > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
>> > >> > > > > > > 0wSw9ra8
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > We would love to hear your comments and suggestions.
>> > >> > > > > > >
>> > >> > > > > > > Thanks,
>> > >> > > > > > >
>> > >> > > > > > > -- Guozhang
>> > >> > > > > > >
>> > >> > > > > >
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>

Reply via email to