Unfortunately, the fact that the Kafka sources use Kryo for state
serialization is a very early design misstep that we cannot get rid of
for now. We will get rid of it once the new source interface lands
([1]) and we have a new Kafka source based on it.
As a workaround, we should change the Kafka consumer [2] to use a
different constructor of ListStateDescriptor, one that takes a
TypeSerializer directly instead of a TypeInformation. This should
sidestep the "no generic types" check.
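Here is a rough illustration of why this should work. It assumes, as in Flink's GenericTypeInfo#createSerializer, that the "no generic types" check only fires when a serializer is derived from a TypeInformation. A state descriptor does that derivation (StateDescriptor#initializeSerializerUnlessSet) only if no serializer was passed in. The sketch below models just that control flow in plain Java. ExecConfig, TypeInfoModel, SerializerModel, GenericTypeInfoModel, ListStateDescriptorModel, PartitionOffset and the "kafka-offsets" state name are hypothetical stand-ins, not Flink API.

```java
// Hypothetical, heavily simplified stand-ins for Flink's ExecutionConfig,
// TypeInformation/GenericTypeInfo, TypeSerializer and ListStateDescriptor.
// They only model *where* the "generic types disabled" check is triggered.

final class ExecConfig {
    private boolean genericTypesDisabled;
    void disableGenericTypes() { genericTypesDisabled = true; }
    boolean hasGenericTypesDisabled() { return genericTypesDisabled; }
}

interface SerializerModel<T> {
    String describe();
}

interface TypeInfoModel<T> {
    SerializerModel<T> createSerializer(ExecConfig config);
}

// A type that would be treated as generic, i.e. serialized with Kryo.
final class GenericTypeInfoModel<T> implements TypeInfoModel<T> {
    private final Class<T> clazz;

    GenericTypeInfoModel(Class<T> clazz) { this.clazz = clazz; }

    @Override
    public SerializerModel<T> createSerializer(ExecConfig config) {
        // The check lives here: it only runs when a serializer is derived from type info.
        if (config.hasGenericTypesDisabled()) {
            throw new UnsupportedOperationException(
                "Generic types have been disabled and " + clazz.getName()
                    + " is treated as a generic type.");
        }
        return () -> "Kryo(" + clazz.getSimpleName() + ")";
    }
}

final class ListStateDescriptorModel<T> {
    private final String name;
    private final TypeInfoModel<T> typeInfo; // null when built from a serializer
    private SerializerModel<T> serializer;   // set lazily when built from type info

    ListStateDescriptorModel(String name, TypeInfoModel<T> typeInfo) {
        this.name = name;
        this.typeInfo = typeInfo;
    }

    ListStateDescriptorModel(String name, SerializerModel<T> serializer) {
        this.name = name;
        this.typeInfo = null;
        this.serializer = serializer;
    }

    // A serializer passed to the constructor is used as-is, so type info is never consulted.
    SerializerModel<T> initializeSerializerUnlessSet(ExecConfig config) {
        if (serializer == null) {
            serializer = typeInfo.createSerializer(config);
        }
        return serializer;
    }
}

// Hypothetical stand-in for the (KafkaTopicPartition, Long) entries kept in state;
// only its Class object matters for this sketch.
final class PartitionOffset { }

final class SerializerPathDemo {
    // Returns the error message if serializer initialization fails, or null on success.
    static String tryInit(ListStateDescriptorModel<?> descriptor, ExecConfig config) {
        try {
            descriptor.initializeSerializerUnlessSet(config);
            return null;
        } catch (UnsupportedOperationException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        ExecConfig config = new ExecConfig();
        config.disableGenericTypes();

        // Current path: descriptor built from (generic) type information.
        ListStateDescriptorModel<PartitionOffset> viaTypeInfo = new ListStateDescriptorModel<>(
            "kafka-offsets", new GenericTypeInfoModel<>(PartitionOffset.class));
        System.out.println("type info path:  " + tryInit(viaTypeInfo, config));

        // Workaround path: descriptor handed an explicit serializer.
        SerializerModel<PartitionOffset> explicit = () -> "Kryo(PartitionOffset)";
        ListStateDescriptorModel<PartitionOffset> viaSerializer =
            new ListStateDescriptorModel<>("kafka-offsets", explicit);
        System.out.println("serializer path: " + tryInit(viaSerializer, config));
    }
}
```

In the real connector, the explicit serializer would presumably still have to be Kryo-based for KafkaTopicPartition so that existing state stays readable. The workaround moves the check out of the way rather than removing Kryo.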
I created a Jira issue for this:
https://issues.apache.org/jira/browse/FLINK-15904
Best,
Aljoscha
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2]
https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860
On 01.02.20 09:44, Guowei Ma wrote:
Hi,
I think there are two possible workarounds for using 'disableGenericTypes'
with the Kafka source:
1. Add the TypeInfo annotation [1] to KafkaTopicPartition.
2. Use reflection to call the private method. :)
Maybe we could add this TypeInfo annotation to the Kafka connector.
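Option 1 relies on Flink's type extraction consulting a @TypeInfo factory before it falls back to treating a class as a generic (Kryo) type. The sketch below models only that lookup order in plain Java. TypeInfoAnnotation, TypeInfoFactoryModel, TypeExtractorModel and the partition classes are hypothetical stand-ins, not Flink's real @TypeInfo annotation and TypeInfoFactory from [1]. Type information is reduced to a descriptive string, and Flink's POJO analysis is left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-ins for Flink's @TypeInfo annotation and TypeInfoFactory.
// Type information is reduced to a descriptive String to keep the sketch small.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface TypeInfoAnnotation {
    Class<? extends TypeInfoFactoryModel<?>> value();
}

abstract class TypeInfoFactoryModel<T> {
    abstract String createTypeInfo(Class<T> clazz);
}

final class TypeExtractorModel {
    // Simplified lookup: an annotated factory wins, otherwise the class falls
    // back to generic (Kryo) handling. Flink's POJO analysis is skipped here.
    @SuppressWarnings("unchecked")
    static <T> String typeInfoFor(Class<T> clazz) {
        TypeInfoAnnotation annotation = clazz.getAnnotation(TypeInfoAnnotation.class);
        if (annotation == null) {
            return "GenericType<" + clazz.getSimpleName() + "> (Kryo)";
        }
        try {
            TypeInfoFactoryModel<T> factory = (TypeInfoFactoryModel<T>)
                annotation.value().getDeclaredConstructor().newInstance();
            return factory.createTypeInfo(clazz);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + annotation.value(), e);
        }
    }
}

// Stand-in for KafkaTopicPartition as it is today: no annotation.
final class PlainPartition {
    final String topic;
    final int partition;

    PlainPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// The same shape with the annotation added, as in option 1.
@TypeInfoAnnotation(AnnotatedPartitionFactory.class)
final class AnnotatedPartition {
    final String topic;
    final int partition;

    AnnotatedPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class AnnotatedPartitionFactory extends TypeInfoFactoryModel<AnnotatedPartition> {
    @Override
    String createTypeInfo(Class<AnnotatedPartition> clazz) {
        // A real factory would build a non-generic TypeInformation, e.g. from (topic, partition).
        return "Composite<" + clazz.getSimpleName() + ">(topic: String, partition: int)";
    }
}

final class TypeInfoDemo {
    public static void main(String[] args) {
        System.out.println(TypeExtractorModel.typeInfoFor(PlainPartition.class));
        System.out.println(TypeExtractorModel.typeInfoFor(AnnotatedPartition.class));
    }
}
```

KafkaTopicPartition lives inside the connector, so users cannot annotate it themselves. The annotation would have to be added in the connector. Moving the offset state away from Kryo would probably also need care to keep existing savepoints restorable.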
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#defining-type-information-using-a-factory
Best,
Guowei
On Fri, Jan 31, 2020 at 12:40 AM, Oleksandr Nitavskyi <o.nitavs...@criteo.com> wrote:
Hi guys,
We have run into an issue related to the ability to use
‘disableGenericTypes’ (disabling Kryo for the job). It seems like a very
nice idea for ensuring that nobody introduces some random change that
penalizes the job's performance.
The issue we have encountered is that Flink’s KafkaSource stores
KafkaTopicPartition in its state for offset recovery, and that state is
serialized with Kryo.
Granted, this particular use of Kryo does not penalize performance, but it
reduces the usefulness of ‘disableGenericTypes’. Also, on the Flink user’s
side there is no good way to provide non-Kryo type information for
KafkaTopicPartition.
One of the related tickets I have found:
https://issues.apache.org/jira/browse/FLINK-12031
Do you know of any workaround for using ‘disableGenericTypes’ with Kafka
sources, or what do you think about doing some development to address this issue?
Kind Regards
Oleksandr