Thanks Apurva - replies inline.

On 2017-01-27 15:19, Apurva Mehta wrote:
Eugen, moving your email to the main thread so that it doesn't get split.

The `transaction.app.id` is a prerequisite for using transactional APIs.
And only messages wrapped inside transactions will enjoy idempotent
guarantees across sessions, and that too only when they employ a
consume-process-produce pattern.

Say I have a producer, producing messages into a topic and I only want to guarantee the producer cannot insert duplicates. In other words, there's no downstream consumer/processor to be worried about - which, when considering the correctness of the data only, is all I need for idempotent producers, as every message has a unique id (offset), so downstream processes can take care of exactly once processing by any number of means. (If you need transactional all-or-none behavior, which KIP-98 also addresses, that's of course a more complex story)

I was under the impression that KIP-98 would fulfill the above requirement, i.e. the prevention of duplicate inserts of the same message into a topic per producer, without using transactions, and guaranteed across TCP connections to handle producer/broker crashes and network problems.

In other words, producers where the `transaction.app.id` is specified will
not enjoy idempotence across sessions unless their messages are
transactional, i.e. the sends are wrapped between `beginTransaction`,
`sendOffsets`, and `commitTransaction`.

From the KIP-98 wiki and the design document, I understand that AppIDs, PIDs, and sequence numbers are enforced regardless of their being wrapped in a transaction or not. Is that not so?

Cheers,
Eugen

The comment about the heartbeat was just a passing comment about the fact
that an AppId could be expired if a producer doesn't use transactions for a
long time. We don't plan to implement heartbeats in V1, though we might in
the future.

Hope this clarified things.

Regards,
Apurva


KIP-98 says
 > transaction.app.id: A unique and persistent way to identify a
producer. This is used to ensure idempotency and to enable transaction
recovery or rollback across producer sessions. This is optional: you will
lose cross-session guarantees if this is blank.
which might suggest that a producer that does not use the transactional
features, but does set the transaction.app.id, could get cross-session
idempotency. But the design document "Exactly Once Delivery and
Transactional Messaging in Kafka" rules that out:
 > For the idempotent producer (i.e., producer that do not use
transactional APIs), currently we do not make any cross-session guarantees
in any case. In the future, we can extend this guarantee by having the
producer to periodically send InitPIDRequest to the transaction coordinator
to keep the AppID from expiring, which preserves the producer's zombie
defence.
Until that point in the future, could my non-transactional producer send an
InitPIDRequest once and then heartbeat via BeginTxnRequest/EndTxnRequest(ABORT)
at intervals shorter than transaction.app.id.timeout.ms in order to
guarantee cross-session idempotency? Or is that not guaranteed because
"currently we do not make any cross-session guarantees in any case"? I know
this would be an ugly hack.
I guess that is also what the recently added "Producer HeartBeat" feature
proposal would address - although it is described to prevent idle
transactional producers from having their AppIds expired.

Related question: If KIP-98 does not make cross-session guarantees for
idempotent producers, is the only improvement over the current idempotency
situation the prevention of duplicate messages in case of a partition
leader migration? Because if a broker fails or the publisher fails, KIP-98
does not seem to change the risk of dupes for non-transactional producers.
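
To make the concern concrete, here is a minimal toy model in Python, not Kafka code. It assumes, as I read the design document, that a non-transactional producer gets a fresh PID on every session and that the broker de-duplicates on (PID, sequence number). `ToyBroker`, `init_pid` and `append` are hypothetical names used only for illustration.

```python
import itertools


class ToyBroker:
    """Toy model: de-duplicates appends per (PID, sequence number)."""

    def __init__(self):
        self._pids = itertools.count()  # hands out a new PID per session (assumption)
        self._last_seq = {}             # pid -> highest sequence number appended
        self.log = []

    def init_pid(self):
        # Hypothetical stand-in for the PID handshake at producer start-up.
        return next(self._pids)

    def append(self, pid, seq, payload):
        # A sequence number that was already seen for this PID is a retry.
        if seq <= self._last_seq.get(pid, -1):
            return False
        self._last_seq[pid] = seq
        self.log.append(payload)
        return True


broker = ToyBroker()
pid = broker.init_pid()
broker.append(pid, 0, "order-42")       # first send
broker.append(pid, 0, "order-42")       # retry in the same session: dropped

new_pid = broker.init_pid()             # producer crashed and restarted
broker.append(new_pid, 0, "order-42")   # same message, new PID: appended again
```

In this model the retry inside one session is dropped, but the resend after the restart lands in the log a second time, which is the duplicate I am worried about.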

Btw: Good job! Both in terms of Kafka in general, and KIP-98 in particular


Cheers

On Wed, Jan 25, 2017 at 6:00 PM, Apurva Mehta <apu...@confluent.io> wrote:



On Tue, Jan 17, 2017 at 6:17 PM, Apurva Mehta <apu...@confluent.io> wrote:

Hi Jun,

Some answers in line.


109. Could you describe when Producer.send() will receive an
UnrecognizedMessageException?


This exception will be thrown if the producer sends a sequence number
which is greater than the sequence number expected by the broker (i.e. more
than 1 greater than the previously sent sequence number). This can happen
in two cases:

a) If there is a bug in the producer where sequence numbers are
incremented more than once per message. So the producer itself will send
messages with gaps in sequence numbers.
b) The broker somehow lost a previous message. In a cluster configured
for durability (i.e. no unclean leader elections, replication factor of 3,
min.isr of 2, acks=all, etc.), this should not happen.

So realistically, this exception will only be thrown in clusters
configured for high availability where brokers could lose messages.
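
The check described above can be sketched as a toy model in Python. This is not the broker implementation; `PartitionLog`, `append` and `OutOfOrderSequenceError` are hypothetical names, and the handling of duplicates (silently acknowledged) is an assumption made to keep the example complete.

```python
class OutOfOrderSequenceError(Exception):
    """Toy stand-in for the exception discussed in this thread."""


class PartitionLog:
    """Toy partition log tracking the last sequence number per producer ID."""

    def __init__(self):
        self.messages = []
        self.last_seq = {}  # pid -> last appended sequence number

    def append(self, pid, seq, payload):
        expected = self.last_seq.get(pid, -1) + 1
        if seq < expected:
            # Already appended earlier: treat as a retry (assumption).
            return "duplicate"
        if seq > expected:
            # Gap: more than 1 greater than the previously appended number.
            raise OutOfOrderSequenceError(
                f"pid={pid}: expected seq {expected}, got {seq}"
            )
        self.messages.append(payload)
        self.last_seq[pid] = seq
        return "appended"
```

Case a) corresponds to a caller skipping a sequence number. Case b) corresponds to the log forgetting an entry, for example `last_seq` moving backwards after an unclean leader election, so that the producer's next, correct sequence number looks like a gap.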

Becket raised the question of whether we should throw this exception at all in
case b: it indicates a problem with a previously sent message and hence the
semantics are counterintuitive. We are still discussing this point, and
suggestions are most welcome!


I updated the KIP wiki to clarify when this exception will be raised.

First of all, I renamed this to OutOfOrderSequenceException. Based on
Jay's suggestion, this is a more precise name that is easier to understand.

Secondly, I updated the proposed API so that the send call will never
raise this exception directly. Instead this exception will be returned in
the future or passed with the callback, if any. Further, since this is a
fatal exception, any _future_ invocations of send() or other data
generating methods in the producer will raise an IllegalStateException. I
think this makes the semantics clearer and addresses the feedback on this
part of the API update.
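
The proposed semantics can be sketched as a toy model in Python, purely as an illustration and not the producer implementation. `ToyProducer` and the `broker_append` callable are hypothetical, the send is synchronous for brevity, and Python's `RuntimeError` stands in for Java's IllegalStateException.

```python
from concurrent.futures import Future


class OutOfOrderSequenceError(Exception):
    """Toy stand-in for OutOfOrderSequenceException."""


class ToyProducer:
    def __init__(self, broker_append):
        # broker_append(seq, payload) is a hypothetical callable that may
        # raise OutOfOrderSequenceError.
        self._append = broker_append
        self._next_seq = 0
        self._fatal = None

    def send(self, payload, callback=None):
        # After a fatal error, send() itself fails (RuntimeError stands in
        # for IllegalStateException).
        if self._fatal is not None:
            raise RuntimeError("producer is in a fatal state") from self._fatal
        future = Future()
        seq = self._next_seq
        self._next_seq += 1
        try:
            self._append(seq, payload)
        except OutOfOrderSequenceError as exc:
            # Never raised from send() directly: reported through the
            # future and the callback, and the producer is marked failed.
            self._fatal = exc
            future.set_exception(exc)
            if callback is not None:
                callback(None, exc)
        else:
            future.set_result(seq)
            if callback is not None:
                callback(seq, None)
        return future
```

The send that hits the sequence gap still returns normally, and the caller sees the error only through the future or the callback. Only later calls to `send()` raise.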

Thanks,
Apurva

