Hi,

From what I understand, you're creating a scalar function that takes a string containing JSON and converts it to a map using a custom function.

Assuming I understood correctly, I think the problem here is that you're using internal data types in your UDF, which is discouraged in most use cases. Rather than using StringData, MapData, ArrayData etc. you should just use Java's String, Map and arrays. Check out this paragraph of our docs that shows how to use complex types in scalar functions: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference. Please try to convert your function to these external types.
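Something along these lines should be enough (just a minimal sketch of the idea, assuming Jackson for the JSON parsing; the class name and field handling are only illustrative, not your actual code):

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Parses a JSON object string into MAP<STRING, ARRAY<STRING>> using only
    // external Java types (String, Map, String[]), no StringData/MapData/ArrayData.
    public class JsonToMapFunction extends ScalarFunction {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        public @DataTypeHint("MAP<STRING, ARRAY<STRING>>") Map<String, String[]> eval(String json) {
            try {
                JsonNode root = MAPPER.readTree(json);
                Map<String, String[]> result = new HashMap<>();
                Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
                while (fields.hasNext()) {
                    Map.Entry<String, JsonNode> field = fields.next();
                    JsonNode value = field.getValue();
                    String[] values;
                    if (value.isArray()) {
                        // Copy every array element as text.
                        values = new String[value.size()];
                        for (int i = 0; i < value.size(); i++) {
                            values[i] = value.get(i).asText();
                        }
                    } else {
                        // Wrap a single value into a one-element array.
                        values = new String[] {value.asText()};
                    }
                    result.put(field.getKey(), values);
                }
                return result;
            } catch (Exception e) {
                throw new RuntimeException("Could not parse JSON", e);
            }
        }
    }

If I remember correctly the planner can also derive MAP<STRING, ARRAY<STRING>> from the Map<String, String[]> signature on its own, so the @DataTypeHint is mostly there to make the intended type explicit.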
Looking only at the exception you provide here, it definitely seems like a wrong usage of the internal data types, like that Tuple2 inserted into a GenericMapData. There are no checks in GenericMapData that you're constructing it with the correct types, and since Tuple2 is not a correct type, the serializer just fails hard.

Please correct me if I misunderstood what you're doing, and in that case provide more info about your goal and how you've implemented the job.

FG

On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver <myordinat...@gmail.com> wrote:

> I've narrowed it down to a TableSource that is returning a MAP type as a
> column. Only errors when the column is referenced, and not on the first
> row, but somewhere in the stream of rows.
>
> On 1.15 master branch (I need the new JSON features in 1.15 for this
> project so riding the daily snapshot during development)
>
> In catalog column is defined as
>
>     .column("vc", DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())))
>
> My TableFunction is returning the following for the column
>
>     return new GenericMapData(
>         fields.<StringData, ArrayData>toJavaMap(
>             v ->
>                 new Tuple2(
>                     StringData.fromString(v.getKey()),
>                     new GenericArrayData(
>                         v.getValue().isArray()
>                             ? List.ofAll(() -> ((ArrayNode) v.getValue()).elements())
>                                 .map(vv -> StringData.fromString(vv.asText()))
>                                 .toJavaArray(StringData[]::new)
>                             : List.of(StringData.fromString(v.getValue().asText()))
>                                 .toJavaArray(StringData[]::new)))));
>     });
>
> Where it's basically looping over a jackson JsonNode parsed from a DB
> table and returning as a MAP (the keys and values are sparse amongst
> hundreds of possibilities). The values in the Json are either a single text
> value, or an array of text values so I'm just turning all values into an
> array.
>
> There are around ~190 key-values in the map on average.
>
> The SQL that references the column is just
>
>     COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>
> So looks up a specific key and uses it if it exists, otherwise coalesces
> to a generic string.
>
> And I keep getting this exception during the processing on a random row.
>
> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24, numBytes=8, address=16, targetAddress=16
>     at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>     at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
>     at TableCalcMapFunction$130.flatMap(Unknown Source)
>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>
> Is that enough context or is there something else I can give you all?
>
> Thanks!
>
> On Tue, Feb 15, 2022 at 1:24 PM Sid Kal <flinkbyhe...@gmail.com> wrote:
>
>> Hi Jonathan,
>>
>> It would be better if you describe your scenario along with the code. It
>> would be easier for the community to help.
>>
>> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, <myordinat...@gmail.com> wrote:
>>
>>> I'm getting the following exception running locally from my IDE
>>> (IntelliJ) but seems to not occur when running on a cluster. I'm
>>> assuming it may be related to memory settings on the runtime (machine
>>> has 64GB of ram avail) but not sure what setting to try and change.
>>>
>>> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=3568, numBytes=40, address=16, targetAddress=16
>>>     at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
>>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>>     at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>>>     at TableCalcMapFunction$148.flatMap(Unknown Source)
>>>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>>
>>> Was wondering if anyone had any insights or pointers on what could be
>>> causing that?
>>>
>>> Thanks!
>>> Jonathan
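P.S. If for some reason you really have to stay on the internal data structures (again, discouraged for UDFs), note that the java.util.Map you hand to GenericMapData must already hold internal types on both sides, i.e. StringData keys and ArrayData values. Roughly like this (a plain-Java sketch, the key/value literals are just placeholders):

    // Classes below come from org.apache.flink.table.data
    Map<StringData, ArrayData> entries = new HashMap<>();
    entries.put(
        StringData.fromString("ARCHIVE_TASKING"),
        new GenericArrayData(
            new StringData[] {StringData.fromString("some value")}));
    MapData vc = new GenericMapData(entries);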