Got that, thanks. I'll try it.

On Mon, Oct 18, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:

> If you submit a fat jar to Flink, it contains the Kinesis connector. Dawid
> was suggesting to also add the SequenceNumber to your src with the original
> package name such that you effectively overwrite the class of Kinesis while
> creating the fat jar (there should be a warning and you should double-check
> that your SequenceNumber wins).
>
> On Thu, Oct 14, 2021 at 3:22 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Thanks for answering.
>>
>> Not sure I understood the hack suggestion. If I copy SequenceNumber over
>> to my job, how will the original Flink Kinesis lib use that class? It's
>> fixed on a specific package (in this case
>> org.apache.flink.streaming.connectors.kinesis.model). Unless you meant to
>> somehow hack the JAR itself and replace the class with an annotated class?
>>
>> About the backpressure - I eliminated almost everything by now, so I
>> don't know what it could be. I've run out of ideas so I'm starting to look
>> into serialization. The job is very, very simple. No algorithms. Most
>> operations are just list/set concatenations, and I'm still getting
>> backpressure, no matter how big a cluster I use. I know where the
>> backpressure is, I also started profiling and there's not a single function
>> which is slow. GC is also looking good, no long pauses.
>>
>> On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hey Ori,
>>>
>>> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug.
>>> In the current state one cannot use the Kinesis consumer with
>>> pipeline.generic-types=false. The problem is that we use the
>>> TypeInformation.of(SequenceNumber.class) method, which will in this case
>>> always fall back to GenericTypeInfo (GenericTypeInfo is the one that
>>> uses KryoSerializer. That is why it does not help to register a Kryo
>>> serializer, it is still a generic type).
>>>
>>> A dirty hack for you to try could be to copy the SequenceNumber class
>>> over to your job and annotate it with TypeInfo where you provide a factory
>>> that would create something other than GenericTypeInfo (you could even use
>>> a copy of GenericTypeInfo, but with the check for the
>>> pipeline.generic-types flag removed). I know it is a really dirty hack.
>>>
>>> Ad. 2 Unfortunately I can't think of a better way.
>>>
>>> I have created FLINK-24549 to track the Kinesis issue [1].
>>>
>>> On the backpressure note, are you sure the issue is in the
>>> serialization? Have you tried identifying the slow task first?[2]
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-24549
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>>>
>>> On 14/10/2021 12:41, Ori Popowski wrote:
>>>
>>> I'd appreciate it if someone could advise on this issue.
>>>
>>> Thanks
>>>
>>> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have heavy backpressure in a fairly simple Flink application written
>>>> in Scala, using Flink version 1.12.1.
>>>>
>>>> To find the source of the problem, I want to eliminate all classes with
>>>> generic serialization, so I set
>>>> pipeline.generic-types=false
>>>>
>>>> in order to spot those classes and write a serializer for them.
>>>>
>>>> However, for some reason, I get the stack trace attached below.
>>>>
>>>>    1. It looks suspicious that one of Flink's own classes doesn't have
>>>>    a serializer and falls back to generic serialization. Is this a
>>>>    bug?
>>>>    2. I want to get a list of all classes which fall back to generic
>>>>    serialization. How can I do it other than setting
>>>>    pipeline.generic-types=false and eliminating those classes one by
>>>>    one?
>>>>    3. I defined a custom Kryo serializer for this class using both
>>>>    addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…)
>>>>    and I still get the same error message. How can I provide Flink with
>>>>    custom serialization so it stops complaining about this?
>>>>
>>>>
>>>>
>>>> java.lang.UnsupportedOperationException: Generic types have been
>>>> disabled in the ExecutionConfig and type
>>>> org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is
>>>> treated as a generic type.
>>>> at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>>>> at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>>>> at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>>>> at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>>>> at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>>>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>>>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>>>> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>>
