I'd appreciate it if someone could advise on this issue.
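
For anyone skimming the thread, the setting in question is sketched below as a flink-conf.yaml entry. Placing it in that file is only an illustration (the quoted message gives just the option name and value); the same switch can also be flipped in code via ExecutionConfig.disableGenericTypes().

```yaml
# Illustrative placement in flink-conf.yaml; only the key and value come from the quoted message.
# With this set, Flink throws UnsupportedOperationException for any type that would
# otherwise fall back to generic (Kryo) serialization.
pipeline.generic-types: false
```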

Thanks

On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori....@gmail.com> wrote:

> Hi,
>
> I am seeing heavy backpressure in a fairly simple Flink application written
> in Scala, running on Flink 1.12.1.
>
> To find the source of the problem, I want to eliminate all classes with
> generic serialization, so I set
> pipeline.generic-types=false
>
> in order to spot those classes and write a serializer for them.
>
> However, for some reason, I get the stack trace attached below.
>
>    1. It looks suspicious that one of Flink's own classes doesn't have a
>    serializer and has to fall back to generic serialization. Is this a bug?
>    2. I want to get a list of all classes which fall back to generic
>    serialization. How can I do it other than setting
>    pipeline.generic-types=false and eliminating those classes one by one?
>    3. I defined a custom Kryo serializer for this class using both
>    addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…) and
>    I still get the same error message. How can I provide Flink with custom
>    serialization so it stops complaining about this?
>
>
>
> java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is treated as a generic type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>     at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
