[ https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Victor Wong updated FLINK-15450:
--------------------------------
    Description:
If the user does not specify a custom name for a source, e.g. a Kafka source, Flink uses the default name "Custom Source", which is not intuitive (the same applies to sinks).
{code:java}
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
{code}
If we add the Kafka topic information to the default Source/Sink name, it becomes much easier to see at a glance which topics are being consumed and published, like this:
{code:java}
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
{code}
*Suggestion* (forgive me if it makes too many changes)

1. Add a `name` method to interface `Function`
{code:java}
public interface Function extends java.io.Serializable {
    // An empty string means "no name provided"; callers fall back to their own default.
    default String name() { return ""; }
}
{code}
2. Source/Sink/Other functions override this method depending on their needs.
{code:java}
class FlinkKafkaConsumerBase {
    // Must be public: it overrides the default method inherited from Function.
    @Override
    public String name() {
        return this.topicsDescriptor.toString();
    }
}
{code}
3. Use Function#name if the returned value is not empty.
{code:java}
// StreamExecutionEnvironment
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
    String sourceName = function.name();
    // Keep the current default when the function does not provide a name.
    if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
        sourceName = "Custom Source";
    }
    return addSource(function, sourceName);
}
{code}

> Add kafka topic information to Kafka source name on Flink UI
> ------------------------------------------------------------
>
>                 Key: FLINK-15450
>                 URL: https://issues.apache.org/jira/browse/FLINK-15450
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Victor Wong
>            Priority: Major
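
For anyone who wants to try the naming fallback from the three suggested steps without a Flink build, here is a minimal standalone sketch. It does not touch Flink at all: `NamedFunction`, `TopicSource`, `AnonymousSource` and `resolveSourceName` are hypothetical stand-ins for `Function`, a Kafka consumer, an unnamed user function and the check inside `addSource`, and the blank check is a plain-Java substitute for `StringUtils.isNullOrWhitespaceOnly`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SourceNameDemo {

    /** Hypothetical stand-in for Function with the proposed default method (step 1). */
    interface NamedFunction extends Serializable {
        default String name() {
            return "";
        }
    }

    /** Hypothetical Kafka-like source that reports its topics as its name (step 2). */
    static final class TopicSource implements NamedFunction {
        private final List<String> topics;

        TopicSource(String... topics) {
            this.topics = new ArrayList<>(Arrays.asList(topics));
        }

        @Override
        public String name() {
            return String.join(", ", topics);
        }
    }

    /** A function that does not override name() and so keeps the empty default. */
    static final class AnonymousSource implements NamedFunction {}

    static final String DEFAULT_SOURCE_NAME = "Custom Source";

    /** Use the function's own name unless it is null or blank (step 3). */
    static String resolveSourceName(NamedFunction function) {
        String name = function.name();
        if (name == null || name.trim().isEmpty()) {
            return DEFAULT_SOURCE_NAME;
        }
        return name;
    }

    public static void main(String[] args) {
        // prints "Source: srcTopic0, srcTopic1"
        System.out.println("Source: " + resolveSourceName(new TopicSource("srcTopic0", "srcTopic1")));
        // prints "Source: Custom Source"
        System.out.println("Source: " + resolveSourceName(new AnonymousSource()));
    }
}
```

Returning an empty string from the default method, rather than null, lets existing functions compile unchanged and keeps the "Custom Source" fallback in a single place.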