Agreed, I would prefer to do the callback in the IO rather than in the main method.

Regards
JB

On 12/01/2017 03:54 PM, Steve Niemitz wrote:
I do something almost exactly like this, but with BigtableIO instead.  I have a pull request open here [1] (which reminds me I need to finish this up...).  It would really be nice for most IOs to support something like this.

Essentially you do a GroupByKey (or some CombineFn) on the output from the BigtableIO, and then feed that into your function which will run when all writes finish.

You probably want to avoid doing something in the main method because there's no guarantee it'll actually run (maybe the driver will die, get killed, machine will explode, etc).

[1] https://github.com/apache/beam/pull/3997

On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <[email protected]> wrote:

    Assuming you're in Java, you could just follow on in your main method,
    checking the state of the result.

    Example:
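    PipelineResult result = pipeline.run();
    try {
        // Block until the pipeline reaches a terminal state.
        result.waitUntilFinish();
        if (result.getState() == PipelineResult.State.DONE) {
            // DO ES work
        }
    } catch (Exception e) {
        // Stop the job on failure, then propagate the error.
        result.cancel();
        throw e;
    }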
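
The "DO ES work" placeholder above would be the alias swap Philip describes as a single POST request. A minimal sketch of building that request with the JDK's own HTTP client follows, assuming Java 11+ and Elasticsearch's `_aliases` endpoint; the class name `AliasSwap`, the base URL, the alias and the index names are all hypothetical, and the request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class AliasSwap {
    // Builds the JSON body for the swap: remove the alias from the old index
    // and add it to the new one in a single _aliases call.
    // Assumes the names need no JSON escaping (hypothetical, simple names).
    static String swapBody(String alias, String oldIndex, String newIndex) {
        return "{\"actions\":["
            + "{\"remove\":{\"index\":\"" + oldIndex + "\",\"alias\":\"" + alias + "\"}},"
            + "{\"add\":{\"index\":\"" + newIndex + "\",\"alias\":\"" + alias + "\"}}"
            + "]}";
    }

    // Builds (but does not send) the POST request against the _aliases endpoint.
    static HttpRequest swapRequest(String esBaseUrl, String alias,
                                   String oldIndex, String newIndex) {
        return HttpRequest.newBuilder(URI.create(esBaseUrl + "/_aliases"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(swapBody(alias, oldIndex, newIndex)))
            .build();
    }

    public static void main(String[] args) {
        // Hypothetical cluster URL, alias and index names.
        HttpRequest req = swapRequest("http://localhost:9200", "docs", "docs_v1", "docs_v2");
        System.out.println(req.method() + " " + req.uri());
        System.out.println(swapBody("docs", "docs_v1", "docs_v2"));
    }
}
```

Sending it would be a matter of passing the request to `HttpClient.newHttpClient().send(...)` inside the `DONE` branch. As noted elsewhere in this thread, anything run from the main method only happens if the driver survives until the pipeline finishes.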

    Otherwise you could also use Oozie to construct a work flow.

    On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <[email protected]> wrote:

        Hi,

        yes, we had a similar question some days ago.

        We could imagine having a user callback fn fired when the sink batch
        is complete.

        Let me think about that.

        Regards
        JB

        On 12/01/2017 09:04 AM, Philip Chan wrote:

            Hey JB,

            Thanks for getting back so quickly.
            I suppose in that case I would need a way of monitoring when the ES
            transform completes successfully before I can proceed with doing the
            swap.
            The problem with this is that I can't think of a good way to
            determine that termination state short of polling the new index to
            check the document count compared to the size of input PCollection.
            That, or maybe I'd need to use an external system like you mentioned
            to poll on the state of the pipeline (I'm using Google Dataflow, so
            maybe there's a way to do this with some API).
            But I would have thought that there would be an easy way of simply
            saying "do not process this transform until this other transform
            completes".
            Is there no established way of "signaling" between pipelines when
            some pipeline completes, or have some way of declaring a dependency
            of 1 transform on another transform?
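
The polling fallback mentioned above (compare the new index's document count with the expected input size) could look roughly like the sketch below. It is plain Java, not Beam code, and everything in it is an assumption for illustration: the `CountPoller` class, the `waitForCount` helper and the fake count source are hypothetical. In practice the supplier would wrap a call to the index's `_count` endpoint, and the expected size would have to be known outside the pipeline.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

public class CountPoller {
    // Polls countSource until it reports at least `expected` documents or the
    // attempts run out. Returns true only if the expected count was reached.
    static boolean waitForCount(LongSupplier countSource, long expected,
                                int maxAttempts, Duration delay) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (countSource.getAsLong() >= expected) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for a real count lookup: grows by 40 on every poll.
        long[] fake = {0};
        boolean done = waitForCount(() -> fake[0] += 40, 100, 5, Duration.ofMillis(10));
        System.out.println(done);
    }
}
```

A bounded number of attempts keeps a failed or partial write from blocking the swap forever; the caller decides what to do when the method returns false.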

            Thanks again,
            Philip

            On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
            <[email protected]> wrote:

                 Hi Philip,

                 You won't be able to do (3) in the same pipeline, as the
                 Elasticsearch sink PTransform ends the pipeline with PDone.

                 So, (3) has to be done in another pipeline (using a DoFn) or in
                 another "system" (like Camel for instance). I would do a check
                 of the data in the index and then trigger the swap there.

                 Regards
                 JB

                 On 12/01/2017 08:41 AM, Philip Chan wrote:

                     Hi,

                     I'm pretty new to Beam, and I've been trying to use the
                     ElasticsearchIO sink to write docs into ES.
                     With this, I want to be able to
                     1. ingest and transform rows from DB (done)
                     2. write JSON docs/strings into a new ES index (done)
                     3. After (2) is complete and all documents are written into
                     a new index, trigger an atomic index swap under an alias to
                     replace the current aliased index with the new index
                     generated in step 2. This is basically a single POST request
                     to the ES cluster.

                     The problem I'm facing is that I can't seem to find a way
                     for (3) to happen after step (2) is complete.

                     The ElasticsearchIO.Write transform returns a PDone, and I'm
                     not sure how to proceed from there because it doesn't seem
                     to let me do another apply on it to "define" a dependency.
            
https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
            

                     Is there a recommended way to construct pipeline workflows
                     like this?

                     Thanks in advance,
                     Philip


                 --
                 Jean-Baptiste Onofré
                 [email protected]
                 http://blog.nanthrax.net
                 Talend - http://www.talend.com



        --
        Jean-Baptiste Onofré
        [email protected]
        http://blog.nanthrax.net
        Talend - http://www.talend.com




-- Nick Verbeck - NerdyNick
    ----------------------------------------------------
    NerdyNick.com <http://NerdyNick.com>
    TrailsOffroad.com <http://TrailsOffroad.com>
    NoKnownBoundaries.com <http://NoKnownBoundaries.com>




--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
