I reviewed closely the runners ad it seems to me that:
- all batch runners that would fail to support the annotation will
fail already (spark structured streaming, apex) due to missing support
for state or timers
- streaming runners must explicitly enable this, _as long as they use
StatefulDoFnRunner_, which is the case for apex, flink and samza
I will explicitly disable any pipeline with this annotation for:
- dataflow, jet and gearpump (because I don't see usage of
StatefulDoFnRunner, although I though there was one, that's my mistake)
- all batch runners should either support the annotation or fail
already (due to missing support for state or timers)
Does this proposal solve the issues you see?
Regarding the process of introducing this annotation I tried really hard
to get to the best consensus I could. The same holds true for getting
core people involved in the review process (explicitly mentioned in the
PR, multiple mailing list threads). The PR was opened for discussion for
more than half a year. But because I agree with you, I proposed the BIP,
so that we can have a more explicit process for arriving at a consensus
for features like this. I'd be happy though, if we can get to consensus
about what to do now (if the steps I wrote above will solve every
doubts) and have a deeper process for similar features for future cases.
As I mentioned this feature is already implemented and having open PR
into core for nearly a year is expensive to keep it in sync with master.
On 2/7/20 9:31 PM, Kenneth Knowles wrote:
TL;DR I am not suggesting that you must implement this for any runner.
I'm afraid I do have to propose this change be rolled back before
release 2.21.0 unless we fix this. I think the fix is easily achieved.
Clarifications inline.
On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <[email protected]
<mailto:[email protected]>> wrote:
Hi Kenn,
I think that this approach is not well maintainable and doesn't
scale. Main reasons:
a) modifying core has by definition some impact on runners, so
modifying core would imply necessity to modify all runners
My concern is not about all changes to "core" but only changes to the
model, which should be extraordinarily rare. They must receive extreme
scrutiny and require a very high level of consensus. It is true that
every runner needs to either correctly execute or refuse to execute
every pipeline, to the extent possible. For the case we are talking
about it is very easy to meet this requirement.
b) having to implement core feature for all existing runners will
make any modification to core prohibitively expensive
No one is suggesting this. I am saying that you need to write the 1
line that I linked to "if (usesRequiresTimeSortedInput) then reject
pipeline" so the runner fails before it begins processing data,
potentially consuming non-replayable messages.
c) even if we accept this, there can be runners that are outside
of beam repo (or even closed source!)
Indeed. And those runners need time to adapt to the new proto fields.
I did not mention it this time, because the proto is not considered
stable. But very soon it will be. At that point additions like this
will have to be fully specified and added to the proto long before
they are enabled for use. That way all runners can adjust. The proper
order is (1) add model feature (2) make runners reject it, unsupported
(3) add functionality to SDK (4) add to some runners and enable.
Therefore I think, that the correct and scalable approach would be
to split this into several pieces:
1) define pipeline requirements (this is pretty much similar to
how we currently scope @Category(ValidatesRunner.class) tests
2) let pipeline infer it's requirements prior to being translated
via runner
3) runner can check the set of required features and their
support and reject the pipeline if some feature is missing
This is exactly what happens today, but was not included in your
change. The pipeline proto (or the Java pipeline object) clearly
contain all the needed information. Whether pipeline summarizes it or
the runner implements a trivial PipelineVisitor is not important.
This could even replace the annotations used in validates runner
tests, because each runner would simply execute all tests it has
enough features to run.
What you have described is exactly what happens today.
But as I mentioned - this is pretty much deep change. I don't know
how to safely do this for current runners, but to actually
implement the feature (it seems to be to me nearly equally
complicated to fail pipeline in batch case and to actually
implement the sorting).
Indeed. This feature hasn't really got consensus. The proposal thread
[1] never really concluded affirmatively [1]. The [VOTE] thread
indicates a clear *lack* of consensus, with all people who weighed in
asking to raise awareness and build more support and consensus. Robert
made the good point that if it is (a) useful and (b) not easy for
users to do themselves, then we should consider it, even if most
people here are not interested in the feature. So that is the closest
thing to approval that this feature has. But getting more people
interested and on board would get better feedback and achieve a better
result for our users.
And as a final note, the PR was not reviewed by the core people who
built out state & timers, nor those who built out DoFn annotation
systems, nor any runner author, nor those working on the Beam model
protos. You really should have gotten most of these people involved.
They would likely have caught the issues described here.
The specific action that I am proposing is to implement the 1 liner
described in all runners. It might be best to roll back and proceed
with steps 1-4 I outlined above, so we can be sure things are
proceeding well.
Kenn
[1]
https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
[2]
https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E
It would be super cool if anyone would be interested in
implementing this in runners that don't currently support it. A
side note - currently the annotation is not supported by all
streaming runners due to missing guarantees for timers ordering
(which can lead to data losss). I think I have found a solution to
this, see [1], but I'd like to be 100% sure, before enabling the
support (I'm not sure what is the impact of mis-ordered timers on
output timestamps, and so on, and so forth).
Jan
[1]
https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
On 2/7/20 7:53 PM, Kenneth Knowles wrote:
I see. It is good to see that the pipeline will at least fail.
However, the expect approach here is that the pipeline is
rejected prior to execution. That is a primary reason for our
annotation-driven API style; it allows much better "static"
analysis by a runner, so we don't have to wait and fail late.
Here is an example:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
Kenn
On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <[email protected]
<mailto:[email protected]>> wrote:
Hi Kenn,
that should not be the case. Care was taken to fail streaming
pipeline which needs this ability and the runner doesn't
support this [1]. It is true, however, that a batch pipeline
will not fail, because there is no generic (runner agnostic)
way of supporting this transform in batch case (which is why
the annotation was needed). Failing batch pipelines in this
case would mean runners have to understand this annotation,
which is pretty much close to implementing this feature as a
whole.
This applies generally to any core functionality, it might
take some time before runners fully support this. I don't
know how to solve it, maybe add record to capability matrix?
I can imagine a fully generic solution (runners might publish
their capabilities and pipeline might be validated against
these capabilities at pipeline build time), but that is
obviously out of scope of the annotation.
Jan
[1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
On 2/7/20 1:01 AM, Kenneth Knowles wrote:
There is a major problem with this merge: the runners that
do not support it do not reject pipelines that need this
feature. They will silently produce the wrong answer,
causing data loss.
Kenn
On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <[email protected]
<mailto:[email protected]>> wrote:
Hi,
the PR was merged to master and a few follow-up issues,
were created,
mainly [1] and [2]. I didn't find any reference to
SortedMapState in
JIRA, is there any tracking issue for that that I can
link to? I also
added link to design document here [3].
[1] https://issues.apache.org/jira/browse/BEAM-9256
[2] https://issues.apache.org/jira/browse/BEAM-9257
[3]
https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
On 1/30/20 1:39 PM, Jan Lukavský wrote:
> Hi,
>
> PR [1] (issue [2]) went though code review, and
according to [3] seems
> to me to be ready for merge. Current state of the
implementation is
> that it is supported only for direct runner, legacy
flink runner
> (batch and streaming) and legacy spark (batch). It
could be supported
> by all other (streaming) runners using
StatefulDoFnRunner, provided
> the runner can make guarantees about ordering of timer
firings (which
> is unfortunately the case only for legacy flink and
direct runner, at
> least for now - related issues are mentioned multiple
times on other
> threads). Implementation for other batch runners
should be as
> straightforward as adding sorting by event timestamp
before stateful
> dofn (in case where the runner doesn't sort already -
e.g. Dataflow -
> in which case the annotation can be simply ignored -
hence support for
> batch Dataflow seems to be a no-op).
>
> There has been some slight controversy about this
feature, but current
> feature proposing and implementing guidelines do not
cover how to
> resolve those, so I'm using this opportunity to let
the community
> know, that there is a plan to merge this feature,
unless there is some
> veto (please provide specific reasons for that in that
case). The plan
> is to merge this in the second part of next week,
unless there is a veto.
>
> Thanks,
>
> Jan
>
> [1] https://github.com/apache/beam/pull/8774
>
> [2] https://issues.apache.org/jira/browse/BEAM-8550
>
> [3] https://beam.apache.org/contribute/committer-guide/
>