Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]
pnowojski commented on code in PR #25539: URL: https://github.com/apache/flink/pull/25539#discussion_r1804401155 ## flink-metrics/flink-metrics-otel/pom.xml: ## @@ -0,0 +1,141 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-metrics + 2.0-SNAPSHOT + + + flink-metrics-otel + Flink : Metrics : OpenTelemetry + + + 1.30.0 Review Comment: I couldn't find matching higher versions of all of those libraries :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]
pnowojski commented on code in PR #25539: URL: https://github.com/apache/flink/pull/25539#discussion_r1804401532 ## flink-metrics/flink-metrics-otel/pom.xml: ## @@ -0,0 +1,141 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-metrics + 2.0-SNAPSHOT + + + flink-metrics-otel + Flink : Metrics : OpenTelemetry + + + 1.30.0 + + + + + org.apache.flink + flink-core + ${project.version} + provided + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + ${flink.markBundledAsOptional} + + + + io.opentelemetry + opentelemetry-sdk-common + ${opentelemetry.version} + ${flink.markBundledAsOptional} + + + + io.opentelemetry + opentelemetry-exporter-logging + ${opentelemetry.version} + ${flink.markBundledAsOptional} + + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + ${flink.markBundledAsOptional} + + + + io.opentelemetry + opentelemetry-semconv + ${opentelemetry.version}-alpha + ${flink.markBundledAsOptional} + + + + + + io.opentelemetry.javaagent + opentelemetry-testing-common + ${opentelemetry.version}-alpha + test + + + + io.opentelemetry.javaagent + opentelemetry-agent-for-testing + ${opentelemetry.version}-alpha + test + + + + org.apache.flink + flink-metrics-core + ${project.version} + test + test-jar + + + + org.apache.flink + flink-test-utils-junit + ${project.version} + + + + org.mock-server + mockserver-netty-no-dependencies + 5.15.0 + test + + + + commons-codec + commons-codec + 1.17.1 Review Comment: 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33410] Add a test for UNBOUNDED PRECEDING [flink]
dawidwys merged PR #25536: URL: https://github.com/apache/flink/pull/25536 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy [flink-kubernetes-operator]
huyuanfeng2018 commented on PR #904: URL: https://github.com/apache/flink-kubernetes-operator/pull/904#issuecomment-2418982398 > I'm lacking context for the change so my comments could be missing the mark but I think that means I represent a lot of people trying to consume the proposed config, if the naming confuses me its likely to confuse others. @SamBarker Thank you very much for your review. `scaling.radical.enabled` is really not easy to understand. I think your concern is warranted. I will think about the naming of this option. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36560) Fix the issue of timestamp_ltz increasing by 8 hours in Paimon
LvYanquan created FLINK-36560: - Summary: Fix the issue of timestamp_ltz increasing by 8 hours in Paimon Key: FLINK-36560 URL: https://issues.apache.org/jira/browse/FLINK-36560 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.2.1 Reporter: LvYanquan Fix For: cdc-3.2.1 Attachments: image-2024-10-17-17-16-01-773.png When synchronizing the timestamp field type in MySQL, it was found that the time displayed in the Paimon table was incorrect. How to reproduce: {code:java} CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee bigint not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); INSERT INTO orders VALUES (11, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (12, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (13, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (14, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (15, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (16, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (17, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);{code} My YAML job is like the following: source: type: mysql hostname: host port: 3306 username: flink password: xx tables: yaml_test.\.* server-id: 22600-22620 sink: type: paimon catalog.properties.metastore: filesystem catalog.properties.warehouse: xx catalog.properties.fs.oss.endpoint: xx catalog.properties.fs.oss.accessKeyId:xx catalog.properties.fs.oss.accessKeySecret: xx pipeline: name: MySQL Database to Paimon Database Currently, the result is like the following: the `create_time` and `update_time` fields are not correct. ||order_id||user_id||shop_id||product_id||buy_fee||create_time||update_time||state|| |100,001|user_001|12,345|1|5,000|2023-02-16 00:40:56|2023-02-16 02:42:56|1| |100,002|user_002|12,346|2|4,000|2023-02-15 23:40:56|2023-02-16 02:42:56|1| |100,003|user_003|12,347|3|3,000|2023-02-15 22:40:56|2023-02-16 02:42:56|1| |100,004|user_001|12,347|4|2,000|2023-02-15 21:40:56|2023-02-16 02:42:56|1| |100,005|user_002|12,348|5|1,000|2023-02-15 20:40:56|2023-02-16 02:42:56|1| |100,006|user_001|12,348|1|1,000|2023-02-15 19:40:56|2023-02-16 02:42:56|1| |100,007|user_003|12,347|4|2,000|2023-02-15 18:40:56|2023-02-16 02:42:56|1| -- This message was sent by Atlassian Jira (v8.20.10#820010)
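For context on the class of bug involved, here is a minimal, self-contained Java sketch — not the Flink CDC or Paimon code; the class name and the Asia/Shanghai session zone are illustrative assumptions — showing how interpreting a MySQL wall-clock timestamp in the wrong zone produces exactly the 8-hour shift seen in the report:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampLtzShiftDemo {
    public static void main(String[] args) {
        ZoneId sessionZone = ZoneId.of("Asia/Shanghai"); // UTC+8, assumed session/server zone

        // Wall-clock value as stored in MySQL: 2023-02-15 16:40:56 (no zone attached).
        LocalDateTime mysqlValue = LocalDateTime.parse("2023-02-15T16:40:56");

        // Correct conversion to an instant (timestamp_ltz): attach the session zone.
        Instant correct = mysqlValue.atZone(sessionZone).toInstant();

        // Buggy conversion: the wall-clock value is treated as if it were already UTC.
        Instant shifted = mysqlValue.toInstant(ZoneOffset.UTC);

        // Rendering both back in the session zone shows the 8-hour difference from the report.
        System.out.println(correct.atZone(sessionZone).toLocalDateTime()); // 2023-02-15T16:40:56
        System.out.println(shifted.atZone(sessionZone).toLocalDateTime()); // 2023-02-16T00:40:56
    }
}
```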
[jira] [Commented] (FLINK-36560) Fix the issue of timestamp_ltz increasing by 8 hours in Paimon
[ https://issues.apache.org/jira/browse/FLINK-36560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890385#comment-17890385 ] LvYanquan commented on FLINK-36560: --- I've found the issue in code, and willing to fix it. > Fix the issue of timestamp_ltz increasing by 8 hours in Paimon > -- > > Key: FLINK-36560 > URL: https://issues.apache.org/jira/browse/FLINK-36560 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.1 >Reporter: LvYanquan >Priority: Major > Fix For: cdc-3.2.1 > > Attachments: image-2024-10-17-17-16-01-773.png > > > When synchronizing the timestamp field type in MySQL, it was found that the > time displayed in the Paimon table was incorrect. > How to reproduct: > {code:java} > CREATE TABLE `orders` ( > order_id bigint not null primary key, > user_id varchar(50) not null, > shop_id bigint not null, > product_id bigint not null, > buy_fee bigint not null, > create_time timestamp not null, > update_time timestamp not null default now(), > state int not null > ); > INSERT INTO orders VALUES > (11, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 > 18:42:56', 1), > (12, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 > 18:42:56', 1), > (13, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 > 18:42:56', 1), > (14, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 > 18:42:56', 1), > (15, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 > 18:42:56', 1), > (16, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 > 18:42:56', 1), > (17, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 > 18:42:56', 1);{code} > My yaml job is like following: > source: > type: mysql > hostname: host > port: 3306 > username: flink > password: xx > tables: yaml_test.\.* > server-id: 22600-22620 > sink: > type: paimon > catalog.properties.metastore: filesystem > catalog.properties.warehouse: xx > catalog.properties.fs.oss.endpoint: xx > catalog.properties.fs.oss.accessKeyId:xx > catalog.properties.fs.oss.accessKeySecret: xx > pipeline: > name: MySQL Database to Paimon Database > Currently, the result is like following: > the `create_time` and `update_time` fields are no correct. > ||order_id||user_id||shop_id||product_id||buy_fee||create_time||update_time||state|| > |100,001|user_001|12,345|1|5,000|2023-02-16 00:40:56|2023-02-16 02:42:56|1| > |100,002|user_002|12,346|2|4,000|2023-02-15 23:40:56|2023-02-16 02:42:56|1| > |100,003|user_003|12,347|3|3,000|2023-02-15 22:40:56|2023-02-16 02:42:56|1| > |100,004|user_001|12,347|4|2,000|2023-02-15 21:40:56|2023-02-16 02:42:56|1| > |100,005|user_002|12,348|5|1,000|2023-02-15 20:40:56|2023-02-16 02:42:56|1| > |100,006|user_001|12,348|1|1,000|2023-02-15 19:40:56|2023-02-16 02:42:56|1| > |100,007|user_003|12,347|4|2,000|2023-02-15 18:40:56|2023-02-16 02:42:56|1| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35031][runtime] LatencyMarker/RecordAttribute emitting under async execution model [flink]
fredia merged PR #25503: URL: https://github.com/apache/flink/pull/25503 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 [flink]
SamBarker commented on code in PR #25494: URL: https://github.com/apache/flink/pull/25494#discussion_r1804065549 ## flink-rpc/flink-rpc-akka/pom.xml: ## @@ -94,8 +94,8 @@ under the License. io.netty - netty - 3.10.6.Final + netty-all Review Comment: Pekko tests that `io.netty.channel.Channel` (via reflection) is on the classpath, so shading is still required to apply the relocations, even if it's not moving the classes around. However, as shading is only applied during the package phase, and thus after test execution, something has to supply Netty for the tests in `flink-rpc-akka`, so having netty-all in `test` scope rather than `provided` might just work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
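A minimal sketch of the reflective classpath probe described above, under the assumption that what matters at test time is only that the (possibly relocated) Channel class can be loaded; this is illustrative Java, not Pekko's actual code:

```java
// Illustrative probe: returns true if a Netty Channel class is loadable, which is the kind
// of check that forces some Netty provider onto the test classpath before shading runs.
public final class NettyOnClasspathCheck {
    private static final String CHANNEL_CLASS = "io.netty.channel.Channel";

    public static boolean nettyAvailable(ClassLoader cl) {
        try {
            Class.forName(CHANNEL_CLASS, false, cl);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Netty on classpath: "
                + nettyAvailable(NettyOnClasspathCheck.class.getClassLoader()));
    }
}
```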
Re: [PR] [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy [flink-kubernetes-operator]
SamBarker commented on code in PR #904: URL: https://github.com/apache/flink-kubernetes-operator/pull/904#discussion_r1804328631 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ## @@ -416,21 +416,31 @@ protected static > int scale( // Optimize the case where newParallelism <= maxParallelism / 2 newParallelism > numKeyGroupsOrPartitions / 2 ? numKeyGroupsOrPartitions -: numKeyGroupsOrPartitions / 2, +: numKeyGroupsOrPartitions / 2 + numKeyGroupsOrPartitions % 2, upperBound); +boolean scalingRadical = + context.getConfiguration().get(AutoScalerOptions.SCALING_RADICAL_ENABLED); + // When the shuffle type of vertex inputs contains keyBy or vertex is a source, // we try to adjust the parallelism such that it divides // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks for (int p = newParallelism; p <= upperBoundForAlignment; p++) { -if (numKeyGroupsOrPartitions % p == 0) { +if (numKeyGroupsOrPartitions % p == 0 +|| +// When scaling radical is enabled, Try to find the smallest parallelism that +// can satisfy the +// current consumption rate. +(scalingRadical +&& numKeyGroupsOrPartitions / p +< numKeyGroupsOrPartitions / newParallelism)) { Review Comment: I think extracting this as a method `canMaximiseUtilisation` would make the intent of the condition easier to understand when working through the code. ## docs/layouts/shortcodes/generated/auto_scaler_configuration.html: ## @@ -188,6 +188,12 @@ Duration Time interval to resend the identical event + +job.autoscaler.scaling.radical.enabled Review Comment: Coming at this cold, it's not at all clear to me what radical means. While the description goes some way towards clarifying the intent, it doesn't feel like a great term; additionally, following through the JIRA links, `radical` feels like a very off term for a default (assuming I'm following properly). I wonder if `job.autoscaler.scaling.maximizeUtilisation.enabled` would make things more explicit? ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ## @@ -416,21 +416,31 @@ protected static > int scale( // Optimize the case where newParallelism <= maxParallelism / 2 newParallelism > numKeyGroupsOrPartitions / 2 ? numKeyGroupsOrPartitions -: numKeyGroupsOrPartitions / 2, +: numKeyGroupsOrPartitions / 2 + numKeyGroupsOrPartitions % 2, upperBound); +boolean scalingRadical = + context.getConfiguration().get(AutoScalerOptions.SCALING_RADICAL_ENABLED); + // When the shuffle type of vertex inputs contains keyBy or vertex is a source, // we try to adjust the parallelism such that it divides // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks for (int p = newParallelism; p <= upperBoundForAlignment; p++) { -if (numKeyGroupsOrPartitions % p == 0) { +if (numKeyGroupsOrPartitions % p == 0 +|| +// When scaling radical is enabled, Try to find the smallest parallelism that +// can satisfy the +// current consumption rate. +(scalingRadical +&& numKeyGroupsOrPartitions / p +< numKeyGroupsOrPartitions / newParallelism)) { return p; } } -// When adjust the parallelism after rounding up cannot be evenly divided by -// numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the -// current consumption rate. +// When adjust the parallelism after rounding up cannot be +// find the right degree of parallelism to meet requirements, +// Try to find the smallest parallelism that can satisfy the current consumption rate.
Review Comment: nits ```suggestion // When adjusting the parallelism after rounding up cannot // find the right degree of parallelism to meet requirements. // Try to find the smallest parallelism that can satisfy the current consumption rate. ``` ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.j
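A rough sketch of the extraction suggested in the review above. The names (`alignParallelism`, `canMaximiseUtilisation`) and the standalone class are assumptions for illustration, not the actual `JobVertexScaler` code; the predicate simply mirrors the condition shown in the diff:

```java
public final class ParallelismAlignmentSketch {

    static int alignParallelism(
            int newParallelism,
            int upperBoundForAlignment,
            int numKeyGroupsOrPartitions,
            boolean maximizeUtilisationEnabled) {
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (canMaximiseUtilisation(
                    p, newParallelism, numKeyGroupsOrPartitions, maximizeUtilisationEnabled)) {
                return p;
            }
        }
        return newParallelism; // fall back to the rounded-up parallelism
    }

    /**
     * Mirrors the condition in the diff: accept an even split, or (in the aggressive mode) any
     * candidate whose integer per-subtask share is strictly smaller than at newParallelism.
     */
    static boolean canMaximiseUtilisation(
            int candidate,
            int newParallelism,
            int numKeyGroupsOrPartitions,
            boolean maximizeUtilisationEnabled) {
        if (numKeyGroupsOrPartitions % candidate == 0) {
            return true;
        }
        return maximizeUtilisationEnabled
                && numKeyGroupsOrPartitions / candidate
                        < numKeyGroupsOrPartitions / newParallelism;
    }

    public static void main(String[] args) {
        // 128 partitions, rounded-up parallelism 23: the default mode aligns up to 32
        // (128 % 32 == 0), while the aggressive mode stops earlier at 26 (128 / 26 = 4 < 128 / 23 = 5).
        System.out.println(alignParallelism(23, 100, 128, false)); // 32
        System.out.println(alignParallelism(23, 100, 128, true));  // 26
    }
}
```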
[jira] [Assigned] (FLINK-4602) Move RocksDB backend to proper package
[ https://issues.apache.org/jira/browse/FLINK-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan reassigned FLINK-4602: -- Assignee: Han Yin > Move RocksDB backend to proper package > -- > > Key: FLINK-4602 > URL: https://issues.apache.org/jira/browse/FLINK-4602 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / State Backends >Reporter: Aljoscha Krettek >Assignee: Han Yin >Priority: Major > Labels: 2.0-related, auto-unassigned > Fix For: 2.0.0 > > > Right now the package is {{org.apache.flink.contrib.streaming.state}}, it > should probably be in {{org.apache.flink.runtime.state.rocksdb}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-4602) Move RocksDB backend to proper package
[ https://issues.apache.org/jira/browse/FLINK-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-4602: --- Parent: (was: FLINK-3957) Issue Type: Technical Debt (was: Sub-task) > Move RocksDB backend to proper package > -- > > Key: FLINK-4602 > URL: https://issues.apache.org/jira/browse/FLINK-4602 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / State Backends >Reporter: Aljoscha Krettek >Priority: Major > Labels: 2.0-related, auto-unassigned > Fix For: 2.0.0 > > > Right now the package is {{org.apache.flink.contrib.streaming.state}}, it > should probably be in {{org.apache.flink.runtime.state.rocksdb}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] FLINK-36479 remove deprecated method Table getSchema as [flink]
davidradl commented on code in PR #25540: URL: https://github.com/apache/flink/pull/25540#discussion_r1804348228 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java: ## @@ -161,27 +147,6 @@ TemporalTableFunction createTemporalTableFunction( */ Table as(String field, String... fields); -/** - * Renames the fields of the expression result. Use this to disambiguate fields before joining - * to operations. - * Review Comment: I see a test that uses this function in Scala in OverWindowValidationTest; it has `Over.partitionBy($"c").orderBy($"rowtime").preceding(2).following($"xx").as($"w"))`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs
[ https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36559: --- Labels: pull-request-available (was: ) > [docs]Add flink cdc elasticsearch pipeline sink to docs > --- > > Key: FLINK-36559 > URL: https://issues.apache.org/jira/browse/FLINK-36559 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36559) [docs]Add elasticsearch sink to docs
JunboWang created FLINK-36559: - Summary: [docs]Add elasticsearch sink to docs Key: FLINK-36559 URL: https://issues.apache.org/jira/browse/FLINK-36559 Project: Flink Issue Type: Improvement Components: Documentation Reporter: JunboWang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs
[ https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-36559: -- Summary: [docs]Add flink cdc elasticsearch pipeline sink to docs (was: [docs]Add elasticsearch sink to docs) > [docs]Add flink cdc elasticsearch pipeline sink to docs > --- > > Key: FLINK-36559 > URL: https://issues.apache.org/jira/browse/FLINK-36559 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36559) [docs]Add elasticsearch sink to docs
[ https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-36559: -- Component/s: Flink CDC (was: Documentation) > [docs]Add elasticsearch sink to docs > > > Key: FLINK-36559 > URL: https://issues.apache.org/jira/browse/FLINK-36559 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]
lvyanquan commented on code in PR #3639: URL: https://github.com/apache/flink-cdc/pull/3639#discussion_r1804467910 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java: ## @@ -454,6 +454,46 @@ public void testSinkWithMultiTables(String metastore) Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result); } +@ParameterizedTest +@ValueSource(strings = {"filesystem", "hive"}) +public void testDuplicateCommitAfterRestore(String metastore) +throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, +Catalog.DatabaseNotExistException, SchemaEvolveException { +initialize(metastore); +PaimonSink paimonSink = +new PaimonSink<>( +catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); +PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); +Committer committer = paimonSink.createCommitter(); + +// insert +for (Event event : createTestEvents()) { +writer.write(event, null); +} +writer.flush(false); +Collection> commitRequests = +writer.prepareCommit().stream() +.map(MockCommitRequestImpl::new) +.collect(Collectors.toList()); +committer.commit(commitRequests); + +// We've two steps in checkpoint: 1. snapshotState(ckp); 2. notifyCheckpointComplete(ckp). +// It's possible that flink job will restore from a checkpoint with only step#1 finished and +// step#2 not. +// CommitterOperator will try to re-commit recovered transactions. +committer.commit(commitRequests); Review Comment: Considering there is another issue https://issues.apache.org/jira/browse/FLINK-36541 in PaimonWriter, If there is a problem with adding this loop, you can skip it for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]
mateczagany commented on PR #25509: URL: https://github.com/apache/flink/pull/25509#issuecomment-2418969804 I have reproduced this issue easily, but I could not reproduce it if snapshot compression is turned on. IIUC when using snapshot compression, `SnappyFramedInputStream#available()` will return the number of bytes uncompressed in the buffer, and we should skip those. When not using snapshot compression, we call `BufferedInputStream#available()` which will be the number of buffered bytes, and in my tests the buffer size was 4096, so if we read e.g. 50 bytes of data during the last deserialization, this was 4046. Skipping this many bytes resulted in the next seek having to seek backwards, and that will cause the S3 client to close and re-open the stream. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
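A small self-contained demo of the buffer behaviour described above (plain JDK streams, not Flink's `CompressibleFSDataInputStream`; the zero-available wrapper stands in for a remote file stream): after reading a 50-byte record, `available()` reports the rest of the 4096-byte read-ahead buffer, so skipping it jumps well past the bytes actually consumed and the next `seek()` has to go backwards.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class AvailableSkipDemo {
    /** Mimics a remote stream (e.g. an object-store read) that reports 0 available bytes. */
    static InputStream remoteLike(byte[] data) {
        return new FilterInputStream(new ByteArrayInputStream(data)) {
            @Override
            public int available() {
                return 0;
            }
        };
    }

    public static void main(String[] args) throws IOException {
        BufferedInputStream in = new BufferedInputStream(remoteLike(new byte[16 * 1024]), 4096);

        byte[] record = new byte[50];
        int read = in.read(record);     // deserialize one small record
        int available = in.available(); // leftover *buffered* bytes, not payload we consumed
        System.out.println("read=" + read + ", available=" + available); // read=50, available=4046

        long skipped = in.skip(available); // what the old seek() path effectively did
        System.out.println("skipped " + skipped + " bytes we may still need -> next seek goes backwards");
        in.close();
    }
}
```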
[PR] [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy [flink-kubernetes-operator]
huyuanfeng2018 opened a new pull request, #904: URL: https://github.com/apache/flink-kubernetes-operator/pull/904 ## What is the purpose of the change Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy ## Brief change log - *Add a new option: `scaling.radical.enabled`* - *Support use a more aggressive strategy(Resource utilization takes priority) to determine the degree of parallelism after Source or after keyby without first considering balanced consumption.* ## Verifying this change in `org.apache.flink.autoscaler.JobVertexScalerTest.JobVertexScalerTest` * `testParallelismComputationWithAdjustment` and `testNumPartitionsAdjustment` add logic to test ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / no) - Core observer or reconciler logic that is regularly executed: (yes / no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36527) Introduce a parameter to support the autoscaler to adopt a more aggressive strategy when Source or upstream shuffle is keyBy
[ https://issues.apache.org/jira/browse/FLINK-36527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36527: --- Labels: pull-request-available (was: ) > Introduce a parameter to support the autoscaler to adopt a more aggressive > strategy when Source or upstream shuffle is keyBy > > > Key: FLINK-36527 > URL: https://issues.apache.org/jira/browse/FLINK-36527 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: yuanfenghu >Assignee: yuanfenghu >Priority: Minor > Labels: pull-request-available > > In > [https://issues.apache.org/jira/browse/FLINK-36192|https://issues.apache.org/jira/browse/FLINK-36192] > We have completed the optimization of Souce parallelism determination. > Task Source or upstream shuffle is keyBy will be adjusted to be a divisor of > the number of partitions or the maximum degree of parallelism. > This Jira hopes to introduce a parameter to enable a {color:#de350b}more > aggressive{color} strategy : > https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764419949 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33410] Add a test for UNBOUNDED PRECEDING [flink]
dawidwys commented on PR #25536: URL: https://github.com/apache/flink/pull/25536#issuecomment-2418841891 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36560) Fix the issue of timestamp_ltz increasing by 8 hours in Paimon
[ https://issues.apache.org/jira/browse/FLINK-36560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36560: --- Labels: pull-request-available (was: ) > Fix the issue of timestamp_ltz increasing by 8 hours in Paimon > -- > > Key: FLINK-36560 > URL: https://issues.apache.org/jira/browse/FLINK-36560 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.1 >Reporter: LvYanquan >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.1 > > Attachments: image-2024-10-17-17-16-01-773.png > > > When synchronizing the timestamp field type in MySQL, it was found that the > time displayed in the Paimon table was incorrect. > How to reproduct: > {code:java} > CREATE TABLE `orders` ( > order_id bigint not null primary key, > user_id varchar(50) not null, > shop_id bigint not null, > product_id bigint not null, > buy_fee bigint not null, > create_time timestamp not null, > update_time timestamp not null default now(), > state int not null > ); > INSERT INTO orders VALUES > (11, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 > 18:42:56', 1), > (12, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 > 18:42:56', 1), > (13, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 > 18:42:56', 1), > (14, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 > 18:42:56', 1), > (15, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 > 18:42:56', 1), > (16, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 > 18:42:56', 1), > (17, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 > 18:42:56', 1);{code} > My yaml job is like following: > source: > type: mysql > hostname: host > port: 3306 > username: flink > password: xx > tables: yaml_test.\.* > server-id: 22600-22620 > sink: > type: paimon > catalog.properties.metastore: filesystem > catalog.properties.warehouse: xx > catalog.properties.fs.oss.endpoint: xx > catalog.properties.fs.oss.accessKeyId:xx > catalog.properties.fs.oss.accessKeySecret: xx > pipeline: > name: MySQL Database to Paimon Database > Currently, the result is like following: > the `create_time` and `update_time` fields are no correct. > ||order_id||user_id||shop_id||product_id||buy_fee||create_time||update_time||state|| > |100,001|user_001|12,345|1|5,000|2023-02-16 00:40:56|2023-02-16 02:42:56|1| > |100,002|user_002|12,346|2|4,000|2023-02-15 23:40:56|2023-02-16 02:42:56|1| > |100,003|user_003|12,347|3|3,000|2023-02-15 22:40:56|2023-02-16 02:42:56|1| > |100,004|user_001|12,347|4|2,000|2023-02-15 21:40:56|2023-02-16 02:42:56|1| > |100,005|user_002|12,348|5|1,000|2023-02-15 20:40:56|2023-02-16 02:42:56|1| > |100,006|user_001|12,348|1|1,000|2023-02-15 19:40:56|2023-02-16 02:42:56|1| > |100,007|user_003|12,347|4|2,000|2023-02-15 18:40:56|2023-02-16 02:42:56|1| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]
Zakelly commented on code in PR #25501: URL: https://github.com/apache/flink/pull/25501#discussion_r1804441714 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java: ## @@ -111,6 +112,7 @@ interface Provider extends Serializable { InternalTimeServiceManager create( TaskIOMetricGroup taskIOMetricGroup, CheckpointableKeyedStateBackend keyedStatedBackend, +AsyncKeyedStateBackend asyncKeyedStateBackend, Review Comment: How about merge the two parameter into one state backend? Like this ```suggestion PriorityQueueSetFactory factory, KeyGroupRange keyGroupRange, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36560][pipeline-connector][paimon] fix the issue that timestamp_ltz field is not correctly converted. [flink-cdc]
lvyanquan commented on PR #3648: URL: https://github.com/apache/flink-cdc/pull/3648#issuecomment-2419036919 @beryllw @yuxiqian @ruanhang1993 could you please help to review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-35031) Latency marker emitting under async execution model
[ https://issues.apache.org/jira/browse/FLINK-35031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei resolved FLINK-35031. Assignee: Yanfei Lei Resolution: Resolved > Latency marker emitting under async execution model > --- > > Key: FLINK-35031 > URL: https://issues.apache.org/jira/browse/FLINK-35031 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35031) Latency marker emitting under async execution model
[ https://issues.apache.org/jira/browse/FLINK-35031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890390#comment-17890390 ] Yanfei Lei commented on FLINK-35031: Merged into master via ecb41150dab63c0c12b8dc3c447810e7e93df115 > Latency marker emitting under async execution model > --- > > Key: FLINK-35031 > URL: https://issues.apache.org/jira/browse/FLINK-35031 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]
lvyanquan commented on code in PR #3639: URL: https://github.com/apache/flink-cdc/pull/3639#discussion_r1804454752 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java: ## @@ -454,6 +454,46 @@ public void testSinkWithMultiTables(String metastore) Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result); } +@ParameterizedTest +@ValueSource(strings = {"filesystem", "hive"}) +public void testDuplicateCommitAfterRestore(String metastore) +throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, +Catalog.DatabaseNotExistException, SchemaEvolveException { +initialize(metastore); +PaimonSink paimonSink = +new PaimonSink<>( +catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); +PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); +Committer committer = paimonSink.createCommitter(); + +// insert +for (Event event : createTestEvents()) { +writer.write(event, null); +} +writer.flush(false); +Collection> commitRequests = +writer.prepareCommit().stream() +.map(MockCommitRequestImpl::new) +.collect(Collectors.toList()); +committer.commit(commitRequests); + +// We've two steps in checkpoint: 1. snapshotState(ckp); 2. notifyCheckpointComplete(ckp). +// It's possible that flink job will restore from a checkpoint with only step#1 finished and +// step#2 not. +// CommitterOperator will try to re-commit recovered transactions. +committer.commit(commitRequests); Review Comment: Thanks for adding this, what about running `insert and commit` many times(in a for loop), to simulate more complex situations and situations with compaction? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]
Zakelly commented on code in PR #25509: URL: https://github.com/apache/flink/pull/25509#discussion_r1804708194 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java: ## @@ -31,19 +31,21 @@ public class CompressibleFSDataInputStream extends FSDataInputStream { private final FSDataInputStream delegate; private final InputStream compressingDelegate; +private final boolean compressed; public CompressibleFSDataInputStream( FSDataInputStream delegate, StreamCompressionDecorator compressionDecorator) throws IOException { this.delegate = delegate; this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate); +this.compressed = compressionDecorator != UncompressedStreamCompressionDecorator.INSTANCE; } @Override public void seek(long desired) throws IOException { -final int available = compressingDelegate.available(); -if (available > 0) { -if (available != compressingDelegate.skip(available)) { +if (compressed) { +final int available = compressingDelegate.available(); +if (available > 0 && available != compressingDelegate.skip(available)) { throw new IOException("Unable to skip buffered data."); } } Review Comment: How about moving this part into a new interface. For example, `compressingDelegate` is a newly introduced `DecompressingInputStream`, which provides a method `clearBuffer`. Here's where the different behaviors go. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
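For illustration, a rough sketch of the interface idea floated above, with assumed names (`BufferClearingInputStream`, `clearBuffer`) that are not existing Flink APIs: the decompressing stream clears its read-ahead buffer, the uncompressed wrapper makes it a no-op, and `seek()` no longer needs to know which decorator it received.

```java
import java.io.IOException;
import java.io.InputStream;

interface BufferClearingInputStream {
    /** Drop any bytes buffered ahead of the current logical position. */
    void clearBuffer() throws IOException;
}

final class DecompressingInputStream extends InputStream implements BufferClearingInputStream {
    private final InputStream decompressing;

    DecompressingInputStream(InputStream decompressing) {
        this.decompressing = decompressing;
    }

    @Override
    public int read() throws IOException {
        return decompressing.read();
    }

    @Override
    public void clearBuffer() throws IOException {
        // Same skip logic as the compressed path in the PR: discard buffered, already-decoded bytes.
        int available = decompressing.available();
        if (available > 0 && available != decompressing.skip(available)) {
            throw new IOException("Unable to skip buffered data.");
        }
    }
}

final class PassThroughInputStream extends InputStream implements BufferClearingInputStream {
    private final InputStream delegate;

    PassThroughInputStream(InputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    @Override
    public void clearBuffer() {
        // Uncompressed path: nothing buffered on top of the file stream, so nothing to do.
    }
}
```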
Re: [PR] [FLINK-36326] Fix auto scan newly-added table after restart [flink-cdc]
lvyanquan commented on PR #3613: URL: https://github.com/apache/flink-cdc/pull/3613#issuecomment-2419385239 According to feedback from community users, cherry-picking this PR has indeed solved the problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]
gaborgsomogyi commented on code in PR #25509: URL: https://github.com/apache/flink/pull/25509#discussion_r1804812888 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java: ## @@ -31,19 +31,21 @@ public class CompressibleFSDataInputStream extends FSDataInputStream { private final FSDataInputStream delegate; private final InputStream compressingDelegate; +private final boolean compressed; public CompressibleFSDataInputStream( FSDataInputStream delegate, StreamCompressionDecorator compressionDecorator) throws IOException { this.delegate = delegate; this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate); +this.compressed = compressionDecorator != UncompressedStreamCompressionDecorator.INSTANCE; } @Override public void seek(long desired) throws IOException { -final int available = compressingDelegate.available(); -if (available > 0) { -if (available != compressingDelegate.skip(available)) { +if (compressed) { +final int available = compressingDelegate.available(); +if (available > 0 && available != compressingDelegate.skip(available)) { throw new IOException("Unable to skip buffered data."); } } Review Comment: I agree that this area has improvement possibility but I've the following reasons not to do that (at least in this PR): * This is a blocker and as such I would keep the fix as small as possible not to break things * The main issue here is that we don't have separate classes for uncompressed and compressed `FSDataInputStream`, so if you ask me I would do that step. I've tried that already and came to the conclusion that it would lead far because this is a pattern in Flink like `CompressibleFSDataOutputStream` and all the streams which are in `has-a` context instead of being a real decorator around each other. So all in all introducing another level of layer as an interface would be overkill here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35268][state] Add ttl interface for Async State API && implement TtlListStateV2/TtlValueStateV2 [flink]
Zakelly commented on code in PR #25515: URL: https://github.com/apache/flink/pull/25515#discussion_r1804486595 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java: ## @@ -59,7 +59,7 @@ public AbstractAggregatingState( } protected StateFuture asyncGetAccumulator() { Review Comment: How about remove this method? ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregatingStateV2.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState; + +import java.util.Collection; + +/** + * This class wraps aggregating state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the value added to the state + * @param The type of the accumulator (intermediate aggregate state). + * @param Type of the value extracted from the state + */ +class TtlAggregatingStateV2 +extends AbstractTtlStateV2< +K, N, ACC, TtlValue, InternalAggregatingState, OUT>> +implements InternalAggregatingState { + +TtlAggregatingStateV2( +TtlStateContext, OUT>, ACC> +ttlStateContext, +TtlAggregateFunctionV2 aggregateFunction) { +super(ttlStateContext); +aggregateFunction.updater = (ttlValue) -> original.updateInternal(ttlValue); +} + +@Override +public StateFuture asyncMergeNamespaces(N target, Collection sources) { +return original.asyncMergeNamespaces(target, sources); +} + +@Override +public void mergeNamespaces(N target, Collection sources) { +original.mergeNamespaces(target, sources); +} + +@Override +public StateFuture asyncGet() { +return original.asyncGet(); +} + +@Override +public StateFuture asyncAdd(IN value) { +return original.asyncAdd(value); +} + +@Override +public OUT get() { +return original.get(); +} + +@Override +public void add(IN value) { +original.add(value); +} + +@Override +public void clear() { +original.clear(); +} + +@Override +public StateFuture asyncGetInternal() { +return original.asyncGetInternal() +.thenApply(ttlValue -> getElementWithTtlCheck(ttlValue, original::updateInternal)); Review Comment: How about update the value asynchronously when `updateOnRead`? ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/AbstractTtlStateV2.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.state.ttl.AbstractTtlDecorator; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; + +/** + * Base class for TTL logic wrappers of state objects. StateV2 does not support + * FULL_STATE_SCAN_SNAPSHOT and INCREMENTAL_CLEANUP, only supports ROCKSDB_COMPACTION_FILTER. + * + *
Re: [PR] [FLINK-36564][ci] Running CI in random timezone to expose more time related bugs. [flink-cdc]
lvyanquan commented on PR #3650: URL: https://github.com/apache/flink-cdc/pull/3650#issuecomment-2421138664 @yuxiqian PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]
ruanhang1993 closed pull request #102: [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT URL: https://github.com/apache/flink-connector-kafka/pull/102 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] FLINK-36540.[Runtime / Task] Add Support for Hadoop Caller Context when using Flink to operate hdfs. [flink]
liangyu-1 commented on PR #25516: URL: https://github.com/apache/flink/pull/25516#issuecomment-2421163090 @dmvk @fapaul Hi, would you please help me check this issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.18][FLINK-36421] [fs] [checkpoint] Sync outputStream before returning handle in FsCheckpointStreamFactory [flink]
Zakelly commented on PR #25479: URL: https://github.com/apache/flink/pull/25479#issuecomment-2421167188 Will merge this without CI green -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33977][runtime] Adaptive scheduler may not minimize the number of TMs during downscaling [flink]
1996fanrui commented on PR #25218: URL: https://github.com/apache/flink/pull/25218#issuecomment-2421167238 Hey @XComp @ztison, sorry, I'd like to discuss this PR with you again. Could we fix the issue for `DefaultSlotAssigner` and `Application Mode` first? I prefer to fix it first for several reasons: - @XComp's first [concern](https://github.com/apache/flink/pull/25218#discussion_r1771187666) is that this fix conflicts with `execution.state-recovery.from-local`, so it's better handled in a FLIP. - That's why this PR only changes the code of `DefaultSlotAssigner` and doesn't change any code of `StateLocalitySlotAssigner`. - @ztison's [concern](https://github.com/apache/flink/pull/25218#issuecomment-2401913141) is that this fix conflicts with spreading the workload across as many workers as possible. - As we discussed before, this concern only exists for session mode. That's why I'm curious whether we could fix it for `Application Mode` first. - The third reason is most important: the issue that this PR is trying to fix is more like a bug than an optimization for `Application Mode` with `execution.state-recovery.from-local` disabled. - The phenomenon of this bug is that TM resources cannot be released after scaling down. - I believe that Flink users use Adaptive Scheduler mainly to scale up and scale down quickly or more efficiently. - Many users ask questions like: why aren't resources released after scaling down? - This bug has been reported in 3 JIRAs: FLINK-33977, FLINK-35594 and FLINK-35903. - The main reason I want to discuss this with you again is: one Flink user [reported this bug](https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569) again in the Slack troubleshooting channel, and the reporter cc'd me in the [next thread](https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167719506889) because I'm an active contributor to the autoscaler. (I guess he doesn't know that the bug or phenomenon is not related to the autoscaler; it's related to Adaptive Scheduler.) - It is worth mentioning that, as far as I know, @RocMarshal (the developer of this PR) didn't report any Jira, because he noticed the issue was already reported in several JIRAs. - It means at least 5 users (from what I have observed, these 5 users come from 5 different companies) faced this issue in their production jobs. I'm happy to see more and more companies trying out Adaptive Scheduler. - The fourth reason: 1.20 is the LTS version for the 1.x series. - If we think it's a bug, we could fix it in 1.20.x and 2.0.x together. - If we think it's an improvement or feature rather than a bug, and improve it in a FLIP, it means this issue cannot be fixed in the 1.x series. - That's why [FLIP-461](https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler) and [FLIP-472](https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states) cannot be backported to the 1.x series. - Actually, I think both [FLIP-461](https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler) and [FLIP-472](https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states) are great improvements for Adaptive Scheduler. Thank you for the great work. ❤️ - I believe most users (companies) are not able to maintain an internal Flink version, and they use the official Flink version. If this bug is not fixed in 1.x, it may be difficult for Adaptive Scheduler to be used by a large number of users in 1.x. - Of course, my team maintains our internal Flink version. We can easily fix it in our production environment. My initiative is mainly to enable most Flink users to have a better Adaptive Scheduler experience. Sorry to bother you again. This is definitely my last try. If you think it is unreasonable, I can accept it and deal with it in a subsequent FLIP. Thank you very much. ❤️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
❤️ - I believe most of users(companies) are not able to maintain the internal flink version, and they use the official flink version. If this bug is not fixed in 1.x, it may be difficult for Adaptive Scheduler to be used by a large number of users in 1.x. - Of course, my team maintains our internal flink version. We can easily fix it in our production environment. My initiative is mainly to enable most flink users to have a better Adaptive Scheduler experience. Sorry to bother you again. This is definitely my last try. If you think it is unreasonable, I can accept it and deal with it in a subsequent FLIP. Thank you very much. ❤️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36564) Running CI in random timezone to expose more time related bugs
LvYanquan created FLINK-36564: - Summary: Running CI in random timezone to expose more time related bugs Key: FLINK-36564 URL: https://issues.apache.org/jira/browse/FLINK-36564 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.3.0 Reporter: LvYanquan Fix For: cdc-3.3.0 Refer to [this comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: when running CI, setting a random time zone for the session can help expose time-zone-related issues in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
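As a sketch of the idea only (the real change would more likely export `TZ` or set `user.timezone` in the CI workflow or surefire configuration): pick a random zone before the tests run so zone-dependent code paths get exercised with something other than the developer's default.

```java
import java.util.Random;
import java.util.TimeZone;

public class RandomTimeZoneSetup {
    public static void main(String[] args) {
        // Choose one of the JDK's available zone IDs at random and make it the default,
        // so any code that silently relies on the local zone is exposed by the test run.
        String[] ids = TimeZone.getAvailableIDs();
        String picked = ids[new Random().nextInt(ids.length)];
        System.out.println("Running tests with time zone: " + picked);
        TimeZone.setDefault(TimeZone.getTimeZone(picked));
    }
}
```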
[jira] [Closed] (FLINK-36563) Running CI in random timezone to expose more time related bugs
[ https://issues.apache.org/jira/browse/FLINK-36563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LvYanquan closed FLINK-36563. - Resolution: Duplicate > Running CI in random timezone to expose more time related bugs > > > Key: FLINK-36563 > URL: https://issues.apache.org/jira/browse/FLINK-36563 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.3.0 >Reporter: LvYanquan >Priority: Major > Fix For: cdc-3.3.0 > > > Refer to [this > comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012], > when running CI, setting random time zone in session can help to expose > issues that is related to time zone in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36562) Running CI in random timezone to expose more time related bugs
[ https://issues.apache.org/jira/browse/FLINK-36562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LvYanquan closed FLINK-36562. - Release Note: Duplicated with FLINK-36564. Resolution: Duplicate > Running CI in random timezone to expose more time related bugs > > > Key: FLINK-36562 > URL: https://issues.apache.org/jira/browse/FLINK-36562 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.3.0 >Reporter: LvYanquan >Priority: Major > Fix For: cdc-3.3.0 > > > Refer to [this > comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012], > when running CI, setting random time zone in session can help to expose > issues that is related to time zone in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36562) Running CI in random timezone to expose more time related bugs
[ https://issues.apache.org/jira/browse/FLINK-36562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890773#comment-17890773 ] LvYanquan commented on FLINK-36562: --- Closed as this is duplicated with FLINK-36564 > Running CI in random timezone to expose more time related bugs > > > Key: FLINK-36562 > URL: https://issues.apache.org/jira/browse/FLINK-36562 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.3.0 >Reporter: LvYanquan >Priority: Major > Fix For: cdc-3.3.0 > > > Refer to [this > comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012], > when running CI, setting random time zone in session can help to expose > issues that is related to time zone in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36474] Support merging timestamp columns when routing [flink-cdc]
yuxiqian commented on PR #3636: URL: https://github.com/apache/flink-cdc/pull/3636#issuecomment-2421159128 Squashed & rebased with `master`. Could @ruanhang1993 please take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs
[ https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang closed FLINK-36559. - Resolution: Duplicate Duplicate with FLINK-36052. > [docs]Add flink cdc elasticsearch pipeline sink to docs > --- > > Key: FLINK-36559 > URL: https://issues.apache.org/jira/browse/FLINK-36559 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs
[ https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-36559: -- Description: Duplicate with 36052. > [docs]Add flink cdc elasticsearch pipeline sink to docs > --- > > Key: FLINK-36559 > URL: https://issues.apache.org/jira/browse/FLINK-36559 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > Labels: pull-request-available > > Duplicate with 36052. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36052) add elasticsearch.md for elasticsearch pipeline connector
[ https://issues.apache.org/jira/browse/FLINK-36052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890794#comment-17890794 ] JunboWang commented on FLINK-36052: --- Could we add Component/s as FLINK-CDC? > add elasticsearch.md for elasticsearch pipeline connector > - > > Key: FLINK-36052 > URL: https://issues.apache.org/jira/browse/FLINK-36052 > Project: Flink > Issue Type: Improvement >Reporter: wuzexian >Assignee: wuzexian >Priority: Major > Labels: pull-request-available > > add elasticsearch.md for elasticsearch pipeline connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36541) Occasional met commit conflict problem in PaimonSink.
[ https://issues.apache.org/jira/browse/FLINK-36541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890800#comment-17890800 ] LvYanquan commented on FLINK-36541: --- I would like to try to fix it. > Occasional met commit conflict problem in PaimonSink. > - > > Key: FLINK-36541 > URL: https://issues.apache.org/jira/browse/FLINK-36541 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.3.0 >Reporter: LvYanquan >Priority: Major > Fix For: cdc-3.3.0 > > Attachments: image-2024-10-15-20-36-03-495.png > > > When writing records to Paimon, we occasional met commit conflict problem > even if Parallelism is 1. > It turns out that PaimonWriter preparecommit Committable with a checkpointId > which is always 1, as the following code: > [https://github.com/apache/flink-cdc/blob/a1781f432d906fa2a864642a5f74ac5bdc963d9c/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java#L164-L178] > !image-2024-10-15-20-36-03-495.png! > This is actually an incorrect usage, we must pass in an increasing ID to > ensure the correctness of the snapshot information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
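A minimal sketch of the fix direction described in FLINK-36541, assuming a Paimon-style `prepareCommit(boolean, long)` call; the interface and field names below are illustrative placeholders, not the actual PaimonWriter code.

{code:java}
import java.util.List;

// Sketch only: pass a strictly increasing commit identifier instead of the constant "1".
// The prepareCommit(boolean, long) shape is an assumption, not copied from Paimon or Flink CDC.
public class IncreasingCommitIdWriter {

    /** Hypothetical stand-in for Paimon's table write handle. */
    public interface PaimonStyleWrite {
        List<Object> prepareCommit(boolean waitCompaction, long commitIdentifier) throws Exception;
    }

    private final PaimonStyleWrite write;
    private long commitIdentifier; // grows with every checkpoint instead of staying at 1

    public IncreasingCommitIdWriter(PaimonStyleWrite write) {
        this.write = write;
    }

    public List<Object> prepareCommit(boolean waitCompaction, long checkpointId) throws Exception {
        // Use the real, increasing checkpoint id as the commit identifier so snapshots
        // can be told apart and only genuine conflicts are reported.
        commitIdentifier = Math.max(commitIdentifier + 1, checkpointId);
        return write.prepareCommit(waitCompaction, commitIdentifier);
    }
}
{code}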
[jira] [Comment Edited] (FLINK-36052) add elasticsearch.md for elasticsearch pipeline connector
[ https://issues.apache.org/jira/browse/FLINK-36052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890794#comment-17890794 ] JunboWang edited comment on FLINK-36052 at 10/18/24 5:49 AM: - Could we set Component/s as FLINK-CDC? was (Author: JIRAUSER305453): Could we add Component/s as FLINK-CDC? > add elasticsearch.md for elasticsearch pipeline connector > - > > Key: FLINK-36052 > URL: https://issues.apache.org/jira/browse/FLINK-36052 > Project: Flink > Issue Type: Improvement >Reporter: wuzexian >Assignee: wuzexian >Priority: Major > Labels: pull-request-available > > add elasticsearch.md for elasticsearch pipeline connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs
[ https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-36559: -- Description: (was: Duplicate with 36052.) > [docs]Add flink cdc elasticsearch pipeline sink to docs > --- > > Key: FLINK-36559 > URL: https://issues.apache.org/jira/browse/FLINK-36559 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36066][runtime] Introducing the AdaptiveGraphManager component [flink]
JunRuiLee commented on code in PR #25414: URL: https://github.com/apache/flink/pull/25414#discussion_r1805761682 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java: ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.forwardgroup; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +public class StreamNodeForwardGroup { Review Comment: JavaDoc is required ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java: ## @@ -85,6 +88,10 @@ public ResultPartitionType getResultType() { return resultType; } +public List getConsumerStreamEdges() { Review Comment: getOutputStreamEdges ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java: ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobgraph.forwardgroup; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +public class StreamNodeForwardGroup { Review Comment: Could we introduce an interface named ForwardGroup and rename the original ForwardGroup class to JobVerticesForwardGroup? The JobVerticesForwardGroup and StreamNodeForwardGroup class would then implement this interface. ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java: ## @@ -31,4 +31,12 @@ public interface StreamGraphHasher { * didn't change. */ Map traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph); + +/** + * Generates a hash for the specified {@link StreamNode} within the {@link StreamGraph}. This + * hash is stored in the provided map and can be used to uniquely identify the {@link + * StreamNode} across job submissions, assuming its configuration remains unchanged. + */ +boolean generateHashesByStreamNode( +StreamNode streamNode, StreamGraph streamGraph, Map hashes); Review Comment: int streamNodeId, StreamGraph streamGraph, Map hashes ## f
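A rough sketch of the interface extraction suggested in the review; the member methods are assumptions about what a forward group needs to expose, not the actual Flink API.

```java
// Sketch of the proposed refactoring: a common ForwardGroup interface with two implementations,
// JobVerticesForwardGroup (the renamed original class) and StreamNodeForwardGroup.
public interface ForwardGroup {

    /** Parallelism decided for the whole group, assumed to return a sentinel value if undecided. */
    int getParallelism();

    /** Max parallelism decided for the whole group, assumed to return a sentinel value if undecided. */
    int getMaxParallelism();

    boolean isParallelismDecided();

    boolean isMaxParallelismDecided();
}

// class JobVerticesForwardGroup implements ForwardGroup { ... } // groups JobVertex instances
// class StreamNodeForwardGroup  implements ForwardGroup { ... } // groups StreamNode instances
```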
[jira] [Created] (FLINK-36562) Running CI in random timezone to expose more time related bugs
LvYanquan created FLINK-36562: - Summary: Running CI in random timezone to expose more time related bugs Key: FLINK-36562 URL: https://issues.apache.org/jira/browse/FLINK-36562 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.3.0 Reporter: LvYanquan Fix For: cdc-3.3.0 Refer to [this comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012], when running CI, setting a random time zone in the session can help expose time-zone-related issues in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36563) Running CI in random timezone to expose more time related bugs
LvYanquan created FLINK-36563: - Summary: Running CI in random timezone to expose more time related bugs Key: FLINK-36563 URL: https://issues.apache.org/jira/browse/FLINK-36563 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.3.0 Reporter: LvYanquan Fix For: cdc-3.3.0 Refer to [this comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012], when running CI, setting a random time zone in the session can help expose time-zone-related issues in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36565) Pipeline YAML should allow merging decimal with different precisions
yux created FLINK-36565: --- Summary: Pipeline YAML should allow merging decimal with different precisions Key: FLINK-36565 URL: https://issues.apache.org/jira/browse/FLINK-36565 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, it's not possible to merge two Decimal-typed fields with different precision or scale. Since DECIMAL(p1, s1) and DECIMAL(p2, s2) can be converted to DECIMAL(MAX(p1 - s1, p2 - s2) + MAX(s1, s2), MAX(s1, s2)) without any loss, this conversion path seems reasonable and worth adding. -- This message was sent by Atlassian Jira (v8.20.10#820010)
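A worked sketch of the widening rule quoted above; the helper below is a stand-in, not the Flink CDC DecimalType utilities.

{code:java}
/** Sketch: compute the narrowest DECIMAL(p, s) that can hold both inputs without loss. */
public final class DecimalWidening {

    /** Returns {precision, scale} for the merged type. */
    public static int[] merge(int p1, int s1, int p2, int s2) {
        int scale = Math.max(s1, s2);                   // keep the larger fractional part
        int integerDigits = Math.max(p1 - s1, p2 - s2); // keep the larger integer part
        return new int[] {integerDigits + scale, scale};
    }

    public static void main(String[] args) {
        // DECIMAL(10, 2) merged with DECIMAL(5, 4) -> DECIMAL(12, 4)
        int[] merged = merge(10, 2, 5, 4);
        System.out.println("DECIMAL(" + merged[0] + ", " + merged[1] + ")");
    }
}
{code}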
Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]
Zakelly commented on code in PR #25509: URL: https://github.com/apache/flink/pull/25509#discussion_r1805752878 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java: ## @@ -31,19 +31,21 @@ public class CompressibleFSDataInputStream extends FSDataInputStream { private final FSDataInputStream delegate; private final InputStream compressingDelegate; +private final boolean compressed; public CompressibleFSDataInputStream( FSDataInputStream delegate, StreamCompressionDecorator compressionDecorator) throws IOException { this.delegate = delegate; this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate); +this.compressed = compressionDecorator != UncompressedStreamCompressionDecorator.INSTANCE; } @Override public void seek(long desired) throws IOException { -final int available = compressingDelegate.available(); -if (available > 0) { -if (available != compressingDelegate.skip(available)) { +if (compressed) { +final int available = compressingDelegate.available(); +if (available > 0 && available != compressingDelegate.skip(available)) { throw new IOException("Unable to skip buffered data."); } } Review Comment: Separate classes uncompressed and compressed `FSDataInputStream` sounds good to me. Agree to consider this later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
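A rough sketch of the "separate classes" idea mentioned above, using a simplified seekable-stream API; the real FSDataInputStream hierarchy is not reproduced here.

```java
import java.io.IOException;
import java.io.InputStream;

// Illustrative split: the uncompressed variant seeks directly, while the compressed variant
// first drains whatever the decompressing stream has buffered. Not the actual Flink classes.
abstract class SeekableInput extends InputStream {
    abstract void seek(long desired) throws IOException;
}

final class UncompressedInput extends SeekableInput {
    private final SeekableInput delegate;

    UncompressedInput(SeekableInput delegate) {
        this.delegate = delegate;
    }

    @Override
    void seek(long desired) throws IOException {
        delegate.seek(desired); // nothing is buffered, so no skip() is needed
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }
}

final class CompressedInput extends SeekableInput {
    private final SeekableInput delegate;
    private final InputStream decompressing;

    CompressedInput(SeekableInput delegate, InputStream decompressing) {
        this.delegate = delegate;
        this.decompressing = decompressing;
    }

    @Override
    void seek(long desired) throws IOException {
        // Drop data the decompressor has already buffered before moving the underlying stream.
        int available = decompressing.available();
        if (available > 0 && available != decompressing.skip(available)) {
            throw new IOException("Unable to skip buffered data.");
        }
        delegate.seek(desired);
    }

    @Override
    public int read() throws IOException {
        return decompressing.read();
    }
}
```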
[jira] [Commented] (FLINK-36563) Running CI in random timezone to expose more time related bugs
[ https://issues.apache.org/jira/browse/FLINK-36563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890772#comment-17890772 ] LvYanquan commented on FLINK-36563: --- Closed as this is duplicated with FLINK-36564. > Running CI in random timezone to expose more time related bugs > > > Key: FLINK-36563 > URL: https://issues.apache.org/jira/browse/FLINK-36563 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.3.0 >Reporter: LvYanquan >Priority: Major > Fix For: cdc-3.3.0 > > > Refer to [this > comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012], > when running CI, setting random time zone in session can help to expose > issues that is related to time zone in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36566) Code optimization: always identify DataChangeEvent before SchemaChangeEvent in Operator
LvYanquan created FLINK-36566: - Summary: Code optimization: always identify DataChangeEvent before SchemaChangeEvent in Operator Key: FLINK-36566 URL: https://issues.apache.org/jira/browse/FLINK-36566 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.3.0 Reporter: LvYanquan Fix For: cdc-3.3.0 In a data flow system, the number of DataChangeEvents is always much larger than that of SchemaChangeEvents. If we always check for DataChangeEvents first (for example with an `instanceof` check), we can avoid a lot of unnecessary branching and improve performance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
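A small sketch of the ordering described in FLINK-36566; the event interfaces below are placeholders for the Flink CDC Event, DataChangeEvent and SchemaChangeEvent types.

{code:java}
// Sketch: check the overwhelmingly common case (DataChangeEvent) first, so the hot path
// pays for a single instanceof check.
public class EventDispatchSketch {

    interface Event {}
    interface DataChangeEvent extends Event {}
    interface SchemaChangeEvent extends Event {}

    void processElement(Event event) {
        if (event instanceof DataChangeEvent) {
            handleDataChange((DataChangeEvent) event);     // hot path: almost every element
        } else if (event instanceof SchemaChangeEvent) {
            handleSchemaChange((SchemaChangeEvent) event); // rare path: schema changes are infrequent
        } else {
            throw new IllegalArgumentException("Unknown event type: " + event);
        }
    }

    void handleDataChange(DataChangeEvent event) { /* write the record */ }

    void handleSchemaChange(SchemaChangeEvent event) { /* apply the schema change */ }
}
{code}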
Re: [PR] [FLINK-36565] Route module allows merging Decimals with various precisions [flink-cdc]
yuxiqian commented on PR #3651: URL: https://github.com/apache/flink-cdc/pull/3651#issuecomment-2421296255 Could @melin please take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36565] Route module allows merging Decimals with various precisions [flink-cdc]
yuxiqian opened a new pull request, #3651: URL: https://github.com/apache/flink-cdc/pull/3651 This closes FLINK-36565. Currently, table-merging router does not allow coercing `Decimal` fields with different precisions from upstream, which means some lossless conversions are actually not possible, including: * `TINYINT`, `SMALLINT`, `INT`, `BIGINT` to `DECIMAL(p > 17, s >= 0)` * low precision `DECIMAL` to high precision ones --- I've baked the changes in #3636 (FLINK-36474) into this PR since there are some common changes between them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36565) Pipeline YAML should allow merging decimal with different precisions
[ https://issues.apache.org/jira/browse/FLINK-36565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36565: --- Labels: pull-request-available (was: ) > Pipeline YAML should allow merging decimal with different precisions > > > Key: FLINK-36565 > URL: https://issues.apache.org/jira/browse/FLINK-36565 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > Labels: pull-request-available > > Currently, it's not possible to merge two Decimal-typed fields with different > precision or scaling. Since DECIMAL(p1, s1) and DECIMAL(p2, s2) could be > converted to DECIMAL(MAX(p1 - s1, p2 - s2) + MAX(s1, s2), MAX(s1, s2) without > any loss, this converting path seems reasonable and worth being added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36564) Running CI in random timezone to expose more time related bugs
[ https://issues.apache.org/jira/browse/FLINK-36564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36564: --- Labels: pull-request-available (was: ) > Running CI in random timezone to expose more time related bugs > --- > > Key: FLINK-36564 > URL: https://issues.apache.org/jira/browse/FLINK-36564 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.3.0 >Reporter: LvYanquan >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.3.0 > > > Refer to [this > comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012], > when running CI, setting random time zone in session can help to expose > issues that is related to time zone in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]
Zakelly commented on code in PR #25501: URL: https://github.com/apache/flink/pull/25501#discussion_r1805798857 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java: ## @@ -220,7 +220,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables); -timeServiceManager = context.internalTimerServiceManager(); +timeServiceManager = context.asyncInternalTimerServiceManager(); Review Comment: I think this is not true. We'd better keep `context.internalTimerServiceManager` here, and re-assign `timeServiceManager` in `AbstractAsyncStateStreamOperatorV2` with `asyncInternalTimerServiceManager`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
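A rough sketch of the reassignment pattern suggested in the comment; the class and method names mirror the discussion, but the bodies and the state-context interface are illustrative assumptions, not the actual operator code.

```java
// Sketch: the base operator keeps the regular timer service manager, and only the
// async-state operator subclass swaps in the async variant.
class BaseOperatorSketch {
    protected Object timeServiceManager;

    public void initializeState(StateContextSketch context) {
        timeServiceManager = context.internalTimerServiceManager();
    }
}

class AsyncStateOperatorSketch extends BaseOperatorSketch {
    @Override
    public void initializeState(StateContextSketch context) {
        super.initializeState(context);
        // Only operators that actually use async state opt into the async timer service manager.
        timeServiceManager = context.asyncInternalTimerServiceManager();
    }
}

/** Hypothetical minimal view of the stream operator state context used above. */
interface StateContextSketch {
    Object internalTimerServiceManager();

    Object asyncInternalTimerServiceManager();
}
```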
[jira] [Created] (FLINK-36567) Planner module didn't use the setting from flink-conf.yaml
liting liu created FLINK-36567: -- Summary: Planner module didn't use the setting from flink-conf.yaml Key: FLINK-36567 URL: https://issues.apache.org/jira/browse/FLINK-36567 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.18.1 Reporter: liting liu I found the flink-table-planner_*.jar was generated in the /tmp dir, even though the conf `io.tmp.dirs` has been set to `/opt`. See the JobManager's log: 2024-10-17 08:52:30,330 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: io.tmp.dirs, /opt The relevant code is ``` org.apache.flink.table.planner.loader.PlannerModule#PlannerModule private PlannerModule() { try { final ClassLoader flinkClassLoader = PlannerModule.class.getClassLoader(); final Path tmpDirectory = Paths.get(ConfigurationUtils.parseTempDirectories(new Configuration())[0]); ``` The PlannerModule creates a new Configuration instead of using the values from the config file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
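One possible direction, sketched under the assumption that loading the global configuration is acceptable inside the planner loader: resolve the temp directory from GlobalConfiguration.loadConfiguration() instead of an empty new Configuration(), so io.tmp.dirs from the config file is honored.

{code:java}
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;

// Sketch of the idea, not the actual PlannerModule code.
final class PlannerTmpDirSketch {

    static Path resolvePlannerTmpDir() {
        // Reads the configuration file from FLINK_CONF_DIR, so io.tmp.dirs is respected.
        Configuration configuration = GlobalConfiguration.loadConfiguration();
        return Paths.get(ConfigurationUtils.parseTempDirectories(configuration)[0]);
    }
}
{code}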
Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]
gaborgsomogyi commented on PR #25509: URL: https://github.com/apache/flink/pull/25509#issuecomment-2419716967 > Could you reproduce this issue with state compression enabled? I've briefly tested compressed state and, yes, saw either slowness and/or a huge number of re-opens. The number of re-opens is controlled by the following settings (which I've played with to reduce re-opens): * `fs.s3a.readahead.range` * `fs.s3a.input.async.drain.threshold` I also tried to enable pre-fetching via `fs.s3a.prefetch.enabled` to do everything in memory to gain some speed. None of them helped. My finding is that `skip`, in the case of S3, reads the data into a buffer and then just drops it. I haven't gone through the whole Snappy chain but assume the same happens with decompression on top. Not sure where the mentioned `4096 bytes` buffer size comes from, but having as many re-opens as there are elements in the list is also not optimal 🙂 Are you saying that the exact same state data with default S3 Hadoop configs is slow uncompressed and fast compressed? That would be the better case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
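For context, the knobs named above can be set on the Hadoop Configuration used by the S3A filesystem; the values below are purely illustrative and, as noted in the comment, none of these settings resolved the issue in this case.

```java
import org.apache.hadoop.conf.Configuration;

// Illustrative values only; the keys are the ones named in the comment above.
final class S3aTuningSketch {

    static Configuration tuned() {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.readahead.range", "1M");               // how far a stream reads ahead
        conf.set("fs.s3a.input.async.drain.threshold", "128K"); // remaining bytes above which a stream is drained asynchronously
        conf.setBoolean("fs.s3a.prefetch.enabled", true);       // experimental block prefetching
        return conf;
    }
}
```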
Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 [flink]
ferenc-csaky commented on code in PR #25494: URL: https://github.com/apache/flink/pull/25494#discussion_r1804959664 ## flink-rpc/flink-rpc-akka/pom.xml: ## @@ -94,8 +94,8 @@ under the License. io.netty - netty - 3.10.6.Final + netty-all Review Comment: Ah yeah, thanks for the detailed answer, quite some time ago I started to create a POC to migrate to Artery but forgot the whole context since then. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 [flink]
ferenc-csaky commented on PR #25494: URL: https://github.com/apache/flink/pull/25494#issuecomment-2419808148 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]
AHeise merged PR #757: URL: https://github.com/apache/flink-web/pull/757 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]
pnowojski commented on PR #25539: URL: https://github.com/apache/flink/pull/25539#issuecomment-2419944369 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36356) HadoopRecoverableWriterTest.testRecoverWithState due to IOException
[ https://issues.apache.org/jira/browse/FLINK-36356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890500#comment-17890500 ] Piotr Nowojski commented on FLINK-36356: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=63129&view=logs&j=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819&t=2dd510a3-5041-5201-6dc3-54d310f68906 > HadoopRecoverableWriterTest.testRecoverWithState due to IOException > --- > > Key: FLINK-36356 > URL: https://issues.apache.org/jira/browse/FLINK-36356 > Project: Flink > Issue Type: Bug > Components: Connectors / Hadoop Compatibility >Affects Versions: 2.0-preview >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=62378&view=logs&j=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819&t=2dd510a3-5041-5201-6dc3-54d310f68906&l=10514 > {code} > Sep 23 07:55:16 07:55:16.451 [ERROR] Tests run: 12, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 20.05 s <<< FAILURE! -- in > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriterTest > Sep 23 07:55:16 07:55:16.451 [ERROR] > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriterTest.testRecoverWithState > -- Time elapsed: 2.694 s <<< ERROR! > Sep 23 07:55:16 java.io.IOException: All datanodes > [DatanodeInfoWithStorage[127.0.0.1:45240,DS-13a30476-dff5-4f3a-88b1-887571521a95,DISK]] > are bad. Aborting... > Sep 23 07:55:16 at > org.apache.hadoop.hdfs.DataStreamer.handleBadDatanode(DataStreamer.java:1537) > Sep 23 07:55:16 at > org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1472) > Sep 23 07:55:16 at > org.apache.hadoop.hdfs.DataStreamer.processDatanodeError(DataStreamer.java:1244) > Sep 23 07:55:16 at > org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:663) > {code} > The Maven logs reveal a bit more (I attached the extract of the failed build): > {code} > 07:55:13,491 [DataXceiver for client DFSClient_NONMAPREDUCE_211593080_35 at > /127.0.0.1:59360 [Receiving block > BP-289839883-172.27.0.2-1727078098659:blk_1073741832_1016]] ERROR > org.apache.hadoop.hdfs.server.datanode.DataNode [] - > 127.0.0.1:46429:DataXceiver error processing WRITE_BLOCK operation src: > /127.0.0.1:59360 dst: /127.0.0.1:46429 > java.nio.channels.ClosedByInterruptException: null > at > java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) > ~[?:1.8.0_292] > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:406) > ~[?:1.8.0_292] > at > org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57) > ~[hadoop-common-2.10.2.jar:?] > at > org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142) > ~[hadoop-common-2.10.2.jar:?] > at > org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161) > ~[hadoop-common-2.10.2.jar:?] > at > org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131) > ~[hadoop-common-2.10.2.jar:?] > at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) > ~[?:1.8.0_292] > at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) > ~[?:1.8.0_292] > at java.io.BufferedInputStream.read(BufferedInputStream.java:345) > ~[?:1.8.0_292] > at java.io.DataInputStream.read(DataInputStream.java:149) > ~[?:1.8.0_292] > at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:209) > ~[hadoop-common-2.10.2.jar:?] 
> at > org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:211) > ~[hadoop-hdfs-client-2.10.2.jar:?] > at > org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134) > ~[hadoop-hdfs-client-2.10.2.jar:?] > at > org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109) > ~[hadoop-hdfs-client-2.10.2.jar:?] > at > org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:528) > ~[hadoop-hdfs-2.10.2.jar:?] > at > org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:968) > ~[hadoop-hdfs-2.10.2.jar:?] > at > org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:877) > ~[hadoop-hdfs-2.10.2.jar:?] > at > org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:166) > ~[hadoop-hdfs-2.10.2.jar:?] > at > org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:103) > ~[hadoop-hdfs-2.10.2.jar:?] > at > org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataX
[jira] [Updated] (FLINK-36561) ResultSet.wasNull() does not reflect null values in Flink JDBC Driver
[ https://issues.apache.org/jira/browse/FLINK-36561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Soin updated FLINK-36561: -- Language: (was: jav) > ResultSet.wasNull() does not reflect null values in Flink JDBC Driver > - > > Key: FLINK-36561 > URL: https://issues.apache.org/jira/browse/FLINK-36561 > Project: Flink > Issue Type: Bug > Components: Table SQL / JDBC >Affects Versions: 1.18.1, 1.20.0, 1.19.1 >Reporter: Ilya Soin >Priority: Major > > As per JDBC > [standard|https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/ResultSet.html#wasNull()], > {{ResultSet.wasNull()}} > {quote}Reports whether the last column read had a value of SQL NULL. Note > that you must first call one of the getter methods on a column to try to read > its value and then call the method wasNull to see if the value read was SQL > NULL. > {quote} > However, Flink JDBC driver currently does not update the {{wasNull}} flag > within the {{FlinkResultSet.get*()}} methods. Instead, it only sets this flag > during [iteration over > rows|https://github.com/apache/flink/blob/release-2.0-preview1-rc1/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java#L106] > fetched from the gateway endpoint. This behavior leads to {{wasNull}} > returning true only if the entire row is null, not when individual column > values are null. Consequently, reading a null value using > {{FlinkResultSet.get*()}} incorrectly results in {{wasNull()}} returning > false, which is not compliant with the JDBC specification. > h4. Proposed solution > Check if the underlying value accessed with {{FlinkResultSet.get*()}} method > is null, and update wasNull accordingly. > h4. For discussion > Can we skip null rows in FlinkResultSet.next()? > h4. Steps to reproduce: > Add > {code:java} > assertTrue(resultSet.wasNull()); > {code} > after any call to resultSet.get*() in > [testStringResultSetNullData()|https://github.com/apache/flink/blob/release-2.0-preview1-rc1/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java#L115]. > Run the test and see the failed check. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36561) ResultSet.wasNull() does not reflect null values in Flink JDBC Driver
Ilya Soin created FLINK-36561: - Summary: ResultSet.wasNull() does not reflect null values in Flink JDBC Driver Key: FLINK-36561 URL: https://issues.apache.org/jira/browse/FLINK-36561 Project: Flink Issue Type: Bug Components: Table SQL / JDBC Affects Versions: 1.19.1, 1.20.0, 1.18.1 Reporter: Ilya Soin As per JDBC [standard|https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/ResultSet.html#wasNull()], {{ResultSet.wasNull()}} {quote}Reports whether the last column read had a value of SQL NULL. Note that you must first call one of the getter methods on a column to try to read its value and then call the method wasNull to see if the value read was SQL NULL. {quote} However, Flink JDBC driver currently does not update the {{wasNull}} flag within the {{FlinkResultSet.get*()}} methods. Instead, it only sets this flag during [iteration over rows|https://github.com/apache/flink/blob/release-2.0-preview1-rc1/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java#L106] fetched from the gateway endpoint. This behavior leads to {{wasNull}} returning true only if the entire row is null, not when individual column values are null. Consequently, reading a null value using {{FlinkResultSet.get*()}} incorrectly results in {{wasNull()}} returning false, which is not compliant with the JDBC specification. h4. Proposed solution Check if the underlying value accessed with {{FlinkResultSet.get*()}} method is null, and update wasNull accordingly. h4. For discussion Can we skip null rows in FlinkResultSet.next()? h4. Steps to reproduce: Add {code:java} assertTrue(resultSet.wasNull()); {code} after any call to resultSet.get*() in [testStringResultSetNullData()|https://github.com/apache/flink/blob/release-2.0-preview1-rc1/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java#L115]. Run the test and see the failed check. -- This message was sent by Atlassian Jira (v8.20.10#820010)
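A hedged sketch of the proposed solution: record nullness every time a getter reads a column value. The FlinkResultSet internals are not reproduced; the helper below only illustrates the bookkeeping, and the usage comment uses hypothetical accessor names.

{code:java}
// Sketch of per-read null tracking; not the actual FlinkResultSet implementation.
final class WasNullTracker {

    private boolean wasNull = false;

    /** Wrap every value a get*() method reads, before any conversion. */
    <T> T track(T value) {
        wasNull = (value == null);
        return value;
    }

    /** Mirrors ResultSet#wasNull(): reports on the last column read, not on the whole row. */
    boolean wasNull() {
        return wasNull;
    }
}

// Hypothetical use inside a getter:
//   String value = tracker.track(row.isNullAt(pos) ? null : String.valueOf(row.getField(pos)));
//   return value;
{code}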
Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]
mateczagany commented on PR #25509: URL: https://github.com/apache/flink/pull/25509#issuecomment-2420140089 > Are you saying that the exact same state data with default S3 Hadoop configs is slow uncompressed and fast compressed? That would be better case. Yes, that's correct, I did not tune any S3 settings. The job was the same, compressed state size was 290 MB, uncompressed one was 11 MB. The recovery of uncompressed state resulted in one S3 `GET` after each `read()` after `skip()` was called, while the compressed data did not. It was a ListState of Strings, and for the compressed data I even induced an artificial `skip()` for each element by reading 1 less byte in `StringValue#read()`, and none of the `skip()` or `read()` calls resulted in any new S3 `GET` queries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org