Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1434#discussion_r47777988 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -440,4 +478,226 @@ private void configureExecutionRetryDelay() { long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay(); jobGraph.setExecutionRetryDelay(executionRetryDelay); } + + // ------------------------------------------------------------------------ + + /** + * Returns a map with a hash for each {@link StreamNode} of the {@link + * StreamGraph}. The hash is used as the {@link JobVertexID} in order to + * identify nodes across job submissions if they didn't change. + * + * <p>The complete {@link StreamGraph} is traversed. The hash is either + * computed from the transformation's user-specified id (see + * {@link StreamTransformation#getUid()}) or generated in a deterministic way. + * + * <p>The generated hash is deterministic with respect to: + * <ul> + * <li>node-local properties (like parallelism, UDF, node ID), + * <li>chained output nodes, and + * <li>input nodes hashes + * </ul> + * + * @return A map from {@link StreamNode#id} to hash as 16-byte array. + */ + private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() { + // The hash function used to generate the hash + final HashFunction hashFunction = Hashing.murmur3_128(0); + final Map<Integer, byte[]> hashes = new HashMap<>(); + + Set<Integer> visited = new HashSet<>(); + Queue<StreamNode> remaining = new ArrayDeque<>(); + + // We need to make the source order deterministic. This depends on the + // ordering of the sources in the Environment, e.g. if a source X is + // added before source Y, X will have a lower ID than Y (assigned by a + // static counter). 
+ List<Integer> sources = new ArrayList<>(); + for (Integer sourceNodeId : streamGraph.getSourceIDs()) { + sources.add(sourceNodeId); + } + + Collections.sort(sources); + + // Traverse the graph in a breadth-first manner. Keep in mind that + // the graph is not a tree and multiple paths to nodes can exist. + + // Start with source nodes + for (Integer sourceNodeId : sources) { + remaining.add(streamGraph.getStreamNode(sourceNodeId)); + visited.add(sourceNodeId); + } + + StreamNode currentNode; + while ((currentNode = remaining.poll()) != null) { + // Generate the hash code. Because multiple path exist to each + // node, we might not have all required inputs available to + // generate the hash code. + if (generateNodeHash(currentNode, hashFunction, hashes)) { + // Add the child nodes + for (StreamEdge outEdge : currentNode.getOutEdges()) { --- End diff -- This is correct. Changing the order changes the hash code. The ordering question really depends on whether we include some form of ID to solve collisions.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---