[ 
https://issues.apache.org/jira/browse/FLINK-13535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-13535:
--------------------------------
    Description: 
During startup of a transactional Kafka producer from previous state, we 
recover in two steps:
# in {{TwoPhaseCommitSinkFunction}}, we commit pending commit-transactions and 
abort pending transactions and then call into {{finishRecoveringContext()}}
# in {{FlinkKafkaProducer#finishRecoveringContext()}} we iterate over all 
recovered transaction IDs and abort them.

This may lead to some transactions being handled twice: once in step 1 and 
again in step 2. Since this is quite an expensive operation, it unnecessarily 
slows down job startup. Instead, we could easily pass 
{{finishRecoveringContext()}} the set of transactions that 
{{TwoPhaseCommitSinkFunction}} already handled, so that it skips them.
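A minimal sketch of the idea, assuming a heavily simplified model: all class, method and field names below ({{RecoverySketch}}, {{recover}}, {{skipHandled}}, {{log}}) are hypothetical and only loosely mirror {{TwoPhaseCommitSinkFunction}} and {{FlinkKafkaProducer}}. The {{commit}}/{{abort}} methods just record the operation instead of talking to Kafka, which makes the duplicate work visible. With {{skipHandled = false}} it models the current behaviour; with {{true}}, the proposed one.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of the two-step recovery (not Flink API).
 * Step 1 commits/aborts pending transactions; step 2 aborts recovered
 * transactional IDs, optionally skipping those step 1 already handled.
 */
class RecoverySketch {

    /** Every commit/abort performed, in order, so the work can be inspected. */
    final List<String> log = new ArrayList<>();

    /** Transactional IDs recovered from state (hypothetical stand-in). */
    final Set<String> recoveredTransactionalIds;

    RecoverySketch(Collection<String> recoveredTransactionalIds) {
        this.recoveredTransactionalIds = new LinkedHashSet<>(recoveredTransactionalIds);
    }

    // Stand-ins for the expensive Kafka operations: they only record the call.
    void commit(String id) { log.add("commit " + id); }
    void abort(String id) { log.add("abort " + id); }

    /** Step 1: roughly what TwoPhaseCommitSinkFunction does on restore. */
    void recover(Collection<String> pendingCommit, Collection<String> pending, boolean skipHandled) {
        List<String> handled = new ArrayList<>();
        for (String id : pendingCommit) { commit(id); handled.add(id); }
        for (String id : pending) { abort(id); handled.add(id); }
        // Current behaviour passes nothing, so step 2 touches everything again.
        finishRecoveringContext(skipHandled ? handled : Collections.<String>emptyList());
    }

    /** Step 2: abort every recovered ID except those already handled in step 1. */
    void finishRecoveringContext(Collection<String> handled) {
        Set<String> toAbort = new LinkedHashSet<>(recoveredTransactionalIds);
        toAbort.removeAll(handled);
        for (String id : toAbort) { abort(id); }
    }
}
```

With recovered IDs {{t1, t2, t3}}, {{t1}} pending commit and {{t2}} pending, the current flow performs five operations ({{t1}} and {{t2}} are touched twice), while the proposed flow performs three: commit {{t1}}, abort {{t2}}, abort {{t3}}.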

  was:
During startup of a transactional Kafka producer from previous state, we 
recover in two steps:
# in {{TwoPhaseCommitSinkFunction}}, we commit pending commit-transactions and 
abort pending transactions and then call into {{finishRecoveringContext()}}
# in {{FlinkKafkaProducer#finishRecoveringContext()}} we iterate over all 
recovered transaction IDs and abort them
This may lead to some transactions being worked on twice. Since this is quite 
some expensive operation, we unnecessarily slow down the job startup but could 
easily give {{finishRecoveringContext()}} a set of transactions that 
{{TwoPhaseCommitSinkFunction}} already covered instead.


> Do not abort transactions twice during KafkaProducer startup
> ------------------------------------------------------------
>
>                 Key: FLINK-13535
>                 URL: https://issues.apache.org/jira/browse/FLINK-13535
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.1, 1.9.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Major
>
> During startup of a transactional Kafka producer from previous state, we 
> recover in two steps:
> # in {{TwoPhaseCommitSinkFunction}}, we commit pending commit-transactions 
> and abort pending transactions and then call into 
> {{finishRecoveringContext()}}
> # in {{FlinkKafkaProducer#finishRecoveringContext()}} we iterate over all 
> recovered transaction IDs and abort them.
>
> This may lead to some transactions being handled twice: once in step 1 and 
> again in step 2. Since this is quite an expensive operation, it 
> unnecessarily slows down job startup. Instead, we could easily pass 
> {{finishRecoveringContext()}} the set of transactions that 
> {{TwoPhaseCommitSinkFunction}} already handled, so that it skips them.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
