Thanks Shuiqiang for driving this. The design looks good to me. +1 to start the vote if there are no more comments.
Regards,
Dian

> On Jan 4, 2021, at 7:40 PM, Shuiqiang Chen <acqua....@gmail.com> wrote:
>
> Hi Yu,
>
> Thanks a lot for your suggestions.
>
> I have addressed your inline comments in the FLIP and also added a new section "State backed access synchronization" that explains how we make sure there is no concurrent access to the state backend. Please have a look.
>
> Best,
> Shuiqiang
>
>
> Yu Li <car...@gmail.com> wrote on Mon, Jan 4, 2021 at 4:15 PM:
>
>> Thanks for driving the discussion, Shuiqiang, and sorry for chiming in late.
>>
>> *bq. However, all the state access will be synchronized in the Java operator and so there will be no concurrent access to the state backend.*
>> Could you add a section that explicitly mentions this in the FLIP document? I think single-threaded state access is an important prerequisite, and it's important for later contributors to know about it clearly, from both the design doc and the source code.
>>
>> The other parts LGTM. I added some minor inline comments in the FLIP; please take a look.
>>
>> Thanks.
>>
>> Best Regards,
>> Yu
>>
>>
>> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <acqua....@gmail.com> wrote:
>>
>>> Hi Wei,
>>>
>>> Big thanks for pointing out the mistakes! I have updated the FLIP according to your suggestions.
>>>
>>> Best,
>>> Shuiqiang
>>>
>>>> On Dec 18, 2020, at 2:37 PM, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>
>>>> Hi Shuiqiang,
>>>>
>>>> Thanks for driving this. +1 for this feature, just a minor comment on the design doc.
>>>>
>>>> The interface of the `AppendingState` should be:
>>>>
>>>> class AppendingState(State, Generic[IN, OUT]):
>>>>
>>>>     @abstractmethod
>>>>     def get(self) -> OUT:
>>>>         pass
>>>>
>>>>     @abstractmethod
>>>>     def add(self, value: IN) -> None:
>>>>         pass
>>>>
>>>> The output type and the input type of the `AppendingState` may be different.
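Wei's point that an `AppendingState` may accept one type and return a different one can be illustrated with a small, self-contained sketch. The `State` base class below is a local stand-in rather than an import from PyFlink, and `InMemoryAveragingState` is a hypothetical, non-Flink class that accepts `int` values through `add()` and returns their mean through `get()`, similar in spirit to an aggregating state.

```python
from abc import ABC, abstractmethod
from typing import Generic, List, Optional, TypeVar

IN = TypeVar("IN")
OUT = TypeVar("OUT")


class State(ABC):
    """Local stand-in for the base State interface discussed in the FLIP."""

    @abstractmethod
    def clear(self) -> None:
        pass


class AppendingState(State, Generic[IN, OUT]):
    """Values of type IN go in through add(); a possibly different OUT comes out of get()."""

    @abstractmethod
    def get(self) -> OUT:
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        pass


class InMemoryAveragingState(AppendingState[int, Optional[float]]):
    """Hypothetical example: IN is int, OUT is the float mean (None when empty)."""

    def __init__(self) -> None:
        self._values: List[int] = []

    def get(self) -> Optional[float]:
        # An empty aggregate has no meaningful mean yet.
        if not self._values:
            return None
        return sum(self._values) / len(self._values)

    def add(self, value: int) -> None:
        self._values.append(value)

    def clear(self) -> None:
        self._values.clear()


state = InMemoryAveragingState()
for v in (1, 2, 6):
    state.add(v)
print(state.get())  # 3.0
```

Keeping `IN` and `OUT` as separate type parameters is what lets a list-like state use `IN = T` and `OUT = Iterable[T]`, while an aggregating state can map inputs to an entirely different result type.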
>>>>
>>>> And the definition of the child classes should be:
>>>>
>>>> class MergingState(AppendingState[IN, OUT]):
>>>>     pass
>>>>
>>>>
>>>> class ListState(MergingState[T, Iterable[T]]):
>>>>
>>>>     @abstractmethod
>>>>     def update(self, values: List[T]) -> None:
>>>>         pass
>>>>
>>>>     @abstractmethod
>>>>     def add_all(self, values: List[T]) -> None:
>>>>         pass
>>>>
>>>>     def __iter__(self) -> Iterator[T]:
>>>>         return iter(self.get())
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>>> On Dec 17, 2020, at 9:06 PM, Shuiqiang Chen <acqua....@gmail.com> wrote:
>>>>>
>>>>> Hi Yun,
>>>>>
>>>>> Many thanks for your questions! Here are my answers:
>>>>>
>>>>> Re 1: You are right that the state access occurs in an async thread. However, all state access will be synchronized in the Java operator, so there will be no concurrent access to the state backend.
>>>>>
>>>>> Re 2: I think this can be handled well in the Python DataStream API. In this case, there will be two operators and therefore also two keyed state backends.
>>>>>
>>>>> Re 3: Sure, you are right. We will store the current key, which may be used by the timer.
>>>>>
>>>>> Re 4: Good point. State migration is not yet covered in the current FLIP. I'd like to cover it in a separate FLIP, as it should be orthogonal to this FLIP. I have updated the FLIP and added a clear description of this.
>>>>>
>>>>> Re 5: Good point. We may need to introduce a Python queryable state client if we want to support queryable state for Python operators. I'd like to cover it in a separate FLIP. I have updated the FLIP and added it as future work.
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>>>> On Dec 17, 2020, at 12:08 PM, Yun Tang <myas...@live.com> wrote:
>>>>>>
>>>>>> Hi Shuiqiang,
>>>>>>
>>>>>> Thanks for driving this. I have several questions below:
>>>>>>
>>>>>> 1. Thread safety of state write access. As you might know, state access is not thread-safe in Flink [1]; we depend on single-threaded access from the task. Since you move state access to another async thread, can we still ensure this? This also includes preventing users from accessing state in their Java operator alongside the bundled Python operator.
>>>>>> 2. Number of keyed state backends per task. Flink only has one keyed state backend per operator, and only one keyed state backend per operator chain (in the head operator if possible). However, once we use experimental features such as reinterpretAsKeyedStream [2], we could have two keyed state backends in one operator chain within one task. Can the Python DataStream API handle this well?
>>>>>> 3. Time to set the current key. As we still need the current key when registering a timer [3], we need somewhere to hold the current key even if it is not registered in the keyed state backend.
>>>>>> 4. State migration. Flink supports migrating state automatically if the newly provided serializer is compatible with the old serializer [4]. I'm afraid that if the Python DataStream API wraps the user's serializer as BytePrimitiveArraySerializer, we will lose this functionality. Moreover, RocksDB migrates state automatically on the Java side [5], and this will break if Python-related bytes are involved.
>>>>>> 5. Queryable state client. Currently, we only have a Java-based queryable state client [6], so we would need a Python-based queryable state client as well if Python bytes are involved.
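Yun's first question and Shuiqiang's reply (Re 1: state access is synchronized in the Java operator, so the state backend never sees concurrent access) rest on one idea: whichever thread a request comes from, only one thread ever touches the backend. A minimal Python sketch of that idea follows, assuming a plain dict as a stand-in for a non-thread-safe keyed state backend. `SerializedStateAccess` and `update_with` are hypothetical names, and this is not how Flink or PyFlink actually implement the synchronization.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional, Set, Tuple


class SerializedStateAccess:
    """Hypothetical sketch: every state read/write runs on one owner thread.

    The dict stands in for a keyed state backend that is NOT thread-safe.
    Callers on any thread submit work, but only the executor thread touches it.
    """

    def __init__(self) -> None:
        self._store: Dict[Tuple[Any, str], Any] = {}
        # Records which threads touched the store (for illustration only).
        self.accessing_threads: Set[int] = set()
        # max_workers=1: submitted calls run one at a time on a single thread.
        self._owner = ThreadPoolExecutor(max_workers=1, thread_name_prefix="state-owner")

    def _run(self, fn: Callable[[], Any]) -> Any:
        # Block the caller until the owner thread has executed fn.
        return self._owner.submit(fn).result()

    def get(self, key: Any, name: str) -> Optional[Any]:
        def read() -> Optional[Any]:
            self.accessing_threads.add(threading.get_ident())
            return self._store.get((key, name))

        return self._run(read)

    def update_with(self, key: Any, name: str, fn: Callable[[Optional[Any]], Any]) -> None:
        # The read-modify-write is submitted as ONE unit so it cannot interleave.
        def rmw() -> None:
            self.accessing_threads.add(threading.get_ident())
            self._store[(key, name)] = fn(self._store.get((key, name)))

        self._run(rmw)

    def close(self) -> None:
        self._owner.shutdown(wait=True)


if __name__ == "__main__":
    backend = SerializedStateAccess()

    def worker() -> None:
        for _ in range(1000):
            backend.update_with("user-1", "count", lambda c: (c or 0) + 1)

    threads = [threading.Thread(target=worker) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(backend.get("user-1", "count"))  # 4000
    print(len(backend.accessing_threads))  # 1
    backend.close()
```

Submitting the read-modify-write as a single unit matters: a separate `get` followed by a separate write from two caller threads could still interleave, even though each individual call runs on the owner thread.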
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>>>>> [3] https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>>>>>> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>>>>>> [5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>>>>>> [6] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>>>>>
>>>>>> Best
>>>>>> Yun Tang
>>>>>>
>>>>>>
>>>>>> ________________________________
>>>>>> From: Shuiqiang Chen <acqua....@gmail.com>
>>>>>> Sent: Wednesday, December 16, 2020 17:32
>>>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>>>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API
>>>>>>
>>>>>> Hi Xingbo,
>>>>>>
>>>>>> Thank you for your valuable suggestions.
>>>>>>
>>>>>> Indeed, we need to provide clearer abstractions for the StateDescriptor and State APIs. I have updated the FLIP accordingly. Looking forward to your feedback!
>>>>>>
>>>>>> Best,
>>>>>> Shuiqiang
>>>>>>
>>>>>>> On Dec 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thanks Shuiqiang for starting this discussion.
>>>>>>>
>>>>>>> Big +1 for this feature. State access support can further improve the functionality of our existing Python DataStream API.
>>>>>>>
>>>>>>> I have 2 comments regarding the design doc:
>>>>>>>
>>>>>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo` instead of letting each implementation class hold `typeInfo` itself. For example, `ListStateDescriptor` does not hold `elem_type_info`, but passes `ListTypeInfo(elem_type_info)` to the constructor of `StateDescriptor`.
>>>>>>>
>>>>>>> b) I think we need to add the `MergingState` and `AppendingState` interfaces, and then extract the `get` and `add` methods from `ListState`, `AggregatingState`, and `ReducingState` into `AppendingState`. Then let `ListState`, `AggregatingState` and `ReducingState` inherit from `MergingState`.
>>>>>>>
>>>>>>> Best,
>>>>>>> Xingbo
>>>>>>>
>>>>>>> Shuiqiang Chen <acqua....@gmail.com> wrote on Fri, Dec 11, 2020 at 9:44 PM:
>>>>>>>
>>>>>>>> Hi devs,
>>>>>>>>
>>>>>>>> In FLIP-130, we already added support for stateless APIs in the Python DataStream API, so that users are able to perform some basic data transformations. To implement more complex data processing, we need to provide state access support. So I propose adding state access APIs to the Python DataStream API to support stateful operations on a KeyedStream. More details are on the FLIP wiki page [1].
>>>>>>>>
>>>>>>>> Any feedback will be highly appreciated!
>>>>>>>>
>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Shuiqiang
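Xingbo's suggestion (a) can be sketched as follows: the base `StateDescriptor` is the only class that stores the type information, and `ListStateDescriptor` wraps its element type in a `ListTypeInfo` before passing it up to the base constructor. `TypeInfo`, `ListTypeInfo`, `StateDescriptor`, `ValueStateDescriptor`, and `ListStateDescriptor` here are simplified, hypothetical stand-ins, not PyFlink's actual classes or signatures.

```python
class TypeInfo:
    """Hypothetical stand-in for type information describing a state's values."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return self.name


class ListTypeInfo(TypeInfo):
    """Type information for a list whose elements are described by elem_type_info."""

    def __init__(self, elem_type_info: TypeInfo) -> None:
        super().__init__(f"List<{elem_type_info.name}>")
        self.elem_type_info = elem_type_info


class StateDescriptor:
    """Base descriptor: the single place that stores the name and full type info."""

    def __init__(self, name: str, type_info: TypeInfo) -> None:
        self.name = name
        self.type_info = type_info


class ValueStateDescriptor(StateDescriptor):
    def __init__(self, name: str, value_type_info: TypeInfo) -> None:
        # The value type is already the state's type, so pass it through unchanged.
        super().__init__(name, value_type_info)


class ListStateDescriptor(StateDescriptor):
    def __init__(self, name: str, elem_type_info: TypeInfo) -> None:
        # No separate elem_type_info attribute here: the element type is
        # wrapped into ListTypeInfo and handed to the base class instead.
        super().__init__(name, ListTypeInfo(elem_type_info))


STRING = TypeInfo("String")
d = ListStateDescriptor("seen-words", STRING)
print(d.type_info)  # List<String>
print(d.type_info.elem_type_info is STRING)  # True
```

With this layout, code that needs a serializer for any descriptor can read `descriptor.type_info` without knowing which subclass it is holding.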