Hi Sergii,

your requirements feel a bit odd; they're neither batch nor streaming. Could you tell us why it's not possible to let the job run continuously as a streaming job? Is it just a matter of saving costs? If so, you could monitor the number of records being processed and trigger stop/cancel-with-savepoint accordingly, as in the sketch below.
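To make that concrete, here is a rough, untested sketch against the Flink 1.10 Scala API. The paths, the job name, and the `quietFor` idleness heuristic are placeholders of mine (a sturdier check would poll the job's numRecordsIn metric through the REST API instead of looking at file modification times), and I'm using a plain `TextInputFormat` where you'd plug in your Parquet source:

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object StartStopResumeJob {

  def main(args: Array[String]): Unit = {
    val inputDir = "/data/in" // placeholder input directory

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // PROCESS_CONTINUOUSLY keeps the source alive, so Flink never emits the
    // max watermark and your tumbling windows stay open across runs.
    val format = new TextInputFormat(new Path(inputDir))
    env
      .readFile(format, inputDir, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L)
      .map(_.length) // stand-in for your SQL transformations
      .print()

    // executeAsync() (Flink >= 1.10) submits the job and returns a JobClient
    // handle instead of blocking until the job terminates.
    val client = env.executeAsync("start-stop-resume")

    // Wait until the input looks exhausted; ideally also confirm via metrics
    // that the job has caught up before stopping.
    while (!quietFor(inputDir, quietMillis = 60000L)) {
      Thread.sleep(5000L)
    }

    // stop-with-savepoint; 'false' means the watermark is NOT advanced to
    // end-of-event-time, so the open windows are preserved in the savepoint.
    val savepoint = client
      .stopWithSavepoint(false, "file:///savepoints") // placeholder target
      .get(10, TimeUnit.MINUTES)
    println(s"Stopped with savepoint: $savepoint")
  }

  // Crude idleness heuristic (my placeholder, not a Flink API): treat the
  // stream as exhausted once no input file has been modified for quietMillis.
  def quietFor(dir: String, quietMillis: Long): Boolean = {
    val files = Option(new java.io.File(dir).listFiles()).getOrElse(Array.empty)
    val newest = files.foldLeft(0L)((m, f) => math.max(m, f.lastModified))
    System.currentTimeMillis() - newest > quietMillis
  }
}
```

Passing `false` to `stopWithSavepoint` is the important bit: the watermark is not advanced to end-of-event-time, so your windows survive into the savepoint. On the next scheduled run you resume with `flink run -s <savepoint-path> your-job.jar`, and the continuous file source continues from the reader state stored in the savepoint, so already processed files are not re-read.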
On Mon, May 18, 2020 at 7:19 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Sergii,
>
> If I understand correctly, you want to process all the files in some
> directory without processing them multiple times. I'm not sure whether
> using `FileProcessingMode#PROCESS_CONTINUOUSLY` instead of
> `FileProcessingMode#PROCESS_ONCE` [1] and keeping the job running 24/7
> would satisfy your needs.
>
> But be careful: under `FileProcessingMode#PROCESS_CONTINUOUSLY`, when a
> file is modified, its contents are re-processed entirely. This can break
> "exactly-once" semantics, as appending data at the end of a file will
> lead to all of its contents being re-processed.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#data-sources
>
> Best,
> Congxian
>
> Sergii Mikhtoniuk <mikhton...@gmail.com> wrote on Mon, May 18, 2020 at 5:47 AM:
>
>> Hello,
>>
>> I'm migrating my Spark-based stream processing application to Flink
>> (Calcite SQL and temporal tables look too attractive to resist).
>>
>> My Spark app works as follows:
>> - the application is started periodically
>> - it reads a directory of Parquet files as a stream
>> - SQL transformations are applied
>> - the resulting append stream is written to another directory
>> - it runs until all available data is processed
>> - checkpoints its state
>> - and **exits**
>> - upon the next run it resumes where it left off, processing only new data
>>
>> I'm having difficulties replicating this start-stop-resume behavior with
>> Flink.
>>
>> When I set up my input stream using:
>>
>> env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)
>>
>> ... I get an infinite stream, but the application will naturally keep
>> running until aborted manually.
>>
>> When I use FileProcessingMode.PROCESS_ONCE, the application exits after
>> exhausting all inputs, but it seems that Flink also treats the end of the
>> stream as the max watermark, so, for example, it will close all tumbling
>> windows that I don't want closed yet, since more data will arrive upon
>> the next run.
>>
>> Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can
>> I still trigger a savepoint when env.execute() returns?
>>
>> Alternatively, if I use PROCESS_CONTINUOUSLY along with
>> env.executeAsync(), is there a way for me to detect when the file stream
>> is exhausted so I can call job.stopWithSavepoint()?
>>
>> Thanks for your help!
>> - Sergii

--
Arvid Heise | Senior Java Developer
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany