Victor Wong created FLINK-15450:
-----------------------------------

             Summary: Add kafka topic information to Kafka source
                 Key: FLINK-15450
                 URL: https://issues.apache.org/jira/browse/FLINK-15450
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
            Reporter: Victor Wong

If the user does not specify a custom name for a source, e.g. a Kafka source, Flink uses the default name "Custom Source", which is not intuitive. The same goes for sinks, which show up as "Unnamed".
{code:java}
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
{code}

If we could add the Kafka topic information to the default Source/Sink name, it would be much easier to see at a glance which topics are being consumed from and published to, like this:
{code:java}
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
{code}

*Suggestion* (forgive me if this requires too many changes)

1. Add a `name` method to the `Function` interface.
{code:java}
public interface Function extends java.io.Serializable {
    // An empty string means "no custom name"; callers fall back to their own default.
    default String name() { return ""; }
}
{code}

2. Source/Sink/other functions override this method depending on their needs.
{code:java}
class FlinkKafkaConsumerBase {
    // Must be public, because it overrides an interface method.
    @Override
    public String name() {
        return this.topicsDescriptor.toString();
    }
}
{code}

3. Use Function#name if the returned value is not empty.
{code:java}
// StreamExecutionEnvironment
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
    String sourceName = function.name();
    // Keep the current default when the function does not provide a name.
    if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
        sourceName = "Custom Source";
    }
    return addSource(function, sourceName);
}
{code}
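
The three steps above can be tried out without any Flink dependency. The following is only a minimal sketch of the fallback logic, not Flink code: `NamedFunction`, `AnonymousSource`, `TopicSource` and `resolveName` are hypothetical stand-ins for `Function`, a source that sets no name, a Kafka-like source, and the check proposed for `addSource`.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class SourceNamingSketch {

    /** Hypothetical stand-in for Flink's Function interface with the proposed default method. */
    interface NamedFunction extends Serializable {
        default String name() {
            return "";
        }
    }

    /** Hypothetical source that does not override name(), so the default applies. */
    static class AnonymousSource implements NamedFunction {
    }

    /** Hypothetical Kafka-like source that reports its topics as its name. */
    static class TopicSource implements NamedFunction {
        private final List<String> topics;

        TopicSource(List<String> topics) {
            this.topics = topics;
        }

        @Override
        public String name() {
            return String.join(", ", topics);
        }
    }

    /** Returns the function's own name, or the fallback if that name is null or blank. */
    static String resolveName(NamedFunction function, String fallback) {
        String name = function.name();
        return (name == null || name.trim().isEmpty()) ? fallback : name;
    }

    public static void main(String[] args) {
        System.out.println("Source: " + resolveName(new AnonymousSource(), "Custom Source"));
        System.out.println("Source: " + resolveName(
                new TopicSource(Arrays.asList("srcTopic0", "srcTopic1")), "Custom Source"));
    }
}
```

Because the default implementation returns an empty string, existing functions that never override `name()` would keep today's "Custom Source" label, so the change would be backward compatible for user-defined sources.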