Hi Le,

I am pulling in Gordon, who might be able to help with your question.

Looking at the Context interface, it looks like you cannot easily specify a
TypeHint for the message you want to send. Hence, I guess that you
need to register these types explicitly.
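
For background on the error message itself, here is a minimal sketch in plain Java of why a Class object alone is not enough for a generic type, and why the TypeHint pattern (or a non-generic subclass) is. Triple, Hint, and SinkEvent are hypothetical stand-ins I made up so the sketch runs without Flink on the classpath; they are not the Statefun or Flink API, and I have not verified that sending such a subclass works in Statefun.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

class ErasureSketch {

    // Hypothetical stand-in for Flink's Tuple3 (assumption: not the real class).
    static class Triple<A, B, C> {
        final A f0;
        final B f1;
        final C f2;

        Triple(A f0, B f1, C f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // Hypothetical stand-in for the TypeHint pattern: an anonymous subclass
    // records its type argument in class metadata, where reflection can read it.
    abstract static class Hint<T> {
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // Hypothetical non-generic message class: its type arguments are fixed in
    // the class declaration, so the Class object alone describes them.
    static class SinkEvent extends Triple<Long, List<Long>, Long> {
        SinkEvent(Long itemId, List<Long> counts, Long timestamp) {
            super(itemId, counts, timestamp);
        }
    }

    // Number of type variables that the runtime class leaves unresolved.
    static int unresolvedTypeParameters(Object value) {
        return value.getClass().getTypeParameters().length;
    }

    public static void main(String[] args) {
        Triple<Long, List<Long>, Long> message =
                new Triple<>(1L, Arrays.asList(2L, 3L), 4L);

        // The instance only knows it is a raw Triple; A, B and C are erased.
        System.out.println("Triple unresolved: " + unresolvedTypeParameters(message));

        // The anonymous Hint subclass keeps the full parameterized type.
        Type full = new Hint<Triple<Long, List<Long>, Long>>() {}.captured();
        System.out.println("Hint captured: " + full.getTypeName());

        // The non-generic subclass has nothing left to resolve.
        SinkEvent event = new SinkEvent(1L, Arrays.asList(2L, 3L), 4L);
        System.out.println("SinkEvent unresolved: " + unresolvedTypeParameters(event));
        System.out.println("SinkEvent superclass: "
                + SinkEvent.class.getGenericSuperclass().getTypeName());
    }
}
```

The point is only that the generic parameters have to be written down somewhere the runtime can read them, either in a hint or in a concrete class declaration.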

Cheers,
Till

On Tue, Mar 30, 2021 at 8:20 AM Le Xu <sharonx...@gmail.com> wrote:

> Hello!
>
> I'm trying to figure out whether Flink Statefun supports sending objects
> whose class has generic type parameters (and potentially nested types).
> For example, I send a message that looks like this:
>
> context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject,
> listOfLongObject, Long));
>
> And obviously I'm getting complaints like this:
>
> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract
> TypeInformation from Class alone, because generic parameters are missing.
> Please use TypeInformation.of(TypeHint) instead, or another equivalent
> method in the API that accepts a TypeHint instead of a Class. For example
> for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
> at org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
> at org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
> at org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
> at org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
> at org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
> at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
> at org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
> at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
> at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
> at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
> at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
> at org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
> at org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
> at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
> at benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
> at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
>
>
> Is there an API in Statefun that supports parameterized classes like this,
> or does the user function need to handle the serialization itself? Or is
> there a way to quickly modify the Statefun message interface to support
> this functionality?
>
> Thanks!
>
> Le