[jira] [Created] (FLINK-36811) MySQL CDC

2024-11-27 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-36811:
--

 Summary: MySQL CDC
 Key: FLINK-36811
 URL: https://issues.apache.org/jira/browse/FLINK-36811
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36811) MySQL CDC source set is processing backlog during snapshot phase

2024-11-27 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su reassigned FLINK-36811:
--

Assignee: Ruan Hang
 Summary: MySQL CDC source set is processing backlog during snapshot phase  
(was: MySQL CDC)

> MySQL CDC source set is processing backlog during snapshot phase
> 
>
> Key: FLINK-36811
> URL: https://issues.apache.org/jira/browse/FLINK-36811
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Xuannan Su
>Assignee: Ruan Hang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36812) The flink jdbc connector's 'scan.partition.column' supports the varchar field type.

2024-11-27 Thread ouyangwulin (Jira)
ouyangwulin created FLINK-36812:
---

 Summary: The flink jdbc connector's 'scan.partition.column' 
supports the varchar field type.
 Key: FLINK-36812
 URL: https://issues.apache.org/jira/browse/FLINK-36812
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: jdbc-3.2.0
Reporter: ouyangwulin
 Fix For: jdbc-3.3.0


The scan.partition.column must currently be a numeric, date, or timestamp column of the table in question. But in many cases tables do not have such a column, and we need to support a varchar partition column to make concurrent ingestion useful in a wider range of scenarios.
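
For context, a minimal sketch of how a partitioned JDBC scan is declared today (table name, URL, and bounds are illustrative assumptions; the options shown are the existing ones, which this issue proposes to also accept for a varchar column):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcPartitionedScanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // 'scan.partition.column' currently must be a numeric, date, or timestamp column;
        // this issue proposes also accepting a varchar column here.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  id BIGINT,\n"
                        + "  buyer STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://localhost:3306/shop',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'scan.partition.column' = 'id',\n"
                        + "  'scan.partition.num' = '4',\n"
                        + "  'scan.partition.lower-bound' = '0',\n"
                        + "  'scan.partition.upper-bound' = '1000000'\n"
                        + ")");
        tEnv.executeSql("SELECT COUNT(*) FROM orders").print();
    }
}
{code}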



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35670) Support Postgres CDC Pipeline Source

2024-11-27 Thread ouyangwulin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901604#comment-17901604
 ] 

ouyangwulin commented on FLINK-35670:
-

I investigated the implementation and it is quite difficult: pgoutput does not provide DDL, while the decoderbufs plug-in does. With decoderbufs the difficulty is manageable, so we could consider using decoderbufs to parse the DDL for a first version.

> Support Postgres CDC Pipeline Source
> 
>
> Key: FLINK-35670
> URL: https://issues.apache.org/jira/browse/FLINK-35670
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Muhammet Orazov
>Priority: Major
>
> Similar to other [CDC pipeline 
> sources|https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/overview/]
>  (only MySQL at the moment), we should support Postgres as a pipeline source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36813) MySQLCDC supports synchronization of specified fields

2024-11-27 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36813:
--
Description: 
*Background*
In some scenarios, MySQL synchronization only expects to synchronize specified 
fields instead of all fields in the table.
1. The user only has the permission for some fields in MySQL
2. The user has too many fields in a single table and only wants to synchronize 
some fields, for example, here 
[https://github.com/apache/flink-cdc/discussions/3058]

*Current situation*
For the incremental stage, you only need to configure the column.include.list 
property of debezium to support the synchronization of some fields in the 
incremental stage, refer to: 
[https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list]

For the full snapshot stage, * is currently used in 
{_}MySqlSnapshotSplitReadTask{_}, refer to
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}
 

*Solution*
We can refer to debezium 
[RelationalSnapshotChangeEventSource](https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java#L752-L776),
 The user configures column.include.list, and then captures the specific 
columns in MySqlSnapshotSplitReadTask, and splices them when constructing Scan 
SQL.
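
A minimal sketch of the splicing step, assuming the captured column names have already been filtered by column.include.list (the helper name buildProjection and the back-tick quoting are illustrative, not the actual patch):
{code:java}
import java.util.List;
import java.util.stream.Collectors;

final class ProjectionSketch {
    /** Builds the SELECT projection; falls back to "*" when no column filter is configured. */
    static String buildProjection(List<String> capturedColumnNames) {
        if (capturedColumnNames == null || capturedColumnNames.isEmpty()) {
            return "*";
        }
        return capturedColumnNames.stream()
                .map(name -> "`" + name + "`") // quote MySQL identifiers
                .collect(Collectors.joining(", "));
    }
}

// Inside MySqlSnapshotSplitReadTask the call could then become (sketch):
//   return buildSelectWithRowLimits(
//           tableId, limitSize, buildProjection(columnNames),
//           Optional.ofNullable(condition), Optional.empty());
{code}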

  was:
*Background*
In some scenarios, MySQL synchronization only expects to synchronize specified 
fields instead of all fields in the table.
1. The user only has the permission for some fields in MySQL
2. The user has too many fields in a single table and only wants to synchronize 
some fields, for example, here 
https://github.com/apache/flink-cdc/discussions/3058

*Current situation*
For the incremental stage, you only need to configure the column.include.list 
property of debezium to support the synchronization of some fields in the 
incremental stage, refer to: 
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list

For the full snapshot stage, * is currently used in 
{_}MySqlSnapshotSplitReadTask{_}, refer to
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}
 

*Solution*
The user configures {_}column.include.list{_}, and then captures the specific 
columns in MySqlSnapshotSplitReadTask, and splices them when constructing Scan 
SQL.


> MySQLCDC supports synchronization of specified fields
> -
>
> Key: FLINK-36813
> URL: https://issues.apache.org/jira/browse/FLINK-36813
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.1
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC, pull-request-available
> Fix For: cdc-3.3.0
>
>
> *Background*
> In some scenarios, MySQL synchronization only expects to synchronize 
> specified fields instead of all fields in the table.
> 1. The user only has the permission for some fields in MySQL
> 2. The user has too many fields in a single table and only wants to 
> synchronize some fields, for example, here 
> [https://github.com/apache/flink-cdc/discussions/3058]
> *Current situation*
> For the incremental stage, you only need to configure the column.include.list 
> property of debezium to support the synchronization of some fields in the 
> incremental stage, refer to: 
> [https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list]
> For the full snapshot stage, * is currently used in 
> {_}MySqlSnapshotSplitReadTask{_}, refer to
> {code:java}
> if (isScanningData) {
>     return buildSelectWithRowLimits(
>             tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
> }
> {code}
>  
> *Solution*
> We can refer to debezium 
> [RelationalSnapshotChangeEventSource](https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java#L752-L776),
>  The user configures column.include.list, and then captures the specific 
> columns in MySqlSnapshotSplitReadTask, and splices them when constructing 
> Scan SQL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36751) PausableRelativeClock does not pause when the source only has one split

2024-11-27 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan reassigned FLINK-36751:
-

Assignee: haishui

> PausableRelativeClock does not pause when the source only has one split
> ---
>
> Key: FLINK-36751
> URL: https://issues.apache.org/jira/browse/FLINK-36751
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Affects Versions: 1.20.0, 1.19.1, 2.0-preview
>Reporter: haishui
>Assignee: haishui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Reason:
> PausableRelativeClock#pause is called from pauseOrResumeSplits in 
> ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when 
> the SourceOperator has more than one split.
>  
> My example code tested on Flink 1.20-SNAPSHOT is as follows:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(
>         v -> v, Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG);
> DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(
>         v -> v + 500, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
> WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy
>         .<Long>forMonotonousTimestamps()
>         .withTimestampAssigner((aLong, l) -> aLong)
>         .withWatermarkAlignment("default", Duration.ofMillis(5), Duration.ofSeconds(5))
>         .withIdleness(Duration.ofSeconds(5));
> DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1");
> DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2");
> s1.print("S1");
> s2.print("S2");
> s1.keyBy(v -> 0)
>         .connect(s2.keyBy(v -> 0))
>         .process(new CoProcessFunction<Long, Long, Void>() {
>             @Override
>             public void processElement1(Long aLong, CoProcessFunction<Long, Long, Void>.Context context, Collector<Void> collector) throws Exception {
>                 if (context.timestamp() < context.timerService().currentWatermark()) {
>                     throw new IllegalStateException("left stream element is late: " + aLong);
>                 }
>             }
>
>             @Override
>             public void processElement2(Long aLong, CoProcessFunction<Long, Long, Void>.Context context, Collector<Void> collector) throws Exception {
>                 if (context.timestamp() < context.timerService().currentWatermark()) {
>                     throw new IllegalStateException("right stream element is late: " + aLong);
>                 }
>             }
>         });
> env.execute();{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36751) PausableRelativeClock does not pause when the source only has one split

2024-11-27 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901589#comment-17901589
 ] 

Roman Khachatryan commented on FLINK-36751:
---

Merged into master as 0c338ae6d21e6d3dc79764e7801cbb7adfe6a44f

into 1.20 as 98045aad2c0ace527893bb20a9daa054aa48adbf

into 1.19 as d917462ee6492d83559d9e602a82230a850272a1

> PausableRelativeClock does not pause when the source only has one split
> ---
>
> Key: FLINK-36751
> URL: https://issues.apache.org/jira/browse/FLINK-36751
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Affects Versions: 1.20.0, 1.19.1, 2.0-preview
>Reporter: haishui
>Assignee: haishui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Reason:
> PausableRelativeClock#pause is called from pauseOrResumeSplits in 
> ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when 
> the SourceOperator has more than one split.
>  
> My example code tested on Flink 1.20-SNAPSHOT is as follows:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(
>         v -> v, Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG);
> DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(
>         v -> v + 500, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
> WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy
>         .<Long>forMonotonousTimestamps()
>         .withTimestampAssigner((aLong, l) -> aLong)
>         .withWatermarkAlignment("default", Duration.ofMillis(5), Duration.ofSeconds(5))
>         .withIdleness(Duration.ofSeconds(5));
> DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1");
> DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2");
> s1.print("S1");
> s2.print("S2");
> s1.keyBy(v -> 0)
>         .connect(s2.keyBy(v -> 0))
>         .process(new CoProcessFunction<Long, Long, Void>() {
>             @Override
>             public void processElement1(Long aLong, CoProcessFunction<Long, Long, Void>.Context context, Collector<Void> collector) throws Exception {
>                 if (context.timestamp() < context.timerService().currentWatermark()) {
>                     throw new IllegalStateException("left stream element is late: " + aLong);
>                 }
>             }
>
>             @Override
>             public void processElement2(Long aLong, CoProcessFunction<Long, Long, Void>.Context context, Collector<Void> collector) throws Exception {
>                 if (context.timestamp() < context.timerService().currentWatermark()) {
>                     throw new IllegalStateException("right stream element is late: " + aLong);
>                 }
>             }
>         });
> env.execute();{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36751) PausableRelativeClock does not pause when the source only has one split

2024-11-27 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan closed FLINK-36751.
-
Resolution: Fixed

Thanks for the contribution [~haishui] !

> PausableRelativeClock does not pause when the source only has one split
> ---
>
> Key: FLINK-36751
> URL: https://issues.apache.org/jira/browse/FLINK-36751
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Affects Versions: 1.20.0, 1.19.1, 2.0-preview
>Reporter: haishui
>Assignee: haishui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Reason:
> PausableRelativeClock#pause is called from pauseOrResumeSplits in 
> ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when 
> the SourceOperator has more than one split.
>  
> My example code tested on Flink 1.20-SNAPSHOT is as follows:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(
>         v -> v, Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG);
> DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(
>         v -> v + 500, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
> WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy
>         .<Long>forMonotonousTimestamps()
>         .withTimestampAssigner((aLong, l) -> aLong)
>         .withWatermarkAlignment("default", Duration.ofMillis(5), Duration.ofSeconds(5))
>         .withIdleness(Duration.ofSeconds(5));
> DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1");
> DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2");
> s1.print("S1");
> s2.print("S2");
> s1.keyBy(v -> 0)
>         .connect(s2.keyBy(v -> 0))
>         .process(new CoProcessFunction<Long, Long, Void>() {
>             @Override
>             public void processElement1(Long aLong, CoProcessFunction<Long, Long, Void>.Context context, Collector<Void> collector) throws Exception {
>                 if (context.timestamp() < context.timerService().currentWatermark()) {
>                     throw new IllegalStateException("left stream element is late: " + aLong);
>                 }
>             }
>
>             @Override
>             public void processElement2(Long aLong, CoProcessFunction<Long, Long, Void>.Context context, Collector<Void> collector) throws Exception {
>                 if (context.timestamp() < context.timerService().currentWatermark()) {
>                     throw new IllegalStateException("right stream element is late: " + aLong);
>                 }
>             }
>         });
> env.execute();{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] (1.19 backport)[FLINK-36751] Fix PausableRelativeClock does not pause when the source only has one split [flink]

2024-11-27 Thread via GitHub


rkhachatryan merged PR #25695:
URL: https://github.com/apache/flink/pull/25695


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Handle map and array types binary record data [flink-cdc]

2024-11-27 Thread via GitHub


umeshdangat commented on code in PR #3434:
URL: https://github.com/apache/flink-cdc/pull/3434#discussion_r1861307255


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataFieldSerializer.java:
##
@@ -39,7 +39,14 @@ public class DataFieldSerializer extends 
TypeSerializerSingleton {
 public static final DataFieldSerializer INSTANCE = new 
DataFieldSerializer();
 
 private final StringSerializer stringSerializer = 
StringSerializer.INSTANCE;
-private final DataTypeSerializer dataTypeSerializer = new 
DataTypeSerializer();
+private DataTypeSerializer dataTypeSerializer;
+
+private DataTypeSerializer getDataTypeSerializer() {

Review Comment:
   > I came across a bug due to circular dependency between 
DataFieldSerializer, DataTypeSerializer, and RowTypeSerializer classes that 
causes NPE for certain RowTypes like nested row type. I have baked that change 
in this PR but also have a separate one 
https://github.com/apache/flink-cdc/pull/3400
   
   [explained earlier 
here](https://github.com/apache/flink-cdc/pull/3434#issuecomment-2192072774)
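
   For context, a minimal sketch of the lazy-initialization pattern this hunk introduces to break the DataFieldSerializer -> DataTypeSerializer -> RowTypeSerializer cycle (the null-check body is an assumption, not necessarily the exact PR code):
   
   ```java
   // Create the nested serializer on first use instead of at class-initialization
   // time, so the singleton serializers no longer construct each other recursively
   // (that recursion is what caused the NPE for nested row types).
   private DataTypeSerializer dataTypeSerializer;
   
   private DataTypeSerializer getDataTypeSerializer() {
       if (dataTypeSerializer == null) {
           dataTypeSerializer = new DataTypeSerializer();
       }
       return dataTypeSerializer;
   }
   ```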
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]

2024-11-27 Thread via GitHub


snuyanzin commented on PR #24829:
URL: https://github.com/apache/flink/pull/24829#issuecomment-2504944809

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36756) Bump up the sql-gateway rest api version

2024-11-27 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reassigned FLINK-36756:
-

Assignee: Shengkai Fang

> Bump up the sql-gateway rest api version
> 
>
> Key: FLINK-36756
> URL: https://issues.apache.org/jira/browse/FLINK-36756
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Shengkai Fang
>Assignee: Shengkai Fang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36756][sql-gateway] Bump up the sql gateway rest version [flink]

2024-11-27 Thread via GitHub


fsk119 opened a new pull request, #25705:
URL: https://github.com/apache/flink/pull/25705

   
   
   ## What is the purpose of the change
   
   *Bump up the SQL Gateway REST API version.*
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36756) Bump up the sql-gateway rest api version

2024-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36756:
---
Labels: pull-request-available  (was: )

> Bump up the sql-gateway rest api version
> 
>
> Key: FLINK-36756
> URL: https://issues.apache.org/jira/browse/FLINK-36756
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Shengkai Fang
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36803][cdc-connector][base] Use the same format `tableId:chunkId` for splitId in SnapshotSplit [flink-cdc]

2024-11-27 Thread via GitHub


yuxiqian commented on code in PR #3763:
URL: https://github.com/apache/flink-cdc/pull/3763#discussion_r1861478089


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java:
##
@@ -152,7 +152,7 @@ public Offset createNoStoppingOffset() {
 private SchemalessSnapshotSplit constuctSchemalessSnapshotSplit() {
 return new SchemalessSnapshotSplit(
 tableId,
-"test",
+tableId.toString() + ":0",

Review Comment:
   Is it possible to make it `@VisibleForTesting` and call `generateSplitId`, to reduce potential discrepancies?



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java:
##
@@ -190,7 +190,7 @@ private static MySqlSchemalessSnapshotSplit 
getTestSchemalessSnapshotSplit(
 TableId tableId, int splitNo) {
 return new MySqlSchemalessSnapshotSplit(
 tableId,
-tableId.toString() + "-" + splitNo,
+tableId.toString() + ":" + splitNo,

Review Comment:
   Ditto



##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java:
##
@@ -95,6 +125,18 @@ public final SchemalessSnapshotSplit 
toSchemalessSnapshotSplit() {
 tableId, splitId, splitKeyType, splitStart, splitEnd, 
highWatermark);
 }
 
+private static String generateSplitId(TableId tableId, int chunkId) {
+return tableId.toString() + ":" + chunkId;
+}
+
+public static TableId parseTableId(String splitId) {

Review Comment:
   Will `extract` be a more precise name than `parse`?
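
   For illustration, a minimal sketch of the shared helpers this suggestion implies (the bodies are assumptions based on the `tableId:chunkId` format used in this PR, not the final code):
   
   ```java
   // Assumes org.apache.flink.annotation.VisibleForTesting and io.debezium.relational.TableId.
   @VisibleForTesting
   static String generateSplitId(TableId tableId, int chunkId) {
       // single source of truth for the "tableId:chunkId" split-id format
       return tableId.toString() + ":" + chunkId;
   }
   
   static TableId extractTableId(String splitId) {
       // the table id is everything before the last ':' (chunk ids never contain ':')
       return TableId.parse(splitId.substring(0, splitId.lastIndexOf(':')));
   }
   ```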



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36756][sql-gateway] Bump up the sql gateway rest version [flink]

2024-11-27 Thread via GitHub


flinkbot commented on PR #25705:
URL: https://github.com/apache/flink/pull/25705#issuecomment-2505221909

   
   ## CI report:
   
   * 1e83f1bda02cb512b4b1acb6f5c5758f2bc37e6d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36813) MySQLCDC supports synchronization of specified fields

2024-11-27 Thread Di Wu (Jira)
Di Wu created FLINK-36813:
-

 Summary: MySQLCDC supports synchronization of specified fields
 Key: FLINK-36813
 URL: https://issues.apache.org/jira/browse/FLINK-36813
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.1
Reporter: Di Wu
 Fix For: cdc-3.3.0


*Background*
In some scenarios, MySQL synchronization only expects to synchronize specified 
fields instead of all fields in the table.
1. The user only has the permission for some fields in MySQL
2. The user has too many fields in a single table and only wants to synchronize 
some fields, for example, here 
https://github.com/apache/flink-cdc/discussions/3058

*Current situation*
For the incremental stage, you only need to configure the column.include.list 
property of debezium to support the synchronization of some fields in the 
incremental stage, refer to: 
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list

For the full snapshot stage, * is currently used in 
{_}MySqlSnapshotSplitReadTask{_}, refer to
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}
 

*Solution*
The user configures {_}column.include.list{_}, and then captures the specific 
columns in MySqlSnapshotSplitReadTask, and splices them when constructing Scan 
SQL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36813][cdc-connectors][mysql] support mysql sync part columns [flink-cdc]

2024-11-27 Thread via GitHub


JNSimba opened a new pull request, #3767:
URL: https://github.com/apache/flink-cdc/pull/3767

   **Background**
   In some scenarios, MySQL synchronization only expects to synchronize 
specified fields instead of all fields in the table.
   1. The user only has the permission for some fields in MySQL
   2. The user has too many fields in a single table and only wants to 
synchronize some fields, for example, here 
https://github.com/apache/flink-cdc/discussions/3058
   
   **Current situation**
   For the incremental stage, you only need to configure the 
column.include.list property of debezium to support the synchronization of some 
fields in the incremental stage, refer to: 
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list
   
   For the full snapshot stage, `*` is currently used in MySqlSnapshotSplitReadTask, refer to:
   ```java
   if (isScanningData) {
       return buildSelectWithRowLimits(
               tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
   }
   ```
   
   **Solution**
   The user configures column.include.list, and then captures the specific 
columns in MySqlSnapshotSplitReadTask, and splices them when constructing Scan 
SQL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36813) MySQLCDC supports synchronization of specified fields

2024-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36813:
---
Labels: CDC pull-request-available  (was: CDC)

> MySQLCDC supports synchronization of specified fields
> -
>
> Key: FLINK-36813
> URL: https://issues.apache.org/jira/browse/FLINK-36813
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.1
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC, pull-request-available
> Fix For: cdc-3.3.0
>
>
> *Background*
> In some scenarios, MySQL synchronization only expects to synchronize 
> specified fields instead of all fields in the table.
> 1. The user only has the permission for some fields in MySQL
> 2. The user has too many fields in a single table and only wants to 
> synchronize some fields, for example, here 
> https://github.com/apache/flink-cdc/discussions/3058
> *Current situation*
> For the incremental stage, you only need to configure the column.include.list 
> property of debezium to support the synchronization of some fields in the 
> incremental stage, refer to: 
> https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list
> For the full snapshot stage, * is currently used in 
> {_}MySqlSnapshotSplitReadTask{_}, refer to
> {code:java}
> if (isScanningData) {
>     return buildSelectWithRowLimits(
>             tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
> }
> {code}
>  
> *Solution*
> The user configures {_}column.include.list{_}, and then captures the specific 
> columns in MySqlSnapshotSplitReadTask, and splices them when constructing 
> Scan SQL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]

2024-11-27 Thread via GitHub


snuyanzin commented on PR #24829:
URL: https://github.com/apache/flink/pull/24829#issuecomment-2505382087

   @naferx can you rebase the PR?
   I tend to think that the reason for the current failure has already been fixed in master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][docs] Fix typo in alter.md [flink]

2024-11-27 Thread via GitHub


snuyanzin commented on code in PR #24858:
URL: https://github.com/apache/flink/pull/24858#discussion_r1861608059


##
docs/content/docs/dev/table/hive-compatibility/hive-dialect/alter.md:
##
@@ -40,7 +40,7 @@ With Hive dialect, the following ALTER statements are 
supported for now:
 -- alter database's properties
 ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES 
(property_name=property_value, ...);
 
--- alter database's localtion
+-- alter database's location

Review Comment:
   Can you also fix the same typo  in Chinese version?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860867485


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Introducing the new Prometheus connector"
+date: "2024-11-26T00:00:00.000Z"
+authors:
+- nicusX:
+  name: "Lorenzo Nicora"
+---
+
+
+We are excited to announce a new sink connector that enables writing data to 
Prometheus 
([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)).
 This article introduces the main features of the connector and the reasoning behind its design decisions.
+
+This connector allows writing data to Prometheus using the 
[Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push 
interface, which lets you write time-series data to Prometheus at scale.
+
+## Motivations for a Prometheus connector
+
+Prometheus is an efficient time-series database optimized for building 
real-time dashboards and alerts, typically in combination with Grafana or other 
visualization tools.
+
+Prometheus is commonly used to monitor compute resources, IT infrastructure, 
Kubernetes clusters, applications, and cloud resources. It can also be used to 
observe your Flink cluster and Flink jobs. Flink's existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) serve this purpose.
+
+So, why do we need a connector?
+
+Prometheus can serve as a general-purpose observability time-series database, 
beyond traditional infrastructure monitoring. For example, it can be used to 
monitor IoT devices, sensors, connected cars, media streaming devices, and any 
resource that streams events or measurements continuously.
+
+Observability data from these use cases differs from metrics generated by 
compute resources. They present additional challenges:
+* **Out-of-order events**: Devices may be connected via mobile networks or 
even Bluetooth. Events from different devices may follow different paths and 
arrive at very different times. A **stateful, event-time logic** can be used to 
reorder them.
+* **High frequency** and **high cardinality**: You can have a huge number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable.
+* **Lack of contextual information**: Raw events sent by the devices often lack the contextual information needed for meaningful analysis. **Enrichment** of raw events, by looking up a reference dataset, can add dimensions useful for the analysis.
+* **Noise**: Sensor measurements may contain noise, for example when a GPS tracker loses connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis.
+
+Flink can be used as a pre-processor to address all the above. 
+
+You can implement a sink from scratch or use AsyncIO to call the Prometheus 
Remote-Write endpoint. However, there are non-trivial details involved in implementing an efficient Remote-Write client:
+* There is no high-level client for Prometheus Remote-Write. You would need to 
build on top of a low-level HTTP client.
+* Remote-Write can be inefficient unless write requests are batched and 
parallelized.
+* Error handling can be complex, and specifications demand strict behaviors 
(see [Strict Specifications, Lenient 
Implementations](#strict-specifications-lenient-implementations)).
+
+The new Prometheus connector manages all of this for you.
+
+## Key features
+
+The version `1.0.0` of the Prometheus connector has the following features:
+
+* DataStream API, Java Sink, based on AsyncSinkBase.
+* Configurable write batching.
+* Order is retained on retries.
+* At-most-once guarantees (we’ll explain the reason for this later).
+* Supports parallelism greater than 1.
+* Configurable retry strategy for retryable server responses (e.g., 
throttling) and configurable behavior when the retry limit is reached.
+* Configurable behavior on retryable errors (e.g., throttling from the 
Prometheus backend).
+
+### Authentication
+
+Authentication is [explicitly out of 
scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for 
Prometheus Remote-Write specifications.
+The connector provides a generic interface, PrometheusRequestSigner, to 
manipulate requests and add headers. This allows implementation of any 
authentication scheme that requires adding headers to the request, such as API 
keys, authorization, or signature tokens.
+
+In the release `1.0.0`, an implementation for Amazon Managed Service for 
Prometheus (AMP) is provided as a separate, optional dependency.
+
+## Designing the connector
+
+A sink to Prometheus differs from a sink to most other datastores or 
time-series databases. The wire interface is the easiest part; the main 
challenges arise from the diff

Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]

2024-11-27 Thread via GitHub


mehdid93 commented on PR #25670:
URL: https://github.com/apache/flink/pull/25670#issuecomment-2504432014

   @zentol Thank you! I have pushed a commit to use the new Image in order to 
test the changes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860846971


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Introducing the new Prometheus connector"
+date: "2024-11-26T00:00:00.000Z"
+authors:
+- nicusX:
+  name: "Lorenzo Nicora"
+---
+
+
+We are excited to announce a new sink connector that enables writing data to 
Prometheus 
([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)).
 This article introduces the main features of the connector and the reasoning behind its design decisions.
+
+This connector allows writing data to Prometheus using the 
[Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push 
interface, which lets you write time-series data to Prometheus at scale.
+
+## Motivations for a Prometheus connector
+
+Prometheus is an efficient time-series database optimized for building 
real-time dashboards and alerts, typically in combination with Grafana or other 
visualization tools.
+
+Prometheus is commonly used to monitor compute resources, IT infrastructure, 
Kubernetes clusters, applications, and cloud resources. It can also be used to 
observe your Flink cluster and Flink jobs. Flink's existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) serve this purpose.
+
+So, why do we need a connector?
+
+Prometheus can serve as a general-purpose observability time-series database, 
beyond traditional infrastructure monitoring. For example, it can be used to 
monitor IoT devices, sensors, connected cars, media streaming devices, and any 
resource that streams events or measurements continuously.
+
+Observability data from these use cases differs from metrics generated by 
compute resources. They present additional challenges:
+* **Out-of-order events**: Devices may be connected via mobile networks or 
even Bluetooth. Events from different devices may follow different paths and 
arrive at very different times. A **stateful, event-time logic** can be used to 
reorder them.
+* **High frequency** and **high cardinality**: You can have a huge number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable.
+* **Lack of contextual information**: Raw events sent by the devices often lack the contextual information needed for meaningful analysis. **Enrichment** of raw events, by looking up a reference dataset, can add dimensions useful for the analysis.
+* **Noise**: Sensor measurements may contain noise, for example when a GPS tracker loses connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis.
+
+Flink can be used as a pre-processor to address all the above. 
+
+You can implement a sink from scratch or use AsyncIO to call the Prometheus 
Remote-Write endpoint. However, there are non-trivial details involved in implementing an efficient Remote-Write client:
+* There is no high-level client for Prometheus Remote-Write. You would need to 
build on top of a low-level HTTP client.
+* Remote-Write can be inefficient unless write requests are batched and 
parallelized.
+* Error handling can be complex, and specifications demand strict behaviors 
(see [Strict Specifications, Lenient 
Implementations](#strict-specifications-lenient-implementations)).
+
+The new Prometheus connector manages all of this for you.
+
+## Key features
+
+The version `1.0.0` of the Prometheus connector has the following features:
+
+* DataStream API, Java Sink, based on AsyncSinkBase.
+* Configurable write batching.
+* Order is retained on retries.
+* At-most-once guarantees (we’ll explain the reason for this later).
+* Supports parallelism greater than 1.
+* Configurable retry strategy for retryable server responses (e.g., 
throttling) and configurable behavior when the retry limit is reached.
+* Configurable behavior on retryable errors (e.g., throttling from the 
Prometheus backend).

Review Comment:
   Yes! removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-27 Thread via GitHub


flinkbot commented on PR #25703:
URL: https://github.com/apache/flink/pull/25703#issuecomment-2504141228

   
   ## CI report:
   
   * b366d760dc117aa17cc528e2be5070b2f9036f4a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860854487


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Introducing the new Prometheus connector"
+date: "2024-11-26T00:00:00.000Z"
+authors:
+- nicusX:
+  name: "Lorenzo Nicora"
+---
+
+
+We are excited to announce a new sink connector that enables writing data to 
Prometheus 
([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)).
 This article introduces the main features of the connector and the reasoning behind its design decisions.
+
+This connector allows writing data to Prometheus using the 
[Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push 
interface, which lets you write time-series data to Prometheus at scale.
+
+## Motivations for a Prometheus connector
+
+Prometheus is an efficient time-series database optimized for building 
real-time dashboards and alerts, typically in combination with Grafana or other 
visualization tools.
+
+Prometheus is commonly used to monitor compute resources, IT infrastructure, 
Kubernetes clusters, applications, and cloud resources. It can also be used to 
observe your Flink cluster and Flink jobs. Flink's existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) serve this purpose.
+
+So, why do we need a connector?
+
+Prometheus can serve as a general-purpose observability time-series database, 
beyond traditional infrastructure monitoring. For example, it can be used to 
monitor IoT devices, sensors, connected cars, media streaming devices, and any 
resource that streams events or measurements continuously.
+
+Observability data from these use cases differs from metrics generated by 
compute resources. They present additional challenges:
+* **Out-of-order events**: Devices may be connected via mobile networks or 
even Bluetooth. Events from different devices may follow different paths and 
arrive at very different times. A **stateful, event-time logic** can be used to 
reorder them.
+* **High frequency** and **high cardinality**: You can have a huge number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable.
+* **Lack of contextual information**: Raw events sent by the devices often lack the contextual information needed for meaningful analysis. **Enrichment** of raw events, by looking up a reference dataset, can add dimensions useful for the analysis.
+* **Noise**: Sensor measurements may contain noise, for example when a GPS tracker loses connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis.
+
+Flink can be used as a pre-processor to address all the above. 
+
+You can implement a sink from scratch or use AsyncIO to call the Prometheus 
Remote-Write endpoint. However, there are non-trivial details involved in implementing an efficient Remote-Write client:
+* There is no high-level client for Prometheus Remote-Write. You would need to 
build on top of a low-level HTTP client.
+* Remote-Write can be inefficient unless write requests are batched and 
parallelized.
+* Error handling can be complex, and specifications demand strict behaviors 
(see [Strict Specifications, Lenient 
Implementations](#strict-specifications-lenient-implementations)).
+
+The new Prometheus connector manages all of this for you.
+
+## Key features
+
+The version `1.0.0` of the Prometheus connector has the following features:
+
+* DataStream API, Java Sink, based on AsyncSinkBase.
+* Configurable write batching.
+* Order is retained on retries.
+* At-most-once guarantees (we’ll explain the reason for this later).
+* Supports parallelism greater than 1.
+* Configurable retry strategy for retryable server responses (e.g., 
throttling) and configurable behavior when the retry limit is reached.
+* Configurable behavior on retryable errors (e.g., throttling from the 
Prometheus backend).
+
+### Authentication
+
+Authentication is [explicitly out of 
scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for 
Prometheus Remote-Write specifications.
+The connector provides a generic interface, PrometheusRequestSigner, to 
manipulate requests and add headers. This allows implementation of any 
authentication scheme that requires adding headers to the request, such as API 
keys, authorization, or signature tokens.
+
+In the release `1.0.0`, an implementation for Amazon Managed Service for 
Prometheus (AMP) is provided as a separate, optional dependency.
+
+## Designing the connector
+
+A sink to Prometheus differs from a sink to most other datastores or 
time-series databases. The wire interface is the easiest part; the main 
challenges arise from the diff

Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860851806


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Introducing the new Prometheus connector"
+date: "2024-11-26T00:00:00.000Z"
+authors:
+- nicusX:
+  name: "Lorenzo Nicora"
+---
+
+
+We are excited to announce a new sink connector that enables writing data to 
Prometheus 
([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)).
 This article introduces the main features of the connector and the reasoning behind its design decisions.
+
+This connector allows writing data to Prometheus using the 
[Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push 
interface, which lets you write time-series data to Prometheus at scale.
+
+## Motivations for a Prometheus connector
+
+Prometheus is an efficient time-series database optimized for building 
real-time dashboards and alerts, typically in combination with Grafana or other 
visualization tools.
+
+Prometheus is commonly used to monitor compute resources, IT infrastructure, 
Kubernetes clusters, applications, and cloud resources. It can also be used to 
observe your Flink cluster and Flink jobs. Flink's existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) serve this purpose.
+
+So, why do we need a connector?
+
+Prometheus can serve as a general-purpose observability time-series database, 
beyond traditional infrastructure monitoring. For example, it can be used to 
monitor IoT devices, sensors, connected cars, media streaming devices, and any 
resource that streams events or measurements continuously.
+
+Observability data from these use cases differs from metrics generated by 
compute resources. They present additional challenges:
+* **Out-of-order events**: Devices may be connected via mobile networks or 
even Bluetooth. Events from different devices may follow different paths and 
arrive at very different times. A **stateful, event-time logic** can be used to 
reorder them.
+* **High frequency** and **high cardinality**: You can have a huge number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable.
+* **Lack of contextual information**: Raw events sent by the devices often lack the contextual information needed for meaningful analysis. **Enrichment** of raw events, by looking up a reference dataset, can add dimensions useful for the analysis.
+* **Noise**: Sensor measurements may contain noise, for example when a GPS tracker loses connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis.
+
+Flink can be used as a pre-processor to address all the above. 
+
+You can implement a sink from scratch or use AsyncIO to call the Prometheus 
Remote-Write endpoint. However, there are non-trivial details involved in implementing an efficient Remote-Write client:
+* There is no high-level client for Prometheus Remote-Write. You would need to 
build on top of a low-level HTTP client.
+* Remote-Write can be inefficient unless write requests are batched and 
parallelized.
+* Error handling can be complex, and specifications demand strict behaviors 
(see [Strict Specifications, Lenient 
Implementations](#strict-specifications-lenient-implementations)).
+
+The new Prometheus connector manages all of this for you.

Review Comment:
   Good point. I will try to clarify, hoping not to be too verbose.
   But the bottom line is that this is **data** from the point of view of the application.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860853048


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Introducing the new Prometheus connector"
+date: "2024-11-26T00:00:00.000Z"
+authors:
+- nicusX:
+  name: "Lorenzo Nicora"
+---
+
+
+We are excited to announce a new sink connector that enables writing data to 
Prometheus 
([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)).
 This article introduces the main features of the connector and the reasoning behind its design decisions.
+
+This connector allows writing data to Prometheus using the 
[Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push 
interface, which lets you write time-series data to Prometheus at scale.
+
+## Motivations for a Prometheus connector
+
+Prometheus is an efficient time-series database optimized for building 
real-time dashboards and alerts, typically in combination with Grafana or other 
visualization tools.
+
+Prometheus is commonly used to monitor compute resources, IT infrastructure, 
Kubernetes clusters, applications, and cloud resources. It can also be used to 
observe your Flink cluster and Flink jobs. Flink's existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) serve this purpose.
+
+So, why do we need a connector?
+
+Prometheus can serve as a general-purpose observability time-series database, 
beyond traditional infrastructure monitoring. For example, it can be used to 
monitor IoT devices, sensors, connected cars, media streaming devices, and any 
resource that streams events or measurements continuously.
+
+Observability data from these use cases differs from metrics generated by 
compute resources. They present additional challenges:
+* **Out-of-order events**: Devices may be connected via mobile networks or 
even Bluetooth. Events from different devices may follow different paths and 
arrive at very different times. A **stateful, event-time logic** can be used to 
reorder them.
+* **High frequency** and **high cardinality**: You can have a huge number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable.
+* **Lack of contextual information**: Raw events sent by the devices often lack the contextual information needed for meaningful analysis. **Enrichment** of raw events, by looking up a reference dataset, can add dimensions useful for the analysis.
+* **Noise**: Sensor measurements may contain noise, for example when a GPS tracker loses connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis.
+
+Flink can be used as a pre-processor to address all the above. 
+
+You can implement a sink from scratch or use AsyncIO to call the Prometheus 
Remote-Write endpoint. However, there are non-trivial details involved in implementing an efficient Remote-Write client:
+* There is no high-level client for Prometheus Remote-Write. You would need to 
build on top of a low-level HTTP client.
+* Remote-Write can be inefficient unless write requests are batched and 
parallelized.
+* Error handling can be complex, and specifications demand strict behaviors 
(see [Strict Specifications, Lenient 
Implementations](#strict-specifications-lenient-implementations)).
+
+The new Prometheus connector manages all of this for you.
+
+## Key features
+
+The version `1.0.0` of the Prometheus connector has the following features:
+
+* DataStream API, Java Sink, based on AsyncSinkBase.
+* Configurable write batching.
+* Order is retained on retries.
+* At-most-once guarantees (we’ll explain the reason for this later).
+* Supports parallelism greater than 1.
+* Configurable retry strategy for retryable server responses (e.g., 
throttling) and configurable behavior when the retry limit is reached.
+* Configurable behavior on retryable errors (e.g., throttling from the 
Prometheus backend).
+
+### Authentication
+
+Authentication is [explicitly out of 
scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for 
Prometheus Remote-Write specifications.
+The connector provides a generic interface, PrometheusRequestSigner, to 
manipulate requests and add headers. This allows implementation of any 
authentication scheme that requires adding headers to the request, such as API 
keys, authorization, or signature tokens.
+
+In the release `1.0.0`, an implementation for Amazon Managed Service for 
Prometheus (AMP) is provided as a separate, optional dependency.
+
+## Designing the connector
+
+A sink to Prometheus differs from a sink to most other datastores or 
time-series databases. The wire interface is the easiest part; the main 
challenges arise from the diff

Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-27 Thread via GitHub


gustavodemorais closed pull request #25703: [FLINK-36809][table] Support 
ignoreIfExists param for createTable
URL: https://github.com/apache/flink/pull/25703


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]

2024-11-27 Thread via GitHub


zentol commented on PR #25670:
URL: https://github.com/apache/flink/pull/25670#issuecomment-2504396199

   @mehdid93 You can try your changes with 
[chesnay/flink-ci:java_8_11_17_21_maven_386_jammy](https://hub.docker.com/layers/chesnay/flink-ci/java_8_11_17_21_maven_386_jammy/images/sha256-85838a97ac7151fac2493de5df7dd8fbb02e3ba1d48d7f64b654866deed07448?context=explore).
   
   I'll open a PR tomorrow to update CI as a whole to use that image.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update blogpost for new KinesisStreamsSource and DynamoDbStreamsSource [flink-web]

2024-11-27 Thread via GitHub


hlteoh37 merged PR #767:
URL: https://github.com/apache/flink-web/pull/767


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33566) HBase sql-connector needs overwrite the rowKey

2024-11-27 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-33566:
-
Fix Version/s: hbase-4.1.0
   (was: hbase-3.0.0)

> HBase sql-connector needs overwrite the rowKey
> --
>
> Key: FLINK-33566
> URL: https://issues.apache.org/jira/browse/FLINK-33566
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Affects Versions: 1.18.0
> Environment: flink: 1.18.0
> hbase: 2.2.3
> flink-connector-hbase-2.2:3.0.0-1.18
>Reporter: JankoWilliam
>Priority: Major
>  Labels: HBase-2.0, flink-connector-hbase, pull-request-available
> Fix For: hbase-4.1.0
>
>
> When I want to write 50+ label values to a row key's column family in HBase 
> (all values are 0/1 labels), for example:
> {"id":"","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}
> Here are four label values as an example:
> {"id":"","q1":"0","q2":"1","q3":"0","q4":"1"}
> {code:java}
> --source:
> CREATE TABLE kafka_table_(
>  `id` STRING,
>  `q1` STRING,
>  `q2` STRING,
>  `q3` STRING,
>  `q4` STRING
> )  WITH (
> ...
>    'connector'='kafka',
>    'format'='json',
> ...
> );
> --sink:
> CREATE TABLE hbase_table_ (
>  rowkey  STRING,
>  cf ROW<q1 INT, q2 INT, q3 INT, q4 INT>,
>  PRIMARY KEY (rowkey ) NOT ENFORCED
> ) WITH (
>   'connector' = 'my-hbase-2.2',
>   'table-name' = 'test_table',
>   'zookeeper.quorum' = '127.0.0.1'
> );
> --insert:
> insert into hbase_table_ 
> select 
>  id AS rowkey ,
>  ROW( cast(q1 as INT),cast(q2 as INT),cast(q3 as INT),cast(q4 as INT)) as cf
>  from kafka_table_  ;{code}
> hbase:
> hbase(main):016:0> scan 'test_table'
> ROW                COLUMN+CELL
>                    column=cf:q1, timestamp=1, value=\x00\x00\x00\x00
>                    column=cf:q2, timestamp=1, value=\x00\x00\x00\x01
>                    column=cf:q3, timestamp=1, value=\x00\x00\x00\x00
>                    column=cf:q4, timestamp=1, value=\x00\x00\x00\x01
>  
> The upstream data always contains the same 50+ key-value pairs, of which very 
> few values are 1 (the default label value is 0). For example, only 1 or 2 
> values are 1, say q2=1 and q4=1, so I want HBase to store only the following:
> hbase(main):016:0> scan 'test_table'
> ROW                COLUMN+CELL
>                    column=cf:q2, timestamp=1, value=\x00\x00\x00\x01
>                    column=cf:q4, timestamp=1, value=\x00\x00\x00\x01
>  
> When I use the 'sink.ignore-null-value' option here, it just skips updating 
> the null values, and downstream third parties will still read all the columns 
> (50+), even though only 2 values are truly 1:
> {code:java}
> --sink:
> CREATE TABLE hbase_table_ (
>  rowkey  STRING,
>  cf ROW<q1 INT, q2 INT, q3 INT, q4 INT>,
>  PRIMARY KEY (rowkey ) NOT ENFORCED
> ) WITH (
>   'connector' = 'my-hbase-2.2',
>   'table-name' = 'test_table',
>   'sink.ignore-null-value' = 'true',
>   'zookeeper.quorum' = '127.0.0.1'
> ); 
> --insert:
> insert into hbase_table_ 
> select 
>  id AS rowkey ,
>  ROW(
>  case when q1 <> '0' then cast(q1 as INT) else null end,
>  case when q2 <> '0' then cast(q2 as INT) else null end,
>  case when q3 <> '0' then cast(q3 as INT) else null end,
>  case when q4 <> '0' then cast(q4 as INT) else null end 
> ) as cf
>  from kafka_table_ ; {code}
> hbase(main):016:0> scan 'test_table'
> ROW                COLUMN+CELL

[PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]

2024-11-27 Thread via GitHub


ferenc-csaky opened a new pull request, #25702:
URL: https://github.com/apache/flink/pull/25702

   Simple release chore.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-27 Thread via GitHub


flinkbot commented on PR #25704:
URL: https://github.com/apache/flink/pull/25704#issuecomment-2504208974

   
   ## CI report:
   
   * d2bcde1986b8a02f114f9ff5556aae73919dcf7d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860897153


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##

Re: [PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]

2024-11-27 Thread via GitHub


ferenc-csaky commented on PR #25697:
URL: https://github.com/apache/flink/pull/25697#issuecomment-2504020451

   @flinkbot run azure 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36808) UNION ALL after lookup join produces unexpected results

2024-11-27 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-36808:
--
Description: 
Here is the SQL to reproduce the issue:
{code:java}
-- Data of table `stream`:
-- (1, Alice)
-- (2, Bob)
CREATE TEMPORARY TABLE `stream` (
`id` BIGINT,
`name` STRING,
`txn_time` as proctime(),
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'stream',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Data of table `dim`:
-- (1, OK)
-- (2, OK)
CREATE TEMPORARY TABLE `dim` (
`id` BIGINT,
`status` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'dim',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Lookup join two tables twice with different filter, and union them together
SELECT
 s.id,
 s.name,
 s.txn_time,
 d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
`d`
ON
 `s`.`id` = `d`.`id`
WHERE
 `d`.`status` = 'OK' 
UNION ALL
SELECT
 s.id,
 s.name,
 s.txn_time,
 d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
`d`
ON
 `s`.`id` = `d`.`id`
WHERE
 `d`.`status` = 'NOT_EXISTS';{code}
The first lookup join should output:
{code:java}
(1, Alice, 2024-11-27 11:52:19.332, OK)
(2, Bob, 2024-11-27 11:52:19.332, OK) {code}
The second lookup join should output nothing, as there's no status 
'NOT_EXISTS'.

But the result after union is:
{code:java}
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
There shouldn't be any 'NOT_EXISTS' rows. 

The SQL plan shows that the constant conditions 'OK' and 'NOT_EXISTS' are 
appended directly by the Calc after the lookup join instead of being applied as 
filters, which is not as expected. 
{code:java}
| == Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
:  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
:     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 2}])
:        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
:        :  +- LogicalTableScan(table=[[default_catalog, default_database, 
stream]])
:        +- LogicalFilter(condition=[=($cor0.id, $0)])
:           +- LogicalSnapshot(period=[$cor0.txn_time])
:              +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])
+- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
   +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
      +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0, 2}])
         :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, 
stream]])
         +- LogicalFilter(condition=[=($cor1.id, $0)])
            +- LogicalSnapshot(period=[$cor1.txn_time])
               +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])

== Optimized Physical Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 
status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, 
CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
      +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
         +- Calc(select=[id, name, PROCTIME() AS txn_time])
            +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])

== Optimized Execution Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS 
status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, 
id])(reuse_id=[1])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])
   +- Calc(select=

Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-27 Thread via GitHub


gustavodemorais commented on PR #25703:
URL: https://github.com/apache/flink/pull/25703#issuecomment-2504152111

   Forgot one detail, let me open that again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860859551


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##

Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860866006


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##

Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string to prevent version comparison mismatch [flink-connector-hbase]

2024-11-27 Thread via GitHub


ferenc-csaky merged PR #53:
URL: https://github.com/apache/flink-connector-hbase/pull/53


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string … [flink-connector-hbase]

2024-11-27 Thread via GitHub


ferenc-csaky commented on PR #52:
URL: 
https://github.com/apache/flink-connector-hbase/pull/52#issuecomment-2504645328

   Closed, as it is obsolete now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string to prevent version comparison mismatch [flink-connector-hbase]

2024-11-27 Thread via GitHub


boring-cyborg[bot] commented on PR #53:
URL: 
https://github.com/apache/flink-connector-hbase/pull/53#issuecomment-2504644678

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string … [flink-connector-hbase]

2024-11-27 Thread via GitHub


ferenc-csaky closed pull request #52: [FLINK-36595][docs] Explicitly set 
connector compatibility as string …
URL: https://github.com/apache/flink-connector-hbase/pull/52


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35315) MemoryManagerConcurrentModReleaseTest executes more than 15 minutes

2024-11-27 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-35315:
--
Priority: Critical  (was: Major)

> MemoryManagerConcurrentModReleaseTest executes more than 15 minutes
> ---
>
> Key: FLINK-35315
> URL: https://issues.apache.org/jira/browse/FLINK-35315
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Tests
>Affects Versions: 1.20.0, 2.0-preview
>Reporter: Rui Fan
>Priority: Critical
>  Labels: test-stability
> Fix For: 2.0.0
>
> Attachments: image-2024-05-09-11-53-10-037.png
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395&view=results]
>  
> It seems 
> MemoryManagerConcurrentModReleaseTest.testConcurrentModificationWhileReleasing
>  executes for more than 15 minutes.
> The root cause may be a ConcurrentModificationException.
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=10060]
>  
> !image-2024-05-09-11-53-10-037.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]

2024-11-27 Thread via GitHub


flinkbot commented on PR #25702:
URL: https://github.com/apache/flink/pull/25702#issuecomment-2504107185

   
   ## CI report:
   
   * 41b6582f692f590ce71d2721306a8c06d3be97ee UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36810) Reflective access warning not addressed causing Flink failing to process job creation

2024-11-27 Thread Yusu Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901531#comment-17901531
 ] 

Yusu Gao commented on FLINK-36810:
--

[~tomncooper] 

Thank you for pointing that one out. I think we missed this list. 

Now my question becomes: is there any plan or work in progress to refactor the 
code to avoid so much reflection? Certain open-ups from this list, like 
sun.security.tools, are very concerning in terms of security. 

> Reflective access warning not addressed causing Flink failing to process job 
> creation
> -
>
> Key: FLINK-36810
> URL: https://issues.apache.org/jira/browse/FLINK-36810
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.20.0
>Reporter: Yusu Gao
>Priority: Major
>
> While testing the migration of our Flink cluster from 1.11 on the Java 11 
> runtime to 1.20 on the Java 17 runtime, we noticed a job creation error.
> {code:java}
> "errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could 
> not execute application.
>     at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
>     at 
> java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
>     at 
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
>     at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>     at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.flink.datastream.impl.ExecutionContextEnvironment
>     at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>     at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>     at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>     ... 1 more
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.flink.datastream.impl.ExecutionContextEnvironment
>     at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:109)
>     at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>     at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>     at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
>     at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>     ... 1 more
> Caused by: java.lang.ExceptionInInitializerError: Exception 
> java.lang.ExceptionInInitializerError [in thread 
> \"flink-jar-runner-thread-1\"]
>     ... 6 more {code}
> Reverting the runtime back to Java 11 with {*}--illegal-access=warn{*}, we 
> found a warning from Flink code:
> {code:java}
> WARNING: Illegal reflective access by 
> org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator
>  (file:/path/to/flink/flink-1.20.0/lib/flink-dist-1.20.0.jar) to field 
> java.util.Collections$UnmodifiableMap.m {code}
> This points to 
> [DataStreamV2SinkTransformationTranslator|https://github.com/apache/flink/blob/release-1.20.0/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java#L98],
>  which is used by the 
> [class|https://github.com/apache/flink/blob/master/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java#L95]
>  throwing the exception.
> Adding the following argument to the JVM_ARGS env var fixed the issue:
> {code:java}
> export JVM_ARGS="$JVM_ARGS --add-opens=java.base/java.util=ALL-UNNAMED"{code}
> This is a workaround, but not necessarily the best or safest solution. 
> Please provide insights or a better solution for this issue. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-27 Thread via GitHub


gustavodemorais opened a new pull request, #25703:
URL: https://github.com/apache/flink/pull/25703

   
   
   ## What is the purpose of the change
   
   Support ignoreIfExists for createTable. It works exactly the same way as for 
other resources, like createFunction. Similar to SQL's `IF NOT EXISTS`.
   
   
   ## Brief change log
   
 - Added one more signature for TableEnvironment's createTable supporting 
ignoreIfExists
 - Added testCreateTableIfNotExistsFromDescriptor
   
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
 - Added a test that validates that calling createTable with ignoreIfExists 
set to true for an existing table leads to a no-op. It also checks that doing 
the same with ignoreIfExists set to false throws an exception.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - Dependencies (does it add or upgrade a dependency): no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36809) Add ifNotExists param to createTable

2024-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36809:
---
Labels: pull-request-available  (was: )

> Add ifNotExists param to createTable
> 
>
> Key: FLINK-36809
> URL: https://issues.apache.org/jira/browse/FLINK-36809
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Gustavo de Morais
>Priority: Major
>  Labels: pull-request-available
>
> Table API's createTable currently doesn't support ignoreIfExists, which other 
> interfaces for other resources already support.
> We want to support the ignoreIfExists param for createTable. It should work 
> the same way it does for other resources, like createFunction.
>  
> Similar to SQL's _IF NOT EXISTS_ clause.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36810) Reflective access warning not addressed causing Flink failing to process job creation

2024-11-27 Thread Yusu Gao (Jira)
Yusu Gao created FLINK-36810:


 Summary: Reflective access warning not addressed causing Flink 
failing to process job creation
 Key: FLINK-36810
 URL: https://issues.apache.org/jira/browse/FLINK-36810
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.20.0
Reporter: Yusu Gao


While testing the migration of our Flink cluster from 1.11 on the Java 11 
runtime to 1.20 on the Java 17 runtime, we noticed a job creation error.
{code:java}
"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could 
not execute application.
    at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
    at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
    at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
    at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.CompletionException: 
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.datastream.impl.ExecutionContextEnvironment
    at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
    ... 1 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.datastream.impl.ExecutionContextEnvironment
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:109)
    at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
    at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    ... 1 more
Caused by: java.lang.ExceptionInInitializerError: Exception 
java.lang.ExceptionInInitializerError [in thread \"flink-jar-runner-thread-1\"]
    ... 6 more {code}
Reverting the runtime back to Java 11 with {*}--illegal-access=warn{*}, we 
found a warning from Flink code:
{code:java}
WARNING: Illegal reflective access by 
org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator
 (file:/path/to/flink/flink-1.20.0/lib/flink-dist-1.20.0.jar) to field 
java.util.Collections$UnmodifiableMap.m {code}
This points to 
[DataStreamV2SinkTransformationTranslator|https://github.com/apache/flink/blob/release-1.20.0/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java#L98],
 which is used by the 
[class|https://github.com/apache/flink/blob/master/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java#L95]
 throwing the exception.

Adding the following argument to the JVM_ARGS env var fixed the issue:
{code:java}
export JVM_ARGS="$JVM_ARGS --add-opens=java.base/java.util=ALL-UNNAMED"{code}
This is a workaround, but not necessarily the best or safest solution. 
Please provide insights or a better solution for this issue. 
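
For reference, a minimal standalone sketch (not Flink code; the class name is 
made up) of the kind of reflective access being blocked. On Java 17 the 
{{setAccessible}} call throws {{InaccessibleObjectException}} unless the JVM is 
started with the {{--add-opens}} flag above:
{code:java}
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;

public class AddOpensCheck {
    public static void main(String[] args) throws Exception {
        Map<String, String> unmodifiable = Collections.unmodifiableMap(Map.of("k", "v"));

        // Same pattern as the translator: reflectively reading the private backing
        // field "m" of java.util.Collections$UnmodifiableMap.
        Field backing = Class.forName("java.util.Collections$UnmodifiableMap").getDeclaredField("m");

        // Fails on Java 17 with java.lang.reflect.InaccessibleObjectException unless
        // the JVM was started with: --add-opens java.base/java.util=ALL-UNNAMED
        backing.setAccessible(true);
        System.out.println("Backing map: " + backing.get(unmodifiable));
    }
}
{code}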



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860845426


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Introducing the new Prometheus connector"
+date: "2024-11-26T00:00:00.000Z"
+authors:
+- nicusX:
+  name: "Lorenzo Nicora"
+---
+
+
+We are excited to announce a new sink connector that enables writing data to 
Prometheus 
([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)).
 This article introduces the main features of the connector and the reasoning 
behind its design decisions.
+
+This connector allows writing data to Prometheus using the 
[Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push 
interface, which lets you write time-series data to Prometheus at scale.
+
+## Motivations for a Prometheus connector
+
+Prometheus is an efficient time-series database optimized for building 
real-time dashboards and alerts, typically in combination with Grafana or other 
visualization tools.
+
+Prometheus is commonly used to monitor compute resources, IT infrastructure, 
Kubernetes clusters, applications, and cloud resources. It can also be used to 
observe your Flink cluster and Flink jobs; Flink's existing [Metric 
Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/)
 serve this purpose. 
+
+So, why do we need a connector?
+
+Prometheus can serve as a general-purpose observability time-series database, 
beyond traditional infrastructure monitoring. For example, it can be used to 
monitor IoT devices, sensors, connected cars, media streaming devices, and any 
resource that streams events or measurements continuously.
+
+Observability data from these use cases differs from metrics generated by 
compute resources. They present additional challenges:
+* **Out-of-order events**: Devices may be connected via mobile networks or 
even Bluetooth. Events from different devices may follow different paths and 
arrive at very different times. A **stateful, event-time logic** can be used to 
reorder them.
+* **High frequency** and **high cardinality**: You can have a huge number of 
devices, each emitting signals multiple times per second. **Aggregating over 
time** and **over dimensions** can reduce frequency and cardinality and make 
the volume of data more efficiently analysable.
+* **Lack of contextual information**: Raw events sent by the devices often 
lack the contextual information needed for a meaningful analysis. **Enrichment** 
of raw events, looking up some reference dataset, can be used to add dimensions 
useful for the analysis.
+* **Noise**: Sensor measurements may contain noise, for example when a GPS 
tracker loses connection and reports spurious positions. These obvious outliers 
can be **filtered** out to simplify visualization and analysis.
+
+Flink can be used as a pre-processor to address all the above. 
+
+You can implement a sink from scratch or use AsyncIO to call the Prometheus 
Remote-Write endpoint. However, there are non-trivial details involved in 
implementing an efficient Remote-Write client:
+* There is no high-level client for Prometheus Remote-Write. You would need to 
build on top of a low-level HTTP client.
+* Remote-Write can be inefficient unless write requests are batched and 
parallelized.
+* Error handling can be complex, and specifications demand strict behaviors 
(see [Strict Specifications, Lenient 
Implementations](#strict-specifications-lenient-implementations)).
+
+The new Prometheus connector manages all of this for you.
+
+## Key features
+
+The version `1.0.0` of the Prometheus connector has the following features:
+
+* DataStream API, Java Sink, based on AsyncSinkBase.
+* Configurable write batching.
+* Order is retained on retries.
+* At-most-once guarantees (we’ll explain the reason for this later).

Review Comment:
   Prometheus is at-most-once by design: fast ingestion and data freshness over 
consistency. If you try to make a Prometheus sink at-least-once, Prometheus 
will spit in your face :D 
   If you make the connector fail when Prometheus rejects your data, you are 
doomed into an endless loop of restart-from-checkpoint



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add HBase Connector 4.0.0 [flink-web]

2024-11-27 Thread via GitHub


ferenc-csaky merged PR #762:
URL: https://github.com/apache/flink-web/pull/762


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]

2024-11-27 Thread via GitHub


ferenc-csaky merged PR #25697:
URL: https://github.com/apache/flink/pull/25697


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-36451) Kubernetes Application JobManager Potential Deadlock and TaskManager Pod Residuals

2024-11-27 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895936#comment-17895936
 ] 

Matthias Pohl edited comment on FLINK-36451 at 11/27/24 2:33 PM:
-

The problem is that we have two code paths running concurrently (in the 
contender’s main thread and in the main thread of the 
{{DefaultLeaderElectionService}}) that acquire the same locks in reversed order:
* Certain operations in the contender (the DispatcherLeaderProcess in 
FLINK-36451 and the JobMasterServiceLeadershipRunner) need to be called only as 
leader. These operations are executed from within the contender and acquire the 
contender’s lock first before acquiring the LeaderElectionService lock (to 
check for leadership).
* In the meantime, any leadership change event will trigger the same locks in 
reverse order: First, locking the DefaultLeaderElectionService lock and then 
informing the registered contenders. Notifying the contenders will require 
acquiring their locks as part of the process.

With unstable leadership happening quite often, the chance of running into a 
deadlock increases. This issue does not only affect the k8s-based leader 
election but also the ZooKeeper one.

The solution should be to run any leader election-related operations solely on 
the {{DefaultLeaderElectionService}} main thread to ensure sequential 
execution. This allows us to remove the lock within the 
{{DefaultLeaderElectionService}}. We have to ensure, though, that the 
operations that are performed on the {{DefaultLeaderElectionService}} main 
thread are light-weight (similar to the requirements of Flink's {{RpcEndpoint}} 
main thread).


was (Author: mapohl):
The problem is that we have two code paths running concurrently (in the 
contenders main thread and in the main thread of the 
{{DefaultLeaderElectionService}}) that acquire the same locks in reversed order:
* Certain operations in the contender (the DispatcherLeaderProcess in 
FLINK-36451 and the JobMasterServiceLeadershipRunner in FLINKCC-1558) need to 
be called only as leader. These operations are executed from within the 
contender and acquire the contender’s lock first before acquiring the 
LeaderElectionService lock (to check for leadership)).
* In the meantime, any leadership change event will trigger the same locks in 
reverse order: First, locking the DefaultLeaderElectionService lock and then 
informing the registered contenders. Notifying the contenders will require 
acquiring their locks as part of the process.

With unstable leadership happening quite often, the chance of running into a 
deadlock increases. This is an issue that does not only affect the k8s but also 
the ZooKeeper leader election.

The solution should be to run any leader election-related operations solely on 
the {{DefaultLeaderElectionService}} main thread to ensure sequential 
execution. This allows us to remove the lock within the 
{{DefaultLeaderElectionService}}. We have to ensure, though, that the 
operations that are performed on the {{DefaultLeaderElectionService}} main 
thread are light-weight (similar to the requirements of Flink's {{RpcEndpoint}} 
main thread).

> Kubernetes Application JobManager Potential Deadlock and TaskManager Pod 
> Residuals
> --
>
> Key: FLINK-36451
> URL: https://issues.apache.org/jira/browse/FLINK-36451
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.1
> Environment: * Flink version: 1.19.1
>  * - Deployment mode: Flink Kubernetes Application Mode
>  * - JVM version: OpenJDK 17
>  
>Reporter: xiechenling
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Attachments: 1.png, 2.png, jobmanager.log, jstack.txt
>
>
> In Kubernetes Application Mode, when there is significant etcd latency or 
> instability, the Flink JobManager may enter a deadlock situation. 
> Additionally, TaskManager pods are not cleaned up properly, resulting in 
> stale resources that prevent the Flink job from recovering correctly. This 
> issue occurs during frequent service restarts or network instability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36451) Kubernetes Application JobManager Potential Deadlock and TaskManager Pod Residuals

2024-11-27 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895936#comment-17895936
 ] 

Matthias Pohl edited comment on FLINK-36451 at 11/27/24 2:34 PM:
-

The problem is that we have two code paths running concurrently (in the 
contender’s main thread and in the main thread of the 
{{DefaultLeaderElectionService}}) that acquire the same locks in reversed order:
* Certain operations in the contender (the DispatcherLeaderProcess in 
FLINK-36451 and similarly the JobMasterServiceLeadershipRunner) need to be 
called only as leader. These operations are executed from within the contender 
and acquire the contender’s lock first before acquiring the 
LeaderElectionService lock (to check for leadership).
* In the meantime, any leadership change event will trigger the same locks in 
reverse order: First, locking the DefaultLeaderElectionService lock and then 
informing the registered contenders. Notifying the contenders will require 
acquiring their locks as part of the process.

With unstable leadership happening quite often, the chance of running into a 
deadlock increases. This issue does not only affect the k8s-based leader 
election but also the ZooKeeper one.

The solution should be to run any leader election-related operations solely on 
the {{DefaultLeaderElectionService}} main thread to ensure sequential 
execution. This allows us to remove the lock within the 
{{DefaultLeaderElectionService}}. We have to ensure, though, that the 
operations that are performed on the {{DefaultLeaderElectionService}} main 
thread are light-weight (similar to the requirements of Flink's {{RpcEndpoint}} 
main thread).
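
For illustration, a minimal standalone sketch of the reversed lock-acquisition 
order described above (class and lock names are illustrative, not the actual 
Flink types); the two threads deadlock exactly as described:
{code:java}
import java.util.concurrent.CountDownLatch;

public class ReversedLockOrderDeadlock {

    // Stand-ins for the two monitors involved: the contender's lock and the
    // DefaultLeaderElectionService's lock.
    private static final Object contenderLock = new Object();
    private static final Object leaderElectionLock = new Object();

    public static void main(String[] args) {
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Contender path: holds its own lock, then asks the election service
        // whether it still has leadership.
        Thread contender = new Thread(() -> {
            synchronized (contenderLock) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (leaderElectionLock) {
                    // hasLeadership() check would happen here
                }
            }
        }, "contender");

        // Election service path: holds its own lock, then notifies the contender
        // about a leadership change.
        Thread electionService = new Thread(() -> {
            synchronized (leaderElectionLock) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (contenderLock) {
                    // grantLeadership() / revokeLeadership() would happen here
                }
            }
        }, "election-service");

        contender.start();
        electionService.start();
        // Each thread now waits forever for the lock the other one holds.
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}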


was (Author: mapohl):
The problem is that we have two code paths running concurrently (in the 
contenders main thread and in the main thread of the 
{{DefaultLeaderElectionService}}) that acquire the same locks in reversed order:
* Certain operations in the contender (the DispatcherLeaderProcess in 
FLINK-36451 and the JobMasterServiceLeadershipRunner) need to be called only as 
leader. These operations are executed from within the contender and acquire the 
contender’s lock first before acquiring the LeaderElectionService lock (to 
check for leadership)).
* In the meantime, any leadership change event will trigger the same locks in 
reverse order: First, locking the DefaultLeaderElectionService lock and then 
informing the registered contenders. Notifying the contenders will require 
acquiring their locks as part of the process.

With unstable leadership happening quite often, the chance of running into a 
deadlock increases. This is an issue that does not only affect the k8s but also 
the ZooKeeper leader election.

The solution should be to run any leader election-related operations solely on 
the {{DefaultLeaderElectionService}} main thread to ensure sequential 
execution. This allows us to remove the lock within the 
{{DefaultLeaderElectionService}}. We have to ensure, though, that the 
operations that are performed on the {{DefaultLeaderElectionService}} main 
thread are light-weight (similar to the requirements of Flink's {{RpcEndpoint}} 
main thread).
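
To make the reversed lock-acquisition order concrete, here is a minimal, self-contained Java sketch of the pattern described above (illustrative class and lock names only, not Flink's actual code):

{code:java}
public class LockOrderDeadlockSketch {

    public static void main(String[] args) {
        final Object contenderLock = new Object();
        final Object electionServiceLock = new Object();

        // Contender path: contender lock first, then the election-service lock
        // (e.g. "perform this operation only while still being the leader").
        Thread contenderThread = new Thread(() -> {
            synchronized (contenderLock) {
                sleepQuietly(100); // widen the race window for the demo
                synchronized (electionServiceLock) {
                    System.out.println("leader-only operation");
                }
            }
        });

        // Election-service path: its own lock first, then the contender lock
        // while delivering a leadership-change notification.
        Thread electionThread = new Thread(() -> {
            synchronized (electionServiceLock) {
                sleepQuietly(100);
                synchronized (contenderLock) {
                    System.out.println("notify contender about leadership change");
                }
            }
        });

        contenderThread.start();
        electionThread.start();
        // With this timing, each thread ends up waiting for the lock held by
        // the other one: a classic lock-ordering deadlock.
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}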

> Kubernetes Application JobManager Potential Deadlock and TaskManager Pod 
> Residuals
> --
>
> Key: FLINK-36451
> URL: https://issues.apache.org/jira/browse/FLINK-36451
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.1
> Environment: * Flink version: 1.19.1
>  * - Deployment mode: Flink Kubernetes Application Mode
>  * - JVM version: OpenJDK 17
>  
>Reporter: xiechenling
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Attachments: 1.png, 2.png, jobmanager.log, jstack.txt
>
>
> In Kubernetes Application Mode, when there is significant etcd latency or 
> instability, the Flink JobManager may enter a deadlock situation. 
> Additionally, TaskManager pods are not cleaned up properly, resulting in 
> stale resources that prevent the Flink job from recovering correctly. This 
> issue occurs during frequent service restarts or network instability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36809) Add ifNotExists param to createTable

2024-11-27 Thread Gustavo de Morais (Jira)
Gustavo de Morais created FLINK-36809:
-

 Summary: Add ifNotExists param to createTable
 Key: FLINK-36809
 URL: https://issues.apache.org/jira/browse/FLINK-36809
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Gustavo de Morais


Table API's createTable currently doesn't support ignoreIfExists, which 
interfaces for other resources already support.

We want to support the ignoreIfExists param for createTable. It should work the 
same way it does for other resources, like createFunction.

 

Similar to SQL's {{{}_IF NOT EXISTS_ clause{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36808) UNION ALL after lookup join produces unexpected results

2024-11-27 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-36808:
-

 Summary: UNION ALL after lookup join produces unexpected results
 Key: FLINK-36808
 URL: https://issues.apache.org/jira/browse/FLINK-36808
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.1, 1.20.0
Reporter: Qingsheng Ren


Here is the SQL to reproduce the issue:
{code:java}
-- Data of table `stream`:
-- (1, Alice)
-- (2, Bob)
CREATE TEMPORARY TABLE `stream` (
`id` BIGINT,
`name` STRING,
`txn_time` as proctime(),
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'stream',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Data of table `dim`:
-- (1, OK)
-- (2, OK)
CREATE TEMPORARY TABLE `dim` (
`id` BIGINT,
`status` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'dim',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- Lookup join two tables twice with different filter, and union them together
SELECT
 s.id,
 s.name,
 s.txn_time,
 d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
`d`
ON
 `s`.`id` = `d`.`id`
WHERE
 `d`.`status` = 'OK' 
UNION ALL
SELECT
 s.id,
 s.name,
 s.txn_time,
 d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS 
`d`
ON
 `s`.`id` = `d`.`id`
WHERE
 `d`.`status` = 'NOT_EXISTS';{code}
The first lookup join should output:
{code:java}
(1, Alice 2024-11-27 11:52:19.332, OK)
(2, Bob 2024-11-27 11:52:19.332, OK) {code}
The second lookup join should output nothing, as there's no status 
'NOT_EXISTS'.

But the result after union is:
{code:java}
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
There shouldn't be any 'NOT_EXISTS's. 

The SQL plan shows that the constant conditions 'OK' and 'NOT_EXISTS' are 
appended directly by the Calc after the lookup join operation, which is not as 
expected.
{code:java}
| == Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
:  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
:     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 2}])
:        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
:        :  +- LogicalTableScan(table=[[default_catalog, default_database, 
stream]])
:        +- LogicalFilter(condition=[=($cor0.id, $0)])
:           +- LogicalSnapshot(period=[$cor0.txn_time])
:              +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])
+- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
   +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
      +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0, 2}])
         :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, 
stream]])
         +- LogicalFilter(condition=[=($cor1.id, $0)])
            +- LogicalSnapshot(period=[$cor1.txn_time])
               +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])== Optimized Physical Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 
status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
   :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
   :        +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])
   +- Calc(select=[id, name, txn_time, 
CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
      +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
         +- Calc(select=[id, name, PROCTIME() AS txn_time])
            +- TableSourceScan(table=[[default_catalog, default_database, 
stream]], fields=[id, name])== Optimized Execution Plan ==
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
   :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS 
status])
   :  +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_t

Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860883342


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Introducing the new Prometheus connector"
+date: "2024-11-26T00:00:00.000Z"
+authors:
+- nicusX:
+  name: "Lorenzo Nicora"
+---
+
+
+We are excited to announce a new sink connector that enables writing data to 
Prometheus 
([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)).
 This article introduces the main features of the connector and the reasoning 
behind its design decisions.
+
+This connector allows writing data to Prometheus using the 
[Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push 
interface, which lets you write time-series data to Prometheus at scale.
+
+## Motivations for a Prometheus connector
+
+Prometheus is an efficient time-series database optimized for building 
real-time dashboards and alerts, typically in combination with Grafana or other 
visualization tools.
+
+Prometheus is commonly used to monitor compute resources, IT infrastructure, 
Kubernetes clusters, applications, and cloud resources. It can also be used to 
observe your Flink cluster and Flink jobs. Flink's existing [Metric 
Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/)
 serve this purpose. 
+
+So, why do we need a connector?
+
+Prometheus can serve as a general-purpose observability time-series database, 
beyond traditional infrastructure monitoring. For example, it can be used to 
monitor IoT devices, sensors, connected cars, media streaming devices, and any 
resource that streams events or measurements continuously.
+
+Observability data from these use cases differs from metrics generated by 
compute resources. They present additional challenges:
+* **Out-of-order events**: Devices may be connected via mobile networks or 
even Bluetooth. Events from different devices may follow different paths and 
arrive at very different times. A **stateful, event-time logic** can be used to 
reorder them.
+* **High frequency** and **high cardinality**: You can have a huge number of 
devices, each emitting signals multiple times per second. **Aggregating over 
time** and **over dimensions** can reduce frequency and cardinality and make 
the volume of data more efficiently analysable.
+* **Lack of contextual information**: Raw events sent by the devices often 
lack contextual information for a meaningful analysis. **Enrichment** of raw 
events, looking up some reference dataset, can be used to add dimensions useful 
for the analysis.
+* **Noise**: sensor measurements may contain noise, for example when a GPS 
tracker loses connection and reports spurious positions. These obvious outliers 
can be **filtered** out to simplify visualization and analysis.
+
+Flink can be used as a pre-processor to address all the above. 
+
+You can implement a sink from scratch or use AsyncIO to call the Prometheus 
Remote-Write endpoint. However, there are non-trivial details to implementing an 
efficient Remote-Write client:
+* There is no high-level client for Prometheus Remote-Write. You would need to 
build on top of a low-level HTTP client.
+* Remote-Write can be inefficient unless write requests are batched and 
parallelized.
+* Error handling can be complex, and specifications demand strict behaviors 
(see [Strict Specifications, Lenient 
Implementations](#strict-specifications-lenient-implementations)).
+
+The new Prometheus connector manages all of this for you.
+
+## Key features
+
+The version `1.0.0` of the Prometheus connector has the following features:
+
+* DataStream API, Java Sink, based on AsyncSinkBase.
+* Configurable write batching.
+* Order is retained on retries.
+* At-most-once guarantees (we’ll explain the reason for this later).
+* Supports parallelism greater than 1.
+* Configurable retry strategy for retryable server responses (e.g., 
throttling) and configurable behavior when the retry limit is reached.
+* Configurable behavior on retryable errors (e.g., throttling from the 
Prometheus backend).
+
+### Authentication
+
+Authentication is [explicitly out of 
scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for 
Prometheus Remote-Write specifications.
+The connector provides a generic interface, PrometheusRequestSigner, to 
manipulate requests and add headers. This allows implementation of any 
authentication scheme that requires adding headers to the request, such as API 
keys, authorization, or signature tokens.
+
+In the release `1.0.0`, an implementation for Amazon Managed Service for 
Prometheus (AMP) is provided as a separate, optional dependency.
+
+## Designing the connector
+
+A sink to Prometheus differs from a sink to most other datastores or 
time-series databases. The wire interface is the easiest part; the main 
challenges arise from the diff

Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on code in PR #766:
URL: https://github.com/apache/flink-web/pull/766#discussion_r1860886508


##
docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Introducing the new Prometheus connector"
+date: "2024-11-26T00:00:00.000Z"
+authors:
+- nicusX:
+  name: "Lorenzo Nicora"
+---
+
+
+We are excited to announce a new sink connector that enables writing data to 
Prometheus 
([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)).
 This article introduces the main features of the connector and the reasoning 
behind its design decisions.
+
+This connector allows writing data to Prometheus using the 
[Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push 
interface, which lets you write time-series data to Prometheus at scale.
+
+## Motivations for a Prometheus connector
+
+Prometheus is an efficient time-series database optimized for building 
real-time dashboards and alerts, typically in combination with Grafana or other 
visualization tools.
+
+Prometheus is commonly used to monitor compute resources, IT infrastructure, 
Kubernetes clusters, applications, and cloud resources. It can also be used to 
observe your Flink cluster and Flink jobs. Flink's existing [Metric 
Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/)
 serve this purpose. 
+
+So, why do we need a connector?
+
+Prometheus can serve as a general-purpose observability time-series database, 
beyond traditional infrastructure monitoring. For example, it can be used to 
monitor IoT devices, sensors, connected cars, media streaming devices, and any 
resource that streams events or measurements continuously.
+
+Observability data from these use cases differs from metrics generated by 
compute resources. They present additional challenges:
+* **Out-of-order events**: Devices may be connected via mobile networks or 
even Bluetooth. Events from different devices may follow different paths and 
arrive at very different times. A **stateful, event-time logic** can be used to 
reorder them.
+* **High frequency** and **high cardinality**: You can have a huge number of 
devices, each emitting signals multiple times per second. **Aggregating over 
time** and **over dimensions** can reduce frequency and cardinality and make 
the volume of data more efficiently analysable.
+* **Lack of contextual information**: Raw events sent by the devices often 
lack contextual information for a meaningful analysis. **Enrichment** of raw 
events, looking up some reference dataset, can be used to add dimensions useful 
for the analysis.
+* **Noise**: sensor measurements may contain noise, for example when a GPS 
tracker loses connection and reports spurious positions. These obvious outliers 
can be **filtered** out to simplify visualization and analysis.
+
+Flink can be used as a pre-processor to address all the above. 
+
+You can implement a sink from scratch or use AsyncIO to call the Prometheus 
Remote-Write endpoint. However, there are non-trivial details to implementing an 
efficient Remote-Write client:
+* There is no high-level client for Prometheus Remote-Write. You would need to 
build on top of a low-level HTTP client.
+* Remote-Write can be inefficient unless write requests are batched and 
parallelized.
+* Error handling can be complex, and specifications demand strict behaviors 
(see [Strict Specifications, Lenient 
Implementations](#strict-specifications-lenient-implementations)).
+
+The new Prometheus connector manages all of this for you.
+
+## Key features
+
+The version `1.0.0` of the Prometheus connector has the following features:
+
+* DataStream API, Java Sink, based on AsyncSinkBase.
+* Configurable write batching.
+* Order is retained on retries.
+* At-most-once guarantees (we’ll explain the reason for this later).
+* Supports parallelism greater than 1.
+* Configurable retry strategy for retryable server responses (e.g., 
throttling) and configurable behavior when the retry limit is reached.
+* Configurable behavior on retryable errors (e.g., throttling from the 
Prometheus backend).
+
+### Authentication
+
+Authentication is [explicitly out of 
scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for 
Prometheus Remote-Write specifications.
+The connector provides a generic interface, PrometheusRequestSigner, to 
manipulate requests and add headers. This allows implementation of any 
authentication scheme that requires adding headers to the request, such as API 
keys, authorization, or signature tokens.
+
+In the release `1.0.0`, an implementation for Amazon Managed Service for 
Prometheus (AMP) is provided as a separate, optional dependency.
+
+## Designing the connector
+
+A sink to Prometheus differs from a sink to most other datastores or 
time-series databases. The wire interface is the easiest part; the main 
challenges arise from the diff

[jira] [Commented] (FLINK-36810) Reflective access warning not addressed causing Flink failing to process job creation

2024-11-27 Thread Thomas Cooper (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901522#comment-17901522
 ] 

Thomas Cooper commented on FLINK-36810:
---

Are you setting the Java options needed for JVM 17 support? They are 
[listed|https://github.com/apache/flink/blob/762bb5ab2a6a5ed48f3d60501b9e1c0d362ad8cb/flink-dist/src/main/resources/config.yaml#L19]
 in the default config.yaml file.

> Reflective access warning not addressed causing Flink failing to process job 
> creation
> -
>
> Key: FLINK-36810
> URL: https://issues.apache.org/jira/browse/FLINK-36810
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.20.0
>Reporter: Yusu Gao
>Priority: Major
>
> While testing the migration of our Flink cluster from 1.11 on a Java 11 runtime 
> to 1.20 on a Java 17 runtime, we noticed a job creation error.
> {code:java}
> "errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could 
> not execute application.
>     at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
>     at 
> java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
>     at 
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
>     at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>     at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.flink.datastream.impl.ExecutionContextEnvironment
>     at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>     at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>     at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>     ... 1 more
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.flink.datastream.impl.ExecutionContextEnvironment
>     at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:109)
>     at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>     at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>     at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
>     at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>     ... 1 more
> Caused by: java.lang.ExceptionInInitializerError: Exception 
> java.lang.ExceptionInInitializerError [in thread 
> \"flink-jar-runner-thread-1\"]
>     ... 6 more {code}
> Reverting the runtime back to Java 11 with {*}--illegal-access=warn{*}, we found 
> a warning for the Flink code:
> {code:java}
> WARNING: Illegal reflective access by 
> org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator
>  (file:/path/to/flink/flink-1.20.0/lib/flink-dist-1.20.0.jar) to field 
> java.util.Collections$UnmodifiableMap.m {code}
> This points to 
> [DataStreamV2SinkTransformationTranslator|https://github.com/apache/flink/blob/release-1.20.0/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java#L98],
>  which is used in exactly the 
> [class|https://github.com/apache/flink/blob/master/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java#L95] 
>  throwing the exception.
> Adding the following argument to the JVM_ARGS env var fixed the issue:
> {code:java}
> export JVM_ARGS="$JVM_ARGS --add-opens=java.base/java.util=ALL-UNNAMED"{code}
> This is a workaround, but not necessarily the best or safest solution. 
> Please help provide insights or a better solution for this issue. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-27 Thread via GitHub


gustavodemorais opened a new pull request, #25704:
URL: https://github.com/apache/flink/pull/25704

   
   
   ## What is the purpose of the change
   
Support ignoreIfExists for createTable. It works exactly the same way as for other 
resources, like createFunction, and is similar to SQL's `IF NOT EXISTS` clause.
   
   
   ## Brief change log
   
 - Added one more signature for TableEnvironment's createTable supporting 
ignoreIfExists
 - Added testCreateTableIfNotExistsFromDescriptor
   
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
 - Added a test that validates that running createTable with ignoreIfExists 
set to true for an existing table is a no-op. It also checks that doing the 
same with ignoreIfExists set to false throws an exception.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - Dependencies (does it add or upgrade a dependency): no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on PR #766:
URL: https://github.com/apache/flink-web/pull/766#issuecomment-2504225605

   @hlteoh37 @dannycranmer I addressed all your comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36451][runtime] Replaces LeaderElection#hasLeadership with LeaderElection#runAsLeader [flink]

2024-11-27 Thread via GitHub


XComp commented on code in PR #25679:
URL: https://github.com/apache/flink/pull/25679#discussion_r1860904735


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##
@@ -311,8 +312,11 @@ void testMultipleDriverCreations() throws Exception {
 @Test
 void testGrantCallWhileInstantiatingDriver() throws Exception {
 final UUID expectedLeaderSessionID = UUID.randomUUID();
+// leadership acquired right from the start
 final TestingLeaderElectionDriver.Builder driverBuilder =
-TestingLeaderElectionDriver.newNoOpBuilder();
+TestingLeaderElectionDriver.newBuilder(new 
AtomicBoolean(true));

Review Comment:
   That was an interesting one because I wondered why we didn't need to set the 
leadership pre-FLINK-36451.
   
   The reason was a bug in the 
[TestingLeaderContender#grantLeadership](https://github.com/apache/flink/pull/25679/files#diff-b0f861e1a8e93374d482f4565c05430b9134f10bf691da391abef6b79b4dd015L53)
 method. The old implementation confirmed the leadership and pushed a 
LeaderInformation event after that call even if the confirmation failed (due to 
leadership loss). The new implementation is cleaner in this regard.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add blogpost for new Prometheus connector [flink-web]

2024-11-27 Thread via GitHub


nicusX commented on PR #766:
URL: https://github.com/apache/flink-web/pull/766#issuecomment-2504229269

   @hlteoh37 @dannycranmer I addressed all your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]

2024-11-27 Thread via GitHub


twalthr commented on code in PR #25704:
URL: https://github.com/apache/flink/pull/25704#discussion_r1860917076


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -745,6 +745,35 @@ void createTemporarySystemFunction(
  */
 void createTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a catalog table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and stored
+ * in the catalog.
+ *
+ * If the table should not be permanently stored in a catalog, use 
{@link
+ * #createTemporaryTable(String, TableDescriptor)} instead.
+ *
+ * Examples:
+ *
+ * {@code
+ * tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen")
+ *   .schema(Schema.newBuilder()
+ * .column("f0", DataTypes.STRING())
+ * .build())
+ *   .option(DataGenOptions.ROWS_PER_SECOND, 10)
+ *   .option("fields.f0.kind", "random")
+ *   .build());
+ * }
+ *
+ * @param path The path under which the table will be registered. See also 
the {@link
+ * TableEnvironment} class description for the format of the path.
+ * @param descriptor Template for creating a {@link CatalogTable} instance.
+ * @param ignoreIfExists If a table exists under the given path and this 
flag is set, no
+ * operation is executed. An exception is thrown otherwise.
+ */
+void createTable(String path, TableDescriptor descriptor, boolean 
ignoreIfExists);

Review Comment:
   can you check whether Python already supports `createTable` with 
TableDescriptors. if yes, we should also add the method there. check 
`flink/flink-python/pyflink/table/table_environment.py`



##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java:
##
@@ -100,6 +101,55 @@ void testCreateTableFromDescriptor() throws Exception {
 .contains(entry("connector", "fake"), entry("a", "Test"));
 }
 
+@Test
+void testCreateTableIfNotExistsFromDescriptor() throws Exception {
+final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+final String catalog = tEnv.getCurrentCatalog();
+final String database = tEnv.getCurrentDatabase();
+
+final Schema schema = Schema.newBuilder().column("f0", 
DataTypes.INT()).build();
+tEnv.createTable(
+"T",
+
TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build(),
+true);
+
+final ObjectPath objectPath = new ObjectPath(database, "T");
+assertThat(
+tEnv.getCatalog(catalog)
+.orElseThrow(AssertionError::new)
+.tableExists(objectPath))
+.isTrue();
+
+final CatalogBaseTable catalogTable =
+
tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath);
+assertThat(catalogTable).isInstanceOf(CatalogTable.class);
+assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema);
+assertThat(catalogTable.getOptions())
+.contains(entry("connector", "fake"), entry("a", "Test"));
+
+assertThatNoException()
+.isThrownBy(
+() ->
+tEnv.createTable(
+"T",
+TableDescriptor.forConnector("fake")
+.schema(schema)
+.option("a", "Test")
+.build(),
+true));
+
+assertThatThrownBy(
+() ->
+tEnv.createTable(
+"T",
+TableDescriptor.forConnector("fake")
+.schema(schema)
+.option("a", "Test")
+.build(),
+false))
+.isInstanceOf(ValidationException.class);

Review Comment:
   ideally, we should also check the error message. at least parts of it. 
`hasMessageContaining` could do the job?
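
   For illustration, the check could look roughly like this (a sketch only; the "already exists" message fragment is an assumption, not taken from the actual error message):

{code:java}
assertThatThrownBy(
                () ->
                        tEnv.createTable(
                                "T",
                                TableDescriptor.forConnector("fake")
                                        .schema(schema)
                                        .option("a", "Test")
                                        .build(),
                                false))
        .isInstanceOf(ValidationException.class)
        // assumed wording; adjust to the actual exception message
        .hasMessageContaining("already exists");
{code}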



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -745,6 +745,35 @@ void createTemporarySystemFunction(
  */
 void createTable(String path, TableDescriptor descriptor);
 
+/**
+ * Registers the given {@link TableDescriptor} as a catalog table.
+ *
+ * The {@link TableDescriptor descriptor} is converted into a {@link 
CatalogTable} and

Re: [PR] [FLINK-35718][mysql] cherry pick DBZ-5333 from debezium MySqlErrorHandler and add logs. [flink-cdc]

2024-11-27 Thread via GitHub


github-actions[bot] commented on PR #3440:
URL: https://github.com/apache/flink-cdc/pull/3440#issuecomment-2505023790

   This pull request has been closed because it has not had recent activity. 
You can reopen it if you want to continue your work, and anyone who is 
interested in it is encouraged to continue work on this pull request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]

2024-11-27 Thread via GitHub


github-actions[bot] closed pull request #3286: 
[FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` 
script for the DB2 test container.
URL: https://github.com/apache/flink-cdc/pull/3286


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36397][cdc-connector][mysql] Obtain the high watermark offset and table data in the same transaction during the Snapshot phase [flink-cdc]

2024-11-27 Thread via GitHub


github-actions[bot] commented on PR #3616:
URL: https://github.com/apache/flink-cdc/pull/3616#issuecomment-2505023628

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]

2024-11-27 Thread via GitHub


github-actions[bot] commented on PR #3286:
URL: https://github.com/apache/flink-cdc/pull/3286#issuecomment-2505023836

   This pull request has been closed because it has not had recent activity. 
You can reopen it if you want to continue your work, and anyone who is 
interested in it is encouraged to continue work on this pull request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] SQL Server CDC support - fixing the issues in the outstanding pull request by another author [flink-cdc]

2024-11-27 Thread via GitHub


github-actions[bot] commented on PR #3507:
URL: https://github.com/apache/flink-cdc/pull/3507#issuecomment-2505023722

   This pull request has been closed because it has not had recent activity. 
You can reopen it if you want to continue your work, and anyone who is 
interested in it is encouraged to continue work on this pull request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35718][mysql] cherry pick DBZ-5333 from debezium MySqlErrorHandler and add logs. [flink-cdc]

2024-11-27 Thread via GitHub


github-actions[bot] closed pull request #3440: [FLINK-35718][mysql] cherry pick 
DBZ-5333 from debezium MySqlErrorHandler and add logs.
URL: https://github.com/apache/flink-cdc/pull/3440


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] SQL Server CDC support - fixing the issues in the outstanding pull request by another author [flink-cdc]

2024-11-27 Thread via GitHub


github-actions[bot] closed pull request #3507: SQL Server CDC support - fixing 
the issues in the outstanding pull request by another author
URL: https://github.com/apache/flink-cdc/pull/3507


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36814) Flink CDC Support Mutil Datasource

2024-11-27 Thread melin (Jira)
melin created FLINK-36814:
-

 Summary: Flink CDC Support Mutil Datasource
 Key: FLINK-36814
 URL: https://issues.apache.org/jira/browse/FLINK-36814
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: melin


Flink CDC should support multiple data sources.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36815) series bugs of DSv2

2024-11-27 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-36815:
--

Assignee: xuhuang

> series bugs of DSv2
> ---
>
> Key: FLINK-36815
> URL: https://issues.apache.org/jira/browse/FLINK-36815
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 2.0.0
>Reporter: xuhuang
>Assignee: xuhuang
>Priority: Major
>  Labels: pull-request-available
>
> I found four bugs in DSv2.
> 1. The DSv2 _org.apache.flink.datastream.api.ExecutionEnvironment#fromSource_ 
> method return a 
> {_}NonKeyedPartitionStream{_}, which does not allow users to set source 
> configurations, such as operator name or parallelism.
> 2. The _org.apache.flink.datastream.api.context.TaskInfo_ lack 
> {_}sub{_}_taskIndex_ and {_}attemptNumber{_}.
> 3. The _open_ and _close_ methods of 
> _org.apache.flink.datastream.api.function.ProcessFunction_ are not being 
> called.
> 4. The _open_ method of 
> _org.apache.flink.datastream.api.function.ProcessFunction_ lacks relevant 
> information, such as the current TaskInfo.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36815) series bugs of DSv2

2024-11-27 Thread xuhuang (Jira)
xuhuang created FLINK-36815:
---

 Summary: series bugs of DSv2
 Key: FLINK-36815
 URL: https://issues.apache.org/jira/browse/FLINK-36815
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 2.0.0
Reporter: xuhuang


I found four bugs in DSv2.

1. The DSv2 _org.apache.flink.datastream.api.ExecutionEnvironment#fromSource_ 
method returns a 
{_}NonKeyedPartitionStream{_}, which does not allow users to set source 
configurations, such as operator name or parallelism.
2. The _org.apache.flink.datastream.api.context.TaskInfo_ lacks 
{_}subtaskIndex{_} and {_}attemptNumber{_}.

3. The _open_ and _close_ methods of 
_org.apache.flink.datastream.api.function.ProcessFunction_ are not being called.

4. The _open_ method of 
_org.apache.flink.datastream.api.function.ProcessFunction_ lacks relevant 
information, such as the current TaskInfo.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36815][API] Fix fix series bugs of DSv2 [flink]

2024-11-27 Thread via GitHub


codenohup opened a new pull request, #25706:
URL: https://github.com/apache/flink/pull/25706

   ## What is the purpose of the change
   
   Fix a series of bugs in DSv2.
   
   1. The DSv2 org.apache.flink.datastream.api.ExecutionEnvironment#fromSource 
method returns a 
   NonKeyedPartitionStream, which does not allow users to set source 
configurations, such as operator name or parallelism.
   
   2. The org.apache.flink.datastream.api.context.TaskInfo lacks subtaskIndex 
and attemptNumber.
   
   3. The open and close methods of 
org.apache.flink.datastream.api.function.ProcessFunction are not being called.
   
   4. The open method of 
org.apache.flink.datastream.api.function.ProcessFunction lacks relevant 
information, such as the current TaskInfo.
   
   ## Brief change log
   - Make ExecutionEnvironment#fromSource return 
ProcessConfigurableAndNonKeyedPartitionStream
   - Introduce subtask index and attempt id in TaskInfo for DSv2
   - Open and close user function in DSv2 ProcessOperator
   - Introduce a NonPartitionedContext parameter in Open method of UserFunction 
for DSv2
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36815) series bugs of DSv2

2024-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36815:
---
Labels: pull-request-available  (was: )

> series bugs of DSv2
> ---
>
> Key: FLINK-36815
> URL: https://issues.apache.org/jira/browse/FLINK-36815
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 2.0.0
>Reporter: xuhuang
>Priority: Major
>  Labels: pull-request-available
>
> I found four bugs in DSv2.
> 1. The DSv2 _org.apache.flink.datastream.api.ExecutionEnvironment#fromSource_ 
> method return a 
> {_}NonKeyedPartitionStream{_}, which does not allow users to set source 
> configurations, such as operator name or parallelism.
> 2. The _org.apache.flink.datastream.api.context.TaskInfo_ lack 
> {_}sub{_}_taskIndex_ and {_}attemptNumber{_}.
> 3. The _open_ and _close_ methods of 
> _org.apache.flink.datastream.api.function.ProcessFunction_ are not being 
> called.
> 4. The _open_ method of 
> _org.apache.flink.datastream.api.function.ProcessFunction_ lacks relevant 
> information, such as the current TaskInfo.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36815][API] Fix fix series bugs of DSv2 [flink]

2024-11-27 Thread via GitHub


flinkbot commented on PR #25706:
URL: https://github.com/apache/flink/pull/25706#issuecomment-2505464742

   
   ## CI report:
   
   * f24821faddf52ab597bb635334aee0e252fbba65 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36788] Fix and cover global committer [flink]

2024-11-27 Thread via GitHub


AHeise commented on code in PR #25685:
URL: https://github.com/apache/flink/pull/25685#discussion_r1860247749


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java:
##
@@ -39,12 +41,29 @@ private StandardSinkTopologies() {}
  */
 public static  void addGlobalCommitter(
 DataStream> committables,
-SerializableSupplier> committerFactory,
+SerializableFunction> 
committerFactory,
 SerializableSupplier> 
committableSerializer) {
 committables
 .getExecutionEnvironment()
 .addOperator(
 new GlobalCommitterTransform<>(
 committables, committerFactory, 
committableSerializer));
 }
+
+/**
+ * Adds a global committer to the pipeline that runs as final operator 
with a parallelism of
+ * one.
+ */
+public static  void addGlobalCommitter(

Review Comment:
   It's fine to keep if the global committer doesn't need access to the ctx. 
It's relatively little code and could be considered sugar.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]

2024-11-27 Thread via GitHub


yuxiqian commented on code in PR #3658:
URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1860226446


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql:
##


Review Comment:
   Accidental change?



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java:
##
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.shyiko.mysql.binlog.event.deserialization.json;
+
+import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Base64;
+
+import static 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.useLegacyJsonFormat;
+
+/**
+ * Copied from mysql-binlog-connector-java 0.27.2 to add whitespace before 
value and after comma.
+ *
+ * Line 105: Added whitespace before value, Line 207: Added whitespace 
after comma

Review Comment:
   Seems the line numbers differ from the actual changes.



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java:
##


Review Comment:
   Since DataStream / Table source connectors' behavior also changed, 
corresponding integrated test cases might be necessary.



##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java:
##
@@ -457,6 +495,61 @@ private void testCommonDataTypes(UniqueDatabase database) 
throws Exception {
 .isEqualTo(expectedStreamRecord);
 }
 
+private void testJsonDataType(UniqueDatabase database, Boolean 
useLegacyJsonFormat)
+throws Exception {
+database.createAndInitialize();
+CloseableIterator iterator =
+env.fromSource(
+getFlinkSourceProvider(
+new String[] {"json_types"},
+database,
+useLegacyJsonFormat)
+.getSource(),
+WatermarkStrategy.noWatermarks(),
+"Event-Source")
+.executeAndCollect();
+
+Object[] expectedSnapshot =
+new Object[] {
+DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+BinaryStringData.fromString("{\"key1\": \"value1\"}"),
+BinaryStringData.fromString("{\"key1\": \"value1\", 
\"key2\": \"value2\"}"),
+BinaryStringData.fromString(
+"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": 
\"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": 
[\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
+1
+};
+
+// skip CreateTableEvent
+List snapshotResults =
+MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
+RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(0)).after();
+Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, 
JSON_TYPES))
+.isEqualTo(expectedSnapshot);
+
+try (Connection connection = database.getJdbcConnection();
+Statement statement = connection.createStatement()) {
+statement.execute("UPDATE json_types SET int_c = null WHERE id = 
1;");
+}
+
+Object[] expectedStreamRecord = expectedSnapshot;
+
+if (useLegacyJsonFormat) {
+expectedSnapshot[1] = 
BinaryStringData.fromString("{\"key1\":\"value1\"}");
+expectedSnapshot[2] =
+
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}");
+expectedSnapshot[3] =
+BinaryStringData.fro

Re: [PR] [FLINK-36788] Fix and cover global committer [flink]

2024-11-27 Thread via GitHub


AHeise commented on code in PR #25685:
URL: https://github.com/apache/flink/pull/25685#discussion_r1860251139


##
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java:
##
@@ -90,7 +92,18 @@ private Collection translateInternal(
 transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
 transformation.setParallelism(1);
 transformation.setMaxParallelism(1);
-return Collections.emptyList();
+copySafely(transformation::setName, globalCommitterTransform::getName);

Review Comment:
   We set the attributes while expanding the post commit topology. We expand 
this transform almost at the end of the sink expansion. So it's fair to say 
that we have it in all cases where we set the properties during post commit 
expansion. It's not set if no uid or customUidHashes are user-supplied. I think 
name should always be set but I still use the copySafely method because you 
never know how things will evolve.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36788] Fix and cover global committer [flink]

2024-11-27 Thread via GitHub


AHeise commented on code in PR #25685:
URL: https://github.com/apache/flink/pull/25685#discussion_r1860253151


##
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java:
##
@@ -188,6 +196,14 @@ private void expand() {
 }
 }
 
+private  void repeatUntilConverged(Supplier producer) {
+R lastResult = producer.get();
+R nextResult;
+while (!lastResult.equals(nextResult = producer.get())) {
+lastResult = nextResult;
+}
+}
+
 private List> getSinkTransformations(int sizeBefore) 
{

Review Comment:
   Yes, that's why I now create a copy of the sublist (should be List.copyOf in 
newer JDK versions). If new transforms are added, we have a separate iteration 
with a separate copy that expands them.
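
   A minimal sketch of that defensive-copy idea (names are placeholders, not the PR's actual code):

{code:java}
import java.util.ArrayList;
import java.util.List;

class SublistCopySketch {

    /** Returns a snapshot of the elements that were added after {@code sizeBefore}. */
    static <T> List<T> newlyAdded(List<T> all, int sizeBefore) {
        // Copying the sublist means elements appended to `all` afterwards do
        // not show up here, and iterating the copy cannot fail with a
        // ConcurrentModificationException.
        return new ArrayList<>(all.subList(sizeBefore, all.size()));
        // On JDK 10+: return List.copyOf(all.subList(sizeBefore, all.size()));
    }
}
{code}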



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]

2024-11-27 Thread via GitHub


zentol commented on PR #25670:
URL: https://github.com/apache/flink/pull/25670#issuecomment-2503299977

   > why are the CI tests failing?
   
   Because the image was modified incorrectly.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] DRAFT [FLINK-xxxx][flink-table] Support create table ignoreIfExists for table-api [flink]

2024-11-27 Thread via GitHub


Guttz closed pull request #25701: DRAFT [FLINK-][flink-table] Support 
create table ignoreIfExists for table-api
URL: https://github.com/apache/flink/pull/25701


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33571][table] Override json-path version for Calcite 1.32 to deal with CVE [flink]

2024-11-27 Thread via GitHub


tomncooper commented on PR #25602:
URL: https://github.com/apache/flink/pull/25602#issuecomment-2503426742

   @snuyanzin Good point, I amended the title and commit message to point at 
[FLINK-33571](https://issues.apache.org/jira/browse/FLINK-33571).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]

2024-11-27 Thread via GitHub


pnowojski merged PR #25671:
URL: https://github.com/apache/flink/pull/25671


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Do skip distributing maxAllowedWatermark if there are no sub… [flink]

2024-11-27 Thread via GitHub


pnowojski merged PR #25693:
URL: https://github.com/apache/flink/pull/25693


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-36764) Add checkpoint type to checkpoint trace

2024-11-27 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-36764.
--
Resolution: Fixed

merged commit 09ad451 into apache:master

> Add checkpoint type to checkpoint trace
> ---
>
> Key: FLINK-36764
> URL: https://issues.apache.org/jira/browse/FLINK-36764
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Currently it's impossible to distinguish checkpoints from savepoints. Also it 
> would be handy to distinguish aligned and unaligned checkpoints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Handle map and array types binary record data [flink-cdc]

2024-11-27 Thread via GitHub


lvyanquan commented on PR #3434:
URL: https://github.com/apache/flink-cdc/pull/3434#issuecomment-2503216515

   Hi @umeshdangat, gentle ping.
   I think this PR is an important base for many other upcoming features, 
and we should try to merge it as soon as possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Update blogpost for new KinesisStreamsSource and DynamoDbStreamsSource [flink-web]

2024-11-27 Thread via GitHub


foxus opened a new pull request, #767:
URL: https://github.com/apache/flink-web/pull/767

   I've addressed some minor grammar nits in the KinesisStreamsSource and 
DynamoDbStreamsSource blog.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] (1.19 backport)[FLINK-36751] Fix PausableRelativeClock does not pause when the source only has one split [flink]

2024-11-27 Thread via GitHub


rkhachatryan commented on PR #25695:
URL: https://github.com/apache/flink/pull/25695#issuecomment-2503274062

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36788] Fix and cover global committer [flink]

2024-11-27 Thread via GitHub


fapaul commented on code in PR #25685:
URL: https://github.com/apache/flink/pull/25685#discussion_r1860223898


##
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java:
##
@@ -90,7 +92,18 @@ private Collection translateInternal(
 transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
 transformation.setParallelism(1);
 transformation.setMaxParallelism(1);
-return Collections.emptyList();
+copySafely(transformation::setName, globalCommitterTransform::getName);

Review Comment:
   In which scenarios does the `globalCommitterTransform` have its own 
attributes?



##
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java:
##
@@ -39,12 +41,29 @@ private StandardSinkTopologies() {}
  */
 public static  void addGlobalCommitter(
 DataStream> committables,
-SerializableSupplier> committerFactory,
+SerializableFunction> 
committerFactory,
 SerializableSupplier> 
committableSerializer) {
 committables
 .getExecutionEnvironment()
 .addOperator(
 new GlobalCommitterTransform<>(
 committables, committerFactory, 
committableSerializer));
 }
+
+/**
+ * Adds a global committer to the pipeline that runs as final operator 
with a parallelism of
+ * one.
+ */
+public static  void addGlobalCommitter(

Review Comment:
   Nit: Should we mark this method as deprecated to signal it will eventually 
be removed? 



##
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java:
##
@@ -188,6 +196,14 @@ private void expand() {
 }
 }
 
+    private <R> void repeatUntilConverged(Supplier<R> producer) {
+        R lastResult = producer.get();
+        R nextResult;
+        while (!lastResult.equals(nextResult = producer.get())) {
+            lastResult = nextResult;
+        }
+    }
+
 private List> getSinkTransformations(int sizeBefore) 
{

Review Comment:
   Didn't we discuss offline that the current behavior of sublisting the 
transformations isn't safe?
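
For context on the hunk above, repeatUntilConverged is a plain fixed-point
loop: it re-runs the producer until two consecutive results are equal. A
self-contained sketch of the same pattern (hypothetical example, not the PR's
code, which returns void):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Supplier;

    // Hypothetical illustration of the fixed-point pattern: keep re-invoking
    // the supplier until its result stops changing, then return that result.
    public class FixedPointSketch {

        static <R> R repeatUntilConverged(Supplier<R> producer) {
            R lastResult = producer.get();
            R nextResult;
            while (!lastResult.equals(nextResult = producer.get())) {
                lastResult = nextResult;
            }
            return lastResult;
        }

        public static void main(String[] args) {
            AtomicInteger counter = new AtomicInteger();
            // The supplier's result grows until it saturates at 5, i.e. converges.
            Integer fixedPoint = repeatUntilConverged(() -> Math.min(counter.incrementAndGet(), 5));
            System.out.println(fixedPoint); // prints 5
        }
    }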



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java:
##
@@ -241,7 +242,8 @@ public TestSinkV2WithPostCommitTopology(
 
 @Override
 public void 
addPostCommitTopology(DataStream> committables) {
-// We do not need to do anything for tests
+StandardSinkTopologies.addGlobalCommitter(

Review Comment:
   Nit: Should we open a follow-up to increase the coverage even further by 
also covering the expansions of pre/post-commit topology?



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java:
##
@@ -18,77 +18,285 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
 import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.function.Predicate;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Tests for {@link 
org.apache.flink.streaming.api.transformations.SinkTransformation}.
  *
  * ATTENTION: This test is extremely brittle. Do NOT remove, add or 
re-order test cases.
  */
-@ExtendWith(ParameterizedTestExtension

[jira] [Commented] (FLINK-36780) Kafka source disable partition discovery unexpectedly

2024-11-27 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901414#comment-17901414
 ] 

Hongshun Wang commented on FLINK-36780:
---

[~liuml07], I agree with you. This modification should be called out to users 
in the release notes.

> Kafka source disable partition discovery unexpectedly
> -
>
> Key: FLINK-36780
> URL: https://issues.apache.org/jira/browse/FLINK-36780
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-4.0.0, kafka-3.2.0, kafka-3.3.0, kafka-3.4.0
>Reporter: Mingliang Liu
>Assignee: Mingliang Liu
>Priority: Major
>  Labels: pull-request-available
>
> Currently Kafka source enables partition discovery. This is set by 
> {{partition.discovery.interval.ms}}, aka 
> {{KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS}}. The default value is 
> 5 minutes, which is equal to the default value of {{metadata.max.age.ms}} in 
> Kafka.
> However, it's disabled by default unexpectedly in the source builder 
> ([code|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L476-L480]).
> The intention, I believe, was to disable it only for bounded sources.
> We need a fix that keeps partition discovery enabled by default. If new 
> partitions are silently left unconsumed, this could cause data loss once the 
> Kafka retention period expires.
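
Until a fix lands, a minimal sketch of pinning the interval explicitly on the
builder so discovery stays on (assumes the usual KafkaSourceBuilder API;
untested):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    // Sketch of a workaround: set the discovery interval explicitly so the
    // builder does not fall back to the unexpected "disabled" default.
    public class PartitionDiscoveryWorkaround {
        public static KafkaSource<String> build(String brokers, String topic) {
            return KafkaSource.<String>builder()
                    .setBootstrapServers(brokers)
                    .setTopics(topic)
                    .setGroupId("example-group")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // 5 minutes, matching Kafka's default metadata.max.age.ms.
                    .setProperty("partition.discovery.interval.ms", "300000")
                    .build();
        }
    }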



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [Minor][Doc] fix spelling error. [flink-cdc]

2024-11-27 Thread via GitHub


lvyanquan opened a new pull request, #3764:
URL: https://github.com/apache/flink-cdc/pull/3764

   Sorry for this mistake.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [docs] Update download links to up-to-date cdc version [flink-cdc]

2024-11-27 Thread via GitHub


yuxiqian opened a new pull request, #3765:
URL: https://github.com/apache/flink-cdc/pull/3765

   Use a parameterized version string instead of writing hard-coded links.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [docs] Update download links to up-to-date cdc version [flink-cdc]

2024-11-27 Thread via GitHub


yuxiqian opened a new pull request, #3766:
URL: https://github.com/apache/flink-cdc/pull/3766

   Use a parameterized version string instead of writing hard-coded links.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.2][docs] Update download links to up-to-date cdc version [flink-cdc]

2024-11-27 Thread via GitHub


yuxiqian closed pull request #3765: [BP-3.2][docs] Update download links to 
up-to-date cdc version
URL: https://github.com/apache/flink-cdc/pull/3765


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36806) Support for synchronization between timestamp fields with and without time zones to avoid time offsets

2024-11-27 Thread zjjiang (Jira)
zjjiang created FLINK-36806:
---

 Summary: Support for synchronization between timestamp fields with 
and without time zones to avoid time offsets
 Key: FLINK-36806
 URL: https://issues.apache.org/jira/browse/FLINK-36806
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.1, cdc-3.1.0, cdc-3.3.0
Reporter: zjjiang
 Attachments: image-2024-11-27-16-23-22-995.png

Currently, Flink CDC correctly synchronizes between fields of the same time 
type (where the type indicates whether a time zone is included), but time 
offsets can occur when synchronizing between fields of different time types. 
For example, when synchronizing a MySQL DATETIME field (without time zone) to 
an Iceberg TIMESTAMPTZ field (with time zone), the result is off by 8 hours 
when the MySQL value represents Beijing time.

Synchronizing between fields of different time types is very common in 
practice, and such time offsets are very troublesome for the business. 
Supporting conversion between different time types in Flink CDC would avoid 
these offsets and improve the consistency and accuracy of the synchronized data.

 

!image-2024-11-27-16-23-22-995.png!
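
A minimal java.time sketch of the conversion that avoids the offset: interpret
the zone-less DATETIME in the source's session time zone before producing a
zoned value (the Asia/Shanghai zone below is an example assumption):

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    // Sketch: a MySQL DATETIME carries no zone, so it must be interpreted in
    // the source's session/local time zone before it becomes a TIMESTAMPTZ value.
    public class NaiveToZonedSketch {
        public static Instant toInstant(LocalDateTime mysqlDatetime, ZoneId sourceZone) {
            return mysqlDatetime.atZone(sourceZone).toInstant();
        }

        public static void main(String[] args) {
            LocalDateTime beijingLocal = LocalDateTime.parse("2024-11-27T16:23:22");
            // Interpreting in Asia/Shanghai avoids the 8-hour shift that appears
            // when the value is wrongly treated as UTC.
            System.out.println(toInstant(beijingLocal, ZoneId.of("Asia/Shanghai"))); // 2024-11-27T08:23:22Z
            System.out.println(toInstant(beijingLocal, ZoneId.of("UTC")));           // 2024-11-27T16:23:22Z (the offset bug)
        }
    }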



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36806) Support for synchronization between timestamp fields with and without time zones to avoid time offsets

2024-11-27 Thread zjjiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjjiang updated FLINK-36806:

Description: 
Currently, Flink CDC correctly synchronizes between fields of the same time 
type (where the type indicates whether a time zone is included), but time 
offsets can occur when synchronizing between fields of different time types. 
For example, when synchronizing a MySQL DATETIME field (without time zone) to 
an Iceberg TIMESTAMPTZ field (with time zone), the result is off by 8 hours 
when the MySQL value represents Beijing time.

Synchronizing between fields of different time types is very common in 
practice, and such time offsets are very troublesome for the business. 
Supporting conversion between different time types in Flink CDC would avoid 
these offsets and improve the consistency and accuracy of the synchronized data.

 

!image-2024-11-27-16-23-22-995.png|width=1004,height=375!

  was:
Currently, Flink CDC correctly synchronizes between fields of the same time 
type (where the type indicates whether a time zone is included), but time 
offsets can occur when synchronizing between fields of different time types. 
For example, when synchronizing a MySQL DATETIME field (without time zone) to 
an Iceberg TIMESTAMPTZ field (with time zone), the result is off by 8 hours 
when the MySQL value represents Beijing time.

Synchronizing between fields of different time types is very common in 
practice, and such time offsets are very troublesome for the business. 
Supporting conversion between different time types in Flink CDC would avoid 
these offsets and improve the consistency and accuracy of the synchronized data.

 

!image-2024-11-27-16-23-22-995.png!


> Support for synchronization between timestamp fields with and without time 
> zones to avoid time offsets
> --
>
> Key: FLINK-36806
> URL: https://issues.apache.org/jira/browse/FLINK-36806
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.3.0, cdc-3.2.1
>Reporter: zjjiang
>Priority: Major
> Attachments: image-2024-11-27-16-23-22-995.png
>
>
> Currently, Flink CDC correctly synchronizes between fields of the same time 
> type (where the type indicates whether a time zone is included), but time 
> offsets can occur when synchronizing between fields of different time types. 
> For example, when synchronizing a MySQL DATETIME field (without time zone) to 
> an Iceberg TIMESTAMPTZ field (with time zone), the result is off by 8 hours 
> when the MySQL value represents Beijing time.
> Synchronizing between fields of different time types is very common in 
> practice, and such time offsets are very troublesome for the business. 
> Supporting conversion between different time types in Flink CDC would avoid 
> these offsets and improve the consistency and accuracy of the synchronized data.
>  
> !image-2024-11-27-16-23-22-995.png|width=1004,height=375!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36788] Fix and cover global committer [flink]

2024-11-27 Thread via GitHub


AHeise commented on code in PR #25685:
URL: https://github.com/apache/flink/pull/25685#discussion_r1860258571


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java:
##
@@ -241,7 +242,8 @@ public TestSinkV2WithPostCommitTopology(
 
 @Override
 public void 
addPostCommitTopology(DataStream> committables) {
-// We do not need to do anything for tests
+StandardSinkTopologies.addGlobalCommitter(

Review Comment:
   That's a good point. I created 
https://issues.apache.org/jira/browse/FLINK-36807 .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


