Hi Timo,

The code example I posted doesn't really match the code that is causing this issue; I tried to extend it a bit, but couldn't get the reproduction to work there. I am no longer using serialized type strings: instead, I register the custom serializers with the runtime during bootstrap and override getTypeInference to provide the raw data type.
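To make that concrete, the declaration looks roughly like the sketch below. This is a simplified illustration rather than the exact code from my job: ParseArray, JsonKryoSerializer, and Bootstrap.registerSerializers are placeholder names, and the eval body is only indicative.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}

// Placeholder custom serializer; registered with the runtime during bootstrap.
class JsonKryoSerializer extends Serializer[io.circe.Json] {
  override def write(kryo: Kryo, output: Output, json: io.circe.Json): Unit =
    output.writeString(json.noSpaces)

  override def read(kryo: Kryo, input: Input, tpe: Class[io.circe.Json]): io.circe.Json =
    io.circe.parser.parse(input.readString()).getOrElse(io.circe.Json.Null)
}

// UDF that declares its output as ARRAY<RAW('io.circe.Json', ...)> via getTypeInference
// instead of a fully serialized type string.
class ParseArray extends ScalarFunction {

  // Indicative implementation only: parses a JSON string and returns its array elements.
  def eval(resources: String): Array[io.circe.Json] =
    io.circe.parser.parse(resources).toOption
      .flatMap(_.asArray)
      .map(_.toArray)
      .getOrElse(Array.empty[io.circe.Json])

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
    TypeInference.newBuilder()
      .typedArguments(DataTypes.STRING())
      .outputTypeStrategy(TypeStrategies.explicit(
        DataTypes.ARRAY(
          DataTypes.RAW(
            classOf[io.circe.Json],
            // Generic Kryo-backed serializer for the RAW type; the custom one above
            // is what gets registered on the ExecutionConfig during bootstrap.
            new KryoSerializer(classOf[io.circe.Json], new ExecutionConfig())))))
      .build()
}

object Bootstrap {
  // Called once while building the job, before the table environment is created.
  def registerSerializers(env: StreamExecutionEnvironment): Unit =
    env.getConfig.registerTypeWithKryoSerializer(
      classOf[io.circe.Json], classOf[JsonKryoSerializer])
}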
But again, I disabled the custom serializer for the test to make sure it is not the one causing the issue. Regarding FLINK-20986 (https://issues.apache.org/jira/browse/FLINK-20986), I'm not sure, but I am no longer using the old type system, so everything should pass through InternalTypeInfo and RawType. I don't see any type equality issues, and I see the same serializer being invoked for both serialization and deserialization.

On Thu, Jan 28, 2021 at 5:51 PM Timo Walther <twal...@apache.org> wrote:

> This is helpful information. So I guess the problem must be in the
> flink-table module and not in flink-core. I will try to reserve some
> time tomorrow to look into the code again. How did you express
> RawType(Array[String])? Again with a fully serialized type string?
>
> Could it be related to https://issues.apache.org/jira/browse/FLINK-20986 ?
>
> Regards,
> Timo
>
>
> On 28.01.21 16:30, Yuval Itzchakov wrote:
> > Hi Timo,
> >
> > I tried replacing it with an ordinary ARRAY<STRING> DataType, which
> > doesn't reproduce the issue. If I use a RawType(Array[String]), the
> > problem still manifests, so I assume it's not directly related to the
> > Kryo serialization of the specific underlying type (io.circe.Json),
> > but to something in the way it interacts with BinaryRawValueData and
> > writing out to the network buffer behind the scenes.
> >
> > On Thu, Jan 28, 2021 at 5:26 PM Timo Walther <twal...@apache.org> wrote:
> >
> > Hi Yuval,
> >
> > we should definitely find the root cause of this issue. It helps that
> > the exception happens frequently to nail down the problem.
> >
> > Have you tried to replace the JSON object with a regular String? If
> > the exception is gone after this change, I believe it must be the
> > serialization and not the network stack.
> >
> > Regards,
> > Timo
> >
> >
> > On 28.01.21 10:29, Yuval Itzchakov wrote:
> > > Hi,
> > >
> > > I previously wrote about a problem I believed was caused by Kryo
> > > serialization
> > > (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3cf9c44b72-f221-8cd3-24cc-b1090345c...@apache.org%3E),
> > > which I am no longer sure is the case.
> > >
> > > I have a job which involves a TableScan via a custom source operator
> > > which generates a DataStream[RowData], a UDF to parse a String =>
> > > io.circe.Json object (which internally flows as a RAW('io.circe.Json')
> > > data type), and then an AggregateFunction with a java.util.List
> > > accumulator which returns one of these objects and is used in a
> > > tumbling window as follows:
> > >
> > > SELECT any_json_array_value(parse_array(resources)) AS resources_sample
> > > FROM foo
> > > GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
> > >
> > > It generates the following physical plan:
> > >
> > > optimize result:
> > > Sink(table=[catalog.default-db.foo], fields=[resources_sample])
> > > +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
> > >    +- Exchange(distribution=[single])
> > >       +- Calc(select=[event_time, parse_array(resources) AS $f1])
> > >          +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
> > >             +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])
> > >
> > > When I run my job, I receive the following exception after 10-30
> > > seconds (it varies, which gives me a hunch that this is related to
> > > some race condition):
> > >
> > > Caused by: java.io.IOException: Can't get next record for channel
> > > InputChannelInfo{gateIdx=0, inputChannelIdx=0}
> > >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
> > >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > >     at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.io.IOException: Serializer consumed more bytes than the
> > > record had. This indicates broken serialization. If you are using custom
> > > serialization types (Value or Writable), check their serialization
> > > methods. If you are using a Kryo-serialized type, check the
> > > corresponding Kryo serializer.
> > >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
> > >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
> > >     ... 8 more
> > > Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019,
> > > length: 546153590, index: 43, offset: 0
> > >     at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
> > >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
> > >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
> > >     at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
> > >     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
> > >     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
> > >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
> > >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> > >     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
> > >     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> > >     ... 11 more
> > >
> > > Just to say that right now I am using the built-in Kryo serialization
> > > with no custom serializers.
> > >
> > > Upon further investigation, it seems that StreamElementSerializer is
> > > receiving a corrupt stream, which causes it to think it received a
> > > record with a timestamp:
> > >
> > > [inline debugger screenshot omitted]
> > >
> > > As you can see, the tag is incorrectly read as 0, so a timestamp is
> > > then read, yielding an invalid value (16), and once the TypeSerializer
> > > (BinaryRowDataSerializer) tries to decode what follows, it fails with
> > > the index out of bounds exception.
> > >
> > > The interesting thing is that when I *disable operator chaining*
> > > completely (via StreamExecutionEnvironment.disableOperatorChaining),
> > > the problem does not reproduce.
> > >
> > > I am wondering which sections of the networking + serialization stack
> > > I should look into to further investigate this issue and understand
> > > what is causing the corrupt stream to emerge, or whether there are
> > > additional logs that could assist.
> > >
> > > --
> > > Best Regards,
> > > Yuval Itzchakov.
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.

--
Best Regards,
Yuval Itzchakov.