[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996821#comment-15996821 ]
ASF GitHub Bot commented on FLINK-4022: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114786022 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java --- @@ -0,0 +1,842 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link KafkaConsumerThread}. 
+ */ +public class KafkaConsumerThreadTest { + + /** + * Tests reassignment works correctly in the case when: + * - the consumer initially had no assignments + * - new unassigned partitions already have defined offsets + * + * Setting a timeout because the test will not finish if there is logic error with + * the reassignment flow. + */ + @SuppressWarnings("unchecked") + @Test(timeout = 10000) + public void testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws Exception { + final String testTopic = "test-topic"; + + // -------- new partitions with defined offsets -------- + + KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + newPartition1.setOffset(23L); + + KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + newPartition2.setOffset(31L); + + final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2); + newPartitions.add(newPartition1); + newPartitions.add(newPartition2); + + // -------- setup mock KafkaConsumer -------- + + // no initial assignment + final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>(); + + final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer( + mockConsumerAssignmentsAndPositions, + Collections.<TopicPartition, Long>emptyMap(), + false, + null, + null); + + // -------- setup new partitions to be polled from the unassigned partitions queue -------- + + final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue = + new ClosableBlockingQueue<>(); + + for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) { + unassignedPartitionsQueue.add(newPartition); + } + + // -------- start test -------- + + final TestKafkaConsumerThread testThread = + new 
TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover()); + testThread.start(); + + testThread.startPartitionReassignment(); + testThread.waitPartitionReassignmentComplete(); + + // verify that the consumer called assign() with all new partitions, and that positions are correctly advanced + + assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size()); + + for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) { + assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle())); + + // should be seeked to (offset in state + 1) because offsets in state represent the last processed record + assertEquals( + newPartition.getOffset() + 1, + mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue()); + } + + assertEquals(0, unassignedPartitionsQueue.size()); + } + + /** + * Tests reassignment works correctly in the case when: + * - the consumer initially had no assignments + * - new unassigned partitions have undefined offsets (e.g. EARLIEST_OFFSET sentinel value) + * + * Setting a timeout because the test will not finish if there is logic error with + * the reassignment flow. 
+ */ + @SuppressWarnings("unchecked") + @Test(timeout = 10000) + public void testReassigningPartitionsWithoutDefinedOffsetsWhenNoInitialAssignment() throws Exception { + final String testTopic = "test-topic"; + + // -------- new partitions with undefined offsets -------- + + KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + + KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + + final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2); + newPartitions.add(newPartition1); + newPartitions.add(newPartition2); + + // -------- setup mock KafkaConsumer -------- + + // no initial assignment + final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>(); + + // mock retrieved values that should replace the EARLIEST_OFFSET sentinels + final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>(); + mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L); + mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L); + + final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer( + mockConsumerAssignmentsAndPositions, + mockRetrievedPositions, + false, + null, + null); + + // -------- setup new partitions to be polled from the unassigned partitions queue -------- + + final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue = + new ClosableBlockingQueue<>(); + + for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) { + unassignedPartitionsQueue.add(newPartition); + } + + // -------- start test -------- + + final 
TestKafkaConsumerThread testThread = + new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover()); + testThread.start(); + + testThread.startPartitionReassignment(); + testThread.waitPartitionReassignmentComplete(); + + // the sentinel offset states should have been replaced with defined values according to the retrieved values + assertEquals(mockRetrievedPositions.get(newPartition1.getKafkaPartitionHandle()) - 1, newPartition1.getOffset()); + assertEquals(mockRetrievedPositions.get(newPartition2.getKafkaPartitionHandle()) - 1, newPartition2.getOffset()); + + // verify that the consumer called assign() with all new partitions, and that positions are correctly advanced + + assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size()); + + for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) { + assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle())); + + // should be seeked to (offset in state + 1) because offsets in state represent the last processed record + assertEquals( + newPartition.getOffset() + 1, + mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue()); + } + + assertEquals(0, unassignedPartitionsQueue.size()); + } + + /** + * Tests reassignment works correctly in the case when: + * - the consumer already have some assignments + * - new unassigned partitions already have defined offsets + * + * Setting a timeout because the test will not finish if there is logic error with + * the reassignment flow. 
+ */ + @SuppressWarnings("unchecked") + @Test(timeout = 10000) + public void testReassigningPartitionsWithDefinedOffsets() throws Exception { + final String testTopic = "test-topic"; + + // -------- old partitions -------- + + KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + oldPartition1.setOffset(23L); + + KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + oldPartition2.setOffset(32L); + + List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2); + oldPartitions.add(oldPartition1); + oldPartitions.add(oldPartition2); + + // -------- new partitions with defined offsets -------- + + KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2)); + newPartition.setOffset(29L); + + List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3); + totalPartitions.add(oldPartition1); + totalPartitions.add(oldPartition2); + totalPartitions.add(newPartition); + + // -------- setup mock KafkaConsumer -------- + + // has initial assignments + final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>(); + for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) { + mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1); + } + + final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer( + mockConsumerAssignmentsAndPositions, + Collections.<TopicPartition, Long>emptyMap(), + false, + null, + null); + + // -------- setup new partitions to be polled from the unassigned partitions queue -------- + + final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue = 
+ new ClosableBlockingQueue<>(); + + unassignedPartitionsQueue.add(newPartition); + + // -------- start test -------- + + final TestKafkaConsumerThread testThread = + new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover()); + testThread.start(); + + testThread.startPartitionReassignment(); + testThread.waitPartitionReassignmentComplete(); + + // verify that the consumer called assign() with all new partitions, and that positions are correctly advanced + + assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size()); + + // old partitions should be re-seeked to their previous positions + for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) { + assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle())); + + // should be seeked to (offset in state + 1) because offsets in state represent the last processed record + assertEquals( + partition.getOffset() + 1, + mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue()); + } + + assertEquals(0, unassignedPartitionsQueue.size()); + } + + /** + * Tests reassignment works correctly in the case when: + * - the consumer already have some assignments + * - new unassigned partitions have undefined offsets (e.g. EARLIEST_OFFSET sentinel value) + * + * Setting a timeout because the test will not finish if there is logic error with + * the reassignment flow. 
+ */ + @SuppressWarnings("unchecked") + @Test(timeout = 10000) + public void testReassigningPartitionsWithoutDefinedOffsets() throws Exception { + final String testTopic = "test-topic"; + + // -------- old partitions -------- + + KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + oldPartition1.setOffset(23L); + + KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + oldPartition2.setOffset(32L); + + List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2); + oldPartitions.add(oldPartition1); + oldPartitions.add(oldPartition2); + + // -------- new partitions with undefined offsets -------- + + KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2)); + newPartition.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + + List<KafkaTopicPartitionState<TopicPartition>> totalPartitions = new ArrayList<>(3); + totalPartitions.add(oldPartition1); + totalPartitions.add(oldPartition2); + totalPartitions.add(newPartition); + + // -------- setup mock KafkaConsumer -------- + + // has initial assignments + final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new HashMap<>(); + for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) { + mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1); + } + + // mock retrieved values that should replace the EARLIEST_OFFSET sentinels + final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>(); + mockRetrievedPositions.put(newPartition.getKafkaPartitionHandle(), 30L); + + final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer( + 
mockConsumerAssignmentsAndPositions, + mockRetrievedPositions, + false, + null, + null); + + // -------- setup new partitions to be polled from the unassigned partitions queue -------- + + final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue = + new ClosableBlockingQueue<>(); + + unassignedPartitionsQueue.add(newPartition); + + // -------- start test -------- + + final TestKafkaConsumerThread testThread = + new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover()); + testThread.start(); + + testThread.startPartitionReassignment(); + testThread.waitPartitionReassignmentComplete(); + + // the sentinel offset states should have been replaced with defined values according to the retrieved positions + assertEquals(mockRetrievedPositions.get(newPartition.getKafkaPartitionHandle()) - 1, newPartition.getOffset()); + + // verify that the consumer called assign() with all new partitions, and that positions are correctly advanced + + assertEquals(totalPartitions.size(), mockConsumerAssignmentsAndPositions.size()); + + // old partitions should be re-seeked to their previous positions + for (KafkaTopicPartitionState<TopicPartition> partition : totalPartitions) { + assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle())); + + // should be seeked to (offset in state + 1) because offsets in state represent the last processed record + assertEquals( + partition.getOffset() + 1, + mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue()); + } + + assertEquals(0, unassignedPartitionsQueue.size()); + } + + /** + * Tests reassignment works correctly in the case when: + * - the consumer already have some assignments + * - new unassigned partitions already have defined offsets + * - the consumer was woken up prior to the reassignment + * + * In this case, reassignment should not have occurred at all, and the consumer retains the original assignment. 
+ * + * Setting a timeout because the test will not finish if there is logic error with + * the reassignment flow. + */ + @SuppressWarnings("unchecked") + @Test(timeout = 10000) + public void testReassigningPartitionsWithDefinedOffsetsWhenEarlyWakeup() throws Exception { + final String testTopic = "test-topic"; + + // -------- old partitions -------- + + KafkaTopicPartitionState<TopicPartition> oldPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + oldPartition1.setOffset(23L); + + KafkaTopicPartitionState<TopicPartition> oldPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + oldPartition2.setOffset(32L); + + List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = new ArrayList<>(2); + oldPartitions.add(oldPartition1); + oldPartitions.add(oldPartition2); + + // -------- new partitions with defined offsets -------- + + KafkaTopicPartitionState<TopicPartition> newPartition = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 2), new TopicPartition(testTopic, 2)); + newPartition.setOffset(29L); + + // -------- setup mock KafkaConsumer -------- + + // initial assignments + final Map<TopicPartition, Long> mockConsumerAssignmentsToPositions = new LinkedHashMap<>(); + for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) { + mockConsumerAssignmentsToPositions.put(oldPartition.getKafkaPartitionHandle(), oldPartition.getOffset() + 1); + } + + final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer( + mockConsumerAssignmentsToPositions, + Collections.<TopicPartition, Long>emptyMap(), + true, + null, + null); + + // -------- setup new partitions to be polled from the unassigned partitions queue -------- + + final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue = + new ClosableBlockingQueue<>(); + + 
unassignedPartitionsQueue.add(newPartition); + + // -------- start test -------- + + final TestKafkaConsumerThread testThread = + new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover()); + testThread.start(); + + // pause just before the reassignment so we can inject the wakeup + testThread.waitPartitionReassignmentInvoked(); + + testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>()); + verify(mockConsumer, times(1)).wakeup(); + + testThread.startPartitionReassignment(); + testThread.waitPartitionReassignmentComplete(); + + // the consumer's assignment should have remained untouched + + assertEquals(oldPartitions.size(), mockConsumerAssignmentsToPositions.size()); + + for (KafkaTopicPartitionState<TopicPartition> oldPartition : oldPartitions) { + assertTrue(mockConsumerAssignmentsToPositions.containsKey(oldPartition.getKafkaPartitionHandle())); + assertEquals( + oldPartition.getOffset() + 1, + mockConsumerAssignmentsToPositions.get(oldPartition.getKafkaPartitionHandle()).longValue()); + } + + // the new partitions should have been re-added to the unassigned partitions queue + assertEquals(1, unassignedPartitionsQueue.size()); + } + + /** + * Tests reassignment works correctly in the case when: + * - the consumer has no initial assignments + * - new unassigned partitions have undefined offsets + * - the consumer was woken up prior to the reassignment + * + * In this case, reassignment should not have occurred at all, and the consumer retains the original assignment. + * + * Setting a timeout because the test will not finish if there is logic error with + * the reassignment flow. 
+ */ + @SuppressWarnings("unchecked") + @Test(timeout = 10000) + public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenEarlyWakeup() throws Exception { + final String testTopic = "test-topic"; + + // -------- new partitions with defined offsets -------- + + KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + + KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + + List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2); + newPartitions.add(newPartition1); + newPartitions.add(newPartition2); + + // -------- setup mock KafkaConsumer -------- + + // no initial assignments + final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>(); + + // mock retrieved values that should replace the EARLIEST_OFFSET sentinels + final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>(); + mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L); + mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L); + + final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer( + mockConsumerAssignmentsAndPositions, + mockRetrievedPositions, + true, + null, + null); + + // -------- setup new partitions to be polled from the unassigned partitions queue -------- + + final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue = + new ClosableBlockingQueue<>(); + + for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) { + unassignedPartitionsQueue.add(newPartition); + } + + // -------- start test -------- + + final 
TestKafkaConsumerThread testThread = + new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover()); + testThread.start(); + + // pause just before the reassignment so we can inject the wakeup + testThread.waitPartitionReassignmentInvoked(); + + testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>()); + + // make sure the consumer was actually woken up + verify(mockConsumer, times(1)).wakeup(); + + testThread.startPartitionReassignment(); + testThread.waitPartitionReassignmentComplete(); + + // the consumer's assignment should have remained untouched (in this case, empty) + assertEquals(0, mockConsumerAssignmentsAndPositions.size()); + + // the new partitions should have been re-added to the unassigned partitions queue + assertEquals(2, unassignedPartitionsQueue.size()); + } + + /** + * Tests reassignment works correctly in the case when: + * - the consumer has no initial assignments + * - new unassigned partitions have undefined offsets + * - the consumer was woken up during the reassignment + * + * In this case, reassignment should have completed, and the consumer is restored the wakeup call after the reassignment. + * + * Setting a timeout because the test will not finish if there is logic error with + * the reassignment flow. 
+ */ + @SuppressWarnings("unchecked") + @Test(timeout = 10000) + public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenWakeupMidway() throws Exception { + final String testTopic = "test-topic"; + + // -------- new partitions with defined offsets -------- + + KafkaTopicPartitionState<TopicPartition> newPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + + KafkaTopicPartitionState<TopicPartition> newPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + + List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>(2); + newPartitions.add(newPartition1); + newPartitions.add(newPartition2); + + // -------- setup mock KafkaConsumer -------- + + // no initial assignments + final Map<TopicPartition, Long> mockConsumerAssignmentsAndPositions = new LinkedHashMap<>(); + + // mock retrieved values that should replace the EARLIEST_OFFSET sentinels + final Map<TopicPartition, Long> mockRetrievedPositions = new HashMap<>(); + mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L); + mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L); + + // these latches are used to pause midway the reassignment process + final OneShotLatch midAssignmentLatch = new OneShotLatch(); + final OneShotLatch continueAssigmentLatch = new OneShotLatch(); + + final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer( + mockConsumerAssignmentsAndPositions, + mockRetrievedPositions, + false, + midAssignmentLatch, + continueAssigmentLatch); + + // -------- setup new partitions to be polled from the unassigned partitions queue -------- + + final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue = + new ClosableBlockingQueue<>(); + + for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) { + unassignedPartitionsQueue.add(newPartition); + } + + // -------- start test -------- + + final TestKafkaConsumerThread testThread = + new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover()); + testThread.start(); + + testThread.startPartitionReassignment(); + + // wait until the reassignment has started + midAssignmentLatch.await(); + + testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>()); + + // the wakeup the setOffsetsToCommit() call should have been buffered, and not called on the consumer + verify(mockConsumer, never()).wakeup(); + + continueAssigmentLatch.trigger(); + + testThread.waitPartitionReassignmentComplete(); + + // verify that the consumer called assign() with all new partitions, and that positions are correctly advanced + + assertEquals(newPartitions.size(), mockConsumerAssignmentsAndPositions.size()); + + for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) { + assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle())); + + // should be seeked to (offset in state + 1) because offsets in state represent the last processed record + assertEquals( + newPartition.getOffset() + 1, + mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue()); + } + + // after the reassignment, the consumer should be restored the wakeup call + verify(mockConsumer, times(1)).wakeup(); + + assertEquals(0, unassignedPartitionsQueue.size()); + } + + /** + * A testable {@link KafkaConsumerThread} that injects multiple latches exactly before and after + * partition reassignment, so that tests are eligible to setup various conditions before the reassignment happens + * and inspect reqssignment results after it is completed. 
--- End diff -- Typo "reqssignment" > Partition discovery / regex topic subscription for the Kafka consumer > --------------------------------------------------------------------- > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors > Affects Versions: 1.0.0 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Priority: Blocker > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit the last offsets of the partitions it is currently responsible for to > an external store (or Kafka) before a rebalance; after the rebalance, once the > subtasks get the new partitions they'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. 
Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions they are holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to the previous global state, and therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to subtasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. 
For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)