[ 
https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15450:
--------------------------------
    Description: 
If the user did not specify a custom name to the source, e.g. Kafka source, 
Flink would use the default name "Custom Source", which was not intuitive (Sink 
was the same).


{code:java}
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
{code}

If we could add the Kafka topic information to the default Source/Sink name, it 
would be very helpful to catch the consuming/publishing topic quickly, like 
this:

{code:java}
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
{code}

*Suggestion* (forgive me if it makes too many changes)

1. Add a `name` method to interface `Function`

{code:java}
public interface Function extends java.io.Serializable {
        default String name() { return ""; }
}
{code}

2. Source/Sink/Other functions override this method depending on their needs.

{code:java}
class FlinkKafkaConsumerBase {

String name() {
  return this.topicsDescriptor.toString();
}

}
{code}

3. Use Function#name if the returned value is not empty.

{code:java}
// StreamExecutionEnvironment
        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> 
function) {
                String sourceName = function.name();
                if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
                        sourceName = "Custom Source";
                }
                return addSource(function, sourceName);
        }
{code}


  was:
If the user did not specify a custom name to the source, e.g. Kafka source, 
Flink would use the default name "Custom Source", which was not intuitive (Sink 
was the same).


{code:java}
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
{code}

If we could add the Kafka topic information to the default Source/Sink name, it 
would be very helpful to catch the consuming/publishing topic quickly, like 
this:

{code:java}
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
{code}

*Suggesion* (forgive me if it makes too much changes)

1. Add a `name` method to interface `Function`

{code:java}
public interface Function extends java.io.Serializable {
        default String name() { return ""; }
}
{code}

2. Source/Sink/Other functions override this method depending on their needs.

{code:java}
class FlinkKafkaConsumerBase {

String name() {
  return this.topicsDescriptor.toString();
}

}
{code}

3. Use Function#name if the returned value is not empty.

{code:java}
// StreamExecutionEnvironment
        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> 
function) {
                String sourceName = function.name();
                if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
                        sourceName = "Custom Source";
                }
                return addSource(function, sourceName);
        }
{code}



> Add kafka topic information to Kafka source name on Flink UI
> ------------------------------------------------------------
>
>                 Key: FLINK-15450
>                 URL: https://issues.apache.org/jira/browse/FLINK-15450
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Victor Wong
>            Priority: Major
>
> If the user did not specify a custom name to the source, e.g. Kafka source, 
> Flink would use the default name "Custom Source", which was not intuitive 
> (Sink was the same).
> {code:java}
> Source: Custom Source -> Filter -> Map -> Sink: Unnamed
> {code}
> If we could add the Kafka topic information to the default Source/Sink name, 
> it would be very helpful to catch the consuming/publishing topic quickly, 
> like this:
> {code:java}
> Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
> {code}
> *Suggestion* (forgive me if it makes too many changes)
> 1. Add a `name` method to interface `Function`
> {code:java}
> public interface Function extends java.io.Serializable {
>       default String name() { return ""; }
> }
> {code}
> 2. Source/Sink/Other functions override this method depending on their needs.
> {code:java}
> class FlinkKafkaConsumerBase {
> String name() {
>   return this.topicsDescriptor.toString();
> }
> }
> {code}
> 3. Use Function#name if the returned value is not empty.
> {code:java}
> // StreamExecutionEnvironment
>       public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> 
> function) {
>               String sourceName = function.name();
>               if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
>                       sourceName = "Custom Source";
>               }
>               return addSource(function, sourceName);
>       }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to