Hi,

Thanks for the reminder. I will review it soon when I get some free time.

On Sat, May 4, 2024 at 10:10 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

Jinrui and Xia,

Gentle ping for reviews.

On Mon, Apr 29, 2024 at 8:28 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

Hi Xia and Jinrui,

Filed https://github.com/apache/flink/pull/24736 to address the issue described above. Please take a look whenever you can.

Thanks
Venkat

On Thu, Apr 18, 2024 at 12:16 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the issue described above. Will share the PR here once it is ready for review.

Regards
Venkata krishnan

On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com> wrote:

Thanks Venkata and Xia for providing further clarification. I think your example illustrates the significance of this proposal very well. Please feel free to go ahead and address the concerns.

Best,
Junrui

On Tue, Apr 16, 2024 at 7:01 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

Thanks for adding your thoughts to this discussion.

If we all agree that the source vertex parallelism shouldn't be bound by the downstream max parallelism (jobmanager.adaptive-batch-scheduler.max-parallelism), based on the rationale and the issues described above, I can take a stab at addressing the issue.

Let me file a ticket to track this issue. Otherwise, I'm looking forward to hearing more thoughts from others as well, especially Lijie and Junrui, who have more context on the AdaptiveBatchScheduler.

Regards
Venkata krishnan

On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:

Hi Venkat,

I agree that the parallelism of the source vertex should not be upper bounded by the job's global max parallelism. The case you mentioned, "high filter selectivity with huge amounts of data to read," supports this viewpoint very well. (In fact, in the current implementation, if the source parallelism is pre-specified at job creation time, rather than relying on the dynamic parallelism inference of the AdaptiveBatchScheduler, the source vertex's parallelism can indeed exceed the job's global max parallelism.)

As Lijie and Junrui pointed out, the key issue is "semantic consistency." Currently, if a vertex has not set maxParallelism, the AdaptiveBatchScheduler will use `execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's maxParallelism. Since the current implementation does not distinguish between source vertices and downstream vertices, source vertices are also subject to this limitation.

Therefore, I believe that if the issue of "semantic consistency" can be well explained in the code and configuration documentation, the AdaptiveBatchScheduler should allow the parallelism of source vertices to exceed the job's global max parallelism.

Best,
Xia

On Sun, Apr 14, 2024 at 10:31 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

Let me state why I think "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism":

- The source vertex is unique in that it does not have any upstream vertices.
- Downstream vertices read shuffled data partitioned by key, which is not the case for the source vertex.
- Limiting source parallelism by the downstream vertices' max parallelism is incorrect.

If we say that for "semantic consistency" the source vertex parallelism has to be bound by the overall job's max parallelism, it can lead to the following issues:

- High filter selectivity with huge amounts of data to read: setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" just so that the source parallelism can be set higher can lead to small shuffle blocks and sub-optimal performance.
- Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" requires careful tuning of network buffer configurations, which is unnecessary when not otherwise required, just so that the source parallelism can be set high.

Regards
Venkata krishnan

On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:

Hello Venkata krishnan,

I think the term "semantic inconsistency" defined by jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining a uniform upper limit on parallelism across all vertices within a job. As the source vertices are part of the global execution graph, they should also respect this rule to ensure consistent application of parallelism constraints.
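The small-blocks concern raised in the bullet list above can be made concrete with back-of-envelope arithmetic (a sketch with illustrative numbers, not figures from this thread): with the adaptive batch scheduler an upstream vertex produces one subpartition per unit of downstream max parallelism, so inflating max-parallelism only to unlock a higher source parallelism shrinks every shuffle block the upstream tasks write.

```java
// Back-of-envelope sketch (hypothetical class and numbers, not Flink code):
// average shuffle block size when a stage's output is split across
// `parallelism` producer subtasks, each writing `maxParallelism` subpartitions.
public class ShuffleBlockSketch {

    static long bytesPerSubpartition(long stageOutputBytes, int parallelism, int maxParallelism) {
        // Each producer writes its share of the output, spread over all subpartitions.
        return stageOutputBytes / ((long) parallelism * maxParallelism);
    }

    public static void main(String[] args) {
        long tenGiB = 10L << 30; // 10 GiB of shuffle data

        // Modest max-parallelism of 1000: blocks of ~21 KiB each.
        System.out.println(bytesPerSubpartition(tenGiB, 500, 1000));

        // Max-parallelism raised to 20000 only so sources can run wider:
        // the same data now lands in ~1 KiB blocks, hurting I/O efficiency.
        System.out.println(bytesPerSubpartition(tenGiB, 500, 20000));
    }
}
```

The point being illustrated: the cap forces users to couple two unrelated knobs, and paying for the coupling with tiny shuffle blocks is exactly the sub-optimal outcome Venkat describes.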
Best,
Junrui

On Fri, Apr 12, 2024 at 2:10 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as well.

Regards
Venkata krishnan

On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few follow-up questions.

> Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.

Lijie, can you please elaborate on the above comment further? What do you mean when you say it will lead to "semantic inconsistency"?

> Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process.

This makes sense in the context of any vertices other than the source vertex. As you mentioned above ("Source can actually ignore this limit because it has no upstream"), I therefore feel that "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not be upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".

Regards
Venkata krishnan

On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:

Hi Venkat,

As Lijie mentioned, in Flink the parallelism is required to be less than or equal to the maximum parallelism. The values of jobmanager.adaptive-batch-scheduler.max-parallelism and jobmanager.adaptive-batch-scheduler.default-source-parallelism will be used as the source's max parallelism and parallelism, respectively. Therefore, the check failure you encountered is in line with expectations.

Best,
Junrui

On Thu, Feb 29, 2024 at 5:35 PM Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi Venkat,

>> The default-source-parallelism config should be independent from the max-parallelism config.

Actually, it's not.

Firstly, it's obvious that the parallelism should be less than or equal to the max parallelism (both literally and in execution). "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the max parallelism for a vertex if you don't set a max parallelism for it individually (just like the source in your case).

Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process. The source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
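The invariant Lijie describes above (parallelism must not exceed max parallelism, with the scheduler-wide limit used when a vertex sets no max of its own) can be sketched as follows. This is a simplified illustration with hypothetical names, not the actual code in Flink's DefaultVertexParallelismInfo; only the error message text matches the one quoted in this thread.

```java
// Simplified sketch of the vertex-parallelism invariant under discussion
// (hypothetical class; not the real DefaultVertexParallelismInfo).
public class VertexParallelismCheck {
    static final int UNSET = -1;

    // A vertex with no individually-set max parallelism falls back to the
    // scheduler-wide jobmanager.adaptive-batch-scheduler.max-parallelism.
    static int effectiveMaxParallelism(int vertexMaxParallelism, int schedulerMaxParallelism) {
        return vertexMaxParallelism == UNSET ? schedulerMaxParallelism : vertexMaxParallelism;
    }

    static void checkParallelism(int parallelism, int vertexMax, int schedulerMax) {
        int max = effectiveMaxParallelism(vertexMax, schedulerMax);
        if (parallelism > max) {
            throw new IllegalArgumentException(
                "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
    }

    public static void main(String[] args) {
        // Passes: 128 <= scheduler-wide max of 1000.
        checkParallelism(128, UNSET, 1000);

        // Fails: a source given default-source-parallelism 2000 while the
        // scheduler-wide max-parallelism is 1000 -- the case in this thread.
        try {
            checkParallelism(2000, UNSET, 1000);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this sketch, exempting sources would amount to skipping the fallback (or the check) for vertices with no inputs, which is the semantic-consistency question being debated.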
Best,
Lijie

On Thu, Feb 29, 2024 at 5:49 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

Hi Flink devs,

With Flink's AdaptiveBatchScheduler
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
(note: this is different from the AdaptiveScheduler
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
the scheduler automatically determines the correct number of downstream tasks required to process the shuffle generated by the upstream vertex.

I have a question regarding the current behavior. There are two configs in interplay here:

1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
- the default parallelism of data sources.
2. jobmanager.adaptive-batch-scheduler.max-parallelism
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
- the upper bound of parallelism that can be set adaptively.

Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism" is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the Flink application fails with the message:

"Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
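For reference, a minimal configuration that triggers the failure described above might look like the following (illustrative values; option names as in the Flink 1.15 docs linked in this thread, and assuming a batch job):

```yaml
# flink-conf.yaml (illustrative values)
jobmanager.scheduler: AdaptiveBatch
jobmanager.adaptive-batch-scheduler.max-parallelism: 1000
# Greater than max-parallelism, so scheduling fails with:
# "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 2000
```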
This is the corresponding check in Flink's DefaultVertexParallelismInfo
<https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.

My question is: shouldn't the "default-source-parallelism" config be independent of the "max-parallelism" config? The former controls the default source parallelism, while the latter controls the max number of partitions into which the intermediate shuffle is written.

If this is true, then the above check should be fixed. Otherwise, I wanted to understand why "default-source-parallelism" should be less than or equal to "max-parallelism".

Thanks
Venkat