Piotr,

> However you could do it via a custom Operator (there you have a constant access to output collector).

Can you elaborate on that a little? Are you referring to "Output<StreamRecord<OUT>> output" in the AbstractStreamOperator class?

> register processing time service in your ProcessFunction.

I think your timer proposal can work.

I originally registered the timer like this, but the ProcessingTimeCallback interface doesn't supply a Collector parameter:

    ((StreamingRuntimeContext) getRuntimeContext())
        .getProcessingTimeService()
        .registerTimer(..., this);

Thanks,
Steven

On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> Indeed it seems like this is not possible to emit records on
> checkpoint/snapshot through ProcessFunction. However you could do it via a
> custom Operator (there you have a constant access to output collector).
> Another workaround might be to register processing time service in your
> ProcessFunction.
>
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
>     ctx.timerService().registerProcessingTimeTimer(...);
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
>     // …
> }
>
> Piotrek
>
> On 11 Jun 2018, at 01:07, Steven Wu <stevenz...@gmail.com> wrote:
>
> I have a process function defined with these interfaces
>
> public class MyProcessFunction<IN> extends ProcessFunction<IN, OUT>
>     implements CheckpointedFunction, ProcessingTimeCallback {...}
>
> In snapshotState() method, I want to close files and emit the metadata
> about the closed files to downstream operator. it doesn't seem possible
> with *snapshotState(FunctionSnapshotContext context)* interface.
>
> I can keep metadata in snapshot and restore them during recovery. but if
> there is no input record coming for a long time,
> *processElement(T value, Context ctx, Collector<DataFile> out)* won't be
> called. Then I can't forward the restored data to downstream operator with
> guaranteed latency.
>
> I can add a timer. but it doesn't seem that
> *onProcessingTime(long timestamp)* allows me to forward output to
> downstream operator either.
>
> Thanks,
> Steven
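
For reference, the timer workaround discussed in this thread can be modeled without any Flink dependency. The following is only a minimal sketch under stated assumptions, not Flink code: `Collector`, `BufferingFunction`, `snapshot()`, `restore()` and `onTimer()` are hypothetical local stand-ins that merely mirror the shape of the methods named above. It illustrates the idea of buffering closed-file metadata at snapshot time, where no collector is available, and forwarding it from the next timer callback, which does receive one.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free model of "buffer at snapshot, emit on the next timer".
// Every type here is a hypothetical stand-in, not a Flink class.
final class PendingEmitModel {

    /** Stand-in for a downstream output handle (assumption, not Flink's Collector). */
    interface Collector<T> {
        void collect(T record);
    }

    /** Buffers metadata while no collector is available and flushes it from a timer. */
    static final class BufferingFunction {
        private final List<String> pending = new ArrayList<>();

        /** Models the snapshot hook: a file was closed, but nothing can be emitted here. */
        void snapshot(String closedFileMetadata) {
            pending.add(closedFileMetadata);
        }

        /** Models recovery: metadata restored from a checkpoint becomes pending again. */
        void restore(List<String> restoredMetadata) {
            pending.addAll(restoredMetadata);
        }

        /** Models the timer callback, which is handed a collector and can emit. */
        void onTimer(long timestamp, Collector<String> out) {
            for (String metadata : pending) {
                out.collect(metadata);
            }
            pending.clear();
        }

        /** Number of metadata entries still waiting to be emitted. */
        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        List<String> downstream = new ArrayList<>();
        BufferingFunction fn = new BufferingFunction();
        fn.snapshot("file-1 closed");       // nothing reaches downstream yet
        fn.onTimer(1000L, downstream::add); // timer fires and flushes the buffer
        System.out.println(downstream);
    }
}
```

In this model the emit latency is bounded by the timer interval rather than by the arrival of the next input record, which is the property asked for in the original question.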