[PR] [FLINK-37332] Support any column as chunk key column (postgres, oracle, db2, sqlserver) #3922 [flink-cdc]

2025-02-20 Thread via GitHub


SML0127 opened a new pull request, #3928:
URL: https://github.com/apache/flink-cdc/pull/3928

   The MySQL CDC connector has supported this feature since Flink CDC v3.2.
   Now, the other connectors that support incremental snapshot also allow any 
column to be used as the chunk key column.
   
   apache jira: https://issues.apache.org/jira/browse/FLINK-37332
   
   cc. @gtk96 
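   For reviewers, a minimal Flink SQL sketch of how the option might be used on the 
Postgres connector once this lands, assuming it exposes the same 
`scan.incremental.snapshot.chunk-key-column` key as the MySQL connector (the table 
and connection options below are purely illustrative):

{code:sql}
CREATE TABLE orders (
    order_id INT,
    seller_id STRING,
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'flink',
    'password' = 'secret',
    'database-name' = 'shop',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink_slot',
    'scan.incremental.snapshot.enabled' = 'true',
    -- assumption: split snapshot chunks on a non-primary-key column, mirroring
    -- the option the MySQL CDC connector has offered since Flink CDC 3.2
    'scan.incremental.snapshot.chunk-key-column' = 'seller_id'
);
{code}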


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37236] Bump the flink version from 1.20.0 to 1.20.1 [flink-kubernetes-operator]

2025-02-20 Thread via GitHub


gyfora commented on PR #943:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/943#issuecomment-2673514124

   No changes to the NOTICE file necessary? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37161) Cross-team verification for "Adaptive skewed join optimization for batch jobs"

2025-02-20 Thread xingbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929029#comment-17929029
 ] 

xingbe commented on FLINK-37161:


The above test cases have been sequentially verified.

For test cases 1, 3, 4, and 5, it has been verified that the skewed join can 
handle join skew issues as expected.

For example, the data distribution before optimization is:

!https://static.dingtalk.com/media/lQLPJxH9h4tniDfNBIzNCHqw9SDlZWw_GjwHmzXkALzlAQ_2170_1164.png|width=355,height=191!

and the data distribution after optimization is:

!https://static.dingtalk.com/media/lQLPJxDsrln4yDfNBIbNCLKwBUSGUGCqbGQHmzXkALzlAA_2226_1158.png|width=362,height=188!

But for test case 2, when there is an aggregation node with the same hash key 
downstream of the join node, it was found that the Join operator cannot be 
converted into an AdaptiveJoin operator.

> Cross-team verification for "Adaptive skewed join optimization for batch jobs"
> --
>
> Key: FLINK-37161
> URL: https://issues.apache.org/jira/browse/FLINK-37161
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Junrui Lee
>Assignee: xingbe
>Priority: Blocker
> Fix For: 2.0.0
>
>
> In Flink 2.0, we support the capability of adaptive skewed join optimization 
> for batch jobs, which will allow the Join operator to dynamically split 
> skewed and splittable partitions based on runtime input statistics, thereby 
> mitigating the long-tail problem caused by skewed data.
> We may need the following tests:
>  # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is 
> set to {{auto}}. We need to construct a simple join case with data skewed 
> on a single key (e.g., making the data of a specified join key N times larger 
> than other join keys, where N is defined by 
> {{table.optimizer.skewed-join-optimization.skewed-factor}}), and ensure that 
> the data volume for the skewed join key exceeds the skewed threshold (defined 
> by {{table.optimizer.skewed-join-optimization.skewed-threshold}}). 
> Finally, observe whether the ratio of the maximum data volume to the median 
> data volume processed by concurrent join tasks is less than the skew factor.
>  # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is 
> set to {{forced}}. Construct a skewed join instance similar to Test 1, 
> but with the following difference: the join case should be connected to a 
> downstream operator that performs hashing on the same field (e.g., hash 
> aggregation or group by). It is recommended to set different parallelisms for 
> the join operator and the downstream operator to prevent the output edge from 
> being optimized into a forward edge. Finally, observe whether the ratio of the 
> maximum data volume to the median data volume processed by concurrent join 
> tasks is less than the skew factor.
>  # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is 
> set to {{none}}, and verify that the join operator will not be optimized into 
> an adaptive join operator under any circumstances.
>  # Test the case with a customized 
> {{table.optimizer.skewed-join-optimization.skewed-factor}}. We need to 
> construct a skewed join instance similar to Test 1, setting different skewed 
> factors and observing whether the ratio of the maximum data volume to the 
> median data volume processed by concurrent join tasks is less than the skew 
> factor. Note that currently, Flink can only reduce the ratio to 2.0, so 
> please ensure that the skewed factor is greater than 2.0 during testing.
>  # Test the case with a customized 
> {{table.optimizer.skewed-join-optimization.skewed-threshold}}. We need to 
> construct a skewed join instance similar to Test 1, setting different 
> skewed thresholds and observing whether the optimization is effective only 
> when the data volume processed by the skewed join instance is greater than 
> the skewed threshold.
>  
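For reference, a minimal Flink SQL sketch of the configuration used by the tests 
above; the option keys come from the description, while the threshold/factor values 
and the tables in the join are purely illustrative:

{code:sql}
-- run in batch mode so adaptive batch execution applies
SET 'execution.runtime-mode' = 'batch';

-- Test 1: enable adaptive skewed join optimization in auto mode
SET 'table.optimizer.skewed-join-optimization.strategy' = 'auto';
SET 'table.optimizer.skewed-join-optimization.skewed-factor' = '4.0';
SET 'table.optimizer.skewed-join-optimization.skewed-threshold' = '256mb';

-- illustrative join; assume orders is heavily skewed on a single seller_id
SELECT o.order_id, s.seller_name
FROM orders AS o
JOIN sellers AS s ON o.seller_id = s.seller_id;
{code}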



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37322) support eval groovy scripts udf for flink sql

2025-02-20 Thread tianyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tianyuan updated FLINK-37322:
-
Component/s: Table SQL / API

> support eval groovy scripts udf for flink sql
> -
>
> Key: FLINK-37322
> URL: https://issues.apache.org/jira/browse/FLINK-37322
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: tianyuan
>Priority: Major
>
> I want to add a new UDF that dynamically executes Groovy scripts to achieve any 
> desired functionality, much like using Lua scripts in Redis. I have implemented 
> this UDF locally; it has very efficient execution performance and strong 
> flexibility. Usage is as follows:
> select eval_groovy_script('INT', 'return arg0+arg1', 1, 2);
> select eval_groovy_script('INT', 'return p1+p2', MAP['p1', 2, 'p2', 5]);
> select eval_groovy_script('MAP', 'return ["TopicName" : 
> "Lists", "Author" : "Raghav"] ');
> This UDF requires the following three parameters:
> 1. The return data type
> 2. The Groovy script
> 3. The parameters passed to the Groovy script. Positional arguments are 
> available inside the Groovy script as arg\{index}; if a map is passed, its 
> values are available through their keys.
> Apache Pinot has implemented similar functions:
> https://docs.pinot.apache.org/users/user-guide-query/scalar-functions
> I hope a committer will recognize my idea and assign this task to me.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37322) support eval groovy scripts udf for flink sql

2025-02-20 Thread tianyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tianyuan updated FLINK-37322:
-
Affects Version/s: 2.1.0

> support eval groovy scripts udf for flink sql
> -
>
> Key: FLINK-37322
> URL: https://issues.apache.org/jira/browse/FLINK-37322
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 2.1.0
>Reporter: tianyuan
>Priority: Major
>
> I want to add a new UDF that dynamically executes Groovy scripts to achieve any 
> desired functionality, much like using Lua scripts in Redis. I have implemented 
> this UDF locally; it has very efficient execution performance and strong 
> flexibility. Usage is as follows:
> select eval_groovy_script('INT', 'return arg0+arg1', 1, 2);
> select eval_groovy_script('INT', 'return p1+p2', MAP['p1', 2, 'p2', 5]);
> select eval_groovy_script('MAP', 'return ["TopicName" : 
> "Lists", "Author" : "Raghav"] ');
> This UDF requires the following three parameters:
> 1. The return data type
> 2. The Groovy script
> 3. The parameters passed to the Groovy script. Positional arguments are 
> available inside the Groovy script as arg\{index}; if a map is passed, its 
> values are available through their keys.
> Apache Pinot has implemented similar functions:
> https://docs.pinot.apache.org/users/user-guide-query/scalar-functions
> I hope a committer will recognize my idea and assign this task to me.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37320] [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING [flink-kubernetes-operator]

2025-02-20 Thread via GitHub


gyfora merged PR #944:
URL: https://github.com/apache/flink-kubernetes-operator/pull/944


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-3154) Update Kryo version from 2.24.0 to latest Kryo LTS version

2025-02-20 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-3154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929032#comment-17929032
 ] 

Gyula Fora commented on FLINK-3154:
---

I feel that there is not a very strong consensus here regarding the approach, so I 
started a discussion on the dev mailing list to finalize the decision: 
[https://lists.apache.org/thread/odhglx8tmpdt6jnorgcsvxjqjfd169x6]
Please chime in, and then we can move ahead with this.

> Update Kryo version from 2.24.0 to latest Kryo LTS version
> --
>
> Key: FLINK-3154
> URL: https://issues.apache.org/jira/browse/FLINK-3154
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Priority: Not a Priority
>  Labels: pull-request-available
>
> Flink's Kryo version is outdated and could be updated to a newer version, 
> e.g. kryo-3.0.3.
> From the ML: we cannot bump the Kryo version easily - the serialization format 
> changed (that's why they have a new major version), which would render all 
> Flink savepoints and checkpoints incompatible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-37320) [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING

2025-02-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-37320.
--
Fix Version/s: kubernetes-operator-1.11.0
 Assignee: Luca Castelli
   Resolution: Fixed

merged to main f874b03dfddb47cd55cc9491c0dd4ebdd4fa3c38

> [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING
> --
>
> Key: FLINK-37320
> URL: https://issues.apache.org/jira/browse/FLINK-37320
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.10.0
> Environment: I've attached the flinkdeployment CR and operator-config 
> I used to locally replicate.
>Reporter: Luca Castelli
>Assignee: Luca Castelli
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.11.0
>
> Attachments: operator-config.yaml, 
> operator-log-finite-streaming-job.log, test-finite-streaming-job.yaml
>
>
> Hello,
> I believe I've found a bug within the observation logic for finite streaming 
> jobs. This is a follow-up to: 
> [https://lists.apache.org/thread/xvsk4fmlqln092cdolvox4dgko0pw81k].
> *For finite streaming jobs:*
>  # The job finishes successfully and the job status changes to FINISHED
>  # TTL (kubernetes.operator.jm-deployment.shutdown-ttl) cleanup removes the 
> JM deployments and clears HA configmap data
>  # On the next loop, the observer sees MISSING JM and changes the job status 
> from FINISHED to RECONCILING
> The job had reached a terminal state. It shouldn't have been set back to 
> RECONCILING.
> This leads to an operator error later when a recovery attempt is triggered. 
> The recovery is triggered because the JM is MISSING, the status is 
> RECONCILING, the spec shows RUNNING, and HA is enabled. The recovery fails with 
> validateHaMetadataExists throwing UpgradeFailureException.
> At that point the deployment gets stuck in a loop with status RECONCILING and 
> UpgradeFailureException thrown on each cycle. I've attached operator logs 
> showing this.
> *Proposed solution:* I think the fix would be to wrap 
> [AbstractFlinkDeploymentObserver.observeJmDeployment|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L155]
>  in an if-statement that checks the job is not in a terminal state. Happy to 
> discuss and/or put up the 2 line code change PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-37350][table-planner] Fix memory leak caused by skewed join optimization strategy [flink]

2025-02-20 Thread via GitHub


noorall opened a new pull request, #26180:
URL: https://github.com/apache/flink/pull/26180

   
   
   ## What is the purpose of the change
   
   When the AdaptiveBroadcastJoinOptimizationStrategy is in effect, the statistics 
in the AdaptiveSkewedJoinOptimizationStrategy are not released properly, which 
causes a memory leak. This PR is intended to fix the issue.
   
   
   ## Brief change log
   
   Fix memory leak caused by skewed join optimization strategy.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37350) Memory leak caused by skewed join optimization strategy

2025-02-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37350:
---
Labels: pull-request-available  (was: )

> Memory leak caused by skewed join optimization strategy
> ---
>
> Key: FLINK-37350
> URL: https://issues.apache.org/jira/browse/FLINK-37350
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Lei Yang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 2.0.1
>
>
> When the AdaptiveBroadcastJoinOptimizationStrategy is in effect, the statistics 
> in the AdaptiveSkewedJoinOptimizationStrategy are not released properly, which 
> causes a memory leak.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-2.0][FLINK-28047][docs] Remove deprecated StreamExecutionEnvironment#readTextFile method in documentation [flink]

2025-02-20 Thread via GitHub


reswqa merged PR #26164:
URL: https://github.com/apache/flink/pull/26164


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest

2025-02-20 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928691#comment-17928691
 ] 

Arvid Heise commented on FLINK-37356:
-

Thank you very much for the detailed analysis. However, I'm still not sure I 
fully understand the issue.

In general, everything that we do with reflection is fishy and may have worked 
at some point in time but is prone to fail at a later time. So that makes your 
analysis very plausible.

What I didn't fully understand is: how do we get an entry in 
{{partitionsInTransaction}} when the transaction has no data? As far as I understood 
FLINK-31363, we actually don't have any partitions, and as such the transaction 
is invisible to the broker.

Nevertheless, I do see two issues:
* Committing an empty transaction will result in an 
{{InvalidTxnStateException}}. I think that the current {{main}} already contains 
a fix for that by simply not trying to commit and resetting the state 
only on the client side (please double-check if that makes sense at all).
* I also think it may be inherently unsafe to reuse a producer that encountered 
an error for any reason. Let's say the producer was fenced. Can we 
assume that the transaction manager is in a state from which we can easily 
resume? In theory, {{initTransaction}} should reset everything, but you 
definitely found cases where it doesn't. So I'd propose to close a producer 
that encountered a non-retriable error and open a new one instead.

Coming back to your original problem: it would be awesome if you could distill 
the issue into a small repo/test case that fails. From your description it sounds 
easy to do, although - as I said - I cannot follow 100%. {{main}} now also 
contains better debug logging that makes it easier to follow producer reuse, 
because I also print the system hash code. That also helps tremendously in 
debugging leaks.

> Recycle use of kafka producer(which commit error) maybe send data without 
> AddPartitionsToTxnRequest
> ---
>
> Key: FLINK-37356
> URL: https://issues.apache.org/jira/browse/FLINK-37356
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.4.0
>Reporter: Hongshun Wang
>Priority: Major
>
> In my production environment, a READ_COMMITTED consumer could no longer consume 
> any records. I then found that the LSO of the partition had not changed for a 
> long time. I looked up all the logs in the Kafka cluster and found that there is a 
> transaction lacking an AddPartitionsToTxnRequest.
>  
> At first, I thought the problem was caused by 
> https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster 
> log contains InvalidTxnStateException. However, although that transaction is in 
> an invalid state, no data is written into the Kafka topic partition in that 
> transaction (because in that case, the transaction is empty). It does not 
> influence any Kafka topic partition's LSO, so the consumer won't be blocked.
>  
> Then I checked the Kafka client code; there seems to be no way to produce data 
> before the AddPartitionsToTxnRequest is done, because the `Sender` will refuse 
> to dequeue batches from the accumulator until they have been added to the 
> transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future doSend(ProducerRecord record, Callback 
> callback) {
> // ..ignore code
> // Add the partition to the transaction (if in progress) after it has been 
> successfully
> // appended to the accumulator. We cannot do it before because the partition 
> may be
> // unknown or the initially selected partition may be changed when the batch 
> is closed
> // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse 
> to dequeue
> // batches from the accumulator until they have been added to the transaction.
> if (transactionManager != null) {
> transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
> // ignore code
> }{code}
> {code:java}
> //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
>  
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // 
> there is a rare case that a single batch size is larger than the request size 
> due to // compression; in this case we will still eventually send this batch 
> in a single request break; } else { if 
> (shouldStopDrainBatchesForPartition(first, tp)) break; }
> {code}
>  
> Then I had the idea that if a TransactionManager which doesn't clear 
> partitionsInTransaction is reused again, the AddPartitionsToTxnRequest will 
> not be sent again. This may happen in the Flink Kafka connector:
>  
>       1. The Flink Kafka connector also reuses and recycles the KafkaProducer: 
> KafkaCommitter will recycle

Re: [PR] [FLINK-37350][table-planner] Fix memory leak caused by skewed join optimization strategy [flink]

2025-02-20 Thread via GitHub


JunRuiLee commented on code in PR #26180:
URL: https://github.com/apache/flink/pull/26180#discussion_r1963051617


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java:
##
@@ -658,4 +660,148 @@ public static long calculateDataVolumePerTaskForInput(
 long globalDataVolumePerTask, long inputsGroupBytes, long 
totalDataBytes) {
 return (long) ((double) inputsGroupBytes / totalDataBytes * 
globalDataVolumePerTask);
 }
+
+public static Optional constructOptimizationLog(
+BlockingInputInfo inputInfo, JobVertexInputInfo 
jobVertexInputInfo) {
+if (inputInfo.areInterInputsKeysCorrelated() && 
inputInfo.isIntraInputKeyCorrelated()) {
+return Optional.empty();
+}
+boolean optimized = false;
+List executionVertexInputInfos =
+jobVertexInputInfo.getExecutionVertexInputInfos();
+int parallelism = executionVertexInputInfos.size();
+long[] optimizedDataBytes = new long[parallelism];
+long optimizedMin = Long.MAX_VALUE, optimizedMax = 0;
+long[] nonOptimizedDataBytes = new long[parallelism];
+long nonOptimizedMin = Long.MAX_VALUE, nonOptimizedMax = 0;
+for (int i = 0; i < parallelism; ++i) {
+Map consumedSubpartitionGroups =
+
executionVertexInputInfos.get(i).getConsumedSubpartitionGroups();
+for (Map.Entry entry : 
consumedSubpartitionGroups.entrySet()) {
+IndexRange partitionRange = entry.getKey();
+IndexRange subpartitionRange = entry.getValue();
+optimizedDataBytes[i] +=
+inputInfo.getNumBytesProduced(partitionRange, 
subpartitionRange);
+}
+optimizedMin = Math.min(optimizedMin, optimizedDataBytes[i]);
+optimizedMax = Math.max(optimizedMax, optimizedDataBytes[i]);
+
+Map nonOptimizedConsumedSubpartitionGroup =
+computeNumBasedConsumedSubpartitionGroup(parallelism, i, 
inputInfo);
+checkState(nonOptimizedConsumedSubpartitionGroup.size() == 1);
+nonOptimizedDataBytes[i] +=
+inputInfo.getNumBytesProduced(
+nonOptimizedConsumedSubpartitionGroup
+.entrySet()
+.iterator()
+.next()
+.getKey(),
+nonOptimizedConsumedSubpartitionGroup
+.entrySet()
+.iterator()
+.next()
+.getValue());
+nonOptimizedMin = Math.min(nonOptimizedMin, 
nonOptimizedDataBytes[i]);
+nonOptimizedMax = Math.max(nonOptimizedMax, 
nonOptimizedDataBytes[i]);
+
+if (!optimized
+&& 
!consumedSubpartitionGroups.equals(nonOptimizedConsumedSubpartitionGroup)) {
+optimized = true;
+}
+}
+if (optimized) {
+long optimizedMed = median(optimizedDataBytes);
+long nonOptimizedMed = median(nonOptimizedDataBytes);
+String logMessage =
+String.format(
+"Result id: %s, "
++ "type number: %d, "
++ "input data size: "
++ "[ Before: {min: %s, median: %s, max: 
%s}, "
++ "After: {min: %s, median: %s, max: %s} 
]",
+inputInfo.getResultId(),
+inputInfo.getInputTypeNumber(),
+new 
MemorySize(nonOptimizedMin).toHumanReadableString(),
+new 
MemorySize(nonOptimizedMed).toHumanReadableString(),
+new 
MemorySize(nonOptimizedMax).toHumanReadableString(),
+new 
MemorySize(optimizedMin).toHumanReadableString(),
+new 
MemorySize(optimizedMed).toHumanReadableString(),
+new 
MemorySize(optimizedMax).toHumanReadableString());
+return Optional.of(logMessage);
+}
+return Optional.empty();
+}
+
+private static Map 
computeNumBasedConsumedSubpartitionGroup(
+int parallelism, int currentIndex, BlockingInputInfo inputInfo) {
+int sourceParallelism = inputInfo.getNumPartitions();
+
+if (inputInfo.isPointwise()) {
+return computeNumBasedConsumedSubpartitionGroupForPointwise(
+sourceParallelism, parallelism, currentIndex, 
inputInfo::getNumSubpartitions);
+} else {
+return computeNumBasedConsumedSubpartitionGroupForAllToAll(
+   

Re: [PR] [FLINK-37149][doc] Add documents for adaptive batch execution. [flink]

2025-02-20 Thread via GitHub


JunRuiLee merged PR #26168:
URL: https://github.com/apache/flink/pull/26168


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37069) Cross-team verification for "Disaggregated State Management"

2025-02-20 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928718#comment-17928718
 ] 

Zakelly Lan commented on FLINK-37069:
-

Thanks Weijie!

> Cross-team verification for "Disaggregated State Management"
> 
>
> Key: FLINK-37069
> URL: https://issues.apache.org/jira/browse/FLINK-37069
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xintong Song
>Assignee: Weijie Guo
>Priority: Blocker
> Fix For: 2.0.0
>
>
> Instructions:
> First of all, please briefly read the related documents (still under review; 
> the links will be replaced with formal ones once merged):
>  * Disaggregated State Management: 
> [https://github.com/apache/flink/pull/26107/files#diff-bfa19e04bb5c3487c3e9bf514d61c0fa8bb973950fb0ad0e3d4a6898a99b83e3]
>  * State V2: 
> [https://github.com/apache/flink/pull/26107/files#diff-5d1147987fecbda329132403c1d92384575be220092995c4be491e12b8c50cc9]
>  * ForSt State Backend: 
> [https://github.com/apache/flink/pull/26107/files#diff-b7c52c06f6ed4d5af6f230d11ba23ea051bf4a08c589d98392143f080c468a87]
> The SQL part is verified in FLINK-37068; here we mainly focus on 
> DataStream jobs and APIs.
> 1. Make sure you are verifying this on release-2.0 branch, since we have 
> fixed several bugs since the rc0 package.
> 2. Choose one example in `flink-examples-streaming`. Most of the jobs have 
> been rewritten using the new API. Here we take `StateMachineExample` as an 
> example.
> 3. Compile and run `StateMachineExample` in a proper environment (I suggest a 
> standalone session cluster or YARN), and make sure you pass the following 
> command-line params:
> {code:bash}
> ./flink run x \
>   --backend forst \
>   --checkpoint-dir s3://your/cp/dir \
>   --incremental-checkpoints true
> {code}
> Or set via `config.yaml`.
> {code:yaml}
> state.backend.type: forst
> execution.checkpointing.incremental: true
> execution.checkpointing.dir: s3://your-bucket/flink-checkpoints
> {code}
> 4. Check that the job is running smoothly and that the periodic checkpoints are 
> taken successfully.
> 5. Stop the job and restart it from the latest checkpoint.
> It would be great if you could also write your own job using the State V2 API and 
> follow Steps 3~5 above. It is important to check whether there is any bug 
> in the new State APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36549) Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss.

2025-02-20 Thread xiaoyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928717#comment-17928717
 ] 

xiaoyu commented on FLINK-36549:


The PRs for versions 1.19 and 1.20 have been created. Here are the links for 
your reference:
 * [https://github.com/apache/flink/pull/26172]
 * https://github.com/apache/flink/pull/26173

When you have some time, I’d really appreciate it if you could help review 
them. Thank you in advance for your time and support! [~libenchao] 

> Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON 
> results in unexpected data loss.
> ---
>
> Key: FLINK-36549
> URL: https://issues.apache.org/jira/browse/FLINK-36549
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.2, 1.18.1, 1.19.1
>Reporter: jiangyu
>Assignee: xiaoyu
>Priority: Critical
>  Labels: pull-request-available
>
> In Debezium/Canal/Maxwell/Ogg JSON, setting {{ignore-parse-errors}} would 
> cause data loss if an operator chained with a format-related operator 
> encounters an exception. The reason is that in the deserialization 
> implementation of Debezium/Canal/Maxwell/Ogg JSON, enabling the 
> {{ignore-parse-errors}} parameter skips exceptions related to the format's 
> emitted data. For example, in Canal's JSON code, enabling the 
> {{ignore-parse-errors}} parameter catches and skips exceptions for {{emitRow}}
> {code:java}
> @Override
> public void deserialize(@Nullable byte[] message, Collector out) 
> throws IOException {
> if (message == null || message.length == 0) {
> return;
> }
> try {
> final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
> if (database != null) {
> if (!databasePattern
> .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
> .matches()) {
> return;
> }
> }
> if (table != null) {
> if (!tablePattern
> .matcher(root.get(ReadableMetadata.TABLE.key).asText())
> .matches()) {
> return;
> }
> }
> final GenericRowData row = (GenericRowData) 
> jsonDeserializer.convertToRowData(root);
> String type = row.getString(2).toString(); // "type" field
> if (OP_INSERT.equals(type)) {
> // "data" field is an array of row, contains inserted rows
> ArrayData data = row.getArray(0);
> for (int i = 0; i < data.size(); i++) {
> GenericRowData insert = (GenericRowData) data.getRow(i, 
> fieldCount);
> insert.setRowKind(RowKind.INSERT);
> emitRow(row, insert, out);
> }
> } else if (OP_UPDATE.equals(type)) {
> // "data" field is an array of row, contains new rows
> ArrayData data = row.getArray(0);
> // "old" field is an array of row, contains old values
> ArrayData old = row.getArray(1);
> for (int i = 0; i < data.size(); i++) {
> // the underlying JSON deserialization schema always produce 
> GenericRowData.
> GenericRowData after = (GenericRowData) data.getRow(i, 
> fieldCount);
> GenericRowData before = (GenericRowData) old.getRow(i, 
> fieldCount);
> final JsonNode oldField = root.get(FIELD_OLD);
> for (int f = 0; f < fieldCount; f++) {
> if (before.isNullAt(f) && 
> oldField.findValue(fieldNames.get(f)) == null) {
> // fields in "old" (before) means the fields are 
> changed
> // fields not in "old" (before) means the fields are 
> not changed
> // so we just copy the not changed fields into before
> before.setField(f, after.getField(f));
> }
> }
> before.setRowKind(RowKind.UPDATE_BEFORE);
> after.setRowKind(RowKind.UPDATE_AFTER);
> emitRow(row, before, out);
> emitRow(row, after, out);
> }
> } else if (OP_DELETE.equals(type)) {
> // "data" field is an array of row, contains deleted rows
> ArrayData data = row.getArray(0);
> for (int i = 0; i < data.size(); i++) {
> GenericRowData insert = (GenericRowData) data.getRow(i, 
> fieldCount);
> insert.setRowKind(RowKind.DELETE);
> emitRow(row, insert, out);
> }
> } else if (OP_CREATE.equals(type)) {
> // "data"

Re: [PR] [FLINK-37327] [Formats (JSON, Avro, Parquet, ORC, SequenceFile)] Debezium Avro Format: Add FormatOption to Optionally Skip emitting UPDATE_BEFORE Rows [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26159:
URL: https://github.com/apache/flink/pull/26159#discussion_r1963417984


##
docs/content.zh/docs/connectors/table/formats/debezium.md:
##
@@ -320,6 +320,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种 
format 来
 String
 Bearer auth token for Schema Registry
 
+
+debezium-avro-confluent.upsert-mode
+optional
+false
+Boolean
+In upsert mode the format will not produce UPDATE_BEFORE Rows 
from Debezium op='u' change events when deserializing.

Review Comment:
   nit: Rows ->rows
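   For readers following along, a sketch of how the proposed option would be used, 
assuming the PR lands with the option key shown in the diff above (the table, topic, 
and schema-registry details are purely illustrative):

{code:sql}
CREATE TABLE user_changes (
    user_id BIGINT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.url' = 'http://localhost:8081',
    -- proposed option: skip emitting UPDATE_BEFORE rows for Debezium op='u' events
    'debezium-avro-confluent.upsert-mode' = 'true'
);
{code}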



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37329][table-planner] Skip Source Stats Collection When table.optimizer.source.report-statistics-enabled is False [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26162:
URL: https://github.com/apache/flink/pull/26162#discussion_r1963420772


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##
@@ -115,25 +115,26 @@ private TableStats recomputeStatistics(
 FilterPushDownSpec filterPushDownSpec,
 boolean reportStatEnabled) {
 TableStats origTableStats = table.getStatistic().getTableStats();
+// bail out early if stats computation is disabled.
+if (!reportStatEnabled) {
+return origTableStats;
+}
 DynamicTableSource tableSource = table.tableSource();
 if (filterPushDownSpec != null && 
!filterPushDownSpec.isAllPredicatesRetained()) {
 // filter push down but some predicates are accepted by source and 
not in reaming
 // predicates
 // the catalog do not support get statistics with filters,
-// so only call reportStatistics method if reportStatEnabled is 
true
+// so only call reportStatistics

Review Comment:
   nit: the comment now does not read well.  "so only call reportStatistics" 
implies there will be more to the sentence.
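   For context, the early-return path being discussed corresponds to the user-facing 
setting from the PR title; a minimal sketch of disabling it (the query is 
illustrative):

{code:sql}
-- With reporting disabled, the planner keeps the catalog statistics and skips
-- calling reportStatistics() on the source.
SET 'table.optimizer.source.report-statistics-enabled' = 'false';

SELECT COUNT(*) FROM my_source_table;
{code}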



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36409] Publish some autoscaler metrics during stabilisation period [flink-kubernetes-operator]

2025-02-20 Thread via GitHub


mxm commented on PR #945:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/945#issuecomment-2671259434

   I'm ok with not having a flag / config option. I do see that adding the 
changes under a feature flag is of little use. Plus, we have too many config 
options already. You will likely still have gaps in the metric processing 
because of the job restarts for which metrics aren't available, but it may help 
users to better understand what's happening with the cluster during the 
stabilization period.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Fix post-release japicmp update script for macOS [flink]

2025-02-20 Thread via GitHub


flinkbot commented on PR #26182:
URL: https://github.com/apache/flink/pull/26182#issuecomment-2671276189

   
   ## CI report:
   
   * 90fcdf11fa4a9d9933965f4430d7a6c2d3fb182c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963430852


##
flink-python/docs/reference/pyflink.table/catalog.rst:
##
@@ -250,3 +250,31 @@ A CatalogDescriptor is a template for creating a catalog 
instance. It closely re
 :toctree: api/
 
 CatalogDescriptor.of
+
+
+ObjectIdentifier
+
+
+Identifies an object in a catalog. It allows to identify objects such as 
tables, views,

Review Comment:
   nit: `It allows to identify objects ` does not read well. I suggest
   `Identifies an object in a catalog, including tables, views, functions and 
types.`
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963431910


##
flink-python/docs/reference/pyflink.table/catalog.rst:
##
@@ -250,3 +250,31 @@ A CatalogDescriptor is a template for creating a catalog 
instance. It closely re
 :toctree: api/
 
 CatalogDescriptor.of
+
+
+ObjectIdentifier
+
+
+Identifies an object in a catalog. It allows to identify objects such as 
tables, views,
+function, or types in a catalog. An identifier must be fully qualified. It is 
the

Review Comment:
   nit:
   An identifier -> ObjectIdentifier
   An identifier must be fully qualified. -> An identifier must be fully 
qualified, by this we mean .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963432849


##
flink-python/docs/reference/pyflink.table/catalog.rst:
##
@@ -250,3 +250,31 @@ A CatalogDescriptor is a template for creating a catalog 
instance. It closely re
 :toctree: api/
 
 CatalogDescriptor.of
+
+
+ObjectIdentifier
+
+
+Identifies an object in a catalog. It allows to identify objects such as 
tables, views,
+function, or types in a catalog. An identifier must be fully qualified. It is 
the
+responsibility of the catalog manager to resolve an identifier to an object.

Review Comment:
   nit: identifier -> objectIdentifier



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-37359] Use pinned qemu image [flink-kubernetes-operator]

2025-02-20 Thread via GitHub


gyfora opened a new pull request, #946:
URL: https://github.com/apache/flink-kubernetes-operator/pull/946

   ## Brief change log
   
   Use a pinned qemu image, as the new latest image seems to be broken
   
   ## Verifying this change
   CI
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
   no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963431910


##
flink-python/docs/reference/pyflink.table/catalog.rst:
##
@@ -250,3 +250,31 @@ A CatalogDescriptor is a template for creating a catalog 
instance. It closely re
 :toctree: api/
 
 CatalogDescriptor.of
+
+
+ObjectIdentifier
+
+
+Identifies an object in a catalog. It allows to identify objects such as 
tables, views,
+function, or types in a catalog. An identifier must be fully qualified. It is 
the

Review Comment:
   nit:
   An identifier -> ObjectIdentifier (I think it is always best to use the same 
word to describe something)
   An identifier must be fully qualified. -> An identifier must be fully 
qualified, by this we mean .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution. [flink]

2025-02-20 Thread via GitHub


JunRuiLee merged PR #26181:
URL: https://github.com/apache/flink/pull/26181


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution. [flink]

2025-02-20 Thread via GitHub


JunRuiLee opened a new pull request, #26181:
URL: https://github.com/apache/flink/pull/26181

   
   
   
   ## What is the purpose of the change
   
   [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution.
   
   
   ## Brief change log
   
   [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution.
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-37359) Docker build fails for Kubernetes operator

2025-02-20 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-37359:
--

 Summary: Docker build fails for Kubernetes operator
 Key: FLINK-37359
 URL: https://issues.apache.org/jira/browse/FLINK-37359
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Gyula Fora


All docker build started to fail with the following error:


{noformat}
20.45 Setting up libc-bin (2.35-0ubuntu3.9) ...
20.59 qemu: uncaught target signal 11 (Segmentation fault) - core dumped
20.95 Segmentation fault (core dumped)
21.00 qemu: uncaught target signal 11 (Segmentation fault) - core dumped
21.37 Segmentation fault (core dumped)
Sub-process /usr/bin/dpkg returned an error code (1)
--
WARNING: No output specified for bake-platform target(s) with docker-container 
driver. Build result will only remain in the build cache. To push result image 
into registry use --push or to load image into docker use --load
Dockerfile:68

  66 | # Updating Debian
  67 | RUN if [ "$SKIP_OS_UPDATE" = "false" ]; then apt-get update; fi
  68 | >>> RUN if [ "$SKIP_OS_UPDATE" = "false" ]; then apt-get upgrade -y; fi
  69 | 
  70 | ARG DISABLE_JEMALLOC=false

ERROR: failed to solve: process "/bin/sh -c if [ \"$SKIP_OS_UPDATE\" = 
\"false\" ]; then apt-get upgrade -y; fi" did not complete successfully: exit 
code: 100{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest

2025-02-20 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928706#comment-17928706
 ] 

Hongshun Wang commented on FLINK-37356:
---

[~arvid] The Flink log is lost, so I have to reason about it logically. I will add 
the Flink log the next time I encounter this.

I will try to describe it in a small case:

1. KafkaWriter writes some data in transaction 1 to partition A with 
Producer 1, then hands a KafkaCommittable to KafkaCommitter. At this time, the 
transactionManager in Producer 1 saves partition A into 
partitionsInTransaction after ADD_OFFSETS_TO_TXN.

2. KafkaCommitter tries to commit transaction 1, but fails with an OOM (or 
something else). Thus, KafkaCommitter recycles Producer 1 (puts it back 
into the producerPool of KafkaWriter). But in this step, we don't clear the 
partitionsInTransaction of Producer 1. The KafkaCommitter and KafkaWriter 
are not in the same thread, so before KafkaCommitter stops the job, 
KafkaWriter can get Producer 1 from the producerPool.

3. KafkaWriter writes some data in transaction 2 to partition A with 
Producer 1 (which came from the producerPool). When it tries to add 
partition A to the transaction, it turns out that partition A is already in the 
partitionsInTransaction of the transactionManager. Thus, it won't send 
partition A with ADD_OFFSETS_TO_TXN again, but directly writes into the 
cluster.

> Recycle use of kafka producer(which commit error) maybe send data without 
> AddPartitionsToTxnRequest
> ---
>
> Key: FLINK-37356
> URL: https://issues.apache.org/jira/browse/FLINK-37356
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.4.0
>Reporter: Hongshun Wang
>Priority: Major
>
> In my production environment, a READ_COMMITTED consumer could no longer consume 
> any records. I then found that the LSO of the partition had not changed for a 
> long time. I looked up all the logs in the Kafka cluster and found that there is a 
> transaction lacking an AddPartitionsToTxnRequest.
>  
> At first, I thought the problem was caused by 
> https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster 
> log contains InvalidTxnStateException. However, although that transaction is in 
> an invalid state, no data is written into the Kafka topic partition in that 
> transaction (because in that case, the transaction is empty). It does not 
> influence any Kafka topic partition's LSO, so the consumer won't be blocked.
>  
> Then I checked the Kafka client code; there seems to be no way to produce data 
> before the AddPartitionsToTxnRequest is done, because the `Sender` will refuse 
> to dequeue batches from the accumulator until they have been added to the 
> transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future doSend(ProducerRecord record, Callback 
> callback) {
> // ..ignore code
> // Add the partition to the transaction (if in progress) after it has been 
> successfully
> // appended to the accumulator. We cannot do it before because the partition 
> may be
> // unknown or the initially selected partition may be changed when the batch 
> is closed
> // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse 
> to dequeue
> // batches from the accumulator until they have been added to the transaction.
> if (transactionManager != null) {
> transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
> // ignore code
> }{code}
> {code:java}
> //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
>  
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // 
> there is a rare case that a single batch size is larger than the request size 
> due to // compression; in this case we will still eventually send this batch 
> in a single request break; } else { if 
> (shouldStopDrainBatchesForPartition(first, tp)) break; }
> {code}
>  
> Then I had the idea that if a TransactionManager which doesn't clear 
> partitionsInTransaction is reused again, the AddPartitionsToTxnRequest will 
> not be sent again. This may happen in the Flink Kafka connector:
>  
>       1. The Flink Kafka connector also reuses and recycles the KafkaProducer: 
> KafkaCommitter will recycle the producer to the
> producerPool after the transaction completes or hits an exception, and then 
> KafkaWriter will reuse it from producerPoo

Re: [PR] [FLINK-36409] Publish some autoscaler metrics during stabilisation period [flink-kubernetes-operator]

2025-02-20 Thread via GitHub


mxm commented on PR #945:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/945#issuecomment-2671149679

   > I guess Max is under the assumption that the current logic does not 
collect metrics during the stabilization period. We do collect samples, and 
once the stabilization is over we even evaluate them. This PR does not change 
that logic, so I'm not sure what should be controlled by a flag. The only thing the 
PR does is report those metrics. Can you clarify? I might be missing 
something obvious in the current logic.
   
   You're right, we already return metrics from the stabilization phase, but 
only to measure the observed true processing rate. In the original model, we 
only returned metrics once the metric window was full. I think that was more 
elegant, but the source metrics proved not reliable enough, so we had to 
measure the processing capacity manually instead of always relying on the 
processing rate and busyness metrics of the sources.
   
   I might be a bit pedantic here, but I want to see the actual metrics used 
for evaluation reported as autoscaler metrics. Reporting metrics during 
stabilization removes that clarity. You can only understand what the autoscaler's 
assumptions were if you can observe what is actually used for evaluation. 
That's why I suggested putting the reporting of autoscaler metrics during the 
stabilization period behind a flag.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36409] Publish some autoscaler metrics during stabilisation period [flink-kubernetes-operator]

2025-02-20 Thread via GitHub


mxm commented on code in PR #945:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/945#discussion_r1963310240


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -149,19 +149,31 @@ public CollectedMetricHistory updateMetrics(
 // Add scaling metrics to history if they were computed successfully
 metricHistory.put(now, scalingMetrics);
 
-if (isStabilizing) {
-LOG.info("Stabilizing until {}", readable(stableTime));
-stateStore.storeCollectedMetrics(ctx, metricHistory);
-return new CollectedMetricHistory(topology, 
Collections.emptySortedMap(), jobRunningTs);
-}
-
 var collectedMetrics = new CollectedMetricHistory(topology, 
metricHistory, jobRunningTs);
 if (now.isBefore(windowFullTime)) {
-LOG.info("Metric window not full until {}", 
readable(windowFullTime));
+if (isStabilizing) {
+LOG.info(
+"Stabilizing... until {}. {} samples collected",

Review Comment:
   True, we return the metrics from the stabilization period once the 
stabilization period is over. They are used to calculate the observed true 
processing rate. When the window is full, we clear the metrics from the 
stabilization period and mark the window as fully collected.
   
   I still think that returning this logging message ("XX samples collected") 
to the user during stabilization is confusing. It suggests that these metrics 
will be evaluated for scaling decisions, which they are not. We are merely 
measuring the processing capacity of the sources.
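   
   A rough sketch of the window handling described above (names and structure 
below are illustrative assumptions, not the autoscaler's actual classes):
   
   ```java
   import java.time.Instant;
   import java.util.SortedMap;
   import java.util.TreeMap;
   
   // Samples gathered during stabilization are kept only to estimate the
   // observed true processing rate; once the metric window is full they are
   // cleared, and only the remaining samples feed scaling decisions.
   class MetricWindowSketch {
       private final SortedMap<Instant, Double> samples = new TreeMap<>();
   
       void add(Instant ts, double processingRate) {
           samples.put(ts, processingRate);
       }
   
       // Used while stabilizing: average of the samples seen so far.
       double observedTrueProcessingRate(Instant stableTime) {
           return samples.headMap(stableTime).values().stream()
                   .mapToDouble(Double::doubleValue)
                   .average()
                   .orElse(Double.NaN);
       }
   
       // Called when the window is full: drop the stabilization samples.
       void onWindowFull(Instant stableTime) {
           samples.headMap(stableTime).clear();
       }
   }
   ```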



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/DateTimeUtils.java:
##
@@ -26,7 +26,7 @@
 public class DateTimeUtils {
 
 private static final DateTimeFormatter DEFAULT_FORMATTER =
-DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss");
+DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss.SSS");

Review Comment:
   AFAIK we don't use any milliseconds in the tests, but advance only by full 
seconds. I think this may have unexpected consequences for the storage and 
logging layer. If this is a test-only change, then it should go into test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37350][table-planner] Fix memory leak caused by skewed join optimization strategy [flink]

2025-02-20 Thread via GitHub


JunRuiLee commented on code in PR #26180:
URL: https://github.com/apache/flink/pull/26180#discussion_r1963052390


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java:
##
@@ -658,4 +660,148 @@ public static long calculateDataVolumePerTaskForInput(
 long globalDataVolumePerTask, long inputsGroupBytes, long 
totalDataBytes) {
 return (long) ((double) inputsGroupBytes / totalDataBytes * 
globalDataVolumePerTask);
 }
+
+public static Optional constructOptimizationLog(
+BlockingInputInfo inputInfo, JobVertexInputInfo 
jobVertexInputInfo) {
+if (inputInfo.areInterInputsKeysCorrelated() && 
inputInfo.isIntraInputKeyCorrelated()) {
+return Optional.empty();
+}
+boolean optimized = false;
+List executionVertexInputInfos =
+jobVertexInputInfo.getExecutionVertexInputInfos();
+int parallelism = executionVertexInputInfos.size();
+long[] optimizedDataBytes = new long[parallelism];
+long optimizedMin = Long.MAX_VALUE, optimizedMax = 0;
+long[] nonOptimizedDataBytes = new long[parallelism];
+long nonOptimizedMin = Long.MAX_VALUE, nonOptimizedMax = 0;
+for (int i = 0; i < parallelism; ++i) {
+Map consumedSubpartitionGroups =
+
executionVertexInputInfos.get(i).getConsumedSubpartitionGroups();
+for (Map.Entry entry : 
consumedSubpartitionGroups.entrySet()) {
+IndexRange partitionRange = entry.getKey();
+IndexRange subpartitionRange = entry.getValue();
+optimizedDataBytes[i] +=
+inputInfo.getNumBytesProduced(partitionRange, 
subpartitionRange);
+}
+optimizedMin = Math.min(optimizedMin, optimizedDataBytes[i]);
+optimizedMax = Math.max(optimizedMax, optimizedDataBytes[i]);
+
+Map nonOptimizedConsumedSubpartitionGroup =
+computeNumBasedConsumedSubpartitionGroup(parallelism, i, 
inputInfo);
+checkState(nonOptimizedConsumedSubpartitionGroup.size() == 1);
+nonOptimizedDataBytes[i] +=
+inputInfo.getNumBytesProduced(
+nonOptimizedConsumedSubpartitionGroup
+.entrySet()
+.iterator()
+.next()
+.getKey(),
+nonOptimizedConsumedSubpartitionGroup
+.entrySet()
+.iterator()
+.next()
+.getValue());
+nonOptimizedMin = Math.min(nonOptimizedMin, 
nonOptimizedDataBytes[i]);
+nonOptimizedMax = Math.max(nonOptimizedMax, 
nonOptimizedDataBytes[i]);
+
+if (!optimized
+&& 
!consumedSubpartitionGroups.equals(nonOptimizedConsumedSubpartitionGroup)) {
+optimized = true;
+}
+}
+if (optimized) {
+long optimizedMed = median(optimizedDataBytes);
+long nonOptimizedMed = median(nonOptimizedDataBytes);
+String logMessage =
+String.format(
+"Result id: %s, "
++ "type number: %d, "
++ "input data size: "
++ "[ Before: {min: %s, median: %s, max: 
%s}, "
++ "After: {min: %s, median: %s, max: %s} 
]",
+inputInfo.getResultId(),
+inputInfo.getInputTypeNumber(),
+new 
MemorySize(nonOptimizedMin).toHumanReadableString(),
+new 
MemorySize(nonOptimizedMed).toHumanReadableString(),
+new 
MemorySize(nonOptimizedMax).toHumanReadableString(),
+new 
MemorySize(optimizedMin).toHumanReadableString(),
+new 
MemorySize(optimizedMed).toHumanReadableString(),
+new 
MemorySize(optimizedMax).toHumanReadableString());
+return Optional.of(logMessage);
+}
+return Optional.empty();
+}
+
+private static Map 
computeNumBasedConsumedSubpartitionGroup(
+int parallelism, int currentIndex, BlockingInputInfo inputInfo) {
+int sourceParallelism = inputInfo.getNumPartitions();
+
+if (inputInfo.isPointwise()) {
+return computeNumBasedConsumedSubpartitionGroupForPointwise(
+sourceParallelism, parallelism, currentIndex, 
inputInfo::getNumSubpartitions);
+} else {
+return computeNumBasedConsumedSubpartitionGroupForAllToAll(
+   

Re: [PR] [FLINK-37348][pipeline-paimon] fix paimon ArrayIndexOutOfBoundsException when add column first [flink-cdc]

2025-02-20 Thread via GitHub


MOBIN-F commented on code in PR #3925:
URL: https://github.com/apache/flink-cdc/pull/3925#discussion_r1961136374


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java:
##
@@ -463,12 +463,20 @@ public void testAddColumnWithPosition(String metastore)
 "col7_after", 
org.apache.flink.cdc.common.types.DataTypes.STRING()),
 "col2"));
 
+addedColumns.add(
+AddColumnEvent.before(
+Column.physicalColumn(
+"col4_first_before",
+
org.apache.flink.cdc.common.types.DataTypes.STRING()),
+"col1"));

Review Comment:
   fix~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36659] Bump to 2.0-preview1. [flink-connector-jdbc]

2025-02-20 Thread via GitHub


leonardBang commented on code in PR #153:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/153#discussion_r1944348435


##
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java:
##
@@ -106,66 +92,66 @@ void testEnrichedClassCastException() {
 }
 }
 
-private void runTest(boolean exploitParallelism) throws Exception {
-ExecutionEnvironment environment = 
ExecutionEnvironment.getExecutionEnvironment();
-JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
-JdbcInputFormat.buildJdbcInputFormat()
-.setDrivername(getMetadata().getDriverClass())
-.setDBUrl(getMetadata().getJdbcUrl())
-.setQuery(SELECT_ALL_BOOKS)
-.setRowTypeInfo(ROW_TYPE_INFO);
-
-if (exploitParallelism) {
-final int fetchSize = 1;
-final long min = TEST_DATA[0].id;
-final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
-// use a "splittable" query to exploit parallelism
-inputBuilder =
-inputBuilder
-.setQuery(SELECT_ALL_BOOKS_SPLIT_BY_ID)
-.setParametersProvider(
-new 
JdbcNumericBetweenParametersProvider(min, max)
-.ofBatchSize(fetchSize));
-}
-DataSet source = environment.createInput(inputBuilder.finish());
-
-// NOTE: in this case (with Derby driver) setSqlTypes could be 
skipped, but
-// some databases don't null values correctly when no column type was 
specified
-// in PreparedStatement.setObject (see its javadoc for more details)
-org.apache.flink.connector.jdbc.JdbcConnectionOptions 
connectionOptions =
-new org.apache.flink.connector.jdbc.JdbcConnectionOptions
-.JdbcConnectionOptionsBuilder()
-.withUrl(getMetadata().getJdbcUrl())
-.withDriverName(getMetadata().getDriverClass())
-.build();
-
-JdbcOutputFormat jdbcOutputFormat =
-new JdbcOutputFormat<>(
-new SimpleJdbcConnectionProvider(connectionOptions),
-JdbcExecutionOptions.defaults(),
-() ->
-createSimpleRowExecutor(
-String.format(INSERT_TEMPLATE, 
OUTPUT_TABLE),
-new int[] {
-Types.INTEGER,
-Types.VARCHAR,
-Types.VARCHAR,
-Types.DOUBLE,
-Types.INTEGER
-}));
-source.output(new TestOutputFormat(jdbcOutputFormat));
-environment.execute();
-
-try (Connection dbConn = 
DriverManager.getConnection(getMetadata().getJdbcUrl());
-PreparedStatement statement = 
dbConn.prepareStatement(SELECT_ALL_NEWBOOKS);
-ResultSet resultSet = statement.executeQuery()) {
-int count = 0;
-while (resultSet.next()) {
-count++;
-}
-assertThat(count).isEqualTo(TEST_DATA.length);
-}
-}
+//private void runTest(boolean exploitParallelism) throws Exception {

Review Comment:
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution. [flink]

2025-02-20 Thread via GitHub


flinkbot commented on PR #26181:
URL: https://github.com/apache/flink/pull/26181#issuecomment-2670908708

   
   ## CI report:
   
   * f5ea5d1375ce527720ee8be6fd8081f5805535a5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35014] SqlNode to operation conversion for models [flink]

2025-02-20 Thread via GitHub


lihaosky commented on code in PR #25834:
URL: https://github.com/apache/flink/pull/25834#discussion_r1964599071


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET ( 
'key1' [, 'key2']...).
+ */
+public class SqlAlterModelReset extends SqlAlterModel {
+private final SqlNodeList optionKeyList;
+
+public SqlAlterModelReset(
+SqlParserPos pos,
+SqlIdentifier modelName,
+boolean ifModelExists,
+SqlNodeList optionKeyList) {
+super(pos, modelName, ifModelExists);
+this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList 
should not be null");
+}
+
+@Override
+public List getOperandList() {
+return ImmutableNullableList.of(modelName, optionKeyList);

Review Comment:
   Didn't change this. Let me know if you want me to change this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34467] add lineage integration for jdbc connector [flink-connector-jdbc]

2025-02-20 Thread via GitHub


HuangZhenQiu commented on code in PR #149:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/149#discussion_r1964608069


##
flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractor.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.db2.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocation;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor;
+import org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor;
+
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link JdbcLocationExtractor} for DB2.
+ *
+ * @see <a href="https://www.ibm.com/docs/en/db2woc?topic=programmatically-jdbc">DB2 URL Format</a>
+ */
+@Internal
+public class Db2LocationExtractor implements JdbcLocationExtractor {
+
+private JdbcLocationExtractor delegate() {
+return new OverrideJdbcLocationExtractor("db2");

Review Comment:
   Make sense. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35724][oss] Bump OSS SDK to 3.17.4 [flink]

2025-02-20 Thread via GitHub


JunRuiLee merged PR #26177:
URL: https://github.com/apache/flink/pull/26177


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37333) Use ForwardForUnspecifiedPartitioner when adaptive broadcast join takes effect

2025-02-20 Thread Junrui Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928971#comment-17928971
 ] 

Junrui Lee commented on FLINK-37333:


release-2.0 83dc6f4013854ccc5089daf7d9bc0472504e9bde

master ee0c187b35df9a20ee3249f984274f012a7b5309

> Use ForwardForUnspecifiedPartitioner when adaptive broadcast join takes effect
> --
>
> Key: FLINK-37333
> URL: https://issues.apache.org/jira/browse/FLINK-37333
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 2.0.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> According to the design, we need to change the original forward partitioner 
> to a rescale partitioner after the adaptive broadcast join takes effect. 
> Accordingly, we need to use ForwardForUnspecifiedPartitioner to achieve this 
> purpose.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [BP-2.0][FLINK-35724][oss] Bump OSS SDK to 3.17.4 [flink]

2025-02-20 Thread via GitHub


JunRuiLee opened a new pull request, #26189:
URL: https://github.com/apache/flink/pull/26189

   (cherry picked from commit 225ef6a2a060bff13530d699d20fb8d5f1f8b7e4)
   
   
   
   ## What is the purpose of the change
   
   [BP-2.0][FLINK-35724][oss] Bump OSS SDK to 3.17.4
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35936][cdc-connector][paimon] Fix paimon cdc schema evolution failure when restart job [flink-cdc]

2025-02-20 Thread via GitHub


github-actions[bot] closed pull request #3502: 
[FLINK-35936][cdc-connector][paimon] Fix paimon cdc schema evolution failure 
when restart job
URL: https://github.com/apache/flink-cdc/pull/3502


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35936][cdc-connector][paimon] Fix paimon cdc schema evolution failure when restart job [flink-cdc]

2025-02-20 Thread via GitHub


github-actions[bot] commented on PR #3502:
URL: https://github.com/apache/flink-cdc/pull/3502#issuecomment-2672990347

   This pull request has been closed because it has not had recent activity. 
You can reopen it if you want to continue your work, and anyone who is 
interested is encouraged to continue working on this pull request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35622][runtime] Filter out noisy 'Coordinator of operator xxx does not exist' exceptions in CollectResultFetcher [flink]

2025-02-20 Thread via GitHub


JunRuiLee merged PR #26171:
URL: https://github.com/apache/flink/pull/26171


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35622) Filter out noisy "Coordinator of operator xxxx does not exist" exceptions in batch mode

2025-02-20 Thread Junrui Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junrui Lee closed FLINK-35622.
--
Fix Version/s: 2.0.1
   Resolution: Fixed

master c92f208f4a074812b5314f2b4858f244b5ecd9fe

> Filter out noisy "Coordinator of operator  does not exist" exceptions in 
> batch mode
> ---
>
> Key: FLINK-35622
> URL: https://issues.apache.org/jira/browse/FLINK-35622
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Biao Geng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.1
>
>
> In batch mode, the Flink JobManager logs frequently print "Coordinator of 
> operator  does not exist or the job vertex this operator belongs to is 
> not initialized." exceptions when using the collect() method.
> This exception is caused by the collect sink attempting to fetch data from 
> the corresponding operator coordinator on the JM based on the operator ID. 
> However, batch jobs do not initialize all job vertices at the beginning, and 
> instead, initialize them in a sequential manner. If a job vertex has not been 
> initialized yet, the corresponding coordinator cannot be found, leading to 
> the printing of this message.
> These exceptions are harmless and do not affect the job execution, but they 
> can clutter the logs and make it difficult to find relevant information, 
> especially for large-scale batch jobs.
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35622) Filter out noisy "Coordinator of operator xxxx does not exist" exceptions in batch mode

2025-02-20 Thread Junrui Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junrui Lee reassigned FLINK-35622:
--

Assignee: Biao Geng

> Filter out noisy "Coordinator of operator  does not exist" exceptions in 
> batch mode
> ---
>
> Key: FLINK-35622
> URL: https://issues.apache.org/jira/browse/FLINK-35622
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Biao Geng
>Priority: Major
>  Labels: pull-request-available
>
> In batch mode, the Flink JobManager logs frequently print "Coordinator of 
> operator  does not exist or the job vertex this operator belongs to is 
> not initialized." exceptions when using the collect() method.
> This exception is caused by the collect sink attempting to fetch data from 
> the corresponding operator coordinator on the JM based on the operator ID. 
> However, batch jobs do not initialize all job vertices at the beginning, and 
> instead, initialize them in a sequential manner. If a job vertex has not been 
> initialized yet, the corresponding coordinator cannot be found, leading to 
> the printing of this message.
> These exceptions are harmless and do not affect the job execution, but they 
> can clutter the logs and make it difficult to find relevant information, 
> especially for large-scale batch jobs.
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-37366) Allow configurable retry for Kafka topic metadata fetch

2025-02-20 Thread Shuyi Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen reassigned FLINK-37366:
--

Assignee: Shuyi Chen

> Allow configurable retry for Kafka topic metadata fetch
> ---
>
> Key: FLINK-37366
> URL: https://issues.apache.org/jira/browse/FLINK-37366
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> For high availability, we adopted a multi-primary Kafka cluster setup, so the 
> data of a Kafka topic will be in multiple physical clusters. In case of a 
> Kafka cluster failure, the Flink pipeline should continue to run w/o failure. 
> Currently, the Flink pipeline will fail because 
> SubscriberUtils.getTopicMetadata() throws a RuntimeException if a Kafka 
> cluster fails, causing the pipeline to keep restarting. We propose adding a 
> configurable retry policy to SubscriberUtils.getTopicMetadata(), so we can 
> configure the Flink Kafka connector to tolerate a Kafka cluster failure for a 
> longer period of time w/o restarting.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37357) Migrating SqlNode conversion logic from to SqlNodeConverters

2025-02-20 Thread Hirson Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hirson Zhang updated FLINK-37357:
-
Summary: Migrating SqlNode conversion logic from  to SqlNodeConverters  
(was: Migrating conversion logic to SqlNodeConverters)

> Migrating SqlNode conversion logic from  to SqlNodeConverters
> -
>
> Key: FLINK-37357
> URL: https://issues.apache.org/jira/browse/FLINK-37357
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 2.0-preview
>Reporter: Hirson Zhang
>Priority: Major
>
> There is a "TODO" at class "SqlNodeToOperationConversion", method 
> "convertValidatedSqlNode": 
> "TODO: all the below conversion logic should be migrated to 
> SqlNodeConverters".
> Part of the code: 
> {code:java}
> /** Convert a validated sql node to Operation. */
> private static Optional<Operation> convertValidatedSqlNode(
>         FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
>     beforeConversion();
>     // delegate conversion to the registered converters first
>     SqlNodeConvertContext context = new SqlNodeConvertContext(flinkPlanner, catalogManager);
>     Optional<Operation> operation = SqlNodeConverters.convertSqlNode(validated, context);
>     if (operation.isPresent()) {
>         return operation;
>     }
>     // TODO: all the below conversion logic should be migrated to SqlNodeConverters
>     SqlNodeToOperationConversion converter =
>             new SqlNodeToOperationConversion(flinkPlanner, catalogManager);
>     if (validated instanceof SqlDropCatalog) {
>         return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated));
>     } else if (validated instanceof SqlLoadModule) {
>         return Optional.of(converter.convertLoadModule((SqlLoadModule) validated));
>     } else if (validated instanceof SqlShowCurrentCatalog) {
>         return Optional.of(
>                 converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));
>     } else if (validated instanceof SqlShowModules) {
>         return Optional.of(converter.convertShowModules((SqlShowModules) validated));
>     ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37357) Migrating SqlNode conversion logic from SqlNodeToOperationConversion to SqlNodeConverters

2025-02-20 Thread Hirson Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hirson Zhang updated FLINK-37357:
-
Summary: Migrating SqlNode conversion logic from  
SqlNodeToOperationConversion to SqlNodeConverters  (was: Migrating SqlNode 
conversion logic from  to SqlNodeConverters)

> Migrating SqlNode conversion logic from  SqlNodeToOperationConversion to 
> SqlNodeConverters
> --
>
> Key: FLINK-37357
> URL: https://issues.apache.org/jira/browse/FLINK-37357
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 2.0-preview
>Reporter: Hirson Zhang
>Priority: Major
>
> There is a "TODO" at class "SqlNodeToOperationConversion", method 
> "convertValidatedSqlNode": 
> "TODO: all the below conversion logic should be migrated to 
> SqlNodeConverters".
> Part of the code: 
> {code:java}
> /** Convert a validated sql node to Operation. */
> private static Optional<Operation> convertValidatedSqlNode(
>         FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
>     beforeConversion();
>     // delegate conversion to the registered converters first
>     SqlNodeConvertContext context = new SqlNodeConvertContext(flinkPlanner, catalogManager);
>     Optional<Operation> operation = SqlNodeConverters.convertSqlNode(validated, context);
>     if (operation.isPresent()) {
>         return operation;
>     }
>     // TODO: all the below conversion logic should be migrated to SqlNodeConverters
>     SqlNodeToOperationConversion converter =
>             new SqlNodeToOperationConversion(flinkPlanner, catalogManager);
>     if (validated instanceof SqlDropCatalog) {
>         return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated));
>     } else if (validated instanceof SqlLoadModule) {
>         return Optional.of(converter.convertLoadModule((SqlLoadModule) validated));
>     } else if (validated instanceof SqlShowCurrentCatalog) {
>         return Optional.of(
>                 converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));
>     } else if (validated instanceof SqlShowModules) {
>         return Optional.of(converter.convertShowModules((SqlShowModules) validated));
>     ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37366) Allow configurable retry for Kafka topic metadata fetch

2025-02-20 Thread Shuyi Chen (Jira)
Shuyi Chen created FLINK-37366:
--

 Summary: Allow configurable retry for Kafka topic metadata fetch
 Key: FLINK-37366
 URL: https://issues.apache.org/jira/browse/FLINK-37366
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Shuyi Chen


For high availability, we adopted a multi-primary Kafka cluster setup, so the 
data of a Kafka topic will be in multiple physical clusters. In case of a Kafka 
cluster failure, the Flink pipeline should continue to run w/o failure. 
Currently, the Flink pipeline will fail because 
SubscriberUtils.getTopicMetadata() throws a RuntimeException if a Kafka cluster 
fails, causing the pipeline to keep restarting. We propose adding a 
configurable retry policy to SubscriberUtils.getTopicMetadata(), so we can 
configure the Flink Kafka connector to tolerate a Kafka cluster failure for a 
longer period of time w/o restarting.
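
A minimal sketch of such a retry wrapper ({{RetryingCall}}, {{maxRetries}} and {{backoff}} below are illustrative names and assumptions, not the connector's API):
{code:java}
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical helper: retry the metadata fetch a configurable number of
// times with a fixed backoff before surfacing the failure, so a transient
// outage of one Kafka cluster does not immediately restart the pipeline.
final class RetryingCall {
    private final int maxRetries;
    private final Duration backoff;

    RetryingCall(int maxRetries, Duration backoff) {
        this.maxRetries = maxRetries;
        this.backoff = backoff;
    }

    <T> T call(Callable<T> fetch) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return fetch.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted
                }
                Thread.sleep(backoff.toMillis()); // wait before the next attempt
            }
        }
    }
}
{code}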



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-2.0][FLINK-35724][oss] Bump OSS SDK to 3.17.4 [flink]

2025-02-20 Thread via GitHub


flinkbot commented on PR #26189:
URL: https://github.com/apache/flink/pull/26189#issuecomment-2673252457

   
   ## CI report:
   
   * 3f24b40dd73221a2ac8790e2ce9fdf93d7e316e9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


beryllw commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964738198


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -470,7 +470,13 @@ private List splitEvenlySizedChunks(
 }
 }
 // add the ending split
-splits.add(ChunkRange.of(chunkStart, null));
+// assign ending split first, both the largest and smallest unbounded 
chunks are completed
+// in the first two splits
+if (sourceConfig.isAssignEndingChunkFirst()) {
+splits.add(0, ChunkRange.of(chunkStart, null));

Review Comment:
   Considering the use of `System.arraycopy` for performance, as we need to 
move n-1 elements, I noticed that the `ArrayList` function `add(int index, E 
element)` is implemented as follows and already uses `System.arraycopy` 
internally. Is there anything else I need to change?
   
   ```java
   /**
* Inserts the specified element at the specified position in this
* list. Shifts the element currently at that position (if any) and
* any subsequent elements to the right (adds one to their indices).
*
* @param index index at which the specified element is to be inserted
* @param element element to be inserted
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
   public void add(int index, E element) {
   rangeCheckForAdd(index);
   
   ensureCapacityInternal(size + 1);  // Increments modCount!!
   System.arraycopy(elementData, index, elementData, index + 1,
size - index);
   elementData[index] = element;
   size++;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


beryllw commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964738198


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -470,7 +470,13 @@ private List splitEvenlySizedChunks(
 }
 }
 // add the ending split
-splits.add(ChunkRange.of(chunkStart, null));
+// assign ending split first, both the largest and smallest unbounded 
chunks are completed
+// in the first two splits
+if (sourceConfig.isAssignEndingChunkFirst()) {
+splits.add(0, ChunkRange.of(chunkStart, null));

Review Comment:
   Considering the use of `System.arraycopy` for performance, I noticed that 
the `ArrayList` function `add(int index, E element)` is implemented as follows 
and already uses `System.arraycopy` internally. Is there anything else I need 
to change?
   
   ```java
   /**
* Inserts the specified element at the specified position in this
* list. Shifts the element currently at that position (if any) and
* any subsequent elements to the right (adds one to their indices).
*
* @param index index at which the specified element is to be inserted
* @param element element to be inserted
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
   public void add(int index, E element) {
   rangeCheckForAdd(index);
   
   ensureCapacityInternal(size + 1);  // Increments modCount!!
   System.arraycopy(elementData, index, elementData, index + 1,
size - index);
   elementData[index] = element;
   size++;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37329][table-planner] Skip Source Stats Collection When table.optimizer.source.report-statistics-enabled is False [flink]

2025-02-20 Thread via GitHub


shameersss1 commented on PR #26162:
URL: https://github.com/apache/flink/pull/26162#issuecomment-2673427565

   @davidradl  - I have addressed your comments. Could you please review the 
same ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37332] Support any column as chunk key column (postgres, orcale, db2, sqlserver) [flink-cdc]

2025-02-20 Thread via GitHub


SML0127 commented on PR #3922:
URL: https://github.com/apache/flink-cdc/pull/3922#issuecomment-2673458269

   > cc @lvyanquan
   
   I recently changed my laptop and my environment seems to have become a bit 
messed up. 🥲
   I'll make a new PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37332] Support any column as chunk key column (postgres, orcale, db2, sqlserver) [flink-cdc]

2025-02-20 Thread via GitHub


SML0127 closed pull request #3922: [FLINK-37332] Support any column as chunk 
key column (postgres, orcale, db2, sqlserver)
URL: https://github.com/apache/flink-cdc/pull/3922


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36409] Publish some autoscaler metrics during stabilisation period [flink-kubernetes-operator]

2025-02-20 Thread via GitHub


morhidi merged PR #945:
URL: https://github.com/apache/flink-kubernetes-operator/pull/945


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


beryllw commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964751251


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java:
##
@@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
 .defaultValue(true)
 .withDescription(
 "Whether to use legacy json format. The default 
value is true, which means there is no whitespace before value and after comma 
in json format.");
+
+@Experimental
+public static final ConfigOption 
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
+
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")

Review Comment:
   Given that "unboundary" is not a commonly used term and may cause confusion, 
I have decided to use "unbounded" instead. Therefore, the configuration item 
will be `scan.incremental.snapshot.unbounded-chunk-first.enabled`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


leonardBang commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964761727


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -470,7 +470,13 @@ private List splitEvenlySizedChunks(
 }
 }
 // add the ending split
-splits.add(ChunkRange.of(chunkStart, null));
+// assign ending split first, both the largest and smallest unbounded 
chunks are completed
+// in the first two splits
+if (sourceConfig.isAssignEndingChunkFirst()) {
+splits.add(0, ChunkRange.of(chunkStart, null));

Review Comment:
   No, thanks for your explanation 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


leonardBang commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964756149


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java:
##
@@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
 .defaultValue(true)
 .withDescription(
 "Whether to use legacy json format. The default 
value is true, which means there is no whitespace before value and after comma 
in json format.");
+
+@Experimental
+public static final ConfigOption 
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
+
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")

Review Comment:
   +1 from my side, please keep going



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


beryllw commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964738198


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -470,7 +470,13 @@ private List splitEvenlySizedChunks(
 }
 }
 // add the ending split
-splits.add(ChunkRange.of(chunkStart, null));
+// assign ending split first, both the largest and smallest unbounded 
chunks are completed
+// in the first two splits
+if (sourceConfig.isAssignEndingChunkFirst()) {
+splits.add(0, ChunkRange.of(chunkStart, null));

Review Comment:
   You mentioned using `System.arraycopy` for performance as we need to move 
n-1 elements. The `ArrayList` function `add(int index, E element)` is 
implemented as follows, using `System.arraycopy` internally. Is there anything 
else I need to change?
   
   ```java
   /**
* Inserts the specified element at the specified position in this
* list. Shifts the element currently at that position (if any) and
* any subsequent elements to the right (adds one to their indices).
*
* @param index index at which the specified element is to be inserted
* @param element element to be inserted
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
   public void add(int index, E element) {
   rangeCheckForAdd(index);
   
   ensureCapacityInternal(size + 1);  // Increments modCount!!
   System.arraycopy(elementData, index, elementData, index + 1,
size - index);
   elementData[index] = element;
   size++;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37313] Fix the problem of reading binlog before the high and low watermarks during the snapshot process [flink-cdc]

2025-02-20 Thread via GitHub


gongzexin closed pull request #3920: [FLINK-37313] Fix the problem of reading 
binlog before the high and low watermarks during the snapshot process
URL: https://github.com/apache/flink-cdc/pull/3920


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37313] Fix the problem of reading binlog before the high and low watermarks during the snapshot process [flink-cdc]

2025-02-20 Thread via GitHub


gongzexin commented on PR #3920:
URL: https://github.com/apache/flink-cdc/pull/3920#issuecomment-2673182080

   > Hi, @gongzexin.
   > 
   > I think that the problem that you met was resolved in #3902 as [#3902 
(comment)](https://github.com/apache/flink-cdc/pull/3902#issuecomment-2639121416)
 describes, can you check for this?
   
   Hi,


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35360] support Flink cdc pipeline Yarn application mode [flink-cdc]

2025-02-20 Thread via GitHub


joyCurry30 commented on PR #3643:
URL: https://github.com/apache/flink-cdc/pull/3643#issuecomment-2673185291

   Could you please add a document, like this: 
https://github.com/apache/flink-cdc/blob/master/docs/content.zh/docs/deployment/yarn.md
 ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


leonardBang commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964634199


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java:
##
@@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
 .defaultValue(true)
 .withDescription(
 "Whether to use legacy json format. The default 
value is true, which means there is no whitespace before value and after comma 
in json format.");
+
+@Experimental
+public static final ConfigOption 
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
+
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")

Review Comment:
   I mean 
ConfigOptions.key("scan.incremental.snapshot.boundary-chunk-first.enabled") 
with default value `true`; currently we assign the first chunk (the unboundary 
chunk) and then assign the boundary chunk, right?
   
   But both ConfigOptions make sense to me, each with its own default value, so 
feel free to choose your flavor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37332] Support any column as chunk key column (postgres, orcale, db2, sqlserver) [flink-cdc]

2025-02-20 Thread via GitHub


gtk96 commented on PR #3922:
URL: https://github.com/apache/flink-cdc/pull/3922#issuecomment-2673187886

   cc @lvyanquan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37333][runtime] Use ForwardForUnspecifiedPartitioner when adaptive broadcast join takes effect [flink]

2025-02-20 Thread via GitHub


JunRuiLee merged PR #26178:
URL: https://github.com/apache/flink/pull/26178


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-37149) Add Adaptive Batch Execution Documentation

2025-02-20 Thread Junrui Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junrui Lee closed FLINK-37149.
--
Resolution: Done

master 1c759337fd9eef2bcf482ae173d0d45355213565
release-2.0 63e1bfbbae1ff43b150fbc0ec85c21996a73a0d1

> Add Adaptive Batch Execution Documentation
> --
>
> Key: FLINK-37149
> URL: https://issues.apache.org/jira/browse/FLINK-37149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Junrui Lee
>Assignee: Junrui Lee
>Priority: Major
>  Labels: 2.0-related, pull-request-available
>
> This ticket aims to introduce a new page for adaptive batch execution and 
> includes the following parts:
>  # Adaptive parallelism inference
>  # Adaptive broadcast join
>  # Adaptive skewed join optimization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963468045


##
flink-python/pyflink/table/table.py:
##
@@ -1077,6 +1078,91 @@ def explain(self, *extra_details: ExplainDetail) -> str:
 j_extra_details = to_j_explain_detail_arr(extra_details)
 return self._j_table.explain(TEXT, j_extra_details)
 
+def insert_into(
+self, table_path_or_descriptor: Union[str, TableDescriptor], 
overwrite: bool = False
+) -> TablePipeline:
+"""
+When ``target_path_or_descriptor`` is a table path:
+
+Declares that the pipeline defined by the given :class:`Table` 
(backed by a
+DynamicTableSink) should be written to a table that was registered 
under the specified
+path.
+
+See the documentation of
+
:func:`pyflink.table.table_environment.TableEnvironment.use_database` or
+
:func:`pyflink.table.table_environment.TableEnvironment.use_catalog` for the 
rules on
+the path resolution.
+
+Example:
+::
+
+>>> table = table_env.sql_query("SELECT * FROM MyTable")
+>>> table_pipeline = 
table.insert_into_table_path("MySinkTable", True)
+>>> table_result = table_pipeline.execute().wait()
+
+When ``target_path_or_descriptor`` is a  
:class:`~pyflink.table.TableDescriptor` :
+
+Declares that the pipeline defined by the given :class:`Table` 
object should be written
+to a table (backed by a DynamicTableSink) expressed via the given
+:class:`~pyflink.table.TableDescriptor`.
+
+The descriptor won't be registered in the catalog, but it will be 
propagated directly
+in the operation tree. Note that calling this method multiple 
times, even with the same
+descriptor, results in multiple sink tables instances.
+
+This method allows to declare a :class:`~pyflink.table.Schema` for 
the sink descriptor.

Review Comment:
   nit: `This method allows to declare` -> A :class:`~pyflink.table.Schema` can 
be associated with the sink descriptor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Fix post-release japicmp update script for macOS [flink]

2025-02-20 Thread via GitHub


flinkbot commented on PR #26183:
URL: https://github.com/apache/flink/pull/26183#issuecomment-2671340643

   
   ## CI report:
   
   * 00795756ff864f9783d6ce8e38c3d99213845493 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37361) Update japicmp configuration post 1.19.2

2025-02-20 Thread Alexander Fedulov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Fedulov updated FLINK-37361:
--
Fix Version/s: 1.19.3

> Update japicmp configuration post 1.19.2
> 
>
> Key: FLINK-37361
> URL: https://issues.apache.org/jira/browse/FLINK-37361
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.19.1
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
> Fix For: 1.19.3
>
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37360) Update japicmp configuration post 1.20.1

2025-02-20 Thread Alexander Fedulov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Fedulov updated FLINK-37360:
--
Fix Version/s: 1.20.2

> Update japicmp configuration post 1.20.1
> 
>
> Key: FLINK-37360
> URL: https://issues.apache.org/jira/browse/FLINK-37360
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.20.1
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
> Fix For: 1.20.2
>
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37361) Update japicmp configuration post 1.19.2

2025-02-20 Thread Alexander Fedulov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Fedulov updated FLINK-37361:
--
Affects Version/s: 1.19.1
   (was: 1.20.2)

> Update japicmp configuration post 1.19.2
> 
>
> Key: FLINK-37361
> URL: https://issues.apache.org/jira/browse/FLINK-37361
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.19.1
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37361) Update japicmp configuration post 1.19.2

2025-02-20 Thread Alexander Fedulov (Jira)
Alexander Fedulov created FLINK-37361:
-

 Summary: Update japicmp configuration post 1.19.2
 Key: FLINK-37361
 URL: https://issues.apache.org/jira/browse/FLINK-37361
 Project: Flink
  Issue Type: Technical Debt
Affects Versions: 1.20.2
Reporter: Alexander Fedulov
Assignee: Alexander Fedulov


Update the japicmp reference version and wipe exclusions / enable API 
compatibility checks for {{@PublicEvolving}} APIs on the corresponding SNAPSHOT 
branch with the {{update_japicmp_configuration.sh}} script (see below).

For a new major release (x.y.0), run the same command also on the master branch 
for updating the japicmp reference version and removing out-dated exclusions in 
the japicmp configuration.

Make sure that all Maven artifacts are already pushed to Maven Central. 
Otherwise, there's a risk that CI fails due to missing reference artifacts.
{code:bash}
tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
tools $ cd ..
$ git add *
$ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37360) Update japicmp configuration post 1.20.1

2025-02-20 Thread Alexander Fedulov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Fedulov updated FLINK-37360:
--
Affects Version/s: 1.20.1
   (was: 1.20.2)

> Update japicmp configuration post 1.20.1
> 
>
> Key: FLINK-37360
> URL: https://issues.apache.org/jira/browse/FLINK-37360
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.20.1
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37361) Update japicmp configuration post 1.19.2

2025-02-20 Thread Alexander Fedulov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Fedulov updated FLINK-37361:
--
Affects Version/s: 1.19.2
   (was: 1.19.1)

> Update japicmp configuration post 1.19.2
> 
>
> Key: FLINK-37361
> URL: https://issues.apache.org/jira/browse/FLINK-37361
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.19.2
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
> Fix For: 1.19.3
>
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37360) Update japicmp configuration post 1.20.1

2025-02-20 Thread Alexander Fedulov (Jira)
Alexander Fedulov created FLINK-37360:
-

 Summary: Update japicmp configuration post 1.20.1
 Key: FLINK-37360
 URL: https://issues.apache.org/jira/browse/FLINK-37360
 Project: Flink
  Issue Type: Technical Debt
Affects Versions: 1.20.2
Reporter: Alexander Fedulov
Assignee: Alexander Fedulov


Update the japicmp reference version and wipe exclusions / enable API 
compatibility checks for {{@PublicEvolving}} APIs on the corresponding SNAPSHOT 
branch with the {{update_japicmp_configuration.sh}} script (see below).

For a new major release (x.y.0), run the same command also on the master branch 
for updating the japicmp reference version and removing out-dated exclusions in 
the japicmp configuration.

Make sure that all Maven artifacts are already pushed to Maven Central. 
Otherwise, there's a risk that CI fails due to missing reference artifacts.
{code:bash}
tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
tools $ cd ..
$ git add *
$ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963481246


##
flink-python/pyflink/table/table_pipeline.py:
##
@@ -0,0 +1,78 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from typing import Optional
+from pyflink.java_gateway import get_gateway
+from pyflink.table import ExplainDetail
+from pyflink.table.catalog import ObjectIdentifier
+from pyflink.table.table_result import TableResult
+from pyflink.util.java_utils import to_j_explain_detail_arr
+
+__all__ = ["TablePipeline"]
+
+
+class TablePipeline(object):
+"""
+Describes a complete pipeline from one or more source tables to a sink 
table.
+"""
+
+def __init__(self, j_table_pipeline, t_env):
+self._j_table_pipeline = j_table_pipeline
+self._t_env = t_env
+
+def __str__(self) -> str:
+return self._j_table_pipeline.toString()
+
+def execute(self) -> TableResult:
+"""
+Executes the table pipeline.
+
+.. versionadded:: 2.1.0
+"""
+self._t_env._before_execute()
+return TableResult(self._j_table_pipeline.execute())
+
+def explain(self, *extra_details: ExplainDetail) -> str:
+"""
+Returns the AST and the execution plan of the table pipeline.
+
+:param extra_details: The extra explain details which the explain 
result should include,
+  e.g. estimated cost, changelog mode for streaming
+:return: AST and execution plans
+
+.. versionadded:: 2.1.0
+"""
+gateway = get_gateway()
+j_extra_details = to_j_explain_detail_arr(extra_details)
+return self._j_table_pipeline.explain(
+gateway.jvm.org.apache.flink.table.api.ExplainFormat.TEXT, 
j_extra_details
+)
+
+def get_sink_identifier(self) -> Optional[ObjectIdentifier]:
+"""
+Returns the sink table's 
:class:`~pyflink.table.catalog.ObjectIdentifier`, if any.
+The result is empty for anonymous sink tables that haven't been 
registered before.

Review Comment:
   maybe point to details on how to register these types of tables. 
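   For reference, a minimal Java-side sketch of when the sink identifier is present versus empty (the Python `TablePipeline` in this PR wraps the same behaviour; the table name and the `blackhole` connector below are just placeholders):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class SinkIdentifierSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        TableDescriptor sink = TableDescriptor.forConnector("blackhole")
                .schema(Schema.newBuilder().column("f0", DataTypes.INT()).build())
                .build();

        Table source = tEnv.fromValues(1, 2, 3);

        // Registered sink: the identifier resolves against the current catalog and database.
        tEnv.createTemporaryTable("MySinkTable", sink);
        System.out.println(source.insertInto("MySinkTable").getSinkIdentifier()); // present

        // Anonymous sink passed as a descriptor: no identifier is available.
        System.out.println(source.insertInto(sink).getSinkIdentifier()); // Optional.empty
    }
}
```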



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest

2025-02-20 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928771#comment-17928771
 ] 

Hongshun Wang edited comment on FLINK-37356 at 2/20/25 12:28 PM:
-

[~arvid] just do it. 

 

> I'd avoid tinkering around with the internal state of the transactionManager

It does make sense to me! I'd like to see your redesign of kafka sink.


was (Author: JIRAUSER298968):
[~arvid] just do it. 

 

> I'd avoid tinkering around with the internal state of the transactionManager

It does make sense to me! I'd like to see your resign of kafka sink.

> Recycle use of kafka producer(which commit error) maybe send data without 
> AddPartitionsToTxnRequest
> ---
>
> Key: FLINK-37356
> URL: https://issues.apache.org/jira/browse/FLINK-37356
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.4.0
>Reporter: Hongshun Wang
>Priority: Major
>
> In my production environment, a READ_COMMITTED consumer can no longer consume 
> any records. Then I found that the LSO of the partition doesn't change for a 
> long time. I looked up all the logs in the Kafka cluster and found that there 
> is a transaction lacking an AddPartitionsToTxnRequest.
>  
> At first, I thought the problem was caused by 
> https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster 
> log contains InvalidTxnStateException. However, though the transaction is in 
> an invalid state, no data is written into the Kafka topic partition in this 
> transaction (because in this case, the transaction is empty). It will not 
> influence any Kafka topic partition's LSO, so the consumer won't be blocked.
>  
> Then I checked the Kafka client code; it seems there is no way to produce data 
> without the AddPartitionsToTxnRequest being done, because the `Sender` will 
> refuse to dequeue batches from the accumulator until they have been added to 
> the transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future doSend(ProducerRecord record, Callback 
> callback) {
> // ..ignore code
> // Add the partition to the transaction (if in progress) after it has been 
> successfully
> // appended to the accumulator. We cannot do it before because the partition 
> may be
> // unknown or the initially selected partition may be changed when the batch 
> is closed
> // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse 
> to dequeue
> // batches from the accumulator until they have been added to the transaction.
> if (transactionManager != null) {
> transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
> // ignore code
> }{code}
> {code:java}
> //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
>  
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
>     // there is a rare case that a single batch size is larger than the request
>     // size due to compression; in this case we will still eventually send this
>     // batch in a single request
>     break;
> } else {
>     if (shouldStopDrainBatchesForPartition(first, tp)) break;
> }
> {code}
>  
> Then I have an idea: if a TransactionManager which doesn't clear 
> partitionsInTransaction is reused again, the AddPartitionsToTxnRequest won't 
> be sent again. It may happen in the Flink Kafka connector:
>  
>       1. The Flink Kafka connector also reuses and recycles the KafkaProducer: 
> KafkaCommitter will recycle the producer to
> producerPool after the transaction completes or an exception occurs, and then 
> KafkaWriter will reuse it from producerPool.
> {code:java}
> // code placeholder
> org.apache.flink.connector.kafka.sink.KafkaCommitter#commit
> @Override
> public void commit(Collection> requests)
> throws IOException, InterruptedException {
> for (CommitRequest request : requests) {
>  
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);
> } catch (RetriableException e) {
> request.retryLater();
> } catch (ProducerFencedException e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (InvalidTxnStateException e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (UnknownProducerIdException e) {
> LOG.error(
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (Exception e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithUnknownReason(e);
> }
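
A rough sketch of the mitigation idea described above: recycle a producer only after a clean commit, otherwise close it so the next transaction starts with a fresh TransactionManager. The pool and the exception handling below are hypothetical stand-ins, not the connector's actual code:
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.kafka.clients.producer.KafkaProducer;

// Sketch only: hand a producer back to the pool only when its transaction committed
// cleanly; otherwise close it so a fresh producer (with a clean TransactionManager)
// is created for the next transaction. producerPool is a made-up name here.
class ProducerRecyclingSketch<K, V> {

    private final Deque<KafkaProducer<K, V>> producerPool = new ArrayDeque<>();

    void commitAndMaybeRecycle(KafkaProducer<K, V> producer) {
        try {
            producer.commitTransaction();
            producerPool.add(producer); // safe to reuse: transactional state is clean
        } catch (Exception e) {
            producer.close();           // never reuse a producer whose commit failed
            throw new RuntimeException("Transaction commit failed, producer discarded", e);
        }
    }
}
{code}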

Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963476837


##
flink-python/pyflink/table/table_pipeline.py:
##
@@ -0,0 +1,78 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from typing import Optional
+from pyflink.java_gateway import get_gateway
+from pyflink.table import ExplainDetail
+from pyflink.table.catalog import ObjectIdentifier
+from pyflink.table.table_result import TableResult
+from pyflink.util.java_utils import to_j_explain_detail_arr
+
+__all__ = ["TablePipeline"]
+
+
+class TablePipeline(object):
+"""
+Describes a complete pipeline from one or more source tables to a sink 
table.
+"""
+
+def __init__(self, j_table_pipeline, t_env):
+self._j_table_pipeline = j_table_pipeline
+self._t_env = t_env
+
+def __str__(self) -> str:
+return self._j_table_pipeline.toString()
+
+def execute(self) -> TableResult:
+"""
+Executes the table pipeline.
+
+.. versionadded:: 2.1.0
+"""
+self._t_env._before_execute()
+return TableResult(self._j_table_pipeline.execute())
+
+def explain(self, *extra_details: ExplainDetail) -> str:
+"""
+Returns the AST and the execution plan of the table pipeline.
+
+:param extra_details: The extra explain details which the explain 
result should include,

Review Comment:
   nit: I am not sure what the possible values are for this field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37361) Update japicmp configuration post 1.19.2

2025-02-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37361:
---
Labels: pull-request-available  (was: )

> Update japicmp configuration post 1.19.2
> 
>
> Key: FLINK-37361
> URL: https://issues.apache.org/jira/browse/FLINK-37361
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.19.2
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.3
>
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-37361][release] Update japicmp configuration post 1.19.2 [flink]

2025-02-20 Thread via GitHub


afedulov opened a new pull request, #26185:
URL: https://github.com/apache/flink/pull/26185

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest

2025-02-20 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928771#comment-17928771
 ] 

Hongshun Wang commented on FLINK-37356:
---

[~arvid] just do it. I'd like to see your resign of kafka sink.

> Recycle use of kafka producer(which commit error) maybe send data without 
> AddPartitionsToTxnRequest
> ---
>
> Key: FLINK-37356
> URL: https://issues.apache.org/jira/browse/FLINK-37356
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.4.0
>Reporter: Hongshun Wang
>Priority: Major
>
> In my production environment, a READ_COMMITTED consumer can no longer consume 
> any records. Then I found that the LSO of the partition doesn't change for a 
> long time. I looked up all the logs in the Kafka cluster and found that there 
> is a transaction lacking an AddPartitionsToTxnRequest.
>  
> At first, I thought the problem was caused by 
> https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster 
> log contains InvalidTxnStateException. However, though the transaction is in 
> an invalid state, no data is written into the Kafka topic partition in this 
> transaction (because in this case, the transaction is empty). It will not 
> influence any Kafka topic partition's LSO, so the consumer won't be blocked.
>  
> Then I checked the Kafka client code; it seems there is no way to produce data 
> without the AddPartitionsToTxnRequest being done, because the `Sender` will 
> refuse to dequeue batches from the accumulator until they have been added to 
> the transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future doSend(ProducerRecord record, Callback 
> callback) {
> // ..ignore code
> // Add the partition to the transaction (if in progress) after it has been 
> successfully
> // appended to the accumulator. We cannot do it before because the partition 
> may be
> // unknown or the initially selected partition may be changed when the batch 
> is closed
> // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse 
> to dequeue
> // batches from the accumulator until they have been added to the transaction.
> if (transactionManager != null) {
> transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
> // ignore code
> }{code}
> {code:java}
> //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
>  
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
>     // there is a rare case that a single batch size is larger than the request
>     // size due to compression; in this case we will still eventually send this
>     // batch in a single request
>     break;
> } else {
>     if (shouldStopDrainBatchesForPartition(first, tp)) break;
> }
> {code}
>  
> Then I have an idea: if a TransactionManager which doesn't clear 
> partitionsInTransaction is reused again, the AddPartitionsToTxnRequest won't 
> be sent again. It may happen in the Flink Kafka connector:
>  
>       1. The Flink Kafka connector also reuses and recycles the KafkaProducer: 
> KafkaCommitter will recycle the producer to
> producerPool after the transaction completes or an exception occurs, and then 
> KafkaWriter will reuse it from producerPool.
> {code:java}
> // code placeholder
> org.apache.flink.connector.kafka.sink.KafkaCommitter#commit
> @Override
> public void commit(Collection> requests)
> throws IOException, InterruptedException {
> for (CommitRequest request : requests) {
>  
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);
> } catch (RetriableException e) {
> request.retryLater();
> } catch (ProducerFencedException e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (InvalidTxnStateException e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (UnknownProducerIdException e) {
> LOG.error(
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (Exception e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithUnknownReason(e);
> }
> }
> } {code}
>      2. If KafkaCommitter meets an exception and doesn't succeed in 
> commitTransaction, the partitionsInTransaction in 
> TransactionManager won't be 
> cleared (org.apache.kafka.clients.producer.internals.TransactionManager#completeTransaction).
>  
>    3. If the KafkaWriter reuses the same producer and sends data to the same 
> partitions in the next transact

[jira] [Updated] (FLINK-37360) Update japicmp configuration post 1.20.1

2025-02-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37360:
---
Labels: pull-request-available  (was: )

> Update japicmp configuration post 1.20.1
> 
>
> Key: FLINK-37360
> URL: https://issues.apache.org/jira/browse/FLINK-37360
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.20.1
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.2
>
>
> Update the japicmp reference version and wipe exclusions / enable API 
> compatibility checks for {{@PublicEvolving}} APIs on the corresponding 
> SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see 
> below).
> For a new major release (x.y.0), run the same command also on the master 
> branch for updating the japicmp reference version and removing out-dated 
> exclusions in the japicmp configuration.
> Make sure that all Maven artifacts are already pushed to Maven Central. 
> Otherwise, there's a risk that CI fails due to missing reference artifacts.
> {code:bash}
> tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh
> tools $ cd ..
> $ git add *
> $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37359] Pin qemu image to 7.0.0 [flink-kubernetes-operator]

2025-02-20 Thread via GitHub


gyfora merged PR #946:
URL: https://github.com/apache/flink-kubernetes-operator/pull/946


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-37362) PolardbxCharsetITCase test has timed out after 60 minutes

2025-02-20 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-37362:
--

 Summary: PolardbxCharsetITCase test has timed out after 60 minutes
 Key: FLINK-37362
 URL: https://issues.apache.org/jira/browse/FLINK-37362
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.4.0
Reporter: Leonard Xu
 Fix For: cdc-3.4.0


Failed instance: 
https://github.com/apache/flink-cdc/actions/runs/13390330565/job/37400625838


{code:java}
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 160.571 
s - in org.apache.flink.cdc.connectors.polardbx.PolardbxSourceITCase
[INFO] Running org.apache.flink.cdc.connectors.polardbx.PolardbxCharsetITCase
Error: The action 'Compile and test' has timed out after 60 minutes.
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-37359) Docker build fails for Kubernetes operator

2025-02-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-37359.
--
Fix Version/s: kubernetes-operator-1.11.0
   Resolution: Fixed

merged to main 3a2cfdd916c475ee1ccb63fe42f5cfeb3fe912fb

 

> Docker build fails for Kubernetes operator
> --
>
> Key: FLINK-37359
> URL: https://issues.apache.org/jira/browse/FLINK-37359
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.11.0
>
>
> All docker build started to fail with the following error:
> {noformat}
> 20.45 Setting up libc-bin (2.35-0ubuntu3.9) ...
> 20.59 qemu: uncaught target signal 11 (Segmentation fault) - core dumped
> 20.95 Segmentation fault (core dumped)
> 21.00 qemu: uncaught target signal 11 (Segmentation fault) - core dumped
> 21.37 Segmentation fault (core dumped)
> Sub-process /usr/bin/dpkg returned an error code (1)
> --
> WARNING: No output specified for bake-platform target(s) with 
> docker-container driver. Build result will only remain in the build cache. To 
> push result image into registry use --push or to load image into docker use 
> --load
> Dockerfile:68
> 
>   66 | # Updating Debian
>   67 | RUN if [ "$SKIP_OS_UPDATE" = "false" ]; then apt-get update; fi
>   68 | >>> RUN if [ "$SKIP_OS_UPDATE" = "false" ]; then apt-get upgrade -y; fi
>   69 | 
>   70 | ARG DISABLE_JEMALLOC=false
> 
> ERROR: failed to solve: process "/bin/sh -c if [ \"$SKIP_OS_UPDATE\" = 
> \"false\" ]; then apt-get upgrade -y; fi" did not complete successfully: exit 
> code: 100{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37363) Remove confusing and duplicated state.backend.type config from state SQL connector

2025-02-20 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-37363:
-

 Summary: Remove confusing and duplicated state.backend.type config 
from state SQL connector
 Key: FLINK-37363
 URL: https://issues.apache.org/jira/browse/FLINK-37363
 Project: Flink
  Issue Type: Improvement
  Components: API / State Processor
Reporter: Gabor Somogyi
Assignee: Gabor Somogyi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37152][build] Update Flink version to 1.20 [flink-cdc]

2025-02-20 Thread via GitHub


leonardBang commented on PR #3868:
URL: https://github.com/apache/flink-cdc/pull/3868#issuecomment-2671490111

   Failed case tracked via https://issues.apache.org/jira/browse/FLINK-37362


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


beryllw commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1963566316


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java:
##
@@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
 .defaultValue(true)
 .withDescription(
 "Whether to use legacy json format. The default 
value is true, which means there is no whitespace before value and after comma 
in json format.");
+
+@Experimental
+public static final ConfigOption 
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
+
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")

Review Comment:
   IIUC, should it be 
`ConfigOptions.key("scan.incremental.snapshot.unboundary-chunk-first.enabled")`?
The ending chunk is the unbounded one on the largest side and the starting chunk is 
the unbounded one on the smallest side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963448829


##
flink-python/pyflink/table/catalog.py:
##
@@ -1391,3 +1391,87 @@ def of(catalog_name: str, configuration: Configuration, 
comment: str = None):
 j_catalog_descriptor = 
gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of(
 catalog_name, configuration._j_configuration, comment)
 return CatalogDescriptor(j_catalog_descriptor)
+
+
+class ObjectIdentifier(object):
+"""
+Identifies an object in a catalog. It allows to identify objects such as 
tables, views,
+function, or types in a catalog. An identifier must be fully qualified. It 
is the
+responsibility of the catalog manager to resolve an identifier to an 
object.
+
+While Path :class:`ObjectPath` is used within the same catalog, instances 
of this class can be
+used across catalogs.
+
+Two objects are considered equal if they share the same object identifier 
in a stable session
+context.
+"""
+
+_UNKNOWN = ""
+
+def __init__(self, j_object_identifier):
+self._j_object_identifier = j_object_identifier
+
+def __str__(self):
+return self._j_object_identifier.toString()
+
+def __hash__(self):
+return self._j_object_identifier.hashCode()
+
+def __eq__(self, other):
+return isinstance(other, self.__class__) and 
self._j_object_identifier.equals(
+other._j_object_identifier
+)
+
+@staticmethod
+def of(catalog_name: str, database_name: str, object_name: str) -> 
"ObjectIdentifier":
+assert catalog_name is not None, "Catalog name must not be null."
+assert database_name is not None, "Database name must not be null."
+assert object_name is not None, "Object name must not be null."
+
+if catalog_name == ObjectIdentifier._UNKNOWN or database_name == 
ObjectIdentifier._UNKNOWN:
+raise ValueError(f"Catalog or database cannot be named 
{ObjectIdentifier._UNKNOWN}")
+else:
+gateway = get_gateway()
+j_object_identifier = 
gateway.jvm.org.apache.flink.table.catalog.ObjectIdentifier.of(
+catalog_name, database_name, object_name
+)
+return ObjectIdentifier(j_object_identifier=j_object_identifier)
+
+def get_catalog_name(self) -> str:
+return self._j_object_identifier.getCatalogName()
+
+def get_database_name(self) -> str:
+return self._j_object_identifier.getDatabaseName()
+
+def get_object_name(self) -> str:
+return self._j_object_identifier.getObjectName()
+
+def to_object_path(self) -> ObjectPath:
+"""
+Convert this :class:`ObjectIdentifier` to :class:`ObjectPath`.
+
+Throws a TableException if the identifier cannot be converted.
+"""
+j_object_path = self._j_object_identifier.toObjectPath()
+return ObjectPath(j_object_path=j_object_path)
+
+def to_list(self) -> List[str]:
+"""
+List of the component names of this object identifier.
+"""
+return self._j_object_identifier.toList()
+
+def as_serializable_string(self) -> str:
+"""
+Returns a string that fully serializes this instance. The serialized 
string can be used for
+transmitting or persisting an object identifier.
+
+Throws a TableException if the identifier cannot be serialized.

Review Comment:
   So this means it might not always be a TableException, as it could be 
mapped to a CatalogException or one of the others in 
[https://github.com/apache/flink/blob/master/flink-python/pyflink/util/exceptions.py#L126:L127](https://github.com/apache/flink/blob/master/flink-python/pyflink/util/exceptions.py#L126:L127)
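   As a side note, a small illustrative sketch of the underlying Java class these Python wrappers delegate to (the catalog, database, and table names are made up):

```java
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;

public class ObjectIdentifierSketch {
    public static void main(String[] args) {
        ObjectIdentifier id = ObjectIdentifier.of("my_catalog", "my_db", "my_table");

        System.out.println(id.asSerializableString()); // `my_catalog`.`my_db`.`my_table`
        System.out.println(id.toList());               // [my_catalog, my_db, my_table]

        // An ObjectPath is only meaningful within the same catalog.
        ObjectPath path = id.toObjectPath();
        System.out.println(path.getFullName());        // my_db.my_table
    }
}
```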



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest

2025-02-20 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928771#comment-17928771
 ] 

Hongshun Wang edited comment on FLINK-37356 at 2/20/25 12:27 PM:
-

[~arvid] just do it. 

 

> I'd avoid tinkering around with the internal state of the transactionManager

It does make sense to me! I'd like to see your resign of kafka sink.


was (Author: JIRAUSER298968):
[~arvid] just do it. I'd like to see your resign of kafka sink.

> Recycle use of kafka producer(which commit error) maybe send data without 
> AddPartitionsToTxnRequest
> ---
>
> Key: FLINK-37356
> URL: https://issues.apache.org/jira/browse/FLINK-37356
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.4.0
>Reporter: Hongshun Wang
>Priority: Major
>
> In my production environment, a READ_COMMITTED consumer can no longer consume 
> any records. Then I found that the LSO of the partition doesn't change for a 
> long time. I looked up all the logs in the Kafka cluster and found that there 
> is a transaction lacking an AddPartitionsToTxnRequest.
>  
> At first, I thought the problem was caused by 
> https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster 
> log contains InvalidTxnStateException. However, though the transaction is in 
> an invalid state, no data is written into the Kafka topic partition in this 
> transaction (because in this case, the transaction is empty). It will not 
> influence any Kafka topic partition's LSO, so the consumer won't be blocked.
>  
> Then I checked the Kafka client code; it seems there is no way to produce data 
> without the AddPartitionsToTxnRequest being done, because the `Sender` will 
> refuse to dequeue batches from the accumulator until they have been added to 
> the transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future doSend(ProducerRecord record, Callback 
> callback) {
> // ..ignore code
> // Add the partition to the transaction (if in progress) after it has been 
> successfully
> // appended to the accumulator. We cannot do it before because the partition 
> may be
> // unknown or the initially selected partition may be changed when the batch 
> is closed
> // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse 
> to dequeue
> // batches from the accumulator until they have been added to the transaction.
> if (transactionManager != null) {
> transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
> // ignore code
> }{code}
> {code:java}
> //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
>  
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
>     // there is a rare case that a single batch size is larger than the request
>     // size due to compression; in this case we will still eventually send this
>     // batch in a single request
>     break;
> } else {
>     if (shouldStopDrainBatchesForPartition(first, tp)) break;
> }
> {code}
>  
> Then I have an idea: if a TransactionManager which doesn't clear 
> partitionsInTransaction is reused again, the AddPartitionsToTxnRequest won't 
> be sent again. It may happen in the Flink Kafka connector:
>  
>       1. The Flink Kafka connector also reuses and recycles the KafkaProducer: 
> KafkaCommitter will recycle the producer to
> producerPool after the transaction completes or an exception occurs, and then 
> KafkaWriter will reuse it from producerPool.
> {code:java}
> // code placeholder
> org.apache.flink.connector.kafka.sink.KafkaCommitter#commit
> @Override
> public void commit(Collection> requests)
> throws IOException, InterruptedException {
> for (CommitRequest request : requests) {
>  
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);
> } catch (RetriableException e) {
> request.retryLater();
> } catch (ProducerFencedException e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (InvalidTxnStateException e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (UnknownProducerIdException e) {
> LOG.error(
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithKnownReason(e);
> } catch (Exception e) {
> recyclable.ifPresent(Recyclable::close);
> request.signalFailedWithUnknownReason(e);
> }
> }
> } {code}
>      2. If KafkaCommitter meets an exception and doesn't succeed in 
> commitTransaction, the 

Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]

2025-02-20 Thread via GitHub


leonardBang commented on code in PR #3856:
URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1963519196


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java:
##
@@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
 .defaultValue(true)
 .withDescription(
 "Whether to use legacy json format. The default 
value is true, which means there is no whitespace before value and after comma 
in json format.");
+
+@Experimental
+public static final ConfigOption 
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
+
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")

Review Comment:
   How about ```suggestion
   
ConfigOptions.key("scan.incremental.snapshot.boundary-chunk-first.enabled")
   ```? IIUC, this option is used to control whether the boundary chunks should be 
assigned first or not?
   



##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -470,7 +470,13 @@ private List splitEvenlySizedChunks(
 }
 }
 // add the ending split
-splits.add(ChunkRange.of(chunkStart, null));
+// assign ending split first, both the largest and smallest unbounded 
chunks are completed
+// in the first two splits
+if (sourceConfig.isAssignEndingChunkFirst()) {
+splits.add(0, ChunkRange.of(chunkStart, null));

Review Comment:
   Could you consider `System.arraycopy` for performance? Currently we need to 
move n-1 elements.
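
   As an illustration of the trade-off (nothing below is the connector's actual code, and `ChunkRange` from the diff above is replaced by a generic element type): `ArrayList#add(0, e)` already shifts the existing elements via `System.arraycopy` internally, so the alternative is to avoid the prepend altogether, for example by collecting the splits in a deque:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch: two ways to put the unbounded ending chunk at the head of the split list.
public class EndingChunkFirstSketch {

    // As in the PR: ArrayList#add(0, e) shifts all n existing elements on every call.
    static <T> void prependIntoArrayList(List<T> splits, T endingChunk) {
        splits.add(0, endingChunk);
    }

    // Alternative: addFirst on a deque is O(1); convert to a List once at the end.
    static <T> List<T> prependViaDeque(Deque<T> splits, T endingChunk) {
        splits.addFirst(endingChunk);
        return new ArrayList<>(splits);
    }
}
```

   Since the prepend happens only once per table, the difference is probably negligible in practice; the sketch just makes the copy explicit.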



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36900] Migrate from conda to uv for managing Python environments [flink-connector-shared-utils]

2025-02-20 Thread via GitHub


autophagy commented on code in PR #43:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/43#discussion_r1963579142


##
python/lint-python.sh:
##
@@ -377,73 +375,62 @@ function install_environment() {
 print_function "STAGE" "installing environment"
 
 local sys_os=`uname -s`
-#get the index of the SUPPORT_OS array for convinient to intall tool.
+#get the index of the SUPPORT_OS array for convenient to install tool.
 get_os_index $sys_os
 local os_index=$?
 
-# step-1 install wget
-# the file size of the miniconda.sh is too big to use "wget" tool to 
download instead
-# of the "curl" in the weak network environment.
+# step-1 install uv
 if [ $STEP -lt 1 ]; then
-print_function "STEP" "installing wget..."
-install_wget ${SUPPORT_OS[$os_index]}
-STEP=1
-checkpoint_stage $STAGE $STEP
-print_function "STEP" "install wget... [SUCCESS]"
-fi
-
-# step-2 install miniconda
-if [ $STEP -lt 2 ]; then
-print_function "STEP" "installing miniconda..."
+print_function "STEP" "installing uv..."
 create_dir $CURRENT_DIR/download
-install_miniconda $os_index
-STEP=2
+install_uv
+STEP=1
 checkpoint_stage $STAGE $STEP
-print_function "STEP" "install miniconda... [SUCCESS]"
+print_function "STEP" "install uv... [SUCCESS]"
 fi
 
-# step-3 install python environment which includes
+# step-2 install python environment which includes
 # 3.7 3.8 3.9 3.10
-if [ $STEP -lt 3 ] && [ `need_install_component "py_env"` = true ]; then
+if [ $STEP -lt 2 ] && [ `need_install_component "py_env"` = true ]; then
 print_function "STEP" "installing python environment..."
 install_py_env
-STEP=3
+STEP=2
 checkpoint_stage $STAGE $STEP
 print_function "STEP" "install python environment... [SUCCESS]"
 fi
 
-# step-4 install tox
-if [ $STEP -lt 4 ] && [ `need_install_component "tox"` = true ]; then
+# step-3 install tox
+if [ $STEP -lt 3 ] && [ `need_install_component "tox"` = true ]; then
 print_function "STEP" "installing tox..."
 install_tox
-STEP=4
+STEP=3
 checkpoint_stage $STAGE $STEP
 print_function "STEP" "install tox... [SUCCESS]"
 fi
 
-# step-5 install  flake8
-if [ $STEP -lt 5 ] && [ `need_install_component "flake8"` = true ]; then
+# step-4 install  flake8
+if [ $STEP -lt 4 ] && [ `need_install_component "flake8"` = true ]; then
 print_function "STEP" "installing flake8..."
 install_flake8
-STEP=5
+STEP=4
 checkpoint_stage $STAGE $STEP
 print_function "STEP" "install flake8... [SUCCESS]"
 fi
 
-# step-6 install sphinx
-if [ $STEP -lt 6 ] && [ `need_install_component "sphinx"` = true ]; then
+# step-5 install sphinx
+if [ $STEP -lt 5 ] && [ `need_install_component "sphinx"` = true ]; then
 print_function "STEP" "installing sphinx..."
 install_sphinx
-STEP=6
+STEP=5
 checkpoint_stage $STAGE $STEP
 print_function "STEP" "install sphinx... [SUCCESS]"
 fi
 
-# step-7 install mypy
-if [[ ${STEP} -lt 7 ]] && [[ `need_install_component "mypy"` = true ]]; 
then
+# step-5 install mypy

Review Comment:
   Oops, yes, good catch 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36594][hive]HiveCatalog should set HiveConf.hiveSiteLocation back [flink]

2025-02-20 Thread via GitHub


slankka commented on PR #25568:
URL: https://github.com/apache/flink/pull/25568#issuecomment-2671535475

   closed in favor of https://github.com/apache/flink-connector-hive/pull/32


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36594][hive]HiveCatalog should set HiveConf.hiveSiteLocation back [flink]

2025-02-20 Thread via GitHub


slankka closed pull request #25568: [FLINK-36594][hive]HiveCatalog should set 
HiveConf.hiveSiteLocation back
URL: https://github.com/apache/flink/pull/25568


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963472704


##
flink-python/pyflink/table/table.py:
##
@@ -1077,6 +1078,91 @@ def explain(self, *extra_details: ExplainDetail) -> str:
 j_extra_details = to_j_explain_detail_arr(extra_details)
 return self._j_table.explain(TEXT, j_extra_details)
 
+def insert_into(
+self, table_path_or_descriptor: Union[str, TableDescriptor], 
overwrite: bool = False
+) -> TablePipeline:
+"""
+When ``target_path_or_descriptor`` is a table path:
+
+Declares that the pipeline defined by the given :class:`Table` 
(backed by a
+DynamicTableSink) should be written to a table that was registered 
under the specified
+path.
+
+See the documentation of
+
:func:`pyflink.table.table_environment.TableEnvironment.use_database` or
+
:func:`pyflink.table.table_environment.TableEnvironment.use_catalog` for the 
rules on
+the path resolution.
+
+Example:
+::
+
+>>> table = table_env.sql_query("SELECTFROM MyTable")
+>>> table_pipeline = 
table.insert_into_table_path("MySinkTable", True)
+>>> table_result = table_pipeline.execute().wait()
+
+When ``target_path_or_descriptor`` is a  
:class:`~pyflink.table.TableDescriptor` :
+
+Declares that the pipeline defined by the given :class:`Table` 
object should be written
+to a table (backed by a DynamicTableSink) expressed via the given
+:class:`~pyflink.table.TableDescriptor`.
+
+The descriptor won't be registered in the catalog, but it will be 
propagated directly
+in the operation tree. Note that calling this method multiple 
times, even with the same
+descriptor, results in multiple sink tables instances.
+
+This method allows to declare a :class:`~pyflink.table.Schema` for 
the sink descriptor.
+The declaration is similar to a ``CREATE TABLE`` DDL in SQL and 
allows to:

Review Comment:
   maybe change this to: Specifying a schema asserts a structure on the table. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963465316


##
flink-python/pyflink/table/table.py:
##
@@ -1077,6 +1078,91 @@ def explain(self, *extra_details: ExplainDetail) -> str:
 j_extra_details = to_j_explain_detail_arr(extra_details)
 return self._j_table.explain(TEXT, j_extra_details)
 
+def insert_into(
+self, table_path_or_descriptor: Union[str, TableDescriptor], 
overwrite: bool = False
+) -> TablePipeline:
+"""
+When ``target_path_or_descriptor`` is a table path:
+
+Declares that the pipeline defined by the given :class:`Table` 
(backed by a
+DynamicTableSink) should be written to a table that was registered 
under the specified
+path.
+
+See the documentation of
+
:func:`pyflink.table.table_environment.TableEnvironment.use_database` or
+
:func:`pyflink.table.table_environment.TableEnvironment.use_catalog` for the 
rules on
+the path resolution.
+
+Example:
+::
+
+>>> table = table_env.sql_query("SELECTFROM MyTable")

Review Comment:
   "SELECTFROM MyTable" does not look like valid SQL. I was expecting something 
like `SELECT * FROM MyTable; `



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963472704


##
flink-python/pyflink/table/table.py:
##
@@ -1077,6 +1078,91 @@ def explain(self, *extra_details: ExplainDetail) -> str:
 j_extra_details = to_j_explain_detail_arr(extra_details)
 return self._j_table.explain(TEXT, j_extra_details)
 
+def insert_into(
+self, table_path_or_descriptor: Union[str, TableDescriptor], 
overwrite: bool = False
+) -> TablePipeline:
+"""
+When ``target_path_or_descriptor`` is a table path:
+
+Declares that the pipeline defined by the given :class:`Table` 
(backed by a
+DynamicTableSink) should be written to a table that was registered 
under the specified
+path.
+
+See the documentation of
+
:func:`pyflink.table.table_environment.TableEnvironment.use_database` or
+
:func:`pyflink.table.table_environment.TableEnvironment.use_catalog` for the 
rules on
+the path resolution.
+
+Example:
+::
+
+>>> table = table_env.sql_query("SELECTFROM MyTable")
+>>> table_pipeline = 
table.insert_into_table_path("MySinkTable", True)
+>>> table_result = table_pipeline.execute().wait()
+
+When ``target_path_or_descriptor`` is a  
:class:`~pyflink.table.TableDescriptor` :
+
+Declares that the pipeline defined by the given :class:`Table` 
object should be written
+to a table (backed by a DynamicTableSink) expressed via the given
+:class:`~pyflink.table.TableDescriptor`.
+
+The descriptor won't be registered in the catalog, but it will be 
propagated directly
+in the operation tree. Note that calling this method multiple 
times, even with the same
+descriptor, results in multiple sink tables instances.
+
+This method allows to declare a :class:`~pyflink.table.Schema` for 
the sink descriptor.
+The declaration is similar to a ``CREATE TABLE`` DDL in SQL and 
allows to:

Review Comment:
   maybe change this to: Specifying a schema asserts a structure on the table 
and can be used to:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]

2025-02-20 Thread via GitHub


davidradl commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1963443638


##
flink-python/pyflink/table/catalog.py:
##
@@ -1391,3 +1391,87 @@ def of(catalog_name: str, configuration: Configuration, 
comment: str = None):
 j_catalog_descriptor = 
gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of(
 catalog_name, configuration._j_configuration, comment)
 return CatalogDescriptor(j_catalog_descriptor)
+
+
+class ObjectIdentifier(object):
+"""
+Identifies an object in a catalog. It allows to identify objects such as 
tables, views,

Review Comment:
   nit: change to match the other doc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


