[
https://issues.apache.org/jira/browse/FLINK-22311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324171#comment-17324171
]
Maciej Bryński edited comment on FLINK-22311 at 4/17/21, 8:34 AM:
------------------------------------------------------------------
This is the log.
I have few duplicate records with 2021-04-15T22:15:42 timestamp.
{code:java}
1:052021-04-15T23:16:49.762722475+00:00 stdout F 2021-04-15 23:16:49,761 INFO
org.apache.kafka.clients.FetchSessionHandler [] - [Consumer
clientId=consumer-flink-ingestion-raw-2, groupId=flink-ingestion-raw] Error
sending fetch request (sessionId=842995853, epoch=11339) to node 1: {}.
2021-04-15T22:16:04.498583738+00:00 stdout F at
java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
com.getindata.flink.ingestion.JdbcXaSinkDeserializationWrapper.invoke(JdbcXaSinkDeserializationWrapper.java:22)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
com.getindata.flink.ingestion.JdbcXaSinkDeserializationWrapper.invoke(JdbcXaSinkDeserializationWrapper.java:37)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.invoke(JdbcXaSinkFunction.java:287)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.internal.executor.DynamicBatchStatementExecutor.executeBatch(DynamicBatchStatementExecutor.java:73)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9711)
~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F
2021-04-15T22:16:04.498583738+00:00 stdout F java.sql.BatchUpdateException:
ORA-04021: timeout occurred while waiting to lock object
2021-04-15T22:16:04.498583738+00:00 stdout F 2021-04-15 22:16:04,490 ERROR
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat [] - JDBC
executeBatch error, retry times = 0
{code}
was (Author: maver1ck):
This is the log.
I have few duplicate records with 2021-04-15T22:15:42 timestamp.
{code:java}
1:052021-04-15T23:16:49.762722475+00:00 stdout F 2021-04-15 23:16:49,761 INFO
org.apache.kafka.clients.FetchSessionHandler [] - [Consumer
clientId=consumer-flink-ingestion-raw-2, groupId=flink-ingestion-raw] Error
sending fetch request (sessionId=842995853, epoch=11339) to node 1: {}.
2021-04-15T22:16:04.498583738+00:00 stdout F at
java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at
com.getindata.flink.ingestion.JdbcXaSinkDeserializationWrapper.invoke(JdbcXaSinkDeserializationWrapper.java:22)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
com.getindata.flink.ingestion.JdbcXaSinkDeserializationWrapper.invoke(JdbcXaSinkDeserializationWrapper.java:37)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.invoke(JdbcXaSinkFunction.java:287)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
org.apache.flink.connector.jdbc.internal.executor.DynamicBatchStatementExecutor.executeBatch(DynamicBatchStatementExecutor.java:73)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F at
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9711)
~[ingestion-1.2.1.jar:?] 2021-04-15T22:16:04.498583738+00:00 stdout F
2021-04-15T22:16:04.498583738+00:00 stdout F java.sql.BatchUpdateException:
ORA-04021: timeout occurred while waiting to lock object
2021-04-15T22:16:04.498583738+00:00 stdout F 2021-04-15 22:16:04,490 ERROR
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat [] - JDBC
executeBatch error, retry times = 0
{code}
> Flink JDBC XA connector need to set maxRetries to 0 to properly working
> -----------------------------------------------------------------------
>
> Key: FLINK-22311
> URL: https://issues.apache.org/jira/browse/FLINK-22311
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.13.0
> Reporter: Maciej Bryński
> Priority: Major
>
> Hi,
> We're using XA connector from Flink 1.13 in one of our projects and we were
> able to create duplicates of records during write to Oracle.
> The reason was that default MAX_RETRIES in JdbcExecutionOptions is 3 and this
> can cause duplicates in DB.
> I think we should at least mention this in docs or even validate this option
> when creating XA Sink.
> In documentation we're using defaults.
> https://github.com/apache/flink/pull/10847/files#diff-a585e56c997756bb7517ebd2424e5fab5813cee67d8dee3eab6ddd0780aff627R88
--
This message was sent by Atlassian Jira
(v8.3.4#803005)