Thanks for driving the discussion, Shuiqiang, and sorry for chiming in late.
> However, all the state access will be synchronized in the Java operator
> and so there will be no concurrent access to the state backend.

Could you add a section to explicitly mention this in the FLIP document? I
think single-threaded state access is an important prerequisite, and it's
important for later contributors to know about it clearly, from both the
design doc and the source code.

The other parts LGTM. I added some minor inline comments in the FLIP, please
take a look. Thanks.

Best Regards,
Yu

On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi Wei,
>
> Big thanks for pointing out the mistakes! I have updated the FLIP
> according to your suggestions.
>
> Best,
> Shuiqiang
>
> > On Dec 18, 2020, at 2:37 PM, Wei Zhong <weizhong0...@gmail.com> wrote:
> >
> > Hi Shuiqiang,
> >
> > Thanks for driving this. +1 for this feature, just a minor comment on
> > the design doc.
> >
> > The interface of the `AppendingState` should be:
> >
> > class AppendingState(State, Generic[IN, OUT]):
> >
> >     @abstractmethod
> >     def get(self) -> OUT:
> >         pass
> >
> >     @abstractmethod
> >     def add(self, value: IN) -> None:
> >         pass
> >
> > The output type and the input type of the `AppendingState` may be
> > different. And the definition of the child classes should be:
> >
> > class MergingState(AppendingState[IN, OUT]):
> >     pass
> >
> >
> > class ListState(MergingState[T, Iterable[T]]):
> >
> >     @abstractmethod
> >     def update(self, values: List[T]) -> None:
> >         pass
> >
> >     @abstractmethod
> >     def add_all(self, values: List[T]) -> None:
> >         pass
> >
> >     def __iter__(self) -> Iterator[T]:
> >         return iter(self.get())
> >
> > Best,
> > Wei
> >
> >> On Dec 17, 2020, at 21:06, Shuiqiang Chen <acqua....@gmail.com> wrote:
> >>
> >> Hi Yun,
> >>
> >> Highly appreciate your questions! My corresponding answers are below:
> >>
> >> Re 1: You are right that the state access occurs in an async thread.
> >> However, all the state access will be synchronized in the Java operator
> >> and so there will be no concurrent access to the state backend.
> >>
> >> Re 2: I think it could be handled well in the Python DataStream API. In
> >> this case, there will be two operators and so also two keyed state
> >> backends.
> >>
> >> Re 3: Sure, you are right. We will store the current key, which may be
> >> used by the timer.
> >>
> >> Re 4: Good point. State migration is still not covered in the current
> >> FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal
> >> to this FLIP. I have updated the FLIP and added a clear description of
> >> this.
> >>
> >> Re 5: Good point. We may need to introduce a Python queryable state
> >> client if we want to support queryable state for Python operators. I'd
> >> like to cover it in a separate FLIP. I have updated the FLIP and added it
> >> as future work.
> >>
> >> Best,
> >> Shuiqiang
> >>
> >>> On Dec 17, 2020, at 12:08 PM, Yun Tang <myas...@live.com> wrote:
> >>>
> >>> Hi Shuiqiang,
> >>>
> >>> Thanks for driving this. I have several questions below:
> >>>
> >>> 1. Thread safety of state write access. As you might know, state
> >>> access is not thread-safe [1] in Flink; we depend on single-threaded
> >>> access within a task. Since you move the state access to another async
> >>> thread, can we still ensure this? This also includes not allowing users
> >>> to access state in their Java operator along with the bundled Python
> >>> operator.
> >>> 2. Number of keyed state backends per task. Flink would only have one
> >>> keyed state backend per operator and only one keyed state backend per
> >>> operator chain (in the head operator if possible). However, once we use
> >>> experimental features such as reinterpretAsKeyedStream [2], we could
> >>> have two keyed state backends in one operator chain within one task.
> >>> Can the Python DataStream API handle this well?
> >>> 3. Time to set current key.
> >>> As we still need the current key when registering a timer [3], we need
> >>> some place to hold the current key even if it is not registered in the
> >>> keyed state backend.
> >>> 4. State migration. Flink supports migrating state automatically if the
> >>> newly provided serializer is compatible with the old serializer [4]. I'm
> >>> afraid that if the Python DataStream API wraps the user's serializer as
> >>> BytePrimitiveArraySerializer, we will lose this functionality. Moreover,
> >>> RocksDB will migrate state automatically on the Java side [5], and this
> >>> will break if Python-related bytes are involved.
> >>> 5. Queryable state client. Currently, we only have a Java-based
> >>> queryable state client [6], and we would need another Python-based
> >>> queryable state client if Python bytes are involved.
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-13072
> >>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> >>> [3] https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
> >>> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
> >>> [5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
> >>> [6] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
> >>>
> >>> Best,
> >>> Yun Tang
> >>>
> >>> ________________________________
> >>> From: Shuiqiang Chen <acqua....@gmail.com>
> >>> Sent: Wednesday, December 16, 2020 17:32
> >>> To: dev@flink.apache.org <dev@flink.apache.org>
> >>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API
> >>>
> >>> Hi Xingbo,
> >>>
> >>> Thank you for your valuable suggestions.
> >>>
> >>> Indeed, we need to provide clearer abstractions for the StateDescriptor
> >>> and State APIs. I have updated the FLIP accordingly. Looking forward to
> >>> your feedback!
> >>>
> >>> Best,
> >>> Shuiqiang
> >>>
> >>>> On Dec 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
> >>>>
> >>>> Thanks Shuiqiang for starting this discussion.
> >>>>
> >>>> Big +1 for this feature. State access support can further improve the
> >>>> functionality of our existing Python DataStream API.
> >>>>
> >>>> I have 2 comments regarding the design doc:
> >>>>
> >>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
> >>>> instead of letting each implementation class hold `typeInfo` itself.
> >>>> For example, `ListStateDescriptor` does not hold `elem_type_info`, but
> >>>> passes `ListTypeInfo(elem_type_info)` to the constructor of
> >>>> `StateDescriptor`.
> >>>>
> >>>> b) I think we need to add the `MergingState` and `AppendingState`
> >>>> interfaces, and then extract the `get` and `add` methods from
> >>>> `ListState`, `AggregatingState`, and `ReducingState` into
> >>>> `AppendingState`. Then let `ListState`, `AggregatingState` and
> >>>> `ReducingState` inherit `MergingState`.
> >>>>
> >>>> Best,
> >>>> Xingbo
> >>>>
> >>>> Shuiqiang Chen <acqua....@gmail.com> wrote on Fri, Dec 11, 2020 at 9:44 PM:
> >>>>
> >>>>> Hi devs,
> >>>>>
> >>>>> In FLIP-130, we already added support for the stateless Python
> >>>>> DataStream APIs so that users are able to perform some basic data
> >>>>> transformations. To implement more complex data processing, we need to
> >>>>> provide state access support. So I would propose adding state access
> >>>>> APIs to the Python DataStream API to support stateful operations on a
> >>>>> KeyedStream. More details are in the FLIP wiki page [1].
> >>>>>
> >>>>> Any feedback will be highly appreciated!
> >>>>>
> >>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> >>>>>
> >>>>> Best,
> >>>>> Shuiqiang
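For reference, the state interface layout agreed on in this thread can be sketched as a self-contained Python module. It combines Wei's `AppendingState`/`MergingState`/`ListState` definitions with Xingbo's suggestion that `StateDescriptor` hold the full type info. This is a minimal sketch under stated assumptions and not PyFlink's actual implementation. `State.clear()`, the `ListTypeInfo` stand-in and `InMemoryListState` are hypothetical additions, included only to make the example runnable.

```python
from abc import ABC, abstractmethod
from typing import Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
IN = TypeVar("IN")
OUT = TypeVar("OUT")


class State(ABC):
    """Base interface of all keyed state (clear() is an assumption here)."""

    @abstractmethod
    def clear(self) -> None:
        pass


class AppendingState(State, Generic[IN, OUT]):
    """The input type IN and output type OUT may differ."""

    @abstractmethod
    def get(self) -> OUT:
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        pass


class MergingState(AppendingState[IN, OUT]):
    pass


class ListState(MergingState[T, Iterable[T]]):
    # get() and add() are inherited from AppendingState, not redeclared.

    @abstractmethod
    def update(self, values: List[T]) -> None:
        pass

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        pass

    def __iter__(self) -> Iterator[T]:
        return iter(self.get())


class InMemoryListState(ListState[T]):
    """Hypothetical in-memory ListState, for illustration only."""

    def __init__(self) -> None:
        self._values: List[T] = []

    def get(self) -> Iterable[T]:
        return list(self._values)  # a copy, so callers cannot mutate state

    def add(self, value: T) -> None:
        self._values.append(value)

    def update(self, values: List[T]) -> None:
        self._values = list(values)

    def add_all(self, values: List[T]) -> None:
        self._values.extend(values)

    def clear(self) -> None:
        self._values = []


class ListTypeInfo:
    """Hypothetical stand-in for a type info describing a list."""

    def __init__(self, elem_type_info: object) -> None:
        self.elem_type_info = elem_type_info


class StateDescriptor:
    """Holds the state name and the full type info of the stored value."""

    def __init__(self, name: str, type_info: object) -> None:
        self.name = name
        self.type_info = type_info


class ListStateDescriptor(StateDescriptor):

    def __init__(self, name: str, elem_type_info: object) -> None:
        # Do not keep elem_type_info here; wrap it and pass it to the base.
        super().__init__(name, ListTypeInfo(elem_type_info))
```

With this layout, code that only needs `get`/`add` can be written against `AppendingState`. `ListState` inherits those methods instead of redeclaring them. A descriptor always exposes a single `type_info` attribute, whatever the state kind.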
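The thread also discusses two points about state access. Requests may arrive from an async thread, yet all of them are synchronized so that the (non-thread-safe) keyed state backend is never accessed concurrently. The current key must also be kept around, for example for timers. Both points can be illustrated with a toy model. This is a hedged sketch, not the FLIP's design: in Flink the synchronization happens in the Java operator. Here a one-worker `ThreadPoolExecutor` stands in for it, and `ToyKeyedStateBackend`, `StateRequestHandler` and `run_concurrent_increments` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Hashable, Optional, Tuple


class ToyKeyedStateBackend:
    """Hypothetical keyed state backend; deliberately NOT thread-safe."""

    def __init__(self) -> None:
        self._current_key: Optional[Hashable] = None
        self._store: Dict[Tuple[str, Hashable], Any] = {}

    def set_current_key(self, key: Hashable) -> None:
        self._current_key = key

    def get_current_key(self) -> Optional[Hashable]:
        # The key stays set after an access, e.g. so a timer could use it.
        return self._current_key

    def get(self, state_name: str) -> Any:
        return self._store.get((state_name, self._current_key))

    def put(self, state_name: str, value: Any) -> None:
        self._store[(state_name, self._current_key)] = value


class StateRequestHandler:
    """Runs every state request on a single worker thread, so the backend is
    never touched concurrently even if requests come from many threads."""

    def __init__(self, backend: ToyKeyedStateBackend) -> None:
        self._backend = backend
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _do_get(self, key: Hashable, state_name: str) -> Any:
        self._backend.set_current_key(key)
        return self._backend.get(state_name)

    def _do_add(self, key: Hashable, state_name: str, delta: int) -> None:
        self._backend.set_current_key(key)
        current = self._backend.get(state_name) or 0
        self._backend.put(state_name, current + delta)  # read-modify-write

    def get(self, key: Hashable, state_name: str) -> Any:
        return self._executor.submit(self._do_get, key, state_name).result()

    def add(self, key: Hashable, state_name: str, delta: int) -> None:
        self._executor.submit(self._do_add, key, state_name, delta).result()

    def close(self) -> None:
        self._executor.shutdown(wait=True)


def run_concurrent_increments(handler: StateRequestHandler, key: Hashable,
                              state_name: str, n_threads: int = 4,
                              n_per_thread: int = 250) -> Any:
    """Simulates several threads issuing state requests at the same time."""

    def worker() -> None:
        for _ in range(n_per_thread):
            handler.add(key, state_name, 1)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return handler.get(key, state_name)
```

With the default arguments, `run_concurrent_increments(handler, "a", "count")` returns 1000. Every set-key plus read-modify-write sequence runs on the same worker thread, so no two sequences can interleave.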