Which runner are you using? Also, do you have the bottom of the stack trace? This is possibly due to the Docker containers running the Java SDK not having access to your database, but I can't be sure based on the information provided.
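If it helps narrow things down, here is a minimal sketch of a plain TCP reachability check in Python. It only tells you whether the database port can be opened from wherever the script runs, so it is only meaningful when run from the same network environment as the workers, and it says nothing about connections being reset later under load. The function name `can_reach` and the host in the usage comment are hypothetical; 1521 is the default Oracle listener port, so use the host and port from your own jdbc_url.

```python
import socket


def can_reach(host: str, port: int, timeout_s: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


# Example call (hypothetical host, replace with the one in your jdbc_url):
# can_reach("db.example.internal", 1521)
```

A False result would point to a networking or firewall problem; a True result would suggest looking at the database side or at the size of the batches being written.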
Thanks,
Cham

On Tue, Feb 21, 2023 at 11:32 AM Somnath Chouwdhury <somnat...@datametica.com> wrote:

> Hi team,
>
> We are facing an issue while trying to push data to an RDBMS (Oracle in
> our case). It runs for a small number of records, but when it is run on a
> bigger dataset it fails, throwing this error:
>
>> Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.sql.BatchUpdateException: IO Error: Connection reset by peer
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: java.sql.BatchUpdateException: IO Error: Connection reset by peer
>> oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
>> oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
>> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
>> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
>> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>> Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]]
>> at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>> at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
>> at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>> at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>> at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>> at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>> at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>> at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>> at org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>> at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>> at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>> at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>> at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>> at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>> at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]
>> at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>> at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>> ... 27 more
>> Caused by: java.sql.SQLRecoverableException: Closed Connection
>> at oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
>> at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
>> at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
>> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811)
>> at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
>> at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
>> at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>> ... 28 more
>
> Here is the code snippet we are using:
>
> coders.registry.register_coder(self.ExampleRow, coders.RowCoder)
>
> data = p_input | beam.ParDo(AddColumns(self.ExampleRow._fields)) | beam.Map(
>     lambda x: self.ExampleRow(**x)).with_output_types(
>     self.ExampleRow) \
>     | f"Write to RDBMS" >> WriteToJdbc(
>         table_name=self.task["tablename"],
>         driver_class_name=self.task['driver_class_name'],
>         jdbc_url=self.task['jdbc_url'],
>         username=self.task['username'],
>         password=self.task['password'],
>         classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"])
>
> How do we use Dataflow to push bulk data in batch/streaming?
>
> Thanks,
> Somnath Chouwdhury.