Hi Shuiqiang,

Thanks for driving this. I have several questions below:


  1.  Thread safety of state write access. As you might know, state access is
not thread-safe [1] in Flink; we depend on the task accessing state from a
single thread. Since you move state access to a separate async thread, can we
still ensure this? Ensuring it also means not allowing users to access state
in their Java operator alongside the bundled Python operator.
  2.  Number of keyed state backends per task. Flink has only one keyed state
backend per operator, and only one keyed state backend per operator chain (in
the head operator if possible). However, once we use experimental features
such as reinterpretAsKeyedStream [2], we could have two keyed state backends
in one operator chain within one task. Can the Python DataStream API handle
this well?
  3.  Time to set the current key. As we still need the current key when
registering a timer [3], we need some place to hold the current key even if
it is not registered in the keyed state backend.
  4.  State migration. Flink supports migrating state automatically if the
newly provided serializer is compatible with the old serializer [4]. I'm
afraid that if the Python DataStream API wraps the user's serializer as
BytePrimitiveArraySerializer, we will lose this functionality. Moreover,
RocksDB migrates state automatically on the Java side [5], and this will
break if Python-related bytes are involved.
  5.  Queryable state client. Currently, we only have a Java-based queryable
state client [6], and we would need another Python-based queryable state
client if Python bytes are involved.
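
To make point 3 concrete, here is a minimal Python sketch of the concern. All
names in it (KeyContext, ToyTimerService, process) are hypothetical and are
not Flink or PyFlink APIs; it only shows that a keyed timer cannot be
registered unless some component holds the current key, independently of any
keyed state backend.

```python
from dataclasses import dataclass, field
from typing import Any, Optional, Set, Tuple


@dataclass
class KeyContext:
    """Hypothetical holder for the key of the record being processed."""
    current_key: Optional[Any] = None


@dataclass
class ToyTimerService:
    """Toy stand-in for a timer service (not InternalTimerServiceImpl)."""
    key_context: KeyContext
    timers: Set[Tuple[Any, int]] = field(default_factory=set)

    def register_event_time_timer(self, timestamp: int) -> None:
        key = self.key_context.current_key
        if key is None:
            raise RuntimeError("no current key set; cannot register a keyed timer")
        # Timers are scoped to (key, timestamp), so the key must be known here.
        self.timers.add((key, timestamp))


def process(records, key_context: KeyContext, timer_service: ToyTimerService) -> None:
    """Set the current key before the (toy) user logic runs for each record."""
    for key, timestamp in records:
        key_context.current_key = key
        timer_service.register_event_time_timer(timestamp + 10)


ctx = KeyContext()
service = ToyTimerService(ctx)
process([("a", 1), ("b", 5)], ctx, service)
```

In this sketch the key lives in KeyContext rather than in a state backend,
which is the kind of "place to hold the current key" I am asking about.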

[1] https://issues.apache.org/jira/browse/FLINK-13072
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[3] 
https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
[4] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
[5] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
[6] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example

Best
Yun Tang


________________________________
From: Shuiqiang Chen <acqua....@gmail.com>
Sent: Wednesday, December 16, 2020 17:32
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Hi Xingbo,

Thank you for your valuable suggestions.

Indeed, we need to provide clearer abstractions for the StateDescriptor and
State APIs; I have updated the FLIP accordingly. Looking forward to your
feedback!
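
As a rough illustration of the hierarchy suggested in comment (b) quoted
below, a minimal Python sketch could look like the following. It mirrors the
State / AppendingState / MergingState / ListState layering of the Java API;
the InMemoryListState class is hypothetical and exists only to make the
example runnable, so please do not read it as the proposed implementation.

```python
from abc import ABC, abstractmethod
from typing import Generic, Iterable, List, TypeVar

IN = TypeVar("IN")
OUT = TypeVar("OUT")
T = TypeVar("T")


class State(ABC):
    """Base interface: every state can be cleared."""

    @abstractmethod
    def clear(self) -> None:
        ...


class AppendingState(State, Generic[IN, OUT]):
    """Holds the `get` and `add` methods shared by appending states."""

    @abstractmethod
    def get(self) -> OUT:
        ...

    @abstractmethod
    def add(self, value: IN) -> None:
        ...


class MergingState(AppendingState[IN, OUT]):
    """Marker interface for appending states that can be merged."""


class ListState(MergingState[T, Iterable[T]]):
    """List state: inherits `get`/`add` and adds `update`."""

    @abstractmethod
    def update(self, values: List[T]) -> None:
        ...


class InMemoryListState(ListState[T]):
    """Hypothetical in-memory implementation, for illustration only."""

    def __init__(self) -> None:
        self._values: List[T] = []

    def get(self) -> Iterable[T]:
        return list(self._values)

    def add(self, value: T) -> None:
        self._values.append(value)

    def update(self, values: List[T]) -> None:
        self._values = list(values)

    def clear(self) -> None:
        self._values = []
```

ReducingState and AggregatingState would inherit from MergingState in the
same way, without redefining `get` and `add`.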

Best,
Shuiqiang

> On December 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>
> Thanks Shuiqiang for starting this discussion.
>
> Big +1 for this feature. State access support can further improve the
> functionality of our existing Python DataStream.
>
> I have 2 comments regarding the design doc:
>
> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
> instead of letting each implementation class hold `typeInfo` itself. For
> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
> `ListTypeInfo(elem_type_info)` to the constructor of `StateDescriptor`.
>
> b) I think we need to add the `MergingState` and `AppendingState`
> interfaces, and then extract the `get` and `add` methods from `ListState`,
> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
> `ListState`, `AggregatingState`, and `ReducingState` inherit from
> `MergingState`.
>
> Best,
> Xingbo
>
> On Fri, Dec 11, 2020 at 9:44 PM Shuiqiang Chen <acqua....@gmail.com> wrote:
>
>> Hi devs,
>>
>> FLIP-130 already added support for stateless Python DataStream APIs, so
>> users are able to perform some basic data transformations. To implement
>> more complex data processing, we need to provide state access support. I
>> would therefore like to propose adding state access APIs to the Python
>> DataStream API to support stateful operations on a KeyedStream. More
>> details are in the FLIP wiki page [1].
>>
>> Any feedback will be highly appreciated!
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>
>> Best,
>> Shuiqiang
>>
