[
https://issues.apache.org/jira/browse/BEAM-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939262#comment-16939262
]
Anton Bankovskii edited comment on BEAM-7230 at 9/27/19 11:28 AM:
------------------------------------------------------------------
Unfortunately I came across the same behavior while executing the pipeline
using the DataflowRunner.
Exception in the Dataflow console:
{noformat}
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: Caused by:
[SKIPPED] 14 more
Caused by: java.lang.NullPointerException
    at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.buildDataSource(JdbcIO.java:1394)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.apply(JdbcIO.java:1389)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.apply(JdbcIO.java:1369)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.setup(JdbcIO.java:862)
{noformat}
Testing locally with DirectRunner gives no error.
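One possible explanation, which this thread does not confirm, is that the provider keeps its configuration in static fields set during pipeline construction. Java serialization writes only instance fields, never static ones. The DirectRunner executes the pipeline inside the launching JVM, so a static field set there is still visible. A Dataflow worker is a separate JVM that deserializes the provider and finds the static field null. The stdlib-only sketch below demonstrates that general mechanism with a hypothetical `StaticConfigProvider` class. It is not JdbcIO's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical provider that stores its configuration in a static field.
class StaticConfigProvider implements Serializable {
    private static final long serialVersionUID = 1L;
    // Static state is NOT written by Java serialization.
    static String config;

    static StaticConfigProvider of(String cfg) {
        config = cfg; // set only in the JVM that constructs the pipeline
        return new StaticConfigProvider();
    }

    String buildDataSource() {
        // config.trim() throws NullPointerException if config was never set in this JVM.
        return "pool for " + config.trim();
    }
}

class StaticStateDemo {
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Same JVM (DirectRunner-like): the static field is still set after deserialization. */
    static String worksInSameJvm() {
        byte[] shipped = serialize(StaticConfigProvider.of("jdbc:mysql://host/db"));
        return ((StaticConfigProvider) deserialize(shipped)).buildDataSource();
    }

    /** Simulated fresh worker JVM: returns true if building the data source throws an NPE. */
    static boolean failsOnSimulatedWorker() {
        byte[] shipped = serialize(StaticConfigProvider.of("jdbc:mysql://host/db"));
        StaticConfigProvider.config = null; // a new JVM never ran of()
        StaticConfigProvider onWorker = (StaticConfigProvider) deserialize(shipped);
        try {
            onWorker.buildDataSource();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("same JVM: " + worksInSameJvm());
        System.out.println("NPE on simulated worker: " + failsOnSimulatedWorker());
    }
}
```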
Here's my code:
{code:java}
import javax.sql.DataSource;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// JdbcToCsvOptions and JdbcToCsvRowMapper are defined elsewhere (not shown).
public class ReadFromJdbcFn extends PTransform<PBegin, PCollection<String>> {
  private final JdbcToCsvOptions options;

  public ReadFromJdbcFn(JdbcToCsvOptions options) {
    this.options = options;
  }

  @Override
  public PCollection<String> expand(PBegin input) {
    final JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        configure(options.getConnectionString(), options.getDriverName(),
            options.getUser(), options.getPassword(),
            options.getConnectionProperties());
    // Wrap the configuration in JdbcIO's pooling data source provider.
    final SerializableFunction<Void, DataSource> dataSourceProviderFunction =
        JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration);
    return input.apply(
        JdbcIO.<String>read()
            .withDataSourceProviderFn(dataSourceProviderFunction)
            .withCoder(StringUtf8Coder.of())
            .withFetchSize(options.getFetchSize())
            .withQuery(options.getQuery())
            .withRowMapper(new JdbcToCsvRowMapper()));
  }

  private static JdbcIO.DataSourceConfiguration configure(
      String connString, String driver, String user, String password,
      String connProperties) {
    final JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(driver, connString)
            .withUsername(user)
            .withPassword(password);
    // Connection properties are optional.
    return connProperties == null
        ? dataSourceConfiguration
        : dataSourceConfiguration.withConnectionProperties(connProperties);
  }
}
{code}
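A pattern that survives the trip to Dataflow workers keeps the configuration in instance fields, which are serialized with the provider. It then uses a static map only as a per-JVM cache of built pools, keyed by that configuration. The sketch below illustrates the pattern in plain Java and mirrors the `SerializableFunction<Void, DataSource>` shape used above. `CachingPoolProvider` and `FakePool` are hypothetical stand-ins, since a real version would build a pooled `javax.sql.DataSource`. This is not Beam's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a pooled DataSource.
class FakePool {
    final String url;
    final int maxSize;

    FakePool(String url, int maxSize) {
        this.url = url;
        this.maxSize = maxSize;
    }
}

// The configuration travels with the provider (instance fields are serialized),
// while the pool is built lazily once per JVM and shared by every copy.
class CachingPoolProvider implements Function<Void, FakePool>, Serializable {
    private static final long serialVersionUID = 1L;
    // Per-JVM cache: never serialized, so each worker fills its own.
    private static final Map<String, FakePool> POOLS = new ConcurrentHashMap<>();

    private final String url;
    private final int maxSize;

    CachingPoolProvider(String url, int maxSize) {
        this.url = url;
        this.maxSize = maxSize;
    }

    @Override
    public FakePool apply(Void input) {
        // computeIfAbsent creates at most one pool per key in this JVM.
        return POOLS.computeIfAbsent(url + "|" + maxSize, k -> new FakePool(url, maxSize));
    }

    /** Serializes and deserializes a provider, as when it is shipped to a worker. */
    static CachingPoolProvider roundTrip(CachingPoolProvider p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(p);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CachingPoolProvider) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CachingPoolProvider original = new CachingPoolProvider("jdbc:mysql://host/db", 300);
        // Two deserialized copies, as two DoFn instances in one worker JVM would hold.
        FakePool a = roundTrip(original).apply(null);
        FakePool b = roundTrip(original).apply(null);
        System.out.println("same pool: " + (a == b) + ", url: " + a.url);
    }
}
```

With this design, the total number of connections is bounded by one pool per worker JVM rather than one pool per `DoFn` instance.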
Also, will Dataflow autoscale its workers based on the amount of data read from
the database?
was (Author: stabmeqt): identical to the comment above, minus the final question about autoscaling.
> Using JdbcIO creates huge amount of connections
> -----------------------------------------------
>
> Key: BEAM-7230
> URL: https://issues.apache.org/jira/browse/BEAM-7230
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.11.0
> Reporter: Brachi Packter
> Assignee: Ismaël Mejía
> Priority: Major
> Fix For: 2.13.0
>
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> I want to write from Dataflow to GCP Cloud SQL. I'm using a connection pool,
> and still I see a huge number of connections in GCP SQL (4k, while I set the
> connection pool size to 300), and most of them are in the sleep state.
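A quick sanity check on the numbers in the description: if every pool really caps out at 300 connections, then about 4,000 open connections can only come from several independent pools. One such source would be one pool per worker JVM or per `DoFn` instance rather than a single shared pool; where the extra pools come from is an assumption here. The hypothetical helper below, which is not part of JdbcIO, computes the minimum number of pools implied and the worst-case total for a given pool count.

```java
class ConnectionMath {
    /** Smallest number of pools, each capped at maxPerPool, that could hold `observed` connections. */
    static int minPools(int observed, int maxPerPool) {
        return (observed + maxPerPool - 1) / maxPerPool; // integer ceiling division
    }

    /** Upper bound on open connections when `pools` independent pools each reach maxPerPool. */
    static long maxConnections(int pools, int maxPerPool) {
        return (long) pools * maxPerPool;
    }

    public static void main(String[] args) {
        System.out.println(minPools(4000, 300));     // 14
        System.out.println(maxConnections(14, 300)); // 4200
    }
}
```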
--
This message was sent by Atlassian Jira
(v8.3.4#803005)