No, I'm creating a custom SQL lookup table (which uses
AsyncTableFunction<RowData>) which requires the internal types.

I implement the LookupTableSource, AsyncTableFunction<RowData>,
DynamicTableSourceFactory trio as per the examples in the docs.

My construction is equivalent to the following, and it still fails with the
same exception when I use exactly this code:

                  Map<StringData, ArrayData> foo = new HashMap<StringData, ArrayData>();
                  foo.put(..., new GenericArrayData(new Object[] {...}));
                  MapData mapColumn = new GenericMapData(foo);

                  return (RowData) GenericRowData.of(new Object[] { mapColumn });
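
The step that feeds this construction is described in the quoted message
below: each JSON value is either a single text value or an array of text
values, and every value is turned into an array. A minimal plain-Java sketch
of that normalization, independent of Flink's internal types, could look like
the following. The class name MapColumnSketch, the method names, the use of
Object and List<String> as a stand-in for Jackson's JsonNode, and the sample
values are all assumptions for illustration, not the code from the job.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: normalizes "single value or list of values" into arrays.
class MapColumnSketch {

    // Turns a value that is either a single text value or a List of text values
    // into a String[]; a single value becomes a one-element array.
    static String[] toStringArray(Object value) {
        if (value instanceof List) {
            List<?> items = (List<?>) value;
            String[] out = new String[items.size()];
            for (int i = 0; i < items.size(); i++) {
                out[i] = String.valueOf(items.get(i));
            }
            return out;
        }
        return new String[] { String.valueOf(value) };
    }

    // Builds the MAP<STRING, ARRAY<STRING>> shaped result, preserving key order.
    static Map<String, String[]> normalize(Map<String, Object> fields) {
        Map<String, String[]> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            result.put(entry.getKey(), toStringArray(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("ARCHIVE_TASKING", "yes");            // sample single value
        fields.put("TAGS", Arrays.asList("a", "b"));     // sample array value
        Map<String, String[]> vc = normalize(fields);
        for (Map.Entry<String, String[]> entry : vc.entrySet()) {
            System.out.println(entry.getKey() + " -> " + Arrays.toString(entry.getValue()));
        }
    }
}
```

In the actual lookup function each String would still have to be wrapped with
StringData.fromString and each array with GenericArrayData before being put
into the GenericMapData, as in the snippet above.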

On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani wrote:

> Hi,
> From what I understand, you're creating a scalar function taking a string
> with json and then converting it to a map using a custom function.
> Assuming I understood correctly, I think the problem here is that you're
> using internal data types for UDFs, which is discouraged in most of the use
> cases. Rather than using StringData, MapData, ArrayData etc you should just
> use Java's String, Map and arrays. Check out this particular paragraph of
> our docs that shows using complex types for scalar functions:
> Please try to convert
> Looking only at the exception you provide here, it definitely seems like a
> wrong usage of the internal data types, like that Tuple2 inserted into a
> GenericMapData. There are no checks in GenericMapData to check that you're
> constructing it with the correct types, and since Tuple2 is not a correct
> type, the serializer just fails hard.
> Please correct me if I misunderstood what you're doing, and in that case
> provide more info about what your goal is and how you've implemented the job.
> FG
> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver wrote:
>> I've narrowed it down to a TableSource that is returning a MAP type as a
>> column. Only errors when the column is referenced, and not on the first
>> row, but somewhere in the stream of rows.
>> On 1.15 master branch (I need the new JSON features in 1.15 for this
>> project so riding the daily snapshot during development)
>> In catalog column is defined as
>> .column("vc", DataTypes.MAP(DataTypes.STRING(),
>> DataTypes.ARRAY(DataTypes.STRING())))
>> My TableFunction is returning the following for the column
>>                   return new GenericMapData(
>>                       fields.<StringData, ArrayData>toJavaMap(
>>                           v ->
>>                               new Tuple2(
>>                                   StringData.fromString(v.getKey()),
>>                                   new GenericArrayData(
>>                                       v.getValue().isArray()
>>                                           ? List.ofAll(() -> ((ArrayNode) v.getValue()).elements())
>>                                               .map(vv -> StringData.fromString(vv.asText()))
>>                                               .toJavaArray(StringData[]::new)
>>                                           : List.of(StringData.fromString(v.getValue().asText()))
>>                                               .toJavaArray(StringData[]::new)))));
>>                 });
>> Where it's basically looping over a jackson JsonNode parsed from a DB
>> table and returning as a MAP (the keys and values are sparse amongst
>> hundreds of possibilities). The values in the Json are either a single text
>> value, or an array of text values so I'm just turning all values into an
>> array.
>> There are around ~190 key-values in the map on average.
>> The SQL that references the column is just
>> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>> So looks up a specific key and uses it if it exists, otherwise coalesces
>> to a generic string.
>> And I keep getting this exception during the processing on a random row.
>> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
>> targetOffset=24, numBytes=8, address=16, targetAddress=16
>> at
>> org.apache.flink.core.memory.MemorySegment.copyTo(
>> at
>> at
>> at
>> at
>> at
>> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(
>> at
>> at
>> at
>> at
>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(
>> at
>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(
>> at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
>> at TableCalcMapFunction$130.flatMap(Unknown Source)
>> at
>> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(
>> Is that enough context or is there something else I can give you all?
>> Thanks!
>> On Tue, Feb 15, 2022 at 1:24 PM Sid Kal wrote:
>>> Hi Jonathan,
>>> It would be better if you describe your scenario along with the code. It
>>> would be easier for the community to help.
>>> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver wrote:
>>>> I'm getting the following exception running locally from my IDE
>>>> (IntelliJ), but it seems not to occur when running on a cluster. I'm
>>>> assuming it may be related to memory settings on the runtime (machine
>>>> has 64GB of RAM available), but I'm not sure what setting to try and
>>>> change.
>>>> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
>>>> targetOffset=3568, numBytes=40, address=16, targetAddress=16
>>>> at
>>>> org.apache.flink.core.memory.MemorySegment.copyTo(
>>>> at
>>>> at
>>>> at
>>>> at
>>>> at
>>>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(
>>>> at
>>>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(
>>>> at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>>>> at TableCalcMapFunction$148.flatMap(Unknown Source)
>>>> at
>>>> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(
>>>> Was wondering if anyone had any insights or pointers on what could be
>>>> causing that?
>>>> Thanks!
>>>> Jonathan
