Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2966#discussion_r92003177

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+public class DefaultStreamGraphHasher implements StreamGraphHasher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
+
+	/**
+	 * Returns a map with a hash for each {@link StreamNode} of the {@link
+	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+	 * identify nodes across job submissions if they didn't change.
+	 *
+	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
+	 * computed from the transformation's user-specified id (see
+	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+	 *
+	 * <p>The generated hash is deterministic with respect to:
+	 * <ul>
+	 * <li>node-local properties (like parallelism, UDF, node ID),
+	 * <li>chained output nodes, and
+	 * <li>input node hashes.
+	 * </ul>
+	 *
+	 * @return A map from {@link StreamNode#id} to the hash as a 16-byte array.
+	 */
+	@Override
+	public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+		// The hash function used to generate the hash
+		final HashFunction hashFunction = Hashing.murmur3_128(0);
+		final Map<Integer, byte[]> hashes = new HashMap<>();
+
+		Set<Integer> visited = new HashSet<>();
+		Queue<StreamNode> remaining = new ArrayDeque<>();
+
+		// We need to make the source order deterministic. The source IDs are
+		// not returned in the same order, which means that submitting the same
+		// program twice might result in a different traversal, which breaks the
+		// deterministic hash assignment.
+		List<Integer> sources = new ArrayList<>();
+		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+			sources.add(sourceNodeId);
+		}
+		Collections.sort(sources);
+
+		//
+		// Traverse the graph in a breadth-first manner. Keep in mind that
+		// the graph is not a tree and multiple paths to nodes can exist.
+		//
+
+		// Start with source nodes
+		for (Integer sourceNodeId : sources) {
+			remaining.add(streamGraph.getStreamNode(sourceNodeId));
+			visited.add(sourceNodeId);
+		}
+
+		StreamNode currentNode;
+		while ((currentNode = remaining.poll()) != null) {
+			// Generate the hash code. Because multiple paths exist to each
+			// node, we might not have all required inputs available to
+			// generate the hash code.
+			if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
+				// Add the child nodes
+				for (StreamEdge outEdge : currentNode.getOutEdges()) {
+					StreamNode child = outEdge.getTargetVertex();
+
+					if (!visited.contains(child.getId())) {
+						remaining.add(child);
+						visited.add(child.getId());
+					}
+				}
+			} else {
+				// We will revisit this later.
+				visited.remove(currentNode.getId());
+			}
+		}
+
+		return hashes;
+	}
+
+	/**
+	 * Generates a hash for the node and returns whether the operation was
+	 * successful.
+	 *
+	 * @param node The node to generate the hash for
+	 * @param hashFunction The hash function to use
+	 * @param hashes The current state of generated hashes
+	 * @return <code>true</code> if the node hash has been generated.
+	 * <code>false</code>, otherwise. If the operation is not successful, the
+	 * hash needs to be generated at a later point when all input is available.
+	 * @throws IllegalStateException If the node has a user-specified hash and
+	 * is an intermediate node of a chain
+	 */
+	private boolean generateNodeHash(
+			StreamNode node,
+			HashFunction hashFunction,
+			Map<Integer, byte[]> hashes,
+			boolean isChainingEnabled) {
+
+		// Check for user-specified ID
+		String userSpecifiedHash = node.getTransformationId();
+
+		if (userSpecifiedHash == null) {
+			// Check that all input nodes have their hashes computed
+			for (StreamEdge inEdge : node.getInEdges()) {
+				// If the input node has not been visited yet, the current
+				// node will be visited again at a later point when all input
+				// nodes have been visited and their hashes set.
+				if (!hashes.containsKey(inEdge.getSourceId())) {
+					return false;
+				}
+			}
+
+			Hasher hasher = hashFunction.newHasher();
+			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
+
+			if (hashes.put(node.getId(), hash) != null) {
+				// Sanity check
+				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+						"twice. This is probably a bug in the JobGraph generator.");
+			}
+
+			return true;
+		} else {
+			// Check that this node is not part of a chain. This is currently
+			// not supported, because the runtime takes the snapshots by the
+			// operator ID of the first vertex in a chain. It's OK if the node
+			// has chained outputs.
+			for (StreamEdge inEdge : node.getInEdges()) {
+				if (isChainable(inEdge, isChainingEnabled)) {
+					throw new UnsupportedOperationException("Cannot assign user-specified hash " +
+							"to intermediate node in chain. This will be supported in future " +
+							"versions of Flink. As a workaround, start a new chain at task " +
+							node.getOperatorName() + ".");
+				}
+			}
+
+			Hasher hasher = hashFunction.newHasher();
+			byte[] hash = generateUserSpecifiedHash(node, hasher);
+
+			for (byte[] previousHash : hashes.values()) {
+				if (Arrays.equals(previousHash, hash)) {
+					throw new IllegalArgumentException("Hash collision on user-specified ID. " +
+							"Most likely cause is a non-unique ID. Please check that all IDs " +
							"specified via `uid(String)` are unique.");
+				}
+			}
+
+			if (hashes.put(node.getId(), hash) != null) {
+				// Sanity check
+				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+						"twice. This is probably a bug in the JobGraph generator.");
+			}
+
+			return true;
+		}
+	}
+
+	/**
+	 * Generates a hash from a user-specified ID.
+	 */
+	private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
+		hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+
+		return hasher.hash().asBytes();
+	}
+
+	/**
+	 * Generates a deterministic hash from node-local properties and input and
+	 * output edges.
+	 */
+	private byte[] generateDeterministicHash(
+			StreamNode node,
+			Hasher hasher,
+			Map<Integer, byte[]> hashes,
+			boolean isChainingEnabled) {
+
+		// Include the stream node in the hash. We use the current size of the
+		// computed hashes as the ID. We cannot use the node's ID, because it
+		// is assigned from a static counter. This would result in two
+		// identical programs having different hashes.
+		generateNodeLocalHash(node, hasher, hashes.size());
+
+		// Include chained nodes in the hash
+		for (StreamEdge outEdge : node.getOutEdges()) {
+			if (isChainable(outEdge, isChainingEnabled)) {
+				StreamNode chainedNode = outEdge.getTargetVertex();
+
+				// Use the hash size again, because the nodes are chained to
+				// this node. This does not add a hash for the chained nodes.
+				generateNodeLocalHash(chainedNode, hasher, hashes.size());
+			}
+		}
+
+		byte[] hash = hasher.hash().asBytes();
+
+		// Make sure that all input nodes have their hash set before entering
+		// this loop (calling this method).
+		for (StreamEdge inEdge : node.getInEdges()) {
+			byte[] otherHash = hashes.get(inEdge.getSourceId());
+
+			// Sanity check
+			if (otherHash == null) {
+				throw new IllegalStateException("Missing hash for input node "
+						+ inEdge.getSourceVertex() + ". Cannot generate hash for "
+						+ node + ".");
+			}
+
+			for (int j = 0; j < hash.length; j++) {
+				hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			String udfClassName = "";
+			if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+				udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+						.getUserFunction().getClass().getName();
+			}
+
+			LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
+					"'" + node.toString() + "' {id: " + node.getId() + ", " +
+					"parallelism: " + node.getParallelism() + ", " +
+					"user function: " + udfClassName + "}");
+		}
+
+		return hash;
+	}
+
+	/**
+	 * Applies the {@link Hasher} to the {@link StreamNode} (only node-local
+	 * attributes are taken into account). The hasher encapsulates the current
+	 * state of the hash.
+	 *
+	 * <p>The specified ID is local to this node. We cannot use the
+	 * {@link StreamNode#id}, because it is incremented in a static counter.
+	 * Otherwise, the IDs for identical jobs would be different.
+	 */
+	protected void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
+		// This resolves conflicts for otherwise identical source nodes. BUT
+		// the generated hash codes depend on the ordering of the nodes in the
+		// stream graph.
+		hasher.putInt(id);
+
+		if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+			String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+					.getUserFunction().getClass().getName();
+
+			hasher.putString(udfClassName, Charset.forName("UTF-8"));
+		}
+	}
+
+	private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
--- End diff --

You're right. We should probably not have this for the default hasher and only have it in the legacy hasher, then. WDYT?
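For readers following along: the user-specified-hash path discussed above is fed by `uid(String)` in the DataStream API, which ends up in `StreamTransformation#getUid()`. A minimal sketch of how a job author pins an operator's `JobVertexID` this way (the class name, uid string, and job name below are made up for illustration; the API calls themselves are the regular Flink DataStream API):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.fromElements("a", "b", "c")
			.map(new MapFunction<String, String>() {
				@Override
				public String map(String value) {
					return value.toUpperCase();
				}
			})
			// User-specified ID ("upper-map" is an illustrative name): the
			// hasher derives this node's JobVertexID from the uid string
			// instead of the deterministic traversal hash, so operator state
			// can be matched across job submissions.
			.uid("upper-map")
			.print();

		env.execute("uid-example");
	}
}
```

Note that if two operators are given the same uid, the collision check in `generateNodeHash` above rejects the submission with an `IllegalArgumentException`.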