The SavepointOutputFormat only writes out the savepoint metadata file and should be mostly ignored.
The actual state is written out by stream operators and tied into the Flink runtime [1, 2, 3]. This is the most important part and the piece that I don't think can be reasonably extracted.

Seth

[1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
[2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
[3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java

> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Seth,
>
> yes, that helped! :-)
>
> What I was looking for is essentially `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It would be great if this could be written in a way that would enable reusing it in a different engine (as I mentioned - Apache Spark). There seem to be some issues, though. It uses the interface Savepoint, which uses several other objects and interfaces from Flink's runtime. Maybe some convenience API might help - Apache Beam handles operator naming, so that should definitely be transferable between systems, but I'm not sure how to construct an OperatorID from this name. Do you think it is possible to come up with something that could be used in this portable way?
>
> I understand there are some more conditions that need to be satisfied (grouping, aggregating, ...), which would of course have to be handled by the target system, but Apache Beam can help with that. My idea would be that there could be a runner-specified PTransform that takes a PCollection of tuples like `(operator name, key, state name, value1), (operator name, key, state name, value2)`, and the runner's responsibility would be to group/aggregate these so that they can be written by a runner-provided writer (output format).
>
> All of this would need a lot more design; these are just ideas of "what could be possible". I was just wondering whether this FLIP could take some first steps towards it.
>
> Many thanks for comments,
>
> Jan
>
>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
>> @Jan Gotcha,
>>
>> So while it reuses components, it explicitly is not a writer. This is not a savepoint output format in the way we have a parquet output format. The reason for the Transform API is to hide the underlying details; it does not simply append an output writer to the end of a dataset. This gets into the implementation details, but at a high level the dataset is:
>>
>> 1) partitioned using key groups
>> 2) run through a standard stream operator that takes a snapshot of its state after processing all records and outputs metadata handles for each subtask
>> 3) those metadata handles are aggregated down to a single savepoint handle
>> 4) that handle is written out as a final metadata file
>>
>> What's important here is that the API actually depends on the data flow, and state is written out as a side effect of taking a savepoint. The FLIP describes a loose coupling to the DataSet API for eventual migration to BoundedStream, that is true. However, the API does require knowing what concrete data flow is being used to perform these re-partitionings and post-aggregations.
>>
>> I'm linking to my prototype implementation, particularly what actually happens when you call write and run the transformations. Let me know if that helps clarify.
>>
>> Seth
>>
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
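[Editor's note: for readers skimming the thread, here is a minimal sketch of the shape of that four-step data flow on the DataSet API. It is not the prototype's code: the hash-based partitioner, the placeholder "handle" strings, and the class name SavepointFlowSketch are stand-ins invented for illustration, where the real implementation uses Flink's key-group assignment, an actual snapshot-taking stream operator, and internal state-handle types.]

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SavepointFlowSketch {
    public static void main(String[] args) throws Exception {
        final int maxParallelism = 128;
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> bootstrapData = env.fromElements(
                Tuple2.of("user-1", 10L), Tuple2.of("user-2", 7L));

        DataSet<String> subtaskHandles = bootstrapData
                // 1) re-partition the data the way keyed state is sharded (key groups);
                //    this hash is only a stand-in for Flink's real key-group assignment
                .partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        int keyGroup = Math.abs(key.hashCode() % maxParallelism);
                        return keyGroup * numPartitions / maxParallelism;
                    }
                }, 0)
                // 2) stand-in for the stream operator that processes every record,
                //    snapshots its state, and emits one metadata handle per subtask
                .mapPartition(new MapPartitionFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<String, Long>> records,
                                             Collector<String> out) {
                        out.collect("subtask-state-handle");
                    }
                });

        // 3) aggregate the per-subtask handles down to a single savepoint handle
        DataSet<String> savepointHandle = subtaskHandles
                .reduceGroup(new GroupReduceFunction<String, String>() {
                    @Override
                    public void reduce(Iterable<String> handles, Collector<String> out) {
                        out.collect("savepoint-handle");
                    }
                });

        // 4) the final metadata file is written from that single handle
        //    (the prototype does this step via SavepointOutputFormat)
        savepointHandle.print();
    }
}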
>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> Hi Seth,
>>>
>>> that sounds reasonable. What I was asking for was not to reverse engineer the binary format, but to make the savepoint write API a little more reusable, so that it could be wrapped into some other technology. I don't know the details well enough to propose a solution, but it seems to me that it could be possible to use something like a Writer instead of a Transform. Or maybe the Transform could use the Writer internally; the goal is just to enable creating the savepoint from "outside" of Flink (with some library, of course).
>>>
>>> Jan
>>>
>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
>>>> @Konstantin agreed, that was a large impetus for this feature. Also, I am happy to change the name to something that better describes the feature set.
>>>>
>>>> @Jan
>>>>
>>>> Savepoints depend heavily on a number of Flink internal components that may change between versions: state backend internals, type serializers, the specific hash function used to turn a UID into an OperatorID, etc. I consider it a feature of this proposal that the library depends on those internal components instead of reverse engineering the binary format. This way, as those internals change or new state features are added (think the recent addition of TTL), we will get support automatically. I do not believe anything else is maintainable.
>>>>
>>>> Seth
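[Editor's note: a small illustration of why the UID-to-OperatorID hash counts as an internal detail. As far as I can tell, Flink's StreamGraphHasherV2 derives the 16-byte operator hash for a user-specified uid with a 128-bit Murmur3 hash (via a shaded copy of Guava); the snippet below reproduces that idea with plain Guava purely for illustration. It is not a public API, and the exact function is precisely the kind of internal Seth says may change between versions.]

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

public class UidHashSketch {
    public static void main(String[] args) {
        String uid = "my-keyed-operator";
        // 128-bit Murmur3 of the uid string -> 16 bytes, the width of an OperatorID
        byte[] hash = Hashing.murmur3_128(0)
                .newHasher()
                .putString(uid, StandardCharsets.UTF_8)
                .hash()
                .asBytes();
        System.out.printf("uid '%s' hashes to %d bytes%n", uid, hash.length);
    }
}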
>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> this is an awesome and really useful feature. If I might ask for one thing to consider - would it be possible to make the Savepoint manipulation API (at least writing the Savepoint) less dependent on other parts of Flink internals (e.g. KeyedStateBootstrapFunction) and provide something more general (e.g. some generic Writer)? Why I'm asking for that - I can totally imagine a situation where users might want to create bootstrapped state with some other runner (e.g. Apache Spark), and then run Apache Flink after the state has been created. This makes even more sense in the context of Apache Beam, which does all the necessary work to make this happen. The question is - would it be possible to design this feature so that writing the savepoint from a different runner would be possible?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Jan
>>>>>
>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
>>>>>> Hey Everyone!
>>>>>>
>>>>>> Gordon and I have been discussing adding a savepoint connector to Flink for reading, writing and modifying savepoints.
>>>>>>
>>>>>> This is useful for:
>>>>>>
>>>>>>   Analyzing state for interesting patterns
>>>>>>   Troubleshooting or auditing jobs by checking for discrepancies in state
>>>>>>   Bootstrapping state for new applications
>>>>>>   Modifying savepoints such as:
>>>>>>     Changing max parallelism
>>>>>>     Making breaking schema changes
>>>>>>     Correcting invalid state
>>>>>>
>>>>>> We are looking forward to your feedback!
>>>>>>
>>>>>> This is the FLIP:
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
>>>>>>
>>>>>> Seth
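[Editor's note: for concreteness, a rough sketch of how bootstrapping keyed state could look with the proposed API, loosely following the examples in the FLIP and the prototype branch linked above. The classes KeyedStateBootstrapFunction, OperatorTransformation, BootstrapTransformation and Savepoint come from the proposal, so their packages, names and signatures may still change (their imports are omitted for that reason); Account, AccountBootstrapper, the uid and the paths are made up for the example. Depending on the final API, an explicit env.execute() may also be required after write().]

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

public class BootstrapSketch {

    /** Simple POJO for the bootstrap data (invented for this example). */
    public static class Account {
        public int id;
        public double amount;
    }

    /** Writes each account's amount directly into keyed state. */
    public static class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
        private transient ValueState<Double> balance;

        @Override
        public void open(Configuration parameters) {
            balance = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("balance", Types.DOUBLE));
        }

        @Override
        public void processElement(Account account, Context ctx) throws Exception {
            balance.update(account.amount);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Account sample = new Account();
        sample.id = 1;
        sample.amount = 100.0;
        DataSet<Account> accounts = env.fromElements(sample);

        // key the bootstrap data and run it through the bootstrap function
        BootstrapTransformation<Account> transformation = OperatorTransformation
                .bootstrapWith(accounts)
                .keyBy(acc -> acc.id)
                .transform(new AccountBootstrapper());

        // attach the bootstrapped state to a new savepoint under the operator's uid
        // and write it out; a streaming job using the same uid can then start from it
        Savepoint
                .create(new MemoryStateBackend(), 128) // 128 = max parallelism
                .withOperator("accounts-uid", transformation)
                .write("file:///tmp/bootstrapped-savepoint");
    }
}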