Are you sure you're always matching the output row type provided by DynamicTableFactory.Context#getPhysicalRowDataType <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/factories/DynamicTableFactory.Context.html#getPhysicalRowDataType-->?
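As a quick sanity check, I'd derive the produced type straight from the factory context instead of hand-building it anywhere. Just a sketch of what I mean (MyLookupTableSource and the "my-lookup" identifier are made-up names, not from your code):

    import java.util.Collections;
    import java.util.Set;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.factories.DynamicTableSourceFactory;
    import org.apache.flink.table.types.DataType;

    public class MyLookupTableFactory implements DynamicTableSourceFactory {

        @Override
        public DynamicTableSource createDynamicTableSource(Context context) {
            // Physical row type as declared in the catalog; the RowData your
            // lookup function emits has to match this exactly.
            DataType physicalRowType = context.getPhysicalRowDataType();
            // MyLookupTableSource is hypothetical; the point is to pass the
            // type through instead of re-declaring the schema by hand.
            return new MyLookupTableSource(physicalRowType);
        }

        @Override
        public String factoryIdentifier() {
            return "my-lookup"; // made-up identifier
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            return Collections.emptySet();
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            return Collections.emptySet();
        }
    }

If the declared type and the rows you actually build ever drift apart, you can get serializer failures like the one you posted.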
Also, looking at the javadocs, it seems like you can use both internal and external types, depending on your preference:

* AsyncTableFunction <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>
* AsyncTableFunctionProvider <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.html>

I'm not sure how I can help more without looking at the full code; perhaps you can provide a fully working reproducer?
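And if you do stay on internal types, one thing worth trying (an untested sketch, only to show the shape I'd expect) is building the map value strictly from internal structures, with StringData keys and GenericArrayData values and no Tuple2 anywhere in between:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.data.ArrayData;
    import org.apache.flink.table.data.GenericArrayData;
    import org.apache.flink.table.data.GenericMapData;
    import org.apache.flink.table.data.MapData;
    import org.apache.flink.table.data.StringData;

    public final class InternalMapSketch {

        // Untested sketch: builds a value for a column declared as
        // DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())).
        static MapData toInternalMap(Map<String, String[]> source) {
            Map<StringData, ArrayData> entries = new HashMap<>();
            for (Map.Entry<String, String[]> entry : source.entrySet()) {
                String[] raw = entry.getValue();
                StringData[] values = new StringData[raw.length];
                for (int i = 0; i < raw.length; i++) {
                    values[i] = StringData.fromString(raw[i]);
                }
                entries.put(StringData.fromString(entry.getKey()), new GenericArrayData(values));
            }
            return new GenericMapData(entries);
        }
    }

If the error disappears when the map is built this way, the raw Tuple2 in the toJavaMap call quoted below is the prime suspect.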
FG

On Wed, Feb 16, 2022 at 4:15 PM Jonathan Weaver <myordinat...@gmail.com> wrote:

> No, I'm creating a custom SQL lookup table (which uses
> AsyncTableFunction<RowData>), and that requires the internal types.
>
> I implement the LookupTableSource, AsyncTableFunction<RowData>, and
> DynamicTableSourceFactory trio as per the examples in the docs.
>
> My construction is the equivalent of this, and it still errors with that
> exception when using exactly this:
>
>     Map<StringData, ArrayData> foo = new HashMap<StringData, ArrayData>();
>     foo.put(
>         StringData.fromString("foo"),
>         new GenericArrayData(new Object[] {StringData.fromString("bar")}));
>     MapData mapColumn = new GenericMapData(foo);
>
>     return GenericRowData.of(mapColumn);
>
> On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani <france...@ververica.com> wrote:
>
>> Hi,
>>
>> From what I understand, you're creating a scalar function that takes a
>> string containing JSON and then converts it to a map using a custom
>> function.
>>
>> Assuming I understood correctly, I think the problem here is that you're
>> using internal data types for UDFs, which is discouraged in most use
>> cases. Rather than using StringData, MapData, ArrayData, etc., you
>> should just use Java's String, Map, and arrays. Check out this
>> paragraph of our docs, which shows how to use complex types in scalar
>> functions:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
>> Please try to convert your function to those external types.
>>
>> Looking only at the exception you provided here, it definitely looks
>> like a wrong usage of the internal data types, like that Tuple2 inserted
>> into a GenericMapData. There are no checks in GenericMapData to verify
>> that you're constructing it with the correct types, and since Tuple2 is
>> not a correct type, the serializer just fails hard.
>>
>> Please correct me if I misunderstood what you're doing, and in that case
>> provide more info about your goal and how you've implemented the job.
>>
>> FG
>>
>> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver <myordinat...@gmail.com> wrote:
>>
>>> I've narrowed it down to a TableSource that is returning a MAP type as
>>> a column. It only errors when the column is referenced, and not on the
>>> first row, but somewhere in the stream of rows.
>>>
>>> This is on the 1.15 master branch (I need the new JSON features in 1.15
>>> for this project, so I'm riding the daily snapshot during development).
>>>
>>> In the catalog the column is defined as
>>>
>>>     .column("vc", DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())))
>>>
>>> My TableFunction is returning the following for the column:
>>>
>>>     return new GenericMapData(
>>>         fields.<StringData, ArrayData>toJavaMap(
>>>             v ->
>>>                 new Tuple2(
>>>                     StringData.fromString(v.getKey()),
>>>                     new GenericArrayData(
>>>                         v.getValue().isArray()
>>>                             ? List.ofAll(() -> ((ArrayNode) v.getValue()).elements())
>>>                                 .map(vv -> StringData.fromString(vv.asText()))
>>>                                 .toJavaArray(StringData[]::new)
>>>                             : List.of(StringData.fromString(v.getValue().asText()))
>>>                                 .toJavaArray(StringData[]::new)))));
>>>
>>> It's basically looping over a Jackson JsonNode parsed from a DB table
>>> and returning it as a MAP (the keys and values are sparse among
>>> hundreds of possibilities). The values in the JSON are either a single
>>> text value or an array of text values, so I'm just turning all values
>>> into an array.
>>>
>>> There are around ~190 key-values in the map on average.
>>>
>>> The SQL that references the column is just
>>>
>>>     COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>>>
>>> so it looks up a specific key and uses it if it exists, otherwise it
>>> coalesces to a generic string.
>>>
>>> And I keep getting this exception during processing, on a random row:
>>>
>>> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24, numBytes=8, address=16, targetAddress=16
>>>     at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
>>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
>>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
>>>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
>>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>>     at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
>>>     at TableCalcMapFunction$130.flatMap(Unknown Source)
>>>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>>
>>> Is that enough context, or is there something else I can give you all?
>>>
>>> Thanks!
>>>
>>> On Tue, Feb 15, 2022 at 1:24 PM Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Jonathan,
>>>>
>>>> It would be better if you described your scenario along with the code.
>>>> That would make it easier for the community to help.
>>>>
>>>> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver <myordinat...@gmail.com> wrote:
>>>>
>>>>> I'm getting the following exception running locally from my IDE
>>>>> (IntelliJ), but it seems not to occur when running on a cluster. I'm
>>>>> assuming it may be related to memory settings on the runtime (the
>>>>> machine has 64 GB of RAM available), but I'm not sure which setting to
>>>>> try changing.
>>>>>
>>>>> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=3568, numBytes=40, address=16, targetAddress=16
>>>>>     at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
>>>>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>>>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>>>>     at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>>>>>     at TableCalcMapFunction$148.flatMap(Unknown Source)
>>>>>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>>>>
>>>>> Was wondering if anyone had any insights or pointers on what could be
>>>>> causing that?
>>>>>
>>>>> Thanks!
>>>>> Jonathan