Hey,
Thank you for your help. With it I was able to identify the problem in
my code: my trigger, followed by a Latest.perKey transformation, results
in an AfterSynchronizedProcessingTime trigger.
A minimal example of this problem is here:
https://gist.github.com/janb15/8c536dbdbc1be40e2698122257f74760
I also tried this within the wordcount project using version 2.27.0, and
it did not fix the problem.
The use case I want to implement uses a changelog stream to enrich my
input with further data via side inputs. The stream used to generate the
side input is only updated when a user performs a manual
re-configuration. Because of this, I use a trigger on a global window to
periodically generate a view. Is there a way to avoid the creation of a
synchronized processing time trigger?
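For anyone following along, here is a minimal sketch of the pipeline shape described above, assuming the Beam Java SDK and the Direct Runner (as default runner) are on the classpath. The class name, the String key/value types, the 1-second delay and the sample record are assumptions for illustration, not taken from the gist. Latest.perKey contains a GroupByKey, and a GroupByKey replaces the incoming trigger with that trigger's continuation; for AfterProcessingTime the continuation is AfterSynchronizedProcessingTime. The sketch only builds the graph (it never runs the pipeline) and prints the trigger that reaches the side input.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class ChangelogSideInput {

  /** Latest value per key, re-emitted on a processing-time trigger in the global window. */
  static PCollection<KV<String, String>> latestPerKey(PCollection<KV<String, String>> changelog) {
    return changelog
        .apply("PeriodicFiring",
            Window.<KV<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(1))))
                .accumulatingFiredPanes())
        // Latest.perKey groups by key internally, so its output carries the
        // continuation of the trigger above, not the trigger itself.
        .apply("LatestPerKey", Latest.<String, String>perKey());
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // Hypothetical stand-in for the real changelog source.
    PCollection<KV<String, String>> changelog =
        p.apply(Create.of(KV.of("user-1", "config-v2")));

    PCollection<KV<String, String>> latest = latestPerKey(changelog);
    // Map-valued side input; the main-input ParDo would use .withSideInputs(view).
    PCollectionView<Map<String, String>> view =
        latest.apply("AsMap", View.<String, String>asMap());

    // Prints the trigger after Latest.perKey (expected to involve
    // AfterSynchronizedProcessingTime); the pipeline itself is never run.
    System.out.println(latest.getWindowingStrategy().getTrigger());
  }
}
```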
With many thanks,
Jan
On 2021-02-10 00:57, Xinyu Liu wrote:
Jan,
I tried the latest Beam version, 2.27, and ran into the same issue you
saw. I dug a bit deeper, and it was caused by recent changes in Beam to
enable SplittableParDo in all runners. While we work with Beam to get
this resolved, you can avoid the issue by adding the argument
"--experiments=use_deprecated_read" when running your program. This flag
disables the new code path, so it works as before.
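A sketch of enabling that experiment from Java, assuming the pipeline options are built with PipelineOptionsFactory: passing --experiments=use_deprecated_read on the command line is enough on its own, and ExperimentalOptions.addExperiment can add it programmatically when the command line can't be changed. The class and method names are hypothetical.

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DeprecatedReadOptions {

  static final String EXPERIMENT = "use_deprecated_read";

  /** Parses command-line args and makes sure the use_deprecated_read experiment is set. */
  static PipelineOptions create(String[] args) {
    // "--experiments=use_deprecated_read" on the command line is picked up here.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    // Fallback for when the command line can't be changed.
    if (!ExperimentalOptions.hasExperiment(options, EXPERIMENT)) {
      ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), EXPERIMENT);
    }
    return options;
  }

  public static void main(String[] args) {
    PipelineOptions options = create(args);
    System.out.println("use_deprecated_read enabled: "
        + ExperimentalOptions.hasExperiment(options, EXPERIMENT));
  }
}
```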
I also tried your triggering code in the KafkaWordCount example in the
samza-beam-examples git repo (https://github.com/apache/samza-beam-examples).
It seems to work for me: I can see the 1-second early firings within a
10-second window, and the fired panes are accumulated. You can also use
this repo as a reference.
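A sketch of a 10-second fixed window with 1-second processing-time early firings and accumulating panes, as described above. This is an assumption about what such a configuration looks like, not code copied from the KafkaWordCount example; the class and method names are hypothetical. withAllowedLateness(Duration.ZERO) is included because Beam requires an allowed lateness when a custom trigger is set on non-global windows.

```java
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class EarlyFiringWindow {

  /** 10 s fixed windows that also emit speculative panes every ~1 s of processing time. */
  static <T> Window<T> tenSecondWindowsWithEarlyFirings() {
    return Window.<T>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(
            // Final (on-time) pane once the watermark passes the end of the window...
            AfterWatermark.pastEndOfWindow()
                // ...plus early panes 1 s after the first element of each pane arrives.
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(1))))
        // Drop data arriving after the watermark has passed the window end.
        .withAllowedLateness(Duration.ZERO)
        // Each pane contains everything seen so far in the window, not just the delta.
        .accumulatingFiredPanes();
  }
}
```

It would be applied before the aggregation, e.g. words.apply(EarlyFiringWindow.<String>tenSecondWindowsWithEarlyFirings()).apply(Count.perElement()), where words is a PCollection<String>.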
Thanks,
Xinyu
On Mon, Feb 8, 2021 at 9:41 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
Hi Jan,
Thanks for reporting this issue to us. Processing time triggers are
supported in the Samza runner as of Beam 2.22.0 [1]. The exception
message wasn't updated after we added processing time support; apologies
for the confusion. It looks like most of the exception messages have been
fixed in the latest code.
From reading the code, it seems we only run into this exception if we
somehow end up with the TimeDomain SYNCHRONIZED_PROCESSING_TIME [2]. The
Samza runner does not support this time domain. Are you aware of anything
in your code that might use it? If not, I can help debug further. We have
other users who use processing time triggers for early firings, and it
works fine for them.
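One way to check whether a pipeline ends up in this time domain is to inspect the windowing strategy of intermediate PCollections and look for AfterSynchronizedProcessingTime anywhere in the trigger tree. A minimal sketch, assuming the Beam Java SDK; the helper names are hypothetical, and this is meant as a debugging aid, not part of the runner.

```java
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class TriggerInspector {

  /** Recursively searches a trigger tree for AfterSynchronizedProcessingTime. */
  static boolean containsSynchronizedProcessingTime(Trigger trigger) {
    if (trigger instanceof AfterSynchronizedProcessingTime) {
      return true;
    }
    List<Trigger> subTriggers = trigger.subTriggers();
    if (subTriggers == null) {
      return false;
    }
    for (Trigger sub : subTriggers) {
      if (containsSynchronizedProcessingTime(sub)) {
        return true;
      }
    }
    return false;
  }

  /** Checks the trigger carried in a PCollection's windowing strategy. */
  static boolean usesSynchronizedProcessingTime(PCollection<?> pc) {
    return containsSynchronizedProcessingTime(pc.getWindowingStrategy().getTrigger());
  }

  public static void main(String[] args) {
    Trigger userTrigger = Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(1)));
    // A GroupByKey replaces its input trigger with getContinuationTrigger().
    Trigger afterGroupByKey = userTrigger.getContinuationTrigger();
    System.out.println("user trigger:     " + containsSynchronizedProcessingTime(userTrigger));
    System.out.println("after GroupByKey: " + containsSynchronizedProcessingTime(afterGroupByKey));
  }
}
```

Calling usesSynchronizedProcessingTime on the output of each grouping step (for example, the result of Latest.perKey) shows where the synchronized processing time domain first appears.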
I will also take a look at 2.27.0. LinkedIn recently upgraded to 2.26.0,
and we found a few issues. Previously we were using a version close to
2.24.0.
Thanks,
Xinyu
[1]: https://github.com/apache/beam/blob/9b43fadb8bb6f4bcabc945fc299b378eb1d7d205/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java#L347
[2]: https://github.com/apache/beam/blob/055140203ce2df56ba903b05266466cf16562dde/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java#L49
On Sun, Feb 7, 2021 at 2:24 PM Jan Bensien <stu128...@mail.uni-kiel.de> wrote:
Hello,
I am currently trying to execute my Beam pipelines using the Samza
runner. I am using processing time triggers to calculate early results
for my larger windows.
However, I am getting the following error:
java.lang.UnsupportedOperationException: class org.apache.beam.runners.samza.SamzaRunner currently only supports event time.
Looking at the Beam capability matrix
(https://beam.apache.org/documentation/runners/capability-matrix/), it
looks like processing time should be supported.
I could not find documentation of the exact features supported by each
runner version.
I am using version 2.22.0 of the Samza runner, but I also tried 2.25.0
and got the same error. When I tried to upgrade to 2.27.0, I got the
following error: java.lang.UnsupportedOperationException:
BundleFinalizer unsupported in Samza. This happens whenever I use KafkaIO
to read from Kafka, even with a pipeline that does nothing except read
from Kafka.
The trigger that caused the exception is the following:
    .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1))))
        .accumulatingFiredPanes());
Running the pipeline with the Direct Runner worked fine. Which is the
latest stable version of the Samza runner, and does it support
processing time triggers?
With many thanks,
Jan