AHeise commented on a change in pull request #17924:
URL: https://github.com/apache/flink/pull/17924#discussion_r762756877



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -242,6 +243,14 @@ public String getOperatorName() {
         return operatorName;
     }
 
+    public String getOperatorDesc() {
+        return null == operatorDesc ? operatorName : operatorDesc;

Review comment:
       I'd move this fallback logic to the call site: either where the
StreamNode is created or where it is read.
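
A minimal sketch of the call-site variant, assuming a simplified stand-in for StreamNode. The class `DescriptionFallbackSketch`, the nested `Node` type, and the `describe` helper are hypothetical names for illustration only and are not part of Flink. The getter returns the nullable field unchanged, and the reader decides on the fallback:

```java
import java.util.Optional;

public class DescriptionFallbackSketch {

    /** Hypothetical stand-in for StreamNode; the getter does no fallback. */
    public static final class Node {
        private final String operatorName;
        private final String operatorDescription; // may be null

        public Node(String operatorName, String operatorDescription) {
            this.operatorName = operatorName;
            this.operatorDescription = operatorDescription;
        }

        public String getOperatorName() {
            return operatorName;
        }

        /** Returns the description exactly as stored, possibly null. */
        public String getOperatorDescription() {
            return operatorDescription;
        }
    }

    /** Call-site fallback: use the name when no description was set. */
    public static String describe(Node node) {
        return Optional.ofNullable(node.getOperatorDescription())
                .orElse(node.getOperatorName());
    }

    public static void main(String[] args) {
        System.out.println(describe(new Node("Map", null)));
        System.out.println(describe(new Node("Map", "Map: parse JSON")));
    }
}
```

The alternative named in the comment, resolving the fallback once while the node is created, would keep the field non-null and make the `@Nullable` annotation unnecessary.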

##########
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##########
@@ -119,6 +119,8 @@ public static int getNewNodeId() {
 
     protected String name;
 
+    protected String desc;

Review comment:
       Please do, also to keep the getter and the field in sync.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
##########
@@ -419,4 +419,17 @@ private boolean canBeParallel() {
                 new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
         return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
     }
+
+    /**
+     * Sets the description for this operation. the description is used in json plan, is expected to
+     * provide more detailed information about the operation than name.
+     *
+     * @param desc The description for this operation.
+     * @return The operation with new description.
+     */
+    @PublicEvolving
+    public SingleOutputStreamOperator<T> setDescription(String desc) {
+        transformation.setDescription(desc);
+        return this;
+    }

Review comment:
       Should this be exposed directly on `DataStream`? For which operator
wouldn't you want to set it?

##########
File path: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
##########
@@ -1225,4 +1225,17 @@ class DataStream[T](stream: JavaStream[T]) {
       operator: OneInputStreamOperator[T, R]): DataStream[R] = {
     asScalaStream(stream.transform(operatorName, implicitly[TypeInformation[R]], operator))
   }
+
+  /**
+   * Sets the description of the current data stream. This description is in json plan,
+   * is expected to provide more detailed information about the operation than name.
+   *
+   * @return The operator with new description
+   */
+  @PublicEvolving
+  def setDescription(desc: String) : DataStream[T] = stream match {
+    case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setDescription(desc))
+    case _ => throw new UnsupportedOperationException("Only supported for operators.")
+      this
+  }

Review comment:
       If we don't really need it in DataStream (see top-level comment), please 
remove.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -65,6 +65,7 @@
     private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
     private long bufferTimeout;
     private final String operatorName;
+    private @Nullable String operatorDesc;

Review comment:
       Please do.





