Re: Transactions and `endOffsets` Java client consumer method

2022-03-21 Thread Chris Jansen
Hi Guozhang,

Thanks for getting back to me. I'm using Confluent's distribution version
6.2.2, so Kafka 2.8.1. Could there be some consumer or broker configuration
I'm missing that would change this behaviour?

Just to confirm, here are my consumer settings:

auto.offset.reset -> earliest, isolation.level -> read_committed,
group.instance.id -> , group.id ->
ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
PLAINTEXT://localhost:58300, enable.auto.commit -> false

Thanks again,


Chris


On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang  wrote:

> Hi Chris,
>
> The broker does take the isolation level in the ListOffset API (which is
> used for the endoffset call) in to consideration:
>
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> }
>
>
> If it is READ_COMMITTED, it would only return the last stable offset.
>
> But this requires the broker version to be newer enough to actually
> know the new format of the request, is your broker on the newer
> version? Other than that, I cannot think of a reason explaining what
> you saw.
>
>
>
> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen 
> wrote:
>
> > Hello all,
> >
> > I have the need to query end offsets for a particular topic using a
> > consumer that has been configured with the "READ_COMMITTED" isolation
> > level. The response I get via the Java client
> > includes offsets at the end of the log that have not yet been committed
> and
> > therefore can't be consumed.
> >
> > After digging around in the Java client code, it looks like the isolation
> > level is being transmitted in the API request to the broker. So my
> question
> > is does the broker use this information when crafting a response and, if
> it
> > is taken into account, why does it return log end offsets for
> transactions
> > that have not yet been committed?
> >
> > For my own future reference I was struggling to find any details on the
> > broker API anywhere, if someone could point me in the right direction,
> I'd
> > really appreciate it.
> >
> > Thanks,
> >
> >
> > Chris Jansen
> >
>
>
> --
> -- Guozhang
>


Re: Transactions and `endOffsets` Java client consumer method

2022-03-21 Thread Chris Jansen
So an update on this, it seems that the consumer reports an unreadable end
of the log if the transaction has been aborted. Is this the intended
behaviour?

Thanks again,

Chris

On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen 
wrote:

> Hi Guozhang,
>
> Thanks for getting back to me. I'm using Confluent's distribution version
> 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker configuration
> I'm missing that would change this behaviour?
>
> Just to confirm, here are my consumer settings:
>
> auto.offset.reset -> earliest, isolation.level -> read_committed,
> group.instance.id -> , group.id ->
> ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
> PLAINTEXT://localhost:58300, enable.auto.commit -> false
>
> Thanks again,
>
>
> Chris
>
>
> On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang  wrote:
>
>> Hi Chris,
>>
>> The broker does take the isolation level in the ListOffset API (which is
>> used for the endoffset call) in to consideration:
>>
>> val lastFetchableOffset = isolationLevel match {
>>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>>   case None => localLog.logEndOffset
>> }
>>
>>
>> If it is READ_COMMITTED, it would only return the last stable offset.
>>
>> But this requires the broker version to be newer enough to actually
>> know the new format of the request, is your broker on the newer
>> version? Other than that, I cannot think of a reason explaining what
>> you saw.
>>
>>
>>
>> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen 
>> wrote:
>>
>> > Hello all,
>> >
>> > I have the need to query end offsets for a particular topic using a
>> > consumer that has been configured with the "READ_COMMITTED" isolation
>> > level. The response I get via the Java client
>> > includes offsets at the end of the log that have not yet been committed
>> and
>> > therefore can't be consumed.
>> >
>> > After digging around in the Java client code, it looks like the
>> isolation
>> > level is being transmitted in the API request to the broker. So my
>> question
>> > is does the broker use this information when crafting a response and,
>> if it
>> > is taken into account, why does it return log end offsets for
>> transactions
>> > that have not yet been committed?
>> >
>> > For my own future reference I was struggling to find any details on the
>> > broker API anywhere, if someone could point me in the right direction,
>> I'd
>> > really appreciate it.
>> >
>> > Thanks,
>> >
>> >
>> > Chris Jansen
>> >
>>
>>
>> --
>> -- Guozhang
>>
>


Re: Transactions and `endOffsets` Java client consumer method

2022-03-21 Thread Guozhang Wang
Hi Chris,

What do you mean by "an unreadable end of the log"? Could you elaborate?

The version you used is recent enough, and the configs seems okay. So I
think there are some issues elsewhere.

On Mon, Mar 21, 2022 at 5:51 AM Chris Jansen 
wrote:

> So an update on this, it seems that the consumer reports an unreadable end
> of the log if the transaction has been aborted. Is this the intended
> behaviour?
>
> Thanks again,
>
> Chris
>
> On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen 
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for getting back to me. I'm using Confluent's distribution version
> > 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker
> configuration
> > I'm missing that would change this behaviour?
> >
> > Just to confirm, here are my consumer settings:
> >
> > auto.offset.reset -> earliest, isolation.level -> read_committed,
> > group.instance.id -> , group.id ->
> > ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
> > PLAINTEXT://localhost:58300, enable.auto.commit -> false
> >
> > Thanks again,
> >
> >
> > Chris
> >
> >
> > On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang 
> wrote:
> >
> >> Hi Chris,
> >>
> >> The broker does take the isolation level in the ListOffset API (which is
> >> used for the endoffset call) in to consideration:
> >>
> >> val lastFetchableOffset = isolationLevel match {
> >>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
> >>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
> >>   case None => localLog.logEndOffset
> >> }
> >>
> >>
> >> If it is READ_COMMITTED, it would only return the last stable offset.
> >>
> >> But this requires the broker version to be newer enough to actually
> >> know the new format of the request, is your broker on the newer
> >> version? Other than that, I cannot think of a reason explaining what
> >> you saw.
> >>
> >>
> >>
> >> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <
> chris.jan...@permutive.com>
> >> wrote:
> >>
> >> > Hello all,
> >> >
> >> > I have the need to query end offsets for a particular topic using a
> >> > consumer that has been configured with the "READ_COMMITTED" isolation
> >> > level. The response I get via the Java client
> >> > includes offsets at the end of the log that have not yet been
> committed
> >> and
> >> > therefore can't be consumed.
> >> >
> >> > After digging around in the Java client code, it looks like the
> >> isolation
> >> > level is being transmitted in the API request to the broker. So my
> >> question
> >> > is does the broker use this information when crafting a response and,
> >> if it
> >> > is taken into account, why does it return log end offsets for
> >> transactions
> >> > that have not yet been committed?
> >> >
> >> > For my own future reference I was struggling to find any details on
> the
> >> > broker API anywhere, if someone could point me in the right direction,
> >> I'd
> >> > really appreciate it.
> >> >
> >> > Thanks,
> >> >
> >> >
> >> > Chris Jansen
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>


-- 
-- Guozhang