Hi,

Indeed, it seems that it is not possible to emit records on checkpoint/snapshot
through a ProcessFunction. However, you could do it via a custom Operator
(there you have constant access to the output collector). Another workaround
might be to register a processing-time timer in your ProcessFunction and emit
from onTimer(), which does receive a Collector.
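To illustrate the custom-operator idea, here is a minimal plain-Java sketch. `Output` and `FileRollingOperator` are hypothetical stand-ins, not Flink classes. The point is only that an object holding its own output handle can emit from snapshotState(), which a ProcessFunction cannot. In Flink itself this would roughly correspond to extending `AbstractStreamOperator` (which has a protected `output` field) and implementing `OneInputStreamOperator`, but the sketch below does not use Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator's output handle (not Flink's Output).
interface Output<T> {
    void collect(T record);
}

// Hypothetical operator: buffers records into an "open file" and, at
// snapshot time, closes that file and emits its metadata downstream.
class FileRollingOperator {
    private final Output<String> output;
    private final List<String> openFileRecords = new ArrayList<>();
    private int fileIndex = 0;

    FileRollingOperator(Output<String> output) {
        this.output = output;
    }

    // Pretend this appends the value to the currently open file.
    void processElement(String value) {
        openFileRecords.add(value);
    }

    // Unlike a ProcessFunction's snapshotState, the operator owns its output,
    // so it can forward the closed file's metadata right here.
    void snapshotState(long checkpointId) {
        if (openFileRecords.isEmpty()) {
            return; // no open file to close
        }
        String metadata = "file-" + fileIndex++
                + " records=" + openFileRecords.size()
                + " checkpoint=" + checkpointId;
        openFileRecords.clear(); // "close" the file
        output.collect(metadata);
    }
}
```

For example, feeding two records and then snapshotting twice emits metadata only once, because the second snapshot finds no open file.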

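@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
   // Schedule a callback (processing-time timers require a keyed stream).
   ctx.timerService().registerProcessingTimeTimer(...);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
   // Unlike snapshotState(), onTimer() receives a Collector, so records
   // can be emitted downstream from here.
   // …
}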
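The timer workaround can be sketched in plain Java as follows. This assumes hypothetical stand-ins (`SimpleTimerService`, `MetadataForwarder`, `FLUSH_INTERVAL_MS`) rather than Flink's classes. snapshotState() only queues the closed-file metadata, and onTimer() drains the queue into the collector and re-arms itself, so metadata is forwarded within roughly one interval even if no input arrives. In Flink the first timer would be registered from processElement() as in the snippet above, and, as far as I know, such timers are only available on keyed streams.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical single-threaded processing-time timer service (not Flink's).
class SimpleTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerProcessingTimeTimer(long time) {
        timers.add(time);
    }

    // Removes and returns the earliest timer due at or before `now`, or null.
    Long pollDue(long now) {
        Long next = timers.peek();
        return (next != null && next <= now) ? timers.poll() : null;
    }
}

// Hypothetical function: metadata recorded at snapshot time is queued and
// forwarded from onTimer, which (unlike snapshotState) has a collector.
class MetadataForwarder {
    static final long FLUSH_INTERVAL_MS = 1_000L; // assumed latency bound

    private final SimpleTimerService timerService = new SimpleTimerService();
    private final Deque<String> pending = new ArrayDeque<>();

    MetadataForwarder(long startTime) {
        timerService.registerProcessingTimeTimer(startTime + FLUSH_INTERVAL_MS);
    }

    // Called at checkpoint time: cannot emit here, so only queue the metadata.
    void snapshotState(String closedFileMetadata) {
        pending.add(closedFileMetadata);
    }

    // Forwards everything queued so far, then re-arms the timer so the next
    // flush happens even if no input record ever arrives.
    void onTimer(long timestamp, Consumer<String> out) {
        while (!pending.isEmpty()) {
            out.accept(pending.poll());
        }
        timerService.registerProcessingTimeTimer(timestamp + FLUSH_INTERVAL_MS);
    }

    // Simulation driver: fires every timer due at or before `now`.
    void advanceTo(long now, Consumer<String> out) {
        Long due;
        while ((due = timerService.pollDue(now)) != null) {
            onTimer(due, out);
        }
    }
}
```

The design choice is that the periodic timer, not incoming records, bounds the output latency: metadata queued at a checkpoint is forwarded at the next timer firing.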

Piotrek

> On 11 Jun 2018, at 01:07, Steven Wu <stevenz...@gmail.com> wrote:
> 
> I have a process function defined with these interfaces
> 
> public class MyProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT> 
>     implements CheckpointedFunction, ProcessingTimeCallback {...}
> 
> In the snapshotState() method, I want to close files and emit the metadata 
> about the closed files to the downstream operator. It doesn't seem possible 
> with the snapshotState(FunctionSnapshotContext context) interface.
> 
> I can keep the metadata in the snapshot and restore it during recovery, but 
> if no input record arrives for a long time, processElement(T value, 
> Context ctx, Collector<DataFile> out) won't be called. Then I can't forward 
> the restored data to the downstream operator with guaranteed latency.
> 
> I can add a timer, but it doesn't seem that onProcessingTime(long timestamp) 
> allows me to forward output to the downstream operator either.
> 
> Thanks,
> Steven
