Hi all,

I would like to follow up on my previous email regarding your feedback. Below
is a concise summary of my main points:

1. Compiled Plan:
IIUC, the compiled plan primarily ensures execution plan compatibility
across job versions (e.g., during upgrades). Eventually, it still needs to
be converted to a StreamGraph for submission. Persisting the
StreamGraph/JobGraph, by contrast, supports high availability in JobManager
(JM) failover scenarios.

2. JobGraph vs StreamGraph Submission:
For users, whether a JobGraph or a StreamGraph is submitted is mostly
transparent. In client mode, this detail is hidden from users. The REST API
(/jobs) was originally built around the internal JobGraph class and intended
for the Flink client, not as a strict public API.

3. Runtime SQL Optimization:
To facilitate runtime SQL-related optimization, we will introduce a
strategy mechanism called StreamGraphOptimizationStrategy. SQL can
implement specific strategies, such as
AdaptiveBroadcastJoinOptimizationStrategy. More details are available in
FLIP-469 [1].

4. Persistence Strategy:
Initially, we can retain the JobGraph persistence method for stream jobs so
that they are not affected. After the new method has been validated in batch
processing scenarios over several versions, we can switch stream jobs over
to StreamGraph persistence as well.
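
To make point 3 more concrete, here is a minimal Java sketch of how the
runtime could load strategy classes named in the configuration item
execution.batch.adaptive.stream-graph-optimization.strategies. Everything
here is an assumption for illustration only: the shape of the interface, the
method name maybeOptimize, the comma-separated class-name format, and the toy
"graph" (a list of operator names). It is not the FLIP-469 API.

```java
import java.util.ArrayList;
import java.util.List;

public class StrategyLoadingSketch {

    /** Hypothetical stand-in for the interface proposed in FLIP-469. */
    public interface StreamGraphOptimizationStrategy {
        /** Returns true if the strategy changed the toy graph. */
        boolean maybeOptimize(List<String> graph);
    }

    /** Toy table-layer strategy: rewrites a hash join into a broadcast join. */
    public static class AdaptiveBroadcastJoinOptimizationStrategy
            implements StreamGraphOptimizationStrategy {
        @Override
        public boolean maybeOptimize(List<String> graph) {
            int i = graph.indexOf("HashJoin");
            if (i < 0) {
                return false;
            }
            graph.set(i, "BroadcastHashJoin");
            return true;
        }
    }

    /**
     * Instantiates strategies from a comma-separated list of class names,
     * i.e. the assumed value format of the configuration item.
     */
    public static List<StreamGraphOptimizationStrategy> loadStrategies(String configValue)
            throws ReflectiveOperationException {
        List<StreamGraphOptimizationStrategy> strategies = new ArrayList<>();
        for (String name : configValue.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            Class<? extends StreamGraphOptimizationStrategy> clazz =
                    Class.forName(trimmed).asSubclass(StreamGraphOptimizationStrategy.class);
            strategies.add(clazz.getDeclaredConstructor().newInstance());
        }
        return strategies;
    }

    public static void main(String[] args) throws Exception {
        // What SQL compilation would write into the configuration (assumed format).
        String configValue = AdaptiveBroadcastJoinOptimizationStrategy.class.getName();

        List<String> graph = new ArrayList<>(List.of("Source", "HashJoin", "Sink"));
        for (StreamGraphOptimizationStrategy strategy : loadStrategies(configValue)) {
            strategy.maybeOptimize(graph);
        }
        System.out.println(graph); // prints [Source, BroadcastHashJoin, Sink]
    }
}
```

The point of the sketch is the dependency direction: the runtime only knows
the interface and a class name from the configuration, while the concrete
strategy lives in the table layer.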

Your thoughts and insights on these points would be highly appreciated.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph

Best regards,

Junrui

On Fri, Jul 12, 2024 at 20:29 Junrui Lee <jrlee....@gmail.com> wrote:

> Hi all,
>
> Thanks for your feedback. Below are my thoughts on the questions you've
> raised.
>
> @Fabian
>
>> - What is the future plan for job submissions in Flink? With the current
>> proposal, Flink will support JobGraph/StreamGraph/compiled plan
>> submissions? It might be confusing for users and complicate the existing
>> job submission logic significantly.
>
>
> In my view, Flink should support submissions via StreamGraph and,
> eventually, remove support for JobGraph submissions. As for the compiled
> plan, I consider it to be a concept related to table/sql, which also needs
> to be compiled into the corresponding StreamGraph (a more generalized
> concept, applicable as well to DataStream API jobs) upon submission.
> In this FLIP, I want to give users a smoother migration path. In addition,
> a vast number of Flink UT/IT cases currently depend on submitting a
> specialized JobGraph to achieve their test objectives, and migrating these
> test cases to StreamGraph would be highly risky and complicated. Therefore,
> I am inclined not to remove the JobGraph submission path within this FLIP.
>
>> - How do we plan to connect the optimizer with Flink's runtime?
>
> @Ron
>
>> For batch scenario, if we want to better support dynamic plan tuning
>> strategies, the fundamental solution is still to put SQL Optimizer to
>> flink-runtime.
>
>
> Our current solution is to abstract an interface called
> StreamGraphOptimizationStrategy. For table-related optimization strategies,
> taking the Broadcast Join concept as an example, we can implement an
> AdaptiveBroadcastJoinOptimizationStrategy at the table layer. When this
> strategy is needed, during SQL compilation, we will set the configuration
> item execution.batch.adaptive.stream-graph-optimization.strategies. Then,
> the runtime layer will load the corresponding Strategy class based on this
> configuration item to be used at runtime.
>
> We will describe this part in the StreamGraphOptimizer section of
> FLIP-469, and the specific introduction of
> AdaptiveBroadcastJoinOptimizationStrategy will be discussed in subsequent
> FLIPs, which is expected to happen within a few days.
>
> @David
>
>> 1. Transformations in StreamGraphGenerator:
>>
>
> I'm inclined to support submitting StreamGraph because it could already
> provide the actual logical plan of the job at runtime. To be honest, I'm
> not sure what additional benefits submitting Transformations would bring
> compared to StreamGraph. I would appreciate any insights that you might
> offer on this matter.
>
> 2. StreamGraph for Recovery Purposes:
>
>
> In fact, recovering only from the JobGraph conflicts with our goal of
> adaptively optimizing the StreamGraph at runtime: in such scenarios, the
> StreamGraph, not the JobGraph, is the complete expression of a job's logic.
> See FLIP-469 for more details.
>
> I have reviewed the code related to the JobGraphStore and discovered that
> it can be extended to store both StreamGraph and JobGraph simultaneously.
> As for your concerns, can we consider the following: in batch mode, we use
> Job Recovery based on StreamGraph, whereas for stream mode, we continue to
> use the original JobGraph recovery and the StreamGraph would be converted
> to a JobGraph right at the beginning.
>
> 3. Moving Away from Java Serializables:
>
>
> Are you suggesting that Java serialization has limitations, and that we
> should explore alternative serialization approaches? I agree that this is a
> valuable consideration for the future. Do you think this should be included
> in this FLIP? I would prefer to address it as a separate FLIP.
>
> Best,
> Junrui
>
> On Fri, Jul 12, 2024 at 14:47 David Morávek <david.mora...@gmail.com> wrote:
>
>> >
>> > For batch scenario, if we want to better support dynamic plan tuning
>> > strategies, the fundamental solution is still to put SQL Optimizer to
>> > flink-runtime.
>>
>>
>> One accompanying question is: how do you envision this working for
>> streaming, where you need to ensure state compatibility after the plan
>> change? FLIP-469 seems to only focus on batch.
>>
>> Best,
>> D.
>>
>> On Fri, Jul 12, 2024 at 4:29 AM Ron Liu <ron9....@gmail.com> wrote:
>>
>> > Hi, Junrui
>> >
>> > The FLIP proposal looks good to me.
>> >
>> > I have the same question as Fabian:
>> >
>> > > For join strategies, they are only applicable when using an optimizer
>> > > (that's currently not part of Flink's runtime) with the Table API or
>> > > Flink SQL. How do we plan to connect the optimizer with Flink's
>> > > runtime?
>> >
>> > For batch scenario, if we want to better support dynamic plan tuning
>> > strategies, the fundamental solution is still to put SQL Optimizer to
>> > flink-runtime.
>> >
>> > Best,
>> > Ron
>> >
>> > On Thu, Jul 11, 2024 at 19:17 David Morávek <d...@apache.org> wrote:
>> >
>> > > Hi Junrui,
>> > >
>> > > Thank you for drafting the FLIP. I really appreciate the direction
>> > > it’s taking. We’ve discussed similar approaches multiple times, and
>> > > it’s great to see this progress.
>> > >
>> > > I have a few questions and thoughts:
>> > >
>> > >
>> > > * 1. Transformations in StreamGraphGenerator:*
>> > > Should we consider taking this a step further by working on a list of
>> > > transformations (inputs of StreamGraphGenerator)?
>> > >
>> > >     public StreamGraphGenerator(
>> > >             List<Transformation<?>> transformations,
>> > >             ExecutionConfig executionConfig,
>> > >             CheckpointConfig checkpointConfig,
>> > >             ReadableConfig configuration) {
>> > >
>> > > We could potentially merge ExecutionConfig and CheckpointConfig into
>> > > ReadableConfig. This approach might offer us even more flexibility.
>> > >
>> > >
>> > > *2. StreamGraph for Recovery Purposes:*
>> > > Should we avoid using StreamGraph for recovery purposes? The existing
>> > > JG-based recovery code paths took years to perfect, and it doesn’t
>> > > seem necessary to replace them. We only need SG for cases where we
>> > > want to regenerate the JG.
>> > > Additionally, translating SG into JG before persisting it in HA
>> > > could be beneficial, as it allows us to catch potential issues early
>> > > on.
>> > >
>> > >
>> > > * 3. Moving Away from Java Serializables:*
>> > > It would be great to start moving away from Java Serializables as
>> > > much as possible. Could we instead define proper versioned
>> > > serializers, possibly based on a well-defined protobuf blueprint?
>> > > This change could help us avoid ongoing issues associated with
>> > > Serializables.
>> > >
>> > > Looking forward to your thoughts.
>> > >
>> > > Best,
>> > > D.
>> > >
>> > > On Thu, Jul 11, 2024 at 12:58 PM Fabian Paul <fp...@apache.org> wrote:
>> > >
>> > > > Thanks for drafting this FLIP. I really like the idea of
>> > > > introducing a concept in Flink that is close to a logical plan
>> > > > submission.
>> > > >
>> > > > I have a few questions about the proposal and its future
>> > > > evolvability.
>> > > >
>> > > > - What is the future plan for job submissions in Flink? With the
>> > > > current proposal, Flink will support JobGraph/StreamGraph/compiled
>> > > > plan submissions? It might be confusing for users and complicate
>> > > > the existing job submission logic significantly.
>> > > > - The FLIP mentions multiple areas of optimization, first operator
>> > > > chaining and second dynamic switches between join strategies. I
>> > > > think from a Flink perspective, these are, at the moment, separate
>> > > > concerns. For operator chaining, I agree with the current proposal,
>> > > > which is a concept that applies generally to Flink's runtime. For
>> > > > join strategies, they are only applicable when using an optimizer
>> > > > (that's currently not part of Flink's runtime) with the Table API
>> > > > or Flink SQL. How do we plan to connect the optimizer with Flink's
>> > > > runtime?
>> > > > - With table/SQL API we already expose a compiled plan to support
>> > > > stable version upgrades. It would be great to explore a joined plan
>> > > > to also offer stable version upgrades with a potentially persistent
>> > > > streamgraph.
>> > > >
>> > > > Best,
>> > > > Fabian
>> > > >
>> > >
>> >
>>
>
