Hi Jose,

You are right about using {{applyToKeyedState}} to register timers under {{KeyedBroadcastProcessFunction}}. I think the author overlooked this, and we could update the document.

As for specifying a smaller keyset to apply the function to, I think this is a possible direction for optimization. This needs more consideration and discussion in the community.

Best,
Zakelly

On Thu, Aug 29, 2024 at 11:55 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> On Thu, Aug 29, 2024 at 4:27 AM Jose Vargas Badilla via user <user@flink.apache.org> wrote:
>
>> On a slight tangent, it seems to me that the DataStream v2 API only
>> exposes the ability to apply a function to all partitions, not a subset,
>> which the current applyToKeyedState does allow. Is that accurate? If so,
>> adding that ability to the v2 API would be helpful. For example, there can
>> be a process function with multiple keyed states whose keysets differ by
>> several orders of magnitude. In such cases, a user may want to apply a
>> function on just the smaller keyset.
>>
>> Thanks,
>> Jose
>>
>> On Wed, Aug 28, 2024 at 10:47 AM Jose Vargas Badilla <jose.var...@datadoghq.com> wrote:
>>
>>> Hi,
>>>
>>> I recently learned that timers can be set in the KeyedStateFunction that
>>> is passed to KeyedBroadcastProcessFunction.Context#applyToKeyedState. The
>>> "trick" is to store a reference to the timerService that is available in
>>> processElement.
>>>
>>> This is behavior I have not seen explicitly documented before. The
>>> closest I could find is
>>>
>>>> Context in the processBroadcastElement() method contains the method
>>>> applyToKeyedState(StateDescriptor<S, VS> stateDescriptor,
>>>> KeyedStateFunction<KS, S> function).
>>>> This allows to register a KeyedStateFunction to be applied to all
>>>> states of all keys associated with the provided stateDescriptor.
>>>
>>> from
>>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/broadcast_state/#broadcastprocessfunction-and-keyedbroadcastprocessfunction
>>>
>>> A developer would need to know that timers are implied by *all states*.
>>> However, the section immediately after that says
>>>
>>>> Registering timers is only possible at processElement() of the
>>>> KeyedBroadcastProcessFunction and only there. It is not possible in
>>>> the processBroadcastElement() method, as there is no key associated to
>>>> the broadcasted elements.
>>>
>>> which made me think that setting timers *anywhere* in
>>> processBroadcastElement, including in the user supplied KeyedStateFunction,
>>> is not possible, even though it is.
>>>
>>> *Should the documentation be updated to mention that timers can be set
>>> from applyToKeyedState?*
>>>
>>> I did notice that the new DataStream v2 API makes this behavior much
>>> clearer via the new ApplyPartitionFunction#apply(Collector<OUT> collector,
>>> PartitionedContext ctx) method, which is great!
>>>
>>> Below is an example of how to register timers in Flink 1.17.
>>>
>>> import lombok.NoArgsConstructor;
>>> import lombok.Setter;
>>> import org.apache.flink.api.common.state.State;
>>> import org.apache.flink.api.common.state.ValueState;
>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.runtime.state.KeyedStateFunction;
>>> import org.apache.flink.streaming.api.TimerService;
>>> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> /**
>>>  * Function that shows that timers can be set in processBroadcastElement
>>>  * via ctx.applyToKeyedState.
>>>  */
>>> public class ApplyToKeyedStateTimerExample
>>>     extends KeyedBroadcastProcessFunction<String, String, Long, String> {
>>>   private static final long serialVersionUID = -8682056773621520927L;
>>>   private ValueState<String> state;
>>>   private CachedTimerService cachedTimerService = new CachedTimerService();
>>>
>>>   @Override
>>>   public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
>>>       throws Exception {
>>>     state.update(value);
>>>     cachedTimerService.setTimerService(ctx.timerService());
>>>   }
>>>
>>>   @Override
>>>   public void processBroadcastElement(Long value, Context ctx, Collector<String> out)
>>>       throws Exception {
>>>     ctx.applyToKeyedState(
>>>         new ValueStateDescriptor<>("state", Types.STRING),
>>>         cachedTimerService.registerTimer(value));
>>>   }
>>>
>>>   @Override
>>>   public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<String> out)
>>>       throws Exception {
>>>     var value = state.value();
>>>     if (value != null) {
>>>       out.collect(value);
>>>       state.clear();
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void open(Configuration parameters) throws Exception {
>>>     state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Types.STRING));
>>>   }
>>>
>>>   @NoArgsConstructor
>>>   private static final class CachedTimerService {
>>>     @Setter private TimerService timerService;
>>>
>>>     public <K, S extends State> KeyedStateFunction<K, S> registerTimer(long timestamp) {
>>>       return (key, state) -> {
>>>         if (timerService != null) {
>>>           timerService.registerEventTimeTimer(timestamp);
>>>         }
>>>       };
>>>     }
>>>   }
>>> }
>>>
>>> Here is a unit test for the above function.
>>>
>>> import java.util.stream.Collectors;
>>>
>>> import org.apache.flink.api.common.state.MapStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>> import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
>>> import org.junit.jupiter.api.Assertions;
>>> import org.junit.jupiter.api.Test;
>>>
>>> public class ApplyToKeyedStateTimerExampleTest {
>>>   @Test
>>>   public void testTimersFromBroadcast() throws Exception {
>>>     var harness =
>>>         ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(
>>>             new ApplyToKeyedStateTimerExample(),
>>>             x -> x,
>>>             Types.STRING,
>>>             new MapStateDescriptor<>("foo", Types.STRING, Types.LONG));
>>>     harness.open();
>>>     harness.processElement("foo", 1);
>>>     harness.processElement("bar", 1);
>>>     harness.processBroadcastElement(2L, 2);
>>>     harness.watermark(2);
>>>     var output =
>>>         harness.getOutput().stream()
>>>             .filter(x -> x instanceof StreamRecord) // Filter out the watermark StreamElement
>>>             .collect(Collectors.toList());
>>>     Assertions.assertEquals(2, output.size());
>>>   }
>>> }
>>>
>>> Thanks,
>>> Jose
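
On the keyset tangent raised in this thread, here is a toy sketch in plain Java of why applying a function per state descriptor can touch far fewer keys than applying it to every partition. It is not Flink code: ToyKeyedStateBackend and all of its methods are hypothetical names made up for illustration, and it assumes, as the thread describes, that applyToKeyedState visits only the keys associated with the provided descriptor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Toy stand-in for a keyed state backend. Hypothetical; this is NOT a Flink class. */
final class ToyKeyedStateBackend {
  // state name -> (key -> value); each named state keeps its own keyset.
  private final Map<String, Map<String, String>> states = new HashMap<>();

  void put(String stateName, String key, String value) {
    states.computeIfAbsent(stateName, name -> new HashMap<>()).put(key, value);
  }

  /** Calls function once per key holding a value for stateName; returns the number of keys visited. */
  int applyToKeyedState(String stateName, BiConsumer<String, String> function) {
    Map<String, String> entries =
        states.getOrDefault(stateName, Collections.<String, String>emptyMap());
    entries.forEach(function);
    return entries.size();
  }

  /** Builds a large and a small keyset, then visits only the small one. */
  static List<String> keysVisitedForSmallState() {
    ToyKeyedStateBackend backend = new ToyKeyedStateBackend();
    for (int i = 0; i < 1000; i++) {
      backend.put("large", "user-" + i, "v");
    }
    backend.put("small", "user-3", "x");
    backend.put("small", "user-7", "y");

    List<String> visited = new ArrayList<>();
    // Recording the key stands in for registering a timer for that key.
    backend.applyToKeyedState("small", (key, value) -> visited.add(key));
    Collections.sort(visited);
    return visited;
  }

  public static void main(String[] args) {
    System.out.println(keysVisitedForSmallState()); // prints [user-3, user-7]
  }
}
```

In this model the callback runs twice rather than a thousand times, which is the saving a per-keyset variant in the v2 API would aim for; whether and how the v2 API should expose that is the open question in this thread.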