Thanks for replying, Fabian. But I am using only KafkaSource. The code looks something like the following.

    class MetricSource {
        final Set<String> metricSdms = new HashSet<>();
        ...
        env.addSource(MetricKafkaSourceFactory.createConsumer(jobParams))
            .name(MetricSource.class.getSimpleName())
            .uid(MetricSource.class.getSimpleName())
            .filter(sdm -> metricSdms.contains(sdm.getType()));
    }

    class MetricKafkaSourceFactory {
        public static FlinkKafkaConsumer<SelfDescribingMessageDO> createConsumer(final Configuration jobParams) {
            ...
            return new FlinkKafkaConsumer<>(topic, new DeserializationSchema(), props);
        }
    }

On Wed, Jun 23, 2021 at 7:31 PM Fabian Paul <fabianp...@data-artisans.com> wrote:

> Hi Debraj,
>
> By Source Legacy Thread we refer to all sources which do not implement the
> new interface yet [1]. Currently only the Hive, Kafka and FileSource
> are already migrated. In general, there is no severe downside of using the
> older source but in the future we plan only to implement ones based on
> the new operator model.
>
> Best,
> Fabian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface