Hi Yu,

Thanks a lot for your suggestions.
I have addressed your inline comments in the FLIP and also added a new
section, "State backed access synchronization", that explains how we make
sure there is no concurrent access to the state backend. Please have a look.

Best,
Shuiqiang

Yu Li <car...@gmail.com> wrote on Mon, Jan 4, 2021 at 4:15 PM:

> Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.
>
> *bq. However, all the state access will be synchronized in the Java
> operator and so there will be no concurrent access to the state backend.*
> Could you add a section to explicitly mention this in the FLIP document? I
> think single-threaded state access is an important prerequisite, and it's
> important for later contributors to know about this clearly, from both the
> design doc and the source code.
>
> The other parts LGTM. I added some minor inline comments in the FLIP, please
> take a look.
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <acqua....@gmail.com> wrote:
>
> > Hi Wei,
> >
> > Big thanks for pointing out the mistakes! I have updated the FLIP
> > according to your suggestions.
> >
> > Best,
> > Shuiqiang
> >
> > > On Dec 18, 2020, at 2:37 PM, Wei Zhong <weizhong0...@gmail.com> wrote:
> > >
> > > Hi Shuiqiang,
> > >
> > > Thanks for driving this. +1 for this feature, just a minor comment on
> > > the design doc.
> > >
> > > The interface of the `AppendingState` should be:
> > >
> > > class AppendingState(State, Generic[IN, OUT]):
> > >
> > >     @abstractmethod
> > >     def get(self) -> OUT:
> > >         pass
> > >
> > >     @abstractmethod
> > >     def add(self, value: IN) -> None:
> > >         pass
> > >
> > > The output type and the input type of the `AppendingState` may be
> > > different.
> > > And the definition of the child classes should be:
> > >
> > > class MergingState(AppendingState[IN, OUT]):
> > >     pass
> > >
> > >
> > > class ListState(MergingState[T, Iterable[T]]):
> > >
> > >     @abstractmethod
> > >     def update(self, values: List[T]) -> None:
> > >         pass
> > >
> > >     @abstractmethod
> > >     def add_all(self, values: List[T]) -> None:
> > >         pass
> > >
> > >     def __iter__(self) -> Iterator[T]:
> > >         return iter(self.get())
> > >
> > > Best,
> > > Wei
> > >
> > >> On Dec 17, 2020, at 21:06, Shuiqiang Chen <acqua....@gmail.com> wrote:
> > >>
> > >> Hi Yun,
> > >>
> > >> I highly appreciate your questions! My answers are below:
> > >>
> > >> Re 1: You are right that the state access occurs in an async thread.
> > >> However, all the state access will be synchronized in the Java operator,
> > >> so there will be no concurrent access to the state backend.
> > >>
> > >> Re 2: I think it could be handled well in the Python DataStream API. In
> > >> this case, there will be two operators and so also two keyed state
> > >> backends.
> > >>
> > >> Re 3: Sure, you are right. We will store the current key, which may be
> > >> used by the timer.
> > >>
> > >> Re 4: Good point. State migration is still not covered in the current
> > >> FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal
> > >> to this FLIP. I have updated the FLIP and added a clear description of
> > >> this.
> > >>
> > >> Re 5: Good point. We may need to introduce a Python queryable state
> > >> client if we want to support queryable state for Python operators. I'd
> > >> like to cover it in a separate FLIP. I have updated the FLIP and added
> > >> it as future work.
> > >>
> > >> Best,
> > >> Shuiqiang
> > >>
> > >>> On Dec 17, 2020, at 12:08 PM, Yun Tang <myas...@live.com> wrote:
> > >>>
> > >>> Hi Shuiqiang,
> > >>>
> > >>> Thanks for driving this. I have several questions below:
> > >>>
> > >>>
> > >>> 1. Thread safety of state write-access.
> > >>> As you might know, state access is not thread-safe [1] in Flink; we
> > >>> depend on single-threaded access within the task. Since you move state
> > >>> access to another async thread, can we still ensure this? This also
> > >>> includes not allowing users to access state in their Java operator
> > >>> alongside the bundled Python operator.
> > >>> 2. Number of keyed state backends per task. Flink would only have one
> > >>> keyed state backend per operator and would only have one keyed state
> > >>> backend per operator chain (in the head operator if possible). However,
> > >>> once we use experimental features such as reinterpretAsKeyedStream [2],
> > >>> we could have two keyed state backends in one operator chain within one
> > >>> task. Can the Python DataStream API handle this well?
> > >>> 3. Time to set the current key. As we still need the current key when
> > >>> registering a timer [3], we need some place to hold the current key even
> > >>> if it is not registered in the keyed state backend.
> > >>> 4. State migration. Flink supports migrating state automatically if
> > >>> the newly provided serializer is compatible with the old serializer [4].
> > >>> I'm afraid that if the Python DataStream API wraps the user's serializer
> > >>> as BytePrimitiveArraySerializer, we will lose such functionality.
> > >>> Moreover, RocksDB will migrate state automatically on the Java side [5],
> > >>> and this will break if Python-related bytes are involved.
> > >>> 5. Queryable state client. Currently, we only have a Java-based
> > >>> queryable state client [6], and we need another Python-based queryable
> > >>> state client if Python bytes are involved.
> > >>>
> > >>> [1] https://issues.apache.org/jira/browse/FLINK-13072
> > >>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> > >>> [3] https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
> > >>> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
> > >>> [5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
> > >>> [6] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
> > >>>
> > >>> Best
> > >>> Yun Tang
> > >>>
> > >>>
> > >>> ________________________________
> > >>> From: Shuiqiang Chen <acqua....@gmail.com>
> > >>> Sent: Wednesday, December 16, 2020 17:32
> > >>> To: dev@flink.apache.org <dev@flink.apache.org>
> > >>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API
> > >>>
> > >>> Hi Xingbo,
> > >>>
> > >>> Thank you for your valuable suggestions.
> > >>>
> > >>> Indeed, we need to provide clearer abstractions for the StateDescriptor
> > >>> and State APIs. I have updated the FLIP accordingly. Looking forward to
> > >>> your feedback!
> > >>>
> > >>> Best,
> > >>> Shuiqiang
> > >>>
> > >>>> On Dec 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
> > >>>>
> > >>>> Thanks Shuiqiang for starting this discussion.
> > >>>>
> > >>>> Big +1 for this feature. State access support can further improve the
> > >>>> functionality of our existing Python DataStream.
> > >>>>
> > >>>> I have 2 comments regarding the design doc:
> > >>>>
> > >>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
> > >>>> instead of letting each implementation class hold `typeInfo` itself. For
> > >>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
> > >>>> `ListTypeInfo(elem_type_info)` to the constructor of `StateDescriptor`.
> > >>>>
> > >>>> b) I think we need to add the `MergingState` and `AppendingState`
> > >>>> interfaces, and then extract the `get` and `add` methods from `ListState`,
> > >>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
> > >>>> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
> > >>>>
> > >>>> Best,
> > >>>> Xingbo
> > >>>>
> > >>>> Shuiqiang Chen <acqua....@gmail.com> wrote on Fri, Dec 11, 2020 at 9:44 PM:
> > >>>>
> > >>>>> Hi devs,
> > >>>>>
> > >>>>> In FLIP-130, we have already supported Python DataStream stateless APIs so
> > >>>>> that users are able to perform some basic data transformations. To
> > >>>>> implement more complex data processing, we need to provide state access
> > >>>>> support. So I would propose to add state access APIs in Python DataStream
> > >>>>> API to support stateful operations on a KeyedStream. More details are in
> > >>>>> the FLIP wiki page [1].
> > >>>>>
> > >>>>> Any feedback will be highly appreciated!
> > >>>>>
> > >>>>> [1]
> > >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> > >>>>>
> > >>>>> Best,
> > >>>>> Shuiqiang
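
For readers following the thread, the two design comments above (the descriptor holding the type information, and the `AppendingState` / `MergingState` / `ListState` hierarchy) can be sketched together in plain Python. This is only an illustrative sketch, not the FLIP's or PyFlink's actual code: the `clear` method on `State`, the stand-in `ListTypeInfo` class, the string used as element type info, and the `InMemoryListState` class are all assumptions introduced here so the example runs without Flink.

```python
from abc import ABC, abstractmethod
from typing import Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
IN = TypeVar("IN")
OUT = TypeVar("OUT")


class ListTypeInfo:
    """Stand-in (assumption) for a list type-information object."""

    def __init__(self, elem_type_info: object) -> None:
        self.elem_type_info = elem_type_info


class StateDescriptor(ABC):
    """The base descriptor holds the type info, per comment a)."""

    def __init__(self, name: str, type_info: object) -> None:
        self.name = name
        self.type_info = type_info


class ListStateDescriptor(StateDescriptor):
    """Keeps no elem_type_info field; wraps it and passes it to the base."""

    def __init__(self, name: str, elem_type_info: object) -> None:
        super().__init__(name, ListTypeInfo(elem_type_info))


class State(ABC):
    @abstractmethod
    def clear(self) -> None:  # assumed base method, not shown in the thread
        pass


class AppendingState(State, Generic[IN, OUT]):
    """Input and output types may differ, hence two type parameters."""

    @abstractmethod
    def get(self) -> OUT:
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        pass


class MergingState(AppendingState[IN, OUT]):
    pass


class ListState(MergingState[T, Iterable[T]]):
    @abstractmethod
    def update(self, values: List[T]) -> None:
        pass

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        pass

    def __iter__(self) -> Iterator[T]:
        return iter(self.get())


class InMemoryListState(ListState[T]):
    """Hypothetical list-backed implementation, only to exercise the interfaces."""

    def __init__(self) -> None:
        self._values: List[T] = []

    def get(self) -> Iterable[T]:
        return list(self._values)  # return a copy so callers cannot mutate state

    def add(self, value: T) -> None:
        self._values.append(value)

    def update(self, values: List[T]) -> None:
        self._values = list(values)

    def add_all(self, values: List[T]) -> None:
        self._values.extend(values)

    def clear(self) -> None:
        self._values = []
```

With this sketch, `ListState` takes a single element `T` on `add` but returns an `Iterable[T]` from `get`, which is the input/output asymmetry Wei's correction points at; an aggregating or reducing state would bind `IN` and `OUT` differently while still inheriting `MergingState`.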