Hi Jun, Chia,

Thanks for the great examples.

I still believe that when a map contains a key, its value should not be null.
Now that we've introduced the allow.null.offsets.entries config as a bridge
between the two behaviors, I trust that most users who rely on null values
will have a reasonable path to transition during this period.

I can update the KIP to include the Kafka Streams use case and emphasize that
the allow.null.offsets.entries config will remain available until the next
major release, giving most users enough time to adapt.
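To make the two behaviors that allow.null.offsets.entries is meant to bridge concrete, here is a minimal sketch. It does not touch the real consumer: `NullEntryPolicy` and `applyPolicy` are hypothetical names, a plain `Map<String, Long>` stands in for the consumer's result map, and I am assuming that "allow = true" means keeping today's null entries while "allow = false" means dropping them.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: shows how a flag like the proposed
// allow.null.offsets.entries config could switch between the two behaviors.
// Partition names are plain Strings here instead of TopicPartition.
final class NullEntryPolicy {

    private NullEntryPolicy() {}

    /**
     * @param raw              lookup result where a null value means "no offset"
     * @param allowNullEntries true keeps null entries (current behavior),
     *                         false drops them (proposed behavior)
     */
    static Map<String, Long> applyPolicy(Map<String, Long> raw, boolean allowNullEntries) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : raw.entrySet()) {
            // Keep non-null values always; keep null values only if allowed.
            if (e.getValue() != null || allowNullEntries) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return Collections.unmodifiableMap(result);
    }
}
```

With `raw = {t-0=42, t-1=null}`, `applyPolicy(raw, true)` keeps both keys, while `applyPolicy(raw, false)` returns only `{t-0=42}`.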

Best Regards,
Jiunn-Yang

> On May 6, 2025, at 1:53 AM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
> 
> Hi Jun,
> 
> Thanks for the great example.
> 
> Whether null values are useful depends on what users expect the "map
> structure" to represent. If users expect it to represent both existence and
> non-existence, then null is meaningful. Otherwise, users who expect the map
> to represent only existing entries may have to add extra filters to prevent
> NPEs [0][1].
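As a rough illustration of the defensive filtering described here (this is not the Alpakka or ZIO code, only a sketch): dereferencing every value of a map that may contain nulls throws a NullPointerException, so callers filter first. `CommittedOffset` is a hypothetical stand-in for `OffsetAndMetadata`, and String partition names stand in for `TopicPartition`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for OffsetAndMetadata, to keep the sketch self-contained.
final class CommittedOffset {
    final long offset;

    CommittedOffset(long offset) {
        this.offset = offset;
    }
}

final class NullFiltering {

    private NullFiltering() {}

    // Naive version: throws NullPointerException as soon as it meets a null value.
    static Map<String, Long> offsetsUnsafe(Map<String, CommittedOffset> committed) {
        Map<String, Long> out = new LinkedHashMap<>();
        committed.forEach((tp, om) -> out.put(tp, om.offset));
        return out;
    }

    // Defensive version: the extra filter users add when values may be null.
    static Map<String, Long> offsetsFiltered(Map<String, CommittedOffset> committed) {
        return committed.entrySet().stream()
            .filter(e -> e.getValue() != null)
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset));
    }
}
```

Note that the filter is needed even before `Collectors.toMap`, which itself rejects null values.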
> 
> [0]
> https://github.com/akka/alpakka-kafka/blob/61cae8c04eff05f1408de8d7d0080a0be4c00aac/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala#L365
> [1]
> https://github.com/zio/zio-kafka/blob/adfff39bdd86b75f6035bdfb90935f7800d30b52/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala#L630
> 
> Best,
> Chia-Ping
> 
> 
> On Tue, May 6, 2025 at 1:19 AM Jun Rao <j...@confluent.io.invalid> wrote:
> 
>> Hi, Jiunn-Yang,
>> 
>> Looking at the code again, it seems that the assumption "For a real-world
>> example, we can see that developers typically do not check for null when
>> using this." is not true.
>> 
>> For example, the following is how Streams uses committed().
>>     // those which do not have a committed offset would default to 0
>>     committedOffsets = consumer.committed(partitions).entrySet().stream()
>>         .collect(Collectors.toMap(Map.Entry::getKey,
>>             e -> e.getValue() == null ? 0L : e.getValue().offset()));
>> 
>> The following is how Streams uses offsetsForTimes().
>>     for (final Map.Entry<TopicPartition, OffsetAndTimestamp> partitionAndOffset :
>>             mainConsumer.offsetsForTimes(seekToTimestamps).entrySet()) {
>>         final TopicPartition partition = partitionAndOffset.getKey();
>>         final OffsetAndTimestamp seekOffset = partitionAndOffset.getValue();
>>         if (seekOffset != null) {
>>             mainConsumer.seek(partition, new OffsetAndMetadata(seekOffset.offset()));
>>         } else {
>>             log.debug(
>>                 "Cannot reset offset to non-existing timestamp {} (larger than timestamp of last record)" +
>>                     " for partition {}. Seeking to end instead.",
>>                 seekToTimestamps.get(partition),
>>                 partition
>>             );
>>             mainConsumer.seekToEnd(Collections.singleton(partitionAndOffset.getKey()));
>>         }
>>     }
>> 
>> In both cases, knowing that the offset doesn't exist is actually important
>> to the application. Instead of ignoring the missing offset, the application
>> wants to explicitly reset the offset to a default value. So the change
>> suggested in the KIP would actually make it harder for application
>> developers to write correct code.
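For comparison with the first Streams snippet above, here is a sketch of how the same "default to 0" logic might look if partitions without a committed offset were omitted from the map instead of mapped to null. The caller iterates over the partitions it requested and uses `Map.getOrDefault`. `CommittedDefaults` and `committedOrZero` are hypothetical, and Strings and Longs stand in for `TopicPartition` and `OffsetAndMetadata`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class CommittedDefaults {

    private CommittedDefaults() {}

    /**
     * @param requested partitions passed to committed()
     * @param committed result map that omits partitions without a committed offset
     * @return every requested partition mapped to its offset, or 0 if none was committed
     */
    static Map<String, Long> committedOrZero(Set<String> requested, Map<String, Long> committed) {
        Map<String, Long> out = new LinkedHashMap<>();
        for (String tp : requested) {
            // Absence of the key, not a null value, signals "no committed offset".
            out.put(tp, committed.getOrDefault(tp, 0L));
        }
        return out;
    }
}
```

Jun's concern still applies to this shape: the caller must remember to iterate over the requested partitions rather than over the returned map, otherwise partitions without an offset silently disappear.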
>> 
>> Thanks,
>> 
>> Jun
>> 
>> On Fri, May 2, 2025 at 4:51 AM 黃竣陽 <s7133...@gmail.com> wrote:
>> 
>>> Hi Chia, Jun,
>>> 
>>>> It would be useful to think through the consistency with adminClient.listOffsets()
>>> 
>>> In the Admin client, the returned map values are never null; instead, an
>>> offset of -1 indicates the absence of an offset. Given this approach, I
>>> believe we could apply a similar strategy by introducing a new (deprecated)
>>> configuration for the Admin client. This would give users the ability to
>>> opt into the new behavior in Kafka 4.x.
>>> 
>>>> We should also take into account the behavior of ListOffsetsResult.partitionResult(TopicPartition partition).
>>> 
>>> There are four methods that currently follow a similar pattern:
>>> 1. ListOffsetsResult#partitionResult
>>> 2. ListConsumerGroupOffsetsResult#partitionsToOffset
>>> 3. DescribeTransactionsResult#description
>>> 4. DescribeProducersResult#partitionResult
>>> 
>>> However, I'm not entirely sure how these methods relate to the issue of map
>>> entries being null. It seems that these result classes behave differently
>>> from the others in this regard.
>>> 
>>>> It seems that consumer.committed()
>>>> https://github.com/apache/kafka/pull/19578#discussion_r2068913749
>>> 
>>> I will add these inconsistent methods to the KIP.
>>> 
>>> Best Regards,
>>> Jiunn-Yang
>>> 
>>>> On May 1, 2025, at 3:20 AM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>> 
>>>> Hi Jun,
>>>> 
>>>>> It seems that consumer.committed() has the same behavior. If you pass in a
>>>>> non-existing partition, it returns a map with a null value. So, if we want
>>>>> to make a change, it would be useful to make it consistent too.
>>>> 
>>>> Yes, both consumers have the same behavior.
>>>> 
>>>> In addition, `listStreamsGroupOffsets` and `listShareGroupOffsets` also
>>>> return null values. They are not in production yet, but maybe we should
>>>> keep their behavior consistent as well.
>>>> 
>>>> Best,
>>>> Chia-Ping
>>>> 
>>>> On Thu, May 1, 2025 at 2:38 AM Jun Rao <j...@confluent.io.invalid> wrote:
>>>> 
>>>>> Hi, Chia-Ping,
>>>>> 
>>>>> It seems that consumer.committed() has the same behavior. If you pass in a
>>>>> non-existing partition, it returns a map with a null value. So, if we want
>>>>> to make a change, it would be useful to make it consistent too.
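The ambiguity behind a null-valued entry can be seen with plain `java.util.Map` semantics: `get` returns null both for a partition that was requested but has no committed offset and for a partition that was never requested, so only `containsKey` tells them apart. This sketch uses a `Map<String, Long>` as a stand-in for the result of `committed()`; `describe` is a hypothetical helper.

```java
import java.util.Map;

final class NullValueAmbiguity {

    private NullValueAmbiguity() {}

    // Classifies a lookup against a result map that may hold null values.
    static String describe(Map<String, Long> result, String partition) {
        Long offset = result.get(partition);
        if (offset != null) {
            return "offset " + offset;
        }
        // get() returned null: the key is either mapped to null or absent.
        return result.containsKey(partition) ? "requested, no offset" : "not requested";
    }
}
```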
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun
>>>>> 
>>>>> On Wed, Apr 30, 2025 at 9:05 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Ken,
>>>>>> 
>>>>>> There is another inconsistent case in the offset-related APIs; see
>>>>>> https://github.com/apache/kafka/pull/19578#discussion_r2068913749
>>>>>> 
>>>>>> Best,
>>>>>> Chia-Ping
>>>>>> 
>>>>>> On Wed, Apr 30, 2025 at 7:03 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi all,
>>>>>>> 
>>>>>>>> Also, it would be useful to think through the behavior of ListOffsetsResult.partitionResult(final TopicPartition partition) too.
>>>>>>> 
>>>>>>> It seems there are three options:
>>>>>>> 
>>>>>>>  1. Keep the current behavior - it's inconsistent, as other result
>>>>>>>     classes, such as DeleteRecordsResult, don't have a similar method.
>>>>>>>  2. Deprecate the current method and add a new method that returns all
>>>>>>>     futures. I prefer this approach as it would align with other result
>>>>>>>     classes.
>>>>>>>  3. Return null or KafkaFuture<null> - this is a bad idea, so let's not
>>>>>>>     consider it.
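Option 2 could look roughly like the following sketch. It is not Kafka's ListOffsetsResult: `OffsetsResultSketch` and `partitionResults()` are hypothetical names, `CompletableFuture` stands in for `KafkaFuture`, String partition names stand in for `TopicPartition`, and throwing for a partition that was not requested is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical result class illustrating option 2.
final class OffsetsResultSketch {

    private final Map<String, CompletableFuture<Long>> futures;

    OffsetsResultSketch(Map<String, CompletableFuture<Long>> futures) {
        this.futures = Collections.unmodifiableMap(futures);
    }

    /** Single-partition accessor: the style option 2 would deprecate. */
    @Deprecated
    CompletableFuture<Long> partitionResult(String partition) {
        CompletableFuture<Long> future = futures.get(partition);
        if (future == null) {
            throw new IllegalArgumentException("Partition " + partition + " was not requested");
        }
        return future;
    }

    /** New accessor: every requested partition mapped to its own future. */
    Map<String, CompletableFuture<Long>> partitionResults() {
        return futures;
    }
}
```

Returning the whole map keeps the question "was this partition requested?" in the caller's hands, which is how other result classes expose per-partition futures.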
>>>>>>> 
>>>>>>> By the way, DescribeProducersResult#partitionResult(final TopicPartition
>>>>>>> partition) needs to be considered as well.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Chia-Ping
>>>>>>> 
>>>>>>> On Wed, Apr 30, 2025 at 6:30 AM Jun Rao <j...@confluent.io.invalid> wrote:
>>>>>>> 
>>>>>>>> Hi, Jiunn-Yang,
>>>>>>>> 
>>>>>>>> Thanks for the KIP.
>>>>>>>> 
>>>>>>>> It would be useful to think through the consistency with
>>>>>>>> adminClient.listOffsets(). Currently, if the offset for a timestamp
>>>>>>>> doesn't exist, consumer.offsetsForTimes() returns a null value for that
>>>>>>>> partition, while adminClient.listOffsets().all().get() returns a
>>>>>>>> ListOffsetsResultInfo with -1 as the offset. Should we make the behavior
>>>>>>>> more consistent in the KIP? Also, it would be useful to think through the
>>>>>>>> behavior of ListOffsetsResult.partitionResult(final TopicPartition
>>>>>>>> partition) too.
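One way to compare the two representations described here: the consumer signals "no offset for this timestamp" with a null value, the Admin client with an offset of -1. Normalizing both to `OptionalLong` makes them directly comparable. This is only a sketch; `OffsetLookupNormalizer` and its methods are hypothetical helpers that take plain offset values, not Kafka APIs.

```java
import java.util.OptionalLong;

final class OffsetLookupNormalizer {

    private OffsetLookupNormalizer() {}

    // Sentinel the Admin client uses for "no offset", per the discussion above.
    static final long NO_OFFSET = -1L;

    // Consumer style: a null value means no offset was found for the timestamp.
    static OptionalLong fromConsumerValue(Long offsetOrNull) {
        return offsetOrNull == null ? OptionalLong.empty() : OptionalLong.of(offsetOrNull);
    }

    // Admin style: -1 means no offset was found for the timestamp.
    static OptionalLong fromAdminValue(long offset) {
        return offset == NO_OFFSET ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
```

After normalization, `fromConsumerValue(null)` and `fromAdminValue(-1L)` are both empty, and a found offset compares equal regardless of which API produced it.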
>>>>>>>> 
>>>>>>>> Jun
>>>>>>>> 
>>>>>>>> On Fri, Mar 14, 2025 at 4:33 AM 黃竣陽 <s7133...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hello everyone,
>>>>>>>>> 
>>>>>>>>> I would like to start a discussion on KIP-1140: Avoid to return null
>>>>>>>>> value in Map from public api of consumer
>>>>>>>>> <https://cwiki.apache.org/confluence/x/mIuMEw>
>>>>>>>>> 
>>>>>>>>> This proposal aims to improve the Kafka consumer API by ensuring that
>>>>>>>>> the Maps it returns contain only non-null values, aligning with the
>>>>>>>>> design philosophy of Java collections. This change provides
>>>>>>>>> significantly more benefits than drawbacks, enhancing API consistency
>>>>>>>>> and usability while reducing errors caused by developer misuse.
>>>>>>>>> 
>>>>>>>>> Best Regards,
>>>>>>>>> Jiunn-Yang
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>> 
>>> 
>> 
