Thanks for the awesome FLIP.

I think it will be very useful in state migration scenarios. We are also
looking for a state reuse solution for SQL jobs, and I think this feature
will help a lot.
Looking forward to having it in the near future.

Regarding the naming, I'm +1 to "State Processing API". Should we also
update the FLIP name in Confluence?

Btw, what do you think about adding a shortcut API for changing the max
parallelism of savepoints? This is a very common scenario.
But from my understanding, it takes a lot of trivial steps to achieve
with the current API.
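
To illustrate why this is more than a metadata change, here is a minimal
sketch. It assumes keyed state is bucketed into key groups as
hash(key) % maxParallelism and that a key group maps to a subtask as
keyGroup * parallelism / maxParallelism, which is my understanding of
Flink's key-group assignment. The mix() function is a stand-in and not
Flink's actual hash, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeyGroupSketch {

    // Stand-in bit mixer (assumption); NOT the hash function Flink uses.
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h;
    }

    // Key group of an already-hashed key: always in [0, maxParallelism).
    static int keyGroupForHash(int hash, int maxParallelism) {
        return Math.floorMod(hash, maxParallelism);
    }

    static int keyGroupFor(Object key, int maxParallelism) {
        return keyGroupForHash(mix(key.hashCode()), maxParallelism);
    }

    // Subtask that owns a key group under the assumed assignment rule.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Buckets every key by key group. Changing maxParallelism means redoing
    // this for all keyed state, because the bucket of a key can change.
    static Map<Integer, List<String>> regroup(List<String> keys, int maxParallelism) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String key : keys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            groups.computeIfAbsent(keyGroup, g -> new ArrayList<>()).add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("user-1", "user-2", "user-3", "user-4");
        System.out.println("maxParallelism=128: " + regroup(keys, 128));
        System.out.println("maxParallelism=256: " + regroup(keys, 256));
    }
}
```

Under these assumptions, a key whose hash is 200 sits in key group 72 with
max parallelism 128 but in key group 200 with max parallelism 256, so every
keyed state entry has to be read and written back under its new key group.
A shortcut API could hide exactly that read / re-key / write cycle.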

Best,
Jark


On Wed, 5 Jun 2019 at 10:52, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>
> > Hi Gordon & Seth, this looks like a very useful feature for analyzing and
> > managing state.
> > I agree that using DataSet is probably the most practical choice right
> > now. But in the longer term, adding TableAPI support for this would be nice.
> >
>
> Agreed. Migrating this API in the future to the TableAPI is definitely
> something we have considered.
>
>
> > When analyzing the savepoint, I assume that the state backend restores the
> > state first? This approach is generic and works for any state backend.
>
>
> Yes, that is correct. The process of reading state in snapshots is
> currently:
> 1) The metadata file is read when creating the input splits for the
> InputFormat. Each input split is assigned operator state and key-group
> state handles.
> 2) For each input split, a state backend is launched and restored with
> all state of the assigned state handles. Only part of that state is then
> transformed into DataSets (using the savepoint.read*State(...) methods).
>
>
> > However, sometimes it may be more efficient to directly analyze the
> > files on DFS without copying. We can probably add an interface to allow
> > state backends to optimize such behavior in the future.
>
>
> That sounds like an interesting direction, though at the moment it may only
> make sense for full savepoints / checkpoints.
> One blocker for enabling this is having the type information of state
> available in the snapshot metadata file so that schema / type of state is
> known before actually reading state.
> Making state schema / type information available in the metadata file is
> already a recurring discussion in this thread. It would be useful not only
> for the feature you mentioned, but also for features like SQL integration
> in the future.
> Therefore, this seems to be a reasonable next step when extending on top of
> the initial scope of the API proposed in the FLIP.
>
>
> > Also a quick question on the example in wiki: DataSet<MyPojo> keyedState =
> > operator.readKeyedState("uid", new ReaderFunction()); Should
> > operator.readKeyedState be replaced with savepoint.readKeyedState here?
> >
>
> Correct, this is indeed a typo. I've corrected this in the FLIP.
>
> Cheers,
> Gordon
>
>
> >
> > Regards, Xiaowei
> >
> >     On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <
> > aljos...@apache.org> wrote:
> >
> > +1. I think this is a very valuable new addition and we should try not to
> > get stuck on trying to design the perfect solution for everything.
> >
> > > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > >
> > > +1 to renaming it as State Processing API and adding it under the
> > > flink-libraries module.
> > >
> > > > I also think we can start with the development of the feature. From the
> > > > feedback so far, it seems like we're in a good spot to add in at least the
> > > > initial version of this API, hopefully making it ready for 1.9.0.
> > >
> > > Cheers,
> > > Gordon
> > >
> > > > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com> wrote:
> > >
> > >> It seems like a recurring piece of feedback was a different name. I’d like
> > >> to propose moving the functionality to the libraries module and naming this
> > >> the State Processing API.
> > >>
> > >> Seth
> > >>
> > >>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
> > >>>
> > >>> The SavepointOutputFormat only writes out the savepoint metadata file
> > >>> and should be mostly ignored.
> > >>>
> > >>> The actual state is written out by stream operators and tied into the
> > >>> Flink runtime [1, 2, 3].
> > >>>
> > >>> This is the most important part and the piece that I don’t think can be
> > >>> reasonably extracted.
> > >>>
> > >>> Seth
> > >>>
> > >>> [1]
> > >>
> >
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> > >>>
> > >>> [2]
> > >>
> >
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> > >>>
> > >>> [3]
> > >>
> >
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> > >>>
> > >>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> > >>>>
> > >>>> Hi Seth,
> > >>>>
> > >>>> yes, that helped! :-)
> > >>>>
> > >>>> What I was looking for is essentially
> > >>>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
> > >>>> would be great if this could be written in a way that would enable reusing
> > >>>> it in a different engine (as I mentioned, Apache Spark). There seem to be
> > >>>> some issues though. It uses the interface Savepoint, which uses several other
> > >>>> objects and interfaces from Flink's runtime. Maybe some convenience API
> > >>>> might help. Apache Beam handles operator naming, so that definitely
> > >>>> should be transferable between systems, but I'm not sure how to
> > >>>> construct an OperatorID from this name. Do you think it is possible
> > >>>> to come up with something that could be used in this portable way?
> > >>>>
> > >>>> I understand there are some more conditions that need to be satisfied
> > >>>> (grouping, aggregating, ...), which would of course have to be handled by
> > >>>> the target system. But Apache Beam can help leverage that. My idea would
> > >>>> be that there can be a runner-specified PTransform that takes a PCollection
> > >>>> of tuples such as `(operator name, key, state name, value1), (operator
> > >>>> name, key, state name, value2)`, and the runner's responsibility would be to
> > >>>> group/aggregate this so that it can be written by the runner-provided writer
> > >>>> (output format).
> > >>>>
> > >>>> All of this would need a lot more design; these are just ideas of "what
> > >>>> could be possible". I was just wondering if this FLIP can make some first
> > >>>> steps towards this.
> > >>>>
> > >>>> Many thanks for comments,
> > >>>>
> > >>>> Jan
> > >>>>
> > >>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> > >>>>> @Jan Gotcha,
> > >>>>>
> > >>>>> So in reusing components it explicitly is not a writer. This is not a
> > >>>>> savepoint output format in the way we have a parquet output format. The
> > >>>>> reason for the Transform api is to hide the underlying details; it does not
> > >>>>> simply append an output writer to the end of a dataset. This gets into the
> > >>>>> implementation details but at a high level, the dataset is:
> > >>>>>
> > >>>>> 1) partitioned using key groups
> > >>>>> 2) data is run through a standard stream operator that takes a
> > >>>>> snapshot of its state after processing all records and outputs metadata
> > >>>>> handles for each subtask
> > >>>>> 3) those metadata handles are aggregated down to a single savepoint
> > >>>>> handle
> > >>>>> 4) that handle is written out as a final metadata file
> > >>>>>
> > >>>>> What’s important here is that the api actually depends on the data
> > >>>>> flow collection, and state is written out as a side effect of taking a
> > >>>>> savepoint. The FLIP describes a loose coupling to the dataset api for
> > >>>>> eventual migration to BoundedStream; that is true. However, the api does
> > >>>>> require knowing what concrete data flow is being used to perform these
> > >>>>> re-partitionings and post aggregations.
> > >>>>>
> > >>>>> I’m linking to my prototype implementation, particularly what actually
> > >>>>> happens when you call write and run the transformations. Let me know if
> > >>>>> that helps clarify.
> > >>>>>
> > >>>>> Seth
> > >>>>>
> > >>>>>
> > >>
> >
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
> > >>>>>>
> > >>>>>> Hi Seth,
> > >>>>>>
> > >>>>>> that sounds reasonable. What I was asking for was not to reverse
> > >>>>>> engineer the binary format, but to make the savepoint write API a little
> > >>>>>> more reusable, so that it could be wrapped into some other technology. I
> > >>>>>> don't know the details well enough to propose a solution, but it seems to
> > >>>>>> me that it could be possible to use something like a Writer instead of a
> > >>>>>> Transform. Or maybe the Transform can use the Writer internally; the goal
> > >>>>>> is just to make it possible to create the savepoint from "outside" of
> > >>>>>> Flink (with some library, of course).
> > >>>>>>
> > >>>>>> Jan
> > >>>>>>
> > >>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
> > >>>>>>> @Konstantin agreed, that was a large impetus for this feature.
> > >>>>>>> Also I am happy to change the name to something that better describes the
> > >>>>>>> feature set.
> > >>>>>>>
> > >>>>>>> @Jan
> > >>>>>>>
> > >>>>>>> Savepoints depend heavily on a number of Flink internal components
> > >>>>>>> that may change between versions: state backend internals, type
> > >>>>>>> serializers, the specific hash function used to turn a UID into an
> > >>>>>>> OperatorID, etc. I consider it a feature of this proposal that the library
> > >>>>>>> depends on those internal components instead of reverse engineering the
> > >>>>>>> binary format. This way, as those internals change or new state features
> > >>>>>>> are added (think the recent addition of TTL), we will get support
> > >>>>>>> automatically. I do not believe anything else is maintainable.
> > >>>>>>>
> > >>>>>>> Seth
> > >>>>>>>
> > >>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
> > >>>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> this is an awesome and really useful feature. If I might ask for one
> > >>>>>>>> thing to consider: would it be possible to make the Savepoint
> > >>>>>>>> manipulation API (at least writing the Savepoint) less dependent on other
> > >>>>>>>> parts of Flink internals (e.g. |KeyedStateBootstrapFunction|) and provide
> > >>>>>>>> something more general (e.g. some generic Writer)? The reason I'm asking
> > >>>>>>>> is that I can totally imagine a situation where users might want to
> > >>>>>>>> create bootstrapped state with some other runner (e.g. Apache Spark), and
> > >>>>>>>> then run Apache Flink after the state has been created. This makes even
> > >>>>>>>> more sense in the context of Apache Beam, which provides all the
> > >>>>>>>> necessary work to make this happen. The question is: would it be possible
> > >>>>>>>> to design this feature so that writing the savepoint from a different
> > >>>>>>>> runner would be possible?
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>>
> > >>>>>>>> Jan
> > >>>>>>>>
> > >>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> > >>>>>>>>> Hey Everyone!
> > >>>>>>>>>
> > >>>>>>>>> Gordon and I have been discussing adding a savepoint connector to
> > >>>>>>>>> Flink for reading, writing and modifying savepoints.
> > >>>>>>>>>
> > >>>>>>>>> This is useful for:
> > >>>>>>>>>
> > >>>>>>>>>  Analyzing state for interesting patterns
> > >>>>>>>>>  Troubleshooting or auditing jobs by checking for discrepancies in state
> > >>>>>>>>>  Bootstrapping state for new applications
> > >>>>>>>>>  Modifying savepoints such as:
> > >>>>>>>>>      Changing max parallelism
> > >>>>>>>>>      Making breaking schema changes
> > >>>>>>>>>      Correcting invalid state
> > >>>>>>>>>
> > >>>>>>>>> We are looking forward to your feedback!
> > >>>>>>>>>
> > >>>>>>>>> This is the FLIP:
> > >>>>>>>>>
> > >>>>>>>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> > >>>>>>>>>
> > >>>>>>>>> Seth
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>
> >
>
