It sounds like the JDBC driver's connection is being closed somehow, which
probably has nothing to do with Flink itself.
You could check whether a setting on the database closes connections after a
period of inactivity. Otherwise, your network may be dropping the idle TCP
connection after some time (in that case, you can try enabling TCP
keepalive).
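
To illustrate what TCP keepalive means at the socket level, here is a minimal sketch in plain Java. It is only an illustration: the class and method names (KeepAliveSketch, openWithKeepAlive) are made up for this example, and with JDBC the driver owns the socket, so in practice keepalive would have to be enabled through the driver's or the operating system's settings rather than in your own code.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class KeepAliveSketch {

    // Hypothetical helper: opens a TCP connection and turns on SO_KEEPALIVE,
    // so the OS sends periodic probes while the connection is idle.
    // The probe interval is an OS-level setting, not set here.
    static Socket openWithKeepAlive(InetAddress host, int port) throws IOException {
        Socket socket = new Socket(host, port);
        socket.setKeepAlive(true);
        return socket;
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // A throwaway local server, used only so the client has something to connect to.
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = openWithKeepAlive(loopback, server.getLocalPort())) {
            System.out.println("SO_KEEPALIVE enabled: " + client.getKeepAlive());
        }
    }
}
```

Note that keepalive only helps if the probes are sent more often than the idle timeout of whatever is dropping the connection.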

BR,


On Tue, Mar 2, 2021 at 19:38, Fuyao Li <fuyao...@oracle.com> wrote:

> Sorry for the incomplete email.
>
>
>
> Below is the error log of the broken pipeline. The failed SQL is executed
> again after automatic recovery from the checkpoint. Please share some ideas
> on this issue. I really appreciate it. Thanks!
>
>
>
> 09:20:02,868 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC
> executeBatch error, retry times = 3
>
> java.sql.SQLRecoverableException: Closed Connection
>
>                 at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
>
>                 at
> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
>
>                 at
> oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
>
>                 at
> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
>
>                 at
> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
>
>                 at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
>
>                 at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
>
>                 at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>
>                 at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
>                 at java.base/java.lang.Thread.run(Thread.java:834)
>
> 09:20:02,869 WARN
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  -
> Writing records to JDBC failed.
>
> java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
>
>                 at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
>
>                 at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>
>                 at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
>                 at java.base/java.lang.Thread.run(Thread.java:834)
>
> Caused by: java.sql.SQLRecoverableException: Closed Connection
>
>                 at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
>
>                 at
> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
>
>                 at
> oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
>
>                 at
> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
>
>                 at
> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
>
>                 at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
>
>                 ... 11 more
>
> 09:20:02,869 WARN
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Error
> closing producer.
>
> java.lang.NoSuchMethodError:
> org.apache.kafka.clients.producer.KafkaProducer.close(Ljava/time/Duration;)V
>
>                 at
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:172)
>
>                 at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:949)
>
>                 at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>
>                 at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
>                 at java.base/java.lang.Thread.run(Thread.java:834)
>
> 09:20:02,871 WARN
> org.apache.flink.runtime.taskmanager.Task                     -
> ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink:
> invoice-notification, Sink: Print to Std. Out, Sink: header-notification,
> Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out,
> Sink: distributions-notification) (1/1)#0
> (b57e84496b38c77e8201536e7d0e3723) switched from RUNNING to FAILED.
>
> java.io.IOException: Writing records to JDBC failed.
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)
>
>                 at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
>
>                 at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
>
>                 at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>
>                 at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>
>                 at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>
>                 at
> org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)
>
>                 at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
>
>                 at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
>
>                 at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
>
>                 at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
>
>                 at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
>
>                 at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
>
>                 at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
>
>                 at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
>
>                 at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>
>                 at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
>                 at java.base/java.lang.Thread.run(Thread.java:834)
>
> Caused by: java.io.IOException: Reestablish JDBC connection failed
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
>
>                 ... 29 more
>
> Caused by: java.sql.SQLRecoverableException: Closed Connection
>
>                 at
> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
>
>                 at
> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
>
>                 at
> oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
>
>                 at
> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
>
>                 at
> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
>
>                 at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
>
>                 ... 30 more
>
> 09:20:02,872 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for ProcessTableOutput -> (Sink: adwSink, Sink: Print to
> Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink:
> header-notification, Sink: Print to Std. Out, Sink: lines-notification,
> Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0
> (b57e84496b38c77e8201536e7d0e3723).
>
> 09:20:02,878 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Un-registering task and sending final execution state FAILED to JobManager
> for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out,
> Sink: invoice-notification, Sink: Print to Std. Out, Sink:
> header-notification, Sink: Print to Std. Out, Sink: lines-notification,
> Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0
> b57e84496b38c77e8201536e7d0e3723.
>
> 09:20:02,880 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink:
> invoice-notification, Sink: Print to Std. Out, Sink: header-notification,
> Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out,
> Sink: distributions-notification) (1/1) (b57e84496b38c77e8201536e7d0e3723)
> switched from RUNNING to FAILED on bc6eadd0-19bf-4924-9e6e-62ba5f239d1e @
> localhost (dataPort=-1).
>
> java.io.IOException: Writing records to JDBC failed.
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)
>
>                 at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
>
>                 at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
>
>                 at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>
>                 at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>
>                 at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>
>                 at
> org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)
>
>                 at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
>
>                 at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
>
>                 at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
>
>                 at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
>
>                 at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
>
>                 at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
>
>                 at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
>
>                 at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
>
>                 at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>
>                 at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
>                 at java.base/java.lang.Thread.run(Thread.java:834)
>
> Caused by: java.io.IOException: Reestablish JDBC connection failed
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
>
>                 ... 29 more
>
> Caused by: java.sql.SQLRecoverableException: Closed Connection
>
>                 at
> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
>
>                 at
> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
>
>                 at
> oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
>
>                 at
> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
>
>                 at
> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
>
>                 at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
>
>                 at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
>
>                 ... 30 more
>
>
>
>
>
> Thanks,
>
>
>
> Best regards,
>
> Fuyao
>
>
>
> *From: *Fuyao Li <fuyao...@oracle.com>
> *Date: *Tuesday, March 2, 2021 at 10:33
> *To: *user <user@flink.apache.org>, Timo Walther <twal...@apache.org>
> *Subject: *Need help with JDBC Broken Pipeline Issue after some idle time
>
> Hi Flink Community,
>
>
>
> I need some help with the JDBC sink in the DataStream API. I can produce some
> records and sink them to the database correctly. However, if I wait for 5
> minutes between insertions, I run into a broken pipeline issue. After that,
> the Flink application restarts, recovers from the checkpoint, and executes
> the failed SQL query. I tried hard to find resources explaining why such a
> broken pipeline happens, but I still can't understand it.
>
>
>
> The interesting thing is that, if the idle time is around 3 minutes,
> everything seems to be fine.
>
>
>
> It seems to be a timeout-related issue, but I just don't know what I should
> do to fix it. I have shared the sink code. Could anyone share some
> ideas? Thank you so much!
>
> My environment settings:
>
> Flink version: 1.12.1
>
> Scala version: 2.11
>
> Java version: 11
>
> Flink System parallelism: 1
>
> JDBC Driver: Oracle ojdbc10
>
> Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you
> can regard this as a cloud-based Oracle Database)
>
>
>
> The code for the sink:
>
>         boDataStream
>         .addSink(
>             JdbcSink.sink(
>                 "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
>                 (preparedStatement, testInvoiceBo) -> {
>                   try {
>                       Gson gson = new GsonBuilder()
>                               .excludeFieldsWithoutExposeAnnotation()
>                               .create();
>                       String invoiceId = testInvoiceBo.getINVOICE_ID();
>                       String json = gson.toJson(testInvoiceBo);
>                       log.info("insertion information: {}", json);
>                       preparedStatement.setString(1, invoiceId);
>                       preparedStatement.setString(2, json);
>                   } catch (JsonIOException e) {
>                       log.error("Failed to parse JSON", e);
>                   }
>                 },
>                 new JdbcExecutionOptions.Builder()
>                 .withBatchIntervalMs(0)
>                 .withBatchSize(1)
>                 .withMaxRetries(3)
>                 .build(),
>                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>                     .withUrl(DB_URL)
>                     .withDriverName("oracle.jdbc.driver.OracleDriver")
>                     .withUsername("admin")
>                     .withPassword("password")
>                     .build()))
>         .name("adwSink")
>         .uid("adwSink")
>         .setParallelism(1);
>
>
>
> The JDBC broken pipeline log:
>
>
>
>
>
