@Jason

Yes, on second thought about the number of messages included, the offset
delta will probably be sufficient. The use case I ran into before for the
number of messages in a message set is an embedded mirror maker on the
destination broker side which fetches messages directly from the source
cluster. Ideally the destination cluster only needs to check the CRC and
assign the offsets, because all the message verification has already been
done by the source cluster; but due to the lack of the number of messages
in the message set, we have to decompress the message set to increment the
offsets correctly. Knowing the number of messages in the message set would
let us avoid doing that. The offset delta would also help; it's just that
the offsets may then have holes for log compacted topics, but that may be
fine.
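
To make that concrete, here is a rough sketch of what offset assignment on
the destination broker could look like if the wrapper carried the last
offset delta. All type and method names below are made up for
illustration; this is not the actual broker code.

    // Hypothetical sketch; types and names are illustrative only.
    interface MessageSetHeader {
        int lastOffsetDelta();            // delta of the last record relative to the base offset
        void setBaseOffset(long offset);  // rewrite only the wrapper's base offset
    }

    // Assign offsets to a compressed message set without decompressing it:
    // the inner records keep their relative deltas, so only the wrapper changes.
    long assignOffsets(long nextOffsetToAssign, MessageSetHeader header) {
        header.setBaseOffset(nextOffsetToAssign);
        long lastOffset = nextOffsetToAssign + header.lastOffsetDelta();
        return lastOffset + 1;            // next offset to assign to the following set
    }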

@Apurva

I am not sure it is true that the consumer will deliver either all the
messages for an entire transaction or none of them from one poll() call.
If we allow transactions to span partitions, then unless the consumer
consumes from all the partitions involved in a transaction, it seems
impossible for it to deliver *all* the messages in a transaction, right?
A weaker guarantee is that we will deliver all or none of the messages
that belong to the same transaction in ONE partition, but this would be
different from the guarantee on the producer side.

My two cents on Radai's sideways partition design:
1. If we consider the producer side behavior as a two-phase commit which
includes committing the consumer offsets, it is a little awkward that we
allow uncommitted messages to go into the main log and rely on the
consumer to filter them out. So semantics-wise I think it would be better
if we can avoid this. Radai's suggestion is actually intuitive, because if
the brokers do not want to expose uncommitted transactions to the
consumer, the brokers have to buffer them.

2. Regarding efficiency, I think it may be worth looking at the efficiency
cost vs. benefit. This includes both server side efficiency and consumer
side efficiency.

Regarding server side efficiency, the current proposal would probably be
more efficient regardless of whether something goes wrong. Radai's
suggestion would put more burden on the server side: if nothing goes
wrong, we always pay the cost of keeping a double copy of the
transactional messages without getting the semantic benefit; but if
something goes wrong, the efficiency cost we pay gets us better semantics.

For consumer side efficiency, with Radai's suggestion there is no need to
buffer the uncommitted messages. The current proposal may potentially have
to buffer uncommitted messages, so it would be less efficient than Radai's
suggestion when a transaction aborts. When everything goes well, both
designs seem to have similar performance. However, it depends on whether
we are willing to loosen the consumer side transaction guarantee that I
mentioned earlier to Apurva.

Currently the biggest pressure on the consumer side is that it has to
buffer incomplete transactions. There are two reasons for this:
A. A transaction may be aborted, so we cannot expose the messages to the
users.
B. We want to return all or none of the messages in a transaction in ONE
partition.

While reason A is mandatory, I think reason B may be debatable. Radai's
design actually removes reason A because no uncommitted messages are
exposed to the consumers. This may potentially give us a chance to
significantly improve consumer side efficiency in normal cases. It again
depends on the use case, i.e. whether the user can process a transaction
progressively (message by message) or whether it has to be buffered and
returned all together. If in most cases users can process the transactions
message by message (most stream processing tasks probably can do so), then
with Radai's proposal we don't need to buffer the transactions for the
users anymore, which is a big difference. For the latter case, the
consumer may have to buffer the incomplete transactions; otherwise we are
just throwing the burden onto the users.
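
Just to illustrate the difference, here is a rough sketch of the two
consumer-side modes discussed above. All types and method names (Record,
pid(), the marker checks, deliverToUser) are made-up stand-ins, not the
proposed API.

    import java.util.*;

    // (a) Buffered mode (current proposal): hold records per producer until the
    // transaction's control marker arrives, then release or drop them.
    Map<Long, List<Record>> pending = new HashMap<>();

    void onRecord(Record r) {
        if (r.isControlMarker()) {
            List<Record> txn = pending.remove(r.pid());
            if (r.isCommit() && txn != null)
                deliverToUser(txn);   // reason B: hand over the whole transaction at once
            // on abort, the buffered records are simply dropped (reason A)
        } else {
            pending.computeIfAbsent(r.pid(), p -> new ArrayList<>()).add(r);
        }
    }

    // (b) Progressive mode (possible with Radai's suggestion): the broker only
    // ever returns committed data, so each record can be handed to the user as
    // it arrives and the pending map goes away entirely.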

Thanks,

Jiangjie (Becket) Qin

On Fri, Dec 16, 2016 at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:

> Yeah good point. I relent!
>
> -jay
>
> On Fri, Dec 16, 2016 at 1:46 PM Jason Gustafson <ja...@confluent.io> wrote:
>
> > Jay/Ismael,
> >
> > I agree that lazy initialization of metadata seems unavoidable. Ideally,
> > we could follow the same pattern for transactions, but remember that in
> > the consumer+producer use case, the initialization needs to be completed
> > prior to setting the consumer's position. Otherwise we risk reading stale
> > offsets. But it would be pretty awkward if you have to begin a transaction
> > first to ensure that your consumer can read the right offset from the
> > consumer, right? It's a bit easier to explain that you should always call
> > `producer.init()` prior to initializing the consumer. Users would probably
> > get this right without any special effort.
> >
> > -Jason
> >
> > On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram <rsiva...@pivotal.io> wrote:
> >
> > > Hi Apurva,
> > >
> > > Thank you for the answers. Just one follow-on.
> > >
> > > 15. Let me rephrase my original question. If all control messages
> > > (messages to transaction logs and markers on user logs) were acknowledged
> > > only after flushing the log segment, will transactions become durable in
> > > the traditional sense (i.e. not restricted to min.insync.replicas
> > > failures)? This is not a suggestion to update the KIP. It seems to me
> > > that the design enables full durability if required in the future with a
> > > rather non-intrusive change. I just wanted to make sure I haven't missed
> > > anything fundamental that prevents Kafka from doing this.
> > >
> > > On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin <b...@kirw.in> wrote:
> > >
> > > > Hi Apurva,
> > > >
> > > > Thanks for the detailed answers... and sorry for the late reply!
> > > >
> > > > It does sound like, if the input-partitions-to-app-id mapping never
> > > > changes, the existing fencing mechanisms should prevent duplicates.
> > > > Great! I'm a bit concerned the proposed API will be delicate to program
> > > > against successfully -- even in the simple case, we need to create a new
> > > > producer instance per input partition, and anything fancier is going to
> > > > need its own implementation of the Streams/Samza-style 'task' idea --
> > > > but that may be fine for this sort of advanced feature.
> > > >
> > > > For the second question, I notice that Jason also elaborated on this
> > > > downthread:
> > > >
> > > > > We also looked at removing the producer ID. This was discussed
> > > > > somewhere above, but basically the idea is to store the AppID in the
> > > > > message set header directly and avoid the mapping to producer ID
> > > > > altogether. As long as batching isn't too bad, the impact on total
> > > > > size may not be too bad, but we were ultimately more comfortable with
> > > > > a fixed size ID.
> > > >
> > > > ...which suggests that the distinction is useful for performance, but
> > > > not necessary for correctness, which makes good sense to me. (Would a
> > > > 128-bit ID be a reasonable compromise? That's enough room for a UUID, or
> > > > a reasonable hash of an arbitrary string, and has only a marginal
> > > > increase on the message size.)
> > > >
> > > > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <apu...@confluent.io> wrote:
> > > >
> > > > > Hi Ben,
> > > > >
> > > > > Now, on to your first question of how to deal with consumer
> > > > > rebalances. The short answer is that the application needs to ensure
> > > > > that the assignment of input partitions to appId is consistent across
> > > > > rebalances.
> > > > >
> > > > > For Kafka streams, they already ensure that the mapping of input
> > > > > partitions to task Id is invariant across rebalances by implementing a
> > > > > custom sticky assignor. Other non-streams apps can trivially have one
> > > > > producer per input partition and have the appId be the same as the
> > > > > partition number to achieve the same effect.
> > > > >
> > > > > With this precondition in place, we can maintain transactions across
> > > > > rebalances.
> > > > >
> > > > > Hope this answers your question.
> > > > >
> > > > > Thanks,
> > > > > Apurva
> > > > >
> > > > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <b...@kirw.in> wrote:
> > > > >
> > > > > > Thanks for this! I'm looking forward to going through the full
> > > > > > proposal in detail soon; a few early questions:
> > > > > >
> > > > > > First: what happens when a consumer rebalances in the middle of a
> > > > > > transaction? The full documentation suggests that such a transaction
> > > > > > ought to be rejected:
> > > > > >
> > > > > > > [...] if a rebalance has happened and this consumer instance
> > > > > > > becomes a zombie, even if this offset message is appended in the
> > > > > > > offset topic, the transaction will be rejected later on when it
> > > > > > > tries to commit the transaction via the EndTxnRequest.
> > > > > >
> > > > > > ...but it's unclear to me how we ensure that a transaction can't
> > > > > > complete if a rebalance has happened. (It's quite possible I'm
> > > > > > missing something obvious!)
> > > > > >
> > > > > > As a concrete example: suppose a process with PID 1 adds offsets for
> > > > > > some partition to a transaction; a consumer rebalance happens that
> > > > > > assigns the partition to a process with PID 2, which adds some
> > > > > > offsets to its current transaction; both processes try and commit.
> > > > > > Allowing both commits would cause the messages to be processed
> > > > > > twice -- how is that avoided?
> > > > > >
> > > > > > Second: App IDs normally map to a single PID. It seems like one
> > > > > > could do away with the PID concept entirely, and just use App IDs in
> > > > > > most places that require a PID. This feels like it would be
> > > > > > significantly simpler, though it does increase the message size. Are
> > > > > > there other reasons why the App ID / PID split is necessary?
> > > > > >
> > > > > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I have just created KIP-98 to enhance Kafka with exactly once
> > > > > > > delivery semantics:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > >
> > > > > > > This KIP adds a transactional messaging mechanism along with an
> > > > > > > idempotent producer implementation to make sure that 1) duplicated
> > > > > > > messages sent from the same identified producer can be detected on
> > > > > > > the broker side, and 2) a group of messages sent within a
> > > > > > > transaction will atomically be either reflected and fetchable to
> > > > > > > consumers or not as a whole.
> > > > > > >
> > > > > > > The above wiki page provides a high-level view of the proposed
> > > > > > > changes as well as summarized guarantees. Initial draft of the
> > > > > > > detailed implementation design is described in this Google doc:
> > > > > > >
> > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > > > > > >
> > > > > > > We would love to hear your comments and suggestions.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > -- Guozhang
