Are you sure you're always matching the output row type provided by
DynamicTableFactory.Context#getPhysicalRowDataType
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/factories/DynamicTableFactory.Context.html#getPhysicalRowDataType-->?
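For instance, a factory along these lines (just a sketch; MyLookupTableSource
is a placeholder, not your class) takes the produced row type straight from
the context, so the lookup function and the declared schema can't drift apart:

    // sketch of a DynamicTableSourceFactory method; MyLookupTableSource is a placeholder
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper =
            FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        // the physical row type the planner expects this source to produce
        final DataType producedDataType = context.getPhysicalRowDataType();
        return new MyLookupTableSource(producedDataType);
    }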

Also looking at the javadocs it seems like you can use both internal and
external types, depending on your preference:

* AsyncTableFunction
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>
* AsyncTableFunctionProvider

<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.html>
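
For example, with external types the eval method could look roughly like this
(a contrived sketch, not taken from your connector):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import org.apache.flink.table.functions.AsyncTableFunction;
    import org.apache.flink.types.Row;

    // sketch of an async lookup function built on external types
    public class ExternalRowLookupFunction extends AsyncTableFunction<Row> {

        public void eval(CompletableFuture<Collection<Row>> future, String key) {
            // a real connector would complete this asynchronously;
            // Row.of takes external classes such as String, Map and arrays
            future.complete(Collections.singletonList(Row.of(key, "some-value")));
        }
    }
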
Not sure how I can help more without looking at the full code; perhaps you
can provide a fully working, reproducible example?

FG

On Wed, Feb 16, 2022 at 4:15 PM Jonathan Weaver <myordinat...@gmail.com>
wrote:

> No, I'm creating a custom SQL lookup table (which uses
> AsyncTableFunction<RowData>) which requires the internal types.
>
> I implement
> the LookupTableSource, AsyncTableFunction<RowData>, DynamicTableSourceFactory
> trio as per the examples in the docs.
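>
> Roughly this skeleton, where all the class names are just placeholders to
> show the wiring, not my real code:
>
>     // placeholder names; MyAsyncLookupFunction extends AsyncTableFunction<RowData>
>     public class MyLookupSource implements LookupTableSource {
>
>         private final DataType physicalRowDataType;
>
>         public MyLookupSource(DataType physicalRowDataType) {
>             this.physicalRowDataType = physicalRowDataType;
>         }
>
>         @Override
>         public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
>             return AsyncTableFunctionProvider.of(
>                 new MyAsyncLookupFunction(physicalRowDataType));
>         }
>
>         @Override
>         public DynamicTableSource copy() {
>             return new MyLookupSource(physicalRowDataType);
>         }
>
>         @Override
>         public String asSummaryString() {
>             return "MyLookupSource";
>         }
>     }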
>
> My construction is the equivalent of this, and it still errors with that
> exception when using exactly this.
>
>                   Map<StringData, ArrayData> foo = new HashMap<>();
>                   foo.put(
>                       StringData.fromString("foo"),
>                       new GenericArrayData(
>                           new Object[] {StringData.fromString("bar")}));
>                   MapData mapColumn = new GenericMapData(foo);
>
>                   return GenericRowData.of(mapColumn);
>
>
>
>
> On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> Hi,
>>
>> From what I understand, you're creating a scalar function that takes a
>> JSON string and converts it to a map using a custom function.
>>
>> Assuming I understood correctly, I think the problem here is that you're
>> using internal data types in your UDFs, which is discouraged in most use
>> cases. Rather than using StringData, MapData, ArrayData etc. you should
>> just use Java's String, Map and arrays. Check out this paragraph of our
>> docs that shows how to use complex types in scalar functions:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
>> Please try to convert your function to use those external types.
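>> Something along these lines (just a sketch with the JSON parsing elided,
>> not your actual function):
>>
>>     import java.util.HashMap;
>>     import java.util.Map;
>>     import org.apache.flink.table.annotation.DataTypeHint;
>>     import org.apache.flink.table.functions.ScalarFunction;
>>
>>     // a sketch of a scalar UDF built purely on external types
>>     public class JsonToMapFunction extends ScalarFunction {
>>
>>         public @DataTypeHint("MAP<STRING, ARRAY<STRING>>") Map<String, String[]> eval(String json) {
>>             Map<String, String[]> result = new HashMap<>();
>>             // parse the JSON here and fill the map with plain Java objects;
>>             // the planner converts them to internal data structures for you
>>             result.put("foo", new String[] {"bar"});
>>             return result;
>>         }
>>     }
>>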
>> Looking only at the exception you provided here, it definitely looks like
>> a wrong usage of the internal data types, like that Tuple2 inserted into a
>> GenericMapData. There are no checks in GenericMapData to verify that you're
>> constructing it with the correct types, and since Tuple2 is not a valid
>> value type there, the serializer just fails hard.
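>>
>> As a contrived fragment (not from your code): the serializer expects every
>> map value to already be an internal ArrayData / StringData instance, and
>> nothing complains until serialization if it isn't:
>>
>>     Map<StringData, ArrayData> ok = new HashMap<>();
>>     ok.put(StringData.fromString("k"),
>>            new GenericArrayData(new Object[] {StringData.fromString("v")}));
>>     new GenericMapData(ok);     // matches MAP<STRING, ARRAY<STRING>>
>>
>>     Map<StringData, Object> broken = new HashMap<>();
>>     broken.put(StringData.fromString("k"), new Tuple2<>("k", "v"));
>>     new GenericMapData(broken); // constructs fine, but serialization fails later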
>>
>> Please correct me if I misunderstood what you're doing, and in case
>> provide more info about what your goal and how you've implemented the job.
>>
>> FG
>>
>> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver <myordinat...@gmail.com>
>> wrote:
>>
>>> I've narrowed it down to a TableSource that is returning a MAP type as a
>>> column. It only errors when the column is referenced, and not on the
>>> first row, but somewhere in the stream of rows.
>>>
>>> On the 1.15 master branch (I need the new JSON features in 1.15 for this
>>> project, so I'm riding the daily snapshot during development).
>>>
>>> In catalog column is defined as
>>> .column("vc", DataTypes.MAP(DataTypes.STRING(),
>>> DataTypes.ARRAY(DataTypes.STRING())))
>>>
>>> My TableFunction is returning the following for the column
>>>
>>>                   return new GenericMapData(
>>>                       fields.<StringData, ArrayData>toJavaMap(
>>>                           v ->
>>>                               new Tuple2(
>>>                                   StringData.fromString(v.getKey()),
>>>                                   new GenericArrayData(
>>>                                       v.getValue().isArray()
>>>                                           ? List.ofAll(() -> ((ArrayNode) v.getValue()).elements())
>>>                                                 .map(vv -> StringData.fromString(vv.asText()))
>>>                                                 .toJavaArray(StringData[]::new)
>>>                                           : List.of(StringData.fromString(v.getValue().asText()))
>>>                                                 .toJavaArray(StringData[]::new)))));
>>>                 });
>>>
>>> Where it's basically looping over a Jackson JsonNode parsed from a DB
>>> table and returning it as a MAP (the keys and values are sparse amongst
>>> hundreds of possibilities). The values in the JSON are either a single
>>> text value or an array of text values, so I'm just turning all values
>>> into an array.
>>>
>>> There are around ~190 key-values in the map on average.
>>>
>>> The SQL that references the column is just
>>>
>>> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>>>
>>> So it looks up a specific key and uses it if it exists, otherwise it
>>> coalesces to a generic string.
>>>
>>> And I keep getting this exception during the processing on a random row.
>>>
>>> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
>>> targetOffset=24, numBytes=8, address=16, targetAddress=16
>>> at
>>> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>> at
>>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>> at
>>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
>>> at
>>> org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
>>> at
>>> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
>>> at
>>> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
>>> at
>>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
>>> at
>>> org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>> at
>>> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>> at
>>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>> at
>>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>> at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
>>> at TableCalcMapFunction$130.flatMap(Unknown Source)
>>> at
>>> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>>
>>> Is that enough context or is there something else I can give you all?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>> On Tue, Feb 15, 2022 at 1:24 PM Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Jonathan,
>>>>
>>>> It would be better if you described your scenario along with the code;
>>>> that would make it easier for the community to help.
>>>>
>>>> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, <myordinat...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm getting the following exception running locally from my IDE
>>>>> (IntelliJ), but it seems not to occur when running on a cluster. I'm
>>>>> assuming it may be related to memory settings on the runtime (the
>>>>> machine has 64 GB of RAM available), but I'm not sure which setting to
>>>>> try changing.
>>>>>
>>>>> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
>>>>> targetOffset=3568, numBytes=40, address=16, targetAddress=16
>>>>> at
>>>>> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>>>>> at
>>>>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>>>>> at
>>>>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
>>>>> at
>>>>> org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>>>>> at
>>>>> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>>>>> at
>>>>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>>>>> at
>>>>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>>>>> at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>>>>> at TableCalcMapFunction$148.flatMap(Unknown Source)
>>>>> at
>>>>> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>>>>>
>>>>> Was wondering if anyone had any insights or pointers on what could be
>>>>> causing that?
>>>>>
>>>>> Thanks!
>>>>> Jonathan
>>>>>
>>>>>
