Thanks Shuiqiang for driving this.

The design looks good to me. +1 to start the vote if there are no more comments.

Regards,
Dian

> On Jan 4, 2021, at 7:40 PM, Shuiqiang Chen <acqua....@gmail.com> wrote:
> 
> Hi Yu,
> 
> Thanks a lot for your suggestions.
> 
> I have addressed your inline comments in the FLIP and also added a new
> section, "State backend access synchronization", which explains how we make
> sure there is no concurrent access to the state backend. Please have a look.
> 
> Best,
> Shuiqiang
> 
> 
> On Mon, Jan 4, 2021 at 4:15 PM, Yu Li <car...@gmail.com> wrote:
> 
>> Thanks for driving the discussion, Shuiqiang, and sorry for chiming in late.
>> 
>> *bq. However, all the state access will be synchronized in the Java
>> operator and so there will be no concurrent access to the state backend.*
>> Could you add a section that explicitly mentions this in the FLIP document? I
>> think single-threaded state access is an important prerequisite, and it's
>> important for later contributors to understand it clearly from both the
>> design doc and the source code.
>> 
>> The other parts LGTM. I added some minor inline comments in the FLIP; please
>> take a look.
>> 
>> Thanks.
>> 
>> Best Regards,
>> Yu
>> 
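The single-threaded guarantee Yu asks to document can be illustrated with a minimal sketch. It assumes a hypothetical `SynchronizedStateBackend` that wraps a plain dict (standing in for a keyed state backend that is not thread-safe) and funnels every read, write, and read-modify-write through one lock, so requests arriving from several threads never touch the store concurrently. This is only a Python analogy; per the thread, the real synchronization happens in the Java operator.

```python
import threading
from typing import Any, Dict, Hashable, Optional, Tuple


class SynchronizedStateBackend:
    """Hypothetical wrapper that serializes all access to a non-thread-safe store."""

    def __init__(self) -> None:
        # Plain dict standing in for a keyed state backend that is NOT thread-safe.
        self._store: Dict[Tuple[Hashable, str], Any] = {}
        # A single lock: at most one thread touches the store at any time.
        self._lock = threading.Lock()

    def get(self, key: Hashable, state_name: str) -> Optional[Any]:
        with self._lock:
            return self._store.get((key, state_name))

    def update(self, key: Hashable, state_name: str, value: Any) -> None:
        with self._lock:
            self._store[(key, state_name)] = value

    def increment(self, key: Hashable, state_name: str) -> int:
        # Read-modify-write under ONE lock acquisition, so no other thread
        # can interleave between the read and the write.
        with self._lock:
            new_value = self._store.get((key, state_name), 0) + 1
            self._store[(key, state_name)] = new_value
            return new_value


def hammer(backend: SynchronizedStateBackend, key: str, times: int) -> None:
    # Simulates one thread issuing many state requests for the same key.
    for _ in range(times):
        backend.increment(key, "count")


if __name__ == "__main__":
    backend = SynchronizedStateBackend()
    threads = [threading.Thread(target=hammer, args=(backend, "a", 1000)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(backend.get("a", "count"))  # 4000
```

Doing the read-modify-write in `increment` under a single lock acquisition is the important design choice: locking `get` and `update` separately would still let two threads interleave between the read and the write.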
>> 
>> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <acqua....@gmail.com> wrote:
>> 
>>> Hi Wei,
>>> 
>>> Big thanks for pointing out the mistakes! I have updated the FLIP
>>> according to your suggestions.
>>> 
>>> Best,
>>> Shuiqiang
>>> 
>>>> On Dec 18, 2020, at 2:37 PM, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>> 
>>>> Hi Shuiqiang,
>>>> 
>>>> Thanks for driving this. +1 for this feature. Just a minor comment on
>>>> the design doc.
>>>> 
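>>>> The interface of the `AppendingState` should be:
>>>>
>>>> from abc import abstractmethod
>>>> from typing import Generic, Iterable, Iterator, List, TypeVar
>>>>
>>>> IN = TypeVar('IN')
>>>> OUT = TypeVar('OUT')
>>>> T = TypeVar('T')
>>>>
>>>>
>>>> # `State` is the base state interface from the FLIP design doc.
>>>> class AppendingState(State, Generic[IN, OUT]):
>>>>
>>>>     @abstractmethod
>>>>     def get(self) -> OUT:
>>>>         pass
>>>>
>>>>     @abstractmethod
>>>>     def add(self, value: IN) -> None:
>>>>         pass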
>>>> 
>>>> The output type and the input type of the `AppendingState` may be
>>>> different. And the child classes should be defined as:
>>>> 
>>>> class MergingState(AppendingState[IN, OUT]):
>>>>     pass
>>>>
>>>>
>>>> class ListState(MergingState[T, Iterable[T]]):
>>>>
>>>>     @abstractmethod
>>>>     def update(self, values: List[T]) -> None:
>>>>         pass
>>>>
>>>>     @abstractmethod
>>>>     def add_all(self, values: List[T]) -> None:
>>>>         pass
>>>>
>>>>     def __iter__(self) -> Iterator[T]:
>>>>         return iter(self.get())
>>>> 
>>>> Best,
>>>> Wei
>>>> 
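To see why `AppendingState` needs separate `IN` and `OUT` type parameters, here is a self-contained sketch that repeats the interfaces above and adds a hypothetical `InMemoryListState`: `add` takes a single `T`, while `get` returns an `Iterable[T]`. The `State.clear` method and the in-memory storage are assumptions for illustration; in the proposed design, reads and writes would go through the Java operator's state backend rather than a Python list.

```python
from abc import ABC, abstractmethod
from typing import Generic, Iterable, Iterator, List, TypeVar

IN = TypeVar("IN")
OUT = TypeVar("OUT")
T = TypeVar("T")


class State(ABC):
    # Assumed base interface: every state can be cleared.
    @abstractmethod
    def clear(self) -> None:
        pass


class AppendingState(State, Generic[IN, OUT]):
    @abstractmethod
    def get(self) -> OUT:
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        pass


class MergingState(AppendingState[IN, OUT]):
    pass


class ListState(MergingState[T, Iterable[T]]):
    @abstractmethod
    def update(self, values: List[T]) -> None:
        pass

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        pass

    def __iter__(self) -> Iterator[T]:
        return iter(self.get())


class InMemoryListState(ListState[T]):
    """Hypothetical list state kept in a Python list (no state backend involved)."""

    def __init__(self) -> None:
        self._values: List[T] = []

    def get(self) -> Iterable[T]:
        # OUT = Iterable[T]: return a copy so callers cannot mutate the state.
        return list(self._values)

    def add(self, value: T) -> None:
        # IN = T: a single element is appended.
        self._values.append(value)

    def update(self, values: List[T]) -> None:
        # Replace the whole list.
        self._values = list(values)

    def add_all(self, values: List[T]) -> None:
        self._values.extend(values)

    def clear(self) -> None:
        self._values = []


if __name__ == "__main__":
    state: InMemoryListState[int] = InMemoryListState()
    state.add(1)
    state.add_all([2, 3])
    print(list(state))  # [1, 2, 3]
```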
>>>>> On Dec 17, 2020, at 21:06, Shuiqiang Chen <acqua....@gmail.com> wrote:
>>>>> 
>>>>> Hi Yun,
>>>>> 
>>>>> Thank you very much for your questions! Here are my answers:
>>>>> 
>>>>> Re 1: You are right that the state access occurs in an async thread.
>>>>> However, all the state access will be synchronized in the Java operator,
>>>>> so there will be no concurrent access to the state backend.
>>>>> 
>>>>> Re 2: I think it can be handled well in the Python DataStream API. In
>>>>> this case, there will be two operators and therefore also two keyed state
>>>>> backends.
>>>>> 
>>>>> Re 3: Sure, you are right. We will store the current key, which may be
>>>>> used by the timer.
>>>>> 
>>>>> Re 4: Good point. State migration is still not covered in the current
>>>>> FLIP. I'd like to cover it in a separate FLIP, as it should be orthogonal
>>>>> to this one. I have updated the FLIP and added a clear description of this.
>>>>> 
>>>>> Re 5: Good point. We may need to introduce a Python queryable state
>>>>> client if we want to support queryable state for Python operators. I'd
>>>>> like to cover it in a separate FLIP. I have updated the FLIP and added it
>>>>> as future work.
>>>>> 
>>>>> Best,
>>>>> Shuiqiang
>>>>> 
>>>>>> On Dec 17, 2020, at 12:08 PM, Yun Tang <myas...@live.com> wrote:
>>>>>> 
>>>>>> Hi Shuiqiang,
>>>>>> 
>>>>>> Thanks for driving this. I have several questions below:
>>>>>> 
>>>>>> 
>>>>>> 1.  Thread safety of state write access. As you might know, state
>>>>>>     access is not thread-safe in Flink [1]; we depend on single-threaded
>>>>>>     access within a task. Since you move state access to another async
>>>>>>     thread, can we still ensure this? This also includes not allowing
>>>>>>     users to access state in their Java operator alongside the bundled
>>>>>>     Python operator.
>>>>>> 2.  Number of keyed state backends per task. Flink only has one keyed
>>>>>>     state backend per operator and only one keyed state backend per
>>>>>>     operator chain (in the head operator if possible). However, once we
>>>>>>     use experimental features such as reinterpretAsKeyedStream [2], we
>>>>>>     could have two keyed state backends in one operator chain within one
>>>>>>     task. Can the Python DataStream API handle this well?
>>>>>> 3.  Time to set the current key. As we still need the current key when
>>>>>>     registering a timer [3], we need some place to hold the current key
>>>>>>     even if it is not registered in the keyed state backend.
>>>>>> 4.  State migration. Flink supports migrating state automatically if the
>>>>>>     newly provided serializer is compatible with the old serializer [4].
>>>>>>     I'm afraid that if the Python DataStream API wraps the user's
>>>>>>     serializer as BytePrimitiveArraySerializer, we will lose this
>>>>>>     functionality. Moreover, RocksDB will migrate state automatically on
>>>>>>     the Java side [5], and this will break if Python-related bytes are
>>>>>>     involved.
>>>>>> 5.  Queryable state client. Currently, we only have a Java-based
>>>>>>     queryable state client [6], and we would need another Python-based
>>>>>>     queryable state client if Python bytes are involved.
>>>>>> 
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>>>>> [3] https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>>>>>> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>>>>>> [5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>>>>>> [6] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>>>>> 
>>>>>> Best
>>>>>> Yun Tang
>>>>>> 
>>>>>> 
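Point 3 above (and "Re 3" in the reply) can be illustrated with a minimal sketch under stated assumptions: a hypothetical `KeyContext` keeps the current key independently of any state backend, and a hypothetical `TimerService` captures that key when a timer is registered and restores it when the timer fires. None of these names come from Flink or the FLIP; they only show why the current key must be kept somewhere even when a record never touches keyed state.

```python
import heapq
from typing import Any, Callable, List, Optional, Tuple


class KeyContext:
    """Hypothetical holder for the key of the record currently being processed."""

    def __init__(self) -> None:
        self.current_key: Optional[Any] = None


class TimerService:
    """Hypothetical event-time timer service that remembers the key of each timer."""

    def __init__(self, key_context: KeyContext) -> None:
        self._key_context = key_context
        # Min-heap of (timestamp, key); keys must be mutually comparable for ties.
        self._timers: List[Tuple[int, Any]] = []

    def register_event_time_timer(self, timestamp: int) -> None:
        # Capture the current key now: the timer belongs to that key even if
        # the record never accessed keyed state.
        heapq.heappush(self._timers, (timestamp, self._key_context.current_key))

    def advance_watermark(self, watermark: int, on_timer: Callable[[int, Any], None]) -> None:
        # Fire every timer with timestamp <= watermark in timestamp order,
        # restoring the timer's key as the current key before the callback runs.
        while self._timers and self._timers[0][0] <= watermark:
            timestamp, key = heapq.heappop(self._timers)
            self._key_context.current_key = key
            on_timer(timestamp, key)


if __name__ == "__main__":
    ctx = KeyContext()
    timers = TimerService(ctx)
    fired = []

    ctx.current_key = "user-1"
    timers.register_event_time_timer(100)
    ctx.current_key = "user-2"
    timers.register_event_time_timer(50)

    timers.advance_watermark(100, lambda ts, key: fired.append((ts, key, ctx.current_key)))
    print(fired)  # [(50, 'user-2', 'user-2'), (100, 'user-1', 'user-1')]
```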
>>>>>> ________________________________
>>>>>> From: Shuiqiang Chen <acqua....@gmail.com>
>>>>>> Sent: Wednesday, December 16, 2020 17:32
>>>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>>>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API
>>>>>> 
>>>>>> Hi Xingbo,
>>>>>> 
>>>>>> Thank you for your valuable suggestions.
>>>>>> 
>>>>>> Indeed, we need to provide clearer abstractions for StateDescriptor
>>>>>> and the State APIs. I have updated the FLIP accordingly. Looking forward
>>>>>> to your feedback!
>>>>>> 
>>>>>> Best,
>>>>>> Shuiqiang
>>>>>> 
>>>>>>> On Dec 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Thanks Shuiqiang for starting this discussion.
>>>>>>> 
>>>>>>> Big +1 for this feature. State access support can further improve the
>>>>>>> functionality of our existing Python DataStream API.
>>>>>>> 
>>>>>>> I have 2 comments regarding the design doc:
>>>>>>> 
>>>>>>> a) I think that `StateDescriptor` needs to hold the `typeInfo` variable
>>>>>>> instead of letting each implementation class hold its own `typeInfo`.
>>>>>>> For example, `ListStateDescriptor` would not hold `elem_type_info`, but
>>>>>>> would pass `ListTypeInfo(elem_type_info)` to the constructor of
>>>>>>> `StateDescriptor`.
>>>>>>> 
>>>>>>> b) I think we need to add the `MergingState` and `AppendingState`
>>>>>>> interfaces, and then extract the `get` and `add` methods from
>>>>>>> `ListState`, `AggregatingState`, and `ReducingState` into
>>>>>>> `AppendingState`. Then `ListState`, `AggregatingState`, and
>>>>>>> `ReducingState` can inherit from `MergingState`.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Xingbo
>>>>>>> 
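Suggestion (a) can be sketched as follows. `TypeInformation`, `BasicTypeInfo`, and `ListTypeInfo` here are hypothetical minimal stand-ins (frozen dataclasses), not the PyFlink classes; the point is only that `StateDescriptor` is the single owner of `type_info`, and `ListStateDescriptor` wraps its element type and hands it to the base class instead of storing it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TypeInformation:
    """Hypothetical stand-in for a type-information object."""


@dataclass(frozen=True)
class BasicTypeInfo(TypeInformation):
    name: str


@dataclass(frozen=True)
class ListTypeInfo(TypeInformation):
    element_type_info: TypeInformation


class StateDescriptor:
    """Base descriptor: the only place that stores the state's type information."""

    def __init__(self, name: str, type_info: TypeInformation) -> None:
        self.name = name
        self.type_info = type_info


class ValueStateDescriptor(StateDescriptor):
    def __init__(self, name: str, value_type_info: TypeInformation) -> None:
        super().__init__(name, value_type_info)


class ListStateDescriptor(StateDescriptor):
    def __init__(self, name: str, elem_type_info: TypeInformation) -> None:
        # Does not keep elem_type_info itself; wraps it for the base class.
        super().__init__(name, ListTypeInfo(elem_type_info))

    @property
    def element_type_info(self) -> TypeInformation:
        # Derived on demand from the wrapped ListTypeInfo, not stored separately.
        list_type = self.type_info
        assert isinstance(list_type, ListTypeInfo)
        return list_type.element_type_info


if __name__ == "__main__":
    desc = ListStateDescriptor("buffer", BasicTypeInfo("LONG"))
    print(desc.type_info)  # ListTypeInfo(element_type_info=BasicTypeInfo(name='LONG'))
```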
>>>>>>> On Fri, Dec 11, 2020 at 9:44 PM, Shuiqiang Chen <acqua....@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi devs,
>>>>>>>> 
>>>>>>>> In FLIP-130, we already added support for stateless APIs in the Python
>>>>>>>> DataStream API, so that users are able to perform some basic data
>>>>>>>> transformations. To implement more complex data processing, we need to
>>>>>>>> provide state access support. So I propose adding state access APIs to
>>>>>>>> the Python DataStream API to support stateful operations on a
>>>>>>>> KeyedStream. More details are in the FLIP wiki page [1].
>>>>>>>> 
>>>>>>>> Any feedback will be highly appreciated!
>>>>>>>> 
>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Shuiqiang
>>>>>>>> 
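As a rough illustration of a stateful operation on a KeyedStream, the sketch below computes a running count per key. `KeyedBackend`, `ValueState`, and `count_per_key` are hypothetical names; the point is that one state handle transparently reads and writes a different slot depending on the current key, which is set before each record is processed. The actual Python API is the one specified in the FLIP.

```python
from typing import Any, Dict, Hashable, Iterable, List, Optional, Tuple


class KeyedBackend:
    """Hypothetical keyed state backend: values are stored per (state name, key)."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, Hashable], Any] = {}
        self.current_key: Optional[Hashable] = None

    def read(self, name: str) -> Any:
        return self._data.get((name, self.current_key))

    def write(self, name: str, value: Any) -> None:
        self._data[(name, self.current_key)] = value


class ValueState:
    """Hypothetical value state handle; reads and writes are scoped to the current key."""

    def __init__(self, backend: KeyedBackend, name: str) -> None:
        self._backend = backend
        self._name = name

    def value(self) -> Any:
        return self._backend.read(self._name)

    def update(self, value: Any) -> None:
        self._backend.write(self._name, value)


def count_per_key(records: Iterable[Tuple[Hashable, Any]]) -> List[Tuple[Hashable, int]]:
    """Emit (key, running count) for each record, like a keyed stateful function."""
    backend = KeyedBackend()
    count = ValueState(backend, "count")
    out = []
    for key, _payload in records:
        # Set the current key before running the per-record logic.
        backend.current_key = key
        new_count = (count.value() or 0) + 1
        count.update(new_count)
        out.append((key, new_count))
    return out


if __name__ == "__main__":
    print(count_per_key([("a", 1), ("b", 2), ("a", 3)]))  # [('a', 1), ('b', 1), ('a', 2)]
```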
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
