Hi Yun,

Thank you very much for your questions! My answers are 
below:

Re 1: You are right that the state access occurs in an async thread. However, 
all state access will be synchronized in the Java operator, so there will 
be no concurrent access to the state backend.
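
To illustrate the idea, here is a minimal sketch in plain Python. It is not the 
actual Flink implementation, and every name in it (ToyStateBackend, 
python_worker, run_operator, demo) is hypothetical. It assumes the async thread 
only enqueues state requests, and a single operator thread applies them, so the 
backend is only ever touched by one thread:

```python
import queue
import threading


class ToyStateBackend:
    """Hypothetical stand-in for a state backend; records which threads touch it."""

    def __init__(self):
        self._store = {}
        self.accessing_threads = set()

    def get(self, key):
        self.accessing_threads.add(threading.get_ident())
        return self._store.get(key)

    def put(self, key, value):
        self.accessing_threads.add(threading.get_ident())
        self._store[key] = value


def python_worker(requests, n):
    """Stands in for the async thread: it never touches the backend directly."""
    for i in range(n):
        reply = queue.Queue(maxsize=1)
        requests.put(("put", "count", i + 1, reply))
        reply.get()  # block until the operator thread has applied the write
    requests.put(None)  # signal that no more requests will arrive


def run_operator(backend, requests):
    """Operator loop: the only code path that reads or writes the backend."""
    while True:
        request = requests.get()
        if request is None:
            break
        op, key, value, reply = request
        if op == "put":
            backend.put(key, value)
            reply.put(None)
        else:
            reply.put(backend.get(key))


def demo():
    backend = ToyStateBackend()
    requests = queue.Queue()
    worker = threading.Thread(target=python_worker, args=(requests, 5))
    worker.start()
    run_operator(backend, requests)  # runs on the calling (operator) thread
    worker.join()
    return backend.get("count"), len(backend.accessing_threads)
```

Calling demo() returns the final counter value together with the number of 
distinct threads that touched the backend, which stays at one in this sketch.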

Re 2: I think it can be handled well in the Python DataStream API. In this case, 
there will be two operators and therefore two keyed state backends.

Re 3: Sure, you are right. We will store the current key which may be used by 
the timer.
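
A rough sketch of what I mean, in plain Python with hypothetical names 
(KeyContext, ToyTimerService, process), not the real Flink classes. It assumes 
the current key is kept in a small holder that is updated for every element and 
read by the timer service when a timer is registered:

```python
class KeyContext:
    """Hypothetical holder for the current key, independent of any state backend."""

    def __init__(self):
        self._current_key = None

    def set_current_key(self, key):
        self._current_key = key

    def get_current_key(self):
        return self._current_key


class ToyTimerService:
    """Hypothetical timer service that tags each timer with the current key."""

    def __init__(self, key_context):
        self._key_context = key_context
        self.timers = []

    def register_event_time_timer(self, timestamp):
        key = self._key_context.get_current_key()
        self.timers.append((key, timestamp))


def process(elements, delay=10):
    """Set the current key per element, then register a timer for it."""
    context = KeyContext()
    timer_service = ToyTimerService(context)
    for key, timestamp in elements:
        context.set_current_key(key)
        timer_service.register_event_time_timer(timestamp + delay)
    return timer_service.timers
```

For example, process([("a", 1), ("b", 2)]) yields [("a", 11), ("b", 12)], so 
each timer carries the key of the element that registered it.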

Re 4: Good point. State migration is not yet covered in the current FLIP. I'd 
like to cover it in a separate FLIP, as it should be orthogonal to this one. I 
have updated the FLIP and added a clear description of this.

Re 5: Good point. We may need to introduce a Python queryable state client if we 
want to support queryable state for Python operators. I'd like to cover it in a 
separate FLIP. I have updated the FLIP and added it as future work.

Best,
Shuiqiang

> On December 17, 2020, at 12:08 PM, Yun Tang <myas...@live.com> wrote:
> 
> Hi Shuiqiang,
> 
> Thanks for driving this. I have several questions below:
> 
> 
>  1.  Thread safety of state write-access. As you might know, state access is 
> not thread-safe [1] in Flink; we depend on single-threaded access within a 
> task. Since you move the state access to another async thread, can we still 
> ensure this? This also includes not allowing users to access state in their 
> Java operator alongside the bundled Python operator.
>  2.  Number of keyed state backends per task. Flink would only have one keyed 
> state backend per operator and would only have one keyed state backend per 
> operator chain (in the head operator if possible). However, once we use 
> experimental features such as reinterpretAsKeyedStream [2], we could have two 
> keyed state backends in one operator chain within one task. Can the Python 
> DataStream API handle this well?
>  3.  Time to set current key. As we still need the current key when 
> registering a timer [3], we need some place to hold the current key even if 
> it is not registered in the keyed state backend.
>  4.  State migration. Flink supports migrating state automatically if the 
> newly provided serializer is compatible with the old serializer [4]. I'm 
> afraid that if the Python DataStream API wraps the user's serializer as 
> BytePrimitiveArraySerializer, we will lose such functionality. Moreover, 
> RocksDB will migrate state automatically on the Java side [5], and this will 
> break if Python-related bytes are involved.
>  5.  Queryable state client. Currently, we only have a Java-based queryable 
> state client [6], and we would need another Python-based queryable state 
> client if Python bytes are involved.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-13072
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> [3] 
> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
> [4] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
> [5] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
> [6] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
> 
> Best
> Yun Tang
> 
> 
> ________________________________
> From: Shuiqiang Chen <acqua....@gmail.com>
> Sent: Wednesday, December 16, 2020 17:32
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API
> 
> Hi Xingbo,
> 
> Thank you for your valuable suggestions.
> 
> Indeed, we need to provide clearer abstractions for the StateDescriptor and 
> State APIs. I have updated the FLIP accordingly. Looking forward to your 
> feedback!
> 
> Best,
> Shuiqiang
> 
>> On December 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>> 
>> Thanks Shuiqiang for starting this discussion.
>> 
>> Big +1 for this feature. State access support can further improve the
>> functionality of our existing Python DataStream.
>> 
>> I have 2 comments regarding the design doc:
>> 
>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
>> instead of letting each implementation class hold `typeInfo` itself. For
>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
>> `ListTypeInfo(elem_type_info)` to the constructor of `StateDescriptor`.
>> 
>> b) I think we need to add the `MergingState` and `AppendingState`
>> interfaces, and then extract the `get` and `add` methods from `ListState`,
>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
>> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>> 
>> Best,
>> Xingbo
>> 
>> On Fri, Dec 11, 2020 at 9:44 PM, Shuiqiang Chen <acqua....@gmail.com> wrote:
>> 
>>> Hi devs,
>>> 
>>> In FLIP-130, we have already supported Python DataStream stateless APIs so
>>> that users are able to perform some basic data transformations. To
>>> implement more complex data processing, we need to provide state access
>>> support. So I would propose to add state access APIs in Python DataStream
>>> API to support stateful operations on a KeyedStream. More details are in
>>> the FLIP wiki page [1].
>>> 
>>> Any feedback will be highly appreciated!
>>> 
>>> [1]
>>> 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>> 
>>> Best,
>>> Shuiqiang
>>> 
> 
