Hi,
Thanks for the hint. The infinite loop was the solution and my pipeline
works now.
Regards
Klemens
On 24.11.20 16:59, Timo Walther wrote:
For debugging you can also implement a simple non-parallel source
using
`org.apache.flink.streaming.api.functions.source.SourceFunction`. You
would need to implement the run() method with an endless loop after
emitting all your records.
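A minimal sketch of such a debugging source could look like the following (the String element type and the emitted records are just placeholders):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Non-parallel debugging source: emits a fixed set of records and then
// idles forever, so the job keeps running and processing-time windows
// can still fire.
public class DebugSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit all test records up front (placeholder values).
        for (String record : new String[] {"a", "b", "c"}) {
            ctx.collect(record);
        }
        // Endless loop: keep the source alive until the job is cancelled.
        while (running) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}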
Regards,
Timo
On 24.11.20 16:07, Klemens Muthmann wrote:
Hi,
Thanks for your reply. I am using processing time instead of event
time, since we do get the events in batches and some might arrive
days later.
But for my current dev setup I just use a CSV dump of finite size as
input. I will hand over the pipeline to some other guys, who will
need to integrate it with an Apache Kafka service. Output is written
to a Postgres database.
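For the later Kafka integration, a source along these lines should be unbounded out of the box (broker address, topic name and group id below are placeholders, not our actual setup):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
props.setProperty("group.id", "my-pipeline");             // placeholder group id

// A Kafka source is unbounded, so the job keeps running and the
// processing-time session windows get a chance to close.
DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props)); // placeholder topic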
I'll have a look at your proposal and let you know if it worked,
after having finished a few prerequisite parts.
Regards
Klemens
On 24.11.20 12:59, Timo Walther wrote:
Hi Klemens,
What you are observing is one of the reasons why event-time should be
preferred over processing-time. Event-time uses the timestamp of your
data, while processing-time is too basic for many use cases.
Especially when you want to reprocess historic data, you want to do
that at full speed instead of waiting 1 hour for 1-hour windows.
If you want to use processing-time nevertheless, you need to use a
source that produces unbounded streams instead of bounded streams,
such that the pipeline execution is theoretically infinite. Some
documentation can be found here [1]; you need to use
`FileProcessingMode.PROCESS_CONTINUOUSLY`. But what kind of
connector are you currently using?
Regards,
Timo
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources
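A sketch with a continuously monitored file, assuming a plain text/CSV file and a 10-second scan interval (both placeholders), could look like this:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String path = "/path/to/dump.csv"; // placeholder path
TextInputFormat format = new TextInputFormat(new Path(path));

// PROCESS_CONTINUOUSLY re-scans the path every 10 seconds, so the stream
// is unbounded and the job does not finish after the first read. Note
// that a modified file is re-ingested completely on each scan.
DataStream<String> lines = env.readFile(
        format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);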
On 24.11.20 09:59, Klemens Muthmann wrote:
Hi,
I have written an Apache Flink Pipeline containing the following
piece of code (Java):
stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(50))).aggregate(new CustomAggregator()).print();
If I run the pipeline using local execution I see the following
behavior: the `createAccumulator` and `add` methods of the
"CustomAggregator" are called correctly with the correct data.
However, `getResult` is never called and my pipeline simply finishes.
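For reference, the "CustomAggregator" implements Flink's AggregateFunction; a simplified sketch of its shape (the String input and count accumulator here are placeholders, not my actual types) would be:

import org.apache.flink.api.common.functions.AggregateFunction;

// Simplified stand-in for the CustomAggregator: counts records per window.
public class CustomAggregator implements AggregateFunction<String, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L; // called when a new session window is opened
    }

    @Override
    public Long add(String value, Long accumulator) {
        return accumulator + 1; // called for every record assigned to the window
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator; // only called when the window actually fires
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b; // used when session windows are merged
    }
}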
So I did a little research and found out that it works if I change
the code to:
stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1))).aggregate(new CustomAggregator()).print();
Notice the reduced gap time for the processing-time session window.
So it seems that execution only continues if the window has been
closed, and if that takes too long, the execution simply aborts. I
guess another factor playing a part in the problem is that my
initial data is read in much faster than 50 seconds. This leaves the
pipeline in a state where it is only waiting for the window to close
and, having nothing else to do, it decides that there is no work left
and simply shuts down.
My question now is whether it is possible to tell the local execution
environment to wait for that window to close, instead of just
shutting down.
Thanks and Regards
Klemens Muthmann
--
Kind regards
Dr.-Ing. Klemens Muthmann
-----------------------------------
Cyface GmbH
Hertha-Lindner-Straße 10
01067 Dresden
web: www.cyface.de
email: klemens.muthm...@cyface.de