> > Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
> > you want to apply a transformation at the end of each key. You could
> > also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.
>
> According to [0], the timer's timestamp is irrelevant, since the timer
> will be triggered at the end of time, right? If that is the case, we can
> use the same code for both streaming and batch mode.

Yes, timers will fire regardless of their value. However, what I believe
Dawid meant is that if you pick a value not very far in the future, you
are risking that the timer fires while your job is still running. Picking
MAX_WATERMARK prevents that from happening.
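For illustration, a minimal (and untested) sketch of such an end-of-key
timer could look like the following. The Event and Result types and the
counting state are just placeholders for the example:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

public class EndOfKeyFunction extends KeyedProcessFunction<String, Event, Result> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out)
            throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
        // MAX_WATERMARK is Long.MAX_VALUE, so this timer can only fire once
        // all input for the key has been seen. Registering the same
        // timestamp repeatedly is deduplicated by the timer service.
        ctx.timerService().registerEventTimeTimer(
                Watermark.MAX_WATERMARK.getTimestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out)
            throws Exception {
        // Fires once per key at the end of time.
        out.collect(new Result(ctx.getCurrentKey(), count.value()));
    }
}

Since the timer only depends on MAX_WATERMARK, the same function should
behave identically in STREAMING and BATCH execution. Alternatively, in
BATCH mode a plain keyBy(...).reduce(...) emits only the final reduced
value per key, which may already be what you need.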
> Currently, we want to use batch execution mode [0] and historical data
> to build state for our streaming application (...)
> We hope that in this way, we can rebuild our states with almost the
> same code in streaming.

If that's your main purpose, you can also consider using the State
Processor API [1] to bootstrap the state of your job. That's, after all,
the main purpose of the State Processor API.

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

On Wed, 26 May 2021 at 14:04, ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:

> On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote:
> > Hi,
> >
> > No, there is no API in the operator to know which mode it works in. We
> > aim to have separate operators for both modes if required. You can
> > check e.g. how we do it in KeyedBroadcastStateTransformationTranslator
> > [1].
>
> Thanks for the information. We will implement this according to
> Piotrek's suggestion.
>
> > Yes, it should be possible to register a timer for Long.MAX_WATERMARK
> > if you want to apply a transformation at the end of each key. You
> > could also use the reduce operation (DataStream#keyBy#reduce) in BATCH
> > mode.
>
> According to [0], the timer's timestamp is irrelevant, since the timer
> will be triggered at the end of time, right? If that is the case, we can
> use the same code for both streaming and batch mode.
>
> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
>
> > A side note: I don't fully get what you mean by "build state for our
> > streaming application". Bear in mind, though, that you cannot take a
> > savepoint from a job running in BATCH execution mode. Moreover, it
> > uses a different kind of StateBackend. Actually a dummy one, which
> > just imitates a real state backend.
>
> What we plan to do here is:
>
> 1. Load configuration from a broadcast event (custom source backed by a
>    REST API).
> 2. Load historical events as batch mode input (from GCS).
> 3. Use a timer to trigger output so that the following happens:
>    a. Serialize keyed states into JSON.
>    b. Output to Kafka.
>    c. The streaming application consumes the data from Kafka and
>       updates its keyed states accordingly.
>
> We hope that in this way, we can rebuild our states with almost the
> same code in streaming.
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
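Regarding step 3 of the plan you describe above: wired together, the
batch job could look roughly like the sketch below. It reuses the
EndOfKeyFunction sketch from earlier in this mail; the input path, the
fromJson/toJson helpers, and the Kafka address and topic are all made-up
placeholders, and FlinkKafkaProducer is the 1.13 connector (at-least-once
by default):

import java.util.Properties;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class BootstrapJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");  // placeholder

        env.readTextFile("gs://bucket/history/")   // placeholder GCS input
           .map(Event::fromJson)                   // placeholder parser
           .keyBy(event -> event.key)
           .process(new EndOfKeyFunction())        // emits once per key at end of input
           .map(Result::toJson)                    // placeholder JSON serializer
           .addSink(new FlinkKafkaProducer<>(
                   "state-topic", new SimpleStringSchema(), props));

        env.execute("state-bootstrap");
    }
}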
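PS If you go the State Processor API route instead, the 1.13 API is
still DataSet based. Following the example in the docs [1], bootstrapping
keyed state looks roughly like this; the uid, the paths, and the Account
types are placeholders you would adapt to your job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class StateBootstrap {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Account> accounts = env.readTextFile("gs://bucket/history/")  // placeholder
                .map(Account::fromJson);                                      // placeholder

        BootstrapTransformation<Account> transformation = OperatorTransformation
                .bootstrapWith(accounts)
                .keyBy(acc -> acc.id)
                .transform(new AccountBootstrapper());

        Savepoint
                .create(new MemoryStateBackend(), 128)      // max parallelism
                .withOperator("my-uid", transformation)     // must match the streaming job's uid
                .write("gs://bucket/bootstrap-savepoint");

        env.execute("bootstrap");
    }

    public static class AccountBootstrapper
            extends KeyedStateBootstrapFunction<String, Account> {

        private transient ValueState<Account> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("account", Account.class));
        }

        @Override
        public void processElement(Account value, Context ctx) throws Exception {
            // Writes the historical value directly into the keyed state.
            state.update(value);
        }
    }
}

You can then start the streaming job from the written savepoint
(flink run -s <path>), as long as the uid and the state descriptors
match those of the streaming job.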