Re: [PR] [FLINK-31928][build] Upgrade okhttp3 to 4.11.0 [flink]
cy2008 closed pull request #22498: [FLINK-31928][build] Upgrade okhttp3 to 4.11.0 URL: https://github.com/apache/flink/pull/22498 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-36032) FLIP-468: Introducing StreamGraph-Based Job Submission
[ https://issues.apache.org/jira/browse/FLINK-36032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Junrui Li resolved FLINK-36032. --- Release Note: In version 2.0, we changed the default job submission behavior from submitting the job graph to submitting the stream graph. After the Job Manager receives the stream graph, it will persist it and compile it into a job graph for scheduling. Resolution: Resolved > FLIP-468: Introducing StreamGraph-Based Job Submission > -- > > Key: FLINK-36032 > URL: https://issues.apache.org/jira/browse/FLINK-36032 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination, Runtime / REST >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Fix For: 2.0.0 > > > This is the umbrella ticket for > [FLIP-468|https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36600) Implement java.sql.Wrapper interface.
Yi Cai created FLINK-36600: -- Summary: Implement java.sql.Wrapper interface. Key: FLINK-36600 URL: https://issues.apache.org/jira/browse/FLINK-36600 Project: Flink Issue Type: Improvement Components: Table SQL / JDBC Reporter: Yi Cai -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36601) Add JobID property to FlinkResultSet
Yi Cai created FLINK-36601: -- Summary: Add JobID property to FlinkResultSet Key: FLINK-36601 URL: https://issues.apache.org/jira/browse/FLINK-36601 Project: Flink Issue Type: Improvement Components: Table SQL / JDBC Reporter: Yi Cai -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35387][cdc-connector][postgres] PG CDC source support heart beat. [flink-cdc]
loserwang1024 opened a new pull request, #3667: URL: https://github.com/apache/flink-cdc/pull/3667 As described in FLINK-35387, this adds Postgres CDC support for debezium.heartbeat.action.query. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35387) PG CDC source support heart beat
[ https://issues.apache.org/jira/browse/FLINK-35387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35387: --- Labels: pull-request-available (was: ) > PG CDC source support heart beat > > > Key: FLINK-35387 > URL: https://issues.apache.org/jira/browse/FLINK-35387 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.1.0 > Reporter: Hongshun Wang > Assignee: Hongshun Wang > Priority: Major > Labels: pull-request-available > Fix For: cdc-3.3.0 > > > Although the PG CDC documentation [1] lists heartbeat.interval.ms, the option does not take effect. The reason is below. > The Debezium docs say: For the connector to detect and process events from a heartbeat table, you must add the table to the PostgreSQL publication specified by the [publication.name|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name] property. If this publication predates your Debezium deployment, the connector uses the publication as defined. If the publication is not already configured to automatically replicate changes {{FOR ALL TABLES}} in the database, you must explicitly add the heartbeat table to the publication [2]. > Thus, if you want to use heartbeat in CDC: > 1. add a heartbeat table to the publication: ALTER PUBLICATION <publication_name> ADD TABLE <heartbeat_table>; > 2. set heartbeatInterval > 3. add debezium.[{{heartbeat.action.query}}|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query] [3] > > However, when I use it in CDC, an exception occurs: > {code:java} > Caused by: java.lang.NullPointerException > at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55) > at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127) > at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94){code} > (screenshot omitted)
> > It seems CDC don't add a HeartbeatConnectionProvider when configure > PostgresEventDispatcher: > {code:java} > //org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure > this.postgresDispatcher = > new PostgresEventDispatcher<>( > dbzConfig, > topicSelector, > schema, > queue, > dbzConfig.getTableFilters().dataCollectionFilter(), > DataChangeEvent::new, > metadataProvider, > schemaNameAdjuster); {code} > in debezium, when PostgresConnectorTask start, it will do it > {code:java} > //io.debezium.connector.postgresql.PostgresConnectorTask#start > final PostgresEventDispatcher dispatcher = new > PostgresEventDispatcher<>( > connectorConfig, > topicNamingStrategy, > schema, > queue, > connectorConfig.getTableFilters().dataCollectionFilter(), > DataChangeEvent::new, > PostgresChangeRecordEmitter::updateSchema, > metadataProvider, > connectorConfig.createHeartbeat( > topicNamingStrategy, > schemaNameAdjuster, > () -> new > PostgresConnection(connectorConfig.getJdbcConfig(), > PostgresConnection.CONNECTION_GENERAL), > exception -> { > String sqlErrorId = exception.getSQLState(); > switch (sqlErrorId) { > case "57P01": > // Postgres error admin_shutdown, see > https://www.postgresql.org/docs/12/errcodes-appendix.html > throw new DebeziumException("Could > not execute heartbeat action query (Error: " + sqlErrorId + ")", exception); > case "57P03": > // Postgres error cannot_connect_now, > see https://www.postgresql.org/docs/12/errcodes-appendix.html > throw new RetriableException("Could > not execute heartbeat action query (Error: " + sqlErrorId + ")", exception); > default: > break; >
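To make the heartbeat setup described in the issue above concrete, here is a minimal sketch in Flink SQL. The publication name (`dbz_publication`), the heartbeat table (`public.flink_heartbeat`) and its `ts` column, and all connection settings are illustrative assumptions rather than values from the issue; `heartbeat.interval.ms` and the `debezium.*` pass-through options are the connector knobs being discussed.

```sql
-- Illustrative sketch only: publication, table, column, and connection values are assumptions.
-- Step 1: add a heartbeat table to the publication used by the connector.
ALTER PUBLICATION dbz_publication ADD TABLE public.flink_heartbeat;

-- Steps 2 and 3: enable the heartbeat interval and the Debezium heartbeat action query
-- on the postgres-cdc source.
CREATE TABLE orders_cdc (
  id STRING,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'secret',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'orders',
  'slot.name' = 'flink',
  'heartbeat.interval.ms' = '30000',
  'debezium.heartbeat.action.query' = 'INSERT INTO public.flink_heartbeat (ts) VALUES (now())'
);
```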
Re: [PR] [FLINK-35387][cdc-connector][postgres] PG CDC source support heart beat. [flink-cdc]
loserwang1024 commented on PR #3667: URL: https://github.com/apache/flink-cdc/pull/3667#issuecomment-2436996841 @leonardBang @ruanhang1993 , CC -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34688][cdc-connector] CDC framework split snapshot chunks asynchronously [flink-cdc]
loserwang1024 commented on PR #3510: URL: https://github.com/apache/flink-cdc/pull/3510#issuecomment-2436894467 > @loserwang1024 Please rebase to master and solve conflicts. @GOODBOY008 Done, and all the tests have passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
github-actions[bot] commented on PR #3572: URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2436543491 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36520] CDC Supports MySQL 8.x and above [flink-cdc]
yuxiqian opened a new pull request, #3666: URL: https://github.com/apache/flink-cdc/pull/3666 This closes FLINK-36520, FLINK-36590. Currently, trying to use MySQL CDC with MySQL 8.4 would cause the following exception: ``` Caused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=customer.customers, splitId='customer.customers:0', splitKeyType=[`id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} error due to org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured. ``` The reason is MySQL has changed some questionable words like `MASTER` and `SLAVE` to more appropriate ones, so some commands are no longer compatible. Legacy MySqlSource (based on Debezium) wasn't fixed since it's not possible to upgrade Debezium dependency for now, and it has been deprecated for a long time anyway. Marked this PR as a draft, needs more discussion and testing over this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
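For context on the incompatibility described above (a sketch, not part of the PR itself): MySQL renamed several replication-related statements, so the statement the connector issues no longer exists on 8.4. To the best of my knowledge the mapping is as follows; verify against the MySQL release notes for your exact version.

```sql
-- Up to MySQL 8.3 (removed in 8.4): the statement the connector currently runs
SHOW MASTER STATUS;

-- MySQL 8.2 and later (the only accepted form from 8.4 on): renamed equivalent
SHOW BINARY LOG STATUS;

-- Related rename, listed for completeness (MySQL 8.0.22+)
SHOW REPLICA STATUS;   -- replaces SHOW SLAVE STATUS
```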
[jira] [Created] (FLINK-36599) Correct the name of the config which enables response codes group
Alan Zhang created FLINK-36599: -- Summary: Correct the name of the config which enables response codes group Key: FLINK-36599 URL: https://issues.apache.org/jira/browse/FLINK-36599 Project: Flink Issue Type: Improvement Reporter: Alan Zhang In the "Metrics and Logging" doc, it doesn't use the full name of the operator config(aka, add operator config prefix "kubernetes.operator."): [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging.] {code:java} It’s possible to publish additional metrics by Http response code received from API server by setting kubernetes.client.metrics.http.response.code.groups.enabled to true . {code} This is confusing, we should make it be consistent with the naming in the "Configuration" doc: [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#system-metrics-configuration] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36599) Correct the name of the config which enables response codes group
[ https://issues.apache.org/jira/browse/FLINK-36599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alan Zhang updated FLINK-36599: --- Description: In the "Metrics and Logging" doc, it doesn't use the full name of the operator config(aka, add operator config prefix "kubernetes.operator."): [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging.] {code:java} It’s possible to publish additional metrics by Http response code received from API server by setting kubernetes.client.metrics.http.response.code.groups.enabled to true . {code} This is confusing, we should make it be consistent with the naming in the "Configuration" doc: [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#system-metrics-configuration] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36542) Bump spotless version to 2.35.0
[ https://issues.apache.org/jira/browse/FLINK-36542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenjun Ruan updated FLINK-36542: Summary: Bump spotless version to 2.35.0 (was: Enable upToDateChecking to speed up the spotless) > Bump spotless version to 2.35.0 > --- > > Key: FLINK-36542 > URL: https://issues.apache.org/jira/browse/FLINK-36542 > Project: Flink > Issue Type: Improvement > Components: Build System > Reporter: Wenjun Ruan > Assignee: Wenjun Ruan > Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > Attachments: image-2024-10-15-22-01-25-049.png > > > I use `mvn spotless:apply` to format the code and find it takes about 01:13 min; this is too slow, and every run of `mvn spotless:apply` costs about a minute. > I hope we can enable the upToDateChecking setting to speed up spotless: > [https://github.com/diffplug/spotless/tree/main/plugin-maven#incremental-up-to-date-checking-and-formatting] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36599] Correct the metric config name in the doc to reduce confusing [flink-kubernetes-operator]
alnzng opened a new pull request, #907: URL: https://github.com/apache/flink-kubernetes-operator/pull/907 ## What is the purpose of the change This PR is for fixing the name of the metric config which enables the response codes group in "[Metrics and Logging](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging/#kubernetes-client-metrics-by-http-response-code)" doc, to make the naming be consistent with information in "[Configuration](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#system-metrics-configuration)" doc. ## Brief change log - Add operator config prefix `kubernetes.operator.` to make it show the full name of the config ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (no) - Core observer or reconciler logic that is regularly executed: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
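For reference, the fully prefixed key the doc fix above points to looks like this when set in the operator configuration. The key name is assembled from the `kubernetes.operator.` prefix and the option named in the issue; the `true` value simply mirrors the doc snippet and is illustrative.

```
kubernetes.operator.kubernetes.client.metrics.http.response.code.groups.enabled: true
```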
[jira] [Updated] (FLINK-36599) Correct the name of the config which enables response codes group
[ https://issues.apache.org/jira/browse/FLINK-36599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36599: --- Labels: pull-request-available (was: ) > Correct the name of the config which enables response codes group > - > > Key: FLINK-36599 > URL: https://issues.apache.org/jira/browse/FLINK-36599 > Project: Flink > Issue Type: Improvement >Reporter: Alan Zhang >Priority: Minor > Labels: pull-request-available > > In the "Metrics and Logging" doc, it doesn't use the full name of the > operator config(aka, add operator config prefix "kubernetes.operator."): > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging.] > {code:java} > It’s possible to publish additional metrics by Http response code received > from API server by setting > kubernetes.client.metrics.http.response.code.groups.enabled to true . {code} > This is confusing, we should make it be consistent with the naming in the > "Configuration" doc: > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#system-metrics-configuration] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36599] Correct the metric config name in the doc to reduce confusing [flink-kubernetes-operator]
alnzng commented on PR #907: URL: https://github.com/apache/flink-kubernetes-operator/pull/907#issuecomment-2436959770 @gyfora A minor improvement for doc, would you mind helping take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36429) Enhancing Flink History Server File Storage and Retrieval with RocksDB
[ https://issues.apache.org/jira/browse/FLINK-36429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892488#comment-17892488 ] Eaugene Thomas commented on FLINK-36429: Hi , If anyone is not working on , I am happy to take this up ? > Enhancing Flink History Server File Storage and Retrieval with RocksDB > -- > > Key: FLINK-36429 > URL: https://issues.apache.org/jira/browse/FLINK-36429 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.20.0 >Reporter: Xiaowen Sun >Priority: Major > Labels: historyserver > Original Estimate: 2,016h > Remaining Estimate: 2,016h > > Currently, when a Flink job finishes, it writes an archive as a single file > that maps paths to JSON files. Flink History Server (FHS) job archives are > pulled locally where the FHS is running on, and this process creates a local > directory that expands based on the contents of the single archive file. > Because of how the FHS stores the files, there are a large number of > directories created in the local file system. This system can become > inefficient and slow as the volume of job archives increases, creating > bottlenecks in job data navigation and retrieval. > To illustrate the problem of inode usage, let’s consider a scenario where > there are 5000 subtasks. Each subtask creates its own directory, and within > each subtask directory, there are additional directories that might store > only a single file. This structure rapidly increases the number of inodes > consumed. > Integrating RocksDB, a high-performance embedded database for key-value data, > aims to resolve these issues by offering faster data access and better > scalability. This integration is expected to significantly enhance the > operational efficiency of FHS by allowing faster data retrieval and enabling > a larger cache on local Kubernetes deployments, thus overcoming inode > limitations -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36032) FLIP-468: Introducing StreamGraph-Based Job Submission
[ https://issues.apache.org/jira/browse/FLINK-36032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Junrui Li updated FLINK-36032: -- Release Note: In 2.0, we changed the default job submission behavior from submitting the job graph to submitting the stream graph. After the Job Manager receives the stream graph, it will persist it and compile it into a job graph for scheduling. (was: In version 2.0, we changed the default job submission behavior from submitting the job graph to submitting the stream graph. After the Job Manager receives the stream graph, it will persist it and compile it into a job graph for scheduling.) > FLIP-468: Introducing StreamGraph-Based Job Submission > -- > > Key: FLINK-36032 > URL: https://issues.apache.org/jira/browse/FLINK-36032 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination, Runtime / REST >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Fix For: 2.0.0 > > > This is the umbrella ticket for > [FLIP-468|https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] FLINK-36506: Remove all deprecated methods in `ColumnStats` [flink]
atu-sharm commented on PR #2: URL: https://github.com/apache/flink/pull/2#issuecomment-2436863036 Sure @tinaselenge will do.. thanks!! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [docs] Add 'scan.incremental.snapshot.backfill.skip' to docs. [flink-cdc]
loserwang1024 commented on PR #3030: URL: https://github.com/apache/flink-cdc/pull/3030#issuecomment-2437022722 I will rebase it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36584) PostgresIncrementalSource is not exiting the flink execution when StartupOptions is snapshot and create multiple replication slots
[ https://issues.apache.org/jira/browse/FLINK-36584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892678#comment-17892678 ] Hongshun Wang commented on FLINK-36584: --- # Issue-1: yes, you can see the argue of snapshot mode design: [https://github.com/apache/flink-cdc/issues/2867.] "If we only include snapshot split , cannot get a consistency snapshot at any point, unless any operation is not allowed during snapshot splits read. For example, we have three snapshot splits. During the first split and third split, some update operations are applied. When read third snapshot split, it can read these operations to it, but the first splits cannot.{*}Make an extreme assumption: after the first split finished, we stop the job, then restart it from the savepoint in tomorrow. When the job finished, we get partial data in first split which is today's version , while get partial data in third split which is tomorrow's version."{*} # Issue-2: because we need to read backfill log which needed a slot, and it will be removed if snapshot split is finished. You can also skip backfill log which option `scan.incremental.snapshot.backfill.skip'`. > PostgresIncrementalSource is not exiting the flink execution when > StartupOptions is snapshot and create multiple replication slots > -- > > Key: FLINK-36584 > URL: https://issues.apache.org/jira/browse/FLINK-36584 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: 3.0.0 >Reporter: Anil Dasari >Priority: Major > > Issue-1. PostgresIncrementalSource is not exiting the Flink execution when > StartupOptions is snapshot. > > Postgres cdc module is using HybridSplitAssigner for batch scan and is > [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128] > still trying to create streaming split when snapshot splits are completed. > > Issue-2. When source parallelism is > 1 i.e 2, PostgresIncrementalSource is > creating multiple replication slots. > > postgres logs: > {code:java} > flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: logical > decoding found consistent point at 0/1690B28 > flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] DETAIL: There are no > running transactions. > flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: exported > logical decoding snapshot: "0006-0003-1" with 0 transaction IDs > flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: logical > decoding found consistent point at 0/1690BF8 > flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] DETAIL: There are no > running transactions. > flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: exported > logical decoding snapshot: "0008-0003-1" with 0 transaction IDs > flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] LOG: logical > decoding found consistent point at 0/1690C30 > flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] DETAIL: There are no > running transactions. > flink-postgres-1 | 2024-10-22 18:56:58.267 UTC [52] LOG: exported > logical decoding snapshot: "0009-0003-1" with 0 transaction IDs > flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting > logical decoding for slot "flinkpostgres_0" > flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] DETAIL: Streaming > transactions committing after 0/1690C30, reading WAL from 0/1690BF8. 
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] LOG: logical > decoding found consistent point at 0/1690BF8 > flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] DETAIL: There are no > running transactions. > flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication > slot "flinkpostgres_1" does not exist > flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] STATEMENT: select > pg_drop_replication_slot('flinkpostgres_1') > flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] LOG: starting > logical decoding for slot "flinkpostgres_0" > flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] DETAIL: Streaming > transactions committing after 0/1690C30, reading WAL from 0/1690C30. > flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] LOG: logical > decoding found consistent point at 0/1690C30 > flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] DETAIL: There are no > running transactions. > flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication > slot "flinkpostgres_0" does not exist > flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] STATEMENT: select > pg_dro
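A hedged sketch of the mitigation mentioned in Issue-2 of the comment above: a postgres-cdc source that runs in snapshot startup mode and skips the backfill log, so no replication slot is needed for the backfill read. The option name is taken from the docs PR referenced earlier in this digest; every connection value below is a placeholder, and skipping backfill trades the exactly-once guarantee of the snapshot phase for at-least-once semantics.

```sql
-- Placeholder values throughout; the two scan.* options are the point of this sketch.
CREATE TABLE orders_snapshot (
  id STRING,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'secret',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'orders',
  'slot.name' = 'flink',
  'decoding.plugin.name' = 'pgoutput',
  'scan.startup.mode' = 'snapshot',
  'scan.incremental.snapshot.backfill.skip' = 'true'
);
```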
Re: [PR] [FLINK-36542] Bump spotless-maven-plugin version to 2.35.0 [flink]
ruanwenjun commented on PR #25525: URL: https://github.com/apache/flink/pull/25525#issuecomment-2435229437 @ferenc-csaky Done, updated the PR title and jira. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36002] Add CompiledPlan annotations to BatchExecMatch [flink]
dawidwys commented on code in PR #25171: URL: https://github.com/apache/flink/pull/25171#discussion_r1814803784 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java: ## @@ -86,7 +89,8 @@ public abstract class CommonExecMatch extends ExecNodeBase public static final String MATCH_TRANSFORMATION = "match"; -private final MatchSpec matchSpec; +@JsonProperty(FIELD_NAME_MATCH_SPEC) Review Comment: This introduces circular dependency. You use a constant from a child class in the parent class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36524][pipeline-connector][paimon] bump version of Paimon to 0.9.0 [flink-cdc]
lvyanquan commented on code in PR #3644: URL: https://github.com/apache/flink-cdc/pull/3644#discussion_r1814860259 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java: ## @@ -44,8 +48,34 @@ public PaimonCommitter(Options catalogOptions, String commitUser) { storeMultiCommitter = new StoreMultiCommitter( () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions), -commitUser, -null); +new org.apache.paimon.flink.sink.Committer.Context() { Review Comment: Thanks for your advice, addressed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36280) Snapshot scan mode do not support batch execution mode, and not as expected in mode streaming
[ https://issues.apache.org/jira/browse/FLINK-36280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HunterXHunter updated FLINK-36280: -- Attachment: image-2024-10-24-14-17-07-675.png > Snapshot scan mode do not support batch execution mode, and not as expected > in mode streaming > - > > Key: FLINK-36280 > URL: https://issues.apache.org/jira/browse/FLINK-36280 > Project: Flink > Issue Type: Bug > Reporter: HunterXHunter > Priority: Major > Attachments: image-2024-10-24-14-16-17-434.png, image-2024-10-24-14-17-07-675.png > > > [https://github.com/apache/flink-cdc/pull/2901] > > I've encountered a few problems. > Premise: > - Only 1 record in my MySQL table. > - Flink 1.8.1 > - Flink CDC 3.1 & 3.2 > {code:java} > // code placeholder > CREATE TEMPORARY TABLE mysql_cdc ( > id STRING, > name string, > dt string, > PRIMARY KEY(id) NOT ENFORCED > ) WITH ( > 'connector' = 'mysql-cdc', > 'hostname' = 'localhost', > 'port' = '3306', > 'username' = 'root', > 'password' = '123456789', > 'database-name' = 'tt', > 'table-name' = 'mysql_cdc_source', > 'scan.startup.mode'='snapshot'); {code} > > * `BATCH` execution mode is not supported? Setting execution.runtime-mode=BATCH leads to the following error: > {code:java} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Querying a table in batch mode is currently only possible for INSERT-only table sources. But the source for table 'default_catalog.default_database.mysql_cdc' produces other changelog messages than just INSERT. {code} > * When I set execution.runtime-mode=STREAMING and checkpointing is disabled, the job never finishes. > # > ## if execution.checkpointing.interval=60s is set, the job finishes after the checkpoint has triggered (60s) > ## if execution.checkpointing.interval=5s is set, the job finishes soon (as expected). > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]
AHeise commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1815533794 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java: ## @@ -0,0 +1,97 @@ +package org.apache.flink.connector.kafka.lineage.facets; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.regex.Pattern; + +/** Facet containing all information related to sources and sinks on Kafka. */ +public class KafkaDatasetFacet implements LineageDatasetFacet { + +public static final String KAFKA_FACET_NAME = "kafka"; + +public final Properties properties; +public final TypeInformation typeInformation; +public final KafkaDatasetIdentifier topicIdentifier; + +public KafkaDatasetFacet( +KafkaDatasetIdentifier topicIdentifier, +Properties properties, +TypeInformation typeInformation) { +this.topicIdentifier = topicIdentifier; +this.properties = properties; +this.typeInformation = typeInformation; +} + +public void addProperties(Properties properties) { +this.properties.putAll(properties); +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +KafkaDatasetFacet that = (KafkaDatasetFacet) o; +return Objects.equals(properties, that.properties) +&& Objects.equals(typeInformation, that.typeInformation) +&& Objects.equals(topicIdentifier, that.topicIdentifier); +} + +@Override +public int hashCode() { +return Objects.hash(properties, typeInformation, topicIdentifier); +} + +@Override +public String name() { +return KAFKA_FACET_NAME; +} + +/** + * Record class to contain topics' identifier information which can be either a list of topics + * or a topic pattern. + */ +public static class KafkaDatasetIdentifier { +public final List topics; +public final Pattern topicPattern; + +public KafkaDatasetIdentifier(List fixedTopics, Pattern topicPattern) { +this.topics = fixedTopics; +this.topicPattern = topicPattern; +} + +public static KafkaDatasetIdentifier of(Pattern pattern) { +return new KafkaDatasetIdentifier(Collections.emptyList(), pattern); +} + +public static KafkaDatasetIdentifier of(List fixedTopics) { Review Comment: ```suggestion public static KafkaDatasetIdentifier ofTopics(List fixedTopics) { ``` ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java: ## @@ -0,0 +1,97 @@ +package org.apache.flink.connector.kafka.lineage.facets; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.regex.Pattern; + +/** Facet containing all information related to sources and sinks on Kafka. */ +public class KafkaDatasetFacet implements LineageDatasetFacet { + +public static final String KAFKA_FACET_NAME = "kafka"; + +public final Properties properties; Review Comment: We usually avoid public and rather use the full jazz. It just makes it easier to later add more validation or defensive copies when needed. 
## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java: ## @@ -0,0 +1,16 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet; + +import java.util.Optional; + +/** Contains method to extract {@link KafkaDatasetFacet}. */ +public interface KafkaDatasetFacetProvider { + +/** + * List of lineage dataset facets. Review Comment: Adjust comment. ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java: ## @@ -0,0 +1,97 @@ +package org.apache.flink.connector.kafka.lineage.facets; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.regex.Pattern; + +/** Facet containing all information related to sources and sinks on Kafka. */ +public class KafkaDatasetFacet implements LineageDatasetFacet { + +public static final String KAFKA_FACE
Re: [PR] [FLINK-36542] Bump spotless-maven-plugin version to 2.35.0 [flink]
ferenc-csaky commented on PR #25525: URL: https://github.com/apache/flink/pull/25525#issuecomment-2436340186 @ruanwenjun can you pls. rebase on top of the latest master and force-push again? That wikipedia example got broken a couple days ago, but since then it was removed, so it does not block the CI anymore. I know it is not related to this change at all, but in general it is a good practice to always have a green CI. Sorry about the back and forth. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36077][Connectors/Google PubSub] Implement table api support for SinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #30: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/30#issuecomment-2435460105 @snuyanzin Could you please review or nominate a reviewer please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36512) Make rescale trigger based on failed checkpoints depend on the cause
[ https://issues.apache.org/jira/browse/FLINK-36512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892485#comment-17892485 ] Prashant Bhardwaj commented on FLINK-36512: --- [~mapohl] If nobody is working on it, can I pick this up? Happy to discuss this. > Make rescale trigger based on failed checkpoints depend on the cause > > > Key: FLINK-36512 > URL: https://issues.apache.org/jira/browse/FLINK-36512 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 2.0.0 >Reporter: Matthias Pohl >Priority: Major > > [FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler] > introduced rescale on checkpoints. The trigger logic is also initiated for > failed checkpoints (after a counter reached a configurable limit). > The issue here is that we might end up considering failed checkpoints which > we actually don't want to care about (e.g. checkpoint failures due to not all > tasks running, yet). Instead, we should start considering checkpoints only if > the job started running to avoid unnecessary (premature) rescale decisions. > We already have logic like that in place in the > [CheckpointCoordinator|https://github.com/apache/flink/blob/8be94e6663d8ac6e3d74bf4cd5f540cc96c8289e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L217] > which we might want to use here as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35401][Connectors/Sqs] Add Sqs Table API connector [flink-connector-aws]
vahmed-hamdy commented on PR #155: URL: https://github.com/apache/flink-connector-aws/pull/155#issuecomment-2434760164 @hlteoh37 gentle reminder if you have any other feedback -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36491][TABLE API] Remove the deprecated method in DataStreamSinkProvider [flink]
tinaselenge commented on PR #25564: URL: https://github.com/apache/flink/pull/25564#issuecomment-2434605355 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]
pnowojski commented on code in PR #25539: URL: https://github.com/apache/flink/pull/25539#discussion_r1814885284 ## flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java: ## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.otel; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.LogicalScopeProvider; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.metrics.util.TestHistogram; +import org.apache.flink.util.TestLoggerExtension; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OpenTelemetryMetricReporter}. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryMetricReporterITCase extends OpenTelemetryTestBase { + +private static final long TIME_MS = 1234; + +private OpenTelemetryMetricReporter reporter; +private final Histogram histogram = new TestHistogram(); + +@BeforeEach +public void setUpEach() { +reporter = +new OpenTelemetryMetricReporter( +Clock.fixed(Instant.ofEpochMilli(TIME_MS), Clock.systemUTC().getZone())); +} + +@Test +public void testReport() throws Exception { +MetricConfig metricConfig = createMetricConfig(); +MetricGroup group = new TestMetricGroup(); + +reporter.open(metricConfig); + +SimpleCounter counter = new SimpleCounter(); +reporter.notifyOfAddedMetric(counter, "foo.counter", group); + +Gauge gauge = () -> 123.456d; +reporter.notifyOfAddedMetric(gauge, "foo.gauge", group); + +reporter.report(); + +MeterView meter = new MeterView(counter); +reporter.notifyOfAddedMetric(meter, "foo.meter", group); + +reporter.notifyOfAddedMetric(histogram, "foo.histogram", group); + +reporter.report(); +reporter.close(); + +eventuallyConsumeJson( +(json) -> { +JsonNode scopeMetrics = + json.findPath("resourceMetrics").findPath("scopeMetrics"); + assertThat(scopeMetrics.findPath("scope").findPath("name").asText()) +.isEqualTo("io.confluent.flink.common.metrics"); +JsonNode metrics = scopeMetrics.findPath("metrics"); + +List metricNames = extractMetricNames(json); +assertThat(metricNames) +.contains( +"flink.logical.scope.foo.counter", +"flink.logical.scope.foo.gauge", +"flink.logical.scope.foo.meter.count", +"flink.logical.scope.foo.meter.rate", +"flink.logical.scope.foo.histogram"); + + metrics.forEach(OpenTelemetryMetricReporterITCase::assertMetrics); +}); +} + +private static void assertMetrics(JsonNode metric) { +String name = metric.findPath("name").asText(); +if (name.equals("flink.logical.scope.foo.counter")) { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()).isEqualTo(0); +} else if (name.equals("flink.logical.scope.foo.gauge")) { + assertThat(metric.at("/gauge/dataPoints").findPath("asDouble").asDouble()) +.isCloseTo(123.456, Percentage.withPercentage(1));
Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]
pnowojski merged PR #25539: URL: https://github.com/apache/flink/pull/25539 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent/RenameColunEvent [flink-cdc]
qg-lin commented on code in PR #3622: URL: https://github.com/apache/flink-cdc/pull/3622#discussion_r1815121775 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java: ## @@ -50,6 +50,7 @@ public class ValuesDataSourceHelper { */ public enum EventSetId { SINGLE_SPLIT_SINGLE_TABLE, +SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE, Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]
pnowojski commented on code in PR #25539: URL: https://github.com/apache/flink/pull/25539#discussion_r1814845364 ## flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java: ## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.otel; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.LogicalScopeProvider; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.metrics.util.TestHistogram; +import org.apache.flink.util.TestLoggerExtension; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OpenTelemetryMetricReporter}. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryMetricReporterITCase extends OpenTelemetryTestBase { + +private static final long TIME_MS = 1234; + +private OpenTelemetryMetricReporter reporter; +private final Histogram histogram = new TestHistogram(); + +@BeforeEach +public void setUpEach() { +reporter = +new OpenTelemetryMetricReporter( +Clock.fixed(Instant.ofEpochMilli(TIME_MS), Clock.systemUTC().getZone())); +} + +@Test +public void testReport() throws Exception { +MetricConfig metricConfig = createMetricConfig(); +MetricGroup group = new TestMetricGroup(); + +reporter.open(metricConfig); + +SimpleCounter counter = new SimpleCounter(); +reporter.notifyOfAddedMetric(counter, "foo.counter", group); + +Gauge gauge = () -> 123.456d; +reporter.notifyOfAddedMetric(gauge, "foo.gauge", group); + +reporter.report(); + +MeterView meter = new MeterView(counter); +reporter.notifyOfAddedMetric(meter, "foo.meter", group); + +reporter.notifyOfAddedMetric(histogram, "foo.histogram", group); + +reporter.report(); +reporter.close(); + +eventuallyConsumeJson( +(json) -> { +JsonNode scopeMetrics = + json.findPath("resourceMetrics").findPath("scopeMetrics"); + assertThat(scopeMetrics.findPath("scope").findPath("name").asText()) +.isEqualTo("io.confluent.flink.common.metrics"); +JsonNode metrics = scopeMetrics.findPath("metrics"); + +List metricNames = extractMetricNames(json); +assertThat(metricNames) +.contains( +"flink.logical.scope.foo.counter", +"flink.logical.scope.foo.gauge", +"flink.logical.scope.foo.meter.count", +"flink.logical.scope.foo.meter.rate", +"flink.logical.scope.foo.histogram"); + + metrics.forEach(OpenTelemetryMetricReporterITCase::assertMetrics); +}); +} + +private static void assertMetrics(JsonNode metric) { +String name = metric.findPath("name").asText(); +if (name.equals("flink.logical.scope.foo.counter")) { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()).isEqualTo(0); +} else if (name.equals("flink.logical.scope.foo.gauge")) { + assertThat(metric.at("/gauge/dataPoints").findPath("asDouble").asDouble()) +.isCloseTo(123.456, Percentage.withPercentage(1));
Re: [PR] [FLINK-36592][state/forst] Support file cache for ForStStateBackend [flink]
fredia commented on code in PR #25561: URL: https://github.com/apache/flink/pull/25561#discussion_r1814198955 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ## @@ -46,21 +54,44 @@ public class ForStFlinkFileSystem extends FileSystem { // TODO: make it configurable private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32; +private static final long SST_FILE_SIZE = 1024 * 1024 * 64; + private static final Map remoteLocalMapping = new ConcurrentHashMap<>(); private static final Function miscFileFilter = s -> !s.endsWith(".sst"); +private static Path cacheBase; +private static long cacheCapacity = Long.MAX_VALUE; private final FileSystem localFS; private final FileSystem delegateFS; private final String remoteBase; private final Function localFileFilter; private final String localBase; +@Nullable private final FileBasedCache fileBasedCache; -public ForStFlinkFileSystem(FileSystem delegateFS, String remoteBase, String localBase) { +public ForStFlinkFileSystem( +FileSystem delegateFS, +String remoteBase, +String localBase, +@Nullable FileBasedCache fileBasedCache) { this.localFS = FileSystem.getLocalFileSystem(); this.delegateFS = delegateFS; this.localFileFilter = miscFileFilter; this.remoteBase = remoteBase; this.localBase = localBase; +this.fileBasedCache = fileBasedCache; +} + +/** + * Configure cache for ForStFlinkFileSystem. + * + * @param path the cache base path. + * @param cacheCap the cache capacity. + */ +public static void configureCache(Path path, long cacheCap) { Review Comment: Thanks for the suggestion, it will be changed after forstjni update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36569) flink kafka connector do not close kafka produer when it checkpoint success
[ https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892455#comment-17892455 ] Hongshun Wang commented on FLINK-36569: --- [~ft20082] Thanks for your explain, it seems one FlinkKafkaInternalProducer is enough for all the offset commit. > flink kafka connector do not close kafka produer when it checkpoint success > --- > > Key: FLINK-36569 > URL: https://issues.apache.org/jira/browse/FLINK-36569 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.19.0, 1.20.0 > Environment: flink: 1.20 > flink kafka connector: 3.3.0-1.20 >Reporter: Jake.zhang >Priority: Major > Attachments: image-2024-10-18-13-31-39-253.png, > image-2024-10-21-14-02-41-823.png > > > flink kafka connector do't close FlinkKafkaInternalProducer when flink > checkpoint success in flink 1.20/1.19 . it will create one > FlinkKafkaInternalProducer per checkpoint. > > FlinkKafkaInternalProducer do not close automatic. so kafka producer network > thread will more and more > it create `getRecoveryProducer` each time, `recyclable` object always > null, so `recyclable.ifPresent(Recyclable::close)` not work. > `org.apache.flink.connector.kafka.sink.KafkaCommitter` > {code:java} > producer = > recyclable > .>map(Recyclable::getObject) > .orElseGet(() -> getRecoveryProducer(committable)); > producer.commitTransaction(); > producer.flush(); > recyclable.ifPresent(Recyclable::close);{code} > > !image-2024-10-21-14-02-41-823.png! > > !image-2024-10-18-13-31-39-253.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]
pnowojski commented on code in PR #25539: URL: https://github.com/apache/flink/pull/25539#discussion_r1814837025 ## flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.otel; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.TestContainerExtension; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import com.esotericsoftware.minlog.Log; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.BaseConsumer; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** Tests for {@link OpenTelemetryMetricReporter}. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryTestBase { +public static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryTestBase.class); + +private static final Duration TIME_OUT = Duration.ofMinutes(2); + +@RegisterExtension +@Order(1) +private static final AllCallbackWrapper> +OTEL_EXTENSION = +new AllCallbackWrapper<>( +new TestContainerExtension<>(() -> new OtelTestContainer())); + +@BeforeEach +public void setup() { +Slf4jLevelLogConsumer logConsumer = new Slf4jLevelLogConsumer(LOG); + OTEL_EXTENSION.getCustomExtension().getTestContainer().followOutput(logConsumer); +} + +public static OtelTestContainer getOtelContainer() { +return OTEL_EXTENSION.getCustomExtension().getTestContainer(); +} + +public static MetricConfig createMetricConfig() { +MetricConfig metricConfig = new MetricConfig(); +metricConfig.setProperty( +OpenTelemetryReporterOptions.EXPORTER_ENDPOINT.key(), +getOtelContainer().getGrpcEndpoint()); +return metricConfig; +} + +public static void eventuallyConsumeJson(ThrowingConsumer jsonConsumer) +throws Exception { +eventually( +() -> { +// otel-collector dumps every report in a new line, so in order to re-use the +// same collector across multiple tests, let's read only the last line +getOtelContainer() +.copyFileFromContainer( + getOtelContainer().getOutputLogPath().toString(), +inputStream -> { +List lines = new ArrayList<>(); +BufferedReader input = +new BufferedReader( +new InputStreamReader(inputStream)); +String last = ""; +String line; + +while ((line = input.readLine()) != null) { +lines.add(line); +last = line; +} + +ObjectMapper mapper = new ObjectMapper(); +JsonNode json = mapper.readValue(last, JsonN
[jira] [Assigned] (FLINK-36317) Populate the ArchivedExecutionGraph with CheckpointStatsSnapshot data if in WaitingForResources state with a previousExecutionGraph being set
[ https://issues.apache.org/jira/browse/FLINK-36317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-36317: - Assignee: Eaugene Thomas > Populate the ArchivedExecutionGraph with CheckpointStatsSnapshot data if in > WaitingForResources state with a previousExecutionGraph being set > - > > Key: FLINK-36317 > URL: https://issues.apache.org/jira/browse/FLINK-36317 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 2.0-preview >Reporter: Matthias Pohl >Assignee: Eaugene Thomas >Priority: Minor > Labels: starter > > in FLINK-36295 we noticed an issue with the WaitingForResources state that > follows a restartable failure. The CheckpointStatistics are present but not > exposed through the ArchivedExecutionGraph despite being available. > We should think about adding these stats in {{WaitingForResources#getJob}} to > have them accessible even if the job isn't running at the moment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]
rkhachatryan commented on code in PR #25539: URL: https://github.com/apache/flink/pull/25539#discussion_r1814757339 ## flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java: ## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.otel; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.LogicalScopeProvider; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.metrics.util.TestHistogram; +import org.apache.flink.util.TestLoggerExtension; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OpenTelemetryMetricReporter}. */ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryMetricReporterITCase extends OpenTelemetryTestBase { + +private static final long TIME_MS = 1234; + +private OpenTelemetryMetricReporter reporter; +private final Histogram histogram = new TestHistogram(); + +@BeforeEach +public void setUpEach() { +reporter = +new OpenTelemetryMetricReporter( +Clock.fixed(Instant.ofEpochMilli(TIME_MS), Clock.systemUTC().getZone())); +} Review Comment: Should this be closed? ## flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java: ## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.otel; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.LogicalScopeProvider; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.metrics.util.TestHistogram; +import org.apache.flink.util.TestLoggerExtension; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OpenTelemetryMetricReporter}. */ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryMetricReporterITCase extends OpenTelemetryTestBase { + +private static final long TIME_MS = 1234; + +private OpenTelemetryMetri
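As an illustration of the review question above (whether the reporter created in `@BeforeEach` should be closed), here is a minimal, self-contained JUnit 5 sketch of that lifecycle; `FakeReporter` is a hypothetical stand-in, not the real OpenTelemetryMetricReporter API:

```java
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ReporterLifecycleSketchTest {

    /** Hypothetical stand-in for a reporter that holds resources (exporter, threads). */
    static class FakeReporter implements AutoCloseable {
        @Override
        public void close() { /* release exporter resources here */ }
    }

    private FakeReporter reporter;

    @BeforeEach
    void setUpEach() {
        reporter = new FakeReporter();
    }

    @AfterEach
    void tearDownEach() throws Exception {
        // Close what @BeforeEach created so each test releases its reporter.
        reporter.close();
    }

    @Test
    void exampleTest() {
        // the test body would exercise the reporter here
    }
}
```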
Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string to prevent version comparison mismatch [flink-connector-kafka]
dannycranmer merged PR #132: URL: https://github.com/apache/flink-connector-kafka/pull/132 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-36595) Supported Flink version for connector documentation does not get matched for 1.20
[ https://issues.apache.org/jira/browse/FLINK-36595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-36595. --- Resolution: Fixed > Supported Flink version for connector documentation does not get matched for > 1.20 > - > > Key: FLINK-36595 > URL: https://issues.apache.org/jira/browse/FLINK-36595 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.20.0 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.4.0 > > Attachments: image-2024-10-24-00-30-43-909.png > > > Connector documentation does not pick up already released artifacts for Flink > Example: > Kafka Connector documentation > !image-2024-10-24-00-30-43-909.png|width=813,height=439! > h3. Root cause: > Flink version compatibility is set in connector documentation sources: > [https://github.com/apache/flink-connector-kafka/blob/f4075ca1cd9aea4c915fe32207e736f267b6c3cf/docs/data/kafka.yml] > {code:yaml} > flink_compatibility: [1.19, 1.20] > {code} > Parsed config representation has trailing zero dropped from 1.20: > {code:java} > flink_compatibility:[1.19 1.2] > {code} > > This leads to failure to recognize Flink 1.20 as a version supported by > connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
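As a side note on the root cause (an illustration of the effect, shown in Java for concreteness, not the docs toolchain itself): once an unquoted `1.20` is parsed as a number, the trailing zero is gone, so any later comparison against the string "1.20" fails.

```java
public class VersionParsingSketch {
    public static void main(String[] args) {
        // A parser that treats an unquoted 1.20 as a number ends up with the double 1.2 ...
        double parsed = Double.parseDouble("1.20");
        System.out.println(parsed); // 1.2

        // ... so rendering it back to text no longer matches the "1.20" the docs data expects:
        System.out.println(String.valueOf(parsed).equals("1.20")); // false

        // Keeping the versions as strings (e.g. quoting them in the YAML data file,
        // as the merged fix does) preserves the literal "1.20":
        System.out.println("1.20".equals("1.20")); // true
    }
}
```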
Re: [PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent/RenameColunEvent [flink-cdc]
yuxiqian commented on code in PR #3622: URL: https://github.com/apache/flink-cdc/pull/3622#discussion_r1814417426 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java: ## @@ -50,6 +50,7 @@ public class ValuesDataSourceHelper { */ public enum EventSetId { SINGLE_SPLIT_SINGLE_TABLE, +SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE, Review Comment: Maybe `ConfigOption`'s description needs to be updated correspondingly ## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java: ## @@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}"); } +@ParameterizedTest +@EnumSource +void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi) +throws Exception { +FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + +// Setup value source +Configuration sourceConfig = new Configuration(); +sourceConfig.set( +ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE); +SourceDef sourceDef = +new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + +// Setup value sink +Configuration sinkConfig = new Configuration(); +sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); +sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); +SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + +// Setup pipeline +Configuration pipelineConfig = new Configuration(); +pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + +PipelineDef pipelineDef = +new PipelineDef( +sourceDef, +sinkDef, +Collections.emptyList(), +Collections.emptyList(), +Collections.emptyList(), +pipelineConfig); + +// Execute the pipeline +PipelineExecution execution = composer.compose(pipelineDef); +execution.execute(); + +// Check result in ValuesDatabase +List results = ValuesDatabase.getResults(TABLE_1); +assertThat(results) +.contains( Review Comment: Will `.containsExactlyInAnyOrder` be more suitable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36595) Supported Flink version for connector documentation does not get matched for 1.20
[ https://issues.apache.org/jira/browse/FLINK-36595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892460#comment-17892460 ] Danny Cranmer commented on FLINK-36595: --- Merged commit [{{06159a2}}|https://github.com/apache/flink-connector-kafka/commit/06159a21f87b3791c9ae932b2610441c801ad83f] into apache:v3.3 > Supported Flink version for connector documentation does not get matched for > 1.20 > - > > Key: FLINK-36595 > URL: https://issues.apache.org/jira/browse/FLINK-36595 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.20.0 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.4.0 > > Attachments: image-2024-10-24-00-30-43-909.png > > > Connector documentation does not pick up already released artifacts for Flink > Example: > Kafka Connector documentation > !image-2024-10-24-00-30-43-909.png|width=813,height=439! > h3. Root cause: > Flink version compatibility is set in connector documentation sources: > [https://github.com/apache/flink-connector-kafka/blob/f4075ca1cd9aea4c915fe32207e736f267b6c3cf/docs/data/kafka.yml] > {code:yaml} > flink_compatibility: [1.19, 1.20] > {code} > Parsed config representation has trailing zero dropped from 1.20: > {code:java} > flink_compatibility:[1.19 1.2] > {code} > > This leads to failure to recognize Flink 1.20 as a version supported by > connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35566] Promote TypeSerializer from PublicEvolving to Public [flink]
superdiaodiao commented on code in PR #25081: URL: https://github.com/apache/flink/pull/25081#discussion_r1814637228 ## pom.xml: ## @@ -2386,6 +2386,9 @@ under the License. org.apache.flink.types.DoubleValue org.apache.flink.types.FloatValue org.apache.flink.types.NormalizableKey + + org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility + org.apache.flink.api.common.typeutils.TypeSerializerSnapshot#readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader) Review Comment: Without these two `exclude`, two errors will appear when running the `mvn deploy`: `There is at least one incompatibility: org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleAfterMigration():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleAsIs():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(org.apache.flink.api.common.typeutils.TypeSerializer):CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.incompatible():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader):CLASS_GENERIC_TEMPLATE_CHANGED` `There is at least one incompatibility: org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader):CLASS_GENERIC_TEMPLATE_CHANGED` May I ask is this way OK? @masteryhx What's more, I found that there is another error, please refer: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=63265&view=logs&j=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819&t=2dd510a3-5041-5201-6dc3-54d310f68906 It seems that it is not relate to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35566] Promote TypeSerializer from PublicEvolving to Public [flink]
superdiaodiao commented on code in PR #25081: URL: https://github.com/apache/flink/pull/25081#discussion_r1814637228 ## pom.xml: ## @@ -2386,6 +2386,9 @@ under the License. org.apache.flink.types.DoubleValue org.apache.flink.types.FloatValue org.apache.flink.types.NormalizableKey + + org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility + org.apache.flink.api.common.typeutils.TypeSerializerSnapshot#readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader) Review Comment: Without these two `exclude`, two errors will appear when running the `mvn deploy`: `There is at least one incompatibility: org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleAfterMigration():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleAsIs():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(org.apache.flink.api.common.typeutils.TypeSerializer):CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.incompatible():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader):CLASS_GENERIC_TEMPLATE_CHANGED` `There is at least one incompatibility: org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader):CLASS_GENERIC_TEMPLATE_CHANGED` May I ask is this way OK? @masteryhx What's more, I found the CI failure, please refer: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=63265&view=logs&j=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819&t=2dd510a3-5041-5201-6dc3-54d310f68906 It seems that it is not relate to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36593][flink-runtime] Update io.airlift:aircompressor to 0.27 [flink]
tomncooper commented on PR #25567: URL: https://github.com/apache/flink/pull/25567#issuecomment-2434752346 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36523][Connectors/DynamoDB][BugFix] Fixing LATEST behaviour for DDB Connector [flink-connector-aws]
hlteoh37 merged PR #177: URL: https://github.com/apache/flink-connector-aws/pull/177 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-33571][table] Backport json-path upgrade to deal with CVE [flink]
tomncooper opened a new pull request, #25573: URL: https://github.com/apache/flink/pull/25573 ## What is the purpose of the change Currently, the `flink-table` modules uses json-path 2.7.0. This is vulnerable to CVE-2023-1370 and CVE-2023-51074. ## Brief change log Upgrades the `com.jayway.jsonpath:json-path` version used in `flink-table` from `2.7.0` to `2.9.0`. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36002] Add CompiledPlan annotations to BatchExecMatch [flink]
dawidwys commented on code in PR #25171: URL: https://github.com/apache/flink/pull/25171#discussion_r1814809608 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/MatchRecognizeTestPrograms.java: ## @@ -121,6 +122,27 @@ public class MatchRecognizeTestPrograms { .build(); static final Row[] BEFORE_DATA = { +Row.of("2020-10-10 00:00:01", 9, 1), +Row.of("2020-10-10 00:00:01", 8, 2), +Row.of("2020-10-10 00:00:01", 10, 3), +Row.of("2020-10-10 00:00:04", 7, 4), +Row.of("2020-10-10 00:00:06", 5, 6), +Row.of("2020-10-10 00:00:07", 8, 5), +Row.of("2020-10-10 00:00:12", 3, 7), +Row.of("2020-10-10 00:00:16", 4, 9), +Row.of("2020-10-10 00:00:32", 7, 10), +Row.of("2020-10-10 00:00:33", 9, 12), +Row.of("2020-10-10 00:00:34", 5, 11) +}; + +static final Row[] AFTER_DATA = { +Row.of("2020-10-10 00:00:41", 3, 13), +Row.of("2020-10-10 00:00:42", 11, 16), +Row.of("2020-10-10 00:00:43", 12, 15), +Row.of("2020-10-10 00:00:44", 13, 14) +}; + +static final Row[] BEFORE_DATA_WITH_OUT_OF_ORDER_DATA = { Review Comment: why default scope? ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/MatchRecognizeTestPrograms.java: ## @@ -181,6 +203,47 @@ public class MatchRecognizeTestPrograms { .runSql(getEventTimeSql("ORDER BY rowtime, sequence_num")) .build(); +static final SourceTestStep SOURCE_WITH_OUT_OF_ORDER_DATA = Review Comment: Why is this in the default scope? Can't it be `private` if `public` is not necessary? ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java: ## @@ -86,7 +89,8 @@ public abstract class CommonExecMatch extends ExecNodeBase public static final String MATCH_TRANSFORMATION = "match"; -private final MatchSpec matchSpec; +@JsonProperty(FIELD_NAME_MATCH_SPEC) Review Comment: Could you please also deduplicate the field from `StreamExecMatch` since you're touching this anyhow. I don't think it's necessary to be a separate field in `StreamExecMatch`. ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/MatchRecognizeTestPrograms.java: ## @@ -121,6 +122,27 @@ public class MatchRecognizeTestPrograms { .build(); static final Row[] BEFORE_DATA = { +Row.of("2020-10-10 00:00:01", 9, 1), +Row.of("2020-10-10 00:00:01", 8, 2), +Row.of("2020-10-10 00:00:01", 10, 3), +Row.of("2020-10-10 00:00:04", 7, 4), +Row.of("2020-10-10 00:00:06", 5, 6), +Row.of("2020-10-10 00:00:07", 8, 5), +Row.of("2020-10-10 00:00:12", 3, 7), +Row.of("2020-10-10 00:00:16", 4, 9), +Row.of("2020-10-10 00:00:32", 7, 10), +Row.of("2020-10-10 00:00:33", 9, 12), +Row.of("2020-10-10 00:00:34", 5, 11) +}; + +static final Row[] AFTER_DATA = { Review Comment: why default scope? ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/MatchRecognizeTestPrograms.java: ## @@ -136,7 +158,7 @@ public class MatchRecognizeTestPrograms { Row.of("2020-10-10 00:00:34", 5, 11) }; -static final Row[] AFTER_DATA = { +static final Row[] AFTER_DATA_WITH_OUT_OF_ORDER_DATA = { Review Comment: why default scope? ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java: ## @@ -86,7 +89,8 @@ public abstract class CommonExecMatch extends ExecNodeBase public static final String MATCH_TRANSFORMATION = "match"; -private final MatchSpec matchSpec; +@JsonProperty(FIELD_NAME_MATCH_SPEC) Review Comment: This introduces circular dependency. 
You are using a constant that is defined in a child class from within the parent class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
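To spell out the layout the reviewer is pointing toward (a hedged sketch with hypothetical class names, not the actual CommonExecMatch/StreamExecMatch code): declare the JSON field-name constant in the parent class that uses it, so the parent never has to reference its own subclass.

```java
public class ConstantPlacementSketch {

    /** Parent declares (and uses) the field-name constant. */
    abstract static class ParentExecNode {
        public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";

        String describe() {
            // The parent refers only to its own constant, so there is no
            // parent -> child reference and no circular dependency.
            return "serialized under '" + FIELD_NAME_MATCH_SPEC + "'";
        }
    }

    /** The child simply inherits the constant instead of declaring it itself. */
    static class ChildExecNode extends ParentExecNode {
    }

    public static void main(String[] args) {
        System.out.println(new ChildExecNode().describe());
    }
}
```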
[PR] [FLINK-36596] Fix unable to schema evolve with project-less transform rules [flink-cdc]
yuxiqian opened a new pull request, #3665: URL: https://github.com/apache/flink-cdc/pull/3665 This closes FLINK-36596. Requires FLINK-36461 to be merged first. As described in the docs, omitting the `transform[*].projection` field in a YAML definition should be equivalent to the `projection: \*` expression. However, schema change events are currently dropped when no projection fields are explicitly written down. For example, the following definition: ```yaml transform: - source-table: db.tbl filter: x > 0 ``` could not apply upstream schema change events downstream because the `projection` block is missing, which would be surprising. Unifying the handling of `projection: \*` and an omitted `projection` field should fix this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36556][FLINK-36555] Improve buffer debloating [flink]
pnowojski opened a new pull request, #25574: URL: https://github.com/apache/flink/pull/25574 ## What is the purpose of the change This PR intends to improve the behaviour of buffer debloating. ## Brief change log [FLINK-36555] Guarantee buffer grows even with very small alpha [FLINK-36556] Add starting buffer size config option ## Verifying this change This PR adds new unit tests or is cover by existing ones ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36556) Allow to configure starting buffer size when using buffer debloating
[ https://issues.apache.org/jira/browse/FLINK-36556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36556: --- Labels: pull-request-available (was: ) > Allow to configure starting buffer size when using buffer debloating > > > Key: FLINK-36556 > URL: https://issues.apache.org/jira/browse/FLINK-36556 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network, Runtime / Task >Affects Versions: 1.19.1 >Reporter: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > Starting buffer size is 32KB, so during recovery/startup, before backpressure > kicks in, Flink job can be flooded with large buffers, completely stalling > the progress. > Proposed solution is to make this starting size configurable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36556][FLINK-36555] Improve buffer debloating [flink]
flinkbot commented on PR #25574: URL: https://github.com/apache/flink/pull/25574#issuecomment-2435567392 ## CI report: * 4db768dbd95281848b092efdd7fcb2649f1a9c79 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-36590) Flink CDC does not Support MySQL > 8.0.x
[ https://issues.apache.org/jira/browse/FLINK-36590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux resolved FLINK-36590. - Resolution: Duplicate Duplicate of FLINK-36520 > Flink CDC does not Support MySQL > 8.0.x > > > Key: FLINK-36590 > URL: https://issues.apache.org/jira/browse/FLINK-36590 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > > MySQL 8.4 dropped `SHOW MASTER STATUS` support in favor of `SHOW BINARY LOG > STATUS`. The former command is hard-coded internally and does not work > with MySQL > 8.0.x. > This might involve some cherry-picking of > [https://github.com/debezium/debezium/pull/5765] and > [https://github.com/debezium/debezium/pull/5769] -- This message was sent by Atlassian Jira (v8.20.10#820010)
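For illustration of the direction described above (a hedged sketch with a hypothetical helper, not the Debezium/Flink CDC implementation): pick the status statement based on the MySQL server version, since `SHOW BINARY LOG STATUS` replaces the removed `SHOW MASTER STATUS` on 8.4 and later.

```java
public class BinlogStatusStatementSketch {

    static String binlogStatusStatement(int major, int minor) {
        // MySQL 8.4 removed SHOW MASTER STATUS; SHOW BINARY LOG STATUS replaces it.
        boolean useBinaryLogStatus = major > 8 || (major == 8 && minor >= 4);
        return useBinaryLogStatus ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS";
    }

    public static void main(String[] args) {
        System.out.println(binlogStatusStatement(8, 0)); // SHOW MASTER STATUS
        System.out.println(binlogStatusStatement(8, 4)); // SHOW BINARY LOG STATUS
    }
}
```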
[jira] [Closed] (FLINK-33696) FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter
[ https://issues.apache.org/jira/browse/FLINK-33696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-33696. -- Release Note: Flink now supports reporting metrics and traces to OpenTelemetry using gRPC Resolution: Fixed Merged to master as 437a47fd32a..5ba3f91717d > FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter > > > Key: FLINK-33696 > URL: https://issues.apache.org/jira/browse/FLINK-33696 > Project: Flink > Issue Type: New Feature > Components: Runtime / Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > h1. Motivation > [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces] > is adding TraceReporter interface. However with > [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces] > alone, Log4jTraceReporter would be the only available implementation of > TraceReporter interface, which is not very helpful. > In this FLIP I’m proposing to contribute both MetricExporter and > TraceReporter implementation using OpenTelemetry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35600) Data read duplication during the full-to-incremental conversion phase
[ https://issues.apache.org/jira/browse/FLINK-35600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruan Hang updated FLINK-35600: -- Fix Version/s: cdc-3.3.0 > Data read duplication during the full-to-incremental conversion phase > - > > Key: FLINK-35600 > URL: https://issues.apache.org/jira/browse/FLINK-35600 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Di Wu >Assignee: Di Wu >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.3.0 > > > Assume that the table has been split into 3 chunks > Timeline > t1: chunk1 is read > t2: a piece of data A belonging to chunk2 is inserted in MySQL > t3: chunk2 is read, and data A has been sent downstream > t4: chunk3 is read > At this time, startOffset will be set to the low watermark > t5: *BinlogSplitReader.pollSplitRecords* receives data A, and uses the method > *shouldEmit* to determine whether the data is sent downstream > In this method > {code:java} > private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset > position) { > if (pureBinlogPhaseTables.contains(tableId)) { > return true; > } > // the existed tables those have finished snapshot reading > if (maxSplitHighWatermarkMap.containsKey(tableId) > && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) { > pureBinlogPhaseTables.add(tableId); > return true; > } > } {code} > *maxSplitHighWatermarkMap.get(tableId)* obtains the high watermark > without the ts_sec field, which therefore defaults to 0 > *position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* > so this expression evaluates to true > *Data A is sent downstream again, so the data is duplicated* -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36332] Missed webhook okhttp reference. [flink-kubernetes-operator]
SamBarker commented on code in PR #906: URL: https://github.com/apache/flink-kubernetes-operator/pull/906#discussion_r1815896525 ## flink-kubernetes-webhook/pom.xml: ## @@ -24,7 +24,7 @@ under the License. org.apache.flink flink-kubernetes-operator-parent 1.11-SNAPSHOT -.. Review Comment: So, checking https://maven.apache.org/ref/3.9.9/maven-model/maven.html#parent, IntelliJ is correct: `..` is not a valid reference, but based on Stack Overflow and some other sources it might have been at one point, or at least was commonly thought to be. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent/RenameColunEvent [flink-cdc]
yuxiqian commented on code in PR #3622: URL: https://github.com/apache/flink-cdc/pull/3622#discussion_r1815882769 ## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java: ## @@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}"); } +@ParameterizedTest +@EnumSource +void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi) +throws Exception { +FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + +// Setup value source +Configuration sourceConfig = new Configuration(); +sourceConfig.set( +ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE); +SourceDef sourceDef = +new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + +// Setup value sink +Configuration sinkConfig = new Configuration(); +sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); +sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); +SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + +// Setup pipeline +Configuration pipelineConfig = new Configuration(); +pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + +PipelineDef pipelineDef = +new PipelineDef( +sourceDef, +sinkDef, +Collections.emptyList(), +Collections.emptyList(), +Collections.emptyList(), +pipelineConfig); + +// Execute the pipeline +PipelineExecution execution = composer.compose(pipelineDef); +execution.execute(); + +// Check result in ValuesDatabase +List results = ValuesDatabase.getResults(TABLE_1); +assertThat(results) +.containsExactly( Review Comment: `ValuesDatabase` uses `HashMap` to store records internally, and does not guarantee orderness IIUC. `containsExactlyInAnyOrder` might be more appropriate. 
## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java: ## @@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}"); } +@ParameterizedTest +@EnumSource +void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi) +throws Exception { +FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + +// Setup value source +Configuration sourceConfig = new Configuration(); +sourceConfig.set( +ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE); +SourceDef sourceDef = +new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + +// Setup value sink +Configuration sinkConfig = new Configuration(); +sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); +sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); +SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + +// Setup pipeline +Configuration pipelineConfig = new Configuration(); +pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + +PipelineDef pipelineDef = +new PipelineDef( +sourceDef, +sinkDef, +Collections.emptyList(), +Collections.emptyList(), +Collections.emptyList(), +pipelineConfig); + +// Execute the pipeline +PipelineExecution execution = composer.compose(pipelineDef); +execution.execute(); + +// Check result in ValuesDatabase +List results = ValuesDatabase.getResults(TABLE_1); +assertThat(results) +.containsExactly( Review Comment: `ValuesDatabase` uses `HashMap` to store records internally, and does not guarantee orderliness IIUC. `containsExactlyInAnyOrder` might be more appropriate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the
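To spell out the suggestion (a hedged sketch assuming AssertJ is on the classpath; the sample rows are made up): `containsExactly` also verifies element order, while `containsExactlyInAnyOrder` only verifies that the same elements are present, which is the safer choice for HashMap-backed results.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;

public class OrderInsensitiveAssertionSketch {
    public static void main(String[] args) {
        // Pretend the backing HashMap happened to iterate in this order:
        List<String> results = Arrays.asList("2, Bob, 19", "1, Alice, 18");

        // containsExactly would fail here, because it also checks the order:
        // assertThat(results).containsExactly("1, Alice, 18", "2, Bob, 19");

        // containsExactlyInAnyOrder accepts the same elements in any iteration order:
        assertThat(results).containsExactlyInAnyOrder("1, Alice, 18", "2, Bob, 19");
        System.out.println("assertion passed");
    }
}
```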
[jira] [Updated] (FLINK-36520) Flink CDC doesn't support MySQL > 8.0
[ https://issues.apache.org/jira/browse/FLINK-36520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36520: --- Labels: pull-request-available (was: ) > Flink CDC doesn't support MySQL > 8.0 > - > > Key: FLINK-36520 > URL: https://issues.apache.org/jira/browse/FLINK-36520 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: 1.19.1 >Reporter: Maciej Bryński >Priority: Major > Labels: pull-request-available > > Hi, > I'm trying to use Flink-CDC MySQL connector with MySQL 8.4. > This results in exception: > {code} > Caused by: org.apache.flink.util.FlinkRuntimeException: Read split > MySqlSnapshotSplit{tableId=customer.customers, > splitId='customer.customers:0', splitKeyType=[`id` BIGINT NOT NULL], > splitStart=null, splitEnd=null, highWatermark=null} error due to > org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename > and position via 'SHOW MASTER STATUS'. Make sure your server is correctly > configured. > {code} > This is because SHOW MASTER STATUS statement isn't supported in this version: > https://dev.mysql.com/doc/refman/8.4/en/show-master-status.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36542) Bump spotless version to 2.35.0
[ https://issues.apache.org/jira/browse/FLINK-36542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892471#comment-17892471 ] Wenjun Ruan commented on FLINK-36542: - After upgrading the spotless-maven-plugin version to 2.35.0, upToDateChecking is enabled by default. > Bump spotless version to 2.35.0 > --- > > Key: FLINK-36542 > URL: https://issues.apache.org/jira/browse/FLINK-36542 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Wenjun Ruan >Assignee: Wenjun Ruan >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > Attachments: image-2024-10-15-22-01-25-049.png > > > I use `mvn spotless:apply` to format the code and find it takes 01:13 > min; this is too slow, and each time I execute `mvn spotless:apply` it takes about 1 min. > I hope we can enable the > upToDateChecking setting to speed up spotless > [https://github.com/diffplug/spotless/tree/main/plugin-maven#incremental-up-to-date-checking-and-formatting] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent/RenameColunEvent [flink-cdc]
yuxiqian commented on PR #3622: URL: https://github.com/apache/flink-cdc/pull/3622#issuecomment-2434508943 Also, could you please rebase this PR onto the latest `master` branch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36597) Optimize SnapshotSplitAssigner error log output
MOBIN created FLINK-36597: - Summary: Optimize SnapshotSplitAssigner error log output Key: FLINK-36597 URL: https://issues.apache.org/jira/browse/FLINK-36597 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.2.0 Reporter: MOBIN Attachments: image-2024-10-24-16-09-45-722.png !image-2024-10-24-16-09-45-722.png! Caused by: java.lang.IllegalStateException: Invalid assigner status {} [NEWLY_ADDED_ASSIGNING_FINISHED] --> Caused by: java.lang.IllegalStateException: Invalid assigner status NEWLY_ADDED_ASSIGNING_FINISHED -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36597] Optimize SnapshotSplitAssigner error log output [flink-cdc]
MOBIN-F opened a new pull request, #3664: URL: https://github.com/apache/flink-cdc/pull/3664  Caused by: java.lang.IllegalStateException: Invalid assigner status {} [NEWLY_ADDED_ASSIGNING_FINISHED] --> Caused by: java.lang.IllegalStateException: Invalid assigner status NEWLY_ADDED_ASSIGNING_FINISHED -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
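For context on where a message like `Invalid assigner status {} [NEWLY_ADDED_ASSIGNING_FINISHED]` comes from (an illustration only, not the SnapshotSplitAssigner code): `{}` is an SLF4J logging placeholder, so a `String.format`-style template never substitutes it, and checkState-style helpers appear to append the unmatched argument in brackets instead; using `%s` yields the intended exception message.

```java
public class AssignerStatusMessageSketch {
    public static void main(String[] args) {
        String status = "NEWLY_ADDED_ASSIGNING_FINISHED";

        // "{}" is not a format specifier, so String.format leaves it untouched
        // and silently ignores the extra argument:
        System.out.println(String.format("Invalid assigner status {}", status));
        // -> Invalid assigner status {}

        // "%s" substitutes the argument as intended:
        System.out.println(String.format("Invalid assigner status %s", status));
        // -> Invalid assigner status NEWLY_ADDED_ASSIGNING_FINISHED
    }
}
```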
[jira] [Updated] (FLINK-36597) Optimize SnapshotSplitAssigner error log output
[ https://issues.apache.org/jira/browse/FLINK-36597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36597: --- Labels: pull-request-available (was: ) > Optimize SnapshotSplitAssigner error log output > --- > > Key: FLINK-36597 > URL: https://issues.apache.org/jira/browse/FLINK-36597 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.2.0 >Reporter: MOBIN >Priority: Minor > Labels: pull-request-available > Attachments: image-2024-10-24-16-09-45-722.png > > > !image-2024-10-24-16-09-45-722.png! > Caused by: java.lang.IllegalStateException: Invalid assigner status {} > [NEWLY_ADDED_ASSIGNING_FINISHED] > --> > Caused by: java.lang.IllegalStateException: Invalid assigner status > NEWLY_ADDED_ASSIGNING_FINISHED -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36523) DDB Streams connector treating LATEST as TRIM_HORIZON for some cases
[ https://issues.apache.org/jira/browse/FLINK-36523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh reassigned FLINK-36523: --- Assignee: Abhi Gupta > DDB Streams connector treating LATEST as TRIM_HORIZON for some cases > > > Key: FLINK-36523 > URL: https://issues.apache.org/jira/browse/FLINK-36523 > Project: Flink > Issue Type: Bug > Components: Connectors / DynamoDB >Reporter: Abhi Gupta >Assignee: Abhi Gupta >Priority: Major > Labels: pull-request-available > > For a shard lineage a->b->c->d > The DDB connector starts itself as > > a (LATEST) -> b(TRIM_HORIZON) -> c(TRIM_HORIZON) -> d(TRIM_HORIZON) > > whereas it should start itself as > > d(LATEST) -> TRIM_HORIZON from here on. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36490][TABLE API] Remove the deprecated method in DataStreamScanProvider [flink]
tinaselenge commented on PR #25563: URL: https://github.com/apache/flink/pull/25563#issuecomment-2434696853 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36065][runtime] Support submit stream graph. [flink]
zhuzhurk closed pull request #25472: [FLINK-36065][runtime] Support submit stream graph. URL: https://github.com/apache/flink/pull/25472 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-36065) Support for submitting StreamGraph
[ https://issues.apache.org/jira/browse/FLINK-36065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-36065. --- Fix Version/s: 2.0.0 Resolution: Done 98f712640555333d5e1842a53441b283ca2f6e29 64cbcd2d4b360f03f7964edece6f3211d184fb06 > Support for submitting StreamGraph > -- > > Key: FLINK-36065 > URL: https://issues.apache.org/jira/browse/FLINK-36065 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > This ticket will encompass the following tasks: > # Make StreamGraph serializable > # Support the submission of StreamGraph, allowing the Dispatcher to receive > a StreamGraph, which the JobManager will then use to execute the job. > Note that this ticket will not modify the existing JobGraph submission path. > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36032) FLIP-468: Introducing StreamGraph-Based Job Submission
[ https://issues.apache.org/jira/browse/FLINK-36032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-36032: Fix Version/s: 2.0.0 > FLIP-468: Introducing StreamGraph-Based Job Submission > -- > > Key: FLINK-36032 > URL: https://issues.apache.org/jira/browse/FLINK-36032 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination, Runtime / REST >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Fix For: 2.0.0 > > > This is the umbrella ticket for > [FLIP-468|https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36596) YAML Pipeline fails to schema change with no projection fields specified
[ https://issues.apache.org/jira/browse/FLINK-36596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36596: --- Labels: pull-request-available (was: ) > YAML Pipeline fails to schema change with no projection fields specified > > > Key: FLINK-36596 > URL: https://issues.apache.org/jira/browse/FLINK-36596 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.2.0 >Reporter: yux >Priority: Major > Labels: pull-request-available > > As described in docs, omitting transform[*].projection field should be > equivalent to `projection: \*` expression. > However, now schema change events will be dropped when there are no > projection fields explicitly written down. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-36523) DDB Streams connector treating LATEST as TRIM_HORIZON for some cases
[ https://issues.apache.org/jira/browse/FLINK-36523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh resolved FLINK-36523. - Resolution: Fixed merged commit [{{52870f1}}|https://github.com/apache/flink-connector-aws/commit/52870f1a0b301c2f73dfc4f6dc91d09cc302ab0f] into apache:main > DDB Streams connector treating LATEST as TRIM_HORIZON for some cases > > > Key: FLINK-36523 > URL: https://issues.apache.org/jira/browse/FLINK-36523 > Project: Flink > Issue Type: Bug > Components: Connectors / DynamoDB >Reporter: Abhi Gupta >Priority: Major > Labels: pull-request-available > > For a shard lineage a->b->c->d > The DDB connector starts itself as > > a (LATEST) -> b(TRIM_HORIZON) -> c(TRIM_HORIZON) -> d(TRIM_HORIZON) > > whereas it should start itself as > > d(LATEST) -> TRIM_HORIZON from here on. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36542] Enable upToDateChecking to speed up the spotless [flink]
ferenc-csaky commented on PR #25525: URL: https://github.com/apache/flink/pull/25525#issuecomment-2434614251 @jnh5y @ruanwenjun Thanks for the added thoughts! I am okay with updating this PR to simply bump the spotless version to `2.35.0`; putting the index under `target/` is not harmful at all, so my original point does not matter anyway. @ruanwenjun Would you mind adapting the Jira ticket summary/description and the PR title accordingly as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36598) [state/forst] Refactor initialization of ForSt db core
Zakelly Lan created FLINK-36598: --- Summary: [state/forst] Refactor initialization of ForSt db core Key: FLINK-36598 URL: https://issues.apache.org/jira/browse/FLINK-36598 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Zakelly Lan Assignee: Zakelly Lan Currently, {{ForSt}} creates the {{FileSystem}} itself. It would be better to create it on the Flink side and hand it to {{ForSt}}. By doing so, we are able to inject a cache or share files with the checkpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
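A minimal sketch of the constructor-injection idea described above (hypothetical types only, not the actual ForSt or Flink FileSystem APIs): the caller builds the file system, optionally wrapping it with a cache, and passes it in rather than letting the store construct it internally.

```java
public class FileSystemInjectionSketch {

    /** Hypothetical file-system abstraction. */
    interface FileSystem {
        byte[] read(String path);
    }

    /** Hypothetical caching wrapper the caller can layer on before injection. */
    static class CachingFileSystem implements FileSystem {
        private final FileSystem delegate;
        CachingFileSystem(FileSystem delegate) { this.delegate = delegate; }
        @Override public byte[] read(String path) {
            // a real implementation would consult a local cache before delegating
            return delegate.read(path);
        }
    }

    /** Hypothetical store that receives the file system instead of creating it. */
    static class Store {
        private final FileSystem fs;
        Store(FileSystem fs) { this.fs = fs; } // injected, so caching/sharing is the caller's choice
        byte[] load(String path) { return fs.read(path); }
    }

    public static void main(String[] args) {
        FileSystem base = path -> new byte[0]; // stand-in for the real file system
        Store store = new Store(new CachingFileSystem(base));
        System.out.println(store.load("/chk-1/file").length);
    }
}
```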
Re: [PR] [FLINK-35566] Promote TypeSerializer from PublicEvolving to Public [flink]
superdiaodiao commented on code in PR #25081: URL: https://github.com/apache/flink/pull/25081#discussion_r1814637228 ## pom.xml: ## @@ -2386,6 +2386,9 @@ under the License. org.apache.flink.types.DoubleValue org.apache.flink.types.FloatValue org.apache.flink.types.NormalizableKey + + org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility + org.apache.flink.api.common.typeutils.TypeSerializerSnapshot#readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader) Review Comment: Without these two `exclude`, two errors will appear when running the `mvn deploy`: `There is at least one incompatibility: org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleAfterMigration():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleAsIs():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(org.apache.flink.api.common.typeutils.TypeSerializer):CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.incompatible():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader):CLASS_GENERIC_TEMPLATE_CHANGED` `There is at least one incompatibility: org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader):CLASS_GENERIC_TEMPLATE_CHANGED` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-36428) DynamoDb Table API Sink fails when null value in the RowData
[ https://issues.apache.org/jira/browse/FLINK-36428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh resolved FLINK-36428. - Resolution: Fixed merged commit [{{c057031}}|https://github.com/apache/flink-connector-aws/commit/c057031ce8ef5f90223d9540529bef837df1a970] into apache:main > DynamoDb Table API Sink fails when null value in the RowData > > > Key: FLINK-36428 > URL: https://issues.apache.org/jira/browse/FLINK-36428 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS >Affects Versions: 1.18.1 >Reporter: maoxingda >Assignee: maoxingda >Priority: Minor > Labels: pull-request-available, starter > Fix For: 1.19.1 > > > DynamoDb Table API Sink fails when there are null values in the RowData. > > package com.meican; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > public class SqlDynamodbSinkApp { > public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > tableEnv.executeSql("create temporary view source as " + > "select '1' as id, 'name1' as name, 18 as age union all " + > "select '2' as id, 'name2' as name, 19 as age union all " + > "select '3' as id, cast(null as string) as name, 20 as age" > ); > tableEnv.executeSql("create table sink" + > "(" + > " id string," + > " name string," + > " age int" + > ") partitioned by ( id )" + > "with" + > "(" + > " 'connector' = 'dynamodb'," + > " 'aws.region' = 'cn-northwest-1'," + > " 'table-name' = 'bi-oltp-mydata'," + > " 'ignore-nulls' = 'true'" + > ")" > ); > tableEnv.executeSql("insert into sink select * from source"); > } > } > > java.lang.NullPointerException: null > at > org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39) > ~[flink-table-runtime-1.18.0.jar:1.18.0] > at > org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27) > ~[flink-table-runtime-1.18.0.jar:1.18.0] > at > org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88) > ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17] > at > software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54) > ~[dynamodb-enhanced-2.20.144.jar:?] > at > software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518) > ~[dynamodb-enhanced-2.20.144.jar:?] > at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?] > at > java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) > ~[?:?] > at > software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516) > ~[dynamodb-enhanced-2.20.144.jar:?] > at > software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67) > ~[dynamodb-enhanced-2.20.144.jar:?] 
> at > org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53) > ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17] > at > org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56) > ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17] > at > org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35) > ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17] > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328) > ~[flink-connector-files-1.17.2.jar:1.17.2] > at > org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.jav
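A hedged sketch of the null-guard idea (not the actual connector fix; {{stringFieldToAttributeValue}} is an illustrative helper): check {{RowData#isNullAt}} before converting, instead of letting the null reach {{StringStringConverter#toExternal}}.

{code:java}
import org.apache.flink.table.data.RowData;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// Illustrative helper; the real conversion lives in
// RowDataToAttributeValueConverter and covers all logical types.
public final class NullSafeAttributeConversionSketch {

    static AttributeValue stringFieldToAttributeValue(RowData row, int pos) {
        if (row.isNullAt(pos)) {
            // With 'ignore-nulls' = 'true' the attribute could instead be
            // skipped entirely; returning a NUL value is just one option.
            return AttributeValue.builder().nul(true).build();
        }
        return AttributeValue.builder().s(row.getString(pos).toString()).build();
    }

    private NullSafeAttributeConversionSketch() {}
}
{code}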
Re: [PR] [FLINK-36428][Connectors/DynamoDB] DynamoDb Table API Sink fails when null value in the RowData [flink-connector-aws]
boring-cyborg[bot] commented on PR #173: URL: https://github.com/apache/flink-connector-aws/pull/173#issuecomment-2434781963 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-36595) Supported Flink version for connector documentation does not get matched for 1.20
[ https://issues.apache.org/jira/browse/FLINK-36595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer reassigned FLINK-36595: - Assignee: Aleksandr Pilipenko > Supported Flink version for connector documentation does not get matched for > 1.20 > - > > Key: FLINK-36595 > URL: https://issues.apache.org/jira/browse/FLINK-36595 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.20.0 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > Attachments: image-2024-10-24-00-30-43-909.png > > > Connector documentation does not pick up already released artifacts for Flink > Example: > Kafka Connector documentation > !image-2024-10-24-00-30-43-909.png|width=813,height=439! > h3. Root cause: > Flink version compatibility is set in connector documentation sources: > [https://github.com/apache/flink-connector-kafka/blob/f4075ca1cd9aea4c915fe32207e736f267b6c3cf/docs/data/kafka.yml] > {code:yaml} > flink_compatibility: [1.19, 1.20] > {code} > Parsed config representation has trailing zero dropped from 1.20: > {code:java} > flink_compatibility:[1.19 1.2] > {code} > > This leads to failure to recognize Flink 1.20 as a version supported by > connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
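A small self-contained illustration of the root cause (this is not the docs build code, just a demonstration of why the version must stay a string): once 1.20 is parsed as a number, the trailing zero is gone and it can no longer match the literal "1.20".

{code:java}
public class VersionParsingSketch {
    public static void main(String[] args) {
        double asNumber = Double.parseDouble("1.20");
        System.out.println(asNumber);                                // 1.2
        System.out.println(String.valueOf(asNumber).equals("1.20")); // false
        // Quoting the versions in the YAML config keeps them as strings,
        // so a plain string comparison still matches:
        System.out.println("1.20".equals("1.20"));                   // true
    }
}
{code}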
[jira] [Updated] (FLINK-36595) Supported Flink version for connector documentation does not get matched for 1.20
[ https://issues.apache.org/jira/browse/FLINK-36595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-36595: -- Fix Version/s: kafka-3.4.0 > Supported Flink version for connector documentation does not get matched for > 1.20 > - > > Key: FLINK-36595 > URL: https://issues.apache.org/jira/browse/FLINK-36595 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.20.0 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.4.0 > > Attachments: image-2024-10-24-00-30-43-909.png > > > Connector documentation does not pick up already released artifacts for Flink > Example: > Kafka Connector documentation > !image-2024-10-24-00-30-43-909.png|width=813,height=439! > h3. Root cause: > Flink version compatibility is set in connector documentation sources: > [https://github.com/apache/flink-connector-kafka/blob/f4075ca1cd9aea4c915fe32207e736f267b6c3cf/docs/data/kafka.yml] > {code:yaml} > flink_compatibility: [1.19, 1.20] > {code} > Parsed config representation has trailing zero dropped from 1.20: > {code:java} > flink_compatibility:[1.19 1.2] > {code} > > This leads to failure to recognize Flink 1.20 as a version supported by > connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36428) DynamoDb Table API Sink fails when null value in the RowData
[ https://issues.apache.org/jira/browse/FLINK-36428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh reassigned FLINK-36428: --- Assignee: maoxingda > DynamoDb Table API Sink fails when null value in the RowData > > > Key: FLINK-36428 > URL: https://issues.apache.org/jira/browse/FLINK-36428 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS >Affects Versions: 1.18.1 >Reporter: maoxingda >Assignee: maoxingda >Priority: Minor > Labels: pull-request-available, starter > Fix For: 1.19.1 > > > DynamoDb Table API Sink fails when there are null values in the RowData. > > package com.meican; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > public class SqlDynamodbSinkApp { > public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > tableEnv.executeSql("create temporary view source as " + > "select '1' as id, 'name1' as name, 18 as age union all " + > "select '2' as id, 'name2' as name, 19 as age union all " + > "select '3' as id, cast(null as string) as name, 20 as age" > ); > tableEnv.executeSql("create table sink" + > "(" + > " id string," + > " name string," + > " age int" + > ") partitioned by ( id )" + > "with" + > "(" + > " 'connector' = 'dynamodb'," + > " 'aws.region' = 'cn-northwest-1'," + > " 'table-name' = 'bi-oltp-mydata'," + > " 'ignore-nulls' = 'true'" + > ")" > ); > tableEnv.executeSql("insert into sink select * from source"); > } > } > > java.lang.NullPointerException: null > at > org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39) > ~[flink-table-runtime-1.18.0.jar:1.18.0] > at > org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27) > ~[flink-table-runtime-1.18.0.jar:1.18.0] > at > org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88) > ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17] > at > software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54) > ~[dynamodb-enhanced-2.20.144.jar:?] > at > software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518) > ~[dynamodb-enhanced-2.20.144.jar:?] > at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?] > at > java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) > ~[?:?] > at > software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516) > ~[dynamodb-enhanced-2.20.144.jar:?] > at > software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67) > ~[dynamodb-enhanced-2.20.144.jar:?] 
> at > org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53) > ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17] > at > org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56) > ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17] > at > org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35) > ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17] > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328) > ~[flink-connector-files-1.17.2.jar:1.17.2] > at > org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-streaming-java-1.18.0.jar:1.18.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask
Re: [PR] [FLINK-36575][runtime] ExecutionVertexInputInfo supports consuming subpartition groups [flink]
noorall commented on PR #25551: URL: https://github.com/apache/flink/pull/25551#issuecomment-2434867066 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35566] Promote TypeSerializer from PublicEvolving to Public [flink]
superdiaodiao commented on code in PR #25081: URL: https://github.com/apache/flink/pull/25081#discussion_r1814637228 ## pom.xml: ## @@ -2386,6 +2386,9 @@ under the License. org.apache.flink.types.DoubleValue org.apache.flink.types.FloatValue org.apache.flink.types.NormalizableKey + + org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility + org.apache.flink.api.common.typeutils.TypeSerializerSnapshot#readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader) Review Comment: Without these two `exclude`, two errors will appear when running the `mvn deploy`: `There is at least one incompatibility: org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleAfterMigration():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleAsIs():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(org.apache.flink.api.common.typeutils.TypeSerializer):CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.incompatible():CLASS_GENERIC_TEMPLATE_CHANGED,org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader):CLASS_GENERIC_TEMPLATE_CHANGED` `There is at least one incompatibility: org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(org.apache.flink.core.memory.DataInputView,java.lang.ClassLoader):CLASS_GENERIC_TEMPLATE_CHANGED` May I ask is this way OK? @masteryhx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36592][state/forst] Support file cache for ForStStateBackend [flink]
fredia commented on code in PR #25561: URL: https://github.com/apache/flink/pull/25561#discussion_r1814392234 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java: ## @@ -58,6 +58,22 @@ public class ForStOptions { "The remote directory where ForSt puts its SST files, fallback to %s if not configured.", LOCAL_DIRECTORIES.key())); +public static final ConfigOption<String> CACHE_DIRECTORY = +ConfigOptions.key("state.backend.forst.cache-dir") +.stringType() +.noDefaultValue() +.withDescription( +String.format( +"The directory where ForSt caches its SST files, fallback to %s/cache if not configured.", +LOCAL_DIRECTORIES.key())); + +public static final ConfigOption<Long> CACHE_CAPACITY = +ConfigOptions.key("state.backend.forst.cache-capacity") +.longType() +.defaultValue(-1L) +.withDescription( +"The capacity of the cache; a negative value means no cache will be used. When this value is greater than the actual available space, the actual available space will be used as the upper limit."); + Review Comment: Add `state.backend.forst.cache.reserved-size` for this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
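A minimal sketch of how the two options under discussion could be set from a Flink {{Configuration}} (the option keys are taken from the diff above and may still change in this PR; string-keyed setters are used so the sketch does not depend on the final constant names):

{code:java}
import org.apache.flink.configuration.Configuration;

public class ForStCacheConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Keys come from the diff in this review thread; treat them as provisional.
        conf.setString("state.backend.forst.cache-dir", "/tmp/forst-cache");
        conf.setString("state.backend.forst.cache-capacity",
                String.valueOf(10L * 1024 * 1024 * 1024)); // 10 GiB
        System.out.println(conf);
    }
}
{code}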
[jira] [Created] (FLINK-36596) YAML Pipeline fails to schema change with no projection fields specified
yux created FLINK-36596: --- Summary: YAML Pipeline fails to schema change with no projection fields specified Key: FLINK-36596 URL: https://issues.apache.org/jira/browse/FLINK-36596 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.2.0 Reporter: yux As described in the docs, omitting the transform[*].projection field should be equivalent to the `projection: \*` expression. However, schema change events are currently dropped when no projection fields are explicitly written down. -- This message was sent by Atlassian Jira (v8.20.10#820010)