chia7712 commented on code in PR #15841:
URL: https://github.com/apache/kafka/pull/15841#discussion_r1586954293


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##########
@@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState() 
throws Exception {
         verify(configLog).stop();
     }
 
+    @Test
+    public void testPutConnectorConfigProducerError() throws Exception {
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0)))
+                .thenReturn(CONFIGS_SERIALIZED.get(0));
+        when(configLog.sendWithReceipt(anyString(), 
any(byte[].class))).thenReturn(producerFuture);
+
+        // Verify initial state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertEquals(0, configState.connectors().size());
+
+        when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(
+                new ExecutionException(new 
TopicAuthorizationException(Collections.singleton("test"))));
+
+        // verify that the producer exception from KafkaBasedLog::send is 
propagated
+        ConnectException e = assertThrows(ConnectException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+                SAMPLE_CONFIGS.get(0), null));
+        assertTrue(e.getMessage().contains("Error writing connector 
configuration to Kafka"));

Review Comment:
   Could we verify the `e.getCause()` to make sure the error is caused by what 
we expect?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##########
@@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState() 
throws Exception {
         verify(configLog).stop();
     }
 
+    @Test
+    public void testPutConnectorConfigProducerError() throws Exception {
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0)))
+                .thenReturn(CONFIGS_SERIALIZED.get(0));
+        when(configLog.sendWithReceipt(anyString(), 
any(byte[].class))).thenReturn(producerFuture);
+
+        // Verify initial state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertEquals(0, configState.connectors().size());
+
+        when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(
+                new ExecutionException(new 
TopicAuthorizationException(Collections.singleton("test"))));
+
+        // verify that the producer exception from KafkaBasedLog::send is 
propagated
+        ConnectException e = assertThrows(ConnectException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+                SAMPLE_CONFIGS.get(0), null));
+        assertTrue(e.getMessage().contains("Error writing connector 
configuration to Kafka"));
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testRemoveConnectorConfigSlowProducer() throws Exception {
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> connectorConfigProducerFuture = 
mock(Future.class);
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> targetStateProducerFuture = mock(Future.class);
+
+        when(configLog.sendWithReceipt(anyString(), isNull()))
+                // tombstone for the connector config
+                .thenReturn(connectorConfigProducerFuture)
+                // tombstone for the connector target state
+                .thenReturn(targetStateProducerFuture);
+
+        
when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS), 
any(TimeUnit.class)))
+                .thenAnswer((Answer<RecordMetadata>) invocation -> {
+                    time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+                    return null;
+                });
+
+        // the future get timeout is expected to be reduced according to how 
long the previous Future::get took
+        when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class)))
+                .thenAnswer((Answer<RecordMetadata>) invocation -> {
+                    time.sleep(1000);
+                    return null;
+                });
+
+        @SuppressWarnings("unchecked")
+        Future<Void> future = mock(Future.class);
+        when(configLog.readToEnd()).thenReturn(future);
+
+        // the Future::get calls on the previous two producer futures 
exhausted the overall timeout; so expect the
+        // timeout on the log read future to be 0
+        when(future.get(eq(0L), any(TimeUnit.class))).thenReturn(null);
+
+        configStorage.removeConnectorConfig("test-connector");
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWritePrivileges() throws Exception {
+        // With exactly.once.source.support = preparing (or also, "enabled"), 
we need to use a transactional producer
+        // to write some types of messages to the config topic
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        createStore();
+
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Try and fail to write a task count record to the config topic 
without write privileges
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, 
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)))
+                .thenReturn(CONFIGS_SERIALIZED.get(0));
+
+        // Should fail the first time since we haven't claimed write privileges
+        assertThrows(IllegalStateException.class, () -> 
configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6));
+
+        // Claim write privileges
+        
doReturn(fencableProducer).when(configStorage).createFencableProducer();
+        // And write the task count record successfully
+        
when(fencableProducer.send(any(ProducerRecord.class))).thenReturn(null);
+        
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0))))
+                
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
 CONFIGS_SERIALIZED.get(2))))
+                .when(configLog).readToEnd();
+        expectRead(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
+
+        // Should succeed now
+        configStorage.claimWritePrivileges();
+        configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
+
+        verify(fencableProducer).beginTransaction();
+        verify(fencableProducer).commitTransaction();
+
+        // Try to write a connector config
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0)))
+                .thenReturn(CONFIGS_SERIALIZED.get(1));
+        // Get fenced out
+        doThrow(new ProducerFencedException("Better luck next time"))
+                .doNothing()
+                .when(fencableProducer).commitTransaction();
+
+        // Should fail again when we get fenced out
+        assertThrows(PrivilegedWriteException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), 
null));
+
+        verify(fencableProducer, times(2)).beginTransaction();
+        verify(fencableProducer).close(Duration.ZERO);
+
+        // Should fail if we retry without reclaiming write privileges
+        assertThrows(IllegalStateException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), 
null));
+
+        // In the meantime, write a target state (which doesn't require write 
privileges)
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED))
+                .thenReturn(CONFIGS_SERIALIZED.get(1));
+        when(configLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), 
CONFIGS_SERIALIZED.get(1)))
+                .thenReturn(producerFuture);
+        when(producerFuture.get(anyLong(), 
any(TimeUnit.class))).thenReturn(null);
+
+        // Should succeed even without write privileges (target states can be 
written by anyone)
+        configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
+
+        // Reclaim write privileges and successfully write the config
+        when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2)))
+                .thenReturn(new SchemaAndValue(null, 
structToMap(CONNECTOR_CONFIG_STRUCTS.get(0))));
+
+        // Should succeed if we re-claim write privileges
+        configStorage.claimWritePrivileges();
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(0), null);
+
+        verify(fencableProducer, times(3)).beginTransaction();
+        verify(fencableProducer, times(3)).commitTransaction();
+        
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
+
+        configStorage.stop();
+        verify(configLog).stop();
+        verify(fencableProducer, times(2)).close(Duration.ZERO);

Review Comment:
   Could you add `verify(configStorage, times(2)).createFencableProducer();` to 
remind readers that we return the same `fencableProducer` instead of creating 
new one. Otherwise, `verify(fencableProducer, times(2)).close(Duration.ZERO);` 
is weird to me since it should not be closed two times in production.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##########
@@ -48,16 +53,19 @@
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.function.Supplier;

Review Comment:
   this import is redundant



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to