Hi Jose,

You are right about using {{applyToKeyedState}} to register timers under
{{KeyedBroadcastProcessFunction}}. I think the author overlooked this, and
we could update the document.
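Why this works, as far as I understand the 1.x runtime: a TimerService has no key parameter. It registers timers for whatever key is current in the keyed state backend, and applyToKeyedState sets the current key before it calls the KeyedStateFunction for each key. The plain-Java toy model below sketches that interaction. It is not Flink code, and every class name in it (ToyKeyedBackend, ToyTimerService, ToyFunction) is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Toy model (not Flink code) of why a cached TimerService works inside applyToKeyedState. */
public class CachedTimerServiceModel {

  /** Stand-in for the keyed state backend: one value per key plus a "current key". */
  static final class ToyKeyedBackend {
    final Map<String, String> values = new LinkedHashMap<>();
    String currentKey;

    /** Mirrors applyToKeyedState: set the current key, then call the function, for each key. */
    void applyToAllKeys(BiConsumer<String, String> function) {
      for (Map.Entry<String, String> entry : values.entrySet()) {
        currentKey = entry.getKey();
        function.accept(entry.getKey(), entry.getValue());
      }
    }
  }

  /** Stand-in for TimerService: takes no key and uses the backend's current key instead. */
  static final class ToyTimerService {
    private final ToyKeyedBackend backend;
    final List<String> registered = new ArrayList<>();

    ToyTimerService(ToyKeyedBackend backend) {
      this.backend = backend;
    }

    void registerEventTimeTimer(long timestamp) {
      registered.add(backend.currentKey + "@" + timestamp);
    }
  }

  /** Stand-in for the user function: caches the timer service seen in processElement. */
  static final class ToyFunction {
    private ToyTimerService cachedTimerService; // null until the first keyed element arrives

    void processElement(String value, ToyKeyedBackend backend, ToyTimerService timerService) {
      backend.values.put(backend.currentKey, value);
      cachedTimerService = timerService;
    }

    void processBroadcastElement(long timestamp, ToyKeyedBackend backend) {
      backend.applyToAllKeys(
          (key, value) -> {
            if (cachedTimerService != null) {
              cachedTimerService.registerEventTimeTimer(timestamp);
            }
          });
    }
  }

  /** Two keyed elements ("foo", "bar"), then one broadcast element with timestamp 2. */
  public static List<String> run() {
    ToyKeyedBackend backend = new ToyKeyedBackend();
    ToyTimerService timerService = new ToyTimerService(backend);
    ToyFunction function = new ToyFunction();
    for (String key : List.of("foo", "bar")) {
      backend.currentKey = key; // the operator sets the key before calling processElement
      function.processElement(key, backend, timerService);
    }
    backend.currentKey = null; // a broadcast element has no key of its own
    function.processBroadcastElement(2L, backend);
    return timerService.registered;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [foo@2, bar@2]
  }
}
```

Each key that holds state ends up with its own timer, even though the broadcast element itself has no key. This matches the two records emitted in the unit test further down the thread.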

As for specifying a smaller keyset to apply the function to, I think this is
a possible direction for optimization. This needs more consideration and
discussion in the community.
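To make the difference concrete, here is a plain-Java toy model (not Flink code). In 1.x, applyToKeyedState visits only the keys that hold a value for the given state descriptor, so passing the descriptor of the small state bounds the work. The model assumes, as Jose describes, that the v2 call instead visits every key in the partition. The state names perUserCounter and pendingAlert are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Toy model (not Flink code): keys tracked per state name, and two ways to apply a function. */
public class KeysetScopedApplyModel {
  // state name -> keys that currently hold a value for that state
  private final Map<String, Set<String>> keysByState = new HashMap<>();

  public void put(String stateName, String key) {
    keysByState.computeIfAbsent(stateName, name -> new LinkedHashSet<>()).add(key);
  }

  /** Like the 1.x applyToKeyedState: only keys holding the named state are visited. */
  public int applyToKeysWithState(String stateName, Consumer<String> function) {
    Set<String> keys = keysByState.getOrDefault(stateName, Set.of());
    keys.forEach(function);
    return keys.size();
  }

  /** Assumed "apply to all partitions" behaviour: every key that holds any state is visited. */
  public int applyToEveryKnownKey(Consumer<String> function) {
    Set<String> allKeys = new LinkedHashSet<>();
    keysByState.values().forEach(allKeys::addAll);
    allKeys.forEach(function);
    return allKeys.size();
  }

  public static void main(String[] args) {
    KeysetScopedApplyModel model = new KeysetScopedApplyModel();
    for (int i = 0; i < 10_000; i++) {
      model.put("perUserCounter", "user-" + i); // large keyset (hypothetical state)
    }
    model.put("pendingAlert", "user-7"); // small keyset (hypothetical state)
    model.put("pendingAlert", "user-42");

    System.out.println(model.applyToKeysWithState("pendingAlert", key -> {})); // prints 2
    System.out.println(model.applyToEveryKnownKey(key -> {})); // prints 10000
  }
}
```

With keysets that differ by several orders of magnitude, visiting 2 keys instead of 10000 is the kind of saving a subset-scoped variant in the v2 API could preserve.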


Best,
Zakelly

> On Thu, Aug 29, 2024 at 4:27 AM Jose Vargas Badilla via user <
> user@flink.apache.org> wrote:
>
>> On a slight tangent, it seems to me that the DataStream v2 API only
>> exposes the ability to apply a function to all partitions, not to a
>> subset, which the current applyToKeyedState does allow. Is that accurate?
>> If so, adding that ability to the v2 API would be helpful. For example,
>> there can be a process function with multiple keyed states whose keysets
>> differ by several orders of magnitude. In such cases, a user may want to
>> apply a function to just the smaller keyset.
>>
>> Thanks,
>> Jose
>>
>> On Wed, Aug 28, 2024 at 10:47 AM Jose Vargas Badilla <
>> jose.var...@datadoghq.com> wrote:
>>
>>> Hi,
>>>
>>> I recently learned that timers can be set in the KeyedStateFunction that
>>> is passed to KeyedBroadcastProcessFunction.Context#applyToKeyedState. The
>>> "trick" is to store a reference to the timerService that is available in
>>> processElement.
>>>
>>> This is behavior I have not seen explicitly documented before. The
>>> closest I could find is
>>>
>>>> Context in the processBroadcastElement() method contains the method
>>>> applyToKeyedState(StateDescriptor<S, VS> stateDescriptor,
>>>> KeyedStateFunction<KS, S> function). This allows to register a
>>>> KeyedStateFunction to be applied to all states of all keys associated
>>>> with the provided stateDescriptor.
>>>
>>>
>>> from
>>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/broadcast_state/#broadcastprocessfunction-and-keyedbroadcastprocessfunction
>>>
>>>
>>> A developer would need to infer that *all states* includes timers.
>>> However, the section immediately after that says
>>>
>>> Registering timers is only possible at processElement() of the
>>>> KeyedBroadcastProcessFunction and only there. It is not possible in
>>>> the processBroadcastElement() method, as there is no key associated to
>>>> the broadcasted elements.
>>>
>>>
>>> which made me think that setting timers *anywhere* in
>>> processBroadcastElement, including in the user supplied KeyedStateFunction,
>>> is not possible, even though it is.
>>>
>>>
>>> *Should the documentation be updated to mention that timers can be set
>>> from applyToKeyedState?*
>>> I did notice that the new DataStream v2 API makes this behavior much
>>> clearer via the new ApplyPartitionFunction#apply(Collector<OUT> collector,
>>> PartitionedContext ctx) method, which is great!
>>>
>>> Below is an example of how to register timers in Flink 1.17.
>>>
>>> import lombok.NoArgsConstructor;
>>> import lombok.Setter;
>>> import org.apache.flink.api.common.state.State;
>>> import org.apache.flink.api.common.state.ValueState;
>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.runtime.state.KeyedStateFunction;
>>> import org.apache.flink.streaming.api.TimerService;
>>> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> /**
>>>  * Function that shows that timers can be set in processBroadcastElement via ctx.applyToKeyedState.
>>>  */
>>> public class ApplyToKeyedStateTimerExample
>>>     extends KeyedBroadcastProcessFunction<String, String, Long, String> {
>>>   private static final long serialVersionUID = -8682056773621520927L;
>>>   private ValueState<String> state;
>>>   // TimerService is not serializable, so the holder is transient and created in open().
>>>   private transient CachedTimerService cachedTimerService;
>>>
>>>   @Override
>>>   public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
>>>       throws Exception {
>>>     state.update(value);
>>>     // Cache the timer service; it registers timers for whichever key is current when called.
>>>     cachedTimerService.setTimerService(ctx.timerService());
>>>   }
>>>
>>>   @Override
>>>   public void processBroadcastElement(Long value, Context ctx, Collector<String> out)
>>>       throws Exception {
>>>     // applyToKeyedState makes each key that has "state" current before calling the function,
>>>     // so the cached timer service registers one timer per such key.
>>>     ctx.applyToKeyedState(
>>>         new ValueStateDescriptor<>("state", Types.STRING),
>>>         cachedTimerService.registerTimer(value));
>>>   }
>>>
>>>   @Override
>>>   public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<String> out)
>>>       throws Exception {
>>>     var value = state.value();
>>>     if (value != null) {
>>>       out.collect(value);
>>>       state.clear();
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void open(Configuration parameters) throws Exception {
>>>     state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Types.STRING));
>>>     cachedTimerService = new CachedTimerService();
>>>   }
>>>
>>>   @NoArgsConstructor
>>>   private static final class CachedTimerService {
>>>     @Setter private TimerService timerService;
>>>
>>>     public <K, S extends State> KeyedStateFunction<K, S> registerTimer(long timestamp) {
>>>       return (key, state) -> {
>>>         // timerService stays null until processElement has run at least once in this task
>>>         // (for example, right after a restart); in that case no timer is registered.
>>>         if (timerService != null) {
>>>           timerService.registerEventTimeTimer(timestamp);
>>>         }
>>>       };
>>>     }
>>>   }
>>> }
>>>
>>> Here is a unit test for the above function.
>>>
>>> import java.util.stream.Collectors;
>>>
>>> import org.apache.flink.api.common.state.MapStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>> import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
>>> import org.junit.jupiter.api.Assertions;
>>> import org.junit.jupiter.api.Test;
>>>
>>> public class ApplyToKeyedStateTimerExampleTest {
>>>   @Test
>>>   public void testTimersFromBroadcast() throws Exception {
>>>     var harness =
>>>         ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(
>>>             new ApplyToKeyedStateTimerExample(),
>>>             x -> x,
>>>             Types.STRING,
>>>             new MapStateDescriptor<>("foo", Types.STRING, Types.LONG));
>>>     harness.open();
>>>     harness.processElement("foo", 1);
>>>     harness.processElement("bar", 1);
>>>     harness.processBroadcastElement(2L, 2);
>>>     harness.watermark(2);
>>>     var output =
>>>         harness.getOutput().stream()
>>>             .filter(x -> x instanceof StreamRecord) // Filter out the watermark StreamElement
>>>             .collect(Collectors.toList());
>>>     Assertions.assertEquals(2, output.size());
>>>   }
>>> }
>>>
>>>
>>> Thanks,
>>> Jose
>>>
>>
