dannycranmer commented on a change in pull request #13102:
URL: https://github.com/apache/flink/pull/13102#discussion_r472119646



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -389,25 +424,54 @@ private RecordEmitter createRecordEmitter(Properties configProps) {
         * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
         * @param subscribedShard the shard this consumer is subscribed to
         * @param lastSequenceNum the sequence number in the shard to start consuming
-        * @param shardMetricsReporter the reporter to report metrics to
+        * @param metricGroup the metric group to report metrics to
         * @return shard consumer
         */
        protected ShardConsumer<T> createShardConsumer(
-               Integer subscribedShardStateIndex,
-               StreamShardHandle subscribedShard,
-               SequenceNumber lastSequenceNum,
-               ShardMetricsReporter shardMetricsReporter,
-               KinesisDeserializationSchema<T> shardDeserializer) {
+                       final Integer subscribedShardStateIndex,
+                       final StreamShardHandle subscribedShard,
+                       final SequenceNumber lastSequenceNum,
+                       final MetricGroup metricGroup,
+                       final KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
+
                return new ShardConsumer<>(
                        this,
+                       createRecordPublisher(lastSequenceNum, configProps, metricGroup, subscribedShard),
                        subscribedShardStateIndex,
                        subscribedShard,
                        lastSequenceNum,
-                       this.kinesisProxyFactory.create(configProps),
-                       shardMetricsReporter,
+                       new ShardConsumerMetricsReporter(metricGroup),
                        shardDeserializer);
        }
 
+       protected RecordPublisherFactory createRecordPublisherFactory() {
+               RecordPublisherType recordPublisherType = RecordPublisherType.valueOf(
+                       configProps.getProperty(RECORD_PUBLISHER_TYPE, POLLING.name()));
+
+               switch (recordPublisherType) {
+                       case EFO:
+                               return new FanOutRecordPublisherFactory(kinesisProxyV2Factory.create(configProps));
+                       case POLLING:

Review comment:
       @xiaolong-sn How so? Please elaborate.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

