Hello Cham,

The Runner in use is Dataflow Runner.
The last 28 lines aren't available in Cloud logging as well.

The Code shared above works just fine with 2-3 records but starts to fail
when we try with a bigger source data payload.
Does it look like multiple threads trying to acquire a write lock to the DB
table(Oracle table)?

*Thanks and Regards,*

*Varun Rauthan*




On Wed, Feb 22, 2023 at 1:23 AM Chamikara Jayalath via user <
user@beam.apache.org> wrote:

> Which runner are you using ?
>
> Also, do you have the bottom of the StackTrace here ? It's possibly due to
> Docker containers running the Java SDK not having access to your database,
> but I'm not sure based on the information provided.
>
> Thanks,
> Cham
>
> On Tue, Feb 21, 2023 at 11:32 AM Somnath Chouwdhury <
> somnat...@datametica.com> wrote:
>
>> Hii team,
>>
>> We are facing an issue while trying to push data to RDBMS(oracle in our
>> case) while it runs for small amount of records but when is run through
>> bigger dataset it fails, throwing this error,
>>
>> Error message from worker: org.apache.beam.sdk.util.UserCodeException:
>>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>>> Source)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>>> Source)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> java.base/java.lang.Thread.run(Thread.java:829) Caused by:
>>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>>> oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
>>> oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
>>> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
>>> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
>>> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
>>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>> Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>> [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>> [java.sql.SQLRecoverableException: Closed Connection]] at
>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>> at
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
>>> at
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>> at
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>>> Source) at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>> at
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>> at
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>> at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634) at
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>>> Source) at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>> at
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>> at
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>> at
>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>> at
>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>> at
>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>> at
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>> at
>>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>> at
>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at
>>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>> at
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> at
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> at java.base/java.lang.Thread.run(Thread.java:829) Caused by:
>>> org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>> [java.sql.SQLRecoverableException: Closed Connection] at
>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>> at
>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>> ... 27 more Caused by: java.sql.SQLRecoverableException: Closed Connection
>>> at
>>> oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
>>> at
>>> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
>>> at
>>> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
>>> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811) at
>>> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
>>> at
>>> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
>>> at
>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>> ... 28 more
>>
>>
>> Here is the code snippet we are using
>>
>> coders.registry.register_coder(self.ExampleRow, coders.RowCoder)
>>
>> data = p_input | beam.ParDo(AddColumns(self.ExampleRow._fields)) | beam.Map(
>>     lambda x: self.ExampleRow(**x)).with_output_types(
>>     self.ExampleRow) \
>>        | f"Write to RDBMS" >> WriteToJdbc(
>>     table_name=self.task["tablename"],
>>     driver_class_name=self.task['driver_class_name'],
>>     jdbc_url=self.task['jdbc_url'],
>>     username=self.task['username'],
>>     password=self.task['password'],
>>     classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"])
>>
>>
>> How do we use dataflow to push bulk data in batch/streaming.
>>
>> Thanks,
>> Somnath Chouwdhury.
>>
>
Error message from worker: org.apache.beam.sdk.util.UserCodeException: 
java.sql.BatchUpdateException: IO Error: Connection reset by peer
        
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
        
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
        
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
        
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
        
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
        
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
        
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
        org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
        
org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown 
Source)
        
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
        
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
        
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
        
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
        
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
        
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
        
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
        
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
        
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.sql.BatchUpdateException: IO Error: Connection reset by peer
        
oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
        
oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
        
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
        
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
        
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
        
org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
        
org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
        
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
        
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
        Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: 
[org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: 
[java.sql.SQLRecoverableException: Closed Connection]]
                at 
org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
                at 
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
                at 
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
                at 
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
                at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
                at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
                at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
                at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
                at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
                at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
                at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
                at 
org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
                at 
org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown 
Source)
                at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
                at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
                at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
                at 
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
                at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
                at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
                at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
                at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
                at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
                at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
                at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                at 
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
                at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                at java.base/java.lang.Thread.run(Thread.java:829)
        Caused by: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: 
[java.sql.SQLRecoverableException: Closed Connection]
                at 
org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
                at 
org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
                ... 27 more
        Caused by: java.sql.SQLRecoverableException: Closed Connection
                at 
oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
                at 
oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
                at 
oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
                at 
oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811)
                at 
oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
                at 
oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
                at 
org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
                ... 28 more

Reply via email to