Thanks all for responding.

To give a bit more context:
- I'm building a tool that performs *fully deterministic* stream processing of mission-critical data
- all input data is in the form of append-only event logs (Parquet files)
- users define streaming SQL transformations to do all kinds of analysis of those event logs (joins, enrichment, aggregating events into reports, etc.)
- results are also written as append-only event logs

I think this use case fits very well into the "batch is a special case of streaming" idea. Even though I have all data history on disk, I want the SQL queries users write to be fully agnostic of how data is ingested and stored, how frequently new data arrives, and how frequently it's processed.

Say, for example, you want to generate a weekly summary report of COVID-19 cases per country:
- You could write a batch job that reads the last processed week end date from the previous output, checks whether one full week has passed since then, checks that all input data sources have already posted full data for that week, and finally filters and aggregates the data.
- ...But isn't it much more elegant to express it as a tumbling window aggregation, where watermarks do all the hard work for you?

This kind of "write query once and run it forever" is what I'm aiming for and why I'm not using batch processing.

As for why the start-stop-continue behavior: most data I'm dealing with is low volume and low frequency. Think of the open datasets you find on government data portals, e.g. property assessment data, zoning, transit lines. All of those are updated very infrequently, so if I run the application constantly it will be idle 99% of the time. Thus I use the "pull" model, where the user runs the app to update some query result to the latest available data when necessary.

I realize that this kind of usage is very different from how Flink is usually deployed, but imho it's not too far-fetched.

Going back to the specific problems that I encountered:

It seems not to be possible to use Flink 1.10 for *unbounded* file streams at all.
When reading files with FileProcessingMode.PROCESS_CONTINUOUSLY, the following line emits the MAX_WATERMARK into the stream even when env.stopWithSavepoint() is called, prematurely closing my tumbling windows:
https://github.com/apache/flink/blob/release-1.10.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L222

I had to implement my own ContinuousFileReaderOperator as a workaround.

> @Arvid: You could monitor the number of records being processed and trigger stop/cancel-with-savepoint accordingly.

I was thinking in a similar direction, but got stuck on which records I should be counting, and how to count them in the case of a JOIN: if I have two or more input streams and some arbitrary SQL transforms, the number of rows in the output may be different from the number of rows read.

I'm curious how Flink handles in-flight data during env.stopWithSavepoint():
- Will it wait for records to propagate through the topology? ...In this case all I need is to ensure that the reader has read all available data before calling stop.
- ...Or will in-flight records become part of the savepoint? ...In this case I'll need to think of a way to make sure that not only reading but all processing finishes too.

Is there perhaps a way to send other types of special messages, like Watermarks, through the streams without them being treated as data? I wonder if I could send some special "InputWillBlock" message from the file readers and wait for it to propagate to the output to understand that processing has finished.

Again, thanks everyone for your help!
- Sergii

On Mon, May 18, 2020 at 8:45 AM Thomas Huang <lyang...@hotmail.com> wrote:

> Hi,
>
> Actually, seems like spark dynamic allocation saves more resources in that case.
>
> ------------------------------
> *From:* Arvid Heise <ar...@ververica.com>
> *Sent:* Monday, May 18, 2020 11:15:09 PM
> *To:* Congxian Qiu <qcx978132...@gmail.com>
> *Cc:* Sergii Mikhtoniuk <mikhton...@gmail.com>; user <user@flink.apache.org>
> *Subject:* Re: Process available data and stop with savepoint
>
> Hi Sergii,
>
> your requirements feel a bit odd. It's neither batch nor streaming.
>
> Could you tell us why it's not possible to let the job run as a streaming job that runs continuously? Is it just a matter of saving costs?
> If so, you could monitor the number of records being processed and trigger stop/cancel-with-savepoint accordingly.
>
> On Mon, May 18, 2020 at 7:19 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
> Hi Sergii
>
> If I understand correctly, you want to process all the files in some directory, and do not want to process them multiple times. I'm not sure if using `FileProcessingMode#PROCESS_CONTINUOUSLY` instead of `FileProcessingMode#PROCESS_ONCE`[1] can satisfy your needs, and keep the job running 7*24.
>
> But be careful: under `FileProcessingMode#PROCESS_CONTINUOUSLY` mode, when a file is modified, its contents are re-processed entirely. This can break the "exactly-once" semantics, as appending data at the end of a file will lead to all its contents being re-processed.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#data-sources
>
> Best,
> Congxian
>
> On Mon, May 18, 2020 at 5:47 AM Sergii Mikhtoniuk <mikhton...@gmail.com> wrote:
>
> Hello,
>
> I'm migrating my Spark-based stream processing application to Flink (Calcite SQL and temporal tables look too attractive to resist).
>
> My Spark app works as follows:
> - application is started periodically
> - it reads a directory of Parquet files as a stream
> - SQL transformations are applied
> - resulting append stream is written to another directory
> - it runs until all available data is processed
> - checkpoints its state
> - and **exits**
> - upon next run it resumes where it left off, processing only new data
>
> I'm having difficulties replicating this start-stop-resume behavior with Flink.
>
> When I set up my input stream using:
>
>     env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)
>
> ... I get an infinite stream, but the application will naturally keep running until aborted manually.
>
> When I use FileProcessingMode.PROCESS_ONCE, the application exits after exhausting all inputs, but it seems that Flink also treats the end of the stream as a max watermark, so, for example, it will close all tumbling windows, including ones that I don't want closed yet because more data will arrive upon the next run.
>
> Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can I still trigger a savepoint when env.execute() returns?
>
> Alternatively, if I use PROCESS_CONTINUOUSLY along with env.executeAsync(), is there a way for me to detect when the file stream was exhausted so that I can call job.stopWithSavepoint()?
>
> Thanks for your help!
> - Sergii
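
To make the watermark problem discussed in this thread concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink code: the class `TumblingWindowSketch` and its methods are hypothetical names made up for illustration, timestamps are assumed to be non-negative day numbers, and the closing rule (a window closes once the watermark reaches its end) is a simplification of what a real engine does. It only shows why an end-of-stream watermark of `Long.MAX_VALUE` would emit a partially filled weekly window that a later run might still need to extend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of an event-time tumbling window (illustration only, not Flink API). */
public class TumblingWindowSketch {
    private final long windowSize;
    // window start -> number of events assigned to that window so far
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    public TumblingWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Assigns an event to the window that contains its timestamp. */
    public void onEvent(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        openWindows.merge(start, 1, Integer::sum);
    }

    /** Closes and emits every open window whose end is at or before the watermark. */
    public void onWatermark(long watermark) {
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Integer> first = openWindows.firstEntry();
            long start = first.getKey();
            if (start + windowSize > watermark) {
                break; // this window and all later ones may still receive events
            }
            openWindows.pollFirstEntry();
            emitted.add("[" + start + "," + (start + windowSize) + ") count=" + first.getValue());
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    public int openWindowCount() {
        return openWindows.size();
    }

    public static void main(String[] args) {
        TumblingWindowSketch weekly = new TumblingWindowSketch(7); // 7-day windows
        weekly.onEvent(1);
        weekly.onEvent(3);
        weekly.onEvent(8);

        // A regular watermark: only the complete week [0,7) is emitted.
        weekly.onWatermark(8);
        System.out.println(weekly.emitted() + " open=" + weekly.openWindowCount());

        // An end-of-stream watermark: the partial week [7,14) is emitted as well.
        weekly.onWatermark(Long.MAX_VALUE);
        System.out.println(weekly.emitted() + " open=" + weekly.openWindowCount());
    }
}
```

In this model the first watermark leaves the window for week [7,14) open, which is the state one would want a savepoint to capture before exiting. The second call stands in for the MAX_WATERMARK case: the partial week is emitted with only the events seen so far, and nothing remains open to receive data from the next run.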