Hi Jonathan,

Wait.on() requires a PCollection. It is not possible to change it to wait
on a PDone, because all PDones in a pipeline are the same: a PDone carries
no information about which write produced it, so it's not clear what
exactly you'd be waiting on.
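
To make that point concrete, here is a toy model in plain Java. It is not
Beam code, and ToyDone, ToySignal and ToyWrites are hypothetical names. A
"done" marker that only remembers its pipeline looks identical no matter
which write returned it. A signal that also remembers its producing step
can be told apart, which is what anything waiting on it needs.

```java
import java.util.Objects;

/** Hypothetical stand-in for PDone: it only knows which pipeline it belongs to. */
final class ToyDone {
    final String pipeline;

    ToyDone(String pipeline) { this.pipeline = pipeline; }

    @Override public boolean equals(Object o) {
        return o instanceof ToyDone && ((ToyDone) o).pipeline.equals(pipeline);
    }

    @Override public int hashCode() { return pipeline.hashCode(); }
}

/** Hypothetical stand-in for a PCollection<Void> signal: it also knows its producing step. */
final class ToySignal {
    final String pipeline;
    final String producingStep;

    ToySignal(String pipeline, String producingStep) {
        this.pipeline = pipeline;
        this.producingStep = producingStep;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof ToySignal)) return false;
        ToySignal other = (ToySignal) o;
        return other.pipeline.equals(pipeline) && other.producingStep.equals(producingStep);
    }

    @Override public int hashCode() { return Objects.hash(pipeline, producingStep); }
}

final class ToyWrites {
    /** A write that returns only a done marker: the step name is deliberately dropped. */
    static ToyDone writeReturningDone(String pipeline, String step) {
        return new ToyDone(pipeline);
    }

    /** A write that returns a signal tied to the step that produced it. */
    static ToySignal writeReturningSignal(String pipeline, String step) {
        return new ToySignal(pipeline, step);
    }
}
```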

To use the Wait transform with JdbcIO.write(), you would need to change
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L761-L762
so that it simply does "return input.apply(ParDo.of(...))", and propagate
that change into the transform's type signature. Then you'd get a waitable
PCollection<Void>.
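
As a rough analogy only (plain JDK CompletableFuture, not Beam), this
sketch shows the ordering that such a waitable PCollection<Void> would
enable. In Beam itself the dependent branch would be written along the
lines of data.apply(Wait.on(firstWriteResults)), followed by the second
write. Note that Wait.on() actually delays processing per window, which
this analogy ignores. The class name and log strings are illustrative
assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Pure-JDK analogy of Wait.on(), not Beam code: the dependent write may only
 * start after the first write has produced its completion signal.
 */
final class WaitOnAnalogy {
    static List<String> runInOrder() {
        // Thread-safe log, since the writes run on the common ForkJoin pool.
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        // Stands in for the PCollection<Void> a waitable first write would return.
        CompletableFuture<Void> usersWritten =
                CompletableFuture.runAsync(() -> log.add("write users (userId, userUuid)"));

        // Stands in for Wait.on(usersWritten) followed by the second write:
        // thenRun only fires once usersWritten has completed.
        CompletableFuture<Void> visitsWritten =
                usersWritten.thenRun(() -> log.add("write visits (SELECT userUuid ...)"));

        visitsWritten.join(); // block until both writes are done
        return new ArrayList<>(log);
    }
}
```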

This is a very simple but backwards-incompatible change. It's up to the
Beam community whether and when to make it.

It's also possible to make a slightly larger but backwards-compatible
change, where JdbcIO.write() would stay as is, but you could write, e.g.,
"JdbcIO.write().withResults()", which would be a new transform that *does*
return results and is waitable. A similar approach is taken in
TextIO.write().withOutputFilenames().
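
Here is a toy sketch of that compatible shape, in plain Java. The class
names (ToyJdbcIO, Done, WriteResult, WriteWithResults) are hypothetical and
this is not the real JdbcIO API. The existing write() keeps its PDone-like
return type, and withResults() returns a separate transform whose result
could be waited on.

```java
import java.util.List;

/**
 * Hypothetical toy model of the backwards-compatible API shape; the method
 * names mirror the proposal, but this is not the real JdbcIO.
 */
final class ToyJdbcIO {
    /** Stand-in for PDone: carries no information about the write. */
    static final class Done {}

    /** Stand-in for a waitable PCollection<Void>: here it just counts rows. */
    static final class WriteResult {
        final int rowsWritten;
        WriteResult(int rowsWritten) { this.rowsWritten = rowsWritten; }
    }

    static Write write() { return new Write(); }

    /** Existing transform: its return type is unchanged, so current callers still compile. */
    static final class Write {
        Done expand(List<String> rows, List<String> table) {
            withResults().expand(rows, table); // share the writing logic, drop the result
            return new Done();
        }

        /** New opt-in variant that returns something a caller can wait on. */
        WriteWithResults withResults() { return new WriteWithResults(); }
    }

    static final class WriteWithResults {
        WriteResult expand(List<String> rows, List<String> table) {
            table.addAll(rows); // stands in for executing the SQL statement per row
            return new WriteResult(rows.size());
        }
    }
}
```

Because the new behavior sits behind withResults(), nothing changes for
code that calls write() today. Only callers that opt in get the new return
type.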

On Wed, Feb 20, 2019 at 4:58 AM Jonathan Perron <jonathan.per...@lumapps.com>
wrote:

> Hello folks,
>
> I have run into a special case where I need to wait for a JdbcIO.write()
> operation to complete before starting a second one.
>
> In detail: I have a PCollection<Map<String, String>> which is used to
> fill two different SQL statements. It is used in a first JdbcIO.write()
> operation to store anonymized users in a table (a userId with an
> associated userUuid generated with UUID.randomUUID()). These two columns
> have a unique constraint, meaning that a userId cannot have multiple
> userUuids. Unfortunately, across different runs of my pipeline the UUID
> will differ, meaning that I need to query this table at some point, or
> use what I describe below.
>
> I am planning to fill a second table with this userUuid along with some
> other information, such as the time of first visit. To limit I/O, and
> since my PCollection already holds a lot of information, I want to reuse
> it with a different SQL statement in which the userUuid is read from the
> first table using a SELECT statement. This cannot work unless the first
> JdbcIO.write() operation has completed.
>
> I saw that the Java SDK provides a Wait.on() PTransform, but
> unfortunately it only works with a PCollection, not with a PDone such as
> the one output by the JdbcIO operation. Could my issue be solved by
> extending Wait.on(), or should I go with another solution? If so, how
> could I implement it?
>
> Many thanks for your input!
>
> Jonathan
>
>
