Hi Jark,

Thanks for the reminder. I've updated the FLIP name in Confluence to match the new name "State Processor API".
Concerning an API for changing max parallelism: that is actually in the works and has been considered, and would look something like this:

```
ExistingSavepoint savepoint = Savepoint.load(oldPath);
savepoint.modifyMaxParallelism(newParallelism, newPath);
```

For the contribution, we've currently reached the write part of the State Processor API [1]. After that is settled, we can then start thinking about adding this max parallelism change feature as a follow-up.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/8861

On Tue, Jun 25, 2019 at 2:27 PM Jark Wu <imj...@gmail.com> wrote:

> Thanks for the awesome FLIP.
>
> I think it will be very useful in state migration scenarios. We are also looking for a state reuse solution for SQL jobs, and I think this feature will help a lot. Looking forward to having it in the near future.
>
> Regarding the naming, I'm +1 to "State Processing API". Should we also update the FLIP name in Confluence?
>
> Btw, what do you think about adding a shortcut API for changing the max parallelism of savepoints? This is a very common scenario, but from my understanding it takes a lot of trivial work to achieve under the current API.
>
> Best,
> Jark
>
> On Wed, 5 Jun 2019 at 10:52, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> > On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> >
> > > Hi Gordon & Seth, this looks like a very useful feature for analyzing and managing state.
> > > I agree that using DataSet is probably the most practical choice right now. But in the longer term, adding Table API support for this will be nice.
> >
> > Agreed. Migrating this API to the Table API in the future is definitely something we have considered.
> >
> > > When analyzing the savepoint, I assume that the state backend restores the state first? This approach is generic and works for any state backend.
> >
> > Yes, that is correct. The process of reading state in snapshots is currently:
> > 1) The metadata file is read when creating the input splits for the InputFormat. Each input split is assigned operator state and key-group state handles.
> > 2) For each input split, a state backend is launched and restored with all state of the assigned state handles. Only part of that state is then transformed into DataSets (using the savepoint.read*State(...) methods).
> >
> > > However, sometimes it may be more efficient to directly analyze the files on DFS without copying. We can probably add an interface to allow the state backend to optimize such behavior in the future.
> >
> > That sounds like an interesting direction, though at the moment it may only make sense for full savepoints / checkpoints.
> > One blocker for enabling this is having the type information of state available in the snapshot metadata file, so that the schema / type of state is known before actually reading the state.
> > Making state schema / type information available in the metadata file is already a recurring discussion in this thread that would be useful not only for the feature you mentioned, but also for features like SQL integration in the future.
> > Therefore, this seems to be a reasonable next step when extending on top of the initial scope of the API proposed in the FLIP.
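For readers following along, the read side described above looks roughly like this in user code. This is a sketch only: class and method names follow the snippets in this thread and the FLIP wiki page, and the exact signatures (for example, whether load() also takes an ExecutionEnvironment and a state backend) may still change while the implementation is being finalized:

```
// Sketch based on the FLIP-43 examples in this thread; exact signatures may differ.

// Load the metadata of an existing savepoint.
ExistingSavepoint savepoint = Savepoint.load(oldPath);

// Read the keyed state of the operator with uid "uid" into a DataSet.
// ReaderFunction stands for the keyed-state reader function from the FLIP
// example and emits one MyPojo per key it reads.
DataSet<MyPojo> keyedState = savepoint.readKeyedState("uid", new ReaderFunction());

// Non-keyed (operator) state would be read through the other read*State(...) methods.
```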
> > > Also, a quick question on the example in the wiki:
> > >
> > > DataSet<MyPojo> keyedState = operator.readKeyedState("uid", new ReaderFunction());
> > >
> > > Should operator.readKeyedState be replaced with savepoint.readKeyedState here?
> >
> > Correct, this is indeed a typo. I've corrected this in the FLIP.
> >
> > Cheers,
> > Gordon
> >
> > > Regards,
> > > Xiaowei
> > >
> > > On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <aljos...@apache.org> wrote:
> > >
> > > +1 I think this is a very valuable new addition and we should try and not get stuck on trying to design the perfect solution for everything
> > >
> > > > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > > >
> > > > +1 to renaming it as State Processing API and adding it under the flink-libraries module.
> > > >
> > > > I also think we can start with the development of the feature. From the feedback so far, it seems like we're in a good spot to add in at least the initial version of this API, hopefully making it ready for 1.9.0.
> > > >
> > > > Cheers,
> > > > Gordon
> > > >
> > > > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com> wrote:
> > > >
> > > >> It seems like a recurring piece of feedback was a different name. I'd like to propose moving the functionality to the libraries module and naming this the State Processing API.
> > > >>
> > > >> Seth
> > > >>
> > > >>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
> > > >>>
> > > >>> The SavepointOutputFormat only writes out the savepoint metadata file and should be mostly ignored.
> > > >>>
> > > >>> The actual state is written out by stream operators and tied into the Flink runtime [1, 2, 3].
> > > >>>
> > > >>> This is the most important part and the piece that I don't think can be reasonably extracted.
> > > >>>
> > > >>> Seth
> > > >>>
> > > >>> [1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> > > >>>
> > > >>> [2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> > > >>>
> > > >>> [3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> > > >>>
> > > >>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> > > >>>>
> > > >>>> Hi Seth,
> > > >>>>
> > > >>>> yes, that helped! :-)
> > > >>>>
> > > >>>> What I was looking for is essentially `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It would be great if this could be written in a way that would enable reusing it in a different engine (as I mentioned - Apache Spark). There seem to be some issues though. It uses the interface Savepoint, which uses several other objects and interfaces from Flink's runtime.
> > > >>>> Maybe some convenience API might help - Apache Beam handles operator naming, so that should definitely be transferable between systems, but I'm not sure how to construct an OperatorID from this name. Do you think it is possible to come up with something that could be used in this portable way?
> > > >>>>
> > > >>>> I understand there are some more conditions that need to be satisfied (grouping, aggregating, ...), which would of course have to be handled by the target system. But Apache Beam can help leverage that. My idea would be that there can be a runner-specified PTransform that takes a PCollection of some tuples of `(operator name, key, state name, value1), (operator name, key, state name, value2)`, and the runner's responsibility would be to group/aggregate this so that it can be written by the runner's provided writer (output format).
> > > >>>>
> > > >>>> All of this would need a lot more design; these are just ideas of "what could be possible". I was just wondering if this FLIP can make some first steps towards this.
> > > >>>>
> > > >>>> Many thanks for comments,
> > > >>>>
> > > >>>> Jan
> > > >>>>
> > > >>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> > > >>>>> @Jan Gotcha,
> > > >>>>>
> > > >>>>> So in reusing components, it explicitly is not a writer. This is not a savepoint output format in the way we have a parquet output format. The reason for the Transform API is to hide the underlying details; it does not simply append an output writer to the end of a dataset. This gets into the implementation details, but at a high level, the dataset is:
> > > >>>>>
> > > >>>>> 1) partitioned using key groups
> > > >>>>> 2) run through a standard stream operator that takes a snapshot of its state after processing all records and outputs metadata handles for each subtask
> > > >>>>> 3) those metadata handles are aggregated down to a single savepoint handle
> > > >>>>> 4) that handle is written out as a final metadata file
> > > >>>>>
> > > >>>>> What's important here is that the API actually depends on the data flow collection, and state is written out as a side effect of taking a savepoint. The FLIP describes a loose coupling to the DataSet API for eventual migration to BoundedStream, that is true. However, the API does require knowing what concrete data flow is being used to perform these re-partitionings and post aggregations.
> > > >>>>>
> > > >>>>> I'm linking to my prototype implementation, particularly what actually happens when you call write and run the transformations. Let me know if that helps clarify.
> > > >>>>>
> > > >>>>> Seth
> > > >>>>>
> > > >>>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
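To make the four steps above a bit more concrete, here is a rough user-facing sketch of that write path, based on the prototype classes linked in this thread (WritableSavepoint, KeyedStateBootstrapFunction). The fluent API shown (OperatorTransformation, bootstrapWith, withOperator, write) and the surrounding names (env, stateBackend, maxParallelism, savepointPath, Account) are illustrative assumptions and may not match the final implementation:

```
// Illustrative sketch only - names follow the FLIP-43 prototype and may change.

// Hypothetical bootstrap function that writes each Account into keyed ValueState.
class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
    ValueState<Account> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
            new ValueStateDescriptor<>("account", Account.class));
    }

    @Override
    public void processElement(Account account, Context ctx) throws Exception {
        state.update(account);
    }
}

// Assuming an ExecutionEnvironment env and an input collection are already set up.
DataSet<Account> accounts = env.fromCollection(existingAccounts);

// Steps 1) and 2): partition by key groups and run a snapshotting stream operator.
BootstrapTransformation<Account> transformation = OperatorTransformation
    .bootstrapWith(accounts)
    .keyBy(account -> account.id)
    .transform(new AccountBootstrapper());

// Steps 3) and 4): aggregate the per-subtask handles and write the final metadata file.
Savepoint
    .create(stateBackend, maxParallelism)
    .withOperator("account-uid", transformation)
    .write(savepointPath);
```

The key point, as Seth notes, is that the snapshot is taken as a side effect of running this data flow, not by a standalone writer.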
> > > >>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
> > > >>>>>>
> > > >>>>>> Hi Seth,
> > > >>>>>>
> > > >>>>>> that sounds reasonable. What I was asking for was not to reverse engineer the binary format, but to make the savepoint write API a little more reusable, so that it could be wrapped into some other technology. I don't know the details well enough to propose a solution, but it seems to me that it could be possible to use something like a Writer instead of a Transform. Or maybe the Transform can use the Writer internally; the goal is just to enable creating the savepoint from "outside" of Flink (with some library, of course).
> > > >>>>>>
> > > >>>>>> Jan
> > > >>>>>>
> > > >>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
> > > >>>>>>> @Konstantin agreed, that was a large impetus for this feature. Also, I am happy to change the name to something that better describes the feature set.
> > > >>>>>>>
> > > >>>>>>> @Jan
> > > >>>>>>>
> > > >>>>>>> Savepoints depend heavily on a number of Flink internal components that may change between versions: state backend internals, type serializers, the specific hash function used to turn a UID into an OperatorID, etc. I consider it a feature of this proposal that the library depends on those internal components instead of reverse engineering the binary format. This way, as those internals change, or new state features are added (think the recent addition of TTL), we will get support automatically. I do not believe anything else is maintainable.
> > > >>>>>>>
> > > >>>>>>> Seth
> > > >>>>>>>
> > > >>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>> this is an awesome and really useful feature. If I might ask for one thing to consider - would it be possible to make the Savepoint manipulation API (at least writing the Savepoint) less dependent on other parts of Flink internals (e.g. `KeyedStateBootstrapFunction`) and provide something more general (e.g. some generic Writer)? Why I'm asking for that - I can totally imagine a situation where users might want to create bootstrapped state with some other runner (e.g. Apache Spark), and then run Apache Flink after the state has been created. This makes even more sense in the context of Apache Beam, which provides all the necessary work to make this happen. The question is - would it be possible to design this feature so that writing the savepoint from a different runner would be possible?
> > > >>>>>>>>
> > > >>>>>>>> Cheers,
> > > >>>>>>>>
> > > >>>>>>>> Jan
> > > >>>>>>>>
> > > >>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> > > >>>>>>>>> Hey Everyone!
> > > >>>>>>>>>
> > > >>>>>>>>> Gordon and I have been discussing adding a savepoint connector to Flink for reading, writing and modifying savepoints.
> > > >>>>>>>>> This is useful for:
> > > >>>>>>>>>
> > > >>>>>>>>> Analyzing state for interesting patterns
> > > >>>>>>>>> Troubleshooting or auditing jobs by checking for discrepancies in state
> > > >>>>>>>>> Bootstrapping state for new applications
> > > >>>>>>>>> Modifying savepoints, such as:
> > > >>>>>>>>>   Changing max parallelism
> > > >>>>>>>>>   Making breaking schema changes
> > > >>>>>>>>>   Correcting invalid state
> > > >>>>>>>>>
> > > >>>>>>>>> We are looking forward to your feedback!
> > > >>>>>>>>>
> > > >>>>>>>>> This is the FLIP:
> > > >>>>>>>>>
> > > >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> > > >>>>>>>>>
> > > >>>>>>>>> Seth