Sxnan commented on code in PR #20147:
URL: https://github.com/apache/flink/pull/20147#discussion_r925262604


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java:
##########
@@ -21,22 +21,26 @@
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.Preconditions;
 
+import java.io.Serializable;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.ToIntFunction;
 
 /** Container for meta-data of a data set. */
-public final class DataSetMetaInfo {
+public final class DataSetMetaInfo implements Serializable {
     private static final int UNKNOWN = -1;
 
     private final int numRegisteredPartitions;
     private final int numTotalPartitions;
     private final SortedMap<ResultPartitionID, ShuffleDescriptor>
             shuffleDescriptorsOrderByPartitionId =
                     new TreeMap<>(
-                            Comparator.comparingInt(o -> o.getPartitionId().getPartitionNumber()));
+                            Comparator.comparingInt(
+                                    (ToIntFunction<ResultPartitionID> & Serializable)
+                                            o -> o.getPartitionId().getPartitionNumber()));

Review Comment:
   This change is to specify that the lambda function is serializable so that the DataSetMetaInfo as a whole is serializable.
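   
   For readers unfamiliar with the trick, here is a minimal standalone sketch (class and variable names are mine, not from the PR) showing how an intersection-type cast makes a lambda serializable:
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.Comparator;
   import java.util.function.ToIntFunction;
   
   public class SerializableLambdaDemo {
       public static void main(String[] args) throws Exception {
           // Comparator.comparingInt only yields a serializable comparator when
           // the key extractor itself is serializable. A plain lambda is not, so
           // it is cast to an intersection type that also implements Serializable.
           Comparator<String> byLength =
                   Comparator.comparingInt(
                           (ToIntFunction<String> & Serializable) String::length);
   
           try (ObjectOutputStream out =
                   new ObjectOutputStream(new ByteArrayOutputStream())) {
               out.writeObject(byLength); // succeeds thanks to the cast
               // Without the cast, writeObject would throw NotSerializableException.
           }
       }
   }
   ```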



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} indicates that the intermediate result of
+ * the upstream transformation should be cached when it is first computed, so that later jobs can
+ * consume the cached intermediate result instead of re-computing it. In stream mode, it has no
+ * effect.
+ *
+ * @param <T> The type of the elements in the cached intermediate result.
+ */
+@Internal
+public class CacheTransformation<T> extends Transformation<T> {
+    private final Transformation<T> transformationToCache;
+    private final IntermediateDataSetID intermediateDataSetID;
+    private boolean isCached;
+
+    /**
+     * Creates a new {@code Transformation} with the given name, output type and parallelism.
+     *
+     * @param transformationToCache The transformation whose intermediate result should be cached.
+     * @param name The name of the {@code Transformation}; this will be shown in visualizations
+     *     and the log.
+     * @param outputType The output type of this {@code Transformation}.
+     * @param parallelism The parallelism of this {@code Transformation}.
+     */
+    public CacheTransformation(
+            Transformation<T> transformationToCache,
+            String name,
+            TypeInformation<T> outputType,
+            int parallelism) {
+        super(name, outputType, parallelism);
+        this.transformationToCache = transformationToCache;
+
+        this.intermediateDataSetID = new IntermediateDataSetID();
+        this.isCached = false;
+    }
+
+    @Override
+    public List<Transformation<?>> getTransitivePredecessors() {
+        List<Transformation<?>> result = Lists.newArrayList();
+        if (isCached) {
+            return result;
+        }
+        result.add(this);

Review Comment:
   If I am not missing something, I think `this` should always be added, not only when the cache has not been produced yet.
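   
   A sketch of what the method might look like with that change, slotted into `CacheTransformation` (the `addAll` over the cached transformation's predecessors is my assumption, since the hunk is truncated here):
   
   ```java
   @Override
   public List<Transformation<?>> getTransitivePredecessors() {
       List<Transformation<?>> result = Lists.newArrayList();
       // The CacheTransformation node itself is always part of the graph.
       result.add(this);
       if (isCached) {
           // The cache already exists, so the upstream pipeline need not run again.
           return result;
       }
       // First execution: the transformation being cached, and everything it
       // depends on, has to run in order to produce the cache.
       result.addAll(transformationToCache.getTransitivePredecessors());
       return result;
   }
   ```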



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CachedDataStream.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.CacheTransformation;
+
+/**
+ * {@link CachedDataStream} represents a {@link DataStream} whose intermediate result will be
+ * cached the first time it is computed. Later jobs that use the same {@link CachedDataStream} can
+ * consume the cached intermediate result instead of re-computing it.
+ *
+ * @param <T> The type of the elements in this stream.
+ */
+@PublicEvolving
+public class CachedDataStream<T> extends DataStream<T> implements AutoCloseable {
+    /**
+     * Create a new {@link CachedDataStream} in the given execution environment that wraps the
+     * given physical transformation to indicate that the transformation should be cached.
+     *
+     * @param environment The StreamExecutionEnvironment.
+     * @param transformation The physical transformation whose intermediate result should be
+     *     cached.
+     */
+    public CachedDataStream(
+            StreamExecutionEnvironment environment, Transformation<T> transformation) {
+        super(
+                environment,
+                new CacheTransformation<>(
+                        transformation,
+                        String.format("Cache: %s", transformation.getName()),
+                        transformation.getOutputType(),
+                        transformation.getParallelism()));
+
+        final CacheTransformation<T> t = (CacheTransformation<T>) this.getTransformation();
+        environment.addCache(t.getIntermediateDataSetID(), t);
+    }
+
+    /**
+     * Invalidate the cached intermediate result of this DataStream to release the physical
+     * resources. Users are not required to invoke this method to release physical resources
+     * unless they want to. The CachedDataStream should not be used after it is closed.
+     */
+    @Override
+    public void close() throws Exception {

Review Comment:
   You are correct that users are unlikely to use try-with-resources with the CachedDataStream. The reason for using `close` instead of `invalidate` or `unpersist` is that `close` implies the object should not be used after it is closed, while `invalidate` and `unpersist` don't carry that implication.
   
   With that said, I am leaning a bit toward keeping `close` but not extending `AutoCloseable`. What do you think?
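   
   For illustration, a hedged sketch of the usage pattern I have in mind. The `cache()` entry point and its placement are my assumptions based on this PR's API, and the job setup is illustrative only:
   
   ```java
   import org.apache.flink.api.common.RuntimeExecutionMode;
   import org.apache.flink.streaming.api.datastream.CachedDataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   public class CacheUsageSketch {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env =
                   StreamExecutionEnvironment.getExecutionEnvironment();
           env.setRuntimeMode(RuntimeExecutionMode.BATCH);
   
           // Assumed entry point: cache() returning a CachedDataStream.
           CachedDataStream<Long> cached =
                   env.fromSequence(1, 100).map(x -> x * x).cache();
   
           cached.print();
           env.execute("produce the cache"); // first run computes and caches the result
   
           cached.print();
           env.execute("reuse the cache"); // later run consumes the cache
   
           // The cached stream usually outlives any single lexical scope, which is
           // why try-with-resources is an awkward fit; close() is called explicitly
           // once the cached result is no longer needed.
           cached.close();
       }
   }
   ```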
   
   


