Aljoscha Krettek created FLINK-7902:
---------------------------------------
Summary: TwoPhaseCommitSinkFunctions should use custom TypeSerializer
Key: FLINK-7902
URL: https://issues.apache.org/jira/browse/FLINK-7902
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.4.0
Reporter: Aljoscha Krettek
Assignee: Piotr Nowojski
Priority: Blocker
Fix For: 1.4.0
Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new
TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}} to
create a {{TypeInformation}} which in turn is used to create a
{{StateDescriptor}} for the state that the Kafka sink stores.
Behind the scenes, this is analysed roughly as a
{{PojoType(GenericType<KafkaTransactionState>,
GenericType<KafkaTransactionContext>)}}, which means we have no explicit
control over the serialisation format: both fields are serialised with Kryo
(the default for {{GenericTypeInfo}}). This is problematic if we want to evolve
the state schema in the future or upgrade Kryo, because the Kryo-written bytes
in existing checkpoints and savepoints would still have to be readable.
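
For context, {{TypeHint}} is used by creating an anonymous subclass (note the trailing braces), so the full generic type survives erasure in that subclass's generic superclass. The sketch below shows this general super-type-token technique with plain JDK reflection. {{TypeHintSketch}} is a hypothetical class, not Flink's implementation, and {{State}}, {{KafkaTransactionState}} and {{KafkaTransactionContext}} are empty placeholders named after the types in this issue. Capturing the type is the easy part; the issue is what the type analysis then does with the two type arguments, namely treating them as generic, Kryo-serialised types.
{code:java}
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a type hint. Callers must instantiate an
// anonymous subclass, so the generic argument is recorded in that
// subclass's generic superclass and can be read back via reflection.
abstract class TypeHintSketch<T> {
    private final Type captured;

    protected TypeHintSketch() {
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "Create the hint as an anonymous subclass: new TypeHintSketch<...>() {}");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getType() {
        return captured;
    }
}

// Empty placeholders named after the types in the issue (not Flink's classes).
class State<TXN, CONTEXT> {}

class KafkaTransactionState {}

class KafkaTransactionContext {}
{code}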
We should change {{TwoPhaseCommitSinkFunction}} to have only this constructor:
{code}
public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) {
    // ...
}
{code}
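
A minimal sketch of the proposed shape, assuming a hypothetical two-method {{StateSerializerSketch}} interface in place of Flink's much larger {{TypeSerializer}}. The base class has a single constructor that requires a serializer for the whole state and uses only that serializer when snapshotting and restoring, so no type analysis (and no silent Kryo fallback) is involved. {{TwoPhaseCommitSinkSketch}}, {{StateSketch}} and the {{snapshot}}/{{restore}} methods are illustrative names, not Flink API, and the state is simplified to a pending transaction plus a context object.
{code:java}
import java.util.Objects;

// Hypothetical two-method stand-in for Flink's TypeSerializer.
interface StateSerializerSketch<T> {
    byte[] serialize(T value);

    T deserialize(byte[] bytes);
}

// Simplified checkpointed state: a pending transaction plus a
// user-defined context object.
final class StateSketch<TXN, CONTEXT> {
    final TXN pendingTransaction;
    final CONTEXT context;

    StateSketch(TXN pendingTransaction, CONTEXT context) {
        this.pendingTransaction = pendingTransaction;
        this.context = context;
    }
}

// Follows the proposal: the only constructor takes an explicit serializer for
// the whole state, so every subclass must decide on the serialisation format.
abstract class TwoPhaseCommitSinkSketch<TXN, CONTEXT> {
    private final StateSerializerSketch<StateSketch<TXN, CONTEXT>> stateSerializer;

    protected TwoPhaseCommitSinkSketch(
            StateSerializerSketch<StateSketch<TXN, CONTEXT>> stateSerializer) {
        this.stateSerializer = Objects.requireNonNull(stateSerializer, "stateSerializer");
    }

    // On checkpoint: the state bytes come only from the supplied serializer.
    byte[] snapshot(StateSketch<TXN, CONTEXT> state) {
        return stateSerializer.serialize(state);
    }

    // On recovery: the same serializer reads the bytes back.
    StateSketch<TXN, CONTEXT> restore(byte[] bytes) {
        return stateSerializer.deserialize(bytes);
    }
}
{code}
With this shape, a subclass such as {{FlinkKafkaProducer011}} would pass its own serializer to {{super(...)}}.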
and we should then change the {{FlinkKafkaProducer011}} to hand in a
custom-made {{TypeSerializer}} for the state.
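
As a rough illustration of what a custom-made serializer for the Kafka sink's state could boil down to, the sketch below writes an explicit format version followed by each field, using only {{java.io.DataOutputStream}}. It is deliberately not a Flink {{TypeSerializer}} (that abstract class has many more methods, such as {{createInstance()}}, {{copy()}} and {{duplicate()}}), and the class names and the fields {{transactionalId}}, {{producerId}} and {{epoch}} are assumptions made for illustration. The leading version number is what makes schema evolution manageable: a later version can add a branch that still reads version-1 bytes, instead of depending on how Kryo lays out the object.
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, simplified transaction state; field names are assumptions.
final class TxnStateSketch {
    final String transactionalId; // may be null
    final long producerId;
    final short epoch;

    TxnStateSketch(String transactionalId, long producerId, short epoch) {
        this.transactionalId = transactionalId;
        this.producerId = producerId;
        this.epoch = epoch;
    }
}

// Hand-written codec: a format version, then every field written explicitly,
// so the byte layout is fully under our control and independent of Kryo.
final class TxnStateCodecSketch {
    static final int VERSION = 1; // bump whenever the layout changes

    static byte[] serialize(TxnStateSketch state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(VERSION);
            // writeUTF cannot encode null, so write a presence flag first.
            out.writeBoolean(state.transactionalId != null);
            if (state.transactionalId != null) {
                out.writeUTF(state.transactionalId);
            }
            out.writeLong(state.producerId);
            out.writeShort(state.epoch);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static TxnStateSketch deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int version = in.readInt();
            if (version != VERSION) {
                // An evolved codec would branch here to read older layouts.
                throw new IllegalStateException("Unsupported state format version: " + version);
            }
            String transactionalId = in.readBoolean() ? in.readUTF() : null;
            long producerId = in.readLong();
            short epoch = in.readShort();
            return new TxnStateSketch(transactionalId, producerId, epoch);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
{code}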
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)