I've narrowed it down to a TableSource that is returning a MAP type as a
column. It only errors when the column is referenced, and not on the first
row, but somewhere in the stream of rows.

I'm on the 1.15 master branch (I need the new JSON features in 1.15 for this
project, so I'm riding the daily snapshot during development).

In the catalog the column is defined as
.column("vc", DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())))
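
For reference, that call sits in the usual Schema builder chain, roughly like
this (a sketch of the surrounding context, not my exact code):

    // Sketch: the column declaration inside the standard Schema builder.
    Schema schema =
        Schema.newBuilder()
            .column("vc", DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())))
            .build();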

My TableFunction is returning the following for the column

    return new GenericMapData(
        fields.<StringData, ArrayData>toJavaMap(
            v ->
                new Tuple2<>(
                    StringData.fromString(v.getKey()),
                    new GenericArrayData(
                        v.getValue().isArray()
                            // array value: map every element to StringData
                            ? List.ofAll(() -> ((ArrayNode) v.getValue()).elements())
                                .map(vv -> StringData.fromString(vv.asText()))
                                .toJavaArray(StringData[]::new)
                            // scalar value: wrap the single text value in an array
                            : List.of(StringData.fromString(v.getValue().asText()))
                                .toJavaArray(StringData[]::new)))));

It's basically looping over a Jackson JsonNode parsed from a DB table and
returning it as a MAP (the keys and values are sparse among hundreds of
possibilities). The values in the JSON are either a single text value or an
array of text values, so I'm just turning all values into an array.
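
For clarity, the same normalization without the vavr wrappers would look
roughly like this (a sketch of the intent; it assumes fields is the
Iterator<Map.Entry<String, JsonNode>> from Jackson's ObjectNode#fields()):

    // Sketch: build the MAP<STRING, ARRAY<STRING>> value with plain Java + Jackson.
    // Uses org.apache.flink.table.data.{StringData, ArrayData, GenericArrayData, GenericMapData}.
    static GenericMapData toMapColumn(Iterator<Map.Entry<String, JsonNode>> fields) {
        Map<StringData, ArrayData> map = new HashMap<>();
        fields.forEachRemaining(entry -> {
            JsonNode value = entry.getValue();
            List<StringData> values = new ArrayList<>();
            if (value.isArray()) {
                // array value: keep every element as a string
                value.forEach(el -> values.add(StringData.fromString(el.asText())));
            } else {
                // scalar value: normalize to a one-element array
                values.add(StringData.fromString(value.asText()));
            }
            map.put(StringData.fromString(entry.getKey()),
                    new GenericArrayData(values.toArray(new StringData[0])));
        });
        return new GenericMapData(map);
    }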

There are around 190 key-values in the map on average.

The SQL that references the column is just

COALESCE(ELEMENT(vc['ARCHIVE_TASKING']), product_type) type,

So it looks up a specific key and uses its value if present, otherwise falls
back to a generic string.
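
In context the projection is roughly this (a sketch; the table name and the
rest of the query are simplified stand-ins):

    // Hypothetical query around that projection; only the COALESCE/ELEMENT part is real.
    // vc['ARCHIVE_TASKING'] yields the ARRAY<STRING> for that key (NULL if absent), and
    // ELEMENT(...) unwraps a single-element array to its sole STRING value.
    Table result = tableEnv.sqlQuery(
        "SELECT COALESCE(ELEMENT(vc['ARCHIVE_TASKING']), product_type) AS type "
            + "FROM enriched_products");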

And I keep getting this exception during processing, on a random row.

Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24, numBytes=8, address=16, targetAddress=16
  at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
  at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
  at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
  at org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
  at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
  at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
  at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
  at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
  at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
  at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
  at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
  at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
  at TableCalcMapFunction$130.flatMap(Unknown Source)
  at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)

Is that enough context or is there something else I can give you all?

Thanks!




On Tue, Feb 15, 2022 at 1:24 PM Sid Kal <flinkbyhe...@gmail.com> wrote:

> Hi Jonathan,
>
> It would be better if you describe your scenario along with the code. It
> would be easier for the community to help.
>
> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, <myordinat...@gmail.com>
> wrote:
>
>> I'm getting the following exception running locally from my IDE
>> (IntelliJ), but it seems not to occur when running on a cluster. I'm
>> assuming it may be related to memory settings on the runtime (the machine
>> has 64GB of RAM available), but I'm not sure what setting to try changing.
>>
>> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=3568, numBytes=40, address=16, targetAddress=16
>>   at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>   at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>   at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
>>   at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>   at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>   at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>   at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>   at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>>   at TableCalcMapFunction$148.flatMap(Unknown Source)
>>   at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>
>> Was wondering if anyone had any insights or pointers on what could be
>> causing that?
>>
>> Thanks!
>> Jonathan
>>
>>
