I have created a new Jira issue for this feature:
https://issues.apache.org/jira/browse/BEAM-6732

Jonathan, feel free to assign it to yourself if you want to contribute; 
contributions are always welcome =)

> On 21 Feb 2019, at 10:23, Jonathan Perron <jonathan.per...@lumapps.com> wrote:
> 
> Thank you Eugene for your answer.
> 
> Based on your explanation, I think I will go with your third solution, as 
> it seems the most robust and user-friendly approach.
> 
> Jonathan
> On 21/02/2019 02:22, Eugene Kirpichov wrote:
>> Hi Jonathan,
>> 
>> Wait.on() requires a PCollection - it is not possible to change it to wait 
>> on PDone because all PDones in a pipeline are the same, so it's not clear 
>> what exactly you'd be waiting on.
>> 
>> To use the Wait transform with JdbcIO.write(), you would need to change 
>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L761-L762
>>  to simply "return input.apply(ParDo.of(...))" and propagate that into the 
>> type signature. Then you'd get a waitable PCollection<Void>.
>> 
>> This is a very simple, but backwards-incompatible change. It is up to the 
>> Beam community whether and when to make it.
>> 
>> It's also possible to make a slightly larger but compatible change, where 
>> JdbcIO.write() would stay as is, but you could write e.g. 
>> "JdbcIO.write().withResults()" which would be a new transform that *does* 
>> return results and is waitable. A similar approach is taken in 
>> TextIO.write().withOutputFilenames().
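
Both options above come down to the same thing: the write has to return a PCollection (for example PCollection<Void>) instead of PDone, so that Wait.on() has a concrete signal to wait for. The following is only a minimal sketch of that idea, not the JdbcIO implementation. SequencedWrites, WriteUsersFn and afterFirstWrite are hypothetical names, and the DoFn body is a placeholder for the real INSERT. The withResults() method mentioned above is a proposal in this thread, so the sketch does not call it.

```java
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class SequencedWrites {

  /** Hypothetical stand-in for the first write. It emits no elements. */
  public static class WriteUsersFn extends DoFn<Map<String, String>, Void> {
    @ProcessElement
    public void processElement(@Element Map<String, String> row) {
      // Placeholder (assumption): execute the first INSERT for this row here.
    }
  }

  /** Returns the same rows, held back until the first write has finished. */
  public static PCollection<Map<String, String>> afterFirstWrite(
      PCollection<Map<String, String>> rows) {
    // A ParDo returns a PCollection (here PCollection<Void>) rather than PDone,
    // so its output can serve as the signal for Wait.on().
    PCollection<Void> firstWriteDone =
        rows.apply("WriteUsers", ParDo.of(new WriteUsersFn()));

    // Wait.on() delays each window of rows until the matching window of the
    // signal collection is complete.
    return rows.apply("WaitForFirstWrite", Wait.on(firstWriteDone));
  }
}
```

The second write would then be applied to the collection returned by afterFirstWrite(), so that its SELECT only runs once the first table has been filled.
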
>> 
>> On Wed, Feb 20, 2019 at 4:58 AM Jonathan Perron <jonathan.per...@lumapps.com> wrote:
>> Hello folks,
>> 
>> I have run into a special case where I need to wait for a JdbcIO.write() 
>> operation to complete before starting a second one.
>> 
>> In detail, I have a PCollection<Map<String, String>> which is used 
>> to fill two different SQL statements. It is first used in a 
>> JdbcIO.write() operation to store anonymized users in a table (a userId 
>> with an associated userUuid generated with UUID.randomUUID()). These two 
>> parameters have a unique constraint, meaning that a userId cannot have 
>> multiple userUuids. Unfortunately, the generated UUID differs from one 
>> run of my pipeline to the next, meaning that I need to query this table 
>> at some point, or to use the approach I describe below.
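
To illustrate why the UUID differs between runs: UUID.randomUUID() returns a fresh value on every call. The sketch below contrasts it with a name-based UUID derived from the userId, which is stable. This contrast is not something proposed in this thread, and UserUuidDemo, randomUuid and stableUuid are hypothetical names. It also assumes the userId is a string. Note that a name-based UUID can be recomputed by anyone who knows the userId, so it may not meet the anonymization requirement.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class UserUuidDemo {

  // Random (version 4) UUID: a new value on every call, hence on every run.
  public static UUID randomUuid() {
    return UUID.randomUUID();
  }

  // Name-based (version 3) UUID: computed from the bytes of userId,
  // so the same userId always maps to the same UUID.
  public static UUID stableUuid(String userId) {
    return UUID.nameUUIDFromBytes(userId.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    String userId = "user-42"; // hypothetical example value

    // Two random UUIDs for the same user are, in practice, never equal.
    System.out.println(randomUuid() + " vs " + randomUuid());

    // Two name-based UUIDs for the same user are always equal.
    System.out.println(stableUuid(userId).equals(stableUuid(userId)));
  }
}
```
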
>> 
>> I am planning to fill a second table with this userUuid along with some 
>> other information, such as the time of the first visit. To limit I/O, and 
>> as I have a lot of information in my PCollection, I want to use it once 
>> more with a different SQL statement, where the userUuid is read from the 
>> first table using a SELECT statement. This cannot work if the first 
>> JdbcIO.write() operation is not complete.
>> 
>> I saw that the Java SDK provides a Wait.on() PTransform, but it is 
>> unfortunately only compatible with a PCollection, and not with a PDone 
>> such as the one output by the JdbcIO operation. Could my issue be solved 
>> by extending Wait.on(), or should I go with another solution? If so, 
>> how could I implement it?
>> 
>> Many thanks for your input!
>> 
>> Jonathan
>> 
