Hi,

> Can you elaborate that a little bit? are you referring to 
> "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?

Yes. However I have never tried it, so I’m not 100% sure there are no pit falls 
with that.

Regarding processing time timers. You should be able to register the timer once 
and then re-register in `onTimer(…)` callback using  `ctx.timerService()`.

Piotrek

> On 11 Jun 2018, at 18:59, Steven Wu <[email protected]> wrote:
> 
> 
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> 
> out) throws Exception {
>    ctx.timerService().registerProcessingTimeTimer(...);
> }
> 
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> 
> out) throws Exception {
>    // …
> }
> 
> correcting myself regarding the above timer proposal. it still requires a 
> message/record come in. I am trying to guard against when there is a long gap 
> of idle. then I won't be able to register a timer.
> 
> 
> On Mon, Jun 11, 2018 at 9:22 AM, Steven Wu <[email protected] 
> <mailto:[email protected]>> wrote:
> Pirotr,
> 
> > However you could do it via a custom Operator (there you have a constant 
> > access to output collector). 
> 
> Can you elaborate that a little bit? are you referring to 
> "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?
> 
> > register processing time service in your ProcessFunction.
> 
> I think your timer proposal can work. 
> 
> I was originally register timer like this. ProcessingTimeCallback interface 
> doesn't supply the Collector parameter
> 
> ((StreamingRuntimeContext) getRuntimeContext())
>     .getProcessingTimeService()
>     .registerTimer(..., this);
> 
> Thanks, 
> Steven
> 
> 
> 
> On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi,
> 
> Indeed it seems like this is not possible to emit records on 
> checkpoint/snapshot through ProcessFunction. However you could do it via a 
> custom Operator (there you have a constant access to output collector). 
> Another workaround might be to register processing time service in your 
> ProcessFunction.
> 
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> 
> out) throws Exception {
>    ctx.timerService().registerProcessingTimeTimer(...);
> }
> 
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> 
> out) throws Exception {
>    // …
> }
> 
> Piotrek
> 
>> On 11 Jun 2018, at 01:07, Steven Wu <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> I have a process function defined with these interfaces
>> 
>> public class MyProcessFunction<IN> extends ProcessFunction<IN, OUT> 
>>     implements CheckpointedFunction, ProcessingTimeCallback {...}
>> 
>> In snapshotState() method, I want to close files and emit the metadata about 
>> the closed files to downstream operator. it doesn't seem possible with 
>> snapshotState(FunctionSnapshotContext context) interface.
>> 
>> I can keep metadata in snapshot and restore them during recovery. but if 
>> there is no input record coming for a long time,  processElement(T value, 
>> Context ctx, Collector<DataFile> out) won't be called. Then I can't forward 
>> the restored data to downstream operator with guaranteed latency.
>> 
>> I can add a timer. but it doesn't seem that onProcessingTime(long timestamp) 
>> allows me to forward output to downstream operator either.
>> 
>> Thanks,
>> Steven
> 
> 
> 

Reply via email to