[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996821#comment-15996821 ]

ASF GitHub Bot commented on FLINK-4022:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114786022
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -0,0 +1,842 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.common.errors.WakeupException;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyListOf;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.never;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Unit tests for the {@link KafkaConsumerThread}.
    + */
    +public class KafkaConsumerThreadTest {
    +
    +   /**
    +    * Tests reassignment works correctly in the case when:
    +    *  - the consumer initially had no assignments
    +    *  - new unassigned partitions already have defined offsets
    +    *
    +    * Setting a timeout because the test will not finish if there is a logic error in
    +    * the reassignment flow.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout = 10000)
    +   public void testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws Exception {
    +           final String testTopic = "test-topic";
    +
    +           // -------- new partitions with defined offsets --------
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition1 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
    +           newPartition1.setOffset(23L);
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition2 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
    +           newPartition2.setOffset(31L);
    +
    +           final List<KafkaTopicPartitionState<TopicPartition>> 
newPartitions = new ArrayList<>(2);
    +           newPartitions.add(newPartition1);
    +           newPartitions.add(newPartition2);
    +
    +           // -------- setup mock KafkaConsumer --------
    +
    +           // no initial assignment
    +           final Map<TopicPartition, Long> 
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +           final KafkaConsumer<byte[], byte[]> mockConsumer = 
createMockConsumer(
    +                           mockConsumerAssignmentsAndPositions,
    +                           Collections.<TopicPartition, Long>emptyMap(),
    +                           false,
    +                           null,
    +                           null);
    +
    +           // -------- setup new partitions to be polled from the 
unassigned partitions queue --------
    +
    +           final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue =
    +                   new ClosableBlockingQueue<>();
    +
    +           for (KafkaTopicPartitionState<TopicPartition> newPartition : 
newPartitions) {
    +                   unassignedPartitionsQueue.add(newPartition);
    +           }
    +
    +           // -------- start test --------
    +
    +           final TestKafkaConsumerThread testThread =
    +                           new TestKafkaConsumerThread(mockConsumer, 
unassignedPartitionsQueue, new Handover());
    +           testThread.start();
    +
    +           testThread.startPartitionReassignment();
    +           testThread.waitPartitionReassignmentComplete();
    +
    +           // verify that the consumer called assign() with all new 
partitions, and that positions are correctly advanced
    +
    +           assertEquals(newPartitions.size(), 
mockConsumerAssignmentsAndPositions.size());
    +
    +           for (KafkaTopicPartitionState<TopicPartition> newPartition : 
newPartitions) {
    +                   
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
    +
    +                   // should be seeked to (offset in state + 1) because 
offsets in state represent the last processed record
    +                   assertEquals(
    +                                   newPartition.getOffset() + 1,
    +                                   
mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
    +           }
    +
    +           assertEquals(0, unassignedPartitionsQueue.size());
    +   }
    +
    +   /**
    +    * Tests reassignment works correctly in the case when:
    +    *  - the consumer initially had no assignments
    +    *  - new unassigned partitions have undefined offsets (e.g. EARLIEST_OFFSET sentinel value)
    +    *
    +    * Setting a timeout because the test will not finish if there is a logic error in
    +    * the reassignment flow.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout = 10000)
    +   public void testReassigningPartitionsWithoutDefinedOffsetsWhenNoInitialAssignment() throws Exception {
    +           final String testTopic = "test-topic";
    +
    +           // -------- new partitions with undefined offsets --------
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition1 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
    +           
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition2 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
    +           
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +           final List<KafkaTopicPartitionState<TopicPartition>> 
newPartitions = new ArrayList<>(2);
    +           newPartitions.add(newPartition1);
    +           newPartitions.add(newPartition2);
    +
    +           // -------- setup mock KafkaConsumer --------
    +
    +           // no initial assignment
    +           final Map<TopicPartition, Long> 
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +           // mock retrieved values that should replace the 
EARLIEST_OFFSET sentinels
    +           final Map<TopicPartition, Long> mockRetrievedPositions = new 
HashMap<>();
    +           
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +           
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +           final KafkaConsumer<byte[], byte[]> mockConsumer = 
createMockConsumer(
    +                           mockConsumerAssignmentsAndPositions,
    +                           mockRetrievedPositions,
    +                           false,
    +                           null,
    +                           null);
    +
    +           // -------- setup new partitions to be polled from the 
unassigned partitions queue --------
    +
    +           final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue =
    +                   new ClosableBlockingQueue<>();
    +
    +           for (KafkaTopicPartitionState<TopicPartition> newPartition : 
newPartitions) {
    +                   unassignedPartitionsQueue.add(newPartition);
    +           }
    +
    +           // -------- start test --------
    +
    +           final TestKafkaConsumerThread testThread =
    +                   new TestKafkaConsumerThread(mockConsumer, 
unassignedPartitionsQueue, new Handover());
    +           testThread.start();
    +
    +           testThread.startPartitionReassignment();
    +           testThread.waitPartitionReassignmentComplete();
    +
    +           // the sentinel offset states should have been replaced with 
defined values according to the retrieved values
    +           assertEquals(mockRetrievedPositions.get(newPartition1.getKafkaPartitionHandle()) - 1, newPartition1.getOffset());
    +           assertEquals(mockRetrievedPositions.get(newPartition2.getKafkaPartitionHandle()) - 1, newPartition2.getOffset());
    +
    +           // verify that the consumer called assign() with all new 
partitions, and that positions are correctly advanced
    +
    +           assertEquals(newPartitions.size(), 
mockConsumerAssignmentsAndPositions.size());
    +
    +           for (KafkaTopicPartitionState<TopicPartition> newPartition : 
newPartitions) {
    +                   
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
    +
    +                   // should be seeked to (offset in state + 1) because 
offsets in state represent the last processed record
    +                   assertEquals(
    +                                   newPartition.getOffset() + 1,
    +                                   
mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
    +           }
    +
    +           assertEquals(0, unassignedPartitionsQueue.size());
    +   }
    +
    +   /**
    +    * Tests reassignment works correctly in the case when:
    +    *  - the consumer already has some assignments
    +    *  - new unassigned partitions already have defined offsets
    +    *
    +    * Setting a timeout because the test will not finish if there is a logic error in
    +    * the reassignment flow.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout = 10000)
    +   public void testReassigningPartitionsWithDefinedOffsets() throws Exception {
    +           final String testTopic = "test-topic";
    +
    +           // -------- old partitions --------
    +
    +           KafkaTopicPartitionState<TopicPartition> oldPartition1 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
    +           oldPartition1.setOffset(23L);
    +
    +           KafkaTopicPartitionState<TopicPartition> oldPartition2 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
    +           oldPartition2.setOffset(32L);
    +
    +           List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = 
new ArrayList<>(2);
    +           oldPartitions.add(oldPartition1);
    +           oldPartitions.add(oldPartition2);
    +
    +           // -------- new partitions with defined offsets --------
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 2), new 
TopicPartition(testTopic, 2));
    +           newPartition.setOffset(29L);
    +
    +           List<KafkaTopicPartitionState<TopicPartition>> totalPartitions 
= new ArrayList<>(3);
    +           totalPartitions.add(oldPartition1);
    +           totalPartitions.add(oldPartition2);
    +           totalPartitions.add(newPartition);
    +
    +           // -------- setup mock KafkaConsumer --------
    +
    +           // has initial assignments
    +           final Map<TopicPartition, Long> 
mockConsumerAssignmentsAndPositions = new HashMap<>();
    +           for (KafkaTopicPartitionState<TopicPartition> oldPartition : 
oldPartitions) {
    +                   
mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), 
oldPartition.getOffset() + 1);
    +           }
    +
    +           final KafkaConsumer<byte[], byte[]> mockConsumer = 
createMockConsumer(
    +                           mockConsumerAssignmentsAndPositions,
    +                           Collections.<TopicPartition, Long>emptyMap(),
    +                           false,
    +                           null,
    +                           null);
    +
    +           // -------- setup new partitions to be polled from the 
unassigned partitions queue --------
    +
    +           final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue =
    +                   new ClosableBlockingQueue<>();
    +
    +           unassignedPartitionsQueue.add(newPartition);
    +
    +           // -------- start test --------
    +
    +           final TestKafkaConsumerThread testThread =
    +                   new TestKafkaConsumerThread(mockConsumer, 
unassignedPartitionsQueue, new Handover());
    +           testThread.start();
    +
    +           testThread.startPartitionReassignment();
    +           testThread.waitPartitionReassignmentComplete();
    +
    +           // verify that the consumer called assign() with all new 
partitions, and that positions are correctly advanced
    +
    +           assertEquals(totalPartitions.size(), 
mockConsumerAssignmentsAndPositions.size());
    +
    +           // old partitions should be re-seeked to their previous 
positions
    +           for (KafkaTopicPartitionState<TopicPartition> partition : 
totalPartitions) {
    +                   
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
    +
    +                   // should be seeked to (offset in state + 1) because 
offsets in state represent the last processed record
    +                   assertEquals(
    +                                   partition.getOffset() + 1,
    +                                   
mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
    +           }
    +
    +           assertEquals(0, unassignedPartitionsQueue.size());
    +   }
    +
    +   /**
    +    * Tests reassignment works correctly in the case when:
    +    *  - the consumer already has some assignments
    +    *  - new unassigned partitions have undefined offsets (e.g. EARLIEST_OFFSET sentinel value)
    +    *
    +    * Setting a timeout because the test will not finish if there is a logic error in
    +    * the reassignment flow.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout = 10000)
    +   public void testReassigningPartitionsWithoutDefinedOffsets() throws Exception {
    +           final String testTopic = "test-topic";
    +
    +           // -------- old partitions --------
    +
    +           KafkaTopicPartitionState<TopicPartition> oldPartition1 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
    +           oldPartition1.setOffset(23L);
    +
    +           KafkaTopicPartitionState<TopicPartition> oldPartition2 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
    +           oldPartition2.setOffset(32L);
    +
    +           List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = 
new ArrayList<>(2);
    +           oldPartitions.add(oldPartition1);
    +           oldPartitions.add(oldPartition2);
    +
    +           // -------- new partitions with undefined offsets --------
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 2), new 
TopicPartition(testTopic, 2));
    +           
newPartition.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +           List<KafkaTopicPartitionState<TopicPartition>> totalPartitions 
= new ArrayList<>(3);
    +           totalPartitions.add(oldPartition1);
    +           totalPartitions.add(oldPartition2);
    +           totalPartitions.add(newPartition);
    +
    +           // -------- setup mock KafkaConsumer --------
    +
    +           // has initial assignments
    +           final Map<TopicPartition, Long> 
mockConsumerAssignmentsAndPositions = new HashMap<>();
    +           for (KafkaTopicPartitionState<TopicPartition> oldPartition : 
oldPartitions) {
    +                   
mockConsumerAssignmentsAndPositions.put(oldPartition.getKafkaPartitionHandle(), 
oldPartition.getOffset() + 1);
    +           }
    +
    +           // mock retrieved values that should replace the 
EARLIEST_OFFSET sentinels
    +           final Map<TopicPartition, Long> mockRetrievedPositions = new 
HashMap<>();
    +           
mockRetrievedPositions.put(newPartition.getKafkaPartitionHandle(), 30L);
    +
    +           final KafkaConsumer<byte[], byte[]> mockConsumer = 
createMockConsumer(
    +                           mockConsumerAssignmentsAndPositions,
    +                           mockRetrievedPositions,
    +                           false,
    +                           null,
    +                           null);
    +
    +           // -------- setup new partitions to be polled from the 
unassigned partitions queue --------
    +
    +           final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue =
    +                   new ClosableBlockingQueue<>();
    +
    +           unassignedPartitionsQueue.add(newPartition);
    +
    +           // -------- start test --------
    +
    +           final TestKafkaConsumerThread testThread =
    +                   new TestKafkaConsumerThread(mockConsumer, 
unassignedPartitionsQueue, new Handover());
    +           testThread.start();
    +
    +           testThread.startPartitionReassignment();
    +           testThread.waitPartitionReassignmentComplete();
    +
    +           // the sentinel offset states should have been replaced with 
defined values according to the retrieved positions
    +           assertEquals(mockRetrievedPositions.get(newPartition.getKafkaPartitionHandle()) - 1, newPartition.getOffset());
    +
    +           // verify that the consumer called assign() with all new 
partitions, and that positions are correctly advanced
    +
    +           assertEquals(totalPartitions.size(), 
mockConsumerAssignmentsAndPositions.size());
    +
    +           // old partitions should be re-seeked to their previous 
positions
    +           for (KafkaTopicPartitionState<TopicPartition> partition : 
totalPartitions) {
    +                   
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(partition.getKafkaPartitionHandle()));
    +
    +                   // should be seeked to (offset in state + 1) because 
offsets in state represent the last processed record
    +                   assertEquals(
    +                           partition.getOffset() + 1,
    +                           
mockConsumerAssignmentsAndPositions.get(partition.getKafkaPartitionHandle()).longValue());
    +           }
    +
    +           assertEquals(0, unassignedPartitionsQueue.size());
    +   }
    +
    +   /**
    +    * Tests reassignment works correctly in the case when:
    +    *  - the consumer already has some assignments
    +    *  - new unassigned partitions already have defined offsets
    +    *  - the consumer was woken up prior to the reassignment
    +    *
    +    * In this case, reassignment should not have occurred at all, and the consumer retains the original assignment.
    +    *
    +    * Setting a timeout because the test will not finish if there is a logic error in
    +    * the reassignment flow.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout = 10000)
    +   public void testReassigningPartitionsWithDefinedOffsetsWhenEarlyWakeup() throws Exception {
    +           final String testTopic = "test-topic";
    +
    +           // -------- old partitions --------
    +
    +           KafkaTopicPartitionState<TopicPartition> oldPartition1 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
    +           oldPartition1.setOffset(23L);
    +
    +           KafkaTopicPartitionState<TopicPartition> oldPartition2 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
    +           oldPartition2.setOffset(32L);
    +
    +           List<KafkaTopicPartitionState<TopicPartition>> oldPartitions = 
new ArrayList<>(2);
    +           oldPartitions.add(oldPartition1);
    +           oldPartitions.add(oldPartition2);
    +
    +           // -------- new partitions with defined offsets --------
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 2), new 
TopicPartition(testTopic, 2));
    +           newPartition.setOffset(29L);
    +
    +           // -------- setup mock KafkaConsumer --------
    +
    +           // initial assignments
    +           final Map<TopicPartition, Long> 
mockConsumerAssignmentsToPositions = new LinkedHashMap<>();
    +           for (KafkaTopicPartitionState<TopicPartition> oldPartition : 
oldPartitions) {
    +                   
mockConsumerAssignmentsToPositions.put(oldPartition.getKafkaPartitionHandle(), 
oldPartition.getOffset() + 1);
    +           }
    +
    +           final KafkaConsumer<byte[], byte[]> mockConsumer = 
createMockConsumer(
    +                           mockConsumerAssignmentsToPositions,
    +                           Collections.<TopicPartition, Long>emptyMap(),
    +                           true,
    +                           null,
    +                           null);
    +
    +           // -------- setup new partitions to be polled from the 
unassigned partitions queue --------
    +
    +           final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue =
    +                   new ClosableBlockingQueue<>();
    +
    +           unassignedPartitionsQueue.add(newPartition);
    +
    +           // -------- start test --------
    +
    +           final TestKafkaConsumerThread testThread =
    +                   new TestKafkaConsumerThread(mockConsumer, 
unassignedPartitionsQueue, new Handover());
    +           testThread.start();
    +
    +           // pause just before the reassignment so we can inject the 
wakeup
    +           testThread.waitPartitionReassignmentInvoked();
    +
    +           testThread.setOffsetsToCommit(new HashMap<TopicPartition, 
OffsetAndMetadata>());
    +           verify(mockConsumer, times(1)).wakeup();
    +
    +           testThread.startPartitionReassignment();
    +           testThread.waitPartitionReassignmentComplete();
    +
    +           // the consumer's assignment should have remained untouched
    +
    +           assertEquals(oldPartitions.size(), 
mockConsumerAssignmentsToPositions.size());
    +
    +           for (KafkaTopicPartitionState<TopicPartition> oldPartition : 
oldPartitions) {
    +                   
assertTrue(mockConsumerAssignmentsToPositions.containsKey(oldPartition.getKafkaPartitionHandle()));
    +                   assertEquals(
    +                                   oldPartition.getOffset() + 1,
    +                                   
mockConsumerAssignmentsToPositions.get(oldPartition.getKafkaPartitionHandle()).longValue());
    +           }
    +
    +           // the new partitions should have been re-added to the 
unassigned partitions queue
    +           assertEquals(1, unassignedPartitionsQueue.size());
    +   }
    +
    +   /**
    +    * Tests reassignment works correctly in the case when:
    +    *  - the consumer has no initial assignments
    +    *  - new unassigned partitions have undefined offsets
    +    *  - the consumer was woken up prior to the reassignment
    +    *
    +    * In this case, reassignment should not have occurred at all, and the consumer retains the original assignment.
    +    *
    +    * Setting a timeout because the test will not finish if there is a logic error in
    +    * the reassignment flow.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout = 10000)
    +   public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenEarlyWakeup() throws Exception {
    +           final String testTopic = "test-topic";
    +
    +           // -------- new partitions with undefined offsets --------
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition1 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
    +           
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition2 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
    +           
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +           List<KafkaTopicPartitionState<TopicPartition>> newPartitions = 
new ArrayList<>(2);
    +           newPartitions.add(newPartition1);
    +           newPartitions.add(newPartition2);
    +
    +           // -------- setup mock KafkaConsumer --------
    +
    +           // no initial assignments
    +           final Map<TopicPartition, Long> 
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +           // mock retrieved values that should replace the 
EARLIEST_OFFSET sentinels
    +           final Map<TopicPartition, Long> mockRetrievedPositions = new 
HashMap<>();
    +           
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +           
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +           final KafkaConsumer<byte[], byte[]> mockConsumer = 
createMockConsumer(
    +                           mockConsumerAssignmentsAndPositions,
    +                           mockRetrievedPositions,
    +                           true,
    +                           null,
    +                           null);
    +
    +           // -------- setup new partitions to be polled from the 
unassigned partitions queue --------
    +
    +           final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue =
    +                   new ClosableBlockingQueue<>();
    +
    +           for (KafkaTopicPartitionState<TopicPartition> newPartition : 
newPartitions) {
    +                   unassignedPartitionsQueue.add(newPartition);
    +           }
    +
    +           // -------- start test --------
    +
    +           final TestKafkaConsumerThread testThread =
    +                   new TestKafkaConsumerThread(mockConsumer, 
unassignedPartitionsQueue, new Handover());
    +           testThread.start();
    +
    +           // pause just before the reassignment so we can inject the 
wakeup
    +           testThread.waitPartitionReassignmentInvoked();
    +
    +           testThread.setOffsetsToCommit(new HashMap<TopicPartition, 
OffsetAndMetadata>());
    +
    +           // make sure the consumer was actually woken up
    +           verify(mockConsumer, times(1)).wakeup();
    +
    +           testThread.startPartitionReassignment();
    +           testThread.waitPartitionReassignmentComplete();
    +
    +           // the consumer's assignment should have remained untouched (in 
this case, empty)
    +           assertEquals(0, mockConsumerAssignmentsAndPositions.size());
    +
    +           // the new partitions should have been re-added to the 
unassigned partitions queue
    +           assertEquals(2, unassignedPartitionsQueue.size());
    +   }
    +
    +   /**
    +    * Tests reassignment works correctly in the case when:
    +    *  - the consumer has no initial assignments
    +    *  - new unassigned partitions have undefined offsets
    +    *  - the consumer was woken up during the reassignment
    +    *
    +    * In this case, reassignment should have completed, and the wakeup call is restored
    +    * on the consumer after the reassignment.
    +    *
    +    * Setting a timeout because the test will not finish if there is a logic error in
    +    * the reassignment flow.
    +    */
    +   @SuppressWarnings("unchecked")
    +   @Test(timeout = 10000)
    +   public void testReassignPartitionsDefinedOffsetsWithoutInitialAssignmentsWhenWakeupMidway() throws Exception {
    +           final String testTopic = "test-topic";
    +
    +           // -------- new partitions with undefined offsets --------
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition1 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
    +           
newPartition1.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +           KafkaTopicPartitionState<TopicPartition> newPartition2 = new 
KafkaTopicPartitionState<>(
    +                   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
    +           
newPartition2.setOffset(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    +
    +           List<KafkaTopicPartitionState<TopicPartition>> newPartitions = 
new ArrayList<>(2);
    +           newPartitions.add(newPartition1);
    +           newPartitions.add(newPartition2);
    +
    +           // -------- setup mock KafkaConsumer --------
    +
    +           // no initial assignments
    +           final Map<TopicPartition, Long> 
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
    +
    +           // mock retrieved values that should replace the 
EARLIEST_OFFSET sentinels
    +           final Map<TopicPartition, Long> mockRetrievedPositions = new 
HashMap<>();
    +           
mockRetrievedPositions.put(newPartition1.getKafkaPartitionHandle(), 23L);
    +           
mockRetrievedPositions.put(newPartition2.getKafkaPartitionHandle(), 32L);
    +
    +           // these latches are used to pause midway the reassignment 
process
    +           final OneShotLatch midAssignmentLatch = new OneShotLatch();
    +           final OneShotLatch continueAssigmentLatch = new OneShotLatch();
    +
    +           final KafkaConsumer<byte[], byte[]> mockConsumer = 
createMockConsumer(
    +                           mockConsumerAssignmentsAndPositions,
    +                           mockRetrievedPositions,
    +                           false,
    +                           midAssignmentLatch,
    +                           continueAssigmentLatch);
    +
    +           // -------- setup new partitions to be polled from the 
unassigned partitions queue --------
    +
    +           final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue =
    +                   new ClosableBlockingQueue<>();
    +
    +           for (KafkaTopicPartitionState<TopicPartition> newPartition : 
newPartitions) {
    +                   unassignedPartitionsQueue.add(newPartition);
    +           }
    +
    +           // -------- start test --------
    +
    +           final TestKafkaConsumerThread testThread =
    +                   new TestKafkaConsumerThread(mockConsumer, 
unassignedPartitionsQueue, new Handover());
    +           testThread.start();
    +
    +           testThread.startPartitionReassignment();
    +
    +           // wait until the reassignment has started
    +           midAssignmentLatch.await();
    +
    +           testThread.setOffsetsToCommit(new HashMap<TopicPartition, 
OffsetAndMetadata>());
    +
    +           // the wakeup the setOffsetsToCommit() call should have been 
buffered, and not called on the consumer
    +           verify(mockConsumer, never()).wakeup();
    +
    +           continueAssigmentLatch.trigger();
    +
    +           testThread.waitPartitionReassignmentComplete();
    +
    +           // verify that the consumer called assign() with all new 
partitions, and that positions are correctly advanced
    +
    +           assertEquals(newPartitions.size(), 
mockConsumerAssignmentsAndPositions.size());
    +
    +           for (KafkaTopicPartitionState<TopicPartition> newPartition : 
newPartitions) {
    +                   
assertTrue(mockConsumerAssignmentsAndPositions.containsKey(newPartition.getKafkaPartitionHandle()));
    +
    +                   // should be seeked to (offset in state + 1) because 
offsets in state represent the last processed record
    +                   assertEquals(
    +                           newPartition.getOffset() + 1,
    +                           
mockConsumerAssignmentsAndPositions.get(newPartition.getKafkaPartitionHandle()).longValue());
    +           }
    +
    +           // after the reassignment, the consumer should be restored the 
wakeup call
    +           verify(mockConsumer, times(1)).wakeup();
    +
    +           assertEquals(0, unassignedPartitionsQueue.size());
    +   }
    +
    +   /**
    +    * A testable {@link KafkaConsumerThread} that injects multiple latches 
exactly before and after
    +    * partition reassignment, so that tests are eligible to setup various 
conditions before the reassignment happens
    +    * and inspect reqssignment results after it is completed.
    --- End diff --
    
    Typo: "reqssignment" should be "reassignment".


> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
>                 Key: FLINK-4022
>                 URL: https://issues.apache.org/jira/browse/FLINK-4022
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose implementing this feature as follows:
> Since the overall list of partitions to read will change after job
> submission, the main change required for this feature is dynamic
> partition assignment to subtasks while the Kafka consumer is running. This
> will mainly be accomplished using the Kafka 0.9.x API
> `KafkaConsumer#subscribe(java.util.regex.Pattern,
> ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be
> added to the same consumer group when instantiated, and will rely on Kafka
> to dynamically reassign partitions to it whenever a rebalance happens. The
> registered `ConsumerRebalanceListener` is a callback that is called right
> before and after a rebalance happens. We'll use this callback to let each
> subtask commit the last offsets of the partitions it is currently
> responsible for to an external store (or Kafka) before a rebalance; after
> the rebalance, once the subtasks get the new partitions they'll be reading
> from, they'll read from the external store to get the last offsets for
> their new partitions (partitions which don't have offset entries in the
> store are new partitions causing the rebalance).
> The tricky part will be restoring Flink checkpoints when the partition
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot
> the offsets of partitions they are currently holding. Restoring will be a
> bit different in that a subtask might not be assigned the same partitions
> as those in the snapshot it is restored with (since we're letting Kafka
> dynamically assign partitions). There will need to be a coordination process
> where, if a restore state exists, all subtasks first commit the offsets they
> receive (as a result of the restore state) to the external store, and then
> each subtask attempts to find the last offsets for the partitions it holds.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full
> access to the previous global state, so coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For the previous consumer functionality (consuming from a fixed list of
> topics), KafkaConsumer#subscribe() has a corresponding overload that takes a
> fixed list of topics. We can simply decide which subscribe() overload to use
> depending on whether a regex Pattern or a list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't
> emit a MAX_VALUE watermark, since they may be assigned partitions after a
> rebalance. Instead, unassigned subtasks should run a fetcher instance too and
> take part as a process pool for the consumer group of the subscribed topics.
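
A note on the "topic-n*" example in the description above: the proposed constructors take a java.util.regex.Pattern, not a glob, so the pattern string matters. The sketch below uses only the JDK to show which topic names two candidate patterns would select. It assumes full-match semantics (Matcher#matches()); the class name, the select() helper, and the topic list are hypothetical and are not part of Flink or Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink or Kafka.
class TopicPatternDemo {

    // Returns the topics whose full name matches the pattern.
    // Full-match semantics are an assumption of this sketch.
    static List<String> select(Pattern pattern, List<String> topics) {
        List<String> selected = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                selected.add(topic);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("topic-n1", "topic-n2", "topic-nn", "other");

        // As a regex, "topic-n*" means "topic-" followed by zero or more 'n'.
        System.out.println(select(Pattern.compile("topic-n*"), topics));

        // "topic-n.*" expresses the intended "starts with topic-n" prefix match.
        System.out.println(select(Pattern.compile("topic-n.*"), topics));
    }
}
```

Under that assumption, a user who wants "topic-n1", "topic-n2", and so on would pass Pattern.compile("topic-n.*"); the glob-style string "topic-n*" would select only names such as "topic-nn".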



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
