No, I'm creating a custom SQL lookup table (which uses AsyncTableFunction<RowData>), which requires the internal types.

I implement the LookupTableSource, AsyncTableFunction<RowData>, DynamicTableSourceFactory trio as per the examples in the docs. My construction is equivalent to the following, and it still fails with that exception even when using exactly this:

Map<StringData, ArrayData> foo = new HashMap<>();
foo.put(
    StringData.fromString("foo"),
    new GenericArrayData(new Object[] {StringData.fromString("bar")}));
MapData mapColumn = new GenericMapData(foo);
return GenericRowData.of(mapColumn);
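For context, a stripped-down sketch of where that construction sits in the lookup function (the class name, key handling, and single-column schema here are illustrative, not my actual code):

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncTableFunction;

// Illustrative only: the real function queries a DB and parses JSON.
public class VcLookupFunction extends AsyncTableFunction<RowData> {

    // Flink calls eval with a future to complete and the lookup key(s).
    public void eval(CompletableFuture<Collection<RowData>> future, Object key) {
        Map<StringData, ArrayData> vc = new HashMap<>();
        vc.put(
            StringData.fromString("foo"),
            new GenericArrayData(new Object[] {StringData.fromString("bar")}));
        MapData mapColumn = new GenericMapData(vc);

        // One result row whose single column is the MAP<STRING, ARRAY<STRING>> value.
        future.complete(Collections.singletonList(GenericRowData.of(mapColumn)));
    }
}

The exception in the quoted stack trace fires after this point, when AsyncLookupJoinWithCalcRunner copies the completed rows through MapDataSerializer.copy.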
On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani <france...@ververica.com> wrote:

> Hi,
>
> From what I understand, you're creating a scalar function that takes a
> string containing JSON and converts it to a map using a custom function.
>
> Assuming I understood correctly, I think the problem here is that you're
> using internal data types for UDFs, which is discouraged in most use
> cases. Rather than using StringData, MapData, ArrayData etc. you should
> just use Java's String, Map and arrays. Check out this paragraph of our
> docs, which shows how to use complex types in scalar functions:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
> Please try to convert your function to use those external types.
>
> Looking only at the exception you provide here, it definitely seems like
> a wrong usage of the internal data types, like that Tuple2 inserted into
> a GenericMapData. GenericMapData does not check that you're constructing
> it with the correct types, and since Tuple2 is not a correct type, the
> serializer just fails hard.
>
> Please correct me if I misunderstood what you're doing; in that case,
> please provide more info about your goal and how you've implemented the
> job.
>
> FG
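(For comparison, a minimal sketch of the external-type style described above, mirroring the single-value-or-array JSON handling from the message below; the class name and the plain Jackson usage are illustrative, not the actual job:)

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch: turn a JSON object string into MAP<STRING, ARRAY<STRING>> using external types.
public class JsonToMapFunction extends ScalarFunction {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @DataTypeHint("MAP<STRING, ARRAY<STRING>>")
    public Map<String, String[]> eval(String json) throws Exception {
        Map<String, String[]> result = new HashMap<>();
        if (json == null) {
            return result;
        }
        JsonNode root = MAPPER.readTree(json);
        Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            JsonNode value = field.getValue();
            String[] values;
            if (value.isArray()) {
                // Array of text values -> String[]
                values = new String[value.size()];
                for (int i = 0; i < value.size(); i++) {
                    values[i] = value.get(i).asText();
                }
            } else {
                // Single text value -> one-element String[]
                values = new String[] {value.asText()};
            }
            result.put(field.getKey(), values);
        }
        return result;
    }
}

With external types like this, Flink derives MAP<STRING, ARRAY<STRING>> from the hint and handles the conversion to internal data structures itself.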
> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver <myordinat...@gmail.com> wrote:
>
>> I've narrowed it down to a TableSource that returns a MAP type as a
>> column. It only errors when the column is referenced, and not on the
>> first row, but somewhere in the stream of rows.
>>
>> This is on the 1.15 master branch (I need the new JSON features in 1.15
>> for this project, so I'm riding the daily snapshot during development).
>>
>> In the catalog the column is defined as
>>
>> .column("vc", DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())))
>>
>> My TableFunction returns the following for the column:
>>
>> return new GenericMapData(
>>     fields.<StringData, ArrayData>toJavaMap(
>>         v ->
>>             new Tuple2(
>>                 StringData.fromString(v.getKey()),
>>                 new GenericArrayData(
>>                     v.getValue().isArray()
>>                         ? List.ofAll(() -> ((ArrayNode) v.getValue()).elements())
>>                             .map(vv -> StringData.fromString(vv.asText()))
>>                             .toJavaArray(StringData[]::new)
>>                         : List.of(StringData.fromString(v.getValue().asText()))
>>                             .toJavaArray(StringData[]::new)))));
>> });
>>
>> It basically loops over a Jackson JsonNode parsed from a DB table and
>> returns it as a MAP (the keys and values are sparse among hundreds of
>> possibilities). The values in the JSON are either a single text value or
>> an array of text values, so I'm turning all values into an array.
>>
>> There are around ~190 key-values in the map on average.
>>
>> The SQL that references the column is just
>>
>> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>>
>> so it looks up a specific key and uses it if it exists, otherwise it
>> coalesces to a generic string.
>>
>> And I keep getting this exception during the processing on a random row.
>>
>> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24, numBytes=8, address=16, targetAddress=16
>>     at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
>>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>     at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
>>     at TableCalcMapFunction$130.flatMap(Unknown Source)
>>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>
>> Is that enough context or is there something else I can give you all?
>>
>> Thanks!
>>
>> On Tue, Feb 15, 2022 at 1:24 PM Sid Kal <flinkbyhe...@gmail.com> wrote:
>>
>>> Hi Jonathan,
>>>
>>> It would be better if you describe your scenario along with the code.
>>> It would be easier for the community to help.
>>>
>>> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, <myordinat...@gmail.com> wrote:
>>>
>>>> I'm getting the following exception running locally from my IDE
>>>> (IntelliJ), but it does not seem to occur when running on a cluster.
>>>> I'm assuming it may be related to memory settings on the runtime (the
>>>> machine has 64 GB of RAM available), but I'm not sure what setting to
>>>> try changing.
>>>>
>>>> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=3568, numBytes=40, address=16, targetAddress=16
>>>>     at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
>>>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>>>     at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>>>>     at TableCalcMapFunction$148.flatMap(Unknown Source)
>>>>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>>>
>>>> Was wondering if anyone had any insights or pointers on what could be
>>>> causing that?
>>>>
>>>> Thanks!
>>>> Jonathan