[ https://issues.apache.org/jira/browse/FLINK-5290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15742622#comment-15742622 ]
ASF GitHub Bot commented on FLINK-5290:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2966#discussion_r92003177
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+public class DefaultStreamGraphHasher implements StreamGraphHasher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
+
+ /**
+ * Returns a map with a hash for each {@link StreamNode} of the {@link
+ * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+ * identify nodes across job submissions if they didn't change.
+ *
+ * <p>The complete {@link StreamGraph} is traversed. The hash is either
+ * computed from the transformation's user-specified id (see
+ * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+ *
+ * <p>The generated hash is deterministic with respect to:
+ * <ul>
+ * <li>node-local properties (like parallelism, UDF, node ID),
+ * <li>chained output nodes, and
+ * <li>input node hashes.
+ * </ul>
+ *
+ * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+ */
+ @Override
+ public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+ // The hash function used to generate the hash
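+ // (Murmur3 128-bit with a fixed seed, so results are stable across JVM runs and submissions)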
+ final HashFunction hashFunction = Hashing.murmur3_128(0);
+ final Map<Integer, byte[]> hashes = new HashMap<>();
+
+ Set<Integer> visited = new HashSet<>();
+ Queue<StreamNode> remaining = new ArrayDeque<>();
+
+ // We need to make the source order deterministic. The source IDs are
+ // not returned in the same order, which means that submitting the same
+ // program twice might result in different traversal, which breaks the
+ // deterministic hash assignment.
+ List<Integer> sources = new ArrayList<>();
+ for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+ sources.add(sourceNodeId);
+ }
+ Collections.sort(sources);
+
+ //
+ // Traverse the graph in a breadth-first manner. Keep in mind that
+ // the graph is not a tree and multiple paths to nodes can exist.
+ //
+
+ // Start with source nodes
+ for (Integer sourceNodeId : sources) {
+ remaining.add(streamGraph.getStreamNode(sourceNodeId));
+ visited.add(sourceNodeId);
+ }
+
+ StreamNode currentNode;
+ while ((currentNode = remaining.poll()) != null) {
+ // Generate the hash code. Because multiple paths exist to each
+ // node, we might not have all required inputs available to
+ // generate the hash code.
+ if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
+ // Add the child nodes
+ for (StreamEdge outEdge : currentNode.getOutEdges()) {
+ StreamNode child = outEdge.getTargetVertex();
+
+ if (!visited.contains(child.getId())) {
+ remaining.add(child);
+ visited.add(child.getId());
+ }
+ }
+ } else {
+ // We will revisit this later.
+ visited.remove(currentNode.getId());
+ }
+ }
+
+ return hashes;
+ }
+
+ /**
+ * Generates a hash for the node and returns whether the operation was
+ * successful.
+ *
+ * @param node The node to generate the hash for
+ * @param hashFunction The hash function to use
+ * @param hashes The current state of generated hashes
+ * @param isChainingEnabled Whether operator chaining is enabled for the graph
+ * @return <code>true</code> if the node hash has been generated,
+ * <code>false</code> otherwise. If the operation is not successful, the
+ * hash needs to be generated at a later point when all input is available.
+ * @throws IllegalStateException If the node has a user-specified hash and is
+ * an intermediate node of a chain
+ */
+ private boolean generateNodeHash(
+ StreamNode node,
+ HashFunction hashFunction,
+ Map<Integer, byte[]> hashes,
+ boolean isChainingEnabled) {
+
+ // Check for user-specified ID
+ String userSpecifiedHash = node.getTransformationId();
+
+ if (userSpecifiedHash == null) {
+ // Check that all input nodes have their hashes computed
+ for (StreamEdge inEdge : node.getInEdges()) {
+ // If the input node has not been visited yet, the current
+ // node will be visited again at a later point when all input
+ // nodes have been visited and their hashes set.
+ if (!hashes.containsKey(inEdge.getSourceId())) {
+ return false;
+ }
+ }
+
+ Hasher hasher = hashFunction.newHasher();
+ byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
+
+ if (hashes.put(node.getId(), hash) != null) {
+ // Sanity check
+ throw new IllegalStateException("Unexpected
state. Tried to add node hash " +
+ "twice. This is probably a bug
in the JobGraph generator.");
+ }
+
+ return true;
+ } else {
+ // Check that this node is not part of a chain. This is currently
+ // not supported, because the runtime takes the snapshots by the
+ // operator ID of the first vertex in a chain. It's OK if the node
+ // has chained outputs.
+ for (StreamEdge inEdge : node.getInEdges()) {
+ if (isChainable(inEdge, isChainingEnabled)) {
+ throw new UnsupportedOperationException("Cannot assign user-specified hash "
+ + "to intermediate node in chain. This will be supported in future "
+ + "versions of Flink. As a workaround, start a new chain at task "
+ + node.getOperatorName() + ".");
+ }
+ }
+
+ Hasher hasher = hashFunction.newHasher();
+ byte[] hash = generateUserSpecifiedHash(node, hasher);
+
+ for (byte[] previousHash : hashes.values()) {
+ if (Arrays.equals(previousHash, hash)) {
+ throw new IllegalArgumentException("Hash collision on user-specified ID. " +
+ "Most likely cause is a non-unique ID. Please check that all IDs " +
+ "specified via `uid(String)` are unique.");
+ }
+ }
+
+ if (hashes.put(node.getId(), hash) != null) {
+ // Sanity check
+ throw new IllegalStateException("Unexpected
state. Tried to add node hash " +
+ "twice. This is probably a bug
in the JobGraph generator.");
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * Generates a hash from a user-specified ID.
+ */
+ private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
+ hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+
+ return hasher.hash().asBytes();
+ }
+
+ /**
+ * Generates a deterministic hash from node-local properties and input and
+ * output edges.
+ */
+ private byte[] generateDeterministicHash(
+ StreamNode node,
+ Hasher hasher,
+ Map<Integer, byte[]> hashes,
+ boolean isChainingEnabled) {
+
+ // Include stream node to hash. We use the current size of the computed
+ // hashes as the ID. We cannot use the node's ID, because it is
+ // assigned from a static counter, which would result in two identical
+ // programs getting different hashes.
+ generateNodeLocalHash(node, hasher, hashes.size());
+
+ // Include chained nodes to hash
+ for (StreamEdge outEdge : node.getOutEdges()) {
+ if (isChainable(outEdge, isChainingEnabled)) {
+ StreamNode chainedNode = outEdge.getTargetVertex();
+
+ // Use the hash size again, because the nodes are chained to
+ // this node. This does not add a hash for the chained nodes.
+ generateNodeLocalHash(chainedNode, hasher, hashes.size());
+ }
+ }
+
+ byte[] hash = hasher.hash().asBytes();
+
+ // Make sure that all input nodes have their hash set before entering
+ // this loop (calling this method).
+ for (StreamEdge inEdge : node.getInEdges()) {
+ byte[] otherHash = hashes.get(inEdge.getSourceId());
+
+ // Sanity check
+ if (otherHash == null) {
+ throw new IllegalStateException("Missing hash
for input node "
+ + inEdge.getSourceVertex() + ".
Cannot generate hash for "
+ + node + ".");
+ }
+
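+ // Combine with each input node's hash byte-wise: multiply the running
+ // byte by the prime 37, then XOR in the input's byte, so the result
+ // depends on this node's local hash and on all input hashes.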
+ for (int j = 0; j < hash.length; j++) {
+ hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ String udfClassName = "";
+ if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+ udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+ .getUserFunction().getClass().getName();
+ }
+
+ LOG.debug("Generated hash '" + byteToHexString(hash) +
"' for node " +
+ "'" + node.toString() + "' {id: " +
node.getId() + ", " +
+ "parallelism: " + node.getParallelism()
+ ", " +
+ "user function: " + udfClassName + "}");
+ }
+
+ return hash;
+ }
+
+ /**
+ * Applies the {@link Hasher} to the {@link StreamNode} (only node-local
+ * attributes are taken into account). The hasher encapsulates the current
+ * state of the hash.
+ *
+ * <p>The specified ID is local to this node. We cannot use the
+ * {@link StreamNode#id}, because it is incremented in a static counter,
+ * so the IDs for identical jobs would otherwise be different.
+ */
+ protected void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
+ // This resolves conflicts for otherwise identical source nodes. BUT
+ // the generated hash codes depend on the ordering of the nodes in the
+ // stream graph.
+ hasher.putInt(id);
+
+ if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+ String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+ .getUserFunction().getClass().getName();
+
+ hasher.putString(udfClassName, Charset.forName("UTF-8"));
+ }
+ }
+
+ private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
--- End diff ---
You're right. We should probably not have this for the default hasher and
only have it in the legacy hasher, then. WDYT?
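For context, a minimal sketch of how user-specified IDs enter this code path (MySourceFunction and MyMapFunction are illustrative placeholders, not from this PR): uid(String) sets the transformation id that generateUserSpecifiedHash() consumes, bypassing the deterministic scheme for that node:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new MySourceFunction())
            .uid("my-source")     // node is hashed from "my-source" instead of the deterministic scheme
            .map(new MyMapFunction())
            .uid("my-mapper")     // IDs must be unique, or the IllegalArgumentException above is thrown
            .print();
    env.execute("uid-example");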
> Ensure backwards compatibility of the hashes used to generate JobVertexIds
> --------------------------------------------------------------------------
>
> Key: FLINK-5290
> URL: https://issues.apache.org/jira/browse/FLINK-5290
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> The way in which hashes for JobVertexIds are generated changed between Flink
> 1.1 and 1.2 (parallelism was considered for the hash in 1.1). We need to be
> backwards compatible with the old JobVertexId generation so that we can still
> assign state from old savepoints.
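A possible reading of the fix direction, as an illustrative sketch only (LegacyStreamGraphHasher and the wiring are assumptions, not code from this issue): keep the old hashing scheme around as a second StreamGraphHasher and generate both sets of hashes, so state from old savepoints can still be matched by its legacy JobVertexID:

    // streamGraph is the StreamGraph being translated to a JobGraph.
    List<StreamGraphHasher> hashers = Arrays.asList(
            new DefaultStreamGraphHasher(),    // current scheme (this PR)
            new LegacyStreamGraphHasher());    // hypothetical pre-1.2 scheme (e.g. including parallelism)

    List<Map<Integer, byte[]>> generatedHashes = new ArrayList<>();
    for (StreamGraphHasher hasher : hashers) {
        generatedHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
    // The first map defines the primary JobVertexIDs; the remaining maps can
    // be offered as alternative IDs when assigning state from old savepoints.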
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)