David, thanks for the feedback, much appreciated! I'm hoping you can explain a bit more about how

    Iterable<TimestampedValue<T>> readRange(long minTimestamp, long limitTimestamp);

would be used (and perhaps implemented) in practice. I worry that this might either prevent certain optimizations and/or be rather more complex to optimize well.

I've been studying the implementation of the temporal join that we did using our proposed interface [1] to see what it might look like if we used readRange instead. For the most part, readRange seems like a good fit. However, I'm wondering about the code which currently reads

    private transient MapState<Long, RowData> rightState;
    ...
    if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {
        ...
    }

Our POC implementation changes this to

    private transient TemporalValueState<RowData> tRightState;
    ...
    if (lastUnprocessedTime < Long.MAX_VALUE
            || tRightState.valueAtOrAfter(Long.MIN_VALUE) != null) {
        ...
    }

It seems to me that, at least with the heap-based state backend, readRange is going to have to do a lot of unnecessary work to implement this isEmpty() operation, since it will have to consider the entire range from MIN_VALUE to MAX_VALUE. (Maybe we should add an explicit isEmpty method? I'm not convinced we need it, but it would be cheaper to implement. Or perhaps this join can be rewritten so that it doesn't need this operation; I haven't thought enough about that alternative.)

As for why we included TemporalValueState, we've found it useful on multiple occasions. It's the right abstraction for storing the right-hand side for temporal joins (the example in [2] is much easier to follow than [1]), and we also found it useful for storing window metadata when we implemented windowing.
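To make the cost concern concrete, here is a minimal heap-only sketch of the readRange/clearRange interface quoted further down, plus the explicit isEmpty method floated above. It assumes a java.util.TreeMap<Long, List<T>> layout, which is not necessarily how Flink's heap state backend stores anything, and the class HeapTemporalListState and its nested TimestampedValue are hypothetical stand-ins, not the POC code or a Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical heap-only sketch of the TemporalListState API discussed in this thread.
// Not Flink code: the class name and the TreeMap<Long, List<T>> layout are assumptions.
class HeapTemporalListState<T> {

    /** Minimal stand-in for a (timestamp, value) pair. */
    static final class TimestampedValue<V> {
        public final long timestamp;
        public final V value;

        TimestampedValue(long timestamp, V value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Values grouped by timestamp; the TreeMap keeps timestamps sorted.
    private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

    void add(long timestamp, T value) {
        byTimestamp.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value);
    }

    /** Copies all values with minTimestamp <= ts < limitTimestamp into a new list, ordered by timestamp. */
    List<TimestampedValue<T>> readRange(long minTimestamp, long limitTimestamp) {
        List<TimestampedValue<T>> result = new ArrayList<>();
        for (Map.Entry<Long, List<T>> e :
                byTimestamp.subMap(minTimestamp, true, limitTimestamp, false).entrySet()) {
            for (T v : e.getValue()) {
                result.add(new TimestampedValue<>(e.getKey(), v));
            }
        }
        return result;
    }

    /** Removes all values with minTimestamp <= ts < limitTimestamp. */
    void clearRange(long minTimestamp, long limitTimestamp) {
        byTimestamp.subMap(minTimestamp, true, limitTimestamp, false).clear();
    }

    /** Explicit emptiness check: a single check on the map, no range scan. */
    boolean isEmpty() {
        return byTimestamp.isEmpty();
    }
}
```

In this sketch readRange copies every matching entry, so probing for emptiness with readRange(Long.MIN_VALUE, Long.MAX_VALUE) visits the whole state, whereas isEmpty() is one check on the underlying map.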
[1] https://github.com/dataArtisans/flink/blob/temporal-state/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
[2] https://github.com/dataArtisans/flink/blob/temporal-state/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/temporal/TemporalJoin.java

Regards,
David

On Tue, Apr 12, 2022 at 10:01 AM David Morávek <d...@apache.org> wrote:

> Hi David,
>
> I really like the proposal. This has so much potential for various
> optimizations, especially for temporal joins. My only concern is that the
> interfaces seem unnecessarily complicated.
>
> My feeling would be that we only need a single, simple interface that would
> fit it all (the same way as it's already present in Apache Beam):
>
> @Experimental
> public interface TemporalListState<T>
>         extends MergingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> {
>
>     /**
>      * Read a timestamp-limited subrange of the list. The result is ordered by timestamp.
>      *
>      * <p>All values with timestamps >= minTimestamp and < limitTimestamp will be in the resulting
>      * iterable. This means that only timestamps strictly less than
>      * Instant.ofEpochMilli(Long.MAX_VALUE) can be used as timestamps.
>      */
>     Iterable<TimestampedValue<T>> readRange(long minTimestamp, long limitTimestamp);
>
>     /**
>      * Clear a timestamp-limited subrange of the list.
>      *
>      * <p>All values with timestamps >= minTimestamp and < limitTimestamp will be removed from the
>      * list.
>      */
>     void clearRange(long minTimestamp, long limitTimestamp);
> }
>
> Is there anything missing here? Why do we need a temporal value state at
> all? In my understanding it's still basically a "temporal list state", just
> with a slightly different API. If this is indeed necessary with the "temporal
> list state" API you've proposed, would it make sense to try unifying the
> two?
>
> I really think that the Beam community already did a good job of
> designing this API.
>
> Adding one state primitive is already a big change, so if we can keep it
> minimal it would be great.
>
> One more point on the proposed API: being able to clear only a single
> "timestamped value" at a time might be limiting for some use cases
> (performance-wise, because we can't optimize it as we can with a range
> delete).
>
> Best,
> D.
>
> On Tue, Apr 12, 2022 at 9:32 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Hi David,
> >
> > Thanks for driving.
> >
> > I understand that state storage itself supports byte ordering; have we
> > considered exposing Binary**State? This way the upper layers can be
> > implemented on demand, Temporal is just one of them.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Apr 12, 2022 at 3:01 PM Aitozi <gjying1...@gmail.com> wrote:
> > >
> > > Hi David,
> > > I have looked through the doc. I think it will be a good improvement to
> > > this usage pattern, and I'm interested in it. Do you have some POC work to
> > > share for a closer look?
> > > Besides, I have one question: could we support exposing the namespace in
> > > the different state types, not limited to `TemporalState`? That way, users
> > > can specify the namespace, and TemporalState would be a special case that
> > > uses the timestamp as the namespace. I think it will be more extensible.
> > > What do you think about this?
> > >
> > > Best,
> > > Aitozi.
> > >
> > > On Mon, Apr 11, 2022 at 20:54, David Anderson <dander...@apache.org> wrote:
> > >
> > > > Greetings, Flink developers.
> > > >
> > > > I would like to open up a discussion of a proposal [1] to add a new
> > > > kind of state to Flink.
> > > >
> > > > The goal here is to optimize a fairly common pattern, which is using
> > > >
> > > >     MapState<Long, List<Event>>
> > > >
> > > > to store lists of events associated with timestamps.
> > > > This pattern is used internally in quite a few operators that
> > > > implement sorting and joins, and it also shows up in user code, for
> > > > example, when implementing custom windowing in a KeyedProcessFunction.
> > > >
> > > > Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves a
> > > > more than 2x improvement in throughput when performing these operations
> > > > on RocksDB by better leveraging the capabilities of the RocksDB state
> > > > backend.
> > > >
> > > > See FLIP-220 [1] for details.
> > > >
> > > > Best,
> > > > David
> > > >
> > > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
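For readers unfamiliar with the pattern the original proposal targets, here is a minimal plain-Java sketch of buffering events with a MapState<Long, List<Event>>-style layout. java.util.HashMap stands in for Flink's MapState (an assumption; no Flink classes are used, and user code generally should not rely on MapState iteration order), and the class BufferedEvents and its drainUpTo method are hypothetical names chosen for illustration. Because the map is unordered, draining everything up to a watermark means visiting every key and sorting the ready timestamps, which is the kind of work a sorted, timestamp-aware state primitive could avoid.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the MapState<Long, List<Event>> pattern.
// HashMap plays the role of MapState here; iteration order is not sorted.
class BufferedEvents<E> {
    private final Map<Long, List<E>> byTimestamp = new HashMap<>();

    void add(long timestamp, E event) {
        byTimestamp.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(event);
    }

    /** Removes and returns all events with timestamp <= watermark, in timestamp order. */
    List<E> drainUpTo(long watermark) {
        // No ordering to exploit: visit every key and collect the ready ones.
        List<Long> ready = new ArrayList<>();
        for (Long ts : byTimestamp.keySet()) {
            if (ts <= watermark) {
                ready.add(ts);
            }
        }
        Collections.sort(ready);
        // Emit in timestamp order, removing each drained entry from the map.
        List<E> out = new ArrayList<>();
        for (Long ts : ready) {
            out.addAll(byTimestamp.remove(ts));
        }
        return out;
    }

    boolean isEmpty() {
        return byTimestamp.isEmpty();
    }
}
```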