TL;DR I am not suggesting that you must implement this for any
runner. I'm afraid I do have to propose this change be rolled
back before release 2.21.0 unless we fix this. I think the fix is
easily achieved.
Clarifications inline.
On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský wrote:
Hi Kenn,
I think that this approach is not well maintainable and
doesn't scale. Main reasons:
a) modifying core has by definition some impact on runners,
so modifying core would imply necessity to modify all runners
My concern is not about all changes to "core" but only changes to
the model, which should be extraordinarily rare. They must
receive extreme scrutiny and require a very high level of
consensus. It is true that every runner needs to either correctly
execute or refuse to execute every pipeline, to the extent
possible. For the case we are talking about it is very easy to
meet this requirement.
b) having to implement core feature for all existing runners
will make any modification to core prohibitively expensive
No one is suggesting this. I am saying that you need to write the
1 line that I linked to "if (usesRequiresTimeSortedInput) then
reject pipeline" so the runner fails before it begins processing
data, potentially consuming non-replayable messages.
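For illustration, a minimal sketch of the kind of pre-execution check described above. The class and field names (RejectUnsupportedSketch, Transform, requiresTimeSortedInput) are hypothetical stand-ins and not Beam APIs; a real runner would derive the flag from the pipeline proto or the Java pipeline object.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Beam classes.
final class RejectUnsupportedSketch {

  /** A transform that only records whether its DoFn requires time-sorted input. */
  static final class Transform {
    final String name;
    final boolean requiresTimeSortedInput;

    Transform(String name, boolean requiresTimeSortedInput) {
      this.name = name;
      this.requiresTimeSortedInput = requiresTimeSortedInput;
    }
  }

  /** Throws at submission time, before any data is read, if the feature is unsupported. */
  static void validate(List<Transform> pipeline, boolean runnerSupportsTimeSortedInput) {
    for (Transform t : pipeline) {
      if (t.requiresTimeSortedInput && !runnerSupportsTimeSortedInput) {
        throw new UnsupportedOperationException(
            "Transform '" + t.name + "' requires time-sorted input, "
                + "which this runner does not support");
      }
    }
  }

  public static void main(String[] args) {
    List<Transform> pipeline = Arrays.asList(
        new Transform("ParseEvents", false),
        new Transform("SessionizeInOrder", true));

    validate(pipeline, true); // a supporting runner accepts the pipeline

    try {
      validate(pipeline, false); // a non-supporting runner rejects it up front
    } catch (UnsupportedOperationException e) {
      System.out.println("Rejected before execution: " + e.getMessage());
    }
  }
}
```

The point of the sketch is only the ordering: the check runs while the pipeline is being validated, so no non-replayable messages have been consumed when it fails.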
c) even if we accept this, there can be runners that are
outside of beam repo (or even closed source!)
Indeed. And those runners need time to adapt to the new proto
fields. I did not mention it this time, because the proto is not
considered stable. But very soon it will be. At that point
additions like this will have to be fully specified and added to
the proto long before they are enabled for use. That way all
runners can adjust. The proper order is (1) add model feature (2)
make runners reject it as unsupported (3) add functionality to SDK
(4) add to some runners and enable.
Therefore I think, that the correct and scalable approach
would be to split this into several pieces:
1) define pipeline requirements (this is pretty much similar
to how we currently scope @Category(ValidatesRunner.class) tests)
2) let pipeline infer its requirements prior to being
translated via runner
3) runner can check the set of required features and their
support and reject the pipeline if some feature is missing
This is exactly what happens today, but was not included in your
change. The pipeline proto (or the Java pipeline object) clearly
contains all the needed information. Whether the pipeline summarizes
it or the runner implements a trivial PipelineVisitor is not
important.
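As a rough sketch of steps 1) to 3) above: the pipeline's required features are compared against the runner's supported set, and the pipeline is rejected if anything is missing. The Feature enum and method names are assumptions made for this example, not part of the Beam model or SDK.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical feature-set check; names are illustrative, not Beam APIs.
final class FeatureCheckSketch {

  /** Assumed feature names, chosen only for this example. */
  enum Feature { STATEFUL_PROCESSING, TIMERS, REQUIRES_TIME_SORTED_INPUT }

  /** Returns the features the pipeline requires that the runner does not support. */
  static Set<Feature> missing(Set<Feature> required, Set<Feature> supported) {
    EnumSet<Feature> result = EnumSet.noneOf(Feature.class);
    result.addAll(required);
    result.removeAll(supported);
    return result;
  }

  /** Rejects the pipeline before translation if any required feature is unsupported. */
  static void checkOrReject(Set<Feature> required, Set<Feature> supported) {
    Set<Feature> missing = missing(required, supported);
    if (!missing.isEmpty()) {
      throw new UnsupportedOperationException("Runner does not support: " + missing);
    }
  }

  public static void main(String[] args) {
    Set<Feature> required =
        EnumSet.of(Feature.STATEFUL_PROCESSING, Feature.REQUIRES_TIME_SORTED_INPUT);
    Set<Feature> supported = EnumSet.of(Feature.STATEFUL_PROCESSING, Feature.TIMERS);

    try {
      checkOrReject(required, supported);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Whether the required set is computed by the pipeline or by a visitor in the runner does not change the check itself.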
This could even replace the annotations used in validates
runner tests, because each runner would simply execute all
tests it has enough features to run.
What you have described is exactly what happens today.
But as I mentioned - this is a pretty deep change. I don't
know how to safely do this for current runners, other than to
actually implement the feature (it seems to me nearly
equally complicated to fail the pipeline in the batch case as to
actually implement the sorting).
Indeed. This feature hasn't really got consensus. The proposal
thread [1] never really concluded affirmatively. The [VOTE]
thread [2] indicates a clear *lack* of consensus, with all people who
weighed in asking to raise awareness and build more support and
consensus. Robert made the good point that if it is (a) useful
and (b) not easy for users to do themselves, then we should
consider it, even if most people here are not interested in the
feature. So that is the closest thing to approval that this
feature has. But getting more people interested and on board
would get better feedback and achieve a better result for our users.
And as a final note, the PR was not reviewed by the core people
who built out state & timers, nor those who built out DoFn
annotation systems, nor any runner author, nor those working on
the Beam model protos. You really should have gotten most of
these people involved. They would likely have caught the issues
described here.
The specific action that I am proposing is to implement the
one-liner described above in all runners. It might be best to roll back and
proceed with steps 1-4 I outlined above, so we can be sure things
are proceeding well.
Kenn
[1]
https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
[2]
https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E
It would be super cool if anyone would be interested in
implementing this in runners that don't currently support it.
A side note - currently the annotation is not supported by
all streaming runners due to missing guarantees for timers
ordering (which can lead to data loss). I think I have found
a solution to this, see [1], but I'd like to be 100% sure,
before enabling the support (I'm not sure what is the impact
of mis-ordered timers on output timestamps, and so on, and so
forth).
Jan
[1]
https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
On 2/7/20 7:53 PM, Kenneth Knowles wrote:
I see. It is good to see that the pipeline will at least
fail. However, the expected approach here is that the pipeline
is rejected prior to execution. That is a primary reason for
our annotation-driven API style; it allows much better
"static" analysis by a runner, so we don't have to wait and
fail late. Here is an example:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
Kenn
On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský wrote:
Hi Kenn,
that should not be the case. Care was taken to fail
streaming pipeline which needs this ability and the
runner doesn't support this [1]. It is true, however,
that a batch pipeline will not fail, because there is no
generic (runner agnostic) way of supporting this
transform in batch case (which is why the annotation was
needed). Failing batch pipelines in this case would mean
runners have to understand this annotation, which is
pretty much close to implementing this feature as a whole.
This applies generally to any core functionality; it
might take some time before runners fully support it.
I don't know how to solve it, maybe add record to
capability matrix? I can imagine a fully generic
solution (runners might publish their capabilities and
pipeline might be validated against these capabilities
at pipeline build time), but that is obviously out of
scope of the annotation.
Jan
[1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
On 2/7/20 1:01 AM, Kenneth Knowles wrote:
There is a major problem with this merge: the runners
that do not support it do not reject pipelines that
need this feature. They will silently produce the wrong
answer, causing data loss.
Kenn
On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský wrote:
Hi,
the PR was merged to master and a few follow-up
issues were created,
mainly [1] and [2]. I didn't find any reference to
SortedMapState in
JIRA, is there any tracking issue for that that I
can link to? I also
added link to design document here [3].
[1] https://issues.apache.org/jira/browse/BEAM-9256
[2] https://issues.apache.org/jira/browse/BEAM-9257
[3]
https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
On 1/30/20 1:39 PM, Jan Lukavský wrote:
> Hi,
>
> PR [1] (issue [2]) went through code review, and
according to [3] seems
> to me to be ready for merge. Current state of the
implementation is
> that it is supported only for direct runner,
legacy flink runner
> (batch and streaming) and legacy spark (batch).
It could be supported
> by all other (streaming) runners using
StatefulDoFnRunner, provided
> the runner can make guarantees about ordering of
timer firings (which
> is unfortunately the case only for legacy flink
and direct runner, at
> least for now - related issues are mentioned
multiple times on other
> threads). Implementation for other batch runners
should be as
> straightforward as adding sorting by event
timestamp before stateful
> dofn (in case where the runner doesn't sort
already - e.g. Dataflow -
> in which case the annotation can be simply
ignored - hence support for
> batch Dataflow seems to be a no-op).
>
> There has been some slight controversy about this
feature, but current
> feature proposing and implementing guidelines do
not cover how to
> resolve those, so I'm using this opportunity to
let the community
> know, that there is a plan to merge this feature,
unless there is some
> veto (please provide specific reasons for that in
that case). The plan
> is to merge this in the second part of next week,
unless there is a veto.
>
> Thanks,
>
> Jan
>
> [1] https://github.com/apache/beam/pull/8774
>
> [2] https://issues.apache.org/jira/browse/BEAM-8550
>
> [3]
https://beam.apache.org/contribute/committer-guide/
>
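The batch approach mentioned in the 1/30/20 message above (sorting by event timestamp before the stateful DoFn) might look roughly like the following. This is only a sketch under assumptions: TimestampedValue here is a local stand-in class, and the elements are assumed to be already grouped for a single key and small enough to sort in memory; it does not show how any actual runner implements this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative only: a local stand-in type, not a Beam class.
final class SortByTimestampSketch {

  /** An element paired with its event timestamp in milliseconds. */
  static final class TimestampedValue {
    final String value;
    final long timestampMillis;

    TimestampedValue(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Returns a copy of one key's elements ordered by event timestamp (stable sort). */
  static List<TimestampedValue> sortedByEventTime(List<TimestampedValue> elementsForKey) {
    List<TimestampedValue> sorted = new ArrayList<>(elementsForKey);
    sorted.sort(Comparator.comparingLong((TimestampedValue e) -> e.timestampMillis));
    return sorted;
  }

  public static void main(String[] args) {
    List<TimestampedValue> input = Arrays.asList(
        new TimestampedValue("c", 30L),
        new TimestampedValue("a", 10L),
        new TimestampedValue("b", 20L));

    // The stateful processing step would then consume elements in this order.
    for (TimestampedValue e : sortedByEventTime(input)) {
      System.out.println(e.timestampMillis + " " + e.value);
    }
  }
}
```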