On a slight tangent, it seems to me that the DataStream v2 API only exposes the ability to apply a function to all partitions, not a subset, which the current applyToKeyedState does allow. Is that accurate? If so, adding that ability to the v2 API would be helpful. For example, there can be a process function with multiple keyed states whose keysets differ by several orders of magnitude. In such cases, a user may want to apply a function on just the smaller keyset.
Thanks, Jose On Wed, Aug 28, 2024 at 10:47 AM Jose Vargas Badilla < jose.var...@datadoghq.com> wrote: > Hi, > > I recently learned that timers can be set in the KeyedStateFunction that > is passed to KeyedBroadcastProcessFunction.Context#applyToKeyedState. The > "trick" is to store a reference to the timerService that is available in > processElement. > > This is behavior I have not seen explicitly documented before. The closest > I could find is > > Context in the processBroadcastElement() method contains the method > applyToKeyedState(StateDescriptor<S, >> VS> stateDescriptor, KeyedStateFunction<KS, S> function). This allows to >> register a KeyedStateFunction to be applied to all states of all keys >> associated >> with the provided stateDescriptor. > > > from > https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/broadcast_state/#broadcastprocessfunction-and-keyedbroadcastprocessfunction > > > A developer would need to know that timers are implied by *all states*. > However, the section immediately after that says > > Registering timers is only possible at processElement() of the >> KeyedBroadcastProcessFunction and only there. It is not possible in the >> processBroadcastElement() method, as there is no key associated to the >> broadcasted elements. > > > which made me think that setting timers *anywhere* in > processBroadcastElement, including in the user supplied KeyedStateFunction, > is not possible, even though it is. > > > *Should the documentation be updated to mention that timers can be set > from applyToKeyedState?* > I did notice that the new DataStream v2 API makes this behavior much > clearer via the new ApplyPartitionFunction#apply(Collector<OUT> collector, > PartitionedContext ctx) method, which is great! > > Below is an example of how to register timers in Flink 1.17. > > import lombok.NoArgsConstructor; > import lombok.Setter; > import org.apache.flink.api.common.state.State; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.runtime.state.KeyedStateFunction; > import org.apache.flink.streaming.api.TimerService; > import > org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; > import org.apache.flink.util.Collector; > > /** > * Function that shows that timers can be set in processBroadcastElement via > ctx.applyToKeyedState. > */ > public class ApplyToKeyedStateTimerExample > extends KeyedBroadcastProcessFunction<String, String, Long, String> { > private static final long serialVersionUID = -8682056773621520927L; > private ValueState<String> state; > private CachedTimerService cachedTimerService = new CachedTimerService(); > > @Override > public void processElement(String value, ReadOnlyContext ctx, > Collector<String> out) > throws Exception { > state.update(value); > cachedTimerService.setTimerService(ctx.timerService()); > } > > @Override > public void processBroadcastElement(Long value, Context ctx, > Collector<String> out) > throws Exception { > ctx.applyToKeyedState( > new ValueStateDescriptor<>("state", Types.STRING), > cachedTimerService.registerTimer(value)); > } > > @Override > public void onTimer(final long timestamp, final OnTimerContext ctx, final > Collector<String> out) > throws Exception { > var value = state.value(); > if (value != null) { > out.collect(value); > state.clear(); > } > } > > @Override > public void open(Configuration parameters) throws Exception { > state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", > Types.STRING)); > } > > @NoArgsConstructor > private static final class CachedTimerService { > @Setter private TimerService timerService; > > public <K, S extends State> KeyedStateFunction<K, S> registerTimer(long > timestamp) { > return (key, state) -> { > if (timerService != null) { > timerService.registerEventTimeTimer(timestamp); > } > }; > } > } > } > > Here is a unit test for the above function. > > import java.util.stream.Collectors; > > import org.apache.flink.api.common.state.MapStateDescriptor; > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; > import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; > import org.junit.jupiter.api.Assertions; > import org.junit.jupiter.api.Test; > > public class ApplyToKeyedStateTimerExampleTest { > @Test > public void testTimersFromBroadcast() throws Exception { > var harness = > ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction( > new ApplyToKeyedStateTimerExample(), > x -> x, > Types.STRING, > new MapStateDescriptor<>("foo", Types.STRING, Types.LONG)); > harness.open(); > harness.processElement("foo", 1); > harness.processElement("bar", 1); > harness.processBroadcastElement(2L, 2); > harness.watermark(2); > var output = > harness.getOutput().stream() > .filter(x -> x instanceof StreamRecord) // Filter out the > watermark StreamElement > .collect(Collectors.toList()); > Assertions.assertEquals(2, output.size()); > } > } > > > Thanks, > Jose >