+1 I think it is a very valuable new addition, and we should try not to get stuck on designing the perfect solution for everything.
> On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> +1 to renaming it the State Processing API and adding it under the
> flink-libraries module.
>
> I also think we can start with the development of the feature. From the
> feedback so far, it seems like we're in a good spot to add at least the
> initial version of this API, hopefully making it ready for 1.9.0.
>
> Cheers,
> Gordon
>
> On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com> wrote:
>
>> It seems like a recurring piece of feedback was a different name. I'd like
>> to propose moving the functionality to the libraries module and naming it
>> the State Processing API.
>>
>> Seth
>>
>>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
>>>
>>> The SavepointOutputFormat only writes out the savepoint metadata file
>>> and should be mostly ignored.
>>>
>>> The actual state is written out by stream operators and tied into the
>>> Flink runtime [1, 2, 3].
>>>
>>> This is the most important part, and the piece that I don't think can
>>> be reasonably extracted.
>>>
>>> Seth
>>>
>>> [1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
>>>
>>> [2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
>>>
>>> [3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
>>>
>>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hi Seth,
>>>>
>>>> yes, that helped! :-)
>>>>
>>>> What I was looking for is essentially
>>>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`.
>>>> It would be great if this could be written in a way that would enable
>>>> reusing it in a different engine (as I mentioned - Apache Spark). There
>>>> seem to be some issues, though. It uses the Savepoint interface, which
>>>> uses several other objects and interfaces from Flink's runtime. Maybe
>>>> some convenience API might help - Apache Beam handles operator naming,
>>>> so the names should definitely be translatable between systems, but I'm
>>>> not sure how to construct an OperatorID from such a name. Do you think
>>>> it is possible to come up with something that could be used in this
>>>> portable way?
>>>>
>>>> I understand there are some more conditions that need to be satisfied
>>>> (grouping, aggregating, ...), which would of course have to be handled
>>>> by the target system. But Apache Beam can help leverage that. My idea
>>>> would be that there could be a runner-specified PTransform that takes a
>>>> PCollection of tuples like `(operator name, key, state name, value1),
>>>> (operator name, key, state name, value2)`, and the runner's
>>>> responsibility would be to group/aggregate these so that they can be
>>>> written by the runner-provided writer (output format).
>>>>
>>>> All of this would need a lot more design; these are just ideas of "what
>>>> could be possible". I was just wondering if this FLIP can make some
>>>> first steps towards this.
>>>>
>>>> Many thanks for comments,
>>>>
>>>> Jan
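To make the shape of Jan's idea concrete: a runner-specified transform might look roughly like the Beam sketch below. Everything in it (`StateEntry`, `WriteSavepoint`) is hypothetical and only illustrates the proposal; it is not an existing API, and a real version would also need a Coder registered for `StateEntry`:

    import java.io.Serializable;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    // Hypothetical record for Jan's `(operator name, key, state name,
    // value)` tuples.
    class StateEntry implements Serializable {
        String operatorName;
        byte[] key;
        String stateName;
        byte[] value;
    }

    // Hypothetical runner-specified transform. Each runner would override
    // expand() to group/aggregate the entries by (operator, key group) and
    // hand each group to its own writer - for Flink, something like the
    // SavepointOutputFormat discussed above.
    class WriteSavepoint extends PTransform<PCollection<StateEntry>, PDone> {
        @Override
        public PDone expand(PCollection<StateEntry> entries) {
            throw new UnsupportedOperationException("supplied by the runner");
        }
    }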
>>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
>>>>> @Jan Gotcha,
>>>>>
>>>>> So in terms of reusing components, it explicitly is not a writer. This
>>>>> is not a savepoint output format in the way we have a parquet output
>>>>> format. The reason for the Transform API is to hide the underlying
>>>>> details; it does not simply append an output writer to the end of a
>>>>> dataset. This gets into the implementation details, but at a high
>>>>> level:
>>>>>
>>>>> 1) the data is partitioned using key groups
>>>>> 2) it is run through a standard stream operator that takes a snapshot
>>>>>    of its state after processing all records and outputs metadata
>>>>>    handles for each subtask
>>>>> 3) those metadata handles are aggregated down to a single savepoint
>>>>>    handle
>>>>> 4) that handle is written out as a final metadata file
>>>>>
>>>>> What's important here is that the API actually depends on the data
>>>>> flow collection, and state is written out as a side effect of taking a
>>>>> savepoint. The FLIP describes a loose coupling to the DataSet API for
>>>>> eventual migration to BoundedStream, that is true. However, the API
>>>>> does require knowing what concrete data flow is being used to perform
>>>>> these re-partitionings and post-aggregations.
>>>>>
>>>>> I'm linking to my prototype implementation, particularly what actually
>>>>> happens when you call write and run the transformations. Let me know
>>>>> if that helps clarify.
>>>>>
>>>>> Seth
>>>>>
>>>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
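For readers following the prototype link, end-to-end usage of the write path looks roughly like the sketch below. The names (`Savepoint`, `OperatorTransformation`, `BootstrapTransformation`, `KeyedStateBootstrapFunction`) follow the FLIP and the prototype branch, so packages and signatures may still change; `Account` and the paths are made up for the example:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    // Savepoint, OperatorTransformation, BootstrapTransformation and
    // KeyedStateBootstrapFunction live under the prototype's
    // org.apache.flink.connectors.savepoint.* packages and may move.

    public class BootstrapExample {

        /** Hypothetical POJO used only for this sketch. */
        public static class Account {
            public int id;
            public double total;
            public Account() {}
            public Account(int id, double total) { this.id = id; this.total = total; }
        }

        /** Writes each account's total into keyed state; the operator
         *  snapshots that state after consuming all records (step 2 above). */
        public static class AccountBootstrapFunction
                extends KeyedStateBootstrapFunction<Integer, Account> {

            private transient ValueState<Double> totalState;

            @Override
            public void open(Configuration parameters) {
                totalState = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("total", Types.DOUBLE));
            }

            @Override
            public void processElement(Account account, Context ctx) throws Exception {
                totalState.update(account.total);
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Account> accounts = env.fromElements(new Account(1, 100.0));

            BootstrapTransformation<Account> transformation = OperatorTransformation
                    .bootstrapWith(accounts)
                    .keyBy(account -> account.id)               // 1) partition by key group
                    .transform(new AccountBootstrapFunction()); // 2) snapshot operator state

            Savepoint
                    .create(new MemoryStateBackend(), 128)      // 128 = max parallelism
                    .withOperator("accounts", transformation)   // 3) aggregate handles
                    .write("file:///tmp/savepoint/");           // 4) write metadata file

            env.execute();
        }
    }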
>>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>> Hi Seth,
>>>>>>
>>>>>> that sounds reasonable. What I was asking for was not to reverse
>>>>>> engineer the binary format, but to make the savepoint write API a
>>>>>> little more reusable, so that it could be wrapped into some other
>>>>>> technology. I don't know the details well enough to propose a
>>>>>> solution, but it seems to me that it could be possible to use
>>>>>> something like a Writer instead of a Transform. Or maybe the
>>>>>> Transform can use the Writer internally; the goal is just to enable
>>>>>> creating the savepoint from "outside" of Flink (with some library,
>>>>>> of course).
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
>>>>>>> @Konstantin agreed, that was a large impetus for this feature. Also,
>>>>>>> I am happy to change the name to something that better describes the
>>>>>>> feature set.
>>>>>>>
>>>>>>> @Jan
>>>>>>>
>>>>>>> Savepoints depend heavily on a number of Flink-internal components
>>>>>>> that may change between versions: state backend internals, type
>>>>>>> serializers, the specific hash function used to turn a UID into an
>>>>>>> OperatorID, etc. I consider it a feature of this proposal that the
>>>>>>> library depends on those internal components instead of reverse
>>>>>>> engineering the binary format. This way, as those internals change,
>>>>>>> or new state features are added (think the recent addition of TTL),
>>>>>>> we will get support automatically. I do not believe anything else is
>>>>>>> maintainable.
>>>>>>>
>>>>>>> Seth
>>>>>>>
>>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> this is an awesome and really useful feature. If I might ask for
>>>>>>>> one thing to consider - would it be possible to make the savepoint
>>>>>>>> manipulation API (at least writing the savepoint) less dependent on
>>>>>>>> other parts of Flink internals (e.g. KeyedStateBootstrapFunction)
>>>>>>>> and provide something more general (e.g. some generic Writer)? Why
>>>>>>>> I'm asking for that - I can totally imagine a situation where users
>>>>>>>> might want to create bootstrapped state with some other runner
>>>>>>>> (e.g. Apache Spark), and then run Apache Flink after the state has
>>>>>>>> been created. This makes even more sense in the context of Apache
>>>>>>>> Beam, which provides all the necessary machinery to make this
>>>>>>>> happen. The question is - would it be possible to design this
>>>>>>>> feature so that writing the savepoint from a different runner would
>>>>>>>> be possible?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
>>>>>>>>> Hey Everyone!
>>>>>>>>>
>>>>>>>>> Gordon and I have been discussing adding a savepoint connector to
>>>>>>>>> Flink for reading, writing, and modifying savepoints.
>>>>>>>>>
>>>>>>>>> This is useful for:
>>>>>>>>>
>>>>>>>>> - Analyzing state for interesting patterns
>>>>>>>>> - Troubleshooting or auditing jobs by checking for discrepancies
>>>>>>>>>   in state
>>>>>>>>> - Bootstrapping state for new applications
>>>>>>>>> - Modifying savepoints, such as:
>>>>>>>>>   - Changing max parallelism
>>>>>>>>>   - Making breaking schema changes
>>>>>>>>>   - Correcting invalid state
>>>>>>>>>
>>>>>>>>> We are looking forward to your feedback!
>>>>>>>>>
>>>>>>>>> This is the FLIP:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
>>>>>>>>>
>>>>>>>>> Seth
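And for a taste of the read side mentioned in the announcement ("analyzing state for interesting patterns"), a sketch along the lines of the FLIP. Again, the names follow the FLIP and may change before release; "my-uid" and "list-state" are a hypothetical operator uid and state name, and the path is made up:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    // Savepoint and ExistingSavepoint come from the prototype's
    // org.apache.flink.connectors.savepoint.* packages and may move.

    public class ReadExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Load an existing savepoint from a (made-up) path.
            ExistingSavepoint savepoint = Savepoint.load(
                    env, "file:///tmp/savepoint/", new MemoryStateBackend());

            // Read one operator's list state as an ordinary DataSet and
            // analyze it like any other batch data.
            DataSet<Long> state =
                    savepoint.readListState("my-uid", "list-state", Types.LONG);
            state.print();
        }
    }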