I created https://issues.apache.org/jira/browse/BEAM-9273 and will send a PR for that in a few days.

On 2/8/20 12:04 AM, Kenneth Knowles wrote:
Regarding StatefulDoFnRunner: this fails during pipeline execution, too late, and as you noted is just a utility that a runner may optionally use. The change needs to be in the runner's run() method prior to execution starting. Here is a specific PR that demonstrates the technique: https://github.com/apache/beam/pull/3420. You could make some generalized shared code that runners could share I suppose.

Regarding having a pipeline-level summary of required features: this introduces an opportunity for inconsistency where the pipeline misses some needed features. Jan added the needed info to the proto so it can be scraped out easily, though it is still easy for a runner to just not be updated. So that's a proto design issue. With PTransform URNs if there is a leaf PTransform with an unknown URN the runner necessarily fails. That is a better model.
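
To illustrate why URN dispatch fails safely, here is a minimal sketch in plain Java. It is not Beam code: the class UrnTranslators and the "example:..." URN strings are hypothetical, and payloads are simplified to strings. The point is only that a runner which looks up each leaf transform by URN cannot silently skip one it does not know.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry of per-URN translators; not a Beam class. */
final class UrnTranslators {
  private final Map<String, Function<String, String>> translators = new HashMap<>();

  /** Registers the translation logic for one known URN. */
  void register(String urn, Function<String, String> translator) {
    translators.put(urn, translator);
  }

  /** Translates a leaf transform, failing loudly when the URN is unknown. */
  String translate(String urn, String payload) {
    Function<String, String> translator = translators.get(urn);
    if (translator == null) {
      // An unknown URN has no fallback path, so the runner must reject it.
      throw new IllegalArgumentException("Unknown transform URN: " + urn);
    }
    return translator.apply(payload);
  }
}
```

A boolean flag on an existing transform has no such property: a runner that was never updated simply does not read it.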

Regarding consensus: it is true that Jan did reach out repeatedly and it was the community that didn't engage. It is reasonable to move to implementation eventually. Yet we still need to avoid the code-level issues, so we need review. On this PR, to name names, I would consider Reuven or Luke to be valuable reviewers. Also the email thread suffers from a tragedy of the commons. You did directly ask for review. But mentions and using GitHub's "review request" are probably a good way to get the PR actually onto someone's dashboard.

Kenn

On Fri, Feb 7, 2020 at 1:52 PM Jan Lukavský wrote:

    I closely reviewed the runners and it seems to me that:

     - all batch runners that would fail to support the annotation
    will fail already (spark structured streaming, apex) due to
    missing support for state or timers

     - streaming runners must explicitly enable this, _as long as they
    use StatefulDoFnRunner_, which is the case for apex, flink and samza

    I will explicitly disable any pipeline with this annotation for:

     - dataflow, jet and gearpump (because I don't see usage of
    StatefulDoFnRunner, although I thought there was one, that's my
    mistake)

     - all batch runners should either support the annotation or fail
    already (due to missing support for state or timers)

    Does this proposal solve the issues you see?

    Regarding the process of introducing this annotation I tried
    really hard to get to the best consensus I could. The same holds
    true for getting core people involved in the review process
    (explicitly mentioned in the PR, multiple mailing list threads).
    The PR was opened for discussion for more than half a year. But
    because I agree with you, I proposed the BIP, so that we can have
    a more explicit process for arriving at a consensus for features
    like this. I'd be happy, though, if we can reach consensus about
    what to do now (if the steps I wrote above resolve all doubts)
    and have a deeper process for similar features in the future.
    As I mentioned, this feature is already implemented, and keeping
    a PR against core open for nearly a year makes it expensive to
    keep in sync with master.

    On 2/7/20 9:31 PM, Kenneth Knowles wrote:
    TL;DR I am not suggesting that you must implement this for any
    runner. I'm afraid I do have to propose this change be rolled
    back before release 2.21.0 unless we fix this. I think the fix is
    easily achieved.

    Clarifications inline.

    On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský wrote:

        Hi Kenn,

        I think that this approach is not well maintainable and
        doesn't scale. Main reasons:

         a) modifying core has by definition some impact on runners,
        so modifying core would imply necessity to modify all runners

    My concern is not about all changes to "core" but only changes to
    the model, which should be extraordinarily rare. They must
    receive extreme scrutiny and require a very high level of
    consensus. It is true that every runner needs to either correctly
    execute or refuse to execute every pipeline, to the extent
    possible. For the case we are talking about it is very easy to
    meet this requirement.

         b) having to implement core feature for all existing runners
        will make any modification to core prohibitively expensive

    No one is suggesting this. I am saying that you need to write the
    1 line that I linked to "if (usesRequiresTimeSortedInput) then
    reject pipeline" so the runner fails before it begins processing
    data, potentially consuming non-replayable messages.
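
A minimal sketch of the "reject before execution" check described above, in plain Java so it is self-contained. Everything here is a stand-in: the locally defined RequiresTimeSortedInput annotation only mimics the name of the Beam annotation under discussion, and PreflightRunner, PlainFn and SortedFn are hypothetical. A real runner would inspect the pipeline through its own translation or visitor machinery rather than raw reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.List;

/** Local stand-in for the annotation discussed in this thread; not the Beam class. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresTimeSortedInput {}

/** Hypothetical runner front end that validates user functions before running. */
final class PreflightRunner {

  /** Returns true if any declared method of the user function carries the annotation. */
  static boolean usesRequiresTimeSortedInput(Object userFn) {
    for (Method method : userFn.getClass().getDeclaredMethods()) {
      if (method.isAnnotationPresent(RequiresTimeSortedInput.class)) {
        return true;
      }
    }
    return false;
  }

  /** Rejects the whole pipeline up front, before any element is read. */
  static void run(List<Object> userFns) {
    for (Object fn : userFns) {
      if (usesRequiresTimeSortedInput(fn)) {
        throw new UnsupportedOperationException(
            fn.getClass().getSimpleName()
                + " requires time-sorted input, which this runner does not support");
      }
    }
    // Only reached when every step is supported; execution would start here.
  }
}

/** Example user function without the requirement. */
final class PlainFn {
  public void process(String element) {}
}

/** Example user function that declares the requirement. */
final class SortedFn {
  @RequiresTimeSortedInput
  public void process(String element) {}
}
```

Because the check runs inside run() and inspects only declarations, it fails without touching any source, so no non-replayable message is consumed.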

         c) even if we accept this, there can be runners that are
        outside of beam repo (or even closed source!)

    Indeed. And those runners need time to adapt to the new proto
    fields. I did not mention it this time, because the proto is not
    considered stable. But very soon it will be. At that point
    additions like this will have to be fully specified and added to
    the proto long before they are enabled for use. That way all
    runners can adjust. The proper order is (1) add model feature (2)
    make runners reject it, unsupported (3) add functionality to SDK
    (4) add to some runners and enable.

        Therefore I think, that the correct and scalable approach
        would be to split this into several pieces:

         1) define pipeline requirements (this is pretty much similar
        to how we currently scope @Category(ValidatesRunner.class) tests)

         2) let pipeline infer its requirements prior to being
        translated via runner

         3) runner can check the set of required features and their
        support and reject the pipeline if some feature is missing

    This is exactly what happens today, but was not included in your
    change. The pipeline proto (or the Java pipeline object) clearly
    contains all the needed information. Whether the pipeline
    summarizes it or the runner implements a trivial PipelineVisitor
    is not important.
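
The requirement check in steps 1) to 3) can be sketched as a set difference. This is an illustrative sketch only: the Feature enum and the FeatureCheck class are hypothetical names, and how the required set is obtained (a pipeline-level summary or a visitor over the transforms) is left open, as in the discussion above.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical feature identifiers; a real model would define its own. */
enum Feature { STATE, TIMERS, TIME_SORTED_INPUT }

/** Hypothetical helper comparing pipeline requirements with runner capabilities. */
final class FeatureCheck {

  /** Returns the features the pipeline requires but the runner does not support. */
  static Set<Feature> missing(Set<Feature> required, Set<Feature> supported) {
    Set<Feature> result = EnumSet.noneOf(Feature.class);
    result.addAll(required);
    result.removeAll(supported);
    return result;
  }

  /** Throws before execution if any required feature is unsupported. */
  static void validate(Set<Feature> required, Set<Feature> supported) {
    Set<Feature> missing = missing(required, supported);
    if (!missing.isEmpty()) {
      throw new UnsupportedOperationException("Unsupported features: " + missing);
    }
  }
}
```

The same comparison could decide which validation tests a runner executes, which is the replacement for test categories suggested in the quoted text.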

        This could even replace the annotations used in validates
        runner tests, because each runner would simply execute all
        tests it has enough features to run.

    What you have described is exactly what happens today.

        But as I mentioned, this is a pretty deep change. I don't
        know how to safely do this for current runners other than to
        actually implement the feature (it seems to me nearly as
        complicated to fail the pipeline in the batch case as to
        actually implement the sorting).

    Indeed. This feature hasn't really got consensus. The proposal
    thread [1] never really concluded affirmatively. The [VOTE]
    thread [2] indicates a clear *lack* of consensus, with all people
    who weighed in asking to raise awareness and build more support and
    consensus. Robert made the good point that if it is (a) useful
    and (b) not easy for users to do themselves, then we should
    consider it, even if most people here are not interested in the
    feature. So that is the closest thing to approval that this
    feature has. But getting more people interested and on board
    would get better feedback and achieve a better result for our users.

    And as a final note, the PR was not reviewed by the core people
    who built out state & timers, nor those who built out DoFn
    annotation systems, nor any runner author, nor those working on
    the Beam model protos. You really should have gotten most of
    these people involved. They would likely have caught the issues
    described here.

    The specific action that I am proposing is to implement the
    one-liner described above in all runners. It might be best to roll
    back and proceed with steps 1-4 I outlined above, so we can be sure
    things are proceeding well.

    Kenn

    [1] https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
    [2] https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E

        It would be super cool if anyone would be interested in
        implementing this in runners that don't currently support it.
        A side note - currently the annotation is not supported by
        all streaming runners due to missing guarantees for timers
        ordering (which can lead to data loss). I think I have found
        a solution to this, see [1], but I'd like to be 100% sure,
        before enabling the support (I'm not sure what is the impact
        of mis-ordered timers on output timestamps, and so on, and so
        forth).

        Jan

        [1] https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209

        On 2/7/20 7:53 PM, Kenneth Knowles wrote:
        I see. It is good to see that the pipeline will at least
        fail. However, the expected approach here is that the pipeline
        is rejected prior to execution. That is a primary reason for
        our annotation-driven API style; it allows much better
        "static" analysis by a runner, so we don't have to wait and
        fail late. Here is an example:
        https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940


        Kenn

        On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský wrote:

            Hi Kenn,

            that should not be the case. Care was taken to fail a
            streaming pipeline that needs this ability when the
            runner doesn't support it [1]. It is true, however,
            that a batch pipeline will not fail, because there is no
            generic (runner agnostic) way of supporting this
            transform in batch case (which is why the annotation was
            needed). Failing batch pipelines in this case would mean
            runners have to understand this annotation, which is
            pretty much close to implementing this feature as a whole.

            This applies generally to any core functionality: it
            might take some time before runners fully support it.
            I don't know how to solve it; maybe add a record to the
            capability matrix? I can imagine a fully generic
            solution (runners might publish their capabilities and
            pipeline might be validated against these capabilities
            at pipeline build time), but that is obviously out of
            scope of the annotation.

            Jan

            [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150

            On 2/7/20 1:01 AM, Kenneth Knowles wrote:
            There is a major problem with this merge: the runners
            that do not support it do not reject pipelines that
            need this feature. They will silently produce the wrong
            answer, causing data loss.

            Kenn

            On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský wrote:

                Hi,

                the PR was merged to master and a few follow-up issues
                were created, mainly [1] and [2]. I didn't find any
                reference to SortedMapState in JIRA; is there a tracking
                issue for it that I can link to? I also added a link to
                the design document here [3].

                [1] https://issues.apache.org/jira/browse/BEAM-9256

                [2] https://issues.apache.org/jira/browse/BEAM-9257

                [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents

                On 1/30/20 1:39 PM, Jan Lukavský wrote:
                > Hi,
                >
                > PR [1] (issue [2]) went through code review and, according
                > to [3], seems to me to be ready for merge. The current state
                > of the implementation is that it is supported only for the
                > direct runner, the legacy Flink runner (batch and streaming)
                > and legacy Spark (batch). It could be supported by all other
                > (streaming) runners using StatefulDoFnRunner, provided the
                > runner can make guarantees about the ordering of timer
                > firings (which is unfortunately the case only for legacy
                > Flink and the direct runner, at least for now; related
                > issues are mentioned multiple times on other threads).
                > Implementation for other batch runners should be as
                > straightforward as adding sorting by event timestamp before
                > the stateful DoFn (unless the runner already sorts, e.g.
                > Dataflow, in which case the annotation can simply be
                > ignored; hence support for batch Dataflow seems to be a
                > no-op).
                >
                > There has been some slight controversy about this feature,
                > but the current guidelines for proposing and implementing
                > features do not cover how to resolve it, so I'm using this
                > opportunity to let the community know that there is a plan
                > to merge this feature unless there is a veto (in that case,
                > please provide specific reasons). The plan is to merge this
                > in the second part of next week.
                >
                > Thanks,
                >
                >  Jan
                >
                > [1] https://github.com/apache/beam/pull/8774
                >
                > [2] https://issues.apache.org/jira/browse/BEAM-8550
                >
                > [3] https://beam.apache.org/contribute/committer-guide/
                >
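
The batch approach mentioned in the original announcement above (sort by event timestamp before the stateful DoFn) can be sketched in plain Java as follows. This is a hedged illustration, not runner code: Event and SortedBatch are hypothetical names, the input is assumed to be the already-grouped elements of a single key, and the "stateful step" is a toy that computes differences between consecutive values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical element carrying an event timestamp and a value. */
final class Event {
  final long timestampMillis;
  final int value;

  Event(long timestampMillis, int value) {
    this.timestampMillis = timestampMillis;
    this.value = value;
  }
}

/** Hypothetical batch helper: sort one key's elements, then run an order-sensitive step. */
final class SortedBatch {

  /** Returns a copy of one key's elements ordered by event timestamp. */
  static List<Event> sortByTimestamp(List<Event> elementsForKey) {
    List<Event> sorted = new ArrayList<>(elementsForKey);
    sorted.sort(Comparator.comparingLong(e -> e.timestampMillis));
    return sorted;
  }

  /** Toy stateful step: emits the difference between consecutive values. */
  static List<Integer> deltas(List<Event> sortedElements) {
    List<Integer> output = new ArrayList<>();
    Integer previous = null; // stands in for per-key state
    for (Event event : sortedElements) {
      if (previous != null) {
        output.add(event.value - previous);
      }
      previous = event.value;
    }
    return output;
  }
}
```

Feeding the unsorted list straight into deltas would give a different result, which is the silent wrong answer a runner without the sort (or without an up-front rejection) would produce.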
