[PR] [FLINK-4602] Change RocksDBKeyedStateBackend to new package. [flink-benchmarks]
AlexYinHan opened a new pull request, #98: URL: https://github.com/apache/flink-benchmarks/pull/98 This resolves the compilation errors introduced by [FLINK-4602](https://github.com/apache/flink/pull/25543), which moves the rocksdb classes to the o.a.f.state.rocksdb package. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
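For readers following the package move: the benchmark-side fix is essentially an import update, since only the package of the class changed. A minimal illustrative sketch (the surrounding class is hypothetical; only the two package names come from the discussion above):

```java
// Old location (pre-FLINK-4602), kept only as a comment for comparison:
// import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;

// New location after the move described in this PR:
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend;

public class PackageMoveExample {
    // Hypothetical helper: the type itself is unchanged, only its package differs.
    public static String backendClassName(RocksDBKeyedStateBackend<?> backend) {
        return backend.getClass().getName();
    }
}
```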
[jira] [Updated] (FLINK-36271) Support json and jsonb type in PostgreSQL JDBC Dialect
[ https://issues.apache.org/jira/browse/FLINK-36271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36271: --- Labels: pull-request-available (was: ) > Support json and jsonb type in PostgreSQL JDBC Dialect > -- > > Key: FLINK-36271 > URL: https://issues.apache.org/jira/browse/FLINK-36271 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC, Table SQL / JDBC >Reporter: Grzegorz Kołakowski >Priority: Major > Labels: pull-request-available > > When using PostgreSQL JDBC Catalog an error is thrown if one of the tables > has column of type json or jsonb. > > {noformat} > java.lang.UnsupportedOperationException: Doesn't support Postgres type > 'jsonb' yet > {noformat} > Json/jsonb field can be returned as VARCHAR when reading the data. > Writing values to json/jsonb column is not allowed in current design. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
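As the issue describes, reads can work by surfacing the json/jsonb payload as a string (VARCHAR). A hedged sketch of that mapping, kept independent of the connector's actual converter interfaces; only `org.postgresql.util.PGobject` and Flink's `StringData` are assumed to be available:

```java
import org.apache.flink.table.data.StringData;
import org.postgresql.util.PGobject;

public class JsonbToVarcharExample {
    /**
     * Maps a value read from a json/jsonb column to the string form that
     * Flink SQL can expose as VARCHAR/STRING.
     */
    public static StringData toStringData(Object jdbcValue) {
        if (jdbcValue == null) {
            return null;
        }
        if (jdbcValue instanceof PGobject) {
            // PGobject#getValue() holds the textual JSON payload.
            return StringData.fromString(((PGobject) jdbcValue).getValue());
        }
        // Some driver configurations already return the payload as a plain String.
        return StringData.fromString(jdbcValue.toString());
    }
}
```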
[jira] [Commented] (FLINK-36245) Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated method/interface in Sink V2 in 2.0
[ https://issues.apache.org/jira/browse/FLINK-36245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893798#comment-17893798 ] LvYanquan commented on FLINK-36245: --- Hi, Piotr. The Kafka connector has not yet been adapted to the Flink 2.0-preview release, so the Kafka code cannot be used here directly. I have added a build plugin in the POM so that this class is not compiled:
{code:java}
<!-- Note: the XML tags of this snippet were stripped when the comment was mirrored to the list;
     the element names below are a best-effort reconstruction from the remaining values. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <excludes>
            <exclude>org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <id>compile</id>
            <phase>process-sources</phase>
            <goals>
                <goal>compile</goal>
            </goals>
            <configuration>
                <compilerArgument>-Xlint:deprecation</compilerArgument>
                <showDeprecation>true</showDeprecation>
            </configuration>
        </execution>
    </executions>
</plugin>
{code}
And I tested the compilation of both the entire Flink project and the individual flink-examples project using maven-3.8.6, and it passed. Perhaps you can check whether this plugin is taking effect in your Maven environment. > Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated > method/interface in Sink V2 in 2.0 > --- > > Key: FLINK-36245 > URL: https://issues.apache.org/jira/browse/FLINK-36245 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Common >Reporter: Qingsheng Ren >Assignee: LvYanquan >Priority: Major > Labels: 2.0-related, pull-request-available > Fix For: 2.0-preview > > > SourceFunction, SinkFunction and Sink V1 API has been marked as deprecated > and should be removed in Flink 2.0. > Considering SourceFunction / SinkFunction are heavily used in test cases for > building a simple data generator or a data validator, it could be a huge > amount of work to rewrite all these usages with Source and Sink V2 API. A > viable path for 2.0-preview version would be: > * Move SourceFunction, SinkFunction to an internal package, as a test util > * Rewrite all Sink V1 implementations with Sink V2 directly (the usage of > Sink V1 is low in the main repo) > As a long term working item, all usages of SourceFunction and SinkFunction > will be replaced by Source and Sink API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36622) Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs.
Han Yin created FLINK-36622: --- Summary: Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs. Key: FLINK-36622 URL: https://issues.apache.org/jira/browse/FLINK-36622 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 2.0-preview Reporter: Han Yin Fix For: 2.0.0 Currently, flink-benchmarks relies on non-public APIs in Flink. For example, in StateBackendBenchmarkUtils.java, the function compactState takes RocksDBKeyedStateBackend as its first argument. This requires explicit type conversion in flink-benchmark (from KeyedStateBackend to RocksDBKeyedStateBackend). Moreover, this means that once the signature of RocksDBKeyedStateBackend changes, we need to modify flink-benchmark correspondingly. Therefore, we should avoid exposing non-public APIs in StateBackendBenchmarkUtils. -- This message was sent by Atlassian Jira (v8.20.10#820010)
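A rough sketch of the direction proposed above: let the benchmark utility accept the public KeyedStateBackend interface and keep the RocksDB-specific cast internal. The parameter list and the body are placeholders, not the actual StateBackendBenchmarkUtils code:

```java
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend;

public final class CompactStateSketch {

    /** Callers pass the public interface, so no explicit cast is needed in flink-benchmark. */
    public static <K> void compactState(KeyedStateBackend<K> backend, String stateName)
            throws Exception {
        if (!(backend instanceof RocksDBKeyedStateBackend)) {
            // Nothing to compact for non-RocksDB backends.
            return;
        }
        @SuppressWarnings("unchecked")
        RocksDBKeyedStateBackend<K> rocksDBBackend = (RocksDBKeyedStateBackend<K>) backend;
        // The RocksDB-specific manual compaction call is elided here; the point is that
        // it stays inside the utility instead of leaking into the benchmark code.
    }
}
```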
Re: [PR] [FLINK-36592][state/forst] Support file cache for ForStStateBackend [flink]
Zakelly commented on code in PR #25561: URL: https://github.com/apache/flink/pull/25561#discussion_r1820533404 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java: ## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs.cache; + +import org.apache.flink.core.fs.ByteBufferReadable; +import org.apache.flink.core.fs.FSDataInputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Semaphore; + +/** + * A {@link FSDataInputStream} delegates requests to other one and supports reading data with {@link + * ByteBuffer}. + */ +public class CachedDataInputStream extends FSDataInputStream implements ByteBufferReadable { + +private static final Logger LOG = LoggerFactory.getLogger(CachedDataInputStream.class); + +/** The reference to the cache entry. */ +private final FileCacheEntry cacheEntry; + +private volatile FSDataInputStream fsdis; + +private volatile StreamStatus streamStatus; + +/** + * The position of the cached stream, when cached stream is closed, the position is stored. When + * switch to original stream, the position is restored. + */ +private volatile long position; + +private final FSDataInputStream originalStream; + +private Semaphore semaphore; + +public CachedDataInputStream( +FileCacheEntry cacheEntry, +FSDataInputStream cacheStream, +FSDataInputStream originalStream) { +this.cacheEntry = cacheEntry; +this.fsdis = cacheStream; +this.originalStream = originalStream; +this.streamStatus = StreamStatus.CACHED_OPEN; +this.semaphore = new Semaphore(0); +} + +private FSDataInputStream getStream() throws IOException { +if (streamStatus == StreamStatus.CACHED_OPEN && cacheEntry.tryRetain() > 0) { +return fsdis; +} else if (streamStatus == StreamStatus.CACHED_CLOSED) { Review Comment: ```suggestion } else if (streamStatus != StreamStatus.ORIGINAL) { ``` Since the `streamStatus` might not yet updated ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java: ## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs.cache; + +import org.apache.flink.core.fs.ByteBufferReadable; +import org.apache.flink.core.fs.FSDataInputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Semaphore; + +/** + * A {@link FSDataInputStream} delegates requests to other one and supports reading data with {@link + * ByteBuffer}. + */ +public class CachedDataInputStream extends FSDataInputStream implements ByteBufferReadable { Review Comment: Please offer some description about the muti-threading scenario? ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java: ## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this w
Re: [PR] [FLINK-36455] Sinks retry synchronously [flink]
AHeise commented on PR #25547: URL: https://github.com/apache/flink/pull/25547#issuecomment-2444290856 Reverted the deprecation of numFailed and added a config option for the retries. PTAL @fapaul -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36493][TABLE API] Remove all deprecated methods in MapView [flink]
tinaselenge commented on PR #25566: URL: https://github.com/apache/flink/pull/25566#issuecomment-2443607410 Failing tests don't seem to be related as they pass locally for me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode [flink]
dawidwys commented on code in PR #24699: URL: https://github.com/apache/flink/pull/24699#discussion_r1820328447 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java: ## @@ -567,7 +912,7 @@ void testUserDefinedFunctions() { "SELECT *\n" + "FROM MyTable\n" + "MATCH_RECOGNIZE (\n" -+ " ORDER BY proctime\n" ++ " ORDER BY ts\n" Review Comment: I agee with @grzegorz8 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-4602][State] Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. [flink]
AlexYinHan opened a new pull request, #25586: URL: https://github.com/apache/flink/pull/25586 ## What is the purpose of the change Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. ## Brief change log Add ```EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing)``` and ```EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)``` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
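In concrete terms, the PR restores the two constructors listed above on the compatibility class left in the old package. A hedged sketch of what that delegation might look like, assuming the contrib-package class extends the relocated `org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend` (the actual hierarchy in the PR may differ):

```java
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.util.TernaryBoolean;

/** Compatibility shim kept in the old package, delegating to the relocated backend. */
public class EmbeddedRocksDBStateBackend
        extends org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend {

    public EmbeddedRocksDBStateBackend() {
        super();
    }

    /** Restores the pre-move boolean constructor. */
    public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing) {
        super(enableIncrementalCheckpointing);
    }

    /** Restores the pre-move TernaryBoolean constructor. */
    public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing) {
        super(enableIncrementalCheckpointing);
    }
}
```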
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the --flink-home=$FLINK_HOME format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the "--flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of FlinkCDC's command line arguments are supported in the > format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink > home, which only supports space spacing. Users who use the > --flink-home=$FLINK_HOME format on the command line (trying to be consistent > with the other = spacing arguments) will not be able to set flink home > correctly. > In particular, when there is an environment variable $FLINK_HOME and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
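A minimal sketch of the requested parsing behavior: rewrite `--flink-home=$FLINK_HOME` into the space-separated form before the existing option handling runs, so both spellings behave identically. The class and method names here are illustrative, not Flink CDC's actual CLI code:

```java
import java.util.ArrayList;
import java.util.List;

public class FlinkHomeArgNormalizer {

    /** Rewrites "--flink-home=VALUE" into the two-token form the parser already accepts. */
    static List<String> normalize(String[] args) {
        List<String> normalized = new ArrayList<>();
        for (String arg : args) {
            if (arg.startsWith("--flink-home=")) {
                normalized.add("--flink-home");
                normalized.add(arg.substring("--flink-home=".length()));
            } else {
                normalized.add(arg);
            }
        }
        return normalized;
    }

    public static void main(String[] args) {
        // Both invocations should now yield the same FLINK_HOME override.
        System.out.println(normalize(new String[] {"--flink-home", "/opt/flink"}));
        System.out.println(normalize(new String[] {"--flink-home=/opt/flink"}));
    }
}
```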
[jira] [Comment Edited] (FLINK-36245) Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated method/interface in Sink V2 in 2.0
[ https://issues.apache.org/jira/browse/FLINK-36245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893764#comment-17893764 ] Piotr Nowojski edited comment on FLINK-36245 at 10/29/24 10:39 AM: --- Hi [~kunni], [~renqs] and [~leonard]. I think this change has made the build broken/unstable. Locally in the IntelliJ building Flink fails for me due to: {code:java} flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42 java: cannot access org.apache.flink.api.connector.sink2.StatefulSink class file for org.apache.flink.api.connector.sink2.StatefulSink not found {code} flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in turns is still referring to the StatefulSink: {code:java} public class KafkaSink implements StatefulSink, TwoPhaseCommittingSink (...) {code} Maven builds might be working due to some dumb luck. https://issues.apache.org/jira/browse/FLINK-36621 was (Author: pnowojski): Hi [~kunni], [~renqs] and [~leonard]. I think this change has made the build broken/unstable. Locally in the IntelliJ building Flink fails for me due to: {code:java} flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42 java: cannot access org.apache.flink.api.connector.sink2.StatefulSink class file for org.apache.flink.api.connector.sink2.StatefulSink not found {code} flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in turns is still referring to the StatefulSink: {code:java} public class KafkaSink implements StatefulSink, TwoPhaseCommittingSink (...) {code} Maven builds might be working due to some dumb luck. > Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated > method/interface in Sink V2 in 2.0 > --- > > Key: FLINK-36245 > URL: https://issues.apache.org/jira/browse/FLINK-36245 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Common >Reporter: Qingsheng Ren >Assignee: LvYanquan >Priority: Major > Labels: 2.0-related, pull-request-available > Fix For: 2.0-preview > > > SourceFunction, SinkFunction and Sink V1 API has been marked as deprecated > and should be removed in Flink 2.0. > Considering SourceFunction / SinkFunction are heavily used in test cases for > building a simple data generator or a data validator, it could be a huge > amount of work to rewrite all these usages with Source and Sink V2 API. A > viable path for 2.0-preview version would be: > * Move SourceFunction, SinkFunction to an internal package, as a test util > * Rewrite all Sink V1 implementations with Sink V2 directly (the usage of > Sink V1 is low in the main repo) > As a long term working item, all usages of SourceFunction and SinkFunction > will be replaced by Source and Sink API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-4602][State] Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. [flink]
flinkbot commented on PR #25586: URL: https://github.com/apache/flink/pull/25586#issuecomment-2443963990 ## CI report: * 8a64c442363ad7af042796ee90a80e82f8fb47f2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-4602][State] Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. [flink]
AlexYinHan commented on PR #25586: URL: https://github.com/apache/flink/pull/25586#issuecomment-2443980646 @Zakelly This resolves the incompatibility of the constructors of EmbeddedRocksDBStateBackend. Can you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-16851) Add common metrics to the SourceReader base implementation.
[ https://issues.apache.org/jira/browse/FLINK-16851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893806#comment-17893806 ] Poorvank Bhatia commented on FLINK-16851: - Hey [~becket_qin] , Is this still being worked upon? If not i can take this up. Happy to discuss :) > Add common metrics to the SourceReader base implementation. > --- > > Key: FLINK-16851 > URL: https://issues.apache.org/jira/browse/FLINK-16851 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Jiangjie Qin >Priority: Major > > Add the metrics to the base SourceReader implementation. This is relevant to > [FLIP-33|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics?src=contextnavpagetreemode]]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36621) Build failure: StatefulSink not found
[ https://issues.apache.org/jira/browse/FLINK-36621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893812#comment-17893812 ] LvYanquan commented on FLINK-36621: --- This issue should be related to a bug https://youtrack.jetbrains.com/issue/IDEA-87868 in Idea. I will add instructions on this class to enable users to compile correctly through Maven. > Build failure: StatefulSink not found > - > > Key: FLINK-36621 > URL: https://issues.apache.org/jira/browse/FLINK-36621 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 2.0-preview >Reporter: Piotr Nowojski >Priority: Blocker > > Locally in the IntelliJ building Flink fails for me due to: > {code:java} > flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42 > java: cannot access org.apache.flink.api.connector.sink2.StatefulSink > class file for org.apache.flink.api.connector.sink2.StatefulSink not found > {code} > flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in > turns is still referring to the StatefulSink: > {code:java} > public class KafkaSink implements StatefulSink, > TwoPhaseCommittingSink (...) > {code} > which has been deleted in FLINK-36245. I think maven builds might be working > due to some luck and differences between how IntelliJ and Maven are > interpreting pom files and dealing with the dependencies. > CC [~kunni] [~renqs] [~Leonard] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36622] Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs. [flink]
AlexYinHan opened a new pull request, #25587: URL: https://github.com/apache/flink/pull/25587 ## What is the purpose of the change Currently, flink-benchmarks relies on non-public APIs in Flink. For example, in StateBackendBenchmarkUtils.java, the function compactState takes RocksDBKeyedStateBackend as its first argument. This requires explicit type conversion in flink-benchmark(from KeyedStateBackend to RocksDBKeyedStateBackend). Moreover, this means that once the signature of RocksDBKeyedStateBackend changes, we need to modify flink-benchmark correspondingly. Therefore, we should avoid exposing non-public APIs in StateBackendBenchmarkUtils. ## Brief change log Change the 1st argument of `StateBackendBenchmarkUtils#compactState` from RocksDBKeyedStateBackend to KeyedStateBackend. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36616] fix npe in GcpPublisherConfig [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #33: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/33#discussion_r1820768938 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java: ## @@ -52,7 +52,11 @@ public CredentialsProvider getCredentialsProvider() { } public TransportChannelProvider getTransportChannelProvider() { +if (transportChannelProvider == null) { +return null; +} return transportChannelProvider.getTransportChannelProvider(); + Review Comment: ```suggestion ``` nit: i guess we don't need this line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36621) Build failure: StatefulSink not found
[ https://issues.apache.org/jira/browse/FLINK-36621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36621: --- Labels: pull-request-available (was: ) > Build failure: StatefulSink not found > - > > Key: FLINK-36621 > URL: https://issues.apache.org/jira/browse/FLINK-36621 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 2.0-preview >Reporter: Piotr Nowojski >Priority: Blocker > Labels: pull-request-available > > Locally in the IntelliJ building Flink fails for me due to: > {code:java} > flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42 > java: cannot access org.apache.flink.api.connector.sink2.StatefulSink > class file for org.apache.flink.api.connector.sink2.StatefulSink not found > {code} > flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in > turns is still referring to the StatefulSink: > {code:java} > public class KafkaSink implements StatefulSink, > TwoPhaseCommittingSink (...) > {code} > which has been deleted in FLINK-36245. I think maven builds might be working > due to some luck and differences between how IntelliJ and Maven are > interpreting pom files and dealing with the dependencies. > CC [~kunni] [~renqs] [~Leonard] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36621) Build failure: StatefulSink not found
Piotr Nowojski created FLINK-36621: -- Summary: Build failure: StatefulSink not found Key: FLINK-36621 URL: https://issues.apache.org/jira/browse/FLINK-36621 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 2.0-preview Reporter: Piotr Nowojski Locally in the IntelliJ building Flink fails for me due to: {code:java} flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42 java: cannot access org.apache.flink.api.connector.sink2.StatefulSink class file for org.apache.flink.api.connector.sink2.StatefulSink not found {code} flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in turns is still referring to the StatefulSink: {code:java} public class KafkaSink implements StatefulSink, TwoPhaseCommittingSink (...) {code} which has been deleted in FLINK-36245. I think maven builds might be working due to some luck and differences between how IntelliJ and Maven are interpreting pom files and dealing with the dependencies. CC [~kunni] [~renqs] [~Leonard] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-4602] Change RocksDBKeyedStateBackend to new package. [flink-benchmarks]
AlexYinHan commented on PR #98: URL: https://github.com/apache/flink-benchmarks/pull/98#issuecomment-2444005263 @Zakelly This should resolve the compilation errors once [Flink PR-25586](https://github.com/apache/flink/pull/25586) is merged. PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36621][flink-examples] Add comment in KafkaEventsGeneratorJob [flink]
flinkbot commented on PR #25588: URL: https://github.com/apache/flink/pull/25588#issuecomment-2444190248 ## CI report: * de83f7c5c6d6e57252dbf23a0677957e9164d3ac UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36622] Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs. [flink]
flinkbot commented on PR #25587: URL: https://github.com/apache/flink/pull/25587#issuecomment-2444189745 ## CI report: * fdcf5f44ec979958a0f65c27b2685f443e70152c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36621][flink-examples] Add comment in KafkaEventsGeneratorJob [flink]
lvyanquan opened a new pull request, #25588: URL: https://github.com/apache/flink/pull/25588 ## What is the purpose of the change Add a comment to help developers avoid compilation errors in the flink-examples module. ## Brief change log comment ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36622) Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs.
[ https://issues.apache.org/jira/browse/FLINK-36622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36622: --- Labels: pull-request-available (was: ) > Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs. > - > > Key: FLINK-36622 > URL: https://issues.apache.org/jira/browse/FLINK-36622 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 2.0-preview >Reporter: Han Yin >Priority: Minor > Labels: pull-request-available > Fix For: 2.0.0 > > > Currently, flink-benchmarks relies on non-public APIs in Flink. For example, > in StateBackendBenchmarkUtils.java, the function compactState > takes RocksDBKeyedStateBackend as its first argument. > This requires explicit type conversion in flink-benchmark (from > KeyedStateBackend to RocksDBKeyedStateBackend). Moreover, this > means that once the signature of RocksDBKeyedStateBackend changes, we > need to modify flink-benchmark correspondingly. > Therefore, we should avoid exposing non-public APIs in > StateBackendBenchmarkUtils. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32483) RescaleCheckpointManuallyITCase.testCheckpointRescalingOutKeyedState fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-32483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-32483. -- Resolution: Fixed > RescaleCheckpointManuallyITCase.testCheckpointRescalingOutKeyedState fails on > AZP > - > > Key: FLINK-32483 > URL: https://issues.apache.org/jira/browse/FLINK-32483 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Tests >Affects Versions: 1.17.2, 1.20.0 >Reporter: Sergey Nuyanzin >Assignee: David Morávek >Priority: Major > Labels: auto-deprioritized-critical, test-stability > Fix For: 2.0.0 > > > This build > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50397&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=7495 > fails with > {noformat} > Jun 26 06:08:57 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 21.041 s <<< FAILURE! - in > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase > Jun 26 06:08:57 [ERROR] > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingOutKeyedState > Time elapsed: 6.435 s <<< FAILURE! > Jun 26 06:08:57 java.lang.AssertionError: expected:<[(0,24000), (2,58500), > (0,34500), (0,45000), (3,43500), (2,18000), (1,6000), (1,16500), (0,28500), > (0,52500), (3,27000), (1,51000), (2,25500), (0,1500), (0,49500), (3,0), > (3,48000), (0,36000), (2,22500), (1,10500), (0,46500), (2,33000), (1,21000), > (0,9000), (0,57000), (3,31500), (2,19500), (1,7500), (1,55500), (3,42000), > (2,3), (0,54000), (2,40500), (1,4500), (3,15000), (2,3000), (1,39000), > (2,13500), (0,37500), (0,61500), (3,12000), (3,6)]> but was:<[(2,58500), > (0,34500), (0,45000), (3,43500), (2,18000), (1,16500), (0,52500), (3,27000), > (2,25500), (0,49500), (3,0), (3,48000), (0,36000), (2,22500), (1,21000), > (0,9000), (0,57000), (3,31500), (1,7500), (2,3), (0,54000), (2,40500), > (1,4500), (2,3000), (1,39000), (2,13500), (0,61500), (3,12000)]> > Jun 26 06:08:57 at org.junit.Assert.fail(Assert.java:89) > Jun 26 06:08:57 at org.junit.Assert.failNotEquals(Assert.java:835) > Jun 26 06:08:57 at org.junit.Assert.assertEquals(Assert.java:120) > Jun 26 06:08:57 at org.junit.Assert.assertEquals(Assert.java:146) > Jun 26 06:08:57 at > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.restoreAndAssert(RescaleCheckpointManuallyITCase.java:219) > Jun 26 06:08:57 at > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingKeyedState(RescaleCheckpointManuallyITCase.java:138) > Jun 26 06:08:57 at > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingOutKeyedState(RescaleCheckpointManuallyITCase.java:116) > Jun 26 06:08:57 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36271] Support reading json and jsonb types in PostgreSQL dialect [flink-connector-jdbc]
grzegorz8 commented on PR #141: URL: https://github.com/apache/flink-connector-jdbc/pull/141#issuecomment-2444020989 > > @grzegorz8 Can You Give me, Some Example to Build This Code and Add Jar and use in pyflink code for Streaming Data to JSONB in PostgreSQL. > > I'm sorry but there are still one issue with this change, namely, cratedb depends on postgres module and the following error is thrown in tests: > > ``` > Caused by: java.lang.NoClassDefFoundError: org/postgresql/util/PGobject >at org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialectConverter.lambda$createPrimitiveConverter$bd2b50a6$1(PostgresDialectConverter.java:99) >at org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.lambda$wrapIntoNullableInternalConverter$ff222c76$1(AbstractDialectConverter.java:127) >at org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialectConverter.lambda$createPostgresArrayConverter$477a3c4c$1(PostgresDialectConverter.java:87) >at org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.lambda$wrapIntoNullableInternalConverter$ff222c76$1(AbstractDialectConverter.java:127) >at org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.toInternal(AbstractDialectConverter.java:78) >at org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257) >at org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56) >at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:97) >at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:114) >at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71) >at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338) > Caused by: java.lang.ClassNotFoundException: org.postgresql.util.PGobject >at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) >at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) >at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527) >... 11 more > ``` > > However, the build will succeed if you skip tests. `mvn clean install -DskipTests` Hey @matriv! Since you are the author of CrateDB support (https://github.com/apache/flink-connector-jdbc/pull/29) maybe you can suggest me how to deal with the error shown above? Thanks in advance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-21909) Unify API and implementation for Hive and Filesystem source connector
[ https://issues.apache.org/jira/browse/FLINK-21909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893792#comment-17893792 ] Poorvank Bhatia commented on FLINK-21909: - Hey [~jark] , If this is still unassigned can i take this up? Thanks :) > Unify API and implementation for Hive and Filesystem source connector > - > > Key: FLINK-21909 > URL: https://issues.apache.org/jira/browse/FLINK-21909 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem, Connectors / Hive >Reporter: Jark Wu >Priority: Major > > This should make Filesystem source connector have all the ability of Hive > source connector (including the watermark ability). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25921) Support different input parallelism for preCommit topology
[ https://issues.apache.org/jira/browse/FLINK-25921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893796#comment-17893796 ] Poorvank Bhatia commented on FLINK-25921: - Hey [~fpaul] , If this is still unassigned can i pick this up? Thanks :) > Support different input parallelism for preCommit topology > -- > > Key: FLINK-25921 > URL: https://issues.apache.org/jira/browse/FLINK-25921 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Affects Versions: 1.15.0, 1.16.0 >Reporter: Fabian Paul >Priority: Major > > Currently, we assume that the pre-commit topology has the same parallelism as > the operator before when inserting the failover region. To support a > different parallelism we might need to insert a different identity map to > customize the mapping. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36613) RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-36613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893856#comment-17893856 ] Piotr Nowojski commented on FLINK-36613: {quote} It is very likely that the reason is FLINK-36556 where was changed default value for STARTING_MEMORY_SEGMENT_SIZE and it impacts the test BTW before changes from FLINK-36556 it is not reproduced {quote} I don't think that FLINK-36556 can be the culprit here. Buffer debloating is not enabled neither randomly in the test environment nor in the {{RescaleCheckpointManuallyITCase}} itself. It's also disabled by default, so changes in the FLINK-36556 shouldn't affect in any way this test. I would presume that test is still broken in some way. I've tried looking into it, but after a bit of digging I couldn't find what's actually wrong with it. > RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState fails on > AZP > > > Key: FLINK-36613 > URL: https://issues.apache.org/jira/browse/FLINK-36613 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Tests >Affects Versions: 2.0-preview >Reporter: Sergey Nuyanzin >Priority: Critical > Labels: pull-request-available, test-stability > > {{RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState}} fails > like > {noformat} > Oct 28 04:42:03 04:42:03.884 [ERROR] > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState > -- Time elapsed: 6.107 s <<< FAILURE! > Oct 28 04:42:03 java.lang.AssertionError: expected:<[(0,24000), (1,22500), > (0,34500), (1,33000), (0,21000), (0,45000), (2,31500), (2,42000), (1,6000), > (0,28500), (0,52500), (2,15000), (1,3000), (1,51000), (0,1500), (0,49500), > (2,12000), (2,6), (0,36000), (1,10500), (1,58500), (0,46500), (0,9000), > (0,57000), (2,19500), (2,43500), (1,7500), (1,55500), (2,3), (1,18000), > (0,54000), (2,40500), (1,4500), (0,16500), (2,27000), (1,39000), (2,13500), > (1,25500), (0,37500), (0,61500), (2,0), (2,48000)]> but was:<[(1,22500), > (0,34500), (1,33000), (0,45000), (0,21000), (2,31500), (0,28500), (0,52500), > (2,15000), (1,3000), (1,51000), (0,49500), (0,1500), (1,58500), (1,10500), > (0,46500), (0,9000), (0,57000), (2,43500), (2,19500), (1,7500), (1,55500), > (2,40500), (1,4500), (0,16500), (2,27000), (1,39000), (2,13500), (1,25500), > (0,37500), (0,61500)]> > Oct 28 04:42:03 at org.junit.Assert.fail(Assert.java:89) > Oct 28 04:42:03 at org.junit.Assert.failNotEquals(Assert.java:835) > Oct 28 04:42:03 at org.junit.Assert.assertEquals(Assert.java:120) > Oct 28 04:42:03 at org.junit.Assert.assertEquals(Assert.java:146) > Oct 28 04:42:03 at > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.restoreAndAssert(RescaleCheckpointManuallyITCase.java:216) > Oct 28 04:42:03 at > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingKeyedState(RescaleCheckpointManuallyITCase.java:138) > Oct 28 04:42:03 at > org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState(RescaleCheckpointManuallyITCase.java:111) > Oct 28 04:42:03 at > java.base/java.lang.reflect.Method.invoke(Method.java:566) > Oct 28 04:42:03 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > {noformat} > for instance > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=63345&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba > the issue is floating however could 
be reproduced locally after several > repetitions like > {code} > cd flink-tests > for i in $(seq 1 100); do ../mvnw -Dtest=RescaleCheckpointManuallyITCase test > || break ; done > {code} > there is a similar issue FLINK-32483 however it seems fixed (not closed for > some reason and not related) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-36410) Improve Lineage Info Collection for flink app
[ https://issues.apache.org/jira/browse/FLINK-36410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenqiu Huang resolved FLINK-36410. --- Resolution: Done > Improve Lineage Info Collection for flink app > - > > Key: FLINK-36410 > URL: https://issues.apache.org/jira/browse/FLINK-36410 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.20.0 >Reporter: Zhenqiu Huang >Priority: Minor > > We find that adopting the lineage interfaces is not easy for each of the Flink > connectors, as every connector has its own release dependencies and schedule. > Thus, to let the lineage integration be adopted incrementally by users, we want to > change the lineage info collection so that it does not require both the source and the > sink to implement the lineage provider interfaces. That way the lineage reporter can > generate a partial lineage graph. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36410) Improve Lineage Info Collection for flink app
[ https://issues.apache.org/jira/browse/FLINK-36410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893955#comment-17893955 ] Zhenqiu Huang commented on FLINK-36410: --- The PR has been created and merged: https://github.com/apache/flink/pull/25440 > Improve Lineage Info Collection for flink app > - > > Key: FLINK-36410 > URL: https://issues.apache.org/jira/browse/FLINK-36410 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.20.0 >Reporter: Zhenqiu Huang >Priority: Minor > > We find that adopting the lineage interfaces is not easy for each of the Flink > connectors, as every connector has its own release dependencies and schedule. > Thus, to let the lineage integration be adopted incrementally by users, we want to > change the lineage info collection so that it does not require both the source and the > sink to implement the lineage provider interfaces. That way the lineage reporter can > generate a partial lineage graph. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36625) Add helper classes for Lineage integration in connectors
Zhenqiu Huang created FLINK-36625: - Summary: Add helper classes for Lineage integration in connectors Key: FLINK-36625 URL: https://issues.apache.org/jira/browse/FLINK-36625 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Zhenqiu Huang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36626) Flink SQL temporal JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Summary: Flink SQL temporal JOINs behavior change from Flink 1.15 to Flink 1.18+ (was: Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+) > Flink SQL temporal JOINs behavior change from Flink 1.15 to Flink 1.18+ > --- > > Key: FLINK-36626 > URL: https://issues.apache.org/jira/browse/FLINK-36626 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.18.1, 1.20.0 > Environment: AWS Managed Apache Flink >Reporter: Eduardo Breijo >Priority: Critical > Attachments: Flink-SQL-query.txt > > There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 in regards to Flink SQL temporal joins that I haven't been able to pin point and is causing the query below to output different results. > *Flink SQL Query:* > ~WITH assets_setpoint AS (~ > ~SELECT~ > ~asset_id,~ > ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ > ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ > ~LAST_VALUE(`value`) AS `value`~ > ~FROM asset_readings~ > ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ > ~ON metric.metric_id = asset_readings.metric_id~ > ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ > ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ > ~)~ > ~SELECT~ > ~assets_supply_air_temp.`timestamp`,~ > ~assets_supply_air_temp.asset_id,~ > ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ > ~FROM (~ > ~SELECT asset_readings.`timestamp`,~ > ~asset_readings.asset_id,~ > ~asset_readings.`value` AS `value`~ > ~FROM asset_readings~ > ~-- Metrics temporal lookup inner join~ > ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ > ~ON metric.metric_id = asset_readings.metric_id~ > ~-- Assets to ignore for this computed metric definition temporal lookup > left join~ > ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME > AS OF `proctime`~ > ~ON > asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id > = :computedMetricDefinitionId~ > ~AND asset_to_ignore_per_computed_metric_definition.asset_id = > asset_readings.asset_id~ > ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ > ~-- Filter assets not present in the asset to ignore for this computed > metric definition table~ > ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ > ~) AS assets_supply_air_temp~ > ~INNER JOIN assets_setpoint~ > ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ > ~WHERE assets_supply_air_temp.`timestamp` BETWEEN > assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ > *Schema:* > ~+------+------+------+------+------+------+~ > ~| name | type | null | key | extras | > watermark |~ > ~+------+------+------+------+------+------+~ > ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | > SOURCE_WATERMARK() |~ > ~| asset_id | BIGINT | TRUE | | | > |~ > ~| metric_id | INT | TRUE | | | > |~ > ~| value | DOUBLE | TRUE | | | > |~ > ~| metadata | MAP | TRUE | | | > |~ > ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | > |~ > ~+------+------+------+------+------+------+~ > ~6 rows in set~ > ~++~ > ~| table name |~ > ~++~ > ~| asset_readings |~ > ~| asset_relationship_parent_to_unit |~ > ~| asset_to_ignore_per_computed_metric_definition |~ > ~| metric |~ > ~++~ > Results: > * On Flink 1.15 the difference (assets_supply_air_temp.`value` - > assets_setpoint.`value` AS `value`) of the assets_supply_air_temp and > assets_setpoint is computed correctly for
Re: [PR] [FLINK-35444][pipeline-connector][paimon] Paimon Pipeline Connector support changing column names to lowercase for Hive metastore [flink-cdc]
github-actions[bot] commented on PR #3569: URL: https://github.com/apache/flink-cdc/pull/3569#issuecomment-2445540069 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Refactored Identifiers [flink-cdc]
github-actions[bot] commented on PR #3501: URL: https://github.com/apache/flink-cdc/pull/3501#issuecomment-2445540164 This pull request has been closed because it has not had recent activity. You could reopen it if you try to continue your work, and anyone who are interested in it are encouraged to continue work on this pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35599][cdc-connector][jdbc-mysql]Flink cdc pipeline sink jdbc mysql [flink-cdc]
github-actions[bot] commented on PR #3433: URL: https://github.com/apache/flink-cdc/pull/3433#issuecomment-2445540230 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 in regards to Flink SQL temporal joins that I haven't been able to pin point and is causing the query below to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+------+------+------+------+------+------+~ ~| name | type | null | key | extras | watermark |~ ~+------+------+------+------+------+------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+------+------+------+------+------+------+~ ~6 rows in set~ ~++~ ~| table name |~ ~++~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~++~ Results: * On Flink 1.15 the difference (assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+ this difference always results in 0 I have tried updating the query using different formats but I have not found a workaround and I don't know why this is happening. 
Attached you will find a file with the different SQL formats I have tried with no luck. Any help would be appreciated was: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 in regards to Flink SQL temporal joins that I haven't been able to pin point and is causing the query to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`t
[jira] [Updated] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL joins that I haven't been able to pinpoint and that is causing the query to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+ this difference always results in 0. I have tried updating the query using different formats but I have not found a workaround and I don't know why this is happening. Attached you will find a file with the different SQL formats I have tried with no luck.
Any help would be appreciated was: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 in regards to Flink SQL joins that I haven't been able to pin point and is causing the query to output different results. Flink SQL Query: ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE me
[jira] [Assigned] (FLINK-36547) Add option to retain `RowKind` semantics after serialization/deserialization for cdc formats
[ https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-36547: -- Assignee: Yubin Li > Add option to retain `RowKind` semantics after serialization/deserialization > for cdc formats > --- > > Key: FLINK-36547 > URL: https://issues.apache.org/jira/browse/FLINK-36547 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 2.0.0 >Reporter: Yubin Li >Assignee: Yubin Li >Priority: Major > Attachments: image-2024-10-16-11-01-54-790.png, > image-2024-10-16-11-02-34-406.png > > > As the official docs say, `RowKind` semantics have been changed: -U -> -D, +U -> +I > {code:java} > Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL > as Debezium JSON or Avro messages, and emit to external systems like Kafka. > However, currently Flink can't combine UPDATE_BEFORE and UPDATE_AFTER into a > single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and > UPDATE_AFTER as DELETE and INSERT Debezium messages. {code} > In fact, we also have a demand to make the `RowKind` semantics consistent in > many scenarios, such as those that require different processing of -U/-D and > +U/+I. We have taken advantage of the difference between UPDATE_BEFORE and > UPDATE_AFTER to implement the feature and made it run well in business. > {code:java} > create table datagen1 (id int, name string) with > ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', > 'fields.id.max'='2'); > // add 'debezium-json.retain.rowkind' = 'true' > create table t2 (id int, name string, num bigint) WITH ( > 'topic' = 't2', > 'connector' = 'kafka', > 'properties.bootstrap.servers' = 'xx', > 'properties.group.id' = 'test', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'debezium-json', > 'key.format' = 'json', > 'key.fields' = 'id', > 'debezium-json.timestamp-format.standard' = 'ISO-8601', > 'debezium-json.schema-include' = 'false', > 'debezium-json.retain.rowkind' = 'true' > ); > insert into t2 select id, max(name) as name, count(1) as num from datagen1 > group by id; > insert into print1 select * from t2; > {code} > output result: > !image-2024-10-16-11-02-34-406.png|width=660,height=153! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal joins that I haven't been able to pinpoint and that is causing the query to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+ this difference always results in 0. I have tried updating the query using different formats but I have not found a workaround and I don't know why this is happening. Attached you will find a file with the different SQL formats I have tried with no luck.
Any help would be appreciated was: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 in regards to Flink SQL joins that I haven't been able to pin point and is causing the query to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS
Re: [PR] [hotfix] Replace System.out.println with logger for better log management [flink-connector-gcp-pubsub]
caicancai commented on code in PR #31: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/31#discussion_r1821701027 ## flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java: ## @@ -21,11 +21,14 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.math.BigInteger; /** Helper class to send PubSubMessages to a PubSub topic. */ class PubSubPublisher { +private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
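For readers of the digest, the conventional SLF4J pattern this PR moves toward looks roughly like the sketch below. It is illustrative only: the final merged code may differ, and keying the logger to PubSubPublisher.class (rather than PubSubExample.class as in the snippet above) is an assumption, not a quote from the PR.
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Sketch: replace System.out.println with an SLF4J logger owned by the emitting class. */
class PubSubPublisher {
    // By convention the logger is keyed to the enclosing class.
    private static final Logger LOG = LoggerFactory.getLogger(PubSubPublisher.class);

    void publish(String messageBody) {
        // Parameterized logging avoids string concatenation when the log level is disabled.
        LOG.info("Publishing message: {}", messageBody);
    }
}
{code}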
[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal joins that I haven't been able to pinpoint and that is causing the query below to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+ this difference always results in 0 * Updating the query to use a regular join against the metric table (removing ~FOR SYSTEM_TIME AS OF `proctime`~) makes the query output the correct value, but I don't think regular joins are what I need in this case. I have tried updating the query using different formats with temporal joins but I have not found a workaround and I don't know why this is happening. Attached you will find a file with the different SQL formats I have tried with no luck. Any
[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal joins that I haven't been able to pinpoint and that is causing the query below to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+ this difference always results in 0 * Updating the query to use a regular join against the metric table (removing ~FOR SYSTEM_TIME AS OF `proctime`~) makes the query output the correct value. I have tried updating the query using different formats but I have not found a workaround and I don't know why this is happening. Attached you will find a file with the different SQL formats I have tried with no luck. Any help would be appreciated was: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 in regards to Flink SQL temporal j
[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal joins that I haven't been able to pinpoint and that is causing the query below to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+, for the same query, this difference always results in 0 * On Flink 1.18+, updating the query to use a regular join against the metric table (removing ~FOR SYSTEM_TIME AS OF `proctime`~) makes the query output the correct value, but I don't think regular joins are what I need in this case. I have tried updating the query using different formats w
[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal joins that I haven't been able to pinpoint and that is causing the query below to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+ this difference always results in 0 * On Flink 1.18+, updating the query to use a regular join against the metric table (removing ~FOR SYSTEM_TIME AS OF `proctime`~) makes the query output the correct value, but I don't think regular joins are what I need in this case. I have tried updating the query using different formats with temporal joins but I have not found a workaround and I don't know why this is happening.
Re: [PR] [FLINK-36011] [runtime] Generalize RescaleManager to become StateTransitionManager [flink]
rkhachatryan commented on code in PR #25280: URL: https://github.com/apache/flink/pull/25280#discussion_r1821594247 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java: ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.Temporal; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Supplier; + +/** + * {@code DefaultStateTransitionManager} is a state machine which manages the {@link + * AdaptiveScheduler}'s state transitions based on the previous transition time and the available + * resources. See {@link Phase} for details on each individual phase of this state machine. Note: We + * use the term phase here to avoid confusion with the state used in the {@link AdaptiveScheduler}. + * + * + * {@link Cooldown} + * | + * +--> {@link Idling} + * | | + * | V + * +--> {@link Stabilizing} + * | + * +--> {@link Stabilized} --> {@link Idling} + * | | + * | V + * \--> {@link Transitioning} + * + * + * Thread-safety: This class is not implemented in a thread-safe manner and relies on the fact + * that any method call happens within a single thread. 
+ * + * @see Executing + */ +@NotThreadSafe +public class DefaultStateTransitionManager implements StateTransitionManager { + +private static final Logger LOG = LoggerFactory.getLogger(DefaultStateTransitionManager.class); + +private final Supplier clock; +private final StateTransitionManager.Context transitionContext; +private Phase phase; +private final List> scheduledFutures; + +@VisibleForTesting final Duration cooldownTimeout; +@Nullable @VisibleForTesting final Duration resourceStabilizationTimeout; +@VisibleForTesting final Duration maxTriggerDelay; + +DefaultStateTransitionManager( +Temporal initializationTime, +StateTransitionManager.Context transitionContext, +Duration cooldownTimeout, +@Nullable Duration resourceStabilizationTimeout, +Duration maxTriggerDelay) { +this( +initializationTime, +Instant::now, +transitionContext, +cooldownTimeout, +resourceStabilizationTimeout, +maxTriggerDelay); +} + +@VisibleForTesting +DefaultStateTransitionManager( +Temporal initializationTime, +Supplier clock, +StateTransitionManager.Context transitionContext, +Duration cooldownTimeout, +@Nullable Duration resourceStabilizationTimeout, +Duration maxTriggerDelay) { + +this.clock = clock; +this.maxTriggerDelay = maxTriggerDelay; +this.cooldownTimeout = cooldownTimeout; +this.resourceStabilizationTimeout = resourceStabilizationTimeout; +this.transitionContext = transitionContext; +this.scheduledFutures = new ArrayList<>(); +this.phase = new Cooldown(initializationTime, clock, this, cooldownTimeout); +} + +@Override +public void onChange() { +phase.onChange(); +} + +@Override +public void onTrigger() { +phase.onTrigger(); +} + +@Override +public void close() { +scheduledFutures.forEach(future -> future.cancel(true)); +scheduledFutures.clear(); +} + +@VisibleForTesting +Phase getPhase() { +return phase; +} + +private void progressToIdling() { +progressToPhase(new Idling(clock, this)); +} + +private void progressToStabilizing(Temporal firstChangeEventTimestamp) { +progressToPhase( +new Stabilizing( +clock
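To make the phase-delegation structure in the diff above easier to follow, here is a minimal, self-contained sketch of the same idea: the manager forwards onChange()/onTrigger() to its current phase, and the phase decides when the manager advances. All names and the simplified phase logic below are hypothetical; this is not the Flink implementation.
{code:java}
import java.time.Duration;
import java.time.Instant;

public class PhaseDelegationSketch {

    interface Phase {
        void onChange();   // the environment (e.g. available resources) changed
        void onTrigger();  // the caller asks whether a transition should happen now
    }

    private final Instant initializationTime = Instant.now();
    private final Duration cooldown = Duration.ofSeconds(30);
    private Phase phase = new Cooldown();

    // Public API mirrors the two hooks of the manager in the diff above.
    public void onChange()  { phase.onChange(); }
    public void onTrigger() { phase.onTrigger(); }

    /** During the cooldown, change events are only remembered. */
    private final class Cooldown implements Phase {
        private boolean changeSeen;

        @Override public void onChange() { changeSeen = true; }

        @Override public void onTrigger() {
            if (Duration.between(initializationTime, Instant.now()).compareTo(cooldown) >= 0) {
                // Cooldown elapsed: advance to the next phase.
                phase = changeSeen ? new Stabilizing() : new Idling();
            }
        }
    }

    /** Waiting for the first change event after the cooldown. */
    private final class Idling implements Phase {
        @Override public void onChange() { phase = new Stabilizing(); }
        @Override public void onTrigger() { /* nothing to decide yet */ }
    }

    /** A change arrived; the real class additionally applies stabilization timeouts here. */
    private final class Stabilizing implements Phase {
        @Override public void onChange() { /* keep absorbing further changes */ }
        @Override public void onTrigger() {
            System.out.println("resources look stable -> transition");
        }
    }
}
{code}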
[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal joins that I haven't been able to pinpoint and that is causing the query below to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+, for the same query, this difference always results in 0 * On Flink 1.18+, updating the query to use a regular join against the metric lookup table (removing ~FOR SYSTEM_TIME AS OF `proctime`~) makes the query to out
[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal lookup joins that I haven't been able to pinpoint and that is causing the query below to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+, for the same query, this difference always results in 0 * On Flink 1.18+, updating the query to use a regular join against the metric lookup table (removing ~FOR SYSTEM_
[jira] [Created] (FLINK-36627) Failure to process a CSV file in Flink due to a character encoding mismatch: the file is in ISO-8859 and the application expects UTF-8.
Hector Miuler Malpica Gallegos created FLINK-36627: -- Summary: Failure to process a CSV file in Flink due to a character encoding mismatch: the file is in ISO-8859 and the application expects UTF-8. Key: FLINK-36627 URL: https://issues.apache.org/jira/browse/FLINK-36627 Project: Flink Issue Type: Bug Reporter: Hector Miuler Malpica Gallegos I get an error when reading a CSV file with charset ISO-8859; the error is the following: Caused by: java.io.CharConversionException: Invalid UTF-8 middle byte 0x41 (at char #1247, byte #1246): check content encoding, does not look like UTF-8 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportInvalidOther(UTF8Reader.java:520) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportDeferredInvalid(UTF8Reader.java:531) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.read(UTF8Reader.java:177) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.loadMore(CsvDecoder.java:458) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder._nextUnquotedString(CsvDecoder.java:782) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.nextString(CsvDecoder.java:732) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:963) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:763) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:321) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:283) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.next(MappingIterator.java:199) ... 11 more My code is the following: val env = StreamExecutionEnvironment.createLocalEnvironment() val csvFormat = CsvReaderFormat.forPojo(Empresa::class.java) val csvSource = FileSource .forRecordStreamFormat(csvFormat, Path("/miuler/PadronRUC_202410.csv")) .build() val empresaStreamSource = env.fromSource(csvSource, WatermarkStrategy.noWatermarks(), "CSV Source") empresaStreamSource.print() env.execute("Load CSV") My dependencies: val kotlinVersion = "1.20.0" // FLINK dependencies { implementation("org.apache.flink:flink-shaded-jackson:2.15.3-19.0") implementation("org.apache.flink:flink-core:$kotlinVersion") implementation("org.apache.flink:flink-runtime:$kotlinVersion") implementation("org.apache.flink:flink-runtime-web:$kotlinVersion") implementation("org.apache.flink:flink-clients:$kotlinVersion") implementation("org.apache.flink:flink-streaming-java:$kotlinVersion") implementation("org.apache.flink:flink-csv:$kotlinVersion") implementation("org.apache.flink:flink-connector-base:$kotlinVersion") implementation("org.apache.flink:flink-connector-files:$kotlinVersion") } -- This message was sent by Atlassian Jira (v8.20.10#820010)
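One possible workaround, not taken from the ticket: since the shaded Jackson CSV decoder assumes UTF-8 input, the file can be transcoded from ISO-8859-1 to UTF-8 before the Flink job reads it. A minimal sketch using only standard java.nio APIs follows; the paths mirror the reporter's example, and the whole pre-processing step is an assumption rather than an existing Flink option.
{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical pre-processing step: rewrite the ISO-8859-1 file as UTF-8 so the
// UTF-8-only CSV reader no longer hits invalid byte sequences.
public class TranscodeCsvToUtf8 {
    public static void main(String[] args) throws IOException {
        Path source = Paths.get("/miuler/PadronRUC_202410.csv");
        Path target = Paths.get("/miuler/PadronRUC_202410.utf8.csv");

        try (Stream<String> lines = Files.lines(source, StandardCharsets.ISO_8859_1)) {
            // Read with the charset the file was actually written in, write back as UTF-8.
            Files.write(target, (Iterable<String>) lines::iterator, StandardCharsets.UTF_8);
        }
        // The FileSource/CsvReaderFormat pipeline can then point at the UTF-8 copy.
    }
}
{code}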
[jira] [Updated] (FLINK-36627) Failure to process a CSV file in Flink due to a character encoding mismatch: the file is in ISO-8859 and the application expects UTF-8.
[ https://issues.apache.org/jira/browse/FLINK-36627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hector Miuler Malpica Gallegos updated FLINK-36627: --- Description: I have error in read csv with charset ISO-8859, my error is the following: {{{color:#de350b}_Caused by: java.io.CharConversionException: Invalid UTF-8 middle byte 0x41 (at char #1247, byte #1246): check content encoding, does not look like UTF-8_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportInvalidOther(UTF8Reader.java:520)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportDeferredInvalid(UTF8Reader.java:531)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.read(UTF8Reader.java:177)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.loadMore(CsvDecoder.java:458)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder._nextUnquotedString(CsvDecoder.java:782)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.nextString(CsvDecoder.java:732)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:963)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:763)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:321)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:283)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.next(MappingIterator.java:199)_{color}}} {{{color:#de350b} _... 
11 more_{color}}} {{My code is the following:}} {color:#0747a6}_{{{}val env = StreamExecutionEnvironment.createLocalEnvironment(){}}}{{{}val csvFormat = CsvReaderFormat.forPojo(Empresa::class.java){}}}_{color} {color:#0747a6}_{{val csvSource = FileSource}}_{color} {color:#0747a6}_{{.forRecordStreamFormat(csvFormat, Path("/miuler/PadronRUC_202410.csv"))}}_{color} {color:#0747a6}_{{.build()}}_{color} {color:#0747a6}_{{val empresaStreamSource = env.fromSource(csvSource, WatermarkStrategy.noWatermarks(), "CSV Source")}}_{color} {color:#0747a6}_{{empresaStreamSource.print()}}_{color} {color:#0747a6}_{{env.execute("Load CSV")}}_{color} My dependencies: _{color:#0747a6}{{val kotlinVersion = "1.20.0"}}{color}_ _{color:#0747a6}{{dependencies {}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-shaded-jackson:2.15.3-19.0")}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-core:$kotlinVersion")}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-runtime:$kotlinVersion")}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-runtime-web:$kotlinVersion")}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-clients:$kotlinVersion")}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-streaming-java:$kotlinVersion")}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-csv:$kotlinVersion")}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-connector-base:$kotlinVersion")}}{color}_ _{color:#0747a6}{{implementation("org.apache.flink:flink-connector-files:$kotlinVersion")}}{color}_ _{color:#0747a6}}{color}_ was: I have error in read csv with charset ISO-8859, my error is the following: {{{color:#de350b}_Caused by: java.io.CharConversionException: Invalid UTF-8 middle byte 0x41 (at char #1247, byte #1246): check content encoding, does not look like UTF-8_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportInvalidOther(UTF8Reader.java:520)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportDeferredInvalid(UTF8Reader.java:531)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.read(UTF8Reader.java:177)_{color}}} {{{color:#de350b} _at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.loadMore(CsvDecoder.java:458)_{color}}} {{{color:#de350b}
[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Summary: Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+ (was: Flink SQL temporal JOINs behavior change from Flink 1.15 to Flink 1.18+) > Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+ > -- > > Key: FLINK-36626 > URL: https://issues.apache.org/jira/browse/FLINK-36626 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.18.1, 1.20.0 > Environment: AWS Managed Apache Flink >Reporter: Eduardo Breijo >Priority: Critical > Attachments: Flink-SQL-query.txt > > There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal joins that I haven't been able to pinpoint and that is causing the query below to output different results. > *Flink SQL Query:* > ~WITH assets_setpoint AS (~ > ~SELECT~ > ~asset_id,~ > ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ > ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ > ~LAST_VALUE(`value`) AS `value`~ > ~FROM asset_readings~ > ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ > ~ON metric.metric_id = asset_readings.metric_id~ > ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ > ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ > ~)~ > ~SELECT~ > ~assets_supply_air_temp.`timestamp`,~ > ~assets_supply_air_temp.asset_id,~ > ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ > ~FROM (~ > ~SELECT asset_readings.`timestamp`,~ > ~asset_readings.asset_id,~ > ~asset_readings.`value` AS `value`~ > ~FROM asset_readings~ > ~-- Metrics temporal lookup inner join~ > ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ > ~ON metric.metric_id = asset_readings.metric_id~ > ~-- Assets to ignore for this computed metric definition temporal lookup left join~ > ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ > ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ > ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ > ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ > ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ > ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ > ~) AS assets_supply_air_temp~ > ~INNER JOIN assets_setpoint~ > ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ > ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ > *Schema:* > ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ > ~| name | type | null | key | extras | watermark |~ > ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ > ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ > ~| asset_id | BIGINT | TRUE | | | |~ > ~| metric_id | INT | TRUE | | | |~ > ~| value | DOUBLE | TRUE | | | |~ > ~| metadata | MAP | TRUE | | | |~ > ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ > ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ > ~6 rows in set~ > ~+------------------------------------------------+~ > ~| table name |~ > ~+------------------------------------------------+~ > ~| asset_readings |~ > ~| asset_relationship_parent_to_unit |~ > ~| asset_to_ignore_per_computed_metric_definition |~ > ~| metric |~ > ~+------------------------------------------------+~ > Results: > * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and > assets_setp
[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Breijo updated FLINK-36626: --- Description: There is a behavior change I found when migrating to Flink 1.18+ from Flink 1.15 regarding Flink SQL temporal joins that I haven't been able to pinpoint and that is causing the query below to output different results. *Flink SQL Query:* ~WITH assets_setpoint AS (~ ~SELECT~ ~asset_id,~ ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~ ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~ ~LAST_VALUE(`value`) AS `value`~ ~FROM asset_readings~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~ ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~ ~)~ ~SELECT~ ~assets_supply_air_temp.`timestamp`,~ ~assets_supply_air_temp.asset_id,~ ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~ ~FROM (~ ~SELECT asset_readings.`timestamp`,~ ~asset_readings.asset_id,~ ~asset_readings.`value` AS `value`~ ~FROM asset_readings~ ~-- Metrics temporal lookup inner join~ ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~ ~ON metric.metric_id = asset_readings.metric_id~ ~-- Assets to ignore for this computed metric definition temporal lookup left join~ ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`~ ~ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id~ ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~ ~-- Filter assets not present in the asset to ignore for this computed metric definition table~ ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~ ~) AS assets_supply_air_temp~ ~INNER JOIN assets_setpoint~ ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~ ~WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~ *Schema:* ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| name | type | null | key | extras | watermark |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~| timestamp | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | SOURCE_WATERMARK() |~ ~| asset_id | BIGINT | TRUE | | | |~ ~| metric_id | INT | TRUE | | | |~ ~| value | DOUBLE | TRUE | | | |~ ~| metadata | MAP | TRUE | | | |~ ~| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |~ ~+-----------+------------------------------+-------+-----+----------------+--------------------+~ ~6 rows in set~ ~+------------------------------------------------+~ ~| table name |~ ~+------------------------------------------------+~ ~| asset_readings |~ ~| asset_relationship_parent_to_unit |~ ~| asset_to_ignore_per_computed_metric_definition |~ ~| metric |~ ~+------------------------------------------------+~ Results: * On Flink 1.15 the difference (~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~) of the assets_supply_air_temp and assets_setpoint is computed correctly for every value of the assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data) * On Flink 1.18+, for the same query, this difference always results in 0 * On Flink 1.18+, updating the query to use a regular join against the metric lookup table (removing ~FOR SYSTEM_TIME AS OF `proctime`~) makes the query output the correct value, but I don't think regular joins are what I need in
[jira] [Created] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+
Eduardo Breijo created FLINK-36626: -- Summary: Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+ Key: FLINK-36626 URL: https://issues.apache.org/jira/browse/FLINK-36626 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.20.0, 1.18.1 Environment: AWS Managed Apache Flink Reporter: Eduardo Breijo Attachments: Flink-SQL-query.txt

There is a behavior change I found when migrating from Flink 1.15 to Flink 1.18+, regarding Flink SQL joins, that I haven't been able to pinpoint and that causes the query below to output different results.

Flink SQL Query:

WITH assets_setpoint AS (
  SELECT
    asset_id,
    TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
    TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
    LAST_VALUE(`value`) AS `value`
  FROM asset_readings
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
  GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
  SELECT asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
  FROM asset_readings
  -- Metrics temporal lookup inner join
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  -- Assets to ignore for this computed metric definition temporal lookup left join
  LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
  WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
  -- Filter assets not present in the asset to ignore for this computed metric definition table
  AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp

Results:
* On Flink 1.15 the difference (assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`) between assets_supply_air_temp and assets_setpoint is computed correctly for every value of assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data)
* On Flink 1.18+ this difference always results in 0

I have tried updating the query using different formats but I have not found a workaround and I don't know why this is happening. Attached you will find a file with the different SQL formats I have tried, with no luck. Any help would be appreciated.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36161][docs]Update Integration Test Example with Sink API. [flink]
RanJinh opened a new pull request, #25590: URL: https://github.com/apache/flink/pull/25590 ## What is the purpose of the change To update the `Testing Flink Jobs` Docs example from deprecated `SinkFunction` to `Sink`. ## Brief change log - Using `Sink` instead of `SinkFunction`, since `SinkFunction` is deprecated in flink 1.20. ## Verifying this change - Document address: - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/#testing-flink-jobs - https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/testing/#%e6%b5%8b%e8%af%95-flink-%e4%bd%9c%e4%b8%9a ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
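For readers who want a sense of the change: below is a minimal, hedged sketch of a Sink V2 based test sink in the spirit of the docs' CollectSink example. It is not the exact code from this PR; the class name, the static list, and the `createWriter(WriterInitContext)` signature (the variant introduced in recent Flink versions) are assumptions used only to illustrate the shape of the API.

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative test sink: collects emitted records into a static list for assertions. */
class CollectSink implements Sink<Long> {
    // static so that all parallel writer instances of a test run feed the same list
    static final List<Long> VALUES = Collections.synchronizedList(new ArrayList<>());

    @Override
    public SinkWriter<Long> createWriter(WriterInitContext context) throws IOException {
        return new SinkWriter<Long>() {
            @Override
            public void write(Long element, Context context) {
                VALUES.add(element);
            }

            @Override
            public void flush(boolean endOfInput) {
                // nothing is buffered, so there is nothing to flush
            }

            @Override
            public void close() {
                // no resources to release
            }
        };
    }
}
```

A test would then attach it with `stream.sinkTo(new CollectSink())` and assert on `CollectSink.VALUES` after the job finishes.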
Re: [PR] [FLINK-4602][State] Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. [flink]
Zakelly merged PR #25586: URL: https://github.com/apache/flink/pull/25586 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-4602) Move RocksDB backend to proper package
[ https://issues.apache.org/jira/browse/FLINK-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893675#comment-17893675 ] Zakelly Lan edited comment on FLINK-4602 at 10/30/24 2:32 AM: -- Merge 316daca and 255ca52 into master was (Author: zakelly): Merge 316daca into master > Move RocksDB backend to proper package > -- > > Key: FLINK-4602 > URL: https://issues.apache.org/jira/browse/FLINK-4602 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / State Backends >Reporter: Aljoscha Krettek >Assignee: Han Yin >Priority: Major > Labels: 2.0-related, auto-unassigned, pull-request-available > Fix For: 2.0.0 > > > Right now the package is {{org.apache.flink.contrib.streaming.state}}, it > should probably be in {{org.apache.flink.runtime.state.rocksdb}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36621) Build failure: StatefulSink not found
[ https://issues.apache.org/jira/browse/FLINK-36621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-36621: -- Assignee: LvYanquan > Build failure: StatefulSink not found > - > > Key: FLINK-36621 > URL: https://issues.apache.org/jira/browse/FLINK-36621 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 2.0-preview >Reporter: Piotr Nowojski >Assignee: LvYanquan >Priority: Blocker > Labels: pull-request-available > > Locally, building Flink in IntelliJ fails for me due to: > {code:java} > flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42 > java: cannot access org.apache.flink.api.connector.sink2.StatefulSink > class file for org.apache.flink.api.connector.sink2.StatefulSink not found > {code} > flink-examples depends on flink-connector-kafka in version 3.0.0-17 that in > turn is still referring to the StatefulSink: > {code:java} > public class KafkaSink implements StatefulSink, > TwoPhaseCommittingSink (...) > {code} > which has been deleted in FLINK-36245. I think Maven builds might be working > due to some luck and differences between how IntelliJ and Maven are > interpreting pom files and dealing with the dependencies. > CC [~kunni] [~renqs] [~Leonard] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36624) Log JobID in SourceCoordinator
[ https://issues.apache.org/jira/browse/FLINK-36624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-36624: -- Assignee: Piotr Nowojski > Log JobID in SourceCoordinator > -- > > Key: FLINK-36624 > URL: https://issues.apache.org/jira/browse/FLINK-36624 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > > Currently log entries from the SourceCoordinator are not tagged with the > JobID, which could be quite easily done. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36624) Log JobID in SourceCoordinator
Piotr Nowojski created FLINK-36624: -- Summary: Log JobID in SourceCoordinator Key: FLINK-36624 URL: https://issues.apache.org/jira/browse/FLINK-36624 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Piotr Nowojski Currently log entries from the SourceCoordinator are not tagged with the JobID, which could be quite easily done. -- This message was sent by Atlassian Jira (v8.20.10#820010)
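For illustration only (this is not the actual SourceCoordinator code, and the class below is hypothetical): one common way to tag every log line with the job ID is SLF4J's MDC, which the log4j/logback pattern can then render via %X{jobId}.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

class JobScopedLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(JobScopedLoggingSketch.class);

    void handleEvent(String jobId, String event) {
        MDC.put("jobId", jobId); // rendered by a %X{jobId} placeholder in the logging pattern
        try {
            LOG.info("Handling source event: {}", event);
        } finally {
            MDC.remove("jobId"); // avoid leaking the value into unrelated log lines
        }
    }
}
{code}

Alternatively, the job ID can simply be passed as an argument of each log message.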
Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]
HuangZhenQiu commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1821361818 ## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java: ## @@ -0,0 +1,56 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link LineageUtil}. */ +public class LineageUtilTest { +@Test +public void testSourceLineageVertexOf() { +LineageDataset dataset = Mockito.mock(LineageDataset.class); Review Comment: As called out by @AHeise, we need to move out from Mockito with testing classes. I am thinking. I should probably add these helper test classes in flink-core rather than implement in each of connector. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
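As a rough sketch of the direction suggested above, a hand-written stub can replace `Mockito.mock(LineageDataset.class)`. The accessor names (`name`/`namespace`/`facets`) follow FLIP-314 and are an assumption here; the real interface may differ slightly.

```java
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Collections;
import java.util.Map;

/** Hand-written test double, illustrating one way to avoid Mockito in lineage tests. */
class TestingLineageDataset implements LineageDataset {
    @Override
    public String name() {
        return "test-topic";
    }

    @Override
    public String namespace() {
        return "kafka://localhost:9092";
    }

    @Override
    public Map<String, LineageDatasetFacet> facets() {
        return Collections.emptyMap();
    }
}
```

Such helpers could then live in a shared test-util module rather than being duplicated in each connector, as suggested in the comment.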
Re: [PR] [FLINK-36077][Connectors/Google PubSub] Implement table api support for SinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #30: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/30#issuecomment-2444821498 @snuyanzin thanks for the review, I refactored the factory tests and removed unnecessary public access modifiers, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-16851) Add common metrics to the SourceReader base implementation.
[ https://issues.apache.org/jira/browse/FLINK-16851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893911#comment-17893911 ] Jiangjie Qin commented on FLINK-16851: -- I think most of the metrics have been added over time. But it would be good to do a check and see if any common metric is missing. If there is no metric missing, we can just close this ticket. > Add common metrics to the SourceReader base implementation. > --- > > Key: FLINK-16851 > URL: https://issues.apache.org/jira/browse/FLINK-16851 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Jiangjie Qin >Priority: Major > > Add the metrics to the base SourceReader implementation. This is relevant to > [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics?src=contextnavpagetreemode]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of Flink CDC's command line arguments can be passed either as `--$KEY $VALUE` or as `--$KEY=$VALUE` (e.g. --jar); --flink-home is the exception and only supports the space-separated form. Users who use the `--flink-home=$FLINK_HOME` format on the command line (trying to be consistent with the other `=`-separated arguments) will not be able to set the Flink home correctly. In particular, when the environment variable $FLINK_HOME is set and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both the --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of Flink CDC's command line arguments can be passed either as `--$KEY $VALUE` or as `--$KEY=$VALUE` (e.g. --jar); --flink-home is the exception and only supports the space-separated form. Users who use the `--flink-home=$FLINK_HOME` format on the command line (trying to be consistent with the other `=`-separated arguments) will not be able to set the Flink home correctly. In particular, when the environment variable $FLINK_HOME is set and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both the --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of Flink CDC's command line arguments can be passed either as > `--$KEY $VALUE` or as `--$KEY=$VALUE` (e.g. --jar); --flink-home is the exception > and only supports the space-separated form. Users who use the > `--flink-home=$FLINK_HOME` format on the command line (trying to be > consistent with the other `=`-separated arguments) will not be able to set the Flink > home correctly. > In particular, when the environment variable $FLINK_HOME is set and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both the --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
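Purely as an illustration of the requested behavior (this is hypothetical helper code, not the actual Flink CDC CLI implementation):

{code:java}
/** Hypothetical sketch: accept both "--flink-home /path" and "--flink-home=/path". */
class FlinkHomeArgSketch {
    static String parseFlinkHome(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.equals("--flink-home") && i + 1 < args.length) {
                return args[i + 1]; // space-separated form
            }
            if (arg.startsWith("--flink-home=")) {
                return arg.substring("--flink-home=".length()); // '='-separated form
            }
        }
        return System.getenv("FLINK_HOME"); // fall back to the environment variable
    }
}
{code}

Either form would then override the $FLINK_HOME environment variable when it is present on the command line.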
[jira] [Assigned] (FLINK-27632) Improve connector testing framework to support more cases
[ https://issues.apache.org/jira/browse/FLINK-27632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren reassigned FLINK-27632: - Assignee: Poorvank Bhatia > Improve connector testing framework to support more cases > - > > Key: FLINK-27632 > URL: https://issues.apache.org/jira/browse/FLINK-27632 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.16.0 >Reporter: Qingsheng Ren >Assignee: Poorvank Bhatia >Priority: Major > Fix For: 2.0.0 > > > In order to make connector testing framework available for more connectors, > including Table /SQL connectors, more test cases are required to cover more > scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36011][runtime] Improved logging in StateTransitionManager [flink]
flinkbot commented on PR #25589: URL: https://github.com/apache/flink/pull/25589#issuecomment-2444516868 ## CI report: * b3233bfef81e4faf868f0bf92bcf742594e7e5a8 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36011][runtime] Improved logging in StateTransitionManager [flink]
ztison opened a new pull request, #25589: URL: https://github.com/apache/flink/pull/25589 ## What is the purpose of the change This PR fixes the log-level regression introduced in https://github.com/apache/flink/pull/25280#discussion_r1818949028 ## Brief change log - The log level was reverted back to INFO in the StateTransitionManager - Added a few more log lines in the StateTransitionManager ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode [flink]
dawidwys commented on code in PR #24699: URL: https://github.com/apache/flink/pull/24699#discussion_r1820335891 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java: ## @@ -51,7 +71,95 @@ public BatchExecMatch( } @Override -public boolean isProcTime(RowType inputRowType) { -return true; +public void checkOrderKeys(RowType inputRowType) { +SortSpec orderKeys = matchSpec.getOrderKeys(); +if (orderKeys.getFieldSize() == 0) { +throw new TableException("You must specify non-empty order by."); +} + +SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0); +int timeOrderFieldIdx = timeOrderField.getFieldIndex(); +LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx); + +if (!TypeCheckUtils.isTimePoint(timeOrderFieldType)) { +throw new TableException("You must specify time point for order by as the first one."); +} + +// time ordering needs to be ascending +if (!orderKeys.getAscendingOrders()[0]) { +throw new TableException("Primary sort order of a table must be ascending on time."); +} +} + +@Override +protected Transformation translateOrder( +PlannerBase planner, +Transformation inputTransform, +RowType inputRowType, +ExecEdge inputEdge, +ExecNodeConfig config) { +if (isProcTime(inputRowType)) { +// In proctime process records in the order they come. +return inputTransform; +} + +SortSpec sortSpec = matchSpec.getOrderKeys(); +RowType inputType = (RowType) inputEdge.getOutputType(); +SortCodeGenerator codeGen = +new SortCodeGenerator( +config, planner.getFlinkContext().getClassLoader(), inputType, sortSpec); +SortOperator operator = Review Comment: > Correct me if I'm wrong: In batch mode events for given key are buffered in CepOperator state and they all are processed when onEventTime(MAX_WATERMARK) is called. onEventTime() is called exactly once for each key. In onEventTime() CepOperator iterates MapState> in ascending order by key (ties are resolved using EventComparator comparator). Yes, that's my understanding as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28177) Elasticsearch6DynamicSinkITCase.testWritingDocumentsNoPrimaryKey failed with 503 Service Unavailable
[ https://issues.apache.org/jira/browse/FLINK-28177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893907#comment-17893907 ] Ahmed Hamdy commented on FLINK-28177: - [~martijnvisser] I see the [PR|https://github.com/apache/flink-connector-elasticsearch/pull/48] is merged. Could we close this ticket? > Elasticsearch6DynamicSinkITCase.testWritingDocumentsNoPrimaryKey failed with > 503 Service Unavailable > > > Key: FLINK-28177 > URL: https://issues.apache.org/jira/browse/FLINK-28177 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: KurtDing >Priority: Critical > Labels: pull-request-available, stale-assigned, test-stability > Attachments: image-2022-07-21-10-48-33-213.png > > > {code:java} > 2022-06-21T07:39:23.9065585Z Jun 21 07:39:23 [ERROR] Tests run: 4, Failures: > 0, Errors: 2, Skipped: 0, Time elapsed: 43.125 s <<< FAILURE! - in > org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkITCase > 2022-06-21T07:39:23.9068457Z Jun 21 07:39:23 [ERROR] > org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkITCase.testWritingDocumentsNoPrimaryKey > Time elapsed: 8.697 s <<< ERROR! > 2022-06-21T07:39:23.9069955Z Jun 21 07:39:23 > java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > 2022-06-21T07:39:23.9071135Z Jun 21 07:39:23 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > 2022-06-21T07:39:23.9072225Z Jun 21 07:39:23 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > 2022-06-21T07:39:23.9073408Z Jun 21 07:39:23 at > org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118) > 2022-06-21T07:39:23.9075081Z Jun 21 07:39:23 at > org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81) > 2022-06-21T07:39:23.9076560Z Jun 21 07:39:23 at > org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkITCase.testWritingDocumentsNoPrimaryKey(Elasticsearch6DynamicSinkITCase.java:286) > 2022-06-21T07:39:23.9078535Z Jun 21 07:39:23 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-06-21T07:39:23.9079534Z Jun 21 07:39:23 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-06-21T07:39:23.9080702Z Jun 21 07:39:23 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-06-21T07:39:23.9081838Z Jun 21 07:39:23 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-06-21T07:39:23.9082942Z Jun 21 07:39:23 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-06-21T07:39:23.9084127Z Jun 21 07:39:23 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-06-21T07:39:23.9085246Z Jun 21 07:39:23 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-06-21T07:39:23.9086380Z Jun 21 07:39:23 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-06-21T07:39:23.9087812Z Jun 21 07:39:23 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-06-21T07:39:23.9088843Z Jun 21 07:39:23 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-06-21T07:39:23.9089823Z Jun 21 07:39:23 at > 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-06-21T07:39:23.9103797Z Jun 21 07:39:23 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-06-21T07:39:23.9105022Z Jun 21 07:39:23 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-06-21T07:39:23.9106065Z Jun 21 07:39:23 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-06-21T07:39:23.9107500Z Jun 21 07:39:23 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-06-21T07:39:23.9108591Z Jun 21 07:39:23 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-06-21T07:39:23.9109575Z Jun 21 07:39:23 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-06-21T07:39:23.9110606Z Jun 21 07:39:23 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > 2022-06-21T07:39:23.9111634Z Jun 21 07:39:23 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > 2022-06-21T07:39:23.9112653Z Jun 21 07:39:23 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:29
[jira] [Created] (FLINK-36623) Improve logging in DefaultStateTransitionManager
Roman Khachatryan created FLINK-36623: - Summary: Improve logging in DefaultStateTransitionManager Key: FLINK-36623 URL: https://issues.apache.org/jira/browse/FLINK-36623 Project: Flink Issue Type: Improvement Reporter: Roman Khachatryan Assignee: Zdenek Tison Fix For: 1.20.1 When the job transitions from one state to another, e.g. restarts when new slots are available; it's not visible in the logs unless log.level is debug. Therefore, it'd make sense to: # Change log level from DEBUG to INFO # Log job ID when such transition happens -- This message was sent by Atlassian Jira (v8.20.10#820010)
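For illustration only (not the actual DefaultStateTransitionManager code; the class and method names below are made up), the two points above roughly amount to something like:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TransitionLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(TransitionLoggingSketch.class);

    // Hypothetical helper: INFO (visible with default log settings) and the job ID in the message.
    void logTransition(String jobId, String fromState, String toState) {
        LOG.info("Job {} transitions from {} to {} because the required resources are available.",
                jobId, fromState, toState);
    }
}
{code}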
Re: [PR] [FLINK-36455] Sinks retry synchronously [flink]
AHeise commented on code in PR #25547: URL: https://github.com/apache/flink/pull/25547#discussion_r1820836505 ## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java: ## @@ -164,41 +165,37 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { private void commitAndEmitCheckpoints() throws IOException, InterruptedException { long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId; -do { -for (CheckpointCommittableManager manager : - committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { -commitAndEmit(manager); -} -// !committableCollector.isFinished() indicates that we should retry -// Retry should be done here if this is a final checkpoint (indicated by endInput) -// WARN: this is an endless retry, may make the job stuck while finishing -} while (!committableCollector.isFinished() && endInput); - -if (!committableCollector.isFinished()) { -// if not endInput, we can schedule retrying later -retryWithDelay(); +for (CheckpointCommittableManager checkpointManager : + committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { +// ensure that all committables of the first checkpoint are fully committed before +// attempting the next committable +commitAndEmit(checkpointManager); +committableCollector.remove(checkpointManager); } -committableCollector.compact(); } private void commitAndEmit(CheckpointCommittableManager committableManager) throws IOException, InterruptedException { -Collection> committed = committableManager.commit(committer); -if (emitDownstream && committableManager.isFinished()) { -int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); -int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); -output.collect( -new StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks))); -for (CommittableWithLineage committable : committed) { -output.collect(new StreamRecord<>(committable.withSubtaskId(subtaskId))); -} +committableManager.commit(committer, MAX_RETRIES); +if (emitDownstream) { +emit(committableManager); } } -private void retryWithDelay() { -processingTimeService.registerTimer( -processingTimeService.getCurrentProcessingTime() + RETRY_DELAY, -ts -> commitAndEmitCheckpoints()); +private void emit(CheckpointCommittableManager committableManager) { +int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); +int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); Review Comment: What's the concern here? Peformance? JVM should inline the call to pretty much result into a field access. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36011] [runtime] Generalize RescaleManager to become StateTransitionManager [flink]
ztison commented on code in PR #25280: URL: https://github.com/apache/flink/pull/25280#discussion_r1820968233 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java: ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.Temporal; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Supplier; + +/** + * {@code DefaultStateTransitionManager} is a state machine which manages the {@link + * AdaptiveScheduler}'s state transitions based on the previous transition time and the available + * resources. See {@link Phase} for details on each individual phase of this state machine. Note: We + * use the term phase here to avoid confusion with the state used in the {@link AdaptiveScheduler}. + * + * + * {@link Cooldown} + * | + * +--> {@link Idling} + * | | + * | V + * +--> {@link Stabilizing} + * | + * +--> {@link Stabilized} --> {@link Idling} + * | | + * | V + * \--> {@link Transitioning} + * + * + * Thread-safety: This class is not implemented in a thread-safe manner and relies on the fact + * that any method call happens within a single thread. 
+ * + * @see Executing + */ +@NotThreadSafe +public class DefaultStateTransitionManager implements StateTransitionManager { + +private static final Logger LOG = LoggerFactory.getLogger(DefaultStateTransitionManager.class); + +private final Supplier clock; +private final StateTransitionManager.Context transitionContext; +private Phase phase; +private final List> scheduledFutures; + +@VisibleForTesting final Duration cooldownTimeout; +@Nullable @VisibleForTesting final Duration resourceStabilizationTimeout; +@VisibleForTesting final Duration maxTriggerDelay; + +DefaultStateTransitionManager( +Temporal initializationTime, +StateTransitionManager.Context transitionContext, +Duration cooldownTimeout, +@Nullable Duration resourceStabilizationTimeout, +Duration maxTriggerDelay) { +this( +initializationTime, +Instant::now, +transitionContext, +cooldownTimeout, +resourceStabilizationTimeout, +maxTriggerDelay); +} + +@VisibleForTesting +DefaultStateTransitionManager( +Temporal initializationTime, +Supplier clock, +StateTransitionManager.Context transitionContext, +Duration cooldownTimeout, +@Nullable Duration resourceStabilizationTimeout, +Duration maxTriggerDelay) { + +this.clock = clock; +this.maxTriggerDelay = maxTriggerDelay; +this.cooldownTimeout = cooldownTimeout; +this.resourceStabilizationTimeout = resourceStabilizationTimeout; +this.transitionContext = transitionContext; +this.scheduledFutures = new ArrayList<>(); +this.phase = new Cooldown(initializationTime, clock, this, cooldownTimeout); +} + +@Override +public void onChange() { +phase.onChange(); +} + +@Override +public void onTrigger() { +phase.onTrigger(); +} + +@Override +public void close() { +scheduledFutures.forEach(future -> future.cancel(true)); +scheduledFutures.clear(); +} + +@VisibleForTesting +Phase getPhase() { +return phase; +} + +private void progressToIdling() { +progressToPhase(new Idling(clock, this)); +} + +private void progressToStabilizing(Temporal firstChangeEventTimestamp) { +progressToPhase( +new Stabilizing( +clock, +
Re: [PR] [hotfix] Replace System.out.println with logger for better log management [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #31: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/31#discussion_r1821436745 ## flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java: ## @@ -21,11 +21,14 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.math.BigInteger; /** Helper class to send PubSubMessages to a PubSub topic. */ class PubSubPublisher { +private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class); Review Comment: ```suggestion private static final Logger LOG = LoggerFactory.getLogger(PubSubPublisher.class); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32315) Support local file upload in K8s mode
[ https://issues.apache.org/jira/browse/FLINK-32315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893734#comment-17893734 ] Ferenc Csaky commented on FLINK-32315: -- When I developed this, the "s3" related config options had to be added to {{flink-conf.yaml}}, because those props are not picked up if they are given as a dynamic param in the {{flink run}} command. For Flink 1.20 it was already renamed to {{config.yaml}}, but that should not matter for this particular case, I believe. I have the following gist about how I was setting up Minio, which might be outdated because it uses the latest image, but it could still be helpful: [https://gist.github.com/ferenc-csaky/fd7fee71d89cd389cac2da4a4471ab65] > Support local file upload in K8s mode > - > > Key: FLINK-32315 > URL: https://issues.apache.org/jira/browse/FLINK-32315 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Deployment / Kubernetes >Reporter: Paul Lin >Assignee: Ferenc Csaky >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: image-2024-10-29-11-32-25-979.png > > > Currently, Flink assumes all resources are locally accessible in the pods, > which requires users to prepare the resources by mounting storages, > downloading resources with init containers, or rebuilding images for each > execution. > We could make things much easier by introducing a built-in file distribution > mechanism based on Flink-supported filesystems. It's implemented in two steps: > > 1. KubernetesClusterDescriptor uploads all local resources to remote storage > via Flink filesystem (skips if the resources are already remote). > 2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner > download the resources and put them in the classpath during startup. > > The 2nd step is mostly done by > [FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], thus this > issue is focused on the upload part. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32315) Support local file upload in K8s mode
[ https://issues.apache.org/jira/browse/FLINK-32315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893734#comment-17893734 ] Ferenc Csaky edited comment on FLINK-32315 at 10/29/24 8:31 AM: When I developed this, the "s3" related config options had to be added to {{flink-conf.yaml}}, because those props are not picked up if they are given as a dynamic param in the {{flink run}} command. For Flink 1.20 it was already renamed to {{config.yaml}}, but that should not matter for this particular case, I believe. I have the following gist about how I was setting up Minio, which might be outdated because it uses the latest image, but it could still be helpful: [https://gist.github.com/ferenc-csaky/fd7fee71d89cd389cac2da4a4471ab65] was (Author: JIRAUSER306586): When I developed this, the "s3" related config options required to be added to the {{flink-conf.yaml}}, cause those props are not picked up if they given as a dynamic param during. For Flink 1.20 it was already renamed {{config.yaml}}, but that should not matter for this particular case I believe. I have the following gist about how I was setting up Minio, which might be outdated, cause it uses the latest img, but still could be helpful: [https://gist.github.com/ferenc-csaky/fd7fee71d89cd389cac2da4a4471ab65] > Support local file upload in K8s mode > - > > Key: FLINK-32315 > URL: https://issues.apache.org/jira/browse/FLINK-32315 > Project: Flink > Issue Type: New Feature > Components: Client / Job Submission, Deployment / Kubernetes >Reporter: Paul Lin >Assignee: Ferenc Csaky >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: image-2024-10-29-11-32-25-979.png > > > Currently, Flink assumes all resources are locally accessible in the pods, > which requires users to prepare the resources by mounting storages, > downloading resources with init containers, or rebuilding images for each > execution. > We could make things much easier by introducing a built-in file distribution > mechanism based on Flink-supported filesystems. It's implemented in two steps: > > 1. KubernetesClusterDescriptor uploads all local resources to remote storage > via Flink filesystem (skips if the resources are already remote). > 2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner > download the resources and put them in the classpath during startup. > > The 2nd step is mostly done by > [FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], thus this > issue is focused on the upload part. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36497][table]Remove all deprecated methods in `CatalogTable` [flink]
flinkbot commented on PR #25585: URL: https://github.com/apache/flink/pull/25585#issuecomment-2443475748 ## CI report: * 33ea97c1bdd9ca2a9e0265600ae2ea36cad24476 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` semantics after serialization/deserialization for CDC formats
[ https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-36547: - Description: As the official docs say, `RowKind` semantics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also need to make the `RowKind` semantics consistent in many scenarios. We have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature, and it runs well in our business. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153! was: As the official docs say, `RowKind` semantics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also need to make the `RowKind` semantics consistent in many scenarios that require differentiated handling. We have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature, and it runs well in our business. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153!
> Add option to retain `RowKind` sematics after serialization/deserialization > for cdc formats > --- > > Key: FLINK-36547 > URL: https://issues.apache.org/jira/browse/FLINK-36547 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 2.0.0 >Reporter: Yubin Li >Priority: Major > Attachments: image-2024-10-16-11-01-54-790.png, > image-2024-10-16-11-02-34-406.png > > > As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> > +I > {code:java} > Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL > as Debezium JSON or Avro messages, and emit to external systems like Kafka. > However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a > single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and > UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} > In fact, we also have a demand to make the `RowKind` sematics consistent in > many scenarios. we have taken advantage of the difference between > UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it run well > in bussiness. > {code:java} > create table datagen1 (id int, name string) with > ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', > 'fields.id.max'='2'); > // add 'debezium-json.retain.rowkind' = 'true' > create table t2 (id int, name string, num bigint) WITH ( > 'topic' = 't2', > 'con
[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` semantics after serialization/deserialization for CDC formats
[ https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-36547: - Description: As the official docs say, `RowKind` semantics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also need to make the `RowKind` semantics consistent in many scenarios, such as those that require different processing of -U/-D and +U/+I. We have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature, and it runs well in our business. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153! was: As the official docs say, `RowKind` semantics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also need to make the `RowKind` semantics consistent in many scenarios. We have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature, and it runs well in our business. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153!
> Add option to retain `RowKind` sematics after serialization/deserialization > for cdc formats > --- > > Key: FLINK-36547 > URL: https://issues.apache.org/jira/browse/FLINK-36547 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 2.0.0 >Reporter: Yubin Li >Priority: Major > Attachments: image-2024-10-16-11-01-54-790.png, > image-2024-10-16-11-02-34-406.png > > > As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> > +I > {code:java} > Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL > as Debezium JSON or Avro messages, and emit to external systems like Kafka. > However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a > single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and > UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} > In fact, we also have a demand to make the `RowKind` sematics consistent in > many scenarios, such as those that require different processing of -U/-D and > +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and > UPDATE_AFTER to implement the feature and made it run well in bussiness. > {code:java} > create table datagen1 (id int, name string) with > ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', > 'fields.id.max'='2'); > // add 'debezi
[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` semantics after serialization/deserialization for CDC formats
[ https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-36547: - Description: As the official docs say, `RowKind` semantics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also need to make the `RowKind` semantics consistent in many scenarios that require differentiated handling. We have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature, and it runs well in our business. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153! was: As the official docs say, `RowKind` semantics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also need to make the `RowKind` semantics consistent in many scenarios. We have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature, and it runs well in our business. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153!
> Add option to retain `RowKind` sematics after serialization/deserialization > for cdc formats > --- > > Key: FLINK-36547 > URL: https://issues.apache.org/jira/browse/FLINK-36547 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 2.0.0 >Reporter: Yubin Li >Priority: Major > Attachments: image-2024-10-16-11-01-54-790.png, > image-2024-10-16-11-02-34-406.png > > > As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> > +I > {code:java} > Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL > as Debezium JSON or Avro messages, and emit to external systems like Kafka. > However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a > single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and > UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} > In fact, we also have a demand to make the `RowKind` sematics consistent > 在需要区别处理的场景下in many scenarios. we have taken advantage of the difference > between UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it > run well in bussiness. > {code:java} > create table datagen1 (id int, name string) with > ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', > 'fields.id.max'='2'); > // add 'debezium-json.retain.rowkind' = 'true' > create table t2 (id int, name string, num bigint) WITH ( > 'topic' = 't2',
Re: [PR] [FLINK-35268][state] Add ttl interface for Async State API && implement TtlListStateV2/TtlValueStateV2 [flink]
fredia commented on code in PR #25515: URL: https://github.com/apache/flink/pull/25515#discussion_r1820233250 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java: ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.ttl.TtlUtils; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * This class wraps list state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState +extends AbstractTtlState, InternalListState>> +implements InternalListState { + +protected TtlListState( +TtlStateContext>, T> ttlStateContext) { +super(ttlStateContext); +} + +@Override +public StateFuture asyncUpdate(List values) { +Preconditions.checkNotNull(values, "List of values to add cannot be null."); +return original.asyncUpdate(withTs(values)); +} + +@Override +public StateFuture asyncAddAll(List values) { +Preconditions.checkNotNull(values, "List of values to add cannot be null."); +return original.asyncAddAll(withTs(values)); +} + +@Override +public StateFuture> asyncGet() { +// 1. The timestamp of elements in list state isn't updated when get even if updateTsOnRead +// is true. +// 2. we don't clear state here cause forst is LSM-tree based. +return original.asyncGet().thenApply(stateIter -> new AsyncIteratorWrapper(stateIter)); +} + +@Override +public StateFuture asyncAdd(T value) { +return original.asyncAdd(value == null ? null : wrapWithTs(value)); +} + +@Override +public Iterable get() { +Iterable> ttlValue = original.get(); +ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; +final Iterable> finalResult = ttlValue; +return () -> new IteratorWithCleanup(finalResult.iterator()); +} + +@Override +public void add(T value) { +original.add(value == null ? 
null : wrapWithTs(value)); +} + +@Override +public void update(List values) { +Preconditions.checkNotNull(values, "List of values to add cannot be null."); +original.update(withTs(values)); +} + +@Override +public void addAll(List values) { +Preconditions.checkNotNull(values, "List of values to add cannot be null."); +original.addAll(withTs(values)); +} + +private List collect(Iterable iterable) { +if (iterable instanceof List) { +return (List) iterable; +} else { +List list = new ArrayList<>(); +for (E element : iterable) { +list.add(element); +} +return list; +} +} + +private List> withTs(List values) { +long currentTimestamp = timeProvider.currentTimestamp(); +List> withTs = new ArrayList<>(values.size()); +for (T value : values) { +Preconditions.checkNotNull(value, "You cannot have null element in a ListState."); +withTs.add(TtlUtils.wrapWithTs(value, currentTimestamp)); +} +return withTs; +} + +private class IteratorWithCleanup implements Iterator { +private final Iterator> originalIterator; +private boolean anyUnexpired = false; +private boolean uncleared = true; +private T nextUnexpired = null; + +private IteratorWithCleanup(Iterator> ttlIterat
[jira] [Updated] (FLINK-36497) Remove all deprecated methods in `CatalogTable`
[ https://issues.apache.org/jira/browse/FLINK-36497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36497: --- Labels: pull-request-available (was: ) > Remove all deprecated methods in `CatalogTable` > --- > > Key: FLINK-36497 > URL: https://issues.apache.org/jira/browse/FLINK-36497 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36497][table]Remove all deprecated methods `CatalogTable` [flink]
Edward-Gavin opened a new pull request, #25585: URL: https://github.com/apache/flink/pull/25585 ## What is the purpose of the change Remove all deprecated methods in `CatalogTable` ## Brief change log Remove all deprecated methods in `CatalogTable` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
zjjiang created FLINK-36620: --- Summary: Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats Key: FLINK-36620 URL: https://issues.apache.org/jira/browse/FLINK-36620 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.1.1, cdc-3.2.0, cdc-3.1.0 Reporter: zjjiang Fix For: cdc-3.3.0 Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink-home, which only supports space spacing. Users who use the "--flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
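As a side note on the fix itself, a minimal sketch of how a launcher could accept both spellings of the argument; the class and method names below are hypothetical and not the actual Flink CDC CLI code:

{code:java}
/** Hypothetical helper: resolves --flink-home from the arguments in both supported spellings. */
final class FlinkHomeArgs {

    static String resolveFlinkHome(String[] args, String envFlinkHome) {
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.equals("--flink-home") && i + 1 < args.length) {
                return args[i + 1]; // "--flink-home /path/to/flink" form
            }
            if (arg.startsWith("--flink-home=")) {
                return arg.substring("--flink-home=".length()); // "--flink-home=/path/to/flink" form
            }
        }
        return envFlinkHome; // fall back to $FLINK_HOME only when neither form is given
    }
}
{code}

With handling along these lines, an explicit --flink-home=/path/to/new/flink/home would take precedence over the $FLINK_HOME environment variable, which is the behaviour the issue asks for.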
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the "--flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink- home, which only supports space spacing. Users who use the "--flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of FlinkCDC's command line arguments are supported in the > format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink > home, which only supports space spacing. Users who use the > "--flink-home=$FLINK_HOME" format on the command line (trying to be > consistent with the other = spacing arguments) will not be able to set flink > home correctly. > In particular, when there is an environment variable $FLINK_HOME and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the "-flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of FlinkCDC's command line arguments are supported in the format "-$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the "-flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of FlinkCDC's command line arguments are supported in the > format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink > home, which only supports space spacing. Users who use the > "-flink-home=$FLINK_HOME" format on the command line (trying to be consistent > with the other = spacing arguments) will not be able to set flink home > correctly. > In particular, when there is an environment variable $FLINK_HOME and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of FlinkCDC's command line arguments are supported in the format "-$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the "-flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the "--flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of FlinkCDC's command line arguments are supported in the > format "-$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink > home, which only supports space spacing. Users who use the > "-flink-home=$FLINK_HOME" format on the command line (trying to be consistent > with the other = spacing arguments) will not be able to set flink home > correctly. > In particular, when there is an environment variable $FLINK_HOME and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the "--flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the "-flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of FlinkCDC's command line arguments are supported in the > format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink > home, which only supports space spacing. Users who use the > "--flink-home=$FLINK_HOME" format on the command line (trying to be > consistent with the other = spacing arguments) will not be able to set flink > home correctly. > In particular, when there is an environment variable $FLINK_HOME and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]
pawel-big-lebowski commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820485021 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java: ## @@ -369,5 +416,43 @@ public ProducerRecord serialize( value, headerProvider != null ? headerProvider.getHeaders(element) : null); } + +@Override +public Optional getKafkaDatasetFacet() { +if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) { +LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider"); +return Optional.empty(); +} + +Optional topicsIdentifier = +((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier(); + +if (!topicsIdentifier.isPresent()) { +LOG.warn("No topics' identifiers provided"); +return Optional.empty(); +} + +TypeInformation typeInformation; +if (this.valueSerializationSchema instanceof ResultTypeQueryable) { +typeInformation = +((ResultTypeQueryable) this.valueSerializationSchema).getProducedType(); +} else { +// gets type information from serialize method signature +typeInformation = Review Comment: `LineageGraph` in the flink-core contains separate lists of sources and sinks. Given that, I am not sure if we want to distinguish "inputType" from "outputType". From the facet perspective, this should be all `type` and the same facet can be used for both scenarios. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36245) Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated method/interface in Sink V2 in 2.0
[ https://issues.apache.org/jira/browse/FLINK-36245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893764#comment-17893764 ] Piotr Nowojski commented on FLINK-36245: Hi [~kunni], [~renqs] and [~leonard]. I think this change has made the build broken/unstable. Locally in the IntelliJ building Flink fails for me due to: {code:java} flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42 java: cannot access org.apache.flink.api.connector.sink2.StatefulSink class file for org.apache.flink.api.connector.sink2.StatefulSink not found {code} flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in turns is still referring to the StatefulSink: {code:java} public class KafkaSink implements StatefulSink, TwoPhaseCommittingSink (...) {code} Maven builds might be working due to some dumb luck. > Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated > method/interface in Sink V2 in 2.0 > --- > > Key: FLINK-36245 > URL: https://issues.apache.org/jira/browse/FLINK-36245 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Common >Reporter: Qingsheng Ren >Assignee: LvYanquan >Priority: Major > Labels: 2.0-related, pull-request-available > Fix For: 2.0-preview > > > SourceFunction, SinkFunction and Sink V1 API has been marked as deprecated > and should be removed in Flink 2.0. > Considering SourceFunction / SinkFunction are heavily used in test cases for > building a simple data generator or a data validator, it could be a huge > amount of work to rewrite all these usages with Source and Sink V2 API. A > viable path for 2.0-preview version would be: > * Move SourceFunction, SinkFunction to an internal package, as a test util > * Rewrite all Sink V1 implementations with Sink V2 directly (the usage of > Sink V1 is low in the main repo) > As a long term working item, all usages of SourceFunction and SinkFunction > will be replaced by Source and Sink API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of FlinkCDC's command line arguments are supported in the format `--$KEY $VALUE` or `--$KEY=$VALUE`, e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the `--flink-home=$FLINK_HOME` format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the --flink-home=$FLINK_HOME format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of FlinkCDC's command line arguments are supported in the > format `--$KEY $VALUE` or `--$KEY=$VALUE`, e.g. --jar, but, except for flink > home, which only supports space spacing. Users who use the > `--flink-home=$FLINK_HOME` format on the command line (trying to be > consistent with the other = spacing arguments) will not be able to set flink > home correctly. > In particular, when there is an environment variable $FLINK_HOME and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]
AHeise commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820444037 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java: ## @@ -0,0 +1,16 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier; + +import java.util.Optional; + +/** Contains method which allows extracting topic identifier. */ +public interface KafkaDatasetIdentifierProvider { Review Comment: I double-checked and it's not that common of a pattern. It's used only in ``` OperatorCoordinator.java FileEnumerator.java DynamicFileEnumerator.java FileSplitAssigner.java InternalTimeServiceManager.java ``` So you can just stick to the current approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]
AHeise commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820446158 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +/** Utility class with useful methods for managing dataset facets. */ +public class LineageUtil { + +private static final String KAFKA_DATASET_PREFIX = "kafka://"; +private static final String COMMA = ","; +private static final String SEMICOLON = ";"; + +public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) { +return new LineageDataset() { +@Override +public String name() { +if (kafkaDatasetFacet.topicIdentifier.topicPattern != null) { +return kafkaDatasetFacet.topicIdentifier.toString(); +} + +return String.join(",", kafkaDatasetFacet.topicIdentifier.topics); +} + +@Override +public String namespace() { +return namespace; +} + +@Override +public Map facets() { +return Collections.singletonMap( +KafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet); Review Comment: Yes it would work well. Tbh I'm not sure if 1 or 2 is the better approach. Maybe @HuangZhenQiu can weigh in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]
AHeise commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1815659513 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java: ## @@ -369,5 +416,43 @@ public ProducerRecord serialize( value, headerProvider != null ? headerProvider.getHeaders(element) : null); } + +@Override +public Optional getKafkaDatasetFacet() { +if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) { +LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider"); +return Optional.empty(); +} + +Optional topicsIdentifier = +((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier(); + +if (!topicsIdentifier.isPresent()) { +LOG.warn("No topics' identifiers provided"); +return Optional.empty(); +} + +TypeInformation typeInformation; +if (this.valueSerializationSchema instanceof ResultTypeQueryable) { +typeInformation = +((ResultTypeQueryable) this.valueSerializationSchema).getProducedType(); +} else { +// gets type information from serialize method signature +typeInformation = + Arrays.stream(this.valueSerializationSchema.getClass().getMethods()) +.map(m -> Invokable.from(m)) +.filter(m -> "serialize".equalsIgnoreCase(m.getName())) +.map(m -> m.getParameters().get(0)) +.filter(p -> !p.getType().equals(TypeToken.of(Object.class))) +.findFirst() +.map(p -> p.getType()) +.map(t -> TypeInformation.of(t.getRawType())) +.orElse(null); Review Comment: This looks way more complicated as it should be. Here is what I had in mind. ``` TypeToken serializationSchemaType = TypeToken.of(valueSerializationSchema.getClass()); Class parameterType = serializationSchemaType.resolveType(SerializationSchema.class.getTypeParameters()[0]).getRawType(); if (parameterType != Object.class) { typeInformation = TypeInformation.of(parameterType); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
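For reference, a small self-contained illustration of the TypeToken approach suggested above. It assumes plain Guava on the classpath (the connector may use a shaded Guava package) and uses SimpleStringSchema only as an example of a concrete SerializationSchema implementation; it is a sketch, not the actual connector code:

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.google.common.reflect.TypeToken;

public class ResolveSchemaTypeExample {
    public static void main(String[] args) {
        SerializationSchema<String> schema = new SimpleStringSchema();

        // Resolve the type argument that the concrete class binds to SerializationSchema<T>.
        TypeToken<?> schemaType = TypeToken.of(schema.getClass());
        Class<?> parameterType =
                schemaType
                        .resolveType(SerializationSchema.class.getTypeParameters()[0])
                        .getRawType();

        // Only derive a concrete TypeInformation when the parameter could actually be resolved.
        if (parameterType != Object.class) {
            TypeInformation<?> typeInformation = TypeInformation.of(parameterType);
            System.out.println(typeInformation); // String for SimpleStringSchema
        }
    }
}
```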
Re: [PR] [FLINK-36616] fix npe in GcpPublisherConfig [flink-connector-gcp-pubsub]
stankiewicz commented on PR #33: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/33#issuecomment-2443713018 fixed style errors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]
AHeise commented on code in PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820453815 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java: ## @@ -369,5 +416,43 @@ public ProducerRecord serialize( value, headerProvider != null ? headerProvider.getHeaders(element) : null); } + +@Override +public Optional getKafkaDatasetFacet() { +if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) { +LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider"); +return Optional.empty(); +} + +Optional topicsIdentifier = +((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier(); + +if (!topicsIdentifier.isPresent()) { +LOG.warn("No topics' identifiers provided"); +return Optional.empty(); +} + +TypeInformation typeInformation; +if (this.valueSerializationSchema instanceof ResultTypeQueryable) { +typeInformation = +((ResultTypeQueryable) this.valueSerializationSchema).getProducedType(); +} else { +// gets type information from serialize method signature +typeInformation = Review Comment: Yes TypeInformationFacet sounds like a general concept. I'm convinced you want to pull it out of the KafkaFacet now. You probably want to name it "inputType" and "outputType" depending on the type of the connector (source/sink). I'd design it generally and pull it up into flink-core for Flink 2.0 later (so make it work in Kafka first and then propose to port it upwards). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
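To illustrate the direction discussed above, a rough sketch of what a standalone type facet could look like once pulled out of the Kafka-specific facet; the class and facet name are assumptions for illustration, not an existing Flink or connector API:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

/** Hypothetical generic facet that only carries the record type of a lineage dataset. */
public class TypeDatasetFacet implements LineageDatasetFacet {

    public static final String TYPE_FACET_NAME = "type";

    private final TypeInformation<?> typeInformation;

    public TypeDatasetFacet(TypeInformation<?> typeInformation) {
        this.typeInformation = typeInformation;
    }

    public TypeInformation<?> getTypeInformation() {
        return typeInformation;
    }

    @Override
    public String name() {
        // a single "type" facet reused for both source and sink datasets,
        // instead of separate "inputType" / "outputType" facets
        return TYPE_FACET_NAME;
    }
}
```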
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the --flink-home=$FLINK_HOME format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of FlinkCDC's command line arguments are supported in the format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the --flink-home=$FLINK_HOME format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of FlinkCDC's command line arguments are supported in the > format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink > home, which only supports space spacing. Users who use the > --flink-home=$FLINK_HOME format on the command line (trying to be consistent > with the other = spacing arguments) will not be able to set flink home > correctly. > In particular, when there is an environment variable $FLINK_HOME and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
[ https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36620: Description: Currently, most of FlinkCDC's command line arguments are supported in the format `\-\-$KEY $VALUE` or `\-\-$KEY=$VALUE`, e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the `\-\-flink-home=$FLINK_HOME` format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. was: Currently, most of FlinkCDC's command line arguments are supported in the format `--$KEY $VALUE` or `--$KEY=$VALUE`, e.g. --jar, but, except for flink home, which only supports space spacing. Users who use the `--flink-home=$FLINK_HOME` format on the command line (trying to be consistent with the other = spacing arguments) will not be able to set flink home correctly. In particular, when there is an environment variable $FLINK_HOME and you want to override it by setting --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both --flink-home $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid formatting differences and runtime exceptions when using command line arguments. > Add support for the flink-home parameter to be set in both “--flink-home > $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats > > > Key: FLINK-36620 > URL: https://issues.apache.org/jira/browse/FLINK-36620 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1 >Reporter: zjjiang >Priority: Major > Fix For: cdc-3.3.0 > > > Currently, most of FlinkCDC's command line arguments are supported in the > format `\-\-$KEY $VALUE` or `\-\-$KEY=$VALUE`, e.g. --jar, but, except for > flink home, which only supports space spacing. Users who use the > `\-\-flink-home=$FLINK_HOME` format on the command line (trying to be > consistent with the other = spacing arguments) will not be able to set flink > home correctly. > In particular, when there is an environment variable $FLINK_HOME and you want > to override it by setting --flink-home=/path/to/new/flink/home, you will find > that it does not work. > We would like to support the flink-home parameter in both --flink-home > $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid > formatting differences and runtime exceptions when using command line > arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics for cdc formats
[ https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-36547: - Summary: Add option to retain `RowKind` sematics for cdc formats (was: Add option to retain `RowKind` sematics after serialization/deserialization for cdc formats) > Add option to retain `RowKind` sematics for cdc formats > --- > > Key: FLINK-36547 > URL: https://issues.apache.org/jira/browse/FLINK-36547 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 2.0.0 >Reporter: Yubin Li >Assignee: Yubin Li >Priority: Major > Attachments: image-2024-10-16-11-01-54-790.png, > image-2024-10-16-11-02-34-406.png > > > As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> > +I > {code:java} > Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL > as Debezium JSON or Avro messages, and emit to external systems like Kafka. > However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a > single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and > UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} > In fact, we also have a demand to make the `RowKind` sematics consistent in > many scenarios, such as those that require different processing of -U/-D and > +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and > UPDATE_AFTER to implement the feature and made it run well in bussiness. > {code:java} > create table datagen1 (id int, name string) with > ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', > 'fields.id.max'='2'); > // add 'debezium-json.retain.rowkind' = 'true' > create table t2 (id int, name string, num bigint) WITH ( > 'topic' = 't2', > 'connector' = 'kafka', > 'properties.bootstrap.servers' = 'xx', > 'properties.group.id' = 'test', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'debezium-json', > 'key.format' = 'json', > 'key.fields' = 'id', > 'debezium-json.timestamp-format.standard' = 'ISO-8601', > 'debezium-json.schema-include' = 'false', > 'debezium-json.retain.rowkind' = 'true' > ); > insert into t2 select id, max(name) as name, count(1) as num from datagen1 > group by id; > insert into print1 select * from t2; > {code} > output result: > !image-2024-10-16-11-02-34-406.png|width=660,height=153! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics for cdc formats
[ https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-36547: - Description: As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also have a demand to make the `RowKind` sematics consistent in many scenarios, such as those that require different processing of -U/-D and +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it run well in business. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153! was: As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also have a demand to make the `RowKind` sematics consistent in many scenarios, such as those that require different processing of -U/-D and +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it run well in bussiness. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153! 
> Add option to retain `RowKind` sematics for cdc formats > --- > > Key: FLINK-36547 > URL: https://issues.apache.org/jira/browse/FLINK-36547 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 2.0.0 >Reporter: Yubin Li >Assignee: Yubin Li >Priority: Major > Attachments: image-2024-10-16-11-01-54-790.png, > image-2024-10-16-11-02-34-406.png > > > As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> > +I > {code:java} > Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL > as Debezium JSON or Avro messages, and emit to external systems like Kafka. > However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a > single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and > UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} > In fact, we also have a demand to make the `RowKind` sematics consistent in > many scenarios, such as those that require different processing of -U/-D and > +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and > UPDATE_AFTER to implement the feature and made it run well in business. > {code:java} > create table datagen1 (id int, name string) with > ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', > 'fields.id.max
[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics for cdc formats
[ https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-36547: - Description: As official docs said, `RowKind` semantics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also have a demand to make the `RowKind` sematics consistent in many scenarios, such as those that require different processing of -U/-D and +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it run well in business. implementation details: When serialization, -U/+U are both represented by u, the former has a non-empty `before` field and the latter has a non-empty `after` field; When deserializing data of type u, if `before` is not empty, parsed as -U; if `after` is not empty, parsed as +U. {code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153! was: As official docs said, `RowKind` semantics have been changed: -U -> -D, +D -> +I {code:java} Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} In fact, we also have a demand to make the `RowKind` sematics consistent in many scenarios, such as those that require different processing of -U/-D and +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it run well in business. 
{code:java} create table datagen1 (id int, name string) with ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2'); // add 'debezium-json.retain.rowkind' = 'true' create table t2 (id int, name string, num bigint) WITH ( 'topic' = 't2', 'connector' = 'kafka', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json', 'key.format' = 'json', 'key.fields' = 'id', 'debezium-json.timestamp-format.standard' = 'ISO-8601', 'debezium-json.schema-include' = 'false', 'debezium-json.retain.rowkind' = 'true' ); insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id; insert into print1 select * from t2; {code} output result: !image-2024-10-16-11-02-34-406.png|width=660,height=153! > Add option to retain `RowKind` sematics for cdc formats > --- > > Key: FLINK-36547 > URL: https://issues.apache.org/jira/browse/FLINK-36547 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 2.0.0 >Reporter: Yubin Li >Assignee: Yubin Li >Priority: Major > Attachments: image-2024-10-16-11-01-54-790.png, > image-2024-10-16-11-02-34-406.png > > > As official docs said, `RowKind` semantics have been changed: -U -> -D, +D -> > +I > {code:java} > Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL > as Debezium JSON or Avro messages, and emit to external systems like Kafka. > However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a > single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and > UDPATE_AFTER as DELETE and INSERT Debezium messages. {code} > In fact, we also have a demand to make the `RowKind` sematics consistent in > many scenarios, such as those that require different processing of -U/-D and > +U/+I. we have t
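To make the retained-`RowKind` behaviour above concrete, a minimal sketch of the deserialization branching it describes; the helper below is illustrative only (assuming the proposed `debezium-json.retain.rowkind` option is enabled) and is not the actual Debezium format code:

{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

/** Hypothetical helper mapping a Debezium 'op' value back to a changelog RowKind. */
final class RetainRowKindExample {
    static RowData toChangelogRow(String op, GenericRowData before, GenericRowData after) {
        switch (op) {
            case "c":
            case "r":
                after.setRowKind(RowKind.INSERT);
                return after;
            case "d":
                before.setRowKind(RowKind.DELETE);
                return before;
            case "u":
                // With retain.rowkind enabled, only one side is populated per 'u' message:
                // a non-empty 'before' means UPDATE_BEFORE, a non-empty 'after' means UPDATE_AFTER.
                if (after != null) {
                    after.setRowKind(RowKind.UPDATE_AFTER);
                    return after;
                }
                before.setRowKind(RowKind.UPDATE_BEFORE);
                return before;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }
}
{code}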
Re: [PR] [FLINK-36607][table-planner] Introduce AdaptiveBroadcastJoinProcessor to inject adaptive broadcast join. [flink]
JunRuiLee commented on code in PR #25578: URL: https://github.com/apache/flink/pull/25578#discussion_r1820633873 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala: ## @@ -275,4 +281,32 @@ object JoinUtil { rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode) } } + + def getLargeManagedMemory(joinType: FlinkJoinType, config: ExecNodeConfig): Long = { +val hashJoinManagedMemory = + config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes +// The memory used by SortMergeJoinIterator that buffer the matched rows, each side needs +// this memory if it is full outer join +val externalBufferMemory = + config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY).getBytes +// The memory used by BinaryExternalSorter for sort, the left and right side both need it +val sortMemory = config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes +var externalBufferNum = 1 +if (joinType eq FlinkJoinType.FULL) externalBufferNum = 2 +val sortMergeJoinManagedMemory = externalBufferMemory * externalBufferNum + sortMemory * 2 +// Due to hash join maybe fallback to sort merge join, so here managed memory choose the +// large one +Math.max(hashJoinManagedMemory, sortMergeJoinManagedMemory) + } + + def getJoinStrategyHint(relHints: ImmutableList[RelHint], joinStrategy: JoinStrategy): Boolean = { Review Comment: I prefer to rename this method to isJoinStrategyHintMatched. ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveBroadcastJoin.java: ## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.operators.AdaptiveBroadcastJoin; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.adaptive.AdaptiveBroadcastJoinOperatorGenerator; +import org.apache.flink.table.planner.codegen.CodeGeneratorContext; +import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.planner.plan.utils.JoinUtil; +import org.apache.flink.table.planner.plan.utils.OperatorType; +import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction; +import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveBroadcastJoinOperatorFactory; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.calcite.rex.RexNode; + +import java.io.IOException; +import java.util.List; +import java.util.stream.IntStream; + +/** {@link BatchExecNode} for Adaptive Broadcast Join. */ +public class BatchExecAdaptiveBroadcastJoin extends ExecNodeBase +implements BatchExecNode, SingleTransformationTranslator { + +private final FlinkJoinType joinType;