Hi Kenn,
I think this approach is not very maintainable and doesn't scale.
Main reasons:
a) modifying core by definition has some impact on runners, so every
modification to core would require modifying all runners
b) having to implement a core feature in all existing runners would make
any modification to core prohibitively expensive
c) even if we accept this, there can be runners that live outside of the
Beam repo (or are even closed source!)
Therefore I think the correct and scalable approach would be to
split this into several pieces:
1) define pipeline requirements (this is pretty similar to how we
currently scope @Category(ValidatesRunner.class) tests)
2) let the pipeline infer its requirements before it is translated by
the runner
3) let the runner check the set of required features against the
features it supports and reject the pipeline if some feature is missing
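A minimal sketch of steps 1) to 3) in plain Java. Feature, Transform and CapabilityCheck are hypothetical names (nothing like them exists in Beam); they only stand in for whatever representation of requirements and capabilities we would agree on:

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical feature identifiers; Beam does not define this enum.
enum Feature { STATEFUL_PROCESSING, TIME_SORTED_INPUT, BUNDLE_FINALIZATION }

// Hypothetical transform description: a name plus the features it needs (step 1).
final class Transform {
  final String name;
  final Set<Feature> requires;

  Transform(String name, Set<Feature> requires) {
    this.name = name;
    this.requires = requires;
  }
}

final class CapabilityCheck {
  // Step 2: the pipeline's requirements are the union of its transforms' requirements.
  static Set<Feature> inferRequirements(List<Transform> transforms) {
    Set<Feature> all = EnumSet.noneOf(Feature.class);
    for (Transform t : transforms) {
      all.addAll(t.requires);
    }
    return all;
  }

  // Step 3 (helper): features the pipeline needs but the runner lacks.
  static Set<Feature> missing(Set<Feature> required, Set<Feature> supported) {
    Set<Feature> gaps = EnumSet.noneOf(Feature.class);
    gaps.addAll(required);
    gaps.removeAll(supported);
    return gaps;
  }

  // Step 3: reject before translation instead of failing (or silently misbehaving) later.
  static void validate(Set<Feature> required, Set<Feature> supported) {
    Set<Feature> gaps = missing(required, supported);
    if (!gaps.isEmpty()) {
      throw new UnsupportedOperationException("Pipeline rejected, runner does not support " + gaps);
    }
  }

  public static void main(String[] args) {
    List<Transform> pipeline = List.of(
        new Transform("Read", EnumSet.noneOf(Feature.class)),
        new Transform("SortedStatefulFn",
            EnumSet.of(Feature.STATEFUL_PROCESSING, Feature.TIME_SORTED_INPUT)));
    Set<Feature> runnerCapabilities = EnumSet.of(Feature.STATEFUL_PROCESSING);
    try {
      validate(inferRequirements(pipeline), runnerCapabilities);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Since the check only compares two sets, a runner outside the Beam repo would need nothing more than its own set of supported features, which addresses c).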
This could even replace the annotations used in ValidatesRunner tests,
because each runner would simply execute all tests whose required
features it supports.
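To illustrate, a sketch of how a test harness could select tests by feature instead of by category. The RunnerFeature enum, the test names and TestSelector are hypothetical:

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical feature identifiers used only for this illustration.
enum RunnerFeature { TIMERS, STATE, TIME_SORTED_INPUT }

final class TestSelector {
  // Returns the tests whose required features are all supported by the runner,
  // in the order the tests were registered.
  static List<String> runnable(Map<String, Set<RunnerFeature>> tests, Set<RunnerFeature> supported) {
    List<String> selected = new ArrayList<>();
    for (Map.Entry<String, Set<RunnerFeature>> test : tests.entrySet()) {
      if (supported.containsAll(test.getValue())) {
        selected.add(test.getKey());
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    Map<String, Set<RunnerFeature>> tests = new LinkedHashMap<>();
    tests.put("testTimers", EnumSet.of(RunnerFeature.TIMERS));
    tests.put("testTimeSortedInput", EnumSet.of(RunnerFeature.STATE, RunnerFeature.TIME_SORTED_INPUT));
    tests.put("testState", EnumSet.of(RunnerFeature.STATE));
    // A runner without sorted-input support simply skips that test.
    System.out.println(runnable(tests, EnumSet.of(RunnerFeature.TIMERS, RunnerFeature.STATE)));
  }
}
```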
But as I mentioned, this is a pretty deep change. I don't know how
to do this safely for current runners other than to actually implement
the feature (to me, failing the pipeline in the batch case seems nearly
as complicated as actually implementing the sorting). It would
be super cool if anyone were interested in implementing this in
runners that don't currently support it. A side note: currently the
annotation is not supported by all streaming runners due to missing
guarantees about timer ordering (which can lead to data loss). I think I
have found a solution to this, see [1], but I'd like to be 100% sure
before enabling the support (I'm not sure what the impact of
mis-ordered timers on output timestamps is, and so on).
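For reference, what "ordered timer firing" means can be sketched with a priority queue. This is a hypothetical illustration (OrderedTimers is not a Beam class, and timers are reduced to bare event-time timestamps). The important property is that a timer set while another one fires is still delivered in timestamp order:

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical event-time timer queue (not a Beam class): timers are bare
// timestamps kept in a min-heap so they always fire earliest first.
final class OrderedTimers {
  private final PriorityQueue<Long> pending = new PriorityQueue<>();

  void set(long timestamp) {
    pending.add(timestamp);
  }

  // Fires every timer with timestamp <= watermark in ascending order. The
  // callback may set new timers; because the heap is consulted again before
  // each firing, a newly set earlier timer still fires before later ones.
  void advanceWatermarkTo(long watermark, LongConsumer onTimer) {
    while (!pending.isEmpty() && pending.peek() <= watermark) {
      long timestamp = pending.poll();
      onTimer.accept(timestamp);
    }
  }

  public static void main(String[] args) {
    OrderedTimers timers = new OrderedTimers();
    timers.set(30);
    timers.set(10);
    // Firing the timer at 10 sets another timer at 20, which must fire before 30.
    timers.advanceWatermarkTo(40, ts -> {
      System.out.println("fired " + ts);
      if (ts == 10) {
        timers.set(20);
      }
    });
  }
}
```

A runner that first collects all eligible timers into a batch and then fires that batch would deliver the timer at 20 only after the one at 30, which is one way timers can end up mis-ordered.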
Jan
[1]
https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
On 2/7/20 7:53 PM, Kenneth Knowles wrote:
I see. It is good to see that the pipeline will at least fail.
However, the expected approach here is that the pipeline is rejected
prior to execution. That is a primary reason for our annotation-driven
API style; it allows much better "static" analysis by a runner, so we
don't have to wait and fail late. Here is an example:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
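The idea of rejecting a pipeline by inspecting annotations before execution can be sketched with plain Java reflection. RequiresSortedInputExample, StaticValidation, SortedFn and PlainFn are hypothetical names; a real runner would go through Beam's own DoFn analysis rather than raw reflection like this:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical marker annotation standing in for a Beam DoFn method annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresSortedInputExample {}

// Two toy "DoFns": only the first one asks for time-sorted input.
class SortedFn {
  @RequiresSortedInputExample
  void processElement(String element) {}
}

class PlainFn {
  void processElement(String element) {}
}

final class StaticValidation {
  // Looks for the marker on any declared method of the DoFn class.
  static boolean requiresSortedInput(Class<?> doFnClass) {
    for (Method method : doFnClass.getDeclaredMethods()) {
      if (method.isAnnotationPresent(RequiresSortedInputExample.class)) {
        return true;
      }
    }
    return false;
  }

  // Runs at pipeline construction time, before any data is processed.
  static void rejectIfUnsupported(Class<?> doFnClass, boolean runnerSupportsSortedInput) {
    if (requiresSortedInput(doFnClass) && !runnerSupportsSortedInput) {
      throw new UnsupportedOperationException(
          doFnClass.getSimpleName() + " requires time-sorted input, which this runner does not support");
    }
  }

  public static void main(String[] args) {
    rejectIfUnsupported(PlainFn.class, false); // accepted
    try {
      rejectIfUnsupported(SortedFn.class, false);
    } catch (UnsupportedOperationException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```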
Kenn
On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <[email protected]> wrote:
Hi Kenn,
that should not be the case. Care was taken to fail a streaming
pipeline that needs this ability when the runner doesn't support
it [1]. It is true, however, that a batch pipeline will not
fail, because there is no generic (runner-agnostic) way of
supporting this transform in the batch case (which is why the
annotation was needed). Failing batch pipelines in this case would
mean runners have to understand this annotation, which is pretty
close to implementing the feature as a whole.
This applies generally to any core functionality: it might take
some time before runners fully support it. I don't know how to
solve this; maybe add an entry to the capability matrix? I can imagine a
fully generic solution (runners might publish their capabilities
and the pipeline might be validated against these capabilities at
pipeline build time), but that is obviously out of scope of the
annotation.
Jan
[1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
On 2/7/20 1:01 AM, Kenneth Knowles wrote:
There is a major problem with this merge: the runners that do not
support it do not reject pipelines that need this feature. They
will silently produce the wrong answer, causing data loss.
Kenn
On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <[email protected]> wrote:
Hi,
the PR was merged to master and a few follow-up issues were
created, mainly [1] and [2]. I didn't find any reference to
SortedMapState in JIRA; is there a tracking issue for it that I
can link to? I also added a link to the design document here [3].
[1] https://issues.apache.org/jira/browse/BEAM-9256
[2] https://issues.apache.org/jira/browse/BEAM-9257
[3]
https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
On 1/30/20 1:39 PM, Jan Lukavský wrote:
> Hi,
>
> PR [1] (issue [2]) went through code review and, according to [3], seems
> to me to be ready to merge. Currently the feature is supported only by
> the direct runner, the legacy Flink runner (batch and streaming) and the
> legacy Spark runner (batch). It could be supported by all other
> (streaming) runners using StatefulDoFnRunner, provided the runner can
> guarantee the ordering of timer firings (which is unfortunately the case
> only for the legacy Flink and direct runners, at least for now; related
> issues are mentioned multiple times in other threads). Implementation
> for other batch runners should be as straightforward as adding sorting
> by event timestamp before the stateful DoFn, unless the runner already
> sorts (e.g. Dataflow), in which case the annotation can simply be
> ignored; hence support for batch Dataflow seems to be a no-op.
>
> There has been some slight controversy about this feature, but the
> current guidelines for proposing and implementing features do not cover
> how to resolve it, so I'm using this opportunity to let the community
> know that there is a plan to merge this feature unless there is a veto
> (in that case, please provide specific reasons). The plan is to merge
> it in the second half of next week.
>
> Thanks,
>
> Jan
>
> [1] https://github.com/apache/beam/pull/8774
>
> [2] https://issues.apache.org/jira/browse/BEAM-8550
>
> [3] https://beam.apache.org/contribute/committer-guide/
>
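The batch approach described in the 1/30 message (sorting by event timestamp before the stateful DoFn) can be sketched as follows. Element, StatefulFn and SortedBatchExecution are hypothetical stand-ins rather than Beam types, and everything is held in memory for brevity:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SortedBatchExecution {
  // Hypothetical keyed element with an event timestamp (not a Beam type).
  static final class Element {
    final String key;
    final long timestamp;
    final int value;

    Element(String key, long timestamp, int value) {
      this.key = key;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Hypothetical stand-in for the stateful DoFn's per-element callback.
  interface StatefulFn {
    void process(Element element);
  }

  // Groups elements by key, sorts each group by event timestamp (List.sort is
  // stable, so ties keep their input order) and feeds the groups to fn.
  static void runSorted(List<Element> input, StatefulFn fn) {
    Map<String, List<Element>> byKey = new LinkedHashMap<>();
    for (Element element : input) {
      byKey.computeIfAbsent(element.key, k -> new ArrayList<>()).add(element);
    }
    for (List<Element> group : byKey.values()) {
      group.sort(Comparator.comparingLong((Element e) -> e.timestamp));
      for (Element element : group) {
        fn.process(element);
      }
    }
  }

  public static void main(String[] args) {
    List<Element> input = List.of(
        new Element("a", 3, 30), new Element("b", 1, 10),
        new Element("a", 1, 11), new Element("a", 2, 12));
    // Elements of each key reach the stateful function in timestamp order.
    runSorted(input, e -> System.out.println(e.key + "@" + e.timestamp));
  }
}
```

A distributed batch runner would more likely do this as part of its grouping step (e.g. a secondary sort on the timestamp) than in memory, but the ordering contract seen by the stateful DoFn is the same.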