No, I'm creating a custom SQL lookup table (which uses
AsyncTableFunction<RowData>) which requires the internal types.

I implement the LookupTableSource, AsyncTableFunction<RowData>,
DynamicTableSourceFactory trio as per the examples in the docs.

My construction is the equivalent of this, and it still errors with that
exception when using exactly this.

    // MAP<STRING, ARRAY<STRING>>: keys are StringData, values are ArrayData.
    Map<StringData, ArrayData> foo = new HashMap<>();
    foo.put(
        StringData.fromString("foo"),
        new GenericArrayData(new Object[] {StringData.fromString("bar")}));
    MapData mapColumn = new GenericMapData(foo);

    // GenericRowData.of(...) builds a row from the given field values.
    return GenericRowData.of(mapColumn);
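
Since GenericMapData does not validate what it is given, one way to rule out
a stray type before wrapping the map is a runtime check on the entries. The
following is only a sketch under stated assumptions: MapTypeCheck and
requireTypes are hypothetical names, not Flink APIs, and String / String[]
stand in for StringData / ArrayData so that it compiles without Flink on the
classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): checks the runtime classes of map entries. */
final class MapTypeCheck {

    private MapTypeCheck() {}

    /**
     * Throws IllegalArgumentException if any key is not an instance of keyType
     * or any value is not an instance of valueType. Null keys and values fail
     * the check as well, because Class.isInstance(null) is false.
     */
    static void requireTypes(Map<?, ?> map, Class<?> keyType, Class<?> valueType) {
        for (Map.Entry<?, ?> e : map.entrySet()) {
            if (!keyType.isInstance(e.getKey())) {
                throw new IllegalArgumentException(
                        "Unexpected key type " + describe(e.getKey())
                                + ", expected " + keyType.getSimpleName());
            }
            if (!valueType.isInstance(e.getValue())) {
                throw new IllegalArgumentException(
                        "Unexpected value type " + describe(e.getValue())
                                + " for key " + e.getKey()
                                + ", expected " + valueType.getSimpleName());
            }
        }
    }

    private static String describe(Object o) {
        return o == null ? "null" : o.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // Stand-in for a well-formed MAP<STRING, ARRAY<STRING>> value.
        Map<Object, Object> good = new HashMap<>();
        good.put("foo", new String[] {"bar"});
        requireTypes(good, String.class, String[].class); // passes silently

        // Value is a bare String instead of String[], so the check rejects it.
        Map<Object, Object> bad = new HashMap<>();
        bad.put("foo", "bar");
        try {
            requireTypes(bad, String.class, String[].class);
        } catch (IllegalArgumentException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

In the lookup function the same call would presumably be made with
StringData.class and ArrayData.class just before new GenericMapData(foo).
Null keys and values are rejected too, which is stricter than a nullable
column needs.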




On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani <france...@ververica.com>
wrote:

> Hi,
>
> From what I understand, you're creating a scalar function taking a string
> with json and then converting it to a map using a custom function.
>
> Assuming I understood correctly, I think the problem here is that you're
> using internal data types for UDFs, which is discouraged in most of the use
> cases. Rather than using StringData, MapData, ArrayData etc you should just
> use Java's String, Map and arrays. Check out this particular paragraph of
> our docs that shows using complex types for scalar functions:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
> Please try to convert
> Looking only at the exception you provide here, it definitely seems like a
> wrong usage of the internal data types, like that Tuple2 inserted into a
> GenericMapData. There are no checks in GenericMapData to check that you're
> constructing it with the correct types, and since Tuple2 is not a correct
> type, the serializer just fails hard.
>
> Please correct me if I misunderstood what you're doing, and in that case
> provide more info about what your goal is and how you've implemented the job.
>
> FG
>
> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver <myordinat...@gmail.com>
> wrote:
>
>> I've narrowed it down to a TableSource that is returning a MAP type as a
>> column. Only errors when the column is referenced, and not on the first
>> row, but somewhere in the stream of rows.
>>
>> On 1.15 master branch (I need the new JSON features in 1.15 for this
>> project so riding the daily snapshot during development)
>>
>> In the catalog, the column is defined as
>> .column("vc", DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())))
>>
>> My TableFunction is returning the following for the column
>>
>>     return new GenericMapData(
>>         fields.<StringData, ArrayData>toJavaMap(
>>             v ->
>>                 new Tuple2(
>>                     StringData.fromString(v.getKey()),
>>                     new GenericArrayData(
>>                         v.getValue().isArray()
>>                             ? List.ofAll(() -> ((ArrayNode) v.getValue()).elements())
>>                                 .map(vv -> StringData.fromString(vv.asText()))
>>                                 .toJavaArray(StringData[]::new)
>>                             : List.of(StringData.fromString(v.getValue().asText()))
>>                                 .toJavaArray(StringData[]::new)))));
>>     });
>>
>> Where it's basically looping over a jackson JsonNode parsed from a DB
>> table and returning as a MAP (the keys and values are sparse amongst
>> hundreds of possibilities). The values in the Json are either a single text
>> value, or an array of text values so I'm just turning all values into an
>> array.
>>
>> There are around ~190 key-values in the map on average.
>>
>> The SQL that references the column is just
>>
>> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>>
>> So looks up a specific key and uses it if it exists, otherwise coalesces
>> to a generic string.
>>
>> And I keep getting this exception during the processing on a random row.
>>
>> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24, numBytes=8, address=16, targetAddress=16
>>     at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
>>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>     at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
>>     at TableCalcMapFunction$130.flatMap(Unknown Source)
>>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>
>> Is that enough context or is there something else I can give you all?
>>
>> Thanks!
>>
>>
>>
>>
>> On Tue, Feb 15, 2022 at 1:24 PM Sid Kal <flinkbyhe...@gmail.com> wrote:
>>
>>> Hi Jonathan,
>>>
>>> It would be better if you describe your scenario along with the code. It
>>> would be easier for the community to help.
>>>
>>> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, <myordinat...@gmail.com>
>>> wrote:
>>>
>>>> I'm getting the following exception running locally from my IDE
>>>> (IntelliJ) but seems to not occur
>>>> when running on a cluster. I'm assuming it may be related to memory
>>>> settings on the runtime (machine has 64GB of ram avail) but not sure what
>>>> setting to try and change.
>>>>
>>>> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=3568, numBytes=40, address=16, targetAddress=16
>>>>     at org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>>>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
>>>>     at org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>>>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>>>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>>>     at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>>>>     at TableCalcMapFunction$148.flatMap(Unknown Source)
>>>>     at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>>>
>>>> Was wondering if anyone had any insights or pointers on what could be
>>>> causing that?
>>>>
>>>> Thanks!
>>>> Jonathan
>>>>
>>>>
