Thanks for the proposal. A few more detailed comments.

100. Security: It seems that if an app is mistakenly configured with the appId of an existing producer, it can take over the pid and prevent the existing app from publishing. So, I am wondering if it makes sense to add ACLs at the TransactionResource level just like we do for ConsumerGroupResource. That way, a user can only do transactions under a particular appId if he/she has write permission on the TransactionResource associated with that appId.
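To make this concrete, here is a rough sketch of the kind of check I have in mind. The "Transaction" resource type and the helper interfaces are purely illustrative names I made up, not part of the KIP or the existing authorizer API:

public class TransactionAclCheckSketch {

    enum Operation { READ, WRITE }

    interface Authorizer {
        boolean authorize(String principal, Operation op, String resourceType, String resourceName);
    }

    // Before accepting a transactional request for an appId, the broker/coordinator
    // would require Write permission on the transaction resource named by that appId,
    // mirroring the Group resource check done for consumer group requests today.
    static void ensureAuthorized(Authorizer authorizer, String principal, String appId) {
        if (!authorizer.authorize(principal, Operation.WRITE, "Transaction", appId)) {
            throw new SecurityException("Principal " + principal
                + " may not produce transactionally with appId " + appId);
        }
    }
}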
101. Compatibility during upgrade: Suppose that the brokers are upgraded to the new version, but the broker message format is still the old one. If a new producer uses the transaction feature, should the producer get an error in this case? A tricky case can be that the leader broker is on the new message format, but the follower broker is still on the old message format. In this case, the transactional info will be lost in the follower due to down conversion. Should we fail the transactional requests when the followers are still on the old message format?

102. When there is a correlated hard failure (e.g., power outage), it's possible that an existing commit/abort marker is lost in all replicas. This may not be fixed by the transaction coordinator automatically and the consumer may get stuck on that incomplete transaction forever. Not sure what's the best way to address this. Perhaps one way is to run a tool to add an abort marker for all pids in all affected partitions.

103. Currently, there is no check for producer liveness. This means that if a producer has not been sending transactional requests for a long time, its appId will be expired by the coordinator. Have we considered having producers send a heartbeatRequest, just like the consumer, to keep it alive?

104. The logic for handling follower truncation can be a bit trickier now. The truncation may rewind the sequence number for some pids. The question is how to quickly recover the last sequence number of those pids. Do we plan to reload from a PID snapshot and scan forward?

105. When the transaction coordinator changes (due to leadership changes), it's possible for both the old and the new coordinator to send requests to a broker at the same time (for a short period of time). I am wondering if we need to add logic to fence off the old coordinator. One way to do that is to include the leaderEpoch of the partition associated with the coordinator in the coordinator-to-broker requests and control messages (a rough sketch of this check follows after 106).

106. Compacted topics.

106.1. When all messages in a transaction are removed, we could remove the commit/abort marker for that transaction too. However, we have to be a bit careful. If the marker is removed too quickly, it's possible for a consumer to see a message in that transaction, but not to see the marker, and therefore be stuck in that transaction forever. We have a similar issue when dealing with tombstones. The solution there is to preserve the tombstone for at least a preconfigured amount of time after the cleaning has passed the tombstone. Then, as long as a consumer can finish reading to the cleaning point within the configured amount of time, it's guaranteed not to miss the tombstone after it has seen a non-tombstone message on the same key. I am wondering if we should do something similar here (see the config sketch below).

106.2. "To address this problem, we propose to preserve the last epoch and sequence number written by each producer for a fixed amount of time as an empty message set. This is allowed by the new message format we are proposing in this document. The time to preserve the sequence number will be governed by the log retention settings." Could you be a bit more specific on what retention time will be used, since by default there is no retention time for a compacted (but not delete) topic?

106.3. "As for control messages, if the broker does not have any corresponding transaction cached with the PID when encountering a control message, that message can be safely removed." Do control messages have keys? If not, do we need to relax the constraint that messages in a compacted topic must have keys?
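For 105, this is the kind of fencing logic I mean. It is only an illustration; the class and method names are my own, not from the KIP. The idea is simply to remember the highest coordinator (leader) epoch seen per transaction-log partition and reject anything older:

import java.util.concurrent.ConcurrentHashMap;

public class CoordinatorFencingSketch {

    private final ConcurrentHashMap<Integer, Integer> latestEpoch = new ConcurrentHashMap<>();

    // Returns true if the request/control message should be accepted, false if it
    // carries a stale coordinator epoch and therefore comes from a zombie coordinator.
    boolean acceptRequest(int txnLogPartition, int coordinatorEpoch) {
        Integer current = latestEpoch.get(txnLogPartition);
        if (current != null && coordinatorEpoch < current) {
            return false; // old coordinator, fence it off
        }
        latestEpoch.merge(txnLogPartition, coordinatorEpoch, Integer::max);
        return true;
    }
}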
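For 106.1, as a reference point, the existing tombstone behaviour is governed by the topic-level delete.retention.ms setting (24 hours by default at the broker): a delete marker is kept for at least that long after the cleaner has passed it, so a consumer that reaches the cleaning point within that window still sees it. Something analogous could work for commit/abort markers; note that the marker config name below is made up purely for illustration:

import java.util.Properties;

public class CompactionRetentionExample {
    public static void main(String[] args) {
        // Existing, real topic-level configs for tombstone retention on compacted topics.
        Properties topicConfig = new Properties();
        topicConfig.put("cleanup.policy", "compact");
        topicConfig.put("delete.retention.ms", "86400000"); // 24 hours, the broker default

        // A hypothetical analogue for transaction markers (this name does not exist
        // in the KIP; it is only meant to illustrate the shape of the setting):
        // topicConfig.put("transaction.marker.retention.ms", "86400000");

        System.out.println(topicConfig);
    }
}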
107. Could you include the default values for the newly introduced configs?

108. Could you describe the format of the PID snapshot file?

109. Could you describe when Producer.send() will receive an UnrecognizedMessageException?

110. Transaction log:

110.1. "Key => Version AppID Version" It seems that the second Version should really be Type?

110.2. "Value => Version Epoch Status ExpirationTime [Topic Partition]" Should we store [Topic [Partition]] instead?

110.3. To expire an AppId, do we need to insert a tombstone with the expired AppID as the key to physically remove the existing AppID entries in the transaction log?

111. Transaction coordinator startup: "Verify that there is already an entry with the PID in the AppID map. If there is not, raise an exception." For completed transactions, it seems that it's possible that their AppId->pid has been compacted out. But that shouldn't trigger an exception?

112. Control message: Will control messages be used for timestamp indexing? If so, what timestamp will we use if the timestamp type is creation time?

113. Zombie producer: "If the zombie has an ongoing transaction with its old PID while its AppID is being expired by the coordinator, by the time the zombie is about to commit the transaction it needs to talk to coordinator again and will be notified its PID is unrecognized and hence need to re-register its AppID with the InitPIDRequest. At this time, if there is already another registered producer with the same AppID, then this request will be rejected with the fatal ProducerFenced error code." Is that right? According to the coordinator request handling logic, it seems that the InitPIDRequest will bump up the epoch of the pid and succeed?

114. The section on Discussion on Pro-active Transaction Timeout: "If there is no other instance with the same PID yet, or it has not started a transaction, or it has not appended a message to some of the partitions, then the zombie can continue appending messages to those partitions after the abort marker whose epoch has not been incremented yet, but its commitTxn call will fail." Is that correct? In the earlier discussion, it seems that if a transaction is timed out by the coordinator, the coordinator will bump up the epoch and write the abort marker to those inserted partitions.

115. Message format:

115.1. Epoch is int16. Does it wrap after max? If not, is int16 too small, since it's possible for a producer to be restarted tens of thousands of times?

115.2. Sequence number is int32. Does it wrap after max? It's possible for a producer to publish more than 2 billion messages in a session (a sketch follows after 121).

115.3. "Null-value bit is 1: skip the key-length (since it can now be calculated) and value fields." It seems unnatural for the format of the key to depend on the value. It seems easier to just skip the value in this case?

116. ProducerRequest: The existing format doesn't have "MessageSetSize" at the partition level.

117. UpdateTxnRequest: Could you explain the format of Marker?

118. TxnOffsetCommitRequest: How is the retention time determined? Do we need a new config in the producer, or just default it to -1 as the consumer does?

119. InitPIDRequest <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.z99xar1h2enr>: Should we write the completion of open transactions before appending the pid with the bumped-up epoch to the transaction log?

120. transaction.app.id: An app may have multiple concurrent instances. Perhaps we should name it transaction.instance.id or just instance.id?

121. The ordering is important with the idempotent producer, which means that max.in.flight.requests.per.connection should be set to 1. Do we want to enforce this? (An example config follows below.)
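For 115.2, if wrapping is the intended answer, the broker's duplicate/out-of-order check has to be wrap-aware rather than comparing against a plain lastSeq + 1, which overflows to a negative value. A minimal sketch (my own illustration, not from the KIP):

public class SequenceWrapSketch {

    // Expected next sequence, assuming the int32 sequence number wraps back to 0.
    static int nextSequence(int lastSeq) {
        return lastSeq == Integer.MAX_VALUE ? 0 : lastSeq + 1;
    }

    static boolean inSequence(int lastSeq, int incomingSeq) {
        return incomingSeq == nextSequence(lastSeq);
    }

    public static void main(String[] args) {
        System.out.println(inSequence(5, 6));                  // true
        System.out.println(inSequence(Integer.MAX_VALUE, 0));  // true only if wrapping is the intent
    }
}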
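For 121, if we do not enforce this broker- or client-side, the documentation would at least need to call out a producer configuration along these lines. max.in.flight.requests.per.connection, retries and acks are existing producer configs; transaction.app.id is the name currently proposed in the KIP (see 120 on whether that name is right):

import java.util.Properties;

public class TransactionalProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transaction.app.id", "my-app-1");
        // With pipelining, a retried batch can land after a later batch, breaking the
        // per-pid sequence; the question is whether the client should force this to 1.
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("retries", "3");
        props.put("acks", "all");
        System.out.println(props);
    }
}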
Thanks,

Jun

On Tue, Jan 3, 2017 at 5:38 PM, radai <radai.rosenbl...@gmail.com> wrote:

> @jun - good proposal. i was willing to concede that read-uncommitted was
> impossible under my proposal but if LSO/NSO is introduced it becomes possible.
>
> On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Just to follow up on Radai's idea of pushing the buffering logic to the
> > broker. It may be possible to do this efficiently if we assume aborted
> > transactions are rare. The following is a draft proposal. For each
> > partition, the broker maintains the last stable offset (LSO) as described
> > in the document, and only exposes messages up to this point if the reader
> > is in the read-committed mode. When a new stable offset (NSO) is
> > determined, if there is no aborted message in this window, the broker
> > simply advances the LSO to the NSO. If there is at least one aborted
> > message, the broker first replaces the current log segment with new log
> > segments excluding the aborted messages and then advances the LSO. To make
> > the replacement efficient, we can replace the current log segment with 3
> > new segments: (1) a new "shadow" log segment that simply references the
> > portion of the current log segment from the beginning to the LSO, (2) a
> > log segment created by copying only committed messages between the LSO
> > and the NSO, (3) a new "shadow" log segment that references the portion
> > of the current log segment from the NSO (open ended). Note that only (2)
> > involves real data copying. If aborted transactions are rare, this
> > overhead will be insignificant. Assuming that applications typically
> > don't abort transactions, transactions will only be aborted by
> > transaction coordinators during hard failure of the producers, which
> > should be rare.
> >
> > This way, the consumer library's logic will be simplified. We can still
> > expose uncommitted messages to readers in the read-uncommitted mode and
> > therefore leave the door open for speculative readers in the future.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta <apu...@confluent.io> wrote:
> >
> > > Hi Joel,
> > >
> > > The alternatives are embedded in the 'discussion' sections which are
> > > spread throughout the google doc.
> > >
> > > Admittedly, we have not covered high level alternatives like those which
> > > have been brought up in this thread. In particular, having a separate
> > > log for transactional messages and also having multiple producers
> > > participate in a single transaction.
> > >
> > > This is an omission which we will correct.
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >
> > > > > @Joel,
> > > > >
> > > > > I read over your wiki, and apart from the introduction of the notion
> > > > > of journal partitions --whose pros and cons are already being
> > > > > discussed-- you also introduce the notion of a 'producer group' which
> > > > > enables multiple producers to participate in a single transaction.
> > > > > This is completely opposite of the model in the KIP where a
> > > > > transaction is defined by a producer id, and hence there is a 1-1
> > > > > mapping between producers and transactions. Further, each producer
> > > > > can have exactly one in-flight transaction at a time in the KIP.
> > > >
> > > > Hi Apurva - yes I did notice those differences among other things :)
> > > > BTW, I haven't yet gone through the google-doc carefully but on a skim
> > > > it does not seem to contain any rejected alternatives as the wiki
> > > > states.