[PR] [FLINK-4602] Change RocksDBKeyedStateBackend to new package. [flink-benchmarks]

2024-10-29 Thread via GitHub


AlexYinHan opened a new pull request, #98:
URL: https://github.com/apache/flink-benchmarks/pull/98

   This resolves the compilation errors introduced by 
[FLINK-4602](https://github.com/apache/flink/pull/25543), which moves the 
rocksdb classes to o.a.f.state.rocksdb package package.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36271) Support json and jsonb type in PostgreSQL JDBC Dialect

2024-10-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36271:
---
Labels: pull-request-available  (was: )

> Support json and jsonb type in PostgreSQL JDBC Dialect
> --
>
> Key: FLINK-36271
> URL: https://issues.apache.org/jira/browse/FLINK-36271
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC, Table SQL / JDBC
>Reporter: Grzegorz Kołakowski
>Priority: Major
>  Labels: pull-request-available
>
> When using PostgreSQL JDBC Catalog an error is thrown if one of the tables 
> has column of type json or jsonb.
>  
> {noformat}
> java.lang.UnsupportedOperationException: Doesn't support Postgres type 
> 'jsonb' yet
> {noformat}
> Json/jsonb field can be returned as VARCHAR when reading the data.
> Writing values to json/jsonb column is not allowed in current design.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36245) Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated method/interface in Sink V2 in 2.0

2024-10-29 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893798#comment-17893798
 ] 

LvYanquan commented on FLINK-36245:
---

Hi, Piotr.
Kafka connector has not yet been adapted to the release of Flink 2.0-preview, 
so Kafka code cannot be directly used here.
I have added a build plugin in POM to not compile this class:
{code:java}

org.apache.maven.plugins
maven-compiler-plugin

   
   
  
org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
   


   
  compile
  process-sources
  
 compile
  
  
 -Xlint:deprecation
 true
  
   

 {code}
And I tested the compilation of both the entire Flink project and individual 
flink-examples project using maven-3.8.6 and it passed. 
Perhaps you can check if this plugin is not working in your Maven environment. 

> Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated 
> method/interface in Sink V2 in 2.0
> ---
>
> Key: FLINK-36245
> URL: https://issues.apache.org/jira/browse/FLINK-36245
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Common
>Reporter: Qingsheng Ren
>Assignee: LvYanquan
>Priority: Major
>  Labels: 2.0-related, pull-request-available
> Fix For: 2.0-preview
>
>
> SourceFunction, SinkFunction and Sink V1 API has been marked as deprecated 
> and should be removed in Flink 2.0.
> Considering SourceFunction / SinkFunction are heavily used in test cases for 
> building a simple data generator or a data validator, it could be a huge 
> amount of work to rewrite all these usages with Source and Sink V2 API. A 
> viable path for 2.0-preview version would be:
>  * Move SourceFunction, SinkFunction to an internal package, as a test util
>  * Rewrite all Sink V1 implementations with Sink V2 directly (the usage of 
> Sink V1 is low in the main repo)
> As a long term working item, all usages of SourceFunction and SinkFunction 
> will be replaced by Source and Sink API. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36622) Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs.

2024-10-29 Thread Han Yin (Jira)
Han Yin created FLINK-36622:
---

 Summary: Remove the dependency of StateBenchmark on 
RocksDBKeyedStateBackend APIs.
 Key: FLINK-36622
 URL: https://issues.apache.org/jira/browse/FLINK-36622
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 2.0-preview
Reporter: Han Yin
 Fix For: 2.0.0


Currently, flink-benchmarks relies on non-public APIs in Flink. For example, in 
{_}+StateBackendBenchmarkUtils.java+{_}, the function _+compactState+_ takes 
RocksDBKeyedStateBackend as its first argument.

This requires explicit type conversion in flink-benchmark(from 
+_KeyedStateBackend_+ to {+}_RocksDBKeyedStateBackend_{+}). Moreover, this 
means that once the signature of +_RocksDBKeyedStateBackend_+ changes, we need 
to modify flink-benchmark correspondingly.

Therefore, we should avoid exposing non-public APIs in 
{_}+StateBackendBenchmarkUtils+{_}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36592][state/forst] Support file cache for ForStStateBackend [flink]

2024-10-29 Thread via GitHub


Zakelly commented on code in PR #25561:
URL: https://github.com/apache/flink/pull/25561#discussion_r1820533404


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java:
##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs.cache;
+
+import org.apache.flink.core.fs.ByteBufferReadable;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A {@link FSDataInputStream} delegates requests to other one and supports 
reading data with {@link
+ * ByteBuffer}.
+ */
+public class CachedDataInputStream extends FSDataInputStream implements 
ByteBufferReadable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CachedDataInputStream.class);
+
+/** The reference to the cache entry. */
+private final FileCacheEntry cacheEntry;
+
+private volatile FSDataInputStream fsdis;
+
+private volatile StreamStatus streamStatus;
+
+/**
+ * The position of the cached stream, when cached stream is closed, the 
position is stored. When
+ * switch to original stream, the position is restored.
+ */
+private volatile long position;
+
+private final FSDataInputStream originalStream;
+
+private Semaphore semaphore;
+
+public CachedDataInputStream(
+FileCacheEntry cacheEntry,
+FSDataInputStream cacheStream,
+FSDataInputStream originalStream) {
+this.cacheEntry = cacheEntry;
+this.fsdis = cacheStream;
+this.originalStream = originalStream;
+this.streamStatus = StreamStatus.CACHED_OPEN;
+this.semaphore = new Semaphore(0);
+}
+
+private FSDataInputStream getStream() throws IOException {
+if (streamStatus == StreamStatus.CACHED_OPEN && cacheEntry.tryRetain() 
> 0) {
+return fsdis;
+} else if (streamStatus == StreamStatus.CACHED_CLOSED) {

Review Comment:
   ```suggestion
   } else if (streamStatus != StreamStatus.ORIGINAL) {
   ```
   Since the `streamStatus` might not yet updated



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java:
##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs.cache;
+
+import org.apache.flink.core.fs.ByteBufferReadable;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A {@link FSDataInputStream} delegates requests to other one and supports 
reading data with {@link
+ * ByteBuffer}.
+ */
+public class CachedDataInputStream extends FSDataInputStream implements 
ByteBufferReadable {

Review Comment:
   Please offer some description about the muti-threading scenario?



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java:
##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this w

Re: [PR] [FLINK-36455] Sinks retry synchronously [flink]

2024-10-29 Thread via GitHub


AHeise commented on PR #25547:
URL: https://github.com/apache/flink/pull/25547#issuecomment-2444290856

   Reverted the deprecation of numFailed and added a config option for the 
retries. PTAL @fapaul 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36493][TABLE API] Remove all deprecated methods in MapView [flink]

2024-10-29 Thread via GitHub


tinaselenge commented on PR #25566:
URL: https://github.com/apache/flink/pull/25566#issuecomment-2443607410

   Failing tests don't seem to be related as they pass locally for me. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode [flink]

2024-10-29 Thread via GitHub


dawidwys commented on code in PR #24699:
URL: https://github.com/apache/flink/pull/24699#discussion_r1820328447


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java:
##
@@ -567,7 +912,7 @@ void testUserDefinedFunctions() {
 "SELECT *\n"
 + "FROM MyTable\n"
 + "MATCH_RECOGNIZE (\n"
-+ "  ORDER BY proctime\n"
++ "  ORDER BY ts\n"

Review Comment:
   I agee with @grzegorz8 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-4602][State] Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. [flink]

2024-10-29 Thread via GitHub


AlexYinHan opened a new pull request, #25586:
URL: https://github.com/apache/flink/pull/25586

   
   
   
   
   ## What is the purpose of the change
   
   Add constructors for 
o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface 
compatibility.
   
   
   
   ## Brief change log
   
   Add ```EmbeddedRocksDBStateBackend(boolean 
enableIncrementalCheckpointing)``` and  
```EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)```
   
   ## Verifying this change
   
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
--flink-home=$FLINK_HOME format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
"--flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
> home, which only supports space spacing. Users who use the 
> --flink-home=$FLINK_HOME format on the command line (trying to be consistent 
> with the other = spacing arguments) will not be able to set flink home 
> correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36245) Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated method/interface in Sink V2 in 2.0

2024-10-29 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893764#comment-17893764
 ] 

Piotr Nowojski edited comment on FLINK-36245 at 10/29/24 10:39 AM:
---

Hi [~kunni], [~renqs] and [~leonard]. I think this change has made the build 
broken/unstable. Locally in the IntelliJ building Flink fails for me due to:

{code:java}
flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
  class file for org.apache.flink.api.connector.sink2.StatefulSink not found
{code}
flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in 
turns is still referring to the StatefulSink:

{code:java}
public class KafkaSink implements StatefulSink, 
TwoPhaseCommittingSink (...)
{code}

 Maven builds might be working due to some dumb luck.

https://issues.apache.org/jira/browse/FLINK-36621



was (Author: pnowojski):
Hi [~kunni], [~renqs] and [~leonard]. I think this change has made the build 
broken/unstable. Locally in the IntelliJ building Flink fails for me due to:

{code:java}
flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
  class file for org.apache.flink.api.connector.sink2.StatefulSink not found
{code}
flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in 
turns is still referring to the StatefulSink:

{code:java}
public class KafkaSink implements StatefulSink, 
TwoPhaseCommittingSink (...)
{code}

 Maven builds might be working due to some dumb luck.


> Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated 
> method/interface in Sink V2 in 2.0
> ---
>
> Key: FLINK-36245
> URL: https://issues.apache.org/jira/browse/FLINK-36245
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Common
>Reporter: Qingsheng Ren
>Assignee: LvYanquan
>Priority: Major
>  Labels: 2.0-related, pull-request-available
> Fix For: 2.0-preview
>
>
> SourceFunction, SinkFunction and Sink V1 API has been marked as deprecated 
> and should be removed in Flink 2.0.
> Considering SourceFunction / SinkFunction are heavily used in test cases for 
> building a simple data generator or a data validator, it could be a huge 
> amount of work to rewrite all these usages with Source and Sink V2 API. A 
> viable path for 2.0-preview version would be:
>  * Move SourceFunction, SinkFunction to an internal package, as a test util
>  * Rewrite all Sink V1 implementations with Sink V2 directly (the usage of 
> Sink V1 is low in the main repo)
> As a long term working item, all usages of SourceFunction and SinkFunction 
> will be replaced by Source and Sink API. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-4602][State] Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. [flink]

2024-10-29 Thread via GitHub


flinkbot commented on PR #25586:
URL: https://github.com/apache/flink/pull/25586#issuecomment-2443963990

   
   ## CI report:
   
   * 8a64c442363ad7af042796ee90a80e82f8fb47f2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-4602][State] Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. [flink]

2024-10-29 Thread via GitHub


AlexYinHan commented on PR #25586:
URL: https://github.com/apache/flink/pull/25586#issuecomment-2443980646

   @Zakelly This resolves the incompatibility of the constructors of 
EmbeddedRocksDBStateBackend. Can you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-16851) Add common metrics to the SourceReader base implementation.

2024-10-29 Thread Poorvank Bhatia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893806#comment-17893806
 ] 

Poorvank Bhatia commented on FLINK-16851:
-

Hey [~becket_qin] , Is this still being worked upon? If not i can take this up. 
Happy to discuss :)

> Add common metrics to the SourceReader base implementation.
> ---
>
> Key: FLINK-16851
> URL: https://issues.apache.org/jira/browse/FLINK-16851
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Jiangjie Qin
>Priority: Major
>
> Add the metrics to the base SourceReader implementation. This is relevant to 
> [FLIP-33|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics?src=contextnavpagetreemode]].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36621) Build failure: StatefulSink not found

2024-10-29 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893812#comment-17893812
 ] 

LvYanquan commented on FLINK-36621:
---

This issue should be related to a bug 
https://youtrack.jetbrains.com/issue/IDEA-87868 in Idea. I will add 
instructions on this class to enable users to compile correctly through Maven.

> Build failure: StatefulSink not found
> -
>
> Key: FLINK-36621
> URL: https://issues.apache.org/jira/browse/FLINK-36621
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 2.0-preview
>Reporter: Piotr Nowojski
>Priority: Blocker
>
> Locally in the IntelliJ building Flink fails for me due to:
> {code:java}
> flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
> java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
>   class file for org.apache.flink.api.connector.sink2.StatefulSink not found
> {code}
> flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in 
> turns is still referring to the StatefulSink:
> {code:java}
> public class KafkaSink implements StatefulSink, 
> TwoPhaseCommittingSink (...)
> {code}
> which has been deleted in FLINK-36245. I think maven builds might be working 
> due to some luck and differences between how IntelliJ and Maven are 
> interpreting pom files and dealing with the dependencies.
> CC [~kunni] [~renqs] [~Leonard]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36622] Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs. [flink]

2024-10-29 Thread via GitHub


AlexYinHan opened a new pull request, #25587:
URL: https://github.com/apache/flink/pull/25587

   
   
   
   
   ## What is the purpose of the change
   
   Currently, flink-benchmarks relies on non-public APIs in Flink. For example, 
in StateBackendBenchmarkUtils.java, the function compactState takes 
RocksDBKeyedStateBackend as its first argument.
   
   This requires explicit type conversion in flink-benchmark(from 
KeyedStateBackend to RocksDBKeyedStateBackend). Moreover, this means that once 
the signature of RocksDBKeyedStateBackend changes, we need to modify 
flink-benchmark correspondingly.
   
   Therefore, we should avoid exposing non-public APIs in 
StateBackendBenchmarkUtils.
   
   
   ## Brief change log
   
   Change the 1st argument of `StateBackendBenchmarkUtils#compactState` from 
RocksDBKeyedStateBackend to KeyedStateBackend.
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36616] fix npe in GcpPublisherConfig [flink-connector-gcp-pubsub]

2024-10-29 Thread via GitHub


snuyanzin commented on code in PR #33:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/33#discussion_r1820768938


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java:
##
@@ -52,7 +52,11 @@ public CredentialsProvider getCredentialsProvider() {
 }
 
 public TransportChannelProvider getTransportChannelProvider() {
+if (transportChannelProvider == null) {
+return null;
+}
 return transportChannelProvider.getTransportChannelProvider();
+

Review Comment:
   
   ```suggestion
   ```
   nit: i guess we don't need this line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36621) Build failure: StatefulSink not found

2024-10-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36621:
---
Labels: pull-request-available  (was: )

> Build failure: StatefulSink not found
> -
>
> Key: FLINK-36621
> URL: https://issues.apache.org/jira/browse/FLINK-36621
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 2.0-preview
>Reporter: Piotr Nowojski
>Priority: Blocker
>  Labels: pull-request-available
>
> Locally in the IntelliJ building Flink fails for me due to:
> {code:java}
> flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
> java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
>   class file for org.apache.flink.api.connector.sink2.StatefulSink not found
> {code}
> flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in 
> turns is still referring to the StatefulSink:
> {code:java}
> public class KafkaSink implements StatefulSink, 
> TwoPhaseCommittingSink (...)
> {code}
> which has been deleted in FLINK-36245. I think maven builds might be working 
> due to some luck and differences between how IntelliJ and Maven are 
> interpreting pom files and dealing with the dependencies.
> CC [~kunni] [~renqs] [~Leonard]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36621) Build failure: StatefulSink not found

2024-10-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-36621:
--

 Summary: Build failure: StatefulSink not found
 Key: FLINK-36621
 URL: https://issues.apache.org/jira/browse/FLINK-36621
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 2.0-preview
Reporter: Piotr Nowojski


Locally in the IntelliJ building Flink fails for me due to:

{code:java}
flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
  class file for org.apache.flink.api.connector.sink2.StatefulSink not found
{code}
flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in 
turns is still referring to the StatefulSink:

{code:java}
public class KafkaSink implements StatefulSink, 
TwoPhaseCommittingSink (...)
{code}

which has been deleted in FLINK-36245. I think maven builds might be working 
due to some luck and differences between how IntelliJ and Maven are 
interpreting pom files and dealing with the dependencies.

CC [~kunni] [~renqs] [~Leonard]




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-4602] Change RocksDBKeyedStateBackend to new package. [flink-benchmarks]

2024-10-29 Thread via GitHub


AlexYinHan commented on PR #98:
URL: https://github.com/apache/flink-benchmarks/pull/98#issuecomment-2444005263

   @Zakelly This should resolve the compilation errors once [Flink 
PR-25586](https://github.com/apache/flink/pull/25586) is merged. PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36621][flink-examples] Add comment in KafkaEventsGeneratorJob [flink]

2024-10-29 Thread via GitHub


flinkbot commented on PR #25588:
URL: https://github.com/apache/flink/pull/25588#issuecomment-2444190248

   
   ## CI report:
   
   * de83f7c5c6d6e57252dbf23a0677957e9164d3ac UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36622] Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs. [flink]

2024-10-29 Thread via GitHub


flinkbot commented on PR #25587:
URL: https://github.com/apache/flink/pull/25587#issuecomment-2444189745

   
   ## CI report:
   
   * fdcf5f44ec979958a0f65c27b2685f443e70152c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36621][flink-examples] Add comment in KafkaEventsGeneratorJob [flink]

2024-10-29 Thread via GitHub


lvyanquan opened a new pull request, #25588:
URL: https://github.com/apache/flink/pull/25588

   ## What is the purpose of the change
   
   Add comment to help developer avoiding compiling error in flink-examples 
module. 
   
   ## Brief change log
   
   comment
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no 
 - The S3 file system connector: no 
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36622) Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs.

2024-10-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36622:
---
Labels: pull-request-available  (was: )

> Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs.
> -
>
> Key: FLINK-36622
> URL: https://issues.apache.org/jira/browse/FLINK-36622
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 2.0-preview
>Reporter: Han Yin
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Currently, flink-benchmarks relies on non-public APIs in Flink. For example, 
> in {_}+StateBackendBenchmarkUtils.java+{_}, the function _+compactState+_ 
> takes RocksDBKeyedStateBackend as its first argument.
> This requires explicit type conversion in flink-benchmark(from 
> +_KeyedStateBackend_+ to {+}_RocksDBKeyedStateBackend_{+}). Moreover, this 
> means that once the signature of +_RocksDBKeyedStateBackend_+ changes, we 
> need to modify flink-benchmark correspondingly.
> Therefore, we should avoid exposing non-public APIs in 
> {_}+StateBackendBenchmarkUtils+{_}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32483) RescaleCheckpointManuallyITCase.testCheckpointRescalingOutKeyedState fails on AZP

2024-10-29 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-32483.
--
Resolution: Fixed

> RescaleCheckpointManuallyITCase.testCheckpointRescalingOutKeyedState fails on 
> AZP
> -
>
> Key: FLINK-32483
> URL: https://issues.apache.org/jira/browse/FLINK-32483
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.17.2, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: David Morávek
>Priority: Major
>  Labels: auto-deprioritized-critical, test-stability
> Fix For: 2.0.0
>
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50397&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=7495
>  fails with
> {noformat}
> Jun 26 06:08:57 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 21.041 s <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase
> Jun 26 06:08:57 [ERROR] 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingOutKeyedState
>   Time elapsed: 6.435 s  <<< FAILURE!
> Jun 26 06:08:57 java.lang.AssertionError: expected:<[(0,24000), (2,58500), 
> (0,34500), (0,45000), (3,43500), (2,18000), (1,6000), (1,16500), (0,28500), 
> (0,52500), (3,27000), (1,51000), (2,25500), (0,1500), (0,49500), (3,0), 
> (3,48000), (0,36000), (2,22500), (1,10500), (0,46500), (2,33000), (1,21000), 
> (0,9000), (0,57000), (3,31500), (2,19500), (1,7500), (1,55500), (3,42000), 
> (2,3), (0,54000), (2,40500), (1,4500), (3,15000), (2,3000), (1,39000), 
> (2,13500), (0,37500), (0,61500), (3,12000), (3,6)]> but was:<[(2,58500), 
> (0,34500), (0,45000), (3,43500), (2,18000), (1,16500), (0,52500), (3,27000), 
> (2,25500), (0,49500), (3,0), (3,48000), (0,36000), (2,22500), (1,21000), 
> (0,9000), (0,57000), (3,31500), (1,7500), (2,3), (0,54000), (2,40500), 
> (1,4500), (2,3000), (1,39000), (2,13500), (0,61500), (3,12000)]>
> Jun 26 06:08:57   at org.junit.Assert.fail(Assert.java:89)
> Jun 26 06:08:57   at org.junit.Assert.failNotEquals(Assert.java:835)
> Jun 26 06:08:57   at org.junit.Assert.assertEquals(Assert.java:120)
> Jun 26 06:08:57   at org.junit.Assert.assertEquals(Assert.java:146)
> Jun 26 06:08:57   at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.restoreAndAssert(RescaleCheckpointManuallyITCase.java:219)
> Jun 26 06:08:57   at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingKeyedState(RescaleCheckpointManuallyITCase.java:138)
> Jun 26 06:08:57   at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingOutKeyedState(RescaleCheckpointManuallyITCase.java:116)
> Jun 26 06:08:57   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36271] Support reading json and jsonb types in PostgreSQL dialect [flink-connector-jdbc]

2024-10-29 Thread via GitHub


grzegorz8 commented on PR #141:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/141#issuecomment-2444020989

   > > @grzegorz8 Can You Give me, Some Example to Build This Code and Add Jar 
and use in pyflink code for Streaming Data to JSONB in PostgreSQL.
   > 
   > I'm sorry but there are still one issue with this change, namely, cratedb 
depends on postgres module and the following error is thrown in tests:
   > 
   > ```
   > Caused by: java.lang.NoClassDefFoundError: org/postgresql/util/PGobject
   >at 
org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialectConverter.lambda$createPrimitiveConverter$bd2b50a6$1(PostgresDialectConverter.java:99)
   >at 
org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.lambda$wrapIntoNullableInternalConverter$ff222c76$1(AbstractDialectConverter.java:127)
   >at 
org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialectConverter.lambda$createPostgresArrayConverter$477a3c4c$1(PostgresDialectConverter.java:87)
   >at 
org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.lambda$wrapIntoNullableInternalConverter$ff222c76$1(AbstractDialectConverter.java:127)
   >at 
org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.toInternal(AbstractDialectConverter.java:78)
   >at 
org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257)
   >at 
org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56)
   >at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:97)
   >at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:114)
   >at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
   >at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
   > Caused by: java.lang.ClassNotFoundException: org.postgresql.util.PGobject
   >at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
   >at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
   >at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
   >... 11 more
   > ```
   > 
   > However, the build will succeed if you skip tests. `mvn clean install 
-DskipTests`
   
   Hey @matriv! Since you are the author of CrateDB support 
(https://github.com/apache/flink-connector-jdbc/pull/29) maybe you can suggest 
me how to deal with the error shown above? Thanks in advance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-21909) Unify API and implementation for Hive and Filesystem source connector

2024-10-29 Thread Poorvank Bhatia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893792#comment-17893792
 ] 

Poorvank Bhatia commented on FLINK-21909:
-

Hey [~jark] , If this is still unassigned can i take this up? Thanks :)

> Unify API and implementation for Hive and Filesystem source connector
> -
>
> Key: FLINK-21909
> URL: https://issues.apache.org/jira/browse/FLINK-21909
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem, Connectors / Hive
>Reporter: Jark Wu
>Priority: Major
>
> This should make Filesystem source connector have all the ability of Hive 
> source connector (including the watermark ability). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25921) Support different input parallelism for preCommit topology

2024-10-29 Thread Poorvank Bhatia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893796#comment-17893796
 ] 

Poorvank Bhatia commented on FLINK-25921:
-

Hey [~fpaul] , If this is still unassigned can i pick this up? Thanks :)

> Support different input parallelism for preCommit topology
> --
>
> Key: FLINK-25921
> URL: https://issues.apache.org/jira/browse/FLINK-25921
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Fabian Paul
>Priority: Major
>
> Currently, we assume that the pre-commit topology has the same parallelism as 
> the operator before when inserting the failover region. To support a 
> different parallelism we might need to insert a different identity map to 
> customize the mapping.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36613) RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState fails on AZP

2024-10-29 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893856#comment-17893856
 ] 

Piotr Nowojski commented on FLINK-36613:


{quote}
It is very likely that the reason is FLINK-36556 where was changed default 
value for STARTING_MEMORY_SEGMENT_SIZE and it impacts the test

BTW before changes from FLINK-36556 it is not reproduced
{quote}

I don't think that FLINK-36556 can be the culprit here. Buffer debloating is 
not enabled neither randomly in the test environment nor in the 
{{RescaleCheckpointManuallyITCase}} itself. It's also disabled by default, so 
changes in the FLINK-36556 shouldn't affect in any way this test. I would 
presume that test is still broken in some way. I've tried looking into it, but 
after a bit of digging I couldn't find what's actually wrong with it.

> RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState fails on 
> AZP
> 
>
> Key: FLINK-36613
> URL: https://issues.apache.org/jira/browse/FLINK-36613
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 2.0-preview
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {{RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState}} fails 
> like
> {noformat}
> Oct 28 04:42:03 04:42:03.884 [ERROR] 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState
>  -- Time elapsed: 6.107 s <<< FAILURE!
> Oct 28 04:42:03 java.lang.AssertionError: expected:<[(0,24000), (1,22500), 
> (0,34500), (1,33000), (0,21000), (0,45000), (2,31500), (2,42000), (1,6000), 
> (0,28500), (0,52500), (2,15000), (1,3000), (1,51000), (0,1500), (0,49500), 
> (2,12000), (2,6), (0,36000), (1,10500), (1,58500), (0,46500), (0,9000), 
> (0,57000), (2,19500), (2,43500), (1,7500), (1,55500), (2,3), (1,18000), 
> (0,54000), (2,40500), (1,4500), (0,16500), (2,27000), (1,39000), (2,13500), 
> (1,25500), (0,37500), (0,61500), (2,0), (2,48000)]> but was:<[(1,22500), 
> (0,34500), (1,33000), (0,45000), (0,21000), (2,31500), (0,28500), (0,52500), 
> (2,15000), (1,3000), (1,51000), (0,49500), (0,1500), (1,58500), (1,10500), 
> (0,46500), (0,9000), (0,57000), (2,43500), (2,19500), (1,7500), (1,55500), 
> (2,40500), (1,4500), (0,16500), (2,27000), (1,39000), (2,13500), (1,25500), 
> (0,37500), (0,61500)]>
> Oct 28 04:42:03   at org.junit.Assert.fail(Assert.java:89)
> Oct 28 04:42:03   at org.junit.Assert.failNotEquals(Assert.java:835)
> Oct 28 04:42:03   at org.junit.Assert.assertEquals(Assert.java:120)
> Oct 28 04:42:03   at org.junit.Assert.assertEquals(Assert.java:146)
> Oct 28 04:42:03   at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.restoreAndAssert(RescaleCheckpointManuallyITCase.java:216)
> Oct 28 04:42:03   at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingKeyedState(RescaleCheckpointManuallyITCase.java:138)
> Oct 28 04:42:03   at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState(RescaleCheckpointManuallyITCase.java:111)
> Oct 28 04:42:03   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> Oct 28 04:42:03   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> {noformat}
> for instance 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=63345&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba
> the issue is floating however could be reproduced locally after several 
> repetitions like
> {code}
> cd flink-tests
> for i in $(seq 1 100); do ../mvnw -Dtest=RescaleCheckpointManuallyITCase test 
> || break ; done
> {code}
> there is a similar issue FLINK-32483 however it seems fixed (not closed for 
> some reason and not related)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36410) Improve Lineage Info Collection for flink app

2024-10-29 Thread Zhenqiu Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenqiu Huang resolved FLINK-36410.
---
Resolution: Done

> Improve Lineage Info Collection for flink app
> -
>
> Key: FLINK-36410
> URL: https://issues.apache.org/jira/browse/FLINK-36410
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0
>Reporter: Zhenqiu Huang
>Priority: Minor
>
> We find that lineage interface adoption is not easy for each of Flink 
> connectors as every connect has its own release dependency and schedule. 
> Thus, to make the lineage integration incrementally used by user, we want to 
> change the lineage info collection not require both source and sink as 
> lineage provider implemented. So that Lineage reporter can has partial 
> lineage graph generated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36410) Improve Lineage Info Collection for flink app

2024-10-29 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893955#comment-17893955
 ] 

Zhenqiu Huang commented on FLINK-36410:
---

PR is created and merged https://github.com/apache/flink/pull/25440

> Improve Lineage Info Collection for flink app
> -
>
> Key: FLINK-36410
> URL: https://issues.apache.org/jira/browse/FLINK-36410
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0
>Reporter: Zhenqiu Huang
>Priority: Minor
>
> We find that lineage interface adoption is not easy for each of Flink 
> connectors as every connect has its own release dependency and schedule. 
> Thus, to make the lineage integration incrementally used by user, we want to 
> change the lineage info collection not require both source and sink as 
> lineage provider implemented. So that Lineage reporter can has partial 
> lineage graph generated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36625) Add helper classes for Lineage integration in connectors

2024-10-29 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-36625:
-

 Summary: Add helper classes for Lineage integration in connectors
 Key: FLINK-36625
 URL: https://issues.apache.org/jira/browse/FLINK-36625
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Zhenqiu Huang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36626) Flink SQL temporal JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Summary: Flink SQL temporal JOINs behavior change from Flink 1.15 to Flink 
1.18+  (was: Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+)

> Flink SQL temporal JOINs behavior change from Flink 1.15 to Flink 1.18+
> ---
>
> Key: FLINK-36626
> URL: https://issues.apache.org/jira/browse/FLINK-36626
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.1, 1.20.0
> Environment: AWS Managed Apache Flink 
>Reporter: Eduardo Breijo
>Priority: Critical
> Attachments: Flink-SQL-query.txt
>
>
> There is a behavior change I found when migrating to Flink 1.18+ from Flink 
> 1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
> point and is causing the query below to output different results.
> *Flink SQL Query:*
> ~WITH assets_setpoint AS (~
>     ~SELECT~
>       ~asset_id,~
>       ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
>       ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
>       ~LAST_VALUE(`value`) AS `value`~
>     ~FROM asset_readings~
>     ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
>     ~ON metric.metric_id = asset_readings.metric_id~
>     ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
>     ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
>   ~)~
>   ~SELECT~
>   ~assets_supply_air_temp.`timestamp`,~
>   ~assets_supply_air_temp.asset_id,~
>   ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
>   ~FROM (~
>     ~SELECT asset_readings.`timestamp`,~
>     ~asset_readings.asset_id,~
>     ~asset_readings.`value` AS `value`~
>     ~FROM asset_readings~
>     ~-- Metrics temporal lookup inner join~
>     ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
>     ~ON metric.metric_id = asset_readings.metric_id~
>     ~-- Assets to ignore for this computed metric definition temporal lookup 
> left join~
>     ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
> AS OF `proctime`~
>     ~ON 
> asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id 
> = :computedMetricDefinitionId~
>     ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
> asset_readings.asset_id~
>     ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
>     ~-- Filter assets not present in the asset to ignore for this computed 
> metric definition table~
>     ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
>   ~) AS assets_supply_air_temp~
>   ~INNER JOIN assets_setpoint~
>   ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
>   ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
> assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~
> *Schema:*
> ~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
> ~|      name |                        type |  null | key |        extras |    
>       watermark |~
> ~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
> ~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
> SOURCE_WATERMARK() |~
> ~|  asset_id |                      BIGINT |  TRUE |     |               |    
>                 |~
> ~| metric_id |                         INT |  TRUE |     |               |    
>                 |~
> ~|     value |                      DOUBLE |  TRUE |     |               |    
>                 |~
> ~|  metadata |         MAP |  TRUE |     |               |    
>                 |~
> ~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |    
>                 |~
> ~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
> ~6 rows in set~
> ~++~
> ~|                                     table name |~
> ~++~
> ~|                                 asset_readings |~
> ~|              asset_relationship_parent_to_unit |~
> ~| asset_to_ignore_per_computed_metric_definition |~
> ~|                                         metric |~
> ~++~
> Results:
>  * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
> assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
> assets_setpoint is computed correctly for 

Re: [PR] [FLINK-35444][pipeline-connector][paimon] Paimon Pipeline Connector support changing column names to lowercase for Hive metastore [flink-cdc]

2024-10-29 Thread via GitHub


github-actions[bot] commented on PR #3569:
URL: https://github.com/apache/flink-cdc/pull/3569#issuecomment-2445540069

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Refactored Identifiers [flink-cdc]

2024-10-29 Thread via GitHub


github-actions[bot] commented on PR #3501:
URL: https://github.com/apache/flink-cdc/pull/3501#issuecomment-2445540164

   This pull request has been closed because it has not had recent activity. 
You could reopen it if you try to continue your work, and anyone who are 
interested in it are encouraged to continue work on this pull request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35599][cdc-connector][jdbc-mysql]Flink cdc pipeline sink jdbc mysql [flink-cdc]

2024-10-29 Thread via GitHub


github-actions[bot] commented on PR #3433:
URL: https://github.com/apache/flink-cdc/pull/3433#issuecomment-2445540230

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query below to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+ this difference always results in 0

 

I have tried updating the query using different formats but I have not found a 
workaround and I don't know why this is happening. Attached you will find a 
file with the different SQL formats I have tried with no luck.

