Got that, thanks. I'll try

On Mon, Oct 18, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:
> If you submit a fat jar to Flink, it contains the Kinesis connector. Dawid
> was suggesting to also add the SequenceNumber to your src with the original
> package name such that you effectively overwrite the class of Kinesis while
> creating the fat jar (there should be a warning and you should double-check
> that your SequenceNumber wins).
>
> On Thu, Oct 14, 2021 at 3:22 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Thanks for answering.
>>
>> Not sure I understood the hack suggestion. If I copy SequenceNumber over
>> to my job, how will the original Flink Kinesis lib use that class? It's
>> fixed on a specific package (in this case
>> org.apache.flink.streaming.connectors.kinesis.model). Unless you meant to
>> somehow hack the JAR itself and replace the class with an annotated class?
>>
>> About the backpressure - I eliminated almost everything by now, so I
>> don't know what it could be. I've run out of ideas so I'm starting to look
>> into serialization. The job is very, very simple. No algorithms. Most
>> operations are just list/set concatenations, and I'm still getting
>> backpressure, no matter how big a cluster I use. I know where the
>> backpressure is, I also started profiling and there's not a single function
>> which is slow. GC is also looking good, no long pauses.
>>
>> On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hey Ori,
>>>
>>> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug.
>>> In the current state one cannot use the Kinesis consumer with
>>> pipeline.generic-types=false. The problem is that we use the
>>> TypeInformation.of(SequenceNumber.class) method, which will in this case
>>> always fall back to GenericTypeInfo (GenericTypeInfo is the one that
>>> uses KryoSerializer. That is why it does not help to register a Kryo
>>> serializer, it is still a generic type).
>>>
>>> A dirty hack for you to try could be to copy the SequenceNumber
>>> over to your job and annotate it with TypeInfo where you provide a factory
>>> that would create something other than GenericTypeInfo (you could even use
>>> a copy of GenericTypeInfo, but with a removed check for the
>>> pipeline.generic-types flag). I know it is a really dirty hack.
>>>
>>> Ad. 2 Unfortunately I can't think of a better way.
>>>
>>> I have created FLINK-24549 to track the kinesis issue.[1]
>>>
>>> On the backpressure note, are you sure the issue is in the
>>> serialization? Have you tried identifying the slow task first?[2]
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-24549
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>>> On 14/10/2021 12:41, Ori Popowski wrote:
>>>
>>> I'd appreciate it if someone could advise on this issue.
>>>
>>> Thanks
>>>
>>> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a large backpressure in a somewhat simple Flink application in
>>>> Scala. Using Flink version 1.12.1.
>>>>
>>>> To find the source of the problem, I want to eliminate all classes with
>>>> generic serialization, so I set
>>>> pipeline.generic-types=false
>>>>
>>>> in order to spot those classes and write a serializer for them.
>>>>
>>>> However, for some reason, I get the stacktrace attached below.
>>>>
>>>>    1. It looks suspicious that one of Flink's own classes doesn't have
>>>>    a serializer and should fall back to generic serialization. Is this a
>>>>    bug?
>>>>    2. I want to get a list of all classes which fall back to generic
>>>>    serialization. How can I do it other than setting
>>>>    pipeline.generic-types=false and eliminating those classes one by
>>>>    one?
>>>>    3. I defined a custom Kryo serializer for this class using both
>>>>    addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…)
>>>>    and I still get the same error message. How can I provide Flink with
>>>>    custom serialization so it stops complaining about this?
>>>>
>>>> java.lang.UnsupportedOperationException: Generic types have been
>>>> disabled in the ExecutionConfig and type
>>>> org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is
>>>> treated as a generic type.
>>>>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>>>>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>>>>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>>>>     at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>>>>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>     at java.lang.Thread.run(Thread.java:748)
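
On the advice in this thread to double-check that the copied SequenceNumber wins in the fat jar: one possible way to check is to ask the JVM, at runtime, whether the class loaded under that name carries the TypeInfo annotation. Below is a minimal sketch using only JDK reflection. The helper name ShadowCheck is made up for illustration, and the sketch assumes that the copy was annotated as Dawid suggested and that the annotation (assumed here to be org.apache.flink.api.common.typeinfo.TypeInfo) is visible to reflection at runtime. It is not something proposed by anyone in the thread.

```java
import java.lang.annotation.Annotation;

// Hypothetical helper, not part of Flink or the Kinesis connector.
final class ShadowCheck {

    // Returns true if the class resolved for className by this helper's
    // classloader carries a runtime-visible annotation named annotationName.
    static boolean carriesAnnotation(String className, String annotationName) {
        final Class<?> loaded;
        try {
            // initialize=false: resolve the class without running static initializers.
            loaded = Class.forName(className, false, ShadowCheck.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Class not on classpath: " + className, e);
        }
        for (Annotation a : loaded.getAnnotations()) {
            if (a.annotationType().getName().equals(annotationName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Class name taken from the stacktrace in this thread; the annotation
        // name is an assumption about where Flink's TypeInfo annotation lives.
        String cls = "org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber";
        String ann = "org.apache.flink.api.common.typeinfo.TypeInfo";
        try {
            System.out.println(carriesAnnotation(cls, ann)
                    ? "annotated copy is in use"
                    : "original connector class is in use");
        } catch (IllegalArgumentException e) {
            // Happens when this is run outside the fat jar, without the connector.
            System.out.println(e.getMessage());
        }
    }
}
```

Calling carriesAnnotation from inside the job (for example once at startup) would be more telling than inspecting the jar by hand, since it reports what the job's classloader actually resolved.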