yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1152720712


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);

Review Comment:
   Whoops, thanks, that was a remnant from before we wrapped this whole section 
with the thread context classloader switch.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", connName);
+                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+        listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {
+            if (error != null) {
+                cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);

Review Comment:
   Thanks, that's a good point.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {

Review Comment:
   > I think we may want to put this through the tick thread in the 
DistributedHerder, to ensure that the connector isn't deleted between when we 
check to see that it exists in the config backing store and when we try to get 
its config
   
   Hm wouldn't we still be susceptible to that if the request is served on a 
non-leader worker (since we aren't forwarding get offset requests to the 
leader)? And isn't that acceptable for a GET API?
   
   > We might actually want to tweak the message in the AbstractHerder class to 
not use the "Submitting... request" language since it's more like we're 
actually fulfilling the request at that point.
   
   I think the submitting request language can still be used because the worker 
API being called isn't synchronous either?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java:
##########
@@ -109,6 +121,69 @@ public void testThreadName() {
                 .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName()));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> setCallback = mock(Callback.class);
+
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+
+        Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>();
+        serializedPartitionOffsets.put(
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))),
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Collections.singletonMap("offsetKey", "offsetValue")))
+        );
+        serializedPartitionOffsets.put(
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))),
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Collections.singletonMap("offsetKey", "offsetValue2")))
+        );
+        serializedPartitionOffsets.put(
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue2")))),
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Collections.singletonMap("offsetKey", "offsetValue")))
+        );
+        serializedPartitionOffsets.put(
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Arrays.asList("connector2", Collections.singletonMap("partitionKey", "partitionValue")))),
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Collections.singletonMap("offsetKey", "offsetValue")))
+        );
+
+        store.set(serializedPartitionOffsets, setCallback).get();
+        store.stop();
+
+        // Restore into a new store to ensure correct reload from scratch
+        FileOffsetBackingStore restore = new FileOffsetBackingStore(jsonConverter);
+        restore.configure(config);
+        restore.start();
+
+        Set<Map<String, Object>> connectorPartitions1 = restore.connectorPartitions("connector1");
+        assertEquals(2, connectorPartitions1.size());
+
+        Set<Map<String, Object>> expectedConnectorPartition1 = new HashSet<>();
+        expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue1"));
+        expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue2"));
+
+        assertTrue(connectorPartitions1.containsAll(expectedConnectorPartition1));
+        assertTrue(expectedConnectorPartition1.containsAll(connectorPartitions1));
+
+        Set<Map<String, Object>> connectorPartitions2 = restore.connectorPartitions("connector2");
+        assertEquals(1, connectorPartitions2.size());
+
+        Set<Map<String, Object>> expectedConnectorPartition2 = Collections.singleton(Collections.singletonMap("partitionKey", "partitionValue"));
+
+        assertTrue(connectorPartitions2.containsAll(expectedConnectorPartition2));
+        assertTrue(expectedConnectorPartition2.containsAll(connectorPartitions2));

Review Comment:
   Makes sense, added. This also made me realize that there was a bug in the `FileOffsetBackingStore` implementation of `connectorPartitions`: we were only populating the `connectorPartitions` map in the `load` method, which runs just once at startup. We should also populate it on calls to `save`, which is invoked on every `OffsetBackingStore::set` (this wasn't an issue in `KafkaOffsetBackingStore`, where the `connectorPartitions` map is populated by the separate thread consuming from the `KafkaBasedLog`).
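   Roughly, the fix is something like the sketch below (simplified, not the exact code in this PR; the field and helper names are just shorthand). The same "deserialize the key and record the partition" step that `load` already does now also runs whenever `save` persists the in-memory `data` map:

```java
// Simplified sketch: track connector partitions from the serialized offset keys both when
// the file is first loaded and whenever offsets are saved. `keyConverter` stands in for the
// Converter passed to the FileOffsetBackingStore constructor.
private void trackConnectorPartitions(Map<ByteBuffer, ByteBuffer> offsets) {
    for (ByteBuffer key : offsets.keySet()) {
        if (key == null) {
            continue;
        }
        // Offset keys are serialized as a two-element list: [connectorName, partitionMap]
        @SuppressWarnings("unchecked")
        List<Object> deserializedKey = (List<Object>) keyConverter.toConnectData("", key.array()).value();
        String connectorName = (String) deserializedKey.get(0);
        @SuppressWarnings("unchecked")
        Map<String, Object> partition = (Map<String, Object>) deserializedKey.get(1);
        connectorPartitions.computeIfAbsent(connectorName, ignored -> new HashSet<>()).add(partition);
    }
}

// called from both load() and save(), so partitions written after startup are tracked too
```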



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", connName);
+                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);

Review Comment:
   Hmm, in theory there shouldn't be any exceptions thrown in the remainder of the method, and the admin client is closed at the end of the `whenComplete` chained to the returned future. I'm not using try-with-resources or a finally block here because we don't want to close the admin client before the `listConsumerGroupOffsets` request completes, but I guess it doesn't hurt to wrap this in a catch-all try block (also, since we're undoing the earlier suggestion of centralizing the try/catch logic, this looks okay).
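   Roughly what I have in mind (a simplified sketch rather than the exact code; `consumerGroupOffsetsToConnectorOffsets` is just a placeholder for whatever conversion we end up doing, and `Utils` is `org.apache.kafka.common.utils.Utils`):

```java
Admin admin = adminFactory.apply(adminConfig);
try {
    ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
            .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
    admin.listConsumerGroupOffsets(groupId, listOffsetsOptions).all().whenComplete((result, error) -> {
        try {
            if (error != null) {
                cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);
            } else {
                // placeholder for converting Map<TopicPartition, OffsetAndMetadata> into ConnectorOffsets
                cb.onCompletion(null, consumerGroupOffsetsToConnectorOffsets(result.get(groupId)));
            }
        } finally {
            // only close the admin client once the listConsumerGroupOffsets request has completed
            Utils.closeQuietly(admin, "Admin client for sink connector " + connName);
        }
    });
} catch (Throwable t) {
    // catch-all so an unexpected failure still closes the client and completes the callback
    Utils.closeQuietly(admin, "Admin client for sink connector " + connName);
    cb.onCompletion(t, null);
}
```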



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for Kafka Connect's connector offset management REST APIs
+ */
+@Category(IntegrationTest.class)
+public class OffsetsApiIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final Integer NUM_TASKS = 2;
+    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
+    private static final int NUM_WORKERS = 3;
+    private EmbeddedConnectCluster connect;
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .workerProps(workerProps)
+                .build();
+        connect.start();
+    }
+
+    @After
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testGetNonExistentConnectorOffsets() {
+        ConnectRestException e = assertThrows(ConnectRestException.class,
+                () -> connect.connectorOffsets("non-existent-connector"));
+        assertEquals(404, e.errorCode());
+    }
+
+    @Test
+    public void testGetSinkConnectorOffsets() throws Exception {
+        getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
+    }
+
+    @Test
+    public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,

Review Comment:
   Hm, I get where you're coming from, but I feel like adding anything too involved here would detract from this test and would really be exercising a different part of Connect (client config overrides). So I've just added a simple admin-client-based check after the call to `getAndVerifySinkConnectorOffsets` to ensure that the overridden group ID exists and the default one doesn't.
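   Concretely, the check is along these lines (illustrative, not the exact test code; `overriddenGroupId` stands for whatever value the connector's consumer override uses, and I'm assuming the embedded cluster's `createAdminClient()` helper):

```java
// Verify via an admin client that only the overridden consumer group was created for the
// sink connector, and that the default "connect-<connector name>" group was not.
try (Admin admin = connect.kafka().createAdminClient()) {
    Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
    Set<String> groupIds = consumerGroups.stream()
            .map(ConsumerGroupListing::groupId)
            .collect(Collectors.toSet());
    assertTrue(groupIds.contains(overriddenGroupId));
    assertFalse(groupIds.contains(SinkUtils.consumerGroupId(CONNECTOR_NAME)));
}
```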



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1722,6 +1727,128 @@ public void testZombieFencing() {
         verifyGenericIsolation();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testGetSinkConnectorOffsets() throws Exception {
+        mockKafkaClusterId();
+
+        String connectorClass = SampleSinkConnector.class.getName();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+                allConnectorClientConfigOverridePolicy);
+        worker.start();
+
+        Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
+                Collections.singletonMap(new TopicPartition("test-topic", 0), new OffsetAndMetadata(10));
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> consumerGroupToOffsetsMap =
+                Collections.singletonMap(SinkUtils.consumerGroupId(CONNECTOR_ID), consumerGroupOffsets);
+
+        Admin admin = mock(Admin.class);
+        ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class);
+        when(admin.listConsumerGroupOffsets(anyString(), any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result);
+        KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> adminFuture = mock(KafkaFuture.class);
+        when(result.all()).thenReturn(adminFuture);
+        when(adminFuture.whenComplete(any())).thenAnswer(invocation -> {
+            ((KafkaFuture.BiConsumer<Map<String, Map<TopicPartition, OffsetAndMetadata>>, Throwable>) invocation.getArgument(0))
+                    .accept(consumerGroupToOffsetsMap, null);
+            return null;
+        });
+
+        FutureCallback<ConnectorOffsets> cb = new FutureCallback<>();
+        worker.sinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, cb, config -> admin);
+        ConnectorOffsets offsets = cb.get(1000, TimeUnit.MILLISECONDS);
+
+        assertEquals(1, offsets.offsets().size());
+        assertEquals(10L, offsets.offsets().get(0).offset().get(SinkUtils.KAFKA_OFFSET_KEY));
+        assertEquals(0, offsets.offsets().get(0).partition().get(SinkUtils.KAFKA_PARTITION_KEY));
+        assertEquals("test-topic", offsets.offsets().get(0).partition().get(SinkUtils.KAFKA_TOPIC_KEY));
+
+        verify(admin).listConsumerGroupOffsets(eq(SinkUtils.consumerGroupId(CONNECTOR_ID)), any(ListConsumerGroupOffsetsOptions.class));
+        verify(admin).close();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+        mockKafkaClusterId();
+
+        String connectorClass = SampleSinkConnector.class.getName();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
+        connectorProps.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, "overridden-group-id");
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+                allConnectorClientConfigOverridePolicy);
+        worker.start();
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> consumerGroupToOffsetsMap = new HashMap<>();
+        consumerGroupToOffsetsMap.put("overridden-group-id", Collections.singletonMap(new TopicPartition("test-topic", 0), new OffsetAndMetadata(10)));
+        consumerGroupToOffsetsMap.put(SinkUtils.consumerGroupId(CONNECTOR_ID),
+                Collections.singletonMap(new TopicPartition("test-topic-2", 1), new OffsetAndMetadata(0)));
+
+        Admin admin = mock(Admin.class);
+        ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class);
+        when(admin.listConsumerGroupOffsets(anyString(), any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result);
+        KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> adminFuture = mock(KafkaFuture.class);
+        when(result.all()).thenReturn(adminFuture);
+        when(adminFuture.whenComplete(any())).thenAnswer(invocation -> {
+            ((KafkaFuture.BiConsumer<Map<String, Map<TopicPartition, OffsetAndMetadata>>, Throwable>) invocation.getArgument(0))
+                    .accept(consumerGroupToOffsetsMap, null);
+            return null;
+        });
+
+        FutureCallback<ConnectorOffsets> cb = new FutureCallback<>();
+        worker.sinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, cb, config -> admin);
+        ConnectorOffsets offsets = cb.get(1000, TimeUnit.MILLISECONDS);
+
+        assertEquals(1, offsets.offsets().size());
+        assertEquals(10L, offsets.offsets().get(0).offset().get(SinkUtils.KAFKA_OFFSET_KEY));
+        assertEquals(0, offsets.offsets().get(0).partition().get(SinkUtils.KAFKA_PARTITION_KEY));
+        assertEquals("test-topic", offsets.offsets().get(0).partition().get(SinkUtils.KAFKA_TOPIC_KEY));
+
+        verify(admin).listConsumerGroupOffsets(eq("overridden-group-id"), any(ListConsumerGroupOffsetsOptions.class));
+        verify(admin).close();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    public void testGetSourceConnectorOffsets() throws Exception {

Review Comment:
   I've added a test case where `offsetStore::connectorPartitions` throws an exception and verified that the store is stopped. The callback invocation is tested indirectly by using a `FutureCallback` in the test and asserting on the exception thrown by `FutureCallback::get`.
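   The new test looks roughly like this (a sketch only; the exact testing-visible `sourceConnectorOffsets` overload and the mocked store/reader types in the PR may differ):

```java
@Test
public void testGetSourceConnectorOffsetsError() throws Exception {
    mockKafkaClusterId();
    worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
            allConnectorClientConfigOverridePolicy);
    worker.start();

    // Offset store whose connectorPartitions lookup fails
    ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
    OffsetStorageReader offsetReader = mock(OffsetStorageReader.class);
    when(offsetStore.connectorPartitions(CONNECTOR_ID)).thenThrow(new ConnectException("Failed to read partitions"));

    FutureCallback<ConnectorOffsets> cb = new FutureCallback<>();
    worker.sourceConnectorOffsets(CONNECTOR_ID, offsetStore, offsetReader, cb);

    // The callback is completed exceptionally and the store is stopped even on failure
    ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));
    assertTrue(e.getCause() instanceof ConnectException);
    verify(offsetStore).stop();
    verifyKafkaClusterId();
}
```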



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -460,6 +471,74 @@ public void testClientIds() {
         assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+        store = spy(new KafkaOffsetBackingStore(() -> {
+            fail("Should not attempt to instantiate admin in these tests");
+            return null;
+        }, () -> CLIENT_ID_BASE, jsonConverter));
+
+        doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(),
+                capturedConsumerProps.capture(), capturedConsumedCallback.capture(),
+                capturedNewTopic.capture(), capturedAdminSupplier.capture());
+
+        store.configure(mockConfig(props));
+        store.start();
+
+        verify(storeLog).start();
+
+        doAnswer(invocation -> {
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue1"))), TP0_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue1"))), TP1_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 2, 0, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue2"))), TP2_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 3, 0, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector2",
+                                    Collections.singletonMap("partitionKey", "partitionValue"))), TP1_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
+            return null;
+        }).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());
+
+        // Trigger a read to the end of the log
+        store.get(Collections.emptyList()).get(10000, TimeUnit.MILLISECONDS);
+
+        Set<Map<String, Object>> connectorPartitions1 = store.connectorPartitions("connector1");
+        assertEquals(2, connectorPartitions1.size());
+
+        Set<Map<String, Object>> expectedConnectorPartition1 = new HashSet<>();
+        expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue1"));
+        expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue2"));
+
+        assertTrue(connectorPartitions1.containsAll(expectedConnectorPartition1));
+        assertTrue(expectedConnectorPartition1.containsAll(connectorPartitions1));

Review Comment:
   Ah nope, leftover from earlier where I was using a List rather than a Set 
and we didn't care about the order.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java:
##########
@@ -109,6 +121,69 @@ public void testThreadName() {
                 .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName()));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> setCallback = mock(Callback.class);
+
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+
+        Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>();
+        serializedPartitionOffsets.put(
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))),
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Collections.singletonMap("offsetKey", "offsetValue")))
+        );
+        serializedPartitionOffsets.put(

Review Comment:
   Whoops yeah, the intention was to test the case where new offsets are 
written for partitions that are already known and ensure that the number of 
partitions doesn't increase. I've split it into separate 
`OffsetBackingStore::set` calls.
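   In other words, the flow in the test is now something like this sketch (illustrative, reusing the converter, store, and callback already set up in the test): write an offset for a partition, then write a new offset for the same partition in a second `set()` call, and check that the partition count for the connector doesn't grow:

```java
// First set() call introduces the partition
Map<ByteBuffer, ByteBuffer> firstSet = Collections.singletonMap(
        ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
                Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))),
        ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
                Collections.singletonMap("offsetKey", "offsetValue"))));
store.set(firstSet, setCallback).get();

// Second set() call writes a new offset for the same, already-known partition
Map<ByteBuffer, ByteBuffer> secondSet = Collections.singletonMap(
        ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
                Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))),
        ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
                Collections.singletonMap("offsetKey", "offsetValue2"))));
store.set(secondSet, setCallback).get();

// Still only one known partition for connector1 despite two offset writes
assertEquals(1, store.connectorPartitions("connector1").size());
```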



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java:
##########
@@ -191,12 +194,16 @@ public void start(Map<String, String> props) {
         @Override
         public List<SourceRecord> poll() {
             if (!stopped) {
+                // Don't return any more records since we've already produced the configured maximum number.
+                if (seqno >= maxMessages) {
+                    return null;
+                }
                 if (throttler.shouldThrottle(seqno - startingSeqno, System.currentTimeMillis())) {
                     throttler.throttle();
                 }
                 taskHandle.record(batchSize);

Review Comment:
   Good suggestion, I've moved it to a new variable called `currentBatchSize`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java:
##########
@@ -109,6 +121,69 @@ public void testThreadName() {
                 .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName()));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> setCallback = mock(Callback.class);
+
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+
+        Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>();
+        serializedPartitionOffsets.put(
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))),
+                ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
+                        Collections.singletonMap("offsetKey", "offsetValue")))

Review Comment:
   Nice 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
