[ 
https://issues.apache.org/jira/browse/FLINK-5290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15742622#comment-15742622
 ] 

ASF GitHub Bot commented on FLINK-5290:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2966#discussion_r92003177
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java
 ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.graph;
    +
    +import com.google.common.hash.HashFunction;
    +import com.google.common.hash.Hasher;
    +import com.google.common.hash.Hashing;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +import org.apache.flink.streaming.api.operators.StreamOperator;
    +import org.apache.flink.streaming.api.transformations.StreamTransformation;
    +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.charset.Charset;
    +import java.util.ArrayDeque;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Queue;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.StringUtils.byteToHexString;
    +
    +public class DefaultStreamGraphHasher implements StreamGraphHasher {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
    +
    +   /**
    +    * Returns a map with a hash for each {@link StreamNode} of the {@link
    +    * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +    * identify nodes across job submissions if they didn't change.
    +    * <p>
    +    * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +    * computed from the transformation's user-specified id (see
    +    * {@link StreamTransformation#getUid()}) or generated in a 
deterministic way.
    +    * <p>
    +    * <p>The generated hash is deterministic with respect to:
    +    * <ul>
    +    * <li>node-local properties (like parallelism, UDF, node ID),
    +    * <li>chained output nodes, and
    +    * <li>input nodes hashes
    +    * </ul>
    +    *
    +    * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +    */
    +   @Override
    +   public Map<Integer, byte[]> 
traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
    +           // The hash function used to generate the hash
    +           final HashFunction hashFunction = Hashing.murmur3_128(0);
    +           final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +           Set<Integer> visited = new HashSet<>();
    +           Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +           // We need to make the source order deterministic. The source 
IDs are
    +           // not returned in the same order, which means that submitting 
the same
    +           // program twice might result in different traversal, which 
breaks the
    +           // deterministic hash assignment.
    +           List<Integer> sources = new ArrayList<>();
    +           for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
    +                   sources.add(sourceNodeId);
    +           }
    +           Collections.sort(sources);
    +
    +           //
    +           // Traverse the graph in a breadth-first manner. Keep in mind 
that
    +           // the graph is not a tree and multiple paths to nodes can 
