When you run larger workloads, Dataflow likely splits the work into more
bundles and may also autoscale to add more workers, so the number of
parallel connections to the database grows with the workload. First, try
adjusting the database settings to allow more parallel connections.

If that is not possible, you can try the following pipeline options to
keep the number of workers low.

'max_num_workers': caps the number of worker VMs for the pipeline [1]
'number_of_worker_harness_threads': caps the number of worker threads in a
single worker VM (try setting this to 1) [2]
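As a sketch, the two options above can be passed as command-line flags and
parsed into the pipeline's options as usual (the flag values here are
illustrative assumptions to tune per workload, not recommendations):

```python
# Illustrative values -- assumptions to tune per workload, not
# recommended settings.
pipeline_args = [
    "--runner=DataflowRunner",
    "--max_num_workers=2",                   # cap the pipeline at 2 worker VMs
    "--number_of_worker_harness_threads=1",  # one processing thread per VM
]

# These flags are then parsed the usual way, e.g.:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(pipeline_args)
#   with beam.Pipeline(options=options) as p:
#       ...

# With both caps in place, the worst-case number of simultaneous JDBC
# connections from the pipeline is roughly
# max_num_workers * number_of_worker_harness_threads = 2.
print(pipeline_args)
```
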

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/apache_beam/options/pipeline_options.py#L938
[2]
https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/apache_beam/options/pipeline_options.py#L1116


On Tue, Feb 21, 2023 at 7:01 PM Varun Rauthan <varun.raut...@datametica.com>
wrote:

> Hello Cham,
>
> The Runner in use is Dataflow Runner.
> The last 28 lines aren't available in Cloud Logging either.
>
> The code shared above works just fine with 2-3 records but starts to fail
> when we try with a bigger source data payload.
> Could this be multiple threads trying to acquire a write lock on the
> database table (an Oracle table)?
>
> *Thanks and Regards,*
>
> *Varun Rauthan*
>
>
>
>
> On Wed, Feb 22, 2023 at 1:23 AM Chamikara Jayalath via user <
> user@beam.apache.org> wrote:
>
>> Which runner are you using?
>>
>> Also, do you have the bottom of the stack trace here? It's possibly due
>> to the Docker containers running the Java SDK not having access to your
>> database, but I'm not sure based on the information provided.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Feb 21, 2023 at 11:32 AM Somnath Chouwdhury <
>> somnat...@datametica.com> wrote:
>>
>>> Hi team,
>>>
>>> We are facing an issue while trying to push data to an RDBMS (Oracle in
>>> our case). It works for a small number of records, but fails when run
>>> against a bigger dataset, throwing this error:
>>>
>>> Error message from worker: org.apache.beam.sdk.util.UserCodeException:
>>>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>>>> Source)
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>>>> Source)
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> java.base/java.lang.Thread.run(Thread.java:829) Caused by:
>>>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>>>> oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
>>>> oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
>>>> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
>>>> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
>>>> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
>>>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>>>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>>> Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>>> [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>>> [java.sql.SQLRecoverableException: Closed Connection]] at
>>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>>> at
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
>>>> at
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>>> at
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>>>> Source) at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>>> at
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>>> at
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>>> at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634) at
>>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>>>> Source) at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>>> at
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>>> at
>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>>> at
>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>>> at
>>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>>> at
>>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>>> at
>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>>> at
>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>>> at
>>>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>>> at
>>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at
>>>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>>> at
>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> at
>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> at java.base/java.lang.Thread.run(Thread.java:829) Caused by:
>>>> org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>>> [java.sql.SQLRecoverableException: Closed Connection] at
>>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>>> at
>>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>>> ... 27 more Caused by: java.sql.SQLRecoverableException: Closed Connection
>>>> at
>>>> oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
>>>> at
>>>> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
>>>> at
>>>> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
>>>> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811) at
>>>> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
>>>> at
>>>> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
>>>> at
>>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>>> ... 28 more
>>>
>>>
>>> Here is the code snippet we are using:
>>>
>>> coders.registry.register_coder(self.ExampleRow, coders.RowCoder)
>>>
>>> data = p_input | beam.ParDo(AddColumns(self.ExampleRow._fields)) | beam.Map(
>>>     lambda x: self.ExampleRow(**x)).with_output_types(
>>>     self.ExampleRow) \
>>>        | f"Write to RDBMS" >> WriteToJdbc(
>>>     table_name=self.task["tablename"],
>>>     driver_class_name=self.task['driver_class_name'],
>>>     jdbc_url=self.task['jdbc_url'],
>>>     username=self.task['username'],
>>>     password=self.task['password'],
>>>     classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"])
>>>
>>>
>>> How do we use Dataflow to push bulk data in batch/streaming?
>>>
>>> Thanks,
>>> Somnath Chouwdhury.
>>>
>>