Thanks to Jiunn-Yang for taking on this difficult KIP.

It might not be perfect, but we can refine it during the PR review process
:)


黃竣陽 <s7133...@gmail.com> wrote on Wed, Aug 6, 2025 at 7:32 PM:

> Hello chia,
>
> Thanks for the reply.
>
> chia05: I have updated the KIP to change the default values of
> task.assigned.groups
> and task.assigned.partitions from null to NO_DEFAULT_VALUE.
>
> chia06: I have also updated the KIP based on your feedback.
>
> Best Regards,
> Jiunn-Yang
>
> > Chia-Ping Tsai <chia7...@gmail.com> wrote on Aug 6, 2025 at 12:54 AM:
> >
> > hi Jiunn-Yang
> >
> > chia05:
> >
> > `task.assigned.groups` and `task.assigned.partitions` should not have
> > default value, right?
> >
> > chia06:
> >
> > `new HashSet<>(values).size()` -> `Set.copyOf(values).size()`
> >
> > Best,
> > Chia-Ping
> >
> > Jun Rao <j...@confluent.io.invalid> wrote on Tue, Aug 5, 2025 at 11:39 PM:
> >
> >> Hi, Jiunn-Yang,
> >>
> >> Thanks for the reply and your patience. The KIP looks good to me now.
> >>
> >> Hi, Luke, Mickael,
> >>
> >> Could you take a look at the KIP again and see if it makes sense to you?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Tue, Aug 5, 2025 at 3:04 AM 黃竣陽 <s7133...@gmail.com> wrote:
> >>
> >>> Hello Jun,
> >>>
> >>> Thanks for the reply,
> >>>
> >>> JR46: I have clarified the behavior for null and empty values, so users can
> >>> understand the reasoning behind disallowing empty lists.
> >>>
> >>> JR48: I've updated the KIP accordingly.
> >>>
> >>> Best Regards,
> >>> Jiunn-Yang
> >>>
> >>>> Jun Rao <j...@confluent.io.INVALID> wrote on Aug 5, 2025 at 12:45 AM:
> >>>>
> >>>> Hi, Jiunn-Yang,
> >>>>
> >>>> Thanks for the reply.
> >>>>
> >>>> JR46. "Currently, if plugin.path is set to an empty list, no user-specified
> >>>> plugin locations are processed, but plugins from the parent classloader are
> >>>> still loaded." Could you add why the current behavior is bad for the users?
> >>>>
> >>>> JR48. If we want to preserve the current behavior, we should remove the
> >>>> following validation, right?
> >>>>       if (validStrings.length == 0) {
> >>>>           throw new IllegalArgumentException("Valid strings list cannot be empty for in validator");
> >>>>       }
> >>>>
> >>>> Jun
> >>>>
> >>>> On Fri, Aug 1, 2025 at 6:47 PM 黃竣陽 <s7133...@gmail.com> wrote:
> >>>>
> >>>>> Hello Jun,
> >>>>>
> >>>>> Thanks for the reply.
> >>>>>
> >>>>> JR42, JR45, JR46, JR47: I have updated the KIP.
> >>>>>
> >>>>> JR43: If bootstrap.servers is empty for these configurations,
> >>>>> it will fail when creating the AdminClient. I have updated the KIP to
> >>>>> reflect this behavior.
> >>>>>
> >>>>> JR44: Thanks for pointing this out. Kafka will indeed fail at the storage
> >>>>> format phase if the configuration is invalid. I have updated the KIP accordingly.
> >>>>>
> >>>>> JR48: The current in() method allows empty lists by default, so I believe
> >>>>> setting isEmptyAllowed to true better preserves the original behavior.
> >>>>> Therefore, I prefer not to change this default.
> >>>>>
> >>>>> Best Regards,
> >>>>> Jiunn-Yang
> >>>>>
> >>>>>> Jun Rao <j...@confluent.io.INVALID> wrote on Aug 2, 2025 at 2:38 AM:
> >>>>>>
> >>>>>> Hi, Jiunn-Yang,
> >>>>>>
> >>>>>> Thanks for the reply. A few more comments.
> >>>>>>
> >>>>>> JR42. [6] [7] are not needed since currently an empty list already causes
> >>>>>> an exception.
> >>>>>>
> >>>>>> JR43. Could you document the current behavior when bootstrap.servers is
> >>>>>> empty in MirrorClientConfig and WorkerConfig?
> >>>>>>
> >>>>>> JR44. [12] seems incorrect. I set log.dir, but left log.dirs empty and got
> >>>>>> the following.
> >>>>>> bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
> >>>>>> Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: At least one log directory must be defined via log.dirs or log.dir.
> >>>>>>
> >>>>>> bin/kafka-server-start.sh config/server.properties
> >>>>>> [2025-08-01 11:31:20,097] INFO Registered `kafka:type=kafka.Log4jController` MBean (kafka.utils.Log4jControllerRegistration$)
> >>>>>> [2025-08-01 11:31:20,238] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
> >>>>>> java.lang.IllegalArgumentException: requirement failed: At least one log directory must be defined via log.dirs or log.dir.
> >>>>>>
> >>>>>> JR45. Could you group all configs for WorkerConfig together?
> >>>>>>
> >>>>>> JR46. plugin.path: We have the following code. So, currently, it seems that
> >>>>>> an empty list is allowed for plugin.path and we treat it by adding the
> >>>>>> parent path of the classloader. It would be useful to justify why
> >>>>>> disallowing empty is better for the users.
> >>>>>>
> >>>>>>  public static Set<PluginSource> pluginSources(Set<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) {
> >>>>>>      Set<PluginSource> pluginSources = new LinkedHashSet<>();
> >>>>>>      for (Path pluginLocation : pluginLocations) {
> >>>>>>          try {
> >>>>>>              pluginSources.add(isolatedPluginSource(pluginLocation, classLoader, factory));
> >>>>>>          } catch (InvalidPathException | MalformedURLException e) {
> >>>>>>              log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
> >>>>>>          } catch (IOException e) {
> >>>>>>              log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
> >>>>>>          }
> >>>>>>      }
> >>>>>>      pluginSources.add(classpathPluginSource(classLoader.getParent()));
> >>>>>>      return pluginSources;
> >>>>>>  }
> >>>>>>
> >>>>>> JR47. typo in "Should not allow setting validStrings is empty"
> >>>>>>
> >>>>>> JR48. We should pass in isEmptyAllowed as false in the code below, right?
> >>>>>>  public static ValidList in(String... validStrings) {
> >>>>>>      if (validStrings.length == 0) {
> >>>>>>          throw new IllegalArgumentException("Valid strings list cannot be empty for in validator");
> >>>>>>      }
> >>>>>>      return new ValidList(List.of(validStrings), true, false);
> >>>>>>  }
> >>>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>> On Tue, Jul 29, 2025 at 8:53 AM 黃竣陽 <s7133...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hello Jun,
> >>>>>>>
> >>>>>>> Sorry for the late reply,
> >>>>>>>
> >>>>>>> JR40, JR41: I have updated the KIP based on your comments. Please take a
> >>>>>>> look.
> >>>>>>>
> >>>>>>> Best Regards,
> >>>>>>> Jiunn-Yang
> >>>>>>>
> >>>>>>>> Jun Rao <j...@confluent.io.INVALID> wrote on Jul 24, 2025 at 2:56 AM:
> >>>>>>>>
> >>>>>>>> Hi, Jiunn-Yang,
> >>>>>>>>
> >>>>>>>> Thanks for the reply.
> >>>>>>>>
> >>>>>>>> JR40. In the table, there are a few configs for which we now disallow empty
> >>>>>>>> lists. It would be useful to document the current behavior for each of them
> >>>>>>>> more accurately. It seems that they should all lead to some bad behavior.
> >>>>>>>> For example, if log.dirs is empty, it seems that we can't create topics?
> >>>>>>>> If group.coordinator.rebalance.protocols is empty, the group rebalance
> >>>>>>>> protocol will fail?
> >>>>>>>>
> >>>>>>>> JR41. It would be useful to clean up the "Compatibility, Deprecation, and
> >>>>>>>> Migration Plan" section a bit.
> >>>>>>>> JR41.1 The migration plan has "Introduce a new properties isEmptyAllowed ,
> >>>>>>>> isNullAllowed", which needs to be changed.
> >>>>>>>> JR41.2 It started with "For users who configure an empty list", but not all
> >>>>>>>> the bullet points are about empty lists.
> >>>>>>>> JR41.3 To me, it would be useful to summarize why the behavior changes are
> >>>>>>>> justified. We made 3 types of changes. (1) Disallow null. This is ok since
> >>>>>>>> one can't really set a null value through a config file. (2) Disallow
> >>>>>>>> duplicates. This is ok since it's rare and confusing. (3) Disallow empty.
> >>>>>>>> Ideally, we want to claim that for those cases, they all lead to some other
> >>>>>>>> errors or bad behavior currently.
> >>>>>>>>
> >>>>>>>> Jun
> >>>>>>>>
> >>>>>>>> On Wed, Jul 23, 2025 at 4:42 AM 黃竣陽 <s7133...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hello Jun,
> >>>>>>>>>
> >>>>>>>>> Thanks for the reply.
> >>>>>>>>>
> >>>>>>>>> JR34, JR36, JR39: I’ve updated the KIP to reflect these changes.
> >>>>>>>>>
> >>>>>>>>> JR38: I clarified the following configurations:
> >>>>>>>>> early.start.listeners: This config is optional and allows both null and
> >>>>>>>>> empty lists.
> >>>>>>>>> log.dirs and plugin.path: These are also optional, but only allow null;
> >>>>>>>>> empty lists are not permitted.
> >>>>>>>>>
> >>>>>>>>> Best Regards,
> >>>>>>>>> Jiunn-Yang
> >>>>>>>>>
> >>>>>>>>>> Jun Rao <j...@confluent.io.INVALID> wrote on Jul 23, 2025 at 2:28 AM:
> >>>>>>>>>>
> >>>>>>>>>> Hi, Jiunn-Yang and Chia-Ping,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>
> >>>>>>>>>> JR34. Sounds good.
> >>>>>>>>>>
> >>>>>>>>>> JR36. Could you document that the default values for
> >>>>>>>>>> sasl.oauthbearer.expected.audience and ssl.cipher.suites have been changed?
> >>>>>>>>>>
> >>>>>>>>>> JR38. My concern wasn't quite addressed. "If the configuration is optional,
> >>>>>>>>>> we will reject any duplicate values in the list." This implies that if a
> >>>>>>>>>> config is optional, we allow empty lists. However, for plugin.path, we
> >>>>>>>>>> reject empty lists.
> >>>>>>>>>>
> >>>>>>>>>> JR39. We can remove the fields isEmptyAllowed and isNullAllowed since they
> >>>>>>>>>> are not public facing.
> >>>>>>>>>>
> >>>>>>>>>> Jun
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Jul 22, 2025 at 4:26 AM 黃竣陽 <s7133...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello Jun, Chia,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the reply,
> >>>>>>>>>>>
> >>>>>>>>>>> JR36: I’ve updated the following configurations:
> >>>>>>>>>>> - sasl.oauthbearer.expected.audience
> >>>>>>>>>>> - ssl.cipher.suites
> >>>>>>>>>>> - ssl.enabled.protocols
> >>>>>>>>>>> These now allow both empty and non-empty lists. Since the default values of
> >>>>>>>>>>> sasl.oauthbearer.expected.audience and ssl.cipher.suites were previously null,
> >>>>>>>>>>> I’ve updated their defaults to List.of() for consistency.
> >>>>>>>>>>>
> >>>>>>>>>>> JR37: A ClassCastException does not occur in MirrorClientConfig, so the entry
> >>>>>>>>>>> should be "none". I’ve updated this in the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> JR38 & JR40: I’ve added a note below the table to indicate which
> >>>>>>>>>>> configurations require special attention.
> >>>>>>>>>>>
> >>>>>>>>>>> JR39: Yes, these properties are package-private.
> >>>>>>>>>>>
> >>>>>>>>>>> Best Regards,
> >>>>>>>>>>> Jiunn-Yang
> >>>>>>>>>>>
> >>>>>>>>>>>> Chia-Ping Tsai <chia7...@gmail.com> wrote on Jul 22, 2025 at 6:04 PM:
> >>>>>>>>>>>>
> >>>>>>>>>>>> JR34. Hmm, it seems that the code in taskConfigs() could return an empty
> >>>>>>>>>>>> list for a task if knownSourceTopicPartitions is less than maxTasks,
> >>>>>>>>>>>> Chia-Ping?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Yes, `MirrorSourceConnector#taskConfigs` could return an empty list, which
> >>>>>>>>>>>> means no `MirrorSourceTask` can be created.
> >>>>>>>>>>>>
> >>>>>>>>>>>> By contrast, if any `MirrorSourceTask` is created, the configs used by
> >>>>>>>>>>>> `MirrorSourceTask` must contain `task.assigned.partitions` with a non-empty
> >>>>>>>>>>>> value.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Jun Rao <j...@confluent.io.invalid> wrote on Tue, Jul 22, 2025 at 5:14 AM:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi, Jiunn-Yang and Chia-Ping,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> JR34. Hmm, it seems that the code in taskConfigs() could return an empty
> >>>>>>>>>>>>> list for a task if knownSourceTopicPartitions is less than maxTasks,
> >>>>>>>>>>>>> Chia-Ping?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> JR36. Sounds good. Could you update the KIP? Also, should we do the same
> >>>>>>>>>>>>> for ssl.cipher.suites and sasl.oauthbearer.expected.audience?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> JR37. Is it true that MirrorClientConfig.bootstrap.servers throws a
> >>>>>>>>>>>>> ClassCastException for null? It seems that null could be cast to any
> >>>>>>>>>>>>> class.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> JR38. "If the configuration is optional, we will reject any duplicate
> >>>>>>>>>>>>> values in the list." Could you clarify this? For example, plugin.path is
> >>>>>>>>>>>>> optional, but with a default value. We reject duplicates as well as empty
> >>>>>>>>>>>>> lists.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> JR39. isEmptyAllowed and isNullAllowed are not public fields, right? They
> >>>>>>>>>>>>> are only public in the anyNonDuplicateValues() method.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> JR40. It would be useful to mention that if cleanup.policy is empty and
> >>>>>>>>>>>>> remote.storage.enable is true, the local log segments will be cleaned based
> >>>>>>>>>>>>> on log.local.retention.bytes and log.local.retention.ms.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Jul 18, 2025 at 9:52 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> JR34: For these two configurations, setting an empty list feels a bit
> >>>>>>>>>>>>>>> unintuitive. If an empty list is provided, the consumer will call the
> >>>>>>>>>>>>>>> unsubscribe method, which doesn't seem appropriate given the
> >>>>>>>>>>>>>>> documentation states: "Topic-partitions assigned to this task to
> >>>>>>>>>>>>>>> replicate."
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Agreed. Additionally, the source code [0] shows that it is not intended to
> >>>>>>>>>>>>>> generate empty topic-partitions for the source task.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [0] https://github.com/apache/kafka/blob/9b542b6ea21e84677a9292f250fc25f8b4162e6f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L202
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >>
>
>
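
For readers following the thread, the three rules discussed above (disallow null, disallow empty, disallow duplicates) can be sketched roughly as below. This is only an illustration under stated assumptions: the class `ListRuleSketch`, its `check` method, and the `/opt/plugins` path are hypothetical names made up for this example and are not the `ValidList` code in Kafka. It uses the `Set.copyOf(values).size()` comparison suggested in chia06 to detect duplicates.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch only; this is NOT Kafka's ConfigDef.ValidList.
final class ListRuleSketch {

    // Checks a list-typed config value against the three rules from the thread:
    // null handling, empty handling, and duplicate rejection.
    static void check(String name, List<String> values, boolean nullAllowed, boolean emptyAllowed) {
        if (values == null) {
            if (nullAllowed) {
                return;
            }
            throw new IllegalArgumentException(name + " must not be null");
        }
        if (values.isEmpty()) {
            if (emptyAllowed) {
                return;
            }
            throw new IllegalArgumentException(name + " must not be empty");
        }
        // Set.copyOf collapses repeated entries, so a smaller size means a duplicate.
        // Note: Set.copyOf throws NullPointerException if the list holds a null element.
        if (Set.copyOf(values).size() != values.size()) {
            throw new IllegalArgumentException(name + " must not contain duplicates: " + values);
        }
    }

    public static void main(String[] args) {
        // Per the thread: early.start.listeners allows both null and empty lists.
        check("early.start.listeners", List.of(), true, true);
        // Per the thread: log.dirs and plugin.path allow null but not empty lists.
        check("log.dirs", null, true, false);
        try {
            check("log.dirs", List.of(), true, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        try {
            check("plugin.path", List.of("/opt/plugins", "/opt/plugins"), true, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

`Set.copyOf` returns an unmodifiable set and avoids allocating a mutable `HashSet` just to count distinct elements, which is presumably why it was preferred in chia06.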
