chia7712 commented on code in PR #20389:
URL: https://github.com/apache/kafka/pull/20389#discussion_r2301926271


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -784,7 +783,7 @@ public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
                 for (PluginDesc<T> plugin : plugins()) {
                     result.add(plugin.pluginClass());
                 }
-                return Collections.unmodifiableList(result);
+                return List.copyOf(result);

Review Comment:
   ```java
               public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
                   return plugins().stream().map(p -> (Object) p.pluginClass()).toList();
               }
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##########
@@ -253,7 +252,7 @@ public MetricGroupId(String groupName, Map<String, String> tags) {
             Objects.requireNonNull(groupName);
             Objects.requireNonNull(tags);
             this.groupName = groupName;
-            this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
+            this.tags = Map.copyOf(new LinkedHashMap<>(tags));

Review Comment:
   IIRC, `Map.copyOf` does not guarantee iteration order, so the insertion order 
of the `LinkedHashMap` would be lost. We should keep using 
`Collections.unmodifiableMap` instead. By the way, it would be useful to add a 
comment to prevent unintended changes in the future. 
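
To illustrate the ordering concern, here is a minimal sketch rather than the actual `MetricGroupId` code. The class name `TagOrderSketch`, the helper `orderedCopy`, and the tag values are hypothetical. It keeps the `LinkedHashMap` wrapper and carries the kind of guard comment suggested above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TagOrderSketch {
    // Defensive copy that is unmodifiable AND keeps insertion order.
    // Do not replace with Map.copyOf: its iteration order is unspecified.
    static Map<String, String> orderedCopy(Map<String, String> tags) {
        return Collections.unmodifiableMap(new LinkedHashMap<>(tags));
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("connector", "my-sink");
        tags.put("task", "0");
        tags.put("worker", "w1");

        // Keys come back in insertion order: [connector, task, worker]
        System.out.println(List.copyOf(orderedCopy(tags).keySet()));

        // Same entries, but the key order may differ from the line above.
        System.out.println(Map.copyOf(tags).keySet());
    }
}
```

Anything that derives names or strings from the tags by iterating over them would see the `Map.copyOf` order change between runs, which is why the wrapper is worth keeping.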



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java:
##########
@@ -58,7 +57,7 @@ protected Collection<Class<?>> regularResources() {
 
     @Override
     protected Collection<Class<?>> adminResources() {
-        return Collections.singletonList(
+        return List.of(

Review Comment:
   ```java
   return List.of(LoggingResource.class);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -137,7 +136,7 @@ public Future<Void> executeFailed(ProcessingContext<T> context, Stage stage, Cla
     // Visible for testing
     synchronized Future<Void> report(ProcessingContext<T> context) {
         if (reporters.size() == 1) {
-            return new WorkerErrantRecordReporter.ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(context)));
+            return new WorkerErrantRecordReporter.ErrantRecordFuture(List.of(reporters.iterator().next().report(context)));

Review Comment:
   ```java
               return new WorkerErrantRecordReporter.ErrantRecordFuture(List.of(reporters.get(0).report(context)));
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java:
##########
@@ -134,11 +134,11 @@ private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> m
         for (String member : members) {
             Collection<String> connectors = connectorAssignments.get(member);

Review Comment:
   ```java
               Collection<String> connectors = connectorAssignments.getOrDefault(member, List.of());
               Collection<ConnectorTaskId> tasks = taskAssignments.getOrDefault(member, List.of());
   ```
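
A small standalone sketch of the `getOrDefault` pattern above, in case it helps. `AssignmentLookupSketch`, `assignmentFor`, and the worker and connector names are made up for illustration and are not part of `EagerAssignor`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AssignmentLookupSketch {
    // Returns the member's assignment, or an empty immutable list
    // when the map has no entry for that member.
    static Collection<String> assignmentFor(Map<String, Collection<String>> assignments, String member) {
        return assignments.getOrDefault(member, List.of());
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> connectorAssignments = new HashMap<>();
        connectorAssignments.put("worker-1", List.of("connector-a"));

        System.out.println(assignmentFor(connectorAssignments, "worker-1")); // [connector-a]
        System.out.println(assignmentFor(connectorAssignments, "worker-2")); // []
    }
}
```

One caveat: `getOrDefault` only falls back when the key is absent. A key explicitly mapped to `null` still yields `null`, so this is equivalent to a null check only if the maps never store `null` values.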



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -335,12 +333,12 @@ public static List<Path> pluginUrls(Path topPath) throws IOException {
 
         if (containsClassFiles) {
             if (archives.isEmpty()) {
-                return Collections.singletonList(topPath);
+                return List.of(topPath);
             }
             log.warn("Plugin path contains both java archives and class files. Returning only the"
                     + " archives");
         }
-        return Arrays.asList(archives.toArray(new Path[0]));
+        return List.of(archives.toArray(new Path[0]));

Review Comment:
   ```java
   return List.copyOf(archives);
   ```
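
For reference, a hedged sketch of what `List.copyOf` gives here, assuming `archives` is a `Collection<Path>`. The class `ArchiveListSketch`, the helper `snapshot`, and the jar names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class ArchiveListSketch {
    // Unmodifiable snapshot of the collection, with no intermediate array.
    static List<Path> snapshot(List<Path> archives) {
        return List.copyOf(archives);
    }

    public static void main(String[] args) {
        List<Path> archives = new ArrayList<>();
        archives.add(Paths.get("plugins", "a.jar"));
        archives.add(Paths.get("plugins", "b.jar"));

        List<Path> result = snapshot(archives);
        archives.clear(); // later changes to the source do not affect the snapshot

        System.out.println(result.size()); // 2
    }
}
```

Unlike `Arrays.asList`, the returned list rejects `set` as well as `add` and `remove`, and `List.copyOf` throws `NullPointerException` on `null` elements, so this assumes no caller mutates the returned list.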



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##########
@@ -395,7 +394,7 @@ public ConnectMetricsRegistry(Set<String> tags) {
         connectorStatusMetrics.put(connectorUnassignedTaskCount, TaskStatus.State.UNASSIGNED);
         connectorStatusMetrics.put(connectorDestroyedTaskCount, TaskStatus.State.DESTROYED);
         connectorStatusMetrics.put(connectorRestartingTaskCount, TaskStatus.State.RESTARTING);
-        connectorStatusMetrics = Collections.unmodifiableMap(connectorStatusMetrics);
+        connectorStatusMetrics = Map.copyOf(connectorStatusMetrics);

Review Comment:
   `connectorStatusMetrics` could be initialized directly with `Map.of`. It 
could also be declared `final`.
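
Roughly what that could look like, as a sketch only. The `State` enum, the string keys, and the class `StatusMetricsSketch` are stand-ins for `TaskStatus.State`, the real metric templates, and `ConnectMetricsRegistry`.

```java
import java.util.Map;

class StatusMetricsSketch {
    // Hypothetical stand-in for TaskStatus.State.
    enum State { RUNNING, PAUSED, FAILED }

    // Assigned exactly once and never mutated afterwards.
    private final Map<String, State> connectorStatusMetrics = Map.of(
            "running-task-count", State.RUNNING,
            "paused-task-count", State.PAUSED,
            "failed-task-count", State.FAILED);

    Map<String, State> connectorStatusMetrics() {
        return connectorStatusMetrics;
    }

    public static void main(String[] args) {
        StatusMetricsSketch registry = new StatusMetricsSketch();
        System.out.println(registry.connectorStatusMetrics().get("paused-task-count")); // PAUSED
    }
}
```

`Map.of` has overloads for up to ten key-value pairs (use `Map.ofEntries` beyond that), throws `IllegalArgumentException` on duplicate keys, and has unspecified iteration order, so this fits only if nothing iterates the map expecting a particular order.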



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java:
##########
@@ -131,11 +131,11 @@ public void resume(TopicPartition... partitions) {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
         }
         try {
-            pausedPartitions.removeAll(Arrays.asList(partitions));
+            List.of(partitions).forEach(pausedPartitions::remove);

Review Comment:
   Could you please create a `List<TopicPartition>` at the beginning and reuse 
it in the log message? Otherwise, the log message will just call `toString` on 
the array object.
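
A minimal sketch of the idea, not the real `WorkerSinkTaskContext`. Plain strings stand in for `TopicPartition`, the message is returned instead of logged, and `ResumeLoggingSketch` with its methods is hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ResumeLoggingSketch {
    private final Set<String> pausedPartitions = new HashSet<>();

    void pause(String... partitions) {
        pausedPartitions.addAll(List.of(partitions));
    }

    // Builds the list once and uses it for both the removal and the message.
    String resume(String... partitions) {
        List<String> partitionList = List.of(partitions);
        partitionList.forEach(pausedPartitions::remove);
        return "Resuming partitions: " + partitionList;
    }

    Set<String> paused() {
        return Set.copyOf(pausedPartitions);
    }

    public static void main(String[] args) {
        ResumeLoggingSketch context = new ResumeLoggingSketch();
        context.pause("orders-0", "orders-1", "orders-2");

        // prints "Resuming partitions: [orders-0, orders-1]"
        System.out.println(context.resume("orders-0", "orders-1"));
        System.out.println(context.paused()); // [orders-2]
    }
}
```

With string conversion of the raw array, the message would contain something like `[Ljava.lang.String;@` followed by a hash code rather than the partition names.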



