I'm not sure I'm clear on the expected behavior of READ_COMMITTED in
some interleaved cases:

* A transaction starts and sends a few messages, someone writes a
non-transactional event into the same topic/partition, then a few more
events arrive from that transaction, followed by a commit. I think the
consumer will block until the commit, but I'm not sure in what order
I'll see events after that.
* Same for: transaction A starts, transaction B starts, transaction B
commits, transaction A commits... When will we unblock? And what will
we see?
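
To make the two cases concrete, here is a small Python sketch of the
model I currently have in my head. It assumes offset-order delivery
bounded by the last stable offset (LSO) from Jun's draft quoted further
down; the names (Record, last_stable_offset, visible) are made up for
illustration and are not from the KIP. If this model is wrong, that is
exactly the clarification I'm after.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional


@dataclass
class Record:
    offset: int
    txn: Optional[str]  # None means a non-transactional write
    value: str


def last_stable_offset(log: List[Record], open_txns: FrozenSet[str]) -> int:
    """Offset of the first record of a still-open transaction, else end of log.

    Assumes offsets are dense and start at 0, so end of log == len(log).
    """
    for record in log:
        if record.txn in open_txns:
            return record.offset
    return len(log)


def visible(log: List[Record], open_txns: FrozenSet[str],
            aborted: FrozenSet[str] = frozenset()) -> List[str]:
    """Values a READ_COMMITTED reader would see under this assumed model."""
    lso = last_stable_offset(log, open_txns)
    return [r.value for r in log if r.offset < lso and r.txn not in aborted]


# Case 1: transaction A interleaved with a non-transactional write.
case1 = [Record(0, "A", "a1"), Record(1, None, "n1"), Record(2, "A", "a2")]
before_commit_1 = visible(case1, frozenset({"A"}))
after_commit_1 = visible(case1, frozenset())

# Case 2: A starts, B starts, B commits (A still open), then A commits.
case2 = [Record(0, "A", "a1"), Record(1, "B", "b1")]
after_b_commits = visible(case2, frozenset({"A"}))
after_a_commits = visible(case2, frozenset())
```

Under this model nothing is released until the earliest open
transaction finishes, and everything then shows up in offset order.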

If I missed more edge-cases, feel free to clarify those too :)

Gwen

On Tue, Jan 10, 2017 at 9:30 AM, Jason Gustafson <ja...@confluent.io> wrote:
> Hi All,
>
> We've been putting some thought into the need to buffer fetched data in the
> consumer in the READ_COMMITTED isolation mode and have a proposal to
> address the concern. The basic idea is to introduce an index to keep track
> of the aborted transactions. We use this index to return in each fetch a
> list of the aborted transactions from the fetch range so that the consumer
> can tell without any buffering whether a record set should be returned to
> the user. Take a look and let us know what you think:
> https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc/edit?usp=sharing
> .
>
> Thanks,
> Jason
>
> On Sun, Jan 8, 2017 at 9:32 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Jason,
>>
>> 100. Yes, AppId level security is mainly for protecting the shared
>> transaction log. We could also include AppId in produce request (not in
>> message format) so that we could protect writes at the AppId level. I agree
>> that we need to support prefix matching on AppId for applications like
>> stream to use this conveniently.
>>
>> A couple of other comments.
>>
>> 122. Earlier, Becket asked for the use case of knowing the number of
>> messages in a message set. One potential use case is KAFKA-4293. Currently,
>> since we don't know the number of messages in a compressed set, to finish
>> the iteration, we rely on catching EOF in the decompressor, which adds a
>> bit of overhead in the consumer.
>>
>> 123. I am wondering if the coordinator needs to add a "BEGIN transaction
>> message" on a BeginTxnRequest
>> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.lbrw4crdnl5>.
>> Could we just wait until an AddPartitionsToTxnRequest
>> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.r6klddrx9ibz>?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Thu, Jan 5, 2017 at 11:05 AM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>> > Hi Jun,
>> >
>> > Let me start picking off some of your questions (we're giving the shadow
>> > log suggestion a bit more thought before responding).
>> >
>> > > 100. Security: It seems that if an app is mistakenly configured with the
>> > > appId of an existing producer, it can take over the pid and prevent the
>> > > existing app from publishing. So, I am wondering if it makes sense to add
>> > > ACLs at the TransactionResource level just like we do for
>> > > ConsumerGroupResource. So, a user can only do transactions under a
>> > > particular appId if he/she has the write permission to the
>> > > TransactionResource associated with the appId.
>> >
>> >
>> > I think this makes sense in general. There are a couple of points worth
>> > mentioning:
>> >
>> > 1. Because we only use the AppID in requests to the transaction
>> > coordinator, that's the only point at which we can do authorization in the
>> > current proposal. It is possible for a malicious producer to hijack another
>> > producer's PID and use it to write data. It wouldn't be able to commit or
>> > abort transactions, but it could effectively fence the legitimate producer
>> > from a partition by forcing an epoch bump. We could add the AppID to the
>> > ProduceRequest schema, but we would still need to protect its binding to
>> > the PID somehow. This is one argument in favor of dropping the PID and
>> > using the AppID in the log message format. However, there are still ways in
>> > the current proposal to give better protection if we added the AppID
>> > authorization at the transaction coordinator as you suggest. Note that a
>> > malicious producer would have to be authorized to write to the same topics
>> > used by the transactional producer. So one way to protect those topics is
>> > to only allow write access by the authorized transactional producers. The
>> > transactional producers could still interfere with each other, but perhaps
>> > that's a smaller concern (it's similar in effect to the limitations of
>> > consumer group authorization).
>> >
>> > 2. It's a bit unfortunate that we don't have something like the consumer's
>> > groupId to use for authorization. The AppID is really more of an instance
>> > ID (we were reluctant to introduce any formal notion of a producer group).
>> > I guess distributed applications could use a common prefix and a wildcard
>> > authorization policy. I don't think we currently support general wildcards,
>> > but that might be helpful for this use case.
>> >
>> > -Jason
>> >
>> > On Wed, Jan 4, 2017 at 12:55 PM, Jay Kreps <j...@confluent.io> wrote:
>> >
>> > > Hey Jun,
>> > >
>> > > We had a proposal like this previously. The suppression scheme was slightly
>> > > different. Rather than attempting to recopy or swap, there was instead
>> > > an aborted offset index maintained along with each segment containing a
>> > > sequential list of aborted offsets. The filtering would happen at fetch
>> > > time and would just ensure that fetch requests never span an aborted
>> > > transaction. That is, if you did a fetch request which would include
>> > > offsets 7,8,9,10,11, but offsets 7 and 10 appear in the index of aborted
>> > > transactions, then the fetch would return 8,9 only even if there was more
>> > > space in the fetch response. This leads to minimal overhead, but
>> > > potentially would give back smaller fetch responses if transactions are
>> > > being continually aborted.
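
Inline note on the example above: a minimal Python sketch of how I read
the fetch-time filtering, treating the aborted offset index as a plain
set of offsets. The function name and the "skip aborted offsets at the
start of the range" behavior are my assumptions, not something spelled
out in the proposal.

```python
from typing import Iterable, List


def fetch_without_spanning_aborts(requested: List[int],
                                  aborted: Iterable[int]) -> List[int]:
    """Return the first run of non-aborted offsets in the requested range.

    Aborted offsets at the start of the range are skipped; once some data
    has been collected, the response is cut short at the next aborted
    offset so that it never spans an aborted transaction.
    """
    aborted_set = set(aborted)
    result: List[int] = []
    for offset in requested:
        if offset in aborted_set:
            if result:
                break      # stop before the next aborted offset
            continue       # still skipping leading aborted offsets
        result.append(offset)
    return result


# The example from the mail: offsets 7..11 requested, 7 and 10 aborted.
first_fetch = fetch_without_spanning_aborts([7, 8, 9, 10, 11], [7, 10])
```

A follow-up fetch would then start past offset 10, which is where the
smaller responses come from when aborts are frequent.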
>> > >
>> > > One downside to this approach (both your proposal and the variation that I
>> > > just described) is that it does not allow the possibility of consuming in
>> > > transaction commit order. Consuming in transaction commit order means that
>> > > the only delay you incur is the delay in committing a given transaction.
>> > > Consuming in offset order means you cannot consume a given offset until ALL
>> > > previously begun transactions are committed or aborted. KIP-98 doesn't
>> > > propose making this change now, but since it is consumer side it is
>> > > possible.
>> > >
>> > > -Jay
>> > >
>> > > On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao <j...@confluent.io> wrote:
>> > >
>> > > > Just to follow up on Radai's idea of pushing the buffering logic to the
>> > > > broker. It may be possible to do this efficiently if we assume aborted
>> > > > transactions are rare. The following is a draft proposal. For each
>> > > > partition, the broker maintains the last stable offset (LSO) as described
>> > > > in the document, and only exposes messages up to this point if the reader
>> > > > is in the read-committed mode. When a new stable offset (NSO) is
>> > > > determined, if there is no aborted message in this window, the broker
>> > > > simply advances the LSO to the NSO. If there is at least one aborted
>> > > > message, the broker first replaces the current log segment with new log
>> > > > segments excluding the aborted messages and then advances the LSO. To make
>> > > > the replacement efficient, we can replace the current log segment with 3
>> > > > new segments: (1) a new "shadow" log segment that simply references the
>> > > > portion of the current log segment from the beginning to the LSO, (2) a log
>> > > > segment created by copying only committed messages between the LSO and the
>> > > > NSO, (3) a new "shadow" log segment that references the portion of the
>> > > > current log segment from the NSO (open ended). Note that only (2) involves
>> > > > real data copying. If aborted transactions are rare, this overhead will be
>> > > > insignificant. Assuming that applications typically don't abort
>> > > > transactions, transactions will only be aborted by transaction coordinators
>> > > > during hard failure of the producers, which should be rare.
>> > > >
>> > > > This way, the consumer library's logic will be simplified. We can still
>> > > > expose uncommitted messages to readers in the read-uncommitted mode and
>> > > > therefore leave the door open for speculative readers in the future.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > >
>> > > > On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta <apu...@confluent.io>
>> > > > wrote:
>> > > >
>> > > > > Hi Joel,
>> > > > >
>> > > > > The alternatives are embedded in the 'discussion' sections which are
>> > > > > spread throughout the google doc.
>> > > > >
>> > > > > Admittedly, we have not covered high level alternatives like those which
>> > > > > have been brought up in this thread. In particular, having a separate log
>> > > > > for transactional messages and also having multiple producers participate
>> > > > > in a single transaction.
>> > > > >
>> > > > > This is an omission which we will correct.
>> > > > >
>> > > > > Thanks,
>> > > > > Apurva
>> > > > >
>> > > > > On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy <jjkosh...@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > @Joel,
>> > > > > > >
>> > > > > > > I read over your wiki, and apart from the introduction of the notion
>> > > > > > > of journal partitions --whose pros and cons are already being
>> > > > > > > discussed-- you also introduce the notion of a 'producer group' which
>> > > > > > > enables multiple producers to participate in a single transaction.
>> > > > > > > This is completely opposite of the model in the KIP where a
>> > > > > > > transaction is defined by a producer id, and hence there is a 1-1
>> > > > > > > mapping between producers and transactions. Further, each producer
>> > > > > > > can have exactly one in-flight transaction at a time in the KIP.
>> > > > > > >
>> > > > > >
>> > > > > > Hi Apurva - yes I did notice those differences among other things :)
>> > > > > > BTW, I haven't yet gone through the google-doc carefully but on a skim
>> > > > > > it does not seem to contain any rejected alternatives as the wiki
>> > > > > > states.
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog
