Which runner are you using?

Also, do you have the bottom of the stack trace? The failure is possibly due
to the Docker containers running the Java SDK not having access to your
database, but I can't be sure based on the information provided.
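
One rough way to check the reachability guess is a plain TCP connection
attempt, run from the same environment (container or worker) that executes
the Java SDK. This is only a sketch: the helper name `can_reach` and the host
and port in the comment are hypothetical, and a successful connect only shows
that the port is reachable, not that large batch writes will succeed.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and attempts a TCP connect.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


# Example call (hypothetical host and port; use the ones from your jdbc_url):
# can_reach("db.example.internal", 1521)
```

If this returns False from inside the container but True from your own
machine, that would point toward a network access problem rather than the
write itself.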

Thanks,
Cham

On Tue, Feb 21, 2023 at 11:32 AM Somnath Chouwdhury <
somnat...@datametica.com> wrote:

> Hi team,
>
> We are facing an issue while trying to push data to an RDBMS (Oracle in our
> case). The pipeline runs fine for a small number of records, but when it is
> run on a bigger dataset it fails with this error:
>
> Error message from worker: org.apache.beam.sdk.util.UserCodeException:
>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> java.base/java.lang.Thread.run(Thread.java:829) Caused by:
>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>> oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
>> oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
>> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
>> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
>> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>> Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>> [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>> [java.sql.SQLRecoverableException: Closed Connection]] at
>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>> at
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
>> at
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>> at
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source) at
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> at
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> at
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>> at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634) at
>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source) at
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> at
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> at
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> at
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>> at
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>> at
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>> at
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>> at
>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at
>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> at java.base/java.lang.Thread.run(Thread.java:829) Caused by:
>> org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>> [java.sql.SQLRecoverableException: Closed Connection] at
>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>> at
>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>> ... 27 more Caused by: java.sql.SQLRecoverableException: Closed Connection
>> at
>> oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
>> at
>> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
>> at
>> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
>> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811) at
>> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
>> at
>> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
>> at
>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>> ... 28 more
>
>
> Here is the code snippet we are using:
>
> coders.registry.register_coder(self.ExampleRow, coders.RowCoder)
>
> data = (
>     p_input
>     | beam.ParDo(AddColumns(self.ExampleRow._fields))
>     | beam.Map(lambda x: self.ExampleRow(**x)).with_output_types(self.ExampleRow)
>     | "Write to RDBMS" >> WriteToJdbc(
>         table_name=self.task["tablename"],
>         driver_class_name=self.task["driver_class_name"],
>         jdbc_url=self.task["jdbc_url"],
>         username=self.task["username"],
>         password=self.task["password"],
>         classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"],
>     )
> )
>
>
> How do we use Dataflow to push bulk data in batch/streaming?
>
> Thanks,
> Somnath Chouwdhury.
>
