Hi Seth,
yes, that helped! :-)
What I was looking for is essentially
`org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
would be great if this could be written in a way that would enable
reusing it in a different engine (as I mentioned - Apache Spark). There
seem to be some issues, though. It uses the interface Savepoint, which uses
several other objects and interfaces from Flink's runtime. Maybe some
convenience API might help. Apache Beam handles operator naming, so
the name definitely should be transferable between systems, but I'm not
sure how to construct an OperatorID from this name. Do you think it is
possible to come up with something that could be used in this portable
way?
I understand there are some more conditions that need to be satisfied
(grouping, aggregating, ...), which would of course have to be handled
by the target system. But Apache Beam can help leverage that. My idea
would be that there could be a runner-specified PTransform that takes a
PCollection of tuples like `(operator name, key, state name, value1),
(operator name, key, state name, value2)`, and the runner's responsibility
would be to group/aggregate these so that they can be written by the
runner-provided writer (output format).
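To make that a bit more concrete, something like this (all names hypothetical, just to illustrate the shape):

```java
import java.io.Serializable;

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

/** Hypothetical element type: one keyed-state entry addressed by operator and state name. */
class StateEntry implements Serializable {
    String operatorName;
    byte[] key;
    String stateName;
    byte[] value;
}

/**
 * Hypothetical runner-provided transform. The SDK side only produces the
 * StateEntry collection; each runner would group/aggregate the entries and
 * write them with its own writer (Flink: a savepoint, Spark: whatever that
 * engine needs).
 */
abstract class WriteBootstrapState extends PTransform<PCollection<StateEntry>, PDone> {
}
```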
All of this would need a lot more design; these are just ideas about "what
could be possible". I was just wondering whether this FLIP could make some
first steps towards it.
Many thanks for comments,
Jan
On 5/31/19 8:12 PM, Seth Wiesman wrote:
@Jan Gotcha,
So, on reusing components: it explicitly is not a writer. This is not a savepoint
output format in the way we have a parquet output format. The reason for the
Transform API is to hide the underlying details; it does not simply append an
output writer to the end of a dataset. This gets into the implementation
details, but at a high level the dataset is:
1) partitioned using key groups (see the small example after this list)
2) run through a standard stream operator that takes a snapshot of its
state after processing all records and outputs metadata handles for each subtask
3) those metadata handles are aggregated down to a single savepoint handle
4) that handle is written out as a final metadata file
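As an aside, step 1 relies on the same key-group assignment that keyed state uses at runtime. Here is a standalone example against flink-runtime (not code from the prototype) that shows the mapping from a key to a key group and subtask:

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupExample {

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        String key = "user-42";

        // Key group this key belongs to - step 1 partitions the dataset this way.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // Subtask that owns the key group at the given parallelism.
        int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.println("key group = " + keyGroup + ", subtask = " + subtask);
    }
}
```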
What's important here is that the API actually depends on the data flow
collection, and state is written out as a side effect of taking a savepoint. The
FLIP describes a loose coupling to the DataSet API for eventual migration to
BoundedStream, that is true. However, the API does require knowing which
concrete data flow is being used to perform these re-partitionings and
post-aggregations.
I’m linking to my prototype implementation, particularly what actually happens
when you call write and run the transformations. Let me know if that helps
clarify.
Seth
https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
Hi Seth,
that sounds reasonable. What I was asking for was not to reverse engineer the binary format,
but to make the savepoint write API a little more reusable, so that it could be wrapped
in some other technology. I don't know the details well enough to propose a solution, but it
seems to me that it could be possible to use something like a Writer instead of a Transform.
Or maybe the Transform can use the Writer internally; the goal is just to make it possible to
create the savepoint from "outside" of Flink (with some library, of course).
Jan
On 5/31/19 1:17 PM, Seth Wiesman wrote:
@Konstantin agreed, that was a large impetus for this feature. Also, I am
happy to change the name to something that better describes the feature set.
@Jan
Savepoints depend heavily on a number of Flink internal components that may
change between versions: state backend internals, type serializers, the
specific hash function used to turn a UID into an OperatorID, etc. I consider
it a feature of this proposal that the library depends on those internal
components instead of reverse engineering the binary format. This way, as those
internals change or new state features are added (think of the recent addition of
TTL), we will get support automatically. I do not believe anything else is
maintainable.
Seth
On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
Hi,
this is an awesome and really useful feature. If I might ask for one thing to
consider - would it be possible to make the Savepoint manipulation API (at
least writing the Savepoint) less dependent on other parts of Flink's internals
(e.g. `KeyedStateBootstrapFunction`) and provide something more general (e.g.
some generic Writer)? Why I'm asking for that - I can totally imagine a
situation where users might want to create bootstrapped state with some other
runner (e.g. Apache Spark), and then run Apache Flink after the state has been
created. This makes even more sense in the context of Apache Beam, which provides
all the necessary machinery to make this happen. The question is - would it be
possible to design this feature so that writing the savepoint from a different
runner would be possible?
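To make the "generic Writer" idea a bit more concrete, here is a completely hypothetical sketch of the kind of engine-agnostic surface I have in mind (all names are made up):

```java
import java.io.IOException;
import java.io.Serializable;

/**
 * Completely hypothetical sketch of a generic savepoint writer: an
 * engine-agnostic surface that another runner (Spark, Beam, ...) could drive
 * to produce a Flink savepoint without touching Flink's runtime classes.
 */
public interface SavepointWriter extends Serializable {

    /** Opens the writer for one subtask / partition of the bootstrapped state. */
    void open(int subtaskIndex, int maxParallelism) throws IOException;

    /** Adds a single keyed-state entry for the operator with the given uid. */
    void add(String operatorUid, byte[] key, String stateName, byte[] value) throws IOException;

    /** Flushes this subtask's state and returns an opaque handle to be merged later. */
    byte[] closeForSubtask() throws IOException;
}
```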
Cheers,
Jan
On 5/30/19 1:14 AM, Seth Wiesman wrote:
Hey Everyone!
Gordon and I have been discussing adding a savepoint connector to Flink for
reading, writing and modifying savepoints.
This is useful for:
- Analyzing state for interesting patterns
- Troubleshooting or auditing jobs by checking for discrepancies in state
- Bootstrapping state for new applications (see the rough sketch below)
- Modifying savepoints, such as:
  - Changing max parallelism
  - Making breaking schema changes
  - Correcting invalid state
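To give a flavor of the write side, here is roughly what bootstrapping keyed state looks like in the current draft. The class names come from the prototype and may still change (imports for the savepoint classes are omitted for that reason), so please treat this as a sketch rather than the final API:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

/** Example POJO used to bootstrap state. */
class Account {
    public int id;
    public double balance;
}

/**
 * Sketch of a keyed-state bootstrap function: it receives every record for
 * its key, writes the record into Flink state, and the library snapshots the
 * resulting state into the savepoint.
 */
class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {

    private transient ValueState<Double> balance;

    @Override
    public void open(Configuration parameters) {
        balance = getRuntimeContext().getState(
                new ValueStateDescriptor<>("balance", Double.class));
    }

    @Override
    public void processElement(Account account, Context ctx) throws Exception {
        balance.update(account.balance);
    }
}
```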
We are looking forward to your feedback!
This is the FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
Seth