Hi Gordon,

The modify max parallelism API looks good to me. Thank you and Seth for the great work on it.
Cheers,
Jark

On Tue, 25 Jun 2019 at 16:01, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Jark,
> Thanks for the reminder. I've updated the FLIP name in confluence to match the new name "State Processor API".
> Concerning an API for changing max parallelism:
> That is actually in the works and has been considered, and would look something like -
> ```
> ExistingSavepoint savepoint = Savepoint.load(oldPath);
> savepoint.modifyMaxParallelism(newParallelism, newPath);
> ```
> For the contribution, we've currently reached the write part of the State Processor API [1].
> After that is settled, we can then start thinking about adding this max parallelism change feature as a follow-up.
> Cheers,
> Gordon
> [1] https://github.com/apache/flink/pull/8861
>
> On Tue, Jun 25, 2019 at 2:27 PM Jark Wu <imj...@gmail.com> wrote:
>
> > Thanks for the awesome FLIP.
> > I think it will be very useful in state migration scenarios. We are also looking for a state reuse solution for SQL jobs, and I think this feature will help a lot. Looking forward to having it in the near future.
> > Regarding the naming, I'm +1 to "State Processing API". Should we also update the FLIP name in confluence?
> > Btw, what do you think about adding a shortcut API for changing max parallelism for savepoints? This is a very common scenario, but from my understanding it requires a lot of trivial work to achieve under the current API.
> > Best,
> > Jark
> >
> > On Wed, 5 Jun 2019 at 10:52, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> >
> > > On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > >
> > > > Hi Gordon & Seth, this looks like a very useful feature for analyzing and managing state.
> > > > I agree that using DataSet is probably the most practical choice right now. But in the longer term, adding TableAPI support for this will be nice.
> > >
> > > Agreed. Migrating this API in the future to the TableAPI is definitely something we have considered.
> > >
> > > > When analyzing the savepoint, I assume that the state backend restores the state first? This approach is generic and works for any state backend.
> > >
> > > Yes, that is correct. The process of reading state in snapshots is currently:
> > > 1) the metadata file is read when creating the input splits for the InputFormat. Each input split is assigned operator state and key-group state handles.
> > > 2) for each input split, a state backend is launched and restored with all state of the assigned state handles. Only some of that state is then transformed into DataSets (using the savepoint.read*State(...) methods).
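Pulling together the two snippets quoted in this thread (the Savepoint.load call above and the readKeyedState example from the FLIP wiki discussed further down), the read path described here would be driven from user code roughly as in the sketch below; the exact signatures are still subject to change while the FLIP is under discussion:

```
// Rough sketch combining the snippets quoted in this thread; signatures may change.
ExistingSavepoint savepoint = Savepoint.load(oldPath);

// Per the two steps described above: the metadata file is read to create input splits,
// each split restores a state backend with its assigned state handles, and the
// requested state is then exposed as a DataSet.
DataSet<MyPojo> keyedState = savepoint.readKeyedState("uid", new ReaderFunction());
```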
> > > > However, sometimes it may be more efficient to directly analyze the files on DFS without copying. We can probably add an interface to allow the state backend to optimize for such behavior in the future.
> > >
> > > That sounds like an interesting direction, though at the moment it may only make sense for full savepoints / checkpoints.
> > > One blocker for enabling this is having the type information of state available in the snapshot metadata file, so that the schema / type of state is known before actually reading state.
> > > Making state schema / type information available in the metadata file is already a recurring discussion in this thread that would be useful not only for the feature you mentioned, but also for features like SQL integration in the future.
> > > Therefore, this seems to be a reasonable next step when extending on top of the initial scope of the API proposed in the FLIP.
> > >
> > > > Also a quick question on the example in the wiki: DataSet<MyPojo> keyedState = operator.readKeyedState("uid", new ReaderFunction()); Should operator.readKeyedState be replaced with savepoint.readKeyedState here?
> > >
> > > Correct, this is indeed a typo. I've corrected this in the FLIP.
> > > Cheers,
> > > Gordon
> > >
> > > > Regards,
> > > > Xiaowei
> > > >
> > > > On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <aljos...@apache.org> wrote:
> > > >
> > > > +1 I think this is a very valuable new addition and we should try and not get stuck on trying to design the perfect solution for everything
> > > >
> > > > > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > > > >
> > > > > +1 to renaming it as State Processing API and adding it under the flink-libraries module.
> > > > > I also think we can start with the development of the feature. From the feedback so far, it seems like we're in a good spot to add in at least the initial version of this API, hopefully making it ready for 1.9.0.
> > > > > Cheers,
> > > > > Gordon
> > > > >
> > > > > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com> wrote:
> > > > >
> > > > >> It seems like a recurring piece of feedback was a different name. I'd like to propose moving the functionality to the libraries module and naming this the State Processing API.
> > > > >> Seth
> > > > >>
> > > > >>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
> > > > >>>
> > > > >>> The SavepointOutputFormat only writes out the savepoint metadata file and should be mostly ignored.
> > > > >>> The actual state is written out by stream operators and tied into the Flink runtime [1, 2, 3].
> > > > >>> This is the most important part and the piece that I don't think can be reasonably extracted.
> > > > >>> Seth
> > > > >>>
> > > > >>> [1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> > > > >>> [2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> > > > >>> [3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> > > > >>>
> > > > >>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> > > > >>>>
> > > > >>>> Hi Seth,
> > > > >>>> yes, that helped! :-)
> > > > >>>> What I was looking for is essentially `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It would be great if this could be written in a way that would enable reusing it in a different engine (as I mentioned - Apache Spark). There seem to be some issues though. It uses the interface Savepoint, which uses several other objects and interfaces from Flink's runtime. Maybe some convenience API might help - Apache Beam handles operator naming, so that should definitely be transferable between systems, but I'm not sure how to construct an OperatorID from this name. Do you think it is possible to come up with something that could be used in this portable way?
> > > > >>>> I understand there are some more conditions that need to be satisfied (grouping, aggregating, ...), which would of course have to be handled by the target system. But Apache Beam can help leverage that. My idea would be that there could be a runner-specified PTransform that takes a PCollection of tuples like `(operator name, key, state name, value1), (operator name, key, state name, value2)`, and the runner's responsibility would be to group/aggregate this so that it can be written by the runner's provided writer (output format).
> > > > >>>> All of this would need a lot more design; these are just ideas of "what could be possible". I was just wondering if this FLIP can make some first steps towards this.
> > > > >>>> Many thanks for comments,
> > > > >>>> Jan
> > > > >>>>
> > > > >>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> > > > >>>>>
> > > > >>>>> @Jan Gotcha,
> > > > >>>>> So in reusing components it explicitly is not a writer. This is not a savepoint output format in the way we have a parquet output format. The reason for the Transform API is to hide the underlying details; it does not simply append an output writer to the end of a dataset. This gets into the implementation details, but at a high level the dataset is:
> > > > >>>>> 1) partitioned using key groups
> > > > >>>>> 2) data is run through a standard stream operator that takes a snapshot of its state after processing all records and outputs metadata handles for each subtask
> > > > >>>>> 3) those metadata handles are aggregated down to a single savepoint handle
> > > > >>>>> 4) that handle is written out as a final metadata file
> > > > >>>>> What's important here is that the API actually depends on the data flow collection, and state is written out as a side effect of taking a savepoint. The FLIP describes a loose coupling to the DataSet API for eventual migration to BoundedStream, that is true. However, the API does require knowing what concrete data flow is being used to perform these re-partitionings and post-aggregations.
> > > > >>>>> I'm linking to my prototype implementation, particularly what actually happens when you call write and run the transformations. Let me know if that helps clarify.
> > > > >>>>> Seth
> > > > >>>>>
> > > > >>>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
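Taken together, the four steps above surface in user code roughly as in the following sketch. The names follow the FLIP and the prototype linked above, but they may not match the final API exactly; Account, AccountBootstrapFunction, accounts, backend, maxParallelism and path are illustrative placeholders:

```
// Hedged sketch of the write path described above; names may differ from the final API.
BootstrapTransformation<Account> transformation = OperatorTransformation
    .bootstrapWith(accounts)                     // a bounded DataSet<Account> with the state to bootstrap
    .keyBy(account -> account.id)                // 1) partition the data by key group
    .transform(new AccountBootstrapFunction());  // 2) bootstrap operator snapshots its state per subtask

Savepoint
    .create(backend, maxParallelism)
    .withOperator("uid", transformation)
    .write(path);                                // 3) + 4) subtask handles aggregated into a single
                                                 //         savepoint handle and written out as the
                                                 //         final metadata file
```

As Seth notes, the state files themselves are produced as a side effect of the operators snapshotting their state when write(...) runs, not by a standalone output format that could be lifted out on its own.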
Let me > know > > > if > > > > >> that helps clarify. > > > > >>>>> > > > > >>>>> Seth > > > > >>>>> > > > > >>>>> > > > > >> > > > > > > > > > > https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63 > > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > >>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> > > > wrote: > > > > >>>>>> > > > > >>>>>> Hi Seth, > > > > >>>>>> > > > > >>>>>> that sounds reasonable. What I was asking for was not to > reverse > > > > >> engineer binary format, but to make the savepoint write API a > little > > > > more > > > > >> reusable, so that it could be wrapped into some other technology. > I > > > > don't > > > > >> know the details enough to propose a solution, but it seems to me, > > > that > > > > it > > > > >> could be possible to use something like Writer instead of > Transform. > > > Or > > > > >> maybe the Transform can use the Writer internally, the goal is > just > > to > > > > >> enable to create the savepoint from "'outside" of Flink (with some > > > > library, > > > > >> of course). > > > > >>>>>> > > > > >>>>>> Jan > > > > >>>>>> > > > > >>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote: > > > > >>>>>>> @Konstantin agreed, that was a large impotence for this > > feature. > > > > >> Also I am happy to change the name to something that better > > describes > > > > the > > > > >> feature set. > > > > >>>>>>> > > > > >>>>>>> @Lan > > > > >>>>>>> > > > > >>>>>>> Savepoints depend heavily on a number of flink internal > > > components > > > > >> that may change between versions: state backends internals, type > > > > >> serializers, the specific hash function used to turn a UID into an > > > > >> OperatorID, etc. I consider it a feature of this proposal that the > > > > library > > > > >> depends on those internal components instead of reverse > engineering > > > the > > > > >> binary format. This way as those internals change, or new state > > > features > > > > >> are added (think the recent addition of TTL) we will get support > > > > >> automatically. I do not believe anything else is maintainable. > > > > >>>>>>> > > > > >>>>>>> Seth > > > > >>>>>>> > > > > >>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> > > > > wrote: > > > > >>>>>>>> > > > > >>>>>>>> Hi, > > > > >>>>>>>> > > > > >>>>>>>> this is awesome, and really useful feature. If I might ask > for > > > one > > > > >> thing to consider - would it be possible to make the Savepoint > > > > manipulation > > > > >> API (at least writing the Savepoint) less dependent on other parts > > of > > > > Flink > > > > >> internals (e.g. |KeyedStateBootstrapFunction|) and provide > something > > > > more > > > > >> general (e.g. some generic Writer)? Why I'm asking for that - I > can > > > > totally > > > > >> imagine situation, where users might want to create bootstrapped > > state > > > > by > > > > >> some other runner (e.g. Apache Spark), and then run Apache Flink > > after > > > > the > > > > >> state has been created. This makes even more sense in context of > > > Apache > > > > >> Beam, which provides all the necessary work to make this happen. > The > > > > >> question is - would it be possible to design this feature so that > > > > writing > > > > >> the savepoint from different runner would be possible? 
> > > > >>>>>>>> Cheers,
> > > > >>>>>>>> Jan
> > > > >>>>>>>>
> > > > >>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> Hey Everyone!
> > > > >>>>>>>>> Gordon and I have been discussing adding a savepoint connector to flink for reading, writing and modifying savepoints.
> > > > >>>>>>>>> This is useful for:
> > > > >>>>>>>>> Analyzing state for interesting patterns
> > > > >>>>>>>>> Troubleshooting or auditing jobs by checking for discrepancies in state
> > > > >>>>>>>>> Bootstrapping state for new applications
> > > > >>>>>>>>> Modifying savepoints such as:
> > > > >>>>>>>>>   Changing max parallelism
> > > > >>>>>>>>>   Making breaking schema changes
> > > > >>>>>>>>>   Correcting invalid state
> > > > >>>>>>>>> We are looking forward to your feedback!
> > > > >>>>>>>>> This is the FLIP:
> > > > >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> > > > >>>>>>>>> Seth