dannycranmer commented on a change in pull request #13102: URL: https://github.com/apache/flink/pull/13102#discussion_r472119646
########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ########## @@ -389,25 +424,54 @@ private RecordEmitter createRecordEmitter(Properties configProps) { * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to * @param subscribedShard the shard this consumer is subscribed to * @param lastSequenceNum the sequence number in the shard to start consuming - * @param shardMetricsReporter the reporter to report metrics to + * @param metricGroup the metric group to report metrics to * @return shard consumer */ protected ShardConsumer<T> createShardConsumer( - Integer subscribedShardStateIndex, - StreamShardHandle subscribedShard, - SequenceNumber lastSequenceNum, - ShardMetricsReporter shardMetricsReporter, - KinesisDeserializationSchema<T> shardDeserializer) { + final Integer subscribedShardStateIndex, + final StreamShardHandle subscribedShard, + final SequenceNumber lastSequenceNum, + final MetricGroup metricGroup, + final KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException { + return new ShardConsumer<>( this, + createRecordPublisher(lastSequenceNum, configProps, metricGroup, subscribedShard), subscribedShardStateIndex, subscribedShard, lastSequenceNum, - this.kinesisProxyFactory.create(configProps), - shardMetricsReporter, + new ShardConsumerMetricsReporter(metricGroup), shardDeserializer); } + protected RecordPublisherFactory createRecordPublisherFactory() { + RecordPublisherType recordPublisherType = RecordPublisherType.valueOf( + configProps.getProperty(RECORD_PUBLISHER_TYPE, POLLING.name())); + + switch (recordPublisherType) { + case EFO: + return new FanOutRecordPublisherFactory(kinesisProxyV2Factory.create(configProps)); + case POLLING: Review comment: @xiaolong-sn How so? Please elaborate ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org