chia7712 commented on code in PR #15841: URL: https://github.com/apache/kafka/pull/15841#discussion_r1586954293
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java: ########## @@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState() throws Exception { verify(configLog).stop(); } + @Test + public void testPutConnectorConfigProducerError() throws Exception { + expectStart(Collections.emptyList(), Collections.emptyMap()); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))) + .thenReturn(CONFIGS_SERIALIZED.get(0)); + when(configLog.sendWithReceipt(anyString(), any(byte[].class))).thenReturn(producerFuture); + + // Verify initial state + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(-1, configState.offset()); + assertEquals(0, configState.connectors().size()); + + when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow( + new ExecutionException(new TopicAuthorizationException(Collections.singleton("test")))); + + // verify that the producer exception from KafkaBasedLog::send is propagated + ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), + SAMPLE_CONFIGS.get(0), null)); + assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka")); Review Comment: Could we verify the `e.getCause()` to make sure the error is caused by what we expect? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java: ########## @@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState() throws Exception { verify(configLog).stop(); } + @Test + public void testPutConnectorConfigProducerError() throws Exception { + expectStart(Collections.emptyList(), Collections.emptyMap()); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))) + .thenReturn(CONFIGS_SERIALIZED.get(0)); + when(configLog.sendWithReceipt(anyString(), any(byte[].class))).thenReturn(producerFuture); + + // Verify initial state + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(-1, configState.offset()); + assertEquals(0, configState.connectors().size()); + + when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow( + new ExecutionException(new TopicAuthorizationException(Collections.singleton("test")))); + + // verify that the producer exception from KafkaBasedLog::send is propagated + ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), + SAMPLE_CONFIGS.get(0), null)); + assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka")); + + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + public void testRemoveConnectorConfigSlowProducer() throws Exception { + expectStart(Collections.emptyList(), Collections.emptyMap()); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + @SuppressWarnings("unchecked") + Future<RecordMetadata> connectorConfigProducerFuture = mock(Future.class); + + @SuppressWarnings("unchecked") + Future<RecordMetadata> targetStateProducerFuture = 
mock(Future.class); + + when(configLog.sendWithReceipt(anyString(), isNull())) + // tombstone for the connector config + .thenReturn(connectorConfigProducerFuture) + // tombstone for the connector target state + .thenReturn(targetStateProducerFuture); + + when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS), any(TimeUnit.class))) + .thenAnswer((Answer<RecordMetadata>) invocation -> { + time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000); + return null; + }); + + // the future get timeout is expected to be reduced according to how long the previous Future::get took + when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class))) + .thenAnswer((Answer<RecordMetadata>) invocation -> { + time.sleep(1000); + return null; + }); + + @SuppressWarnings("unchecked") + Future<Void> future = mock(Future.class); + when(configLog.readToEnd()).thenReturn(future); + + // the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the + // timeout on the log read future to be 0 + when(future.get(eq(0L), any(TimeUnit.class))).thenReturn(null); + + configStorage.removeConnectorConfig("test-connector"); + configStorage.stop(); + verify(configLog).stop(); + } + + @Test + @SuppressWarnings("unchecked") + public void testWritePrivileges() throws Exception { + // With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer + // to write some types of messages to the config topic + props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); + createStore(); + + expectStart(Collections.emptyList(), Collections.emptyMap()); + expectPartitionCount(1); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + verifyConfigure(); + configStorage.start(); + + // Try and fail to write a task count record to the config topic without write privileges + when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0))) + 
.thenReturn(CONFIGS_SERIALIZED.get(0)); + + // Should fail the first time since we haven't claimed write privileges + assertThrows(IllegalStateException.class, () -> configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6)); + + // Claim write privileges + doReturn(fencableProducer).when(configStorage).createFencableProducer(); + // And write the task count record successfully + when(fencableProducer.send(any(ProducerRecord.class))).thenReturn(null); + doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)))) + .doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)))) + .when(configLog).readToEnd(); + expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)); + + // Should succeed now + configStorage.claimWritePrivileges(); + configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6); + + verify(fencableProducer).beginTransaction(); + verify(fencableProducer).commitTransaction(); + + // Try to write a connector config + when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))) + .thenReturn(CONFIGS_SERIALIZED.get(1)); + // Get fenced out + doThrow(new ProducerFencedException("Better luck next time")) + .doNothing() + .when(fencableProducer).commitTransaction(); + + // Should fail again when we get fenced out + assertThrows(PrivilegedWriteException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null)); + + verify(fencableProducer, times(2)).beginTransaction(); + verify(fencableProducer).close(Duration.ZERO); + + // Should fail if we retry without reclaiming write privileges + assertThrows(IllegalStateException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null)); + + // In the meantime, write a target state (which doesn't require write privileges) + 
when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED)) + .thenReturn(CONFIGS_SERIALIZED.get(1)); + when(configLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1))) + .thenReturn(producerFuture); + when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenReturn(null); + + // Should succeed even without write privileges (target states can be written by anyone) + configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED); + + // Reclaim write privileges and successfully write the config + when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2))) + .thenReturn(new SchemaAndValue(null, structToMap(CONNECTOR_CONFIG_STRUCTS.get(0)))); + + // Should succeed if we re-claim write privileges + configStorage.claimWritePrivileges(); + configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null); + + verify(fencableProducer, times(3)).beginTransaction(); + verify(fencableProducer, times(3)).commitTransaction(); + verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1)); + + configStorage.stop(); + verify(configLog).stop(); + verify(fencableProducer, times(2)).close(Duration.ZERO); Review Comment: Could you add `verify(configStorage, times(2)).createFencableProducer();` to remind readers that we return the same `fencableProducer` instead of creating a new one? Otherwise, `verify(fencableProducer, times(2)).close(Duration.ZERO);` looks odd to me, since a producer should not be closed twice in production. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java: ########## @@ -48,16 +53,19 @@ import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.function.Supplier; Review Comment: This import is redundant: `java.util.function.Supplier` is already imported on the line above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org