Hi Ken,

It is true that there is not reason for not having access to the timerService 
from the processElement of 
the keyed side. On the other side (the non-keyed side) you cannot set timers 
because timers are bound 
to a specific key. 

Now, if one stream is broadcasted and the other is keyed, then FLINK-1.5 also 
has BroadcastState which 
does exactly what you are describing. 

Unfortunately the documentation is being prepared but I will open a Pull 
Request today and I can send 
you the link so that you can have a look.

Kostas

> On Apr 26, 2018, at 2:37 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi devs,
> 
> I’m using Flink 1.5-SNAPSHOT, and I’ve got a connected stream that I’m using 
> with a CoProcessFunction.
> 
> One of the streams is keyed, and the other is broadcast.
> 
> As per the documentation 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.html>),
>  I tried to set a timer, but that fails with:
> 
> java.lang.UnsupportedOperationException: Setting timers is only supported on 
> a keyed streams.
>       at 
> org.apache.flink.streaming.api.operators.co.CoProcessOperator$ContextImpl.registerProcessingTimeTimer(CoProcessOperator.java:123)
> 
> CoProcessOperator.java has:
> 
>               @Override
>               public void registerProcessingTimeTimer(long time) {
>                       throw new UnsupportedOperationException("Setting timers 
> is only supported on a keyed streams.");
>               }
> 
>               @Override
>               public void registerEventTimeTimer(long time) {
>                       throw new UnsupportedOperationException("Setting timers 
> is only supported on a keyed streams.");
>               }
> 
> So it seems like the documentation is wrong, and you currently can’t use 
> timers with CoProcessFunction.
> 
> If that’s true, I’m curious why. Is it just an implementation detail, or is 
> there a fundamental architectural problem?
> 
> I can see some challenges with needing two onTimerX() methods, and thus 
> different timer services for each method, etc.
> 
> Thanks,
> 
> — Ken
> 
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
> 

Reply via email to