There is no forking after the "Generate Queries" transform.

We noticed that the "Generate Queries" transform is in a different stage
than the reading itself. This is likely due to the Reparallelize transform,
and we see the same thing with JdbcIO.readAll.

After reading up on Splittable DoFns, we decided to give them a try. We
essentially copied the source of JdbcIO into our project and changed
`ReadFn` into `SplittableJdbcIO`, which acts as (more or less) a drop-in
replacement (see source below).

The DAG here is seemingly simpler with a single stage containing all steps
from reading the DB to writing. We are also seeing that the job is
parallelizing much better than before.

class SplittableJdbcIO {
    /* ... */
        @ProcessElement
        public void processElement(@Element ParameterT element,
                                   RestrictionTracker<OffsetRange, Long> tracker,
                                   OutputReceiver<OutputT> out) throws Exception {
            if (connection == null) {
                connection = dataSource.getConnection();
            }

            // processElement returns void, so a failed claim must end the call
            // with a plain return; calling ProcessContinuation.stop() without
            // returning it has no effect.
            if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
                LOG.error("Failed to claim restriction");
                return;
            }

            LOG.info("Preparing query. fetchSize={}, shardSize={}, from={}, to={}",
                    fetchSize, shardSize,
                    tracker.currentRestriction().getFrom(),
                    tracker.currentRestriction().getTo());

            // Wrap the user query so that each restriction reads only its own window of rows
            String executeQuery = String.format(
                    "SELECT * FROM (%s) t OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;", query.get());

            // PostgreSQL requires autocommit to be disabled to enable cursor streaming
            // see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
            connection.setAutoCommit(false);
            LOG.debug("Autocommit has been disabled");
            try (PreparedStatement statement = connection.prepareStatement(
                    executeQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                statement.setFetchSize(fetchSize);
                parameterSetter.setParameters(element, statement);

                // The OFFSET and FETCH NEXT placeholders are the last two parameters
                int paramCount = statement.getParameterMetaData().getParameterCount();
                statement.setLong(paramCount - 1, tracker.currentRestriction().getFrom());
                statement.setLong(paramCount,
                        tracker.currentRestriction().getTo()
                                - tracker.currentRestriction().getFrom());

                queryCounter.inc();

                int count = 0;
                long t0 = Instant.now().getMillis();

                try (ResultSet resultSet = statement.executeQuery()) {
                    queryLatency.update(Instant.now().getMillis() - t0);
                    LOG.info("Query took {} ms", Instant.now().getMillis() - t0);

                    while (resultSet.next()) {
                        out.output(rowMapper.mapRow(resultSet));

                        rowCounter.inc();
                        count++;
                    }
                    LOG.info("Fetched {} rows", count);
                }
            }
        }

        @SplitRestriction
        public void splitRestriction(@Element ParameterT element,
                                     @Restriction OffsetRange range,
                                     OutputReceiver<OffsetRange> out) {
            // Emit consecutive sub-ranges of at most shardSize rows each
            for (long i = range.getFrom(); i < range.getTo(); i += shardSize) {
                out.output(new OffsetRange(i, Math.min(i + shardSize, range.getTo())));
            }
        }

        @Teardown
        public void teardown() throws Exception {
            if (connection != null) {
                try {
                    connection.close();
                } finally {
                    connection = null;
                }
            }
        }

        @GetInitialRestriction
        public OffsetRange initialRestriction(@Element ParameterT element) throws Exception {
            if (connection == null) {
                connection = dataSource.getConnection();
            }

            // The initial restriction covers every row the user query returns
            String executeQuery = String.format("SELECT COUNT(*) FROM (%s) t;", query.get());

            try (PreparedStatement statement = connection.prepareStatement(
                    executeQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                parameterSetter.setParameters(element, statement);

                try (ResultSet resultSet = statement.executeQuery()) {
                    resultSet.next();

                    long result = resultSet.getLong(1);
                    LOG.info("The query results in {} rows", result);
                    return new OffsetRange(0, result);
                }
            }
        }
    }
}
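To illustrate how the restriction gets sharded, here is a minimal standalone sketch of the arithmetic in `splitRestriction` and of the two values bound to the OFFSET / FETCH NEXT placeholders. `OffsetShards` and `split` are hypothetical names used only for this illustration (they are not part of Beam or JdbcIO), and the numbers (10 rows, shardSize of 4) are assumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; mirrors the loop in splitRestriction. */
final class OffsetShards {

    /** Splits [from, to) into consecutive half-open shards of at most shardSize rows. */
    static List<long[]> split(long from, long to, long shardSize) {
        if (shardSize <= 0) {
            throw new IllegalArgumentException("shardSize must be positive");
        }
        List<long[]> shards = new ArrayList<>();
        for (long start = from; start < to; start += shardSize) {
            // The last shard is clipped so that it never runs past the end of the range
            shards.add(new long[] {start, Math.min(start + shardSize, to)});
        }
        return shards;
    }

    public static void main(String[] args) {
        // Assumed example values: COUNT(*) reported 10 rows, shardSize = 4
        for (long[] shard : split(0, 10, 4)) {
            long offset = shard[0];               // bound to "OFFSET ? ROWS"
            long fetchNext = shard[1] - shard[0]; // bound to "FETCH NEXT ? ROWS ONLY"
            System.out.printf("OFFSET %d ROWS FETCH NEXT %d ROWS ONLY%n", offset, fetchNext);
        }
    }
}
```

With these assumed values the range [0, 10) becomes the shards [0, 4), [4, 8) and [8, 10), so the last query fetches only 2 rows.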



On Tue, May 25, 2021 at 6:35 PM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Hi,
>
> Did you check the Spark DAG to see whether it forks branches after the
> "Generate queries" transform?
>
> —
> Alexey
>
> On 24 May 2021, at 20:32, Thomas Fredriksen(External) <
> thomas.fredrik...@cognite.com> wrote:
>
> Hi there,
>
> We are struggling to get the JdbcIO connector to read a large table on
> Spark.
>
> In short, we wish to read a large table (several billion rows), transform
> it, then write the transformed data to a new table.
>
> We are aware that `JdbcIO.read()` does not parallelize. In order to solve
> this, we attempted to create ranges then generate `limit/offset` queries
> and use `JdbcIO.readAll()` instead.
>
> The overall steps look something like this (sanitized for readability):
>
> ```
> pipeline
>   .apply("Read row count", JdbcIO.read()
>     .withQuery("select count(*) from MYTABLE;")
>     .withCoder(VarLongCoder.of())
>     .withOtherOptions(...))
>   .apply("Generate queries", ParDo.of(new DoFn<Long, Long>() {...})) // Outputs table offsets
>   .apply("Read results", JdbcIO.<Long, Row>readAll()
>     .withCoder(SchemaCoder.of(...))
>     .withOutputParallelization(false)
>     .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
>     .withParameterSetter((element, statement) -> statement.setLong(1, element))
>     .withOtherOptions(...))
>   .apply("more steps", ...);
> ```
>
> The problem is that this does not seem to parallelize on the Spark runner.
> Only a single worker seems to be doing all the work.
>
> We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`,
> however this did not seem to make a difference.
>
> Our goal is to avoid having all the data from the query cached in memory
> between the read and transform operations, as this causes OOM exceptions.
> Having a single worker read the database is okay as long as other workers
> can process the data as soon as it is read, rather than waiting for all of
> the data to be ready.
>
> Any advice on how we should approach this?
>
> Best Regards
> Thomas Li Fredriksen
>
>
>
