AHeise commented on a change in pull request #17924: URL: https://github.com/apache/flink/pull/17924#discussion_r762756877
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -242,6 +243,14 @@ public String getOperatorName() {
         return operatorName;
     }

+    public String getOperatorDesc() {
+        return null == operatorDesc ? operatorName : operatorDesc;

Review comment:
   I'd move this logic to the call site: either while creating the StreamNode or while reading the StreamNode.

##########
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##########
@@ -119,6 +119,8 @@ public static int getNewNodeId() {

     protected String name;

+    protected String desc;

Review comment:
   Please do, to also keep getter and field in sync.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
##########
@@ -419,4 +419,17 @@ private boolean canBeParallel() {
                 new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
         return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
     }
+
+    /**
+     * Sets the description for this operation. the description is used in json plan, is expected to
+     * provide more detailed information about the operation than name.
+     *
+     * @param desc The description for this operation.
+     * @return The operation with new description.
+     */
+    @PublicEvolving
+    public SingleOutputStreamOperator<T> setDescription(String desc) {
+        transformation.setDescription(desc);
+        return this;
+    }

Review comment:
   Should this be directly exposed on `DataStream`? For which operator wouldn't you want it to be set?

##########
File path: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
##########
@@ -1225,4 +1225,17 @@ class DataStream[T](stream: JavaStream[T]) {
       operator: OneInputStreamOperator[T, R]): DataStream[R] = {
     asScalaStream(stream.transform(operatorName, implicitly[TypeInformation[R]], operator))
   }
+
+  /**
+   * Sets the description of the current data stream. This description is in json plan,
+   * is expected to provide more detailed information about the operation than name.
+   *
+   * @return The operator with new description
+   */
+  @PublicEvolving
+  def setDescription(desc: String) : DataStream[T] = stream match {
+    case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setDescription(desc))
+    case _ => throw new UnsupportedOperationException("Only supported for operators.")
+    this
+  }

Review comment:
   If we don't really need it in DataStream (see top-level comment), please remove.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -65,6 +65,7 @@
     private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
     private long bufferTimeout;
     private final String operatorName;
+    private @Nullable String operatorDesc;

Review comment:
   Please do.
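
To make the first comment (moving the name fallback out of the getter) concrete, here is a minimal sketch of the two call-site options. It is not code from the PR: `Node`, `createNode`, and `describe` are hypothetical stand-ins for `StreamNode` and its callers, and the real classes have many more fields.

```java
import java.util.Objects;

public class DescriptionFallbackSketch {

    /** Hypothetical, simplified stand-in for StreamNode; the getter returns the field as-is. */
    public static final class Node {
        private final String operatorName;
        private final String operatorDescription; // may be null when no description was set

        public Node(String operatorName, String operatorDescription) {
            this.operatorName = Objects.requireNonNull(operatorName);
            this.operatorDescription = operatorDescription;
        }

        public String getOperatorName() {
            return operatorName;
        }

        public String getOperatorDescription() {
            return operatorDescription;
        }
    }

    /** Option 1: resolve the fallback while creating the node. */
    public static Node createNode(String name, String description) {
        return new Node(name, description == null ? name : description);
    }

    /** Option 2: resolve the fallback while reading the node. */
    public static String describe(Node node) {
        String description = node.getOperatorDescription();
        return description == null ? node.getOperatorName() : description;
    }

    public static void main(String[] args) {
        Node resolvedAtCreation = createNode("Map", null);
        Node rawNode = new Node("Filter", null);
        System.out.println(resolvedAtCreation.getOperatorDescription());
        System.out.println(describe(rawNode));
    }
}
```

Either way the getter stays a plain accessor, so the getter and field remain in sync and the fallback policy lives in exactly one visible place.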