Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2966#discussion_r92003177
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.graph;
    +
    +import com.google.common.hash.HashFunction;
    +import com.google.common.hash.Hasher;
    +import com.google.common.hash.Hashing;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.operators.StreamOperator;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.charset.Charset;
    +import java.util.ArrayDeque;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Queue;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.StringUtils.byteToHexString;
    +
    +public class DefaultStreamGraphHasher implements StreamGraphHasher {
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
    +
    +   /**
    +    * Returns a map with a hash for each {@link StreamNode} of the {@link
    +    * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +    * identify nodes across job submissions if they didn't change.
    +    *
    +    * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +    * computed from the transformation's user-specified id (see
    +    * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +    *
    +    * <p>The generated hash is deterministic with respect to:
    +    * <ul>
    +    * <li>node-local properties (like parallelism, UDF, node ID),
    +    * <li>chained output nodes, and
    +    * <li>input node hashes.
    +    * </ul>
    +    *
    +    * @return A map from {@link StreamNode#id} to the hash as a 16-byte array.
    +    */
    +   @Override
    +   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
    +           // The hash function used to generate the hash
    +           final HashFunction hashFunction = Hashing.murmur3_128(0);
    +           final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +           Set<Integer> visited = new HashSet<>();
    +           Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +           // We need to make the source order deterministic. The source IDs
    +           // are not returned in the same order, which means that submitting
    +           // the same program twice might result in a different traversal,
    +           // which breaks the deterministic hash assignment.
    +           List<Integer> sources = new ArrayList<>();
    +           for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
    +                   sources.add(sourceNodeId);
    +           }
    +           Collections.sort(sources);
    +
    +           //
    +           // Traverse the graph in a breadth-first manner. Keep in mind that
    +           // the graph is not a tree and multiple paths to nodes can exist.
    +           //
    +
    +           // Start with source nodes
    +           for (Integer sourceNodeId : sources) {
    +                   remaining.add(streamGraph.getStreamNode(sourceNodeId));
    +                   visited.add(sourceNodeId);
    +           }
    +
    +           StreamNode currentNode;
    +           while ((currentNode = remaining.poll()) != null) {
    +                   // Generate the hash code. Because multiple paths exist to
    +                   // each node, we might not have all required inputs available
    +                   // to generate the hash code.
    +                   if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
    +                           // Add the child nodes
    +                           for (StreamEdge outEdge : currentNode.getOutEdges()) {
    +                                   StreamNode child = outEdge.getTargetVertex();
    +
    +                                   if (!visited.contains(child.getId())) {
    +                                           remaining.add(child);
    +                                           visited.add(child.getId());
    +                                   }
    +                           }
    +                   } else {
    +                           // We will revisit this later.
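    +                           // Removing it from the visited set allows it to be
    +                           // enqueued again via another input path.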
    +                           visited.remove(currentNode.getId());
    +                   }
    +           }
    +
    +           return hashes;
    +   }
    +
    +   /**
    +    * Generates a hash for the node and returns whether the operation was
    +    * successful.
    +    *
    +    * @param node         The node to generate the hash for
    +    * @param hashFunction The hash function to use
    +    * @param hashes       The current state of generated hashes
    +    * @return <code>true</code> if the node hash has been generated,
    +    * <code>false</code> otherwise. If the operation is not successful, the
    +    * hash needs to be generated at a later point when all inputs are available.
    +    * @throws IllegalStateException If the node has a user-specified hash and
    +    *                               is an intermediate node of a chain
    +    */
    +   private boolean generateNodeHash(
    +                   StreamNode node,
    +                   HashFunction hashFunction,
    +                   Map<Integer, byte[]> hashes,
    +                   boolean isChainingEnabled) {
    +
    +           // Check for user-specified ID
    +           String userSpecifiedHash = node.getTransformationId();
    +
    +           if (userSpecifiedHash == null) {
    +                   // Check that all input nodes have their hashes computed
    +                   for (StreamEdge inEdge : node.getInEdges()) {
    +                           // If the input node has not been visited yet, the
    +                           // current node will be visited again at a later point
    +                           // when all input nodes have been visited and their
    +                           // hashes set.
    +                           if (!hashes.containsKey(inEdge.getSourceId())) {
    +                                   return false;
    +                           }
    +                   }
    +
    +                   Hasher hasher = hashFunction.newHasher();
    +                   byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
    +
    +                   if (hashes.put(node.getId(), hash) != null) {
    +                           // Sanity check
    +                           throw new IllegalStateException("Unexpected state. Tried to add node hash " +
    +                                           "twice. This is probably a bug in the JobGraph generator.");
    +                   }
    +
    +                   return true;
    +           } else {
    +                   // Check that this node is not part of a chain. This is
    +                   // currently not supported, because the runtime takes the
    +                   // snapshots by the operator ID of the first vertex in a
    +                   // chain. It's OK if the node has chained outputs.
    +                   for (StreamEdge inEdge : node.getInEdges()) {
    +                           if (isChainable(inEdge, isChainingEnabled)) {
    +                                   throw new UnsupportedOperationException("Cannot assign user-specified hash "
    +                                                   + "to intermediate node in chain. This will be supported in future "
    +                                                   + "versions of Flink. As a workaround, start a new chain at task "
    +                                                   + node.getOperatorName() + ".");
    +                           }
    +                   }
    +
    +                   Hasher hasher = hashFunction.newHasher();
    +                   byte[] hash = generateUserSpecifiedHash(node, hasher);
    +
    +                   for (byte[] previousHash : hashes.values()) {
    +                           if (Arrays.equals(previousHash, hash)) {
    +                                   throw new IllegalArgumentException("Hash collision on user-specified ID. " +
    +                                                   "Most likely cause is a non-unique ID. Please check that all IDs " +
    +                                                   "specified via `uid(String)` are unique.");
    +                           }
    +                   }
    +
    +                   if (hashes.put(node.getId(), hash) != null) {
    +                           // Sanity check
    +                           throw new IllegalStateException("Unexpected state. Tried to add node hash " +
    +                                           "twice. This is probably a bug in the JobGraph generator.");
    +                   }
    +
    +                   return true;
    +           }
    +   }
    +
    +   /**
    +    * Generates a hash from a user-specified ID.
    +    */
    +   private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
    +           hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
    +
    +           return hasher.hash().asBytes();
    +   }
    +
    +   /**
    +    * Generates a deterministic hash from node-local properties and
    +    * input and output edges.
    +    */
    +   private byte[] generateDeterministicHash(
    +                   StreamNode node,
    +                   Hasher hasher,
    +                   Map<Integer, byte[]> hashes,
    +                   boolean isChainingEnabled) {
    +
    +           // Include stream node to hash. We use the current size of the
    +           // computed hashes as the ID. We cannot use the node's ID, because
    +           // it is assigned from a static counter. This would result in two
    +           // identical programs having different hashes.
    +           generateNodeLocalHash(node, hasher, hashes.size());
    +
    +           // Include chained nodes to hash
    +           for (StreamEdge outEdge : node.getOutEdges()) {
    +                   if (isChainable(outEdge, isChainingEnabled)) {
    +                           StreamNode chainedNode = outEdge.getTargetVertex();
    +
    +                           // Use the hash size again, because the nodes are
    +                           // chained to this node. This does not add a hash for
    +                           // the chained nodes.
    +                           generateNodeLocalHash(chainedNode, hasher, hashes.size());
    +                   }
    +           }
    +
    +           byte[] hash = hasher.hash().asBytes();
    +
    +           // Make sure that all input nodes have their hash set before
    +           // entering this loop (calling this method).
    +           for (StreamEdge inEdge : node.getInEdges()) {
    +                   byte[] otherHash = hashes.get(inEdge.getSourceId());
    +
    +                   // Sanity check
    +                   if (otherHash == null) {
    +                           throw new IllegalStateException("Missing hash for input node "
    +                                           + inEdge.getSourceVertex() + ". Cannot generate hash for "
    +                                           + node + ".");
    +                   }
    +
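    +                   // Fold the input node's hash into this node's hash byte by
    +                   // byte. The multiply-then-XOR update is order-sensitive, so
    +                   // the result depends on the order of the input nodes.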
    +                   for (int j = 0; j < hash.length; j++) {
    +                           hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
    +                   }
    +           }
    +
    +           if (LOG.isDebugEnabled()) {
    +                   String udfClassName = "";
    +                   if (node.getOperator() instanceof AbstractUdfStreamOperator) {
    +                           udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
    +                                           .getUserFunction().getClass().getName();
    +                   }
    +
    +                   LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
    +                                   "'" + node.toString() + "' {id: " + node.getId() + ", " +
    +                                   "parallelism: " + node.getParallelism() + ", " +
    +                                   "user function: " + udfClassName + "}");
    +           }
    +
    +           return hash;
    +   }
    +
    +   /**
    +    * Applies the {@link Hasher} to the {@link StreamNode} (only node-local
    +    * attributes are taken into account). The hasher encapsulates the
    +    * current state of the hash.
    +    *
    +    * <p>The specified ID is local to this node. We cannot use the
    +    * {@link StreamNode#id}, because it is incremented by a static counter;
    +    * otherwise, the IDs for identical jobs would be different.
    +    */
    +   protected void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
    +           // This resolves conflicts for otherwise identical source nodes. BUT
    +           // the generated hash codes depend on the ordering of the nodes in
    +           // the stream graph.
    +           hasher.putInt(id);
    +
    +           if (node.getOperator() instanceof AbstractUdfStreamOperator) {
    +                   String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
    +                                   .getUserFunction().getClass().getName();
    +
    +                   hasher.putString(udfClassName, Charset.forName("UTF-8"));
    +           }
    +   }
    +
    +   private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
    --- End diff ---
    
    You're right. We should probably not have this for the default hasher and
    only have it in the legacy hasher, then. WDYT?
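    
    For illustration, a rough sketch of how that split could look. Note that the
    `LegacyStreamGraphHasher` name, the stubbed traversal, and the `isChainable`
    body below are assumptions for this discussion, not code from this PR:
    
        package org.apache.flink.streaming.api.graph;
    
        import java.util.Map;
    
        import org.apache.flink.streaming.api.operators.ChainingStrategy;
        import org.apache.flink.streaming.api.operators.StreamOperator;
        import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
    
        public class LegacyStreamGraphHasher implements StreamGraphHasher {
    
            @Override
            public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
                // Same traversal as the default hasher; elided in this sketch.
                throw new UnsupportedOperationException("Sketch only.");
            }
    
            // Only the legacy hasher keeps the chaining check, so that hashes
            // of jobs submitted with earlier Flink versions stay stable.
            private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
                StreamNode upStreamVertex = edge.getSourceVertex();
                StreamNode downStreamVertex = edge.getTargetVertex();
    
                StreamOperator<?> headOperator = upStreamVertex.getOperator();
                StreamOperator<?> outOperator = downStreamVertex.getOperator();
    
                return downStreamVertex.getInEdges().size() == 1
                        && outOperator != null
                        && headOperator != null
                        && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                        && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD
                                || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                        && (edge.getPartitioner() instanceof ForwardPartitioner)
                        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                        && isChainingEnabled;
            }
        }
    
    That way the default hasher stays free of chaining-dependent logic, and the
    legacy hasher alone carries it for backwards-compatible hash assignment.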

