Hi Robert,

thanks for this insight. I think that this sort of uncovered additional question - I'm not saying that I follow every thread in dev@, but I didn't notice anything about "trying to stabilize the protos", which is again where I think these big milestones probably should be defined in front in a form of BIP, BEP or whatever. I will try to get more time to invest into this (creating the cwiki, BIP template and some basic first BIP to have something to iterate on).

Jan

On 2/7/20 10:51 PM, Robert Bradshaw wrote:
There are two separable concerns here.

(1) The @RequiresTimeSortedInput feature itself. This is a subtle
feature needed for certain pipelines, and if anything Jan has gone the
extra mile discussing, documenting, and designing this and trying to
reach consensus. I feel like there has been a failure in the community
(myself included) to either fully accept it, or come up with sound
reasons to reject it, in a timely manner. (This is one of the things I
hope BEPs could address.) The feature seems similar in spirit to
@RequiresStableInputs which I also find a bit icky but can't think of
a way around. (My ideal implementation for both would be to express
this in terms of a naive implementation that could be swapped out by
more advanced runners...) That being said, I don't think we should
block on this forever.

(2) Especially as we're trying to stabilize the protos, how can one
safely add constraints like this such that runners will reject rather
than execute pipelines with unsatisfied constraints? For SDKs, we're
thinking about adding the notion of capabilities (as a list, or
possibly mapping, of URNs that get attached to an environment. Perhaps
a pipeline could likewise have a set of requirements for those "new"
features that augments what can be inferred by looking at the set of
transform URNs. In this case, @RequiresTimeSortedInput would be such a
requirement attached to any pipeline using this feature, and its
contract would be to look at (and respect) certain bits on the DoFns,
and a runner must reject any pipeline with unknown requirements. (If
it understands a requirement, it could reject it based on its ability
to satisfy the contract as it is actually used in the pipeline).

On Fri, Feb 7, 2020 at 12:31 PM Kenneth Knowles <[email protected]> wrote:
TL;DR I am not suggesting that you must implement this for any runner. I'm 
afraid I do have to propose this change be rolled back before release 2.21.0 
unless we fix this. I think the fix is easily achieved.

Clarifications inline.

On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <[email protected]> wrote:
Hi Kenn,

I think that this approach is not well maintainable and doesn't scale. Main 
reasons:

  a) modifying core has by definition some impact on runners, so modifying core 
would imply necessity to modify all runners
My concern is not about all changes to "core" but only changes to the model, 
which should be extraordinarily rare. They must receive extreme scrutiny and require a 
very high level of consensus. It is true that every runner needs to either correctly 
execute or refuse to execute every pipeline, to the extent possible. For the case we are 
talking about it is very easy to meet this requirement.

  b) having to implement core feature for all existing runners will make any 
modification to core prohibitively expensive
No one is suggesting this. I am saying that you need to write the 1 line that I linked to 
"if (usesRequiresTimeSortedInput) then reject pipeline" so the runner fails 
before it begins processing data, potentially consuming non-replayable messages.

  c) even if we accept this, there can be runners that are outside of beam repo 
(or even closed source!)
Indeed. And those runners need time to adapt to the new proto fields. I did not 
mention it this time, because the proto is not considered stable. But very soon 
it will be. At that point additions like this will have to be fully specified 
and added to the proto long before they are enabled for use. That way all 
runners can adjust. The proper order is (1) add model feature (2) make runners 
reject it, unsupported (3) add functionality to SDK (4) add to some runners and 
enable.

Therefore I think, that the correct and scalable approach would be to split 
this into several pieces:

  1) define pipeline requirements (this is pretty much similar to how we 
currently scope @Category(ValidatesRunner.class) tests

  2) let pipeline infer it's requirements prior to being translated via runner

  3) runner can check the set of required features and their support and reject 
the pipeline if some feature is missing
This is exactly what happens today, but was not included in your change. The 
pipeline proto (or the Java pipeline object) clearly contain all the needed 
information. Whether pipeline summarizes it or the runner implements a trivial 
PipelineVisitor is not important.

This could even replace the annotations used in validates runner tests, because 
each runner would simply execute all tests it has enough features to run.
What you have described is exactly what happens today.

