Hi All,

We've been putting some thought into the need to buffer fetched data in the
consumer in the READ_COMMITTED isolation mode and have a proposal to address
the concern. The basic idea is to introduce an index which keeps track of
aborted transactions. We use this index to return with each fetch the list of
aborted transactions overlapping the fetched range, so that the consumer can
tell, without any buffering, whether a record set should be returned to the
user. Take a look and let us know what you think:
https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc/edit?usp=sharing
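
To make the idea concrete, here is a minimal sketch of the consumer-side
bookkeeping, assuming a hypothetical AbortedTxn entry (producer id plus the
first offset written by the aborted transaction) returned with each fetch.
The names are illustrative only, not the proposed schema:

  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  // Illustrative only: filter record sets in fetch order using the
  // aborted-transaction list returned with the fetch, with no buffering.
  class ReadCommittedFilter {

      static class AbortedTxn {
          final long producerId;
          final long firstOffset; // first offset of the aborted transaction
          AbortedTxn(long producerId, long firstOffset) {
              this.producerId = producerId;
              this.firstOffset = firstOffset;
          }
      }

      // Producers whose current transaction is known to be aborted.
      private final Set<Long> abortedProducers = new HashSet<>();

      // Called once per record set, in offset order. 'aborted' is the list
      // attached to the fetch response, sorted by firstOffset.
      boolean shouldReturn(long producerId, long baseOffset,
                           boolean isAbortMarker, List<AbortedTxn> aborted) {
          for (AbortedTxn txn : aborted) {
              if (txn.firstOffset <= baseOffset)
                  abortedProducers.add(txn.producerId);
          }
          if (isAbortMarker) {
              // The ABORT control marker ends the transaction; it is
              // consumed internally and never handed to the user.
              abortedProducers.remove(producerId);
              return false;
          }
          return !abortedProducers.contains(producerId);
      }
  }

The point is that the aborted-transaction list gives the consumer enough
information to make the keep/drop decision per record set as it arrives,
rather than holding data back until the transaction outcome is known.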
Thanks,
Jason

On Sun, Jan 8, 2017 at 9:32 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Jason,
>
> 100. Yes, AppId level security is mainly for protecting the shared
> transaction log. We could also include the AppId in the produce request
> (not in the message format) so that we could protect writes at the AppId
> level. I agree that we need to support prefix matching on AppId for
> applications like Streams to use this conveniently.
>
> A couple of other comments.
>
> 122. Earlier, Becket asked for the use case of knowing the number of
> messages in a message set. One potential use case is KAFKA-4293.
> Currently, since we don't know the number of messages in a compressed
> set, we rely on catching EOF in the decompressor to finish the iteration,
> which adds a bit of overhead in the consumer.
>
> 123. I am wondering if the coordinator needs to add a "BEGIN transaction
> message" on a BeginTxnRequest
> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.lbrw4crdnl5>.
> Could we just wait until an AddPartitionsToTxnRequest
> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.r6klddrx9ibz>?
>
> Thanks,
>
> Jun
>
>
> On Thu, Jan 5, 2017 at 11:05 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Jun,
> >
> > Let me start by picking off some of your questions (we're giving the
> > shadow log suggestion a bit more thought before responding).
> >
> > > 100. Security: It seems that if an app is mistakenly configured with
> > > the appId of an existing producer, it can take over the pid and
> > > prevent the existing app from publishing. So, I am wondering if it
> > > makes sense to add ACLs at the TransactionResource level just like
> > > we do for ConsumerGroupResource. So, a user can only do transactions
> > > under a particular appId if he/she has the write permission to the
> > > TransactionResource associated with the appId.
> >
> > I think this makes sense in general. There are a couple of points worth
> > mentioning:
> >
> > 1. Because we only use the AppID in requests to the transaction
> > coordinator, that's the only point at which we can do authorization in
> > the current proposal. It is possible for a malicious producer to hijack
> > another producer's PID and use it to write data. It wouldn't be able to
> > commit or abort transactions, but it could effectively fence the
> > legitimate producer from a partition by forcing an epoch bump. We could
> > add the AppID to the ProduceRequest schema, but we would still need to
> > protect its binding to the PID somehow. This is one argument in favor
> > of dropping the PID and using the AppID in the log message format.
> > However, there are still ways in the current proposal to give better
> > protection if we added the AppID authorization at the transaction
> > coordinator as you suggest. Note that a malicious producer would have
> > to be authorized to write to the same topics used by the transactional
> > producer. So one way to protect those topics is to only allow write
> > access by the authorized transactional producers. The transactional
> > producers could still interfere with each other, but perhaps that's a
> > smaller concern (it's similar in effect to the limitations of consumer
> > group authorization).
> >
> > 2. It's a bit unfortunate that we don't have something like the
> > consumer's groupId to use for authorization. The AppID is really more
> > of an instance ID (we were reluctant to introduce any formal notion of
> > a producer group). I guess distributed applications could use a common
> > prefix and a wildcard authorization policy. I don't think we currently
> > support general wildcards, but that might be helpful for this use case.
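> >
> > Purely as a sketch of what prefix-based matching could look like (this
> > is hypothetical, not an existing authorizer API):
> >
> >   // Hypothetical: authorize an AppID against ACL patterns that may
> >   // end in a trailing '*' wildcard, e.g. "my-streams-app-*".
> >   static boolean isAuthorized(String appId,
> >                               java.util.Collection<String> aclPatterns) {
> >       for (String pattern : aclPatterns) {
> >           if (pattern.endsWith("*")) {
> >               String prefix = pattern.substring(0, pattern.length() - 1);
> >               if (appId.startsWith(prefix))
> >                   return true;
> >           } else if (appId.equals(pattern)) {
> >               return true;
> >           }
> >       }
> >       return false;
> >   }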
> >
> > -Jason
> >
> > On Wed, Jan 4, 2017 at 12:55 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Hey Jun,
> > >
> > > We had a proposal like this previously. The suppression scheme was
> > > slightly different. Rather than attempting to recopy or swap, there
> > > was instead an aborted offset index maintained along with each
> > > segment, containing a sequential list of aborted offsets. The
> > > filtering would happen at fetch time and would just ensure that
> > > fetch requests never span an aborted transaction. That is, if you
> > > did a fetch request which would include offsets 7,8,9,10,11, but
> > > offsets 7 and 10 appear in the index of aborted transactions, then
> > > the fetch would return only 8 and 9, even if there was more space in
> > > the fetch response. This leads to minimal overhead, but potentially
> > > would give back smaller fetch responses if transactions are being
> > > continually aborted.
> > >
> > > One downside to this approach (both your proposal and the variation
> > > that I just described) is that it does not allow the possibility of
> > > consuming in transaction commit order. Consuming in transaction
> > > commit order means that the only delay you incur is the delay in
> > > committing a given transaction. Consuming in offset order means you
> > > cannot consume a given offset until ALL previously begun transactions
> > > are committed or aborted. KIP-98 doesn't propose making this change
> > > now, but since it is consumer side, it is possible.
> > >
> > > -Jay
> > >
> > > On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Just to follow up on Radai's idea of pushing the buffering logic
> > > > to the broker. It may be possible to do this efficiently if we
> > > > assume aborted transactions are rare. The following is a draft
> > > > proposal. For each partition, the broker maintains the last stable
> > > > offset (LSO) as described in the document, and only exposes
> > > > messages up to this point if the reader is in the read-committed
> > > > mode. When a new stable offset (NSO) is determined, if there is no
> > > > aborted message in this window, the broker simply advances the LSO
> > > > to the NSO. If there is at least one aborted message, the broker
> > > > first replaces the current log segment with new log segments
> > > > excluding the aborted messages and then advances the LSO. To make
> > > > the replacement efficient, we can replace the current log segment
> > > > with 3 new segments: (1) a new "shadow" log segment that simply
> > > > references the portion of the current log segment from the
> > > > beginning to the LSO, (2) a log segment created by copying only
> > > > the committed messages between the LSO and the NSO, (3) a new
> > > > "shadow" log segment that references the portion of the current
> > > > log segment from the NSO (open ended). Note that only (2) involves
> > > > real data copying. If aborted transactions are rare, this overhead
> > > > will be insignificant. Assuming that applications typically don't
> > > > abort transactions, transactions will only be aborted by
> > > > transaction coordinators during hard failure of the producers,
> > > > which should be rare.
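> > > >
> > > > A rough sketch of the replacement step, assuming hypothetical
> > > > helpers for creating "shadow" segments (views over existing data)
> > > > and for copying committed messages (none of these are existing
> > > > broker APIs):
> > > >
> > > >   void advanceLso(Log log, long lso, long nso,
> > > >                   java.util.SortedSet<Long> abortedOffsets) {
> > > >       if (abortedOffsets.isEmpty()) {
> > > >           // Common case: nothing aborted, no copying at all.
> > > >           log.updateLastStableOffset(nso);
> > > >           return;
> > > >       }
> > > >       // (1) References the existing data up to the LSO.
> > > >       LogSegment head = log.shadowSegment(0L, lso);
> > > >       // (2) The only real copy: committed messages in (LSO, NSO].
> > > >       LogSegment middle = log.copyCommitted(lso, nso, abortedOffsets);
> > > >       // (3) References the existing data from the NSO, open ended.
> > > >       LogSegment tail = log.shadowSegment(nso);
> > > >       log.replaceCurrentSegment(head, middle, tail);
> > > >       log.updateLastStableOffset(nso);
> > > >   }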
> > > >
> > > > This way, the consumer library's logic will be simplified. We can
> > > > still expose uncommitted messages to readers in the read-uncommitted
> > > > mode and therefore leave the door open for speculative reads in the
> > > > future.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta <apu...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi Joel,
> > > > >
> > > > > The alternatives are embedded in the 'discussion' sections which
> > > > > are spread throughout the google doc.
> > > > >
> > > > > Admittedly, we have not covered high-level alternatives like
> > > > > those which have been brought up in this thread, in particular
> > > > > having a separate log for transactional messages and having
> > > > > multiple producers participate in a single transaction.
> > > > >
> > > > > This is an omission which we will correct.
> > > > >
> > > > > Thanks,
> > > > > Apurva
> > > > >
> > > > > On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy <jjkosh...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > > @Joel,
> > > > > > >
> > > > > > > I read over your wiki, and apart from the introduction of the
> > > > > > > notion of journal partitions --whose pros and cons are already
> > > > > > > being discussed-- you also introduce the notion of a 'producer
> > > > > > > group' which enables multiple producers to participate in a
> > > > > > > single transaction. This is completely opposite of the model
> > > > > > > in the KIP, where a transaction is defined by a producer id,
> > > > > > > and hence there is a 1-1 mapping between producers and
> > > > > > > transactions. Further, each producer can have exactly one
> > > > > > > in-flight transaction at a time in the KIP.
> > > > > >
> > > > > > Hi Apurva - yes, I did notice those differences among other
> > > > > > things :) BTW, I haven't yet gone through the google doc
> > > > > > carefully, but on a skim it does not seem to contain any
> > > > > > rejected alternatives as the wiki states.
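> > > > > >
> > > > > > For concreteness, the KIP's single-producer model amounts to
> > > > > > something like the following (using the API names from the
> > > > > > KIP); a producer group would instead let several such producers
> > > > > > commit atomically:
> > > > > >
> > > > > >   // One producer, at most one in-flight transaction at a time.
> > > > > >   producer.initTransactions();
> > > > > >   producer.beginTransaction();
> > > > > >   producer.send(new ProducerRecord<>("topic", "key", "value"));
> > > > > >   producer.commitTransaction();   // or abortTransaction()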