Hi Jark,

Thanks for the reminder. I've updated the FLIP name in confluence to match
the new name "State Processor API".

Concerning an API for changing max parallelism:
That is actually in the works and has been considered, and would look
something like -
```
ExistingSavepoint savepoint = Savepoint.load(oldPath);
savepoint.modifyMaxParallelism(newParallelism, newPath);
```

For the contribution, we've currently reached the write-part of the State
processing API [1].
After that is settled, we can then start thinking about adding this max
parallelism change feature as a follow-up.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/8861

On Tue, Jun 25, 2019 at 2:27 PM Jark Wu <imj...@gmail.com> wrote:

> Thanks for the awesome FLIP.
>
> I think it will be very useful in state migration scenario. We are also
> looking for a state reuse solution for SQL jobs. And I think this feature
> will help a lot.
> Looking forward to have it in the near future.
>
> Regarding to the naming, I'm +1 to "State Processing API". Should we also
> update the FLIP name in confluence?
>
> Btw, what do you think to add a shortcut API for changing max parallelism
> for savepoints? This is a very common scenario.
> But from my understanding, it needs to do a lot of trivial thing to achieve
> it under current API.
>
> Best,
> Jark
>
>
> On Wed, 5 Jun 2019 at 10:52, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> > On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> >
> > >  Hi Gordon & Seth, this looks like a very useful feature for analyze
> and
> > > manage states.
> > > I agree that using DataSet is probably the most practical choice right
> > > now. But in the longer adding the TableAPI support for this will be
> nice.
> > >
> >
> > Agreed. Migrating this API in the future to the TableAPI is definitely
> > something we have considered.
> >
> >
> > > When analyzing the savepoint, I assume that the state backend restores
> > the
> > > state first? This approach is generic and works for any state backend.
> >
> >
> > Yes, that is correct. The process of reading state in snapshots is
> > currently:
> > 1) the metadata file is read when creating the input splits for the
> > InputFormat. Each input split is assigned operator state and key-group
> > state handles.
> > 2) For each input split, a state backend is launched and is restored with
> > all state of the assigned state handles. Only partially some state is
> > transformed into DataSets (using the savepoint.read*State(...) methods).
> >
> >
> > > However, sometimes it may be more efficient to directly analyzing the
> > > files on DFS without copying. We can probably add interface to allow
> > state
> > > backend optimize such behavior in the future.
> >
> >
> > That sounds like an interesting direction, though at the moment it may
> only
> > make sense for full savepoints / checkpoints.
> > One blocker for enabling this, is having the type information of state
> > available in the snapshot metadata file so that schema / type of state is
> > known before actually reading state.
> > Making state schema / type information available in the metadata file is
> > already a recurring discussion in this thread that would be useful for
> not
> > only this feature you mentioned, but also for features like SQL
> integration
> > in the future.
> > Therefore, this seems to be a reasonable next step when extending on top
> of
> > the initial scope of the API proposed in the FLIP.
> >
> >
> > > Also a quick question on the example in wiki: DataSet<MyPojo>
> keyedState
> > =
> > > operator.readKeyedState("uid", new ReaderFunction());Should
> > > operator.readKeyedState  be replaced with savepoint.readKeyedState
> here?
> > >
> >
> > Correct, this is indeed a typo. I've corrected this in the FLIP.
> >
> > Cheers,
> > Gordon
> >
> >
> > >
> > > Regards,Xiaowei
> > >
> > >     On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <
> > > aljos...@apache.org> wrote:
> > >
> > >  +1 I think is is a very valuable new additional and we should try and
> > not
> > > get stuck on trying to design the perfect solution for everything
> > >
> > > > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > > wrote:
> > > >
> > > > +1 to renaming it as State Processing API and adding it under the
> > > > flink-libraries module.
> > > >
> > > > I also think we can start with the development of the feature. From
> the
> > > > feedback so far, it seems like we're in a good spot to add in at
> least
> > > the
> > > > initial version of this API, hopefully making it ready for 1.9.0.
> > > >
> > > > Cheers,
> > > > Gordon
> > > >
> > > > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com>
> > wrote:
> > > >
> > > >> It seems like a recurring piece of feedback was a different name.
> I’d
> > > like
> > > >> to propose moving the functionality to the libraries module and
> naming
> > > this
> > > >> the State Processing API.
> > > >>
> > > >> Seth
> > > >>
> > > >>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com>
> > wrote:
> > > >>>
> > > >>> The SavepointOutputFormat only writes out the savepoint metadata
> file
> > > >> and should be mostly ignored.
> > > >>>
> > > >>> The actual state is written out by stream operators and tied into
> the
> > > >> flink runtime[1, 2, 3].
> > > >>>
> > > >>> This is the most important part and the piece that I don’t think
> can
> > be
> > > >> reasonably extracted.
> > > >>>
> > > >>> Seth
> > > >>>
> > > >>> [1]
> > > >>
> > >
> >
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> > > >>>
> > > >>> [2]
> > > >>
> > >
> >
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> > > >>>
> > > >>> [3]
> > > >>
> > >
> >
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> > > >>>
> > > >>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz>
> wrote:
> > > >>>>
> > > >>>> Hi Seth,
> > > >>>>
> > > >>>> yes, that helped! :-)
> > > >>>>
> > > >>>> What I was looking for is essentially
> > > >>
> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`.
> > It
> > > >> would be great if this could be written in a way, that would enable
> > > reusing
> > > >> it in different engine (as I mentioned - Apache Spark). There seem
> to
> > be
> > > >> some issues though. It uses interface Savepoint, which uses several
> > > other
> > > >> objects and interfaces from Flink's runtime. Maybe some convenience
> > API
> > > >> might help - Apache Beam, handles operator naming, so that
> definitely
> > > >> should be transitionable between systems, but I'm not sure, how to
> > > >> construct OperatorID from this name. Would you think, that it is
> > > possible
> > > >> to come up with something that could be used in this portable way?
> > > >>>>
> > > >>>> I understand, there are some more conditions, that need to be
> > > satisfied
> > > >> (grouping, aggregating, ...), which would of course have to be
> handled
> > > by
> > > >> the target system. But Apache Beam can help leverage that. My idea
> > would
> > > >> be, that there can be runner specified PTransform, that takes
> > > PCollection
> > > >> of some tuples of `(operator name, key, state name, value1),
> (operator
> > > >> name, key, state name, value2)`, and Runner's responsibility would
> be
> > to
> > > >> group/aggregate this so that it can be written by runner's provided
> > > writer
> > > >> (output format).
> > > >>>>
> > > >>>> All of this would need a lot more design, these are just ideas of
> > > "what
> > > >> could be possible", I was just wondering if this FLIP can make some
> > > first
> > > >> steps towards this.
> > > >>>>
> > > >>>> Many thanks for comments,
> > > >>>>
> > > >>>> Jan
> > > >>>>
> > > >>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> > > >>>>> @Jan Gotcha,
> > > >>>>>
> > > >>>>> So in reusing components it explicitly is not a writer. This is
> > not a
> > > >> savepoint output format in the way we have a parquet output format.
> > The
> > > >> reason for the Transform api is to hide the underlying details, it
> > does
> > > not
> > > >> simply append a output writer to the end of a dataset. This gets
> into
> > > the
> > > >> implementation details but at a high level, the dataset is:
> > > >>>>>
> > > >>>>> 1) partitioned using key groups
> > > >>>>> 2) data is run through a standard stream operator that takes a
> > > >> snapshot of its state after processing all records and outputs
> > metadata
> > > >> handles for each subtask
> > > >>>>> 3) those metadata handles are aggregated down to a single
> savepoint
> > > >> handle
> > > >>>>> 4) that handle is written out as a final metadata file
> > > >>>>>
> > > >>>>> What’s important here is that the api actually depends on the
> data
> > > >> flow collection and state is written out as a side effect of taking
> a
> > > >> savepoint. The FLIP describes a lose coupling to the dataset api for
> > > >> eventual migration to BoundedStream, that is true. However, the api
> > does
> > > >> require knowing what concrete data flow is being used to perform
> these
> > > >> re-partitionings  and post aggregations.
> > > >>>>>
> > > >>>>> I’m linking to my prototype implementation, particularly what
> > > actually
> > > >> happens when you call write and run the transformations. Let me know
> > if
> > > >> that helps clarify.
> > > >>>>>
> > > >>>>> Seth
> > > >>>>>
> > > >>>>>
> > > >>
> > >
> >
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz>
> > wrote:
> > > >>>>>>
> > > >>>>>> Hi Seth,
> > > >>>>>>
> > > >>>>>> that sounds reasonable. What I was asking for was not to reverse
> > > >> engineer binary format, but to make the savepoint write API a little
> > > more
> > > >> reusable, so that it could be wrapped into some other technology. I
> > > don't
> > > >> know the details enough to propose a solution, but it seems to me,
> > that
> > > it
> > > >> could be possible to use something like Writer instead of Transform.
> > Or
> > > >> maybe the Transform can use the Writer internally, the goal is just
> to
> > > >> enable to create the savepoint from "'outside" of Flink (with some
> > > library,
> > > >> of course).
> > > >>>>>>
> > > >>>>>> Jan
> > > >>>>>>
> > > >>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
> > > >>>>>>> @Konstantin agreed, that was a large impotence for this
> feature.
> > > >> Also I am happy to change the name to something that better
> describes
> > > the
> > > >> feature set.
> > > >>>>>>>
> > > >>>>>>> @Lan
> > > >>>>>>>
> > > >>>>>>> Savepoints depend heavily on a number of flink internal
> > components
> > > >> that may change between versions: state backends internals, type
> > > >> serializers, the specific hash function used to turn a UID into an
> > > >> OperatorID, etc. I consider it a feature of this proposal that the
> > > library
> > > >> depends on those internal components instead of reverse engineering
> > the
> > > >> binary format. This way as those internals change, or new state
> > features
> > > >> are added (think the recent addition of TTL) we will get support
> > > >> automatically. I do not believe anything else is maintainable.
> > > >>>>>>>
> > > >>>>>>> Seth
> > > >>>>>>>
> > > >>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz>
> > > wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>> this is awesome, and really useful feature. If I might ask for
> > one
> > > >> thing to consider - would it be possible to make the Savepoint
> > > manipulation
> > > >> API (at least writing the Savepoint) less dependent on other parts
> of
> > > Flink
> > > >> internals (e.g. |KeyedStateBootstrapFunction|) and provide something
> > > more
> > > >> general (e.g. some generic Writer)? Why I'm asking for that - I can
> > > totally
> > > >> imagine situation, where users might want to create bootstrapped
> state
> > > by
> > > >> some other runner (e.g. Apache Spark), and then run Apache Flink
> after
> > > the
> > > >> state has been created. This makes even more sense in context of
> > Apache
> > > >> Beam, which provides all the necessary work to make this happen. The
> > > >> question is - would it be possible to design this feature so that
> > > writing
> > > >> the savepoint from different runner would be possible?
> > > >>>>>>>>
> > > >>>>>>>> Cheers,
> > > >>>>>>>>
> > > >>>>>>>> Jan
> > > >>>>>>>>
> > > >>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> > > >>>>>>>>> Hey Everyone!
> > > >>>>>>>>>
> > > >>>>>>>>> Gordon and I have been discussing adding a savepoint
> connector
> > to
> > > >> flink for reading, writing and modifying savepoints.
> > > >>>>>>>>>
> > > >>>>>>>>> This is useful for:
> > > >>>>>>>>>
> > > >>>>>>>>>  Analyzing state for interesting patterns
> > > >>>>>>>>>  Troubleshooting or auditing jobs by checking for
> discrepancies
> > > >> in state
> > > >>>>>>>>>  Bootstrapping state for new applications
> > > >>>>>>>>>  Modifying savepoints such as:
> > > >>>>>>>>>      Changing max parallelism
> > > >>>>>>>>>      Making breaking schema changes
> > > >>>>>>>>>      Correcting invalid state
> > > >>>>>>>>>
> > > >>>>>>>>> We are looking forward to your feedback!
> > > >>>>>>>>>
> > > >>>>>>>>> This is the FLIP:
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> > > >>>>>>>>>
> > > >>>>>>>>> Seth
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>
> > >
> >
>

Reply via email to