Hello, and thanks in advance to anyone who answers.

1. Verifying my understanding: for a Kafka source whose topic is partitioned on the same field that is later used in a keyBy, and where we rely on the Kafka record timestamp as the event timestamp, is it guaranteed that the source's event stream arrives in the topic's insertion order (per partition, and therefore per key)?
2. Is there a way to use the InternalTimerService from within a ProcessFunction (specifically, a KeyedCoProcessFunction)? I don't see an easy way to do this, short of changing the TimerService interface. My use case: I'd like timers that clean up the left and right keyed state using namespaced timers, the way IntervalJoin does it (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L256). Right now, because the KeyedCoProcessFunction only gives us the SimpleTimerService via the Context, I can trigger onTimer execution, but I can't tell which side a timer originated from, so I can't restrict cleanup to just that side's event state. As a result, each timer firing ends up visiting the state associated with both event streams, which isn't performant, since those streams can have different throughputs (and therefore different retention characteristics/needs).

Thanks.
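P.S. To make question 2 more concrete, here is a rough plain-Java model of the difference I mean. It is not Flink code: `NamespacedTimerSketch`, `Side`, `onTimerNamespaced`, `onTimerPlain`, and the `bufferVisits` counter are all names I made up for illustration, and the two `TreeMap`s just stand in for the left and right keyed state of a single key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model only: none of these names are Flink APIs.
class NamespacedTimerSketch {
    enum Side { LEFT, RIGHT }

    // Buffered events for ONE key, per side: event timestamp -> payloads.
    final TreeMap<Long, List<String>> left = new TreeMap<>();
    final TreeMap<Long, List<String>> right = new TreeMap<>();

    // Counts how many per-side buffers the cleanup logic had to touch.
    int bufferVisits = 0;

    TreeMap<Long, List<String>> buffer(Side side) {
        return side == Side.LEFT ? left : right;
    }

    // Buffer an event and return the time at which its cleanup timer should fire.
    long add(Side side, long eventTs, String value, long retention) {
        buffer(side).computeIfAbsent(eventTs, k -> new ArrayList<>()).add(value);
        return eventTs + retention;
    }

    // What I'd like: the timer carries a namespace (the side), so only that side is visited.
    void onTimerNamespaced(Side namespace, long timerTs, long retention) {
        bufferVisits++;
        buffer(namespace).remove(timerTs - retention);
    }

    // What I have today: only the timestamp is known, so both sides must be checked.
    void onTimerPlain(long timerTs, long leftRetention, long rightRetention) {
        bufferVisits += 2;
        left.remove(timerTs - leftRetention);
        right.remove(timerTs - rightRetention);
    }

    public static void main(String[] args) {
        NamespacedTimerSketch s = new NamespacedTimerSketch();
        long leftRetention = 10, rightRetention = 100;
        long cleanupTs = s.add(Side.LEFT, 5, "l1", leftRetention);
        s.add(Side.RIGHT, 7, "r1", rightRetention);
        // Only the left buffer is touched when the left-side timer fires.
        s.onTimerNamespaced(Side.LEFT, cleanupTs, leftRetention);
        System.out.println("left=" + s.left + " right=" + s.right
                + " visits=" + s.bufferVisits);
    }
}
```

In this model both variants remove the same entries; the only difference is that the plain variant has to touch both buffers on every firing, which is the cost I'm hoping to avoid.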