Hi Shuiqiang,
Thanks for driving this. +1 for this feature. I have just one minor comment on
the design doc.
The interface of the `AppendingState` should be:
class AppendingState(State, Generic[IN, OUT]):

    @abstractmethod
    def get(self) -> OUT:
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        pass
The output type and the input type of `AppendingState` may be different, so
the child classes should be defined as:
class MergingState(AppendingState[IN, OUT]):
    pass

class ListState(MergingState[T, Iterable[T]]):

    @abstractmethod
    def update(self, values: List[T]) -> None:
        pass

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        pass

    def __iter__(self) -> Iterator[T]:
        return iter(self.get())
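To illustrate why IN and OUT need to be separate type parameters, here is a
minimal, self-contained sketch of the hierarchy above. It is only an
illustration under assumptions, not the proposed implementation: the `clear`
method on `State` and the two in-memory classes (`InMemoryListState`,
`InMemoryAverageState`) are hypothetical names that do not appear in the
design doc, and nothing here touches a real state backend.

```python
from abc import ABC, abstractmethod
from typing import Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
IN = TypeVar("IN")
OUT = TypeVar("OUT")


class State(ABC):
    # Assumption: the base interface only offers clear(), for illustration.
    @abstractmethod
    def clear(self) -> None:
        pass


class AppendingState(State, Generic[IN, OUT]):
    @abstractmethod
    def get(self) -> OUT:
        pass

    @abstractmethod
    def add(self, value: IN) -> None:
        pass


class MergingState(AppendingState[IN, OUT]):
    pass


class ListState(MergingState[T, Iterable[T]]):
    @abstractmethod
    def update(self, values: List[T]) -> None:
        pass

    @abstractmethod
    def add_all(self, values: List[T]) -> None:
        pass

    def __iter__(self) -> Iterator[T]:
        return iter(self.get())


class InMemoryListState(ListState[T]):
    """Hypothetical list state backed by a plain Python list."""

    def __init__(self) -> None:
        self._values: List[T] = []

    def get(self) -> Iterable[T]:
        return list(self._values)  # return a copy so callers cannot mutate state

    def add(self, value: T) -> None:
        self._values.append(value)

    def update(self, values: List[T]) -> None:
        self._values = list(values)  # replace the existing contents

    def add_all(self, values: List[T]) -> None:
        self._values.extend(values)

    def clear(self) -> None:
        self._values = []


class InMemoryAverageState(MergingState[int, float]):
    """Hypothetical aggregating-style state: IN is int, OUT is float."""

    def __init__(self) -> None:
        self._sum = 0
        self._count = 0

    def get(self) -> float:
        # Returning 0.0 for empty state is a simplification for this sketch.
        return self._sum / self._count if self._count else 0.0

    def add(self, value: int) -> None:
        self._sum += value
        self._count += 1

    def clear(self) -> None:
        self._sum = 0
        self._count = 0


list_state: InMemoryListState[int] = InMemoryListState()
list_state.add(1)
list_state.add_all([2, 3])
print(list(list_state))  # iterates via ListState.__iter__

avg_state = InMemoryAverageState()
avg_state.add(1)
avg_state.add(2)
print(avg_state.get())  # ints go in, a float comes out
```

With a single type parameter, the averaging case could not be typed, because
`add` takes an `int` while `get` returns a `float`. `ListState` is the special
case where OUT is simply `Iterable[IN]`.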
Best,
Wei
> On Dec 17, 2020, at 21:06, Shuiqiang Chen <[email protected]> wrote:
>
> Hi Yun,
>
> I highly appreciate your questions! My answers are below:
>
> Re 1: You are right that the state access occurs in an async thread. However,
> all state access will be synchronized in the Java operator, so there will be
> no concurrent access to the state backend.
>
> Re 2: I think it can be handled well in the Python DataStream API. In this
> case, there will be two operators and therefore two keyed state backends.
>
> Re 3: Sure, you are right. We will store the current key which may be used by
> the timer.
>
> Re 4: Good point. State migration is not yet covered in the current FLIP.
> I'd like to cover it in a separate FLIP, as it should be orthogonal to this
> one. I have updated the FLIP and added a clear description of this.
>
> Re 5: Good point. We may need to introduce a Python queryable state client if
> we want to support queryable state for Python operators. I'd like to cover it
> in a separate FLIP. I have updated the FLIP and added it as future work.
>
> Best,
> Shuiqiang
>
>> On Dec 17, 2020, at 12:08 PM, Yun Tang <[email protected]> wrote:
>>
>> Hi Shuiqiang,
>>
>> Thanks for driving this. I have several questions below:
>>
>>
>> 1. Thread safety of state write access. As you might know, state access is
>> not thread-safe [1] in Flink; we depend on single-threaded access within a
>> task. Since you move state access to another async thread, can we still
>> ensure this? This also means not allowing users to access state in the Java
>> operator alongside the bundled Python operator.
>> 2. Number of keyed state backends per task. Flink has only one keyed state
>> backend per operator, and usually only one per operator chain (in the head
>> operator if possible). However, once we use experimental features such as
>> reinterpretAsKeyedStream [2], we could have two keyed state backends in one
>> operator chain within one task. Can the Python DataStream API handle this
>> well?
>> 3. Time to set the current key. As we still need the current key when
>> registering a timer [3], we need some place to hold the current key even if
>> it is not registered in the keyed state backend.
>> 4. State migration. Flink supports migrating state automatically if the
>> newly provided serializer is compatible with the old serializer [4]. I'm
>> afraid that if the Python DataStream API wraps the user's serializer as
>> BytePrimitiveArraySerializer, we will lose this functionality. Moreover,
>> RocksDB migrates state automatically on the Java side [5], and this will
>> break if Python-related bytes are involved.
>> 5. Queryable state client. Currently, we only have a Java-based queryable
>> state client [6], and we would need another Python-based queryable state
>> client if Python bytes are involved.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>> [3]
>> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>> [4]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>> [5]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>> [6]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>
>> Best
>> Yun Tang
>>
>>
>> ________________________________
>> From: Shuiqiang Chen <[email protected]>
>> Sent: Wednesday, December 16, 2020 17:32
>> To: [email protected] <[email protected]>
>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream
>> API
>>
>> Hi Xingbo,
>>
>> Thank you for your valuable suggestions.
>>
>> Indeed, we need to provide clearer abstractions for the StateDescriptor and
>> State APIs. I have updated the FLIP accordingly. Looking forward to your
>> feedback!
>>
>> Best,
>> Shuiqiang
>>
>>> On Dec 14, 2020, at 11:27 AM, Xingbo Huang <[email protected]> wrote:
>>>
>>> Thanks Shuiqiang for starting this discussion.
>>>
>>> Big +1 for this feature. State access support can further improve the
>>> functionality of our existing Python DataStream.
>>>
>>> I have 2 comments regarding the design doc:
>>>
>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
>>> instead of letting each implementation class hold `typeInfo` itself. For
>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
>>> `ListTypeInfo(elem_type_info)` to the constructor of `StateDescriptor`.
>>>
>>> b) I think we need to add the `MergingState` and `AppendingState`
>>> interfaces, and then extract the `get` and `add` methods from `ListState`,
>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
>>> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Fri, Dec 11, 2020 at 9:44 PM, Shuiqiang Chen <[email protected]> wrote:
>>>
>>>> Hi devs,
>>>>
>>>> In FLIP-130, we added support for stateless APIs in the Python DataStream
>>>> API, so that users are able to perform some basic data transformations. To
>>>> implement more complex data processing, we need to provide state access
>>>> support. So I would propose to add state access APIs in Python DataStream
>>>> API to support stateful operations on a KeyedStream. More details are in
>>>> the FLIP wiki page [1].
>>>>
>>>> Any feedback will be highly appreciated!
>>>>
>>>> [1]
>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>
>>>> Best,
>>>> Shuiqiang
>>>>
>>
>