Piotr,

> However, you could do it via a custom Operator (there you have constant
> access to the output collector).

Can you elaborate on that a little bit? Are you referring to
"Output<StreamRecord<OUT>> output" in the AbstractStreamOperator class?
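To make the question concrete, here is a minimal plain-Java model of why a custom operator could emit during a snapshot. All names are hypothetical, not Flink API. The operator holds an output reference for its whole lifetime, standing in for the `output` field of AbstractStreamOperator (assuming that is the field meant). Because of that, its snapshot hook can push metadata downstream directly, whereas a ProcessFunction's snapshotState(FunctionSnapshotContext) receives no such reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model, not Flink API: the operator is handed its output once at
// construction (like AbstractStreamOperator's output field) and can use it from
// any callback, including the snapshot hook.
class EmittingOperator {
    private final Consumer<String> output;      // stands in for Output<StreamRecord<OUT>>
    private final List<String> openFiles = new ArrayList<>();

    EmittingOperator(Consumer<String> output) {
        this.output = output;
    }

    // Called for every input record; here each record "opens" a file.
    void processElement(String record) {
        openFiles.add("file-" + record);
    }

    // Snapshot hook: close the open files and emit their metadata right away.
    // This only works because the output reference is always available.
    void snapshotState(long checkpointId) {
        for (String file : openFiles) {
            output.accept(file + "@checkpoint-" + checkpointId);
        }
        openFiles.clear();
    }
}
```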

> register processing time service in your ProcessFunction.

I think your timer proposal can work.

I was originally registering the timer like this, but the ProcessingTimeCallback
interface doesn't supply a Collector parameter:

((StreamingRuntimeContext) getRuntimeContext())
    .getProcessingTimeService()
    .registerTimer(..., this);
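A minimal plain-Java sketch of the timer idea, assuming the snapshot callback only records the metadata of the files it closed and a processing-time callback that does receive a collector (such as ProcessFunction.onTimer) forwards that backlog later. The class and method names are hypothetical stand-ins, not Flink API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model, not Flink API: the snapshot path has no collector, so it
// only buffers metadata; a timer path that has a collector drains the buffer.
class PendingMetadataBuffer {
    // In a real job this backlog would also be written to checkpointed state
    // so it survives a restore; that part is omitted from this sketch.
    private final Deque<String> pending = new ArrayDeque<>();

    // Snapshot path: no output available, so just remember what was closed.
    void onSnapshot(List<String> closedFileMetadata) {
        pending.addAll(closedFileMetadata);
    }

    // Timer path: output is available, so forward everything buffered so far.
    // Driven by processing time, so it can run even when no input arrives.
    int onTimer(Consumer<String> out) {
        int emitted = 0;
        while (!pending.isEmpty()) {
            out.accept(pending.poll());
            emitted++;
        }
        return emitted;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The trade-off in this model is latency: metadata reaches downstream at the next timer firing rather than at the checkpoint itself, so the delay is bounded by the timer interval.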

Thanks,
Steven



On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Indeed, it seems that it is not possible to emit records on
> checkpoint/snapshot through a ProcessFunction. However, you could do it via a
> custom Operator (there you have constant access to the output collector).
> Another workaround might be to register a processing time timer in your
> ProcessFunction.
>
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
>    ctx.timerService().registerProcessingTimeTimer(...);
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
>    // …
> }
>
> Piotrek
>
> On 11 Jun 2018, at 01:07, Steven Wu <stevenz...@gmail.com> wrote:
>
> I have a process function defined with these interfaces:
>
> public class MyProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT>
>     implements CheckpointedFunction, ProcessingTimeCallback {...}
>
> In the snapshotState() method, I want to close files and emit metadata
> about the closed files to the downstream operator. It doesn't seem possible
> with the *snapshotState(FunctionSnapshotContext context)* interface.
>
> I can keep the metadata in the snapshot and restore it during recovery, but if
> no input record arrives for a long time, *processElement(T
> value, Context ctx, Collector<DataFile> out)* won't be called. Then I
> can't forward the restored data to the downstream operator with guaranteed
> latency.
>
> I can add a timer, but it doesn't seem that *onProcessingTime(long
> timestamp)* allows me to forward output to the downstream operator either.
>
> Thanks,
> Steven
>
>
>
