Hello Cham,

The runner in use is the Dataflow runner. The last 28 lines of the stack trace (the frames elided as "... 28 more") aren't available in Cloud Logging either.
The code shared above works fine with 2-3 records but starts to fail when we try a bigger source data payload. Could this be multiple threads trying to acquire a write lock on the DB table (an Oracle table)?

*Thanks and Regards,*
*Varun Rauthan*

On Wed, Feb 22, 2023 at 1:23 AM Chamikara Jayalath via user <user@beam.apache.org> wrote:

> Which runner are you using?
>
> Also, do you have the bottom of the stack trace here? It's possibly due to
> Docker containers running the Java SDK not having access to your database,
> but I'm not sure based on the information provided.
>
> Thanks,
> Cham
>
> On Tue, Feb 21, 2023 at 11:32 AM Somnath Chouwdhury <
> somnat...@datametica.com> wrote:
>
>> Hi team,
>>
>> We are facing an issue while trying to push data to an RDBMS (Oracle in our
>> case). It runs fine for a small number of records, but when it is run on a
>> bigger dataset it fails with this error:
>>
>>> Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.sql.BatchUpdateException: IO Error: Connection reset by peer
>>>   org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>>>   org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>   org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>>   org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>>   org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>>   org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>>   org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>>   org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>>   org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>>   org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>>>   org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>   org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>>   org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>>   org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>>   org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>>   org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>>   org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>>   org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>>   org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>>   org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>>   java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>   java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>   org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>>   java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>   java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>   java.base/java.lang.Thread.run(Thread.java:829)
>>> Caused by: java.sql.BatchUpdateException: IO Error: Connection reset by peer
>>>   oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
>>>   oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
>>>   oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
>>>   oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
>>>   oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
>>>   org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>>>   org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>>>   org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
>>>   org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>>   Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]]
>>>     at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
>>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>>     at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>>     at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>>     at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>>     at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>     at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>   Caused by: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]
>>>     at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>>     at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>>     ... 27 more
>>>   Caused by: java.sql.SQLRecoverableException: Closed Connection
>>>     at oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
>>>     at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
>>>     at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
>>>     at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811)
>>>     at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
>>>     at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
>>>     at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>>     ... 28 more
>>
>> Here is the code snippet we are using:
>>
>> coders.registry.register_coder(self.ExampleRow, coders.RowCoder)
>>
>> data = p_input | beam.ParDo(AddColumns(self.ExampleRow._fields)) | beam.Map(
>>     lambda x: self.ExampleRow(**x)).with_output_types(
>>     self.ExampleRow) \
>>     | f"Write to RDBMS" >> WriteToJdbc(
>>         table_name=self.task["tablename"],
>>         driver_class_name=self.task['driver_class_name'],
>>         jdbc_url=self.task['jdbc_url'],
>>         username=self.task['username'],
>>         password=self.task['password'],
>>         classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"])
>>
>> How do we use Dataflow to push bulk data in batch/streaming?
>>
>> Thanks,
>> Somnath Chouwdhury.