Thanks for the awesome FLIP. I think it will be very useful in state migration scenarios. We are also looking for a state reuse solution for SQL jobs, and I think this feature will help a lot. Looking forward to having it in the near future.

Regarding the naming, I'm +1 to "State Processing API". Should we also update the FLIP name in Confluence? Btw, what do you think about adding a shortcut API for changing the max parallelism of a savepoint? This is a very common scenario, but from my understanding it takes a lot of trivial steps to achieve under the current API.
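Something along these lines is what I have in mind. This is only a rough sketch: the load/write calls approximate the examples in the FLIP, and rescaleMaxParallelism is imagined, not part of the proposal:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Load an existing savepoint, bump its max parallelism in one call,
    // and write the result out as a new savepoint.
    // rescaleMaxParallelism(...) is a hypothetical shortcut method.
    Savepoint
        .load(env, "hdfs:///savepoints/savepoint-cca710", new MemoryStateBackend())
        .rescaleMaxParallelism(512)
        .write("hdfs:///savepoints/savepoint-rescaled");

    env.execute("rescale-savepoint");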
Best,
Jark

On Wed, 5 Jun 2019 at 10:52, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>
> > Hi Gordon & Seth, this looks like a very useful feature for analyzing and managing state.
> > I agree that using DataSet is probably the most practical choice right now, but in the longer term, adding TableAPI support for this will be nice.
>
> Agreed. Migrating this API to the TableAPI in the future is definitely something we have considered.
>
> > When analyzing the savepoint, I assume that the state backend restores the state first? This approach is generic and works for any state backend.
>
> Yes, that is correct. The process of reading state in snapshots is currently:
> 1) The metadata file is read when creating the input splits for the InputFormat. Each input split is assigned operator state and key-group state handles.
> 2) For each input split, a state backend is launched and restored with all state of the assigned state handles. Only some of that state is then transformed into DataSets (using the savepoint.read*State(...) methods).
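> For example, reading keyed state from a loaded savepoint looks roughly like this (a minimal sketch: the entry points approximate the FLIP's examples, and ReaderFunction stands for an implementation of the keyed state reader interface described there):
>
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     // Load the savepoint; per input split, a state backend is restored
>     // behind the scenes, as described above.
>     ExistingSavepoint savepoint =
>         Savepoint.load(env, "hdfs://path/", new MemoryStateBackend());
>
>     // Transform the keyed state of the operator with uid "uid" into a DataSet.
>     DataSet<MyPojo> keyedState =
>         savepoint.readKeyedState("uid", new ReaderFunction());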
> > However, sometimes it may be more efficient to directly analyze the files on DFS without copying. We could probably add an interface that allows the state backend to optimize for this in the future.
>
> That sounds like an interesting direction, though at the moment it may only make sense for full savepoints / checkpoints.
> One blocker for enabling this is having the type information of state available in the snapshot metadata file, so that the schema / type of state is known before actually reading state.
> Making state schema / type information available in the metadata file is already a recurring discussion in this thread; it would be useful not only for the feature you mentioned, but also for features like SQL integration in the future.
> Therefore, this seems like a reasonable next step when extending on top of the initial scope of the API proposed in the FLIP.
>
> > Also a quick question on the example in the wiki: DataSet<MyPojo> keyedState = operator.readKeyedState("uid", new ReaderFunction()); Should operator.readKeyedState be replaced with savepoint.readKeyedState here?
>
> Correct, this is indeed a typo. I've corrected it in the FLIP.
>
> Cheers,
> Gordon
>
> > Regards,
> > Xiaowei
> >
> > On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > +1 I think this is a very valuable new addition, and we should try not to get stuck on trying to design the perfect solution for everything.
> >
> > > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > >
> > > +1 to renaming it the State Processing API and adding it under the flink-libraries module.
> > >
> > > I also think we can start with the development of the feature. From the feedback so far, it seems like we're in a good spot to add in at least the initial version of this API, hopefully making it ready for 1.9.0.
> > >
> > > Cheers,
> > > Gordon
> > >
> > > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com> wrote:
> > >
> > >> It seems like a recurring piece of feedback was a different name. I'd like to propose moving the functionality to the libraries module and naming this the State Processing API.
> > >>
> > >> Seth
> > >>
> > >>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
> > >>>
> > >>> The SavepointOutputFormat only writes out the savepoint metadata file and should be mostly ignored.
> > >>>
> > >>> The actual state is written out by stream operators and tied into the Flink runtime [1, 2, 3].
> > >>>
> > >>> This is the most important part, and the piece that I don't think can be reasonably extracted.
> > >>>
> > >>> Seth
> > >>>
> > >>> [1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> > >>> [2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> > >>> [3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> > >>>
> > >>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> > >>>>
> > >>>> Hi Seth,
> > >>>>
> > >>>> yes, that helped! :-)
> > >>>>
> > >>>> What I was looking for is essentially `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It would be great if this could be written in a way that would enable reusing it in a different engine (as I mentioned, Apache Spark). There seem to be some issues, though. It uses the interface Savepoint, which uses several other objects and interfaces from Flink's runtime. Maybe some convenience API might help. Apache Beam handles operator naming, so names should definitely be transferable between systems, but I'm not sure how to construct an OperatorID from such a name. Do you think it is possible to come up with something that could be used in this portable way?
> > >>>>
> > >>>> I understand there are some more conditions that need to be satisfied (grouping, aggregating, ...), which would of course have to be handled by the target system. But Apache Beam can help leverage that. My idea would be that there could be a runner-specific PTransform that takes a PCollection of tuples like `(operator name, key, state name, value1), (operator name, key, state name, value2)`, and the runner's responsibility would be to group/aggregate these so that they can be written by the runner-provided writer (output format). A sketch of the shape I mean is below.
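> > >>>> Everything here is hypothetical, just to make the shape concrete (the StateEntry fields and the transform name are invented):
> > >>>>
> > >>>>     import org.apache.beam.sdk.transforms.PTransform;
> > >>>>     import org.apache.beam.sdk.values.PCollection;
> > >>>>     import org.apache.beam.sdk.values.PDone;
> > >>>>
> > >>>>     // One state entry: (operator name, key, state name, value).
> > >>>>     class StateEntry {
> > >>>>         String operatorName;
> > >>>>         byte[] key;
> > >>>>         String stateName;
> > >>>>         byte[] value;
> > >>>>     }
> > >>>>
> > >>>>     // A runner-specific transform: each runner would expand this into
> > >>>>     // its own grouping/aggregation plus its native savepoint writer.
> > >>>>     abstract class WriteStateSnapshot
> > >>>>         extends PTransform<PCollection<StateEntry>, PDone> {}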
> > >>>>
> > >>>> All of this would need a lot more design; these are just ideas of "what could be possible". I was just wondering whether this FLIP could make some first steps towards this.
> > >>>>
> > >>>> Many thanks for comments,
> > >>>>
> > >>>> Jan
> > >>>>
> > >>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> > >>>>> @Jan Gotcha,
> > >>>>>
> > >>>>> So in reusing components it explicitly is not a writer. This is not a savepoint output format in the way we have a parquet output format. The reason for the Transform API is to hide the underlying details; it does not simply append an output writer to the end of a dataset. This gets into the implementation details, but at a high level, the dataset is:
> > >>>>>
> > >>>>> 1) partitioned using key groups
> > >>>>> 2) run through a standard stream operator that takes a snapshot of its state after processing all records and outputs metadata handles for each subtask
> > >>>>> 3) those metadata handles are aggregated down to a single savepoint handle
> > >>>>> 4) that handle is written out as a final metadata file
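> > >>>>> In simplified pseudo-Java, the flow when you call write is roughly the following (the helper class names are approximations, not the prototype's actual names, and types are approximate):
> > >>>>>
> > >>>>>     DataSet<OperatorSubtaskState> handles = bootstrapData
> > >>>>>         // 1) partition elements by key group
> > >>>>>         .partitionCustom(keyGroupPartitioner, keySelector)
> > >>>>>         // 2) run each partition through a stream operator, snapshot its
> > >>>>>         //    state after the last record, and emit per-subtask handles
> > >>>>>         .mapPartition(new BootstrapOperatorRunner<>(operatorFactory));
> > >>>>>
> > >>>>>     handles
> > >>>>>         // 3) aggregate all subtask handles into a single savepoint handle
> > >>>>>         .reduceGroup(new AggregateIntoSavepoint())
> > >>>>>         // 4) write that handle out as the final metadata file
> > >>>>>         .output(new SavepointOutputFormat(savepointPath));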
> > >>>>>
> > >>>>> What's important here is that the API actually depends on the data flow collection, and state is written out as a side effect of taking a savepoint. The FLIP describes a loose coupling to the dataset API for eventual migration to BoundedStream, that is true. However, the API does require knowing what concrete data flow is being used to perform these re-partitionings and post-aggregations.
> > >>>>>
> > >>>>> I'm linking to my prototype implementation, particularly what actually happens when you call write and run the transformations. Let me know if that helps clarify.
> > >>>>>
> > >>>>> Seth
> > >>>>>
> > >>>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
> > >>>>>
> > >>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
> > >>>>>>
> > >>>>>> Hi Seth,
> > >>>>>>
> > >>>>>> that sounds reasonable. What I was asking for was not to reverse engineer the binary format, but to make the savepoint write API a little more reusable, so that it could be wrapped into some other technology. I don't know the details well enough to propose a solution, but it seems to me that it could be possible to use something like a Writer instead of a Transform. Or maybe the Transform could use the Writer internally; the goal is just to enable creating the savepoint from "outside" of Flink (with some library, of course).
> > >>>>>>
> > >>>>>> Jan
> > >>>>>>
> > >>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
> > >>>>>>> @Konstantin agreed, that was a large impetus for this feature. Also, I am happy to change the name to something that better describes the feature set.
> > >>>>>>>
> > >>>>>>> @Jan
> > >>>>>>>
> > >>>>>>> Savepoints depend heavily on a number of Flink-internal components that may change between versions: state backend internals, type serializers, the specific hash function used to turn a UID into an OperatorID, etc. I consider it a feature of this proposal that the library depends on those internal components instead of reverse engineering the binary format. This way, as those internals change, or as new state features are added (think the recent addition of TTL), we will get support automatically. I do not believe anything else is maintainable.
> > >>>>>>>
> > >>>>>>> Seth
> > >>>>>>>
> > >>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
> > >>>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> this is an awesome and really useful feature. If I might ask for one thing to consider: would it be possible to make the Savepoint manipulation API (at least writing the Savepoint) less dependent on other parts of Flink internals (e.g. |KeyedStateBootstrapFunction|) and provide something more general (e.g. some generic Writer)? Why I'm asking for that: I can totally imagine a situation where users might want to create bootstrapped state with some other runner (e.g. Apache Spark), and then run Apache Flink after the state has been created. This makes even more sense in the context of Apache Beam, which provides all the necessary work to make this happen. The question is: would it be possible to design this feature so that writing the savepoint from a different runner is possible?
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>>
> > >>>>>>>> Jan
> > >>>>>>>>
> > >>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> > >>>>>>>>> Hey Everyone!
> > >>>>>>>>>
> > >>>>>>>>> Gordon and I have been discussing adding a savepoint connector to Flink for reading, writing and modifying savepoints.
> > >>>>>>>>>
> > >>>>>>>>> This is useful for:
> > >>>>>>>>>
> > >>>>>>>>> - Analyzing state for interesting patterns
> > >>>>>>>>> - Troubleshooting or auditing jobs by checking for discrepancies in state
> > >>>>>>>>> - Bootstrapping state for new applications
> > >>>>>>>>> - Modifying savepoints, such as:
> > >>>>>>>>>   - Changing max parallelism
> > >>>>>>>>>   - Making breaking schema changes
> > >>>>>>>>>   - Correcting invalid state
> > >>>>>>>>>
> > >>>>>>>>> We are looking forward to your feedback!
> > >>>>>>>>>
> > >>>>>>>>> This is the FLIP:
> > >>>>>>>>>
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> > >>>>>>>>>
> > >>>>>>>>> Seth
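For a flavor of the bootstrapping use case listed in the announcement, writing new state with the KeyedStateBootstrapFunction discussed above would look roughly like this (a sketch only; names and signatures approximate the FLIP's examples, and the Account type is assumed for illustration):

    // Assumed example type.
    class Account {
        int id;
        double amount;
    }

    // Sketch of a bootstrap function: writes each account's amount into
    // keyed ValueState of the new savepoint.
    class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
        transient ValueState<Double> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(
                new ValueStateDescriptor<>("total", Types.DOUBLE));
        }

        @Override
        public void processElement(Account account, Context ctx) throws Exception {
            total.update(account.amount);
        }
    }

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Account> accounts = env.fromElements(new Account());

    // Bootstrap keyed state for one operator from a DataSet.
    BootstrapTransformation<Account> transformation = OperatorTransformation
        .bootstrapWith(accounts)
        .keyBy(acc -> acc.id)
        .transform(new AccountBootstrapper());

    // Assemble a new savepoint with max parallelism 128 and write it out.
    Savepoint
        .create(new MemoryStateBackend(), 128)
        .withOperator("accounts", transformation)
        .write("hdfs:///savepoints/bootstrap");

    env.execute();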