Any help would be appreciated

 

  was:
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`t

[jira] [Updated] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL joins that I haven't been able to pin point and is 
causing the query to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~+---+-+---+-+---++~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~+---+-+---+-+---++~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~+---+-+---+-+---++~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~



Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+ this difference always results in 0

 

I have tried updating the query using different formats but I have not found a 
workaround and I don't know why this is happening. Attached you will find a 
file with the different SQL formats I have tried with no luck.

Any help would be appreciated

 

  was:
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL joins that I haven't been able to pin point and is 
causing the query to output different results.

Flink SQL Query:

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE me

[jira] [Assigned] (FLINK-36547) Add option to retain `RowKind` sematics after serialization/deserialization for cdc formats

2024-10-29 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-36547:
--

Assignee: Yubin Li

> Add option to retain `RowKind` sematics after serialization/deserialization 
> for cdc formats
> ---
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 2.0.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
> Attachments: image-2024-10-16-11-01-54-790.png, 
> image-2024-10-16-11-02-34-406.png
>
>
> As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> 
> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL 
> as Debezium JSON or Avro messages, and emit to external systems like Kafka. 
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and 
> UDPATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, we also have a demand to make the `RowKind` sematics consistent in 
> many scenarios, such as those that require different processing of -U/-D and 
> +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
> UPDATE_AFTER to implement the feature and made it run well in bussiness.
> {code:java}
> create table datagen1 (id int, name string) with 
> ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
> 'fields.id.max'='2');
> // add 'debezium-json.retain.rowkind' = 'true'
> create table t2 (id int, name string, num bigint) WITH (
>     'topic' = 't2', 
>     'connector' = 'kafka', 
>     'properties.bootstrap.servers' = 'xx', 
>     'properties.group.id' = 'test', 
>     'scan.startup.mode' = 'earliest-offset', 
>     'format' = 'debezium-json',
>     'key.format' = 'json',
>     'key.fields' = 'id',
>     'debezium-json.timestamp-format.standard' = 'ISO-8601', 
>     'debezium-json.schema-include' = 'false',
> 'debezium-json.retain.rowkind' = 'true'
> );
> insert into t2 select id, max(name) as name, count(1) as num from datagen1 
> group by id;
> insert into print1 select * from t2;
>  {code}
> output result:
> !image-2024-10-16-11-02-34-406.png|width=660,height=153!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}--{-}{-}{+}---{-}++{-}-{-}{-}---{-}++{-}-{-}{-}---+~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}--{-}{-}{+}---{-}++{-}-{-}{-}---{-}++{-}-{-}{-}---+~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}--{-}{-}{+}---{-}++{-}-{-}{-}---{-}++{-}-{-}{-}---+~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+ this difference always results in 0

 

I have tried updating the query using different formats but I have not found a 
workaround and I don't know why this is happening. Attached you will find a 
file with the different SQL formats I have tried with no luck.

Any help would be appreciated

 

  was:
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL joins that I haven't been able to pin point and is 
causing the query to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS 

Re: [PR] [hotfix] Replace System.out.println with logger for better log management [flink-connector-gcp-pubsub]

2024-10-29 Thread via GitHub


caicancai commented on code in PR #31:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/31#discussion_r1821701027


##
flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java:
##
@@ -21,11 +21,14 @@
 import com.google.protobuf.ByteString;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.TopicName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
 
 /** Helper class to send PubSubMessages to a PubSub topic. */
 class PubSubPublisher {
+private static final Logger LOG = 
LoggerFactory.getLogger(PubSubExample.class);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query below to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}---{-}{{-}}{{-}}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{-}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{+}{{+}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}---{-}{{-}}{{-}}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{-}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{+}{{+}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}---{-}{{-}}{{-}}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{-}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{+}{{+}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+ this difference always results in 0
 * Updating the query to use regular join against the metric table (removing 
{~}FOR SYSTEM_TIME AS OF `proctime`{~}) makes the query to output the correct 
value but I don't think regular joins is what I need in this case.

 

I have tried updating the query using different formats with temporal joins but 
I have not found a workaround and I don't know why this is happening. Attached 
you will find a file with the different SQL formats I have tried with no luck.

Any 

[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query below to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}{-}{{-}}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{+}{{+}}{{-}}{-}--{-}{{-}}{{-}}{{-}}{{-}}{-}-\{+}~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}{-}{{-}}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{+}{{+}}{{-}}{-}--{-}{{-}}{{-}}{{-}}{{-}}{-}-\{+}~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}{-}{{-}}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{+}{{+}}{{-}}{-}--{-}{{-}}{{-}}{{-}}{{-}}{-}-\{+}~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+ this difference always results in 0
 * Updating the query to use regular join against the metric table (removing 
{~}FOR SYSTEM_TIME AS OF `proctime`{~}) makes the query to output the correct 
value

 

I have tried updating the query using different formats but I have not found a 
workaround and I don't know why this is happening. Attached you will find a 
file with the different SQL formats I have tried with no luck.

Any help would be appreciated

 

  was:
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal j

[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query below to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}--\{+}~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}--\{+}~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}--\{+}~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+, for the same query, this difference always results in 0
 * On Flink 1.18+, updating the query to use regular join against the metric 
table (removing {~}FOR SYSTEM_TIME AS OF `proctime`{~}) makes the query to 
output the correct value but I don't think regular joins is what I need in this 
case.

 

I have tried updating the query using different formats w

[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query below to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}--{-}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{-}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}---\{+}~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}--{-}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{-}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}---\{+}~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}--{-}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{-}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}---\{+}~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+ this difference always results in 0
 * On Flink 1.18+, updating the query to use regular join against the metric 
table (removing {~}FOR SYSTEM_TIME AS OF `proctime`{~}) makes the query to 
output the correct value but I don't think regular joins is what I need in this 
case.

 

I have tried updating the query using different formats with temporal joins but 
I have not found a workaround and I don't know why this is happening.

Re: [PR] [FLINK-36011] [runtime] Generalize RescaleManager to become StateTransitionManager [flink]

2024-10-29 Thread via GitHub


rkhachatryan commented on code in PR #25280:
URL: https://github.com/apache/flink/pull/25280#discussion_r1821594247


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java:
##
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.Temporal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Supplier;
+
+/**
+ * {@code DefaultStateTransitionManager} is a state machine which manages the 
{@link
+ * AdaptiveScheduler}'s state transitions based on the previous transition 
time and the available
+ * resources. See {@link Phase} for details on each individual phase of this 
state machine. Note: We
+ * use the term phase here to avoid confusion with the state used in the 
{@link AdaptiveScheduler}.
+ *
+ * 
+ * {@link Cooldown}
+ *   |
+ *   +--> {@link Idling}
+ *   |  |
+ *   |  V
+ *   +--> {@link Stabilizing}
+ *  |
+ *  +--> {@link Stabilized} --> {@link Idling}
+ *  |  |
+ *  |  V
+ *  \--> {@link Transitioning}
+ * 
+ *
+ * Thread-safety: This class is not implemented in a thread-safe manner and 
relies on the fact
+ * that any method call happens within a single thread.
+ *
+ * @see Executing
+ */
+@NotThreadSafe
+public class DefaultStateTransitionManager implements StateTransitionManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStateTransitionManager.class);
+
+private final Supplier clock;
+private final StateTransitionManager.Context transitionContext;
+private Phase phase;
+private final List> scheduledFutures;
+
+@VisibleForTesting final Duration cooldownTimeout;
+@Nullable @VisibleForTesting final Duration resourceStabilizationTimeout;
+@VisibleForTesting final Duration maxTriggerDelay;
+
+DefaultStateTransitionManager(
+Temporal initializationTime,
+StateTransitionManager.Context transitionContext,
+Duration cooldownTimeout,
+@Nullable Duration resourceStabilizationTimeout,
+Duration maxTriggerDelay) {
+this(
+initializationTime,
+Instant::now,
+transitionContext,
+cooldownTimeout,
+resourceStabilizationTimeout,
+maxTriggerDelay);
+}
+
+@VisibleForTesting
+DefaultStateTransitionManager(
+Temporal initializationTime,
+Supplier clock,
+StateTransitionManager.Context transitionContext,
+Duration cooldownTimeout,
+@Nullable Duration resourceStabilizationTimeout,
+Duration maxTriggerDelay) {
+
+this.clock = clock;
+this.maxTriggerDelay = maxTriggerDelay;
+this.cooldownTimeout = cooldownTimeout;
+this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+this.transitionContext = transitionContext;
+this.scheduledFutures = new ArrayList<>();
+this.phase = new Cooldown(initializationTime, clock, this, 
cooldownTimeout);
+}
+
+@Override
+public void onChange() {
+phase.onChange();
+}
+
+@Override
+public void onTrigger() {
+phase.onTrigger();
+}
+
+@Override
+public void close() {
+scheduledFutures.forEach(future -> future.cancel(true));
+scheduledFutures.clear();
+}
+
+@VisibleForTesting
+Phase getPhase() {
+return phase;
+}
+
+private void progressToIdling() {
+progressToPhase(new Idling(clock, this));
+}
+
+private void progressToStabilizing(Temporal firstChangeEventTimestamp) {
+progressToPhase(
+new Stabilizing(
+clock

[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query below to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{{-}}{-}{-}{+}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}-{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+, for the same query, this difference always results in 0
 * On Flink 1.18+, updating the query to use regular join against the metric 
lookup table (removing {~}FOR SYSTEM_TIME AS OF `proctime`{~}) makes the query 
to out

[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal lookup joins that I haven't been able to 
pin point and is causing the query below to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}--{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}--{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}--{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}\{+}~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+, for the same query, this difference always results in 0
 * On Flink 1.18+, updating the query to use regular join against the metric 
lookup table (removing {~}FOR SYSTEM_

[jira] [Created] (FLINK-36627) Failure to process a CSV file in Flink due to a character encoding mismatch: the file is in ISO-8859 and the application expects UTF-8.

2024-10-29 Thread Hector Miuler Malpica Gallegos (Jira)
Hector Miuler Malpica Gallegos created FLINK-36627:
--

 Summary: Failure to process a CSV file in Flink due to a character 
encoding mismatch: the file is in ISO-8859 and the application expects UTF-8.
 Key: FLINK-36627
 URL: https://issues.apache.org/jira/browse/FLINK-36627
 Project: Flink
  Issue Type: Bug
Reporter: Hector Miuler Malpica Gallegos


I have error in read csv with charset ISO-8859, my error is the following:



{{{color:#de350b}_Caused by: java.io.CharConversionException: Invalid UTF-8 
middle byte 0x41 (at char #1247, byte #1246): check content encoding, does not 
look like UTF-8_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportInvalidOther(UTF8Reader.java:520)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportDeferredInvalid(UTF8Reader.java:531)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.read(UTF8Reader.java:177)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.loadMore(CsvDecoder.java:458)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder._nextUnquotedString(CsvDecoder.java:782)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.nextString(CsvDecoder.java:732)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:963)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:763)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:321)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:283)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.next(MappingIterator.java:199)_{color}}}
{{{color:#de350b}    _... 11 more_{color}}}



{{My code is the following:}}

{{{}{color:#0747a6}_val env = 
StreamExecutionEnvironment.createLocalEnvironment()_{color}{}}}{{{}{color:#0747a6}_val
 csvFormat = CsvReaderFormat.forPojo(Empresa::class.java)_{color}{}}}
{{{color:#0747a6}_val csvSource = FileSource_{color}}}
{{{color:#0747a6}_.forRecordStreamFormat(csvFormat, 
Path("/miuler/PadronRUC_202410.csv"))_{color}}}
{{{color:#0747a6}_.build()_{color}}}
{{val empresaStreamSource = env.fromSource(csvSource, 
WatermarkStrategy.noWatermarks(), "CSV Source")}}
{{empresaStreamSource.print()}}
{{env.execute("Load CSV")}}


my dependencies:



{{{color:#0747a6}val kotlinVersion = "1.20.0"{color}}}
{{{color:#0747a6}// FLINK{color}}}
{{{color:#0747a6}dependencies {{color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-shaded-jackson:2.15.3-19.0"){color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-core:$kotlinVersion"){color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-runtime:$kotlinVersion"){color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-runtime-web:$kotlinVersion"){color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-clients:$kotlinVersion"){color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-streaming-java:$kotlinVersion"){color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-csv:$kotlinVersion"){color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-connector-base:$kotlinVersion"){color}}}
{{{color:#0747a6} 
implementation("org.apache.flink:flink-connector-files:$kotlinVersion"){color}}}
{{{color:#0747a6}}{color}}}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36627) Failure to process a CSV file in Flink due to a character encoding mismatch: the file is in ISO-8859 and the application expects UTF-8.

2024-10-29 Thread Hector Miuler Malpica Gallegos (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Miuler Malpica Gallegos updated FLINK-36627:
---
Description: 
I have error in read csv with charset ISO-8859, my error is the following:

{{{color:#de350b}_Caused by: java.io.CharConversionException: Invalid UTF-8 
middle byte 0x41 (at char #1247, byte #1246): check content encoding, does not 
look like UTF-8_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportInvalidOther(UTF8Reader.java:520)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportDeferredInvalid(UTF8Reader.java:531)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.read(UTF8Reader.java:177)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.loadMore(CsvDecoder.java:458)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder._nextUnquotedString(CsvDecoder.java:782)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.nextString(CsvDecoder.java:732)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:963)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:763)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:321)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:283)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.next(MappingIterator.java:199)_{color}}}
{{{color:#de350b}    _... 11 more_{color}}}

 

 

{{My code is the following:}}

{color:#0747a6}_{{{}val env = 
StreamExecutionEnvironment.createLocalEnvironment(){}}}{{{}val csvFormat = 
CsvReaderFormat.forPojo(Empresa::class.java){}}}_{color}
{color:#0747a6}_{{val csvSource = FileSource}}_{color}
{color:#0747a6}_{{.forRecordStreamFormat(csvFormat, 
Path("/miuler/PadronRUC_202410.csv"))}}_{color}
{color:#0747a6}_{{.build()}}_{color}
{color:#0747a6}_{{val empresaStreamSource = env.fromSource(csvSource, 
WatermarkStrategy.noWatermarks(), "CSV Source")}}_{color}
{color:#0747a6}_{{empresaStreamSource.print()}}_{color}
{color:#0747a6}_{{env.execute("Load CSV")}}_{color}

 

 

My dependencies:

_{color:#0747a6}{{val kotlinVersion = "1.20.0"}}{color}_
_{color:#0747a6}{{dependencies {}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-shaded-jackson:2.15.3-19.0")}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-core:$kotlinVersion")}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-runtime:$kotlinVersion")}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-runtime-web:$kotlinVersion")}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-clients:$kotlinVersion")}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-streaming-java:$kotlinVersion")}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-csv:$kotlinVersion")}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-connector-base:$kotlinVersion")}}{color}_
 
_{color:#0747a6}{{implementation("org.apache.flink:flink-connector-files:$kotlinVersion")}}{color}_
_{color:#0747a6}}{color}_
 

  was:
I have error in read csv with charset ISO-8859, my error is the following:



{{{color:#de350b}_Caused by: java.io.CharConversionException: Invalid UTF-8 
middle byte 0x41 (at char #1247, byte #1246): check content encoding, does not 
look like UTF-8_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportInvalidOther(UTF8Reader.java:520)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportDeferredInvalid(UTF8Reader.java:531)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.read(UTF8Reader.java:177)_{color}}}
{{{color:#de350b}    _at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.loadMore(CsvDecoder.java:458)_{color}}}
{{{color:#de350b}

[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Summary: Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to 
Flink 1.18+  (was: Flink SQL temporal JOINs behavior change from Flink 1.15 to 
Flink 1.18+)

> Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
> --
>
> Key: FLINK-36626
> URL: https://issues.apache.org/jira/browse/FLINK-36626
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.1, 1.20.0
> Environment: AWS Managed Apache Flink 
>Reporter: Eduardo Breijo
>Priority: Critical
> Attachments: Flink-SQL-query.txt
>
>
> There is a behavior change I found when migrating to Flink 1.18+ from Flink 
> 1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
> point and is causing the query below to output different results.
> *Flink SQL Query:*
> ~WITH assets_setpoint AS (~
>     ~SELECT~
>       ~asset_id,~
>       ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
>       ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
>       ~LAST_VALUE(`value`) AS `value`~
>     ~FROM asset_readings~
>     ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
>     ~ON metric.metric_id = asset_readings.metric_id~
>     ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
>     ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
>   ~)~
>   ~SELECT~
>   ~assets_supply_air_temp.`timestamp`,~
>   ~assets_supply_air_temp.asset_id,~
>   ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
>   ~FROM (~
>     ~SELECT asset_readings.`timestamp`,~
>     ~asset_readings.asset_id,~
>     ~asset_readings.`value` AS `value`~
>     ~FROM asset_readings~
>     ~-- Metrics temporal lookup inner join~
>     ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
>     ~ON metric.metric_id = asset_readings.metric_id~
>     ~-- Assets to ignore for this computed metric definition temporal lookup 
> left join~
>     ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
> AS OF `proctime`~
>     ~ON 
> asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id 
> = :computedMetricDefinitionId~
>     ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
> asset_readings.asset_id~
>     ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
>     ~-- Filter assets not present in the asset to ignore for this computed 
> metric definition table~
>     ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
>   ~) AS assets_supply_air_temp~
>   ~INNER JOIN assets_setpoint~
>   ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
>   ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
> assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~
> *Schema:*
> ~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
> ~|      name |                        type |  null | key |        extras |    
>       watermark |~
> ~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
> ~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
> SOURCE_WATERMARK() |~
> ~|  asset_id |                      BIGINT |  TRUE |     |               |    
>                 |~
> ~| metric_id |                         INT |  TRUE |     |               |    
>                 |~
> ~|     value |                      DOUBLE |  TRUE |     |               |    
>                 |~
> ~|  metadata |         MAP |  TRUE |     |               |    
>                 |~
> ~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |    
>                 |~
> ~{+}-{-}{{-}}{-}{-}{+}-{-}{{-}}+{+}{{-}}{-}---{-}{{-}}{{-}}{-}-{-}{{-}}{+}{+}{{-}}{-}---{-}{{-}}{{-}}{-}--{+}~
> ~6 rows in set~
> ~++~
> ~|                                     table name |~
> ~++~
> ~|                                 asset_readings |~
> ~|              asset_relationship_parent_to_unit |~
> ~| asset_to_ignore_per_computed_metric_definition |~
> ~|                                         metric |~
> ~++~
> Results:
>  * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
> assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
> assets_setp

[jira] [Updated] (FLINK-36626) Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
---
Description: 
There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL temporal joins that I haven't been able to pin 
point and is causing the query below to output different results.

*Flink SQL Query:*

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

*Schema:*
~{+}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{-}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}-\{+}~
~|      name |                        type |  null | key |        extras |      
    watermark |~
~{+}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{-}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}-\{+}~
~| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | 
SOURCE_WATERMARK() |~
~|  asset_id |                      BIGINT |  TRUE |     |               |      
              |~
~| metric_id |                         INT |  TRUE |     |               |      
              |~
~|     value |                      DOUBLE |  TRUE |     |               |      
              |~
~|  metadata |         MAP |  TRUE |     |               |      
              |~
~|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |      
              |~
~{+}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{-}{{-}}{-}{-}{-}{+}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{{-}}{{-}}{{-}}{-}{{-}}{{-}}{+}{{+}}{{-}}{{-}}{{-}}{{-}}{-}---{-}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{{-}}{-}-\{+}~
~6 rows in set~
~++~
~|                                     table name |~
~++~
~|                                 asset_readings |~
~|              asset_relationship_parent_to_unit |~
~| asset_to_ignore_per_computed_metric_definition |~
~|                                         metric |~
~++~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+, for the same query, this difference always results in 0
 * On Flink 1.18+, updating the query to use regular join against the metric 
lookup table (removing {~}FOR SYSTEM_TIME AS OF `proctime`{~}) makes the query 
to output the correct value but I don't think regular joins is what I need in 

[jira] [Created] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)
Eduardo Breijo created FLINK-36626:
--

 Summary: Flink SQL JOINs behavior change from Flink 1.15 to Flink 
1.18+
 Key: FLINK-36626
 URL: https://issues.apache.org/jira/browse/FLINK-36626
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.20.0, 1.18.1
 Environment: AWS Managed Apache Flink 
Reporter: Eduardo Breijo
 Attachments: Flink-SQL-query.txt

There is a behavior change I found when migrating to Flink 1.18+ from Flink 
1.15 in regards to Flink SQL joins that I haven't been able to pin point and is 
causing the query to output different results.

Flink SQL Query:

~WITH assets_setpoint AS (~
    ~SELECT~
      ~asset_id,~
      ~TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,~
      ~TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,~
      ~LAST_VALUE(`value`) AS `value`~
    ~FROM asset_readings~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')~
    ~GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id~
  ~)~
  ~SELECT~
  ~assets_supply_air_temp.`timestamp`,~
  ~assets_supply_air_temp.asset_id,~
  ~assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`~
  ~FROM (~
    ~SELECT asset_readings.`timestamp`,~
    ~asset_readings.asset_id,~
    ~asset_readings.`value` AS `value`~
    ~FROM asset_readings~
    ~-- Metrics temporal lookup inner join~
    ~JOIN metric FOR SYSTEM_TIME AS OF `proctime`~
    ~ON metric.metric_id = asset_readings.metric_id~
    ~-- Assets to ignore for this computed metric definition temporal lookup 
left join~
    ~LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME 
AS OF `proctime`~
    ~ON 
asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = 
:computedMetricDefinitionId~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id = 
asset_readings.asset_id~
    ~WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')~
    ~-- Filter assets not present in the asset to ignore for this computed 
metric definition table~
    ~AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL~
  ~) AS assets_supply_air_temp~
  ~INNER JOIN assets_setpoint~
  ~ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id~
  ~WHERE assets_supply_air_temp.`timestamp` BETWEEN 
assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp~

Results:
 * On Flink 1.15 the difference ({~}assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`{~}) of the assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of the 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+ this difference always results in 0

 

I have tried updating the query using different formats but I have not found a 
workaround and I don't know why this is happening. Attached you will find a 
file with the different SQL formats I have tried with no luck.

Any help would be appreciated

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36161][docs]Update Integration Test Example with Sink API. [flink]

2024-10-29 Thread via GitHub


RanJinh opened a new pull request, #25590:
URL: https://github.com/apache/flink/pull/25590

   ## What is the purpose of the change
   
   To update the `Testing Flink Jobs` Docs example from deprecated 
`SinkFunction` to `Sink`.
   
   
   ## Brief change log
   
 - Using `Sink` instead of `SinkFunction`, since `SinkFunction` is 
deprecated in flink 1.20.
   
   
   ## Verifying this change
   
 - Document address: 
   - 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/#testing-flink-jobs
   - 
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/testing/#%e6%b5%8b%e8%af%95-flink-%e4%bd%9c%e4%b8%9a
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-4602][State] Add constructors for o.a.f.contrib.streaming.state.EmbeddedRocksDBStateBackend to maintain interface compatibility. [flink]

2024-10-29 Thread via GitHub


Zakelly merged PR #25586:
URL: https://github.com/apache/flink/pull/25586


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-4602) Move RocksDB backend to proper package

2024-10-29 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893675#comment-17893675
 ] 

Zakelly Lan edited comment on FLINK-4602 at 10/30/24 2:32 AM:
--

Merge 316daca and 255ca52 into master


was (Author: zakelly):
Merge 316daca into master

> Move RocksDB backend to proper package
> --
>
> Key: FLINK-4602
> URL: https://issues.apache.org/jira/browse/FLINK-4602
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / State Backends
>Reporter: Aljoscha Krettek
>Assignee: Han Yin
>Priority: Major
>  Labels: 2.0-related, auto-unassigned, pull-request-available
> Fix For: 2.0.0
>
>
> Right now the package is {{org.apache.flink.contrib.streaming.state}}, it 
> should probably be in {{org.apache.flink.runtime.state.rocksdb}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36621) Build failure: StatefulSink not found

2024-10-29 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-36621:
--

Assignee: LvYanquan

> Build failure: StatefulSink not found
> -
>
> Key: FLINK-36621
> URL: https://issues.apache.org/jira/browse/FLINK-36621
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 2.0-preview
>Reporter: Piotr Nowojski
>Assignee: LvYanquan
>Priority: Blocker
>  Labels: pull-request-available
>
> Locally in the IntelliJ building Flink fails for me due to:
> {code:java}
> flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
> java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
>   class file for org.apache.flink.api.connector.sink2.StatefulSink not found
> {code}
> flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in 
> turns is still referring to the StatefulSink:
> {code:java}
> public class KafkaSink implements StatefulSink, 
> TwoPhaseCommittingSink (...)
> {code}
> which has been deleted in FLINK-36245. I think maven builds might be working 
> due to some luck and differences between how IntelliJ and Maven are 
> interpreting pom files and dealing with the dependencies.
> CC [~kunni] [~renqs] [~Leonard]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36624) Log JobID in SourceCoordinator

2024-10-29 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-36624:
--

Assignee: Piotr Nowojski

> Log JobID in SourceCoordinator
> --
>
> Key: FLINK-36624
> URL: https://issues.apache.org/jira/browse/FLINK-36624
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> Currently log entries from the SourceCoordinator are not tagged with the 
> JobID, which could be quite easily done.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36624) Log JobID in SourceCoordinator

2024-10-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-36624:
--

 Summary: Log JobID in SourceCoordinator
 Key: FLINK-36624
 URL: https://issues.apache.org/jira/browse/FLINK-36624
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Piotr Nowojski


Currently log entries from the SourceCoordinator are not tagged with the JobID, 
which could be quite easily done.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]

2024-10-29 Thread via GitHub


HuangZhenQiu commented on code in PR #130:
URL: 
https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1821361818


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java:
##
@@ -0,0 +1,56 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtil}. */
+public class LineageUtilTest {
+@Test
+public void testSourceLineageVertexOf() {
+LineageDataset dataset = Mockito.mock(LineageDataset.class);

Review Comment:
   As called out by @AHeise, we need to move out from Mockito with testing 
classes. I am thinking. I should probably add these helper test classes in 
flink-core rather than implement in each of connector. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36077][Connectors/Google PubSub] Implement table api support for SinkV2 [flink-connector-gcp-pubsub]

2024-10-29 Thread via GitHub


vahmed-hamdy commented on PR #30:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/30#issuecomment-2444821498

   @snuyanzin thanks for the review, I refactored the factory tests and removed 
unnecessary public access modifiers, PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-16851) Add common metrics to the SourceReader base implementation.

2024-10-29 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893911#comment-17893911
 ] 

Jiangjie Qin commented on FLINK-16851:
--

I think most of the metrics have been added over time. But it would be good to 
do a check and see if any common metric is missing. If there is no metric 
missing, we can just close this ticket.

> Add common metrics to the SourceReader base implementation.
> ---
>
> Key: FLINK-16851
> URL: https://issues.apache.org/jira/browse/FLINK-16851
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Jiangjie Qin
>Priority: Major
>
> Add the metrics to the base SourceReader implementation. This is relevant to 
> [FLIP-33|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics?src=contextnavpagetreemode]].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format `--$KEY $VALUE`  or `--$KEY=$VALUE`, e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
`--flink-home=$FLINK_HOME` format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format `--$KEY $VALUE` or `--$KEY=$VALUE`, e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
`--flink-home=$FLINK_HOME` format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format `--$KEY $VALUE`  or `--$KEY=$VALUE`, e.g. --jar, but, except for flink 
> home, which only supports space spacing. Users who use the 
> `--flink-home=$FLINK_HOME` format on the command line (trying to be 
> consistent with the other = spacing arguments) will not be able to set flink 
> home correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-27632) Improve connector testing framework to support more cases

2024-10-29 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-27632:
-

Assignee: Poorvank Bhatia

> Improve connector testing framework to support more cases
> -
>
> Key: FLINK-27632
> URL: https://issues.apache.org/jira/browse/FLINK-27632
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.16.0
>Reporter: Qingsheng Ren
>Assignee: Poorvank Bhatia
>Priority: Major
> Fix For: 2.0.0
>
>
> In order to make connector testing framework available for more connectors, 
> including Table /SQL connectors, more test cases are required to cover more 
> scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36011][runtime] Improved logging in StateTransitionManager [flink]

2024-10-29 Thread via GitHub


flinkbot commented on PR #25589:
URL: https://github.com/apache/flink/pull/25589#issuecomment-2444516868

   
   ## CI report:
   
   * b3233bfef81e4faf868f0bf92bcf742594e7e5a8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36011][runtime] Improved logging in StateTransitionManager [flink]

2024-10-29 Thread via GitHub


ztison opened a new pull request, #25589:
URL: https://github.com/apache/flink/pull/25589

   ## What is the purpose of the change
   
   This PR fixes the log-level regression introduced in 
https://github.com/apache/flink/pull/25280#discussion_r1818949028
   
   
   ## Brief change log
   
   *(for example:)*
 - The log level reverted back to the INFO in the StateTransitionManager
 - Added a few more log lines  in the StateTransitionManager
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode [flink]

2024-10-29 Thread via GitHub


dawidwys commented on code in PR #24699:
URL: https://github.com/apache/flink/pull/24699#discussion_r1820335891


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java:
##
@@ -51,7 +71,95 @@ public BatchExecMatch(
 }
 
 @Override
-public boolean isProcTime(RowType inputRowType) {
-return true;
+public void checkOrderKeys(RowType inputRowType) {
+SortSpec orderKeys = matchSpec.getOrderKeys();
+if (orderKeys.getFieldSize() == 0) {
+throw new TableException("You must specify non-empty order by.");
+}
+
+SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+LogicalType timeOrderFieldType = 
inputRowType.getTypeAt(timeOrderFieldIdx);
+
+if (!TypeCheckUtils.isTimePoint(timeOrderFieldType)) {
+throw new TableException("You must specify time point for order by 
as the first one.");
+}
+
+// time ordering needs to be ascending
+if (!orderKeys.getAscendingOrders()[0]) {
+throw new TableException("Primary sort order of a table must be 
ascending on time.");
+}
+}
+
+@Override
+protected Transformation translateOrder(
+PlannerBase planner,
+Transformation inputTransform,
+RowType inputRowType,
+ExecEdge inputEdge,
+ExecNodeConfig config) {
+if (isProcTime(inputRowType)) {
+// In proctime process records in the order they come.
+return inputTransform;
+}
+
+SortSpec sortSpec = matchSpec.getOrderKeys();
+RowType inputType = (RowType) inputEdge.getOutputType();
+SortCodeGenerator codeGen =
+new SortCodeGenerator(
+config, planner.getFlinkContext().getClassLoader(), 
inputType, sortSpec);
+SortOperator operator =

Review Comment:
   > Correct me if I'm wrong: In batch mode events for given key are buffered 
in CepOperator state and they all are processed when onEventTime(MAX_WATERMARK) 
is called. onEventTime() is called exactly once for each key. In onEventTime() 
CepOperator iterates MapState> in ascending order by key (ties 
are resolved using EventComparator comparator).
   
   Yes, that's my understanding as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28177) Elasticsearch6DynamicSinkITCase.testWritingDocumentsNoPrimaryKey failed with 503 Service Unavailable

2024-10-29 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893907#comment-17893907
 ] 

Ahmed Hamdy commented on FLINK-28177:
-

[~martijnvisser] I see the 
[PR|https://github.com/apache/flink-connector-elasticsearch/pull/48] is merged. 
Could we close this ticket?

> Elasticsearch6DynamicSinkITCase.testWritingDocumentsNoPrimaryKey failed with 
> 503 Service Unavailable
> 
>
> Key: FLINK-28177
> URL: https://issues.apache.org/jira/browse/FLINK-28177
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: KurtDing
>Priority: Critical
>  Labels: pull-request-available, stale-assigned, test-stability
> Attachments: image-2022-07-21-10-48-33-213.png
>
>
> {code:java}
> 2022-06-21T07:39:23.9065585Z Jun 21 07:39:23 [ERROR] Tests run: 4, Failures: 
> 0, Errors: 2, Skipped: 0, Time elapsed: 43.125 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkITCase
> 2022-06-21T07:39:23.9068457Z Jun 21 07:39:23 [ERROR] 
> org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkITCase.testWritingDocumentsNoPrimaryKey
>   Time elapsed: 8.697 s  <<< ERROR!
> 2022-06-21T07:39:23.9069955Z Jun 21 07:39:23 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> 2022-06-21T07:39:23.9071135Z Jun 21 07:39:23  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2022-06-21T07:39:23.9072225Z Jun 21 07:39:23  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2022-06-21T07:39:23.9073408Z Jun 21 07:39:23  at 
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
> 2022-06-21T07:39:23.9075081Z Jun 21 07:39:23  at 
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
> 2022-06-21T07:39:23.9076560Z Jun 21 07:39:23  at 
> org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkITCase.testWritingDocumentsNoPrimaryKey(Elasticsearch6DynamicSinkITCase.java:286)
> 2022-06-21T07:39:23.9078535Z Jun 21 07:39:23  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-06-21T07:39:23.9079534Z Jun 21 07:39:23  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-06-21T07:39:23.9080702Z Jun 21 07:39:23  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-06-21T07:39:23.9081838Z Jun 21 07:39:23  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-06-21T07:39:23.9082942Z Jun 21 07:39:23  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-06-21T07:39:23.9084127Z Jun 21 07:39:23  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-06-21T07:39:23.9085246Z Jun 21 07:39:23  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-06-21T07:39:23.9086380Z Jun 21 07:39:23  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-06-21T07:39:23.9087812Z Jun 21 07:39:23  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-06-21T07:39:23.9088843Z Jun 21 07:39:23  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-06-21T07:39:23.9089823Z Jun 21 07:39:23  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-06-21T07:39:23.9103797Z Jun 21 07:39:23  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-06-21T07:39:23.9105022Z Jun 21 07:39:23  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-06-21T07:39:23.9106065Z Jun 21 07:39:23  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-06-21T07:39:23.9107500Z Jun 21 07:39:23  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-06-21T07:39:23.9108591Z Jun 21 07:39:23  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-06-21T07:39:23.9109575Z Jun 21 07:39:23  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-06-21T07:39:23.9110606Z Jun 21 07:39:23  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-06-21T07:39:23.9111634Z Jun 21 07:39:23  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-06-21T07:39:23.9112653Z Jun 21 07:39:23  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:29

[jira] [Created] (FLINK-36623) Improve logging in DefaultStateTransitionManager

2024-10-29 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-36623:
-

 Summary: Improve logging in DefaultStateTransitionManager
 Key: FLINK-36623
 URL: https://issues.apache.org/jira/browse/FLINK-36623
 Project: Flink
  Issue Type: Improvement
Reporter: Roman Khachatryan
Assignee: Zdenek Tison
 Fix For: 1.20.1


When the job transitions from one state to another, e.g. restarts when new 
slots are available; it's not visible in the logs unless log.level is debug.

Therefore, it'd make sense to:
 # Change log level from DEBUG to INFO
 # Log job ID when such transition happens



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36455] Sinks retry synchronously [flink]

2024-10-29 Thread via GitHub


AHeise commented on code in PR #25547:
URL: https://github.com/apache/flink/pull/25547#discussion_r1820836505


##
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##
@@ -164,41 +165,37 @@ public void notifyCheckpointComplete(long checkpointId) 
throws Exception {
 
 private void commitAndEmitCheckpoints() throws IOException, 
InterruptedException {
 long completedCheckpointId = endInput ? EOI : 
lastCompletedCheckpointId;
-do {
-for (CheckpointCommittableManager manager :
-
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
-commitAndEmit(manager);
-}
-// !committableCollector.isFinished() indicates that we should 
retry
-// Retry should be done here if this is a final checkpoint 
(indicated by endInput)
-// WARN: this is an endless retry, may make the job stuck while 
finishing
-} while (!committableCollector.isFinished() && endInput);
-
-if (!committableCollector.isFinished()) {
-// if not endInput, we can schedule retrying later
-retryWithDelay();
+for (CheckpointCommittableManager checkpointManager :
+
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+// ensure that all committables of the first checkpoint are fully 
committed before
+// attempting the next committable
+commitAndEmit(checkpointManager);
+committableCollector.remove(checkpointManager);
 }
-committableCollector.compact();
 }
 
 private void commitAndEmit(CheckpointCommittableManager 
committableManager)
 throws IOException, InterruptedException {
-Collection> committed = 
committableManager.commit(committer);
-if (emitDownstream && committableManager.isFinished()) {
-int subtaskId = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-int numberOfSubtasks = 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
-output.collect(
-new 
StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks)));
-for (CommittableWithLineage committable : committed) {
-output.collect(new 
StreamRecord<>(committable.withSubtaskId(subtaskId)));
-}
+committableManager.commit(committer, MAX_RETRIES);
+if (emitDownstream) {
+emit(committableManager);
 }
 }
 
-private void retryWithDelay() {
-processingTimeService.registerTimer(
-processingTimeService.getCurrentProcessingTime() + RETRY_DELAY,
-ts -> commitAndEmitCheckpoints());
+private void emit(CheckpointCommittableManager committableManager) {
+int subtaskId = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+int numberOfSubtasks = 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();

Review Comment:
   What's the concern here? Peformance? JVM should inline the call to pretty 
much result into a field access.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36011] [runtime] Generalize RescaleManager to become StateTransitionManager [flink]

2024-10-29 Thread via GitHub


ztison commented on code in PR #25280:
URL: https://github.com/apache/flink/pull/25280#discussion_r1820968233


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java:
##
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.Temporal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Supplier;
+
+/**
+ * {@code DefaultStateTransitionManager} is a state machine which manages the 
{@link
+ * AdaptiveScheduler}'s state transitions based on the previous transition 
time and the available
+ * resources. See {@link Phase} for details on each individual phase of this 
state machine. Note: We
+ * use the term phase here to avoid confusion with the state used in the 
{@link AdaptiveScheduler}.
+ *
+ * 
+ * {@link Cooldown}
+ *   |
+ *   +--> {@link Idling}
+ *   |  |
+ *   |  V
+ *   +--> {@link Stabilizing}
+ *  |
+ *  +--> {@link Stabilized} --> {@link Idling}
+ *  |  |
+ *  |  V
+ *  \--> {@link Transitioning}
+ * 
+ *
+ * Thread-safety: This class is not implemented in a thread-safe manner and 
relies on the fact
+ * that any method call happens within a single thread.
+ *
+ * @see Executing
+ */
+@NotThreadSafe
+public class DefaultStateTransitionManager implements StateTransitionManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStateTransitionManager.class);
+
+private final Supplier clock;
+private final StateTransitionManager.Context transitionContext;
+private Phase phase;
+private final List> scheduledFutures;
+
+@VisibleForTesting final Duration cooldownTimeout;
+@Nullable @VisibleForTesting final Duration resourceStabilizationTimeout;
+@VisibleForTesting final Duration maxTriggerDelay;
+
+DefaultStateTransitionManager(
+Temporal initializationTime,
+StateTransitionManager.Context transitionContext,
+Duration cooldownTimeout,
+@Nullable Duration resourceStabilizationTimeout,
+Duration maxTriggerDelay) {
+this(
+initializationTime,
+Instant::now,
+transitionContext,
+cooldownTimeout,
+resourceStabilizationTimeout,
+maxTriggerDelay);
+}
+
+@VisibleForTesting
+DefaultStateTransitionManager(
+Temporal initializationTime,
+Supplier clock,
+StateTransitionManager.Context transitionContext,
+Duration cooldownTimeout,
+@Nullable Duration resourceStabilizationTimeout,
+Duration maxTriggerDelay) {
+
+this.clock = clock;
+this.maxTriggerDelay = maxTriggerDelay;
+this.cooldownTimeout = cooldownTimeout;
+this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+this.transitionContext = transitionContext;
+this.scheduledFutures = new ArrayList<>();
+this.phase = new Cooldown(initializationTime, clock, this, 
cooldownTimeout);
+}
+
+@Override
+public void onChange() {
+phase.onChange();
+}
+
+@Override
+public void onTrigger() {
+phase.onTrigger();
+}
+
+@Override
+public void close() {
+scheduledFutures.forEach(future -> future.cancel(true));
+scheduledFutures.clear();
+}
+
+@VisibleForTesting
+Phase getPhase() {
+return phase;
+}
+
+private void progressToIdling() {
+progressToPhase(new Idling(clock, this));
+}
+
+private void progressToStabilizing(Temporal firstChangeEventTimestamp) {
+progressToPhase(
+new Stabilizing(
+clock,
+   

Re: [PR] [hotfix] Replace System.out.println with logger for better log management [flink-connector-gcp-pubsub]

2024-10-29 Thread via GitHub


snuyanzin commented on code in PR #31:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/31#discussion_r1821436745


##
flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java:
##
@@ -21,11 +21,14 @@
 import com.google.protobuf.ByteString;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.TopicName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
 
 /** Helper class to send PubSubMessages to a PubSub topic. */
 class PubSubPublisher {
+private static final Logger LOG = 
LoggerFactory.getLogger(PubSubExample.class);

Review Comment:
   ```suggestion
   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubPublisher.class);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32315) Support local file upload in K8s mode

