Hi Wei,

Big thanks for pointing out the mistakes! I have updated the FLIP according to 
your suggestions.

Best,
Shuiqiang

> On Dec 18, 2020, at 2:37 PM, Wei Zhong <weizhong0...@gmail.com> wrote:
> 
> Hi Shuiqiang,
> 
> Thanks for driving this. +1 for this feature. Just one minor comment on the 
> design doc.
> 
> The interface of the `AppendingState` should be:
> 
> class AppendingState(State, Generic[IN, OUT]):
> 
>     @abstractmethod
>     def get(self) -> OUT:
>         pass
> 
>     @abstractmethod
>     def add(self, value: IN) -> None:
>         pass
> 
> The output type and the input type of the `AppendingState` may be different. 
> And the definition of the child classes should be:
> 
> class MergingState(AppendingState[IN, OUT]):
>     pass
> 
> 
> class ListState(MergingState[T, Iterable[T]]):
> 
>     @abstractmethod
>     def update(self, values: List[T]) -> None:
>         pass
> 
>     @abstractmethod
>     def add_all(self, values: List[T]) -> None:
>         pass
> 
>     def __iter__(self) -> Iterator[T]:
>         return iter(self.get())
> 
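The hierarchy above can be tried out as a self-contained sketch. This is only an illustration, not the FLIP's implementation: the `State` base class with a `clear()` method is assumed here (mirroring the Java `State` interface), and `InMemoryListState` is a hypothetical list-backed class added purely to exercise the abstract methods.

```python
from abc import ABC, abstractmethod
from typing import Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
IN = TypeVar("IN")
OUT = TypeVar("OUT")


class State(ABC):
    """Assumed base interface; only clear() is modelled here."""

    @abstractmethod
    def clear(self) -> None:
        pass


class AppendingState(State, Generic[IN, OUT]):
    """Input type IN and output type OUT may differ."""

    @abstractmethod
    def get(self) -> OUT:
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        pass


class MergingState(AppendingState[IN, OUT]):
    pass


class ListState(MergingState[T, Iterable[T]]):
    """Accepts single elements of type T, returns an Iterable[T]."""

    @abstractmethod
    def update(self, values: List[T]) -> None:
        pass

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        pass

    def __iter__(self) -> Iterator[T]:
        return iter(self.get())


class InMemoryListState(ListState[T]):
    """Hypothetical implementation backed by a plain Python list."""

    def __init__(self) -> None:
        self._values: List[T] = []

    def get(self) -> Iterable[T]:
        # Return a copy so callers cannot mutate the stored values.
        return list(self._values)

    def add(self, value: T) -> None:
        self._values.append(value)

    def update(self, values: List[T]) -> None:
        # Replace the existing contents.
        self._values = list(values)

    def add_all(self, values: List[T]) -> None:
        self._values.extend(values)

    def clear(self) -> None:
        self._values = []
```

With this sketch, `add` takes a single `T` while `get` returns an `Iterable[T]`, which is the input/output distinction the comment is about; iterating the state object directly goes through `__iter__`.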
> Best,
> Wei
> 
>> On Dec 17, 2020, at 21:06, Shuiqiang Chen <acqua....@gmail.com> wrote:
>> 
>> Hi Yun,
>> 
>> I really appreciate your questions! My answers are below:
>> 
>> Re 1: You are right that the state access occurs in an async thread. 
>> However, all state access will be synchronized in the Java operator, so 
>> there will be no concurrent access to the state backend.
>> 
>> Re 2: I think the Python DataStream API can handle this well. In this 
>> case, there will be two operators and therefore two keyed state backends.
>> 
>> Re 3: Sure, you are right. We will store the current key, which may be used 
>> by the timer.
>> 
>> Re 4: Good point. State migration is not yet covered in the current FLIP. 
>> I'd like to cover it in a separate FLIP, as it should be orthogonal to this 
>> one. I have updated the FLIP and added a clear description of this.
>> 
>> Re 5: Good point. We may need to introduce a Python queryable state client if 
>> we want to support queryable state for Python operators. I'd like to cover 
>> it in a separate FLIP. I have updated the FLIP and added it as future work.
>> 
>> Best,
>> Shuiqiang
>> 
>>> On Dec 17, 2020, at 12:08 PM, Yun Tang <myas...@live.com> wrote:
>>> 
>>> Hi Shuiqiang,
>>> 
>>> Thanks for driving this. I have several questions below:
>>> 
>>> 
>>> 1.  Thread safety of state write access. As you might know, state access is 
>>> not thread-safe [1] in Flink; we depend on single-threaded access within a 
>>> task. Since you move the state access to another async thread, can we still 
>>> ensure this? This also means not allowing users to access state in the Java 
>>> operator alongside the bundled Python operator.
>>> 2.  Number of keyed state backends per task. Flink has only one keyed 
>>> state backend per operator and would normally have only one keyed state 
>>> backend per operator chain (in the head operator if possible). However, 
>>> once we use experimental features such as reinterpretAsKeyedStream [2], we 
>>> could have two keyed state backends in one operator chain within one task. 
>>> Can the Python DataStream API handle this well?
>>> 3.  Time to set the current key. As we still need the current key when 
>>> registering a timer [3], we need some place to hold the current key even if 
>>> it is not registered in the keyed state backend.
>>> 4.  State migration. Flink can migrate state automatically if the newly 
>>> provided serializer is compatible with the old serializer [4]. I'm afraid 
>>> that if the Python DataStream API wraps the user's serializer as 
>>> BytePrimitiveArraySerializer, we will lose this functionality. Moreover, 
>>> RocksDB will migrate state automatically on the Java side [5], and this will 
>>> break if Python-related bytes are involved.
>>> 5.  Queryable state client. Currently, we only have a Java-based queryable 
>>> state client [6], and we would need another Python-based queryable state 
>>> client if Python bytes are involved.
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>>> [2] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>> [3] 
>>> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>>> [4] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>>> [5] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>>> [6] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>> 
>>> Best
>>> Yun Tang
>>> 
>>> 
>>> ________________________________
>>> From: Shuiqiang Chen <acqua....@gmail.com>
>>> Sent: Wednesday, December 16, 2020 17:32
>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream 
>>> API
>>> 
>>> Hi Xingbo,
>>> 
>>> Thank you for your valuable suggestions.
>>> 
>>> Indeed, we need to provide clearer abstractions for the StateDescriptor and 
>>> State APIs. I have updated the FLIP accordingly. Looking forward to your 
>>> feedback!
>>> 
>>> Best,
>>> Shuiqiang
>>> 
>>>> On Dec 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>> 
>>>> Thanks Shuiqiang for starting this discussion.
>>>> 
>>>> Big +1 for this feature. State access support can further improve the
>>>> functionality of our existing Python DataStream.
>>>> 
>>>> I have 2 comments regarding the design doc:
>>>> 
>>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
>>>> instead of letting each implementation class hold `typeInfo` itself. For
>>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
>>>> `ListTypeInfo(elem_type_info)` to the constructor of 
>>>> `StateDescriptor`.
>>>> 
>>>> b) I think we need to add the `MergingState` and `AppendingState`
>>>> interfaces, and then extract the `get` and `add` methods from `ListState`,
>>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
>>>> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>>>> 
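Comment a) can be illustrated with a minimal sketch. The names `StateDescriptor`, `ListStateDescriptor`, `ListTypeInfo` and `elem_type_info` come from the comment itself; the `TypeInformation` stand-in class, the `type_info` attribute spelling, and all constructor signatures are assumptions made for illustration and are not the actual PyFlink API.

```python
class TypeInformation:
    """Hypothetical stand-in for a type information object."""

    def __init__(self, name: str) -> None:
        self.name = name


class ListTypeInfo(TypeInformation):
    """Type information for a list whose elements have elem_type_info."""

    def __init__(self, elem_type_info: TypeInformation) -> None:
        super().__init__("List<" + elem_type_info.name + ">")
        self.elem_type_info = elem_type_info


class StateDescriptor:
    """Base descriptor: the only place where type info is stored."""

    def __init__(self, name: str, type_info: TypeInformation) -> None:
        self.name = name
        self.type_info = type_info


class ListStateDescriptor(StateDescriptor):
    """Keeps no elem_type_info field of its own; it wraps the element
    type in ListTypeInfo and hands it to the base constructor."""

    def __init__(self, name: str, elem_type_info: TypeInformation) -> None:
        super().__init__(name, ListTypeInfo(elem_type_info))
```

For example, `ListStateDescriptor("events", TypeInformation("String"))` ends up with a `type_info` that is a `ListTypeInfo`, and the element type stays reachable through `type_info.elem_type_info` rather than through a separate field on the descriptor.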
>>>> Best,
>>>> Xingbo
>>>> 
>>>> Shuiqiang Chen <acqua....@gmail.com> wrote on Fri, Dec 11, 2020 at 9:44 PM:
>>>> 
>>>>> Hi devs,
>>>>> 
>>>>> In FLIP-130, we added support for the stateless Python DataStream APIs so
>>>>> that users are able to perform some basic data transformations. To
>>>>> implement more complex data processing, we need to provide state access
>>>>> support. So I propose adding state access APIs to the Python DataStream
>>>>> API to support stateful operations on a KeyedStream. More details are in
>>>>> the FLIP wiki page [1].
>>>>> 
>>>>> Any feedback will be highly appreciated!
>>>>> 
>>>>> [1]
>>>>> 
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>> 
>>>>> Best,
>>>>> Shuiqiang
>>>>> 
>>> 
>> 
> 
