If you submit a fat jar to Flink, it already contains the Kinesis connector.
Dawid was suggesting that you also add SequenceNumber to your sources under
the original package name, so that your copy effectively overwrites the
connector's class when the fat jar is built (the build should print a warning
about it, and you should double-check that your SequenceNumber wins).
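
To double-check which copy won, a small check like the one below could be logged from the job's main method, or run with the fat jar on the classpath. It is a sketch that assumes your shadowing copy carries the @TypeInfo annotation Dawid suggests further down in the thread, while the connector's original does not; CheckSequenceNumberShadowing is a hypothetical name. As far as I know, Flink's default parent-first classloading patterns include org.apache.flink., so if the connector jar is also present in Flink's lib/ directory, that copy may be loaded instead of the one in your fat jar.

```java
import java.net.URL;
import java.security.CodeSource;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;

/** Hypothetical helper: reports where SequenceNumber was loaded from and whether it is the patched copy. */
public final class CheckSequenceNumberShadowing {

    private CheckSequenceNumberShadowing() {}

    /** True if the loaded SequenceNumber carries @TypeInfo, i.e. the shadowing copy won (by assumption). */
    public static boolean isPatchedCopyLoaded() {
        // @TypeInfo is retained at runtime, so it is visible via reflection.
        return SequenceNumber.class.isAnnotationPresent(TypeInfo.class);
    }

    /** The jar or directory the class was loaded from, or null if the JVM does not report one. */
    public static URL loadedFrom() {
        CodeSource source = SequenceNumber.class.getProtectionDomain().getCodeSource();
        return source == null ? null : source.getLocation();
    }

    public static void main(String[] args) {
        System.out.println("SequenceNumber loaded from: " + loadedFrom());
        System.out.println("Patched copy (@TypeInfo present): " + isPatchedCopyLoaded());
    }
}
```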

On Thu, Oct 14, 2021 at 3:22 PM Ori Popowski <ori....@gmail.com> wrote:

> Thanks for answering.
>
> I'm not sure I understood the hack suggestion. If I copy SequenceNumber over
> to my job, how will the original Flink Kinesis lib use that class? It's
> bound to a specific package (in this case
> org.apache.flink.streaming.connectors.kinesis.model). Unless you meant to
> somehow hack the JAR itself and replace the class with an annotated one?
>
> About the backpressure: I have eliminated almost everything by now, so I
> don't know what it could be. I've run out of ideas, so I'm starting to look
> into serialization. The job is very, very simple, with no algorithms. Most
> operations are just list/set concatenations, and I still get backpressure no
> matter how big a cluster I use. I know where the backpressure is, and I also
> started profiling: there's not a single function that is slow. GC also
> looks good, with no long pauses.
>
> On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hey Ori,
>>
>> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug. In
>> its current state, one cannot use the Kinesis consumer with
>> pipeline.generic-types=false. The problem is that we use the
>> TypeInformation.of(SequenceNumber.class) method, which in this case will
>> always fall back to GenericTypeInfo. (GenericTypeInfo is the one that uses
>> KryoSerializer. That is why it does not help to register a Kryo
>> serializer: it is still a generic type.)
>>
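The fallback described above can be reproduced without Kinesis. The sketch below assumes Flink 1.12 with its bundled Kryo 2.x on the classpath, and uses a hypothetical stand-in class Offset that has no no-argument constructor (so type extraction treats it as a generic type) plus a hypothetical Kryo serializer OffsetKryoSerializer. Registering the serializer both ways, as in question 3, does not change the outcome, because GenericTypeInfo.createSerializer rejects the type as soon as generic types are disabled.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

public final class GenericFallbackDemo {

    /** Hypothetical stand-in: no no-arg constructor, so type extraction yields GenericTypeInfo. */
    public static final class Offset {
        private final String value;

        public Offset(String value) {
            this.value = value;
        }

        public String get() {
            return value;
        }
    }

    /** Hypothetical Kryo serializer for Offset (Kryo 2.x API). */
    public static final class OffsetKryoSerializer extends Serializer<Offset> {
        @Override
        public void write(Kryo kryo, Output output, Offset offset) {
            output.writeString(offset.get());
        }

        @Override
        public Offset read(Kryo kryo, Input input, Class<Offset> type) {
            return new Offset(input.readString());
        }
    }

    /** Generic types disabled, Kryo serializer registered both ways. */
    static ExecutionConfig strictConfig() {
        ExecutionConfig config = new ExecutionConfig();
        config.disableGenericTypes(); // programmatic equivalent of pipeline.generic-types=false
        config.addDefaultKryoSerializer(Offset.class, OffsetKryoSerializer.class);
        config.registerTypeWithKryoSerializer(Offset.class, OffsetKryoSerializer.class);
        return config;
    }

    /** True if Offset is a generic type and serializer creation is still rejected. */
    public static boolean rejectedDespiteKryoRegistration() {
        TypeInformation<Offset> info = TypeInformation.of(Offset.class);
        if (!(info instanceof GenericTypeInfo)) {
            return false;
        }
        try {
            info.createSerializer(strictConfig());
            return false;
        } catch (UnsupportedOperationException expected) {
            // Same failure as in the stack trace: "Generic types have been disabled ..."
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Rejected despite Kryo registration: " + rejectedDespiteKryoRegistration());
    }
}
```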
>> A dirty hack for you to try could be to copy the SequenceNumber class over
>> to your job and annotate it with @TypeInfo, providing a factory that
>> creates something other than GenericTypeInfo (you could even use a copy of
>> GenericTypeInfo, but with the check for the pipeline.generic-types flag
>> removed). I know it is a really dirty hack.
>>
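A minimal sketch of that hack, assuming Flink 1.12 and a verbatim copy of the connector's SequenceNumber source in your own project. Instead of a full copy of GenericTypeInfo, it subclasses GenericTypeInfo (which, as far as I know, is not final in 1.12) and overrides createSerializer to skip the generic-types check, so SequenceNumber keeps using KryoSerializer while every other generic type is still rejected. The package com.example.flink.kinesis and the names KryoAllowedTypeInfoFactory and KryoAllowedTypeInfo are hypothetical.

```java
package com.example.flink.kinesis; // hypothetical package

import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

// Attach this factory to the shadowing SequenceNumber copy, e.g.:
//
//   package org.apache.flink.streaming.connectors.kinesis.model;
//   import org.apache.flink.api.common.typeinfo.TypeInfo;
//
//   @TypeInfo(com.example.flink.kinesis.KryoAllowedTypeInfoFactory.class)
//   public class SequenceNumber ... { /* original body, unchanged */ }
public class KryoAllowedTypeInfoFactory<T> extends TypeInfoFactory<T> {

    @Override
    @SuppressWarnings("unchecked")
    public TypeInformation<T> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
        // For a class-level @TypeInfo, t is expected to be the annotated class itself.
        if (!(t instanceof Class)) {
            throw new IllegalArgumentException("Expected a class, got " + t);
        }
        return new KryoAllowedTypeInfo<>((Class<T>) t);
    }

    /** GenericTypeInfo variant that skips the "generic types disabled" check. */
    public static class KryoAllowedTypeInfo<T> extends GenericTypeInfo<T> {
        private static final long serialVersionUID = 1L;

        public KryoAllowedTypeInfo(Class<T> typeClass) {
            super(typeClass);
        }

        @Override
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
            // Same serializer GenericTypeInfo would create, minus the flag check.
            return new KryoSerializer<>(getTypeClass(), config);
        }
    }
}
```

Because the serializer stays a KryoSerializer for the same class, I would expect restoring existing Kinesis consumer state to be unaffected, but that is worth verifying against a savepoint before relying on it.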
>> Ad. 2: Unfortunately, I can't think of a better way.
>>
>> I have created FLINK-24549 to track the Kinesis issue.[1]
>>
>> On the backpressure note, are you sure the issue is in the serialization?
>> Have you tried identifying the slow task first?[2]
>>
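For the backpressure side, one way to locate the slow task is to query the JobManager's REST API for each job vertex, as described in the monitoring docs linked below. The sketch assumes the default REST address http://localhost:8081, that you already know the job ID and vertex IDs (GET /jobs/<jobId> lists them), and Java 11+ for java.net.http; BackPressureProbe is a hypothetical name. It only prints the raw JSON, and the first request for a vertex may just trigger sampling, so repeat it after a few seconds. Walking from the source toward the sink, the first vertex that is not back pressured, right after ones that are, is usually the bottleneck.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper: prints the back pressure REST response for each given job vertex. */
public final class BackPressureProbe {

    private BackPressureProbe() {}

    /** Builds <restBase>/jobs/<jobId>/vertices/<vertexId>/backpressure. */
    public static URI backPressureUri(String restBase, String jobId, String vertexId) {
        String base = restBase.endsWith("/") ? restBase.substring(0, restBase.length() - 1) : restBase;
        return URI.create(base + "/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure");
    }

    // Usage: java BackPressureProbe http://localhost:8081 <jobId> <vertexId> [<vertexId> ...]
    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length < 3) {
            System.err.println("usage: BackPressureProbe <restBase> <jobId> <vertexId>...");
            return;
        }
        HttpClient client = HttpClient.newHttpClient();
        for (int i = 2; i < args.length; i++) {
            HttpRequest request = HttpRequest.newBuilder(backPressureUri(args[0], args[1], args[i])).GET().build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            // Raw JSON; check the Flink REST API docs for the exact fields.
            System.out.println(args[i] + " -> HTTP " + response.statusCode() + ": " + response.body());
        }
    }
}
```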
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-24549
>>
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>>
>> On 14/10/2021 12:41, Ori Popowski wrote:
>>
>> I'd appreciate it if someone could advise on this issue.
>>
>> Thanks
>>
>> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have heavy backpressure in a fairly simple Flink application written in
>>> Scala, using Flink version 1.12.1.
>>>
>>> To find the source of the problem, I want to eliminate all classes with
>>> generic serialization, so I set
>>> pipeline.generic-types=false
>>>
>>> in order to spot those classes and write a serializer for them.
>>>
>>> However, for some reason, I get the stack trace attached below.
>>>
>>>    1. It looks suspicious that one of Flink's own classes doesn't have
>>>    a serializer and has to fall back to generic serialization. Is this a bug?
>>>    2. I want to get a list of all classes that fall back to generic
>>>    serialization. How can I do that other than setting
>>>    pipeline.generic-types=false and eliminating those classes one by
>>>    one?
>>>    3. I defined a custom Kryo serializer for this class using both
>>>    addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…),
>>>    and I still get the same error message. How can I provide Flink with
>>>    custom serialization so it stops complaining about this?
>>>
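For question 2, short of disabling generic types and fixing failures one at a time, a small recursive walk over a TypeInformation you already hold (for example dataStream.getType()) can report the classes that end up as GenericTypeInfo. This is a sketch: GenericTypeFinder is a hypothetical helper, it only descends into CompositeType (tuples, POJOs, Scala case classes), ListTypeInfo, MapTypeInfo and ObjectArrayTypeInfo, and it cannot see types that connectors create internally, such as the Kinesis consumer's SequenceNumber state.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

/** Hypothetical helper: collects classes that Flink would serialize with Kryo (GenericTypeInfo). */
public final class GenericTypeFinder {

    private GenericTypeFinder() {}

    public static Set<Class<?>> findGenericTypes(TypeInformation<?> root) {
        Set<Class<?>> found = new LinkedHashSet<>();
        collect(root, found);
        return found;
    }

    private static void collect(TypeInformation<?> info, Set<Class<?>> found) {
        if (info instanceof GenericTypeInfo) {
            found.add(info.getTypeClass());
        } else if (info instanceof CompositeType) {
            // Tuples, POJOs, Scala case classes: descend into every field.
            CompositeType<?> composite = (CompositeType<?>) info;
            for (int i = 0; i < composite.getArity(); i++) {
                collect(composite.getTypeAt(i), found);
            }
        } else if (info instanceof ListTypeInfo) {
            collect(((ListTypeInfo<?>) info).getElementTypeInfo(), found);
        } else if (info instanceof MapTypeInfo) {
            MapTypeInfo<?, ?> map = (MapTypeInfo<?, ?>) info;
            collect(map.getKeyTypeInfo(), found);
            collect(map.getValueTypeInfo(), found);
        } else if (info instanceof ObjectArrayTypeInfo) {
            collect(((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo(), found);
        }
        // Everything else (basic types, Scala Option/collection type infos, ...) is not inspected.
    }

    public static void main(String[] args) {
        // Same shape as the failing state in the stack trace: List<Tuple2<..., generic type>>.
        TupleTypeInfo<Tuple2<String, UUID>> tuple =
                new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, new GenericTypeInfo<>(UUID.class));
        System.out.println(findGenericTypes(new ListTypeInfo<>(tuple))); // prints [class java.util.UUID]
    }
}
```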
>>>
>>>
>>> java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is treated as a generic type.
>>> at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>>> at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>>> at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>>> at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>>> at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>>> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>
