[ https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Victor Wong updated FLINK-15450:
--------------------------------
    Component/s: API / Core

> Add kafka topic information to Kafka source name on Flink UI
> ------------------------------------------------------------
>
>                 Key: FLINK-15450
>                 URL: https://issues.apache.org/jira/browse/FLINK-15450
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core, Connectors / Kafka
>            Reporter: Victor Wong
>            Priority: Major
>
> If the user does not specify a custom name for a source, e.g. a Kafka source,
> Flink uses the default name "Custom Source", which is not intuitive
> (the same applies to sinks).
> {code:java}
> Source: Custom Source -> Filter -> Map -> Sink: Unnamed
> {code}
> If we could add the Kafka topic information to the default Source/Sink name,
> it would be much easier to identify the consumed/published topics at a glance,
> like this:
> {code:java}
> Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
> {code}
> *Suggestion* (forgive me if it makes too many changes)
> 1. Add a `name` method to interface `Function`
> {code:java}
> public interface Function extends java.io.Serializable {
>     default String name() { return ""; }
> }
> {code}
> 2. Source/Sink/Other functions override this method depending on their needs.
> {code:java}
> class FlinkKafkaConsumerBase {
>     // Must be public, because it overrides an interface method.
>     @Override
>     public String name() {
>         return this.topicsDescriptor.toString();
>     }
> }
> {code}
> 3. Use Function#name if the returned value is not empty.
> {code:java}
> // StreamExecutionEnvironment
> public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
>     String sourceName = function.name();
>     if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
>         sourceName = "Custom Source";
>     }
>     return addSource(function, sourceName);
> }
> {code}
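
The three suggested steps can be tried out without Flink on the classpath. The following is only a minimal sketch under stated assumptions: `NamedFunction`, `TopicSource`, `UnnamedSource` and `resolveName` are hypothetical stand-ins for `Function`, `FlinkKafkaConsumerBase` and the `addSource` fallback logic, and are not part of Flink's API.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

final class SourceNamingSketch {

    /** Hypothetical stand-in for Flink's Function with the proposed default method (step 1). */
    interface NamedFunction extends Serializable {
        default String name() {
            return "";
        }
    }

    /** Hypothetical Kafka-like source that reports its topics as its name (step 2). */
    static final class TopicSource implements NamedFunction {
        private final List<String> topics;

        TopicSource(List<String> topics) {
            this.topics = topics;
        }

        @Override
        public String name() {
            return String.join(", ", topics);
        }
    }

    /** A function that keeps the empty default name. */
    static final class UnnamedSource implements NamedFunction {
    }

    /**
     * Step 3: use the function's own name unless it is null or blank.
     * The trim().isEmpty() check only approximates Flink's
     * StringUtils.isNullOrWhitespaceOnly.
     */
    static String resolveName(NamedFunction function, String fallback) {
        String name = function.name();
        return (name == null || name.trim().isEmpty()) ? fallback : name;
    }

    public static void main(String[] args) {
        NamedFunction kafkaLike = new TopicSource(Arrays.asList("srcTopic0", "srcTopic1"));
        // prints "Source: srcTopic0, srcTopic1"
        System.out.println("Source: " + resolveName(kafkaLike, "Custom Source"));
        // prints "Source: Custom Source"
        System.out.println("Source: " + resolveName(new UnnamedSource(), "Custom Source"));
    }
}
```

Because `name()` is a default method, existing implementations compile unchanged and keep the current "Custom Source" fallback. Only functions that opt in by overriding it change what is shown.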