2024-10-29 Thread Ferenc Csaky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893734#comment-17893734
 ] 

Ferenc Csaky commented on FLINK-32315:
--

When I developed this, the "s3" related config options required to be added to 
the {{{}flink-conf.yaml{}}}, cause those props are not picked up if they given 
as a dynamic param during. For Flink 1.20 it was already renamed 
{{{}config.yaml{}}}, but that should not matter for this particular case I 
believe.

I have the following gist about how I was setting up Minio, which might be 
outdated, cause it uses the latest img, but still could be helpful: 
[https://gist.github.com/ferenc-csaky/fd7fee71d89cd389cac2da4a4471ab65]

> Support local file upload in K8s mode
> -
>
> Key: FLINK-32315
> URL: https://issues.apache.org/jira/browse/FLINK-32315
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Deployment / Kubernetes
>Reporter: Paul Lin
>Assignee: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-10-29-11-32-25-979.png
>
>
> Currently, Flink assumes all resources are locally accessible in the pods, 
> which requires users to prepare the resources by mounting storages, 
> downloading resources with init containers, or rebuilding images for each 
> execution.
> We could make things much easier by introducing a built-in file distribution 
> mechanism based on Flink-supported filesystems. It's implemented in two steps:
>  
> 1. KubernetesClusterDescripter uploads all local resources to remote storage 
> via Flink filesystem (skips if the resources are already remote).
> 2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner 
> download the resources and put them in the classpath during startup.
>  
> The 2nd step is mostly done by 
> [FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], thus this 
> issue is focused on the upload part.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32315) Support local file upload in K8s mode

2024-10-29 Thread Ferenc Csaky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893734#comment-17893734
 ] 

Ferenc Csaky edited comment on FLINK-32315 at 10/29/24 8:31 AM:


When I developed this, the "s3" related config options required to be added to 
the {{flink-conf.yaml}}, cause those props are not picked up if they given as a 
dynamic param in the {{flink run}} command. For Flink 1.20 it was already 
renamed {{config.yaml}}, but that should not matter for this particular case I 
believe.

I have the following gist about how I was setting up Minio, which might be 
outdated, cause it uses the latest img, but still could be helpful: 
[https://gist.github.com/ferenc-csaky/fd7fee71d89cd389cac2da4a4471ab65]


was (Author: JIRAUSER306586):
When I developed this, the "s3" related config options required to be added to 
the {{{}flink-conf.yaml{}}}, cause those props are not picked up if they given 
as a dynamic param during. For Flink 1.20 it was already renamed 
{{{}config.yaml{}}}, but that should not matter for this particular case I 
believe.

I have the following gist about how I was setting up Minio, which might be 
outdated, cause it uses the latest img, but still could be helpful: 
[https://gist.github.com/ferenc-csaky/fd7fee71d89cd389cac2da4a4471ab65]

> Support local file upload in K8s mode
> -
>
> Key: FLINK-32315
> URL: https://issues.apache.org/jira/browse/FLINK-32315
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Deployment / Kubernetes
>Reporter: Paul Lin
>Assignee: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-10-29-11-32-25-979.png
>
>
> Currently, Flink assumes all resources are locally accessible in the pods, 
> which requires users to prepare the resources by mounting storages, 
> downloading resources with init containers, or rebuilding images for each 
> execution.
> We could make things much easier by introducing a built-in file distribution 
> mechanism based on Flink-supported filesystems. It's implemented in two steps:
>  
> 1. KubernetesClusterDescripter uploads all local resources to remote storage 
> via Flink filesystem (skips if the resources are already remote).
> 2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner 
> download the resources and put them in the classpath during startup.
>  
> The 2nd step is mostly done by 
> [FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], thus this 
> issue is focused on the upload part.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36497][table]Remove all deprecated methods `CatalogTable` [flink]

2024-10-29 Thread via GitHub


flinkbot commented on PR #25585:
URL: https://github.com/apache/flink/pull/25585#issuecomment-2443475748

   
   ## CI report:
   
   * 33ea97c1bdd9ca2a9e0265600ae2ea36cad24476 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics after serialization/deserialization for cdc formats

2024-10-29 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-36547:
-
Description: 
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios. we have taken advantage of the difference between UPDATE_BEFORE 
and UPDATE_AFTER to implement the feature and made it run well in bussiness.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!

  was:
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent 
在需要区别处理的场景下in many scenarios. we have taken advantage of the difference between 
UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it run well in 
bussiness.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!


> Add option to retain `RowKind` sematics after serialization/deserialization 
> for cdc formats
> ---
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 2.0.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2024-10-16-11-01-54-790.png, 
> image-2024-10-16-11-02-34-406.png
>
>
> As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> 
> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL 
> as Debezium JSON or Avro messages, and emit to external systems like Kafka. 
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and 
> UDPATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, we also have a demand to make the `RowKind` sematics consistent in 
> many scenarios. we have taken advantage of the difference between 
> UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it run well 
> in bussiness.
> {code:java}
> create table datagen1 (id int, name string) with 
> ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
> 'fields.id.max'='2');
> // add 'debezium-json.retain.rowkind' = 'true'
> create table t2 (id int, name string, num bigint) WITH (
>     'topic' = 't2', 
>     'con

[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics after serialization/deserialization for cdc formats

2024-10-29 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-36547:
-
Description: 
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios, such as those that require different processing of -U/-D and 
+U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
UPDATE_AFTER to implement the feature and made it run well in bussiness.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!

  was:
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios. we have taken advantage of the difference between UPDATE_BEFORE 
and UPDATE_AFTER to implement the feature and made it run well in bussiness.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!


> Add option to retain `RowKind` sematics after serialization/deserialization 
> for cdc formats
> ---
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 2.0.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2024-10-16-11-01-54-790.png, 
> image-2024-10-16-11-02-34-406.png
>
>
> As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> 
> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL 
> as Debezium JSON or Avro messages, and emit to external systems like Kafka. 
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and 
> UDPATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, we also have a demand to make the `RowKind` sematics consistent in 
> many scenarios, such as those that require different processing of -U/-D and 
> +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
> UPDATE_AFTER to implement the feature and made it run well in bussiness.
> {code:java}
> create table datagen1 (id int, name string) with 
> ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
> 'fields.id.max'='2');
> // add 'debezi

[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics after serialization/deserialization for cdc formats

2024-10-29 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-36547:
-
Description: 
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent 
在需要区别处理的场景下in many scenarios. we have taken advantage of the difference between 
UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it run well in 
bussiness.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!

  was:
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios. we have taken advantage of the difference between UPDATE_BEFORE 
and UPDATE_AFTER to implement the feature and made it run well in bussiness.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!


> Add option to retain `RowKind` sematics after serialization/deserialization 
> for cdc formats
> ---
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 2.0.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2024-10-16-11-01-54-790.png, 
> image-2024-10-16-11-02-34-406.png
>
>
> As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> 
> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL 
> as Debezium JSON or Avro messages, and emit to external systems like Kafka. 
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and 
> UDPATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, we also have a demand to make the `RowKind` sematics consistent 
> 在需要区别处理的场景下in many scenarios. we have taken advantage of the difference 
> between UPDATE_BEFORE and UPDATE_AFTER to implement the feature and made it 
> run well in bussiness.
> {code:java}
> create table datagen1 (id int, name string) with 
> ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
> 'fields.id.max'='2');
> // add 'debezium-json.retain.rowkind' = 'true'
> create table t2 (id int, name string, num bigint) WITH (
>     'topic' = 't2', 

Re: [PR] [FLINK-35268][state] Add ttl interface for Async State API && implement TtlListStateV2/TtlValueStateV2 [flink]

2024-10-29 Thread via GitHub


fredia commented on code in PR #25515:
URL: https://github.com/apache/flink/pull/25515#discussion_r1820233250


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java:
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.ttl.TtlUtils;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState
+extends AbstractTtlState, InternalListState>>
+implements InternalListState {
+
+protected TtlListState(
+TtlStateContext>, T> 
ttlStateContext) {
+super(ttlStateContext);
+}
+
+@Override
+public StateFuture asyncUpdate(List values) {
+Preconditions.checkNotNull(values, "List of values to add cannot be 
null.");
+return original.asyncUpdate(withTs(values));
+}
+
+@Override
+public StateFuture asyncAddAll(List values) {
+Preconditions.checkNotNull(values, "List of values to add cannot be 
null.");
+return original.asyncAddAll(withTs(values));
+}
+
+@Override
+public StateFuture> asyncGet() {
+// 1. The timestamp of elements in list state isn't updated when get 
even if updateTsOnRead
+// is true.
+// 2. we don't clear state here cause forst is LSM-tree based.
+return original.asyncGet().thenApply(stateIter -> new 
AsyncIteratorWrapper(stateIter));
+}
+
+@Override
+public StateFuture asyncAdd(T value) {
+return original.asyncAdd(value == null ? null : wrapWithTs(value));
+}
+
+@Override
+public Iterable get() {
+Iterable> ttlValue = original.get();
+ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+final Iterable> finalResult = ttlValue;
+return () -> new IteratorWithCleanup(finalResult.iterator());
+}
+
+@Override
+public void add(T value) {
+original.add(value == null ? null : wrapWithTs(value));
+}
+
+@Override
+public void update(List values) {
+Preconditions.checkNotNull(values, "List of values to add cannot be 
null.");
+original.update(withTs(values));
+}
+
+@Override
+public void addAll(List values) {
+Preconditions.checkNotNull(values, "List of values to add cannot be 
null.");
+original.addAll(withTs(values));
+}
+
+private  List collect(Iterable iterable) {
+if (iterable instanceof List) {
+return (List) iterable;
+} else {
+List list = new ArrayList<>();
+for (E element : iterable) {
+list.add(element);
+}
+return list;
+}
+}
+
+private List> withTs(List values) {
+long currentTimestamp = timeProvider.currentTimestamp();
+List> withTs = new ArrayList<>(values.size());
+for (T value : values) {
+Preconditions.checkNotNull(value, "You cannot have null element in 
a ListState.");
+withTs.add(TtlUtils.wrapWithTs(value, currentTimestamp));
+}
+return withTs;
+}
+
+private class IteratorWithCleanup implements Iterator {
+private final Iterator> originalIterator;
+private boolean anyUnexpired = false;
+private boolean uncleared = true;
+private T nextUnexpired = null;
+
+private IteratorWithCleanup(Iterator> ttlIterat

[jira] [Updated] (FLINK-36497) Remove all deprecated methods in `CatalogTable`

2024-10-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36497:
---
Labels: pull-request-available  (was: )

> Remove all deprecated methods in `CatalogTable`
> ---
>
> Key: FLINK-36497
> URL: https://issues.apache.org/jira/browse/FLINK-36497
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36497][table]Remove all deprecated methods `CatalogTable` [flink]

2024-10-29 Thread via GitHub


Edward-Gavin opened a new pull request, #25585:
URL: https://github.com/apache/flink/pull/25585

   
   
   ## What is the purpose of the change
   
   Remove all deprecated methods in `CatalogTable`
   
   ## Brief change log
   
   Remove all deprecated methods in `CatalogTable`
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)
zjjiang created FLINK-36620:
---

 Summary: Add support for the flink-home parameter to be set in 
both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
 Key: FLINK-36620
 URL: https://issues.apache.org/jira/browse/FLINK-36620
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1, cdc-3.2.0, cdc-3.1.0
Reporter: zjjiang
 Fix For: cdc-3.3.0


Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink- 
home, which only supports space spacing. Users who use the 
"--flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
"--flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink- 
home, which only supports space spacing. Users who use the 
"--flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
> home, which only supports space spacing. Users who use the 
> "--flink-home=$FLINK_HOME" format on the command line (trying to be 
> consistent with the other = spacing arguments) will not be able to set flink 
> home correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
"-flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format "-$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, 
which only supports space spacing. Users who use the "-flink-home=$FLINK_HOME" 
format on the command line (trying to be consistent with the other = spacing 
arguments) will not be able to set flink home correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
> home, which only supports space spacing. Users who use the 
> "-flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
> with the other = spacing arguments) will not be able to set flink home 
> correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format "-$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink home, 
which only supports space spacing. Users who use the "-flink-home=$FLINK_HOME" 
format on the command line (trying to be consistent with the other = spacing 
arguments) will not be able to set flink home correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
"--flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format "-$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
> home, which only supports space spacing. Users who use the 
> "-flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
> with the other = spacing arguments) will not be able to set flink home 
> correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
"--flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
"-flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
> home, which only supports space spacing. Users who use the 
> "--flink-home=$FLINK_HOME" format on the command line (trying to be 
> consistent with the other = spacing arguments) will not be able to set flink 
> home correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]

2024-10-29 Thread via GitHub


pawel-big-lebowski commented on code in PR #130:
URL: 
https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820485021


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##
@@ -369,5 +416,43 @@ public ProducerRecord serialize(
 value,
 headerProvider != null ? 
headerProvider.getHeaders(element) : null);
 }
+
+@Override
+public Optional getKafkaDatasetFacet() {
+if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+LOG.warn("Cannot identify topics. Not an 
TopicsIdentifierProvider");
+return Optional.empty();
+}
+
+Optional topicsIdentifier =
+((KafkaDatasetIdentifierProvider) 
(topicSelector)).getDatasetIdentifier();
+
+if (!topicsIdentifier.isPresent()) {
+LOG.warn("No topics' identifiers provided");
+return Optional.empty();
+}
+
+TypeInformation typeInformation;
+if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+typeInformation =
+((ResultTypeQueryable) 
this.valueSerializationSchema).getProducedType();
+} else {
+// gets type information from serialize method signature
+typeInformation =

Review Comment:
   `LineageGraph` in the flink-core contains separate lists of sources and 
sinks. Given that, I am not sure if we want to distinguish "inputType" from 
"outputType". From the facet perspective, this should be all `type` and the 
same facet can be used for both scenarios. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36245) Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated method/interface in Sink V2 in 2.0

2024-10-29 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893764#comment-17893764
 ] 

Piotr Nowojski commented on FLINK-36245:


Hi [~kunni], [~renqs] and [~leonard]. I think this change has made the build 
broken/unstable. Locally in the IntelliJ building Flink fails for me due to:

{code:java}
flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
  class file for org.apache.flink.api.connector.sink2.StatefulSink not found
{code}
flink-examples depend on flink-connector-kafka in version 3.0.0-17 that in 
turns is still referring to the StatefulSink:

{code:java}
public class KafkaSink implements StatefulSink, 
TwoPhaseCommittingSink (...)
{code}

 Maven builds might be working due to some dumb luck.


> Remove legacy SourceFunction / SinkFunction / Sink V1 API and deprecated 
> method/interface in Sink V2 in 2.0
> ---
>
> Key: FLINK-36245
> URL: https://issues.apache.org/jira/browse/FLINK-36245
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Common
>Reporter: Qingsheng Ren
>Assignee: LvYanquan
>Priority: Major
>  Labels: 2.0-related, pull-request-available
> Fix For: 2.0-preview
>
>
> SourceFunction, SinkFunction and Sink V1 API has been marked as deprecated 
> and should be removed in Flink 2.0.
> Considering SourceFunction / SinkFunction are heavily used in test cases for 
> building a simple data generator or a data validator, it could be a huge 
> amount of work to rewrite all these usages with Source and Sink V2 API. A 
> viable path for 2.0-preview version would be:
>  * Move SourceFunction, SinkFunction to an internal package, as a test util
>  * Rewrite all Sink V1 implementations with Sink V2 directly (the usage of 
> Sink V1 is low in the main repo)
> As a long term working item, all usages of SourceFunction and SinkFunction 
> will be replaced by Source and Sink API. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format `--$KEY $VALUE` or `--$KEY=$VALUE`, e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
`--flink-home=$FLINK_HOME` format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
--flink-home=$FLINK_HOME format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format `--$KEY $VALUE` or `--$KEY=$VALUE`, e.g. --jar, but, except for flink 
> home, which only supports space spacing. Users who use the 
> `--flink-home=$FLINK_HOME` format on the command line (trying to be 
> consistent with the other = spacing arguments) will not be able to set flink 
> home correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]

2024-10-29 Thread via GitHub


AHeise commented on code in PR #130:
URL: 
https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820444037


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java:
##
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import 
org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
+
+import java.util.Optional;
+
+/** Contains method which allows extracting topic identifier. */
+public interface KafkaDatasetIdentifierProvider {

Review Comment:
   I double-checked and it's not that common of a pattern. It's used only in
   
   ```
   OperatorCoordinator.java
   FileEnumerator.java
   DynamicFileEnumerator.java
   FileSplitAssigner.java
   InternalTimeServiceManager.java
   ```
   
   So you can just stick to the current approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]

2024-10-29 Thread via GitHub


AHeise commented on code in PR #130:
URL: 
https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820446158


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/** Utility class with useful methods for managing dataset facets. */
+public class LineageUtil {
+
+private static final String KAFKA_DATASET_PREFIX = "kafka://";
+private static final String COMMA = ",";
+private static final String SEMICOLON = ";";
+
+public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet 
kafkaDatasetFacet) {
+return new LineageDataset() {
+@Override
+public String name() {
+if (kafkaDatasetFacet.topicIdentifier.topicPattern != null) {
+return kafkaDatasetFacet.topicIdentifier.toString();
+}
+
+return String.join(",", 
kafkaDatasetFacet.topicIdentifier.topics);
+}
+
+@Override
+public String namespace() {
+return namespace;
+}
+
+@Override
+public Map facets() {
+return Collections.singletonMap(
+KafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);

Review Comment:
   Yes it would work well. Tbh I'm not sure if 1 or 2 is the better approach. 
Maybe @HuangZhenQiu can weigh in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]

2024-10-29 Thread via GitHub


AHeise commented on code in PR #130:
URL: 
https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1815659513


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##
@@ -369,5 +416,43 @@ public ProducerRecord serialize(
 value,
 headerProvider != null ? 
headerProvider.getHeaders(element) : null);
 }
+
+@Override
+public Optional getKafkaDatasetFacet() {
+if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+LOG.warn("Cannot identify topics. Not an 
TopicsIdentifierProvider");
+return Optional.empty();
+}
+
+Optional topicsIdentifier =
+((KafkaDatasetIdentifierProvider) 
(topicSelector)).getDatasetIdentifier();
+
+if (!topicsIdentifier.isPresent()) {
+LOG.warn("No topics' identifiers provided");
+return Optional.empty();
+}
+
+TypeInformation typeInformation;
+if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+typeInformation =
+((ResultTypeQueryable) 
this.valueSerializationSchema).getProducedType();
+} else {
+// gets type information from serialize method signature
+typeInformation =
+
Arrays.stream(this.valueSerializationSchema.getClass().getMethods())
+.map(m -> Invokable.from(m))
+.filter(m -> 
"serialize".equalsIgnoreCase(m.getName()))
+.map(m -> m.getParameters().get(0))
+.filter(p -> 
!p.getType().equals(TypeToken.of(Object.class)))
+.findFirst()
+.map(p -> p.getType())
+.map(t -> TypeInformation.of(t.getRawType()))
+.orElse(null);

Review Comment:
   This looks way more complicated as it should be. Here is what I had in mind.
   
   ```
   TypeToken serializationSchemaType = 
TypeToken.of(valueSerializationSchema.getClass());
   Class parameterType = 
serializationSchemaType.resolveType(SerializationSchema.class.getTypeParameters()[0]).getRawType();
   if (parameterType != Object.class) {
   typeInformation = TypeInformation.of(parameterType);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36616] fix npe in GcpPublisherConfig [flink-connector-gcp-pubsub]

2024-10-29 Thread via GitHub


stankiewicz commented on PR #33:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/33#issuecomment-2443713018

   fixed style errors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34466] Lineage interfaces for kafka connector [flink-connector-kafka]

2024-10-29 Thread via GitHub


AHeise commented on code in PR #130:
URL: 
https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820453815


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##
@@ -369,5 +416,43 @@ public ProducerRecord serialize(
 value,
 headerProvider != null ? 
headerProvider.getHeaders(element) : null);
 }
+
+@Override
+public Optional getKafkaDatasetFacet() {
+if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+LOG.warn("Cannot identify topics. Not an 
TopicsIdentifierProvider");
+return Optional.empty();
+}
+
+Optional topicsIdentifier =
+((KafkaDatasetIdentifierProvider) 
(topicSelector)).getDatasetIdentifier();
+
+if (!topicsIdentifier.isPresent()) {
+LOG.warn("No topics' identifiers provided");
+return Optional.empty();
+}
+
+TypeInformation typeInformation;
+if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+typeInformation =
+((ResultTypeQueryable) 
this.valueSerializationSchema).getProducedType();
+} else {
+// gets type information from serialize method signature
+typeInformation =

Review Comment:
   Yes TypeInformationFacet sounds like a general concept. I'm convinced you 
want to pull it out of the KafkaFacet now. You probably want to name it 
"inputType" and "outputType" depending on the type of the connector 
(source/sink). I'd design it generally and pull it up into flink-core for Flink 
2.0 later (so make it work in Kafka first and then propose to port it upwards).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
--flink-home=$FLINK_HOME format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format "--$KEY $VALUE" or "$KEY=$VALUE", e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
--flink-home=$FLINK_HOME format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format "--$KEY $VALUE" or "--$KEY=$VALUE", e.g. --jar, but, except for flink 
> home, which only supports space spacing. Users who use the 
> --flink-home=$FLINK_HOME format on the command line (trying to be consistent 
> with the other = spacing arguments) will not be able to set flink home 
> correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36620:

Description: 
Currently, most of FlinkCDC's command line arguments are supported in the 
format `\-\-$KEY $VALUE`  or `\-\-$KEY=$VALUE`, e.g. --jar, but, except for 
flink home, which only supports space spacing. Users who use the 
`\-\-flink-home=$FLINK_HOME` format on the command line (trying to be 
consistent with the other = spacing arguments) will not be able to set flink 
home correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.

  was:
Currently, most of FlinkCDC's command line arguments are supported in the 
format `--$KEY $VALUE`  or `--$KEY=$VALUE`, e.g. --jar, but, except for flink 
home, which only supports space spacing. Users who use the 
`--flink-home=$FLINK_HOME` format on the command line (trying to be consistent 
with the other = spacing arguments) will not be able to set flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.


> Add support for the flink-home parameter to be set in both “--flink-home 
> $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
> 
>
> Key: FLINK-36620
> URL: https://issues.apache.org/jira/browse/FLINK-36620
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: zjjiang
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Currently, most of FlinkCDC's command line arguments are supported in the 
> format `\-\-$KEY $VALUE`  or `\-\-$KEY=$VALUE`, e.g. --jar, but, except for 
> flink home, which only supports space spacing. Users who use the 
> `\-\-flink-home=$FLINK_HOME` format on the command line (trying to be 
> consistent with the other = spacing arguments) will not be able to set flink 
> home correctly.
> In particular, when there is an environment variable $FLINK_HOME and you want 
> to override it by setting --flink-home=/path/to/new/flink/home, you will find 
> that it does not work.
> We would like to support the flink-home parameter in both --flink-home 
> $FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
> formatting differences and runtime exceptions when using command line 
> arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics for cdc formats

2024-10-29 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-36547:
-
Summary: Add option to retain `RowKind` sematics for cdc formats  (was: Add 
option to retain `RowKind` sematics after serialization/deserialization for cdc 
formats)

> Add option to retain `RowKind` sematics for cdc formats
> ---
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 2.0.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
> Attachments: image-2024-10-16-11-01-54-790.png, 
> image-2024-10-16-11-02-34-406.png
>
>
> As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> 
> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL 
> as Debezium JSON or Avro messages, and emit to external systems like Kafka. 
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and 
> UDPATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, we also have a demand to make the `RowKind` sematics consistent in 
> many scenarios, such as those that require different processing of -U/-D and 
> +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
> UPDATE_AFTER to implement the feature and made it run well in bussiness.
> {code:java}
> create table datagen1 (id int, name string) with 
> ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
> 'fields.id.max'='2');
> // add 'debezium-json.retain.rowkind' = 'true'
> create table t2 (id int, name string, num bigint) WITH (
>     'topic' = 't2', 
>     'connector' = 'kafka', 
>     'properties.bootstrap.servers' = 'xx', 
>     'properties.group.id' = 'test', 
>     'scan.startup.mode' = 'earliest-offset', 
>     'format' = 'debezium-json',
>     'key.format' = 'json',
>     'key.fields' = 'id',
>     'debezium-json.timestamp-format.standard' = 'ISO-8601', 
>     'debezium-json.schema-include' = 'false',
> 'debezium-json.retain.rowkind' = 'true'
> );
> insert into t2 select id, max(name) as name, count(1) as num from datagen1 
> group by id;
> insert into print1 select * from t2;
>  {code}
> output result:
> !image-2024-10-16-11-02-34-406.png|width=660,height=153!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics for cdc formats

2024-10-29 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-36547:
-
Description: 
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios, such as those that require different processing of -U/-D and 
+U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
UPDATE_AFTER to implement the feature and made it run well in business.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!

  was:
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios, such as those that require different processing of -U/-D and 
+U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
UPDATE_AFTER to implement the feature and made it run well in bussiness.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!


> Add option to retain `RowKind` sematics for cdc formats
> ---
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 2.0.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
> Attachments: image-2024-10-16-11-01-54-790.png, 
> image-2024-10-16-11-02-34-406.png
>
>
> As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> 
> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL 
> as Debezium JSON or Avro messages, and emit to external systems like Kafka. 
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and 
> UDPATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, we also have a demand to make the `RowKind` sematics consistent in 
> many scenarios, such as those that require different processing of -U/-D and 
> +U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
> UPDATE_AFTER to implement the feature and made it run well in business.
> {code:java}
> create table datagen1 (id int, name string) with 
> ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
> 'fields.id.max

[jira] [Updated] (FLINK-36547) Add option to retain `RowKind` sematics for cdc formats

2024-10-29 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-36547:
-
Description: 
As official docs said, `RowKind` semantics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios, such as those that require different processing of -U/-D and 
+U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
UPDATE_AFTER to implement the feature and made it run well in business.

implementation details: When serialization, -U/+U are both represented by u, 
the former has a non-empty `before` field and the latter has a non-empty 
`after` field; When deserializing data of type u, if `before` is not empty, 
parsed as -U; if `after` is not empty, parsed as +U.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!

  was:
As official docs said, `RowKind` semantics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios, such as those that require different processing of -U/-D and 
+U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
UPDATE_AFTER to implement the feature and made it run well in business.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!


> Add option to retain `RowKind` sematics for cdc formats
> ---
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 2.0.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
> Attachments: image-2024-10-16-11-01-54-790.png, 
> image-2024-10-16-11-02-34-406.png
>
>
> As official docs said, `RowKind` semantics have been changed: -U -> -D, +D -> 
> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL 
> as Debezium JSON or Avro messages, and emit to external systems like Kafka. 
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and 
> UDPATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, we also have a demand to make the `RowKind` sematics consistent in 
> many scenarios, such as those that require different processing of -U/-D and 
> +U/+I. we have t

Re: [PR] [FLINK-36607][table-planner] Introduce AdaptiveBroadcastJoinProcessor to inject adaptive broadcast join. [flink]

2024-10-29 Thread via GitHub


JunRuiLee commented on code in PR #25578:
URL: https://github.com/apache/flink/pull/25578#discussion_r1820633873


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala:
##
@@ -275,4 +281,32 @@ object JoinUtil {
   rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
 }
   }
+
+  def getLargeManagedMemory(joinType: FlinkJoinType, config: ExecNodeConfig): 
Long = {
+val hashJoinManagedMemory =
+  
config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes
+// The memory used by SortMergeJoinIterator that buffer the matched rows, 
each side needs
+// this memory if it is full outer join
+val externalBufferMemory =
+  
config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY).getBytes
+// The memory used by BinaryExternalSorter for sort, the left and right 
side both need it
+val sortMemory = 
config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes
+var externalBufferNum = 1
+if (joinType eq FlinkJoinType.FULL) externalBufferNum = 2
+val sortMergeJoinManagedMemory = externalBufferMemory * externalBufferNum 
+ sortMemory * 2
+// Due to hash join maybe fallback to sort merge join, so here managed 
memory choose the
+// large one
+Math.max(hashJoinManagedMemory, sortMergeJoinManagedMemory)
+  }
+
+  def getJoinStrategyHint(relHints: ImmutableList[RelHint], joinStrategy: 
JoinStrategy): Boolean = {

Review Comment:
   I prefer to rename this method to isJoinStrategyHintMatched.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveBroadcastJoin.java:
##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.AdaptiveBroadcastJoin;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.planner.adaptive.AdaptiveBroadcastJoinOperatorGenerator;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction;
+import 
org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveBroadcastJoinOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link BatchExecNode} for Adaptive Broadcast Join. */
+public class BatchExecAdaptiveBroadcastJoin extends ExecNodeBase
+implements BatchExecNode, 
SingleTransformationTranslator {
+
+private final FlinkJoinType joinType;

  1   2   >