Hi Yuval,

could you share a reproducible example with us?

I see you are using SQL / Table API with a RAW type. I could imagine that the KryoSerializer is configured differently when serializing and when deserializing. This might be due to the `ExecutionConfig` not being shipped (or copied) through the stack correctly.

An error of this kind should be visible immediately rather than after 30 seconds, but I would still investigate in this direction.
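
To illustrate why a writer and a reader that disagree on the format produce nonsensical lengths, here is a minimal, dependency-free sketch. It is a toy format invented for this illustration (an optional 1-byte type tag, a 4-byte length, then the payload) and is not Flink's or Kryo's actual wire format; the names `FramingMismatch`, `write` and `readLength` are hypothetical.

```scala
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream }
import java.nio.charset.StandardCharsets.UTF_8

object FramingMismatch {
  // Hypothetical toy format: optional 1-byte type tag, 4-byte big-endian length, payload.
  def write(payload: String, withTag: Boolean): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out    = new DataOutputStream(buffer)
    if (withTag) out.writeByte(1)
    val bytes = payload.getBytes(UTF_8)
    out.writeInt(bytes.length)
    out.write(bytes)
    out.flush()
    buffer.toByteArray
  }

  // Returns the length prefix as seen by a reader with the given configuration.
  def readLength(record: Array[Byte], expectTag: Boolean): Int = {
    val in = new DataInputStream(new ByteArrayInputStream(record))
    if (expectTag) in.readByte()
    in.readInt()
  }

  def main(args: Array[String]): Unit = {
    val record = write("hello", withTag = true)
    println(readLength(record, expectTag = true))  // 5
    println(readLength(record, expectTag = false)) // 16777216
  }
}
```

A reader that is off by a single byte decodes the length as 16777216 instead of 5. Any attempt to read that many bytes runs past the end of the record, which is the same family of symptom as the huge or negative lengths reported in this thread.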

Regards,
Timo


On 13.01.21 09:47, Piotr Nowojski wrote:
Hi Yuval,

I have a couple of questions:

1. I see that you are using Flink 1.12.0, is that correct?
2. Have you tried running your application with a different Flink version? If you are using 1.12.0, could you check Flink 1.11.3, or vice versa?
3. What's the configuration that you are using? For example, have you enabled unaligned checkpoints or some other feature?
4. Is the problem still there if you replace Kryo with something else (Java's serialisation?)?
5. Could it be a problem with dependency convergence? Like maybe there are different versions of Flink jars present during runtime?
6. Lastly, would it be possible for you to prepare a minimal example that could reproduce the problem?
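
As a starting point for such a minimal example, the serializer can be exercised outside Flink with a plain Kryo round trip. The sketch below assumes the Kryo 2.x API (the line bundled with Flink 1.12) on the classpath; `KryoRoundTrip`, `roundTrip` and `stringSerializer` are hypothetical names introduced for illustration. It checks that reading consumes exactly the bytes that writing produced.

```scala
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import com.esotericsoftware.kryo.io.{ Input, Output }

object KryoRoundTrip {
  /** Writes `value` with `serializer`, reads it back, and verifies that the
    * read consumed exactly the bytes the write produced. */
  def roundTrip[T](serializer: Serializer[T], value: T, clazz: Class[T]): T = {
    val kryo = new Kryo()
    kryo.register(clazz, serializer)

    val output = new Output(256, -1) // 256-byte initial buffer, no maximum size
    kryo.writeObject(output, value)
    val bytes = output.toBytes

    val input  = new Input(bytes)
    val result = kryo.readObject(input, clazz)

    require(
      input.position() == bytes.length,
      s"read consumed ${input.position()} of ${bytes.length} bytes"
    )
    result
  }

  // Trivial serializer used only to demonstrate the helper.
  val stringSerializer: Serializer[String] = new Serializer[String] {
    override def write(kryo: Kryo, output: Output, s: String): Unit =
      output.writeString(s)
    override def read(kryo: Kryo, input: Input, t: Class[String]): String =
      input.readString()
  }

  def main(args: Array[String]): Unit =
    println(roundTrip(stringSerializer, """{"a":1}""", classOf[String]))
}
```

For the case in this thread, the same helper could be called as `roundTrip(new JsonKryoSerializer, json, classOf[Json])` with both compact and pretty-printed inputs. If that passes for all inputs, the serializer itself is probably symmetric and the problem more likely lies in how records are framed or configured inside the job.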

Piotrek

On Tue, 12 Jan 2021 at 17:19, Yuval Itzchakov <yuva...@gmail.com> wrote:

    Hi Chesnay,
    It turns out it didn't actually work. There were one or two
    successful runs, but the problem still persists (it's a bit
    non-deterministic, and doesn't always reproduce when parallelism is
    set to 1).

    I turned off all custom Kryo serialization and am only using the
    Flink-provided serializers at the moment; the problem still persists.
    There seems to be an issue with how Flink serializes these raw types
    over the wire, but I still can't put my finger on what the
    problem is.

    What I can see is that Flink tries to consume a HybridMemorySegment
    which contains one of these custom raw types I have and because of
    malformed content it receives a negative length for the byte array:

    image.png

    The content seems to be prepended with a bunch of NULL values, which
    throw off the length calculation:

    image.png

    I still don't have the entire chain of execution worked out in my
    head; I'm trying to figure it out.

    An additional error I'm receiving, even when removing the
    problematic JSON field and switching it out for a String:

    java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]

    On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler <ches...@apache.org> wrote:

        I would think that the likely explanation is some bug in the
        formatting code of the library you are using.
        Just for fun you could try manually removing all spaces within
        write and see how that turns out. (let's ignore for now that
        this might also affect keys&values).

        On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
        OK, this turned out to actually be a problem with the Kryo
        serialization. For some reason, it fails when I generate the
        JSON with no spaces; it only works properly when I use
        2 spaces.
        I am at a loss for words.

        Just to emphasize the difference:

        No Spaces:

        image.png
        Doesn't work.

        With spaces:

        image.png

        works fine.



        On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

            Further debugging this issue, this currently seems
            unrelated to Kryo at all.

            I have a stage that emits a case class down the stream. I
            can see the serialization part works fine, but when the
            receiving side attempts to deserialize the case class,
            it receives a NonSpanningWrapper that has already
            surpassed its buffer limit:

            image.png

            Any help would be greatly appreciated.

            On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

                Hi,

                I've implemented a KryoSerializer for a specific JSON
                type in my application as I have a bunch of UDFs
                that depend on a RAW('io.circe.Json') encoder being
                available. The implementation is rather simple. When I
                run my Flink application with Kryo in trace logs, I
                see that data gets properly serialized / deserialized
                using the serializer. However, after about 30 seconds,
                the application blows up with the following error:

                Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
                    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
                    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
                    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
                    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
                    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
                    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
                    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
                    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
                    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
                    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
                    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
                    at java.lang.Thread.run(Thread.java:748)
                Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
                    at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
                    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
                    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
                    at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
                    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
                    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
                    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
                    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
                    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
                    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
                    ... 11 more

                Or with the following exception:

                Caused by: java.lang.NegativeArraySizeException
                    at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
                    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
                    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
                    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
                    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
                    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
                    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
                    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
                    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
                    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
                    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
                    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
                    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
                    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
                    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
                    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
                    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
                    at java.lang.Thread.run(Thread.java:748)

                I have, however, checked that the serialization works
                properly and there is no issue there.
                I have the following registration during bootstrap:

                flink.registerType(classOf[Json])
                flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])

                And the following is the implementation of the
                serializer:

                import com.esotericsoftware.kryo.io.{ Input, Output }
                import com.esotericsoftware.kryo.{ Kryo, Serializer }
                import io.circe.Json
                import io.circe.jawn.JawnParser

                final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
                  private val jawnParser = new JawnParser()

                  override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
                    output.writeString(`object`.noSpaces)

                  override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
                    val str = input.readString()

                    if (str == null) Json.Null
                    else
                      jawnParser.parse(str) match {
                        case Left(err)    => throw err
                        case Right(value) => value
                      }
                  }

                  override def copy(kryo: Kryo, original: Json): Json =
                    jawnParser.parse(original.noSpaces) match {
                      case Left(err)    => throw err
                      case Right(value) => value
                    }
                }

                Would appreciate any help on how to debug this further.

-- Best Regards,
                Yuval Itzchakov.



-- Best Regards,
            Yuval Itzchakov.



-- Best Regards,
        Yuval Itzchakov.




-- Best Regards,
    Yuval Itzchakov.

