Thanks for sharing your thoughts, David!

IIUC, there are two goals behind making the APIs used for RPCs as versionless as possible:
1. No version mismatch happens if the code does not change.
2. Flink clients and clusters of different versions can work together.

The first goal can be achieved by explicitly assigning a serialVersionUID to serializable classes, which the Flink code style already requires. Therefore, it should not be a major problem.

The second goal requires introducing versioned serializers and rewriting the classes to be serialized. It can also bring benefits like better performance. However, it is not limited to the job submission API, and it would require a thorough and possibly complex design. Therefore, I think it's better to do it in a separate FLIP.
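A minimal sketch of the first goal, under stated assumptions: SubmittedNode is a hypothetical stand-in for any class sent over RPC (it is not a Flink class), and the helper names roundTrip and declaredUid are made up for illustration. When a class does not declare serialVersionUID, the Java runtime derives one from the class structure, so an unrelated recompile or a small refactoring can change it. Declaring the field explicitly pins the value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerialVersionUidSketch {

    /** Hypothetical payload class; it only illustrates the pattern. */
    static final class SubmittedNode implements Serializable {
        // Pinned explicitly so the stream identifier does not depend on
        // compiler-derived details of the class structure.
        private static final long serialVersionUID = 1L;

        final String operatorName;
        final int parallelism;

        SubmittedNode(String operatorName, int parallelism) {
            this.operatorName = operatorName;
            this.parallelism = parallelism;
        }
    }

    /** Serializes and deserializes a value with plain Java serialization. */
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    /** Returns the serialVersionUID the runtime uses for the given serializable class. */
    static long declaredUid(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        SubmittedNode copy = roundTrip(new SubmittedNode("map", 4));
        System.out.println(copy.operatorName + " x" + copy.parallelism
                + ", uid=" + declaredUid(SubmittedNode.class));
    }
}
```

This only covers the "no code changes" case. Once fields are added, removed, or retyped between client and cluster versions, a pinned UID no longer guarantees compatibility, which is where the versioned serializers of the second goal would come in.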
Regarding submitting a StreamGraph or a list of transformations with configuration, I think StreamGraph is simple enough. It contains fields like operators, partitioners and configuration, which are already serialized into the JobGraph today, so there is little work and risk in making it serializable. Directly serializing and submitting transformations also looks acceptable at first glance, but SinkTransformation seems to be a blocker, as Junrui just mentioned.

Thanks,
Zhu

Junrui Lee <jrlee....@gmail.com> wrote on Tue, Jul 30, 2024 at 12:13:

> Hi David,
>
> Thank you very much for your detailed explanation, which is crucial in helping to further improve this FLIP.
>
> This FLIP is applicable to both batch and stream processing. For batch processing, it can be used to optimize the StreamGraph (e.g., FLIP-469), while for streaming, we can use the StreamGraph to show a detailed logical plan at runtime (e.g., FLINK-33230) and potentially retain possible logical topology optimizations in the future (e.g., performing intelligent chain breaks while ensuring state compatibility).
>
> Overall, whether based on the existing JobGraph or the proposed submission based on StreamGraph, it is invisible to users, and its REST API is internal because the JobGraph and StreamGraph are internal classes. Although it is documented, we could consider removing it from the official documentation.
>
> Regarding your mention of "making the RPC APIs as versionless as possible," I think your viewpoint is correct and highly valuable. I have carefully considered your suggestion of serializing a list of transformations and a configuration. This is indeed a step towards making the APIs used for RPCs as versionless as possible.
> However, I think this task is much more complex than serializing a StreamGraph, as it requires ensuring that each subclass of transformation and its properties are serializable. This obviously adds a significant amount of complexity.
> For example, the SinkTransformation includes DataStream properties, which have many unserializable fields, such as the StreamExecutionEnvironment.
> Moreover, this solution does not completely solve the problem of RPC API versioning.
>
> Therefore, as this FLIP does not truly change a public REST API, I think we can narrow down the scope of this FLIP a bit, focusing on how to enable the JM to see and operate on StreamGraph.
> In my understanding, the current proposal will not complicate future work if Flink tries to make its REST API more versionless, e.g. by directly submitting transformations. Instead, most of the work can be reused, like creating the JobGraph at runtime.
>
> WDYT? Looking forward to your feedback.
>
> Best,
> Junrui
>
> David Morávek <d...@apache.org> wrote on Mon, Jul 29, 2024 at 21:34:
>
> > Hi all,
> >
> > My main concern is the absence of a comprehensive vision for how we could make this work for both Batch and Streaming. Right now, it feels like the proposal is solely centered around very specific batch optimizations.
> >
> > > I’m inclined to support submitting StreamGraph because it could already provide the actual logical plan of the job at runtime. To be honest, I’m not sure what additional benefits submitting Transformations would bring compared to StreamGraph. I would appreciate any insights that you might offer on this matter.
> >
> > This FLIP actually proposes a new job representation, so it would be great to learn from the mistakes of the JobGraph. The main drawback of the JobGraph is its very tight coupling with the particular Flink version. Even a small patch version difference between client and server can make the JobGraph invalid due to the nature of Java Serializability. StreamGraph is exposed to an even bigger surface area with more internal data structures that have the potential to make this problem more visible.
> >
> > In general, it would be highly valuable to make the APIs used for RPCs as versionless as possible. If you were to write a custom serializer for the StreamGraph, you’d actually find that you don’t need more than a list of transformations and a configuration. Passing those to the StreamGraphGenerator will produce a deterministic result. In this case, you’ve limited the surface area of what you need to JavaSerialize to user-provided transformations, which is a pretty nice property.
> >
> > A list of transformations plus configuration is also the most minimalistic common denominator across the APIs Flink offers (DataStream, Table, SQL). Additionally, it would be beneficial to make the submission data structures immutable. This immutability would ensure the integrity and consistency of the job definitions throughout their lifecycle, reducing the risk of inadvertent modifications and making the system more robust.
> >
> > Best,
> > D.
> >
> > On Sat, Jul 27, 2024 at 4:16 AM Yu Chen <yuchen.e...@gmail.com> wrote:
> >
> > > Hi Junrui,
> > >
> > > Thanks for driving this. It’s really helpful for users to explore the jobs they submitted.
> > >
> > > It's an important cornerstone of FLINK-33230, since no stream-graph information is currently serialized.
> > > I'd like to build on this FLIP to expose operator-level metrics and the presentation of stream graphs on the web, to enable fine-grained observation for users, as mentioned in Future Work.
> > >
> > > Best,
> > > Yu Chen
> > >
> > > > On Jul 24, 2024, at 16:03, Junrui Lee <jrlee....@gmail.com> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > Thank you for all the feedback and suggestions so far. If there are no further comments, we will open the voting thread on Friday, July 26, 2024.
> > > >
> > > > Best regards,
> > > > Junrui
> > > >
> > > > Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 13:43:
> > > >
> > > >> Hi, Junrui
> > > >>
> > > >> Thanks for your detailed reply.
> > > >>
> > > >> After reading the updated FLIP-468 & FLIP-470, I see that the design looks good.
> > > >>
> > > >> Best,
> > > >> Ron
> > > >>
> > > >> Junrui Lee <jrlee....@gmail.com> wrote on Thu, Jul 18, 2024 at 14:26:
> > > >>
> > > >>> Hi all,
> > > >>>
> > > >>> I would like to follow up on my previous email regarding your feedback. Below is a concise summary of my main points:
> > > >>>
> > > >>> 1. Compiled Plan:
> > > >>> IIUC, the compiled plan is primarily for ensuring execution plan compatibility across job versions (e.g., during upgrades). Eventually, it needs to be converted to StreamGraph for submission. Persisting StreamGraph/JobGraph is for supporting high availability in JM failover scenarios.
> > > >>>
> > > >>> 2. JobGraph vs StreamGraph Submission:
> > > >>> For users, whether to submit JobGraph or StreamGraph is mostly transparent. In the client mode, this detail is hidden from users. The REST API (/jobs) was originally serving the internal JobGraph class, intended for the Flink client, not as a strict public API.
> > > >>>
> > > >>> 3. Runtime SQL Optimization:
> > > >>> To facilitate runtime SQL-related optimization, we will introduce a strategy mechanism called StreamGraphOptimizationStrategy. SQL can implement specific strategies, such as AdaptiveBroadcastJoinOptimizationStrategy. More details are available in FLIP-469 [1].
> > > >>>
> > > >>> 4. Persistence Strategy:
> > > >>> Initially, we can retain the JobGraph persistence method to ensure it does not affect stream jobs.
> > > >>> After the new method is validated in batch processing scenarios over several versions, we can unify the stream jobs to also use StreamGraph persistence.
> > > >>>
> > > >>> Your thoughts and insights on these points would be highly appreciated.
> > > >>>
> > > >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > > >>>
> > > >>> Best regards,
> > > >>> Junrui
> > > >>>
> > > >>> Junrui Lee <jrlee....@gmail.com> wrote on Fri, Jul 12, 2024 at 20:29:
> > > >>>
> > > >>>> Hi all,
> > > >>>>
> > > >>>> Thanks for your feedback. Below are my thoughts on the questions you've raised.
> > > >>>>
> > > >>>> @Fabian
> > > >>>>
> > > >>>>> - What is the future plan for job submissions in Flink? With the current proposal, Flink will support JobGraph/StreamGraph/compiled plan submissions? It might be confusing for users and complicate the existing job submission logic significantly.
> > > >>>>
> > > >>>> In my view, Flink should support submissions via StreamGraph and, eventually, remove support for JobGraph submissions. As for the compiled plan, I consider it to be a concept related to table/sql, which also needs to be compiled into the corresponding StreamGraph (a more generalized concept, applicable as well to DataStream API jobs) upon submission.
> > > >>>> In this FLIP, we want to provide a smoother migration for users. We also recognize that a vast number of Flink UT/IT cases currently depend on submitting a specialized JobGraph to achieve their test objectives, so transitioning these test cases to StreamGraph would be a highly risky and complicated action.
> > > >>>> Therefore, I am inclined not to eliminate the pathway of submitting JobGraph within this FLIP.
> > > >>>>
> > > >>>> How do we plan to connect the optimizer with Flink's runtime?
> > > >>>>
> > > >>>> @Ron
> > > >>>>
> > > >>>>> For batch scenario, if we want to better support dynamic plan tuning strategies, the fundamental solution is still to put SQL Optimizer to flink-runtime.
> > > >>>>
> > > >>>> Our current solution is to abstract an interface called StreamGraphOptimizationStrategy. For table-related optimization strategies, taking the Broadcast Join concept as an example, we can implement an AdaptiveBroadcastJoinOptimizationStrategy at the table layer. When this strategy is needed, during SQL compilation, we will set the configuration item execution.batch.adaptive.stream-graph-optimization.strategies. Then, the runtime layer will load the corresponding Strategy class based on this configuration item to be used at runtime.
> > > >>>>
> > > >>>> We will describe this part in the StreamGraphOptimizer section of FLIP-469, and the specific introduction of AdaptiveBroadcastJoinOptimizationStrategy will be discussed in subsequent FLIPs, which is expected to happen within a few days.
> > > >>>>
> > > >>>> @David
> > > >>>>
> > > >>>>> *1. Transformations in StreamGraphGenerator:*
> > > >>>>
> > > >>>> I'm inclined to support submitting StreamGraph because it could already provide the actual logical plan of the job at runtime. To be honest, I'm not sure what additional benefits submitting Transformations would bring compared to StreamGraph. I would appreciate any insights that you might offer on this matter.
> > > >>>>
> > > >>>>> *2. StreamGraph for Recovery Purposes:*
> > > >>>>
> > > >>>> In fact, this conflicts with our desire to apply adaptive optimization to the StreamGraph at runtime, as under such scenarios, the StreamGraph is the complete expression of a job's logic, not the JobGraph. More details can be found in FLIP-469.
> > > >>>>
> > > >>>> I have reviewed the code related to the JobGraphStore and discovered that it can be extended to store both StreamGraph and JobGraph simultaneously. As for your concerns, can we consider the following: in batch mode, we use Job Recovery based on StreamGraph, whereas for stream mode, we continue to use the original JobGraph recovery and the StreamGraph would be converted to a JobGraph right at the beginning.
> > > >>>>
> > > >>>>> *3. Moving Away from Java Serializables:*
> > > >>>>
> > > >>>> Are you suggesting that Java serialization has limitations, and that we should explore alternative serialization approaches? I agree that this is a valuable consideration for the future. Do you think this should be included in this FLIP? I would prefer to address it as a separate FLIP.
> > > >>>>
> > > >>>> Best,
> > > >>>> Junrui
> > > >>>>
> > > >>>> David Morávek <david.mora...@gmail.com> wrote on Fri, Jul 12, 2024 at 14:47:
> > > >>>>
> > > >>>>>> For batch scenario, if we want to better support dynamic plan tuning strategies, the fundamental solution is still to put SQL Optimizer to flink-runtime.
> > > >>>>>
> > > >>>>> One accompanying question is: how do you envision this to work for streaming where you need to ensure state compatibility after the plan change?
> > > >>>>> FLIP-496 seems to only focus on batch.
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> D.
> > > >>>>>
> > > >>>>> On Fri, Jul 12, 2024 at 4:29 AM Ron Liu <ron9....@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Hi, Junrui
> > > >>>>>>
> > > >>>>>> The FLIP proposal looks good to me.
> > > >>>>>>
> > > >>>>>> I have the same question as Fabian:
> > > >>>>>>
> > > >>>>>>> For join strategies, they are only applicable when using an optimizer (that's currently not part of Flink's runtime) with the Table API or Flink SQL. How do we plan to connect the optimizer with Flink's runtime?
> > > >>>>>>
> > > >>>>>> For batch scenario, if we want to better support dynamic plan tuning strategies, the fundamental solution is still to put SQL Optimizer to flink-runtime.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Ron
> > > >>>>>>
> > > >>>>>> David Morávek <d...@apache.org> wrote on Thu, Jul 11, 2024 at 19:17:
> > > >>>>>>
> > > >>>>>>> Hi Junrui,
> > > >>>>>>>
> > > >>>>>>> Thank you for drafting the FLIP. I really appreciate the direction it’s taking. We’ve discussed similar approaches multiple times, and it’s great to see this progress.
> > > >>>>>>>
> > > >>>>>>> I have a few questions and thoughts:
> > > >>>>>>>
> > > >>>>>>> *1. Transformations in StreamGraphGenerator:*
> > > >>>>>>> Should we consider taking this a step further by working on a list of transformations (inputs of StreamGraphGenerator)?
> > > >>>>>>>
> > > >>>>>>> public StreamGraphGenerator(
> > > >>>>>>>         List<Transformation<?>> transformations,
> > > >>>>>>>         ExecutionConfig executionConfig,
> > > >>>>>>>         CheckpointConfig checkpointConfig,
> > > >>>>>>>         ReadableConfig configuration) {
> > > >>>>>>>
> > > >>>>>>> We could potentially merge ExecutionConfig and CheckpointConfig into ReadableConfig.
> > > >>>>>>> This approach might offer us even more flexibility.
> > > >>>>>>>
> > > >>>>>>> *2. StreamGraph for Recovery Purposes:*
> > > >>>>>>> Should we avoid using StreamGraph for recovery purposes? The existing JG-based recovery code paths took years to perfect, and it doesn’t seem necessary to replace them. We only need SG for cases where we want to regenerate the JG.
> > > >>>>>>> Additionally, translating SG into JG before persisting it in HA could be beneficial, as it allows us to catch potential issues early on.
> > > >>>>>>>
> > > >>>>>>> *3. Moving Away from Java Serializables:*
> > > >>>>>>> It would be great to start moving away from Java Serializables as much as possible. Could we instead define proper versioned serializers, possibly based on a well-defined protobuf blueprint? This change could help us avoid ongoing issues associated with Serializables.
> > > >>>>>>>
> > > >>>>>>> Looking forward to your thoughts.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> D.
> > > >>>>>>>
> > > >>>>>>> On Thu, Jul 11, 2024 at 12:58 PM Fabian Paul <fp...@apache.org> wrote:
> > > >>>>>>>
> > > >>>>>>>> Thanks for drafting this FLIP. I really like the idea of introducing a concept in Flink that is close to a logical plan submission.
> > > >>>>>>>>
> > > >>>>>>>> I have a few questions about the proposal and its future evolvability.
> > > >>>>>>>>
> > > >>>>>>>> - What is the future plan for job submissions in Flink? With the current proposal, Flink will support JobGraph/StreamGraph/compiled plan submissions?
> > > >>>>>>>> It might be confusing for users and complicate the existing job submission logic significantly.
> > > >>>>>>>> - The FLIP mentions multiple areas of optimization, first operator chaining and second dynamic switches between join strategies. I think from a Flink perspective, these are, at the moment, separate concerns. For operator chaining, I agree with the current proposal, which is a concept that applies generally to Flink's runtime. For join strategies, they are only applicable when using an optimizer (that's currently not part of Flink's runtime) with the Table API or Flink SQL. How do we plan to connect the optimizer with Flink's runtime?
> > > >>>>>>>> - With table/SQL API we already expose a compiled plan to support stable version upgrades. It would be great to explore a joined plan to also offer stable version upgrades with a potentially persistent streamgraph.
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Fabian