Hi Wei,

Many thanks for pointing out the mistakes! I have updated the FLIP according to your suggestions.

Best,
Shuiqiang

> On Dec 18, 2020, at 2:37 PM, Wei Zhong <weizhong0...@gmail.com> wrote:
>
> Hi Shuiqiang,
>
> Thanks for driving this. +1 for this feature, just a minor comment on the
> design doc.
>
> The interface of the `AppendingState` should be:
>
> class AppendingState(State, Generic[IN, OUT]):
>
>     @abstractmethod
>     def get(self) -> OUT:
>         pass
>
>     @abstractmethod
>     def add(self, value: IN) -> None:
>         pass
>
> The output type and the input type of the `AppendingState` may be different.
> And the definition of the child classes should be:
>
> class MergingState(AppendingState[IN, OUT]):
>     pass
>
>
> class ListState(MergingState[T, Iterable[T]]):
>
>     @abstractmethod
>     def update(self, values: List[T]) -> None:
>         pass
>
>     @abstractmethod
>     def add_all(self, values: List[T]) -> None:
>         pass
>
>     def __iter__(self) -> Iterator[T]:
>         return iter(self.get())
>
> Best,
> Wei
>
>> On Dec 17, 2020, at 21:06, Shuiqiang Chen <acqua....@gmail.com> wrote:
>>
>> Hi Yun,
>>
>> Thank you very much for your questions! My answers are below:
>>
>> Re 1: You are right that the state access occurs in an async thread.
>> However, all state access will be synchronized in the Java operator, so
>> there will be no concurrent access to the state backend.
>>
>> Re 2: I think the Python DataStream API can handle this well. In this
>> case, there will be two operators and therefore two keyed state backends.
>>
>> Re 3: Sure, you are right. We will store the current key, which may be used
>> by the timer.
>>
>> Re 4: Good point. State migration is not yet covered in the current FLIP.
>> I'd like to cover it in a separate FLIP as it should be orthogonal to this
>> FLIP. I have updated the FLIP and added a clear description of this.
>>
>> Re 5: Good point. We may need to introduce a Python queryable state client
>> if we want to support queryable state for Python operators. I'd like to
>> cover it in a separate FLIP. I have updated the FLIP and added it as future
>> work.
>>
>> Best,
>> Shuiqiang
>>
>>> On Dec 17, 2020, at 12:08 PM, Yun Tang <myas...@live.com> wrote:
>>>
>>> Hi Shuiqiang,
>>>
>>> Thanks for driving this. I have several questions below:
>>>
>>>
>>> 1. Thread safety of state write access. As you might know, state access is
>>> not thread-safe [1] in Flink; we depend on single-threaded task access.
>>> Since you move the state access to another async thread, can we still
>>> ensure this? This also means not allowing users to access state in their
>>> Java operator alongside the bundled Python operator.
>>> 2. Number of keyed state backends per task. Flink would only have one keyed
>>> state backend per operator and would only have one keyed state backend per
>>> operator chain (in the head operator if possible). However, once we use
>>> experimental features such as reinterpretAsKeyedStream [2], we could have
>>> two keyed state backends in one operator chain within one task. Can the
>>> Python DataStream API handle this well?
>>> 3. Time to set the current key. As we still need the current key when
>>> registering a timer [3], we need some place to hold the current key even if
>>> it is not registered in the keyed state backend.
>>> 4. State migration. Flink supports migrating state automatically if the
>>> newly provided serializer is compatible with the old serializer [4]. I'm
>>> afraid that if the Python DataStream API wraps the user's serializer as
>>> BytePrimitiveArraySerializer, we will lose this functionality. Moreover,
>>> RocksDB will migrate state automatically on the Java side [5], and this
>>> will break if Python-related bytes are involved.
>>> 5. Queryable state client. Currently, we only have a Java-based queryable
>>> state client [6], and we would need another Python-based queryable state
>>> client if Python bytes are involved.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>> [3]
>>> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>>> [4]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>>> [5]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>>> [6]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>>
>>> Best
>>> Yun Tang
>>>
>>>
>>> ________________________________
>>> From: Shuiqiang Chen <acqua....@gmail.com>
>>> Sent: Wednesday, December 16, 2020 17:32
>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream
>>> API
>>>
>>> Hi Xingbo,
>>>
>>> Thank you for your valuable suggestions.
>>>
>>> Indeed, we need to provide clearer abstractions for the StateDescriptor and
>>> State APIs. I have updated the FLIP accordingly. Looking forward to your
>>> feedback!
>>>
>>> Best,
>>> Shuiqiang
>>>
>>>> On Dec 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>
>>>> Thanks Shuiqiang for starting this discussion.
>>>>
>>>> Big +1 for this feature. State access support can further improve the
>>>> functionality of our existing Python DataStream.
>>>>
>>>> I have 2 comments regarding the design doc:
>>>>
>>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
>>>> instead of letting each implementation class hold `typeInfo` itself. For
>>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
>>>> `ListTypeInfo(elem_type_info)` to the constructor of
>>>> `StateDescriptor`.
>>>>
>>>> b) I think we need to add the `MergingState` and `AppendingState`
>>>> interfaces, and then extract the `get` and `add` methods from `ListState`,
>>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
>>>> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Fri, Dec 11, 2020 at 9:44 PM, Shuiqiang Chen <acqua....@gmail.com> wrote:
>>>>
>>>>> Hi devs,
>>>>>
>>>>> With FLIP-130, we already support stateless Python DataStream APIs so
>>>>> that users are able to perform some basic data transformations. To
>>>>> implement more complex data processing, we need to provide state access
>>>>> support. So I propose adding state access APIs to the Python DataStream
>>>>> API to support stateful operations on a KeyedStream. More details are in
>>>>> the FLIP wiki page [1].
>>>>>
>>>>> Any feedback will be highly appreciated!
>>>>>
>>>>> [1]
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>
>>
>
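
For readers following the thread, the state hierarchy Wei and Xingbo describe can be sketched as self-contained Python. This is only an illustration under stated assumptions, not the FLIP's implementation: the `State` base class with a `clear` method is assumed here (the thread does not spell it out), and `InMemoryListState` is a hypothetical list-backed stand-in for a real state backend.

```python
from abc import ABC, abstractmethod
from typing import Generic, Iterable, Iterator, List, TypeVar

IN = TypeVar("IN")
OUT = TypeVar("OUT")
T = TypeVar("T")


class State(ABC):
    """Assumed base class; the thread only references it by name."""

    @abstractmethod
    def clear(self) -> None:
        pass


class AppendingState(State, Generic[IN, OUT]):
    """Input and output types may differ, as Wei points out."""

    @abstractmethod
    def get(self) -> OUT:
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        pass


class MergingState(AppendingState[IN, OUT]):
    pass


class ListState(MergingState[T, Iterable[T]]):
    """Accepts single elements of type T, returns an iterable of T."""

    @abstractmethod
    def update(self, values: List[T]) -> None:
        pass

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        pass

    def __iter__(self) -> Iterator[T]:
        return iter(self.get())


class InMemoryListState(ListState[T]):
    """Hypothetical implementation backed by a plain Python list."""

    def __init__(self) -> None:
        self._values: List[T] = []

    def get(self) -> Iterable[T]:
        # Return a copy so callers cannot mutate the stored values.
        return list(self._values)

    def add(self, value: T) -> None:
        self._values.append(value)

    def update(self, values: List[T]) -> None:
        # Replace the stored values entirely.
        self._values = list(values)

    def add_all(self, values: List[T]) -> None:
        self._values.extend(values)

    def clear(self) -> None:
        self._values.clear()
```

Because `ListState` defines `__iter__` on top of `get()`, a state object can be consumed directly with `for value in state:` or `list(state)`, while the abstract base classes themselves cannot be instantiated.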