I'm not sure I'm clear on the expected behavior of READ_COMMITTED in some interleaved cases:
* If a transaction starts, sends a few messages, someone writes a non-transactional event into the same topic/partition, the transaction sends a few more events, and then commits: I think the consumer will block until the commit, but I'm not sure in what order I'll see the events after that.

* Same for: transaction A starts, transaction B starts, transaction B commits, transaction A commits... when will we unblock? And what will we see?

If I missed more edge-cases, feel free to clarify those too :)

Gwen

On Tue, Jan 10, 2017 at 9:30 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hi All,
>
> We've been putting some thought into the need to buffer fetched data in the consumer in the READ_COMMITTED isolation mode and have a proposal to address the concern. The basic idea is to introduce an index to keep track of the aborted transactions. We use this index to return in each fetch a list of the aborted transactions from the fetch range, so that the consumer can tell without any buffering whether a record set should be returned to the user. Take a look and let us know what you think:
> https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc/edit?usp=sharing
>
> Thanks,
> Jason
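To make the idea above concrete, here is a minimal sketch of the consumer-side filtering that an aborted-transaction index would enable. The types here (AbortedTxn, Batch) are hypothetical stand-ins for the real fetch-response structures, not actual Kafka classes:

    import java.util.*;

    // Hypothetical stand-ins for the fetch-response structures.
    class AbortedTxn {
        final long producerId;
        final long firstOffset; // offset at which the aborted transaction began
        AbortedTxn(long producerId, long firstOffset) {
            this.producerId = producerId;
            this.firstOffset = firstOffset;
        }
    }

    class Batch {
        long producerId;
        long baseOffset;
        boolean isTransactional;
        boolean isAbortMarker; // control record written when the transaction aborted
    }

    public class ReadCommittedFilter {
        // Walk fetched batches in offset order, dropping any batch that falls
        // inside an aborted transaction. No buffering is needed: the aborted
        // list shipped with the fetch tells us up front which producers to skip.
        static List<Batch> filter(List<Batch> fetched, List<AbortedTxn> aborted) {
            Map<Long, Queue<Long>> abortedStarts = new HashMap<>();
            aborted.stream()
                   .sorted(Comparator.comparingLong((AbortedTxn a) -> a.firstOffset))
                   .forEach(a -> abortedStarts
                       .computeIfAbsent(a.producerId, k -> new ArrayDeque<>())
                       .add(a.firstOffset));

            Set<Long> abortedPids = new HashSet<>();
            List<Batch> visible = new ArrayList<>();
            for (Batch batch : fetched) {
                Queue<Long> starts = abortedStarts.get(batch.producerId);
                if (starts != null && !starts.isEmpty() && batch.baseOffset >= starts.peek()) {
                    abortedPids.add(batch.producerId); // entered an aborted transaction
                    starts.poll();
                }
                if (batch.isAbortMarker)
                    abortedPids.remove(batch.producerId); // aborted txn fully skipped
                else if (!batch.isTransactional || !abortedPids.contains(batch.producerId))
                    visible.add(batch); // committed or non-transactional data
            }
            return visible;
        }
    }

The decision is local to the record sets in hand, which is what removes the need for consumer-side buffering in READ_COMMITTED.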
> On Sun, Jan 8, 2017 at 9:32 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Jason,
>>
>> 100. Yes, AppId level security is mainly for protecting the shared transaction log. We could also include AppId in the produce request (not in the message format) so that we could protect writes at the AppId level. I agree that we need to support prefix matching on AppId for applications like Streams to use this conveniently.
>>
>> A couple of other comments.
>>
>> 122. Earlier, Becket asked for the use case of knowing the number of messages in a message set. One potential use case is KAFKA-4293. Currently, since we don't know the number of messages in a compressed set, to finish the iteration, we rely on catching EOF in the decompressor, which adds a bit of overhead in the consumer.
>>
>> 123. I am wondering if the coordinator needs to add a "BEGIN transaction message" on a BeginTxnRequest <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.lbrw4crdnl5>. Could we just wait until an AddPartitionsToTxnRequest <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.r6klddrx9ibz>?
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Jan 5, 2017 at 11:05 AM, Jason Gustafson <ja...@confluent.io> wrote:
>>
>>> Hi Jun,
>>>
>>> Let me start picking off some of your questions (we're giving the shadow log suggestion a bit more thought before responding).
>>>
>>>> 100. Security: It seems that if an app is mistakenly configured with the appId of an existing producer, it can take over the pid and prevent the existing app from publishing. So, I am wondering if it makes sense to add ACLs at the TransactionResource level just like we do for ConsumerGroupResource. So, a user can only do transactions under a particular appId if he/she has the write permission to the TransactionResource associated with the appId.
>>>
>>> I think this makes sense in general. There are a couple of points worth mentioning:
>>>
>>> 1. Because we only use the AppID in requests to the transaction coordinator, that's the only point at which we can do authorization in the current proposal. It is possible for a malicious producer to hijack another producer's PID and use it to write data. It wouldn't be able to commit or abort transactions, but it could effectively fence the legitimate producer from a partition by forcing an epoch bump. We could add the AppID to the ProduceRequest schema, but we would still need to protect its binding to the PID somehow. This is one argument in favor of dropping the PID and using the AppID in the log message format. However, there are still ways in the current proposal to give better protection if we added the AppID authorization at the transaction coordinator as you suggest. Note that a malicious producer would have to be authorized to write to the same topics used by the transactional producer. So one way to protect those topics is to only allow write access by the authorized transactional producers. The transactional producers could still interfere with each other, but perhaps that's a smaller concern (it's similar in effect to the limitations of consumer group authorization).
>>>
>>> 2. It's a bit unfortunate that we don't have something like the consumer's groupId to use for authorization. The AppID is really more of an instance ID (we were reluctant to introduce any formal notion of a producer group). I guess distributed applications could use a common prefix and a wildcard authorization policy. I don't think we currently support general wildcards, but that might be helpful for this use case.
>>>
>>> -Jason
>>>
>>> On Wed, Jan 4, 2017 at 12:55 PM, Jay Kreps <j...@confluent.io> wrote:
>>>
>>>> Hey Jun,
>>>>
>>>> We had a proposal like this previously. The suppression scheme was slightly different. Rather than attempting to recopy or swap, there was instead an aborted offset index maintained along with each segment, containing a sequential list of aborted offsets. The filtering would happen at fetch time and would just ensure that fetch requests never span an aborted transaction. That is, if you did a fetch request which would include offsets 7, 8, 9, 10, 11, but offsets 7 and 10 appear in the index of aborted transactions, then the fetch would return only 8 and 9, even if there was more space in the fetch response. This leads to minimal overhead, but potentially would give back smaller fetch responses if transactions are being continually aborted.
>>>>
>>>> One downside to this approach (both your proposal and the variation that I just described) is that it does not allow the possibility of consuming in transaction commit order. Consuming in transaction commit order means that the only delay you incur is the delay in committing a given transaction. Consuming in offset order means you cannot consume a given offset until ALL previously begun transactions are committed or aborted. KIP-98 doesn't propose making this change now, but since it is consumer side it is possible.
>>>>
>>>> -Jay
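A small sketch of the fetch-boundary clamping Jay describes, under the simplifying assumption that the index stores individual aborted offsets (a real index would record offset ranges per aborted transaction); names and types are illustrative only:

    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class AbortedOffsetFetchClamp {
        // Given the requested fetch range [start, end] and the segment's index of
        // aborted offsets, return the first contiguous run of non-aborted offsets,
        // so that a fetch response never spans an aborted transaction.
        static long[] clamp(long start, long end, NavigableSet<Long> aborted) {
            long from = start;
            while (from <= end && aborted.contains(from))
                from++; // skip aborted offsets at the front of the range
            if (from > end)
                return new long[0]; // nothing consumable in this range
            Long nextAborted = aborted.ceiling(from);
            long to = (nextAborted != null && nextAborted <= end) ? nextAborted - 1 : end;
            long[] offsets = new long[(int) (to - from + 1)];
            for (int i = 0; i < offsets.length; i++)
                offsets[i] = from + i;
            return offsets;
        }

        public static void main(String[] args) {
            NavigableSet<Long> aborted = new TreeSet<>(java.util.Arrays.asList(7L, 10L));
            // Jay's example: a fetch of 7..11 with 7 and 10 aborted returns [8, 9].
            System.out.println(java.util.Arrays.toString(clamp(7L, 11L, aborted)));
        }
    }

This reproduces the worked example in the mail: the response stops short of the next aborted offset even when the fetch has room for more.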
>>>> On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao <j...@confluent.io> wrote:
>>>>
>>>>> Just to follow up on Radai's idea of pushing the buffering logic to the broker. It may be possible to do this efficiently if we assume aborted transactions are rare. The following is a draft proposal. For each partition, the broker maintains the last stable offset (LSO) as described in the document, and only exposes messages up to this point if the reader is in the read-committed mode. When a new stable offset (NSO) is determined, if there is no aborted message in this window, the broker simply advances the LSO to the NSO. If there is at least one aborted message, the broker first replaces the current log segment with new log segments excluding the aborted messages and then advances the LSO.
>>>>>
>>>>> To make the replacement efficient, we can replace the current log segment with 3 new segments: (1) a new "shadow" log segment that simply references the portion of the current log segment from the beginning to the LSO, (2) a log segment created by copying only the committed messages between the LSO and the NSO, (3) a new "shadow" log segment that references the portion of the current log segment from the NSO (open ended). Note that only (2) involves real data copying. If aborted transactions are rare, this overhead will be insignificant. Assuming that applications typically don't abort transactions, transactions will only be aborted by transaction coordinators during hard failure of the producers, which should be rare.
>>>>>
>>>>> This way, the consumer library's logic will be simplified. We can still expose uncommitted messages to readers in the read-uncommitted mode and therefore leave the door open for speculative readers in the future.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
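To illustrate Jun's three-segment replacement, a sketch with hypothetical types (real log segments are byte ranges in files, not object lists); the point is that only step (2) copies data:

    import java.util.ArrayList;
    import java.util.List;

    public class ShadowSegmentCompaction {

        interface Segment {
            List<Message> read(long fromOffset, long toOffset); // inclusive range
        }

        static class Message {
            long offset;
            boolean aborted; // part of an aborted transaction
        }

        // A "shadow" segment references a slice of an existing segment
        // without copying any bytes.
        static class ShadowSegment implements Segment {
            private final Segment backing;
            private final long from, to;
            ShadowSegment(Segment backing, long from, long to) {
                this.backing = backing;
                this.from = from;
                this.to = to;
            }
            public List<Message> read(long a, long b) {
                return backing.read(Math.max(a, from), Math.min(b, to));
            }
        }

        static class CopiedSegment implements Segment {
            private final List<Message> messages;
            CopiedSegment(List<Message> messages) { this.messages = messages; }
            public List<Message> read(long a, long b) {
                List<Message> out = new ArrayList<>();
                for (Message m : messages)
                    if (m.offset >= a && m.offset <= b)
                        out.add(m);
                return out;
            }
        }

        // Replace `current` with three segments when advancing the LSO to the NSO.
        static List<Segment> advanceLso(Segment current, long segmentStart,
                                        long lso, long nso, long segmentEnd) {
            List<Segment> replacement = new ArrayList<>();
            // (1) shadow segment over [segmentStart, lso]: no copying
            replacement.add(new ShadowSegment(current, segmentStart, lso));
            // (2) copy only the committed messages in (lso, nso]: the only real I/O
            List<Message> committed = new ArrayList<>();
            for (Message m : current.read(lso + 1, nso))
                if (!m.aborted)
                    committed.add(m);
            replacement.add(new CopiedSegment(committed));
            // (3) shadow segment over (nso, segmentEnd]: open ended in practice
            replacement.add(new ShadowSegment(current, nso + 1, segmentEnd));
            return replacement;
        }
    }

If aborted transactions really are rare, the copied middle segment is usually empty or tiny, which is what keeps the scheme cheap.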
>>>>> On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta <apu...@confluent.io> wrote:
>>>>>
>>>>>> Hi Joel,
>>>>>>
>>>>>> The alternatives are embedded in the 'discussion' sections which are spread throughout the google doc.
>>>>>>
>>>>>> Admittedly, we have not covered high-level alternatives like those which have been brought up in this thread. In particular, having a separate log for transactional messages and also having multiple producers participate in a single transaction.
>>>>>>
>>>>>> This is an omission which we will correct.
>>>>>>
>>>>>> Thanks,
>>>>>> Apurva
>>>>>>
>>>>>> On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>>>>>>
>>>>>>>> @Joel,
>>>>>>>>
>>>>>>>> I read over your wiki, and apart from the introduction of the notion of journal partitions --whose pros and cons are already being discussed-- you also introduce the notion of a 'producer group' which enables multiple producers to participate in a single transaction. This is completely opposite of the model in the KIP, where a transaction is defined by a producer id, and hence there is a 1-1 mapping between producers and transactions. Further, each producer can have exactly one in-flight transaction at a time in the KIP.
>>>>>>>
>>>>>>> Hi Apurva - yes, I did notice those differences among other things :) BTW, I haven't yet gone through the google-doc carefully, but on a skim it does not seem to contain any rejected alternatives as the wiki states.

--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog