Re: [PR] [FLINK-36194][Runtime] Handle the Graceful close of ExecutionGraphInfoStore from ClusterEntrypoint [flink]

2024-10-21 Thread via GitHub


flinkbot commented on PR #25553:
URL: https://github.com/apache/flink/pull/25553#issuecomment-2426917163

   
   ## CI report:
   
   * af6861f7c46e6033e5b9afea5f5fce703be10c4c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36194][Runtime] Handle the Graceful close of ExecutionGraphInfoStore from ClusterEntrypoint [flink]

2024-10-21 Thread via GitHub


eaugene opened a new pull request, #25553:
URL: https://github.com/apache/flink/pull/25553

   
   
   
   ## What is the purpose of the change
   This fixes the race condition of writing to a closed file when the cluster 
shutdown is triggered.
   
   
   ## Brief change log
   - Removing the shutdown hook from `ExecutionGraphInfoStore`; instead, the 
`ClusterEntrypoint` (which holds an instance of `ExecutionGraphInfoStore`) 
closes it during its own shutdown (see the sketch below)
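   
   For illustration only, a minimal sketch of the intended close ordering 
(hypothetical, simplified classes; not the actual Flink code):
   
   ```java
   // Hypothetical sketch: the store is no longer registered as a JVM shutdown hook;
   // the entrypoint closes it only after the components that write to it are stopped.
   interface ExecutionGraphInfoStoreSketch extends AutoCloseable {
       void put(String executionGraphInfo);
       @Override
       void close(); // deletes the store's temporary directory
   }
   
   final class ClusterEntrypointSketch {
       private final ExecutionGraphInfoStoreSketch store;
   
       ClusterEntrypointSketch(ExecutionGraphInfoStoreSketch store) {
           this.store = store;
           // Note: no Runtime.getRuntime().addShutdownHook(...) for the store anymore.
       }
   
       /** Called once the Dispatcher and all running jobs have been shut down. */
       void shutDownCluster() {
           // Closing last guarantees no JobManagerRunner can still write to a
           // store whose temporary directory has already been deleted.
           store.close();
       }
   }
   ```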
   
   
   ## Verifying this change
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (JavaDocs)
   





[jira] [Updated] (FLINK-36194) Shutdown hook for ExecutionGraphInfo store runs concurrently to cluster shutdown hook causing race conditions

2024-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36194:
---
Labels: pull-request-available starter  (was: starter)

> Shutdown hook for ExecutionGraphInfo store runs concurrently to cluster 
> shutdown hook causing race conditions
> -
>
> Key: FLINK-36194
> URL: https://issues.apache.org/jira/browse/FLINK-36194
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 2.0.0, 1.20.0, 1.19.1
>Reporter: Matthias Pohl
>Assignee: Eaugene Thomas
>Priority: Minor
>  Labels: pull-request-available, starter
>
> There is a {{FileNotFoundException}} being logged when shutting down the 
> cluster with currently running jobs:
> {code}
> /tmp/executionGraphStore-b2cb1190-2c4d-4021-a73d-8b15027860df/8f6abf294a46345d331590890f7e7c37
>  (No such file or directory)
> java.io.FileNotFoundException: 
> /tmp/executionGraphStore-b2cb1190-2c4d-4021-a73d-8b15027860df/8f6abf294a46345d331590890f7e7c37
>  (No such file or directory)
>   at java.base/java.io.FileOutputStream.open0(Native Method)
>   at java.base/java.io.FileOutputStream.open(FileOutputStream.java:298)
>   at java.base/java.io.FileOutputStream.(FileOutputStream.java:237)
>   at java.base/java.io.FileOutputStream.(FileOutputStream.java:187)
>   at 
> org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore.storeExecutionGraphInfo(FileExecutionGraphInfoStore.java:281)
>   at 
> org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore.put(FileExecutionGraphInfoStore.java:203)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.writeToExecutionGraphInfoStore(Dispatcher.java:1427)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedTerminalState(Dispatcher.java:1357)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:750)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$6(Dispatcher.java:700)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>   at 
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> [...]
> {code}
> This is caused by concurrent shutdown logic being triggered through the 
> {{FileExecutionGraphInfoStore}} shutdown hook. The shutdown hook calls close 
> on the store which will delete its temporary directory. 
> The concurrently performed cluster shutdown will try to suspend all running 
> jobs. The JobManagerRunners are trying to write their {{ExecutionGraphInfo}} 
> to the store which fails (because the temporary folder is deleted).
> This doesn't have any impact because the JobManager goes away, anyway. But 
> the log message is confusing and the shutdown hook is (IMHO) not needed. 
> Instead, the {{ExecutionGraphInfoStore}}'s close logic should be called 
> gracefully by the {{ClusterEntrypoint}} during its shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]

2024-10-21 Thread via GitHub


JuliaBogdan commented on PR #46:
URL: 
https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2427101306

   > > Hi, @ferenc-csaky. I'm interested in the connector for Flink > 1.18 and 
glad to see this PR, thank you for the work! I wonder if there is 
anything that is blocking the release?
   > 
   > Hi @JuliaBogdan, unfortunately yes, it is not trivial to release the 
connector as is against 1.18+, because some Hadoop elements are not playing 
well with JDK17+, so this will be more work than I originally anticipated. I 
will initiate a discussion to release the HBase connector for Flink 1.18 and 
1.19 for JDK8 and JDK11 only, to at least start to bridge the current gap.
   > 
   > Anything that comes afterwards will probably require bumping all 
Hadoop-related deps in this repo.
   
   I see, thanks for the quick response!
   Having the connector released for JDK11 would be super useful for our use 
case too. Looking forward to the outcome of the discussion.





Re: [PR] [FLINK-36065][runtime] Support submit stream graph. [flink]

2024-10-21 Thread via GitHub


zhuzhurk commented on code in PR #25472:
URL: https://github.com/apache/flink/pull/25472#discussion_r1808402242


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java:
##
@@ -80,6 +83,14 @@ public SchedulerNG createInstance(
 final Collection failureEnrichers,
 final BlocklistOperations blocklistOperations)
 throws Exception {
+JobGraph jobGraph;
+
+if (executionPlan instanceof JobGraph) {
+jobGraph = (JobGraph) executionPlan;
+} else {
+checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");

Review Comment:
   maybe
   ```
   } else if (executionPlan instanceof StreamGraph) {
   ...
   } else {
       throw new FlinkException("Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
   }
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##
@@ -1066,6 +1067,14 @@ public CompletableFuture 
submitJob(ExecutionPlan executionP
 // When MiniCluster uses the local RPC, the provided ExecutionPlan is 
passed directly to the
 // Dispatcher. This means that any mutations to the JG can affect the 
Dispatcher behaviour,
 // so we rather clone it to guard against this.

Review Comment:
   Comments above are for the original line.



##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##
@@ -169,10 +225,15 @@ public ExecutionConfig getExecutionConfig() {
 return executionConfig;
 }
 
+@Override
 public Configuration getJobConfiguration() {
 return jobConfiguration;
 }
 
+public void setJobConfiguration(Configuration configuration) {

Review Comment:
   Is this method required? If not, we can keep `jobConfiguration` final.



##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##
@@ -1081,4 +1269,345 @@ public void setAttribute(Integer vertexId, Attribute 
attribute) {
 getStreamNode(vertexId).setAttribute(attribute);
 }
 }
+
+public void setJobId(JobID jobId) {
+this.jobId = jobId;
+}
+
+@Override
+public JobID getJobID() {
+return jobId;
+}
+
+/**
+ * Sets the classpath required to run the job on a task manager.
+ *
+ * @param paths paths of the directories/JAR files required to run the job 
on a task manager
+ */
+public void setClasspath(List<URL> paths) {
+classpath = paths;
+}
+
+public List<URL> getClasspath() {
+return classpath;
+}
+
+/**
+ * Adds the given jar files to the {@link JobGraph} via {@link 
JobGraph#addJar}.
+ *
+ * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files 
to attach to the
+ * jobgraph.
+ * @throws RuntimeException if a jar URL is not valid.
+ */
+public void addJars(final List<URL> jarFilesToAttach) {
+for (URL jar : jarFilesToAttach) {
+try {
+addJar(new Path(jar.toURI()));
+} catch (URISyntaxException e) {
+throw new RuntimeException("URL is invalid. This should not 
happen.", e);
+}
+}
+}
+
+/**
+ * Returns a list of BLOB keys referring to the JAR files required to run 
this job.
+ *
+ * @return list of BLOB keys referring to the JAR files required to run 
this job
+ */
+@Override
+public List<PermanentBlobKey> getUserJarBlobKeys() {
+return this.userJarBlobKeys;
+}
+
+@Override
+public List<URL> getClasspaths() {
+return classpath;
+}
+
+public void addUserArtifact(String name, 
DistributedCache.DistributedCacheEntry file) {
+if (file == null) {
+throw new IllegalArgumentException();
+}
+
+userArtifacts.putIfAbsent(name, file);
+}
+
+@Override
+public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
+return userArtifacts;
+}
+
+@Override
+public void addUserJarBlobKey(PermanentBlobKey key) {
+if (key == null) {
+throw new IllegalArgumentException();
+}
+
+if (!userJarBlobKeys.contains(key)) {
+userJarBlobKeys.add(key);
+}
+}
+
+@Override
+public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey)
+throws IOException {
+byte[] serializedBlobKey;
+serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+
+userArtifacts.computeIfPresent(
+entryName,
+(key, originalEntry) ->
+new DistributedCache.DistributedCacheEntry(
+originalEntry.filePath,
+originalEntry.isExecutable,
+serializedBlobKey,
+originalEntry.isZipped));
+}
+
+@Override
+public 

Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 (backport to release-1.20) [flink]

2024-10-21 Thread via GitHub


ferenc-csaky commented on code in PR #25550:
URL: https://github.com/apache/flink/pull/25550#discussion_r1808220796


##
flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE:
##
@@ -6,17 +6,17 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- com.hierynomus:asn-one:0.5.0
+- com.hierynomus:asn-one:0.6.0
 - com.typesafe:config:1.4.2
 - com.typesafe:ssl-config-core_2.12:0.6.1
-- io.netty:netty:3.10.6.Final
-- org.agrona:agrona:1.15.1
-- org.apache.pekko:pekko-actor_2.12:1.0.1
-- org.apache.pekko:pekko-remote_2.12:1.0.1
-- org.apache.pekko:pekko-pki_2.12:1.0.1
-- org.apache.pekko:pekko-protobuf-v3_2.12:1.0.1
-- org.apache.pekko:pekko-slf4j_2.12:1.0.1
-- org.apache.pekko:pekko-stream_2.12:1.0.1
+- io.netty:netty-all:4.1.100.Final

Review Comment:
   This should be changed to `4.1.91.Final` according to the [root 
POM](https://github.com/apache/flink/blob/60cba350d7638592ea771dc7cf512798e6248886/pom.xml#L369C14-L369C26).






Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 (backport to release-1.20) [flink]

2024-10-21 Thread via GitHub


ferenc-csaky commented on PR #25550:
URL: https://github.com/apache/flink/pull/25550#issuecomment-2425809842

   1.20 backport of https://github.com/apache/flink/pull/25494, will merge them 
together.





Re: [PR] [hotfix][tests] Separate Pipeline and Source E2e tests [flink-cdc]

2024-10-21 Thread via GitHub


yuxiqian closed pull request #3427: [hotfix][tests] Separate Pipeline and 
Source E2e tests
URL: https://github.com/apache/flink-cdc/pull/3427





Re: [PR] [hotfix][tests] Separate Pipeline and Source E2e tests [flink-cdc]

2024-10-21 Thread via GitHub


yuxiqian commented on PR #3427:
URL: https://github.com/apache/flink-cdc/pull/3427#issuecomment-2425814837

   Fixed in #3514





Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 (backport to release-1.20) [flink]

2024-10-21 Thread via GitHub


gracegrimwood commented on code in PR #25550:
URL: https://github.com/apache/flink/pull/25550#discussion_r1808227768


##
flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE:
##
@@ -6,17 +6,17 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- com.hierynomus:asn-one:0.5.0
+- com.hierynomus:asn-one:0.6.0
 - com.typesafe:config:1.4.2
 - com.typesafe:ssl-config-core_2.12:0.6.1
-- io.netty:netty:3.10.6.Final
-- org.agrona:agrona:1.15.1
-- org.apache.pekko:pekko-actor_2.12:1.0.1
-- org.apache.pekko:pekko-remote_2.12:1.0.1
-- org.apache.pekko:pekko-pki_2.12:1.0.1
-- org.apache.pekko:pekko-protobuf-v3_2.12:1.0.1
-- org.apache.pekko:pekko-slf4j_2.12:1.0.1
-- org.apache.pekko:pekko-stream_2.12:1.0.1
+- io.netty:netty-all:4.1.100.Final

Review Comment:
   Nice catch! Changed this now :)






Re: [PR] [FLINK-36576][runtime] Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider [flink]

2024-10-21 Thread via GitHub


flinkbot commented on PR #25552:
URL: https://github.com/apache/flink/pull/25552#issuecomment-2425962755

   
   ## CI report:
   
   * 5765848fe5d10d5ea875a723f1683645c9efd9bd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-36575][runtime] ExecutionVertexInputInfo supports consuming subpartition groups [flink]

2024-10-21 Thread via GitHub


noorall opened a new pull request, #25551:
URL: https://github.com/apache/flink/pull/25551

   ## What is the purpose of the change
   
   Currently, the ExecutionVertexInputInfo describes the task's input through a 
combination of a PartitionRange and a SubpartitionRange. However, in the case 
of skewed join optimization, we need to split a group of data corresponding to 
the specific key, which may result in a downstream task subscribing to multiple 
combinations of PartitionRanges and SubpartitionRanges.
   
   Therefore, we need to modify the ExecutionVertexInputInfo to describe the 
input data as multiple combinations of PartitionRanges and SubpartitionRanges 
to meet the requirements of the aforementioned scenario and improve Flink's 
flexibility in describing the task's inputs.
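   
   For illustration only, a minimal sketch of the idea with hypothetical, 
simplified types (not the actual Flink classes): the input of an execution 
vertex becomes a list of partition-range/subpartition-range combinations 
instead of a single pair.
   
   ```java
   import java.util.List;
   
   // Hypothetical, simplified sketch of describing a task's input as a group of
   // (partition range, subpartition range) combinations instead of a single pair.
   final class SubpartitionGroupSketch {
   
       /** Closed index range, e.g. partition or subpartition indices [start, end]. */
       record IndexRange(int start, int end) {}
   
       /** One combination of a partition range and a subpartition range. */
       record SubpartitionSlice(IndexRange partitionRange, IndexRange subpartitionRange) {}
   
       /** Before: exactly one partition range and one subpartition range per vertex input. */
       record SingleRangeInputInfo(IndexRange partitionRange, IndexRange subpartitionRange) {}
   
       /** After: a vertex may consume several slices, e.g. when a skewed key is split. */
       record GroupedInputInfo(List<SubpartitionSlice> consumedSlices) {}
   }
   ```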
   
   
   ## Brief change log
   
 - Modify the description of the input in ExecutionVertexInputInfo.
 - Modify the connect function for edges.
 - Modify the InputGate creation logic of the network layer to adapt to 
this change.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   





[jira] [Updated] (FLINK-36575) ExecutionVertexInputInfo supports consuming subpartition groups

2024-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36575:
---
Labels: pull-request-available  (was: )

> ExecutionVertexInputInfo supports consuming subpartition groups
> ---
>
> Key: FLINK-36575
> URL: https://issues.apache.org/jira/browse/FLINK-36575
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lei Yang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the ExecutionVertexInputInfo describes the task's input through a 
> combination of a PartitionRange and a SubpartitionRange. However, in the case 
> of skewed join optimization, we need to split a group of data corresponding 
> to the specific key, which may result in a downstream task subscribing to 
> multiple combinations of PartitionRanges and SubpartitionRanges. 
> Therefore, we need to modify the ExecutionVertexInputInfo to describe the 
> input data as multiple combinations of PartitionRanges and SubpartitionRanges 
> to meet the requirements of the aforementioned scenario and improve Flink's 
> flexibility in describing the task's inputs.





Re: [PR] [FLINK-36575][runtime] ExecutionVertexInputInfo supports consuming subpartition groups [flink]

2024-10-21 Thread via GitHub


flinkbot commented on PR #25551:
URL: https://github.com/apache/flink/pull/25551#issuecomment-2425912906

   
   ## CI report:
   
   * b99b368660d7e45b2e5e8265e05c778bf4b7c372 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-36052][docs][elasticsearch] Add flink cdc elasticsearch pipeline sink to docs [flink-cdc]

2024-10-21 Thread via GitHub


beryllw commented on PR #3649:
URL: https://github.com/apache/flink-cdc/pull/3649#issuecomment-2425946273

   @leonardBang A dead Vitess link causes CI to fail. Please help to re-trigger 
CI. Thanks a lot.





[PR] [FLINK-36576][runtime] Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider [flink]

2024-10-21 Thread via GitHub


noorall opened a new pull request, #25552:
URL: https://github.com/apache/flink/pull/25552

   ## What is the purpose of the change
   
   Currently, the DefaultVertexParallelismAndInputInfosDecider is able to 
implement a balanced distribution algorithm based on the amount of data and the 
number of subpartitions; however, it also has some limitations:
   
   1. Currently, Decider selects the data distribution algorithm via the 
AllToAll or Pointwise attribute of the input, which limits the ability of the 
operator to dynamically modify the data distribution algorithm.
   2. Doesn't support data volume-based balanced distribution for Pointwise 
inputs.
   3. For AllToAll type inputs, it does not support splitting the data 
corresponding to the specific key, i.e., it cannot solve the data skewing 
caused by single-key hotspot.
   
   To address this, we plan to introduce the following improvements (a 
simplified sketch of the amount-based splitting idea follows the list):
   
   1. Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the 
input characterisation which allows the operator to flexibly choose the data 
balanced distribution algorithm.
   2. Introducing a data volume-based data balanced distribution algorithm for 
Pointwise inputs
   3. Introducing the ability to split data corresponding to the specific key 
to optimise AllToAll's data volume-based data balancing distribution algorithm.
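   
   For illustration only, a simplified sketch (hypothetical helper, not this 
PR's actual implementation) of the amount-based idea: given per-subpartition 
byte sizes, contiguous subpartition ranges are formed so that each downstream 
task receives roughly the same data volume.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   final class AmountBasedSplitSketch {
   
       /** Closed subpartition index range [start, end]. */
       record IndexRange(int start, int end) {}
   
       /**
        * Greedily packs consecutive subpartitions into at most maxRanges contiguous
        * ranges, starting a new range once the accumulated bytes reach the average
        * target. This only sketches the balancing idea, not Flink's actual algorithm.
        * Assumes maxRanges >= 1.
        */
       static List<IndexRange> splitByDataVolume(long[] bytesPerSubpartition, int maxRanges) {
           long total = 0;
           for (long b : bytesPerSubpartition) {
               total += b;
           }
           long target = Math.max(1, total / maxRanges);
   
           List<IndexRange> ranges = new ArrayList<>();
           int start = 0;
           long acc = 0;
           for (int i = 0; i < bytesPerSubpartition.length; i++) {
               acc += bytesPerSubpartition[i];
               boolean lastRangeMustTakeRest = ranges.size() == maxRanges - 1;
               if (acc >= target && !lastRangeMustTakeRest) {
                   ranges.add(new IndexRange(start, i));
                   start = i + 1;
                   acc = 0;
               }
           }
           if (start < bytesPerSubpartition.length) {
               ranges.add(new IndexRange(start, bytesPerSubpartition.length - 1));
           }
           return ranges;
       }
   
       public static void main(String[] args) {
           // Example: skewed subpartition sizes split into 3 roughly balanced ranges.
           System.out.println(splitByDataVolume(new long[] {100, 10, 10, 80, 10, 90}, 3));
       }
   }
   ```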
   
   ## Brief change log
   
 - Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation.
 - Introducing amount-based data balanced distribution algorithm for 
Pointwise.
 - Introducing the ability to split data corresponding to the specific key 
for AllToAll
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Updated] (FLINK-36576) Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider

2024-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36576:
---
Labels: pull-request-available  (was: )

> Improving amount-based data balancing distribution algorithm for 
> DefaultVertexParallelismAndInputInfosDecider
> -
>
> Key: FLINK-36576
> URL: https://issues.apache.org/jira/browse/FLINK-36576
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lei Yang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the DefaultVertexParallelismAndInputInfosDecider is able to 
> implement a balanced distribution algorithm based on the amount of data and 
> the number of subpartitions, however it also has some limitations:
>  # Currently, Decider selects the data distribution algorithm via the 
> AllToAll or Pointwise attribute of the input, which limits the ability of the 
> operator to dynamically modify the data distribution algorithm.
>  # Doesn't support data volume-based balanced distribution for Pointwise 
> inputs.
>  # For AllToAll type inputs, it does not support splitting the data 
> corresponding to the specific key, i.e., it cannot solve the data skewing 
> caused by single-key hotspot.
> For that we plan to introduce the following improvements:
>  # Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the 
> input characterisation which allows the operator to flexibly choose the data 
> balanced distribution algorithm.
>  # Introducing a data volume-based data balanced distribution algorithm for 
> Pointwise inputs
>  # Introducing the ability to split data corresponding to the specific key to 
> optimise AllToAll's data volume-based data balancing distribution algorithm.





Re: [PR] [FLINK-36146][connector] Fix SingleThreadFetcherManager race condition [flink]

2024-10-21 Thread via GitHub


kimgr commented on PR #25340:
URL: https://github.com/apache/flink/pull/25340#issuecomment-2427619439

   @becketqin That test fails consistently (before and after fix) with 
`java.lang.IllegalStateException: The split fetcher manager has closed.` due to 
state validation in `SplitFetcherManager.createSplitFetcher`. I'll see if I can 
find another sequence to provoke the error.





[PR] [FLINK-36332] Missed webhook okhttp reference. [flink-kubernetes-operator]

2024-10-21 Thread via GitHub


SamBarker opened a new pull request, #906:
URL: https://github.com/apache/flink-kubernetes-operator/pull/906

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request adds a new feature to periodically create 
and maintain savepoints through the `FlinkDeployment` custom resource.)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Periodic savepoint trigger is introduced to the custom resource*
 - *The operator checks on reconciliation whether the required time has 
passed*
 - *The JobManager's dispose savepoint API is used to clean up obsolete 
savepoints*
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no)
 - Core observer or reconciler logic that is regularly executed: (yes / no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





Re: [PR] [FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing [flink-cdc]

2024-10-21 Thread via GitHub


lvyanquan commented on code in PR #3656:
URL: https://github.com/apache/flink-cdc/pull/3656#discussion_r1808363691


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java:
##
@@ -108,6 +108,16 @@ public boolean isCompletedSplit() {
 return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
 }
 
+public String getTables() {
+String tables;
+if (tableSchemas != null) {
+tables = tableSchemas.keySet().toString();

Review Comment:
   If we iterate every time, it will have an impact on performance. Can we 
assume that `tableSchemas` will not change, so that we only need to build the 
tables variable once?
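   
   A minimal sketch of that caching idea, assuming `tableSchemas` is 
effectively immutable once the binlog split is built (field and method names 
here are illustrative, not the PR's actual code):
   
   ```java
   import java.util.Map;
   
   final class TableNamesCacheSketch {
       private final Map<String, Object> tableSchemas; // illustrative stand-in for the schema map
       private transient String cachedTables;          // built once, reused on every checkpoint log
   
       TableNamesCacheSketch(Map<String, Object> tableSchemas) {
           this.tableSchemas = tableSchemas;
       }
   
       String getTables() {
           if (cachedTables == null) {
               cachedTables = tableSchemas == null ? "[]" : tableSchemas.keySet().toString();
           }
           return cachedTables;
       }
   }
   ```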



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java:
##
@@ -530,7 +530,11 @@ private void logCurrentBinlogOffsets(List 
splits, long checkpointId)
 return;
 }
 BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
-LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, 
offset);
+LOG.info(
+"Binlog offset for tables {} on checkpoint {}: {}",
+split.asBinlogSplit().getTables(),

Review Comment:
   If there are a large number of tables, it will result in the logs being quite 
long. Would you consider truncating them?
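   
   For illustration only, one possible way to truncate (the length limit and 
method name are arbitrary):
   
   ```java
   final class LogTruncationSketch {
       /** Keeps at most maxLength characters of a long table list for logging. */
       static String truncateForLog(String tables, int maxLength) {
           if (tables == null || tables.length() <= maxLength) {
               return tables;
           }
           return tables.substring(0, maxLength)
                   + "... (" + (tables.length() - maxLength) + " more characters)";
       }
   }
   ```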






Re: [PR] [FLINK-36568][rest] Fix incorrect http url for ClientCoordinationHeaders. [flink]

2024-10-21 Thread via GitHub


zhuzhurk merged PR #25542:
URL: https://github.com/apache/flink/pull/25542





[jira] [Closed] (FLINK-36568) IllegalStateException Causes Collect Sink to Fail to Collect Result

2024-10-21 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-36568.
---
Fix Version/s: 2.0.0
   Resolution: Fixed

2c830898640fe65016763c747c05110bd79b8894

> IllegalStateException Causes Collect Sink to Fail to Collect Result
> ---
>
> Key: FLINK-36568
> URL: https://issues.apache.org/jira/browse/FLINK-36568
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> When Flink jobs use the collect sink, the JobManager logs the following 
> exception:
>  
> {code:java}
> ERROR 
> org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler
>  [] - Unhandled exception.
> java.lang.IllegalStateException: No parameter could be found for the given 
> class. {code}
> The root cause is that the HTTP URL for the ClientCoordinationHandler is 
> incorrect.
>  





[PR] [tests] Add migration tests from CDC 3.2.0 [flink-cdc]

2024-10-21 Thread via GitHub


yuxiqian opened a new pull request, #3657:
URL: https://github.com/apache/flink-cdc/pull/3657

   Flink CDC 3.2.0 has been released and we're preparing for the following 3.3 
release. Adding it to the migration testing matrix should help us ensure state 
backwards compatibility.





[jira] [Updated] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-10-21 Thread Vincent Woo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent Woo updated FLINK-34636:

Affects Version/s: 1.13.2

> Requesting exclusive buffers timeout causes repeated restarts and cannot be 
> automatically recovered
> ---
>
> Key: FLINK-34636
> URL: https://issues.apache.org/jira/browse/FLINK-34636
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.13.2
>Reporter: Vincent Woo
>Priority: Major
> Attachments: image-20240308100308649.png, 
> image-20240308101008765.png, image-20240308101407396.png, 
> image-20240308101934756.png
>
>
> Based on the observation of logs and metrics, it was found that a subtask 
> deployed on a same TM consistently reported an exception of requesting 
> exclusive buffers timeout. It was discovered that during the restart process, 
> 【{*}Network{*}】 metric remained unchanged (heap memory usage did change). I 
> suspect that the network buffer memory was not properly released during the 
> restart process, which caused the newly deployed task to fail to obtain the 
> network buffer. This problem persisted despite repeated restarts, and the 
> application failed to recover automatically.
> (I'm not sure if there are other reasons for this issue)
> Attached below are screenshots of the exception stack and relevant metrics:
> {code:java}
> 2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
> failure cause: java.io.IOException: Timeout triggered when requesting 
> exclusive buffers: The total number of network buffers is currently set to 
> 32768 of 32768 bytes each. You can increase this number by setting the 
> configuration keys 'taskmanager.memory.network.fraction', 
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max',  or 
> you may increase the timeout which is 3ms by setting the key 
> 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
>   
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
>   
> at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
>   
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
>   
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)  
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)  
> at java.lang.Thread.run(Thread.java:748) {code}
> !image-20240308101407396.png|width=866,height=171!
> Network metric:Only this TM is always 100%, without any variation.
> !image-20240308100308649.png|width=868,height=338!
> The status of the task deployed to this TM cannot be RUNNING and the status 
> change is slow
> !image-20240308101008765.png|width=869,height=118!
> Although the root exception thrown by the  application is 
> PartitionNotFoundException, the actual underlying root cause exception log 
> found is IOException: Timeout triggered when requesting exclusive buffers
> !image-20240308101934756.png|width=869,height=394!





Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]

2024-10-21 Thread via GitHub


ruanwenjun commented on PR #25525:
URL: https://github.com/apache/flink/pull/25525#issuecomment-2426100238

   @ferenc-csaky Thanks for your review. As far as I know, the index file will 
be generated under root/target; it will not be put into the binary package, so 
it will not break the RAT license check, and we don't need any extra config in 
CI.
   
   BTW, disabling it by default and enabling it via Maven args is OK for me.
   ```
   mvn spotless:apply -Dspotless.usingIndex=true
   ```
   It's not a burden to me.





Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]

2024-10-21 Thread via GitHub


lvyanquan commented on code in PR #3639:
URL: https://github.com/apache/flink-cdc/pull/3639#discussion_r1808408534


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java:
##
@@ -454,6 +454,94 @@ public void testSinkWithMultiTables(String metastore)
 Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", 
"1")), result);
 }
 
+@ParameterizedTest
+@ValueSource(strings = {"filesystem", "hive"})
+public void testDuplicateCommitAfterRestore(String metastore)
+throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
+Catalog.DatabaseNotExistException, SchemaEvolveException {
+initialize(metastore);
+PaimonSink paimonSink =
+new PaimonSink<>(
+catalogOptions, new 
PaimonRecordEventSerializer(ZoneId.systemDefault()));
+PaimonWriter writer = paimonSink.createWriter(new 
MockInitContext());
+Committer committer = 
paimonSink.createCommitter();
+
+// insert
+for (Event event : createTestEvents()) {
+writer.write(event, null);
+}
+writer.flush(false);
+Collection> 
commitRequests =
+writer.prepareCommit().stream()
+.map(MockCommitRequestImpl::new)
+.collect(Collectors.toList());
+committer.commit(commitRequests);
+
+// We add a loop for restore 3 times
+for (int i = 0; i < 3; i++) {

Review Comment:
   I suggest that the repetition count be greater than 5 to trigger some 
compaction. 
   Refer to 
[num-sorted-run.compaction-trigger](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
 in the docs.






[jira] [Assigned] (FLINK-35387) PG CDC source support heart beat

2024-10-21 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-35387:
--

Assignee: Hongshun Wang

> PG CDC source support heart beat
> 
>
> Key: FLINK-35387
> URL: https://issues.apache.org/jira/browse/FLINK-35387
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Though the PG CDC document [1] has heartbeat.interval.ms, it is not effective. 
> The reason is below.
> The Debezium docs say: For the connector to detect and process events from a 
> heartbeat table, you must add the table to the PostgreSQL publication 
> specified by the 
> [publication.name|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name]
>  property. If this publication predates your Debezium deployment, the 
> connector uses the publications as defined. If the publication is not already 
> configured to automatically replicate changes {{FOR ALL TABLES}} in the 
> database, you must explicitly add the heartbeat table to the publication[2].
> Thus, if you want to use heartbeat in CDC:
> 1. add a heartbeat table to publication: ALTER PUBLICATION 
> __ ADD TABLE {_}{_};
> 2. set heartbeatInterval
> 3. add 
> debezium.[{{heartbeat.action.query}}|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query]
>  [3]
>  
> However, when I use it it CDC, some exception occurs:
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
> at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:127)
> at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:94){code}
> !https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5292b7c63c883d1620bbf7d3875a3a5b158e70b814913bc360a414d3de9277d871abf3af1cbd75249eddaaa1b37c2b2f5421a918fb1a2f0f3853c0ce41721e620699d98626fa2281948c58faa63edf8ebfc653b69905bac42?tmpCode=9193555a-7bf3-4335-9427-b59c1dfe1931!
>  
> It seems CDC don't add  a HeartbeatConnectionProvider  when configure 
> PostgresEventDispatcher:
> {code:java}
> //org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
> this.postgresDispatcher =
>                 new PostgresEventDispatcher<>(
>                         dbzConfig,
>                         topicSelector,
>                         schema,
>                         queue,
>                         dbzConfig.getTableFilters().dataCollectionFilter(),
>                         DataChangeEvent::new,
>                         metadataProvider,
>                         schemaNameAdjuster); {code}
> in debezium, when PostgresConnectorTask start, it will  do it
> {code:java}
> //io.debezium.connector.postgresql.PostgresConnectorTask#start
>   final PostgresEventDispatcher dispatcher = new 
> PostgresEventDispatcher<>(
>                     connectorConfig,
>                     topicNamingStrategy,
>                     schema,
>                     queue,
>                     connectorConfig.getTableFilters().dataCollectionFilter(),
>                     DataChangeEvent::new,
>                     PostgresChangeRecordEmitter::updateSchema,
>                     metadataProvider,
>                     connectorConfig.createHeartbeat(
>                             topicNamingStrategy,
>                             schemaNameAdjuster,
>                             () -> new 
> PostgresConnection(connectorConfig.getJdbcConfig(), 
> PostgresConnection.CONNECTION_GENERAL),
>                             exception -> {
>                                 String sqlErrorId = exception.getSQLState();
>                                 switch (sqlErrorId) {
>                                     case "57P01":
>                                         // Postgres error admin_shutdown, see 
> https://www.postgresql.org/docs/12/errcodes-appendix.html
>                                         throw new DebeziumException("Could 
> not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
>                                     case "57P03":
>                                         // Postgres error cannot_connect_now, 
> see https://www.postgresql.org/docs/12/errcodes-appendix.html
>                                         throw new RetriableException("Could 
> not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
>                                     default:
>                                         break;
>                                 }
>                             }),
>                     schem

Re: [PR] [FLINK-36285] Pass default value expression in AlterColumnTypeEvent [flink-cdc]

2024-10-21 Thread via GitHub


yuxiqian closed pull request #3614: [FLINK-36285] Pass default value expression 
in AlterColumnTypeEvent
URL: https://github.com/apache/flink-cdc/pull/3614





Re: [PR] [FLINK-36285] Pass default value expression in AlterColumnTypeEvent [flink-cdc]

2024-10-21 Thread via GitHub


yuxiqian commented on PR #3614:
URL: https://github.com/apache/flink-cdc/pull/3614#issuecomment-2425837181

   Fixed in https://github.com/apache/doris-flink-connector/pull/490





Re: [PR] [W.I.P] Run tests [flink]

2024-10-21 Thread via GitHub


JunRuiLee closed pull request #25431: [W.I.P] Run tests
URL: https://github.com/apache/flink/pull/25431





[jira] [Created] (FLINK-36575) ExecutionVertexInputInfo supports consuming subpartition groups

2024-10-21 Thread Lei Yang (Jira)
Lei Yang created FLINK-36575:


 Summary: ExecutionVertexInputInfo supports consuming subpartition 
groups
 Key: FLINK-36575
 URL: https://issues.apache.org/jira/browse/FLINK-36575
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Lei Yang


Currently, the ExecutionVertexInputInfo describes the task's input through a 
combination of a PartitionRange and a SubpartitionRange. However, in the case 
of skewed join optimization, we need to split a group of data corresponding to 
the specific key, which may result in a downstream task subscribing to multiple 
combinations of PartitionRanges and SubpartitionRanges. 

Therefore, we need to modify the ExecutionVertexInputInfo to describe the input 
data as multiple combinations of PartitionRanges and SubpartitionRanges to meet 
the requirements of the aforementioned scenario and improve Flink's flexibility 
in describing the task's inputs.





[jira] [Created] (FLINK-36576) Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider

2024-10-21 Thread Lei Yang (Jira)
Lei Yang created FLINK-36576:


 Summary: Improving amount-based data balancing distribution 
algorithm for DefaultVertexParallelismAndInputInfosDecider
 Key: FLINK-36576
 URL: https://issues.apache.org/jira/browse/FLINK-36576
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Lei Yang


Currently, the DefaultVertexParallelismAndInputInfosDecider is able to 
implement a balanced distribution algorithm based on the amount of data and the 
number of subpartitions, however it also has some limitations:
 # 
Currently, Decider selects the data distribution algorithm via the AllToAll or 
Pointwise attribute of the input, which limits the ability of the operator to 
dynamically modify the data distribution algorithm.
 # 
Doesn't support data volume-based balanced distribution for Pointwise inputs.
 # 
For AllToAll type inputs, it does not support splitting the data corresponding 
to the specific key, i.e., it cannot solve the data skewing caused by 
single-key hotspot.

For that we plan to introduce the following improvements:
 # 
Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the input 
characterisation which allows the operator to flexibly choose the data balanced 
distribution algorithm.
 # 
Introducing a data volume-based data balanced distribution algorithm for 
Pointwise inputs
 # 
Introducing the ability to split data corresponding to the specific key to 
optimise AllToAll's data volume-based data balancing distribution algorithm.





[jira] [Updated] (FLINK-36576) Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider

2024-10-21 Thread Lei Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lei Yang updated FLINK-36576:
-
Description: 
Currently, the DefaultVertexParallelismAndInputInfosDecider is able to 
implement a balanced distribution algorithm based on the amount of data and the 
number of subpartitions, however it also has some limitations:
 # Currently, Decider selects the data distribution algorithm via the AllToAll 
or Pointwise attribute of the input, which limits the ability of the operator 
to dynamically modify the data distribution algorithm.
 # Doesn't support data volume-based balanced distribution for Pointwise inputs.
 # For AllToAll type inputs, it does not support splitting the data 
corresponding to the specific key, i.e., it cannot solve the data skewing 
caused by single-key hotspot.

For that we plan to introduce the following improvements:
 # Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the 
input characterisation which allows the operator to flexibly choose the data 
balanced distribution algorithm.
 # Introducing a data volume-based data balanced distribution algorithm for 
Pointwise inputs
 # Introducing the ability to split data corresponding to the specific key to 
optimise AllToAll's data volume-based data balancing distribution algorithm.

  was:
Currently, the DefaultVertexParallelismAndInputInfosDecider is able to 
implement a balanced distribution algorithm based on the amount of data and the 
number of subpartitions, however it also has some limitations:
 # 
Currently, Decider selects the data distribution algorithm via the AllToAll or 
Pointwise attribute of the input, which limits the ability of the operator to 
dynamically modify the data distribution algorithm.
 # 
Doesn't support data volume-based balanced distribution for Pointwise inputs.
 # 
For AllToAll type inputs, it does not support splitting the data corresponding 
to the specific key, i.e., it cannot solve the data skewing caused by 
single-key hotspot.

For that we plan to introduce the following improvements:
 # 
Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the input 
characterisation which allows the operator to flexibly choose the data balanced 
distribution algorithm.
 # 
Introducing a data volume-based data balanced distribution algorithm for 
Pointwise inputs
 # 
Introducing the ability to split data corresponding to the specific key to 
optimise AllToAll's data volume-based data balancing distribution algorithm.


> Improving amount-based data balancing distribution algorithm for 
> DefaultVertexParallelismAndInputInfosDecider
> -
>
> Key: FLINK-36576
> URL: https://issues.apache.org/jira/browse/FLINK-36576
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lei Yang
>Priority: Major
>
> Currently, the DefaultVertexParallelismAndInputInfosDecider is able to 
> implement a balanced distribution algorithm based on the amount of data and 
> the number of subpartitions, however it also has some limitations:
>  # Currently, Decider selects the data distribution algorithm via the 
> AllToAll or Pointwise attribute of the input, which limits the ability of the 
> operator to dynamically modify the data distribution algorithm.
>  # Doesn't support data volume-based balanced distribution for Pointwise 
> inputs.
>  # For AllToAll type inputs, it does not support splitting the data 
> corresponding to the specific key, i.e., it cannot solve the data skewing 
> caused by single-key hotspot.
> For that we plan to introduce the following improvements:
>  # Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the 
> input characterisation which allows the operator to flexibly choose the data 
> balanced distribution algorithm.
>  # Introducing a data volume-based data balanced distribution algorithm for 
> Pointwise inputs
>  # Introducing the ability to split data corresponding to the specific key to 
> optimise AllToAll's data volume-based data balancing distribution algorithm.





Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]

2024-10-21 Thread via GitHub


ferenc-csaky commented on PR #25525:
URL: https://github.com/apache/flink/pull/25525#issuecomment-2425881444

   I think this would be pretty useful; I'm just wondering, if we enable it by 
default, could it break something in any CI? My understanding is that by default 
the index file is placed into the project root, and I think that would break 
the RAT license checker. But even if not, or if we add an exception for the 
index in the RAT config, doing this extra step in CI runs has not much value, as 
those runs always start from a clean slate, cloning the code, etc.
   
   All in all, I think disabling this by default would be a safer choice; we 
can introduce a property for it to make it easy to enable indexing via a 
dynamic param in any local run. WDYT?





[jira] [Commented] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-10-21 Thread Vincent Woo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891463#comment-17891463
 ] 

Vincent Woo commented on FLINK-34636:
-

This issue is occurring in version 1.13.2, and it looks like it may be related 
to this network buffer leak: "Network buffer leak when ResultPartition is 
released (failover)", so I'll first verify whether that fix avoids this issue.

> Requesting exclusive buffers timeout causes repeated restarts and cannot be 
> automatically recovered
> ---
>
> Key: FLINK-34636
> URL: https://issues.apache.org/jira/browse/FLINK-34636
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.13.2
>Reporter: Vincent Woo
>Priority: Major
> Attachments: image-20240308100308649.png, 
> image-20240308101008765.png, image-20240308101407396.png, 
> image-20240308101934756.png
>
>
> Based on the observation of logs and metrics, it was found that a subtask 
> deployed on a same TM consistently reported an exception of requesting 
> exclusive buffers timeout. It was discovered that during the restart process, 
> 【{*}Network{*}】 metric remained unchanged (heap memory usage did change). I 
> suspect that the network buffer memory was not properly released during the 
> restart process, which caused the newly deployed task to fail to obtain the 
> network buffer. This problem persisted despite repeated restarts, and the 
> application failed to recover automatically.
> (I'm not sure if there are other reasons for this issue)
> Attached below are screenshots of the exception stack and relevant metrics:
> {code:java}
> 2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
> failure cause: java.io.IOException: Timeout triggered when requesting 
> exclusive buffers: The total number of network buffers is currently set to 
> 32768 of 32768 bytes each. You can increase this number by setting the 
> configuration keys 'taskmanager.memory.network.fraction', 
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max',  or 
> you may increase the timeout which is 3ms by setting the key 
> 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
>   
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
>   
> at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
>   
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
>   
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)  
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)  
> at java.lang.Thread.run(Thread.java:748) {code}
> !image-20240308101407396.png|width=866,height=171!
> Network metric:Only this TM is always 100%, without any variation.
> !image-20240308100308649.png|width=868,height=338!
> The status of the task deployed to this TM cannot be RUNNING and the status 
> change is slow
> !image-20240308101008765.png|width=869,height=118!
> Although the root exception thrown by the  application is 
> PartitionNotFoundException, the actual underlying root cause exception log 
> found is IOException: Timeout triggered when requesting exclusive buffers
> !image-20240308101934756.png|width=869,height=394!
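
For reference, the exception message above already points at a configuration-based mitigation. Below is a minimal sketch using the keys named in the stack trace; the concrete values are illustrative assumptions only, and in practice these keys would normally be set in flink-conf.yaml rather than programmatically:

{code:java}
import org.apache.flink.configuration.Configuration;

public class NetworkBufferTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Grant the TaskManagers a larger network memory budget (values are examples, not recommendations).
        conf.setString("taskmanager.memory.network.fraction", "0.2");
        conf.setString("taskmanager.memory.network.min", "256mb");
        conf.setString("taskmanager.memory.network.max", "2gb");
        // Allow more time for exclusive buffer requests before the task fails.
        conf.setString("taskmanager.network.memory.exclusive-buffers-request-timeout-ms", "60000");
        System.out.println(conf);
    }
}
{code}

Note that such tuning only works around the symptom; if buffers are in fact leaked on failover, the leak itself still needs to be fixed.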



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36568) IllegalStateException Causes Collect Sink to Fail to Collect Result

2024-10-21 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-36568:
---

Assignee: Junrui Li

> IllegalStateException Causes Collect Sink to Fail to Collect Result
> ---
>
> Key: FLINK-36568
> URL: https://issues.apache.org/jira/browse/FLINK-36568
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
>
> When Flink jobs use the collect sink, the JobManager logs the following 
> exception:
>  
> {code:java}
> ERROR 
> org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler
>  [] - Unhandled exception.
> java.lang.IllegalStateException: No parameter could be found for the given 
> class. {code}
> The root cause is that the HTTP URL for the ClientCoordinationHandler is 
> incorrect.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-10-21 Thread Vincent Woo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891463#comment-17891463
 ] 

Vincent Woo edited comment on FLINK-34636 at 10/21/24 9:32 AM:
---

This issue is occurring in version 1.13.2, and it looks like it may be related 
to this Network buffer leak:[Network buffer leak when ResultPartition is 
released (failover)|https://issues.apache.org/jira/browse/FLINK-23724], so I'll 
verify that the fix code avoids this issue first.


was (Author: JIRAUSER299026):
This issue is occurring in version 1.13.2, and it looks like it may be related 
to this Network buffer leak:Network buffer leak when ResultPartition is 
released (failover), so I'll verify that the fix code avoids this issue first.

> Requesting exclusive buffers timeout causes repeated restarts and cannot be 
> automatically recovered
> ---
>
> Key: FLINK-34636
> URL: https://issues.apache.org/jira/browse/FLINK-34636
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.13.2
>Reporter: Vincent Woo
>Priority: Major
> Attachments: image-20240308100308649.png, 
> image-20240308101008765.png, image-20240308101407396.png, 
> image-20240308101934756.png
>
>
> Based on the observation of logs and metrics, it was found that a subtask 
> deployed on a same TM consistently reported an exception of requesting 
> exclusive buffers timeout. It was discovered that during the restart process, 
> 【{*}Network{*}】 metric remained unchanged (heap memory usage did change). I 
> suspect that the network buffer memory was not properly released during the 
> restart process, which caused the newly deployed task to fail to obtain the 
> network buffer. This problem persisted despite repeated restarts, and the 
> application failed to recover automatically.
> (I'm not sure if there are other reasons for this issue)
> Attached below are screenshots of the exception stack and relevant metrics:
> {code:java}
> 2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
> failure cause: java.io.IOException: Timeout triggered when requesting 
> exclusive buffers: The total number of network buffers is currently set to 
> 32768 of 32768 bytes each. You can increase this number by setting the 
> configuration keys 'taskmanager.memory.network.fraction', 
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max',  or 
> you may increase the timeout which is 3ms by setting the key 
> 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
>   
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
>   
> at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
>   
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
>   
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)  
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)  
> at java.lang.Thread.run(Thread.java:748) {code}
> !image-20240308101407396.png|width=866,height=171!
> Network metric:Only this TM is always 100%, without any variation.
> !image-20240308100308649.png|width=868,height=338!
> The status of the task deployed to this TM cannot be RUNNING and the status 
> change is slow
> !image-20240308101008765.png|width=869,height=118!
> Although the root exception thrown by the  application is 
> PartitionNotFoundException, the actual underlying root cause exception log 
> found is IOException: Timeout triggered when requesting exclusive buffers
> !image-20240308101934756.png|width=869,height=394!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-10-21 Thread Vincent Woo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891463#comment-17891463
 ] 

Vincent Woo edited comment on FLINK-34636 at 10/21/24 9:32 AM:
---

This issue is occurring in version 1.13.2, and it looks like it may be related 
to this Network buffer leak:Network buffer leak when ResultPartition is 
released (failover), so I'll verify that the fix code avoids this issue first.


was (Author: JIRAUSER299026):
This issue is occurring in version 1.13.2, and it looks like it may be related 
to this Network buffer leak:Network buffer leak when ResultPartition is 
released (failover), so I'll verify that the fix code avoids this issue first.

> Requesting exclusive buffers timeout causes repeated restarts and cannot be 
> automatically recovered
> ---
>
> Key: FLINK-34636
> URL: https://issues.apache.org/jira/browse/FLINK-34636
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.13.2
>Reporter: Vincent Woo
>Priority: Major
> Attachments: image-20240308100308649.png, 
> image-20240308101008765.png, image-20240308101407396.png, 
> image-20240308101934756.png
>
>
> Based on the observation of logs and metrics, it was found that a subtask 
> deployed on a same TM consistently reported an exception of requesting 
> exclusive buffers timeout. It was discovered that during the restart process, 
> 【{*}Network{*}】 metric remained unchanged (heap memory usage did change). I 
> suspect that the network buffer memory was not properly released during the 
> restart process, which caused the newly deployed task to fail to obtain the 
> network buffer. This problem persisted despite repeated restarts, and the 
> application failed to recover automatically.
> (I'm not sure if there are other reasons for this issue)
> Attached below are screenshots of the exception stack and relevant metrics:
> {code:java}
> 2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
> failure cause: java.io.IOException: Timeout triggered when requesting 
> exclusive buffers: The total number of network buffers is currently set to 
> 32768 of 32768 bytes each. You can increase this number by setting the 
> configuration keys 'taskmanager.memory.network.fraction', 
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max',  or 
> you may increase the timeout which is 3ms by setting the key 
> 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
>   
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
>   
> at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
>   
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
>   
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)  
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)  
> at java.lang.Thread.run(Thread.java:748) {code}
> !image-20240308101407396.png|width=866,height=171!
> Network metric:Only this TM is always 100%, without any variation.
> !image-20240308100308649.png|width=868,height=338!
> The status of the task deployed to this TM cannot be RUNNING and the status 
> change is slow
> !image-20240308101008765.png|width=869,height=118!
> Although the root exception thrown by the  application is 
> PartitionNotFoundException, the actual underlying root cause exception log 
> found is IOException: Timeout triggered when requesting exclusive buffers
> !image-20240308101934756.png|width=869,height=394!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36577) Add compatibility check for FlinkStateSnapshot CRD

2024-10-21 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-36577:


 Summary: Add compatibility check for FlinkStateSnapshot CRD
 Key: FLINK-36577
 URL: https://issues.apache.org/jira/browse/FLINK-36577
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Mate Czagany
 Fix For: kubernetes-operator-1.11.0


There are already two CRD compatibility checks in the 
flink-kubernetes-operator-api POM (for FlinkDeployment and FlinkSessionJob), 
one should also be added for FlinkStateSnapshot



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36577) Add compatibility check for FlinkStateSnapshot CRD

2024-10-21 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-36577:
-
Description: 
There are already two CRD compatibility checks in the 
flink-kubernetes-operator-api POM (for FlinkDeployment and FlinkSessionJob), 
one should also be added for FlinkStateSnapshot.

 

Requirement for this is to have the 1.10.0 version released.

  was:There are already two CRD compatibility checks in the 
flink-kubernetes-operator-api POM (for FlinkDeployment and FlinkSessionJob), 
one should also be added for FlinkStateSnapshot


> Add compatibility check for FlinkStateSnapshot CRD
> --
>
> Key: FLINK-36577
> URL: https://issues.apache.org/jira/browse/FLINK-36577
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
> Fix For: kubernetes-operator-1.11.0
>
>
> There are already two CRD compatibility checks in the 
> flink-kubernetes-operator-api POM (for FlinkDeployment and FlinkSessionJob), 
> one should also be added for FlinkStateSnapshot.
>  
> Requirement for this is to have the 1.10.0 version released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35977][doc] Missing an import in datastream.md [flink]

2024-10-21 Thread via GitHub


guluo2016 commented on PR #25153:
URL: https://github.com/apache/flink/pull/25153#issuecomment-2426468450

   > Thanks for the contribution
   > 
   > can you please also update `process_function.md` and `sourceSincks.md`, 
probably `data_stream_api.md` as well since they have the same issue. May be 
something else, need to search through the documentation to double check
   
   Thanks for your review, the changes have been made, please review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Flink Kubernetes Operator 1.10.0 [flink-web]

2024-10-21 Thread via GitHub


mateczagany commented on code in PR #758:
URL: https://github.com/apache/flink-web/pull/758#discussion_r1808609610


##
docs/content/posts/2024-10-30-release-kubernetes-operator-1.10.0.md:
##
@@ -0,0 +1,75 @@
+---
+title:  "Apache Flink Kubernetes Operator 1.10.0 Release Announcement"
+date: "2024-10-30T18:00:00.000Z"
+authors:
+- mateczagany:
+  name: "Mate Czagany"
+aliases:
+- /news/2024/10/30/release-kubernetes-operator-1.10.0.html
+---
+
+The Apache Flink community is excited to announce the release of Flink 
Kubernetes Operator 1.10.0!
+
+The release includes several improvements to the autoscaler, and introduces a 
new Kubernetes custom resource called FlinkStateSnapshot to manage job 
snapshots.
+The process of job upgrades has also been enhanced which makes it possible to 
now use the last-state upgrade mode with session jobs.
+
+We encourage you to [download the 
release](https://flink.apache.org/downloads.html) and share your experience 
with the
+community through the Flink [mailing 
lists](https://flink.apache.org/community.html#mailing-lists) or
+[JIRA](https://issues.apache.org/jira/browse/flink)! We're looking forward to 
your feedback!
+
+## Highlights
+
+### FlinkStateSnapshot
+
+This version also introduces a new custom resource called FlinkStateSnapshot. 
+It is used to describe a savepoint or checkpoint of a Flink job. 
+The savepoint/checkpoint fields found in FlinkDeployment and FlinkSessionJob 
status are therefore deprecated, and the operator will create new 
FlinkStateSnapshot resources for periodic, update and manual 
savepoints/checkpoints.
+
+Users can also create new FlinkStateSnapshot resources, which will instruct 
the operator to trigger a new checkpoint/savepoint.
+
+This new feature is enabled by default, unless disabled by setting 
`kubernetes.operator.snapshot.resource.enabled` to false or if the 
FlinkStateSnapshot CRD was not found on the Kubernetes cluster.
+
+You can read more about this feature 
[here](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.10/docs/custom-resource/snapshots/).
+
+
+### Last-State Upgrade Mode
+
+For deployments using last-state upgrade mode, the operator - instead of 
forcefully deleting the JobManager pods to trigger a restart - will cancel the 
job via REST API and extract the last checkpoint info after cancellation if the 
job is healthy.
+This change makes it possible to finally use the last-state upgrade mode for 
session jobs as well.
+
+
+### Autoscaler Delayed Scale Down
+
+With the introduction of the configuration option 
`job.autoscaler.scale-down.interval`, the operator can now optimize multiple 
scale-down operations to a single one to prevent too many unnecessary 
downscales, thus improving job availability.
+Please note that `job.autoscaler.scale-up.grace-period` has been removed with 
this change.
+
+
+### Other Autoscaler Improvements
+- Optimized cases where partitions or key groups cannot be evenly distributed 
to subtasks in case of Kafka and Pulsar

Review Comment:
   Thank you for clarifying it! Sorry for the delay, I have updated this line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]

2024-10-21 Thread via GitHub


ferenc-csaky commented on PR #46:
URL: 
https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2426418591

   > Hi, @ferenc-csaky. I'm interested in the connector for Flink > 1.18 and 
glad to see this PR, thank you for the work! I wonder if there is there 
anything that is blocking the release?
   
   Hi @JuliaBogdan, unfortunately yes, it is not trivial to release the 
connector as is against 1.18+, because some Hadoop elements do not play well 
with JDK17+, so this will be more work than I originally anticipated. I will 
initiate a discussion to release the HBase connector for Flink 1.18 and 1.19 
for JDK8 and JDK11 only, to at least start to bridge the current gap.
   
   Anything that comes afterwards will probably require bumping all 
Hadoop-related deps in this repo.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36502) Remove all deprecated methods in `FactoryUtil`

2024-10-21 Thread Shengbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891523#comment-17891523
 ] 

Shengbo commented on FLINK-36502:
-

Hi [~xuyangzhong], I'd like to work on this subtask as my first contribution to 
Flink. Could you please assign it to me?

> Remove all deprecated methods in `FactoryUtil`
> --
>
> Key: FLINK-36502
> URL: https://issues.apache.org/jira/browse/FLINK-36502
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [runtime-web] chore: update Angular to v15 [flink]

2024-10-21 Thread via GitHub


michalmw commented on PR #25450:
URL: https://github.com/apache/flink/pull/25450#issuecomment-2426504565

   @simplejason or @yangjunhan can you check this? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34467] bump flink version to 1.20.0 [flink-connector-jdbc]

2024-10-21 Thread via GitHub


boring-cyborg[bot] commented on PR #137:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/137#issuecomment-2426275936

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34467) Implement Lineage Interface in Jdbc Connector

2024-10-21 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-34467:
---
Fix Version/s: cdc-3.3.0

> Implement Lineage Interface in Jdbc Connector
> -
>
> Key: FLINK-34467
> URL: https://issues.apache.org/jira/browse/FLINK-34467
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34467] bump flink version to 1.20.0 [flink-connector-jdbc]

2024-10-21 Thread via GitHub


leonardBang merged PR #137:
URL: https://github.com/apache/flink-connector-jdbc/pull/137


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34467) Implement Lineage Interface in Jdbc Connector

2024-10-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891492#comment-17891492
 ] 

Leonard Xu commented on FLINK-34467:


[~arvid] [~ZhenqiuHuang] Could you check your last PR for the Kafka connector 
version bump?

> Implement Lineage Interface in Jdbc Connector
> -
>
> Key: FLINK-34467
> URL: https://issues.apache.org/jira/browse/FLINK-34467
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36476) Remove all deprecated methods under public APIs in table modules and adapt the related tests to make them pass

2024-10-21 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891481#comment-17891481
 ] 

xuyang commented on FLINK-36476:


[~davidradl] Good idea. The vast majority of the deletion work should not be 
affected, so let's proceed for now. Once we've completed most of the work, we 
can summarize the methods that were blocked due to not being marked as 
deprecated on the Python side and discuss them in the dev email. WDYT?

> Remove all deprecated methods under public APIs in table modules and adapt 
> the related tests to make them pass
> --
>
> Key: FLINK-36476
> URL: https://issues.apache.org/jira/browse/FLINK-36476
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> All methods needed to be removed are:
> [https://docs.google.com/document/d/1bVrmcB9UFOd1-sT7xDRTMb5rmO0vK0eDlLRUYYL9gd4/edit?usp=sharing]
>  
> Note:
>  # Please make sure to delete and adapt the code(including test) of module 
> table, python, doc, etc. as well.
>  # Please raise a PR in the corresponding subtask, and try not to link PRs on 
> this umbrella jira.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33926][kubernetes]: Allow using job jars in the system classpath in native mode [flink]

2024-10-21 Thread via GitHub


tinaselenge commented on code in PR #25445:
URL: https://github.com/apache/flink/pull/25445#discussion_r1808630829


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java:
##
@@ -214,9 +215,12 @@ public ClusterClientProvider 
deployApplicationCluster(
 
 applicationConfiguration.applyToConfiguration(flinkConfig);
 
-// No need to do pipelineJars validation if it is a PyFlink job.
+// No need to do pipelineJars validation if it is a PyFlink job or no 
jars specified in
+// pipeline.jars (to use jars in system classpath instead).
 if 
(!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
-|| 
PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments( 
{
+|| PackagedProgramUtils.isPython(
+
applicationConfiguration.getProgramArguments()))
+&& flinkConfig.get(PipelineOptions.JARS) != null) {

Review Comment:
   good idea!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34467) Implement Lineage Interface in Jdbc Connector

2024-10-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891488#comment-17891488
 ] 

Leonard Xu edited comment on FLINK-34467 at 10/21/24 10:35 AM:
---

merged into jdbc connector repo via main: 
72f315fcac5ab300ec06625696bcda33b2141cd7


was (Author: leonard xu):
merged into jdbc connector repo vias main: 
72f315fcac5ab300ec06625696bcda33b2141cd7

> Implement Lineage Interface in Jdbc Connector
> -
>
> Key: FLINK-34467
> URL: https://issues.apache.org/jira/browse/FLINK-34467
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34467) Implement Lineage Interface in Jdbc Connector

2024-10-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891488#comment-17891488
 ] 

Leonard Xu commented on FLINK-34467:


merged into jdbc connector repo vias main: 
72f315fcac5ab300ec06625696bcda33b2141cd7

> Implement Lineage Interface in Jdbc Connector
> -
>
> Key: FLINK-34467
> URL: https://issues.apache.org/jira/browse/FLINK-34467
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]

2024-10-21 Thread via GitHub


JuliaBogdan commented on PR #46:
URL: 
https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2426326123

   Hi, @ferenc-csaky. I'm interested in the connector for Flink > 1.18 and glad 
to see this PR, thank you for the work!
   I wonder if there is anything that is blocking the release?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35016] catalog changes for model resource [flink]

2024-10-21 Thread via GitHub


twalthr merged PR #25211:
URL: https://github.com/apache/flink/pull/25211


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35016) Catalog changes for model CRUD

2024-10-21 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-35016.

Fix Version/s: 2.0.0
 Assignee: Hao Li
   Resolution: Fixed

Fixed in master: 2d17f6148796c30890d38c830f772f8f38bfd495

> Catalog changes for model CRUD
> --
>
> Key: FLINK-35016
> URL: https://issues.apache.org/jira/browse/FLINK-35016
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]

2024-10-21 Thread via GitHub


ferenc-csaky commented on PR #25525:
URL: https://github.com/apache/flink/pull/25525#issuecomment-2426536625

   > @ferenc-csaky Thanks for your review, as far as I know, the index file 
will be generated under root/target, this will not be put into binary package, 
so will not break the RAT license check, we don't need any extra change in CI.
   > 
   > BTW, disable it by default and enable this by using maven args is 
acceptable to me.
   > 
   > ```
   > mvn spotless:apply -Dspotless.usingIndex=true
   > ```
   > 
   > It's not a burden to me.
   
   Sounds good to me! Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34467) Implement Lineage Interface in Jdbc Connector

2024-10-21 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-34467:
---
Affects Version/s: jdbc-3.2.0
   (was: 1.19.0)

> Implement Lineage Interface in Jdbc Connector
> -
>
> Key: FLINK-34467
> URL: https://issues.apache.org/jira/browse/FLINK-34467
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [hotfix][docs] Add FlinkStateSnapshot to upgrade docs [flink-kubernetes-operator]

2024-10-21 Thread via GitHub


mateczagany opened a new pull request, #905:
URL: https://github.com/apache/flink-kubernetes-operator/pull/905

   With the usual `helm upgrade`, the new FlinkStateSnapshot CRD won't be 
installed, and it will need to be installed with `kubectl create`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36557) Stale Autoscaler Context in Kubernetes Operator

2024-10-21 Thread Sai Sharath Dandi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891612#comment-17891612
 ] 

Sai Sharath Dandi commented on FLINK-36557:
---

Hi [~gyfora], could you please help review this Jira and provide suggestions? I 
can work on the PR for the fix accordingly.

> Stale Autoscaler Context in Kubernetes Operator
> ---
>
> Key: FLINK-36557
> URL: https://issues.apache.org/jira/browse/FLINK-36557
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Sai Sharath Dandi
>Priority: Minor
>
> The KubernetesJobAutoScalerContext is 
> [cached|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java#L59]
>  in the FlinkResourceContext and reused. If the JobAutoscalerContext is 
> initialized before the job reaches Running state, it can cause the autoscaler 
> to not trigger - 
> [link|[https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java#L98].]
>  
> We need to either refresh the AutoScalerContext similar to the standalone 
> [implementation|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java#L127]
>  or the autoscaler module itself needs to refresh the job status



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]

2024-10-21 Thread via GitHub


ferenc-csaky commented on PR #25525:
URL: https://github.com/apache/flink/pull/25525#issuecomment-2427353187

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36557) Stale Autoscaler Context in Kubernetes Operator

2024-10-21 Thread Sai Sharath Dandi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sai Sharath Dandi updated FLINK-36557:
--
Description: 
The KubernetesJobAutoScalerContext is 
[cached|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java#L59]
 in the FlinkResourceContext and reused. If the JobAutoscalerContext is 
initialized before the job reaches Running state, it can cause the autoscaler 
to not trigger - 
[link|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java#L98]

 

We need to either refresh the AutoScalerContext similar to the standalone 
[implementation|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java#L127]
 or the autoscaler module itself needs to refresh the job status

  was:
The KubernetesJobAutoScalerContext is 
[cached|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java#L59]
 in the FlinkResourceContext and reused. If the JobAutoscalerContext is 
initialized before the job reaches Running state, it can cause the autoscaler 
to not trigger - 
[link|[https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java#L98].]




 

We need to either refresh the AutoScalerContext similar to the standalone 
[implementation|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java#L127]
 or the autoscaler module itself needs to refresh the job status


> Stale Autoscaler Context in Kubernetes Operator
> ---
>
> Key: FLINK-36557
> URL: https://issues.apache.org/jira/browse/FLINK-36557
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Sai Sharath Dandi
>Priority: Minor
>
> The KubernetesJobAutoScalerContext is 
> [cached|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java#L59]
>  in the FlinkResourceContext and reused. If the JobAutoscalerContext is 
> initialized before the job reaches Running state, it can cause the autoscaler 
> to not trigger - 
> [link|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java#L98]
>  
> We need to either refresh the AutoScalerContext similar to the standalone 
> [implementation|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java#L127]
>  or the autoscaler module itself needs to refresh the job status



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36542) Enable upToDateChecking to speed up the spotless

2024-10-21 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky reassigned FLINK-36542:


Assignee: Wenjun Ruan

> Enable upToDateChecking to speed up the spotless
> 
>
> Key: FLINK-36542
> URL: https://issues.apache.org/jira/browse/FLINK-36542
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Wenjun Ruan
>Assignee: Wenjun Ruan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-10-15-22-01-25-049.png
>
>
> I use `mvn spotless:apply` to format the code format, find it will cost 01:13 
> min, this is too slow and each time I execute `mvn spotless:apply` it will 
> cost 1 min.
> I hope we can enable 
> upToDateChecking setting to speed up the spotless
> [https://github.com/diffplug/spotless/tree/main/plugin-maven#incremental-up-to-date-checking-and-formatting]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36542) Enable upToDateChecking to speed up the spotless

2024-10-21 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36542:
-
Fix Version/s: 2.0.0

> Enable upToDateChecking to speed up the spotless
> 
>
> Key: FLINK-36542
> URL: https://issues.apache.org/jira/browse/FLINK-36542
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Wenjun Ruan
>Assignee: Wenjun Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
> Attachments: image-2024-10-15-22-01-25-049.png
>
>
> I use `mvn spotless:apply` to format the code format, find it will cost 01:13 
> min, this is too slow and each time I execute `mvn spotless:apply` it will 
> cost 1 min.
> I hope we can enable 
> upToDateChecking setting to speed up the spotless
> [https://github.com/diffplug/spotless/tree/main/plugin-maven#incremental-up-to-date-checking-and-formatting]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36146][connector] Fix SingleThreadFetcherManager race condition [flink]

2024-10-21 Thread via GitHub


becketqin commented on PR #25340:
URL: https://github.com/apache/flink/pull/25340#issuecomment-2427417285

   @kimgr Sorry for the late reply. And thanks for the patch. 
   
   The patch itself looks good to me. Usually, a test needs to be added to 
verify that the bug is actually fixed. For race conditions like this the unit 
test can be a little tricky to write. What I usually do is just run the sequence 
something like 1000 times and make sure there is no occasional failure anymore. 
So in this case, I think we can repeat the following sequence in a unit test 
1000 times (a rough sketch follows below):
   
   1. create a `SingleThreadFetcherManager`.
   2. add a new split.
   3. close the fetcher manager without waiting.
   4. add a new split.
   
   That said, I still don't quite understand why a split would be assigned 
after the SourceReader is closing. But I don't think the fix will cause 
problems, because the SourceReader is shut down anyway when this race condition 
happens.
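
   A rough JUnit-style outline of that sequence could look like the sketch 
below. It is only a skeleton: `createFetcherManager()` and `newSplit()` are 
hypothetical helpers that need to be wired up to whatever constructor and mock 
`SplitReader` the existing test classes use, since those signatures (and the 
mock class locations) differ between Flink versions.
   
   ```java
   import java.util.Collections;
   
   import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
   import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
   import org.junit.jupiter.api.Test;
   
   class FetcherManagerCloseRaceTest {
   
       @Test
       void addSplitAfterCloseDoesNotFail() throws Exception {
           // The race rarely shows up in a single run, so repeat the whole sequence many times.
           for (int i = 0; i < 1000; i++) {
               // 1. create a SingleThreadFetcherManager (hypothetical helper, version dependent)
               SingleThreadFetcherManager<int[], MockSourceSplit> fetcherManager = createFetcherManager();
               // 2. add a new split
               fetcherManager.addSplits(Collections.singletonList(newSplit(i)));
               // 3. close the fetcher manager without waiting
               fetcherManager.close(0L);
               // 4. add a new split again; before the fix this could trip the race condition
               fetcherManager.addSplits(Collections.singletonList(newSplit(i)));
           }
       }
   
       // Hypothetical helpers, to be replaced with the constructor and mock split type of the test suite.
       private static SingleThreadFetcherManager<int[], MockSourceSplit> createFetcherManager() {
           throw new UnsupportedOperationException("adapt to the SingleThreadFetcherManager constructor in use");
       }
   
       private static MockSourceSplit newSplit(int id) {
           throw new UnsupportedOperationException("adapt to the mock split type used by the test suite");
       }
   }
   ```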


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Use chronological order timestamp for timers [flink-benchmarks]

2024-10-21 Thread via GitHub


pnowojski commented on PR #94:
URL: https://github.com/apache/flink-benchmarks/pull/94#issuecomment-2427270392

   I think the problem got resolved by 
https://github.com/apache/flink-benchmarks/pull/97


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Use chronological order timestamp for timers [flink-benchmarks]

2024-10-21 Thread via GitHub


pnowojski merged PR #94:
URL: https://github.com/apache/flink-benchmarks/pull/94


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36146][connector] Fix SingleThreadFetcherManager race condition [flink]

2024-10-21 Thread via GitHub


kimgr commented on PR #25340:
URL: https://github.com/apache/flink/pull/25340#issuecomment-2427449905

   Thanks, I'll try to hack up a testcase!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing [flink-cdc]

2024-10-21 Thread via GitHub


herunkang2018 commented on code in PR #3656:
URL: https://github.com/apache/flink-cdc/pull/3656#discussion_r1809715176


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java:
##
@@ -108,6 +108,16 @@ public boolean isCompletedSplit() {
 return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
 }
 
+public String getTables() {
+String tables;
+if (tableSchemas != null) {
+tables = tableSchemas.keySet().toString();

Review Comment:
   Good advice, I will refine it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]

2024-10-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #25501:
URL: https://github.com/apache/flink/pull/25501#discussion_r1809757295


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java:
##
@@ -67,6 +67,12 @@ default boolean isRestored() {
  */
 InternalTimeServiceManager internalTimerServiceManager();
 
+/**
+ * Returns the internal timer service manager created by the async state 
backend for the stream
+ * operator. This method returns null for non-keyed operators.
+ */
+InternalTimeServiceManager asyncInternalTimerServiceManager();

Review Comment:
   Are there cases in which an operator needs to use the async timer service and 
the sync timer service simultaneously? I'm wondering if we could merge 
`internalTimerServiceManager()` and `asyncInternalTimerServiceManager()` into 
one method, given that they have the same return type; it might also help hide 
implementation details from callers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36578) Fixed bug when converting json to string

2024-10-21 Thread Lee SeungMin (Jira)
Lee SeungMin created FLINK-36578:


 Summary: Fixed bug when converting json to string
 Key: FLINK-36578
 URL: https://issues.apache.org/jira/browse/FLINK-36578
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Lee SeungMin


The function that converts JSON to a string differs between the snapshot step 
and the binlog step. 

This causes the following differences.

- original : \{"key": "value", "key1": "value1"}

- snapshot : \{"key": "value", "key1": "value1"}

- binlog : \{"key":"value","key1":"value1"} // no whitespace

 

 

- code

```

    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, 
so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON 
string, using the standard formatter.

                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a 
JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    }

```

(https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381)

 

So I added a modified JsonStringFormatter that inserts whitespace between key 
and value and after each comma, so that the binlog step produces the same 
output as the snapshot step.
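
For illustration, here is a small sketch (assuming Jackson on the classpath; this is not the JsonStringFormatter change itself, just a demonstration) showing that the two raw strings differ only in whitespace, and one alternative way to force a single canonical form:

```
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonWhitespaceDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // What the snapshot step emits (MySQL keeps the spaces it stored).
        String snapshot = "{\"key\": \"value\", \"key1\": \"value1\"}";
        // What the binlog step emits via JsonBinary.parseAsString (no whitespace).
        String binlog = "{\"key\":\"value\",\"key1\":\"value1\"}";

        System.out.println(snapshot.equals(binlog)); // false: the raw strings differ
        JsonNode a = mapper.readTree(snapshot);
        JsonNode b = mapper.readTree(binlog);
        System.out.println(a.equals(b));             // true: the JSON documents are identical

        // One alternative: re-serialize both paths through the same mapper so that
        // downstream consumers always see a single canonical form.
        System.out.println(mapper.writeValueAsString(a)); // {"key":"value","key1":"value1"}
        System.out.println(mapper.writeValueAsString(b)); // {"key":"value","key1":"value1"}
    }
}
```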



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36052] Add elasticsearch.md for elasticsearch pipeline connector [flink-cdc]

2024-10-21 Thread via GitHub


github-actions[bot] commented on PR #3539:
URL: https://github.com/apache/flink-cdc/pull/3539#issuecomment-2427952115

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Optimize log level [flink-cdc]

2024-10-21 Thread via GitHub


github-actions[bot] commented on PR #3538:
URL: https://github.com/apache/flink-cdc/pull/3538#issuecomment-2427952144

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing [flink-cdc]

2024-10-21 Thread via GitHub


herunkang2018 commented on code in PR #3656:
URL: https://github.com/apache/flink-cdc/pull/3656#discussion_r1809718242


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java:
##
@@ -530,7 +530,11 @@ private void logCurrentBinlogOffsets(List 
splits, long checkpointId)
 return;
 }
 BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
-LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, 
offset);
+LOG.info(
+"Binlog offset for tables {} on checkpoint {}: {}",
+split.asBinlogSplit().getTables(),

Review Comment:
   Yes, I will refine it. The first few tables are sufficient to identify the 
specific cdc source.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string

2024-10-21 Thread Lee SeungMin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee SeungMin updated FLINK-36578:
-
Description: 
The function to convert json to string is different in the snapshot step and 
the binlog step. 

This causes the following differences.
 - original : \{"key": "value", "key1": "value1"}

 - snapshot : \{"key": "value", "key1": "value1"}

 - binlog : \{"key":"value","key1":"value1"} // no whitespace

 

 
 - code

{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    }
{code}

([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

 

So I added a modified JsonStringFormatter that inserts whitespace between key 
and value and after each comma, so that the binlog step produces the same 
output as the snapshot step.

  was:
The function to convert json to string is different in the snapshot step and 
the binlog step. 

This causes the following differences.

- original : \{"key": "value", "key1": "value1"}

- snapshot : \{"key": "value", "key1": "value1"}

- binlog : \{"key":"value","key1":"value1"} // no whitespace

 

 

- code

```

    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, 
so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON 
string, using the standard formatter.

                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a 
JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    }

```

(https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381)

 

So added modified JsonStringFormatter (added whitespace between key and value, 
and after comma to make it work the same as the snapshot step).


> Fixed bug when converting json to string
> 
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Priority: Major
>
> The function to convert json to string is different in the snapshot step and 
> the binlog step. 
> This causes the following differences.
>  - original : \{"key": "value", "key1": "value1"}
>  - snapshot : \{"key": "value", "key1": "value1"}
>  - binlog : \{"key":"value","key1":"value1"} // no whitespace
>  
>  
>  - code
> {code:java}
>     protected Object convertJson(Column column, Field fieldDefn, Object data) 
> {
>         return convertValue(column, fieldDefn, data, "{}", (r) -> {
>             if (data instanceof byte[]) {
>                 // The BinlogReader sees these JSON values as binary encoded, 
> 

[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string

2024-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36578:
---
Labels: pull-request-available  (was: )

> Fixed bug when converting json to string
> 
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Priority: Major
>  Labels: pull-request-available
>
> The function to convert json to string is different in the snapshot step and 
> the binlog step. 
> This causes the following differences.
>  - original : \{"key": "value", "key1": "value1"}
>  - snapshot : \{"key": "value", "key1": "value1"}
>  - binlog : \{"key":"value","key1":"value1"} // no whitespace
>  
>  
>  - code
> {code:java}
>     protected Object convertJson(Column column, Field fieldDefn, Object data) 
> {
>         return convertValue(column, fieldDefn, data, "{}", (r) -> {
>             if (data instanceof byte[]) {
>                 // The BinlogReader sees these JSON values as binary encoded, 
> so we use the binlog client library's utility
>                 // to parse MySQL's internal binary representation into a 
> JSON string, using the standard formatter.                if (((byte[]) 
> data).length == 0) {
>                     r.deliver(column.isOptional() ? null : "{}");
>                 }
>                 else {
>                     try{                         
> r.deliver(JsonBinary.parseAsString((byte[]) data));                     }     
>                catch (IOException e) {
>                         parsingErrorHandler.error("Failed to parse and read a 
> JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
>                         r.deliver(column.isOptional() ? null : "{}");
>                     }
>                 }
>             }
>             else if (data instanceof String){                 // The 
> SnapshotReader sees JSON values as UTF-8 encoded strings.                 
> r.deliver(data);             }        });
>     } {code}
> ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])
>  
> So added modified JsonStringFormatter (added whitespace between key and 
> value, and after comma to make it work the same as the snapshot step).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36578][mysql] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter [flink-cdc]

2024-10-21 Thread via GitHub


SML0127 opened a new pull request, #3658:
URL: https://github.com/apache/flink-cdc/pull/3658

   The function used to convert a JSON value to a string differs between the snapshot step and the binlog step.
   This causes the following differences.
   ```
   original data : {"key": "value", "key1": "value1"}
   snapshot : {"key": "value", "key1": "value1"}
   binlog : {"key":"value","key1":"value1"} // no whitespace
   ```

   
   So I modified JsonStringFormatter and added it to flink-cdc; a minimal sketch of the intended formatting is shown below.
   - Modified lines:
   - line 105: added whitespace before the value
   - line 207: added whitespace after the comma
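   
   A small, self-contained sketch of the whitespace convention the change aims for (a space after each ':' and ','). This is illustration only; the class below is made up and is not the actual JsonStringFormatter touched by this PR.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   public class JsonWhitespaceSketch {
   
       // Renders a flat string map as JSON in the snapshot-style format,
       // i.e. {"key": "value", "key1": "value1"}.
       static String format(Map<String, String> fields) {
           StringBuilder sb = new StringBuilder("{");
           boolean first = true;
           for (Map.Entry<String, String> e : fields.entrySet()) {
               if (!first) {
                   sb.append(", "); // space after the comma (the binlog formatter omitted it)
               }
               first = false;
               sb.append('"').append(e.getKey()).append("\": ") // space before the value
                 .append('"').append(e.getValue()).append('"');
           }
           return sb.append('}').toString();
       }
   
       public static void main(String[] args) {
           Map<String, String> fields = new LinkedHashMap<>();
           fields.put("key", "value");
           fields.put("key1", "value1");
           System.out.println(format(fields)); // {"key": "value", "key1": "value1"}
       }
   }
   ```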
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36578) Fixed bug when converting json to string

2024-10-21 Thread Lee SeungMin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891704#comment-17891704
 ] 

Lee SeungMin commented on FLINK-36578:
--

Please assign this PR to me.

PTAL [~Leonard], [~renqs] 

> Fixed bug when converting json to string
> 
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Priority: Major
>
> The function to convert json to string is different in the snapshot step and 
> the binlog step. 
> This causes the following differences.
>  - original : \{"key": "value", "key1": "value1"}
>  - snapshot : \{"key": "value", "key1": "value1"}
>  - binlog : \{"key":"value","key1":"value1"} // no whitespace
>  
>  
>  - code
> {code:java}
>     protected Object convertJson(Column column, Field fieldDefn, Object data) 
> {
>         return convertValue(column, fieldDefn, data, "{}", (r) -> {
>             if (data instanceof byte[]) {
>                 // The BinlogReader sees these JSON values as binary encoded, 
> so we use the binlog client library's utility
>                 // to parse MySQL's internal binary representation into a 
> JSON string, using the standard formatter.                if (((byte[]) 
> data).length == 0) {
>                     r.deliver(column.isOptional() ? null : "{}");
>                 }
>                 else {
>                     try{                         
> r.deliver(JsonBinary.parseAsString((byte[]) data));                     }     
>                catch (IOException e) {
>                         parsingErrorHandler.error("Failed to parse and read a 
> JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
>                         r.deliver(column.isOptional() ? null : "{}");
>                     }
>                 }
>             }
>             else if (data instanceof String){                 // The 
> SnapshotReader sees JSON values as UTF-8 encoded strings.                 
> r.deliver(data);             }        });
>     } {code}
> ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])
>  
> So added modified JsonStringFormatter (added whitespace between key and 
> value, and after comma to make it work the same as the snapshot step).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string

2024-10-21 Thread Lee SeungMin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee SeungMin updated FLINK-36578:
-
Description: 
The function to convert json to string is different in the snapshot step and 
the binlog step. 

This causes the following differences.
 - original : \{"key": "value", "key1": "value1"}

 - snapshot : \{"key": "value", "key1": "value1"}

 - binlog : \{"key":"value","key1":"value1"} // no whitespace

 

code
{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, 
so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON 
string, using the standard formatter.

                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    } {code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

 

So I added a modified JsonStringFormatter (adding whitespace between key and value, and after each comma, so that the output matches the snapshot step).

  was:
The function to convert json to string is different in the snapshot step and 
the binlog step. 

This causes the following differences.
 - original : \{"key": "value", "key1": "value1"}

 - snapshot : \{"key": "value", "key1": "value1"}

 - binlog : \{"key":"value","key1":"value1"} // no whitespace

 

code
{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    } {code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

 

So I added a modified JsonStringFormatter (adding whitespace between key and value, and after each comma, so that the output matches the snapshot step).


> Fixed bug when converting json to string
> 
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Priority: Major
>  Labels: pull-request-available
>
> The function to convert json to string is different in the snapshot step and 
> the binlog step. 
> This causes the following differences.
>  - original : \{"key": "value", "key1": "value1"}
>  - snapshot : \{"key": "value", "key1": "value1"}
>  - binlog : \{"key":"value","key1":"value1"} // no whitespace
>  
> code
> {code:java}
>     protected Object convertJson(Column column, Field fieldDefn, Object data) 
> {
>         return convertValue(column, fieldDefn, data, "{}", (r) -> {
>             if (data instanceof byte[]) {
>                 // The BinlogReader sees

[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string

2024-10-21 Thread Lee SeungMin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee SeungMin updated FLINK-36578:
-
Description: 
The function to convert json to string is different in the snapshot step and 
the binlog step. 

This causes the following differences.
 - original : \{"key": "value", "key1": "value1"}

 - snapshot : \{"key": "value", "key1": "value1"}

 - binlog : \{"key":"value","key1":"value1"} // no whitespace

 

code
{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    } {code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

 

So I added a modified JsonStringFormatter (adding whitespace between key and value, and after each comma, so that the output matches the snapshot step).

  was:
The function to convert json to string is different in the snapshot step and 
the binlog step. 

This causes the following differences.
 - original : \{"key": "value", "key1": "value1"}

 - snapshot : \{"key": "value", "key1": "value1"}

 - binlog : \{"key":"value","key1":"value1"} // no whitespace

 

 
 - code

{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    } {code}

([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

 

So I added a modified JsonStringFormatter (adding whitespace between key and value, and after each comma, so that the output matches the snapshot step).


> Fixed bug when converting json to string
> 
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Priority: Major
>  Labels: pull-request-available
>
> The function to convert json to string is different in the snapshot step and 
> the binlog step. 
> This causes the following differences.
>  - original : \{"key": "value", "key1": "value1"}
>  - snapshot : \{"key": "value", "key1": "value1"}
>  - binlog : \{"key":"value","key1":"value1"} // no whitespace
>  
> code
> {code:java}
>     protected Object convertJson(Column column, Field fieldDefn, Object data) 
> {
>         return convertValue(column, fieldDefn, data, "{}", (r) -> {
>             if (data instanceof byte[]) {
>                 // The BinlogReader s

[jira] [Commented] (FLINK-36578) Fixed bug when converting json to string

2024-10-21 Thread Lee SeungMin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891706#comment-17891706
 ] 

Lee SeungMin commented on FLINK-36578:
--

PR: https://github.com/apache/flink-cdc/pull/3658

> Fixed bug when converting json to string
> 
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Priority: Major
>  Labels: pull-request-available
>
> The function to convert json to string is different in the snapshot step and 
> the binlog step. 
> This causes the following differences.
>  - original : \{"key": "value", "key1": "value1"}
>  - snapshot : \{"key": "value", "key1": "value1"}
>  - binlog : \{"key":"value","key1":"value1"} // no whitespace
>  
> code
> {code:java}
>     protected Object convertJson(Column column, Field fieldDefn, Object data) 
> {
>         return convertValue(column, fieldDefn, data, "{}", (r) -> {
>             if (data instanceof byte[]) {
>                 // The BinlogReader sees these JSON values as binary encoded, 
> so we use the binlog client library's utility
>                 // to parse MySQL's internal binary representation into a 
> JSON string, using the standard formatter.                if (((byte[]) 
> data).length == 0) {
>                     r.deliver(column.isOptional() ? null : "{}");
>                 }
>                 else {
>                     try{                         
> r.deliver(JsonBinary.parseAsString((byte[]) data));                     }     
>                catch (IOException e) {
>                         parsingErrorHandler.error("Failed to parse and read a 
> JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
>                         r.deliver(column.isOptional() ? null : "{}");
>                     }
>                 }
>             }
>             else if (data instanceof String){                 // The 
> SnapshotReader sees JSON values as UTF-8 encoded strings.                 
> r.deliver(data);             }        });
>     } {code}
> ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])
>  
> So added modified JsonStringFormatter (added whitespace between key and 
> value, and after comma to make it work the same as the snapshot step).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36578][mysql] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter [flink-cdc]

2024-10-21 Thread via GitHub


SML0127 commented on PR #3658:
URL: https://github.com/apache/flink-cdc/pull/3658#issuecomment-2428093914

   @leonardBang @PatrickRen PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]

2024-10-21 Thread via GitHub


fredia commented on code in PR #25501:
URL: https://github.com/apache/flink/pull/25501#discussion_r1809809121


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java:
##
@@ -67,6 +67,12 @@ default boolean isRestored() {
  */
 InternalTimeServiceManager internalTimerServiceManager();
 
+/**
+ * Returns the internal timer service manager created by the async state backend for the stream
+ * operator. This method returns null for non-keyed operators.
+ */
+InternalTimeServiceManager asyncInternalTimerServiceManager();

Review Comment:
   > Are there cases in which an operator needs to use async timer service and 
sync timer service simultaneously? 
   
   No, it is up to the developer to decide which one to use. 
   
   This is to keep it consistent with the current `asyncStateBackend` and `KeyedStateBackend`; we will sort out the relationship between `asyncStateBackend`/`keyedStateBackend` and `asyncTimerManager`/`timerManager` in a subsequent PR.
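   
   A hypothetical usage sketch of the selection logic (not the PR's actual wiring; only the two `StreamOperatorStateContext` getters quoted above are taken from the diff, the helper class and flag below are made up for illustration):
   
   ```java
   import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
   import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
   
   final class TimerManagerSelection {
       // Prefer the async timer manager when the async state backend is enabled and a
       // manager exists (it is null for non-keyed operators); otherwise fall back to the
       // classic internal timer service manager.
       static InternalTimeServiceManager<?> pick(
               StreamOperatorStateContext context, boolean asyncStateEnabled) {
           InternalTimeServiceManager<?> async = context.asyncInternalTimerServiceManager();
           return (asyncStateEnabled && async != null)
                   ? async
                   : context.internalTimerServiceManager();
       }
   }
   ```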



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string

2024-10-21 Thread Lee SeungMin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee SeungMin updated FLINK-36578:
-
Description: 
The function to convert json to string is different in the snapshot step and 
the binlog step. 

This causes the following differences.
 - original : \{"key": "value", "key1": "value1"}

 - snapshot : \{"key": "value", "key1": "value1"}

 - binlog : \{"key":"value","key1":"value1"} // no whitespace

 

So I added a modified JsonStringFormatter to flink-cdc
(modified code: added whitespace between key and value, and after each comma, so that the output matches the snapshot step).

 

The logic that converts a JSON value to a string differs between the snapshot and binlog steps:
{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, 
so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON 
string, using the standard formatter.

                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    } {code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

 

 

  was:
The function to convert json to string is different in the snapshot step and 
the binlog step. 

This causes the following differences.
 - original : \{"key": "value", "key1": "value1"}

 - snapshot : \{"key": "value", "key1": "value1"}

 - binlog : \{"key":"value","key1":"value1"} // no whitespace

 

code
{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, 
so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON 
string, using the standard formatter.

                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String){
                 // The SnapshotReader sees JSON values as UTF-8 encoded 
strings.
                 r.deliver(data);
             }
        });
    } {code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

 

So I added a modified JsonStringFormatter (adding whitespace between key and value, and after each comma, so that the output matches the snapshot step).


> Fixed bug when converting json to string
> 
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Priority: Major
>  Labels: pull-request-available
>
> The function to convert json to string is different in the snapshot step and 
> the binlog step. 
> This causes the following differences.
>  - original : \{"key": "value", "key1": "value1"}
>  - snapshot : \{"key": "value", "key1": "value1"}
>  - binlog : \{"key":"value","key1":"value1"} // no whitespace
>  
> So added modified JsonStringFormatter to flink-cdc 
> (modified code: added whitespace between key and value, and after comma to 
> make it work the same a

[jira] [Commented] (FLINK-36506) Remove all deprecated methods in `ColumnStats`

2024-10-21 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891724#comment-17891724
 ] 

Atul Sharma commented on FLINK-36506:
-

Hi [~xuyangzhong], can I work on this?

> Remove all deprecated methods in `ColumnStats`
> --
>
> Key: FLINK-36506
> URL: https://issues.apache.org/jira/browse/FLINK-36506
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36503) Remove deprecated method `FunctionDefinitionFactory#createFunctionDefinition`

2024-10-21 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891725#comment-17891725
 ] 

Atul Sharma commented on FLINK-36503:
-

Hi [~xuyangzhong], can I work on this?

> Remove deprecated method `FunctionDefinitionFactory#createFunctionDefinition`
> -
>
> Key: FLINK-36503
> URL: https://issues.apache.org/jira/browse/FLINK-36503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36521][runtime] Introduce TtlAwareSerializer and TtlAwareSerializerSnapshot [flink]

2024-10-21 Thread via GitHub


xiangyuf opened a new pull request, #25554:
URL: https://github.com/apache/flink/pull/25554

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] FLINK-36506: Remove all deprecated methods in `ColumnStats` [flink]

2024-10-21 Thread via GitHub


atu-sharm opened a new pull request, #2:
URL: https://github.com/apache/flink/pull/2

   
   
   ## What is the purpose of the change
   
   Removing all deprecated methods as per 
[FLINK-36506](https://issues.apache.org/jira/browse/FLINK-36506) in 
`ColumnStats`
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36506) Remove all deprecated methods in `ColumnStats`

2024-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36506:
---
Labels: pull-request-available  (was: )

> Remove all deprecated methods in `ColumnStats`
> --
>
> Key: FLINK-36506
> URL: https://issues.apache.org/jira/browse/FLINK-36506
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36521) Introduce TtlAwareSerializer to resolve the compatibility check between ttlSerializer and original serializer

2024-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36521:
---
Labels: pull-request-available  (was: )

> Introduce TtlAwareSerializer to resolve the compatibility check between 
> ttlSerializer and original serializer 
> --
>
> Key: FLINK-36521
> URL: https://issues.apache.org/jira/browse/FLINK-36521
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36521][runtime] Introduce TtlAwareSerializer and TtlAwareSerializerSnapshot [flink]

2024-10-21 Thread via GitHub


flinkbot commented on PR #25554:
URL: https://github.com/apache/flink/pull/25554#issuecomment-2428292508

   
   ## CI report:
   
   * 98ea2df3c759b9d866b0e0327ec6a06883ff6dc9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLINK-36506: Remove all deprecated methods in `ColumnStats` [flink]

2024-10-21 Thread via GitHub


flinkbot commented on PR #2:
URL: https://github.com/apache/flink/pull/2#issuecomment-2428292787

   
   ## CI report:
   
   * 1656356a2b283dbffa1dcec3685025dd32b24cb8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode [flink]

2024-10-21 Thread via GitHub


jnh5y commented on code in PR #24699:
URL: https://github.com/apache/flink/pull/24699#discussion_r1809434479


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java:
##
@@ -567,7 +912,7 @@ void testUserDefinedFunctions() {
 "SELECT *\n"
 + "FROM MyTable\n"
 + "MATCH_RECOGNIZE (\n"
-+ "  ORDER BY proctime\n"
++ "  ORDER BY ts\n"

Review Comment:
   As a question, do we need to change all of the tests from using PROCTIME() 
to ts?
   
   E.g., can the existing behavior remain?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34825) [flink-cdc-pipeline-connectors] Add Implementation of DataSource in MongoDB

2024-10-21 Thread JunboWang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891714#comment-17891714
 ] 

JunboWang commented on FLINK-34825:
---

Interested in this. Please assign it to me. Thanks!

> [flink-cdc-pipeline-connectors] Add Implementation of DataSource in MongoDB
> ---
>
> Key: FLINK-34825
> URL: https://issues.apache.org/jira/browse/FLINK-34825
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Flink CDC Issue Import
>Priority: Major
>  Labels: github-import
>
> ### Search before asking
> - [X] I searched in the 
> [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
> nothing similar.
> ### Motivation
> After https://github.com/ververica/flink-cdc-connectors/pull/2638 merged, we 
> can try to add implementation in MongoDB to simulate enterprise scenarios.
> You may need to wait for 
> https://github.com/ververica/flink-cdc-connectors/issues/2642 
> https://github.com/ververica/flink-cdc-connectors/issues/2644 completed. 
> ### Solution
> _No response_
> ### Alternatives
> _No response_
> ### Anything else?
> _No response_
> ### Are you willing to submit a PR?
> - [ ] I'm willing to submit a PR!
>  Imported from GitHub 
> Url: https://github.com/apache/flink-cdc/issues/2648
> Created by: [lvyanquan|https://github.com/lvyanquan]
> Labels: enhancement, task, 【3.0】, 
> Assignee: [Jiabao-Sun|https://github.com/Jiabao-Sun]
> Created at: Tue Nov 07 11:18:18 CST 2023
> State: open



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] FLINK-36506: Remove all deprecated methods in `ColumnStats` [flink]

2024-10-21 Thread via GitHub


atu-sharm commented on PR #2:
URL: https://github.com/apache/flink/pull/2#issuecomment-2428292821

   Hi @xuyangzhong, can you please review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]

2024-10-21 Thread via GitHub


yunfengzhou-hub commented on PR #25501:
URL: https://github.com/apache/flink/pull/25501#issuecomment-2428319498

   Thanks for the update. The runtime part LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-36569) flink kafka connector do not close kafka produer when it checkpoint success

2024-10-21 Thread Jake.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891735#comment-17891735
 ] 

Jake.zhang edited comment on FLINK-36569 at 10/22/24 6:34 AM:
--

I added a scheduled close service in the `org.apache.flink.connector.kafka.sink.KafkaWriter` constructor that keeps only the producers of the 5 most recent checkpoints. It works.

You still need to wait for the Kafka transaction timeout.
{code:java}
initFlinkMetrics();
// Scheduled task that checks whether there are checkpoint ids whose producers need to be closed
this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
this.autoCloseService.scheduleWithFixedDelay(
        () -> {
            try {
                LOG.info("last checkpoint id: {}", lastCheckpointId);
                // Keep the producers up to the previous checkpoint, i.e. successCheckpointId - 1 is the largest transaction to retain
                FlinkKafkaInternalProducer flinkKafkaInternalProducer = null;
                while ((flinkKafkaInternalProducer =
                        (FlinkKafkaInternalProducer) this.producerCloseables.peek())
                        != null) {
                    String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                    assert transactionId != null;
                    String[] transactionIdArr = transactionId.split("-");
                    long itemCheckpointId =
                            Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                    if (lastCheckpointId - 5 > itemCheckpointId) {
                        // Poll the producer off the queue and close it
                        try {
                            FlinkKafkaInternalProducer closeable =
                                    (FlinkKafkaInternalProducer)
                                            this.producerCloseables.poll();
                            closeable.close();
                            LOG.info(
                                    "close producer transaction id: {}",
                                    closeable.getTransactionalId());
                        } catch (Exception e) {
                            LOG.warn("fkip close error", e);
                        }
                    } else {
                        // Wait for the next check
                        break;
                    }
                }
            } catch (Exception e) {
                LOG.warn("schedule auto close producer error", e);
            }
        },
        60,
        60,
        TimeUnit.SECONDS);
} {code}


was (Author: ft20082):
I added a scheduled close service in `org.apache.flink.connector.kafka.sink.KafkaWriter` that keeps only the producers of the 5 most recent checkpoints. It works.

You still need to wait for the Kafka transaction timeout.
{code:java}
initFlinkMetrics();
// Scheduled task that checks whether there are checkpoint ids whose producers need to be closed
this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
this.autoCloseService.scheduleWithFixedDelay(
        () -> {
            try {
                LOG.info("last checkpoint id: {}", lastCheckpointId);
                // Keep the producers up to the previous checkpoint, i.e. successCheckpointId - 1 is the largest transaction to retain
                FlinkKafkaInternalProducer flinkKafkaInternalProducer = null;
                while ((flinkKafkaInternalProducer =
                        (FlinkKafkaInternalProducer) this.producerCloseables.peek())
                        != null) {
                    String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                    assert transactionId != null;
                    String[] transactionIdArr = transactionId.split("-");
                    long itemCheckpointId =
                            Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                    if (lastCheckpointId - 5 > itemCheckpointId) {
                        // Poll the producer off the queue and close it
                        try {
                            FlinkKafkaInternalProducer closeable =
                                    (FlinkKafkaInternalProducer)
                                            this.producerCloseables.poll();
                            closeable.close();
                            LOG.info(
                                    "close producer transaction id: {}",
                                    closeable.getTransactionalId());
                        } catch (Exception e) {
                            LOG.warn("fkip close error", e);
                        }
                    } else {
                        // Wait for the next check
                        break;
                    }
                }
            } catch (Exception e) {
                LOG.warn("schedule auto close producer error", e);
            }
  

[jira] [Commented] (FLINK-36569) flink kafka connector do not close kafka produer when it checkpoint success

2024-10-21 Thread Jake.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891735#comment-17891735
 ] 

Jake.zhang commented on FLINK-36569:


I added a scheduled close service in `org.apache.flink.connector.kafka.sink.KafkaWriter` that keeps only the producers of the 5 most recent checkpoints. It works.

You still need to wait for the Kafka transaction timeout.
{code:java}
initFlinkMetrics();
// Scheduled task that checks whether there are checkpoint ids whose producers need to be closed
this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
this.autoCloseService.scheduleWithFixedDelay(
        () -> {
            try {
                LOG.info("last checkpoint id: {}", lastCheckpointId);
                // Keep the producers up to the previous checkpoint, i.e. successCheckpointId - 1 is the largest transaction to retain
                FlinkKafkaInternalProducer flinkKafkaInternalProducer = null;
                while ((flinkKafkaInternalProducer =
                        (FlinkKafkaInternalProducer) this.producerCloseables.peek())
                        != null) {
                    String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                    assert transactionId != null;
                    String[] transactionIdArr = transactionId.split("-");
                    long itemCheckpointId =
                            Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                    if (lastCheckpointId - 5 > itemCheckpointId) {
                        // Poll the producer off the queue and close it
                        try {
                            FlinkKafkaInternalProducer closeable =
                                    (FlinkKafkaInternalProducer)
                                            this.producerCloseables.poll();
                            closeable.close();
                            LOG.info(
                                    "close producer transaction id: {}",
                                    closeable.getTransactionalId());
                        } catch (Exception e) {
                            LOG.warn("fkip close error", e);
                        }
                    } else {
                        // Wait for the next check
                        break;
                    }
                }
            } catch (Exception e) {
                LOG.warn("schedule auto close producer error", e);
            }
        },
        60,
        60,
        TimeUnit.SECONDS);
} {code}

> flink kafka connector do not close kafka produer when it checkpoint success
> ---
>
> Key: FLINK-36569
> URL: https://issues.apache.org/jira/browse/FLINK-36569
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.19.0, 1.20.0
> Environment: flink: 1.20
> flink kafka connector: 3.3.0-1.20 
>Reporter: Jake.zhang
>Priority: Major
> Attachments: image-2024-10-18-13-31-39-253.png, 
> image-2024-10-21-14-02-41-823.png
>
>
> The Flink Kafka connector does not close the FlinkKafkaInternalProducer when a checkpoint succeeds in Flink 1.20/1.19; it creates one FlinkKafkaInternalProducer per checkpoint.
>  
> The FlinkKafkaInternalProducer is not closed automatically, so the number of Kafka producer network threads keeps growing.
>  The committer calls `getRecoveryProducer` each time because the `recyclable` object is always null, so `recyclable.ifPresent(Recyclable::close)` never takes effect.
> `org.apache.flink.connector.kafka.sink.KafkaCommitter`
> {code:java}
> producer =
> recyclable
> .>map(Recyclable::getObject)
> .orElseGet(() -> getRecoveryProducer(committable));
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);{code}
>  
> !image-2024-10-21-14-02-41-823.png!
>  
> !image-2024-10-18-13-31-39-253.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)