Hi Reuven,

This would be a great addition to the Flink runner and could help with
broader adoption ;)

An effective implementation that works well across different state
backends will most likely require adding a new primitive state type
to Flink's state backend ecosystem. I'll do some analysis to see what's
necessary and will get back to you by the end of the week. I can also
shepherd the effort on the Flink side.

I think it's a good idea to start with the naive implementation anyway, as
a new state primitive would probably only be available in not-yet-released
Flink versions.
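
For concreteness, here is a minimal sketch of what the naive approach
amounts to. It is plain Java with an in-memory ArrayList standing in for an
unsorted list state; it does not use the Beam or Flink APIs. The class name,
the method names and the half-open [minKey, limitKey) range are assumptions
made for illustration only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch: an unsorted list that is filtered and sorted on every read. */
final class NaiveOrderedList<T> {

  /** One stored element: an int64 sort key plus the user value. */
  static final class Entry<V> {
    final long sortKey;
    final V value;

    Entry(long sortKey, V value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // Stand-in for the unsorted list state (assumption: not a real Flink state handle).
  private final List<Entry<T>> backing = new ArrayList<>();

  /** Insert is a cheap append; no ordering is maintained on write. */
  void add(long sortKey, T value) {
    backing.add(new Entry<>(sortKey, value));
  }

  /** Scans the whole list, keeps entries in [minKey, limitKey), then sorts them by key. */
  List<Entry<T>> readRange(long minKey, long limitKey) {
    List<Entry<T>> result = new ArrayList<>();
    for (Entry<T> e : backing) {
      if (e.sortKey >= minKey && e.sortKey < limitKey) {
        result.add(e);
      }
    }
    result.sort(Comparator.comparingLong(e -> e.sortKey));
    return result;
  }

  /**
   * Range delete also touches every entry. Against a real list state this
   * would mean reading everything and writing back only the survivors.
   */
  void clearRange(long minKey, long limitKey) {
    backing.removeIf(e -> e.sortKey >= minKey && e.sortKey < limitKey);
  }
}
```

Every range read and every range delete is linear (or worse) in the total
number of stored elements, which is the cost a sorted state primitive would
be meant to avoid.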

Feel free to assign me as a reviewer for this on the Beam side; I'm still
familiar with the Flink runner code base.

Best,
D.



On Tue, Nov 16, 2021 at 5:29 AM Reuven Lax <re...@google.com> wrote:

> Not hearing any answers - I'll add a naive implementation to our Flink
> runner then.
>
> Who currently is the best reviewer for changes to the Flink runner?
>
> On Mon, Nov 15, 2021 at 1:49 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> cc dev@flink.apache.org.
>>
>>  Jan
>> On 11/12/21 04:35, Reuven Lax wrote:
>>
>> OrderedListState
>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
>> was added to Beam over a year ago. To date it is only supported by the Dataflow
>> runner and the DirectRunner. I want to see if it's possible to support this
>> well on the Flink runner (and eventually on other runners as well).
>>
>> This is a state type that is indexed by an int64 sorting key (the Beam
>> API exposes this as a 64-bit timestamp, as that's the most-common use case,
>> but fundamentally it's just an integer). Users can insert elements, fetch
>> ranges of elements, and delete ranges of elements.
>>
>> Is there any way to implement this on Flink? I could of course add a
>> naive implementation - store everything in a ListState and have the Flink
>> runner sort the list every time it is fetched. This seems quite
>> inefficient, and the range deletes will be even less efficient.
>> https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>> Flink has considered sorted state primitives, but I don't see any activity
>> on this issue.
>>
>> Is there any way to do this, or should I just add the naive
>> implementation?
>>
>> Reuven
>>
>>
