Hi Jun,

Thanks for the reply.

I have updated the KIP accordingly.

Best Regards,
Jiunn-Yang

> On Aug 6, 2025, at 11:34 PM, Jun Rao <j...@confluent.io.INVALID> wrote:
> 
> Hi, Jiunn-Yang,
> 
> Regarding bootstrap.servers in MirrorClientConfig, it's weird that it
> defaults to an empty list while an empty list is not allowed. It seems that
> this should have no default value.
> 
> Thanks,
> 
> Jun
> 
> On Wed, Aug 6, 2025 at 7:56 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
> 
>> Thanks to Jiunn-Yang for this hard KIP.
>> 
>> It might not be perfect, but we can refine it during the PR review process :)
>> 
>> 
>> On Wed, Aug 6, 2025 at 7:32 PM 黃竣陽 <s7133...@gmail.com> wrote:
>> 
>>> Hello chia,
>>> 
>>> Thanks for the reply.
>>> 
>>> chia05: I have updated the KIP to change the default values of
>>> task.assigned.groups and task.assigned.partitions from null to
>>> NO_DEFAULT_VALUE.
>>> 
>>> chia06: I have also updated the KIP based on your feedback.
>>> 
>>> Best Regards,
>>> Jiunn-Yang
>>> 
>>>> On Aug 6, 2025, at 12:54 AM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>> 
>>>> hi Jiunn-Yang
>>>> 
>>>> chia05:
>>>> 
>>>> `task.assigned.groups` and `task.assigned.partitions` should not have
>>>> default values, right?
>>>> 
>>>> chia06:
>>>> 
>>>> `new HashSet<>(values).size()` -> `Set.copyOf(values).size()`
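For reference, a minimal sketch of the chia06 idea, assuming the goal is only to detect repeated entries by comparing sizes. `DuplicateCheckSketch` and `hasDuplicates` are hypothetical names used for illustration; this is not code from the KIP or from Kafka. One difference worth keeping in mind: `Set.copyOf` rejects null elements with a NullPointerException, while `new HashSet<>(values)` accepts them.

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Kafka.
final class DuplicateCheckSketch {
    // Returns true when the list contains at least one repeated element.
    static boolean hasDuplicates(List<String> values) {
        // Set.copyOf drops repeated elements, so a smaller set means a repeat existed.
        // It throws NullPointerException if the list contains a null element.
        return Set.copyOf(values).size() != values.size();
    }

    public static void main(String[] args) {
        System.out.println(hasDuplicates(List.of("a", "b")));      // prints false
        System.out.println(hasDuplicates(List.of("a", "b", "a"))); // prints true
    }
}
```

Since the copy is only used for its size, the unmodifiable set returned by `Set.copyOf` seems sufficient here.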
>>>> 
>>>> Best,
>>>> Chia-Ping
>>>> 
>>>> On Tue, Aug 5, 2025 at 11:39 PM Jun Rao <j...@confluent.io.invalid> wrote:
>>>> 
>>>>> Hi, Jiunn-Yang,
>>>>> 
>>>>> Thanks for the reply and your patience. The KIP looks good to me now.
>>>>> 
>>>>> Hi, Luke, Mickael,
>>>>> 
>>>>> Could you take a look at the KIP again and see if it makes sense to you?
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun
>>>>> 
>>>>> On Tue, Aug 5, 2025 at 3:04 AM 黃竣陽 <s7133...@gmail.com> wrote:
>>>>> 
>>>>>> Hello Jun,
>>>>>> 
>>>>>> Thanks for the reply.
>>>>>> 
>>>>>> JR46: I have clarified the behavior for null and empty values, so users can
>>>>>> understand the reasoning behind disallowing empty lists.
>>>>>> 
>>>>>> JR48: I've updated the KIP accordingly.
>>>>>> 
>>>>>> Best Regards,
>>>>>> Jiunn-Yang
>>>>>> 
>>>>>>> On Aug 5, 2025, at 12:45 AM, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>> 
>>>>>>> Hi, Jiunn-Yang,
>>>>>>> 
>>>>>>> Thanks for the reply.
>>>>>>> 
>>>>>>> JR46. "Currently, if plugin.path is set to an empty list, no user-specified
>>>>>>> plugin locations are processed, but plugins from the parent classloader are
>>>>>>> still loaded." Could you add why the current behavior is bad for the users?
>>>>>>> 
>>>>>>> JR48. If we want to preserve the current behavior, we should remove the
>>>>>>> following validation, right?
>>>>>>>      if (validStrings.length == 0) {
>>>>>>>          throw new IllegalArgumentException("Valid strings list cannot be empty for in validator");
>>>>>>>      }
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> On Fri, Aug 1, 2025 at 6:47 PM 黃竣陽 <s7133...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hello Jun,
>>>>>>>> 
>>>>>>>> Thanks for the reply.
>>>>>>>> 
>>>>>>>> JR42, JR45, JR46, JR47: I have updated the KIP.
>>>>>>>> 
>>>>>>>> JR43: If bootstrap.servers is empty for these configurations,
>>>>>>>> it will fail when creating the AdminClient. I have updated the KIP to
>>>>>>>> reflect this behavior.
>>>>>>>> 
>>>>>>>> JR44: Thanks for pointing this out. Kafka will indeed fail at the storage
>>>>>>>> format phase if the configuration is invalid. I have updated the KIP
>>>>>>>> accordingly.
>>>>>>>> 
>>>>>>>> JR48: The current in() method allows empty lists by default, so I believe
>>>>>>>> setting isEmptyAllowed to true better preserves the original behavior.
>>>>>>>> Therefore, I prefer not to change this default.
>>>>>>>> 
>>>>>>>> Best Regards,
>>>>>>>> Jiunn-Yang
>>>>>>>> 
>>>>>>>>> On Aug 2, 2025, at 2:38 AM, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>>>> 
>>>>>>>>> Hi, Jiunn-Yang,
>>>>>>>>> 
>>>>>>>>> Thanks for the reply. A few more comments.
>>>>>>>>> 
>>>>>>>>> JR42. [6] [7] are not needed since currently an empty list already causes
>>>>>>>>> an exception.
>>>>>>>>> 
>>>>>>>>> JR43. Could you document the current behavior when bootstrap.servers is
>>>>>>>>> empty in MirrorClientConfig and WorkerConfig?
>>>>>>>>> 
>>>>>>>>> JR44. [12] seems incorrect. I set log.dir, but left log.dirs empty and got
>>>>>>>>> the following.
>>>>>>>>> bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
>>>>>>>>> Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: At least one log directory must be defined via log.dirs or log.dir.
>>>>>>>>> 
>>>>>>>>> bin/kafka-server-start.sh config/server.properties
>>>>>>>>> [2025-08-01 11:31:20,097] INFO Registered `kafka:type=kafka.Log4jController` MBean (kafka.utils.Log4jControllerRegistration$)
>>>>>>>>> [2025-08-01 11:31:20,238] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
>>>>>>>>> java.lang.IllegalArgumentException: requirement failed: At least one log directory must be defined via log.dirs or log.dir.
>>>>>>>>> 
>>>>>>>>> JR45. Could you group all configs for WorkerConfig together?
>>>>>>>>> 
>>>>>>>>> JR46. plugin.path: We have the following code. So, currently, it seems that
>>>>>>>>> an empty list is allowed for plugin.path and we treat it by adding the
>>>>>>>>> parent path of the classloader. It would be useful to justify why
>>>>>>>>> disallowing empty is better for the users.
>>>>>>>>> 
>>>>>>>>> public static Set<PluginSource> pluginSources(Set<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) {
>>>>>>>>>     Set<PluginSource> pluginSources = new LinkedHashSet<>();
>>>>>>>>>     for (Path pluginLocation : pluginLocations) {
>>>>>>>>>         try {
>>>>>>>>>             pluginSources.add(isolatedPluginSource(pluginLocation, classLoader, factory));
>>>>>>>>>         } catch (InvalidPathException | MalformedURLException e) {
>>>>>>>>>             log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
>>>>>>>>>         } catch (IOException e) {
>>>>>>>>>             log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>     pluginSources.add(classpathPluginSource(classLoader.getParent()));
>>>>>>>>>     return pluginSources;
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> JR47. typo in "Should not allow setting validStrings is empty"
>>>>>>>>> 
>>>>>>>>> JR48. We should pass in isEmptyAllowed as false in the code below, right?
>>>>>>>>> public static ValidList in(String... validStrings) {
>>>>>>>>>     if (validStrings.length == 0) {
>>>>>>>>>         throw new IllegalArgumentException("Valid strings list cannot be empty for in validator");
>>>>>>>>>     }
>>>>>>>>>     return new ValidList(List.of(validStrings), true, false);
>>>>>>>>> }
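To make the flags discussed in JR48 concrete, here is a standalone sketch of a list validator with separate switches for empty and null values plus duplicate rejection. It is not Kafka's ValidList: the class `ListRuleSketch`, its constructor argument order, the config name `example.list`, and the error messages are all assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a list validator; not Kafka's ValidList.
final class ListRuleSketch {
    private final Set<String> validStrings; // an empty set means any value is accepted
    private final boolean emptyAllowed;
    private final boolean nullAllowed;

    ListRuleSketch(Set<String> validStrings, boolean emptyAllowed, boolean nullAllowed) {
        this.validStrings = new HashSet<>(validStrings);
        this.emptyAllowed = emptyAllowed;
        this.nullAllowed = nullAllowed;
    }

    // Throws IllegalArgumentException when the value breaks one of the rules.
    void ensureValid(String name, List<String> values) {
        if (values == null) {
            if (!nullAllowed) {
                throw new IllegalArgumentException(name + " must not be null");
            }
            return;
        }
        if (values.isEmpty() && !emptyAllowed) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
        Set<String> seen = new HashSet<>();
        for (String value : values) {
            if (!seen.add(value)) {
                throw new IllegalArgumentException(name + " has duplicate value: " + value);
            }
            if (!validStrings.isEmpty() && !validStrings.contains(value)) {
                throw new IllegalArgumentException(name + " has unsupported value: " + value);
            }
        }
    }

    public static void main(String[] args) {
        ListRuleSketch strict = new ListRuleSketch(Set.of("a", "b"), false, false);
        strict.ensureValid("example.list", List.of("a", "b")); // passes silently
        try {
            strict.ensureValid("example.list", List.of());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "example.list must not be empty"
        }
    }
}
```

With the empty switch set to true instead, the same empty list passes, which is the behavior difference the JR48 question is about.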
>>>>>>>>> 
>>>>>>>>> Jun
>>>>>>>>> 
>>>>>>>>> On Tue, Jul 29, 2025 at 8:53 AM 黃竣陽 <s7133...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Hello Jun,
>>>>>>>>>> 
>>>>>>>>>> Sorry for the late reply.
>>>>>>>>>> 
>>>>>>>>>> JR40, JR41: I have updated the KIP based on your comments. Please take a
>>>>>>>>>> look.
>>>>>>>>>> 
>>>>>>>>>> Best Regards,
>>>>>>>>>> Jiunn-Yang
>>>>>>>>>> 
>>>>>>>>>>> On Jul 24, 2025, at 2:56 AM, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi, Jiunn-Yang,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>> 
>>>>>>>>>>> JR40. In the table, there are a few configs for which we now disallow empty
>>>>>>>>>>> lists. It would be useful to document the current behavior for each of them
>>>>>>>>>>> more accurately. It seems that they should all lead to some bad behavior.
>>>>>>>>>>> For example, if log.dirs is empty, it seems that we can't create topics?
>>>>>>>>>>> If group.coordinator.rebalance.protocols is empty, the group rebalance
>>>>>>>>>>> protocol will fail?
>>>>>>>>>>> 
>>>>>>>>>>> JR41. It would be useful to clean up the "Compatibility, Deprecation, and
>>>>>>>>>>> Migration Plan" section a bit.
>>>>>>>>>>> JR41.1 The migration plan has "Introduce a new properties isEmptyAllowed,
>>>>>>>>>>> isNullAllowed", which needs to be changed.
>>>>>>>>>>> JR41.2 It started with "For users who configure an empty list", but not all
>>>>>>>>>>> the bullet points are about empty lists.
>>>>>>>>>>> JR41.3 To me, it would be useful to summarize why the behavior changes are
>>>>>>>>>>> justified. We made 3 types of changes. (1) Disallow null. This is ok since
>>>>>>>>>>> one can't really set a null value through a config file. (2) Disallow
>>>>>>>>>>> duplicates. This is ok since it's rare and confusing. (3) Disallow empty.
>>>>>>>>>>> Ideally, we want to claim that for those cases, they all lead to some other
>>>>>>>>>>> errors or bad behavior currently.
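On point (1) of JR41.3, a small sketch with plain `java.util.Properties` shows why a null value cannot really come from a config file: a key with nothing after `=` loads as an empty string, and the text `null` loads as a four-character string. The key names and the `PropertiesNullSketch` class are hypothetical, and how the loaded string is later converted into a list is outside this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class; the key names below are made up for illustration.
final class PropertiesNullSketch {
    // Parses properties-file text with the standard java.util.Properties loader.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("example.list=\nother.list=null\n");
        // A key with no value is the empty string, not null.
        System.out.println("[" + props.getProperty("example.list") + "]"); // prints "[]"
        // The text "null" is just a string of length 4.
        System.out.println(props.getProperty("other.list").length());     // prints 4
        // Only a key that is absent from the file yields null.
        System.out.println(props.getProperty("missing.list") == null);     // prints true
    }
}
```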
>>>>>>>>>>> 
>>>>>>>>>>> Jun
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Jul 23, 2025 at 4:42 AM 黃竣陽 <s7133...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hello Jun,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>>> 
>>>>>>>>>>>> JR34, JR36, JR39: I’ve updated the KIP to reflect these changes.
>>>>>>>>>>>> 
>>>>>>>>>>>> JR38: I clarified the following configurations:
>>>>>>>>>>>> early.start.listeners: This config is optional and allows both null and
>>>>>>>>>>>> empty lists.
>>>>>>>>>>>> log.dirs and plugin.path: These are also optional, but only allow null;
>>>>>>>>>>>> empty lists are not permitted.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>> Jiunn-Yang
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Jul 23, 2025, at 2:28 AM, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi, Jiunn-Yang and Chia-Ping,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> JR34. Sounds good.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> JR36. Could you document that the default values for
>>>>>>>>>>>>> sasl.oauthbearer.expected.audience and ssl.cipher.suites have been changed?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> JR38. My concern wasn't quite addressed. "If the configuration is optional,
>>>>>>>>>>>>> we will reject any duplicate values in the list." This implies that if a
>>>>>>>>>>>>> config is optional, we allow empty lists. However, for plugin.path, we
>>>>>>>>>>>>> reject empty lists.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> JR39. We can remove the fields isEmptyAllowed and isNullAllowed since they
>>>>>>>>>>>>> are not public facing.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Jun
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Tue, Jul 22, 2025 at 4:26 AM 黃竣陽 <s7133...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hello Jun, Chia,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> JR36: I’ve updated the following configurations:
>>>>>>>>>>>>>> - sasl.oauthbearer.expected.audience
>>>>>>>>>>>>>> - ssl.cipher.suites
>>>>>>>>>>>>>> - ssl.enabled.protocols
>>>>>>>>>>>>>> These now allow both empty and non-empty lists. Since the default values of
>>>>>>>>>>>>>> sasl.oauthbearer.expected.audience and ssl.cipher.suites were previously
>>>>>>>>>>>>>> null, I’ve updated their defaults to List.of() for consistency.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> JR37: A ClassCastException does not occur in MirrorClientConfig, so that
>>>>>>>>>>>>>> entry should be "none". I’ve updated this in the KIP.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> JR38 & JR40: I’ve added a note below the table to indicate which
>>>>>>>>>>>>>> configurations require special attention.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> JR39: Yes, these properties are package-private.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>> Jiunn-Yang
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Jul 22, 2025, at 6:04 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> JR34. Hmm, it seems that the code in taskConfigs() could return an empty
>>>>>>>>>>>>>>> list for a task if knownSourceTopicPartitions is less than maxTasks,
>>>>>>>>>>>>>>> Chia-Ping?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Yes, `MirrorSourceConnector#taskConfigs` could return an empty list, which
>>>>>>>>>>>>>>> means no `MirrorSourceTask` can be created.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> By contrast, if any `MirrorSourceTask` is created, the configs used by
>>>>>>>>>>>>>>> `MirrorSourceTask` must contain `task.assigned.partitions` with a non-empty
>>>>>>>>>>>>>>> value.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Tue, Jul 22, 2025 at 5:14 AM Jun Rao <j...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi, Jiunn-Yang and Chia-Ping,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> JR34. Hmm, it seems that the code in taskConfigs() could return an empty
>>>>>>>>>>>>>>>> list for a task if knownSourceTopicPartitions is less than maxTasks,
>>>>>>>>>>>>>>>> Chia-Ping?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> JR36. Sounds good. Could you update the KIP? Also, should we do the same
>>>>>>>>>>>>>>>> for ssl.cipher.suites and sasl.oauthbearer.expected.audience?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> JR37. Is it true that MirrorClientConfig.bootstrap.servers throws a
>>>>>>>>>>>>>>>> ClassCastException for null? It seems that null could be cast to any
>>>>>>>>>>>>>>>> class.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> JR38. "If the configuration is optional, we will reject any duplicate
>>>>>>>>>>>>>>>> values in the list." Could you clarify this? For example, plugin.path is
>>>>>>>>>>>>>>>> optional, but with a default value. We reject duplicates as well as empty
>>>>>>>>>>>>>>>> lists.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> JR39. isEmptyAllowed and isNullAllowed are not public fields, right? They
>>>>>>>>>>>>>>>> are only public in the anyNonDuplicateValues() method.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> JR40. It would be useful to mention that if cleanup.policy is empty and
>>>>>>>>>>>>>>>> remote.storage.enable is true, the local log segments will be cleaned based
>>>>>>>>>>>>>>>> on log.local.retention.bytes and log.local.retention.ms.
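Coming back to JR37 in the list above: the Java language rule behind it can be checked in isolation. A cast applied to null always succeeds and yields null, and only a non-null value of the wrong type triggers a ClassCastException. The `NullCastSketch` class and `asStringList` helper are hypothetical and do not reproduce how MirrorClientConfig reads its values.

```java
import java.util.List;

// Hypothetical demo of the cast rule; not code from MirrorClientConfig.
final class NullCastSketch {
    @SuppressWarnings("unchecked")
    static List<String> asStringList(Object value) {
        // Casting null never throws; the result is simply null.
        return (List<String>) value;
    }

    public static void main(String[] args) {
        System.out.println(asStringList(null)); // prints null
        try {
            asStringList("localhost:9092"); // a String is not a List
        } catch (ClassCastException e) {
            System.out.println("ClassCastException for a String value");
        }
    }
}
```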
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Jun
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Fri, Jul 18, 2025 at 9:52 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> JR34: For these two configurations, setting an empty list feels a bit
>>>>>>>>>>>>>>>>>> unintuitive. If an empty list is provided, the consumer will call the
>>>>>>>>>>>>>>>>>> unsubscribe method, which doesn't seem appropriate given the
>>>>>>>>>>>>>>>>>> documentation states: "Topic-partitions assigned to this task to
>>>>>>>>>>>>>>>>>> replicate."
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> agreed. Additionally, the source code [0] shows that it is not intended to
>>>>>>>>>>>>>>>>> generate empty tps for the source task
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> [0] https://github.com/apache/kafka/blob/9b542b6ea21e84677a9292f250fc25f8b4162e6f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L202
