Hi Timo,

The code example I posted doesn't really match the code that is causing
this issue. I tried to extend it a bit, but couldn't reproduce the
problem there.
I am no longer using fully serialized type strings. Instead, I register
the custom serializers with the runtime during bootstrap and override
getTypeInference to provide the RAW data type.
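
A rough sketch of that setup, trimmed and renamed for the list (the
Circe serializer classes below are stand-ins for my actual custom
TypeSerializer / Kryo serializer, not real library classes):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.catalog.DataTypeFactory
    import org.apache.flink.table.functions.ScalarFunction
    import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}

    // bootstrap: register the custom serializer with the runtime
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.registerTypeWithKryoSerializer(
      classOf[io.circe.Json], classOf[CirceJsonKryoSerializer])

    // UDF: override getTypeInference to declare the RAW output type
    class ParseArray extends ScalarFunction {
      def eval(s: String): Array[io.circe.Json] =
        io.circe.parser.parse(s).toOption
          .flatMap(_.asArray).map(_.toArray)
          .getOrElse(Array.empty)

      override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
        TypeInference.newBuilder()
          .typedArguments(DataTypes.STRING())
          .outputTypeStrategy(TypeStrategies.explicit(
            DataTypes.RAW(classOf[Array[io.circe.Json]], new CirceJsonArraySerializer)))
          .build()
    }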

But again, I disabled the custom serializer for this test, to make sure it
is not the one causing the issue.

Regarding FLINK-20986 <https://issues.apache.org/jira/browse/FLINK-20986>,
I'm not sure, but I am no longer using the old type system, so everything
should pass through InternalTypeInfo and RawType. I don't see any type
equality issues, and I see the same serializer being invoked for both
serialization and deserialization.

On Thu, Jan 28, 2021 at 5:51 PM Timo Walther <twal...@apache.org> wrote:

> This is helpful information. So I guess the problem must be in the
> flink-table module and not in flink-core. I will try to reserve some
> time tomorrow to look into the code again. How did you express
> RawType(Array[String])? Again with a fully serialized type string?
>
> Could it be related to https://issues.apache.org/jira/browse/FLINK-20986 ?
>
> Regards,
> Timo
>
>
> On 28.01.21 16:30, Yuval Itzchakov wrote:
> > Hi Timo,
> >
> > I tried replacing it with an ordinary ARRAY<STRING> DataType, which
> > does not reproduce the issue.
> > If I use a RawType(Array[String]), the problem still manifests, so I
> > assume it's not directly related to Kryo serialization of the specific
> > underlying type (io.circe.Json), but to something in the way it
> > interacts with BinaryRawValueData and writes out to the network buffer
> > behind the scenes.
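> >
> > For context, the two variants I tested look roughly like this (a
> > sketch; the KryoSerializer construction here stands in for however the
> > serializer is actually obtained in the real code):
> >
> >     import org.apache.flink.api.common.ExecutionConfig
> >     import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> >     import org.apache.flink.table.api.DataTypes
> >
> >     // variant 1: ordinary array type -- the issue does not reproduce
> >     val arrayType = DataTypes.ARRAY(DataTypes.STRING())
> >
> >     // variant 2: RAW type over Array[String] -- the issue still manifests
> >     val rawType = DataTypes.RAW(
> >       classOf[Array[String]],
> >       new KryoSerializer(classOf[Array[String]], new ExecutionConfig))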
> >
> > On Thu, Jan 28, 2021 at 5:26 PM Timo Walther <twal...@apache.org> wrote:
> >
> >     Hi Yuval,
> >
> >     we should definitely find the root cause of this issue. It helps
> >     that the exception happens frequently, as that makes it easier to
> >     nail down the problem.
> >
> >     Have you tried replacing the JSON object with a regular String? If
> >     the exception is gone after this change, I believe it must be the
> >     serialization and not the network stack.
> >
> >     Regards,
> >     Timo
> >
> >
> >     On 28.01.21 10:29, Yuval Itzchakov wrote:
> >      > Hi,
> >      >
> >      > I previously wrote about a problem I believed was caused by Kryo
> >      > serialization
> >      > (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E),
> >      > which I am no longer sure is the case.
> >      >
> >      > I have a job which involves a TableScan via a custom source
> >      > operator which generates a DataStream[RowData], a UDF that parses
> >      > a String into an io.circe.Json object (which internally flows as a
> >      > RAW('io.circe.Json') data type), and an AggregateFunction with a
> >      > java.util.List accumulator which returns one of these objects and
> >      > is used in a tumbling window as follows:
> >      >
> >      >      SELECT any_json_array_value(parse_array(resources)) AS resources_sample
> >      >      FROM foo
> >      >      GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
> >      >
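> >      > A rough sketch of the aggregate function's shape (the names and
> >      > the exact accumulator handling here are illustrative, not the
> >      > real code):
> >      >
> >      >     import java.util
> >      >     import org.apache.flink.table.functions.AggregateFunction
> >      >
> >      >     // returns an arbitrary element from the JSON arrays it has seen
> >      >     class AnyJsonArrayValue
> >      >         extends AggregateFunction[io.circe.Json, util.List[io.circe.Json]] {
> >      >
> >      >       override def createAccumulator(): util.List[io.circe.Json] =
> >      >         new util.ArrayList[io.circe.Json]()
> >      >
> >      >       override def getValue(acc: util.List[io.circe.Json]): io.circe.Json =
> >      >         if (acc.isEmpty) io.circe.Json.Null else acc.get(0)
> >      >
> >      >       def accumulate(acc: util.List[io.circe.Json], arr: Array[io.circe.Json]): Unit =
> >      >         if (arr != null && arr.nonEmpty) acc.add(arr(0))
> >      >     }
> >      >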
> >      > It generates the following physical plan:
> >      >
> >      > optimize result:
> >      > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
> >      > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
> >      >    +- Exchange(distribution=[single])
> >      >       +- Calc(select=[event_time, parse_array(resources) AS $f1])
> >      >          +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
> >      >             +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])
> >      >
> >      > When I run my job, I receive the following exception after 10-30
> >      > seconds (the timing varies, which gives me a hunch that this is
> >      > related to some race condition):
> >      >
> >      > Caused by: java.io.IOException: Can't get next record for channel
> >      > InputChannelInfo{gateIdx=0, inputChannelIdx=0}
> >      > at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
> >      > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >      > at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> >      > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> >      > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> >      > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> >      > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> >      > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >      > at java.lang.Thread.run(Thread.java:748)
> >      > Caused by: java.io.IOException: Serializer consumed more bytes than the
> >      > record had. This indicates broken serialization. If you are using custom
> >      > serialization types (Value or Writable), check their serialization
> >      > methods. If you are using a Kryo-serialized type, check the
> >      > corresponding Kryo serializer.
> >      > at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
> >      > at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
> >      > at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
> >      > at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
> >      > ... 8 more
> >      > Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019,
> >      > length: 546153590, index: 43, offset: 0
> >      > at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
> >      > at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
> >      > at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
> >      > at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
> >      > at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
> >      > at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
> >      > at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
> >      > at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> >      > at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
> >      > at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> >      > ... 11 more
> >      >
> >      > Just to note that right now I am using the built-in Kryo
> >      > serialization with no custom serializers.
> >      >
> >      > Upon further investigation, it seems that StreamElementSerializer
> >      > is receiving a corrupt stream, which causes it to think it received
> >      > a record with a timestamp:
> >      >
> >      > [attached screenshot: image.png]
> >      >
> >      > As you can see, the tag is incorrectly read as 0, and then a
> >      > timestamp read is attempted, yielding an invalid value (16). Once
> >      > the TypeSerializer (BinaryRowDataSerializer) tries to decode what
> >      > follows, it fails with the index-out-of-bounds exception.
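> >      >
> >      > Paraphrasing the deserialization path as I understand it (from
> >      > memory, not the exact Flink code):
> >      >
> >      >     val tag = source.readByte()          // corrupt stream yields 0
> >      >     if (tag == 0) {                      // "record with timestamp"
> >      >       val timestamp = source.readLong()  // reads garbage (16 here)
> >      >       // BinaryRowDataSerializer then reads bogus lengths/offsets and
> >      >       // fails with the IndexOutOfBoundsException above
> >      >       val value = typeSerializer.deserialize(source)
> >      >     }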
> >      >
> >      > The interesting thing is that when I *disable operator chaining*
> >      > completely via StreamExecutionEnvironment.disableOperatorChaining(),
> >      > the problem does not reproduce.
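> >      >
> >      > In code, the workaround is simply (sketch):
> >      >
> >      >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> >      >     env.disableOperatorChaining()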
> >      >
> >      > I am wondering which sections of the networking + serialization
> >      > stack may help me further investigate this issue and understand
> >      > what is causing the corrupt stream to emerge, or whether there are
> >      > additional logs that could assist.
> >      >
> >      > --
> >      > Best Regards,
> >      > Yuval Itzchakov.
> >
> >
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
>
>

-- 
Best Regards,
Yuval Itzchakov.
