Hi, > Can you elaborate that a little bit? are you referring to > "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?
Yes. However I have never tried it, so I’m not 100% sure there are no pit falls with that. Regarding processing time timers. You should be able to register the timer once and then re-register in `onTimer(…)` callback using `ctx.timerService()`. Piotrek > On 11 Jun 2018, at 18:59, Steven Wu <[email protected]> wrote: > > > @Override > public void processElement(Integer value, Context ctx, Collector<Integer> > out) throws Exception { > ctx.timerService().registerProcessingTimeTimer(...); > } > > @Override > public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> > out) throws Exception { > // … > } > > correcting myself regarding the above timer proposal. it still requires a > message/record come in. I am trying to guard against when there is a long gap > of idle. then I won't be able to register a timer. > > > On Mon, Jun 11, 2018 at 9:22 AM, Steven Wu <[email protected] > <mailto:[email protected]>> wrote: > Pirotr, > > > However you could do it via a custom Operator (there you have a constant > > access to output collector). > > Can you elaborate that a little bit? are you referring to > "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class? > > > register processing time service in your ProcessFunction. > > I think your timer proposal can work. > > I was originally register timer like this. ProcessingTimeCallback interface > doesn't supply the Collector parameter > > ((StreamingRuntimeContext) getRuntimeContext()) > .getProcessingTimeService() > .registerTimer(..., this); > > Thanks, > Steven > > > > On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski <[email protected] > <mailto:[email protected]>> wrote: > Hi, > > Indeed it seems like this is not possible to emit records on > checkpoint/snapshot through ProcessFunction. However you could do it via a > custom Operator (there you have a constant access to output collector). > Another workaround might be to register processing time service in your > ProcessFunction. > > @Override > public void processElement(Integer value, Context ctx, Collector<Integer> > out) throws Exception { > ctx.timerService().registerProcessingTimeTimer(...); > } > > @Override > public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> > out) throws Exception { > // … > } > > Piotrek > >> On 11 Jun 2018, at 01:07, Steven Wu <[email protected] >> <mailto:[email protected]>> wrote: >> >> I have a process function defined with these interfaces >> >> public class MyProcessFunction<IN> extends ProcessFunction<IN, OUT> >> implements CheckpointedFunction, ProcessingTimeCallback {...} >> >> In snapshotState() method, I want to close files and emit the metadata about >> the closed files to downstream operator. it doesn't seem possible with >> snapshotState(FunctionSnapshotContext context) interface. >> >> I can keep metadata in snapshot and restore them during recovery. but if >> there is no input record coming for a long time, processElement(T value, >> Context ctx, Collector<DataFile> out) won't be called. Then I can't forward >> the restored data to downstream operator with guaranteed latency. >> >> I can add a timer. but it doesn't seem that onProcessingTime(long timestamp) >> allows me to forward output to downstream operator either. >> >> Thanks, >> Steven > > >
