Hi Junrui,

Thanks for driving this. It's really helpful for users to explore the jobs
they have submitted.

It's an important cornerstone of FLINK-33230, since no stream-graph
information is currently serialized.
I'd like to build on this FLIP to expose operator-level metrics and to
present stream graphs in the web UI, enabling fine-grained observation for
users, as mentioned in Future Work.


Best,
Yu Chen


> On Jul 24, 2024, at 16:03, Junrui Lee <jrlee....@gmail.com> wrote:
> 
> Hi all,
> 
> Thank you for all the feedback and suggestions so far. If there are no
> further comments, we will open the voting thread on Friday, July 26, 2024.
> 
> Best regards,
> Junrui
> 
> On Wed, Jul 24, 2024 at 13:43 Ron Liu <ron9....@gmail.com> wrote:
> 
>> Hi, Junrui
>> 
>> Thanks for your detailed reply.
>> 
>> After reading the updated FLIP-468 & FLIP-470, I see that the design looks
>> good.
>> 
>> 
>> Best.
>> Ron
>> 
>> On Thu, Jul 18, 2024 at 14:26 Junrui Lee <jrlee....@gmail.com> wrote:
>> 
>>> Hi all,
>>> 
>>> I would like to follow up on my previous email regarding your feedback.
>>> Below is a concise summary of my main points:
>>> 
>>> 1. Compiled Plan:
>>> IIUC, the compiled plan is primarily for ensuring execution plan
>>> compatibility across job versions (e.g., during upgrades). Eventually, it
>>> needs to be converted to StreamGraph for submission. Persisting
>>> StreamGraph/JobGraph is for supporting high availability in JobManager
>>> failover scenarios.
>>> 
>>> 2. JobGraph vs StreamGraph Submission:
>>> For users, whether to submit JobGraph or StreamGraph is mostly transparent.
>>> In the client mode, this detail is hidden from users. The REST API (/jobs)
>>> originally served the internal JobGraph class and was intended for the
>>> Flink client, not as a strict public API.
>>> 
>>> 3. Runtime SQL Optimization:
>>> To facilitate runtime SQL-related optimization, we will introduce a
>>> strategy mechanism called StreamGraphOptimizationStrategy. SQL can
>>> implement specific strategies, such as
>>> AdaptiveBroadcastJoinOptimizationStrategy. More details are available in
>>> FLIP-469 [1].
>>> 
>>> 4. Persistence Strategy:
>>> Initially, we can retain the JobGraph persistence method to ensure it does
>>> not affect stream jobs. After the new method is validated in batch
>>> processing scenarios over several versions, we can unify the stream jobs to
>>> also use StreamGraph persistence.
>>> 
>>> Your thoughts and insights on these points would be highly appreciated.
>>> 
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
>>> 
>>> Best regards,
>>> 
>>> Junrui
>>> 
>>> On Fri, Jul 12, 2024 at 20:29 Junrui Lee <jrlee....@gmail.com> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> Thanks for your feedback. Below are my thoughts on the questions you've
>>>> raised.
>>>> 
>>>> @Fabian
>>>> 
>>>>> - What is the future plan for job submissions in Flink? With the current
>>>>> proposal, Flink will support JobGraph/StreamGraph/compiled plan
>>>>> submissions? It might be confusing for users and complicate the existing
>>>>> job submission logic significantly.
>>>> 
>>>> 
>>>> In my view, Flink should support submissions via StreamGraph and,
>>>> eventually, remove support for JobGraph submissions. As for the compiled
>>>> plan, I consider it a table/SQL concept, which also needs to be compiled
>>>> into the corresponding StreamGraph (a more general concept that also
>>>> applies to DataStream API jobs) upon submission.
>>>> In this FLIP, we want to give users a smoother migration. In addition, a
>>>> vast number of Flink UT/IT cases currently depend on submitting a
>>>> specialized JobGraph to achieve their test objectives, and migrating these
>>>> test cases to StreamGraph would be highly risky and complicated.
>>>> Therefore, I am inclined not to eliminate the JobGraph submission path
>>>> within this FLIP.
>>>> 
>>>>> How do we plan to connect the optimizer with Flink's runtime?
>>>> 
>>>> @Ron
>>>> 
>>>>> For batch scenario, if we want to better support dynamic plan tuning
>>>>> strategies, the fundamental solution is still to put SQL Optimizer to
>>>>> flink-runtime.
>>>> 
>>>> 
>>>> Our current solution is to abstract an interface called
>>>> StreamGraphOptimizationStrategy. For table-related optimization
>>>> strategies, taking Broadcast Join as an example, we can implement an
>>>> AdaptiveBroadcastJoinOptimizationStrategy at the table layer. When this
>>>> strategy is needed, SQL compilation will set the configuration item
>>>> execution.batch.adaptive.stream-graph-optimization.strategies. The runtime
>>>> layer will then load the corresponding strategy class based on this
>>>> configuration item and use it at runtime.
>>>> 
>>>> We will describe this part in the StreamGraphOptimizer section of
>>>> FLIP-469. The details of AdaptiveBroadcastJoinOptimizationStrategy will be
>>>> discussed in subsequent FLIPs, which are expected within a few days.
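
A minimal sketch of the configuration-driven loading described above. It is
not taken from FLIP-469: the shape of the interface, the describe() method,
the StrategyLoaderSketch class, and the assumption that the configuration
value is a comma-separated list of class names are all hypothetical and only
illustrate the idea of the runtime instantiating strategies by name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the interface named in the thread; the real
// signature is defined by FLIP-469 and may differ.
interface StreamGraphOptimizationStrategy {
    String describe();
}

// Hypothetical table-layer strategy, used here only for illustration.
class AdaptiveBroadcastJoinOptimizationStrategy implements StreamGraphOptimizationStrategy {
    @Override
    public String describe() {
        return "adaptive broadcast join";
    }
}

class StrategyLoaderSketch {
    // Assumption: the configuration value is a comma-separated list of
    // fully qualified class names, each with a no-argument constructor.
    static List<StreamGraphOptimizationStrategy> loadStrategies(String configValue) {
        List<StreamGraphOptimizationStrategy> strategies = new ArrayList<>();
        for (String rawName : configValue.split(",")) {
            String className = rawName.trim();
            if (className.isEmpty()) {
                continue; // tolerate empty entries such as a trailing comma
            }
            try {
                Class<?> clazz = Class.forName(className);
                strategies.add(
                        clazz.asSubclass(StreamGraphOptimizationStrategy.class)
                                .getDeclaredConstructor()
                                .newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load strategy " + className, e);
            }
        }
        return strategies;
    }

    public static void main(String[] args) {
        // In the design from the thread, SQL compilation would set this value.
        String configValue = AdaptiveBroadcastJoinOptimizationStrategy.class.getName();
        for (StreamGraphOptimizationStrategy strategy : loadStrategies(configValue)) {
            System.out.println("Loaded strategy: " + strategy.describe());
        }
    }
}
```

Loading by class name keeps the runtime layer free of a compile-time
dependency on the table layer, which is the point of the strategy mechanism.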
>>>> 
>>>> @David
>>>> 
>>>>> *1. Transformations in StreamGraphGenerator:*
>>>>> 
>>>> 
>>>> I'm inclined to support submitting StreamGraph because it could already
>>>> provide the actual logical plan of the job at runtime. To be honest, I'm
>>>> not sure what additional benefits submitting Transformations would bring
>>>> compared to StreamGraph. I would appreciate any insights that you might
>>>> offer on this matter.
>>>> 
>>>>> *2. StreamGraph for Recovery Purposes:*
>>>> 
>>>> 
>>>> In fact, this conflicts with our goal of applying adaptive optimizations
>>>> to the StreamGraph at runtime: in such scenarios, the StreamGraph, not the
>>>> JobGraph, is the complete expression of a job's logic. Please refer to
>>>> FLIP-469 for more details.
>>>> 
>>>> I have reviewed the code related to the JobGraphStore and found that it
>>>> can be extended to store both StreamGraph and JobGraph. As for your
>>>> concerns, could we consider the following: in batch mode, we use job
>>>> recovery based on StreamGraph, whereas in stream mode, we continue to use
>>>> the original JobGraph recovery, and the StreamGraph is converted to a
>>>> JobGraph right at the beginning.
>>>> 
>>>>> *3. Moving Away from Java Serializables:*
>>>> 
>>>> 
>>>> Are you suggesting that Java serialization has limitations, and that we
>>>> should explore alternative serialization approaches? I agree that this is
>>>> a valuable consideration for the future. Do you think this should be
>>>> included in this FLIP? I would prefer to address it as a separate FLIP.
>>>> 
>>>> Best,
>>>> Junrui
>>>> 
>>>> On Fri, Jul 12, 2024 at 14:47 David Morávek <david.mora...@gmail.com> wrote:
>>>> 
>>>>>> 
>>>>>> For batch scenario, if we want to better support dynamic plan tuning
>>>>>> strategies, the fundamental solution is still to put SQL Optimizer to
>>>>>> flink-runtime.
>>>>> 
>>>>> 
>>>>> One accompanying question is: how do you envision this working for
>>>>> streaming, where you need to ensure state compatibility after the plan
>>>>> changes? FLIP-469 seems to focus only on batch.
>>>>> 
>>>>> Best,
>>>>> D.
>>>>> 
>>>>> On Fri, Jul 12, 2024 at 4:29 AM Ron Liu <ron9....@gmail.com> wrote:
>>>>> 
>>>>>> Hi, Junrui
>>>>>> 
>>>>>> The FLIP proposal looks good to me.
>>>>>> 
>>>>>> I have the same question as Fabian:
>>>>>> 
>>>>>>> For join strategies, they are only applicable when using an optimizer
>>>>>>> (that's currently not part of Flink's runtime) with the Table API or
>>>>>>> Flink SQL. How do we plan to connect the optimizer with Flink's runtime?
>>>>>> 
>>>>>> For batch scenario, if we want to better support dynamic plan tuning
>>>>>> strategies, the fundamental solution is still to put SQL Optimizer to
>>>>>> flink-runtime.
>>>>>> 
>>>>>> Best,
>>>>>> Ron
>>>>>> 
>>>>>> On Thu, Jul 11, 2024 at 19:17 David Morávek <d...@apache.org> wrote:
>>>>>> 
>>>>>>> Hi Junrui,
>>>>>>> 
>>>>>>> Thank you for drafting the FLIP. I really appreciate the direction it's
>>>>>>> taking. We've discussed similar approaches multiple times, and it's
>>>>>>> great to see this progress.
>>>>>>> 
>>>>>>> I have a few questions and thoughts:
>>>>>>> 
>>>>>>> 
>>>>>>> *1. Transformations in StreamGraphGenerator:*
>>>>>>> Should we consider taking this a step further by working on a list of
>>>>>>> transformations (inputs of StreamGraphGenerator)?
>>>>>>> 
>>>>>>>    public StreamGraphGenerator(
>>>>>>>            List<Transformation<?>> transformations,
>>>>>>>            ExecutionConfig executionConfig,
>>>>>>>            CheckpointConfig checkpointConfig,
>>>>>>>            ReadableConfig configuration) {
>>>>>>> 
>>>>>>> We could potentially merge ExecutionConfig and CheckpointConfig into
>>>>>>> ReadableConfig. This approach might offer us even more flexibility.
>>>>>>> 
>>>>>>> 
>>>>>>> *2. StreamGraph for Recovery Purposes:*
>>>>>>> Should we avoid using StreamGraph for recovery purposes? The existing
>>>>>>> JG-based recovery code paths took years to perfect, and it doesn't seem
>>>>>>> necessary to replace them. We only need SG for cases where we want to
>>>>>>> regenerate the JG.
>>>>>>> Additionally, translating SG into JG before persisting it in HA could
>>>>>>> be beneficial, as it allows us to catch potential issues early on.
>>>>>>> 
>>>>>>> 
>>>>>>> *3. Moving Away from Java Serializables:*
>>>>>>> It would be great to start moving away from Java Serializables as much
>>>>>>> as possible. Could we instead define proper versioned serializers,
>>>>>>> possibly based on a well-defined protobuf blueprint? This change could
>>>>>>> help us avoid ongoing issues associated with Serializables.
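
A minimal sketch of what a versioned serializer could look like. This is an
illustration only, not an API proposed in the thread: the VersionedSerializer
interface, the GraphSummary payload, and the hand-rolled ByteBuffer encoding
are all hypothetical, and the encoding stands in for the protobuf-based format
suggested above. The point it shows is that the reader receives an explicit
format version and can keep decoding older payloads after the format evolves.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical interface: the format version is stored next to the payload
// and handed back to the serializer on read.
interface VersionedSerializer<T> {
    int getVersion();

    byte[] serialize(T value);

    T deserialize(int version, byte[] payload);
}

// Hypothetical payload: version 1 stored only the name, version 2 added
// the parallelism field.
final class GraphSummary {
    final String name;
    final int parallelism;

    GraphSummary(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }
}

final class GraphSummarySerializer implements VersionedSerializer<GraphSummary> {
    static final int DEFAULT_PARALLELISM = 1;

    @Override
    public int getVersion() {
        return 2; // current write format
    }

    @Override
    public byte[] serialize(GraphSummary value) {
        byte[] name = value.name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4);
        buffer.putInt(name.length).put(name).putInt(value.parallelism);
        return buffer.array();
    }

    @Override
    public GraphSummary deserialize(int version, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        byte[] name = new byte[buffer.getInt()];
        buffer.get(name);
        String decodedName = new String(name, StandardCharsets.UTF_8);
        switch (version) {
            case 1:
                // Old payloads have no parallelism field; fall back to a default.
                return new GraphSummary(decodedName, DEFAULT_PARALLELISM);
            case 2:
                return new GraphSummary(decodedName, buffer.getInt());
            default:
                throw new IllegalArgumentException("Unknown version: " + version);
        }
    }
}
```

Unlike Java serialization, the wire format here is explicit, so a class can be
refactored without silently breaking payloads already persisted in HA storage.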
>>>>>>> 
>>>>>>> Looking forward to your thoughts.
>>>>>>> 
>>>>>>> Best,
>>>>>>> D.
>>>>>>> 
>>>>>>> On Thu, Jul 11, 2024 at 12:58 PM Fabian Paul <fp...@apache.org> wrote:
>>>>>>> 
>>>>>>>> Thanks for drafting this FLIP. I really like the idea of introducing a
>>>>>>>> concept in Flink that is close to a logical plan submission.
>>>>>>>> 
>>>>>>>> I have a few questions about the proposal and its future evolvability.
>>>>>>>> 
>>>>>>>> - What is the future plan for job submissions in Flink? With the
>>>>>>>> current proposal, Flink will support JobGraph/StreamGraph/compiled plan
>>>>>>>> submissions? It might be confusing for users and complicate the
>>>>>>>> existing job submission logic significantly.
>>>>>>>> - The FLIP mentions multiple areas of optimization, first operator
>>>>>>>> chaining and second dynamic switches between join strategies. I think
>>>>>>>> from a Flink perspective, these are, at the moment, separate concerns.
>>>>>>>> For operator chaining, I agree with the current proposal, which is a
>>>>>>>> concept that applies generally to Flink's runtime. For join strategies,
>>>>>>>> they are only applicable when using an optimizer (that's currently not
>>>>>>>> part of Flink's runtime) with the Table API or Flink SQL. How do we
>>>>>>>> plan to connect the optimizer with Flink's runtime?
>>>>>>>> - With table/SQL API we already expose a compiled plan to support
>>>>>>>> stable version upgrades. It would be great to explore a joined plan to
>>>>>>>> also offer stable version upgrades with a potentially persistent
>>>>>>>> StreamGraph.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Fabian
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
