The SavepointOutputFormat only writes out the savepoint metadata file and 
should be mostly ignored.

The actual state is written out by stream operators and tied into the Flink 
runtime [1, 2, 3].

This is the most important part and the piece that I don’t think can be 
reasonably extracted.

Seth

[1] 
https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84

[2] 
https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java

[3] 
https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
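
The four-step flow described in the quoted message further down (partition by key group, snapshot per subtask, aggregate the handles, write one metadata file) can be illustrated with a toy model. This is only a sketch under stated assumptions, not the connector's implementation: the names `key_group_for`, `subtask_for` and `bootstrap` are hypothetical, `zlib.crc32` is a stand-in for the hash Flink actually applies to keys, the parallelism values are arbitrary, and the "handles" are plain dictionaries rather than real state handles.

```python
import zlib
from collections import defaultdict

MAX_PARALLELISM = 128  # number of key groups (assumed value for the sketch)
PARALLELISM = 4        # number of parallel subtasks (assumed value)


def key_group_for(key: str, max_parallelism: int) -> int:
    # Stand-in hash: crc32 is used only to keep the sketch deterministic.
    # It is NOT the hash function Flink uses.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def subtask_for(key_group: int, max_parallelism: int, parallelism: int) -> int:
    # Contiguous ranges of key groups are assigned to each subtask.
    return key_group * parallelism // max_parallelism


def bootstrap(records, max_parallelism=MAX_PARALLELISM, parallelism=PARALLELISM):
    # 1) Partition the records by key group, routed to the owning subtask.
    partitions = defaultdict(list)
    for key, value in records:
        kg = key_group_for(key, max_parallelism)
        subtask = subtask_for(kg, max_parallelism, parallelism)
        partitions[subtask].append((kg, key, value))

    # 2) Each subtask processes all of its records, then "snapshots" its
    #    state and emits one handle describing what it holds.
    handles = []
    for subtask, rows in sorted(partitions.items()):
        state = {}
        for kg, key, value in rows:
            state.setdefault(kg, {})[key] = value  # last write wins
        handles.append(
            {"subtask": subtask, "key_groups": sorted(state), "state": state}
        )

    # 3) Aggregate the per-subtask handles into a single savepoint handle.
    # 4) A real job would now write this out as the final metadata file.
    return {"max_parallelism": max_parallelism, "operator_handles": handles}
```

The point of the sketch is that the state lands in the per-subtask snapshots of step 2, while the final step only records where those snapshots are. That is why a standalone writer appended to the end of a dataset would not be enough.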

> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> 
> Hi Seth,
> 
> yes, that helped! :-)
> 
> What I was looking for is essentially 
> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It 
> would be great if this could be written in a way that would enable reusing 
> it in a different engine (as I mentioned - Apache Spark). There seem to be some 
> issues though. It uses the interface Savepoint, which uses several other objects 
> and interfaces from Flink's runtime. Maybe some convenience API might help - 
> Apache Beam handles operator naming, so that definitely should be 
> transferable between systems, but I'm not sure how to construct an OperatorID 
> from this name. Do you think it is possible to come up with 
> something that could be used in this portable way?
> 
> I understand there are some more conditions that need to be satisfied 
> (grouping, aggregating, ...), which would of course have to be handled by the 
> target system. But Apache Beam can help leverage that. My idea would be that 
> there can be a runner-specified PTransform that takes a PCollection of 
> tuples of `(operator name, key, state name, value1), (operator name, key, 
> state name, value2)`, and the runner's responsibility would be to group/aggregate 
> these so that they can be written by the runner-provided writer (output format).
> 
> All of this would need a lot more design; these are just ideas of "what could 
> be possible". I was just wondering if this FLIP can make some first steps 
> towards this.
> 
> Many thanks for comments,
> 
> Jan
> 
>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
>> @Jan Gotcha,
>> 
>> So in terms of reusing components, it is explicitly not a writer. This is not a 
>> savepoint output format in the way we have a Parquet output format. The 
>> reason for the Transform API is to hide the underlying details; it does not 
>> simply append an output writer to the end of a dataset. This gets into the 
>> implementation details, but at a high level, the dataset is:
>> 
>> 1) partitioned using key groups
>> 2) data is run through a standard stream operator that takes a snapshot of 
>> its state after processing all records and outputs metadata handles for each 
>> subtask
>> 3) those metadata handles are aggregated down to a single savepoint handle
>> 4) that handle is written out as a final metadata file
>> 
>> What’s important here is that the API actually depends on the data flow 
>> collection, and state is written out as a side effect of taking a savepoint. 
>> The FLIP describes a loose coupling to the DataSet API for eventual migration 
>> to BoundedStream, and that is true. However, the API does require knowing what 
>> concrete data flow is being used to perform these re-partitionings and post 
>> aggregations.
>> 
>> I’m linking to my prototype implementation, particularly what actually 
>> happens when you call write and run the transformations. Let me know if that 
>> helps clarify.
>> 
>> Seth
>> 
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
>> 
>> 
>> 
>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>> 
>>> Hi Seth,
>>> 
>>> that sounds reasonable. What I was asking for was not to reverse engineer 
>>> the binary format, but to make the savepoint write API a little more reusable, 
>>> so that it could be wrapped into some other technology. I don't know the 
>>> details well enough to propose a solution, but it seems to me that it could be 
>>> possible to use something like a Writer instead of a Transform. Or maybe the 
>>> Transform can use the Writer internally; the goal is just to make it possible to 
>>> create the savepoint from "outside" of Flink (with some library, of 
>>> course).
>>> 
>>> Jan
>>> 
>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
>>>> @Konstantin agreed, that was a large impetus for this feature. Also I am 
>>>> happy to change the name to something that better describes the feature 
>>>> set.
>>>> 
>>>> @Jan
>>>> 
>>>> Savepoints depend heavily on a number of Flink internal components that 
>>>> may change between versions: state backend internals, type serializers, 
>>>> the specific hash function used to turn a UID into an OperatorID, etc. I 
>>>> consider it a feature of this proposal that the library depends on those 
>>>> internal components instead of reverse engineering the binary format. This 
>>>> way, as those internals change or new state features are added (think the 
>>>> recent addition of TTL), we will get support automatically. I do not 
>>>> believe anything else is maintainable.
>>>> 
>>>> Seth
>>>> 
>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> this is awesome, and a really useful feature. If I might ask for one thing 
>>>>> to consider - would it be possible to make the Savepoint manipulation API 
>>>>> (at least writing the Savepoint) less dependent on other parts of Flink 
>>>>> internals (e.g. `KeyedStateBootstrapFunction`) and provide something more 
>>>>> general (e.g. some generic Writer)? The reason I'm asking is that I can 
>>>>> totally imagine a situation where users might want to create bootstrapped 
>>>>> state with some other runner (e.g. Apache Spark), and then run Apache Flink 
>>>>> after the state has been created. This makes even more sense in the context 
>>>>> of Apache Beam, which provides all the necessary work to make this 
>>>>> happen. The question is - would it be possible to design this feature so 
>>>>> that writing the savepoint from a different runner would be possible?
>>>>> 
>>>>> Cheers,
>>>>> 
>>>>>  Jan
>>>>> 
>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
>>>>>> Hey Everyone!
>>>>>> 
>>>>>> Gordon and I have been discussing adding a savepoint connector to Flink 
>>>>>> for reading, writing and modifying savepoints.
>>>>>> 
>>>>>> This is useful for:
>>>>>> 
>>>>>>     - Analyzing state for interesting patterns
>>>>>>     - Troubleshooting or auditing jobs by checking for discrepancies in 
>>>>>>       state
>>>>>>     - Bootstrapping state for new applications
>>>>>>     - Modifying savepoints such as:
>>>>>>         - Changing max parallelism
>>>>>>         - Making breaking schema changes
>>>>>>         - Correcting invalid state
>>>>>> 
>>>>>> We are looking forward to your feedback!
>>>>>> 
>>>>>> This is the FLIP:
>>>>>> 
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
>>>>>> 
>>>>>> Seth
>>>>>> 
>>>>>> 
>>>>>> 
