[jira] [Created] (FLINK-36811) MySQL CDC
Xuannan Su created FLINK-36811: -- Summary: MySQL CDC Key: FLINK-36811 URL: https://issues.apache.org/jira/browse/FLINK-36811 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36811) MySQL CDC source set is processing backlog during snapshot phase
[ https://issues.apache.org/jira/browse/FLINK-36811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su reassigned FLINK-36811: -- Assignee: Ruan Hang Summary: MySQL CDC source set is processing backlog during snapshot phase (was: MySQL CDC) > MySQL CDC source set is processing backlog during snapshot phase > > > Key: FLINK-36811 > URL: https://issues.apache.org/jira/browse/FLINK-36811 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Xuannan Su >Assignee: Ruan Hang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
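For context, FLIP-309 lets a source's split enumerator report whether it is currently processing backlog data via SplitEnumeratorContext#setIsProcessingBacklog. The sketch below only illustrates where such calls would sit during the snapshot and binlog phases; the enumerator class and its phase hooks are assumptions made for this example, not the actual flink-cdc implementation.

```java
// Illustrative sketch only — shows how a split enumerator can report backlog status
// via the FLIP-309 API. The enumerator class and phase callbacks are assumed names.
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

public class BacklogAwareEnumerator {
    private final SplitEnumeratorContext<?> context;

    public BacklogAwareEnumerator(SplitEnumeratorContext<?> context) {
        this.context = context;
    }

    /** Called when the source starts reading the initial snapshot. */
    public void onSnapshotPhaseStarted() {
        // Tell the runtime that records are backlog data, so it can, for example,
        // use a larger checkpointing interval until the backlog is drained.
        context.setIsProcessingBacklog(true);
    }

    /** Called once the snapshot is finished and the source switches to the binlog. */
    public void onBinlogPhaseStarted() {
        context.setIsProcessingBacklog(false);
    }
}
```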
[jira] [Created] (FLINK-36812) The flink jdbc connector's 'scan.partition.column' supports the varchar field type.
ouyangwulin created FLINK-36812: --- Summary: The flink jdbc connector's 'scan.partition.column' supports the varchar field type. Key: FLINK-36812 URL: https://issues.apache.org/jira/browse/FLINK-36812 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: jdbc-3.2.0 Reporter: ouyangwulin Fix For: jdbc-3.3.0 The scan.partition.column must be a numeric, date, or timestamp column from the table in question. But in many cases, tables don't have numeric, date, or timestamp columns, and we need to support a varchar field to make concurrent ingest useful in a wider range of scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
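For reference, a minimal sketch of how a partitioned JDBC scan is configured today with a numeric partition column; the URL, credentials, and table are placeholders. The proposal above would additionally allow a VARCHAR column to be used as 'scan.partition.column'.

```java
// Minimal sketch of a partitioned (parallel) JDBC scan as it works today.
// Connection details and table name are placeholders.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcPartitionedScanExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  id BIGINT,\n"
                        + "  customer STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://localhost:3306/shop',\n"
                        + "  'table-name' = 'orders',\n"
                        // Partitioned scan: currently the column must be numeric,
                        // date, or timestamp.
                        + "  'scan.partition.column' = 'id',\n"
                        + "  'scan.partition.num' = '4',\n"
                        + "  'scan.partition.lower-bound' = '0',\n"
                        + "  'scan.partition.upper-bound' = '1000000'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM orders").print();
    }
}
```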
[jira] [Commented] (FLINK-35670) Support Postgres CDC Pipeline Source
[ https://issues.apache.org/jira/browse/FLINK-35670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901604#comment-17901604 ] ouyangwulin commented on FLINK-35670: - I investigated the implementation and it is quite difficult: pgoutput does not emit DDL, while the decoderbufs plug-in does. The difficulty is manageable, though; we could consider using decoderbufs to parse the DDL for a first version. > Support Postgres CDC Pipeline Source > > > Key: FLINK-35670 > URL: https://issues.apache.org/jira/browse/FLINK-35670 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Muhammet Orazov >Priority: Major > > Similar to other [CDC pipeline > sources|https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/overview/] > (only MySQL at the moment), we should support Postgres as a pipeline source. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36813) MySQLCDC supports synchronization of specified fields
[ https://issues.apache.org/jira/browse/FLINK-36813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Di Wu updated FLINK-36813: -- Description: *Background* In some scenarios, MySQL synchronization only expects to synchronize specified fields instead of all fields in the table. 1. The user only has the permission for some fields in MySQL 2. The user has too many fields in a single table and only wants to synchronize some fields, for example, here [https://github.com/apache/flink-cdc/discussions/3058] *Current situation* For the incremental stage, you only need to configure the column.include.list property of debezium to support the synchronization of some fields in the incremental stage, refer to: [https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list] For the full snapshot stage, * is currently used in {_}MySqlSnapshotSplitReadTask{_}, refer to {code:java} if (isScanningData) { return buildSelectWithRowLimits( tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); {code} *Solution* We can refer to debezium [RelationalSnapshotChangeEventSource](https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java#L752-L776), The user configures column.include.list, and then captures the specific columns in MySqlSnapshotSplitReadTask, and splices them when constructing Scan SQL. was: *Background* In some scenarios, MySQL synchronization only expects to synchronize specified fields instead of all fields in the table. 1. The user only has the permission for some fields in MySQL 2. The user has too many fields in a single table and only wants to synchronize some fields, for example, here https://github.com/apache/flink-cdc/discussions/3058 *Current situation* For the incremental stage, you only need to configure the column.include.list property of debezium to support the synchronization of some fields in the incremental stage, refer to: https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list For the full snapshot stage, * is currently used in {_}MySqlSnapshotSplitReadTask{_}, refer to {code:java} if (isScanningData) { return buildSelectWithRowLimits( tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); {code} *Solution* The user configures {_}column.include.list{_}, and then captures the specific columns in MySqlSnapshotSplitReadTask, and splices them when constructing Scan SQL. > MySQLCDC supports synchronization of specified fields > - > > Key: FLINK-36813 > URL: https://issues.apache.org/jira/browse/FLINK-36813 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.1 >Reporter: Di Wu >Priority: Major > Labels: CDC, pull-request-available > Fix For: cdc-3.3.0 > > > *Background* > In some scenarios, MySQL synchronization only expects to synchronize > specified fields instead of all fields in the table. > 1. The user only has the permission for some fields in MySQL > 2. 
The user has too many fields in a single table and only wants to > synchronize some fields, for example, here > [https://github.com/apache/flink-cdc/discussions/3058] > *Current situation* > For the incremental stage, you only need to configure the column.include.list > property of debezium to support the synchronization of some fields in the > incremental stage, refer to: > [https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list] > For the full snapshot stage, * is currently used in > {_}MySqlSnapshotSplitReadTask{_}, refer to > {code:java} > if (isScanningData) { > return buildSelectWithRowLimits( > tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); > {code} > > *Solution* > We can refer to debezium > [RelationalSnapshotChangeEventSource](https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java#L752-L776), > The user configures column.include.list, and then captures the specific > columns in MySqlSnapshotSplitReadTask, and splices them when constructing > Scan SQL. -- This message was sent by Atlassian Jira (v8.20.10#820010)
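For illustration, a standalone sketch of what splicing the configured columns into the snapshot SELECT could look like; the class and method names here are assumptions for this example, not the actual MySqlSnapshotSplitReadTask change.

```java
// Illustrative sketch: building the snapshot SELECT from a configured column list
// instead of "*". Names are assumed for this example.
import java.util.List;
import java.util.Optional;

public final class SnapshotScanSqlBuilder {

    /** Joins the captured columns, falling back to "*" when no projection is set. */
    static String buildProjection(List<String> includedColumns) {
        return includedColumns == null || includedColumns.isEmpty()
                ? "*"
                : String.join(", ", includedColumns);
    }

    static String buildSnapshotQuery(
            String tableId, List<String> includedColumns, Optional<String> condition) {
        StringBuilder sql =
                new StringBuilder("SELECT ")
                        .append(buildProjection(includedColumns))
                        .append(" FROM ")
                        .append(tableId);
        condition.ifPresent(c -> sql.append(" WHERE ").append(c));
        return sql.toString();
    }

    public static void main(String[] args) {
        // Prints: SELECT id, name FROM db.users WHERE id BETWEEN 0 AND 1000
        System.out.println(
                buildSnapshotQuery(
                        "db.users",
                        List.of("id", "name"),
                        Optional.of("id BETWEEN 0 AND 1000")));
    }
}
```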
[jira] [Assigned] (FLINK-36751) PausableRelativeClock does not pause when the source only has one split
[ https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan reassigned FLINK-36751: - Assignee: haishui > PausableRelativeClock does not pause when the source only has one split > --- > > Key: FLINK-36751 > URL: https://issues.apache.org/jira/browse/FLINK-36751 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Affects Versions: 1.20.0, 1.19.1, 2.0-preview >Reporter: haishui >Assignee: haishui >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Reason: > PausableRelativeClock#pause is called at pauseOrResumeSplits in > ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when > the sourceOperator has more than one splits. > > My example code tested on Flink 1.20-SNAPSHOT is as follows: > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > DataGeneratorSource dataGen1 = new DataGeneratorSource<>(v -> v, > Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG); > DataGeneratorSource dataGen2 = new DataGeneratorSource<>(v -> v + 500, > Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG); > WatermarkStrategy watermarkStrategy = WatermarkStrategy > .forMonotonousTimestamps() > .withTimestampAssigner((aLong, l) -> aLong) > .withWatermarkAlignment("default", Duration.ofMillis(5), > Duration.ofSeconds(5)) > .withIdleness(Duration.ofSeconds(5)); > DataStreamSource s1 = env.fromSource(dataGen1, watermarkStrategy, "S1"); > DataStream s2 = env.fromSource(dataGen2, watermarkStrategy, "S2"); > s1.print("S1"); > s2.print("S2"); > s1.keyBy(v -> 0) > .connect(s2.keyBy(v -> 0)) > .process(new CoProcessFunction() { > @Override > public void processElement1(Long aLong, CoProcessFunction Long, Void>.Context context, Collector collector) throws Exception { > if (context.timestamp() < > context.timerService().currentWatermark()) { > throw new IllegalStateException("left stream element is > late: " + aLong); > } > } @Override > public void processElement2(Long aLong, CoProcessFunction Long, Void>.Context context, Collector collector) throws Exception { > if (context.timestamp() < > context.timerService().currentWatermark()) { > throw new IllegalStateException("right stream element is > late: " + aLong); > } > } > }); > env.execute();{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
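The generic type parameters in the reproducer quoted above were stripped by the mail rendering. Below is a best-effort reconstruction that assumes Long elements throughout (as the original lambdas suggest) and the Flink datagen connector APIs; treat it as a sketch rather than the reporter's exact code.

```java
// Best-effort reconstruction of the reproducer; generic parameters restored
// assuming Long elements throughout.
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkAlignmentReproducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(
                v -> v, Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG);
        DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(
                v -> v + 500, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);

        WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy
                .<Long>forMonotonousTimestamps()
                .withTimestampAssigner((aLong, l) -> aLong)
                .withWatermarkAlignment("default", Duration.ofMillis(5), Duration.ofSeconds(5))
                .withIdleness(Duration.ofSeconds(5));

        DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1");
        DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2");
        s1.print("S1");
        s2.print("S2");

        s1.keyBy(v -> 0)
                .connect(s2.keyBy(v -> 0))
                .process(new CoProcessFunction<Long, Long, Void>() {
                    @Override
                    public void processElement1(Long aLong, Context context, Collector<Void> collector) {
                        if (context.timestamp() < context.timerService().currentWatermark()) {
                            throw new IllegalStateException("left stream element is late: " + aLong);
                        }
                    }

                    @Override
                    public void processElement2(Long aLong, Context context, Collector<Void> collector) {
                        if (context.timestamp() < context.timerService().currentWatermark()) {
                            throw new IllegalStateException("right stream element is late: " + aLong);
                        }
                    }
                });

        env.execute();
    }
}
```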
[jira] [Commented] (FLINK-36751) PausableRelativeClock does not pause when the source only has one split
[ https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901589#comment-17901589 ] Roman Khachatryan commented on FLINK-36751: --- Merged into master as 0c338ae6d21e6d3dc79764e7801cbb7adfe6a44f into 1.20 as 98045aad2c0ace527893bb20a9daa054aa48adbf into 1.19 as d917462ee6492d83559d9e602a82230a850272a1 > PausableRelativeClock does not pause when the source only has one split > --- > > Key: FLINK-36751 > URL: https://issues.apache.org/jira/browse/FLINK-36751 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Affects Versions: 1.20.0, 1.19.1, 2.0-preview >Reporter: haishui >Assignee: haishui >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Reason: > PausableRelativeClock#pause is called at pauseOrResumeSplits in > ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when > the sourceOperator has more than one splits. > > My example code tested on Flink 1.20-SNAPSHOT is as follows: > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > DataGeneratorSource dataGen1 = new DataGeneratorSource<>(v -> v, > Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG); > DataGeneratorSource dataGen2 = new DataGeneratorSource<>(v -> v + 500, > Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG); > WatermarkStrategy watermarkStrategy = WatermarkStrategy > .forMonotonousTimestamps() > .withTimestampAssigner((aLong, l) -> aLong) > .withWatermarkAlignment("default", Duration.ofMillis(5), > Duration.ofSeconds(5)) > .withIdleness(Duration.ofSeconds(5)); > DataStreamSource s1 = env.fromSource(dataGen1, watermarkStrategy, "S1"); > DataStream s2 = env.fromSource(dataGen2, watermarkStrategy, "S2"); > s1.print("S1"); > s2.print("S2"); > s1.keyBy(v -> 0) > .connect(s2.keyBy(v -> 0)) > .process(new CoProcessFunction() { > @Override > public void processElement1(Long aLong, CoProcessFunction Long, Void>.Context context, Collector collector) throws Exception { > if (context.timestamp() < > context.timerService().currentWatermark()) { > throw new IllegalStateException("left stream element is > late: " + aLong); > } > } @Override > public void processElement2(Long aLong, CoProcessFunction Long, Void>.Context context, Collector collector) throws Exception { > if (context.timestamp() < > context.timerService().currentWatermark()) { > throw new IllegalStateException("right stream element is > late: " + aLong); > } > } > }); > env.execute();{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36751) PausableRelativeClock does not pause when the source only has one split
[ https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan closed FLINK-36751. - Resolution: Fixed Thanks for the contribution [~haishui] ! > PausableRelativeClock does not pause when the source only has one split > --- > > Key: FLINK-36751 > URL: https://issues.apache.org/jira/browse/FLINK-36751 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Affects Versions: 1.20.0, 1.19.1, 2.0-preview >Reporter: haishui >Assignee: haishui >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Reason: > PausableRelativeClock#pause is called at pauseOrResumeSplits in > ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when > the sourceOperator has more than one splits. > > My example code tested on Flink 1.20-SNAPSHOT is as follows: > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > DataGeneratorSource dataGen1 = new DataGeneratorSource<>(v -> v, > Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG); > DataGeneratorSource dataGen2 = new DataGeneratorSource<>(v -> v + 500, > Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG); > WatermarkStrategy watermarkStrategy = WatermarkStrategy > .forMonotonousTimestamps() > .withTimestampAssigner((aLong, l) -> aLong) > .withWatermarkAlignment("default", Duration.ofMillis(5), > Duration.ofSeconds(5)) > .withIdleness(Duration.ofSeconds(5)); > DataStreamSource s1 = env.fromSource(dataGen1, watermarkStrategy, "S1"); > DataStream s2 = env.fromSource(dataGen2, watermarkStrategy, "S2"); > s1.print("S1"); > s2.print("S2"); > s1.keyBy(v -> 0) > .connect(s2.keyBy(v -> 0)) > .process(new CoProcessFunction() { > @Override > public void processElement1(Long aLong, CoProcessFunction Long, Void>.Context context, Collector collector) throws Exception { > if (context.timestamp() < > context.timerService().currentWatermark()) { > throw new IllegalStateException("left stream element is > late: " + aLong); > } > } @Override > public void processElement2(Long aLong, CoProcessFunction Long, Void>.Context context, Collector collector) throws Exception { > if (context.timestamp() < > context.timerService().currentWatermark()) { > throw new IllegalStateException("right stream element is > late: " + aLong); > } > } > }); > env.execute();{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] (1.19 backport)[FLINK-36751] Fix PausableRelativeClock does not pause when the source only has one split [flink]
rkhachatryan merged PR #25695: URL: https://github.com/apache/flink/pull/25695 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Handle map and array types binary record data [flink-cdc]
umeshdangat commented on code in PR #3434: URL: https://github.com/apache/flink-cdc/pull/3434#discussion_r1861307255 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataFieldSerializer.java: ## @@ -39,7 +39,14 @@ public class DataFieldSerializer extends TypeSerializerSingleton { public static final DataFieldSerializer INSTANCE = new DataFieldSerializer(); private final StringSerializer stringSerializer = StringSerializer.INSTANCE; -private final DataTypeSerializer dataTypeSerializer = new DataTypeSerializer(); +private DataTypeSerializer dataTypeSerializer; + +private DataTypeSerializer getDataTypeSerializer() { Review Comment: > I came across a bug due to circular dependency between DataFieldSerializer, DataTypeSerializer, and RowTypeSerializer classes that causes NPE for certain RowTypes like nested row type. I have baked that change in this PR but also have a separate one https://github.com/apache/flink-cdc/pull/3400 [explained earlier here](https://github.com/apache/flink-cdc/pull/3434#issuecomment-2192072774) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]
snuyanzin commented on PR #24829: URL: https://github.com/apache/flink/pull/24829#issuecomment-2504944809 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-36756) Bump up the sql-gateway rest api version
[ https://issues.apache.org/jira/browse/FLINK-36756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang reassigned FLINK-36756: - Assignee: Shengkai Fang > Bump up the sql-gateway rest api version > > > Key: FLINK-36756 > URL: https://issues.apache.org/jira/browse/FLINK-36756 > Project: Flink > Issue Type: Sub-task >Reporter: Shengkai Fang >Assignee: Shengkai Fang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36756][sql-gateway] Bump up the sql gateway rest version [flink]
fsk119 opened a new pull request, #25705: URL: https://github.com/apache/flink/pull/25705 ## What is the purpose of the change *Bump up the SQL Gateway REST API version.* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36756) Bump up the sql-gateway rest api version
[ https://issues.apache.org/jira/browse/FLINK-36756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36756: --- Labels: pull-request-available (was: ) > Bump up the sql-gateway rest api version > > > Key: FLINK-36756 > URL: https://issues.apache.org/jira/browse/FLINK-36756 > Project: Flink > Issue Type: Sub-task >Reporter: Shengkai Fang >Assignee: Shengkai Fang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36803][cdc-connector][base] Use the same format `tableId:chunkId` for splitId in SnapshotSplit [flink-cdc]
yuxiqian commented on code in PR #3763: URL: https://github.com/apache/flink-cdc/pull/3763#discussion_r1861478089 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java: ## @@ -152,7 +152,7 @@ public Offset createNoStoppingOffset() { private SchemalessSnapshotSplit constuctSchemalessSnapshotSplit() { return new SchemalessSnapshotSplit( tableId, -"test", +tableId.toString() + ":0", Review Comment: Is it possible to make it `@VisibleForTesting` and call `generateSplitId`, to reduce potential discrepancy? ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java: ## @@ -190,7 +190,7 @@ private static MySqlSchemalessSnapshotSplit getTestSchemalessSnapshotSplit( TableId tableId, int splitNo) { return new MySqlSchemalessSnapshotSplit( tableId, -tableId.toString() + "-" + splitNo, +tableId.toString() + ":" + splitNo, Review Comment: Ditto ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java: ## @@ -95,6 +125,18 @@ public final SchemalessSnapshotSplit toSchemalessSnapshotSplit() { tableId, splitId, splitKeyType, splitStart, splitEnd, highWatermark); } +private static String generateSplitId(TableId tableId, int chunkId) { +return tableId.toString() + ":" + chunkId; +} + +public static TableId parseTableId(String splitId) { Review Comment: Will `extract` be a more precise name than `parse`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
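For illustration, a standalone sketch of the splitId helpers discussed in this review; the real methods live in SnapshotSplit and use Debezium's TableId, while this simplified version uses plain strings just to show the `tableId:chunkId` format.

```java
// Illustrative sketch of the splitId helpers discussed above; simplified to plain
// strings, only meant to show the "tableId:chunkId" format.
public final class SplitIdFormat {

    /** Builds a splitId in the "tableId:chunkId" format, e.g. "db.orders:0". */
    static String generateSplitId(String tableId, int chunkId) {
        return tableId + ":" + chunkId;
    }

    /** Extracts the tableId part back out of a splitId. */
    static String extractTableId(String splitId) {
        int idx = splitId.lastIndexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("Not a snapshot splitId: " + splitId);
        }
        return splitId.substring(0, idx);
    }

    public static void main(String[] args) {
        String splitId = generateSplitId("db.orders", 3); // "db.orders:3"
        System.out.println(extractTableId(splitId));      // prints "db.orders"
    }
}
```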
Re: [PR] [FLINK-36756][sql-gateway] Bump up the sql gateway rest version [flink]
flinkbot commented on PR #25705: URL: https://github.com/apache/flink/pull/25705#issuecomment-2505221909 ## CI report: * 1e83f1bda02cb512b4b1acb6f5c5758f2bc37e6d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36813) MySQLCDC supports synchronization of specified fields
Di Wu created FLINK-36813: - Summary: MySQLCDC supports synchronization of specified fields Key: FLINK-36813 URL: https://issues.apache.org/jira/browse/FLINK-36813 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.2.1 Reporter: Di Wu Fix For: cdc-3.3.0 *Background* In some scenarios, MySQL synchronization only expects to synchronize specified fields instead of all fields in the table. 1. The user only has the permission for some fields in MySQL 2. The user has too many fields in a single table and only wants to synchronize some fields, for example, here https://github.com/apache/flink-cdc/discussions/3058 *Current situation* For the incremental stage, you only need to configure the column.include.list property of debezium to support the synchronization of some fields in the incremental stage, refer to: https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list For the full snapshot stage, * is currently used in {_}MySqlSnapshotSplitReadTask{_}, refer to {code:java} if (isScanningData) { return buildSelectWithRowLimits( tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); {code} *Solution* The user configures {_}column.include.list{_}, and then captures the specific columns in MySqlSnapshotSplitReadTask, and splices them when constructing Scan SQL. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36813][cdc-connectors][mysql] support mysql sync part columns [flink-cdc]
JNSimba opened a new pull request, #3767: URL: https://github.com/apache/flink-cdc/pull/3767 **Background** In some scenarios, MySQL synchronization only expects to synchronize specified fields instead of all fields in the table. 1. The user only has the permission for some fields in MySQL 2. The user has too many fields in a single table and only wants to synchronize some fields, for example, here https://github.com/apache/flink-cdc/discussions/3058 **Current situation** For the incremental stage, you only need to configure the column.include.list property of debezium to support the synchronization of some fields in the incremental stage, refer to: https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list For the full snapshot stage, * is currently used in MySqlSnapshotSplitReadTask, refer to ``` if (isScanningData) { return buildSelectWithRowLimits( tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); ``` **Solution** The user configures column.include.list, and then captures the specific columns in MySqlSnapshotSplitReadTask, and splices them when constructing Scan SQL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
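For context, the incremental-stage behaviour described above can already be enabled by passing Debezium's column.include.list to the MySQL CDC source builder; the sketch below uses placeholder connection, table, and column names and only covers the binlog stage, while this PR extends the projection to the snapshot stage as well.

```java
// Sketch: passing Debezium's column.include.list to the MySQL CDC source, which
// limits the captured columns in the incremental (binlog) stage. Connection
// details and table/column names are placeholders.
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlPartialColumnsExample {
    public static void main(String[] args) throws Exception {
        Properties dbzProps = new Properties();
        // Only capture the listed columns of app_db.users in the incremental stage.
        dbzProps.setProperty("column.include.list", "app_db.users.id,app_db.users.name");

        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("app_db")
                        .tableList("app_db.users")
                        .username("flink")
                        .password("secret")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .debeziumProperties(dbzProps)
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("mysql-cdc-partial-columns");
    }
}
```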
[jira] [Updated] (FLINK-36813) MySQLCDC supports synchronization of specified fields
[ https://issues.apache.org/jira/browse/FLINK-36813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36813: --- Labels: CDC pull-request-available (was: CDC) > MySQLCDC supports synchronization of specified fields > - > > Key: FLINK-36813 > URL: https://issues.apache.org/jira/browse/FLINK-36813 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.1 >Reporter: Di Wu >Priority: Major > Labels: CDC, pull-request-available > Fix For: cdc-3.3.0 > > > *Background* > In some scenarios, MySQL synchronization only expects to synchronize > specified fields instead of all fields in the table. > 1. The user only has the permission for some fields in MySQL > 2. The user has too many fields in a single table and only wants to > synchronize some fields, for example, here > https://github.com/apache/flink-cdc/discussions/3058 > *Current situation* > For the incremental stage, you only need to configure the column.include.list > property of debezium to support the synchronization of some fields in the > incremental stage, refer to: > https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list > For the full snapshot stage, * is currently used in > {_}MySqlSnapshotSplitReadTask{_}, refer to > {code:java} > if (isScanningData) { > return buildSelectWithRowLimits( > tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); > {code} > > *Solution* > The user configures {_}column.include.list{_}, and then captures the specific > columns in MySqlSnapshotSplitReadTask, and splices them when constructing > Scan SQL. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix][javadocs] Remove duplicated word 'the' in Javadocs [flink]
snuyanzin commented on PR #24829: URL: https://github.com/apache/flink/pull/24829#issuecomment-2505382087 @naferx can you rebase the PR? I tend to think that the reason for the current failure has already been fixed in master -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][docs] Fix typo in alter.md [flink]
snuyanzin commented on code in PR #24858: URL: https://github.com/apache/flink/pull/24858#discussion_r1861608059 ## docs/content/docs/dev/table/hive-compatibility/hive-dialect/alter.md: ## @@ -40,7 +40,7 @@ With Hive dialect, the following ALTER statements are supported for now: -- alter database's properties ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...); --- alter database's localtion +-- alter database's location Review Comment: Can you also fix the same typo in Chinese version? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860867485 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md: ## @@ -0,0 +1,202 @@ +--- +title: "Introducing the new Prometheus connector" +date: "2024-11-26T00:00:00.000Z" +authors: +- nicusX: + name: "Lorenzo Nicora" +--- + + +We are excited to announce a new sink connector that enables writing data to Prometheus ([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)). This articles introduces the main features of the connector, and the reasoning behind design decisions. + +This connector allows writing data to Prometheus using the [Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push interface, which lets you write time-series data to Prometheus at scale. + +## Motivations for a Prometheus connector + +Prometheus is an efficient time-series database optimized for building real-time dashboards and alerts, typically in combination with Grafana or other visualization tools. + +Prometheus is commonly used to monitor compute resources, IT infrastructure, Kubernetes clusters, applications, and cloud resources. It can also be used to observe your Flink cluster and Flink jobs. Flink existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) has this purpose. + +So, why do we need a connector? + +Prometheus can serve as a general-purpose observability time-series database, beyond traditional infrastructure monitoring. For example, it can be used to monitor IoT devices, sensors, connected cars, media streaming devices, and any resource that streams events or measurements continuously. + +Observability data from these use cases differs from metrics generated by compute resources. They present additional challenges: +* **Out-of-order events**: Devices may be connected via mobile networks or even Bluetooth. Events from different devices may follow different paths and arrive at very different times. A **stateful, event-time logic** can be used to reorder them. +* **High frequency** and **high cardinality**: You can have a sheer number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable. +* **Lack of contextual information**: Raw events sent by the devices often lack of contextual information for a meaningful analysis. **Enrichment** of raw events, looking up some reference dataset, can be used to add dimensions useful for the analysis. +* **Noise**: sensor measurement may contain noise. For example when a GPS tracker lose connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis. + +Flink can be used as a pre-processor to address all the above. + +You can implement a sink from scratch or use AsyncIO to call the Prometheus Remote-Write endpoint. However, there are not-trivial details to implement an efficient Remote-Write client: +* There is no high-level client for Prometheus Remote-Write. You would need to build on top of a low-level HTTP client. +* Remote-Write can be inefficient unless write requests are batched and parallelized. +* Error handling can be complex, and specifications demand strict behaviors (see [Strict Specifications, Lenient Implementations](#strict-specifications-lenient-implementations)). 
+ +The new Prometheus connector manages all of this for you. + +## Key features + +The version `1.0.0` of the Prometheus connector has the following features: + +* DataStream API, Java Sink, based on AsyncSinkBase. +* Configurable write batching. +* Order is retained on retries. +* At-most-once guarantees (we’ll explain the reason for this later). +* Supports parallelism greater than 1. +* Configurable retry strategy for retryable server responses (e.g., throttling) and configurable behavior when the retry limit is reached. +* Configurable behavior on retryable errors (e.g., throttling from the Prometheus backend). + +### Authentication + +Authentication is [explicitly out of scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for Prometheus Remote-Write specifications. +The connector provides a generic interface, PrometheusRequestSigner, to manipulate requests and add headers. This allows implementation of any authentication scheme that requires adding headers to the request, such as API keys, authorization, or signature tokens. + +In the release `1.0.0`, an implementation for Amazon Managed Service for Prometheus (AMP) is provided as a separate, optional dependency. + +## Designing the connector + +A sink to Prometheus differs from a sink to most other datastores or time-series databases. The wire interface is the easiest part; the main challenges arise from the diff
Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]
mehdid93 commented on PR #25670: URL: https://github.com/apache/flink/pull/25670#issuecomment-2504432014 @zentol Thank you! I have pushed a commit to use the new Image in order to test the changes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860846971 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md: ## @@ -0,0 +1,202 @@ +--- +title: "Introducing the new Prometheus connector" +date: "2024-11-26T00:00:00.000Z" +authors: +- nicusX: + name: "Lorenzo Nicora" +--- + + +We are excited to announce a new sink connector that enables writing data to Prometheus ([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)). This articles introduces the main features of the connector, and the reasoning behind design decisions. + +This connector allows writing data to Prometheus using the [Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push interface, which lets you write time-series data to Prometheus at scale. + +## Motivations for a Prometheus connector + +Prometheus is an efficient time-series database optimized for building real-time dashboards and alerts, typically in combination with Grafana or other visualization tools. + +Prometheus is commonly used to monitor compute resources, IT infrastructure, Kubernetes clusters, applications, and cloud resources. It can also be used to observe your Flink cluster and Flink jobs. Flink existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) has this purpose. + +So, why do we need a connector? + +Prometheus can serve as a general-purpose observability time-series database, beyond traditional infrastructure monitoring. For example, it can be used to monitor IoT devices, sensors, connected cars, media streaming devices, and any resource that streams events or measurements continuously. + +Observability data from these use cases differs from metrics generated by compute resources. They present additional challenges: +* **Out-of-order events**: Devices may be connected via mobile networks or even Bluetooth. Events from different devices may follow different paths and arrive at very different times. A **stateful, event-time logic** can be used to reorder them. +* **High frequency** and **high cardinality**: You can have a sheer number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable. +* **Lack of contextual information**: Raw events sent by the devices often lack of contextual information for a meaningful analysis. **Enrichment** of raw events, looking up some reference dataset, can be used to add dimensions useful for the analysis. +* **Noise**: sensor measurement may contain noise. For example when a GPS tracker lose connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis. + +Flink can be used as a pre-processor to address all the above. + +You can implement a sink from scratch or use AsyncIO to call the Prometheus Remote-Write endpoint. However, there are not-trivial details to implement an efficient Remote-Write client: +* There is no high-level client for Prometheus Remote-Write. You would need to build on top of a low-level HTTP client. +* Remote-Write can be inefficient unless write requests are batched and parallelized. +* Error handling can be complex, and specifications demand strict behaviors (see [Strict Specifications, Lenient Implementations](#strict-specifications-lenient-implementations)). 
+ +The new Prometheus connector manages all of this for you. + +## Key features + +The version `1.0.0` of the Prometheus connector has the following features: + +* DataStream API, Java Sink, based on AsyncSinkBase. +* Configurable write batching. +* Order is retained on retries. +* At-most-once guarantees (we’ll explain the reason for this later). +* Supports parallelism greater than 1. +* Configurable retry strategy for retryable server responses (e.g., throttling) and configurable behavior when the retry limit is reached. +* Configurable behavior on retryable errors (e.g., throttling from the Prometheus backend). Review Comment: Yes! removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
flinkbot commented on PR #25703: URL: https://github.com/apache/flink/pull/25703#issuecomment-2504141228 ## CI report: * b366d760dc117aa17cc528e2be5070b2f9036f4a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860854487 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md: ## @@ -0,0 +1,202 @@ +--- +title: "Introducing the new Prometheus connector" +date: "2024-11-26T00:00:00.000Z" +authors: +- nicusX: + name: "Lorenzo Nicora" +--- + + +We are excited to announce a new sink connector that enables writing data to Prometheus ([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)). This articles introduces the main features of the connector, and the reasoning behind design decisions. + +This connector allows writing data to Prometheus using the [Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push interface, which lets you write time-series data to Prometheus at scale. + +## Motivations for a Prometheus connector + +Prometheus is an efficient time-series database optimized for building real-time dashboards and alerts, typically in combination with Grafana or other visualization tools. + +Prometheus is commonly used to monitor compute resources, IT infrastructure, Kubernetes clusters, applications, and cloud resources. It can also be used to observe your Flink cluster and Flink jobs. Flink existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) has this purpose. + +So, why do we need a connector? + +Prometheus can serve as a general-purpose observability time-series database, beyond traditional infrastructure monitoring. For example, it can be used to monitor IoT devices, sensors, connected cars, media streaming devices, and any resource that streams events or measurements continuously. + +Observability data from these use cases differs from metrics generated by compute resources. They present additional challenges: +* **Out-of-order events**: Devices may be connected via mobile networks or even Bluetooth. Events from different devices may follow different paths and arrive at very different times. A **stateful, event-time logic** can be used to reorder them. +* **High frequency** and **high cardinality**: You can have a sheer number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable. +* **Lack of contextual information**: Raw events sent by the devices often lack of contextual information for a meaningful analysis. **Enrichment** of raw events, looking up some reference dataset, can be used to add dimensions useful for the analysis. +* **Noise**: sensor measurement may contain noise. For example when a GPS tracker lose connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis. + +Flink can be used as a pre-processor to address all the above. + +You can implement a sink from scratch or use AsyncIO to call the Prometheus Remote-Write endpoint. However, there are not-trivial details to implement an efficient Remote-Write client: +* There is no high-level client for Prometheus Remote-Write. You would need to build on top of a low-level HTTP client. +* Remote-Write can be inefficient unless write requests are batched and parallelized. +* Error handling can be complex, and specifications demand strict behaviors (see [Strict Specifications, Lenient Implementations](#strict-specifications-lenient-implementations)). 
+ +The new Prometheus connector manages all of this for you. + +## Key features + +The version `1.0.0` of the Prometheus connector has the following features: + +* DataStream API, Java Sink, based on AsyncSinkBase. +* Configurable write batching. +* Order is retained on retries. +* At-most-once guarantees (we’ll explain the reason for this later). +* Supports parallelism greater than 1. +* Configurable retry strategy for retryable server responses (e.g., throttling) and configurable behavior when the retry limit is reached. +* Configurable behavior on retryable errors (e.g., throttling from the Prometheus backend). + +### Authentication + +Authentication is [explicitly out of scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for Prometheus Remote-Write specifications. +The connector provides a generic interface, PrometheusRequestSigner, to manipulate requests and add headers. This allows implementation of any authentication scheme that requires adding headers to the request, such as API keys, authorization, or signature tokens. + +In the release `1.0.0`, an implementation for Amazon Managed Service for Prometheus (AMP) is provided as a separate, optional dependency. + +## Designing the connector + +A sink to Prometheus differs from a sink to most other datastores or time-series databases. The wire interface is the easiest part; the main challenges arise from the diff
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860851806 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md: ## @@ -0,0 +1,202 @@ +--- +title: "Introducing the new Prometheus connector" +date: "2024-11-26T00:00:00.000Z" +authors: +- nicusX: + name: "Lorenzo Nicora" +--- + + +We are excited to announce a new sink connector that enables writing data to Prometheus ([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)). This articles introduces the main features of the connector, and the reasoning behind design decisions. + +This connector allows writing data to Prometheus using the [Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push interface, which lets you write time-series data to Prometheus at scale. + +## Motivations for a Prometheus connector + +Prometheus is an efficient time-series database optimized for building real-time dashboards and alerts, typically in combination with Grafana or other visualization tools. + +Prometheus is commonly used to monitor compute resources, IT infrastructure, Kubernetes clusters, applications, and cloud resources. It can also be used to observe your Flink cluster and Flink jobs. Flink existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) has this purpose. + +So, why do we need a connector? + +Prometheus can serve as a general-purpose observability time-series database, beyond traditional infrastructure monitoring. For example, it can be used to monitor IoT devices, sensors, connected cars, media streaming devices, and any resource that streams events or measurements continuously. + +Observability data from these use cases differs from metrics generated by compute resources. They present additional challenges: +* **Out-of-order events**: Devices may be connected via mobile networks or even Bluetooth. Events from different devices may follow different paths and arrive at very different times. A **stateful, event-time logic** can be used to reorder them. +* **High frequency** and **high cardinality**: You can have a sheer number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable. +* **Lack of contextual information**: Raw events sent by the devices often lack of contextual information for a meaningful analysis. **Enrichment** of raw events, looking up some reference dataset, can be used to add dimensions useful for the analysis. +* **Noise**: sensor measurement may contain noise. For example when a GPS tracker lose connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis. + +Flink can be used as a pre-processor to address all the above. + +You can implement a sink from scratch or use AsyncIO to call the Prometheus Remote-Write endpoint. However, there are not-trivial details to implement an efficient Remote-Write client: +* There is no high-level client for Prometheus Remote-Write. You would need to build on top of a low-level HTTP client. +* Remote-Write can be inefficient unless write requests are batched and parallelized. +* Error handling can be complex, and specifications demand strict behaviors (see [Strict Specifications, Lenient Implementations](#strict-specifications-lenient-implementations)). 
+ +The new Prometheus connector manages all of this for you. Review Comment: Good point. I will try to clarify, hoping not to be too verbose. But the bottom line is that this is **data** from the point of view of the application. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860853048 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md: ## @@ -0,0 +1,202 @@ +--- +title: "Introducing the new Prometheus connector" +date: "2024-11-26T00:00:00.000Z" +authors: +- nicusX: + name: "Lorenzo Nicora" +--- + + +We are excited to announce a new sink connector that enables writing data to Prometheus ([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)). This articles introduces the main features of the connector, and the reasoning behind design decisions. + +This connector allows writing data to Prometheus using the [Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push interface, which lets you write time-series data to Prometheus at scale. + +## Motivations for a Prometheus connector + +Prometheus is an efficient time-series database optimized for building real-time dashboards and alerts, typically in combination with Grafana or other visualization tools. + +Prometheus is commonly used to monitor compute resources, IT infrastructure, Kubernetes clusters, applications, and cloud resources. It can also be used to observe your Flink cluster and Flink jobs. Flink existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) has this purpose. + +So, why do we need a connector? + +Prometheus can serve as a general-purpose observability time-series database, beyond traditional infrastructure monitoring. For example, it can be used to monitor IoT devices, sensors, connected cars, media streaming devices, and any resource that streams events or measurements continuously. + +Observability data from these use cases differs from metrics generated by compute resources. They present additional challenges: +* **Out-of-order events**: Devices may be connected via mobile networks or even Bluetooth. Events from different devices may follow different paths and arrive at very different times. A **stateful, event-time logic** can be used to reorder them. +* **High frequency** and **high cardinality**: You can have a sheer number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable. +* **Lack of contextual information**: Raw events sent by the devices often lack of contextual information for a meaningful analysis. **Enrichment** of raw events, looking up some reference dataset, can be used to add dimensions useful for the analysis. +* **Noise**: sensor measurement may contain noise. For example when a GPS tracker lose connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis. + +Flink can be used as a pre-processor to address all the above. + +You can implement a sink from scratch or use AsyncIO to call the Prometheus Remote-Write endpoint. However, there are not-trivial details to implement an efficient Remote-Write client: +* There is no high-level client for Prometheus Remote-Write. You would need to build on top of a low-level HTTP client. +* Remote-Write can be inefficient unless write requests are batched and parallelized. +* Error handling can be complex, and specifications demand strict behaviors (see [Strict Specifications, Lenient Implementations](#strict-specifications-lenient-implementations)). 
+ +The new Prometheus connector manages all of this for you. + +## Key features + +The version `1.0.0` of the Prometheus connector has the following features: + +* DataStream API, Java Sink, based on AsyncSinkBase. +* Configurable write batching. +* Order is retained on retries. +* At-most-once guarantees (we’ll explain the reason for this later). +* Supports parallelism greater than 1. +* Configurable retry strategy for retryable server responses (e.g., throttling) and configurable behavior when the retry limit is reached. +* Configurable behavior on retryable errors (e.g., throttling from the Prometheus backend). + +### Authentication + +Authentication is [explicitly out of scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for Prometheus Remote-Write specifications. +The connector provides a generic interface, PrometheusRequestSigner, to manipulate requests and add headers. This allows implementation of any authentication scheme that requires adding headers to the request, such as API keys, authorization, or signature tokens. + +In the release `1.0.0`, an implementation for Amazon Managed Service for Prometheus (AMP) is provided as a separate, optional dependency. + +## Designing the connector + +A sink to Prometheus differs from a sink to most other datastores or time-series databases. The wire interface is the easiest part; the main challenges arise from the diff
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais closed pull request #25703: [FLINK-36809][table] Support ignoreIfExists param for createTable URL: https://github.com/apache/flink/pull/25703 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]
zentol commented on PR #25670: URL: https://github.com/apache/flink/pull/25670#issuecomment-2504396199 @mehdid93 You can try your changes with [chesnay/flink-ci:java_8_11_17_21_maven_386_jammy](https://hub.docker.com/layers/chesnay/flink-ci/java_8_11_17_21_maven_386_jammy/images/sha256-85838a97ac7151fac2493de5df7dd8fbb02e3ba1d48d7f64b654866deed07448?context=explore). I'll open a PR tomorrow to update CI as a whole to use that image. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Update blogpost for new KinesisStreamsSource and DynamoDbStreamsSource [flink-web]
hlteoh37 merged PR #767: URL: https://github.com/apache/flink-web/pull/767 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33566) HBase sql-connector needs overwrite the rowKey
[ https://issues.apache.org/jira/browse/FLINK-33566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-33566: - Fix Version/s: hbase-4.1.0 (was: hbase-3.0.0) > HBase sql-connector needs overwrite the rowKey > -- > > Key: FLINK-33566 > URL: https://issues.apache.org/jira/browse/FLINK-33566 > Project: Flink > Issue Type: Improvement > Components: Connectors / HBase >Affects Versions: 1.18.0 > Environment: flink: 1.18.0 > hbase: 2.2.3 > flink-connector-hbase-2.2:3.0.0-1.18 >Reporter: JankoWilliam >Priority: Major > Labels: HBase-2.0, flink-connector-hbase, pull-request-available > Fix For: hbase-4.1.0 > > > When I want to write label values of 50+to a rowkey column family in HBase > (all values are label values of 0/1), for example: > {"id":"","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"} > Here are four label values for example: > {"id":"","q1":"0","q2":"1","q3":"0","q4":"1"} > {code:java} > --source: > CREATE TABLE kafka_table_( > `id` STRING, > `q1` STRING, > `q2` STRING, > `q3` STRING, > `q4` STRING > ) WITH ( > ... > 'connector'='kafka', > 'format'='json', > ... > ); > --sink: > CREATE TABLE hbase_table_ ( > rowkey STRING, > cf ROW, > PRIMARY KEY (rowkey ) NOT ENFORCED > ) WITH ( > 'connector' = 'my-hbase-2.2', > 'table-name' = 'test_table', > 'zookeeper.quorum' = '127.0.0.1' > ); > --insert: > insert into hbase_table_ > select > id AS rowkey , > ROW( cast(q1 as INT),cast(q2 as INT),cast(q3 as INT),cast(q4 as INT)) as cf > from kafka_table_ ;{code} > hbase: > hbase(main):016:0> scan 'test_table' > ROW COLUMN+CELL > > > column=cf:q1, > timestamp=1, value=\x00\x00\x00\x00 > > > column=cf:q2, > timestamp=1, value=\x00\x00\x00\x01 > > > column=cf:q3, > timestamp=1, value=\x00\x00\x00\x00 > > > column=cf:q4, > timestamp=1, value=\x00\x00\x00\x01 > > Upstream data has a fixed value of 50+k-v data, among which very few value > values are 1 (the default label value is 0). For example, only 1 or 2 values > are 1: q2=1, q4=1, so I want HBase to store the following values: > hbase(main):016:0> scan 'test_table' > ROW COLUMN+CELL > > > column=cf:q2, > timestamp=1, value=\x00\x00\x00\x01 > > column=cf:q4, > timestamp=1, value=\x00\x00\x00\x01 > > When I use the "sink. ignore null value" keyword here, It just don't update > the null value, and downstream third parties will still read all the values > (such as 50+), but there are only 2 values that are truly 1: > {code:java} > --sink: > CREATE TABLE hbase_table_ ( > rowkey STRING, > cf ROW, > PRIMARY KEY (rowkey ) NOT ENFORCED > ) WITH ( > 'connector' = 'my-hbase-2.2', > 'table-name' = 'test_table', > 'sink.ignore-null-value' = 'true', > 'zookeeper.quorum' = '127.0.0.1' > ); > --insert: > insert into hbase_table_ > select > id AS rowkey , > ROW( > case when q1 <> '0' then cast(q1 as INT) else null end, > case when q2 <> '0' then cast(q2 as INT) else null end, > case when q3 <> '0' then cast(q3 as INT) else null end, > case when q4 <> '0' then cast(q4 as INT) else null end > ) as cf > from kafka_table_ ; {code} > hbase(main):016:0> scan 'test_table' > ROW COLUMN+CELL > > >
[PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]
ferenc-csaky opened a new pull request, #25702: URL: https://github.com/apache/flink/pull/25702 Simple release chore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
flinkbot commented on PR #25704: URL: https://github.com/apache/flink/pull/25704#issuecomment-2504208974 ## CI report: * d2bcde1986b8a02f114f9ff5556aae73919dcf7d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860897153 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md: ## @@ -0,0 +1,202 @@ +--- +title: "Introducing the new Prometheus connector" +date: "2024-11-26T00:00:00.000Z" +authors: +- nicusX: + name: "Lorenzo Nicora" +--- + + +We are excited to announce a new sink connector that enables writing data to Prometheus ([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)). This article introduces the main features of the connector and the reasoning behind its design decisions. + +This connector allows writing data to Prometheus using the [Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push interface, which lets you write time-series data to Prometheus at scale. + +## Motivations for a Prometheus connector + +Prometheus is an efficient time-series database optimized for building real-time dashboards and alerts, typically in combination with Grafana or other visualization tools. + +Prometheus is commonly used to monitor compute resources, IT infrastructure, Kubernetes clusters, applications, and cloud resources. It can also be used to observe your Flink cluster and Flink jobs. Flink's existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) serve this purpose. + +So, why do we need a connector? + +Prometheus can serve as a general-purpose observability time-series database, beyond traditional infrastructure monitoring. For example, it can be used to monitor IoT devices, sensors, connected cars, media streaming devices, and any resource that streams events or measurements continuously. + +Observability data from these use cases differs from metrics generated by compute resources. It presents additional challenges: +* **Out-of-order events**: Devices may be connected via mobile networks or even Bluetooth. Events from different devices may follow different paths and arrive at very different times. A **stateful, event-time logic** can be used to reorder them. +* **High frequency** and **high cardinality**: You can have a huge number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable. +* **Lack of contextual information**: Raw events sent by the devices often lack contextual information for a meaningful analysis. **Enrichment** of raw events, looking up some reference dataset, can be used to add dimensions useful for the analysis. +* **Noise**: Sensor measurements may contain noise, for example when a GPS tracker loses connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis. + +Flink can be used as a pre-processor to address all of the above. + +You can implement a sink from scratch or use AsyncIO to call the Prometheus Remote-Write endpoint. However, there are non-trivial details involved in implementing an efficient Remote-Write client: +* There is no high-level client for Prometheus Remote-Write. You would need to build on top of a low-level HTTP client. +* Remote-Write can be inefficient unless write requests are batched and parallelized. +* Error handling can be complex, and the specifications demand strict behaviors (see [Strict Specifications, Lenient Implementations](#strict-specifications-lenient-implementations)).
+ +The new Prometheus connector manages all of this for you. + +## Key features + +Version `1.0.0` of the Prometheus connector has the following features: + +* DataStream API, Java Sink, based on AsyncSinkBase. +* Configurable write batching. +* Order is retained on retries. +* At-most-once guarantees (we’ll explain the reason for this later). +* Supports parallelism greater than 1. +* Configurable retry strategy for retryable server responses (e.g., throttling) and configurable behavior when the retry limit is reached. +* Configurable behavior on retryable errors (e.g., throttling from the Prometheus backend). + +### Authentication + +Authentication is [explicitly out of scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for the Prometheus Remote-Write specifications. +The connector provides a generic interface, PrometheusRequestSigner, to manipulate requests and add headers. This allows implementation of any authentication scheme that requires adding headers to the request, such as API keys, authorization, or signature tokens. + +In the release `1.0.0`, an implementation for Amazon Managed Service for Prometheus (AMP) is provided as a separate, optional dependency. + +## Designing the connector + +A sink to Prometheus differs from a sink to most other datastores or time-series databases. The wire interface is the easiest part; the main challenges arise from the diff
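To make the authentication extension point mentioned in the excerpt above concrete, here is a minimal, hypothetical sketch of a request signer that adds a static API-key header. Only the PrometheusRequestSigner interface name comes from the post; the package, method signature, and the way the sink consumes the signer are assumptions made for illustration, not the connector's published API.

```java
import java.util.Map;

/**
 * Minimal sketch of an authentication hook for the Prometheus sink.
 * NOTE: only the interface name PrometheusRequestSigner is taken from the
 * blog post; the method signature and usage shown here are assumptions.
 */
public class ApiKeyRequestSigner /* implements PrometheusRequestSigner (assumed) */ {

    private final String apiKey;

    public ApiKeyRequestSigner(String apiKey) {
        this.apiKey = apiKey;
    }

    /**
     * Hypothetical hook: add authentication headers to every Remote-Write
     * HTTP request before it is sent to the Prometheus endpoint.
     */
    public void addSignatureHeaders(Map<String, String> requestHeaders, byte[] requestBody) {
        requestHeaders.put("X-Api-Key", apiKey);
    }
}
```

In practice, such a signer would be handed to the sink builder, which is how a scheme like AMP request signing (the optional module mentioned in the post) could be plugged in the same way.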
Re: [PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]
ferenc-csaky commented on PR #25697: URL: https://github.com/apache/flink/pull/25697#issuecomment-2504020451 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36808) UNION ALL after lookup join produces unexpected results
[ https://issues.apache.org/jira/browse/FLINK-36808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-36808: -- Description: Here is the SQL to reproduce the issue: {code:java} -- Data of table `stream`: -- (1, Alice) -- (2, Bob) CREATE TEMPORARY TABLE `stream` ( `id` BIGINT, `name` STRING, `txn_time` as proctime(), PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://localhost:5432/postgres', 'table-name' = 'stream', 'username' = 'postgres', 'password' = 'postgres' ); -- Data of table `dim`: -- (1, OK) -- (2, OK) CREATE TEMPORARY TABLE `dim` ( `id` BIGINT, `status` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://localhost:5432/postgres', 'table-name' = 'dim', 'username' = 'postgres', 'password' = 'postgres' ); -- Lookup join two tables twice with different filters, and union them together SELECT s.id, s.name, s.txn_time, d.status FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d` ON `s`.`id` = `d`.`id` WHERE `d`.`status` = 'OK' UNION ALL SELECT s.id, s.name, s.txn_time, d.status FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d` ON `s`.`id` = `d`.`id` WHERE `d`.`status` = 'NOT_EXISTS';{code} The first lookup join should output: {code:java} (1, Alice, 2024-11-27 11:52:19.332, OK) (2, Bob, 2024-11-27 11:52:19.332, OK) {code} The second lookup join should output nothing, as there's no status 'NOT_EXISTS'. But the result after union is: {code:java} 1, Alice, 2024-11-27 11:52:19.332, OK 2, Bob, 2024-11-27 11:52:19.332, OK 1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS 2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code} There shouldn't be any 'NOT_EXISTS' rows. The SQL plan shows that the constant conditions 'OK' and 'NOT_EXISTS' are appended directly by the Calc after the lookup join operation, which is not as expected. 
{code:java} | == Abstract Syntax Tree == LogicalUnion(all=[true]) :- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4]) : +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')]) : +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}]) : :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()]) : : +- LogicalTableScan(table=[[default_catalog, default_database, stream]]) : +- LogicalFilter(condition=[=($cor0.id, $0)]) : +- LogicalSnapshot(period=[$cor0.txn_time]) : +- LogicalTableScan(table=[[default_catalog, default_database, dim]]) +- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4]) +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')]) +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}]) :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()]) : +- LogicalTableScan(table=[[default_catalog, default_database, stream]]) +- LogicalFilter(condition=[=($cor1.id, $0)]) +- LogicalSnapshot(period=[$cor1.txn_time]) +- LogicalTableScan(table=[[default_catalog, default_database, dim]]) == Optimized Physical Plan == Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status]) +- Union(all=[true], union=[id, name, txn_time, status]) :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status]) : +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id]) : +- Calc(select=[id, name, PROCTIME() AS txn_time]) : +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name]) +- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status]) +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id]) +- Calc(select=[id, name, PROCTIME() AS txn_time]) +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name]) == Optimized Execution Plan == Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status]) +- Union(all=[true], union=[id, name, txn_time, status]) :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status]) : +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])(reuse_id=[1]) : +- Calc(select=[id, name, PROCTIME() AS txn_time]) : +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name]) +- Calc(select=
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais commented on PR #25703: URL: https://github.com/apache/flink/pull/25703#issuecomment-2504152111 Forgot one detail, let me open that again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860859551 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860866006 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md
Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string to prevent version comparison mismatch [flink-connector-hbase]
ferenc-csaky merged PR #53: URL: https://github.com/apache/flink-connector-hbase/pull/53 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string … [flink-connector-hbase]
ferenc-csaky commented on PR #52: URL: https://github.com/apache/flink-connector-hbase/pull/52#issuecomment-2504645328 Closed, as it is obsolete now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string to prevent version comparison mismatch [flink-connector-hbase]
boring-cyborg[bot] commented on PR #53: URL: https://github.com/apache/flink-connector-hbase/pull/53#issuecomment-2504644678 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36595][docs] Explicitly set connector compatibility as string … [flink-connector-hbase]
ferenc-csaky closed pull request #52: [FLINK-36595][docs] Explicitly set connector compatibility as string … URL: https://github.com/apache/flink-connector-hbase/pull/52 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35315) MemoryManagerConcurrentModReleaseTest executes more than 15 minutes
[ https://issues.apache.org/jira/browse/FLINK-35315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-35315: -- Priority: Critical (was: Major) > MemoryManagerConcurrentModReleaseTest executes more than 15 minutes > --- > > Key: FLINK-35315 > URL: https://issues.apache.org/jira/browse/FLINK-35315 > Project: Flink > Issue Type: Bug > Components: Runtime / Network, Tests >Affects Versions: 1.20.0, 2.0-preview >Reporter: Rui Fan >Priority: Critical > Labels: test-stability > Fix For: 2.0.0 > > Attachments: image-2024-05-09-11-53-10-037.png > > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395&view=results] > > It seems > MemoryManagerConcurrentModReleaseTest.testConcurrentModificationWhileReleasing > executes more than 15 minutes. > The root cause may be {color:#e1dfdd}ConcurrentModificationException{color} > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=10060] > > !image-2024-05-09-11-53-10-037.png! > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]
flinkbot commented on PR #25702: URL: https://github.com/apache/flink/pull/25702#issuecomment-2504107185 ## CI report: * 41b6582f692f590ce71d2721306a8c06d3be97ee UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36810) Reflective access warning not addressed causing Flink failing to process job creation
[ https://issues.apache.org/jira/browse/FLINK-36810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901531#comment-17901531 ] Yusu Gao commented on FLINK-36810: -- [~tomncooper] Thank you for pointing out that one. I think we missed this list. Now my question becomes, is there any plan or any work in progress to refactor the code not to use so many reflections? Certain open-ups from this list like sun.security.tools are very concerning in terms of security. > Reflective access warning not addressed causing Flink failing to process job > creation > - > > Key: FLINK-36810 > URL: https://issues.apache.org/jira/browse/FLINK-36810 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.20.0 >Reporter: Yusu Gao >Priority: Major > > While we are testing migration our Flink cluster from 1.11 on Java 11 runtime > to 1.20 on Java 17 runtime, we noticed a job creation error. > {code:java} > "errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could > not execute application. > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114) > at > java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) > at > java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) > at > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) > at java.base/java.lang.Thread.run(Thread.java:840) > Caused by: java.util.concurrent.CompletionException: > java.lang.NoClassDefFoundError: Could not initialize class > org.apache.flink.datastream.impl.ExecutionContextEnvironment > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) > at > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) > ... 1 more > Caused by: java.lang.NoClassDefFoundError: Could not initialize class > org.apache.flink.datastream.impl.ExecutionContextEnvironment > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:109) > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108) > at > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) > ... 1 more > Caused by: java.lang.ExceptionInInitializerError: Exception > java.lang.ExceptionInInitializerError [in thread > \"flink-jar-runner-thread-1\"] > ... 
6 more {code} > Reverting runtime back to Java 11 with {*}--illegal-access=warn{*}, we found > a warning for Flink code > {code:java} > WARNING: Illegal reflective access by > org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator > (file:/path/to/flink/flink-1.20.0/lib/flink-dist-1.20.0.jar) to field > java.util.Collections$UnmodifiableMap.m {code} > Which points to > [DataStreamV2SinkTransformationTranslator|https://github.com/apache/flink/blob/release-1.20.0/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java#L98] > which is used exactly in the [class > |https://github.com/apache/flink/blob/master/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java#L95]throwing > exception. > And adding argument to the JVM_ARGS env var fixed issue > {code:java} > export JVM_ARGS="$JVM_ARGS --add-opens=java.base/java.util=ALL-UNNAMED"{code} > This is a workaround but not necessarily the best solution nor the safest. > Please help provide insights or better solution for such issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais opened a new pull request, #25703: URL: https://github.com/apache/flink/pull/25703 ## What is the purpose of the change Support ignoreIfExists for createTable. It works exactly the same way as for other resources, like createFunction. Similar to SQL's `IF NOT EXISTS`. ## Brief change log - Added one more signature for TableEnvironment's createTable that supports ignoreIfExists - Added testCreateTableIfNotExistsFromDescriptor ## Verifying this change This change added tests and can be verified as follows: - Added a test that validates that running createTable with ignoreIfExists set to true for an existing table leads to a no-op. It also checks that doing the same with ignoreIfExists set to false throws an exception. ## Does this pull request potentially affect one of the following parts: - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - Dependencies (does it add or upgrade a dependency): no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
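As a usage illustration of what the PR describes, here is a minimal sketch from the Table API side. The two-argument createTable call is the existing public API; the three-argument overload and its parameter order are assumptions based on the PR description, not the final merged signature.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class CreateTableIfNotExistsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        TableDescriptor descriptor =
                TableDescriptor.forConnector("datagen")
                        .schema(Schema.newBuilder()
                                .column("id", DataTypes.BIGINT())
                                .column("name", DataTypes.STRING())
                                .build())
                        .build();

        // Existing two-argument variant: throws if a table with the same
        // name already exists in the catalog.
        tEnv.createTable("orders", descriptor);

        // New overload described in this PR (exact signature assumed): with
        // ignoreIfExists = true, creating the same table again is a no-op
        // instead of throwing, mirroring SQL's CREATE TABLE IF NOT EXISTS.
        tEnv.createTable("orders", descriptor, true);
    }
}
```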
[jira] [Updated] (FLINK-36809) Add ifNotExists param to createTable
[ https://issues.apache.org/jira/browse/FLINK-36809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36809: --- Labels: pull-request-available (was: ) > Add ifNotExists param to createTable > > > Key: FLINK-36809 > URL: https://issues.apache.org/jira/browse/FLINK-36809 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Gustavo de Morais >Priority: Major > Labels: pull-request-available > > Table API's createTable currently doesn't support ignoreIfExists, like other > interfaces for other resources already support. > We want to support the ignoreIfExists param for createTable. It should work > the same way it does for other resources, like createFunction. > > Similar to SQL's {{{}_IF NOT EXISTS_ clause{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36810) Reflective access warning not addressed causing Flink failing to process job creation
Yusu Gao created FLINK-36810: Summary: Reflective access warning not addressed causing Flink failing to process job creation Key: FLINK-36810 URL: https://issues.apache.org/jira/browse/FLINK-36810 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.20.0 Reporter: Yusu Gao While we are testing migration our Flink cluster from 1.11 on Java 11 runtime to 1.20 on Java 17 runtime, we noticed a job creation error. {code:java} "errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application. at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114) at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) at java.base/java.lang.Thread.run(Thread.java:840) Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.datastream.impl.ExecutionContextEnvironment at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) ... 1 more Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.datastream.impl.ExecutionContextEnvironment at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:109) at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ... 1 more Caused by: java.lang.ExceptionInInitializerError: Exception java.lang.ExceptionInInitializerError [in thread \"flink-jar-runner-thread-1\"] ... 6 more {code} Reverting runtime back to Java 11 with {*}--illegal-access=warn{*}, we found a warning for Flink code {code:java} WARNING: Illegal reflective access by org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator (file:/path/to/flink/flink-1.20.0/lib/flink-dist-1.20.0.jar) to field java.util.Collections$UnmodifiableMap.m {code} Which points to [DataStreamV2SinkTransformationTranslator|https://github.com/apache/flink/blob/release-1.20.0/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java#L98] which is used exactly in the [class |https://github.com/apache/flink/blob/master/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java#L95]throwing exception. And adding argument to the JVM_ARGS env var fixed issue {code:java} export JVM_ARGS="$JVM_ARGS --add-opens=java.base/java.util=ALL-UNNAMED"{code} This is a workaround but not necessarily the best solution nor the safest. Please help provide insights or better solution for such issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860845426 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md +* At-most-once guarantees (we’ll explain the reason for this later). Review Comment: Prometheus is at-most-once by design: fast ingestion and data freshness over consistency. If you try to make a Prometheus sink at-least-once, Prometheus will spit in your face :D If you make the connector fail when Prometheus rejects your data, you are doomed into an endless loop of restart-from-checkpoint -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add HBase Connector 4.0.0 [flink-web]
ferenc-csaky merged PR #762: URL: https://github.com/apache/flink-web/pull/762 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35136][docs] Bump HBase connector docs version to v4.0 [flink]
ferenc-csaky merged PR #25697: URL: https://github.com/apache/flink/pull/25697 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-36451) Kubernetes Application JobManager Potential Deadlock and TaskManager Pod Residuals
[ https://issues.apache.org/jira/browse/FLINK-36451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895936#comment-17895936 ] Matthias Pohl edited comment on FLINK-36451 at 11/27/24 2:33 PM: - The problem is that we have two code paths running concurrently (in the contenders main thread and in the main thread of the {{DefaultLeaderElectionService}}) that acquire the same locks in reversed order: * Certain operations in the contender (the DispatcherLeaderProcess in FLINK-36451 and the JobMasterServiceLeadershipRunner) need to be called only as leader. These operations are executed from within the contender and acquire the contender’s lock first before acquiring the LeaderElectionService lock (to check for leadership)). * In the meantime, any leadership change event will trigger the same locks in reverse order: First, locking the DefaultLeaderElectionService lock and then informing the registered contenders. Notifying the contenders will require acquiring their locks as part of the process. With unstable leadership happening quite often, the chance of running into a deadlock increases. This is an issue that does not only affect the k8s but also the ZooKeeper leader election. The solution should be to run any leader election-related operations solely on the {{DefaultLeaderElectionService}} main thread to ensure sequential execution. This allows us to remove the lock within the {{DefaultLeaderElectionService}}. We have to ensure, though, that the operations that are performed on the {{DefaultLeaderElectionService}} main thread are light-weight (similar to the requirements of Flink's {{RpcEndpoint}} main thread). was (Author: mapohl): The problem is that we have two code paths running concurrently (in the contenders main thread and in the main thread of the {{DefaultLeaderElectionService}}) that acquire the same locks in reversed order: * Certain operations in the contender (the DispatcherLeaderProcess in FLINK-36451 and the JobMasterServiceLeadershipRunner in FLINKCC-1558) need to be called only as leader. These operations are executed from within the contender and acquire the contender’s lock first before acquiring the LeaderElectionService lock (to check for leadership)). * In the meantime, any leadership change event will trigger the same locks in reverse order: First, locking the DefaultLeaderElectionService lock and then informing the registered contenders. Notifying the contenders will require acquiring their locks as part of the process. With unstable leadership happening quite often, the chance of running into a deadlock increases. This is an issue that does not only affect the k8s but also the ZooKeeper leader election. The solution should be to run any leader election-related operations solely on the {{DefaultLeaderElectionService}} main thread to ensure sequential execution. This allows us to remove the lock within the {{DefaultLeaderElectionService}}. We have to ensure, though, that the operations that are performed on the {{DefaultLeaderElectionService}} main thread are light-weight (similar to the requirements of Flink's {{RpcEndpoint}} main thread). 
> Kubernetes Application JobManager Potential Deadlock and TaskManager Pod > Residuals > -- > > Key: FLINK-36451 > URL: https://issues.apache.org/jira/browse/FLINK-36451 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.1 > Environment: * Flink version: 1.19.1 > * - Deployment mode: Flink Kubernetes Application Mode > * - JVM version: OpenJDK 17 > >Reporter: xiechenling >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available > Attachments: 1.png, 2.png, jobmanager.log, jstack.txt > > > In Kubernetes Application Mode, when there is significant etcd latency or > instability, the Flink JobManager may enter a deadlock situation. > Additionally, TaskManager pods are not cleaned up properly, resulting in > stale resources that prevent the Flink job from recovering correctly. This > issue occurs during frequent service restarts or network instability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
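To make the reversed lock-acquisition order described above concrete, here is a generic, self-contained sketch of the pattern (not Flink code; the class and lock names are made up for illustration): one thread takes the contender lock and then the election-service lock, while another takes them in the opposite order, which can deadlock.

{code:java}
/**
 * Generic illustration of the lock-ordering deadlock described above.
 * The class and lock names are invented; this is NOT the Flink implementation.
 */
public class LockOrderingDeadlockSketch {

    private static final Object contenderLock = new Object();
    private static final Object leaderElectionServiceLock = new Object();

    public static void main(String[] args) {
        // Path 1: a contender operation checks leadership while holding its own lock.
        Thread contenderThread = new Thread(() -> {
            synchronized (contenderLock) {
                sleep(100); // widen the race window for demonstration
                synchronized (leaderElectionServiceLock) {
                    System.out.println("contender: confirmed leadership");
                }
            }
        });

        // Path 2: a leadership-change event locks the election service first,
        // then notifies the contender, which requires the contender's lock.
        Thread electionServiceThread = new Thread(() -> {
            synchronized (leaderElectionServiceLock) {
                sleep(100);
                synchronized (contenderLock) {
                    System.out.println("election service: notified contender");
                }
            }
        });

        contenderThread.start();
        electionServiceThread.start();
        // With unlucky timing, both threads block forever, each waiting for the
        // lock the other one holds -- the deadlock described in this issue.
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}

Running both code paths on a single main thread, as proposed above, removes the second lock entirely and with it the possibility of acquiring the two locks in conflicting orders.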
[jira] [Comment Edited] (FLINK-36451) Kubernetes Application JobManager Potential Deadlock and TaskManager Pod Residuals
[ https://issues.apache.org/jira/browse/FLINK-36451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895936#comment-17895936 ] Matthias Pohl edited comment on FLINK-36451 at 11/27/24 2:34 PM: - The problem is that we have two code paths running concurrently (in the contenders main thread and in the main thread of the {{DefaultLeaderElectionService}}) that acquire the same locks in reversed order: * Certain operations in the contender (the DispatcherLeaderProcess in FLINK-36451 and similarly the JobMasterServiceLeadershipRunner) need to be called only as leader. These operations are executed from within the contender and acquire the contender’s lock first before acquiring the LeaderElectionService lock (to check for leadership)). * In the meantime, any leadership change event will trigger the same locks in reverse order: First, locking the DefaultLeaderElectionService lock and then informing the registered contenders. Notifying the contenders will require acquiring their locks as part of the process. With unstable leadership happening quite often, the chance of running into a deadlock increases. This is an issue that does not only affect the k8s but also the ZooKeeper leader election. The solution should be to run any leader election-related operations solely on the {{DefaultLeaderElectionService}} main thread to ensure sequential execution. This allows us to remove the lock within the {{DefaultLeaderElectionService}}. We have to ensure, though, that the operations that are performed on the {{DefaultLeaderElectionService}} main thread are light-weight (similar to the requirements of Flink's {{RpcEndpoint}} main thread). was (Author: mapohl): The problem is that we have two code paths running concurrently (in the contenders main thread and in the main thread of the {{DefaultLeaderElectionService}}) that acquire the same locks in reversed order: * Certain operations in the contender (the DispatcherLeaderProcess in FLINK-36451 and the JobMasterServiceLeadershipRunner) need to be called only as leader. These operations are executed from within the contender and acquire the contender’s lock first before acquiring the LeaderElectionService lock (to check for leadership)). * In the meantime, any leadership change event will trigger the same locks in reverse order: First, locking the DefaultLeaderElectionService lock and then informing the registered contenders. Notifying the contenders will require acquiring their locks as part of the process. With unstable leadership happening quite often, the chance of running into a deadlock increases. This is an issue that does not only affect the k8s but also the ZooKeeper leader election. The solution should be to run any leader election-related operations solely on the {{DefaultLeaderElectionService}} main thread to ensure sequential execution. This allows us to remove the lock within the {{DefaultLeaderElectionService}}. We have to ensure, though, that the operations that are performed on the {{DefaultLeaderElectionService}} main thread are light-weight (similar to the requirements of Flink's {{RpcEndpoint}} main thread). 
> Kubernetes Application JobManager Potential Deadlock and TaskManager Pod > Residuals > -- > > Key: FLINK-36451 > URL: https://issues.apache.org/jira/browse/FLINK-36451 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.1 > Environment: * Flink version: 1.19.1 > * - Deployment mode: Flink Kubernetes Application Mode > * - JVM version: OpenJDK 17 > >Reporter: xiechenling >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available > Attachments: 1.png, 2.png, jobmanager.log, jstack.txt > > > In Kubernetes Application Mode, when there is significant etcd latency or > instability, the Flink JobManager may enter a deadlock situation. > Additionally, TaskManager pods are not cleaned up properly, resulting in > stale resources that prevent the Flink job from recovering correctly. This > issue occurs during frequent service restarts or network instability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36809) Add ifNotExists param to createTable
Gustavo de Morais created FLINK-36809: - Summary: Add ifNotExists param to createTable Key: FLINK-36809 URL: https://issues.apache.org/jira/browse/FLINK-36809 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Gustavo de Morais Table API's createTable currently doesn't support ignoreIfExists, like other interfaces for other resources already support. We want to support the ignoreIfExists param for createTable. It should work the same way it does for other resources, like createFunction. Similar to SQL's {{{}_IF NOT EXISTS_ clause{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36808) UNION ALL after lookup join produces unexpected results
Qingsheng Ren created FLINK-36808: - Summary: UNION ALL after lookup join produces unexpected results Key: FLINK-36808 URL: https://issues.apache.org/jira/browse/FLINK-36808 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.19.1, 1.20.0 Reporter: Qingsheng Ren Here is the SQL to reproduce the issue: {code:java} -- Data of table `stream`: -- (1, Alice) -- (2, Bob) CREATE TEMPORARY TABLE `stream` ( `id` BIGINT, `name` STRING, `txn_time` as proctime(), PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://localhost:5432/postgres', 'table-name' = 'stream', 'username' = 'postgres', 'password' = 'postgres' ); -- Data of table `dim`: -- (1, OK) -- (2, OK) CREATE TEMPORARY TABLE `dim` ( `id` BIGINT, `status` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://localhost:5432/postgres', 'table-name' = 'dim', 'username' = 'postgres', 'password' = 'postgres' ); -- Lookup join two tables twice with different filters, and union them together SELECT s.id, s.name, s.txn_time, d.status FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d` ON `s`.`id` = `d`.`id` WHERE `d`.`status` = 'OK' UNION ALL SELECT s.id, s.name, s.txn_time, d.status FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d` ON `s`.`id` = `d`.`id` WHERE `d`.`status` = 'NOT_EXISTS';{code} The first lookup join should output: {code:java} (1, Alice, 2024-11-27 11:52:19.332, OK) (2, Bob, 2024-11-27 11:52:19.332, OK) {code} The second lookup join should output nothing, as there's no status 'NOT_EXISTS'. But the result after union is: {code:java} 1, Alice, 2024-11-27 11:52:19.332, OK 2, Bob, 2024-11-27 11:52:19.332, OK 1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS 2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code} There shouldn't be any 'NOT_EXISTS' rows. The SQL plan shows that the constant conditions 'OK' and 'NOT_EXISTS' are appended directly by the Calc after the lookup join operation, which is not as expected. 
{code:java} | == Abstract Syntax Tree == LogicalUnion(all=[true]) :- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4]) : +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')]) : +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}]) : :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()]) : : +- LogicalTableScan(table=[[default_catalog, default_database, stream]]) : +- LogicalFilter(condition=[=($cor0.id, $0)]) : +- LogicalSnapshot(period=[$cor0.txn_time]) : +- LogicalTableScan(table=[[default_catalog, default_database, dim]]) +- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4]) +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')]) +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}]) :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()]) : +- LogicalTableScan(table=[[default_catalog, default_database, stream]]) +- LogicalFilter(condition=[=($cor1.id, $0)]) +- LogicalSnapshot(period=[$cor1.txn_time]) +- LogicalTableScan(table=[[default_catalog, default_database, dim]])== Optimized Physical Plan == Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status]) +- Union(all=[true], union=[id, name, txn_time, status]) :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status]) : +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id]) : +- Calc(select=[id, name, PROCTIME() AS txn_time]) : +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name]) +- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status]) +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id]) +- Calc(select=[id, name, PROCTIME() AS txn_time]) +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])== Optimized Execution Plan == Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status]) +- Union(all=[true], union=[id, name, txn_time, status]) :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status]) : +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_t
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860883342 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md: ## @@ -0,0 +1,202 @@ +--- +title: "Introducing the new Prometheus connector" +date: "2024-11-26T00:00:00.000Z" +authors: +- nicusX: + name: "Lorenzo Nicora" +--- + + +We are excited to announce a new sink connector that enables writing data to Prometheus ([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)). This article introduces the main features of the connector and the reasoning behind its design decisions. + +This connector allows writing data to Prometheus using the [Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push interface, which lets you write time-series data to Prometheus at scale. + +## Motivations for a Prometheus connector + +Prometheus is an efficient time-series database optimized for building real-time dashboards and alerts, typically in combination with Grafana or other visualization tools. + +Prometheus is commonly used to monitor compute resources, IT infrastructure, Kubernetes clusters, applications, and cloud resources. It can also be used to observe your Flink cluster and Flink jobs. Flink's existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) serve this purpose. + +So, why do we need a connector? + +Prometheus can serve as a general-purpose observability time-series database, beyond traditional infrastructure monitoring. For example, it can be used to monitor IoT devices, sensors, connected cars, media streaming devices, and any resource that streams events or measurements continuously. + +Observability data from these use cases differs from metrics generated by compute resources and presents additional challenges: +* **Out-of-order events**: Devices may be connected via mobile networks or even Bluetooth. Events from different devices may follow different paths and arrive at very different times. **Stateful, event-time logic** can be used to reorder them. +* **High frequency** and **high cardinality**: You can have a huge number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable. +* **Lack of contextual information**: Raw events sent by the devices often lack the contextual information needed for meaningful analysis. **Enrichment** of raw events, by looking up a reference dataset, can be used to add dimensions useful for the analysis. +* **Noise**: sensor measurements may contain noise, for example when a GPS tracker loses connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis. + +Flink can be used as a pre-processor to address all the above. + +You can implement a sink from scratch or use AsyncIO to call the Prometheus Remote-Write endpoint. However, there are non-trivial details involved in implementing an efficient Remote-Write client: +* There is no high-level client for Prometheus Remote-Write. You would need to build on top of a low-level HTTP client. +* Remote-Write can be inefficient unless write requests are batched and parallelized. +* Error handling can be complex, and specifications demand strict behaviors (see [Strict Specifications, Lenient Implementations](#strict-specifications-lenient-implementations)). 
+ +The new Prometheus connector manages all of this for you. + +## Key features + +Version `1.0.0` of the Prometheus connector has the following features: + +* DataStream API, Java Sink, based on AsyncSinkBase. +* Configurable write batching. +* Order is retained on retries. +* At-most-once guarantees (we’ll explain the reason for this later). +* Supports parallelism greater than 1. +* Configurable retry strategy for retryable server responses (e.g., throttling) and configurable behavior when the retry limit is reached. +* Configurable behavior on retryable errors (e.g., throttling from the Prometheus backend). + +### Authentication + +Authentication is [explicitly out of scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for Prometheus Remote-Write specifications. +The connector provides a generic interface, PrometheusRequestSigner, to manipulate requests and add headers. This allows implementation of any authentication scheme that requires adding headers to the request, such as API keys, authorization, or signature tokens. + +In the release `1.0.0`, an implementation for Amazon Managed Service for Prometheus (AMP) is provided as a separate, optional dependency. + +## Designing the connector + +A sink to Prometheus differs from a sink to most other datastores or time-series databases. The wire interface is the easiest part; the main challenges arise from the diff
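The signer concept mentioned above is easy to sketch independently of the connector's actual interface (whose exact method signature is not quoted here): it is simply a hook that mutates the HTTP headers of each Remote-Write request before it is sent. A minimal, self-contained illustration with a hypothetical header name follows; how it would plug into PrometheusRequestSigner is deliberately left out.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the idea behind a request signer: inject an authentication header
// into the per-request header map. The class and header name are illustrative only and
// do not come from the connector's released API.
public final class ApiKeyHeaderInjector {

    private final String apiKey;

    public ApiKeyHeaderInjector(String apiKey) {
        this.apiKey = apiKey;
    }

    // Would be called once per Remote-Write request in this sketch.
    public void addHeaders(Map<String, String> requestHeaders) {
        requestHeaders.put("X-Api-Key", apiKey);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        new ApiKeyHeaderInjector("my-secret-key").addHeaders(headers);
        System.out.println(headers); // prints {X-Api-Key=my-secret-key}
    }
}
{code}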
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on code in PR #766: URL: https://github.com/apache/flink-web/pull/766#discussion_r1860886508 ## docs/content/posts/2024-11-26-introducing-new-prometheus-connector.md: ## @@ -0,0 +1,202 @@ +--- +title: "Introducing the new Prometheus connector" +date: "2024-11-26T00:00:00.000Z" +authors: +- nicusX: + name: "Lorenzo Nicora" +--- + + +We are excited to announce a new sink connector that enables writing data to Prometheus ([FLIP-312](https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector)). This article introduces the main features of the connector and the reasoning behind its design decisions. + +This connector allows writing data to Prometheus using the [Remote-Write](https://prometheus.io/docs/specs/remote_write_spec/) push interface, which lets you write time-series data to Prometheus at scale. + +## Motivations for a Prometheus connector + +Prometheus is an efficient time-series database optimized for building real-time dashboards and alerts, typically in combination with Grafana or other visualization tools. + +Prometheus is commonly used to monitor compute resources, IT infrastructure, Kubernetes clusters, applications, and cloud resources. It can also be used to observe your Flink cluster and Flink jobs. Flink's existing [Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/) serve this purpose. + +So, why do we need a connector? + +Prometheus can serve as a general-purpose observability time-series database, beyond traditional infrastructure monitoring. For example, it can be used to monitor IoT devices, sensors, connected cars, media streaming devices, and any resource that streams events or measurements continuously. + +Observability data from these use cases differs from metrics generated by compute resources and presents additional challenges: +* **Out-of-order events**: Devices may be connected via mobile networks or even Bluetooth. Events from different devices may follow different paths and arrive at very different times. **Stateful, event-time logic** can be used to reorder them. +* **High frequency** and **high cardinality**: You can have a huge number of devices, each emitting signals multiple times per second. **Aggregating over time** and **over dimensions** can reduce frequency and cardinality and make the volume of data more efficiently analysable. +* **Lack of contextual information**: Raw events sent by the devices often lack the contextual information needed for meaningful analysis. **Enrichment** of raw events, by looking up a reference dataset, can be used to add dimensions useful for the analysis. +* **Noise**: sensor measurements may contain noise, for example when a GPS tracker loses connection and reports spurious positions. These obvious outliers can be **filtered** out to simplify visualization and analysis. + +Flink can be used as a pre-processor to address all the above. + +You can implement a sink from scratch or use AsyncIO to call the Prometheus Remote-Write endpoint. However, there are non-trivial details involved in implementing an efficient Remote-Write client: +* There is no high-level client for Prometheus Remote-Write. You would need to build on top of a low-level HTTP client. +* Remote-Write can be inefficient unless write requests are batched and parallelized. +* Error handling can be complex, and specifications demand strict behaviors (see [Strict Specifications, Lenient Implementations](#strict-specifications-lenient-implementations)). 
+ +The new Prometheus connector manages all of this for you. + +## Key features + +Version `1.0.0` of the Prometheus connector has the following features: + +* DataStream API, Java Sink, based on AsyncSinkBase. +* Configurable write batching. +* Order is retained on retries. +* At-most-once guarantees (we’ll explain the reason for this later). +* Supports parallelism greater than 1. +* Configurable retry strategy for retryable server responses (e.g., throttling) and configurable behavior when the retry limit is reached. +* Configurable behavior on retryable errors (e.g., throttling from the Prometheus backend). + +### Authentication + +Authentication is [explicitly out of scope](https://prometheus.io/docs/specs/remote_write_spec/#out-of-scope) for Prometheus Remote-Write specifications. +The connector provides a generic interface, PrometheusRequestSigner, to manipulate requests and add headers. This allows implementation of any authentication scheme that requires adding headers to the request, such as API keys, authorization, or signature tokens. + +In the release `1.0.0`, an implementation for Amazon Managed Service for Prometheus (AMP) is provided as a separate, optional dependency. + +## Designing the connector + +A sink to Prometheus differs from a sink to most other datastores or time-series databases. The wire interface is the easiest part; the main challenges arise from the diff
[jira] [Commented] (FLINK-36810) Reflective access warning not addressed causing Flink failing to process job creation
[ https://issues.apache.org/jira/browse/FLINK-36810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901522#comment-17901522 ] Thomas Cooper commented on FLINK-36810: --- Are you setting the Java options needed for JVM 17 support, they are [listed|https://github.com/apache/flink/blob/762bb5ab2a6a5ed48f3d60501b9e1c0d362ad8cb/flink-dist/src/main/resources/config.yaml#L19] in the default conf.yaml file? > Reflective access warning not addressed causing Flink failing to process job > creation > - > > Key: FLINK-36810 > URL: https://issues.apache.org/jira/browse/FLINK-36810 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.20.0 >Reporter: Yusu Gao >Priority: Major > > While we are testing migration our Flink cluster from 1.11 on Java 11 runtime > to 1.20 on Java 17 runtime, we noticed a job creation error. > {code:java} > "errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could > not execute application. > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114) > at > java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) > at > java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) > at > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) > at java.base/java.lang.Thread.run(Thread.java:840) > Caused by: java.util.concurrent.CompletionException: > java.lang.NoClassDefFoundError: Could not initialize class > org.apache.flink.datastream.impl.ExecutionContextEnvironment > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) > at > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) > ... 1 more > Caused by: java.lang.NoClassDefFoundError: Could not initialize class > org.apache.flink.datastream.impl.ExecutionContextEnvironment > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:109) > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108) > at > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) > ... 1 more > Caused by: java.lang.ExceptionInInitializerError: Exception > java.lang.ExceptionInInitializerError [in thread > \"flink-jar-runner-thread-1\"] > ... 
6 more {code} > Reverting runtime back to Java 11 with {*}--illegal-access=warn{*}, we found > a warning for Flink code > {code:java} > WARNING: Illegal reflective access by > org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator > (file:/path/to/flink/flink-1.20.0/lib/flink-dist-1.20.0.jar) to field > java.util.Collections$UnmodifiableMap.m {code} > Which points to > [DataStreamV2SinkTransformationTranslator|https://github.com/apache/flink/blob/release-1.20.0/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java#L98] > which is used exactly in the [class > |https://github.com/apache/flink/blob/master/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java#L95]throwing > exception. > And adding argument to the JVM_ARGS env var fixed issue > {code:java} > export JVM_ARGS="$JVM_ARGS --add-opens=java.base/java.util=ALL-UNNAMED"{code} > This is a workaround but not necessarily the best solution nor the safest. > Please help provide insights or better solution for such issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
gustavodemorais opened a new pull request, #25704: URL: https://github.com/apache/flink/pull/25704 ## What is the purpose of the change Support ignoreIfExists for createTable. Works exactly the same way as for other resources, like createFunction. Similar to SQL's `IF NOT EXISTS`. ## Brief change log - Added one more signature for TableEnvironment's createTable supporting ignoreIfExists - Added testCreateTableIfNotExistsFromDescriptor ## Verifying this change This change added tests and can be verified as follows: - Added test that validates that running createTable, with ignoreIfExists set to true, for an existing table leads to a no-op. Also checks that doing the same with ignoreIfExists set to false throws an exception. ## Does this pull request potentially affect one of the following parts: - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - Dependencies (does it add or upgrade a dependency): no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
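A sketch of how the new overload reads from user code; the descriptor construction follows the pattern in the existing `createTable` JavaDoc, the datagen option key is written in its string form, and the exact exception message on conflict is not assumed here:
{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class CreateTableIfNotExistsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        TableDescriptor descriptor =
                TableDescriptor.forConnector("datagen")
                        .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
                        .option("rows-per-second", "10")
                        .build();

        // First call registers the table in the current catalog/database.
        tEnv.createTable("MyTable", descriptor, true);

        // ignoreIfExists = true: registering the same path again is a no-op.
        tEnv.createTable("MyTable", descriptor, true);

        // ignoreIfExists = false: the same call would throw (a ValidationException per the PR's tests).
        // tEnv.createTable("MyTable", descriptor, false);
    }
}
{code}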
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on PR #766: URL: https://github.com/apache/flink-web/pull/766#issuecomment-2504225605 @hlteoh37 @dannycranmer I addressed all your comments -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36451][runtime] Replaces LeaderElection#hasLeadership with LeaderElection#runAsLeader [flink]
XComp commented on code in PR #25679: URL: https://github.com/apache/flink/pull/25679#discussion_r1860904735 ## flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java: ## @@ -311,8 +312,11 @@ void testMultipleDriverCreations() throws Exception { @Test void testGrantCallWhileInstantiatingDriver() throws Exception { final UUID expectedLeaderSessionID = UUID.randomUUID(); +// leadership acquired right from the start final TestingLeaderElectionDriver.Builder driverBuilder = -TestingLeaderElectionDriver.newNoOpBuilder(); +TestingLeaderElectionDriver.newBuilder(new AtomicBoolean(true)); Review Comment: That was an interesting one because I wondered why we didn't need to set the leadership pre-FLINK-36451. The reason was a bug in the [TestingLeaderContender#grantLeadership](https://github.com/apache/flink/pull/25679/files#diff-b0f861e1a8e93374d482f4565c05430b9134f10bf691da391abef6b79b4dd015L53) method. The old implementation confirmed the leadership and pushed a LeaderInformation event after that call even if the confirmation failed (due to leadership loss). The new implementation is cleaner in this regard. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add blogpost for new Prometheus connector [flink-web]
nicusX commented on PR #766: URL: https://github.com/apache/flink-web/pull/766#issuecomment-2504229269 @hlteoh37 @dannycranmer I addressed all your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36809][table] Support ignoreIfExists param for createTable [flink]
twalthr commented on code in PR #25704: URL: https://github.com/apache/flink/pull/25704#discussion_r1860917076 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -745,6 +745,35 @@ void createTemporarySystemFunction( */ void createTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * If the table should not be permanently stored in a catalog, use {@link + * #createTemporaryTable(String, TableDescriptor)} instead. + * + * Examples: + * + * {@code + * tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen") + * .schema(Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build()) + * .option(DataGenOptions.ROWS_PER_SECOND, 10) + * .option("fields.f0.kind", "random") + * .build()); + * } + * + * @param path The path under which the table will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param descriptor Template for creating a {@link CatalogTable} instance. + * @param ignoreIfExists If a table exists under the given path and this flag is set, no + * operation is executed. An exception is thrown otherwise. + */ +void createTable(String path, TableDescriptor descriptor, boolean ignoreIfExists); Review Comment: can you check whether Python already supports `createTable` with TableDescriptors. if yes, we should also add the method there. check `flink/flink-python/pyflink/table/table_environment.py` ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java: ## @@ -100,6 +101,55 @@ void testCreateTableFromDescriptor() throws Exception { .contains(entry("connector", "fake"), entry("a", "Test")); } +@Test +void testCreateTableIfNotExistsFromDescriptor() throws Exception { +final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); +final String catalog = tEnv.getCurrentCatalog(); +final String database = tEnv.getCurrentDatabase(); + +final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); +tEnv.createTable( +"T", + TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build(), +true); + +final ObjectPath objectPath = new ObjectPath(database, "T"); +assertThat( +tEnv.getCatalog(catalog) +.orElseThrow(AssertionError::new) +.tableExists(objectPath)) +.isTrue(); + +final CatalogBaseTable catalogTable = + tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath); +assertThat(catalogTable).isInstanceOf(CatalogTable.class); +assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema); +assertThat(catalogTable.getOptions()) +.contains(entry("connector", "fake"), entry("a", "Test")); + +assertThatNoException() +.isThrownBy( +() -> +tEnv.createTable( +"T", +TableDescriptor.forConnector("fake") +.schema(schema) +.option("a", "Test") +.build(), +true)); + +assertThatThrownBy( +() -> +tEnv.createTable( +"T", +TableDescriptor.forConnector("fake") +.schema(schema) +.option("a", "Test") +.build(), +false)) +.isInstanceOf(ValidationException.class); Review Comment: ideally, we should also check the error message. at least parts of it. `hasMessageContaining` could do the job? 
## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -745,6 +745,35 @@ void createTemporarySystemFunction( */ void createTable(String path, TableDescriptor descriptor); +/** + * Registers the given {@link TableDescriptor} as a catalog table. + * + * The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and
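A sketch of the assertion style the reviewer is suggesting, reusing the names from the quoted test; the message fragment checked here is an assumption about the eventual error text, not a quote of it:
{code:java}
assertThatThrownBy(
                () ->
                        tEnv.createTable(
                                "T",
                                TableDescriptor.forConnector("fake")
                                        .schema(schema)
                                        .option("a", "Test")
                                        .build(),
                                false))
        .isInstanceOf(ValidationException.class)
        .hasMessageContaining("already exists"); // assumed fragment of the actual error message
{code}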
Re: [PR] [FLINK-35718][mysql] cherry pick DBZ-5333 from debezium MySqlErrorHandler and add logs. [flink-cdc]
github-actions[bot] commented on PR #3440: URL: https://github.com/apache/flink-cdc/pull/3440#issuecomment-2505023790 This pull request has been closed because it has not had recent activity. You can reopen it if you wish to continue your work, and anyone who is interested in it is encouraged to continue working on this pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]
github-actions[bot] closed pull request #3286: [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. URL: https://github.com/apache/flink-cdc/pull/3286 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36397][cdc-connector][mysql] Obtain the high watermark offset and table data in the same transaction during the Snapshot phase [flink-cdc]
github-actions[bot] commented on PR #3616: URL: https://github.com/apache/flink-cdc/pull/3616#issuecomment-2505023628 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]
github-actions[bot] commented on PR #3286: URL: https://github.com/apache/flink-cdc/pull/3286#issuecomment-2505023836 This pull request has been closed because it has not had recent activity. You can reopen it if you wish to continue your work, and anyone who is interested in it is encouraged to continue working on this pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] SQL Server CDC support - fixing the issues in the outstanding pull request by another author [flink-cdc]
github-actions[bot] commented on PR #3507: URL: https://github.com/apache/flink-cdc/pull/3507#issuecomment-2505023722 This pull request has been closed because it has not had recent activity. You can reopen it if you wish to continue your work, and anyone who is interested in it is encouraged to continue working on this pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35718][mysql] cherry pick DBZ-5333 from debezium MySqlErrorHandler and add logs. [flink-cdc]
github-actions[bot] closed pull request #3440: [FLINK-35718][mysql] cherry pick DBZ-5333 from debezium MySqlErrorHandler and add logs. URL: https://github.com/apache/flink-cdc/pull/3440 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] SQL Server CDC support - fixing the issues in the outstanding pull request by another author [flink-cdc]
github-actions[bot] closed pull request #3507: SQL Server CDC support - fixing the issues in the outstanding pull request by another author URL: https://github.com/apache/flink-cdc/pull/3507 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36814) Flink CDC Support Multiple Datasources
melin created FLINK-36814: - Summary: Flink CDC Support Multiple Datasources Key: FLINK-36814 URL: https://issues.apache.org/jira/browse/FLINK-36814 Project: Flink Issue Type: New Feature Components: Flink CDC Reporter: melin Flink CDC Support Multiple Datasources -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36815) series bugs of DSv2
[ https://issues.apache.org/jira/browse/FLINK-36815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-36815: -- Assignee: xuhuang > series bugs of DSv2 > --- > > Key: FLINK-36815 > URL: https://issues.apache.org/jira/browse/FLINK-36815 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 2.0.0 >Reporter: xuhuang >Assignee: xuhuang >Priority: Major > Labels: pull-request-available > > I found four bugs in DSv2. > 1. The DSv2 _org.apache.flink.datastream.api.ExecutionEnvironment#fromSource_ > method returns a > {_}NonKeyedPartitionStream{_}, which does not allow users to set source > configurations, such as operator name or parallelism. > 2. The _org.apache.flink.datastream.api.context.TaskInfo_ lacks > {_}sub{_}_taskIndex_ and {_}attemptNumber{_}. > 3. The _open_ and _close_ methods of > _org.apache.flink.datastream.api.function.ProcessFunction_ are not being > called. > 4. The _open_ method of > _org.apache.flink.datastream.api.function.ProcessFunction_ lacks relevant > information, such as the current TaskInfo. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36815) series bugs of DSv2
xuhuang created FLINK-36815: --- Summary: series bugs of DSv2 Key: FLINK-36815 URL: https://issues.apache.org/jira/browse/FLINK-36815 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 2.0.0 Reporter: xuhuang I found four bugs in DSv2. 1. The DSv2 _org.apache.flink.datastream.api.ExecutionEnvironment#fromSource_ method returns a {_}NonKeyedPartitionStream{_}, which does not allow users to set source configurations, such as operator name or parallelism. 2. The _org.apache.flink.datastream.api.context.TaskInfo_ lacks {_}sub{_}_taskIndex_ and {_}attemptNumber{_}. 3. The _open_ and _close_ methods of _org.apache.flink.datastream.api.function.ProcessFunction_ are not being called. 4. The _open_ method of _org.apache.flink.datastream.api.function.ProcessFunction_ lacks relevant information, such as the current TaskInfo. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36815][API] Fix series bugs of DSv2 [flink]
codenohup opened a new pull request, #25706: URL: https://github.com/apache/flink/pull/25706 ## What is the purpose of the change Fix series bugs of DSv2. 1. The DSv2 org.apache.flink.datastream.api.ExecutionEnvironment#fromSource method returns a NonKeyedPartitionStream, which does not allow users to set source configurations, such as operator name or parallelism. 2. The org.apache.flink.datastream.api.context.TaskInfo lacks subtaskIndex and attemptNumber. 3. The open and close methods of org.apache.flink.datastream.api.function.ProcessFunction are not being called. 4. The open method of org.apache.flink.datastream.api.function.ProcessFunction lacks relevant information, such as the current TaskInfo. ## Brief change log - Make ExecutionEnvironment#fromSource return ProcessConfigurableAndNonKeyedPartitionStream - Introduce subtask index and attempt id in TaskInfo for DSv2 - Open and close user function in DSv2 ProcessOperator - Introduce a NonPartitionedContext parameter in the open method of UserFunction for DSv2 ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
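A rough sketch of what the first fix enables from user code; the `fromSource` signature and the method names on the returned stream (`withName`, `withParallelism`) are assumptions based on the DSv2 interfaces named above, not verified API, and `env`/`numberSource` are placeholders:
{code:java}
// Before: env.fromSource(...) returned a plain NonKeyedPartitionStream, so the source
// operator could not be named or given an explicit parallelism.
// After (sketch, assumed names):
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> numbers =
        env.fromSource(numberSource, "numbers-source"); // numberSource is a placeholder source

numbers.withName("numbers-source").withParallelism(2);  // now configurable like other operators
{code}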
[jira] [Updated] (FLINK-36815) series bugs of DSv2
[ https://issues.apache.org/jira/browse/FLINK-36815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36815: --- Labels: pull-request-available (was: ) > series bugs of DSv2 > --- > > Key: FLINK-36815 > URL: https://issues.apache.org/jira/browse/FLINK-36815 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 2.0.0 >Reporter: xuhuang >Priority: Major > Labels: pull-request-available > > I found four bugs in DSv2. > 1. The DSv2 _org.apache.flink.datastream.api.ExecutionEnvironment#fromSource_ > method returns a > {_}NonKeyedPartitionStream{_}, which does not allow users to set source > configurations, such as operator name or parallelism. > 2. The _org.apache.flink.datastream.api.context.TaskInfo_ lacks > {_}sub{_}_taskIndex_ and {_}attemptNumber{_}. > 3. The _open_ and _close_ methods of > _org.apache.flink.datastream.api.function.ProcessFunction_ are not being > called. > 4. The _open_ method of > _org.apache.flink.datastream.api.function.ProcessFunction_ lacks relevant > information, such as the current TaskInfo. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36815][API] Fix series bugs of DSv2 [flink]
flinkbot commented on PR #25706: URL: https://github.com/apache/flink/pull/25706#issuecomment-2505464742 ## CI report: * f24821faddf52ab597bb635334aee0e252fbba65 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36788] Fix and cover global committer [flink]
AHeise commented on code in PR #25685: URL: https://github.com/apache/flink/pull/25685#discussion_r1860247749 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java: ## @@ -39,12 +41,29 @@ private StandardSinkTopologies() {} */ public static void addGlobalCommitter( DataStream> committables, -SerializableSupplier> committerFactory, +SerializableFunction> committerFactory, SerializableSupplier> committableSerializer) { committables .getExecutionEnvironment() .addOperator( new GlobalCommitterTransform<>( committables, committerFactory, committableSerializer)); } + +/** + * Adds a global committer to the pipeline that runs as final operator with a parallelism of + * one. + */ +public static void addGlobalCommitter( Review Comment: It's fine to keep if the global committer doesn't need access to the ctx. It's relatively little code and could be considered sugar. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
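For readers following the thread, a sketch of how the two overloads discussed here would be used inside `addPostCommitTopology`; the committer and serializer classes are placeholders, and the generic parameters of the factory function are elided in the flattened diff above:
{code:java}
@Override
public void addPostCommitTopology(DataStream<CommittableMessage<MyCommittable>> committables) {
    // Factory-function variant: the lambda receives an init context (e.g. for metrics).
    StandardSinkTopologies.addGlobalCommitter(
            committables,
            initContext -> new MyGlobalCommitter(),  // MyGlobalCommitter is a placeholder Committer
            MyCommittableSerializer::new);           // placeholder SimpleVersionedSerializer

    // Supplier variant, kept as sugar for committers that do not need the context:
    // StandardSinkTopologies.addGlobalCommitter(
    //         committables, MyGlobalCommitter::new, MyCommittableSerializer::new);
}
{code}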
Re: [PR] [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format [flink-cdc]
yuxiqian commented on code in PR #3658: URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1860226446 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql: ## Review Comment: Accidental change? ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java: ## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.shyiko.mysql.binlog.event.deserialization.json; + +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Base64; + +import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.useLegacyJsonFormat; + +/** + * Copied from mysql-binlog-connector-java 0.27.2 to add whitespace before value and after comma. + * + * Line 105: Added whitespace before value, Line 207: Added whitespace after comma Review Comment: Seems line numbers have been differed from actual changes. ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java: ## Review Comment: Since DataStream / Table source connectors' behavior also changed, corresponding integrated test cases might be necessary. 
## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java: ## @@ -457,6 +495,61 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception { .isEqualTo(expectedStreamRecord); } +private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat) +throws Exception { +database.createAndInitialize(); +CloseableIterator iterator = +env.fromSource( +getFlinkSourceProvider( +new String[] {"json_types"}, +database, +useLegacyJsonFormat) +.getSource(), +WatermarkStrategy.noWatermarks(), +"Event-Source") +.executeAndCollect(); + +Object[] expectedSnapshot = +new Object[] { +DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), +BinaryStringData.fromString("{\"key1\": \"value1\"}"), +BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"), +BinaryStringData.fromString( +"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"), +1 +}; + +// skip CreateTableEvent +List snapshotResults = +MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; +RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); +Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES)) +.isEqualTo(expectedSnapshot); + +try (Connection connection = database.getJdbcConnection(); +Statement statement = connection.createStatement()) { +statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;"); +} + +Object[] expectedStreamRecord = expectedSnapshot; + +if (useLegacyJsonFormat) { +expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); +expectedSnapshot[2] = + BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"); +expectedSnapshot[3] = +BinaryStringData.fro
Re: [PR] [FLINK-36788] Fix and cover global committer [flink]
AHeise commented on code in PR #25685: URL: https://github.com/apache/flink/pull/25685#discussion_r1860251139 ## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java: ## @@ -90,7 +92,18 @@ private Collection translateInternal( transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME); transformation.setParallelism(1); transformation.setMaxParallelism(1); -return Collections.emptyList(); +copySafely(transformation::setName, globalCommitterTransform::getName); Review Comment: We set the attributes while expanding the post commit topology. We expand this transform almost at the end of the sink expansion. So it's fair to say that we have it in all cases where we set the properties during post commit expansion. It's not set if no uid or customUidHashes are user-supplied. I think name should always be set but I still use the copySafely method because you never know how things will evolve. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36788] Fix and cover global committer [flink]
AHeise commented on code in PR #25685: URL: https://github.com/apache/flink/pull/25685#discussion_r1860253151 ## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java: ## @@ -188,6 +196,14 @@ private void expand() { } } +private void repeatUntilConverged(Supplier producer) { +R lastResult = producer.get(); +R nextResult; +while (!lastResult.equals(nextResult = producer.get())) { +lastResult = nextResult; +} +} + private List> getSinkTransformations(int sizeBefore) { Review Comment: Yes that's why I create a copy now of the sublist (should be List.copyOf in newer JDK versions). If new transforms are added, we have a separate iteration with a separate copy that expands them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]
zentol commented on PR #25670: URL: https://github.com/apache/flink/pull/25670#issuecomment-2503299977 > why are the CI tests failing? Because the image was modified incorrectly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] DRAFT [FLINK-xxxx][flink-table] Support create table ignoreIfExists for table-api [flink]
Guttz closed pull request #25701: DRAFT [FLINK-][flink-table] Support create table ignoreIfExists for table-api URL: https://github.com/apache/flink/pull/25701 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33571][table] Override json-path version for Calcite 1.32 to deal with CVE [flink]
tomncooper commented on PR #25602: URL: https://github.com/apache/flink/pull/25602#issuecomment-2503426742 @snuyanzin Good point, I amended the title and commit message to point at [FLINK-33571](https://issues.apache.org/jira/browse/FLINK-33571). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]
pnowojski merged PR #25671: URL: https://github.com/apache/flink/pull/25671 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Do skip distributing maxAllowedWatermark if there are no sub… [flink]
pnowojski merged PR #25693: URL: https://github.com/apache/flink/pull/25693 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-36764) Add checkpoint type to checkpoint trace
[ https://issues.apache.org/jira/browse/FLINK-36764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-36764. -- Resolution: Fixed merged commit 09ad451 into apache:master > Add checkpoint type to checkpoint trace > --- > > Key: FLINK-36764 > URL: https://issues.apache.org/jira/browse/FLINK-36764 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > Currently it's impossible to distinguish checkpoints from savepoints. Also it > would be handy to distinguish aligned and unaligned checkpoints. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Handle map and array types binary record data [flink-cdc]
lvyanquan commented on PR #3434: URL: https://github.com/apache/flink-cdc/pull/3434#issuecomment-2503216515 Hi, @umeshdangat gentle ping. I think this PR is an important base for many other upcoming features, so let's try to merge it as soon as possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Update blogpost for new KinesisStreamsSource and DynamoDbStreamsSource [flink-web]
foxus opened a new pull request, #767: URL: https://github.com/apache/flink-web/pull/767 I've addressed some minor grammar nits in the KinesisStreamsSource and DynamoDbStreamsSource blog. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] (1.19 backport)[FLINK-36751] Fix PausableRelativeClock does not pause when the source only has one split [flink]
rkhachatryan commented on PR #25695: URL: https://github.com/apache/flink/pull/25695#issuecomment-2503274062 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36788] Fix and cover global committer [flink]
fapaul commented on code in PR #25685: URL: https://github.com/apache/flink/pull/25685#discussion_r1860223898 ## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java: ## @@ -90,7 +92,18 @@ private Collection translateInternal( transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME); transformation.setParallelism(1); transformation.setMaxParallelism(1); -return Collections.emptyList(); +copySafely(transformation::setName, globalCommitterTransform::getName); Review Comment: In which scenarios does the `globalCommitterTransform` have it's own attributes? ## flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java: ## @@ -39,12 +41,29 @@ private StandardSinkTopologies() {} */ public static void addGlobalCommitter( DataStream> committables, -SerializableSupplier> committerFactory, +SerializableFunction> committerFactory, SerializableSupplier> committableSerializer) { committables .getExecutionEnvironment() .addOperator( new GlobalCommitterTransform<>( committables, committerFactory, committableSerializer)); } + +/** + * Adds a global committer to the pipeline that runs as final operator with a parallelism of + * one. + */ +public static void addGlobalCommitter( Review Comment: Nit: Should we mark this method as deprecated to signal it will eventually be removed? ## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java: ## @@ -188,6 +196,14 @@ private void expand() { } } +private void repeatUntilConverged(Supplier producer) { +R lastResult = producer.get(); +R nextResult; +while (!lastResult.equals(nextResult = producer.get())) { +lastResult = nextResult; +} +} + private List> getSinkTransformations(int sizeBefore) { Review Comment: Didn't we discuss offline that the current behavior of sublisting the transformations isn't safe? ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java: ## @@ -241,7 +242,8 @@ public TestSinkV2WithPostCommitTopology( @Override public void addPostCommitTopology(DataStream> committables) { -// We do not need to do anything for tests +StandardSinkTopologies.addGlobalCommitter( Review Comment: Nit: Should we open a follow-up to increase the coverage even further by also covering the expansions of pre/post-commit topology? 
## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java: ## @@ -18,77 +18,285 @@ package org.apache.flink.streaming.api.graph; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.function.Predicate; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. * * ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases. */ -@ExtendWith(ParameterizedTestExtension
[jira] [Commented] (FLINK-36780) Kafka source disables partition discovery unexpectedly
[ https://issues.apache.org/jira/browse/FLINK-36780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901414#comment-17901414 ] Hongshun Wang commented on FLINK-36780: --- [~liuml07], I agree with you. This modification should be called out to users in the release notes. > Kafka source disables partition discovery unexpectedly > - > > Key: FLINK-36780 > URL: https://issues.apache.org/jira/browse/FLINK-36780 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-4.0.0, kafka-3.2.0, kafka-3.3.0, kafka-3.4.0 >Reporter: Mingliang Liu >Assignee: Mingliang Liu >Priority: Major > Labels: pull-request-available > > Currently Kafka source enables partition discovery. This is set by > {{partition.discovery.interval.ms}}, aka > {{KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS}}. The default value is > 5 minutes, which is equal to the default value of {{metadata.max.age.ms}} in > Kafka. > However, it's disabled by default unexpectedly in the source builder > ([code|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L476-L480]). > The intention, I believe, was to only disable it for bounded sources. > We need a fix that keeps the default partition discovery. Otherwise, this > could silently cause data loss after Kafka retention if the new partitions are not > consumed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
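Until a fix lands, setting the property explicitly on the builder keeps periodic discovery enabled for unbounded sources; a minimal sketch (broker, topic, and group id are placeholders):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class PartitionDiscoveryExample {
    public static void main(String[] args) {
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("my-topic")
                        .setGroupId("my-group")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        // Explicitly re-enable periodic partition discovery (5 minutes = 300000 ms)
                        // instead of relying on the default that the builder currently overrides.
                        .setProperty("partition.discovery.interval.ms", "300000")
                        .build();
        // The source would then be passed to env.fromSource(...) as usual.
    }
}
{code}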
[PR] [Minor][Doc] fix spelling error. [flink-cdc]
lvyanquan opened a new pull request, #3764: URL: https://github.com/apache/flink-cdc/pull/3764 Sorry for this mistake. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [docs] Update download links to up-to-date cdc version [flink-cdc]
yuxiqian opened a new pull request, #3765: URL: https://github.com/apache/flink-cdc/pull/3765 Use a parameterized version string instead of writing hard-coded links. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [docs] Update download links to up-to-date cdc version [flink-cdc]
yuxiqian opened a new pull request, #3766: URL: https://github.com/apache/flink-cdc/pull/3766 Use a parameterized version string instead of writing hard-coded links. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-3.2][docs] Update download links to up-to-date cdc version [flink-cdc]
yuxiqian closed pull request #3765: [BP-3.2][docs] Update download links to up-to-date cdc version URL: https://github.com/apache/flink-cdc/pull/3765 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36806) Support for synchronization between timestamp fields with and without time zones to avoid time offsets
zjjiang created FLINK-36806: --- Summary: Support for synchronization between timestamp fields with and without time zones to avoid time offsets Key: FLINK-36806 URL: https://issues.apache.org/jira/browse/FLINK-36806 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.2.1, cdc-3.1.0, cdc-3.3.0 Reporter: zjjiang Attachments: image-2024-11-27-16-23-22-995.png Currently, Flink CDC can correctly synchronize between fields of the same time type (where the type indicates whether the time zone is included or not), but time offsets can occur when synchronizing between fields of different time types. For example, when synchronizing the field DATETIME (without time zone) in MySQL to the field TIMESTAMPTZ (with time zone) in Iceberg, there is an 8-hour offset after synchronization when the time in MySQL represents Beijing time. Synchronizing between fields of different time types is very common in practical applications; however, the time offset is very troublesome for the business. Therefore, Flink CDC support for converting between different time types would avoid the time offset and help improve the consistency and accuracy of synchronized data. !image-2024-11-27-16-23-22-995.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
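To make the offset concrete with plain JDK types: a zone-less DATETIME value has to be interpreted in some zone before it can become a TIMESTAMPTZ, and picking the wrong zone shifts the instant. The zone below is chosen for illustration; in practice it would come from configuration such as the connector's `server-time-zone` option:
{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampOffsetExample {
    public static void main(String[] args) {
        // A zone-less MySQL DATETIME value that actually represents Beijing wall-clock time.
        LocalDateTime datetime = LocalDateTime.parse("2024-11-27T16:23:22");

        // Misinterpreting it as UTC yields an instant 8 hours later than intended.
        Instant asUtc = datetime.toInstant(ZoneOffset.UTC);

        // Interpreting it in the source time zone yields the intended instant.
        Instant asBeijing = datetime.atZone(ZoneId.of("Asia/Shanghai")).toInstant();

        System.out.println(asUtc);     // 2024-11-27T16:23:22Z
        System.out.println(asBeijing); // 2024-11-27T08:23:22Z (same wall-clock time, 8 hours apart)
    }
}
{code}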
[jira] [Updated] (FLINK-36806) Support for synchronization between timestamp fields with and without time zones to avoid time offsets
[ https://issues.apache.org/jira/browse/FLINK-36806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zjjiang updated FLINK-36806: Description: Currently, FlinkCDC can correctly synchronize the same time type (where the type indicates whether the time zone is included or not), but time offsets can occur when synchronizing between fields of different time types. For example, when synchronizing the field DATETIME (without time zone) in MySQL to the field TIMESTAMPTZ (with time zone) in Iceberg, there is an 8-hour offset after synchronization when the time in mysql represents Beijing time. This kind of synchronization in different time type fields is very common in practical applications, however, the time offset will be very troublesome for business. Therefore, Flink CDC's support for the conversion of different time types will avoid the time offset and help improve the data consistency and accuracy of synchronization. !image-2024-11-27-16-23-22-995.png|width=1004,height=375! was: Currently, FlinkCDC can correctly synchronize the same time type (where the type indicates whether the time zone is included or not), but time offsets can occur when synchronizing between fields of different time types. For example, when synchronizing the field DATETIME (without time zone) in MySQL to the field TIMESTAMPTZ (with time zone) in Iceberg, there is an 8-hour offset after synchronization when the time in mysql represents Beijing time. This kind of synchronization in different time type fields is very common in practical applications, however, the time offset will be very troublesome for business. Therefore, Flink CDC's support for the conversion of different time types will avoid the time offset and help improve the data consistency and accuracy of synchronization. !image-2024-11-27-16-23-22-995.png! > Support for synchronization between timestamp fields with and without time > zones to avoid time offsets > -- > > Key: FLINK-36806 > URL: https://issues.apache.org/jira/browse/FLINK-36806 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.3.0, cdc-3.2.1 >Reporter: zjjiang >Priority: Major > Attachments: image-2024-11-27-16-23-22-995.png > > > Currently, FlinkCDC can correctly synchronize the same time type (where the > type indicates whether the time zone is included or not), but time offsets > can occur when synchronizing between fields of different time types. For > example, when synchronizing the field DATETIME (without time zone) in MySQL > to the field TIMESTAMPTZ (with time zone) in Iceberg, there is an 8-hour > offset after synchronization when the time in mysql represents Beijing time. > This kind of synchronization in different time type fields is very common in > practical applications, however, the time offset will be very troublesome for > business. Therefore, Flink CDC's support for the conversion of different time > types will avoid the time offset and help improve the data consistency and > accuracy of synchronization. > > !image-2024-11-27-16-23-22-995.png|width=1004,height=375! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36788] Fix and cover global committer [flink]
AHeise commented on code in PR #25685: URL: https://github.com/apache/flink/pull/25685#discussion_r1860258571 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java: ## @@ -241,7 +242,8 @@ public TestSinkV2WithPostCommitTopology( @Override public void addPostCommitTopology(DataStream> committables) { -// We do not need to do anything for tests +StandardSinkTopologies.addGlobalCommitter( Review Comment: That's a good point. I created https://issues.apache.org/jira/browse/FLINK-36807 . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org