[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek closed FLINK-7902.
-----------------------------------
    Resolution: Fixed

Fixed in 0ba528c71e35858a043bd513ead37800262f7e0c

> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> ------------------------------------------------------------
>
>                 Key: FLINK-7902
>                 URL: https://issues.apache.org/jira/browse/FLINK-7902
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses
> {{TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}}
> to create a {{TypeInformation}} which in turn is used to create a
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a
> {{PojoType(GenericType<KafkaTransactionState>, GenericType<KafkaTransactionContext>)}}
> which means we don't have explicit control over the serialisation format
> and we also use Kryo (which is the default for {{GenericTypeInfo}}). This
> can be problematic if we want to evolve the state schema in the future or
> if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a
> custom-made {{TypeSerializer}} for the state.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
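
To illustrate what "explicit control over the serialisation format" could mean here, the following is a minimal, self-contained sketch of a hand-written, versioned binary encoding for a transaction state object. It is not the code from the fix commit and it does not implement Flink's {{TypeSerializer}} interface. The class {{TxnStateCodec}}, the nested {{TxnState}}, its three fields and the {{FORMAT_VERSION}} constant are all hypothetical names chosen for the example; only {{java.io}} classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Sketch only: a fixed, versioned byte layout instead of Kryo's generic encoding. */
final class TxnStateCodec {

    /** Hypothetical stand-in for the sink's per-transaction state (field names are assumptions). */
    static final class TxnState {
        final String transactionalId;
        final long producerId;
        final short epoch;

        TxnState(String transactionalId, long producerId, short epoch) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Written first in every record so that a later layout can be told apart from this one. */
    static final int FORMAT_VERSION = 1;

    /** Layout: int version, UTF string id, long producerId, short epoch. */
    static byte[] serialize(TxnState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(FORMAT_VERSION);
            out.writeUTF(state.transactionalId);
            out.writeLong(state.producerId);
            out.writeShort(state.epoch);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order and rejects unknown versions. */
    static TxnState deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != FORMAT_VERSION) {
                throw new IOException("Unsupported state format version: " + version);
            }
            String transactionalId = in.readUTF();
            long producerId = in.readLong();
            short epoch = in.readShort();
            return new TxnState(transactionalId, producerId, epoch);
        }
    }

    public static void main(String[] args) throws IOException {
        TxnState original = new TxnState("sink-0-txn-7", 42L, (short) 3);
        TxnState restored = deserialize(serialize(original));
        System.out.println(restored.transactionalId + " " + restored.producerId + " " + restored.epoch);
    }
}
```

Because the byte layout is spelled out field by field and prefixed with a version number, a future change to the state class can add a new version branch in {{deserialize}} rather than depending on how a particular Kryo release happens to encode the object. A real Flink serializer would additionally have to implement the remaining {{TypeSerializer}} methods, which this sketch leaves out.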