Hi Gordon,

The modify max parallelism API looks good to me. Thank you and Seth for the
great work on it.

Cheers,
Jark

On Tue, 25 Jun 2019 at 16:01, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Jark,
>
> Thanks for the reminder. I've updated the FLIP name in confluence to match
> the new name "State Processor API".
>
> Concerning an API for changing max parallelism:
> That is actually in the works and has been considered; it would look
> something like this:
> ```
> ExistingSavepoint savepoint = Savepoint.load(oldPath);
> savepoint.modifyMaxParallelism(newParallelism, newPath);
> ```
>
> For the contribution, we've currently reached the write part of the State
> Processor API [1].
> After that is settled, we can then start thinking about adding this max
> parallelism change feature as a follow-up.
>
> Cheers,
> Gordon
>
> [1] https://github.com/apache/flink/pull/8861
>
> On Tue, Jun 25, 2019 at 2:27 PM Jark Wu <imj...@gmail.com> wrote:
>
> > Thanks for the awesome FLIP.
> >
> > I think it will be very useful in state migration scenarios. We are also
> > looking for a state reuse solution for SQL jobs, and I think this feature
> > will help a lot.
> > Looking forward to having it in the near future.
> >
> > Regarding the naming, I'm +1 to "State Processing API". Should we also
> > update the FLIP name in confluence?
> >
> > Btw, what do you think about adding a shortcut API for changing the max
> > parallelism of a savepoint? This is a very common scenario.
> > But from my understanding, it takes a lot of trivial work to achieve
> > under the current API.
> >
> > Best,
> > Jark
> >
> >
> > On Wed, 5 Jun 2019 at 10:52, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > wrote:
> >
> > > On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang <xiaow...@gmail.com>
> wrote:
> > >
> > > >  Hi Gordon & Seth, this looks like a very useful feature for analyzing
> > > > and managing state.
> > > > I agree that using DataSet is probably the most practical choice right
> > > > now. But in the longer term, adding Table API support for this will be
> > > > nice.
> > > >
> > >
> > > Agreed. Migrating this API to the Table API in the future is definitely
> > > something we have considered.
> > >
> > >
> > > > When analyzing the savepoint, I assume that the state backend restores
> > > > the state first? This approach is generic and works for any state
> > > > backend.
> > >
> > >
> > > Yes, that is correct. The process of reading state in snapshots is
> > > currently:
> > > 1) The metadata file is read when creating the input splits for the
> > > InputFormat. Each input split is assigned operator state and key-group
> > > state handles.
> > > 2) For each input split, a state backend is launched and restored with
> > > all state of the assigned state handles. Only part of that state is then
> > > transformed into DataSets (using the savepoint.read*State(...) methods).
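> > >
> > > To make the read side concrete, here is a minimal sketch combining the
> > > snippets already shown in this thread and the wiki; MyPojo, ReaderFunction
> > > and the uid are placeholder names from the wiki example, and the exact
> > > signatures may still change:
> > >
> > > ```
> > > // Load an existing savepoint (path is a placeholder).
> > > ExistingSavepoint savepoint = Savepoint.load(oldPath);
> > >
> > > // Reading state creates a DataSet source whose input splits are derived
> > > // from the metadata file; each split restores a state backend with its
> > > // assigned handles, as described above.
> > > DataSet<MyPojo> keyedState = savepoint.readKeyedState("my-operator-uid", new ReaderFunction());
> > > ```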
> > >
> > >
> > > > However, sometimes it may be more efficient to directly analyze the
> > > > files on DFS without copying. We can probably add an interface to allow
> > > > the state backend to optimize such behavior in the future.
> > >
> > >
> > > That sounds like an interesting direction, though at the moment it may
> > > only make sense for full savepoints / checkpoints.
> > > One blocker for enabling this is having the type information of state
> > > available in the snapshot metadata file, so that the schema / type of
> > > state is known before actually reading the state.
> > > Making state schema / type information available in the metadata file is
> > > already a recurring discussion in this thread; it would be useful not
> > > only for the feature you mentioned, but also for features like SQL
> > > integration in the future.
> > > Therefore, this seems like a reasonable next step when extending beyond
> > > the initial scope of the API proposed in the FLIP.
> > >
> > >
> > > > Also a quick question on the example in the wiki: `DataSet<MyPojo>
> > > > keyedState = operator.readKeyedState("uid", new ReaderFunction());`
> > > > Should operator.readKeyedState be replaced with savepoint.readKeyedState
> > > > here?
> > > >
> > >
> > > Correct, this is indeed a typo. I've corrected this in the FLIP.
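> > >
> > > For reference, the corrected example then reads:
> > >
> > > ```
> > > DataSet<MyPojo> keyedState = savepoint.readKeyedState("uid", new ReaderFunction());
> > > ```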
> > >
> > > Cheers,
> > > Gordon
> > >
> > >
> > > >
> > > > Regards,
> > > > Xiaowei
> > > >
> > > >     On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <
> > > > aljos...@apache.org> wrote:
> > > >
> > > >  +1 I think it is a very valuable new addition, and we should try not
> > > > to get stuck on designing the perfect solution for everything.
> > > >
> > > > > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > > > > wrote:
> > > > >
> > > > > +1 to renaming it as State Processing API and adding it under the
> > > > > flink-libraries module.
> > > > >
> > > > > I also think we can start with the development of the feature. From
> > the
> > > > > feedback so far, it seems like we're in a good spot to add in at
> > least
> > > > the
> > > > > initial version of this API, hopefully making it ready for 1.9.0.
> > > > >
> > > > > Cheers,
> > > > > Gordon
> > > > >
> > > > > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com>
> > > wrote:
> > > > >
> > > > >> It seems like a recurring piece of feedback was a different name.
> > I’d
> > > > like
> > > > >> to propose moving the functionality to the libraries module and
> > naming
> > > > this
> > > > >> the State Processing API.
> > > > >>
> > > > >> Seth
> > > > >>
> > > > >>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com>
> > > wrote:
> > > > >>>
> > > > >>> The SavepointOutputFormat only writes out the savepoint metadata
> > file
> > > > >> and should be mostly ignored.
> > > > >>>
> > > > >>> The actual state is written out by stream operators and tied into
> > > > >>> the Flink runtime [1, 2, 3].
> > > > >>>
> > > > >>> This is the most important part and the piece that I don’t think
> > can
> > > be
> > > > >> reasonably extracted.
> > > > >>>
> > > > >>> Seth
> > > > >>>
> > > > >>> [1]
> > > > >>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> > > > >>>
> > > > >>> [2]
> > > > >>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> > > > >>>
> > > > >>> [3]
> > > > >>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> > > > >>>
> > > > >>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz>
> > wrote:
> > > > >>>>
> > > > >>>> Hi Seth,
> > > > >>>>
> > > > >>>> yes, that helped! :-)
> > > > >>>>
> > > > >>>> What I was looking for is essentially
> > > > >>>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
> > > > >>>> would be great if this could be written in a way that enables reusing it
> > > > >>>> in a different engine (as I mentioned - Apache Spark). There seem to be
> > > > >>>> some issues, though. It uses the Savepoint interface, which uses several
> > > > >>>> other objects and interfaces from Flink's runtime. Maybe some convenience
> > > > >>>> API might help - Apache Beam handles operator naming, so the name should
> > > > >>>> definitely be transferable between systems, but I'm not sure how to
> > > > >>>> construct an OperatorID from this name. Do you think it is possible to
> > > > >>>> come up with something that could be used in this portable way?
> > > > >>>>
> > > > >>>> I understand there are some more conditions that need to be satisfied
> > > > >>>> (grouping, aggregating, ...), which would of course have to be handled by
> > > > >>>> the target system. But Apache Beam can help with that. My idea would be
> > > > >>>> that there can be a runner-specific PTransform that takes a PCollection of
> > > > >>>> tuples like `(operator name, key, state name, value1), (operator name,
> > > > >>>> key, state name, value2)`, and the runner's responsibility would be to
> > > > >>>> group/aggregate them so that they can be written by the runner-provided
> > > > >>>> writer (output format).
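> > > > >>>>
> > > > >>>> Just to illustrate the idea (this type does not exist anywhere yet; the
> > > > >>>> field names and types are assumptions on my side), such a portable state
> > > > >>>> entry could look roughly like:
> > > > >>>>
> > > > >>>> ```
> > > > >>>> /** Illustrative element type for the proposed PCollection of state entries. */
> > > > >>>> public class StateEntry {
> > > > >>>>     public String operatorName; // logical operator name, mapped to an OperatorID by the writer
> > > > >>>>     public byte[] key;          // serialized key of the keyed state entry
> > > > >>>>     public String stateName;    // name of the state, e.g. the state descriptor name
> > > > >>>>     public byte[] value;        // serialized state value
> > > > >>>> }
> > > > >>>> ```
> > > > >>>>
> > > > >>>> Grouping/aggregating by (operatorName, key, stateName) would then be the
> > > > >>>> runner's job before handing the entries to the writer.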
> > > > >>>>
> > > > >>>> All of this would need a lot more design; these are just ideas of "what
> > > > >>>> could be possible". I was just wondering if this FLIP could take some
> > > > >>>> first steps towards this.
> > > > >>>>
> > > > >>>> Many thanks for comments,
> > > > >>>>
> > > > >>>> Jan
> > > > >>>>
> > > > >>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> > > > >>>>> @Jan Gotcha,
> > > > >>>>>
> > > > >>>>> So in terms of reusing components, it explicitly is not a writer. This is
> > > > >>>>> not a savepoint output format in the way we have a parquet output format.
> > > > >>>>> The reason for the Transform API is to hide the underlying details; it
> > > > >>>>> does not simply append an output writer to the end of a dataset. This
> > > > >>>>> gets into the implementation details, but at a high level the dataset is:
> > > > >>>>>
> > > > >>>>> 1) partitioned using key groups
> > > > >>>>> 2) data is run through a standard stream operator that takes a snapshot of
> > > > >>>>> its state after processing all records and outputs metadata handles for
> > > > >>>>> each subtask
> > > > >>>>> 3) those metadata handles are aggregated down to a single savepoint handle
> > > > >>>>> 4) that handle is written out as a final metadata file
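> > > > >>>>>
> > > > >>>>> A rough sketch of that data flow (not the actual prototype code:
> > > > >>>>> KeyGroupPartitioner, BootstrapRunner, HandleMerger and the constructor
> > > > >>>>> arguments are made-up placeholders; only the DataSet operations and the
> > > > >>>>> SavepointOutputFormat class name come from the prototype):
> > > > >>>>>
> > > > >>>>> ```
> > > > >>>>> bootstrapData
> > > > >>>>>     // 1) partition the records by key group
> > > > >>>>>     .partitionCustom(new KeyGroupPartitioner(), record -> record.key)
> > > > >>>>>     // 2) run a stream operator over each partition, snapshot its state
> > > > >>>>>     //    after processing all records, emit a metadata handle per subtask
> > > > >>>>>     .mapPartition(new BootstrapRunner())
> > > > >>>>>     // 3) aggregate the per-subtask handles into a single savepoint handle
> > > > >>>>>     .reduceGroup(new HandleMerger())
> > > > >>>>>     // 4) write that handle out as the final metadata file
> > > > >>>>>     .output(new SavepointOutputFormat(savepointPath));
> > > > >>>>> ```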
> > > > >>>>>
> > > > >>>>> What’s important here is that the API actually depends on the data flow
> > > > >>>>> collection, and state is written out as a side effect of taking a
> > > > >>>>> savepoint. The FLIP does describe a loose coupling to the DataSet API for
> > > > >>>>> an eventual migration to BoundedStream. However, the API does require
> > > > >>>>> knowing which concrete data flow is being used to perform these
> > > > >>>>> re-partitionings and post-aggregations.
> > > > >>>>>
> > > > >>>>> I’m linking to my prototype implementation, particularly what
> > > > actually
> > > > >> happens when you call write and run the transformations. Let me
> know
> > > if
> > > > >> that helps clarify.
> > > > >>>>>
> > > > >>>>> Seth
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz>
> > > wrote:
> > > > >>>>>>
> > > > >>>>>> Hi Seth,
> > > > >>>>>>
> > > > >>>>>> that sounds reasonable. What I was asking for was not to reverse engineer
> > > > >>>>>> the binary format, but to make the savepoint write API a little more
> > > > >>>>>> reusable, so that it could be wrapped into some other technology. I don't
> > > > >>>>>> know the details well enough to propose a solution, but it seems to me
> > > > >>>>>> that it could be possible to use something like a Writer instead of a
> > > > >>>>>> Transform. Or maybe the Transform can use the Writer internally; the goal
> > > > >>>>>> is just to enable creating the savepoint from "outside" of Flink (with
> > > > >>>>>> some library, of course).
> > > > >>>>>>
> > > > >>>>>> Jan
> > > > >>>>>>
> > > > >>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
> > > > >>>>>>> @Konstantin agreed, that was a large impetus for this feature. Also, I
> > > > >>>>>>> am happy to change the name to something that better describes the
> > > > >>>>>>> feature set.
> > > > >>>>>>>
> > > > >>>>>>> @Jan
> > > > >>>>>>>
> > > > >>>>>>> Savepoints depend heavily on a number of Flink-internal components that
> > > > >>>>>>> may change between versions: state backend internals, type serializers,
> > > > >>>>>>> the specific hash function used to turn a UID into an OperatorID, etc. I
> > > > >>>>>>> consider it a feature of this proposal that the library depends on those
> > > > >>>>>>> internal components instead of reverse engineering the binary format.
> > > > >>>>>>> This way, as those internals change or new state features are added
> > > > >>>>>>> (think the recent addition of TTL), we will get support automatically. I
> > > > >>>>>>> do not believe anything else is maintainable.
> > > > >>>>>>>
> > > > >>>>>>> Seth
> > > > >>>>>>>
> > > > >>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz>
> > > > wrote:
> > > > >>>>>>>>
> > > > >>>>>>>> Hi,
> > > > >>>>>>>>
> > > > >>>>>>>> this is an awesome and really useful feature. If I might ask for one
> > > > >>>>>>>> thing to consider - would it be possible to make the Savepoint
> > > > >>>>>>>> manipulation API (at least writing the Savepoint) less dependent on
> > > > >>>>>>>> other parts of Flink internals (e.g. `KeyedStateBootstrapFunction`) and
> > > > >>>>>>>> provide something more general (e.g. some generic Writer)? Why I'm
> > > > >>>>>>>> asking for that: I can totally imagine a situation where users might
> > > > >>>>>>>> want to create bootstrapped state with some other runner (e.g. Apache
> > > > >>>>>>>> Spark), and then run Apache Flink after the state has been created.
> > > > >>>>>>>> This makes even more sense in the context of Apache Beam, which
> > > > >>>>>>>> provides all the necessary pieces to make this happen. The question is
> > > > >>>>>>>> - would it be possible to design this feature so that writing the
> > > > >>>>>>>> savepoint from a different runner would be possible?
> > > > >>>>>>>>
> > > > >>>>>>>> Cheers,
> > > > >>>>>>>>
> > > > >>>>>>>> Jan
> > > > >>>>>>>>
> > > > >>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> > > > >>>>>>>>> Hey Everyone!
> > > > >>>>>>>>>
> > > > >>>>>>>>> Gordon and I have been discussing adding a savepoint connector to Flink
> > > > >>>>>>>>> for reading, writing and modifying savepoints.
> > > > >>>>>>>>>
> > > > >>>>>>>>> This is useful for:
> > > > >>>>>>>>>
> > > > >>>>>>>>>  Analyzing state for interesting patterns
> > > > >>>>>>>>>  Troubleshooting or auditing jobs by checking for discrepancies in state
> > > > >>>>>>>>>  Bootstrapping state for new applications
> > > > >>>>>>>>>  Modifying savepoints such as:
> > > > >>>>>>>>>      Changing max parallelism
> > > > >>>>>>>>>      Making breaking schema changes
> > > > >>>>>>>>>      Correcting invalid state
> > > > >>>>>>>>>
> > > > >>>>>>>>> We are looking forward to your feedback!
> > > > >>>>>>>>>
> > > > >>>>>>>>> This is the FLIP:
> > > > >>>>>>>>>
> > > > >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> > > > >>>>>>>>>
> > > > >>>>>>>>> Seth
