Hi,
Indeed, it seems that it is not possible to emit records on
checkpoint/snapshot through a ProcessFunction. However, you could do it via a
custom operator (there you have constant access to the output collector). Another
workaround might be to register a processing-time timer in your ProcessFunction:
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
        throws Exception {
    // schedule a callback; onTimer() receives a Collector again
    ctx.timerService().registerProcessingTimeTimer(...);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
        throws Exception {
    // …
}
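To make the timer idea concrete, here is a minimal, Flink-free model that compiles and runs on its own. Every name in it (TimerFlushModel, flushIntervalMs, registerTimer, the "file-N records=K" metadata string) is hypothetical: snapshotState() stands in for CheckpointedFunction#snapshotState, where no collector is available, and onTimer() stands in for ProcessFunction#onTimer, where one is. Closing a file on checkpoint only stashes its metadata (in Flink you would also put that list into operator state), and a periodically re-armed processing-time timer flushes the stash downstream, so buffered or restored metadata goes out even when no input arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Flink-free model of the "emit from a processing-time timer" workaround.
 * All names are hypothetical stand-ins, not Flink APIs.
 */
class TimerFlushModel {
    private final long flushIntervalMs;
    private long registeredTimer = Long.MIN_VALUE; // stand-in for the timer service
    private int nextFileId = 0;
    private int recordsInOpenFile = 0;
    // Metadata of closed files not yet sent downstream.
    // In Flink this list would also be written to operator state.
    private final List<String> pending = new ArrayList<>();

    TimerFlushModel(long flushIntervalMs) {
        this.flushIntervalMs = flushIntervalMs;
    }

    /** Restore path: re-arm the timer so restored metadata is emitted without new input. */
    public void initializeState(List<String> restored, long now) {
        pending.addAll(restored);
        registerTimer(now + flushIntervalMs);
    }

    public void processElement(String value, long now, Consumer<String> out) {
        recordsInOpenFile++; // stand-in for writing value to the open file
        if (registeredTimer == Long.MIN_VALUE) {
            registerTimer(now + flushIntervalMs); // arm the first timer
        }
    }

    /** Checkpoint hook: close the open file and stash its metadata; nothing is emitted. */
    public void snapshotState() {
        if (recordsInOpenFile > 0) {
            pending.add("file-" + nextFileId++ + " records=" + recordsInOpenFile);
            recordsInOpenFile = 0;
        }
    }

    /** Timer callback: a collector is available again, so flush and re-arm. */
    public void onTimer(long timestamp, Consumer<String> out) {
        for (String meta : pending) {
            out.accept(meta);
        }
        pending.clear();
        registerTimer(timestamp + flushIntervalMs);
    }

    public long registeredTimer() {
        return registeredTimer;
    }

    private void registerTimer(long time) {
        registeredTimer = time;
    }

    public static void main(String[] args) {
        TimerFlushModel fn = new TimerFlushModel(1000L);
        List<String> downstream = new ArrayList<>();
        fn.processElement("a", 0L, downstream::add);
        fn.processElement("b", 10L, downstream::add);
        fn.snapshotState();                 // file closed, nothing emitted yet
        System.out.println(downstream);     // prints []
        fn.onTimer(1000L, downstream::add); // fires even with no new input
        System.out.println(downstream);     // prints [file-0 records=2]
    }
}
```

In a real job, registerTimer() would roughly correspond to ctx.timerService().registerProcessingTimeTimer(...), called from processElement and again from onTimer, and out.accept(...) to out.collect(...). As far as I know, ProcessFunction timers are only supported on a keyed stream, so on a non-keyed stream the custom-operator route may be the more practical one.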
Piotrek
> On 11 Jun 2018, at 01:07, Steven Wu wrote:
>
> I have a process function defined with these interfaces:
>
> public class MyProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT>
> implements CheckpointedFunction, ProcessingTimeCallback {...}
>
> In the snapshotState() method, I want to close files and emit metadata about
> the closed files to the downstream operator. That doesn't seem possible with
> the snapshotState(FunctionSnapshotContext context) interface.
>
> I can keep the metadata in the snapshot and restore it during recovery. But if
> no input records arrive for a long time, processElement(T value,
> Context ctx, Collector<DataFile> out) won't be called, so I can't forward
> the restored data to the downstream operator with guaranteed latency.
>
> I can add a timer, but onProcessingTime(long timestamp) doesn't seem
> to let me forward output to the downstream operator either.
>
> Thanks,
> Steven