Hi Yu,

Thanks a lot for your suggestions.

I have addressed your inlined comments in the FLIP and also added a new
section "State backed access synchronization" that explains the way to make
sure there is no concurrent access to the state backend. Please have a look.

Best,
Shuiqiang


On Mon, Jan 4, 2021 at 4:15 PM, Yu Li <car...@gmail.com> wrote:

> Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.
>
> *bq. However, all the state access will be synchronized in the Java
> operator and so there will be no concurrent access to the state backend.*
> Could you add a section to explicitly mention this in the FLIP document? I
> think single-threaded state access is an important prerequisite and it's
> important for later contributors to know about this clearly, from both the
> design doc and the source code.
>
> The other parts LGTM, added some minor inline comments in the FLIP, please
> take a look.
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <acqua....@gmail.com> wrote:
>
> > Hi Wei,
> >
> > Big thanks for pointing out the mistakes! I have updated the FLIP
> > according to your suggestions.
> >
> > Best,
> > Shuiqiang
> >
> > > On Dec 18, 2020, at 2:37 PM, Wei Zhong <weizhong0...@gmail.com> wrote:
> > >
> > > Hi Shuiqiang,
> > >
> > > > Thanks for driving this. +1 for this feature, just a minor comment on
> > > > the design doc.
> > >
> > > The interface of the `AppendingState` should be:
> > >
> > > class AppendingState(State, Generic[IN, OUT]):
> > >
> > >   @abstractmethod
> > >   def get(self) -> OUT:
> > >       pass
> > >
> > >   @abstractmethod
> > >   def add(self, value: IN) -> None:
> > >       pass
> > >
> > > > The output type and the input type of the `AppendingState` may be
> > > > different. The child classes should then be defined as:
> > >
> > > class MergingState(AppendingState[IN, OUT]):
> > >    pass
> > >
> > >
> > > class ListState(MergingState[T, Iterable[T]]):
> > >
> > >   @abstractmethod
> > >   def update(self, values: List[T]) -> None:
> > >       pass
> > >
> > >   @abstractmethod
> > >   def add_all(self, values: List[T]) -> None:
> > >       pass
> > >
> > >   def __iter__(self) -> Iterator[T]:
> > >       return iter(self.get())
> > >
> > > Best,
> > > Wei
> > >
> > > >> On Dec 17, 2020, at 21:06, Shuiqiang Chen <acqua....@gmail.com> wrote:
> > >>
> > >> Hi Yun,
> > >>
> > > >> I highly appreciate your questions! My answers are below:
> > >>
> > > >> Re 1: You are right that the state access occurs in an async thread.
> > > >> However, all state access will be synchronized in the Java operator, so
> > > >> there will be no concurrent access to the state backend.
> > >>
> > > >> Re 2: I think it can be handled well in the Python DataStream API. In
> > > >> this case, there will be two operators and therefore two keyed state
> > > >> backends.
> > >>
> > >> Re 3: Sure, you are right. We will store the current key which may be
> > used by the timer.
> > >>
> > > >> Re 4: Good point. State migration is not yet covered in the current
> > > >> FLIP. I'd like to cover it in a separate FLIP, as it should be orthogonal
> > > >> to this one. I have updated the FLIP and added a clear description of this.
> > >>
> > > >> Re 5: Good point. We may need to introduce a Python queryable state
> > > >> client if we want to support queryable state for Python operators. I'd
> > > >> like to cover it in a separate FLIP. I have updated the FLIP and added it
> > > >> as future work.
> > >>
> > >> Best,
> > >> Shuiqiang
> > >>
> > > >>> On Dec 17, 2020, at 12:08 PM, Yun Tang <myas...@live.com> wrote:
> > >>>
> > >>> Hi Shuiqiang,
> > >>>
> > >>> Thanks for driving this. I have several questions below:
> > >>>
> > >>>
> > > >>> 1.  Thread safety of state write access. As you might know, state
> > > >>> access is not thread-safe [1] in Flink; we depend on single-threaded
> > > >>> access by the task. Since you move the state access to another async
> > > >>> thread, can we still ensure this? This also includes not allowing users
> > > >>> to access state in the Java operator along with the bundled Python
> > > >>> operator.
> > > >>> 2.  Number of keyed state backends per task. Flink would only have one
> > > >>> keyed state backend per operator, and only one keyed state backend per
> > > >>> operator chain (in the head operator if possible). However, once we use
> > > >>> experimental features such as reinterpretAsKeyedStream [2], we could
> > > >>> have two keyed state backends in one operator chain within one task.
> > > >>> Can the Python DataStream API handle this well?
> > > >>> 3.  Time to set the current key. As we still need the current key when
> > > >>> registering a timer [3], we need some place to hold the current key even
> > > >>> if it is not registered in the keyed state backend.
> > > >>> 4.  State migration. Flink supports migrating state automatically if the
> > > >>> newly provided serializer is compatible with the old serializer [4]. I'm
> > > >>> afraid that if the Python DataStream API wraps the user's serializer as
> > > >>> BytePrimitiveArraySerializer, we will lose this functionality. Moreover,
> > > >>> RocksDB will migrate state automatically on the Java side [5], and this
> > > >>> will break if Python-related bytes are involved.
> > > >>> 5.  Queryable state client. Currently, we only have a Java-based
> > > >>> queryable state client [6], and we would need another Python-based
> > > >>> queryable state client if Python bytes are involved.
> > >>>
> > >>> [1] https://issues.apache.org/jira/browse/FLINK-13072
> > >>> [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> > >>> [3]
> >
> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
> > >>> [4]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
> > >>> [5]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
> > >>> [6]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
> > >>>
> > >>> Best
> > >>> Yun Tang
> > >>>
> > >>>
> > >>> ________________________________
> > >>> From: Shuiqiang Chen <acqua....@gmail.com>
> > >>> Sent: Wednesday, December 16, 2020 17:32
> > >>> To: dev@flink.apache.org <dev@flink.apache.org>
> > >>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python
> > DataStream API
> > >>>
> > >>> Hi Xingbo,
> > >>>
> > >>> Thank you for your valuable suggestions.
> > >>>
> > > >>> Indeed, we need to provide clearer abstractions for the StateDescriptor
> > > >>> and State APIs; I have updated the FLIP accordingly. Looking forward to
> > > >>> your feedback!
> > >>>
> > >>> Best,
> > >>> Shuiqiang
> > >>>
> > > >>>> On Dec 14, 2020, at 11:27 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
> > >>>>
> > >>>> Thanks Shuiqiang for starting this discussion.
> > >>>>
> > >>>> Big +1 for this feature. State access support can further improve
> the
> > >>>> functionality of our existing Python DataStream.
> > >>>>
> > > >>>> I have two comments regarding the design doc:
> > >>>>
> > > >>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
> > > >>>> instead of letting each implementation class hold `typeInfo` itself. For
> > > >>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
> > > >>>> `ListTypeInfo(elem_type_info)` to the constructor of `StateDescriptor`.
> > >>>>
> > > >>>> b) I think we need to add the `MergingState` and `AppendingState`
> > > >>>> interfaces, and then extract the `get` and `add` methods from
> > > >>>> `ListState`, `AggregatingState`, and `ReducingState` into
> > > >>>> `AppendingState`. Then let `ListState`, `AggregatingState`, and
> > > >>>> `ReducingState` inherit from `MergingState`.
> > >>>>
> > >>>> Best,
> > >>>> Xingbo
> > >>>>
> > > >>>> On Fri, Dec 11, 2020 at 9:44 PM, Shuiqiang Chen <acqua....@gmail.com> wrote:
> > >>>>
> > >>>>> Hi devs,
> > >>>>>
> > >>>>> In FLIP-130, we have already supported Python DataStream stateless
> > APIs so
> > >>>>> that users are able to perform some basic data transformations. To
> > >>>>> implement more complex data processing, we need to provide state
> > access
> > >>>>> support. So I would propose to add state access APIs in Python
> > DataStream
> > >>>>> API to support stateful operations on a KeyedStream. More details
> > are in
> > >>>>> the FLIP wiki page [1].
> > >>>>>
> > >>>>> Any feedback will be highly appreciated!
> > >>>>>
> > >>>>> [1]
> > >>>>>
> > >>>>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> > >>>>>
> > >>>>> Best,
> > >>>>> Shuiqiang
> > >>>>>
> > >>>
> > >>
> > >
> >
> >
>
