dannycranmer commented on a change in pull request #13102:
URL: https://github.com/apache/flink/pull/13102#discussion_r472119011



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -330,23 +359,26 @@ public KinesisDataFetcher(List<String> streams,
                        new AtomicReference<>(),
                        new ArrayList<>(),
                        createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
-                       KinesisProxy::create);
+                       KinesisProxy::create,
+                       KinesisDataFetcher::createKinesisProxyV2);
        }
 
        @VisibleForTesting
-       protected KinesisDataFetcher(List<String> streams,
-                                                               SourceFunction.SourceContext<T> sourceContext,
-                                                               Object checkpointLock,
-                                                               RuntimeContext runtimeContext,
-                                                               Properties configProps,
-                                                               KinesisDeserializationSchema<T> deserializationSchema,
-                                                               KinesisShardAssigner shardAssigner,
-                                                               AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
-                                                               WatermarkTracker watermarkTracker,
-                                                               AtomicReference<Throwable> error,
-                                                               List<KinesisStreamShardState> subscribedShardsState,
-                                                               HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
-                                                               FlinkKinesisProxyFactory kinesisProxyFactory) {
+       protected KinesisDataFetcher(
+                       final List<String> streams,
+                       final SourceFunction.SourceContext<T> sourceContext,
+                       final Object checkpointLock,
+                       final RuntimeContext runtimeContext,
+                       final Properties configProps,
+                       final KinesisDeserializationSchema<T> deserializationSchema,
+                       final KinesisShardAssigner shardAssigner,
+                       final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+                       final WatermarkTracker watermarkTracker,
+                       final AtomicReference<Throwable> error,
+                       final List<KinesisStreamShardState> subscribedShardsState,
+                       final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
+                       final FlinkKinesisProxyFactory kinesisProxyFactory,
+                       @Nullable final FlinkKinesisProxyV2Factory kinesisProxyV2Factory) {

Review comment:
       👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to