Hi,

Indeed, it seems that it is not possible to emit records on checkpoint/snapshot from a ProcessFunction. However, you could do it via a custom operator (there you have constant access to the output collector). Another workaround might be to register a processing-time timer in your ProcessFunction; onTimer receives a Collector, so you can emit records from there:

    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        ctx.timerService().registerProcessingTimeTimer(...);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
        // …
    }

Piotrek

> On 11 Jun 2018, at 01:07, Steven Wu <stevenz...@gmail.com> wrote:
>
> I have a process function defined with these interfaces
>
> public class MyProcessFunction<IN> extends ProcessFunction<IN, OUT>
>     implements CheckpointedFunction, ProcessingTimeCallback {...}
>
> In snapshotState() method, I want to close files and emit the metadata about
> the closed files to downstream operator. it doesn't seem possible with
> snapshotState(FunctionSnapshotContext context) interface.
>
> I can keep metadata in snapshot and restore them during recovery. but if
> there is no input record coming for a long time, processElement(T value,
> Context ctx, Collector<DataFile> out) won't be called. Then I can't forward
> the restored data to downstream operator with guaranteed latency.
>
> I can add a timer. but it doesn't seem that onProcessingTime(long timestamp)
> allows me to forward output to downstream operator either.
>
> Thanks,
> Steven
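
P.S. To illustrate why the custom operator route works, here is a minimal plain-Java model of the idea. It is only a sketch and deliberately does not use Flink's classes: FileClosingOperator, onSnapshot and the Consumer that stands in for the output collector are all hypothetical names. The point is just that an object which keeps its own reference to the output can emit at snapshot time, whereas a function that only receives the collector as a method argument cannot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the pattern, NOT Flink's API. All names are hypothetical.
final class FileClosingOperator {
    // Stands in for the operator's output collector; held for the operator's lifetime.
    private final Consumer<String> output;
    // Files that were opened since the last snapshot.
    private final List<String> openFiles = new ArrayList<>();

    FileClosingOperator(Consumer<String> output) {
        this.output = output;
    }

    // Per-record path: only tracks the file, emits nothing.
    void processElement(String fileName) {
        openFiles.add(fileName);
    }

    // Snapshot path: "closes" every open file and emits its metadata downstream.
    // This works without any input record because the output reference is a field.
    void onSnapshot(long checkpointId) {
        for (String file : openFiles) {
            output.accept("closed:" + file + "@" + checkpointId);
        }
        openFiles.clear();
    }
}

class SnapshotEmitSketch {
    public static void main(String[] args) {
        List<String> downstream = new ArrayList<>();
        FileClosingOperator op = new FileClosingOperator(downstream::add);
        op.processElement("part-0");
        op.processElement("part-1");
        op.onSnapshot(7L);
        System.out.println(downstream);
    }
}
```

In a real job the snapshot hook and the output object would come from Flink's operator base classes, so please check the operator API of your Flink version before relying on this shape.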