[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16593298#comment-16593298 ]
ASF GitHub Bot commented on FLINK-10204: ---------------------------------------- zentol closed pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java index 932e1300d9c..4074251986f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java @@ -80,7 +80,7 @@ public boolean equals(Object o) { if (markedTime != that.markedTime) { return false; } - if (operatorId != that.operatorId) { + if (!operatorId.equals(that.operatorId)) { return false; } return subtaskIndex == that.subtaskIndex; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index ba92416d792..ed6022ff592 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -156,7 +156,8 @@ else if (tag == TAG_STREAM_STATUS) { } else if (tag == TAG_LATENCY_MARKER) { target.writeLong(source.readLong()); - target.writeInt(source.readInt()); + target.writeLong(source.readLong()); + target.writeLong(source.readLong()); target.writeInt(source.readInt()); } else { throw new IOException("Corrupt stream, found tag: " + tag); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java index 4a993179793..0e4e84b5d30 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.watermark.Watermark; import org.junit.Test; @@ -89,6 +90,9 @@ public void testSerialization() throws Exception { Watermark negativeWatermark = new Watermark(-4647654567676555876L); assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer)); + + LatencyMarker latencyMarker = new LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1); + assertEquals(latencyMarker, serializeAndDeserialize(latencyMarker, serializer)); } @SuppressWarnings("unchecked") ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Job is marked as FAILED after serialization exception > ----------------------------------------------------- > > Key: FLINK-10204 > URL: https://issues.apache.org/jira/browse/FLINK-10204 > Project: Flink > Issue Type: Bug > Reporter: Ben La Monica > Priority: Major > Labels: pull-request-available > > We have a long running flink job that eventually fails and is shut down due > to an internal serialization exception that we keep on getting. Here is the > stack trace: > {code:java} > 2018-08-23 18:39:48,199 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation > (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. > java.io.IOException: Corrupt stream, found tag: 127 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > I think I have tracked down the issue to a mismatch in the > serialization/deserialization/copy code in the StreamElementSerializer with > regards to the LATENCY_MARKER. > The Serialization logic writes 3 LONGs and an INT. The copy logic only writes > (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an > EOFException, and fixing the copy code causes the test to pass again. > I've written a unit test that highlights the problem, and have written the > code to correct it. > I'll submit a PR that goes along with it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)