It seems like a recurring piece of feedback was the desire for a different name. I’d like to propose moving the functionality to the libraries module and naming this the State Processing API.
Seth

> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
>
> The SavepointOutputFormat only writes out the savepoint metadata file and
> should be mostly ignored.
>
> The actual state is written out by stream operators and tied into the Flink
> runtime [1, 2, 3].
>
> This is the most important part and the piece that I don’t think can be
> reasonably extracted.
>
> Seth
>
> [1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
>
> [2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
>
> [3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
>
>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
>>
>> Hi Seth,
>>
>> yes, that helped! :-)
>>
>> What I was looking for is essentially
>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
>> would be great if this could be written in a way that would enable reusing
>> it in a different engine (as I mentioned, Apache Spark). There seem to be
>> some issues, though. It uses the Savepoint interface, which uses several
>> other objects and interfaces from Flink's runtime. Maybe some convenience
>> API might help - Apache Beam handles operator naming, so that should
>> definitely be transferable between systems, but I'm not sure how to
>> construct an OperatorID from this name. Do you think it would be possible
>> to come up with something that could be used in this portable way?
>>
>> I understand there are some more conditions that need to be satisfied
>> (grouping, aggregating, ...), which would of course have to be handled by
>> the target system. But Apache Beam can help leverage that. My idea would be
>> that there could be a runner-specified PTransform that takes a PCollection
>> of tuples like `(operator name, key, state name, value1), (operator name,
>> key, state name, value2)`, and the runner's responsibility would be to
>> group/aggregate this so that it can be written by the runner-provided
>> writer (output format).
>>
>> All of this would need a lot more design; these are just ideas of "what
>> could be possible". I was just wondering if this FLIP can make some first
>> steps towards this.
>>
>> Many thanks for comments,
>>
>> Jan
>>
>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
>>> @Jan Gotcha,
>>>
>>> So in terms of reusing components, it explicitly is not a writer. This is
>>> not a savepoint output format in the way we have a parquet output format.
>>> The reason for the Transform API is to hide the underlying details; it
>>> does not simply append an output writer to the end of a dataset.
>>> This gets into the implementation details, but at a high level the
>>> dataset is:
>>>
>>> 1) partitioned using key groups
>>> 2) data is run through a standard stream operator that takes a snapshot
>>> of its state after processing all records and outputs metadata handles
>>> for each subtask
>>> 3) those metadata handles are aggregated down to a single savepoint handle
>>> 4) that handle is written out as a final metadata file
>>>
>>> What’s important here is that the API actually depends on the data flow
>>> collection, and state is written out as a side effect of taking a
>>> savepoint. The FLIP describes a loose coupling to the DataSet API for
>>> eventual migration to BoundedStream; that is true. However, the API does
>>> require knowing what concrete data flow is being used to perform these
>>> re-partitionings and post aggregations.
>>>
>>> I’m linking to my prototype implementation, particularly what actually
>>> happens when you call write and run the transformations. Let me know if
>>> that helps clarify.
>>>
>>> Seth
>>>
>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
>>>
>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hi Seth,
>>>>
>>>> that sounds reasonable. What I was asking for was not to reverse
>>>> engineer the binary format, but to make the savepoint write API a little
>>>> more reusable, so that it could be wrapped into some other technology. I
>>>> don't know the details well enough to propose a solution, but it seems
>>>> to me that it could be possible to use something like a Writer instead
>>>> of a Transform. Or maybe the Transform can use the Writer internally;
>>>> the goal is just to make it possible to create the savepoint from
>>>> "outside" of Flink (with some library, of course).
>>>>
>>>> Jan
>>>>
>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
>>>>> @Konstantin agreed, that was a large impetus for this feature. Also, I
>>>>> am happy to change the name to something that better describes the
>>>>> feature set.
>>>>>
>>>>> @Jan
>>>>>
>>>>> Savepoints depend heavily on a number of Flink internal components that
>>>>> may change between versions: state backend internals, type serializers,
>>>>> the specific hash function used to turn a UID into an OperatorID, etc.
>>>>> I consider it a feature of this proposal that the library depends on
>>>>> those internal components instead of reverse engineering the binary
>>>>> format. This way, as those internals change, or new state features are
>>>>> added (think the recent addition of TTL), we will get support
>>>>> automatically. I do not believe anything else is maintainable.
>>>>>
>>>>> Seth
>>>>>
>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> this is an awesome and really useful feature. If I might ask for one
>>>>>> thing to consider - would it be possible to make the savepoint
>>>>>> manipulation API (at least writing the savepoint) less dependent on
>>>>>> other parts of Flink internals (e.g. KeyedStateBootstrapFunction) and
>>>>>> provide something more general (e.g. some generic Writer)? Why I'm
>>>>>> asking for that - I can totally imagine a situation where users might
>>>>>> want to create bootstrapped state with some other runner (e.g. Apache
>>>>>> Spark), and then run Apache Flink after the state has been created.
>>>>>> This makes even more sense in the context of Apache Beam, which
>>>>>> provides all the necessary machinery to make this happen. The question
>>>>>> is - would it be possible to design this feature so that writing the
>>>>>> savepoint from a different runner would be possible?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
>>>>>>> Hey Everyone!
>>>>>>>
>>>>>>> Gordon and I have been discussing adding a savepoint connector to
>>>>>>> Flink for reading, writing and modifying savepoints.
>>>>>>>
>>>>>>> This is useful for:
>>>>>>>
>>>>>>>   - Analyzing state for interesting patterns
>>>>>>>   - Troubleshooting or auditing jobs by checking for discrepancies in state
>>>>>>>   - Bootstrapping state for new applications
>>>>>>>   - Modifying savepoints, such as:
>>>>>>>       - Changing max parallelism
>>>>>>>       - Making breaking schema changes
>>>>>>>       - Correcting invalid state
>>>>>>>
>>>>>>> We are looking forward to your feedback!
>>>>>>>
>>>>>>> This is the FLIP:
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
>>>>>>>
>>>>>>> Seth
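
To make the write path discussed in the thread a little more concrete, below is a rough sketch of how bootstrapping keyed state might look from a user's perspective, based on the prototype classes linked earlier (WritableSavepoint, KeyedStateBootstrapOperator) and the FLIP. The entry points shown here - Savepoint.create, OperatorTransformation.bootstrapWith, withOperator and write - are placeholders inferred from the prototype branch, not a finalized API, so exact names and packages may well differ.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

// The imports below are hypothetical; they follow the package layout of the
// prototype branch (org.apache.flink.connectors.savepoint.*) and may change.
// import org.apache.flink.connectors.savepoint.api.Savepoint;
// import org.apache.flink.connectors.savepoint.api.OperatorTransformation;
// import org.apache.flink.connectors.savepoint.functions.KeyedStateBootstrapFunction;

public class BootstrapAccountsState {

    /** Simple POJO used as the bounded bootstrap input. */
    public static class Account {
        public long id;
        public double balance;

        public Account() {}

        public Account(long id, double balance) {
            this.id = id;
            this.balance = balance;
        }
    }

    /** Writes one ValueState entry per key; runs once over the bounded input. */
    public static class TotalBootstrapper extends KeyedStateBootstrapFunction<Long, Account> {

        private transient ValueState<Double> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("total", Double.class));
        }

        @Override
        public void processElement(Account account, Context ctx) throws Exception {
            total.update(account.balance);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Account> accounts = env.fromElements(
                new Account(1L, 100.0),
                new Account(2L, 250.0));

        // Build a bootstrap transformation for a single operator and write the
        // result out as a savepoint; per the discussion above, the state is
        // produced as a side effect of snapshotting the bootstrap operator, and
        // only the metadata file is written at the very end.
        Savepoint
                .create(new MemoryStateBackend(), 128 /* max parallelism */)
                .withOperator("accounts-uid",
                        OperatorTransformation
                                .bootstrapWith(accounts)
                                .keyBy(account -> account.id)
                                .transform(new TotalBootstrapper()))
                .write("file:///tmp/savepoints/bootstrap");

        env.execute("bootstrap accounts state");
    }
}

A streaming job that assigns the UID "accounts-uid" to a keyed operator with a matching "total" ValueStateDescriptor could then, in principle, be started from that path with the usual -s <savepointPath> restore flag.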