Wei Hao created FLINK-21743:
-------------------------------

             Summary: JdbcXaSinkFunction throws XAER_RMFAIL when calling 
snapshotState and beginTx
                 Key: FLINK-21743
                 URL: https://issues.apache.org/jira/browse/FLINK-21743
             Project: Flink
          Issue Type: Test
          Components: Connectors / JDBC
    Affects Versions: 1.13.0
         Environment: org.apache.flink:flink-streaming-java_2.11:1.12.1
org.apache.flink:flink-connector-jdbc_2.11:1.13-SNAPSHOT
            Reporter: Wei Hao


{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId());
    this.rollbackPreparedFromCheckpoint(context.getCheckpointId());
    this.prepareCurrentTx(context.getCheckpointId());
    this.beginTx(context.getCheckpointId() + 1L);
    this.stateHandler.store(JdbcXaSinkFunctionState.of(this.preparedXids, this.hangingXids));
}
{code}
When checkpointing starts, snapshotState() is called, which ends and prepares 
the current transaction. The issue I found is in beginTx(), which generates a 
new Xid and has xaFacade run a command like 'xa start new_xid'. That command 
throws the exception shown below and causes the checkpoint to fail.
{code:java}
Caused by: org.apache.flink.connector.jdbc.xa.XaFacade$TransientXaException: com.mysql.cj.jdbc.MysqlXAException: XAER_RMFAIL: The command cannot be executed when global transaction is in the  PREPARED state
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:353)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.access$800(XaFacadeImpl.java:66)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$0(XaFacadeImpl.java:288)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:327)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:267)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.start(XaFacadeImpl.java:160)
    at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.beginTx(JdbcXaSinkFunction.java:302)
    at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.snapshotState(JdbcXaSinkFunction.java:241)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
    ... 23 more
{code}
I think this failure is to be expected, because it is how XA transactions 
work. The MySQL shell example below behaves much like JdbcXaSinkFunction does.
{code:java}
xa start "1111";
# insert some rows
# end the current transaction
xa end "1111";
xa prepare "1111";
# start a new transaction on the same connection while the previous one is PREPARED
xa start "2222";
{code}
This also produces the error 'SQL Error [1399] [XAE07]: XAER_RMFAIL: The command 
cannot be executed when global transaction is in the PREPARED state'.
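
To make the state rule behind this error easier to follow, here is a minimal toy model in Java. It is only a sketch: the class XaConnectionModel and its methods are hypothetical names, not part of Flink or MySQL Connector/J, and it assumes a single connection can hold at most one XA transaction until that transaction is committed.

```java
/**
 * Toy model of the XA state that a single connection tracks.
 * Assumption: one connection holds at most one XA transaction until it is
 * committed. This is NOT MySQL's or Flink's actual code.
 */
class XaConnectionModel {
    enum State { NONE, ACTIVE, IDLE, PREPARED }

    private State state = State.NONE;
    private String currentXid;

    /** Models 'xa start': only legal when no transaction is attached. */
    void start(String xid) {
        if (state != State.NONE) {
            throw new IllegalStateException(
                    "XAER_RMFAIL: cannot start " + xid + " while " + currentXid
                            + " is in the " + state + " state");
        }
        currentXid = xid;
        state = State.ACTIVE;
    }

    /** Models 'xa end': ACTIVE -> IDLE. */
    void end(String xid) {
        transition(xid, State.ACTIVE, State.IDLE);
    }

    /** Models 'xa prepare': IDLE -> PREPARED. */
    void prepare(String xid) {
        transition(xid, State.IDLE, State.PREPARED);
    }

    /** Models 'xa commit': PREPARED -> NONE, which frees the connection. */
    void commit(String xid) {
        transition(xid, State.PREPARED, State.NONE);
        currentXid = null;
    }

    private void transition(String xid, State expected, State next) {
        if (state != expected || !xid.equals(currentXid)) {
            throw new IllegalStateException(
                    "XAER_RMFAIL: " + xid + " is not in the " + expected + " state");
        }
        state = next;
    }

    public static void main(String[] args) {
        XaConnectionModel conn = new XaConnectionModel();
        conn.start("1111");
        conn.end("1111");
        conn.prepare("1111");
        try {
            // Same order as prepareCurrentTx() followed by beginTx().
            conn.start("2222");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Once the prepared transaction is committed, the start is accepted.
        conn.commit("1111");
        conn.start("2222");
    }
}
```

In this model, starting "2222" is rejected while "1111" is PREPARED and accepted only after "1111" has been committed, which mirrors the order of calls in snapshotState().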

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
