Hey Ori,

As for the SequenceNumber issue, I'd say yes, it can be seen as a bug.
In its current state, the Kinesis consumer cannot be used with
pipeline.generic-types=false. The problem is that we use the
TypeInformation.of(SequenceNumber.class) method, which in this case
always falls back to GenericTypeInfo. GenericTypeInfo is the one that
uses KryoSerializer, which is why registering a Kryo serializer does not
help: the type is still treated as a generic type.
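
A minimal sketch of that behaviour, assuming flink-core 1.12 (and the Kryo 2.x it bundles) on the classpath. Offset and OffsetKryoSerializer are hypothetical stand-ins for SequenceNumber and for your own Kryo serializer; ExecutionConfig#disableGenericTypes() has the same effect as pipeline.generic-types=false:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

public class GenericTypeRepro {

    /** Hypothetical stand-in for SequenceNumber: final field, no no-arg constructor, so not a POJO. */
    public static final class Offset {
        private final String value;

        public Offset(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }
    }

    /** Hypothetical Kryo serializer for Offset (Kryo 2.x API, as bundled with Flink 1.12). */
    public static final class OffsetKryoSerializer extends Serializer<Offset> {
        @Override
        public void write(Kryo kryo, Output output, Offset offset) {
            output.writeString(offset.getValue());
        }

        @Override
        public Offset read(Kryo kryo, Input input, Class<Offset> type) {
            return new Offset(input.readString());
        }
    }

    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        config.disableGenericTypes(); // same effect as pipeline.generic-types=false
        config.registerTypeWithKryoSerializer(Offset.class, OffsetKryoSerializer.class);

        // Type extraction does not consult the Kryo registration at all.
        TypeInformation<Offset> info = TypeInformation.of(Offset.class);
        System.out.println(info instanceof GenericTypeInfo); // prints true

        try {
            info.createSerializer(config);
        } catch (UnsupportedOperationException e) {
            // Same "Generic types have been disabled ..." message as in your stack trace.
            System.out.println(e.getMessage());
        }
    }
}
```

Even with the Kryo serializer registered, the type information stays a GenericTypeInfo, so createSerializer still throws.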

A dirty hack you could try is to copy the SequenceNumber class into
your job and annotate it with @TypeInfo, providing a factory that
creates something other than GenericTypeInfo (you could even use a copy
of GenericTypeInfo with the check for the pipeline.generic-types flag
removed). I know it is a really dirty hack.
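
A rough sketch of the factory part of that hack, assuming the Flink 1.12 APIs TypeInfoFactory, GenericTypeInfo and KryoSerializer. KryoOnlyTypeInfoFactory and KryoOnlyTypeInfo are hypothetical names; instead of a literal copy, the type info subclasses GenericTypeInfo and only skips the generic-types check, so the data is still serialized with Kryo:

```java
import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

/**
 * Hypothetical factory for a @TypeInfo annotation on a copied SequenceNumber.
 * Handles plain (non-parameterized) classes only.
 */
public class KryoOnlyTypeInfoFactory<T> extends TypeInfoFactory<T> {

    @Override
    @SuppressWarnings("unchecked")
    public TypeInformation<T> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        if (!(t instanceof Class)) {
            throw new IllegalArgumentException("Expected a plain class, got " + t);
        }
        return new KryoOnlyTypeInfo<>((Class<T>) t);
    }

    /** Behaves like GenericTypeInfo, minus the pipeline.generic-types check. */
    public static class KryoOnlyTypeInfo<T> extends GenericTypeInfo<T> {
        private static final long serialVersionUID = 1L;

        public KryoOnlyTypeInfo(Class<T> typeClass) {
            super(typeClass);
        }

        @Override
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
            // GenericTypeInfo#createSerializer throws when config.hasGenericTypesDisabled();
            // this override skips that check and goes straight to Kryo.
            return new KryoSerializer<>(getTypeClass(), config);
        }
    }
}
```

Your copy of SequenceNumber would then keep the connector's package (org.apache.flink.streaming.connectors.kinesis.model) and its body unchanged, gain @TypeInfo(KryoOnlyTypeInfoFactory.class), and has to be the class actually loaded instead of the connector's one. Every other generic type still fails with generic types disabled, so you can keep using the flag to hunt for them. Treat this as an untested sketch.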

Regarding 2, unfortunately I can't think of a better way.

I have created FLINK-24549 to track the Kinesis issue [1].

On the backpressure note, are you sure the issue is caused by
serialization? Have you tried identifying the slow task first [2]?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24549

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html

On 14/10/2021 12:41, Ori Popowski wrote:
> I'd appreciate it if someone could advise on this issue.
>
> Thanks
>
> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori....@gmail.com> wrote:
>
>     Hi,
>
>     I have high backpressure in a fairly simple Flink application
>     written in Scala, using Flink 1.12.1.
>
>     To find the source of the problem, I want to eliminate all classes
>     with generic serialization, so I set
>     pipeline.generic-types=false
>
>     in order to spot those classes and write a serializer for them.
>
>     However, for some reason, I get the stack trace attached below.
>
>      1. It looks suspicious that one of Flink's own classes doesn't
>         have a serializer and falls back to generic serialization.
>         Is this a bug?
>      2. I want to get a list of all classes that fall back to generic
>         serialization. How can I do it other than setting
>         pipeline.generic-types=false and eliminating those classes one
>         by one?
>      3. I defined a custom Kryo serializer for this class using both
>         addDefaultKryoSerializer(…) and
>         registerTypeWithKryoSerializer(…), and I still get the same
>         error message. How can I provide Flink with custom
>         serialization so it stops complaining about this?
>
>
>
>     java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is treated as a generic type.
>         at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>         at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>         at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>         at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>         at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>         at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>         at java.lang.Thread.run(Thread.java:748)
>
>
