Hey Ori,

As for the SequenceNumber issue, I'd say yes, it can be seen as a bug. In the current state one cannot use the Kinesis consumer with pipeline.generic-types=false. The problem is that we use the TypeInformation.of(SequenceNumber.class) method, which in this case will always fall back to GenericTypeInfo. (GenericTypeInfo is the type information that uses KryoSerializer. That is why registering a Kryo serializer does not help: the class is still treated as a generic type.)
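To make the point about Kryo registration concrete, here is a minimal toy model in plain Java. It is not Flink code: Config, GenericTypeSketch and SequenceNumberLike are hypothetical stand-ins for ExecutionConfig, GenericTypeInfo and the Kinesis SequenceNumber class. It only assumes what the stack trace shows, namely that serializer creation for a generic type fails as soon as generic types are disabled, before any registered Kryo serializer is looked at.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code) of the generic-type check described above. */
class GenericTypeFallbackSketch {

    /** Hypothetical stand-in for the relevant parts of ExecutionConfig. */
    static final class Config {
        boolean genericTypesDisabled = false;
        // Stand-in for Kryo serializers registered by the user, keyed by class.
        final Map<Class<?>, String> kryoSerializers = new HashMap<>();
    }

    /** Hypothetical stand-in for GenericTypeInfo, the Kryo-backed fallback. */
    static final class GenericTypeSketch {
        final Class<?> type;

        GenericTypeSketch(Class<?> type) {
            this.type = type;
        }

        String createSerializer(Config config) {
            // The flag is checked first, so the registered Kryo serializers
            // below are never consulted when generic types are disabled.
            if (config.genericTypesDisabled) {
                throw new UnsupportedOperationException(
                        "Generic types have been disabled and type "
                                + type.getName() + " is treated as a generic type.");
            }
            return config.kryoSerializers.getOrDefault(type, "default Kryo serializer");
        }
    }

    /** Hypothetical stand-in for a class that ends up as a generic type. */
    static final class SequenceNumberLike { }

    /** Returns true if serializer creation fails with generic types disabled. */
    static boolean failsWhenGenericTypesDisabled(boolean registerKryoSerializer) {
        Config config = new Config();
        config.genericTypesDisabled = true;
        if (registerKryoSerializer) {
            config.kryoSerializers.put(SequenceNumberLike.class, "custom Kryo serializer");
        }
        try {
            new GenericTypeSketch(SequenceNumberLike.class).createSerializer(config);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWhenGenericTypesDisabled(false)); // prints "true"
        System.out.println(failsWhenGenericTypesDisabled(true));  // prints "true"
    }
}
```

In this model, registering a serializer changes which Kryo serializer is used, not whether the type counts as generic, so the failure is the same in both cases.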

A dirty hack for you to try could be to copy the SequenceNumber class into your job and annotate it with @TypeInfo, providing a factory that creates something other than GenericTypeInfo (you could even use a copy of GenericTypeInfo with the check for the pipeline.generic-types flag removed). I know it is a really dirty hack.

Ad. 2: Unfortunately, I can't think of a better way.

I have created FLINK-24549 to track the Kinesis issue.[1]

On the backpressure note, are you sure the issue is in the serialization? Have you tried identifying the slow task first?[2]

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24549
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html

On 14/10/2021 12:41, Ori Popowski wrote:
> I'd appreciate it if someone could advise on this issue.
>
> Thanks
>
> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori....@gmail.com> wrote:
>
> Hi,
>
> I have a large backpressure in a somewhat simple Flink application in Scala. Using Flink version 1.12.1.
>
> To find the source of the problem, I want to eliminate all classes with generic serialization, so I set
>
> pipeline.generic-types=false
>
> in order to spot those classes and write a serializer for them.
>
> However, for some reason, I get the stack trace attached below.
>
> 1. It looks suspicious that one of Flink's own classes doesn't have a serializer and should fall back to generic serialization. Is this a bug?
> 2. I want to get a list of all classes which fall back to generic serialization. How can I do it other than setting pipeline.generic-types=false and eliminating those classes one by one?
> 3. I defined a custom Kryo serializer for this class using both addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…) and I still get the same error message. How can I provide Flink with custom serialization so it stops complaining about this?
>
> java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is treated as a generic type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>     at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:748)