There is no forking after the "Generate Queries" transform. We did notice that the "Generate Queries" transform ends up in a different stage than the read itself. This is likely due to the Reparallelize transform, and we see the same behaviour with JdbcIO.readAll.
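For reference, the Reparallelize step essentially materializes its input and then applies Reshuffle.viaRandomKey() to break fusion. Below is a minimal, illustrative sketch of doing that fusion break explicitly; the class name, step labels and example offsets are made up and not taken from our actual pipeline:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class FusionBreakSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Stand-in for the output of the "Generate Queries" step (table offsets).
    PCollection<Long> offsets =
        pipeline.apply("Generate Queries", Create.of(0L, 1000L, 2000L));

    // Reshuffle forces a materialization point, so downstream work can be
    // distributed across workers instead of being fused onto the single producer.
    PCollection<Long> redistributed =
        offsets.apply("Break fusion", Reshuffle.viaRandomKey());

    // The read and subsequent transforms would attach to "redistributed" here.
    pipeline.run();
  }
}
```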
After reading up on Splittable DoFns, we decided to give it a try. We essentially copied the source of JdbcIO into our project and changed `ReadFn` into `SplittableJdbcIO`, which acts as (more or less) a drop-in replacement (see source below). The DAG here is seemingly simpler, with a single stage containing all steps from reading the DB to writing. We are also seeing that the job parallelizes much better than before.

```java
class SplittableJdbcIO {
  /* ... */

  @ProcessElement
  public void processElement(@Element ParameterT element,
                             RestrictionTracker<OffsetRange, Long> tracker,
                             OutputReceiver<OutputT> out) throws Exception {
    if (connection == null) {
      connection = dataSource.getConnection();
    }

    // Claim the start of the restriction; the whole shard is handled in this call.
    if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
      LOG.error("Failed to claim restriction");
      return;
    }

    LOG.info("Preparing query. fetchSize={}, shardSize={}, from={}, to={}",
        fetchSize, shardSize,
        tracker.currentRestriction().getFrom(),
        tracker.currentRestriction().getTo());

    String executeQuery = String.format(
        "SELECT * FROM (%s) t OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;", query.get());

    // PostgreSQL requires autocommit to be disabled to enable cursor streaming, see
    // https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
    connection.setAutoCommit(false);
    LOG.debug("Autocommit has been disabled");

    try (PreparedStatement statement = connection.prepareStatement(
        executeQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
      statement.setFetchSize(fetchSize);
      parameterSetter.setParameters(element, statement);

      // The last two placeholders are the OFFSET and FETCH NEXT values for this shard.
      statement.setLong(statement.getParameterMetaData().getParameterCount() - 1,
          tracker.currentRestriction().getFrom());
      statement.setLong(statement.getParameterMetaData().getParameterCount(),
          tracker.currentRestriction().getTo() - tracker.currentRestriction().getFrom());

      queryCounter.inc();

      int count = 0;
      long t0 = Instant.now().getMillis();

      try (ResultSet resultSet = statement.executeQuery()) {
        queryLatency.update(Instant.now().getMillis() - t0);
        LOG.info("Query took {} ms", Instant.now().getMillis() - t0);

        while (resultSet.next()) {
          out.output(rowMapper.mapRow(resultSet));
          rowCounter.inc();
          count++;
        }
        LOG.info("Fetched {} rows", count);
      }
    }
  }

  @SplitRestriction
  public void splitRestriction(@Element ParameterT element,
                               @Restriction OffsetRange range,
                               OutputReceiver<OffsetRange> out) {
    // Cut the full range into shardSize-sized sub-ranges.
    for (long i = range.getFrom(); i < range.getTo(); i += shardSize) {
      out.output(new OffsetRange(i, Math.min(i + shardSize, range.getTo())));
    }
  }

  @Teardown
  public void teardown() throws Exception {
    if (connection != null) {
      try {
        connection.close();
      } finally {
        connection = null;
      }
    }
  }

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element ParameterT element) throws Exception {
    if (connection == null) {
      connection = dataSource.getConnection();
    }

    String executeQuery = String.format("SELECT COUNT(*) FROM (%s) t;", query.get());

    try (PreparedStatement statement = connection.prepareStatement(
        executeQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
      parameterSetter.setParameters(element, statement);

      try (ResultSet resultSet = statement.executeQuery()) {
        resultSet.next();

        long result = resultSet.getLong(1);
        LOG.info("The query results in {} rows", result);
        return new OffsetRange(0, result);
      }
    }
  }
}
```
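For completeness, here is a minimal sketch of how a splittable DoFn like this can be attached to a pipeline. The constructor arguments, the generic parameters and the class name SplittableReadSketch are placeholders, since the fields of SplittableJdbcIO are elided above; it assumes SplittableJdbcIO<ParameterT, OutputT> extends DoFn<ParameterT, OutputT>:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SplittableReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // A single dummy element triggers one read; it plays the role of ParameterT above.
    // The constructor call is a placeholder; the real class needs at least a data source,
    // the wrapped query, a row mapper, a parameter setter, and the fetch/shard sizes.
    PCollection<Row> rows =
        pipeline
            .apply("Single query", Create.of("unused"))
            .apply("Read table", ParDo.of(
                new SplittableJdbcIO<String, Row>(/* dataSource, query, rowMapper, ... */)));

    // Downstream transforms and the write would attach to "rows" here.
    pipeline.run().waitUntilFinish();
  }
}
```

The key point is that @GetInitialRestriction sizes the work as [0, COUNT(*)), @SplitRestriction cuts that range into shardSize-sized OffsetRanges, and each range becomes an independent OFFSET/FETCH query that the runner can schedule on any worker.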
On Tue, May 25, 2021 at 6:35 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:

> Hi,
>
> Did you check the Spark DAG to see whether it forks branches after the "Generate queries" transform?
>
> —
> Alexey
>
> On 24 May 2021, at 20:32, Thomas Fredriksen(External) <thomas.fredrik...@cognite.com> wrote:
>
> Hi there,
>
> We are struggling to get the JdbcIO connector to read a large table on Spark.
>
> In short, we wish to read a large table (several billion rows), transform it, then write the transformed data to a new table.
>
> We are aware that `JdbcIO.read()` does not parallelize. In order to solve this, we attempted to create ranges, then generate `limit/offset` queries and use `JdbcIO.readAll()` instead.
>
> The overall steps look something like this (sanitized for readability):
>
> ```
> pipeline
>     .apply("Read row count", JdbcIO.read()
>         .withQuery("select count(*) from MYTABLE;")
>         .withCoder(VarLongCoder.of())
>         .withOtherOptions(...))
>     .apply("Generate queries", ParDo.of(new DoFn<Long, Long>() {...})) // Outputs table offsets
>     .apply("Read results", JdbcIO.<Long, Row>readAll()
>         .withCoder(SchemaCoder.of(...))
>         .withOutputParallelization(false)
>         .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
>         .withParameterSetter((element, statement) -> statement.setLong(1, element))
>         .withOtherOptions(...))
>     .apply("more steps", ...);
> ```
>
> The problem is that this does not seem to parallelize on the Spark runner. Only a single worker seems to be doing all the work.
>
> We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`, however this did not seem to make a difference.
>
> Our goal is to avoid all data from the query being cached in memory between the read and transform operations, as this causes OOM exceptions. Having a single worker reading the database is okay as long as the other workers can process the data as soon as it is read, without having to wait for all the data to be ready.
>
> Any advice on how we should approach this would be appreciated.
>
> Best Regards
> Thomas Li Fredriksen