Copilot commented on code in PR #390:
URL: https://github.com/apache/doris-thirdparty/pull/390#discussion_r3238861378
##########
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java:
##########
@@ -112,15 +111,16 @@ protected void setup(Binder binder)
newMapBinder(binder, String.class, SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON);
newMapBinder(binder, String.class, SchemaParser.class).addBinding("PROTOBUF").to(LazyLoadedProtobufSchemaParser.class).in(Scopes.SINGLETON);
- // Bind the appropriate ConfluentSchemaRegistryAuth implementation based on configuration
- ConfluentSchemaRegistryConfig schemaRegistryConfig = buildConfigObject(ConfluentSchemaRegistryConfig.class);
- if (schemaRegistryConfig.getConfluentSchemaRegistryAuthType() == BASIC_AUTH) {
- configBinder(binder).bindConfig(BasicAuthConfig.class);
- binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryBasicAuth.class).in(Scopes.SINGLETON);
- }
- else {
- binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryNoAuth.class).in(Scopes.SINGLETON);
- }
+ configBinder(binder).bindConfig(BasicAuthConfig.class);
+ install(conditionalModule(
+ ConfluentSchemaRegistryConfig.class,
+ schemaRegistryConfig -> schemaRegistryConfig.getConfluentSchemaRegistryAuthType() == BASIC_AUTH,
+ authBinder -> authBinder.bind(SchemaRegistryClientPropertiesProvider.class)
+ .to(ConfluentSchemaRegistryBasicAuth.class)
+ .in(Scopes.SINGLETON),
+ authBinder -> authBinder.bind(SchemaRegistryClientPropertiesProvider.class)
Review Comment:
Binding `BasicAuthConfig` unconditionally changes runtime behavior: if
`BasicAuthConfig` has any validation constraints (e.g., required
username/password), the connector could fail to start even when
`kafka.confluent-schema-registry-auth-type=NONE`. To preserve the prior
behavior, bind `BasicAuthConfig` only in the `BASIC_AUTH` branch (or make its
validation conditional on `ConfluentSchemaRegistryAuthType`).
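A minimal plain-Java model of the first suggestion, not Guice or airlift code: `AuthModel`, `AuthType`, `BasicAuthConfig` (a stand-in record, not the real class), and `PropertiesProvider` are hypothetical, and the `Supplier` plays the role of the config binding. It shows that credential validation only runs in the branch that needs it, so `NONE` can start with no credentials configured. In the real module this would presumably mean calling `configBinder(authBinder).bindConfig(BasicAuthConfig.class)` inside the `BASIC_AUTH` lambda passed to `conditionalModule`, rather than before it.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Plain-Java model of "bind BasicAuthConfig only in the BASIC_AUTH branch".
// All names here are hypothetical stand-ins; nothing calls Guice or airlift.
final class AuthModel
{
    enum AuthType { NONE, BASIC_AUTH }

    // Stand-in for a config object whose validation requires credentials.
    record BasicAuthConfig(String username, String password)
    {
        BasicAuthConfig validate()
        {
            Objects.requireNonNull(username, "username is required for BASIC_AUTH");
            Objects.requireNonNull(password, "password is required for BASIC_AUTH");
            return this;
        }
    }

    interface PropertiesProvider
    {
        String describe();
    }

    private AuthModel() {}

    // Plays the role of the conditionalModule predicate: the BasicAuthConfig supplier
    // is only invoked (and therefore only validated) in the BASIC_AUTH branch.
    static PropertiesProvider select(AuthType authType, Supplier<BasicAuthConfig> basicAuthConfig)
    {
        if (authType == AuthType.BASIC_AUTH) {
            BasicAuthConfig config = basicAuthConfig.get().validate();
            return () -> "basic-auth:" + config.username();
        }
        return () -> "no-auth";
    }
}
```

With `AuthType.NONE`, the supplier is never called, so missing credentials cannot block startup; with `AuthType.BASIC_AUTH`, missing credentials fail validation immediately.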
##########
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java:
##########
@@ -135,23 +135,42 @@ public static SchemaRegistryClient createSchemaRegistryClient(
requireNonNull(propertiesProviders, "propertiesProviders is null");
List<String> baseUrl = confluentConfig.getConfluentSchemaRegistryUrls().stream()
- .map(HostAddress::getHostText)
.collect(toImmutableList());
- Map<String, ?> schemaRegistryClientProperties = propertiesProviders.stream()
- .map(SchemaRegistryClientPropertiesProvider::getSchemaRegistryClientProperties)
- .flatMap(properties -> properties.entrySet().stream())
- .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
-
return new ClassLoaderSafeSchemaRegistryClient(
new CachedSchemaRegistryClient(
baseUrl,
confluentConfig.getConfluentSchemaRegistryClientCacheSize(),
ImmutableList.copyOf(schemaProviders),
- schemaRegistryClientProperties),
+ buildSchemaRegistryClientProperties(confluentConfig, propertiesProviders)),
classLoader);
}
+ static Map<String, Object> buildSchemaRegistryClientProperties(
+ ConfluentSchemaRegistryConfig confluentConfig,
+ Set<SchemaRegistryClientPropertiesProvider> propertiesProviders)
+ {
+ Map<String, Object> schemaRegistryClientProperties = propertiesProviders.stream()
+ .map(SchemaRegistryClientPropertiesProvider::getSchemaRegistryClientProperties)
+ .flatMap(properties -> properties.entrySet().stream())
+ .collect(java.util.stream.Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> (Object) entry.getValue(),
+ (left, right) -> right,
+ HashMap::new));
Review Comment:
This changes duplicate-key behavior from 'fail fast' (`toImmutableMap`
throws `IllegalArgumentException` on duplicate keys) to 'last write wins' using a merge function. Because
`propertiesProviders` is a `Set`, iteration order is not guaranteed, so which
value wins can be nondeterministic and lead to flaky behavior across JVMs/runs.
Prefer either (a) keeping the fail-fast behavior for duplicate keys, or (b)
defining deterministic precedence (e.g., sort providers by a stable key, or use
an ordered collection with documented override rules).
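A sketch of both options, assuming a hypothetical `ClientPropertiesProvider` interface that mirrors the shape of `SchemaRegistryClientPropertiesProvider` (a method returning `Map<String, ?>`). Option (a) relies on the JDK's `Collectors.toMap` without a merge function, which throws `IllegalStateException` on a duplicate key. Option (b) takes an explicit list where later providers override earlier ones; sorting the injected `Set` by implementation class name is only one assumed way to obtain a stable order, not an existing Trino rule.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for SchemaRegistryClientPropertiesProvider.
interface ClientPropertiesProvider
{
    Map<String, ?> getSchemaRegistryClientProperties();
}

final class ClientProperties
{
    private ClientProperties() {}

    // Option (a): fail fast. Collectors.toMap without a merge function throws
    // IllegalStateException on a duplicate key, so a conflict surfaces at startup.
    static Map<String, Object> failFast(Set<ClientPropertiesProvider> providers)
    {
        return providers.stream()
                .map(ClientPropertiesProvider::getSchemaRegistryClientProperties)
                .flatMap(properties -> properties.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> (Object) entry.getValue()));
    }

    // Option (b): explicit precedence. Later providers in the list override earlier ones,
    // so the winner depends on the documented list order, not on Set iteration order.
    static Map<String, Object> lastWins(List<ClientPropertiesProvider> providersInPrecedenceOrder)
    {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ClientPropertiesProvider provider : providersInPrecedenceOrder) {
            result.putAll(provider.getSchemaRegistryClientProperties());
        }
        return result;
    }

    // One possible stable order for an injected Set (an assumption): sort by class name.
    static List<ClientPropertiesProvider> sortedByClassName(Set<ClientPropertiesProvider> providers)
    {
        return providers.stream()
                .sorted(Comparator.comparing((ClientPropertiesProvider provider) -> provider.getClass().getName()))
                .collect(Collectors.toList());
    }
}
```

Combining the two parts of option (b), `ClientProperties.lastWins(ClientProperties.sortedByClassName(providers))` gives the same winner on every run for the same set of provider classes.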
##########
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java:
##########
@@ -137,17 +134,20 @@ public ConfluentSchemaRegistryConfig setConfluentSchemaRegistrySubjectMapping(St
return this;
}
- private static ImmutableSet<HostAddress> parseNodes(String nodes)
+ private static List<String> parseNodes(String nodes)
{
Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
return stream(splitter.split(nodes))
- .map(ConfluentSchemaRegistryConfig::toHostAddress)
- .collect(toImmutableSet());
+ .map(ConfluentSchemaRegistryConfig::normalizeUrl)
Review Comment:
Switching from `Set<HostAddress>` to `List<String>` changes semantics:
duplicates are no longer removed. If the config string contains repeated
endpoints (or the same endpoint with/without scheme), the client may attempt
redundant endpoints and produce confusing failover behavior. Consider
de-duplicating while preserving order (e.g., apply `distinct()` after
`normalizeUrl`) to keep behavior closer to the previous `Set` implementation.
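A sketch of the de-duplicating variant, using JDK streams in place of Guava's `Splitter`. The PR's `normalizeUrl` body is not shown in this hunk, so the version below is a hypothetical placeholder (it assumes a missing scheme means `http://` and that a trailing `/` is insignificant); the point is only the `.map(normalizeUrl).distinct()` order. For ordered streams, `distinct()` keeps the first occurrence of each element, so the configured endpoint order is preserved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: normalize first, then drop duplicates, so "host:8081" and "http://host:8081/"
// collapse to a single endpoint while keeping the order in which endpoints were configured.
final class SchemaRegistryUrls
{
    private SchemaRegistryUrls() {}

    static List<String> parseNodes(String nodes)
    {
        // Roughly mirrors Splitter.on(',').omitEmptyStrings().trimResults()
        return Arrays.stream(nodes.split(","))
                .map(String::trim)
                .filter(node -> !node.isEmpty())
                .map(SchemaRegistryUrls::normalizeUrl)
                .distinct() // stable: first occurrence wins
                .collect(Collectors.toList());
    }

    // Hypothetical normalization (the PR's normalizeUrl is not shown): default the scheme
    // to http:// when none is given and strip a single trailing '/'.
    static String normalizeUrl(String node)
    {
        String url = node.contains("://") ? node : "http://" + node;
        return url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
    }
}
```

Under these assumptions, `parseNodes(" registry-a:8081, http://registry-a:8081/ ,,https://registry-b:8081,registry-a:8081")` returns `[http://registry-a:8081, https://registry-b:8081]`.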
##########
plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java:
##########
@@ -38,7 +40,8 @@ public void testDefaults()
.setConfluentSchemaRegistryAuthType(NONE)
.setConfluentSchemaRegistryClientCacheSize(1000)
.setEmptyFieldStrategy(IGNORE)
- .setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS)));
+ .setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS))
+ .setConfluentSchemaRegistrySubjectMapping(null));
Review Comment:
Using `null` for the subject-mapping default in the defaults test makes the
intended default ambiguous (is the default 'unset', or 'empty mapping'?). If
the production default is an empty mapping, the test will be clearer and more
robust if it asserts an empty mapping explicitly (e.g., by not calling the
setter at all for this field, or by using an empty string if the setter is
string-backed) rather than passing `null`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.