Hi Sergii,

your requirements feel a bit odd. It's neither batch nor streaming.

Could you tell us why it's not possible to let the job run as a streaming
job that runs continuously? Is it just a matter of saving costs?
If so, you could monitor the number of records being processed and trigger
stop/cancel-with-savepoint accordingly.

On Mon, May 18, 2020 at 7:19 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Sergii
>
> If I understand correctly, you want to process all the files in some
> directory, and do not want to process them multiple times. I'm not sure if
> using `FileProcessingMode#PROCESS_CONTINUOUSLY`
> instead of `FileProcessingMode#PROCESS_ONCE`[1] can satisfy your needs,
> and keep the job running 7*24.
>
> but be careful, under `FileProcessingMode#CONTINUOUSLY` mode, when a file
> is modified, its contents are re-processed entirely. This can break the
> “exactly-once” semantics, as appending data at the end of a file will lead
> to all its contents being re-processed.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#data-sources
>
> Best,
> Congxian
>
>
> Sergii Mikhtoniuk <mikhton...@gmail.com> 于2020年5月18日周一 上午5:47写道:
>
>> Hello,
>>
>> I'm migrating my Spark-based stream processing application to Flink
>> (Calcite SQL and temporal tables look too attractive to resist).
>>
>> My Spark app works as follows:
>> - application is started periodically
>> - it reads a directory of Parquet files as a stream
>> - SQL transformations are applied
>> - resulting append stream is written to another directory
>> - it runs until all available data is processed
>> - checkpoints its state
>> - and **exits**
>> - upon next run it resumes where it left off, processing only new data
>>
>> I'm having difficulties replicating this start-stop-resume behavior with
>> Flink.
>>
>> When I setup my input stream using:
>>
>>     env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)
>>
>> ... I get an infinite stream, but the application will naturally keep
>> running until aborted manually.
>>
>> When I use FileProcessingMode.PROCESS_ONCE - the application exits after
>> exhausting all inputs, but it seems that Flink also treats the end of the
>> stream as max watermark so, for example, it will close all tumbling windows
>> that I don't want to be closed yet since more data will arrive upon next
>> run.
>>
>> Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can
>> I still trigger a savepoint when env.execute() returns?
>>
>> Alternatively, if I use PROCESS_CONTINUOUSLY along with
>> env.executeAsync() is there a way for me to detect when file stream was
>> exhausted to call job.stopWithSavepoint()?
>>
>> Thanks for your help!
>> - Sergii
>>
>>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to