Hi Junrui,

Thanks for driving this. It's really helpful for users to explore the jobs they have submitted.
It's an important cornerstone of FLINK-33230, since no StreamGraph information is currently serialized. I'd like to build on this FLIP to expose operator-level metrics and present StreamGraphs in the web UI, enabling fine-grained observation for users, as mentioned in Future Work.

Best,
Yu Chen

> On Jul 24, 2024, at 16:03, Junrui Lee <jrlee....@gmail.com> wrote:
>
> Hi all,
>
> Thank you for all the feedback and suggestions so far. If there are no further comments, we will open the voting thread on Friday, July 26, 2024.
>
> Best regards,
> Junrui
>
> On Wed, Jul 24, 2024 at 1:43 PM Ron Liu <ron9....@gmail.com> wrote:
>
>> Hi, Junrui
>>
>> Thanks for your detailed reply.
>>
>> After reading the updated FLIP-468 & FLIP-470, I see that the design looks good.
>>
>> Best,
>> Ron
>>
>> On Thu, Jul 18, 2024 at 2:26 PM Junrui Lee <jrlee....@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I would like to follow up on my previous email regarding your feedback. Below is a concise summary of my main points:
>>>
>>> 1. Compiled Plan:
>>> IIUC, the compiled plan is primarily for ensuring execution plan compatibility across job versions (e.g., during upgrades). Eventually, it needs to be converted to a StreamGraph for submission. Persisting the StreamGraph/JobGraph is for supporting high availability in JM failover scenarios.
>>>
>>> 2. JobGraph vs StreamGraph Submission:
>>> For users, whether a JobGraph or a StreamGraph is submitted is mostly transparent. In client mode, this detail is hidden from users. The REST API (/jobs) originally served the internal JobGraph class and was intended for the Flink client, not as a strict public API.
>>>
>>> 3. Runtime SQL Optimization:
>>> To facilitate runtime SQL-related optimization, we will introduce a strategy mechanism called StreamGraphOptimizationStrategy. SQL can implement specific strategies, such as AdaptiveBroadcastJoinOptimizationStrategy. More details are available in FLIP-469 [1].
>>>
>>> 4. Persistence Strategy:
>>> Initially, we can retain the JobGraph persistence method to ensure it does not affect stream jobs. After the new method is validated in batch processing scenarios over several versions, we can unify the stream jobs to also use StreamGraph persistence.
>>>
>>> Your thoughts and insights on these points would be highly appreciated.
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
>>>
>>> Best regards,
>>> Junrui
>>>
>>> On Fri, Jul 12, 2024 at 8:29 PM Junrui Lee <jrlee....@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Thanks for your feedback. Below are my thoughts on the questions you've raised.
>>>>
>>>> @Fabian
>>>>
>>>>> - What is the future plan for job submissions in Flink? With the current proposal, Flink will support JobGraph/StreamGraph/compiled plan submissions? It might be confusing for users and complicate the existing job submission logic significantly.
>>>>
>>>> In my view, Flink should support submissions via StreamGraph and, eventually, remove support for JobGraph submissions. As for the compiled plan, I consider it a table/SQL concept, which also needs to be compiled into the corresponding StreamGraph (a more general concept that applies to DataStream API jobs as well) upon submission.
>>>> In this FLIP, I want to provide a smoother migration for users. A vast number of Flink UT/IT cases currently depend on submitting a specialized JobGraph to achieve their test objectives, and migrating these test cases to StreamGraph would be highly risky and complicated. Therefore, I am inclined not to remove the JobGraph submission path within this FLIP.
>>>>
>>>>> How do we plan to connect the optimizer with Flink's runtime?
>>>>
>>>> @Ron
>>>>
>>>>> For the batch scenario, if we want to better support dynamic plan tuning strategies, the fundamental solution is still to move the SQL optimizer into flink-runtime.
>>>>
>>>> Our current solution is to abstract an interface called StreamGraphOptimizationStrategy. For table-related optimization strategies, taking Broadcast Join as an example, we can implement an AdaptiveBroadcastJoinOptimizationStrategy at the table layer. When this strategy is needed, we will set the configuration item execution.batch.adaptive.stream-graph-optimization.strategies during SQL compilation. The runtime layer will then load the corresponding strategy class based on this configuration item and use it at runtime.
>>>>
>>>> We will describe this part in the StreamGraphOptimizer section of FLIP-469. The details of AdaptiveBroadcastJoinOptimizationStrategy will be discussed in subsequent FLIPs, which we expect to publish within a few days.
>>>>
>>>> @David
>>>>
>>>>> *1. Transformations in StreamGraphGenerator:*
>>>>
>>>> I'm inclined to support submitting a StreamGraph because it already provides the actual logical plan of the job at runtime. To be honest, I'm not sure what additional benefits submitting Transformations would bring compared to a StreamGraph. I would appreciate any insights that you might offer on this matter.
>>>>
>>>>> *2. StreamGraph for Recovery Purposes:*
>>>>
>>>> In fact, this conflicts with our goal of adaptively optimizing the StreamGraph at runtime: in such scenarios, the StreamGraph, not the JobGraph, is the complete expression of a job's logic. For details, please refer to FLIP-469.
>>>>
>>>> I have reviewed the code related to the JobGraphStore and found that it can be extended to store both StreamGraph and JobGraph. As for your concerns, can we consider the following: in batch mode, we use job recovery based on the StreamGraph, whereas in stream mode, we continue to use the original JobGraph recovery, and the StreamGraph is converted to a JobGraph right at the beginning.
>>>>
>>>>> *3. Moving Away from Java Serializables:*
>>>>
>>>> Are you suggesting that Java serialization has limitations, and that we should explore alternative serialization approaches? I agree that this is a valuable consideration for the future. Do you think this should be included in this FLIP? I would prefer to address it as a separate FLIP.
>>>>
>>>> Best,
>>>> Junrui
>>>>
>>>> On Fri, Jul 12, 2024 at 2:47 PM David Morávek <david.mora...@gmail.com> wrote:
>>>>
>>>>>> For the batch scenario, if we want to better support dynamic plan tuning strategies, the fundamental solution is still to move the SQL optimizer into flink-runtime.
>>>>>
>>>>> One accompanying question is: how do you envision this to work for streaming, where you need to ensure state compatibility after the plan change? FLIP-496 seems to only focus on batch.
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Fri, Jul 12, 2024 at 4:29 AM Ron Liu <ron9....@gmail.com> wrote:
>>>>>
>>>>>> Hi, Junrui
>>>>>>
>>>>>> The FLIP proposal looks good to me.
>>>>>>
>>>>>> I have the same question as Fabian:
>>>>>>
>>>>>>> For join strategies, they are only applicable when using an optimizer (that's currently not part of Flink's runtime) with the Table API or Flink SQL. How do we plan to connect the optimizer with Flink's runtime?
>>>>>>
>>>>>> For the batch scenario, if we want to better support dynamic plan tuning strategies, the fundamental solution is still to move the SQL optimizer into flink-runtime.
>>>>>>
>>>>>> Best,
>>>>>> Ron
>>>>>>
>>>>>> On Thu, Jul 11, 2024 at 7:17 PM David Morávek <d...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Junrui,
>>>>>>>
>>>>>>> Thank you for drafting the FLIP. I really appreciate the direction it's taking. We've discussed similar approaches multiple times, and it's great to see this progress.
>>>>>>>
>>>>>>> I have a few questions and thoughts:
>>>>>>>
>>>>>>> *1. Transformations in StreamGraphGenerator:*
>>>>>>> Should we consider taking this a step further by working on a list of transformations (inputs of StreamGraphGenerator)?
>>>>>>>
>>>>>>>     public StreamGraphGenerator(
>>>>>>>             List<Transformation<?>> transformations,
>>>>>>>             ExecutionConfig executionConfig,
>>>>>>>             CheckpointConfig checkpointConfig,
>>>>>>>             ReadableConfig configuration) {
>>>>>>>
>>>>>>> We could potentially merge ExecutionConfig and CheckpointConfig into ReadableConfig. This approach might offer us even more flexibility.
>>>>>>>
>>>>>>> *2. StreamGraph for Recovery Purposes:*
>>>>>>> Should we avoid using StreamGraph for recovery purposes? The existing JG-based recovery code paths took years to perfect, and it doesn't seem necessary to replace them. We only need SG for cases where we want to regenerate the JG.
>>>>>>> Additionally, translating SG into JG before persisting it in HA could be beneficial, as it allows us to catch potential issues early on.
>>>>>>>
>>>>>>> *3. Moving Away from Java Serializables:*
>>>>>>> It would be great to start moving away from Java Serializables as much as possible. Could we instead define proper versioned serializers, possibly based on a well-defined protobuf blueprint? This change could help us avoid ongoing issues associated with Serializables.
>>>>>>>
>>>>>>> Looking forward to your thoughts.
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>> On Thu, Jul 11, 2024 at 12:58 PM Fabian Paul <fp...@apache.org> wrote:
>>>>>>>
>>>>>>>> Thanks for drafting this FLIP. I really like the idea of introducing a concept in Flink that is close to a logical plan submission.
>>>>>>>>
>>>>>>>> I have a few questions about the proposal and its future evolvability.
>>>>>>>>
>>>>>>>> - What is the future plan for job submissions in Flink? With the current proposal, Flink will support JobGraph/StreamGraph/compiled plan submissions? It might be confusing for users and complicate the existing job submission logic significantly.
>>>>>>>> - The FLIP mentions multiple areas of optimization, first operator chaining and second dynamic switches between join strategies. I think from a Flink perspective, these are, at the moment, separate concerns. For operator chaining, I agree with the current proposal, which is a concept that applies generally to Flink's runtime. For join strategies, they are only applicable when using an optimizer (that's currently not part of Flink's runtime) with the Table API or Flink SQL. How do we plan to connect the optimizer with Flink's runtime?
>>>>>>>> - With the table/SQL API we already expose a compiled plan to support stable version upgrades. It would be great to explore a joined plan to also offer stable version upgrades with a potentially persistent StreamGraph.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Fabian