[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974161#comment-15974161 ]
ASF GitHub Bot commented on FLINK-4821: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3001#discussion_r112123126 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java --- @@ -559,48 +699,298 @@ public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoi @Test @SuppressWarnings("unchecked") + public void testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws Exception { + HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all"); + + KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class); + List<KinesisStreamShard> shards = new ArrayList<>(); + shards.addAll(fakeRestoredState.keySet()); + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards); + PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher); + + // assume the given config is correct + PowerMockito.mockStatic(KinesisConfigUtil.class); + PowerMockito.doNothing().when(KinesisConfigUtil.class); + + TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer( + "fakeStream", new Properties(), 10, 2); + consumer.restoreState(fakeRestoredState); + consumer.open(new Configuration()); + consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); + + Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true); + for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) { + Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream( + restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); + Mockito.verify(mockedFetcher).registerNewSubscribedShardState( + new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); + } + } + + @Test + @SuppressWarnings("unchecked") public void 
testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception { + // ---------------------------------------------------------------------- + // setting initial state + // ---------------------------------------------------------------------- + HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all"); + + // ---------------------------------------------------------------------- + // mock operator state backend and initial state for initializeState() + // ---------------------------------------------------------------------- + TestingListState<Serializable> listState = new TestingListState<>(); + for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) { + listState.add(Tuple2.of(state.getKey(), state.getValue())); + } + + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); + + StateInitializationContext initializationContext = mock(StateInitializationContext.class); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.isRestored()).thenReturn(true); + + // ---------------------------------------------------------------------- + // mock fetcher + // ---------------------------------------------------------------------- KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class); + List<KinesisStreamShard> shards = new ArrayList<>(); + shards.addAll(fakeRestoredState.keySet()); + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards); PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher); // assume the given config is correct PowerMockito.mockStatic(KinesisConfigUtil.class); PowerMockito.doNothing().when(KinesisConfigUtil.class); - HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>(); - 
fakeRestoredState.put( - new KinesisStreamShard("fakeStream1", - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), - new SequenceNumber(UUID.randomUUID().toString())); - fakeRestoredState.put( - new KinesisStreamShard("fakeStream1", - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), - new SequenceNumber(UUID.randomUUID().toString())); - fakeRestoredState.put( - new KinesisStreamShard("fakeStream1", - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))), - new SequenceNumber(UUID.randomUUID().toString())); - fakeRestoredState.put( - new KinesisStreamShard("fakeStream2", - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), - new SequenceNumber(UUID.randomUUID().toString())); - fakeRestoredState.put( - new KinesisStreamShard("fakeStream2", - new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), - new SequenceNumber(UUID.randomUUID().toString())); + // ---------------------------------------------------------------------- + // start to test seed initial state to fetcher + // ---------------------------------------------------------------------- + TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer( + "fakeStream", new Properties(), 10, 2); + consumer.initializeState(initializationContext); + consumer.open(new Configuration()); + consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); + + Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true); + for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) { + Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream( + restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); + Mockito.verify(mockedFetcher).registerNewSubscribedShardState( + new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); + } + } + + @Test + @SuppressWarnings("unchecked") + public void 
testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exception { + // ---------------------------------------------------------------------- + // setting initial state + // ---------------------------------------------------------------------- + HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1"); + + HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2"); + + // ---------------------------------------------------------------------- + // mock operator state backend and initial state for initializeState() + // ---------------------------------------------------------------------- + TestingListState<Serializable> listState = new TestingListState<>(); + for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) { + listState.add(Tuple2.of(state.getKey(), state.getValue())); + } + for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredStateForOthers.entrySet()) { + listState.add(Tuple2.of(state.getKey(), state.getValue())); + } + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); + + StateInitializationContext initializationContext = mock(StateInitializationContext.class); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.isRestored()).thenReturn(true); + + // ---------------------------------------------------------------------- + // mock fetcher + // ---------------------------------------------------------------------- + KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class); + List<KinesisStreamShard> shards = new ArrayList<>(); + shards.addAll(fakeRestoredState.keySet()); + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards); + 
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher); + + // assume the given config is correct + PowerMockito.mockStatic(KinesisConfigUtil.class); + PowerMockito.doNothing().when(KinesisConfigUtil.class); + + // ---------------------------------------------------------------------- + // start to test seed initial state to fetcher + // ---------------------------------------------------------------------- TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer( "fakeStream", new Properties(), 10, 2); - consumer.restoreState(fakeRestoredState); + consumer.initializeState(initializationContext); consumer.open(new Configuration()); consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true); + for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) { + // should never get restored state not belonging to itself + Mockito.verify(mockedFetcher, never()).advanceLastDiscoveredShardOfStream( + restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); + Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState( + new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); + } for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) { + // should get restored state belonging to itself Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream( restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); Mockito.verify(mockedFetcher).registerNewSubscribedShardState( new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); } } + + @Test + @SuppressWarnings("unchecked") --- End diff -- Should place these annotations after the comment block. I think that's the usual convention. 
> Implement rescalable non-partitioned state for Kinesis Connector > ---------------------------------------------------------------- > > Key: FLINK-4821 > URL: https://issues.apache.org/jira/browse/FLINK-4821 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector > Reporter: Tzu-Li (Gordon) Tai > Assignee: Wei-Che Wei > > FLINK-4379 added the rescalable non-partitioned state feature, along with the > implementation for the Kafka connector. > The AWS Kinesis connector will benefit from the feature and should implement > it too. This ticket tracks the progress of that work. -- This message was sent by Atlassian JIRA (v6.3.15#6346)