exist.
    +           //
    +
    +           // Start with source nodes
    +           for (Integer sourceNodeId : sources) {
    +                   remaining.add(streamGraph.getStreamNode(sourceNodeId));
    +                   visited.add(sourceNodeId);
    +           }
    +
    +           StreamNode currentNode;
    +           while ((currentNode = remaining.poll()) != null) {
    +                   // Generate the hash code. Because multiple path exist 
to each
    +                   // node, we might not have all required inputs 
available to
    +                   // generate the hash code.
    +                   if (generateNodeHash(currentNode, hashFunction, hashes, 
streamGraph.isChainingEnabled())) {
    +                           // Add the child nodes
    +                           for (StreamEdge outEdge : 
currentNode.getOutEdges()) {
    +                                   StreamNode child = 
outEdge.getTargetVertex();
    +
    +                                   if (!visited.contains(child.getId())) {
    +                                           remaining.add(child);
    +                                           visited.add(child.getId());
    +                                   }
    +                           }
    +                   } else {
    +                           // We will revisit this later.
    +                           visited.remove(currentNode.getId());
    +                   }
    +           }
    +
    +           return hashes;
    +   }
    +
    +   /**
    +    * Generates a hash for the node and returns whether the operation was
    +    * successful.
    +    *
    +    * @param node         The node to generate the hash for
    +    * @param hashFunction The hash function to use
    +    * @param hashes       The current state of generated hashes
    +    * @return <code>true</code> if the node hash has been generated.
    +    * <code>false</code>, otherwise. If the operation is not successful, 
the
    +    * hash needs be generated at a later point when all input is available.
    +    * @throws IllegalStateException If node has user-specified hash and is
    +    *                               intermediate node of a chain
    +    */
    +   private boolean generateNodeHash(
    +                   StreamNode node,
    +                   HashFunction hashFunction,
    +                   Map<Integer, byte[]> hashes,
    +                   boolean isChainingEnabled) {
    +
    +           // Check for user-specified ID
    +           String userSpecifiedHash = node.getTransformationId();
    +
    +           if (userSpecifiedHash == null) {
    +                   // Check that all input nodes have their hashes computed
    +                   for (StreamEdge inEdge : node.getInEdges()) {
    +                           // If the input node has not been visited yet, 
the current
    +                           // node will be visited again at a later point 
when all input
    +                           // nodes have been visited and their hashes set.
    +                           if (!hashes.containsKey(inEdge.getSourceId())) {
    +                                   return false;
    +                           }
    +                   }
    +
    +                   Hasher hasher = hashFunction.newHasher();
    +                   byte[] hash = generateDeterministicHash(node, hasher, 
hashes, isChainingEnabled);
    +
    +                   if (hashes.put(node.getId(), hash) != null) {
    +                           // Sanity check
    +                           throw new IllegalStateException("Unexpected 
state. Tried to add node hash " +
    +                                           "twice. This is probably a bug 
in the JobGraph generator.");
    +                   }
    +
    +                   return true;
    +           } else {
    +                   // Check that this node is not part of a chain. This is 
currently
    +                   // not supported, because the runtime takes the 
snapshots by the
    +                   // operator ID of the first vertex in a chain. It's OK 
if the node
    +                   // has chained outputs.
    +                   for (StreamEdge inEdge : node.getInEdges()) {
    +                           if (isChainable(inEdge, isChainingEnabled)) {
    +                                   throw new 
UnsupportedOperationException("Cannot assign user-specified hash "
    +                                                   + "to intermediate node 
in chain. This will be supported in future "
    +                                                   + "versions of Flink. 
As a work around start new chain at task "
    +                                                   + 
node.getOperatorName() + ".");
    +                           }
    +                   }
    +
    +                   Hasher hasher = hashFunction.newHasher();
    +                   byte[] hash = generateUserSpecifiedHash(node, hasher);
    +
    +                   for (byte[] previousHash : hashes.values()) {
    +                           if (Arrays.equals(previousHash, hash)) {
    +                                   throw new 
IllegalArgumentException("Hash collision on user-specified ID. " +
    +                                                   "Most likely cause is a 
non-unique ID. Please check that all IDs " +
    +                                                   "specified via 
`uid(String)` are unique.");
    +                           }
    +                   }
    +
    +                   if (hashes.put(node.getId(), hash) != null) {
    +                           // Sanity check
    +                           throw new IllegalStateException("Unexpected 
state. Tried to add node hash " +
    +                                           "twice. This is probably a bug 
in the JobGraph generator.");
    +                   }
    +
    +                   return true;
    +           }
    +   }
    +
    +   /**
    +    * Generates a hash from a user-specified ID.
    +    */
    +   private byte[] generateUserSpecifiedHash(StreamNode node, Hasher 
hasher) {
    +           hasher.putString(node.getTransformationId(), 
Charset.forName("UTF-8"));
    +
    +           return hasher.hash().asBytes();
    +   }
    +
    +   /**
    +    * Generates a deterministic hash from node-local properties and input 
and
    +    * output edges.
    +    */
    +   private byte[] generateDeterministicHash(
    +                   StreamNode node,
    +                   Hasher hasher,
    +                   Map<Integer, byte[]> hashes,
    +                   boolean isChainingEnabled) {
    +
    +           // Include stream node to hash. We use the current size of the 
computed
    +           // hashes as the ID. We cannot use the node's ID, because it is
    +           // assigned from a static counter. This will result in two 
identical
    +           // programs having different hashes.
    +           generateNodeLocalHash(node, hasher, hashes.size());
    +
    +           // Include chained nodes to hash
    +           for (StreamEdge outEdge : node.getOutEdges()) {
    +                   if (isChainable(outEdge, isChainingEnabled)) {
    +                           StreamNode chainedNode = 
outEdge.getTargetVertex();
    +
    +                           // Use the hash size again, because the nodes 
are chained to
    +                           // this node. This does not add a hash for the 
chained nodes.
    +                           generateNodeLocalHash(chainedNode, hasher, 
hashes.size());
    +                   }
    +           }
    +
    +           byte[] hash = hasher.hash().asBytes();
    +
    +           // Make sure that all input nodes have their hash set before 
entering
    +           // this loop (calling this method).
    +           for (StreamEdge inEdge : node.getInEdges()) {
    +                   byte[] otherHash = hashes.get(inEdge.getSourceId());
    +
    +                   // Sanity check
    +                   if (otherHash == null) {
    +                           throw new IllegalStateException("Missing hash 
for input node "
    +                                           + inEdge.getSourceVertex() + ". 
Cannot generate hash for "
    +                                           + node + ".");
    +                   }
    +
    +                   for (int j = 0; j < hash.length; j++) {
    +                           hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
    +                   }
    +           }
    +
    +           if (LOG.isDebugEnabled()) {
    +                   String udfClassName = "";
    +                   if (node.getOperator() instanceof 
AbstractUdfStreamOperator) {
    +                           udfClassName = ((AbstractUdfStreamOperator<?, 
?>) node.getOperator())
    +                                           
.getUserFunction().getClass().getName();
    +                   }
    +
    +                   LOG.debug("Generated hash '" + byteToHexString(hash) + 
"' for node " +
    +                                   "'" + node.toString() + "' {id: " + 
node.getId() + ", " +
    +                                   "parallelism: " + node.getParallelism() 
+ ", " +
    +                                   "user function: " + udfClassName + "}");
    +           }
    +
    +           return hash;
    +   }
    +
    +   /**
    +    * Applies the {@link Hasher} to the {@link StreamNode} (only node local
    +    * attributes are taken into account). The hasher encapsulates the 
current
    +    * state of the hash.
    +    * <p>
    +    * <p>The specified ID is local to this node. We cannot use the
    +    * {@link StreamNode#id}, because it is incremented in a static counter.
    +    * Therefore, the IDs for identical jobs will otherwise be different.
    +    */
    +   protected void generateNodeLocalHash(StreamNode node, Hasher hasher, 
int id) {
    +           // This resolves conflicts for otherwise identical source 
nodes. BUT
    +           // the generated hash codes depend on the ordering of the nodes 
in the
    +           // stream graph.
    +           hasher.putInt(id);
    +
    +           if (node.getOperator() instanceof AbstractUdfStreamOperator) {
    +                   String udfClassName = ((AbstractUdfStreamOperator<?, 
?>) node.getOperator())
    +                                   .getUserFunction().getClass().getName();
    +
    +                   hasher.putString(udfClassName, 
Charset.forName("UTF-8"));
    +           }
    +   }
    +
    +   private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) 
{
    --- End diff --
    
    You're right. We should probably not have this for the default hasher and 
only have it in the legacy hasher, then. WDYT?


> Ensure backwards compatibility of the hashes used to generate JobVertexIds
> --------------------------------------------------------------------------
>
>                 Key: FLINK-5290
>                 URL: https://issues.apache.org/jira/browse/FLINK-5290
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> The way in which hashes for JobVertexIds are generated changed between Flink 
> 1.1 and 1.2 (parallelism was considered for the hash in 1.1). We need to be 
> backwards compatible to old JobVertexId generation so that we can still 
> assign state from old savepoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to