Hi Jun,

Thanks for the great example.

Whether a null value is useful depends on what users expect the returned
map to represent. If they expect it to encode both existence and
non-existence, then null is meaningful. If they expect the map to contain
only entries that exist, a null value forces them to add extra filters to
avoid NPEs [0][1].

[0]
https://github.com/akka/alpakka-kafka/blob/61cae8c04eff05f1408de8d7d0080a0be4c00aac/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala#L365
[1]
https://github.com/zio/zio-kafka/blob/adfff39bdd86b75f6035bdfb90935f7800d30b52/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala#L630
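
To make the two expectations concrete, here is a minimal plain-Java sketch.
It is not taken from either linked project. The Map<String, Long> is only a
stand-in for the map returned by consumer.committed(), and the class,
method, and partition names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

class NullValueMapSketch {

    // Hypothetical stand-in for a result such as consumer.committed(partitions):
    // partition name -> committed offset, where null means "no committed offset".
    static Map<String, Long> committedWithNulls() {
        Map<String, Long> result = new HashMap<>();
        result.put("topic-0", 42L);
        result.put("topic-1", null);
        return result;
    }

    // Expectation 1: null is meaningful, so a missing offset falls back to a default.
    static Map<String, Long> defaultMissing(Map<String, Long> offsets, long fallback) {
        Map<String, Long> out = new HashMap<>();
        offsets.forEach((partition, offset) ->
            out.put(partition, offset == null ? fallback : offset));
        return out;
    }

    // Expectation 2: only existing offsets matter, so null entries are filtered
    // out first. The filter is required here: Collectors.toMap throws a
    // NullPointerException when it is handed a null value.
    static Map<String, Long> dropMissing(Map<String, Long> offsets) {
        return offsets.entrySet().stream()
            .filter(e -> e.getValue() != null)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = committedWithNulls();
        System.out.println(defaultMissing(offsets, 0L));
        System.out.println(dropMissing(offsets));
    }
}
```

Users with the first expectation need the null entry to know which
partitions to default. Users with the second have to remember the filter.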

Best,
Chia-Ping


On Tue, May 6, 2025 at 1:19 AM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Jiunn-Yang,
>
> Looking at the code again, it seems that the assumption "For a real-world
> example, we can see that developers typically do not check for null when
> using this." is not true.
>
> For example, the following is how Streams uses committed().
>             // those which do not have a committed offset would default to 0
>             committedOffsets = consumer.committed(partitions).entrySet().stream()
>                 .collect(Collectors.toMap(Map.Entry::getKey,
>                     e -> e.getValue() == null ? 0L : e.getValue().offset()));
>
> The following is how Streams uses offsetsForTimes().
>                     for (final Map.Entry<TopicPartition, OffsetAndTimestamp> partitionAndOffset :
>                             mainConsumer.offsetsForTimes(seekToTimestamps).entrySet()) {
>                         final TopicPartition partition = partitionAndOffset.getKey();
>                         final OffsetAndTimestamp seekOffset = partitionAndOffset.getValue();
>                         if (seekOffset != null) {
>                             mainConsumer.seek(partition, new OffsetAndMetadata(seekOffset.offset()));
>                         } else {
>                             log.debug(
>                                 "Cannot reset offset to non-existing timestamp {} (larger than timestamp of last record)" +
>                                     " for partition {}. Seeking to end instead.",
>                                 seekToTimestamps.get(partition),
>                                 partition
>                             );
>                             mainConsumer.seekToEnd(Collections.singleton(partitionAndOffset.getKey()));
>                         }
>                     }
>
> In both cases, knowing that the offset doesn't exist is actually important
> to the application. Instead of ignoring the missing offset, the application
> wants to explicitly reset the offset to a default value. So the suggested
> change in the KIP actually will make it harder for the application
> developer to write the correct code.
>
> Thanks,
>
> Jun
>
> On Fri, May 2, 2025 at 4:51 AM 黃竣陽 <s7133...@gmail.com> wrote:
>
> > Hi chia, Jun,
> >
> > > It would be useful to think through the consistency with
> > adminClient.listOffset()
> >
> > While the returned map entries are never null, a value of -1 is used to
> > indicate the absence of an offset.
> > Given this approach, I believe we could apply a similar strategy by
> > introducing a new (deprecated) configuration
> > for the Admin client. This would give users the ability to opt into the
> > new behavior in Kafka 4.x.
> >
> > > We should also take into account the behavior of
> > ListOffsetsResult.partitionResult(TopicPartition partition).
> >
> > There are four methods that currently follow a similar pattern:
> > 1. ListOffsetsResult#partitionResult
> > 2. ListConsumerGroupOffsetsResult#partitionsToOffset
> > 3. DescribeTransactionsResult#description
> > 4. DescribeProducersResult#partitionResult
> >
> > However, I'm not entirely sure how these methods relate to the issue of map
> > entries being null. It seems that these result classes
> > behave differently from others in this regard.
> >
> > > It seems that consumer.committed()
> > > https://github.com/apache/kafka/pull/19578#discussion_r2068913749
> >
> > I will address these inconsistent methods in the KIP.
> >
> > Best Regards,
> > Jiunn-Yang
> >
> > > On May 1, 2025, at 3:20 AM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
> > >
> > > hi Jun
> > >
> > >> It seems that consumer.committed() has the same behavior. If you pass
> > in a
> > > non-existing partition, it returns a map with a null value. So, if we
> > want
> > > to make a change, it would be useful to make it consistent too.
> > >
> > > Yes, both consumers have the same behavior.
> > >
> > > In addition, `listStreamsGroupOffsets` and `listShareGroupOffsets` also add
> > > null values. They are not in production yet, but maybe we should keep the
> > > behavior for consistency.
> > >
> > > Best,
> > > Chia-Ping
> > >
> > > On Thu, May 1, 2025 at 2:38 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > >
> > >> Hi, Chia-Ping,
> > >>
> > >> It seems that consumer.committed() has the same behavior. If you pass
> > in a
> > >> non-existing partition, it returns a map with a null value. So, if we
> > want
> > >> to make a change, it would be useful to make it consistent too.
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Wed, Apr 30, 2025 at 9:05 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
> > >>
> > >>> Hi Ken,
> > >>>
> > >>> There is another inconsistent case among the offset-related APIs. See
> > >>> https://github.com/apache/kafka/pull/19578#discussion_r2068913749
> > >>>
> > >>> Best,
> > >>> Chia-Ping
> > >>>
> > >>> On Wed, Apr 30, 2025 at 7:03 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
> > >>>
> > >>>> hi all,
> > >>>>
> > >>>>> Also, it would be useful to think through the behavior
> > >>>> of ListOffsetsResult.partitionResult(final TopicPartition partition)
> > >> too.
> > >>>>
> > >>>> It seems there are three options:
> > >>>>
> > >>>>   1. Keep the current behavior - it's inconsistent, as other result
> > >>>>   classes, such as DeleteRecordsResult, don't have a similar method.
> > >>>>   2. Deprecate the current method and add a new method that returns all
> > >>>>   futures. I prefer this approach as it would align with other result
> > >>>>   classes.
> > >>>>   3. Return null or KafkaFuture<null> - this is a bad idea, so let's not
> > >>>>   consider it.
> > >>>>
> > >>>> By the way, DescribeProducersResult#partitionResult(final TopicPartition
> > >>>> partition) needs to be considered as well.
> > >>>>
> > >>>> Best,
> > >>>> Chia-Ping
> > >>>>
> > >>>> On Wed, Apr 30, 2025 at 6:30 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > >>>>
> > >>>>> Hi, Jiunn-Yang,
> > >>>>>
> > >>>>> Thanks for the KIP.
> > >>>>>
> > >>>>> It would be useful to think through the consistency with
> > >>>>> adminClient.listOffset(). Currently, if the offset for a timestamp doesn't
> > >>>>> exist, consumer.offsetsForTimes() returns a null value for that partition
> > >>>>> while adminClient.listOffset().all().get() returns a ListOffsetsResultInfo
> > >>>>> with -1 as the offset. Should we make the behavior more consistent in the
> > >>>>> KIP? Also, it would be useful to think through the behavior
> > >>>>> of ListOffsetsResult.partitionResult(final TopicPartition partition) too.
> > >>>>>
> > >>>>> Jun
> > >>>>>
> > >>>>> On Fri, Mar 14, 2025 at 4:33 AM 黃竣陽 <s7133...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hello everyone,
> > >>>>>>
> > >>>>>> I would like to start a discussion on KIP-1140: Avoid to return null value
> > >>>>>> in Map from public api of consumer
> > >>>>>> <https://cwiki.apache.org/confluence/x/mIuMEw>
> > >>>>>>
> > >>>>>> This proposal aims to improve the Kafka consumer API by ensuring that the
> > >>>>>> Map it returns contains only non-null values,
> > >>>>>> aligning with the design philosophy of Java collections. This change
> > >>>>>> provides significantly more benefits than drawbacks,
> > >>>>>> enhancing API consistency and usability while reducing errors caused by
> > >>>>>> developer misuse.
> > >>>>>>
> > >>>>>> Best Regards,
> > >>>>>> Jiunn-Yang
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>
