Hi Yuval,
Could you share a reproducible example with us?
I see you are using SQL / Table API with a RAW type. I could imagine
that the KryoSerializer is configured differently when serializing and
when deserializing. This might be due to `ExecutionConfig` not being
shipped (or copied) through the stack correctly.
Even though an error in the stack should be visible immediately and not
only after 30 seconds, I would still also investigate in this direction.
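A minimal sketch of what I mean by registering explicitly on the
ExecutionConfig (assuming a StreamExecutionEnvironment named `env` and
the JsonKryoSerializer class from further down this thread):

    import io.circe.Json
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Register the serializer on the ExecutionConfig so that both the
    // serializing and the deserializing side resolve the same Kryo
    // serializer for the RAW('io.circe.Json') type.
    env.getConfig.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])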
Regards,
Timo
On 13.01.21 09:47, Piotr Nowojski wrote:
Hi Yuval,
I have a couple of questions:
1. I see that you are using Flink 1.12.0, is that correct?
2. Have you tried running your application with a different Flink
version? If you are using 1.12.0, could you check Flink 1.11.3, or vice
versa?
3. What's the configuration that you are using? For example, have you
enabled unaligned checkpoints or some other feature?
4. Is the problem still there if you replace Kryo with something else
(Java's serialisation? see the sketch after this list)?
5. Could it be a problem with dependency convergence? Like maybe there
are different versions of Flink jars present during runtime?
6. Lastly, would it be possible for you to prepare a minimal example
that could reproduce the problem?
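For point 4, a minimal sketch of what I mean (a diagnostic, not a fix;
it assumes io.circe.Json is java.io.Serializable, and `JavaSerializer`
here is Flink's Kryo-compatible wrapper around plain Java
serialization):

    import io.circe.Json
    import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer

    // Route the type through plain Java serialization instead of the
    // custom Kryo serializer, to check whether the corruption is
    // specific to the custom Kryo path.
    env.getConfig.addDefaultKryoSerializer(classOf[Json], classOf[JavaSerializer[Json]])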
Piotrek
On Tue, 12 Jan 2021 at 17:19 Yuval Itzchakov <yuva...@gmail.com> wrote:
Hi Chesnay,
Turns out it didn't actually work; there were one or two successful
runs, but the problem still persists (it's a bit non-deterministic and
doesn't always reproduce when parallelism is set to 1).
I turned off all custom Kryo serialization and am only using the
Flink-provided ones ATM; the problem still persists.
There seems to be an issue with how Flink serializes these raw types
over the wire, but I still can't put my finger on what the problem is.
What I can see is that Flink tries to consume a HybridMemorySegment
which contains one of these custom raw types I have, and because of
malformed content it receives a negative length for the byte array:
[inline screenshot omitted]
The content seems to be prepended with a bunch of NULL bytes, which
throws off the length calculation:
[inline screenshot omitted]
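My rough mental model of the failing read is something like the
following simplified sketch (not Flink's actual code, just an
illustration of a length-prefixed read such as the one
BinaryRowDataSerializer performs):

    import java.io.DataInputStream

    // Records arrive as [int length][payload bytes]. If the stream is
    // misaligned (e.g. prefixed with stray NULL bytes), the 4 bytes read
    // here belong to the payload, so `length` comes out negative or huge.
    def readRecord(in: DataInputStream): Array[Byte] = {
      val length = in.readInt()            // bogus length on corrupted input
      val bytes  = new Array[Byte](length) // NegativeArraySizeException if < 0
      in.readFully(bytes)                  // out-of-bounds read if too large
      bytes
    }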
But I still don't have the entire chain of execution mapped out in my
head; I'm trying to figure it out.
An additional error I'm receiving, even when removing the
problematic JSON field and switching it out for a String:
java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler <ches...@apache.org> wrote:
I would think that the likely explanation is some bug in the
formatting code of the library you are using.
Just for fun, you could try manually removing all spaces within
`write` and see how that turns out (let's ignore for now that
this might also affect keys & values).
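Something like this crude drop-in replacement for the `write` method in
your JsonKryoSerializer (a diagnostic, not a fix; stripping spaces this
way also touches spaces inside keys and values):

    // Pretty-print with circe and then strip all spaces manually,
    // bypassing `noSpaces` entirely, to see whether circe's compact
    // printer is what produces the bytes the deserializer chokes on.
    override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
      output.writeString(`object`.spaces2.replace(" ", ""))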
On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
OK, this turned out to actually be a problem with the Kryo
serialization. For some reason, it does not like that I generate the
JSON with no spaces; only when I use 2 spaces does it work properly.
I am at a loss for words.
Just to emphasize the difference:
No spaces ([inline screenshot omitted]): doesn't work.
With spaces ([inline screenshot omitted]): works fine.
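To make the difference concrete, this is all that changes between the
two runs (both are circe's built-in printers; the JSON value here is
just an example):

    import io.circe.Json

    val doc = Json.obj("a" -> Json.fromInt(1), "b" -> Json.fromString("x"))
    doc.noSpaces // {"a":1,"b":"x"}        -> blows up downstream
    doc.spaces2  // 2-space pretty-printed -> works fine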
On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
After further debugging, this currently seems unrelated to Kryo at all.
I have a stage that emits a case class down the stream. I can see that
the serialization part works fine, but when the receiving side attempts
to deserialize the case class, it receives a NonSpanningWrapper that
has already surpassed its buffer limit:
[inline screenshot omitted]
Any help would be greatly appreciated.
On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
Hi,
I've implemented a Kryo serializer for a specific JSON type in my
application, as I have a bunch of UDFs that depend on a
RAW('io.circe.Json') encoder being available. The implementation is
rather simple. When I run my Flink application with Kryo trace logging
enabled, I see that data gets properly serialized / deserialized using
the serializer. However, after about 30 seconds, the application blows
up with the following error:
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
    at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
    at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
    ... 11 more
Or with the following exception:
Caused by: java.lang.NegativeArraySizeException
    at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
I have, however, checked that the serialization works
properly and there is no issue there.
I have the following registration during bootstrap:
flink.registerType(classOf[Json])
flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])
And the following is the implementation of the
serializer:
import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import io.circe.Json
import io.circe.jawn.JawnParser

final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
  private val jawnParser = new JawnParser()

  // Serialize the Json value as its compact (no-spaces) string form.
  override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
    output.writeString(`object`.noSpaces)

  // Parse the string back into a Json value; a null string maps to Json.Null.
  override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
    val str = input.readString()
    if (str == null) Json.Null
    else
      jawnParser.parse(str) match {
        case Left(err)    => throw err
        case Right(value) => value
      }
  }

  // Deep-copy by round-tripping through the compact string form.
  override def copy(kryo: Kryo, original: Json): Json =
    jawnParser.parse(original.noSpaces) match {
      case Left(err)    => throw err
      case Right(value) => value
    }
}
Would appreciate any help on how to debug this further.
--
Best Regards,
Yuval Itzchakov.