Hi Jin,

1) As far as I know, ordering is only guaranteed for events from the same
partition. If you need events across partitions to stay in order, you may
need to run with parallelism 1. Here are some links that might be useful:

https://stackoverflow.com/questions/50340107/order-of-events-with-chained-keyby-calls-on-same-key
https://stackoverflow.com/questions/44156774/ordering-of-records-in-a-keyed-stream-in-flink
https://stackoverflow.com/questions/50573174/flink-kafka-producer-elements-out-of-order

2) Indeed, there doesn't seem to be a way to access the InternalTimerService
from a ProcessFunction at the moment. One approach could be to implement the
bookkeeping yourself using a MapState that records which side each timer
belongs to. Otherwise, I think you need to implement your own operator, from
which you can then access the InternalTimerService, similar to how
KeyedCoProcessOperator does it.
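
To make the MapState idea a bit more concrete, here is a rough, Flink-free
model of the bookkeeping for a single key. It is only a sketch under my own
assumptions: the class SideTaggedCleanup, its methods, and the Side enum are
made-up names, and plain Java maps stand in for keyed state. In a real
KeyedCoProcessFunction the maps would be MapState instances, and the
timestamp returned by add() would be passed to
ctx.timerService().registerEventTimeTimer(...).

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the state held for ONE key; not a Flink API.
final class SideTaggedCleanup {
    enum Side { LEFT, RIGHT }

    /** One pending cleanup: which side's buffer to touch and which event timestamp to drop. */
    private static final class Pending {
        final Side side;
        final long eventTs;
        Pending(Side side, long eventTs) { this.side = side; this.eventTs = eventTs; }
    }

    // Stand-in for the per-side buffers (one MapState<Long, List<...>> per side in a job).
    private final Map<Side, Map<Long, List<String>>> buffers = new EnumMap<>(Side.class);
    // Stand-in for the extra MapState: timer timestamp -> cleanups due at that time.
    private final Map<Long, List<Pending>> pendingByTimer = new HashMap<>();

    SideTaggedCleanup() {
        for (Side s : Side.values()) buffers.put(s, new HashMap<>());
    }

    /** Buffers a value and returns the timestamp at which a cleanup timer should fire. */
    long add(Side side, long eventTs, String value, long retentionMs) {
        buffers.get(side).computeIfAbsent(eventTs, k -> new ArrayList<>()).add(value);
        long timerTs = eventTs + retentionMs;
        pendingByTimer.computeIfAbsent(timerTs, k -> new ArrayList<>())
                .add(new Pending(side, eventTs));
        return timerTs; // in Flink: register an event-time timer for this timestamp
    }

    /** Called when a timer fires; only the side(s) recorded for this timestamp are touched. */
    void onTimer(long timerTs) {
        List<Pending> due = pendingByTimer.remove(timerTs);
        if (due == null) return;
        for (Pending p : due) buffers.get(p.side).remove(p.eventTs);
    }

    /** Number of values currently buffered for one side. */
    int size(Side side) {
        int n = 0;
        for (List<String> values : buffers.get(side).values()) n += values.size();
        return n;
    }

    public static void main(String[] args) {
        SideTaggedCleanup state = new SideTaggedCleanup();
        long leftTimer = state.add(Side.LEFT, 100L, "l1", 10L);   // short retention
        state.add(Side.RIGHT, 100L, "r1", 50L);                   // longer retention
        state.onTimer(leftTimer);                                 // right side is not visited
        System.out.println("left=" + state.size(Side.LEFT) + " right=" + state.size(Side.RIGHT));
    }
}
```

The point of the extra map is that onTimer only looks up the entries recorded
for the firing timestamp, so the two sides can use different retention
periods without scanning each other's state. The cost is one additional state
entry per buffered event timestamp, which the namespaced timers in a custom
operator would avoid.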


Regards
Ingo

On Wed, May 12, 2021 at 8:32 AM Jin Yi <j...@promoted.ai> wrote:

> hello.  thanks ahead of time for anyone who answers.
>
> 1.  verifying my understanding: for a kafka source that's partitioned on
> the same piece of data that is later used in a keyBy, if we are relying on
> the kafka timestamp as the event timestamp, is it guaranteed that the event
> stream of the source is in the kafka pipeline's insertion order for the
> topic?
>
> 2.  is there a way to use the InternalTimerService from within a
> ProcessFunction (specifically, a KeyedCoProcessFunction)?  i don't see an
> easy way to do this, except by changing the TimerService interface.  the
> use case for my need is that i'd like to have timers to clean up the left
> and right keyed state using namespaced timers like how IntervalJoin does it
> (
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L256).
> right now, b/c the KeyedCoProcessFunction only gives us the
> SimpleTimerService via the Context, i can only trigger onTimer execution
> without being able to refine the cleaning of state to just the event state
> of the side that a timer was originated from.  without this, it'll end up
> needing to visit state associated with both event streams which isn't
> performant as those streams can have different throughputs (and therefore,
> expect to have different retention characteristics/needs).
>
> thanks.
>
