C0urante commented on code in PR #11776:
URL: https://github.com/apache/kafka/pull/11776#discussion_r887332195


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -842,21 +846,138 @@ public void deleteConnectorConfig(final String connName, 
final Callback<Created<
     }
 
     @Override
-    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector 
connector,
-                                                                    ConfigDef 
configDef,
-                                                                    
Map<String, String> config) {
-        Map<String, ConfigValue> validatedConfig = 
super.validateBasicConnectorConfig(connector, configDef, config);
-        if (connector instanceof SinkConnector) {
-            ConfigValue validatedName = 
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
-            String name = (String) validatedName.value();
-            if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
-                validatedName.addErrorMessage("Consumer group for sink 
connector named " + name +
-                        " conflicts with Connect worker group " + 
workerGroupId);
+    protected Map<String, ConfigValue> 
validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, 
Map<String, String> config) {
+        Map<String, ConfigValue> result = 
super.validateSinkConnectorConfig(connector, configDef, config);
+        validateSinkConnectorGroupId(result);
+        return result;
+    }
+
+    @Override
+    protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, 
Map<String, String> config) {
+        Map<String, ConfigValue> result = 
super.validateSourceConnectorConfig(connector, configDef, config);
+        validateSourceConnectorExactlyOnceSupport(config, result, connector);
+        validateSourceConnectorTransactionBoundary(config, result, connector);
+        return result;
+    }
+
+
+    private void validateSinkConnectorGroupId(Map<String, ConfigValue> 
validatedConfig) {
+        ConfigValue validatedName = 
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+        String name = (String) validatedName.value();

Review Comment:
   Yes; every framework-level connector property should have a `ConfigValue` 
entry in the map here.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -842,21 +846,138 @@ public void deleteConnectorConfig(final String connName, 
final Callback<Created<
     }
 
     @Override
-    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector 
connector,
-                                                                    ConfigDef 
configDef,
-                                                                    
Map<String, String> config) {
-        Map<String, ConfigValue> validatedConfig = 
super.validateBasicConnectorConfig(connector, configDef, config);
-        if (connector instanceof SinkConnector) {
-            ConfigValue validatedName = 
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
-            String name = (String) validatedName.value();
-            if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
-                validatedName.addErrorMessage("Consumer group for sink 
connector named " + name +
-                        " conflicts with Connect worker group " + 
workerGroupId);
+    protected Map<String, ConfigValue> 
validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, 
Map<String, String> config) {
+        Map<String, ConfigValue> result = 
super.validateSinkConnectorConfig(connector, configDef, config);
+        validateSinkConnectorGroupId(result);
+        return result;
+    }
+
+    @Override
+    protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, 
Map<String, String> config) {
+        Map<String, ConfigValue> result = 
super.validateSourceConnectorConfig(connector, configDef, config);
+        validateSourceConnectorExactlyOnceSupport(config, result, connector);
+        validateSourceConnectorTransactionBoundary(config, result, connector);
+        return result;
+    }
+
+
+    private void validateSinkConnectorGroupId(Map<String, ConfigValue> 
validatedConfig) {
+        ConfigValue validatedName = 
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+        String name = (String) validatedName.value();
+        if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
+            validatedName.addErrorMessage("Consumer group for sink connector 
named " + name +
+                    " conflicts with Connect worker group " + workerGroupId);
+        }
+    }
+
+    private void validateSourceConnectorExactlyOnceSupport(
+            Map<String, String> rawConfig,
+            Map<String, ConfigValue> validatedConfig,
+            SourceConnector connector) {
+        ConfigValue validatedExactlyOnceSupport = 
validatedConfig.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+        if (validatedExactlyOnceSupport.errorMessages().isEmpty()) {
+            // Should be safe to parse the enum from the user-provided value 
since it's passed validation so far
+            SourceConnectorConfig.ExactlyOnceSupportLevel 
exactlyOnceSupportLevel =
+                    
SourceConnectorConfig.ExactlyOnceSupportLevel.fromProperty(Objects.toString(validatedExactlyOnceSupport.value()));
+            if (SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.equals(exactlyOnceSupportLevel)) {
+                if (!config.exactlyOnceSourceEnabled()) {
+                    validatedExactlyOnceSupport.addErrorMessage("This worker 
does not have exactly-once source support enabled.");
+                }
+
+                try {
+                    ExactlyOnceSupport exactlyOnceSupport = 
connector.exactlyOnceSupport(rawConfig);
+                    if 
(!ExactlyOnceSupport.SUPPORTED.equals(exactlyOnceSupport)) {
+                        final String validationErrorMessage;
+                        // Would do a switch here but that doesn't permit 
matching on null values
+                        if (exactlyOnceSupport == null) {
+                            validationErrorMessage = "The connector does not 
implement the API required for preflight validation of exactly-once "
+                                    + "source support. Please consult the 
documentation for the connector to determine whether it supports exactly-once "
+                                    + "guarantees, and then consider 
reconfiguring the connector to use the value \""
+                                    + 
SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED
+                                    + "\" for this property (which will 
disable this preflight check and allow the connector to be created).";
+                        } else if 
(ExactlyOnceSupport.UNSUPPORTED.equals(exactlyOnceSupport)) {
+                            validationErrorMessage = "The connector does not 
support exactly-once delivery guarantees with the provided configuration.";
+                        } else {
+                            throw new ConnectException("Unexpected value 
returned from SourceConnector::exactlyOnceSupport: " + exactlyOnceSupport);
+                        }
+                        
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+                    }
+                } catch (Exception e) {
+                    log.error("Failed while validating connector support for 
exactly-once guarantees", e);
+                    String validationErrorMessage = "An unexpected error 
occurred during validation";
+                    String failureMessage = e.getMessage();
+                    if (failureMessage != null && 
!failureMessage.trim().isEmpty()) {
+                        validationErrorMessage += ": " + failureMessage.trim();
+                    } else {
+                        validationErrorMessage += "; please see the worker 
logs for more details.";
+                    }
+                    
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+                }
             }
         }
-        return validatedConfig;
     }
 
+    private void validateSourceConnectorTransactionBoundary(
+            Map<String, String> rawConfig,
+            Map<String, ConfigValue> validatedConfig,
+            SourceConnector connector) {
+        ConfigValue validatedTransactionBoundary = 
validatedConfig.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG);
+        if (validatedTransactionBoundary.errorMessages().isEmpty()) {
+            // Should be safe to parse the enum from the user-provided value 
since it's passed validation so far
+            SourceTask.TransactionBoundary transactionBoundary =
+                    
SourceTask.TransactionBoundary.fromProperty(Objects.toString(validatedTransactionBoundary.value()));
+            if 
(SourceTask.TransactionBoundary.CONNECTOR.equals(transactionBoundary)) {
+                try {
+                    ConnectorTransactionBoundaries connectorTransactionSupport 
= connector.canDefineTransactionBoundaries(rawConfig);
+                    if (connectorTransactionSupport == null) {
+                        validatedTransactionBoundary.addErrorMessage(
+                                "This connector has returned a null value from its canDefineTransactionBoundaries method, which is not permitted. " +
+                                        "The connector will be treated as if it cannot define its own transaction boundaries, and cannot be configured with " +
+                                        "'" + SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG + "' set to '" + SourceTask.TransactionBoundary.CONNECTOR + "'."
+                        );
+                    } else if (!ConnectorTransactionBoundaries.SUPPORTED.equals(connectorTransactionSupport)) {
+                        validatedTransactionBoundary.addErrorMessage(
+                                "The connector does not support 
connector-defined transaction boundaries with the given configuration. "
+                                        + "Please reconfigure it to use a 
different transaction boundary definition.");
+                    }
+                } catch (Exception e) {
+                    log.error("Failed while validating connector support for 
defining its own transaction boundaries", e);
+                    String validationErrorMessage = "An unexpected error 
occurred during validation";
+                    String failureMessage = e.getMessage();
+                    if (failureMessage != null && 
!failureMessage.trim().isEmpty()) {
+                        validationErrorMessage += ": " + failureMessage.trim();
+                    } else {
+                        validationErrorMessage += "; please see the worker 
logs for more details.";
+                    }
+                    
validatedTransactionBoundary.addErrorMessage(validationErrorMessage);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected boolean 
connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType, 
Map<String, String> connProps) {
+        if (super.connectorUsesAdmin(connectorType, connProps)) {
+            return true;
+        } else if (connectorType == 
org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+            return config.exactlyOnceSourceEnabled()
+                || 
!connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, 
"").trim().isEmpty();
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    protected boolean 
connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType 
connectorType, Map<String, String> connProps) {
+        if (super.connectorUsesConsumer(connectorType, connProps)) {
+            return true;
+        } else if (connectorType == 
org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+            return config.exactlyOnceSourceEnabled()
+                || 
!connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, 
"").trim().isEmpty();
+        } else {
+            return false;
+        }

Review Comment:
   👍 done.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -790,22 +794,231 @@ public void testCreateConnectorFailedValidation() throws 
Exception {
         PowerMock.verifyAll();
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testConnectorNameConflictsWithWorkerGroupId() {
         Map<String, String> config = new HashMap<>(CONN2_CONFIG);
         config.put(ConnectorConfig.NAME_CONFIG, "test-group");
 
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        SinkConnector connectorMock = 
PowerMock.createMock(SinkConnector.class);
+
+        PowerMock.replayAll(connectorMock);
 
         // CONN2 creation should fail because the worker group id 
(connect-test-group) conflicts with
         // the consumer group id we would use for this sink
-        Map<String, ConfigValue> validatedConfigs =
-            herder.validateBasicConnectorConfig(connectorMock, 
ConnectorConfig.configDef(), config);
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSinkConnectorConfig(
+                connectorMock, SinkConnectorConfig.configDef(), config);
 
         ConfigValue nameConfig = 
validatedConfigs.get(ConnectorConfig.NAME_CONFIG);
-        assertNotNull(nameConfig.errorMessages());
         assertFalse(nameConfig.errorMessages().isEmpty());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidation() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andReturn(ExactlyOnceSupport.SUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        ConfigValue exactlyOnceSupportConfig = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+        assertTrue(exactlyOnceSupportConfig.errorMessages().isEmpty());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector() {
+        herder = exactlyOnceHerder();
+        Map<String, String> config = new HashMap<>();
+        config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, 
REQUIRED.toString());
+
+        SourceConnector connectorMock = 
PowerMock.createMock(SourceConnector.class);
+        EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+                .andReturn(ExactlyOnceSupport.UNSUPPORTED);
+
+        PowerMock.replayAll(connectorMock);
+
+        Map<String, ConfigValue> validatedConfigs = 
herder.validateSourceConnectorConfig(
+                connectorMock, SourceConnectorConfig.configDef(), config);
+
+        ConfigValue exactlyOnceSupportConfig = 
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+        assertFalse(exactlyOnceSupportConfig.errorMessages().isEmpty());

Review Comment:
   Ack, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to