Hi Timo and Piotr,
Let me try and answer all your questions:
Piotr:
1. Yes, I am using Flink 1.12.0
2. I have not tried downgrading to Flink 1.11.3, as I depend on features
that are specific to 1.12 (namely the ability to create a
DataStreamScanProvider, which was not available in previous versions)
3. I am using a pretty standard configuration. The only thing I've set
up is checkpointing (using the default MemoryStateBackend); see the
first sketch below this list:
[screenshot of the checkpointing configuration was attached here]
4. This is the interesting bit. When I try to create a small
reproduction outside the codebase using a simple source, the issue
does not reproduce, either with default Kryo serialization or with my
own Kryo serializer.
5. No, here is the relevant bit of build.sbt (flinkVersion is set to
1.12); a rough sketch follows below this list:
[screenshot of the build.sbt dependencies was attached here]
6. I am trying to come up with a reproduction, thus far with no luck.
Here's what I have so far:
https://github.com/YuvalItzchakov/flink-bug-repro. I am afraid that
there are many more moving parts affecting this issue (I have a
custom Flink source and sink involved)
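
Re: question 3, the checkpointing setup is essentially the following
(a minimal sketch; the checkpoint interval here is an assumption, not
the exact value from my job):

import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000)                // checkpoint every 10s (interval assumed)
env.setStateBackend(new MemoryStateBackend()) // the default in-memory state backend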
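
And re: question 5, the dependencies look roughly like this
(illustrative sketch only, since the screenshot was lost; the module
list is abbreviated and the circe version is an assumption):

val flinkVersion = "1.12.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"     % flinkVersion % Provided,
  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % Provided,
  "io.circe"         %% "circe-parser"              % "0.13.0" // version assumed
)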
Timo:
I am explicitly passing a serialized string of my custom Kryo
serializer to the UDF (see
https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31).
I can validate that both serialization and deserialization invoke the
methods defined on my custom serializer, if that's what you mean.
Otherwise, if there's a mismatch between the two serializers, Flink
blows up at runtime saying that the types don't match.
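
For reference, a sketch of one way to wire the custom serializer into
a RAW type (the actual repro serializes the serializer to a string,
per the link above; env here is the StreamExecutionEnvironment):

import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.table.api.DataTypes
import io.circe.Json

// Build a RAW data type backed by the custom Kryo serializer so the UDF's
// io.circe.Json values are (de)serialized consistently on both sides:
val jsonSerializer = new KryoSerializer[Json](classOf[Json], env.getConfig)
val jsonRawType    = DataTypes.RAW(classOf[Json], jsonSerializer)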
On Wed, Jan 13, 2021 at 1:19 PM Timo Walther <twal...@apache.org> wrote:
Hi Yuval,
could you share a reproducible example with us?
I see you are using SQL / Table API with a RAW type. I could imagine
that the KryoSerializer is configured differently when serializing
and when deserializing. This might be due to `ExecutionConfig` not
being shipped (or copied) through the stack correctly.
Even though an error in the stack should be visible immediately and
not after 30 seconds, I would still also investigate in this
direction.
Regards,
Timo
On 13.01.21 09:47, Piotr Nowojski wrote:
> Hi Yuval,
>
> I have a couple of questions:
>
> 1. I see that you are using Flink 1.12.0, is that correct?
> 2. Have you tried running your application with a different Flink
> version? If you are using 1.12.0, could you check Flink 1.11.3, or
> vice versa?
> 3. What's the configuration that you are using? For example, have
> you enabled unaligned checkpoints or some other feature?
> 4. Is the problem still there if you replace Kryo with something
> else (Java's serialisation?)?
> 5. Could it be a problem with dependency convergence? Like maybe
> there are different versions of Flink jars present during runtime?
> 6. Lastly, would it be possible for you to prepare a minimal
> example that could reproduce the problem?
>
> Piotrek
>
> On Tue, Jan 12, 2021 at 17:19, Yuval Itzchakov <yuva...@gmail.com> wrote:
>
> Hi Chesnay,
> Turns out it didn't actually work; there were one or two
> successful runs, but the problem still persists (it's a bit
> non-deterministic and doesn't always reproduce when parallelism
> is set to 1).
>
> I turned off all custom Kryo serialization and am only using the
> Flink-provided ones ATM; the problem still persists.
> There seems to be an issue with how Flink serializes these raw
> types over the wire, but I still can't put my finger on what the
> problem is.
>
> What I can see is that Flink tries to consume a HybridMemorySegment
> which contains one of these custom raw types I have, and because of
> malformed content it receives a negative length for the byte
> array:
>
> [screenshot: debugger showing the negative byte-array length was attached here]
>
> The content seems to be prepended with a bunch of NULL values,
> which throws off the length calculation:
>
> [screenshot: segment contents prefixed with NULL bytes was attached here]
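>
> Roughly, my mental model of the failing read path is the following
> (a sketch of what I think happens, not Flink's actual code):
>
> // the deserializer reads a 4-byte length, then copies that many bytes:
> val length = segment.getInt(position)  // leading NULLs/garbage -> negative or huge value
> val bytes  = new Array[Byte](length)   // NegativeArraySizeException, or
> segment.get(position + 4, bytes)       // IndexOutOfBoundsException further down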
>
> But I still don't have the entire chain of execution mapped out in
> my head; I'm trying to figure it out.
>
> An additional error I'm receiving, even when removing the
> problematic JSON field and switching it out for a String:
>
> java.lang.IllegalStateException: When there are multiple buffers, an
> unfinished bufferConsumer can not be at the head of the buffers queue.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
>
> On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler <ches...@apache.org> wrote:
>
> I would think that the likely explanation is some bug in the
> formatting code of the library you are using.
> Just for fun, you could try manually removing all spaces within
> write and see how that turns out. (Let's ignore for now that
> this might also affect keys & values.)
>
> On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
>> OK, this turned out to actually be a problem with the Kryo
>> serialization. For some reason, it does not like that I try to
>> generate a JSON with no spaces; only when I use 2 spaces does
>> it work properly.
>> I am at a loss for words.
>>
>> Just to emphasize the difference:
>>
>> No spaces:
>>
>> [screenshot: write output using noSpaces was attached here]
>> Doesn't work.
>>
>> With spaces:
>>
>> [screenshot: write output using 2-space indentation was attached here]
>>
>> Works fine.
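>>
>> In code, the only difference between the two runs is the write
>> call (circe's noSpaces vs., presumably, spaces2):
>>
>> output.writeString(`object`.noSpaces) // fails
>> output.writeString(`object`.spaces2)  // works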
>>
>>
>>
>> On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>
>> After debugging this further, the issue currently seems
>> unrelated to Kryo at all.
>>
>> I have a stage that emits a case class down the stream. I
>> can see the serialization part works fine, but when the
>> receiving side attempts to deserialize the case class,
>> it receives a NonSpanningWrapper that has already
>> surpassed its buffer limit:
>>
>> [screenshot: NonSpanningWrapper past its buffer limit was attached here]
>>
>> Any help would be greatly appreciated.
>>
>> On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>
>> Hi,
>>
>> I've implemented a KryoSerializer for a specific JSON
>> type in my application, as I have a bunch of UDFs
>> that depend on a RAW('io.circe.Json') encoder being
>> available. The implementation is rather simple. When I
>> run my Flink application with Kryo trace logging
>> enabled, I see that data gets properly serialized /
>> deserialized using the serializer. However, after about
>> 30 seconds, the application blows up with the following
>> error:
>>
>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
>>     at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
>>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
>>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
>>     at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
>>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>     ... 11 more
>>
>> Or with the following exception:
>>
>> Caused by: java.lang.NegativeArraySizeException
>>     at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
>>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> I have, however, checked that the serialization works
>> properly and there is no issue there.
>> I have the following registration during bootstrap:
>>
>> flink.registerType(classOf[Json])
>> flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])
>>
>> And the following is the implementation of the serializer:
>>
>> import com.esotericsoftware.kryo.io.{ Input, Output }
>> import com.esotericsoftware.kryo.{ Kryo, Serializer }
>> import io.circe.Json
>> import io.circe.jawn.JawnParser
>>
>> final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
>>   private val jawnParser = new JawnParser()
>>
>>   override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
>>     output.writeString(`object`.noSpaces)
>>
>>   override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
>>     val str = input.readString()
>>
>>     if (str == null) Json.Null
>>     else
>>       jawnParser.parse(str) match {
>>         case Left(err)    => throw err
>>         case Right(value) => value
>>       }
>>   }
>>
>>   override def copy(kryo: Kryo, original: Json): Json =
>>     jawnParser.parse(original.noSpaces) match {
>>       case Left(err)    => throw err
>>       case Right(value) => value
>>     }
>> }
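>>
>> A standalone round-trip check along these lines passes, which is
>> why I believe the serializer itself is fine (a sketch, reusing the
>> imports above):
>>
>> import com.esotericsoftware.kryo.Kryo
>>
>> val kryo   = new Kryo()
>> val ser    = new JsonKryoSerializer
>> val sample = Json.obj("a" -> Json.fromInt(1))
>>
>> val out = new Output(4096) // fixed buffer, large enough for the sample
>> ser.write(kryo, out, sample)
>> val back = ser.read(kryo, new Input(out.toBytes), classOf[Json])
>> assert(back == sample)     // round-trips cleanly in isolation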
>>
>> Would appreciate any help on how to debug this further.
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
--
Best Regards,
Yuval Itzchakov.