Hi Reuven, this would be a great addition to the Flink Runner and could help with broader adoption ;)
To make an efficient implementation that works well across the different state backends, we will most likely need to add a new primitive state type to Flink's state backend ecosystem. I'll do some analysis to see what's necessary and will get back to you by the end of the week. I can also shepherd the effort on the Flink side.

I think it's a good idea to start with the naive implementation anyway, as the new primitive would potentially only be supported by not-yet-released Flink versions.

Feel free to assign me as a reviewer for this on the Beam side; I'm still familiar with the Flink runner code base.

Best,
D.

On Tue, Nov 16, 2021 at 5:29 AM Reuven Lax <re...@google.com> wrote:

> Not hearing any answers - I'll add a naive implementation to our Flink
> runner then.
>
> Who currently is the best reviewer for changes to the Flink runner?
>
> On Mon, Nov 15, 2021 at 1:49 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> cc dev@flink.apache.org.
>>
>> Jan
>>
>> On 11/12/21 04:35, Reuven Lax wrote:
>>
>> OrderedListState
>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
>> was added to Beam over a year ago. To date it is only supported by the
>> Dataflow runner and the DirectRunner. I want to see if it's possible to
>> support this well on the Flink runner (and eventually on other runners as
>> well).
>>
>> This is a state type that is indexed by an int64 sorting key (the Beam
>> API exposes this as a 64-bit timestamp, as that's the most common use case,
>> but fundamentally it's just an integer). Users can insert elements, fetch
>> ranges of elements, and delete ranges of elements.
>>
>> Is there any way to implement this on Flink? I could of course add a
>> naive implementation: store everything in a ListState and have the Flink
>> runner sort the list every time it is fetched. This seems quite
>> inefficient, and the range deletes will be even less efficient.
>>
>> https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>> Flink has considered sorted state primitives, but I don't see any activity
>> on this issue.
>>
>> Is there any way to do this, or should I just add the naive
>> implementation?
>>
>> Reuven
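For illustration, the naive approach discussed in this thread amounts to roughly the following. This is a minimal, self-contained Java sketch, not actual Flink runner code: a plain `ArrayList` stands in for Flink's `ListState`, the sort key is a bare `long` rather than Beam's timestamp, and the names `NaiveOrderedListState`, `readRange` and `clearRange` are hypothetical (loosely modelled on Beam's `OrderedListState`, with half-open ranges `[minKey, limitKey)`). It just makes the cost profile visible: appends are cheap, reads pay for a full sort, and range deletes touch every element.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical sketch of the naive approach: all elements live in one
 * unsorted list (standing in for a Flink ListState) and are sorted on read.
 * Ranges are half-open: [minKey, limitKey).
 */
class NaiveOrderedListState<T> {

  /** An element tagged with its int64 sort key (Beam exposes this as a timestamp). */
  static final class Entry<V> {
    final long sortKey;
    final V value;

    Entry(long sortKey, V value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // Stand-in for the backing ListState: insertion order only, no key order.
  private final List<Entry<T>> backing = new ArrayList<>();

  /** Insert is a plain append, O(1). */
  void add(long sortKey, T value) {
    backing.add(new Entry<>(sortKey, value));
  }

  /** Range fetch: copies and sorts the whole list on every call, O(n log n). */
  List<T> readRange(long minKey, long limitKey) {
    List<Entry<T>> sorted = new ArrayList<>(backing);
    sorted.sort(Comparator.comparingLong(e -> e.sortKey));
    List<T> result = new ArrayList<>();
    for (Entry<T> e : sorted) {
      if (e.sortKey >= minKey && e.sortKey < limitKey) {
        result.add(e.value);
      }
    }
    return result;
  }

  /**
   * Range delete: visits every element, O(n). Against a real ListState this
   * would mean reading the full list and writing the survivors back.
   */
  void clearRange(long minKey, long limitKey) {
    backing.removeIf(e -> e.sortKey >= minKey && e.sortKey < limitKey);
  }
}
```

With a state primitive that keeps entries ordered by key in the backend, both the range fetch and the range delete could instead seek directly to `minKey` and stop at `limitKey`, which is why a new sorted primitive looks like the better long-term fit.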