tombentley commented on code in PR #11776:
URL: https://github.com/apache/kafka/pull/11776#discussion_r886976256
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -842,21 +846,138 @@ public void deleteConnectorConfig(final String connName,
final Callback<Created<
}
@Override
- protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector
connector,
- ConfigDef
configDef,
-
Map<String, String> config) {
- Map<String, ConfigValue> validatedConfig =
super.validateBasicConnectorConfig(connector, configDef, config);
- if (connector instanceof SinkConnector) {
- ConfigValue validatedName =
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
- String name = (String) validatedName.value();
- if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
- validatedName.addErrorMessage("Consumer group for sink
connector named " + name +
- " conflicts with Connect worker group " +
workerGroupId);
+ protected Map<String, ConfigValue>
validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef,
Map<String, String> config) {
+ Map<String, ConfigValue> result =
super.validateSinkConnectorConfig(connector, configDef, config);
+ validateSinkConnectorGroupId(result);
+ return result;
+ }
+
+ @Override
+ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef,
Map<String, String> config) {
+ Map<String, ConfigValue> result =
super.validateSourceConnectorConfig(connector, configDef, config);
+ validateSourceConnectorExactlyOnceSupport(config, result, connector);
+ validateSourceConnectorTransactionBoundary(config, result, connector);
+ return result;
+ }
+
+
+ private void validateSinkConnectorGroupId(Map<String, ConfigValue>
validatedConfig) {
+ ConfigValue validatedName =
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+ String name = (String) validatedName.value();
Review Comment:
Is it safe to assume `validatedName != null` at this point?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -842,21 +846,138 @@ public void deleteConnectorConfig(final String connName,
final Callback<Created<
}
@Override
- protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector
connector,
- ConfigDef
configDef,
-
Map<String, String> config) {
- Map<String, ConfigValue> validatedConfig =
super.validateBasicConnectorConfig(connector, configDef, config);
- if (connector instanceof SinkConnector) {
- ConfigValue validatedName =
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
- String name = (String) validatedName.value();
- if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
- validatedName.addErrorMessage("Consumer group for sink
connector named " + name +
- " conflicts with Connect worker group " +
workerGroupId);
+ protected Map<String, ConfigValue>
validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef,
Map<String, String> config) {
+ Map<String, ConfigValue> result =
super.validateSinkConnectorConfig(connector, configDef, config);
+ validateSinkConnectorGroupId(result);
+ return result;
+ }
+
+ @Override
+ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef,
Map<String, String> config) {
+ Map<String, ConfigValue> result =
super.validateSourceConnectorConfig(connector, configDef, config);
+ validateSourceConnectorExactlyOnceSupport(config, result, connector);
+ validateSourceConnectorTransactionBoundary(config, result, connector);
+ return result;
+ }
+
+
+ private void validateSinkConnectorGroupId(Map<String, ConfigValue>
validatedConfig) {
+ ConfigValue validatedName =
validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+ String name = (String) validatedName.value();
+ if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
+ validatedName.addErrorMessage("Consumer group for sink connector
named " + name +
+ " conflicts with Connect worker group " + workerGroupId);
+ }
+ }
+
+ private void validateSourceConnectorExactlyOnceSupport(
+ Map<String, String> rawConfig,
+ Map<String, ConfigValue> validatedConfig,
+ SourceConnector connector) {
+ ConfigValue validatedExactlyOnceSupport =
validatedConfig.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+ if (validatedExactlyOnceSupport.errorMessages().isEmpty()) {
+ // Should be safe to parse the enum from the user-provided value
since it's passed validation so far
+ SourceConnectorConfig.ExactlyOnceSupportLevel
exactlyOnceSupportLevel =
+
SourceConnectorConfig.ExactlyOnceSupportLevel.fromProperty(Objects.toString(validatedExactlyOnceSupport.value()));
+ if
(SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.equals(exactlyOnceSupportLevel))
{
+ if (!config.exactlyOnceSourceEnabled()) {
+ validatedExactlyOnceSupport.addErrorMessage("This worker
does not have exactly-once source support enabled.");
+ }
+
+ try {
+ ExactlyOnceSupport exactlyOnceSupport =
connector.exactlyOnceSupport(rawConfig);
+ if
(!ExactlyOnceSupport.SUPPORTED.equals(exactlyOnceSupport)) {
+ final String validationErrorMessage;
+ // Would do a switch here but that doesn't permit
matching on null values
+ if (exactlyOnceSupport == null) {
+ validationErrorMessage = "The connector does not
implement the API required for preflight validation of exactly-once "
+ + "source support. Please consult the
documentation for the connector to determine whether it supports exactly-once "
+ + "guarantees, and then consider
reconfiguring the connector to use the value \""
+ +
SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED
+ + "\" for this property (which will
disable this preflight check and allow the connector to be created).";
+ } else if
(ExactlyOnceSupport.UNSUPPORTED.equals(exactlyOnceSupport)) {
+ validationErrorMessage = "The connector does not
support exactly-once delivery guarantees with the provided configuration.";
+ } else {
+ throw new ConnectException("Unexpected value
returned from SourceConnector::exactlyOnceSupport: " + exactlyOnceSupport);
+ }
+
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+ }
+ } catch (Exception e) {
+ log.error("Failed while validating connector support for
exactly-once guarantees", e);
+ String validationErrorMessage = "An unexpected error
occurred during validation";
+ String failureMessage = e.getMessage();
+ if (failureMessage != null &&
!failureMessage.trim().isEmpty()) {
+ validationErrorMessage += ": " + failureMessage.trim();
+ } else {
+ validationErrorMessage += "; please see the worker
logs for more details.";
+ }
+
validatedExactlyOnceSupport.addErrorMessage(validationErrorMessage);
+ }
}
}
- return validatedConfig;
}
+ private void validateSourceConnectorTransactionBoundary(
+ Map<String, String> rawConfig,
+ Map<String, ConfigValue> validatedConfig,
+ SourceConnector connector) {
+ ConfigValue validatedTransactionBoundary =
validatedConfig.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG);
+ if (validatedTransactionBoundary.errorMessages().isEmpty()) {
+ // Should be safe to parse the enum from the user-provided value
since it's passed validation so far
+ SourceTask.TransactionBoundary transactionBoundary =
+
SourceTask.TransactionBoundary.fromProperty(Objects.toString(validatedTransactionBoundary.value()));
+ if
(SourceTask.TransactionBoundary.CONNECTOR.equals(transactionBoundary)) {
+ try {
+ ConnectorTransactionBoundaries connectorTransactionSupport
= connector.canDefineTransactionBoundaries(rawConfig);
+ if (connectorTransactionSupport == null) {
+ validatedTransactionBoundary.addErrorMessage(
+ "This connector has returned a null value from
its canDefineTransactionBoundaries method, which is not permitted. " +
+ "The connector will be treated as if
it cannot define its own transaction boundaries, and cannot be configured with
" +
+ "'" +
SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG + "' set to '" +
SourceTask.TransactionBoundary.CONNECTOR + "'."
+ );
+ } else if
(!ConnectorTransactionBoundaries.SUPPORTED.equals(connectorTransactionSupport))
{
+ validatedTransactionBoundary.addErrorMessage(
+ "The connector does not support
connector-defined transaction boundaries with the given configuration. "
+ + "Please reconfigure it to use a
different transaction boundary definition.");
+ }
+ } catch (Exception e) {
+ log.error("Failed while validating connector support for
defining its own transaction boundaries", e);
+ String validationErrorMessage = "An unexpected error
occurred during validation";
+ String failureMessage = e.getMessage();
+ if (failureMessage != null &&
!failureMessage.trim().isEmpty()) {
+ validationErrorMessage += ": " + failureMessage.trim();
+ } else {
+ validationErrorMessage += "; please see the worker
logs for more details.";
+ }
+
validatedTransactionBoundary.addErrorMessage(validationErrorMessage);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected boolean
connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType,
Map<String, String> connProps) {
+ if (super.connectorUsesAdmin(connectorType, connProps)) {
+ return true;
+ } else if (connectorType ==
org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+ return config.exactlyOnceSourceEnabled()
+ ||
!connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG,
"").trim().isEmpty();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ protected boolean
connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType
connectorType, Map<String, String> connProps) {
+ if (super.connectorUsesConsumer(connectorType, connProps)) {
+ return true;
+ } else if (connectorType ==
org.apache.kafka.connect.health.ConnectorType.SOURCE) {
+ return config.exactlyOnceSourceEnabled()
+ ||
!connProps.getOrDefault(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG,
"").trim().isEmpty();
+ } else {
+ return false;
+ }
Review Comment:
Could we factor out this `else if`/`else` logic into a shared method, since
it's identical in the `connectorUsesConsumer` and `connectorUsesAdmin` methods?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -790,22 +794,231 @@ public void testCreateConnectorFailedValidation() throws
Exception {
PowerMock.verifyAll();
}
- @SuppressWarnings("unchecked")
@Test
public void testConnectorNameConflictsWithWorkerGroupId() {
Map<String, String> config = new HashMap<>(CONN2_CONFIG);
config.put(ConnectorConfig.NAME_CONFIG, "test-group");
- Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+ SinkConnector connectorMock =
PowerMock.createMock(SinkConnector.class);
+
+ PowerMock.replayAll(connectorMock);
// CONN2 creation should fail because the worker group id
(connect-test-group) conflicts with
// the consumer group id we would use for this sink
- Map<String, ConfigValue> validatedConfigs =
- herder.validateBasicConnectorConfig(connectorMock,
ConnectorConfig.configDef(), config);
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSinkConnectorConfig(
+ connectorMock, SinkConnectorConfig.configDef(), config);
ConfigValue nameConfig =
validatedConfigs.get(ConnectorConfig.NAME_CONFIG);
- assertNotNull(nameConfig.errorMessages());
assertFalse(nameConfig.errorMessages().isEmpty());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testExactlyOnceSourceSupportValidation() {
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG,
REQUIRED.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+ EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+ .andReturn(ExactlyOnceSupport.SUPPORTED);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ ConfigValue exactlyOnceSupportConfig =
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+ assertTrue(exactlyOnceSupportConfig.errorMessages().isEmpty());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector()
{
+ herder = exactlyOnceHerder();
+ Map<String, String> config = new HashMap<>();
+ config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG,
REQUIRED.toString());
+
+ SourceConnector connectorMock =
PowerMock.createMock(SourceConnector.class);
+ EasyMock.expect(connectorMock.exactlyOnceSupport(EasyMock.eq(config)))
+ .andReturn(ExactlyOnceSupport.UNSUPPORTED);
+
+ PowerMock.replayAll(connectorMock);
+
+ Map<String, ConfigValue> validatedConfigs =
herder.validateSourceConnectorConfig(
+ connectorMock, SourceConnectorConfig.configDef(), config);
+
+ ConfigValue exactlyOnceSupportConfig =
validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG);
+ assertFalse(exactlyOnceSupportConfig.errorMessages().isEmpty());
Review Comment:
Could we assert on error message equality (or `contains` if there can be
more than one)? As well as making the tests clearer, it would make it more
obvious that we're actually hitting the expected validation condition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]