[jira] [Created] (FLINK-36173) Invalid link in checkpoint documentation

2024-08-29 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-36173:
-

 Summary: Invalid link in checkpoint documentation
 Key: FLINK-36173
 URL: https://issues.apache.org/jira/browse/FLINK-36173
 Project: Flink
  Issue Type: Bug
Reporter: Gabor Somogyi


In some places we still have the
"checkpointing-with-parts-of-the-graph-finished-beta" link instead of
"checkpointing-with-parts-of-the-graph-finished".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-474: Store operator name and UID in state metadata

2024-08-29 Thread Gabor Somogyi
Hi All,

I'm glad to announce that FLIP-474[1] has been accepted.

There are 9 approving votes, 7 of which are binding:
- Marton Balassi (binding)
- Ferenc Csaky (binding)
- Gyula Fora (binding)
- Orhidi Matyas (binding)
- Peter Huang (non-binding)
- Clara Xiong (non-binding)
- Zakelly Lan (binding)
- Rui Fan (binding)
- Yun Tang (binding)

There were no votes against it.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-474%3A+Store+operator+name+and+UID+in+state+metadata

BR,
G


[jira] [Created] (FLINK-36174) CDC yaml without pipeline will throw NullPointerException

2024-08-29 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36174:
-

 Summary: CDC yaml without pipeline will throw NullPointerException
 Key: FLINK-36174
 URL: https://issues.apache.org/jira/browse/FLINK-36174
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36175) Remove `register_table_source` and `register_table_sink` in python module

2024-08-29 Thread xuyang (Jira)
xuyang created FLINK-36175:
--

 Summary: Remove `register_table_source` and `register_table_sink` 
in python module
 Key: FLINK-36175
 URL: https://issues.apache.org/jira/browse/FLINK-36175
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: xuyang
 Fix For: 2.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36176) Remove support for ancient Kafka versions

2024-08-29 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-36176:
---

 Summary: Remove support for ancient Kafka versions
 Key: FLINK-36176
 URL: https://issues.apache.org/jira/browse/FLINK-36176
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Affects Versions: kafka-3.2.0
Reporter: Arvid Heise
Assignee: Arvid Heise


Wrap up https://issues.apache.org/jira/browse/FLINK-19152 by also removing all 
related producer code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSSION] Disabling japicmp plugin in master for 2.0

2024-08-29 Thread Matthias Pohl
Hi everyone,
for the 2.0 work, we are expecting to run into public API changes quite a
bit. These would get picked up by the japicmp plugin. The usual way is to
add exclusions to the plugin configuration [1], generating a (presumably
long) list of API changes.

I'm wondering whether we would instead want to disable the plugin [2] for
2.0 entirely, to lower the effort on the contributors' side.

Best,
Matthias

[1] https://github.com/apache/flink/blob/master/pom.xml#L2367
[2] https://github.com/apache/flink/blob/master/pom.xml#L170


[jira] [Created] (FLINK-36177) Deprecating KafkaShuffle

2024-08-29 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-36177:
---

 Summary: Deprecating KafkaShuffle
 Key: FLINK-36177
 URL: https://issues.apache.org/jira/browse/FLINK-36177
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Affects Versions: kafka-3.2.0
Reporter: Arvid Heise
Assignee: Arvid Heise


KafkaShuffle was never promoted to a public API and is still built on top of the 
old producer and consumer, which we want to remove soonish.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSSION] Disabling japicmp plugin in master for 2.0

2024-08-29 Thread Gabor Somogyi
Hi Matthias,

I would turn japicmp off for 2.0 because adding a long list of exceptions
doesn't give any value.
1.x and 2.x are not going to be compatible in any way, and when 2.x is
released, that will become the new japicmp baseline (after a heavy migration).

What I see as a potential risk is that we break something that was not
intended, but fixing that hopefully small number of cases is less effort
than maintaining an endless list.

BR,
G


On Thu, Aug 29, 2024 at 11:40 AM Matthias Pohl  wrote:

> Hi everyone,
> for the 2.0 work, we are expecting to run into public API changes quite a
> bit. These would get picked up by the japicmp plugin. The usual way is to
> add exclusions to the plugin configuration [1], generating a (presumably
> long) list of API changes.
>
> I'm wondering whether we would instead want to disable the plugin [2] for
> 2.0 entirely, to lower the effort on the contributors' side.
>
> Best,
> Matthias
>
> [1] https://github.com/apache/flink/blob/master/pom.xml#L2367
> [2] https://github.com/apache/flink/blob/master/pom.xml#L170
>


[jira] [Created] (FLINK-36178) Parse CREATE TABLE LIKE to CreateTableEvent

2024-08-29 Thread LvYanquan (Jira)
LvYanquan created FLINK-36178:
-

 Summary: Parse CREATE TABLE LIKE to CreateTableEvent
 Key: FLINK-36178
 URL: https://issues.apache.org/jira/browse/FLINK-36178
 Project: Flink
  Issue Type: Improvement
Reporter: LvYanquan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36179) Bump log4j version

2024-08-29 Thread Siddharth R (Jira)
Siddharth R created FLINK-36179:
---

 Summary: Bump log4j version
 Key: FLINK-36179
 URL: https://issues.apache.org/jira/browse/FLINK-36179
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.10.0
Reporter: Siddharth R


Bumping *log4j* to the latest version (2.23.1) - this will remediate a lot of 
vulnerabilities in dependent packages.

Package details:
 # 
[https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-1.2-api/2.23.1]
 # 
[https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl/2.23.1]
 # 
[https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api/2.23.1]
 # 
[https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core/2.23.1]

Release notes:

[https://logging.apache.org/log4j/2.x/release-notes.html]

 

A lot of bug fixes have been done in the newer versions, and I don't see any 
breaking changes as such.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36180) Flink pulsar connector when restoring from the state, the unconsumed messages of the first batch will be lost

2024-08-29 Thread Wenbing Shen (Jira)
Wenbing Shen created FLINK-36180:


 Summary: Flink pulsar connector when restoring from the state, the 
unconsumed messages of the first batch will be lost
 Key: FLINK-36180
 URL: https://issues.apache.org/jira/browse/FLINK-36180
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: pulsar-4.1.0
Reporter: Wenbing Shen


As described in the title: when the Flink Pulsar connector restores from state, 
the unconsumed messages of the first batch are lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-475: Support Adaptive Skewed Join Optimization.

2024-08-29 Thread Lincoln Lee
Thanks for bringing this up! It would be a useful feature for batch users.

For the FLIP, I have some questions:

1st, the implementation plan is to rewrite the optimization based on the
execnode of the table planner, but the config option for the optimization
is under the flink-core module; does it mean this optimization is available
for datastream jobs as well? (I didn't see the details in the FLIP)
If not, my suggestion is to put these new options into the table module.

2nd, the FLIP performs parameter control and optimization based on the
size of the key group; how can users perceive the specific key group size
(or the median key group size) from the job information provided by Flink?

3rd, IIUC, the following limitation in the FLIP exists only for streaming
executions. So, is the new IntraInputKeyGroupCorrelation /
InterInputsKeyGroupCorrelation mentioned in the FLIP still necessary?
> “The existing data distribution algorithms in Flink impose strict
limitations on joins, requiring that data within the same key group
must be sent to the same downstream for processing. This restricts
the adaptability of data distribution.”


Best,
Lincoln Lee


Zhu Zhu wrote on Monday, August 19, 2024 at 16:50:

> +1 for the FLIP
>
> Long-tail tasks caused by skewed data usually pose significant
> challenges for users. It's great that Flink can mitigate such
> issues automatically.
>
> Thanks,
> Zhu
>
> Lei Yang wrote on Friday, August 16, 2024 at 11:18:
>
> > Hi devs,
> >
> >
> > Junrui Lee, Xia Sun and I would like to initiate a discussion about
> > FLIP-475: Support Adaptive Skewed Join Optimization [1].
> >
> >
> > In a Join query, when certain keys occur frequently, it can lead to an
> > uneven distribution of data across partitions. This may affect the
> > execution performance of Flink jobs, as a single partition with skewed
> data
> > can severely downgrade the performance of the entire job. To ensure data
> is
> > evenly distributed to downstream tasks, we can use the statistics of the
> > input to split (and duplicate if needed) skewed and splittable partitions
> > into balanced partitions at runtime. However, currently, Flink is unable
> to
> > accurately determine which partitions are skewed and eligible for
> splitting
> > at runtime, and it also lacks the capability to split data within the
> same
> > key group.
> >
> >
> > To address this issue, we plan to introduce Adaptive Skewed Join
> > Optimization capability. This will allow the Join operator to dynamically
> > split partitions that are skewed and splittable based on the statistics
> of
> > the input at runtime, reducing the long-tail problem caused by skewed
> data.
> > This FLIP is based on FLIP-469 [2] and also leverages capabilities
> > introduced in FLIP-470 [3].
> >
> >
> > For more details, please refer to FLIP-475 [1]. We look forward to your
> > feedback.
> >
> >
> > Best,
> >
> >
> > Junrui Lee, Xia Sun and Lei Yang
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-475%3A+Support+Adaptive+Skewed+Join+Optimization
> >
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> >
> > [3]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
>


[jira] [Created] (FLINK-36181) Drop support for Java 8 and use Java 17 by default

2024-08-29 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-36181:
--

 Summary: Drop support for Java 8 and use Java 17 by default
 Key: FLINK-36181
 URL: https://issues.apache.org/jira/browse/FLINK-36181
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System, Build System / Azure Pipelines, Build 
System / CI
Affects Versions: 2.0.0
Reporter: Martijn Visser






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36182) PyFlink SQL Job Got IllegalStateException During Task Manager Shutdown

2024-08-29 Thread EMERSON WANG (Jira)
EMERSON WANG created FLINK-36182:


 Summary: PyFlink SQL Job Got IllegalStateException During Task 
Manager Shutdown
 Key: FLINK-36182
 URL: https://issues.apache.org/jira/browse/FLINK-36182
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.18.1
 Environment: EKS prod cluster
Reporter: EMERSON WANG


The PyFlink SQL job was running in an AWS EKS cluster. When the task manager pods 
were scaled down, Preconditions.checkState in the class DefaultJobBundleFactory 
threw "Caused by: java.lang.IllegalStateException: Reference count must not 
be negative.". Since refCount should always be >= 0, this should never happen. 
Please look into what the root cause was. One task manager log is as follows:
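
For reference, a minimal sketch of the invariant that fired (illustrative only;
Beam's actual DefaultJobBundleFactory code differs, e.g. it uses vendored Guava):

{code:java}
import com.google.common.base.Preconditions;

class RefCountSketch {
    private int refCount;

    void retain() {
        refCount++;
    }

    void release() {
        refCount--;
        // The invariant seen in the logs below: a double-release drives
        // the count negative and this check throws IllegalStateException.
        Preconditions.checkState(
                refCount >= 0, "Reference count must not be negative.");
    }
}
{code}
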
{
"message": "2024-08-29 17:28:11,630 WARN 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] -
Expiring environment urn: \"beam:env:process:v1\"",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": " with 6 remaining bundle references. Taking note to clean it up 
during shutdown if the
references are not removed by then.",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "2024-08-29 17:28:11,635 WARN 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] -
Hanged up for unknown endpoint.",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "org.apache.flink.util.FlinkException: Disconnect from JobManager 
responsible for
d1c6852e22e8553ea2e13e19b5c60954.",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "Caused by: org.apache.flink.util.FlinkExpectedException: The 
TaskExecutor is shutting down.",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "2024-08-29 17:28:11,634 WARN 
org.apache.flink.runtime.taskmanager.Task [] -
WindowAggregate[34] -> Calc[35] -> (PythonCalc[36] -> Calc[37] -> 
StreamRecordTimestampInserter[38] -> StreamingFileWriter, PythonCalc[88] -> 
Calc[89]) (1/6)#14 
(ee3e0e638b2bc15ec5ad42a94435f170_7145381c4bbb09912ff683559937e2f3_0_14)
switched from RUNNING to FAILED with failure cause:",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "org.apache.flink.runtime.taskmanager.AsynchronousException: Caught 
exception while processing timer.",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: 
java.lang.RuntimeException:
Error while waiting for BeamPythonFunctionRunner flush",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "Caused by: java.lang.RuntimeException: Error while waiting for 
BeamPythonFunctionRunner flush",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "Caused by: java.lang.RuntimeException: Failed to close remote 
bundle",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "Caused by: java.lang.IllegalStateException: Reference count must 
not be negative.",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "2024-08-29 17:28:11,636 ERROR 
org.apache.beam.runners.fnexecution.control.FnApiControlClient [] -
FnApiControlClient closed, clearing outstanding requests 
{12=java.util.concurrent.CompletableFuture@7945ef9[Not completed, 1 dependents],
15=java.util.concurrent.CompletableFuture@3abdad3[Not completed, 1 dependents], 
18=java.util.concurrent.CompletableFuture@46a954c2[Not completed, 1 
dependents], 19=java.util.concurrent.CompletableFuture@52632b1d[Not completed, 
1 dependents], 5=java.util.concurrent.CompletableFuture@77a5d43e[Not completed, 
1 dependents], 6=java.util.concurrent.CompletableFuture@2a85f55b[Not completed, 
1 dependents]}",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "2024-08-29 17:28:11,637 WARN 
org.apache.flink.runtime.taskmanager.Task [] -
Source: utlb_res_vasn[7] -> Calc[8] -> LocalWindowAggregate[9] (1/4)#14 
(ee3e0e638b2bc15ec5ad42a94435f170_6cdc5bb954874d922eaee11a8e7b5dd5_0_14) 
switched from RUNNING to FAILED with failure cause:",
"time": "2024-08-29T17:28:11+00:00"
}
{
"message": "org.apache.flink.util.FlinkException: Disconnect from JobManager 
responsible for d1c6852e22e8553ea2e13e19b5c60954.",
"time": "2024-08-29T17:28:11+00:00"
}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36183) Lenient mode doesn't work with route blocks

2024-08-29 Thread yux (Jira)
yux created FLINK-36183:
---

 Summary: Lenient mode doesn't work with route blocks
 Key: FLINK-36183
 URL: https://issues.apache.org/jira/browse/FLINK-36183
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0, cdc-3.3.0
Reporter: yux


We should call applySchemaChange (where the route rules take effect) first and 
lenientize its result afterwards; otherwise we may not be able to get the evolved 
schema, since the tableId isn't routed:

Caused by: java.lang.IllegalStateException: Evolved schema does not exist, not 
ready for schema change event AddColumnEvent{tableId=kunni_test.customers, 
addedColumns=[ColumnWithPosition{column=`newCol2` VARCHAR(100), position=AFTER, 
existedColumnName=newCol}]}
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$lenientizeSchemaChangeEvent$3(SchemaRegistryRequestHandler.java:378)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lenientizeSchemaChangeEvent(SchemaRegistryRequestHandler.java:376)
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.calculateDerivedSchemaChangeEvents(SchemaRegistryRequestHandler.java:360)
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.handleSchemaChangeRequest(SchemaRegistryRequestHandler.java:184)
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$handleCoordinationRequest$3(SchemaRegistry.java:273)
... 4 more
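
A minimal sketch of the proposed ordering (the enclosing code is illustrative,
not the actual SchemaRegistryRequestHandler; only the two method names are taken
from the description and stack trace above):

{code:java}
// Sketch only: route first, then lenientize, so the evolved-schema lookup
// inside lenientizeSchemaChangeEvent sees an already-routed tableId.
List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) {
    List<SchemaChangeEvent> derived = new ArrayList<>();
    for (SchemaChangeEvent routed : applySchemaChange(event)) { // route rules apply here
        derived.addAll(lenientizeSchemaChangeEvent(routed));    // lenientize the routed event
    }
    return derived;
}
{code}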



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36184) Transform Operator swallows schema changes from tables not present in transform rules

2024-08-29 Thread yux (Jira)
yux created FLINK-36184:
---

 Summary: Transform Operator swallows schema changes from tables 
not present in transform rules
 Key: FLINK-36184
 URL: https://issues.apache.org/jira/browse/FLINK-36184
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0, cdc-3.3.0
Reporter: yux


Currently, tables that are not present in transform blocks should be treated as 
if there were such a dummy fallback block:

transform:
  - source-table: "\.*.\.*"  # capture all unmentioned tables
    projection: "*"          # keep all columns
                             # without filtering any rows

There's a bug in #3557's implementation, where schema change events should be 
filtered out if there's no wildcard (*) in the transform rules. However, it also 
filters out those tables that are not defined in the transform rules, which 
causes schema change events to be lost in the following example:

transform:
  - source-table: foo.bar.baz   # another table that doesn't really matter
    projection: ...

Here, since there's one transform block, the TransformOperator will be added into 
the operator chain. Now, if we perform some schema changes on another table 
(like db.table), they will be filtered out, since the TransformOperator regards 
it as an asterisk-less table that does not require schema change events.

By checking whether a table is transformed or not, we could set the hasAsterisk 
flag map correctly and resolve this problem; a sketch follows below.
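
A hypothetical sketch of that check (transformRules and hasAsteriskMap are
illustrative names, not the actual Flink CDC fields):

{code:java}
// Only filter schema change events for tables that are actually covered by
// a transform rule without a wildcard; untransformed tables pass through,
// matching the dummy fallback block above.
private boolean shouldPassSchemaChange(TableId tableId) {
    boolean transformed =
            transformRules.stream().anyMatch(rule -> rule.matches(tableId));
    if (!transformed) {
        return true; // not mentioned in any transform block: forward everything
    }
    return hasAsteriskMap.getOrDefault(tableId, true);
}
{code}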



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36185) Remove RuntimeContext#getExecutionConfig

2024-08-29 Thread Junrui Li (Jira)
Junrui Li created FLINK-36185:
-

 Summary: Remove RuntimeContext#getExecutionConfig
 Key: FLINK-36185
 URL: https://issues.apache.org/jira/browse/FLINK-36185
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core
Reporter: Junrui Li


FLINK-33713 (FLIP-391) deprecated RuntimeContext#getExecutionConfig.
In Flink 2.0 we should remove this deprecated method.
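
A migration sketch (hedged; see FLIP-391 for the authoritative replacement API):

{code:java}
// Deprecated path: reaching through the full ExecutionConfig.
TypeSerializer<String> before =
        Types.STRING.createSerializer(getRuntimeContext().getExecutionConfig());

// FLIP-391 replacements expose only what user functions actually need
// (assuming the narrower accessors the FLIP added to RuntimeContext):
TypeSerializer<String> after = getRuntimeContext().createSerializer(Types.STRING);
boolean objectReuse = getRuntimeContext().isObjectReuseEnabled();
{code}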



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36186) Speed up RocksDB close during manual compaction

2024-08-29 Thread Yue Ma (Jira)
Yue Ma created FLINK-36186:
--

 Summary: Speed up RocksDB close during manual compaction 
 Key: FLINK-36186
 URL: https://issues.apache.org/jira/browse/FLINK-36186
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0


After https://issues.apache.org/jira/browse/FLINK-26050, Flink may schedule 
RocksDB manual compactions asynchronously, but if a failover occurs at that 
time, RocksDB needs to wait for the manual compaction to complete before it 
can close. This can make task cancellation take a very long time, delaying 
task recovery.
{code:java}
// After this function call, CompactRange() or CompactFiles() will not
// run compactions and fail. Calling this function will tell outstanding
// manual compactions to abort and will wait for them to finish or abort
// before returning.
virtual void DisableManualCompaction() = 0; {code}
The solution is relatively simple: we can call DisableManualCompaction during 
db close to abort any running manual compaction, which speeds up the db close.
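
A minimal sketch of such a close path, assuming the RocksJava binding exposes
the same hook as the C++ API quoted above (the actual Flink call site is more
involved):

{code:java}
import org.rocksdb.RocksDB;

static void closeQuickly(RocksDB db) {
    // Abort outstanding manual compactions instead of waiting for them,
    // so cancellation does not block on a long-running CompactRange().
    db.disableManualCompaction();
    db.close();
}
{code}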



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-475: Support Adaptive Skewed Join Optimization.

2024-08-29 Thread Lei Yang
Hi Lincoln Lee,
Thanks for your feedback!
For the 1st question, thank you for the reminder. This optimization is only
available for Table jobs in batch mode, and I have put these new options
into the table module. I also replaced the "enable" and "force" configurations
with a new enum-typed configuration to avoid confusion. The new enum type
has three values: "AUTO" means Flink will automatically apply this
optimization, "FORCED" means Flink will enforce this optimization even if it
introduces extra hash shuffles, and "NONE" means this optimization will not
be executed (see the sketch below).
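
For illustration, a hypothetical sketch of such an enum (the name and exact
semantics here are illustrative; the FLIP defines the actual option):

{code:java}
public enum SkewedJoinOptimizationStrategy {
    AUTO,   // apply the optimization when Flink deems it safe and beneficial
    FORCED, // apply it even if extra hash shuffles are introduced
    NONE    // never apply it
}
{code}
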
For the 2nd question, the key group size (or median size) is calculated based
on statistical information from the upstream output and is used to determine
data skewness, so users do not need to know the specific values. I
specifically mentioned "the median key group size" just because I chose to
use it to represent the central tendency of data processing volumes across
all parallel instances.
For the 3rd question, after confirming, this limitation also exists in batch
mode. Therefore, IntraInputKeyGroupCorrelation and
InterInputsKeyGroupCorrelation are necessary; I need them to determine
whether and how to split the skewed key groups to ensure data correctness.
Additionally, adding these two correlations has other benefits: other
optimization strategies can also modify them to flexibly choose the data
distribution algorithm based on the operator's specific situation.

Best, Lei Yang

Lincoln Lee wrote on Thursday, August 29, 2024 at 23:13:

> Thanks for bringing this up! It would be a useful feature for batch users.
>
> For the FLIP, I have some questions:
>
> 1st, the implementation plan is to rewrite the optimization based on the
> execnode of the table planner, but the config option for the optimization
> is under the flink-core module; does it mean this optimization is available
> for datastream jobs as well? (I didn't see the details in the FLIP)
> If not, my suggestion is to put these new options into the table module.
>
> 2nd, the FLIP performs parameter control and optimization based on the
> size of the key group; how can users perceive the specific key group size
> (or the median key group size) from the job information provided by Flink?
>
> 3rd, IIUC, the following limitation in the FLIP exists only for streaming
> executions. So, is the new IntraInputKeyGroupCorrelation /
> InterInputsKeyGroupCorrelation mentioned in the FLIP still necessary?
> > “The existing data distribution algorithms in Flink impose strict
> limitations on joins, requiring that data within the same key group
> must be sent to the same downstream for processing. This restricts
> the adaptability of data distribution.”
>
>
> Best,
> Lincoln Lee
>
>
> Zhu Zhu wrote on Monday, August 19, 2024 at 16:50:
>
> > +1 for the FLIP
> >
> > Long-tail tasks caused by skewed data usually pose significant
> > challenges for users. It's great that Flink can mitigate such
> > issues automatically.
> >
> > Thanks,
> > Zhu
> >
> > Lei Yang wrote on Friday, August 16, 2024 at 11:18:
> >
> > > Hi devs,
> > >
> > >
> > > Junrui Lee, Xia Sun and I would like to initiate a discussion about
> > > FLIP-475: Support Adaptive Skewed Join Optimization [1].
> > >
> > >
> > > In a Join query, when certain keys occur frequently, it can lead to an
> > > uneven distribution of data across partitions. This may affect the
> > > execution performance of Flink jobs, as a single partition with skewed
> > data
> > > can severely downgrade the performance of the entire job. To ensure
> data
> > is
> > > evenly distributed to downstream tasks, we can use the statistics of
> the
> > > input to split (and duplicate if needed) skewed and splittable
> partitions
> > > into balanced partitions at runtime. However, currently, Flink is
> unable
> > to
> > > accurately determine which partitions are skewed and eligible for
> > splitting
> > > at runtime, and it also lacks the capability to split data within the
> > same
> > > key group.
> > >
> > >
> > > To address this issue, we plan to introduce Adaptive Skewed Join
> > > Optimization capability. This will allow the Join operator to
> dynamically
> > > split partitions that are skewed and splittable based on the statistics
> > of
> > > the input at runtime, reducing the long-tail problem caused by skewed
> > data.
> > > This FLIP is based on FLIP-469 [2] and also leverages capabilities
> > > introduced in FLIP-470 [3].
> > >
> > >
> > > For more details, please refer to FLIP-475 [1]. We look forward to your
> > > feedback.
> > >
> > >
> > > Best,
> > >
> > >
> > > Junrui Lee, Xia Sun and Lei Yang
> > >
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-475%3A+Support+Adaptive+Skewed+Join+Optimization
> > >
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph