FYI: Yuval and I have scheduled a call to investigate this serialization issue remotely on Monday. If you have any ideas from looking at the code beforehand, let us know.

On 28.01.21 16:57, Yuval Itzchakov wrote:
Hi Timo,

The code example I posted doesn't really match the code that is causing this issue. I tried to extend it a bit, but couldn't reproduce the problem there. I am no longer using serialized type strings; instead, I register the custom serializers with the runtime during bootstrap and override getTypeInference to provide the raw data type.
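
The override pattern looks roughly like this (a trimmed sketch with a placeholder function and parsing logic, not the exact production code):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.catalog.DataTypeFactory
    import org.apache.flink.table.functions.ScalarFunction
    import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}

    // Placeholder UDF: parses a JSON string into io.circe.Json, which then
    // flows through the pipeline as a RAW('io.circe.Json') data type.
    class ParseJson extends ScalarFunction {
      def eval(s: String): io.circe.Json =
        io.circe.parser.parse(s).getOrElse(io.circe.Json.Null)

      // Provide the RAW data type explicitly instead of a serialized type string.
      override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
        TypeInference.newBuilder()
          .typedArguments(DataTypes.STRING())
          .outputTypeStrategy(TypeStrategies.explicit(
            DataTypes.RAW(classOf[io.circe.Json],
              new KryoSerializer(classOf[io.circe.Json], new ExecutionConfig))))
          .build()
    }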

But again, I disabled the custom serializer for this test to make sure it isn't what's causing the issue.

Regarding FLINK-20986 (https://issues.apache.org/jira/browse/FLINK-20986), I'm not sure, but I am no longer using the old type system, so everything should pass through InternalTypeInfo and RawType. I don't see any type equality issues, and I see the same serializer being invoked for both serialization and deserialization.

On Thu, Jan 28, 2021 at 5:51 PM Timo Walther <twal...@apache.org> wrote:

    This is helpful information. So I guess the problem must be in the
    flink-table module and not in flink-core. I will try to reserve some
    time tomorrow to look into the code again. How did you express
    RawType(Array[String])? Again with a fully serialized type string?

    Could it be related to https://issues.apache.org/jira/browse/FLINK-20986?

    Regards,
    Timo


    On 28.01.21 16:30, Yuval Itzchakov wrote:
     > Hi Timo,
     >
     > I tried replacing it with an ordinary ARRAY<STRING> DataType, which
     > doesn't reproduce the issue. If I use a RawType(Array[String]), the
     > problem still manifests, so I assume it's not directly related to
     > Kryo serialization of the specific underlying type (io.circe.Json),
     > but to something in the way it interacts with BinaryRawValueData and
     > writing out to the network buffer behind the scenes.
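     >
     > Concretely, the two type declarations I compared look roughly like
     > this (a minimal sketch; I'm assuming DataTypes.RAW with the built-in
     > KryoSerializer here, and the names are placeholders):
     >
     >     import org.apache.flink.api.common.ExecutionConfig
     >     import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
     >     import org.apache.flink.table.api.DataTypes
     >
     >     // Plain array type: does not trigger the corruption.
     >     val plainArray = DataTypes.ARRAY(DataTypes.STRING())
     >
     >     // RAW-wrapped array type: still triggers the corruption.
     >     val rawArray = DataTypes.RAW(classOf[Array[String]],
     >       new KryoSerializer(classOf[Array[String]], new ExecutionConfig))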
     >
     > On Thu, Jan 28, 2021 at 5:26 PM Timo Walther <twal...@apache.org> wrote:
     >
     >     Hi Yuval,
     >
     >     we should definitely find the root cause of this issue. It helps
     >     to nail down the problem if the exception happens frequently.
     >
     >     Have you tried to replace the JSON object with a regular String?
     >     If the exception is gone after this change, I believe it must be
     >     the serialization and not the network stack.
     >
     >     Regards,
     >     Timo
     >
     >     On 28.01.21 10:29, Yuval Itzchakov wrote:
     >      > Hi,
     >      >
     >      > I previously wrote about a problem I believed was caused by Kryo
     >      > serialization
     >      > (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E),
     >      > which I am no longer sure is the case.
     >      >
     >      > I have a job which involves a TableScan via a custom source
     >      > operator that generates a DataStream[RowData], a UDF that parses
     >      > a String into an io.circe.Json object (which internally flows as
     >      > a RAW('io.circe.Json') data type), and then an AggregateFunction
     >      > with a java.util.List accumulator which returns one of these
     >      > objects and is used in a tumbling window as follows (rough
     >      > sketches of both functions after the query):
     >      >
     >      >     SELECT any_json_array_value(parse_array(resources)) as resources_sample
     >      >     FROM foo
     >      >     GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
     >      >
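     >      > The shapes of the two functions are roughly as follows (a
     >      > simplified sketch with placeholder parsing logic, not my exact
     >      > code):
     >      >
     >      >     import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction}
     >      >
     >      >     // Scalar UDF: parses a JSON string into an array of
     >      >     // io.circe.Json values (RAW('io.circe.Json') elements).
     >      >     class ParseArray extends ScalarFunction {
     >      >       def eval(s: String): Array[io.circe.Json] =
     >      >         io.circe.parser.parse(s).toOption.flatMap(_.asArray)
     >      >           .map(_.toArray).getOrElse(Array.empty[io.circe.Json])
     >      >     }
     >      >
     >      >     // Aggregate with a java.util.List accumulator; returns an
     >      >     // arbitrary element collected over the window.
     >      >     class AnyJsonArrayValue
     >      >         extends AggregateFunction[io.circe.Json, java.util.List[io.circe.Json]] {
     >      >       override def createAccumulator(): java.util.List[io.circe.Json] =
     >      >         new java.util.ArrayList[io.circe.Json]()
     >      >       // Collects one value from each incoming array into the accumulator.
     >      >       def accumulate(acc: java.util.List[io.circe.Json], values: Array[io.circe.Json]): Unit =
     >      >         if (values.nonEmpty) acc.add(values(0))
     >      >       override def getValue(acc: java.util.List[io.circe.Json]): io.circe.Json =
     >      >         if (acc.isEmpty) io.circe.Json.Null else acc.get(0)
     >      >     }
     >      >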
     >      > It generates the following physical plan:
     >      >
     >      > optimize result:
     >      > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
     >      > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
     >      >    +- Exchange(distribution=[single])
     >      >       +- Calc(select=[event_time, parse_array(resources) AS $f1])
     >      >          +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
     >      >             +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])
     >      >
     >      > When I run my job, I receive the following exception after 10-30
     >      > seconds (it varies, which gives me a hunch that this is related
     >      > to some race condition that might be happening):
     >      >
     >      > Caused by: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0}
     >      >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
     >      >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
     >      >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
     >      >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
     >      >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
     >      >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
     >      >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
     >      >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
     >      >     at java.lang.Thread.run(Thread.java:748)
     >      > Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
     >      >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
     >      >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
     >      >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
     >      >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
     >      >     ... 8 more
     >      > Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019, length: 546153590, index: 43, offset: 0
     >      >     at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
     >      >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
     >      >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
     >      >     at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
     >      >     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
     >      >     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
     >      >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
     >      >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
     >      >     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
     >      >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
     >      >     ... 11 more
     >      >
     >      > Just to note that right now I am using the built-in Kryo
     >      > serialization with no custom serializers.
     >      >
     >      > Upon further investigation, it seems that StreamElementSerializer
     >      > is receiving a corrupt stream, which causes it to think it
     >      > received a record with a timestamp:
     >      >
     >      > [inline screenshot: debugger view showing the tag read as 0,
     >      > followed by an attempted timestamp read]
     >      >
     >      > As you can see, the tag is incorrectly read as 0, a timestamp
     >      > read is then attempted, yielding an invalid value (16), and once
     >      > the TypeSerializer (BinaryRowDataSerializer) tries to decode
     >      > this, it fails with the index out of bounds exception.
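     >      >
     >      > In pseudocode, the failing read path looks roughly like this
     >      > (paraphrased from StreamElementSerializer.deserialize, not the
     >      > exact Flink source):
     >      >
     >      >     val tag = source.readByte()          // corrupt stream yields 0 here
     >      >     if (tag == 0) {                      // 0 = record with timestamp
     >      >       val timestamp = source.readLong()  // reads garbage (e.g. 16)
     >      >       // BinaryRowDataSerializer then reads a bogus length from the
     >      >       // remaining bytes and fails with IndexOutOfBoundsException
     >      >       typeSerializer.deserialize(source)
     >      >     }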
     >      >
     >      > The interesting thing is that when I *disable operator chaining*
     >      > completely (via StreamExecutionEnvironment.disableOperatorChaining),
     >      > the problem does not reproduce.
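     >      >
     >      > That is (where env is the job's StreamExecutionEnvironment):
     >      >
     >      >     // Disabling chaining works around the corruption, at the
     >      >     // cost of extra serialization between operators.
     >      >     env.disableOperatorChaining()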
     >      >
     >      > I am wondering which sections of the networking + serialization
     >      > stack may help me further investigate this issue and understand
     >      > what is causing the corrupt stream to emerge, or whether there
     >      > are additional logs that could assist.
     >      >
     >      > --
     >      > Best Regards,
     >      > Yuval Itzchakov.
     >
     >
     >
     > --
     > Best Regards,
     > Yuval Itzchakov.



--
Best Regards,
Yuval Itzchakov.
