Hi Timo,
The code example I posted doesn't really match the code that is causing
this issue. I tried to extend it a bit but couldn't make the
reproduction work there.
I am no longer using serialized type strings; instead, I register the custom
serializers with the runtime during bootstrap and override getTypeInference
to provide the raw data type.
But again, I disabled the custom serializer for this test to make sure it is
not the one causing the issue.
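For reference, this is roughly the shape of that setup. It's a simplified
sketch, not my actual code: ParseJson, JsonKryoSerializer, and the circe
round-tripping are illustrative stand-ins.

  import com.esotericsoftware.kryo.{Kryo, Serializer}
  import com.esotericsoftware.kryo.io.{Input, Output}
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  import org.apache.flink.table.api.DataTypes
  import org.apache.flink.table.catalog.DataTypeFactory
  import org.apache.flink.table.functions.ScalarFunction
  import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}

  // stand-in for my real serializer: round-trips the Json through its string form
  class JsonKryoSerializer extends Serializer[io.circe.Json] {
    override def write(kryo: Kryo, out: Output, json: io.circe.Json): Unit =
      out.writeString(json.noSpaces)
    override def read(kryo: Kryo, in: Input, cls: Class[io.circe.Json]): io.circe.Json =
      io.circe.parser.parse(in.readString()).getOrElse(io.circe.Json.Null)
  }

  // registered with the runtime during bootstrap
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.getConfig.addDefaultKryoSerializer(
    classOf[io.circe.Json], classOf[JsonKryoSerializer])

  // UDF that overrides getTypeInference to force the RAW output type
  class ParseJson extends ScalarFunction {
    def eval(s: String): io.circe.Json =
      io.circe.parser.parse(s).getOrElse(io.circe.Json.Null)

    override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
      TypeInference.newBuilder()
        .typedArguments(DataTypes.STRING())
        .outputTypeStrategy(TypeStrategies.explicit(
          typeFactory.createRawDataType(classOf[io.circe.Json])))
        .build()
  }
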
Regarding FLINK-20986 (https://issues.apache.org/jira/browse/FLINK-20986),
I'm not sure, but I am no longer using the old type system, so everything
should pass through InternalTypeInfo and RawType. I don't see any type
equality issues, and I see the same serializer being invoked for both
serialization and deserialization.
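(By "passing through InternalTypeInfo" I mean the usual derivation, sketched
here under the assumption that the source exposes its produced DataType:

  import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
  import org.apache.flink.table.types.DataType

  // TypeInformation for the RowData stream, derived from the table type
  def typeInfoFor(producedDataType: DataType) =
    InternalTypeInfo.of(producedDataType.getLogicalType)
)
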
On Thu, Jan 28, 2021 at 5:51 PM Timo Walther <twal...@apache.org> wrote:
This is helpful information. So I guess the problem must be in the
flink-table module and not in flink-core. I will try to reserve some time
tomorrow to look into the code again. How did you express
RawType(Array[String])? Again with a fully serialized type string?
Could it be related to https://issues.apache.org/jira/browse/FLINK-20986?
Regards,
Timo
On 28.01.21 16:30, Yuval Itzchakov wrote:
> Hi Timo,
>
> I tried replacing it with an ordinary ARRAY<STRING> DataType, which
> doesn't reproduce the issue.
> If I use a RawType(Array[String]), the problem still manifests, so I
> assume it's not directly related to Kryo serialization of the specific
> underlying type (io.circe.Json), but to something in the way it interacts
> with BinaryRawValueData and writes out to the network buffer behind the
> scenes.
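>
> Concretely, the two output data types I compared look roughly like this
> (a sketch; typeFactory is the DataTypeFactory handed to getTypeInference):
>
>   // ordinary array type: does NOT reproduce the issue
>   val arrayType = DataTypes.ARRAY(DataTypes.STRING())
>
>   // raw type around the same data: still reproduces the issue
>   val rawType = typeFactory.createRawDataType(classOf[Array[String]])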
>
> On Thu, Jan 28, 2021 at 5:26 PM Timo Walther <twal...@apache.org> wrote:
>
> Hi Yuval,
>
> we should definitely find the root cause of this issue. It helps that the
> exception happens frequently; that makes it easier to nail down the
> problem.
>
> Have you tried replacing the JSON object with a regular String? If the
> exception is gone after this change, then I believe it must be the
> serialization and not the network stack.
>
> Regards,
> Timo
>
>
> On 28.01.21 10:29, Yuval Itzchakov wrote:
> > Hi,
> >
> > I previously wrote about a problem I believed was caused by Kryo
> > serialization
> > (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E),
> > which I am no longer sure is the case.
> >
> > I have a job which involves a TableScan via a custom source operator
> > which generates a DataStream[RowData], a UDF that parses a String into
> > an io.circe.Json object (which internally flows as a RAW('io.circe.Json')
> > data type), and then an AggregateFunction with a java.util.List
> > accumulator which returns one of these objects and is used in a tumbling
> > window (a simplified sketch of the aggregate follows the query):
> >
> > SELECT any_json_array_value(parse_array(resources)) AS resources_sample
> > FROM foo
> > GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
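> >
> > The aggregate has roughly this shape (an illustrative sketch, not the
> > real code; the names and the pick-one logic are stand-ins):
> >
> >   import java.util.{ArrayList => JArrayList, List => JList}
> >   import org.apache.flink.table.functions.AggregateFunction
> >
> >   class AnyJsonArrayValue
> >       extends AggregateFunction[io.circe.Json, JList[io.circe.Json]] {
> >
> >     override def createAccumulator(): JList[io.circe.Json] =
> >       new JArrayList[io.circe.Json]()
> >
> >     // called per row with the RAW array produced by parse_array
> >     def accumulate(acc: JList[io.circe.Json],
> >                    values: Array[io.circe.Json]): Unit =
> >       if (values != null) values.foreach(v => acc.add(v))
> >
> >     // returns one of the collected RAW('io.circe.Json') values
> >     override def getValue(acc: JList[io.circe.Json]): io.circe.Json =
> >       if (acc.isEmpty) io.circe.Json.Null else acc.get(acc.size() - 1)
> >   }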
> >
> > It generates the following physical plan:
> >
> > optimize result:
> > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
> > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
> >    +- Exchange(distribution=[single])
> >       +- Calc(select=[event_time, parse_array(resources) AS $f1])
> >          +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
> >             +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])
> >
> > When I run my job, I receive the following exception after 10-30
> > seconds (the delay varies, which gives me a hunch that this is related
> > to some race condition):
> >
> > Caused by: java.io.IOException: Can't get next record for channel
> > InputChannelInfo{gateIdx=0, inputChannelIdx=0}
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
> >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Serializer consumed more bytes than the
> > record had. This indicates broken serialization. If you are using custom
> > serialization types (Value or Writable), check their serialization
> > methods. If you are using a Kryo-serialized type, check the
> > corresponding Kryo serializer.
> >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
> >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
> >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
> >     ... 8 more
> > Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019,
> > length: 546153590, index: 43, offset: 0
> >     at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
> >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
> >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
> >     at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
> >     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
> >     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
> >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
> >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> >     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
> >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> >     ... 11 more
> >
> > Note that right now I am using the built-in Kryo serialization with no
> > custom serializers.
> >
> > Upon further investigation, it seems that StreamElementSerializer is
> > receiving a corrupt stream, which causes it to think it received a
> > record with a timestamp:
> >
> > [inline image: debugger screenshot of the bytes being deserialized]
> >
> > As you can see, the tag is incorrectly read as 0, and then a timestamp
> > read is attempted, yielding an invalid value (16); once the
> > TypeSerializer (BinaryRowDataSerializer) tries to decode what follows,
> > it fails with the index-out-of-bounds exception.
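> >
> > For context, the framing StreamElementSerializer expects is roughly the
> > following (paraphrased from the Flink sources; details may vary between
> > versions):
> >
> >   byte tag        // 0 = record with timestamp, 1 = record without
> >                   // timestamp, 2 = watermark, ...
> >   long timestamp  // read only when tag == 0
> >   payload         // the row, written by the wrapped BinaryRowDataSerializer
> >
> > so a corrupted tag byte of 0 makes it consume 8 extra bytes as a
> > timestamp, and everything after that is misaligned.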
> >
> > The interesting thing is that when I *disable operator chaining*
> > completely (via StreamExecutionEnvironment.disableOperatorChaining), the
> > problem does not reproduce.
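> >
> > That is, with:
> >
> >   val env = StreamExecutionEnvironment.getExecutionEnvironment
> >   env.disableOperatorChaining()
> >
> > the job runs cleanly.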
> >
> > I am wondering which sections of the networking + serialization stack
> > may help me further investigate this issue and understand what is
> > causing the corrupt stream, or whether there are additional logs that
> > could assist.
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
--
Best Regards,
Yuval Itzchakov.