It seems like a recurring piece of feedback was that the name should change. I'd
like to propose moving the functionality to the libraries module and naming this
the State Processing API.

Seth

> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
> 
> The SavepointOutputFormat only writes out the savepoint metadata file and 
> should be mostly ignored.
> 
> The actual state is written out by stream operators and tied into the Flink 
> runtime [1, 2, 3].
> 
> This is the most important part and the piece that I don’t think can be 
> reasonably extracted.
> 
> Seth
> 
> [1] 
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> 
> [2] 
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> 
> [3] 
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> 
>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
>> 
>> Hi Seth,
>> 
>> yes, that helped! :-)
>> 
>> What I was looking for is essentially 
>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It 
>> would be great if this could be written in a way that would enable reusing 
>> it in a different engine (as I mentioned, Apache Spark). There seem to be 
>> some issues though. It uses the Savepoint interface, which uses several other 
>> objects and interfaces from Flink's runtime. Maybe some convenience API 
>> might help. Apache Beam handles operator naming, so that should definitely 
>> be transferable between systems, but I'm not sure how to construct an 
>> OperatorID from this name. Do you think it is possible to come up 
>> with something that could be used in this portable way?
>> 
>> I understand there are some more conditions that need to be satisfied 
>> (grouping, aggregating, ...), which would of course have to be handled by 
>> the target system. But Apache Beam can help with that. My idea would be 
>> that there could be a runner-specified PTransform that takes a PCollection 
>> of tuples like `(operator name, key, state name, value1), (operator name, 
>> key, state name, value2)`, and the runner's responsibility would be to 
>> group/aggregate these so that they can be written by the runner's provided 
>> writer (output format).
>> 
>> All of this would need a lot more design; these are just ideas of "what 
>> could be possible". I was just wondering if this FLIP could take some first 
>> steps towards it.
>> 
>> Many thanks for comments,
>> 
>> Jan
>> 
>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
>>> @Jan Gotcha,
>>> 
>>> So, in terms of reusing components, it explicitly is not a writer. This is not a 
>>> savepoint output format in the way we have a parquet output format. The 
>>> reason for the Transform API is to hide the underlying details; it does not 
>>> simply append an output writer to the end of a dataset. This gets into 
>>> implementation details, but at a high level the dataset is:
>>> 
>>> 1) the data is partitioned using key groups
>>> 2) it is run through a standard stream operator that takes a snapshot of 
>>> its state after processing all records and outputs metadata handles for 
>>> each subtask
>>> 3) those metadata handles are aggregated down to a single savepoint handle
>>> 4) that handle is written out as the final metadata file
>>> 
>>> What’s important here is that the API actually depends on the data flow 
>>> collection, and state is written out as a side effect of taking a savepoint. 
>>> The FLIP describes a loose coupling to the DataSet API for eventual 
>>> migration to BoundedStream, that is true. However, the API does require 
>>> knowing what concrete data flow is being used to perform these 
>>> re-partitionings and post-aggregations.
>>> 
>>> I’m linking to my prototype implementation, particularly what actually 
>>> happens when you call write and run the transformations. Let me know if 
>>> that helps clarify.
>>> 
>>> Seth
>>> 
>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
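>>> 
>>> To give a feel for the user-facing side, here is a rough usage sketch. The 
>>> class and method names below are placeholders modeled on the general shape 
>>> of the prototype rather than a settled API, so treat OperatorTransformation, 
>>> BootstrapTransformation and Savepoint.create as illustrative assumptions 
>>> (imports of the connector classes are omitted for that reason).
>>> 
>>>     // Hypothetical bootstrap function: stores each account balance in keyed state.
>>>     public class AccountBootstrapper
>>>             extends KeyedStateBootstrapFunction<Long, Tuple2<Long, Double>> {
>>> 
>>>         private transient ValueState<Double> balance;
>>> 
>>>         @Override
>>>         public void open(Configuration parameters) {
>>>             balance = getRuntimeContext().getState(
>>>                     new ValueStateDescriptor<>("balance", Types.DOUBLE));
>>>         }
>>> 
>>>         @Override
>>>         public void processElement(Tuple2<Long, Double> account, Context ctx)
>>>                 throws Exception {
>>>             balance.update(account.f1);
>>>         }
>>>     }
>>> 
>>>     // In the driver program:
>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> 
>>>     DataSet<Tuple2<Long, Double>> accounts = env.fromElements(
>>>             Tuple2.of(1L, 100.0), Tuple2.of(2L, 250.0));
>>> 
>>>     // Key the bootstrap data and run it through the bootstrap function.
>>>     BootstrapTransformation<Tuple2<Long, Double>> transformation =
>>>             OperatorTransformation
>>>                     .bootstrapWith(accounts)
>>>                     .keyBy(account -> account.f0)
>>>                     .transform(new AccountBootstrapper());
>>> 
>>>     // Create an empty savepoint with a state backend and max parallelism,
>>>     // register the transformation under an operator uid, and write it out.
>>>     // The state files are produced as a side effect of running the job.
>>>     Savepoint
>>>             .create(new MemoryStateBackend(), 128)
>>>             .withOperator("accounts", transformation)
>>>             .write("file:///tmp/bootstrapped-savepoint");
>>> 
>>>     env.execute("bootstrap");
>>> 
>>> Again, only a sketch - the real entry point is the WritableSavepoint#write 
>>> linked above.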
>>> 
>>> 
>>> 
>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>> 
>>>> Hi Seth,
>>>> 
>>>> that sounds reasonable. What I was asking for was not to reverse engineer 
>>>> the binary format, but to make the savepoint write API a little more reusable, 
>>>> so that it could be wrapped into some other technology. I don't know the 
>>>> details well enough to propose a solution, but it seems to me that it could be 
>>>> possible to use something like a Writer instead of a Transform. Or maybe the 
>>>> Transform can use the Writer internally; the goal is just to make it possible to 
>>>> create the savepoint from "outside" of Flink (with some library, of 
>>>> course).
>>>> 
>>>> Jan
>>>> 
>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
>>>>> @Konstantin agreed, that was a large impetus for this feature. Also I 
>>>>> am happy to change the name to something that better describes the 
>>>>> feature set.
>>>>> 
>>>>> @Jan
>>>>> 
>>>>> Savepoints depend heavily on a number of Flink internal components that 
>>>>> may change between versions: state backend internals, type serializers, 
>>>>> the specific hash function used to turn a UID into an OperatorID, etc. I 
>>>>> consider it a feature of this proposal that the library depends on those 
>>>>> internal components instead of reverse engineering the binary format. 
>>>>> This way, as those internals change or new state features are added 
>>>>> (think the recent addition of TTL), we will get support automatically. I 
>>>>> do not believe anything else is maintainable.
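>>>>> 
>>>>> To make that last point concrete: deriving an OperatorID from a uid 
>>>>> conceptually boils down to hashing the uid string. The sketch below mirrors 
>>>>> that idea; the exact hash function, seed and constructor are illustrative 
>>>>> assumptions, not a public API anyone should rely on.
>>>>> 
>>>>>     import java.nio.charset.StandardCharsets;
>>>>> 
>>>>>     import org.apache.flink.runtime.jobgraph.OperatorID;
>>>>> 
>>>>>     import com.google.common.hash.Hashing;
>>>>> 
>>>>>     public final class OperatorIdFromUid {
>>>>> 
>>>>>         // Assumed: a 128-bit hash of the operator's uid backs its OperatorID.
>>>>>         public static OperatorID fromUid(String uid) {
>>>>>             byte[] hash = Hashing.murmur3_128(0)
>>>>>                     .newHasher()
>>>>>                     .putString(uid, StandardCharsets.UTF_8)
>>>>>                     .hash()
>>>>>                     .asBytes(); // 16 bytes, exactly what OperatorID wraps
>>>>>             return new OperatorID(hash);
>>>>>         }
>>>>>     }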
>>>>> 
>>>>> Seth
>>>>> 
>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> this is an awesome and really useful feature. If I might ask for one thing 
>>>>>> to consider - would it be possible to make the Savepoint manipulation 
>>>>>> API (at least writing the Savepoint) less dependent on other parts of 
>>>>>> Flink internals (e.g. `KeyedStateBootstrapFunction`) and provide 
>>>>>> something more general (e.g. some generic Writer)? Why I'm asking for 
>>>>>> that - I can totally imagine a situation where users might want to create 
>>>>>> bootstrapped state with some other runner (e.g. Apache Spark) and then 
>>>>>> run Apache Flink after the state has been created. This makes even more 
>>>>>> sense in the context of Apache Beam, which provides all the necessary 
>>>>>> building blocks to make this happen. The question is - would it be possible 
>>>>>> to design this feature so that writing the savepoint from a different 
>>>>>> runner would be possible?
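>>>>>> 
>>>>>> Just to make the request concrete, I imagine something as simple as the 
>>>>>> following (completely made-up) interface would already be enough for 
>>>>>> another engine to target:
>>>>>> 
>>>>>>     // Hypothetical generic writer; none of these names exist in Flink today.
>>>>>>     public interface GenericSavepointWriter<K, V> extends AutoCloseable {
>>>>>> 
>>>>>>         // Prepare a new savepoint at the given path with the given max parallelism.
>>>>>>         void open(String savepointPath, int maxParallelism) throws Exception;
>>>>>> 
>>>>>>         // Write one piece of keyed state for the operator identified by its uid.
>>>>>>         void write(String operatorUid, String stateName, K key, V value) throws Exception;
>>>>>> 
>>>>>>         // Finalize the state files and write the savepoint metadata file.
>>>>>>         @Override
>>>>>>         void close() throws Exception;
>>>>>>     }
>>>>>> 
>>>>>> The engine creating the state would of course still have to handle the 
>>>>>> grouping and partitioning itself before calling write().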
>>>>>> 
>>>>>> Cheers,
>>>>>> 
>>>>>> Jan
>>>>>> 
>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
>>>>>>> Hey Everyone!
>>>>>>> 
>>>>>>> Gordon and I have been discussing adding a savepoint connector to Flink 
>>>>>>> for reading, writing and modifying savepoints.
>>>>>>> 
>>>>>>> This is useful for:
>>>>>>> 
>>>>>>>    Analyzing state for interesting patterns
>>>>>>>    Troubleshooting or auditing jobs by checking for discrepancies in 
>>>>>>> state
>>>>>>>    Bootstrapping state for new applications
>>>>>>>    Modifying savepoints such as:
>>>>>>>        Changing max parallelism
>>>>>>>        Making breaking schema changes
>>>>>>>        Correcting invalid state
>>>>>>> 
>>>>>>> We are looking forward to your feedback!
>>>>>>> 
>>>>>>> This is the FLIP:
>>>>>>> 
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
>>>>>>> 
>>>>>>> Seth
>>>>>>> 
>>>>>>> 
>>>>>>> 
