Re: [PR] [FLINK-36194][Runtime] Handle the Graceful close of ExecutionGraphInfoStore from ClusterEntrypoint [flink]
flinkbot commented on PR #25553: URL: https://github.com/apache/flink/pull/25553#issuecomment-2426917163 ## CI report: * af6861f7c46e6033e5b9afea5f5fce703be10c4c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36194][Runtime] Handle the Graceful close of ExecutionGraphInfoStore from ClusterEntrypoint [flink]
eaugene opened a new pull request, #25553: URL: https://github.com/apache/flink/pull/25553

## What is the purpose of the change

This fixes the race condition of writing to a closed file when cluster shutdown is triggered.

## Brief change log

- Removing the shutdown hook from `ExecutionGraphInfoStore` and waiting for the `ClusterEntrypoint` (which holds an instance of `ExecutionGraphInfoStore`) to close it.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (JavaDocs)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
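For illustration, a minimal sketch of the approach described in the change log, assuming a hypothetical shutdown method name (`stopClusterServices`); the actual `ClusterEntrypoint` shutdown sequence in Flink may differ:

```java
// Sketch only: instead of the store registering its own JVM shutdown hook,
// the entrypoint closes it as part of its ordered shutdown, after the
// Dispatcher and JobManagerRunners have stopped writing to it.
final class ClusterEntrypointSketch {

    private final AutoCloseable executionGraphInfoStore; // e.g. a FileExecutionGraphInfoStore

    ClusterEntrypointSketch(AutoCloseable executionGraphInfoStore) {
        this.executionGraphInfoStore = executionGraphInfoStore;
    }

    /** Hypothetical hook, called once all job-related components have shut down. */
    void stopClusterServices() throws Exception {
        // No concurrent shutdown hook can delete the store's temp directory anymore,
        // so a late write can no longer hit a missing file.
        executionGraphInfoStore.close();
    }
}
```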
[jira] [Updated] (FLINK-36194) Shutdown hook for ExecutionGraphInfo store runs concurrently to cluster shutdown hook causing race conditions
[ https://issues.apache.org/jira/browse/FLINK-36194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36194: --- Labels: pull-request-available starter (was: starter) > Shutdown hook for ExecutionGraphInfo store runs concurrently to cluster > shutdown hook causing race conditions > - > > Key: FLINK-36194 > URL: https://issues.apache.org/jira/browse/FLINK-36194 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Coordination >Affects Versions: 2.0.0, 1.20.0, 1.19.1 >Reporter: Matthias Pohl >Assignee: Eaugene Thomas >Priority: Minor > Labels: pull-request-available, starter > > There is an {{FileNotFoundException}} being logged when shutting down the > cluster with currently running jobs: > {code} > /tmp/executionGraphStore-b2cb1190-2c4d-4021-a73d-8b15027860df/8f6abf294a46345d331590890f7e7c37 > (No such file or directory) > java.io.FileNotFoundException: > /tmp/executionGraphStore-b2cb1190-2c4d-4021-a73d-8b15027860df/8f6abf294a46345d331590890f7e7c37 > (No such file or directory) > at java.base/java.io.FileOutputStream.open0(Native Method) > at java.base/java.io.FileOutputStream.open(FileOutputStream.java:298) > at java.base/java.io.FileOutputStream.(FileOutputStream.java:237) > at java.base/java.io.FileOutputStream.(FileOutputStream.java:187) > at > org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore.storeExecutionGraphInfo(FileExecutionGraphInfoStore.java:281) > at > org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore.put(FileExecutionGraphInfoStore.java:203) > at > org.apache.flink.runtime.dispatcher.Dispatcher.writeToExecutionGraphInfoStore(Dispatcher.java:1427) > at > org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedTerminalState(Dispatcher.java:1357) > at > org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:750) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$6(Dispatcher.java:700) > at > java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) > at > java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) > at > java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) > [...] > {code} > This is caused by concurrent shutdown logic being triggered through the > {{FileExecutionGraphInfoStore}} shutdown hook. The shutdown hook calls close > on the store which will delete its temporary directory. > The concurrently performed cluster shutdown will try to suspend all running > jobs. The JobManagerRunners are trying to write their {{ExecutionGraphInfo}} > to the store which fails (because the temporary folder is deleted). > This doesn't have any impact because the JobManager goes away, anyway. But > the log message is confusing the the shutdown hook is (IMHO) not needed. > Instead, the {{ExecutionGraphInfoStore}}'s close logic should be called by > the {{ClusterEntrypoint}} shutdown gracefully. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]
JuliaBogdan commented on PR #46: URL: https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2427101306

> > Hi, @ferenc-csaky. I'm interested in the connector for Flink > 1.18 and glad to see this PR, thank you for the work! I wonder if there is anything that is blocking the release?
>
> Hi @JuliaBogdan, unfortunately yes, it is not trivial to release the connector as is against 1.18+, because some Hadoop elements are not playing well with JDK17+, so this will be more work than I originally anticipated. I will initiate a discussion to release the HBase connector for Flink 1.18 and 1.19 for only JDK8 and JDK11 to at least start to bridge the current gap.
>
> Anything that comes afterwards will probably require bumping all Hadoop-related deps in this repo.

I see, thanks for the quick response! Having the connector released for JDK11 would be super useful for our use case too. Looking forward to an outcome of the discussion.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36065][runtime] Support submit stream graph. [flink]
zhuzhurk commented on code in PR #25472: URL: https://github.com/apache/flink/pull/25472#discussion_r1808402242 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java: ## @@ -80,6 +83,14 @@ public SchedulerNG createInstance( final Collection failureEnrichers, final BlocklistOperations blocklistOperations) throws Exception { +JobGraph jobGraph; + +if (executionPlan instanceof JobGraph) { +jobGraph = (JobGraph) executionPlan; +} else { +checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan."); Review Comment: maybe ``` } else if (executionPlan instanceof StreamGraph) { ... } else { throw FlinkException("Unsupported execution plan " + executionPlan.getClass().getCanonicalName()); } ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java: ## @@ -1066,6 +1067,14 @@ public CompletableFuture submitJob(ExecutionPlan executionP // When MiniCluster uses the local RPC, the provided ExecutionPlan is passed directly to the // Dispatcher. This means that any mutations to the JG can affect the Dispatcher behaviour, // so we rather clone it to guard against this. Review Comment: Comments above are for the original line. ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ## @@ -169,10 +225,15 @@ public ExecutionConfig getExecutionConfig() { return executionConfig; } +@Override public Configuration getJobConfiguration() { return jobConfiguration; } +public void setJobConfiguration(Configuration configuration) { Review Comment: Is this method required? If not, we can keep `jobConfiguration` final. ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ## @@ -1081,4 +1269,345 @@ public void setAttribute(Integer vertexId, Attribute attribute) { getStreamNode(vertexId).setAttribute(attribute); } } + +public void setJobId(JobID jobId) { +this.jobId = jobId; +} + +@Override +public JobID getJobID() { +return jobId; +} + +/** + * Sets the classpath required to run the job on a task manager. + * + * @param paths paths of the directories/JAR files required to run the job on a task manager + */ +public void setClasspath(List paths) { +classpath = paths; +} + +public List getClasspath() { +return classpath; +} + +/** + * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}. + * + * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files to attach to the + * jobgraph. + * @throws RuntimeException if a jar URL is not valid. + */ +public void addJars(final List jarFilesToAttach) { +for (URL jar : jarFilesToAttach) { +try { +addJar(new Path(jar.toURI())); +} catch (URISyntaxException e) { +throw new RuntimeException("URL is invalid. This should not happen.", e); +} +} +} + +/** + * Returns a list of BLOB keys referring to the JAR files required to run this job. 
+ * + * @return list of BLOB keys referring to the JAR files required to run this job + */ +@Override +public List getUserJarBlobKeys() { +return this.userJarBlobKeys; +} + +@Override +public List getClasspaths() { +return classpath; +} + +public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) { +if (file == null) { +throw new IllegalArgumentException(); +} + +userArtifacts.putIfAbsent(name, file); +} + +@Override +public Map getUserArtifacts() { +return userArtifacts; +} + +@Override +public void addUserJarBlobKey(PermanentBlobKey key) { +if (key == null) { +throw new IllegalArgumentException(); +} + +if (!userJarBlobKeys.contains(key)) { +userJarBlobKeys.add(key); +} +} + +@Override +public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) +throws IOException { +byte[] serializedBlobKey; +serializedBlobKey = InstantiationUtil.serializeObject(blobKey); + +userArtifacts.computeIfPresent( +entryName, +(key, originalEntry) -> +new DistributedCache.DistributedCacheEntry( +originalEntry.filePath, +originalEntry.isExecutable, +serializedBlobKey, +originalEntry.isZipped)); +} + +@Override +public
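To make the first review comment above concrete, here is a compilable sketch of the suggested dispatch. It uses stand-in classes so it is self-contained; in Flink the real types are `ExecutionPlan`, `JobGraph`, `StreamGraph`, and `FlinkException`, and the `StreamGraph` branch would perform the actual translation:

```java
// Stand-in types, only so the sketch compiles on its own.
class ExecutionPlan {}
class JobGraph extends ExecutionPlan {}
class StreamGraph extends ExecutionPlan {}
class FlinkException extends Exception {
    FlinkException(String message) { super(message); }
}

final class ExecutionPlanDispatchSketch {
    static JobGraph toJobGraph(ExecutionPlan executionPlan) throws FlinkException {
        if (executionPlan instanceof JobGraph) {
            return (JobGraph) executionPlan;
        } else if (executionPlan instanceof StreamGraph) {
            // Translation of the StreamGraph is elided; the review comment only
            // discusses the branching structure and the error case below.
            return new JobGraph();
        } else {
            throw new FlinkException(
                    "Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
        }
    }
}
```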
Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 (backport to release-1.20) [flink]
ferenc-csaky commented on code in PR #25550: URL: https://github.com/apache/flink/pull/25550#discussion_r1808220796 ## flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE: ## @@ -6,17 +6,17 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- com.hierynomus:asn-one:0.5.0 +- com.hierynomus:asn-one:0.6.0 - com.typesafe:config:1.4.2 - com.typesafe:ssl-config-core_2.12:0.6.1 -- io.netty:netty:3.10.6.Final -- org.agrona:agrona:1.15.1 -- org.apache.pekko:pekko-actor_2.12:1.0.1 -- org.apache.pekko:pekko-remote_2.12:1.0.1 -- org.apache.pekko:pekko-pki_2.12:1.0.1 -- org.apache.pekko:pekko-protobuf-v3_2.12:1.0.1 -- org.apache.pekko:pekko-slf4j_2.12:1.0.1 -- org.apache.pekko:pekko-stream_2.12:1.0.1 +- io.netty:netty-all:4.1.100.Final Review Comment: This should be changed to `4.1.91.Final` according to the [root POM](https://github.com/apache/flink/blob/60cba350d7638592ea771dc7cf512798e6248886/pom.xml#L369C14-L369C26). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 (backport to release-1.20) [flink]
ferenc-csaky commented on PR #25550: URL: https://github.com/apache/flink/pull/25550#issuecomment-2425809842 1.20 backport of https://github.com/apache/flink/pull/25494, will merge them together. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][tests] Separate Pipeline and Source E2e tests [flink-cdc]
yuxiqian closed pull request #3427: [hotfix][tests] Separate Pipeline and Source E2e tests URL: https://github.com/apache/flink-cdc/pull/3427 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][tests] Separate Pipeline and Source E2e tests [flink-cdc]
yuxiqian commented on PR #3427: URL: https://github.com/apache/flink-cdc/pull/3427#issuecomment-2425814837 Fixed in #3514 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 (backport to release-1.20) [flink]
gracegrimwood commented on code in PR #25550: URL: https://github.com/apache/flink/pull/25550#discussion_r1808227768 ## flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE: ## @@ -6,17 +6,17 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- com.hierynomus:asn-one:0.5.0 +- com.hierynomus:asn-one:0.6.0 - com.typesafe:config:1.4.2 - com.typesafe:ssl-config-core_2.12:0.6.1 -- io.netty:netty:3.10.6.Final -- org.agrona:agrona:1.15.1 -- org.apache.pekko:pekko-actor_2.12:1.0.1 -- org.apache.pekko:pekko-remote_2.12:1.0.1 -- org.apache.pekko:pekko-pki_2.12:1.0.1 -- org.apache.pekko:pekko-protobuf-v3_2.12:1.0.1 -- org.apache.pekko:pekko-slf4j_2.12:1.0.1 -- org.apache.pekko:pekko-stream_2.12:1.0.1 +- io.netty:netty-all:4.1.100.Final Review Comment: Nice catch! Changed this now :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36576][runtime] Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider [flink]
flinkbot commented on PR #25552: URL: https://github.com/apache/flink/pull/25552#issuecomment-2425962755 ## CI report: * 5765848fe5d10d5ea875a723f1683645c9efd9bd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36575][runtime] ExecutionVertexInputInfo supports consuming subpartition groups [flink]
noorall opened a new pull request, #25551: URL: https://github.com/apache/flink/pull/25551

## What is the purpose of the change

Currently, the ExecutionVertexInputInfo describes the task's input through a combination of a PartitionRange and a SubpartitionRange. However, in the case of skewed join optimization, we need to split a group of data corresponding to the specific key, which may result in a downstream task subscribing to multiple combinations of PartitionRanges and SubpartitionRanges.

Therefore, we need to modify the ExecutionVertexInputInfo to describe the input data as multiple combinations of PartitionRanges and SubpartitionRanges to meet the requirements of the aforementioned scenario and improve Flink's flexibility in describing the task's inputs.

## Brief change log

- Modify the description of the input in ExecutionVertexInputInfo.
- Modify the connect function for edges.
- Modify the InputGate creation logic of the network layer to adapt to this change.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
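As an illustration of the data-structure change described in the PR above, a self-contained sketch (names are illustrative, not Flink's actual classes) of an input info that holds several partition-range/subpartition-range combinations instead of a single pair:

```java
import java.util.List;

// Sketch: a vertex input described by a *group* of (partition range, subpartition range)
// combinations, which is what splitting a skewed key across downstream tasks requires.
final class VertexInputInfoSketch {

    record IndexRange(int start, int end) {} // inclusive index range

    record PartitionSubpartitionGroup(IndexRange partitionRange, IndexRange subpartitionRange) {}

    private final List<PartitionSubpartitionGroup> consumedGroups;

    VertexInputInfoSketch(List<PartitionSubpartitionGroup> consumedGroups) {
        this.consumedGroups = List.copyOf(consumedGroups);
    }

    List<PartitionSubpartitionGroup> getConsumedGroups() {
        return consumedGroups;
    }

    public static void main(String[] args) {
        // One downstream task consuming two disjoint combinations, e.g. after a skewed key split.
        VertexInputInfoSketch info =
                new VertexInputInfoSketch(
                        List.of(
                                new PartitionSubpartitionGroup(new IndexRange(0, 3), new IndexRange(0, 0)),
                                new PartitionSubpartitionGroup(new IndexRange(0, 1), new IndexRange(1, 1))));
        System.out.println(info.getConsumedGroups());
    }
}
```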
[jira] [Updated] (FLINK-36575) ExecutionVertexInputInfo supports consuming subpartition groups
[ https://issues.apache.org/jira/browse/FLINK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36575: --- Labels: pull-request-available (was: ) > ExecutionVertexInputInfo supports consuming subpartition groups > --- > > Key: FLINK-36575 > URL: https://issues.apache.org/jira/browse/FLINK-36575 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Lei Yang >Priority: Major > Labels: pull-request-available > > Currently, the ExecutionVertexInputInfo describes the task's input through a > combination of a PartitionRange and a SubpartitionRange. However, in the case > of skewed join optimization, we need to split a group of data corresponding > to the specific key, which may result in a downstream task subscribing to > multiple combinations of PartitionRanges and SubpartitionRanges. > Therefore, we need to modify the ExecutionVertexInputInfo to describe the > input data as multiple combinations of PartitionRanges and SubpartitionRanges > to meet the requirements of the aforementioned scenario and improve Flink's > flexibility in describing the task's inputs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36575][runtime] ExecutionVertexInputInfo supports consuming subpartition groups [flink]
flinkbot commented on PR #25551: URL: https://github.com/apache/flink/pull/25551#issuecomment-2425912906 ## CI report: * b99b368660d7e45b2e5e8265e05c778bf4b7c372 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36052][docs][elasticsearch] Add flink cdc elasticsearch pipeline sink to docs [flink-cdc]
beryllw commented on PR #3649: URL: https://github.com/apache/flink-cdc/pull/3649#issuecomment-2425946273

@leonardBang A Vitess dead link causes CI to fail. Please help re-trigger the CI. Thanks a lot.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36576][runtime] Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider [flink]
noorall opened a new pull request, #25552: URL: https://github.com/apache/flink/pull/25552

## What is the purpose of the change

Currently, the DefaultVertexParallelismAndInputInfosDecider is able to implement a balanced distribution algorithm based on the amount of data and the number of subpartitions; however, it also has some limitations:

1. Currently, the Decider selects the data distribution algorithm via the AllToAll or Pointwise attribute of the input, which limits the ability of the operator to dynamically modify the data distribution algorithm.
2. It doesn't support data volume-based balanced distribution for Pointwise inputs.
3. For AllToAll type inputs, it does not support splitting the data corresponding to a specific key, i.e., it cannot solve the data skewing caused by a single-key hotspot.

For that we plan to introduce the following improvements:

1. Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the input characterisation, which allows the operator to flexibly choose the data balanced distribution algorithm.
2. Introducing a data volume-based balanced distribution algorithm for Pointwise inputs.
3. Introducing the ability to split data corresponding to a specific key to optimise AllToAll's data volume-based data balancing distribution algorithm.

## Brief change log

- Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation.
- Introducing an amount-based data balanced distribution algorithm for Pointwise.
- Introducing the ability to split data corresponding to a specific key for AllToAll.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
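To make the "amount-based balanced distribution" idea concrete, a small self-contained sketch (illustrative only, not the actual `DefaultVertexParallelismAndInputInfosDecider` logic) that greedily assigns consecutive subpartitions to downstream tasks so that each task receives roughly the same number of bytes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: split consecutive subpartitions into up to `parallelism` ranges with roughly
// equal total bytes. The real decider additionally handles key correlations and the
// splitting of a single skewed subpartition, which this sketch does not.
final class BalancedRangeSplitSketch {

    record Range(int start, int end) {} // inclusive indices

    static List<Range> split(long[] bytesPerSubpartition, int parallelism) {
        long total = Arrays.stream(bytesPerSubpartition).sum();
        long targetPerTask = Math.max(1, total / parallelism);

        List<Range> ranges = new ArrayList<>();
        int start = 0;
        long accumulated = 0;
        for (int i = 0; i < bytesPerSubpartition.length; i++) {
            accumulated += bytesPerSubpartition[i];
            boolean lastTaskMustTakeRest = ranges.size() == parallelism - 1;
            if (accumulated >= targetPerTask && !lastTaskMustTakeRest) {
                ranges.add(new Range(start, i));
                start = i + 1;
                accumulated = 0;
            }
        }
        if (start < bytesPerSubpartition.length) {
            ranges.add(new Range(start, bytesPerSubpartition.length - 1));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // A skewed distribution: most data sits in subpartitions 2 and 3.
        System.out.println(split(new long[] {10, 20, 500, 480, 15, 25}, 3));
    }
}
```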
[jira] [Updated] (FLINK-36576) Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider
[ https://issues.apache.org/jira/browse/FLINK-36576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36576: --- Labels: pull-request-available (was: ) > Improving amount-based data balancing distribution algorithm for > DefaultVertexParallelismAndInputInfosDecider > - > > Key: FLINK-36576 > URL: https://issues.apache.org/jira/browse/FLINK-36576 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Lei Yang >Priority: Major > Labels: pull-request-available > > Currently, the DefaultVertexParallelismAndInputInfosDecider is able to > implement a balanced distribution algorithm based on the amount of data and > the number of subpartitions, however it also has some limitations: > # Currently, Decider selects the data distribution algorithm via the > AllToAll or Pointwise attribute of the input, which limits the ability of the > operator to dynamically modify the data distribution algorithm. > # Doesn't support data volume-based balanced distribution for Pointwise > inputs. > # For AllToAll type inputs, it does not support splitting the data > corresponding to the specific key, i.e., it cannot solve the data skewing > caused by single-key hotspot. > For that we plan to introduce the following improvements: > # Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the > input characterisation which allows the operator to flexibly choose the data > balanced distribution algorithm. > # Introducing a data volume-based data balanced distribution algorithm for > Pointwise inputs > # Introducing the ability to split data corresponding to the specific key to > optimise AllToAll's data volume-based data balancing distribution algorithm. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36146][connector] Fix SingleThreadFetcherManager race condition [flink]
kimgr commented on PR #25340: URL: https://github.com/apache/flink/pull/25340#issuecomment-2427619439

@becketqin That test fails consistently (before and after the fix) with `java.lang.IllegalStateException: The split fetcher manager has closed.` due to state validation in `SplitFetcherManager.createSplitFetcher`. I'll see if I can find another sequence to provoke the error.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36332] Missed webhook okhttp reference. [flink-kubernetes-operator]
SamBarker opened a new pull request, #906: URL: https://github.com/apache/flink-kubernetes-operator/pull/906 ## What is the purpose of the change *(For example: This pull request adds a new feature to periodically create and maintain savepoints through the `FlinkDeployment` custom resource.)* ## Brief change log *(for example:)* - *Periodic savepoint trigger is introduced to the custom resource* - *The operator checks on reconciliation whether the required time has passed* - *The JobManager's dispose savepoint API is used to clean up obsolete savepoints* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / no) - Core observer or reconciler logic that is regularly executed: (yes / no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing [flink-cdc]
lvyanquan commented on code in PR #3656: URL: https://github.com/apache/flink-cdc/pull/3656#discussion_r1808363691

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java:
@@ -108,6 +108,16 @@ public boolean isCompletedSplit() {
 return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
 }
+public String getTables() {
+String tables;
+if (tableSchemas != null) {
+tables = tableSchemas.keySet().toString();

Review Comment: If we iterate every time, it will have an impact on performance. Can we assume that tableSchemas will not change, so that we only need to build the tables variable once?

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java:
@@ -530,7 +530,11 @@ private void logCurrentBinlogOffsets(List splits, long checkpointId)
 return;
 }
 BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
-LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, offset);
+LOG.info(
+"Binlog offset for tables {} on checkpoint {}: {}",
+split.asBinlogSplit().getTables(),

Review Comment: If there are a large number of tables, the logs will get quite long. Would you consider truncating them?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
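A small self-contained sketch of what the first review comment asks for, computing the table list string once and reusing it (illustrative names, not the actual `MySqlBinlogSplit` fields; it assumes `tableSchemas` does not change after construction):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: cache the "[db.table1, db.table2]" string instead of rebuilding it
// from tableSchemas.keySet() on every checkpoint log statement.
final class TablesStringCacheSketch {

    private final Map<String, Object> tableSchemas;
    private String tables; // built lazily, exactly once

    TablesStringCacheSketch(Map<String, Object> tableSchemas) {
        this.tableSchemas = tableSchemas;
    }

    String getTables() {
        if (tables == null) {
            tables = (tableSchemas == null) ? "[]" : tableSchemas.keySet().toString();
        }
        return tables;
    }

    public static void main(String[] args) {
        Map<String, Object> schemas = new LinkedHashMap<>();
        schemas.put("db.orders", new Object());
        schemas.put("db.customers", new Object());
        TablesStringCacheSketch split = new TablesStringCacheSketch(schemas);
        System.out.println(split.getTables()); // built here
        System.out.println(split.getTables()); // reused here
    }
}
```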
Re: [PR] [FLINK-36568][rest] Fix incorrect http url for ClientCoordinationHeaders. [flink]
zhuzhurk merged PR #25542: URL: https://github.com/apache/flink/pull/25542 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-36568) IllegalStateException Causes Collect Sink to Fail to Collect Result
[ https://issues.apache.org/jira/browse/FLINK-36568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-36568. --- Fix Version/s: 2.0.0 Resolution: Fixed 2c830898640fe65016763c747c05110bd79b8894 > IllegalStateException Causes Collect Sink to Fail to Collect Result > --- > > Key: FLINK-36568 > URL: https://issues.apache.org/jira/browse/FLINK-36568 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > When Flink jobs use the collect sink, the JobManager logs the following > exception: > > {code:java} > ERROR > org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler > [] - Unhandled exception. > java.lang.IllegalStateException: No parameter could be found for the given > class. {code} > The root cause is that the HTTP URL for the ClientCoordinationHandler is > incorrect. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
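As background for why a wrong handler URL surfaces as a parameter-resolution error: a path parameter can only be filled in if its placeholder actually appears in the URL template. A minimal, generic illustration follows (not Flink's actual REST code; the template, names, and exception text are only meant to mirror the failure mode):

{code:java}
import java.util.Map;

// Illustration only: resolving ":jobid"-style placeholders against a URL template.
// If the template omits a placeholder, resolution fails -- analogous to the handler
// error reported in FLINK-36568 when the ClientCoordination URL was incorrect.
final class RestUrlTemplateSketch {

    static String resolve(String urlTemplate, Map<String, String> pathParameters) {
        String resolved = urlTemplate;
        for (Map.Entry<String, String> parameter : pathParameters.entrySet()) {
            String placeholder = ":" + parameter.getKey();
            if (!resolved.contains(placeholder)) {
                throw new IllegalStateException(
                        "No placeholder found for parameter: " + parameter.getKey());
            }
            resolved = resolved.replace(placeholder, parameter.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Works because the template declares both placeholders.
        System.out.println(
                resolve(
                        "/jobs/:jobid/coordinators/:operatorid",
                        Map.of("jobid", "8f6abf294a46", "operatorid", "feca28aa8b6e")));
    }
}
{code}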
[PR] [tests] Add migration tests from CDC 3.2.0 [flink-cdc]
yuxiqian opened a new pull request, #3657: URL: https://github.com/apache/flink-cdc/pull/3657

Flink CDC 3.2.0 has been released and we're preparing for the following 3.3 release. Adding it to the migration testing matrix should help us ensure state backwards compatibility.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered
[ https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vincent Woo updated FLINK-34636: Affects Version/s: 1.13.2 > Requesting exclusive buffers timeout causes repeated restarts and cannot be > automatically recovered > --- > > Key: FLINK-34636 > URL: https://issues.apache.org/jira/browse/FLINK-34636 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.13.2 >Reporter: Vincent Woo >Priority: Major > Attachments: image-20240308100308649.png, > image-20240308101008765.png, image-20240308101407396.png, > image-20240308101934756.png > > > Based on the observation of logs and metrics, it was found that a subtask > deployed on a same TM consistently reported an exception of requesting > exclusive buffers timeout. It was discovered that during the restart process, > 【{*}Network{*}】 metric remained unchanged (heap memory usage did change). I > suspect that the network buffer memory was not properly released during the > restart process, which caused the newly deployed task to fail to obtain the > network buffer. This problem persisted despite repeated restarts, and the > application failed to recover automatically. > (I'm not sure if there are other reasons for this issue) > Attached below are screenshots of the exception stack and relevant metrics: > {code:java} > 2024-03-08 09:58:18,738 WARN org.apache.flink.runtime.taskmanager.Task > [] - GroupWindowAggregate switched from DEPLOYING to FAILED with > failure cause: java.io.IOException: Timeout triggered when requesting > exclusive buffers: The total number of network buffers is currently set to > 32768 of 32768 bytes each. You can increase this number by setting the > configuration keys 'taskmanager.memory.network.fraction', > 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max', or > you may increase the timeout which is 3ms by setting the key > 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'. > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427) > > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257) > > at > org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84) > > at > org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.lang.Thread.run(Thread.java:748) {code} > !image-20240308101407396.png|width=866,height=171! > Network metric:Only this TM is always 100%, without any variation. > !image-20240308100308649.png|width=868,height=338! > The status of the task deployed to this TM cannot be RUNNING and the status > change is slow > !image-20240308101008765.png|width=869,height=118! 
> Although the root exception thrown by the application is > PartitionNotFoundException, the actual underlying root cause exception log > found is IOException: Timeout triggered when requesting exclusive buffers > !image-20240308101934756.png|width=869,height=394! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]
ruanwenjun commented on PR #25525: URL: https://github.com/apache/flink/pull/25525#issuecomment-2426100238

@ferenc-csaky Thanks for your review. As far as I know, the index file will be generated under root/target; it will not be put into the binary package, so it will not break the RAT license check, and we don't need any extra config in CI. BTW, disabling it by default and enabling it via Maven args is OK for me:

```
mvn spotless:apply -Dspotless.usingIndex=true
```

It's not a burden to me.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]
lvyanquan commented on code in PR #3639: URL: https://github.com/apache/flink-cdc/pull/3639#discussion_r1808408534 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java: ## @@ -454,6 +454,94 @@ public void testSinkWithMultiTables(String metastore) Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result); } +@ParameterizedTest +@ValueSource(strings = {"filesystem", "hive"}) +public void testDuplicateCommitAfterRestore(String metastore) +throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, +Catalog.DatabaseNotExistException, SchemaEvolveException { +initialize(metastore); +PaimonSink paimonSink = +new PaimonSink<>( +catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); +PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); +Committer committer = paimonSink.createCommitter(); + +// insert +for (Event event : createTestEvents()) { +writer.write(event, null); +} +writer.flush(false); +Collection> commitRequests = +writer.prepareCommit().stream() +.map(MockCommitRequestImpl::new) +.collect(Collectors.toList()); +committer.commit(commitRequests); + +// We add a loop for restore 3 times +for (int i = 0; i < 3; i++) { Review Comment: Suggest that the repetition count be greater than 5 to trigger some compaction. Refer to [num-sorted-run.compaction-trigger](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions) in doc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35387) PG CDC source support heart beat
[ https://issues.apache.org/jira/browse/FLINK-35387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-35387: -- Assignee: Hongshun Wang > PG CDC source support heart beat > > > Key: FLINK-35387 > URL: https://issues.apache.org/jira/browse/FLINK-35387 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Major > Fix For: cdc-3.3.0 > > > Though, document of PG CDC [1] has heartbeat.interval.ms, but it's not valid. > The reason is bellow. > In debezium dos says: For the connector to detect and process events from a > heartbeat table, you must add the table to the PostgreSQL publication > specified by the > [publication.name|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name] > property. If this publication predates your Debezium deployment, the > connector uses the publications as defined. If the publication is not already > configured to automatically replicate changes {{FOR ALL TABLES}} in the > database, you must explicitly add the heartbeat table to the publication[2]. > Thus, if you want use heart beat in cdc: > 1. add a heartbeat table to publication: ALTER PUBLICATION > __ ADD TABLE {_}{_}; > 2. set heartbeatInterval > 3. add > debezium.[{{heartbeat.action.query}}|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query] > [3] > > However, when I use it it CDC, some exception occurs: > {code:java} > Caused by: java.lang.NullPointerException > at > io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55) > at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:127) > at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:94){code} > !https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5292b7c63c883d1620bbf7d3875a3a5b158e70b814913bc360a414d3de9277d871abf3af1cbd75249eddaaa1b37c2b2f5421a918fb1a2f0f3853c0ce41721e620699d98626fa2281948c58faa63edf8ebfc653b69905bac42?tmpCode=9193555a-7bf3-4335-9427-b59c1dfe1931! 
> > It seems CDC don't add a HeartbeatConnectionProvider when configure > PostgresEventDispatcher: > {code:java} > //org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure > this.postgresDispatcher = > new PostgresEventDispatcher<>( > dbzConfig, > topicSelector, > schema, > queue, > dbzConfig.getTableFilters().dataCollectionFilter(), > DataChangeEvent::new, > metadataProvider, > schemaNameAdjuster); {code} > in debezium, when PostgresConnectorTask start, it will do it > {code:java} > //io.debezium.connector.postgresql.PostgresConnectorTask#start > final PostgresEventDispatcher dispatcher = new > PostgresEventDispatcher<>( > connectorConfig, > topicNamingStrategy, > schema, > queue, > connectorConfig.getTableFilters().dataCollectionFilter(), > DataChangeEvent::new, > PostgresChangeRecordEmitter::updateSchema, > metadataProvider, > connectorConfig.createHeartbeat( > topicNamingStrategy, > schemaNameAdjuster, > () -> new > PostgresConnection(connectorConfig.getJdbcConfig(), > PostgresConnection.CONNECTION_GENERAL), > exception -> { > String sqlErrorId = exception.getSQLState(); > switch (sqlErrorId) { > case "57P01": > // Postgres error admin_shutdown, see > https://www.postgresql.org/docs/12/errcodes-appendix.html > throw new DebeziumException("Could > not execute heartbeat action query (Error: " + sqlErrorId + ")", exception); > case "57P03": > // Postgres error cannot_connect_now, > see https://www.postgresql.org/docs/12/errcodes-appendix.html > throw new RetriableException("Could > not execute heartbeat action query (Error: " + sqlErrorId + ")", exception); > default: > break; > } > }), > schem
Re: [PR] [FLINK-36285] Pass default value expression in AlterColumnTypeEvent [flink-cdc]
yuxiqian closed pull request #3614: [FLINK-36285] Pass default value expression in AlterColumnTypeEvent URL: https://github.com/apache/flink-cdc/pull/3614 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36285] Pass default value expression in AlterColumnTypeEvent [flink-cdc]
yuxiqian commented on PR #3614: URL: https://github.com/apache/flink-cdc/pull/3614#issuecomment-2425837181 Fixed in https://github.com/apache/doris-flink-connector/pull/490 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [W.I.P] Run tests [flink]
JunRuiLee closed pull request #25431: [W.I.P] Run tests URL: https://github.com/apache/flink/pull/25431 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36575) ExecutionVertexInputInfo supports consuming subpartition groups
Lei Yang created FLINK-36575: Summary: ExecutionVertexInputInfo supports consuming subpartition groups Key: FLINK-36575 URL: https://issues.apache.org/jira/browse/FLINK-36575 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Lei Yang Currently, the ExecutionVertexInputInfo describes the task's input through a combination of a PartitionRange and a SubpartitionRange. However, in the case of skewed join optimization, we need to split a group of data corresponding to the specific key, which may result in a downstream task subscribing to multiple combinations of PartitionRanges and SubpartitionRanges. Therefore, we need to modify the ExecutionVertexInputInfo to describe the input data as multiple combinations of PartitionRanges and SubpartitionRanges to meet the requirements of the aforementioned scenario and improve Flink's flexibility in describing the task's inputs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36576) Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider
Lei Yang created FLINK-36576: Summary: Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider Key: FLINK-36576 URL: https://issues.apache.org/jira/browse/FLINK-36576 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Lei Yang Currently, the DefaultVertexParallelismAndInputInfosDecider is able to implement a balanced distribution algorithm based on the amount of data and the number of subpartitions, however it also has some limitations: # Currently, Decider selects the data distribution algorithm via the AllToAll or Pointwise attribute of the input, which limits the ability of the operator to dynamically modify the data distribution algorithm. # Doesn't support data volume-based balanced distribution for Pointwise inputs. # For AllToAll type inputs, it does not support splitting the data corresponding to the specific key, i.e., it cannot solve the data skewing caused by single-key hotspot. For that we plan to introduce the following improvements: # Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the input characterisation which allows the operator to flexibly choose the data balanced distribution algorithm. # Introducing a data volume-based data balanced distribution algorithm for Pointwise inputs # Introducing the ability to split data corresponding to the specific key to optimise AllToAll's data volume-based data balancing distribution algorithm. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36576) Improving amount-based data balancing distribution algorithm for DefaultVertexParallelismAndInputInfosDecider
[ https://issues.apache.org/jira/browse/FLINK-36576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lei Yang updated FLINK-36576: - Description: Currently, the DefaultVertexParallelismAndInputInfosDecider is able to implement a balanced distribution algorithm based on the amount of data and the number of subpartitions, however it also has some limitations: # Currently, Decider selects the data distribution algorithm via the AllToAll or Pointwise attribute of the input, which limits the ability of the operator to dynamically modify the data distribution algorithm. # Doesn't support data volume-based balanced distribution for Pointwise inputs. # For AllToAll type inputs, it does not support splitting the data corresponding to the specific key, i.e., it cannot solve the data skewing caused by single-key hotspot. For that we plan to introduce the following improvements: # Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the input characterisation which allows the operator to flexibly choose the data balanced distribution algorithm. # Introducing a data volume-based data balanced distribution algorithm for Pointwise inputs # Introducing the ability to split data corresponding to the specific key to optimise AllToAll's data volume-based data balancing distribution algorithm. was: Currently, the DefaultVertexParallelismAndInputInfosDecider is able to implement a balanced distribution algorithm based on the amount of data and the number of subpartitions, however it also has some limitations: # Currently, Decider selects the data distribution algorithm via the AllToAll or Pointwise attribute of the input, which limits the ability of the operator to dynamically modify the data distribution algorithm. # Doesn't support data volume-based balanced distribution for Pointwise inputs. # For AllToAll type inputs, it does not support splitting the data corresponding to the specific key, i.e., it cannot solve the data skewing caused by single-key hotspot. For that we plan to introduce the following improvements: # Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the input characterisation which allows the operator to flexibly choose the data balanced distribution algorithm. # Introducing a data volume-based data balanced distribution algorithm for Pointwise inputs # Introducing the ability to split data corresponding to the specific key to optimise AllToAll's data volume-based data balancing distribution algorithm. > Improving amount-based data balancing distribution algorithm for > DefaultVertexParallelismAndInputInfosDecider > - > > Key: FLINK-36576 > URL: https://issues.apache.org/jira/browse/FLINK-36576 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Lei Yang >Priority: Major > > Currently, the DefaultVertexParallelismAndInputInfosDecider is able to > implement a balanced distribution algorithm based on the amount of data and > the number of subpartitions, however it also has some limitations: > # Currently, Decider selects the data distribution algorithm via the > AllToAll or Pointwise attribute of the input, which limits the ability of the > operator to dynamically modify the data distribution algorithm. > # Doesn't support data volume-based balanced distribution for Pointwise > inputs. > # For AllToAll type inputs, it does not support splitting the data > corresponding to the specific key, i.e., it cannot solve the data skewing > caused by single-key hotspot. 
> For that we plan to introduce the following improvements: > # Introducing InterInputsKeyCorrelation and IntraInputKeyCorrelation to the > input characterisation which allows the operator to flexibly choose the data > balanced distribution algorithm. > # Introducing a data volume-based data balanced distribution algorithm for > Pointwise inputs > # Introducing the ability to split data corresponding to the specific key to > optimise AllToAll's data volume-based data balancing distribution algorithm. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]
ferenc-csaky commented on PR #25525: URL: https://github.com/apache/flink/pull/25525#issuecomment-2425881444

I think this would be pretty useful; I'm just wondering, if we enable it by default, could it break something in any CI? My understanding is that by default the index file is placed into the project root, and I think that would break the RAT license checker. But even if not, or if we add an exception for the index in the RAT config, doing this extra step in CI runs has little value, as those runs always start from a clean slate, cloning the code, etc. All in all, I think disabling this by default would be the safer choice; we can introduce a property for it to make it easy to enable indexing via a dynamic param in any local run. WDYT?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered
[ https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891463#comment-17891463 ] Vincent Woo commented on FLINK-34636: - This issue is occurring in version 1.13.2, and it looks like it may be related to this Network buffer leak:Network buffer leak when ResultPartition is released (failover), so I'll verify that the fix code avoids this issue first. > Requesting exclusive buffers timeout causes repeated restarts and cannot be > automatically recovered > --- > > Key: FLINK-34636 > URL: https://issues.apache.org/jira/browse/FLINK-34636 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.13.2 >Reporter: Vincent Woo >Priority: Major > Attachments: image-20240308100308649.png, > image-20240308101008765.png, image-20240308101407396.png, > image-20240308101934756.png > > > Based on the observation of logs and metrics, it was found that a subtask > deployed on a same TM consistently reported an exception of requesting > exclusive buffers timeout. It was discovered that during the restart process, > 【{*}Network{*}】 metric remained unchanged (heap memory usage did change). I > suspect that the network buffer memory was not properly released during the > restart process, which caused the newly deployed task to fail to obtain the > network buffer. This problem persisted despite repeated restarts, and the > application failed to recover automatically. > (I'm not sure if there are other reasons for this issue) > Attached below are screenshots of the exception stack and relevant metrics: > {code:java} > 2024-03-08 09:58:18,738 WARN org.apache.flink.runtime.taskmanager.Task > [] - GroupWindowAggregate switched from DEPLOYING to FAILED with > failure cause: java.io.IOException: Timeout triggered when requesting > exclusive buffers: The total number of network buffers is currently set to > 32768 of 32768 bytes each. You can increase this number by setting the > configuration keys 'taskmanager.memory.network.fraction', > 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max', or > you may increase the timeout which is 3ms by setting the key > 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'. > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427) > > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257) > > at > org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84) > > at > org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.lang.Thread.run(Thread.java:748) {code} > !image-20240308101407396.png|width=866,height=171! > Network metric:Only this TM is always 100%, without any variation. > !image-20240308100308649.png|width=868,height=338! 
> The status of the task deployed to this TM cannot be RUNNING and the status > change is slow > !image-20240308101008765.png|width=869,height=118! > Although the root exception thrown by the application is > PartitionNotFoundException, the actual underlying root cause exception log > found is IOException: Timeout triggered when requesting exclusive buffers > !image-20240308101934756.png|width=869,height=394! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36568) IllegalStateException Causes Collect Sink to Fail to Collect Result
[ https://issues.apache.org/jira/browse/FLINK-36568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu reassigned FLINK-36568: --- Assignee: Junrui Li > IllegalStateException Causes Collect Sink to Fail to Collect Result > --- > > Key: FLINK-36568 > URL: https://issues.apache.org/jira/browse/FLINK-36568 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > > When Flink jobs use the collect sink, the JobManager logs the following > exception: > > {code:java} > ERROR > org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler > [] - Unhandled exception. > java.lang.IllegalStateException: No parameter could be found for the given > class. {code} > The root cause is that the HTTP URL for the ClientCoordinationHandler is > incorrect. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered
[ https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891463#comment-17891463 ] Vincent Woo edited comment on FLINK-34636 at 10/21/24 9:32 AM: --- This issue is occurring in version 1.13.2, and it looks like it may be related to this Network buffer leak:[Network buffer leak when ResultPartition is released (failover)|https://issues.apache.org/jira/browse/FLINK-23724], so I'll verify that the fix code avoids this issue first. was (Author: JIRAUSER299026): This issue is occurring in version 1.13.2, and it looks like it may be related to this Network buffer leak:Network buffer leak when ResultPartition is released (failover), so I'll verify that the fix code avoids this issue first. > Requesting exclusive buffers timeout causes repeated restarts and cannot be > automatically recovered > --- > > Key: FLINK-34636 > URL: https://issues.apache.org/jira/browse/FLINK-34636 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.13.2 >Reporter: Vincent Woo >Priority: Major > Attachments: image-20240308100308649.png, > image-20240308101008765.png, image-20240308101407396.png, > image-20240308101934756.png > > > Based on the observation of logs and metrics, it was found that a subtask > deployed on a same TM consistently reported an exception of requesting > exclusive buffers timeout. It was discovered that during the restart process, > 【{*}Network{*}】 metric remained unchanged (heap memory usage did change). I > suspect that the network buffer memory was not properly released during the > restart process, which caused the newly deployed task to fail to obtain the > network buffer. This problem persisted despite repeated restarts, and the > application failed to recover automatically. > (I'm not sure if there are other reasons for this issue) > Attached below are screenshots of the exception stack and relevant metrics: > {code:java} > 2024-03-08 09:58:18,738 WARN org.apache.flink.runtime.taskmanager.Task > [] - GroupWindowAggregate switched from DEPLOYING to FAILED with > failure cause: java.io.IOException: Timeout triggered when requesting > exclusive buffers: The total number of network buffers is currently set to > 32768 of 32768 bytes each. You can increase this number by setting the > configuration keys 'taskmanager.memory.network.fraction', > 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max', or > you may increase the timeout which is 3ms by setting the key > 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'. 
> at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427) > > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257) > > at > org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84) > > at > org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.lang.Thread.run(Thread.java:748) {code} > !image-20240308101407396.png|width=866,height=171! > Network metric:Only this TM is always 100%, without any variation. > !image-20240308100308649.png|width=868,height=338! > The status of the task deployed to this TM cannot be RUNNING and the status > change is slow > !image-20240308101008765.png|width=869,height=118! > Although the root exception thrown by the application is > PartitionNotFoundException, the actual underlying root cause exception log > found is IOException: Timeout triggered when requesting exclusive buffers > !image-20240308101934756.png|width=869,height=394! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered
[ https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891463#comment-17891463 ] Vincent Woo edited comment on FLINK-34636 at 10/21/24 9:32 AM: --- This issue is occurring in version 1.13.2, and it looks like it may be related to this Network buffer leak:Network buffer leak when ResultPartition is released (failover), so I'll verify that the fix code avoids this issue first. was (Author: JIRAUSER299026): This issue is occurring in version 1.13.2, and it looks like it may be related to this Network buffer leak:Network buffer leak when ResultPartition is released (failover), so I'll verify that the fix code avoids this issue first. > Requesting exclusive buffers timeout causes repeated restarts and cannot be > automatically recovered > --- > > Key: FLINK-34636 > URL: https://issues.apache.org/jira/browse/FLINK-34636 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.13.2 >Reporter: Vincent Woo >Priority: Major > Attachments: image-20240308100308649.png, > image-20240308101008765.png, image-20240308101407396.png, > image-20240308101934756.png > > > Based on the observation of logs and metrics, it was found that a subtask > deployed on a same TM consistently reported an exception of requesting > exclusive buffers timeout. It was discovered that during the restart process, > 【{*}Network{*}】 metric remained unchanged (heap memory usage did change). I > suspect that the network buffer memory was not properly released during the > restart process, which caused the newly deployed task to fail to obtain the > network buffer. This problem persisted despite repeated restarts, and the > application failed to recover automatically. > (I'm not sure if there are other reasons for this issue) > Attached below are screenshots of the exception stack and relevant metrics: > {code:java} > 2024-03-08 09:58:18,738 WARN org.apache.flink.runtime.taskmanager.Task > [] - GroupWindowAggregate switched from DEPLOYING to FAILED with > failure cause: java.io.IOException: Timeout triggered when requesting > exclusive buffers: The total number of network buffers is currently set to > 32768 of 32768 bytes each. You can increase this number by setting the > configuration keys 'taskmanager.memory.network.fraction', > 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max', or > you may increase the timeout which is 3ms by setting the key > 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'. 
> at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427) > > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257) > > at > org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84) > > at > org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.lang.Thread.run(Thread.java:748) {code} > !image-20240308101407396.png|width=866,height=171! > Network metric:Only this TM is always 100%, without any variation. > !image-20240308100308649.png|width=868,height=338! > The status of the task deployed to this TM cannot be RUNNING and the status > change is slow > !image-20240308101008765.png|width=869,height=118! > Although the root exception thrown by the application is > PartitionNotFoundException, the actual underlying root cause exception log > found is IOException: Timeout triggered when requesting exclusive buffers > !image-20240308101934756.png|width=869,height=394! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36577) Add compatibility check for FlinkStateSnapshot CRD
Mate Czagany created FLINK-36577: Summary: Add compatibility check for FlinkStateSnapshot CRD Key: FLINK-36577 URL: https://issues.apache.org/jira/browse/FLINK-36577 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Mate Czagany Fix For: kubernetes-operator-1.11.0 There are already two CRD compatibility checks in the flink-kubernetes-operator-api POM (for FlinkDeployment and FlinkSessionJob), one should also be added for FlinkStateSnapshot -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36577) Add compatibility check for FlinkStateSnapshot CRD
[ https://issues.apache.org/jira/browse/FLINK-36577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mate Czagany updated FLINK-36577: - Description: There are already two CRD compatibility checks in the flink-kubernetes-operator-api POM (for FlinkDeployment and FlinkSessionJob), one should also be added for FlinkStateSnapshot. Requirement for this is to have the 1.10.0 version released. was:There are already two CRD compatibility checks in the flink-kubernetes-operator-api POM (for FlinkDeployment and FlinkSessionJob), one should also be added for FlinkStateSnapshot > Add compatibility check for FlinkStateSnapshot CRD > -- > > Key: FLINK-36577 > URL: https://issues.apache.org/jira/browse/FLINK-36577 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Mate Czagany >Priority: Major > Fix For: kubernetes-operator-1.11.0 > > > There are already two CRD compatibility checks in the > flink-kubernetes-operator-api POM (for FlinkDeployment and FlinkSessionJob), > one should also be added for FlinkStateSnapshot. > > Requirement for this is to have the 1.10.0 version released. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35977][doc] Missing an import in datastream.md [flink]
guluo2016 commented on PR #25153: URL: https://github.com/apache/flink/pull/25153#issuecomment-2426468450 > Thanks for the contribution > > can you please also update `process_function.md` and `sourceSincks.md`, probably `data_stream_api.md` as well since they have the same issue. May be something else, need to search through the documentation to double check Thanks for your review, the changes have been made, please review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Flink Kubernetes Operator 1.10.0 [flink-web]
mateczagany commented on code in PR #758: URL: https://github.com/apache/flink-web/pull/758#discussion_r1808609610 ## docs/content/posts/2024-10-30-release-kubernetes-operator-1.10.0.md: ## @@ -0,0 +1,75 @@ +--- +title: "Apache Flink Kubernetes Operator 1.10.0 Release Announcement" +date: "2024-10-30T18:00:00.000Z" +authors: +- mateczagany: + name: "Mate Czagany" +aliases: +- /news/2024/10/30/release-kubernetes-operator-1.10.0.html +--- + +The Apache Flink community is excited to announce the release of Flink Kubernetes Operator 1.10.0! + +The release includes several improvements to the autoscaler, and introduces a new Kubernetes custom resource called FlinkStateSnapshot to manage job snapshots. +The process of job upgrades has also been enhanced which makes it possible to now use the last-state upgrade mode with session jobs. + +We encourage you to [download the release](https://flink.apache.org/downloads.html) and share your experience with the +community through the Flink [mailing lists](https://flink.apache.org/community.html#mailing-lists) or +[JIRA](https://issues.apache.org/jira/browse/flink)! We're looking forward to your feedback! + +## Highlights + +### FlinkStateSnapshot + +With this version comes also a new custom resource called FlinkStateSnapshot. +This is used to describe savepoint or checkpoint for a Flink job. +The savepoint/checkpoint fields found in FlinkDeployment and FlinkSessionJob status are therefore deprecated, and the operator will create new FlinkStateSnapshot resources for periodic, update and manual savepoints/checkpoints. + +Users can also create new FlinkStateSnapshot resources, which will instruct the operator to trigger new checkpoint/savepoint. + +This new feature is enabled by default, unless disabled by setting `kubernetes.operator.snapshot.resource.enabled` to false or if the FlinkStateSnapshot CRD was not found on the Kubernetes cluster. + +You can read more about this feature [here](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.10/docs/custom-resource/snapshots/). + + +### Last-State Upgrade Mode + +For deployments using last-state upgrade mode, the operator - instead of forcefully deleting the JobManager pods to trigger a restart - will cancel the job via REST API and extract the last checkpoint info after cancellation if the job is healthy. +This change makes it possible to finally use the last-state upgrade mode for session jobs as well. + + +### Autoscaler Delayed Scale Down + +With the introduction of the configuration option `job.autoscaler.scale-down.interval`, the operator can now optimize multiple scale-down operations to a single one to prevent too many unnecessary downscales, thus improving job availability. +Please note that `job.autoscaler.scale-up.grace-period` has been removed with this change. + + +### Other Autoscaler Improvements +- Optimized cases where partitions or key groups cannot be evenly distributed to subtasks in case of Kafka and Pulsar Review Comment: Thank you for clarifying it! Sorry for the delay, I have updated this line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]
ferenc-csaky commented on PR #46: URL: https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2426418591 > Hi, @ferenc-csaky. I'm interested in the connector for Flink > 1.18 and glad to see this PR, thank you for the work! I wonder if there is anything blocking the release? Hi @JuliaBogdan, unfortunately yes, it is not trivial to release the connector as-is against 1.18+, because some Hadoop elements are not playing well with JDK17+, so this will be more work than I originally anticipated. I will initiate a discussion to release the HBase connector for Flink 1.18 and 1.19 for JDK8 and JDK11 only, to at least start to bridge the current gap. Anything that comes afterwards will probably require bumping all Hadoop-related deps in this repo. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36502) Remove all deprecated methods in `FactoryUtil`
[ https://issues.apache.org/jira/browse/FLINK-36502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891523#comment-17891523 ] Shengbo commented on FLINK-36502: - Hi [~xuyangzhong], I'd like to work on this subtask as my first contribution on Flink, would you please assign it to me > Remove all deprecated methods in `FactoryUtil` > -- > > Key: FLINK-36502 > URL: https://issues.apache.org/jira/browse/FLINK-36502 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [runtime-web] chore: update Angular to v15 [flink]
michalmw commented on PR #25450: URL: https://github.com/apache/flink/pull/25450#issuecomment-2426504565 @simplejason or @yangjunhan can you check this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34467] bump flink version to 1.20.0 [flink-connector-jdbc]
boring-cyborg[bot] commented on PR #137: URL: https://github.com/apache/flink-connector-jdbc/pull/137#issuecomment-2426275936 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34467) Implement Lineage Interface in Jdbc Connector
[ https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-34467: --- Fix Version/s: cdc-3.3.0 > Implement Lineage Interface in Jdbc Connector > - > > Key: FLINK-34467 > URL: https://issues.apache.org/jira/browse/FLINK-34467 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.3.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34467] bump flink version to 1.20.0 [flink-connector-jdbc]
leonardBang merged PR #137: URL: https://github.com/apache/flink-connector-jdbc/pull/137 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34467) Implement Lineage Interface in Jdbc Connector
[ https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891492#comment-17891492 ] Leonard Xu commented on FLINK-34467: [~arvid] [~ZhenqiuHuang] Could you check your last PR for Kakfa connector version bump ? > Implement Lineage Interface in Jdbc Connector > - > > Key: FLINK-34467 > URL: https://issues.apache.org/jira/browse/FLINK-34467 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.3.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36476) Remove all deprecated methods under public APIs in table modules and adapt the related tests to make them pass
[ https://issues.apache.org/jira/browse/FLINK-36476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891481#comment-17891481 ] xuyang commented on FLINK-36476: [~davidradl] Good idea. The vast majority of the deletion work should not be affected, so let's proceed for now. Once we've completed most of the work, we can summarize the methods that were blocked due to not being marked as deprecated on the Python side and discuss them in the dev email. WDYT? > Remove all deprecated methods under public APIs in table modules and adapt > the related tests to make them pass > -- > > Key: FLINK-36476 > URL: https://issues.apache.org/jira/browse/FLINK-36476 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: xuyang >Priority: Blocker > Labels: pull-request-available > Fix For: 2.0.0 > > > All methods needed to be removed are: > [https://docs.google.com/document/d/1bVrmcB9UFOd1-sT7xDRTMb5rmO0vK0eDlLRUYYL9gd4/edit?usp=sharing] > > Note: > # Please make sure to delete and adapt the code(including test) of module > table, python, doc, etc. as well. > # Please raise a PR in the corresponding subtask, and try not to link PRs on > this umbrella jira. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33926][kubernetes]: Allow using job jars in the system classpath in native mode [flink]
tinaselenge commented on code in PR #25445: URL: https://github.com/apache/flink/pull/25445#discussion_r1808630829 ## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java: ## @@ -214,9 +215,12 @@ public ClusterClientProvider deployApplicationCluster( applicationConfiguration.applyToConfiguration(flinkConfig); -// No need to do pipelineJars validation if it is a PyFlink job. +// No need to do pipelineJars validation if it is a PyFlink job or no jars specified in +// pipeline.jars (to use jars in system classpath instead). if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName()) -|| PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments( { +|| PackagedProgramUtils.isPython( + applicationConfiguration.getProgramArguments())) +&& flinkConfig.get(PipelineOptions.JARS) != null) { Review Comment: good idea! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34467) Implement Lineage Interface in Jdbc Connector
[ https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891488#comment-17891488 ] Leonard Xu edited comment on FLINK-34467 at 10/21/24 10:35 AM: --- merged into jdbc connector repo via main: 72f315fcac5ab300ec06625696bcda33b2141cd7 was (Author: leonard xu): merged into jdbc connector repo vias main: 72f315fcac5ab300ec06625696bcda33b2141cd7 > Implement Lineage Interface in Jdbc Connector > - > > Key: FLINK-34467 > URL: https://issues.apache.org/jira/browse/FLINK-34467 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.3.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34467) Implement Lineage Interface in Jdbc Connector
[ https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891488#comment-17891488 ] Leonard Xu commented on FLINK-34467: merged into jdbc connector repo vias main: 72f315fcac5ab300ec06625696bcda33b2141cd7 > Implement Lineage Interface in Jdbc Connector > - > > Key: FLINK-34467 > URL: https://issues.apache.org/jira/browse/FLINK-34467 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.3.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]
JuliaBogdan commented on PR #46: URL: https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2426326123 Hi, @ferenc-csaky. I'm interested in the connector for Flink > 1.18 and glad to see this PR, thank you for the work! I wonder if there is anything blocking the release? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35016] catalog changes for model resource [flink]
twalthr merged PR #25211: URL: https://github.com/apache/flink/pull/25211 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-35016) Catalog changes for model CRUD
[ https://issues.apache.org/jira/browse/FLINK-35016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther closed FLINK-35016. Fix Version/s: 2.0.0 Assignee: Hao Li Resolution: Fixed Fixed in master: 2d17f6148796c30890d38c830f772f8f38bfd495 > Catalog changes for model CRUD > -- > > Key: FLINK-35016 > URL: https://issues.apache.org/jira/browse/FLINK-35016 > Project: Flink > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]
ferenc-csaky commented on PR #25525: URL: https://github.com/apache/flink/pull/25525#issuecomment-2426536625 > @ferenc-csaky Thanks for your review. As far as I know, the index file will be generated under root/target; it will not be put into the binary package, so it will not break the RAT license check, and we don't need any extra change in CI. > > BTW, disabling it by default and enabling it via Maven args is acceptable to me. > > ``` > mvn spotless:apply -Dspotless.usingIndex=true > ``` > > It's not a burden to me. Sounds good to me! Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34467) Implement Lineage Interface in Jdbc Connector
[ https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-34467: --- Affects Version/s: jdbc-3.2.0 (was: 1.19.0) > Implement Lineage Interface in Jdbc Connector > - > > Key: FLINK-34467 > URL: https://issues.apache.org/jira/browse/FLINK-34467 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [hotfix][docs] Add FlinkStateSnapshot to upgrade docs [flink-kubernetes-operator]
mateczagany opened a new pull request, #905: URL: https://github.com/apache/flink-kubernetes-operator/pull/905 With the usual `helm upgrade`, the new FlinkStateSnapshot CRD won't be installed, and it will need to be installed with `kubectl create`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36557) Stale Autoscaler Context in Kubernetes Operator
[ https://issues.apache.org/jira/browse/FLINK-36557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891612#comment-17891612 ] Sai Sharath Dandi commented on FLINK-36557: --- Hi [~gyfora] , Could you please help review this Jira and provide suggestion? I can work on the PR for fix accordingly > Stale Autoscaler Context in Kubernetes Operator > --- > > Key: FLINK-36557 > URL: https://issues.apache.org/jira/browse/FLINK-36557 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Sai Sharath Dandi >Priority: Minor > > The KubernetesJobAutoScalerContext is > [cached|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java#L59] > in the FlinkResourceContext and reused. If the JobAutoscalerContext is > initialized before the job reaches Running state, it can cause the autoscaler > to not trigger - > [link|[https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java#L98].] > > We need to either refresh the AutoScalerContext similar to the standalone > [implementation|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java#L127] > or the autoscaler module itself needs to refresh the job status -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]
ferenc-csaky commented on PR #25525: URL: https://github.com/apache/flink/pull/25525#issuecomment-2427353187 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36557) Stale Autoscaler Context in Kubernetes Operator
[ https://issues.apache.org/jira/browse/FLINK-36557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sai Sharath Dandi updated FLINK-36557: -- Description: The KubernetesJobAutoScalerContext is [cached|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java#L59] in the FlinkResourceContext and reused. If the JobAutoscalerContext is initialized before the job reaches Running state, it can cause the autoscaler to not trigger - [link|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java#L98] We need to either refresh the AutoScalerContext similar to the standalone [implementation|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java#L127] or the autoscaler module itself needs to refresh the job status was: The KubernetesJobAutoScalerContext is [cached|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java#L59] in the FlinkResourceContext and reused. If the JobAutoscalerContext is initialized before the job reaches Running state, it can cause the autoscaler to not trigger - [link|[https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java#L98].] We need to either refresh the AutoScalerContext similar to the standalone [implementation|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java#L127] or the autoscaler module itself needs to refresh the job status > Stale Autoscaler Context in Kubernetes Operator > --- > > Key: FLINK-36557 > URL: https://issues.apache.org/jira/browse/FLINK-36557 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Sai Sharath Dandi >Priority: Minor > > The KubernetesJobAutoScalerContext is > [cached|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java#L59] > in the FlinkResourceContext and reused. If the JobAutoscalerContext is > initialized before the job reaches Running state, it can cause the autoscaler > to not trigger - > [link|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java#L98] > > We need to either refresh the AutoScalerContext similar to the standalone > [implementation|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java#L127] > or the autoscaler module itself needs to refresh the job status -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36542) Enable upToDateChecking to speed up the spotless
[ https://issues.apache.org/jira/browse/FLINK-36542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky reassigned FLINK-36542: Assignee: Wenjun Ruan > Enable upToDateChecking to speed up the spotless > > > Key: FLINK-36542 > URL: https://issues.apache.org/jira/browse/FLINK-36542 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Wenjun Ruan >Assignee: Wenjun Ruan >Priority: Major > Labels: pull-request-available > Attachments: image-2024-10-15-22-01-25-049.png > > > I use `mvn spotless:apply` to format the code format, find it will cost 01:13 > min, this is too slow and each time I execute `mvn spotless:apply` it will > cost 1 min. > I hope we can enable > upToDateChecking setting to speed up the spotless > [https://github.com/diffplug/spotless/tree/main/plugin-maven#incremental-up-to-date-checking-and-formatting] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36542) Enable upToDateChecking to speed up the spotless
[ https://issues.apache.org/jira/browse/FLINK-36542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-36542: - Fix Version/s: 2.0.0 > Enable upToDateChecking to speed up the spotless > > > Key: FLINK-36542 > URL: https://issues.apache.org/jira/browse/FLINK-36542 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Wenjun Ruan >Assignee: Wenjun Ruan >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > Attachments: image-2024-10-15-22-01-25-049.png > > > I use `mvn spotless:apply` to format the code format, find it will cost 01:13 > min, this is too slow and each time I execute `mvn spotless:apply` it will > cost 1 min. > I hope we can enable > upToDateChecking setting to speed up the spotless > [https://github.com/diffplug/spotless/tree/main/plugin-maven#incremental-up-to-date-checking-and-formatting] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36146][connector] Fix SingleThreadFetcherManager race condition [flink]
becketqin commented on PR #25340: URL: https://github.com/apache/flink/pull/25340#issuecomment-2427417285 @kimgr Sorry for the late reply, and thanks for the patch. The patch itself looks good to me. Usually, a test needs to be added to verify the bug is actually fixed. For race conditions like this, the unit test can be a little tricky to write. What I usually do is run the sequence something like 1000 times and make sure there is no occasional failure anymore. So in this case, I think we can repeat the following sequence in a unit test 1000 times: 1. create a `SingleThreadFetcherManager`. 2. add a new split. 3. close the fetcher manager without waiting. 4. add a new split. That said, I still don't quite understand why a split would be assigned while the SourceReader is closing. But I don't think the fix will cause problems, because the SourceReader is shut down anyway when this race condition happens. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
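To make the four-step sequence above concrete, here is a rough sketch of such a repeated test. It is an illustration under stated assumptions only: the `StubSplit` type and the `createFetcherManager()` wiring are placeholders (a real test would reuse the connector's own split class and build the manager with a test `SplitReader`), not existing Flink test utilities.

```java
import java.util.Collections;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.junit.jupiter.api.RepeatedTest;

class FetcherManagerCloseRaceTest {

    /** Minimal stand-in split type; a real test would reuse the connector's own split class. */
    static final class StubSplit implements SourceSplit {
        private final String id;
        StubSplit(String id) { this.id = id; }
        @Override
        public String splitId() { return id; }
    }

    /** Placeholder wiring; an assumption for illustration, not an existing Flink helper. */
    private SingleThreadFetcherManager<byte[], StubSplit> createFetcherManager() {
        throw new UnsupportedOperationException("split-reader wiring omitted in this sketch");
    }

    // Re-running the short sequence many times is how intermittent races are usually surfaced.
    @RepeatedTest(1000)
    void addSplitAfterCloseShouldNotThrow() throws Exception {
        SingleThreadFetcherManager<byte[], StubSplit> manager = createFetcherManager(); // 1. create
        manager.addSplits(Collections.singletonList(new StubSplit("s1")));              // 2. add a split
        manager.close(0L);                                                               // 3. close without waiting for shutdown
        manager.addSplits(Collections.singletonList(new StubSplit("s2")));              // 4. must not fail
    }
}
```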
Re: [PR] Use chronological order timestamp for timers [flink-benchmarks]
pnowojski commented on PR #94: URL: https://github.com/apache/flink-benchmarks/pull/94#issuecomment-2427270392 I think the problem got resolved by https://github.com/apache/flink-benchmarks/pull/97 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Use chronological order timestamp for timers [flink-benchmarks]
pnowojski merged PR #94: URL: https://github.com/apache/flink-benchmarks/pull/94 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36146][connector] Fix SingleThreadFetcherManager race condition [flink]
kimgr commented on PR #25340: URL: https://github.com/apache/flink/pull/25340#issuecomment-2427449905 Thanks, I'll try to hack up a testcase! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing [flink-cdc]
herunkang2018 commented on code in PR #3656: URL: https://github.com/apache/flink-cdc/pull/3656#discussion_r1809715176 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java: ## @@ -108,6 +108,16 @@ public boolean isCompletedSplit() { return totalFinishedSplitSize == finishedSnapshotSplitInfos.size(); } +public String getTables() { +String tables; +if (tableSchemas != null) { +tables = tableSchemas.keySet().toString(); Review Comment: Good advice, I will refine it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]
yunfengzhou-hub commented on code in PR #25501: URL: https://github.com/apache/flink/pull/25501#discussion_r1809757295 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java: ## @@ -67,6 +67,12 @@ default boolean isRestored() { */ InternalTimeServiceManager internalTimerServiceManager(); +/** + * Returns the internal timer service manager create by async state backend for the stream + * operator. This method returns null for non-keyed operators. + */ +InternalTimeServiceManager asyncInternalTimerServiceManager(); Review Comment: Are there cases in which an operator needs to use async timer service and sync timer service simultaneously? I'm wondering if we could reduce `internalTimerServiceManager()` and `asyncInternalTimerServiceManager()` into one method, given that they have the same return type, and it might better help hiding implementation details from invokers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
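For illustration only, a minimal sketch of the kind of consolidation being asked about; the interface and method shown here are hypothetical, not the actual `StreamOperatorStateContext` API, and only demonstrate hiding the sync/async distinction behind a single accessor.

```java
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;

/** Hypothetical shape of a consolidated accessor; not actual Flink API. */
interface TimerServiceManagerAccess {

    /**
     * Single entry point for the operator. Whether the returned manager is backed by the
     * synchronous or the asynchronous state backend would be an implementation detail of
     * the state context, so callers would not need to choose between two accessors.
     */
    InternalTimeServiceManager<?> internalTimerServiceManager();
}
```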
[jira] [Created] (FLINK-36578) Fixed bug when converting json to string
Lee SeungMin created FLINK-36578: Summary: Fixed bug when converting json to string Key: FLINK-36578 URL: https://issues.apache.org/jira/browse/FLINK-36578 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: Lee SeungMin The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace - code ``` protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try { r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String) { // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } ``` (https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381) So added modified JsonStringFormatter (added whitespace between key and value, and after comma to make it work the same as the snapshot step). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36052] Add elasticsearch.md for elasticsearch pipeline connector [flink-cdc]
github-actions[bot] commented on PR #3539: URL: https://github.com/apache/flink-cdc/pull/3539#issuecomment-2427952115 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Optimize log level [flink-cdc]
github-actions[bot] commented on PR #3538: URL: https://github.com/apache/flink-cdc/pull/3538#issuecomment-2427952144 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing [flink-cdc]
herunkang2018 commented on code in PR #3656: URL: https://github.com/apache/flink-cdc/pull/3656#discussion_r1809718242 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java: ## @@ -530,7 +530,11 @@ private void logCurrentBinlogOffsets(List splits, long checkpointId) return; } BinlogOffset offset = split.asBinlogSplit().getStartingOffset(); -LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, offset); +LOG.info( +"Binlog offset for tables {} on checkpoint {}: {}", +split.asBinlogSplit().getTables(), Review Comment: Yes, I will refine it. The first few tables are sufficient to identify the specific cdc source. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string
[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee SeungMin updated FLINK-36578: - Description: The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace - code {code:java} protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try{ r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String){ // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } {code} ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) So added modified JsonStringFormatter (added whitespace between key and value, and after comma to make it work the same as the snapshot step). was: The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace - code ``` protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try { r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String) { // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } ``` (https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381) So added modified JsonStringFormatter (added whitespace between key and value, and after comma to make it work the same as the snapshot step). > Fixed bug when converting json to string > > > Key: FLINK-36578 > URL: https://issues.apache.org/jira/browse/FLINK-36578 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Lee SeungMin >Priority: Major > > The function to convert json to string is different in the snapshot step and > the binlog step. > This causes the following differences. 
> - original : \{"key": "value", "key1": "value1"} > - snapshot : \{"key": "value", "key1": "value1"} > - binlog : \{"key":"value","key1":"value1"} // no whitespace > > > - code > {code:java} > protected Object convertJson(Column column, Field fieldDefn, Object data) > { > return convertValue(column, fieldDefn, data, "{}", (r) -> { > if (data instanceof byte[]) { > // The BinlogReader sees these JSON values as binary encoded, >
[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string
[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36578: --- Labels: pull-request-available (was: ) > Fixed bug when converting json to string > > > Key: FLINK-36578 > URL: https://issues.apache.org/jira/browse/FLINK-36578 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Lee SeungMin >Priority: Major > Labels: pull-request-available > > The function to convert json to string is different in the snapshot step and > the binlog step. > This causes the following differences. > - original : \{"key": "value", "key1": "value1"} > - snapshot : \{"key": "value", "key1": "value1"} > - binlog : \{"key":"value","key1":"value1"} // no whitespace > > > - code > {code:java} > protected Object convertJson(Column column, Field fieldDefn, Object data) > { > return convertValue(column, fieldDefn, data, "{}", (r) -> { > if (data instanceof byte[]) { > // The BinlogReader sees these JSON values as binary encoded, > so we use the binlog client library's utility > // to parse MySQL's internal binary representation into a > JSON string, using the standard formatter. if (((byte[]) > data).length == 0) { > r.deliver(column.isOptional() ? null : "{}"); > } > else { > try{ > r.deliver(JsonBinary.parseAsString((byte[]) data)); } > catch (IOException e) { > parsingErrorHandler.error("Failed to parse and read a > JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); > r.deliver(column.isOptional() ? null : "{}"); > } > } > } > else if (data instanceof String){ // The > SnapshotReader sees JSON values as UTF-8 encoded strings. > r.deliver(data); } }); > } {code} > ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) > > So added modified JsonStringFormatter (added whitespace between key and > value, and after comma to make it work the same as the snapshot step). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36578][mysql] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter [flink-cdc]
SML0127 opened a new pull request, #3658: URL: https://github.com/apache/flink-cdc/pull/3658 The function that converts JSON to a string differs between the snapshot step and the binlog step. This causes the following differences. ``` original data : {"key": "value", "key1": "value1"} snapshot : {"key": "value", "key1": "value1"} binlog : {"key":"value","key1":"value1"} // no whitespace ``` So I modified JsonStringFormatter and added it to flink-cdc. Modified code lines: line 105: added whitespace before the value; line 207: added whitespace after the comma. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
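As a quick way to see that the two strings above differ only lexically, the following self-contained sketch parses both forms and re-serializes them with the same mapper. It uses Jackson directly rather than the Flink CDC/Debezium code path; the class name and the use of Jackson here are illustrative assumptions.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonWhitespaceMismatchSketch {
    public static void main(String[] args) throws Exception {
        // Strings as reported above: the snapshot step keeps the stored spacing,
        // while the binlog step re-renders the binary JSON without separator spaces.
        String snapshot = "{\"key\": \"value\", \"key1\": \"value1\"}";
        String binlog = "{\"key\":\"value\",\"key1\":\"value1\"}";

        System.out.println(snapshot.equals(binlog)); // false: the raw strings differ

        ObjectMapper mapper = new ObjectMapper();
        JsonNode snapshotTree = mapper.readTree(snapshot);
        JsonNode binlogTree = mapper.readTree(binlog);

        // Parsing and re-serializing through the same mapper removes the purely lexical difference.
        System.out.println(snapshotTree.equals(binlogTree)); // true: same logical document
        System.out.println(
                mapper.writeValueAsString(snapshotTree)
                        .equals(mapper.writeValueAsString(binlogTree))); // true: identical canonical form
    }
}
```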
[jira] [Commented] (FLINK-36578) Fixed bug when converting json to string
[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891704#comment-17891704 ] Lee SeungMin commented on FLINK-36578: -- assign this pr to me PTAL [~Leonard], [~renqs] > Fixed bug when converting json to string > > > Key: FLINK-36578 > URL: https://issues.apache.org/jira/browse/FLINK-36578 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Lee SeungMin >Priority: Major > > The function to convert json to string is different in the snapshot step and > the binlog step. > This causes the following differences. > - original : \{"key": "value", "key1": "value1"} > - snapshot : \{"key": "value", "key1": "value1"} > - binlog : \{"key":"value","key1":"value1"} // no whitespace > > > - code > {code:java} > protected Object convertJson(Column column, Field fieldDefn, Object data) > { > return convertValue(column, fieldDefn, data, "{}", (r) -> { > if (data instanceof byte[]) { > // The BinlogReader sees these JSON values as binary encoded, > so we use the binlog client library's utility > // to parse MySQL's internal binary representation into a > JSON string, using the standard formatter. if (((byte[]) > data).length == 0) { > r.deliver(column.isOptional() ? null : "{}"); > } > else { > try{ > r.deliver(JsonBinary.parseAsString((byte[]) data)); } > catch (IOException e) { > parsingErrorHandler.error("Failed to parse and read a > JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); > r.deliver(column.isOptional() ? null : "{}"); > } > } > } > else if (data instanceof String){ // The > SnapshotReader sees JSON values as UTF-8 encoded strings. > r.deliver(data); } }); > } {code} > ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) > > So added modified JsonStringFormatter (added whitespace between key and > value, and after comma to make it work the same as the snapshot step). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string
[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee SeungMin updated FLINK-36578: - Description: The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace code {code:java} protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try{ r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String){ // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } {code} ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) So added modified JsonStringFormatter (added whitespace between key and value, and after comma to make it work the same as the snapshot step). was: The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace code {code:java} protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try{ r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String){ // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } {code} ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) So added modified JsonStringFormatter (added whitespace between key and value, and after comma to make it work the same as the snapshot step). > Fixed bug when converting json to string > > > Key: FLINK-36578 > URL: https://issues.apache.org/jira/browse/FLINK-36578 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Lee SeungMin >Priority: Major > Labels: pull-request-available > > The function to convert json to string is different in the snapshot step and > the binlog step. 
> This causes the following differences. > - original : \{"key": "value", "key1": "value1"} > - snapshot : \{"key": "value", "key1": "value1"} > - binlog : \{"key":"value","key1":"value1"} // no whitespace -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string
[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee SeungMin updated FLINK-36578: - Description: The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace code {code:java} protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try{ r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String){ // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } {code} ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) So added modified JsonStringFormatter (added whitespace between key and value, and after comma to make it work the same as the snapshot step). was: The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace - code {code:java} protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try{ r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String){ // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } {code} ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) So added modified JsonStringFormatter (added whitespace between key and value, and after comma to make it work the same as the snapshot step). > Fixed bug when converting json to string > > > Key: FLINK-36578 > URL: https://issues.apache.org/jira/browse/FLINK-36578 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Lee SeungMin >Priority: Major > Labels: pull-request-available > > The function to convert json to string is different in the snapshot step and > the binlog step. 
> This causes the following differences. > - original : \{"key": "value", "key1": "value1"} > - snapshot : \{"key": "value", "key1": "value1"} > - binlog : \{"key":"value","key1":"value1"} // no whitespace -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36578) Fixed bug when converting json to string
[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891706#comment-17891706 ] Lee SeungMin commented on FLINK-36578: -- PR: https://github.com/apache/flink-cdc/pull/3658 > Fixed bug when converting json to string > > > Key: FLINK-36578 > URL: https://issues.apache.org/jira/browse/FLINK-36578 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Lee SeungMin >Priority: Major > Labels: pull-request-available > > The function to convert json to string is different in the snapshot step and > the binlog step. > This causes the following differences. > - original : \{"key": "value", "key1": "value1"} > - snapshot : \{"key": "value", "key1": "value1"} > - binlog : \{"key":"value","key1":"value1"} // no whitespace > > code > {code:java} > protected Object convertJson(Column column, Field fieldDefn, Object data) > { > return convertValue(column, fieldDefn, data, "{}", (r) -> { > if (data instanceof byte[]) { > // The BinlogReader sees these JSON values as binary encoded, > so we use the binlog client library's utility > // to parse MySQL's internal binary representation into a > JSON string, using the standard formatter. if (((byte[]) > data).length == 0) { > r.deliver(column.isOptional() ? null : "{}"); > } > else { > try{ > r.deliver(JsonBinary.parseAsString((byte[]) data)); } > catch (IOException e) { > parsingErrorHandler.error("Failed to parse and read a > JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); > r.deliver(column.isOptional() ? null : "{}"); > } > } > } > else if (data instanceof String){ // The > SnapshotReader sees JSON values as UTF-8 encoded strings. > r.deliver(data); } }); > } {code} > ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) > > So added modified JsonStringFormatter (added whitespace between key and > value, and after comma to make it work the same as the snapshot step). -- This message was sent by Atlassian Jira (v8.20.10#820010)
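For illustration of the behavior described in this issue, here is a minimal, self-contained sketch (not the actual flink-cdc patch; the class and method names are made up) of the normalization being asked for: inserting a space after ':' and ',' outside string literals so that binlog-phase output matches snapshot-phase output.

{code:java}
// Illustrative only: normalize compact JSON (binlog phase) into the snapshot-phase
// style by inserting a space after ':' and ',' that are not inside a string literal.
// This is NOT the actual flink-cdc change; names are hypothetical.
public final class JsonWhitespaceNormalizer {

    public static String normalize(String compactJson) {
        StringBuilder out = new StringBuilder(compactJson.length() + 16);
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < compactJson.length(); i++) {
            char c = compactJson.charAt(i);
            out.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;           // the previous character was a backslash
                } else if (c == '\\') {
                    escaped = true;            // the next character is escaped
                } else if (c == '"') {
                    inString = false;          // closing quote of a JSON string
                }
            } else if (c == '"') {
                inString = true;               // opening quote of a JSON string
            } else if (c == ':' || c == ',') {
                out.append(' ');               // snapshot-style spacing
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // binlog phase emits {"key":"value","key1":"value1"}
        String binlogStyle = "{\"key\":\"value\",\"key1\":\"value1\"}";
        // prints {"key": "value", "key1": "value1"}, matching the snapshot phase
        System.out.println(normalize(binlogStyle));
    }
}
{code}

The actual fix plugs a modified `JsonStringFormatter` into the binlog client's JSON parsing rather than post-processing strings; the sketch only demonstrates the intended output shape.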
Re: [PR] [FLINK-36578][mysql] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter [flink-cdc]
SML0127 commented on PR #3658: URL: https://github.com/apache/flink-cdc/pull/3658#issuecomment-2428093914 @leonardBang @PatrickRen PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]
fredia commented on code in PR #25501: URL: https://github.com/apache/flink/pull/25501#discussion_r1809809121 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java: ## @@ -67,6 +67,12 @@ default boolean isRestored() { */ InternalTimeServiceManager internalTimerServiceManager(); +/** + * Returns the internal timer service manager create by async state backend for the stream + * operator. This method returns null for non-keyed operators. + */ +InternalTimeServiceManager asyncInternalTimerServiceManager(); Review Comment: > Are there cases in which an operator needs to use async timer service and sync timer service simultaneously? No, it is up to the developer to decide which one to use. This keeps it consistent with the current `asyncStateBackend` and `KeyedStateBackend`; we will sort out the relationship between `asyncStateBackend`/`keyedStateBackend` and `asyncTimerManager`/`timerManager` in a subsequent PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
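To make the choice discussed in this thread concrete, a rough, hypothetical sketch follows. Only the two accessor names come from the quoted diff; the helper class, the `isAsyncStateEnabled` flag, and the wildcard generic are assumptions, not Flink's actual wiring.

{code:java}
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;

// Hypothetical helper, not part of PR #25501: an operator picks exactly one of the
// two timer service managers exposed by StreamOperatorStateContext, depending on
// whether it runs on the async state backend.
final class TimerServiceManagerSelector {

    static InternalTimeServiceManager<?> select(
            StreamOperatorStateContext context, boolean isAsyncStateEnabled) {
        // Either accessor may return null for non-keyed operators.
        return isAsyncStateEnabled
                ? context.asyncInternalTimerServiceManager()
                : context.internalTimerServiceManager();
    }
}
{code}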
[jira] [Updated] (FLINK-36578) Fixed bug when converting json to string
[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee SeungMin updated FLINK-36578: - Description: The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace So added modified JsonStringFormatter to flink-cdc (modified code: added whitespace between key and value, and after comma to make it work the same as the snapshot step). different logic to convert json as string in snapshot and binlog step. {code:java} protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try{ r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String){ // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } {code} ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) was: The function to convert json to string is different in the snapshot step and the binlog step. This causes the following differences. - original : \{"key": "value", "key1": "value1"} - snapshot : \{"key": "value", "key1": "value1"} - binlog : \{"key":"value","key1":"value1"} // no whitespace code {code:java} protected Object convertJson(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, "{}", (r) -> { if (data instanceof byte[]) { // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. if (((byte[]) data).length == 0) { r.deliver(column.isOptional() ? null : "{}"); } else { try{ r.deliver(JsonBinary.parseAsString((byte[]) data)); } catch (IOException e) { parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e); r.deliver(column.isOptional() ? null : "{}"); } } } else if (data instanceof String){ // The SnapshotReader sees JSON values as UTF-8 encoded strings. r.deliver(data); } }); } {code} ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381]) So added modified JsonStringFormatter (added whitespace between key and value, and after comma to make it work the same as the snapshot step). 
> Fixed bug when converting json to string > > > Key: FLINK-36578 > URL: https://issues.apache.org/jira/browse/FLINK-36578 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Lee SeungMin >Priority: Major > Labels: pull-request-available > > The function to convert json to string is different in the snapshot step and > the binlog step. > This causes the following differences. > - original : \{"key": "value", "key1": "value1"} > - snapshot : \{"key": "value", "key1": "value1"} > - binlog : \{"key":"value","key1":"value1"} // no whitespace > > So added modified JsonStringFormatter to flink-cdc > (modified code: added whitespace between key and value, and after comma to > make it work the same as the snapshot step). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36506) Remove all deprecated methods in `ColumnStats`
[ https://issues.apache.org/jira/browse/FLINK-36506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891724#comment-17891724 ] Atul Sharma commented on FLINK-36506: - Hi [~xuyangzhong], can I work on this? > Remove all deprecated methods in `ColumnStats` > -- > > Key: FLINK-36506 > URL: https://issues.apache.org/jira/browse/FLINK-36506 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36503) Remove deprecated method `FunctionDefinitionFactory#createFunctionDefinition`
[ https://issues.apache.org/jira/browse/FLINK-36503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891725#comment-17891725 ] Atul Sharma commented on FLINK-36503: - Hi [~xuyangzhong], can I work on this? > Remove deprecated method `FunctionDefinitionFactory#createFunctionDefinition` > - > > Key: FLINK-36503 > URL: https://issues.apache.org/jira/browse/FLINK-36503 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36521][runtime] Introduce TtlAwareSerializer and TtlAwareSerializerSnapshot [flink]
xiangyuf opened a new pull request, #25554: URL: https://github.com/apache/flink/pull/25554 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] FLINK-36506: Remove all deprecated methods in `ColumnStats` [flink]
atu-sharm opened a new pull request, #2: URL: https://github.com/apache/flink/pull/2 ## What is the purpose of the change Removing all deprecated methods as per [FLINK-36506](https://issues.apache.org/jira/browse/FLINK-36506) in `ColumnStats` *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36506) Remove all deprecated methods in `ColumnStats`
[ https://issues.apache.org/jira/browse/FLINK-36506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36506: --- Labels: pull-request-available (was: ) > Remove all deprecated methods in `ColumnStats` > -- > > Key: FLINK-36506 > URL: https://issues.apache.org/jira/browse/FLINK-36506 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36521) Introduce TtlAwareSerializer to resolve the compatibility check between ttlSerializer and original serializer
[ https://issues.apache.org/jira/browse/FLINK-36521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36521: --- Labels: pull-request-available (was: ) > Introduce TtlAwareSerializer to resolve the compatibility check between > ttlSerializer and original serializer > -- > > Key: FLINK-36521 > URL: https://issues.apache.org/jira/browse/FLINK-36521 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: xiangyu feng >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
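As a rough illustration of the idea in this ticket, here is a heavily simplified, hypothetical sketch. It does not use Flink's real `TypeSerializer`/`TypeSerializerSnapshot` API and every name is made up; it only models the notion of carrying an "is this serializer TTL-wrapped?" flag next to the wrapped serializer, so a restore-time check between a TTL serializer and the original serializer can be resolved as migration instead of being rejected outright.

{code:java}
import java.util.Objects;

// Simplified, illustrative sketch only. A real TtlAwareSerializer would be a
// TypeSerializer participating in snapshot compatibility resolution; this class
// only models the TTL flag and the resulting compatibility decision.
final class TtlAwareSerializerSketch<S> {

    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    private final boolean ttlEnabled; // true if the delegate writes a TTL timestamp per value
    private final S delegate;         // the wrapped serializer (plain or TTL)

    TtlAwareSerializerSketch(boolean ttlEnabled, S delegate) {
        this.ttlEnabled = ttlEnabled;
        this.delegate = Objects.requireNonNull(delegate);
    }

    Compatibility resolveCompatibility(TtlAwareSerializerSketch<S> restoredFrom) {
        if (!delegate.getClass().equals(restoredFrom.delegate.getClass())) {
            // Different underlying serializer classes; a real implementation would
            // delegate to their snapshots, this sketch simply rejects.
            return Compatibility.INCOMPATIBLE;
        }
        if (ttlEnabled == restoredFrom.ttlEnabled) {
            return Compatibility.COMPATIBLE_AS_IS;
        }
        // TTL was enabled or disabled between runs: the stored bytes differ only by the
        // TTL timestamp, so state can be migrated instead of failing the restore.
        return Compatibility.COMPATIBLE_AFTER_MIGRATION;
    }
}
{code}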
Re: [PR] [FLINK-36521][runtime] Introduce TtlAwareSerializer and TtlAwareSerializerSnapshot [flink]
flinkbot commented on PR #25554: URL: https://github.com/apache/flink/pull/25554#issuecomment-2428292508 ## CI report: * 98ea2df3c759b9d866b0e0327ec6a06883ff6dc9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] FLINK-36506: Remove all deprecated methods in `ColumnStats` [flink]
flinkbot commented on PR #2: URL: https://github.com/apache/flink/pull/2#issuecomment-2428292787 ## CI report: * 1656356a2b283dbffa1dcec3685025dd32b24cb8 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode [flink]
jnh5y commented on code in PR #24699: URL: https://github.com/apache/flink/pull/24699#discussion_r1809434479 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java: ## @@ -567,7 +912,7 @@ void testUserDefinedFunctions() { "SELECT *\n" + "FROM MyTable\n" + "MATCH_RECOGNIZE (\n" -+ " ORDER BY proctime\n" ++ " ORDER BY ts\n" Review Comment: As a question, do we need to change all of the tests from using PROCTIME() to ts? E.g., can the existing behavior remain? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34825) [flink-cdc-pipeline-connectors] Add Implementation of DataSource in MongoDB
[ https://issues.apache.org/jira/browse/FLINK-34825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891714#comment-17891714 ] JunboWang commented on FLINK-34825: --- Interested in this. Please assign it to me. Thanks! > [flink-cdc-pipeline-connectors] Add Implementation of DataSource in MongoDB > --- > > Key: FLINK-34825 > URL: https://issues.apache.org/jira/browse/FLINK-34825 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Flink CDC Issue Import >Priority: Major > Labels: github-import > > ### Search before asking > - [X] I searched in the > [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found > nothing similar. > ### Motivation > After https://github.com/ververica/flink-cdc-connectors/pull/2638 merged, we > can try to add implementation in MongoDB to simulate enterprise scenarios. > You may need to wait for > https://github.com/ververica/flink-cdc-connectors/issues/2642 > https://github.com/ververica/flink-cdc-connectors/issues/2644 completed. > ### Solution > _No response_ > ### Alternatives > _No response_ > ### Anything else? > _No response_ > ### Are you willing to submit a PR? > - [ ] I'm willing to submit a PR! > Imported from GitHub > Url: https://github.com/apache/flink-cdc/issues/2648 > Created by: [lvyanquan|https://github.com/lvyanquan] > Labels: enhancement, task, 【3.0】, > Assignee: [Jiabao-Sun|https://github.com/Jiabao-Sun] > Created at: Tue Nov 07 11:18:18 CST 2023 > State: open -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] FLINK-36506: Remove all deprecated methods in `ColumnStats` [flink]
atu-sharm commented on PR #2: URL: https://github.com/apache/flink/pull/2#issuecomment-2428292821 Hi @xuyangzhong, can you please review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]
yunfengzhou-hub commented on PR #25501: URL: https://github.com/apache/flink/pull/25501#issuecomment-2428319498 Thanks for the update. The runtime part LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-36569) flink kafka connector do not close kafka produer when it checkpoint success
[ https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891735#comment-17891735 ] Jake.zhang edited comment on FLINK-36569 at 10/22/24 6:34 AM: --

Added a scheduled close service in the `org.apache.flink.connector.kafka.sink.KafkaWriter` constructor that keeps only the producers of the 5 most recent checkpoints. It works, though you still need to wait for the Kafka transaction timeout.

{code:java}
initFlinkMetrics();
// Scheduled task: check whether any checkpoint ids have producers that need to be closed
this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
this.autoCloseService.scheduleWithFixedDelay(
    () -> {
        try {
            LOG.info("last checkpoint id: {}", lastCheckpointId);
            // Keep producers up to the previous checkpoint, i.e. successCheckpointId - 1 is the newest transaction to keep
            FlinkKafkaInternalProducer flinkKafkaInternalProducer = null;
            while ((flinkKafkaInternalProducer = (FlinkKafkaInternalProducer) this.producerCloseables.peek()) != null) {
                String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                assert transactionId != null;
                String[] transactionIdArr = transactionId.split("-");
                long itemCheckpointId = Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                if (lastCheckpointId - 5 > itemCheckpointId) {
                    // Poll the producer off the queue and close it
                    try {
                        FlinkKafkaInternalProducer closeable = (FlinkKafkaInternalProducer) this.producerCloseables.poll();
                        closeable.close();
                        LOG.info("close producer transaction id: {}", closeable.getTransactionalId());
                    } catch (Exception e) {
                        LOG.warn("fkip close error", e);
                    }
                } else {
                    // Wait until the next check
                    break;
                }
            }
        } catch (Exception e) {
            LOG.warn("schedule auto close producer error", e);
        }
    },
    60, 60, TimeUnit.SECONDS);
}
{code}

was (Author: ft20082): Added a scheduled close service in `org.apache.flink.connector.kafka.sink.KafkaWriter` that keeps only the producers of the 5 most recent checkpoints. It works, though you still need to wait for the Kafka transaction timeout.

{code:java}
initFlinkMetrics();
// Scheduled task: check whether any checkpoint ids have producers that need to be closed
this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
this.autoCloseService.scheduleWithFixedDelay(
    () -> {
        try {
            LOG.info("last checkpoint id: {}", lastCheckpointId);
            // Keep producers up to the previous checkpoint, i.e. successCheckpointId - 1 is the newest transaction to keep
            FlinkKafkaInternalProducer flinkKafkaInternalProducer = null;
            while ((flinkKafkaInternalProducer = (FlinkKafkaInternalProducer) this.producerCloseables.peek()) != null) {
                String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                assert transactionId != null;
                String[] transactionIdArr = transactionId.split("-");
                long itemCheckpointId = Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                if (lastCheckpointId - 5 > itemCheckpointId) {
                    // Poll the producer off the queue and close it
                    try {
                        FlinkKafkaInternalProducer closeable = (FlinkKafkaInternalProducer) this.producerCloseables.poll();
                        closeable.close();
                        LOG.info("close producer transaction id: {}", closeable.getTransactionalId());
                    } catch (Exception e) {
                        LOG.warn("fkip close error", e);
                    }
                } else {
                    // Wait until the next check
                    break;
                }
            }
        } catch (Exception e) {
            LOG.warn("schedule auto close producer error", e);
        }
    },
    60, 60, TimeUnit.SECONDS);
}
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36569) flink kafka connector do not close kafka produer when it checkpoint success
[ https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891735#comment-17891735 ] Jake.zhang commented on FLINK-36569:

Added a scheduled close service in `org.apache.flink.connector.kafka.sink.KafkaWriter` that keeps only the producers of the 5 most recent checkpoints. It works, though you still need to wait for the Kafka transaction timeout.

{code:java}
initFlinkMetrics();
// Scheduled task: check whether any checkpoint ids have producers that need to be closed
this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
this.autoCloseService.scheduleWithFixedDelay(
    () -> {
        try {
            LOG.info("last checkpoint id: {}", lastCheckpointId);
            // Keep producers up to the previous checkpoint, i.e. successCheckpointId - 1 is the newest transaction to keep
            FlinkKafkaInternalProducer flinkKafkaInternalProducer = null;
            while ((flinkKafkaInternalProducer = (FlinkKafkaInternalProducer) this.producerCloseables.peek()) != null) {
                String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                assert transactionId != null;
                String[] transactionIdArr = transactionId.split("-");
                long itemCheckpointId = Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                if (lastCheckpointId - 5 > itemCheckpointId) {
                    // Poll the producer off the queue and close it
                    try {
                        FlinkKafkaInternalProducer closeable = (FlinkKafkaInternalProducer) this.producerCloseables.poll();
                        closeable.close();
                        LOG.info("close producer transaction id: {}", closeable.getTransactionalId());
                    } catch (Exception e) {
                        LOG.warn("fkip close error", e);
                    }
                } else {
                    // Wait until the next check
                    break;
                }
            }
        } catch (Exception e) {
            LOG.warn("schedule auto close producer error", e);
        }
    },
    60, 60, TimeUnit.SECONDS);
}
{code}

> flink kafka connector do not close kafka produer when it checkpoint success
> ---
>
> Key: FLINK-36569
> URL: https://issues.apache.org/jira/browse/FLINK-36569
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
>Affects Versions: 1.19.0, 1.20.0
> Environment: flink: 1.20
> flink kafka connector: 3.3.0-1.20
>Reporter: Jake.zhang
>Priority: Major
> Attachments: image-2024-10-18-13-31-39-253.png, image-2024-10-21-14-02-41-823.png
>
> The Flink Kafka connector does not close the FlinkKafkaInternalProducer when a checkpoint succeeds in Flink 1.20/1.19; it creates one FlinkKafkaInternalProducer per checkpoint.
>
> The FlinkKafkaInternalProducer is never closed automatically, so the number of Kafka producer network threads keeps growing.
>
> `getRecoveryProducer` is called every time because the `recyclable` object is always null, so `recyclable.ifPresent(Recyclable::close)` has no effect.
> `org.apache.flink.connector.kafka.sink.KafkaCommitter`
> {code:java}
> producer =
>         recyclable
>                 .>map(Recyclable::getObject)
>                 .orElseGet(() -> getRecoveryProducer(committable));
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);{code}
>
> !image-2024-10-21-14-02-41-823.png!
>
> !image-2024-10-18-13-31-39-253.png!
-- This message was sent by Atlassian Jira (v8.20.10#820010)