But as I mentioned - this is pretty much deep change. I don't know how to 
safely do this for current runners, but to actually implement the feature (it 
seems to be to me nearly equally complicated to fail pipeline in batch case and 
to actually implement the sorting).
Indeed. This feature hasn't really got consensus. The proposal thread [1] never 
really concluded affirmatively [1]. The [VOTE] thread indicates a clear *lack* 
of consensus, with all people who weighed in asking to raise awareness and 
build more support and consensus. Robert made the good point that if it is (a) 
useful and (b) not easy for users to do themselves, then we should consider it, 
even if most people here are not interested in the feature. So that is the 
closest thing to approval that this feature has. But getting more people 
interested and on board would get better feedback and achieve a better result 
for our users.

And as a final note, the PR was not reviewed by the core people who built out state 
& timers, nor those who built out DoFn annotation systems, nor any runner 
author, nor those working on the Beam model protos. You really should have gotten 
most of these people involved. They would likely have caught the issues described 
here.

The specific action that I am proposing is to implement the 1 liner described 
in all runners. It might be best to roll back and proceed with steps 1-4 I 
outlined above, so we can be sure things are proceeding well.

Kenn

[1] 
https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
[2] 
https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E
It would be super cool if anyone would be interested in implementing this in 
runners that don't currently support it. A side note - currently the annotation 
is not supported by all streaming runners due to missing guarantees for timers 
ordering (which can lead to data losss). I think I have found a solution to 
this, see [1], but I'd like to be 100% sure, before enabling the support (I'm 
not sure what is the impact of mis-ordered timers on output timestamps, and so 
on, and so forth).

Jan

[1] 
https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209

On 2/7/20 7:53 PM, Kenneth Knowles wrote:

I see. It is good to see that the pipeline will at least fail. However, the expect 
approach here is that the pipeline is rejected prior to execution. That is a primary 
reason for our annotation-driven API style; it allows much better "static" 
analysis by a runner, so we don't have to wait and fail late. Here is an example: 
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940

Kenn

On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <[email protected]> wrote:
Hi Kenn,

that should not be the case. Care was taken to fail streaming pipeline which 
needs this ability and the runner doesn't support this [1]. It is true, 
however, that a batch pipeline will not fail, because there is no generic 
(runner agnostic) way of supporting this transform in batch case (which is why 
the annotation was needed). Failing batch pipelines in this case would mean 
runners have to understand this annotation, which is pretty much close to 
implementing this feature as a whole.

This applies generally to any core functionality, it might take some time 
before runners fully support this. I don't know how to solve it, maybe add 
record to capability matrix? I can imagine a fully generic solution (runners 
might publish their capabilities and pipeline might be validated against these 
capabilities at pipeline build time), but that is obviously out of scope of the 
annotation.

Jan

[1] 
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150

On 2/7/20 1:01 AM, Kenneth Knowles wrote:

There is a major problem with this merge: the runners that do not support it do 
not reject pipelines that need this feature. They will silently produce the 
wrong answer, causing data loss.

Kenn

On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <[email protected]> wrote:
Hi,

the PR was merged to master and a few follow-up issues, were created,
mainly [1] and [2]. I didn't find any reference to SortedMapState in
JIRA, is there any tracking issue for that that I can link to? I also
added link to design document here [3].

[1] https://issues.apache.org/jira/browse/BEAM-9256

[2] https://issues.apache.org/jira/browse/BEAM-9257

[3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents

On 1/30/20 1:39 PM, Jan Lukavský wrote:
Hi,

PR [1] (issue [2]) went though code review, and according to [3] seems
to me to be ready for merge. Current state of the implementation is
that it is supported only for direct runner, legacy flink runner
(batch and streaming) and legacy spark (batch). It could be supported
by all other (streaming) runners using StatefulDoFnRunner, provided
the runner can make guarantees about ordering of timer firings (which
is unfortunately the case only for legacy flink and direct runner, at
least for now - related issues are mentioned multiple times on other
threads). Implementation for other batch runners should be as
straightforward as adding sorting by event timestamp before stateful
dofn (in case where the runner doesn't sort already - e.g. Dataflow -
in which case the annotation can be simply ignored - hence support for
batch Dataflow seems to be a no-op).

There has been some slight controversy about this feature, but current
feature proposing and implementing guidelines do not cover how to
resolve those, so I'm using this opportunity to let the community
know, that there is a plan to merge this feature, unless there is some
veto (please provide specific reasons for that in that case). The plan
is to merge this in the second part of next week, unless there is a veto.

Thanks,

  Jan

[1] https://github.com/apache/beam/pull/8774

[2] https://issues.apache.org/jira/browse/BEAM-8550

[3] https://beam.apache.org/contribute/committer-guide/

Reply via email to