[ https://issues.apache.org/jira/browse/FLINK-2976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15060001#comment-15060001 ]
ASF GitHub Bot commented on FLINK-2976:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r47777988

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
@@ -440,4 +478,226 @@ private void configureExecutionRetryDelay() {
 		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
 		jobGraph.setExecutionRetryDelay(executionRetryDelay);
 	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns a map with a hash for each {@link StreamNode} of the {@link
+	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+	 * identify nodes across job submissions if they didn't change.
+	 *
+	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
+	 * computed from the transformation's user-specified id (see
+	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+	 *
+	 * <p>The generated hash is deterministic with respect to:
+	 * <ul>
+	 * <li>node-local properties (like parallelism, UDF, node ID),
+	 * <li>chained output nodes, and
+	 * <li>input node hashes
+	 * </ul>
+	 *
+	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+	 */
+	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
+		// The hash function used to generate the hash
+		final HashFunction hashFunction = Hashing.murmur3_128(0);
+		final Map<Integer, byte[]> hashes = new HashMap<>();
+
+		Set<Integer> visited = new HashSet<>();
+		Queue<StreamNode> remaining = new ArrayDeque<>();
+
+		// We need to make the source order deterministic. This depends on the
+		// ordering of the sources in the Environment, e.g. if a source X is
+		// added before source Y, X will have a lower ID than Y (assigned by a
+		// static counter).
+		List<Integer> sources = new ArrayList<>();
+		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+			sources.add(sourceNodeId);
+		}
+
+		Collections.sort(sources);
+
+		// Traverse the graph in a breadth-first manner. Keep in mind that
+		// the graph is not a tree and multiple paths to nodes can exist.
+
+		// Start with source nodes
+		for (Integer sourceNodeId : sources) {
+			remaining.add(streamGraph.getStreamNode(sourceNodeId));
+			visited.add(sourceNodeId);
+		}
+
+		StreamNode currentNode;
+		while ((currentNode = remaining.poll()) != null) {
+			// Generate the hash code. Because multiple paths exist to each
+			// node, we might not have all required inputs available to
+			// generate the hash code.
+			if (generateNodeHash(currentNode, hashFunction, hashes)) {
+				// Add the child nodes
+				for (StreamEdge outEdge : currentNode.getOutEdges()) {
--- End diff --

    This is correct. Changing the order changes the hash code. The ordering question really depends on whether we include some form of ID to resolve collisions.
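To make the order dependence discussed above concrete, here is a minimal sketch of how such a node hash can be derived from node-local properties and the already-computed input hashes, using the same Guava Murmur3 128-bit function the diff instantiates. This is not Flink's actual generateNodeHash: NodeInfo, hashNode, and the exact set of hashed properties are illustrative assumptions.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

final class NodeHashSketch {

	/** Illustrative stand-in for a StreamNode's hash-relevant properties. */
	static final class NodeInfo {
		final int id;              // ID assigned by a static counter (source order)
		final int parallelism;     // node-local property
		final String udfClassName; // identifies the user function
		final int[] inputIds;      // input nodes whose hashes must exist already

		NodeInfo(int id, int parallelism, String udfClassName, int[] inputIds) {
			this.id = id;
			this.parallelism = parallelism;
			this.udfClassName = udfClassName;
			this.inputIds = inputIds;
		}
	}

	/**
	 * Computes a deterministic 16-byte hash for one node, or returns null
	 * if some input hash is not available yet (the node is retried later).
	 */
	static byte[] hashNode(NodeInfo node, HashFunction hashFunction, Map<Integer, byte[]> hashes) {
		Hasher hasher = hashFunction.newHasher();

		// 1) Node-local properties.
		hasher.putInt(node.id);
		hasher.putInt(node.parallelism);
		hasher.putString(node.udfClassName, StandardCharsets.UTF_8);

		// 2) Input hashes, in the order given. Reordering them changes the
		//    resulting digest, hence the need for a deterministic traversal.
		for (int inputId : node.inputIds) {
			byte[] inputHash = hashes.get(inputId);
			if (inputHash == null) {
				return null; // not all inputs hashed yet
			}
			hasher.putBytes(inputHash);
		}

		return hasher.hash().asBytes(); // murmur3_128 => 16 bytes
	}

	public static void main(String[] args) {
		HashFunction f = Hashing.murmur3_128(0); // same seed as in the diff
		Map<Integer, byte[]> hashes = new HashMap<>();

		NodeInfo source = new NodeInfo(1, 4, "MySourceFunction", new int[0]);
		hashes.put(1, hashNode(source, f, hashes));

		NodeInfo map = new NodeInfo(2, 4, "MyMapFunction", new int[]{1});
		System.out.println(hashNode(map, f, hashes).length); // prints 16
	}
}

Returning null when an input hash is missing corresponds to the boolean result of generateNodeHash in the diff: a node reached before all of its inputs is simply hashed on a later visit. It also illustrates the review comment itself, since feeding the input hashes in a different order produces a different digest.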
> Save and load checkpoints manually
> ----------------------------------
>
>                 Key: FLINK-2976
>                 URL: https://issues.apache.org/jira/browse/FLINK-2976
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>    Affects Versions: 0.10.0
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 1.0.0
>
>
> Currently, all checkpointed state is bound to a job. After the job finishes,
> all state is lost. In case of an HA cluster, jobs can live longer than the
> cluster, but they still suffer from the same issue when they finish.
> Multiple users have requested the feature to manually save a checkpoint in
> order to resume from it at a later point. This is especially important for
> production environments. As an example, consider upgrading your existing
> production Flink program. Currently, you lose all the state of your program.
> With the proposed mechanism, it will be possible to save a checkpoint, stop
> and update your program, and then continue your program from the checkpoint.
> The required operations can be simple:
> saveCheckpoint(JobID) => checkpointID: long
> loadCheckpoint(JobID, long) => void
> For the initial version, I would apply the following restriction:
> - The topology needs to stay the same (JobGraph parallelism, etc.)
> A user can configure this behaviour via the environment, just like the
> checkpointing interval. Furthermore, the user can trigger the save operation
> via the command line at arbitrary times and load a checkpoint when submitting
> a job, e.g.
> bin/flink checkpoint <JobID> => checkpointID: long
> and
> bin/flink run --loadCheckpoint JobID [latest saved checkpoint]
> bin/flink run --loadCheckpoint (JobID,long) [specific saved checkpoint]
> As far as I can tell, the required mechanisms are similar to the ones
> implemented for JobManager high availability. We need to make sure to persist
> the CompletedCheckpoint instances as a pointer to the checkpoint state and to
> *not* remove saved checkpoint state.
> On the client side, we need to give the job and its vertices the same IDs to
> allow mapping the checkpoint state.
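As a rough sketch of how the proposed operations might look on the client side: only the two operation signatures come from the description above; the interface name, exception signatures, and Javadoc are assumptions for illustration.

import org.apache.flink.api.common.JobID;

/**
 * Hypothetical client-side view of the proposed operations. Only the two
 * signatures mirror the issue description; everything else is a placeholder.
 */
interface CheckpointClient {

	/**
	 * Triggers a checkpoint of the given running job, persists the
	 * CompletedCheckpoint as a pointer to the checkpoint state (which is
	 * *not* removed afterwards), and returns its ID.
	 */
	long saveCheckpoint(JobID jobId) throws Exception;

	/**
	 * Resumes a newly submitted job from a previously saved checkpoint.
	 * For the initial version the topology has to stay the same, because
	 * job and vertex IDs must map onto the checkpointed state.
	 */
	void loadCheckpoint(JobID jobId, long checkpointId) throws Exception;
}

The command-line entry points in the description would presumably be thin wrappers around this: bin/flink checkpoint <JobID> calls saveCheckpoint, while bin/flink run --loadCheckpoint resolves the latest (or the given) checkpoint ID and calls loadCheckpoint before submitting the job.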