Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]

2024-10-17 Thread via GitHub


pnowojski commented on code in PR #25539:
URL: https://github.com/apache/flink/pull/25539#discussion_r1804401155


##
flink-metrics/flink-metrics-otel/pom.xml:
##
@@ -0,0 +1,141 @@
+
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-metrics
+   2.0-SNAPSHOT
+   
+
+   flink-metrics-otel
+   Flink : Metrics : OpenTelemetry
+
+   
+   1.30.0

Review Comment:
   I couldn't find matching higher versions of all of those libraries :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]

2024-10-17 Thread via GitHub


pnowojski commented on code in PR #25539:
URL: https://github.com/apache/flink/pull/25539#discussion_r1804401532


##
flink-metrics/flink-metrics-otel/pom.xml:
##
@@ -0,0 +1,141 @@
+
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-metrics
+   2.0-SNAPSHOT
+   
+
+   flink-metrics-otel
+   Flink : Metrics : OpenTelemetry
+
+   
+   1.30.0
+   
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.flink
+   flink-metrics-core
+   ${project.version}
+   provided
+   
+
+   
+
+   
+   io.opentelemetry
+   opentelemetry-sdk
+   ${opentelemetry.version}
+   ${flink.markBundledAsOptional}
+   
+
+   
+   io.opentelemetry
+   opentelemetry-sdk-common
+   ${opentelemetry.version}
+   ${flink.markBundledAsOptional}
+   
+
+   
+   io.opentelemetry
+   opentelemetry-exporter-logging
+   ${opentelemetry.version}
+   ${flink.markBundledAsOptional}
+   
+
+   
+   io.opentelemetry
+   opentelemetry-exporter-otlp
+   ${opentelemetry.version}
+   ${flink.markBundledAsOptional}
+   
+
+   
+   io.opentelemetry
+   opentelemetry-semconv
+   ${opentelemetry.version}-alpha
+   ${flink.markBundledAsOptional}
+   
+
+   
+
+   
+   io.opentelemetry.javaagent
+   opentelemetry-testing-common
+   ${opentelemetry.version}-alpha
+   test
+   
+
+   
+   io.opentelemetry.javaagent
+   opentelemetry-agent-for-testing
+   ${opentelemetry.version}-alpha
+   test
+   
+
+   
+   org.apache.flink
+   flink-metrics-core
+   ${project.version}
+   test
+   test-jar
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   ${project.version}
+   
+
+   
+   org.mock-server
+   
mockserver-netty-no-dependencies
+   5.15.0
+   test
+   
+
+   
+   commons-codec
+   commons-codec
+   1.17.1

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33410] Add a test for UNBOUNDED PRECEDING [flink]

2024-10-17 Thread via GitHub


dawidwys merged PR #25536:
URL: https://github.com/apache/flink/pull/25536


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy [flink-kubernetes-operator]

2024-10-17 Thread via GitHub


huyuanfeng2018 commented on PR #904:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/904#issuecomment-2418982398

   > I'm lacking context for the change, so my comments could be missing the mark, but I think that means I represent a lot of people trying to consume the proposed config; if the naming confuses me, it's likely to confuse others.
   
   @SamBarker  Thank you very much for your review. `scaling.radical.enabled` 
is really not easy to understand. I think your concern is warranted. I will 
think about the naming of this option.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36560) Fix the issue of timestamp_ltz increasing by 8 hours in Paimon

2024-10-17 Thread LvYanquan (Jira)
LvYanquan created FLINK-36560:
-

 Summary: Fix the issue of timestamp_ltz increasing by 8 hours in 
Paimon
 Key: FLINK-36560
 URL: https://issues.apache.org/jira/browse/FLINK-36560
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.1
Reporter: LvYanquan
 Fix For: cdc-3.2.1
 Attachments: image-2024-10-17-17-16-01-773.png

When synchronizing the timestamp field type in MySQL, it was found that the 
time displayed in the Paimon table was incorrect.
How to reproduce:
{code:java}
CREATE TABLE `orders` (
  order_id bigint not null primary key,
  user_id varchar(50) not null,
  shop_id bigint not null,
  product_id bigint not null,
  buy_fee bigint not null,   
  create_time timestamp not null,
  update_time timestamp not null default now(),
  state int not null
);

INSERT INTO orders VALUES
(11, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 
18:42:56', 1),
(12, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 
18:42:56', 1),
(13, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 
18:42:56', 1),
(14, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 
18:42:56', 1),
(15, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 
18:42:56', 1),
(16, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 
18:42:56', 1),
(17, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 
18:42:56', 1);{code}
My YAML job is like the following:
source:
  type: mysql
  hostname: host
  port: 3306
  username: flink
  password: xx
  tables: yaml_test.\.*
  server-id: 22600-22620

sink:
  type: paimon
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: xx
  catalog.properties.fs.oss.endpoint: xx
  catalog.properties.fs.oss.accessKeyId: xx
  catalog.properties.fs.oss.accessKeySecret: xx

pipeline:
  name: MySQL Database to Paimon Database

Currently, the result is like the following; the `create_time` and `update_time` fields are not correct.
||order_id||user_id||shop_id||product_id||buy_fee||create_time||update_time||state||
|100,001|user_001|12,345|1|5,000|2023-02-16 00:40:56|2023-02-16 02:42:56|1|
|100,002|user_002|12,346|2|4,000|2023-02-15 23:40:56|2023-02-16 02:42:56|1|
|100,003|user_003|12,347|3|3,000|2023-02-15 22:40:56|2023-02-16 02:42:56|1|
|100,004|user_001|12,347|4|2,000|2023-02-15 21:40:56|2023-02-16 02:42:56|1|
|100,005|user_002|12,348|5|1,000|2023-02-15 20:40:56|2023-02-16 02:42:56|1|
|100,006|user_001|12,348|1|1,000|2023-02-15 19:40:56|2023-02-16 02:42:56|1|
|100,007|user_003|12,347|4|2,000|2023-02-15 18:40:56|2023-02-16 02:42:56|1|
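
One way such an 8-hour shift can arise (a minimal sketch assuming the pipeline interprets the MySQL local date-time as UTC and then renders it in a UTC+8 session zone such as Asia/Shanghai; the actual root cause in the connector may differ):
{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampLtzShiftDemo {
    public static void main(String[] args) {
        // The value stored for order 11: 2023-02-15 16:40:56
        LocalDateTime mysqlValue = LocalDateTime.of(2023, 2, 15, 16, 40, 56);

        // Interpreting the local date-time as UTC...
        Instant asUtc = mysqlValue.atZone(ZoneId.of("UTC")).toInstant();

        // ...and rendering the resulting instant in a UTC+8 zone
        // prints 2023-02-16T00:40:56, i.e. the 8-hour shift shown in the table above.
        System.out.println(asUtc.atZone(ZoneId.of("Asia/Shanghai")).toLocalDateTime());
    }
}
{code}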

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36560) Fix the issue of timestamp_ltz increasing by 8 hours in Paimon

2024-10-17 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890385#comment-17890385
 ] 

LvYanquan commented on FLINK-36560:
---

I've found the issue in the code and am willing to fix it.

> Fix the issue of timestamp_ltz increasing by 8 hours in Paimon
> --
>
> Key: FLINK-36560
> URL: https://issues.apache.org/jira/browse/FLINK-36560
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.1
>Reporter: LvYanquan
>Priority: Major
> Fix For: cdc-3.2.1
>
> Attachments: image-2024-10-17-17-16-01-773.png
>
>
> When synchronizing the timestamp field type in MySQL, it was found that the 
> time displayed in the Paimon table was incorrect.
> How to reproduce:
> {code:java}
> CREATE TABLE `orders` (
>   order_id bigint not null primary key,
>   user_id varchar(50) not null,
>   shop_id bigint not null,
>   product_id bigint not null,
>   buy_fee bigint not null,   
>   create_time timestamp not null,
>   update_time timestamp not null default now(),
>   state int not null
> );
> INSERT INTO orders VALUES
> (11, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 
> 18:42:56', 1),
> (12, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 
> 18:42:56', 1),
> (13, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 
> 18:42:56', 1),
> (14, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 
> 18:42:56', 1),
> (15, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 
> 18:42:56', 1),
> (16, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 
> 18:42:56', 1),
> (17, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 
> 18:42:56', 1);{code}
> My YAML job is like the following:
> source:
> type: mysql
> hostname: host
> port: 3306
> username: flink
> password: xx
> tables: yaml_test.\.*
> server-id: 22600-22620
> sink:
> type: paimon
> catalog.properties.metastore: filesystem
> catalog.properties.warehouse: xx
> catalog.properties.fs.oss.endpoint: xx
> catalog.properties.fs.oss.accessKeyId:xx
> catalog.properties.fs.oss.accessKeySecret: xx
> pipeline:
> name: MySQL Database to Paimon Database
> Currently, the result is like the following; the `create_time` and `update_time` fields are not correct.
> ||order_id||user_id||shop_id||product_id||buy_fee||create_time||update_time||state||
> |100,001|user_001|12,345|1|5,000|2023-02-16 00:40:56|2023-02-16 02:42:56|1|
> |100,002|user_002|12,346|2|4,000|2023-02-15 23:40:56|2023-02-16 02:42:56|1|
> |100,003|user_003|12,347|3|3,000|2023-02-15 22:40:56|2023-02-16 02:42:56|1|
> |100,004|user_001|12,347|4|2,000|2023-02-15 21:40:56|2023-02-16 02:42:56|1|
> |100,005|user_002|12,348|5|1,000|2023-02-15 20:40:56|2023-02-16 02:42:56|1|
> |100,006|user_001|12,348|1|1,000|2023-02-15 19:40:56|2023-02-16 02:42:56|1|
> |100,007|user_003|12,347|4|2,000|2023-02-15 18:40:56|2023-02-16 02:42:56|1|
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35031][runtime] LatencyMarker/RecordAttribute emitting under async execution model [flink]

2024-10-17 Thread via GitHub


fredia merged PR #25503:
URL: https://github.com/apache/flink/pull/25503


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 [flink]

2024-10-17 Thread via GitHub


SamBarker commented on code in PR #25494:
URL: https://github.com/apache/flink/pull/25494#discussion_r1804065549


##
flink-rpc/flink-rpc-akka/pom.xml:
##
@@ -94,8 +94,8 @@ under the License.


io.netty
-   netty
-   3.10.6.Final
+   netty-all

Review Comment:
   Pekko tests (via reflection) that `io.netty.channel.Channel` is on the classpath, so shading is still required to apply the relocations, even if it's not moving the classes around.
   
   However, as shading is only applied during the package phase, and therefore after test execution, something has to supply Netty for the tests in `flink-rpc-akka`, so having netty-all in `test` scope rather than `provided` might just work.
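   
   For reference, a self-contained sketch of the kind of reflective classpath probe described above (the general pattern only, not Pekko's actual code):
   
   ```java
   public class NettyProbe {
       static boolean nettyChannelOnClasspath() {
           try {
               // Only checks presence; does not initialize the class.
               Class.forName("io.netty.channel.Channel", false, NettyProbe.class.getClassLoader());
               return true;
           } catch (ClassNotFoundException e) {
               return false;
           }
       }
   
       public static void main(String[] args) {
           System.out.println("io.netty.channel.Channel present: " + nettyChannelOnClasspath());
       }
   }
   ```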



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy [flink-kubernetes-operator]

2024-10-17 Thread via GitHub


SamBarker commented on code in PR #904:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/904#discussion_r1804328631


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##
@@ -416,21 +416,31 @@ protected static > int scale(
 // Optimize the case where newParallelism <= 
maxParallelism / 2
 newParallelism > numKeyGroupsOrPartitions / 2
 ? numKeyGroupsOrPartitions
-: numKeyGroupsOrPartitions / 2,
+: numKeyGroupsOrPartitions / 2 + 
numKeyGroupsOrPartitions % 2,
 upperBound);
 
+boolean scalingRadical =
+
context.getConfiguration().get(AutoScalerOptions.SCALING_RADICAL_ENABLED);
+
 // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source,
 // we try to adjust the parallelism such that it divides
 // the numKeyGroupsOrPartitions without a remainder => data is evenly 
spread across subtasks
 for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
-if (numKeyGroupsOrPartitions % p == 0) {
+if (numKeyGroupsOrPartitions % p == 0
+||
+// When scaling radical is enabled, Try to find the 
smallest parallelism that
+// can satisfy the
+// current consumption rate.
+(scalingRadical
+&& numKeyGroupsOrPartitions / p
+< numKeyGroupsOrPartitions / 
newParallelism)) {

Review Comment:
   I think extracting this as a method, e.g. `canMaximiseUtilisation`, would make the intent of the condition easier to understand when working through the code.
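   
   A minimal sketch of the suggested extraction, assuming the names from the quoted diff (`numKeyGroupsOrPartitions`, `newParallelism`); illustrative only, not the final implementation:
   
   ```java
   public class CanMaximiseUtilisationSketch {
       // True if parallelism p gives each subtask a strictly smaller share of
       // key groups / partitions than the originally computed newParallelism would.
       static boolean canMaximiseUtilisation(int numKeyGroupsOrPartitions, int p, int newParallelism) {
           return numKeyGroupsOrPartitions / p < numKeyGroupsOrPartitions / newParallelism;
       }
   
       public static void main(String[] args) {
           // e.g. 10 partitions, newParallelism 4: p = 5 does not help (2 vs 2), p = 6 does (1 < 2).
           System.out.println(canMaximiseUtilisation(10, 5, 4)); // false
           System.out.println(canMaximiseUtilisation(10, 6, 4)); // true
       }
   }
   ```
   
   The alignment loop could then read `numKeyGroupsOrPartitions % p == 0 || (scalingRadical && canMaximiseUtilisation(numKeyGroupsOrPartitions, p, newParallelism))`.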



##
docs/layouts/shortcodes/generated/auto_scaler_configuration.html:
##
@@ -188,6 +188,12 @@
 Duration
 Time interval to resend the identical event
 
+
+job.autoscaler.scaling.radical.enabled

Review Comment:
   Coming at this cold, it's not at all clear to me what radical means. While the description goes some way towards clarifying the intent, it doesn't feel like a great term; additionally, following through the JIRA links, `radical` feels like a very off term for a default (assuming I'm following properly). I wonder if `job.autoscaler.scaling.maximizeUtilisation.enabled` would make things more explicit?



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##
@@ -416,21 +416,31 @@ protected static > int scale(
 // Optimize the case where newParallelism <= 
maxParallelism / 2
 newParallelism > numKeyGroupsOrPartitions / 2
 ? numKeyGroupsOrPartitions
-: numKeyGroupsOrPartitions / 2,
+: numKeyGroupsOrPartitions / 2 + 
numKeyGroupsOrPartitions % 2,
 upperBound);
 
+boolean scalingRadical =
+
context.getConfiguration().get(AutoScalerOptions.SCALING_RADICAL_ENABLED);
+
 // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source,
 // we try to adjust the parallelism such that it divides
 // the numKeyGroupsOrPartitions without a remainder => data is evenly 
spread across subtasks
 for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
-if (numKeyGroupsOrPartitions % p == 0) {
+if (numKeyGroupsOrPartitions % p == 0
+||
+// When scaling radical is enabled, Try to find the 
smallest parallelism that
+// can satisfy the
+// current consumption rate.
+(scalingRadical
+&& numKeyGroupsOrPartitions / p
+< numKeyGroupsOrPartitions / 
newParallelism)) {
 return p;
 }
 }
 
-// When adjust the parallelism after rounding up cannot be evenly 
divided by
-// numKeyGroupsOrPartitions, Try to find the smallest parallelism that 
can satisfy the
-// current consumption rate.
+// When adjust the parallelism after rounding up cannot be
+// find the right degree of parallelism to meet requirements,
+// Try to find the smallest parallelism that can satisfy the current 
consumption rate.

Review Comment:
   nits
   ```suggestion
   // When adjusting the parallelism after rounding up cannot
   // find the right degree of parallelism to meet requirements.
   // Try to find the smallest parallelism that can satisfy the current 
consumption rate.
   ```



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.j

[jira] [Assigned] (FLINK-4602) Move RocksDB backend to proper package

2024-10-17 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-4602:
--

Assignee: Han Yin

> Move RocksDB backend to proper package
> --
>
> Key: FLINK-4602
> URL: https://issues.apache.org/jira/browse/FLINK-4602
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / State Backends
>Reporter: Aljoscha Krettek
>Assignee: Han Yin
>Priority: Major
>  Labels: 2.0-related, auto-unassigned
> Fix For: 2.0.0
>
>
> Right now the package is {{org.apache.flink.contrib.streaming.state}}, it 
> should probably be in {{org.apache.flink.runtime.state.rocksdb}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-4602) Move RocksDB backend to proper package

2024-10-17 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-4602:
---
Parent: (was: FLINK-3957)
Issue Type: Technical Debt  (was: Sub-task)

> Move RocksDB backend to proper package
> --
>
> Key: FLINK-4602
> URL: https://issues.apache.org/jira/browse/FLINK-4602
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / State Backends
>Reporter: Aljoscha Krettek
>Priority: Major
>  Labels: 2.0-related, auto-unassigned
> Fix For: 2.0.0
>
>
> Right now the package is {{org.apache.flink.contrib.streaming.state}}, it 
> should probably be in {{org.apache.flink.runtime.state.rocksdb}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] FLINK-36479 remove deprecated method Table getSchema as [flink]

2024-10-17 Thread via GitHub


davidradl commented on code in PR #25540:
URL: https://github.com/apache/flink/pull/25540#discussion_r1804348228


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java:
##
@@ -161,27 +147,6 @@ TemporalTableFunction createTemporalTableFunction(
  */
 Table as(String field, String... fields);
 
-/**
- * Renames the fields of the expression result. Use this to disambiguate 
fields before joining
- * to operations.
- *

Review Comment:
   I see a test that uses this function in Scala in OverWindowValidationTest; it has:
   
`Over.partitionBy($"c").orderBy($"rowtime").preceding(2).following($"xx").as($"w"))`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs

2024-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36559:
---
Labels: pull-request-available  (was: )

> [docs]Add flink cdc elasticsearch pipeline sink to docs
> ---
>
> Key: FLINK-36559
> URL: https://issues.apache.org/jira/browse/FLINK-36559
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36559) [docs]Add elasticsearch sink to docs

2024-10-17 Thread JunboWang (Jira)
JunboWang created FLINK-36559:
-

 Summary: [docs]Add elasticsearch sink to docs
 Key: FLINK-36559
 URL: https://issues.apache.org/jira/browse/FLINK-36559
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: JunboWang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs

2024-10-17 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-36559:
--
Summary: [docs]Add flink cdc elasticsearch pipeline sink to docs  (was: 
[docs]Add elasticsearch sink to docs)

> [docs]Add flink cdc elasticsearch pipeline sink to docs
> ---
>
> Key: FLINK-36559
> URL: https://issues.apache.org/jira/browse/FLINK-36559
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36559) [docs]Add elasticsearch sink to docs

2024-10-17 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-36559:
--
Component/s: Flink CDC
 (was: Documentation)

> [docs]Add elasticsearch sink to docs
> 
>
> Key: FLINK-36559
> URL: https://issues.apache.org/jira/browse/FLINK-36559
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]

2024-10-17 Thread via GitHub


lvyanquan commented on code in PR #3639:
URL: https://github.com/apache/flink-cdc/pull/3639#discussion_r1804467910


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java:
##
@@ -454,6 +454,46 @@ public void testSinkWithMultiTables(String metastore)
 Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", 
"1")), result);
 }
 
+@ParameterizedTest
+@ValueSource(strings = {"filesystem", "hive"})
+public void testDuplicateCommitAfterRestore(String metastore)
+throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
+Catalog.DatabaseNotExistException, SchemaEvolveException {
+initialize(metastore);
+PaimonSink paimonSink =
+new PaimonSink<>(
+catalogOptions, new 
PaimonRecordEventSerializer(ZoneId.systemDefault()));
+PaimonWriter writer = paimonSink.createWriter(new 
MockInitContext());
+Committer committer = 
paimonSink.createCommitter();
+
+// insert
+for (Event event : createTestEvents()) {
+writer.write(event, null);
+}
+writer.flush(false);
+Collection> 
commitRequests =
+writer.prepareCommit().stream()
+.map(MockCommitRequestImpl::new)
+.collect(Collectors.toList());
+committer.commit(commitRequests);
+
+// We've two steps in checkpoint: 1. snapshotState(ckp); 2. 
notifyCheckpointComplete(ckp).
+// It's possible that flink job will restore from a checkpoint with 
only step#1 finished and
+// step#2 not.
+// CommitterOperator will try to re-commit recovered transactions.
+committer.commit(commitRequests);

Review Comment:
   Considering there is another issue https://issues.apache.org/jira/browse/FLINK-36541 in PaimonWriter, if there is a problem with adding this loop, you can skip it for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]

2024-10-17 Thread via GitHub


mateczagany commented on PR #25509:
URL: https://github.com/apache/flink/pull/25509#issuecomment-2418969804

   I have reproduced this issue easily, but I could not reproduce it if 
snapshot compression is turned on. 
   
   IIUC when using snapshot compression, `SnappyFramedInputStream#available()` 
will return the number of bytes uncompressed in the buffer, and we should skip 
those.
   
   When not using snapshot compression, we call `BufferedInputStream#available()`, which will return the number of buffered bytes; in my tests the buffer size was 4096, so if we read e.g. 50 bytes of data during the last deserialization, this was 4046.
   Skipping this many bytes resulted in the next seek having to go backwards, and that causes the S3 client to close and re-open the stream.
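   
   To make the numbers above concrete, a small self-contained sketch (plain JDK classes, with an underlying stream that reports 0 available bytes, as a remote stream typically does) shows that `available()` reflects what is still buffered, not what was consumed:
   
   ```java
   import java.io.BufferedInputStream;
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   
   public class AvailableAfterReadDemo {
       public static void main(String[] args) throws IOException {
           byte[] data = new byte[10_000];
           InputStream remote = new ByteArrayInputStream(data) {
               @Override
               public synchronized int available() {
                   return 0; // mimic a remote stream that cannot report remaining bytes
               }
           };
           BufferedInputStream in = new BufferedInputStream(remote, 4096);
           in.read(new byte[50]); // e.g. the last deserialization consumed 50 bytes
           // Prints 4046: the bytes still sitting unread in the 4096-byte buffer.
           // Skipping them pushes the delegate past the next seek target, forcing a backwards seek.
           System.out.println(in.available());
       }
   }
   ```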


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy [flink-kubernetes-operator]

2024-10-17 Thread via GitHub


huyuanfeng2018 opened a new pull request, #904:
URL: https://github.com/apache/flink-kubernetes-operator/pull/904

   
   
   ## What is the purpose of the change
   Introduce a parameter that lets the autoscaler adopt a more radical strategy when the source vertex or the upstream shuffle is keyBy.
   
   ## Brief change log
   
 - *Add a new option: `scaling.radical.enabled`*
 - *Support using a more aggressive strategy (resource utilization takes priority) to determine the degree of parallelism after a source or after keyBy, without first considering evenly balanced consumption; see the sketch below.*
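   
   As a reference for reviewers, a hedged sketch of how such an option could be declared (the real `AutoScalerOptions` class uses its own key prefix, default and description, so treat these as assumptions):
   
   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;
   
   public class RadicalScalingOptionSketch {
       // Key taken from the generated docs table in this PR; default and wording are illustrative.
       public static final ConfigOption<Boolean> SCALING_RADICAL_ENABLED =
               ConfigOptions.key("job.autoscaler.scaling.radical.enabled")
                       .booleanType()
                       .defaultValue(false)
                       .withDescription(
                               "Whether to prioritise resource utilisation over an evenly divisible "
                                       + "key-group/partition assignment when picking the parallelism.");
   }
   ```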
   
   ## Verifying this change
   
   
   In `org.apache.flink.autoscaler.JobVertexScalerTest`:
   * `testParallelismComputationWithAdjustment` and `testNumPartitionsAdjustment` were extended with logic to test the new behavior
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no)
 - Core observer or reconciler logic that is regularly executed: (yes / no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36527) Introduce a parameter to support the autoscaler to adopt a more aggressive strategy when Source or upstream shuffle is keyBy

2024-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36527:
---
Labels: pull-request-available  (was: )

> Introduce a parameter to support the autoscaler to adopt a more aggressive 
> strategy when Source or upstream shuffle is keyBy
> 
>
> Key: FLINK-36527
> URL: https://issues.apache.org/jira/browse/FLINK-36527
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Assignee: yuanfenghu
>Priority: Minor
>  Labels: pull-request-available
>
> In 
> [https://issues.apache.org/jira/browse/FLINK-36192|https://issues.apache.org/jira/browse/FLINK-36192]
> We have completed the optimization of Souce parallelism determination.
> Task Source or upstream shuffle is keyBy will be adjusted to be a divisor of 
> the number of partitions or the maximum degree of parallelism.
> This Jira hopes to introduce a parameter to enable a {color:#de350b}more 
> aggressive{color} strategy : 
> https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764419949



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33410] Add a test for UNBOUNDED PRECEDING [flink]

2024-10-17 Thread via GitHub


dawidwys commented on PR #25536:
URL: https://github.com/apache/flink/pull/25536#issuecomment-2418841891

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36560) Fix the issue of timestamp_ltz increasing by 8 hours in Paimon

2024-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36560:
---
Labels: pull-request-available  (was: )

> Fix the issue of timestamp_ltz increasing by 8 hours in Paimon
> --
>
> Key: FLINK-36560
> URL: https://issues.apache.org/jira/browse/FLINK-36560
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.1
>Reporter: LvYanquan
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.1
>
> Attachments: image-2024-10-17-17-16-01-773.png
>
>
> When synchronizing the timestamp field type in MySQL, it was found that the 
> time displayed in the Paimon table was incorrect.
> How to reproduce:
> {code:java}
> CREATE TABLE `orders` (
>   order_id bigint not null primary key,
>   user_id varchar(50) not null,
>   shop_id bigint not null,
>   product_id bigint not null,
>   buy_fee bigint not null,   
>   create_time timestamp not null,
>   update_time timestamp not null default now(),
>   state int not null
> );
> INSERT INTO orders VALUES
> (11, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 
> 18:42:56', 1),
> (12, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 
> 18:42:56', 1),
> (13, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 
> 18:42:56', 1),
> (14, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 
> 18:42:56', 1),
> (15, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 
> 18:42:56', 1),
> (16, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 
> 18:42:56', 1),
> (17, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 
> 18:42:56', 1);{code}
> My YAML job is like the following:
> source:
> type: mysql
> hostname: host
> port: 3306
> username: flink
> password: xx
> tables: yaml_test.\.*
> server-id: 22600-22620
> sink:
> type: paimon
> catalog.properties.metastore: filesystem
> catalog.properties.warehouse: xx
> catalog.properties.fs.oss.endpoint: xx
> catalog.properties.fs.oss.accessKeyId:xx
> catalog.properties.fs.oss.accessKeySecret: xx
> pipeline:
> name: MySQL Database to Paimon Database
> Currently, the result is like the following; the `create_time` and `update_time` fields are not correct.
> ||order_id||user_id||shop_id||product_id||buy_fee||create_time||update_time||state||
> |100,001|user_001|12,345|1|5,000|2023-02-16 00:40:56|2023-02-16 02:42:56|1|
> |100,002|user_002|12,346|2|4,000|2023-02-15 23:40:56|2023-02-16 02:42:56|1|
> |100,003|user_003|12,347|3|3,000|2023-02-15 22:40:56|2023-02-16 02:42:56|1|
> |100,004|user_001|12,347|4|2,000|2023-02-15 21:40:56|2023-02-16 02:42:56|1|
> |100,005|user_002|12,348|5|1,000|2023-02-15 20:40:56|2023-02-16 02:42:56|1|
> |100,006|user_001|12,348|1|1,000|2023-02-15 19:40:56|2023-02-16 02:42:56|1|
> |100,007|user_003|12,347|4|2,000|2023-02-15 18:40:56|2023-02-16 02:42:56|1|
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]

2024-10-17 Thread via GitHub


Zakelly commented on code in PR #25501:
URL: https://github.com/apache/flink/pull/25501#discussion_r1804441714


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java:
##
@@ -111,6 +112,7 @@ interface Provider extends Serializable {
  InternalTimeServiceManager create(
 TaskIOMetricGroup taskIOMetricGroup,
 CheckpointableKeyedStateBackend keyedStatedBackend,
+AsyncKeyedStateBackend asyncKeyedStateBackend,

Review Comment:
   How about merging the two parameters into one state backend? Like this:
   ```suggestion
   PriorityQueueSetFactory factory,
   KeyGroupRange keyGroupRange,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36560][pipeline-connector][paimon] fix the issue that timestamp_ltz field is not correctly converted. [flink-cdc]

2024-10-17 Thread via GitHub


lvyanquan commented on PR #3648:
URL: https://github.com/apache/flink-cdc/pull/3648#issuecomment-2419036919

   @beryllw @yuxiqian @ruanhang1993 could you please help to review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35031) Latency marker emitting under async execution model

2024-10-17 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-35031.

  Assignee: Yanfei Lei
Resolution: Resolved

> Latency marker emitting under async execution model
> ---
>
> Key: FLINK-35031
> URL: https://issues.apache.org/jira/browse/FLINK-35031
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35031) Latency marker emitting under async execution model

2024-10-17 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890390#comment-17890390
 ] 

Yanfei Lei commented on FLINK-35031:


Merged into master via ecb41150dab63c0c12b8dc3c447810e7e93df115

> Latency marker emitting under async execution model
> ---
>
> Key: FLINK-35031
> URL: https://issues.apache.org/jira/browse/FLINK-35031
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]

2024-10-17 Thread via GitHub


lvyanquan commented on code in PR #3639:
URL: https://github.com/apache/flink-cdc/pull/3639#discussion_r1804454752


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java:
##
@@ -454,6 +454,46 @@ public void testSinkWithMultiTables(String metastore)
 Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", 
"1")), result);
 }
 
+@ParameterizedTest
+@ValueSource(strings = {"filesystem", "hive"})
+public void testDuplicateCommitAfterRestore(String metastore)
+throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
+Catalog.DatabaseNotExistException, SchemaEvolveException {
+initialize(metastore);
+PaimonSink paimonSink =
+new PaimonSink<>(
+catalogOptions, new 
PaimonRecordEventSerializer(ZoneId.systemDefault()));
+PaimonWriter writer = paimonSink.createWriter(new 
MockInitContext());
+Committer committer = 
paimonSink.createCommitter();
+
+// insert
+for (Event event : createTestEvents()) {
+writer.write(event, null);
+}
+writer.flush(false);
+Collection> 
commitRequests =
+writer.prepareCommit().stream()
+.map(MockCommitRequestImpl::new)
+.collect(Collectors.toList());
+committer.commit(commitRequests);
+
+// We've two steps in checkpoint: 1. snapshotState(ckp); 2. 
notifyCheckpointComplete(ckp).
+// It's possible that flink job will restore from a checkpoint with 
only step#1 finished and
+// step#2 not.
+// CommitterOperator will try to re-commit recovered transactions.
+committer.commit(commitRequests);

Review Comment:
   Thanks for adding this. What about running `insert and commit` many times (in a for loop) to simulate more complex situations, including situations with compaction?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]

2024-10-17 Thread via GitHub


Zakelly commented on code in PR #25509:
URL: https://github.com/apache/flink/pull/25509#discussion_r1804708194


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java:
##
@@ -31,19 +31,21 @@ public class CompressibleFSDataInputStream extends 
FSDataInputStream {
 
 private final FSDataInputStream delegate;
 private final InputStream compressingDelegate;
+private final boolean compressed;
 
 public CompressibleFSDataInputStream(
 FSDataInputStream delegate, StreamCompressionDecorator 
compressionDecorator)
 throws IOException {
 this.delegate = delegate;
 this.compressingDelegate = 
compressionDecorator.decorateWithCompression(delegate);
+this.compressed = compressionDecorator != 
UncompressedStreamCompressionDecorator.INSTANCE;
 }
 
 @Override
 public void seek(long desired) throws IOException {
-final int available = compressingDelegate.available();
-if (available > 0) {
-if (available != compressingDelegate.skip(available)) {
+if (compressed) {
+final int available = compressingDelegate.available();
+if (available > 0 && available != 
compressingDelegate.skip(available)) {
 throw new IOException("Unable to skip buffered data.");
 }
 }

Review Comment:
   How about moving this part into a new interface? For example, `compressingDelegate` could be a newly introduced `DecompressingInputStream`, which provides a method `clearBuffer`. That's where the different behaviors would go.
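   
   A rough sketch of that shape, assuming the `clearBuffer` name from the comment above and treating everything else as illustrative:
   
   ```java
   import java.io.IOException;
   
   /** Decompressing wrapper that can drop data buffered ahead of the current logical position. */
   interface DecompressingInputStream {
       // The uncompressed variant would implement this as a no-op, while the Snappy-backed
       // variant would skip its available() bytes, so seek() just calls clearBuffer() and
       // then delegate.seek(desired) without any compression flag check.
       void clearBuffer() throws IOException;
   }
   ```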



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36326] Fix auto scan newly-added table after restart [flink-cdc]

2024-10-17 Thread via GitHub


lvyanquan commented on PR #3613:
URL: https://github.com/apache/flink-cdc/pull/3613#issuecomment-2419385239

   According to feedback from community users, cherry-picking this PR has indeed solved the problem.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]

2024-10-17 Thread via GitHub


gaborgsomogyi commented on code in PR #25509:
URL: https://github.com/apache/flink/pull/25509#discussion_r1804812888


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java:
##
@@ -31,19 +31,21 @@ public class CompressibleFSDataInputStream extends 
FSDataInputStream {
 
 private final FSDataInputStream delegate;
 private final InputStream compressingDelegate;
+private final boolean compressed;
 
 public CompressibleFSDataInputStream(
 FSDataInputStream delegate, StreamCompressionDecorator 
compressionDecorator)
 throws IOException {
 this.delegate = delegate;
 this.compressingDelegate = 
compressionDecorator.decorateWithCompression(delegate);
+this.compressed = compressionDecorator != 
UncompressedStreamCompressionDecorator.INSTANCE;
 }
 
 @Override
 public void seek(long desired) throws IOException {
-final int available = compressingDelegate.available();
-if (available > 0) {
-if (available != compressingDelegate.skip(available)) {
+if (compressed) {
+final int available = compressingDelegate.available();
+if (available > 0 && available != 
compressingDelegate.skip(available)) {
 throw new IOException("Unable to skip buffered data.");
 }
 }

Review Comment:
   I agree that this area has room for improvement, but I have the following reasons not to do that (at least in this PR):
   * This is a blocker, and as such I would keep the fix as small as possible so as not to break things.
   * The main issue here is that we don't have separate classes for uncompressed and compressed `FSDataInputStream`; if you ask me, that would be the step to take. I've tried it already and came to the conclusion that it would lead too far, because this is a pattern in Flink (like `CompressibleFSDataOutputStream`), and all these streams are in a `has-a` relationship instead of being real decorators around each other. So, all in all, introducing another layer as an interface would be overkill here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35268][state] Add ttl interface for Async State API && implement TtlListStateV2/TtlValueStateV2 [flink]

2024-10-17 Thread via GitHub


Zakelly commented on code in PR #25515:
URL: https://github.com/apache/flink/pull/25515#discussion_r1804486595


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java:
##
@@ -59,7 +59,7 @@ public AbstractAggregatingState(
 }
 
 protected StateFuture asyncGetAccumulator() {

Review Comment:
   How about removing this method?



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregatingStateV2.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the value added to the state
+ * @param  The type of the accumulator (intermediate aggregate state).
+ * @param  Type of the value extracted from the state
+ */
+class TtlAggregatingStateV2
+extends AbstractTtlStateV2<
+K, N, ACC, TtlValue, InternalAggregatingState, OUT>>
+implements InternalAggregatingState {
+
+TtlAggregatingStateV2(
+TtlStateContext, 
OUT>, ACC>
+ttlStateContext,
+TtlAggregateFunctionV2 aggregateFunction) {
+super(ttlStateContext);
+aggregateFunction.updater = (ttlValue) -> 
original.updateInternal(ttlValue);
+}
+
+@Override
+public StateFuture asyncMergeNamespaces(N target, Collection 
sources) {
+return original.asyncMergeNamespaces(target, sources);
+}
+
+@Override
+public void mergeNamespaces(N target, Collection sources) {
+original.mergeNamespaces(target, sources);
+}
+
+@Override
+public StateFuture asyncGet() {
+return original.asyncGet();
+}
+
+@Override
+public StateFuture asyncAdd(IN value) {
+return original.asyncAdd(value);
+}
+
+@Override
+public OUT get() {
+return original.get();
+}
+
+@Override
+public void add(IN value) {
+original.add(value);
+}
+
+@Override
+public void clear() {
+original.clear();
+}
+
+@Override
+public StateFuture asyncGetInternal() {
+return original.asyncGetInternal()
+.thenApply(ttlValue -> getElementWithTtlCheck(ttlValue, 
original::updateInternal));

Review Comment:
   How about updating the value asynchronously when `updateOnRead` is enabled?



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/AbstractTtlStateV2.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.state.ttl.AbstractTtlDecorator;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
+
+/**
+ * Base class for TTL logic wrappers of state objects. StateV2 does not support
+ * FULL_STATE_SCAN_SNAPSHOT and INCREMENTAL_CLEANUP, only supports 
ROCKSDB_COMPACTION_FILTER.
+ *
+ *

Re: [PR] [FLINK-36564][ci] Running CI in random timezone to expose more time related bugs. [flink-cdc]

2024-10-17 Thread via GitHub


lvyanquan commented on PR #3650:
URL: https://github.com/apache/flink-cdc/pull/3650#issuecomment-2421138664

   @yuxiqian PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35109] Drop support for Flink 1.17 & 1.18 and fix tests for 1.20-SNAPSHOT [flink-connector-kafka]

2024-10-17 Thread via GitHub


ruanhang1993 closed pull request #102: [FLINK-35109] Drop support for Flink 
1.17 & 1.18 and fix tests for 1.20-SNAPSHOT
URL: https://github.com/apache/flink-connector-kafka/pull/102


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLINK-36540.[Runtime / Task] Add Support for Hadoop Caller Context when using Flink to operate hdfs. [flink]

2024-10-17 Thread via GitHub


liangyu-1 commented on PR #25516:
URL: https://github.com/apache/flink/pull/25516#issuecomment-2421163090

   @dmvk @fapaul 
   Hi, would you please help me check this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.18][FLINK-36421] [fs] [checkpoint] Sync outputStream before returning handle in FsCheckpointStreamFactory [flink]

2024-10-17 Thread via GitHub


Zakelly commented on PR #25479:
URL: https://github.com/apache/flink/pull/25479#issuecomment-2421167188

   Will merge this without CI green 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33977][runtime] Adaptive scheduler may not minimize the number of TMs during downscaling [flink]

2024-10-17 Thread via GitHub


1996fanrui commented on PR #25218:
URL: https://github.com/apache/flink/pull/25218#issuecomment-2421167238

   Hey @XComp @ztison, sorry, I'd like to discuss this PR with you again.
   
   Could we fix the issue for `DefaultSlotAssigner` and `Application Mode` first? I prefer to fix it first for several reasons:
   
   - @XComp 's first [concern](https://github.com/apache/flink/pull/25218#discussion_r1771187666) is that this fix conflicts with `execution.state-recovery.from-local`, so it's better handled in a FLIP.
     - That's why this PR only changes the code of `DefaultSlotAssigner` and doesn't change any code of `StateLocalitySlotAssigner`.
   - @ztison 's [concern](https://github.com/apache/flink/pull/25218#issuecomment-2401913141) is that this fix conflicts with spreading the workload across as many workers as possible.
     - As we discussed before, this concern only exists for session mode. That's why I'm curious whether we could fix it for `Application Mode` first.
   - The third reason is the most important: the issue this PR is trying to fix is more like a bug than an optimization for `Application Mode` with `execution.state-recovery.from-local` disabled.
     - The phenomenon of this bug is that TM resources cannot be released after scaling down.
     - I believe that Flink users use the Adaptive Scheduler mainly to scale up and scale down quickly or more efficiently.
     - Many users have questions like: why aren't resources saved after scaling down?
     - This bug has been reported in 3 JIRAs: FLINK-33977, FLINK-35594 and FLINK-35903.
     - The main reason I want to discuss this with you again is that one Flink user [reported this bug](https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569) again in the Slack troubleshooting channel, and the reporter cc'd me in the [next thread](https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167719506889) because I'm an active contributor to the autoscaler. (I guess he doesn't know the bug or phenomenon is not related to the autoscaler; it's related to the Adaptive Scheduler.)
     - It is worth mentioning that, as far as I know, @RocMarshal (the developer of this PR) didn't report any JIRA himself, because he noticed the issue was already reported via several JIRAs.
     - It means at least 5 users (from what I have observed, these 5 users come from 5 different companies) faced this issue in their production jobs. I'm happy to see more and more companies trying out the Adaptive Scheduler.
   - The fourth reason: 1.20 is the LTS version for the 1.x series.
     - If we think it's a bug, we could fix it in 1.20.x and 2.0.x together.
     - If we think it's an improvement or feature rather than a bug, and improve it in a FLIP, it means this issue cannot be fixed in the 1.x series.
     - That's why [FLIP-461](https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler) and [FLIP-472](https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states) cannot be backported to the 1.x series.
     - Actually, I think both [FLIP-461](https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler) and [FLIP-472](https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states) are great improvements for the Adaptive Scheduler. Thank you for the great work. ❤️
     - I believe most users (companies) are not able to maintain an internal Flink version and use the official Flink releases. If this bug is not fixed in 1.x, it may be difficult for the Adaptive Scheduler to be used by a large number of users in 1.x.
     - Of course, my team maintains our internal Flink version, so we can easily fix it in our production environment. My initiative is mainly to enable most Flink users to have a better Adaptive Scheduler experience.
   
   Sorry to bother you again. This is definitely my last try. If you think it is unreasonable, I can accept it and deal with it in a subsequent FLIP. Thank you very much. ❤️


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36564) Running CI in random timezone to expose more time related bugs

2024-10-17 Thread LvYanquan (Jira)
LvYanquan created FLINK-36564:
-

 Summary: Running CI in random timezone  to expose more time 
related bugs
 Key: FLINK-36564
 URL: https://issues.apache.org/jira/browse/FLINK-36564
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.3.0
Reporter: LvYanquan
 Fix For: cdc-3.3.0


Refer to [this comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: when running CI, setting a random time zone in the session can help to expose issues that are related to time zones in advance.
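
A minimal sketch of the idea at the JVM level (how the CDC CI actually wires this in may differ; zone list and class name are illustrative):
{code:java}
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.ThreadLocalRandom;

public class RandomTimeZoneSetup {
    // Call once before the test suite starts, e.g. from a CI bootstrap step.
    public static void apply() {
        List<String> zones =
                List.of("UTC", "Asia/Shanghai", "America/New_York", "Europe/Berlin", "Pacific/Auckland");
        String zone = zones.get(ThreadLocalRandom.current().nextInt(zones.size()));
        TimeZone.setDefault(TimeZone.getTimeZone(zone));
        System.setProperty("user.timezone", zone);
        System.out.println("Running tests with time zone: " + zone);
    }
}
{code}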



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36563) Running CI in random timezone to expose more time related bugs

2024-10-17 Thread LvYanquan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LvYanquan closed FLINK-36563.
-
Resolution: Duplicate

> Running CI in random timezone  to expose more time related bugs 
> 
>
> Key: FLINK-36563
> URL: https://issues.apache.org/jira/browse/FLINK-36563
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.3.0
>Reporter: LvYanquan
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Refer to [this comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: when running CI, setting a random time zone in the session can help to expose issues that are related to time zones in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36562) Running CI in random timezone to expose more time related bugs

2024-10-17 Thread LvYanquan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LvYanquan closed FLINK-36562.
-
Release Note: Duplicated with FLINK-36564.
  Resolution: Duplicate

> Running CI in random timezone  to expose more time related bugs 
> 
>
> Key: FLINK-36562
> URL: https://issues.apache.org/jira/browse/FLINK-36562
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.3.0
>Reporter: LvYanquan
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Refer to [this 
> comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: 
> when running CI, setting a random time zone in the session can help expose 
> time-zone-related issues in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36562) Running CI in random timezone to expose more time related bugs

2024-10-17 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890773#comment-17890773
 ] 

LvYanquan commented on FLINK-36562:
---

Closed as this is a duplicate of FLINK-36564.

> Running CI in random timezone  to expose more time related bugs 
> 
>
> Key: FLINK-36562
> URL: https://issues.apache.org/jira/browse/FLINK-36562
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.3.0
>Reporter: LvYanquan
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Refer to [this 
> comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: 
> when running CI, setting a random time zone in the session can help expose 
> time-zone-related issues in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36474] Support merging timestamp columns when routing [flink-cdc]

2024-10-17 Thread via GitHub


yuxiqian commented on PR #3636:
URL: https://github.com/apache/flink-cdc/pull/3636#issuecomment-2421159128

   Squashed & rebased with `master`. Could @ruanhang1993 please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs

2024-10-17 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang closed FLINK-36559.
-
Resolution: Duplicate

Duplicate of FLINK-36052.

> [docs]Add flink cdc elasticsearch pipeline sink to docs
> ---
>
> Key: FLINK-36559
> URL: https://issues.apache.org/jira/browse/FLINK-36559
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs

2024-10-17 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-36559:
--
Description: Duplicate with 36052.

> [docs]Add flink cdc elasticsearch pipeline sink to docs
> ---
>
> Key: FLINK-36559
> URL: https://issues.apache.org/jira/browse/FLINK-36559
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>  Labels: pull-request-available
>
> Duplicate with 36052.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36052) add elasticsearch.md for elasticsearch pipeline connector

2024-10-17 Thread JunboWang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890794#comment-17890794
 ] 

JunboWang commented on FLINK-36052:
---

Could we add Component/s as FLINK-CDC?

> add elasticsearch.md for elasticsearch pipeline connector
> -
>
> Key: FLINK-36052
> URL: https://issues.apache.org/jira/browse/FLINK-36052
> Project: Flink
>  Issue Type: Improvement
>Reporter: wuzexian
>Assignee: wuzexian
>Priority: Major
>  Labels: pull-request-available
>
> add elasticsearch.md for elasticsearch pipeline connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36541) Occasionally met commit conflict problem in PaimonSink.

2024-10-17 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890800#comment-17890800
 ] 

LvYanquan commented on FLINK-36541:
---

I would like to try to fix it.
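
A rough sketch of the direction such a fix could take (names and types below are illustrative only, not the actual PaimonWriter API):

{code:java}
import java.util.ArrayList;
import java.util.Collection;

/** Illustrative sketch only; not the real PaimonWriter. */
class IncreasingCommitIdWriter<CommittableT> {

    /** Monotonically increasing identifier instead of a constant 1. */
    private long commitIdentifier = 0L;

    /** Prepare committables with an increasing identifier so that
     *  successive snapshots do not conflict with each other. */
    Collection<CommittableT> prepareCommit() {
        long identifier = ++commitIdentifier;
        return collectCommittables(identifier);
    }

    /** Placeholder for the backend-specific call that would receive the id. */
    Collection<CommittableT> collectCommittables(long identifier) {
        return new ArrayList<>();
    }
}
{code}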

> Occasionally met commit conflict problem in PaimonSink.
> -
>
> Key: FLINK-36541
> URL: https://issues.apache.org/jira/browse/FLINK-36541
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.3.0
>Reporter: LvYanquan
>Priority: Major
> Fix For: cdc-3.3.0
>
> Attachments: image-2024-10-15-20-36-03-495.png
>
>
> When writing records to Paimon, we occasionally hit a commit conflict problem 
> even if the parallelism is 1.
> It turns out that PaimonWriter prepares committables with a checkpointId 
> that is always 1, as shown in the following code:
> [https://github.com/apache/flink-cdc/blob/a1781f432d906fa2a864642a5f74ac5bdc963d9c/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java#L164-L178]
> !image-2024-10-15-20-36-03-495.png!
> This is actually incorrect usage; we must pass in an increasing ID to 
> ensure the correctness of the snapshot information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36052) add elasticsearch.md for elasticsearch pipeline connector

2024-10-17 Thread JunboWang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890794#comment-17890794
 ] 

JunboWang edited comment on FLINK-36052 at 10/18/24 5:49 AM:
-

Could we set Component/s as FLINK-CDC?


was (Author: JIRAUSER305453):
Could we add Component/s as FLINK-CDC?

> add elasticsearch.md for elasticsearch pipeline connector
> -
>
> Key: FLINK-36052
> URL: https://issues.apache.org/jira/browse/FLINK-36052
> Project: Flink
>  Issue Type: Improvement
>Reporter: wuzexian
>Assignee: wuzexian
>Priority: Major
>  Labels: pull-request-available
>
> add elasticsearch.md for elasticsearch pipeline connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36559) [docs]Add flink cdc elasticsearch pipeline sink to docs

2024-10-17 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-36559:
--
Description: (was: Duplicate with 36052.)

> [docs]Add flink cdc elasticsearch pipeline sink to docs
> ---
>
> Key: FLINK-36559
> URL: https://issues.apache.org/jira/browse/FLINK-36559
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36066][runtime] Introducing the AdaptiveGraphManager component [flink]

2024-10-17 Thread via GitHub


JunRuiLee commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1805761682


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java:
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.forwardgroup;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class StreamNodeForwardGroup {

Review Comment:
   JavaDoc is required



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java:
##
@@ -85,6 +88,10 @@ public ResultPartitionType getResultType() {
 return resultType;
 }
 
+public List getConsumerStreamEdges() {

Review Comment:
   getOutputStreamEdges



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java:
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.forwardgroup;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class StreamNodeForwardGroup {

Review Comment:
   Could we introduce an interface named ForwardGroup and rename the original 
ForwardGroup class to JobVerticesForwardGroup? The JobVerticesForwardGroup and 
StreamNodeForwardGroup class would then implement this interface.
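   
   Roughly something like the following (just a sketch of the shape; the concrete methods would come from the existing ForwardGroup class):
   
   ```java
   /** Sketch only; actual methods would mirror the existing ForwardGroup class. */
   interface ForwardGroup {
       int getParallelism();
       int getMaxParallelism();
   }
   
   /** The existing job-vertex based implementation, renamed. */
   abstract class JobVerticesForwardGroup implements ForwardGroup {
       // current ForwardGroup logic operating on JobVertex instances
   }
   
   /** The new stream-node based implementation. */
   abstract class StreamNodeForwardGroup implements ForwardGroup {
       // new logic operating on StreamNode instances
   }
   ```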



##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java:
##
@@ -31,4 +31,12 @@ public interface StreamGraphHasher {
  * didn't change.
  */
 Map traverseStreamGraphAndGenerateHashes(StreamGraph 
streamGraph);
+
+/**
+ * Generates a hash for the specified {@link StreamNode} within the {@link 
StreamGraph}. This
+ * hash is stored in the provided map and can be used to uniquely identify 
the {@link
+ * StreamNode} across job submissions, assuming its configuration remains 
unchanged.
+ */
+boolean generateHashesByStreamNode(
+StreamNode streamNode, StreamGraph streamGraph, Map hashes);

Review Comment:
   int streamNodeId, StreamGraph streamGraph, Map hashes



##
f

[jira] [Created] (FLINK-36562) Running CI in random timezone to expose more time related bugs

2024-10-17 Thread LvYanquan (Jira)
LvYanquan created FLINK-36562:
-

 Summary: Running CI in random timezone  to expose more time 
related bugs 
 Key: FLINK-36562
 URL: https://issues.apache.org/jira/browse/FLINK-36562
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.3.0
Reporter: LvYanquan
 Fix For: cdc-3.3.0


Refer to [this 
comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: 
when running CI, setting a random time zone in the session can help expose 
time-zone-related issues in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36563) Running CI in random timezone to expose more time related bugs

2024-10-17 Thread LvYanquan (Jira)
LvYanquan created FLINK-36563:
-

 Summary: Running CI in random timezone  to expose more time 
related bugs 
 Key: FLINK-36563
 URL: https://issues.apache.org/jira/browse/FLINK-36563
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.3.0
Reporter: LvYanquan
 Fix For: cdc-3.3.0


Refer to [this 
comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: 
when running CI, setting a random time zone in the session can help expose 
time-zone-related issues in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36565) Pipeline YAML should allow merging decimal with different precisions

2024-10-17 Thread yux (Jira)
yux created FLINK-36565:
---

 Summary: Pipeline YAML should allow merging decimal with different 
precisions
 Key: FLINK-36565
 URL: https://issues.apache.org/jira/browse/FLINK-36565
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, it's not possible to merge two Decimal-typed fields with different 
precision or scale. Since DECIMAL(p1, s1) and DECIMAL(p2, s2) can be 
converted to DECIMAL(MAX(p1 - s1, p2 - s2) + MAX(s1, s2), MAX(s1, s2)) without 
any loss, this conversion path seems reasonable and worth adding.
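
For illustration, the widening rule spelled out in code (a sketch only, not the actual implementation):

{code:java}
/** Sketch of the lossless DECIMAL widening rule described above. */
final class DecimalMergeSketch {

    /** Stands in for a DECIMAL(p, s) type. */
    record DecimalType(int precision, int scale) {}

    static DecimalType merge(DecimalType a, DecimalType b) {
        // Keep enough integer digits and enough fractional digits for both sides.
        int scale = Math.max(a.scale(), b.scale());
        int integerDigits =
                Math.max(a.precision() - a.scale(), b.precision() - b.scale());
        return new DecimalType(integerDigits + scale, scale);
    }

    public static void main(String[] args) {
        // DECIMAL(10, 2) merged with DECIMAL(8, 4) -> DECIMAL(12, 4)
        System.out.println(merge(new DecimalType(10, 2), new DecimalType(8, 4)));
    }
}
{code}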



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]

2024-10-17 Thread via GitHub


Zakelly commented on code in PR #25509:
URL: https://github.com/apache/flink/pull/25509#discussion_r1805752878


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java:
##
@@ -31,19 +31,21 @@ public class CompressibleFSDataInputStream extends 
FSDataInputStream {
 
 private final FSDataInputStream delegate;
 private final InputStream compressingDelegate;
+private final boolean compressed;
 
 public CompressibleFSDataInputStream(
 FSDataInputStream delegate, StreamCompressionDecorator 
compressionDecorator)
 throws IOException {
 this.delegate = delegate;
 this.compressingDelegate = 
compressionDecorator.decorateWithCompression(delegate);
+this.compressed = compressionDecorator != 
UncompressedStreamCompressionDecorator.INSTANCE;
 }
 
 @Override
 public void seek(long desired) throws IOException {
-final int available = compressingDelegate.available();
-if (available > 0) {
-if (available != compressingDelegate.skip(available)) {
+if (compressed) {
+final int available = compressingDelegate.available();
+if (available > 0 && available != 
compressingDelegate.skip(available)) {
 throw new IOException("Unable to skip buffered data.");
 }
 }

Review Comment:
   Separate uncompressed and compressed `FSDataInputStream` classes sound good 
to me. Agreed, let's consider this later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36563) Running CI in random timezone to expose more time related bugs

2024-10-17 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890772#comment-17890772
 ] 

LvYanquan commented on FLINK-36563:
---

Closed as this is a duplicate of FLINK-36564.

> Running CI in random timezone  to expose more time related bugs 
> 
>
> Key: FLINK-36563
> URL: https://issues.apache.org/jira/browse/FLINK-36563
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.3.0
>Reporter: LvYanquan
>Priority: Major
> Fix For: cdc-3.3.0
>
>
> Refer to [this 
> comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: 
> when running CI, setting a random time zone in the session can help expose 
> time-zone-related issues in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36566) Code optimization: always identify DataChangeEvent before SchemaChangeEvent in Operator

2024-10-17 Thread LvYanquan (Jira)
LvYanquan created FLINK-36566:
-

 Summary: Code optimization: always identify DataChangeEvent before 
SchemaChangeEvent in Operator
 Key: FLINK-36566
 URL: https://issues.apache.org/jira/browse/FLINK-36566
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.3.0
Reporter: LvYanquan
 Fix For: cdc-3.3.0


In a data flow system, the number of DataChangeEvents is always much larger 
than that of SchemaChangeEvents. If we always check for DataChangeEvents first 
(for example with an `instanceof` check), we can skip a lot of branching logic 
for the common case and improve performance.
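
A minimal sketch of the check ordering described above (simplified stand-in types, not the actual operator code):

{code:java}
/** Simplified sketch only; the real operators work on the CDC Event types. */
class EventDispatchSketch {

    interface Event {}

    static class DataChangeEvent implements Event {}

    static class SchemaChangeEvent implements Event {}

    void processElement(Event event) {
        // Check the overwhelmingly common case first so that most records
        // take the cheap, early branch.
        if (event instanceof DataChangeEvent) {
            handleDataChange((DataChangeEvent) event);
        } else if (event instanceof SchemaChangeEvent) {
            handleSchemaChange((SchemaChangeEvent) event);
        }
    }

    void handleDataChange(DataChangeEvent event) {}

    void handleSchemaChange(SchemaChangeEvent event) {}
}
{code}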



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36565] Route module allows merging Decimals with various precisions [flink-cdc]

2024-10-17 Thread via GitHub


yuxiqian commented on PR #3651:
URL: https://github.com/apache/flink-cdc/pull/3651#issuecomment-2421296255

   Could @melin please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36565] Route module allows merging Decimals with various precisions [flink-cdc]

2024-10-17 Thread via GitHub


yuxiqian opened a new pull request, #3651:
URL: https://github.com/apache/flink-cdc/pull/3651

   This closes FLINK-36565.
   
   Currently, the table-merging router does not allow coercing `Decimal` fields 
with different precisions from upstream, which means some lossless conversions 
are actually not possible, including:
   
   * `TINYINT`, `SMALLINT`, `INT`, `BIGINT` to `DECIMAL(p > 17, s >= 0)`
   * low-precision `DECIMAL` to high-precision ones
   
   ---
   
   I've baked the changes in #3636 (FLINK-36474) into this PR since there are 
some common changes between them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36565) Pipeline YAML should allow merging decimal with different precisions

2024-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36565:
---
Labels: pull-request-available  (was: )

> Pipeline YAML should allow merging decimal with different precisions
> 
>
> Key: FLINK-36565
> URL: https://issues.apache.org/jira/browse/FLINK-36565
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>  Labels: pull-request-available
>
> Currently, it's not possible to merge two Decimal-typed fields with different 
> precision or scale. Since DECIMAL(p1, s1) and DECIMAL(p2, s2) can be 
> converted to DECIMAL(MAX(p1 - s1, p2 - s2) + MAX(s1, s2), MAX(s1, s2)) without 
> any loss, this conversion path seems reasonable and worth adding.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36564) Running CI in random timezone to expose more time related bugs

2024-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36564:
---
Labels: pull-request-available  (was: )

> Running CI in random timezone  to expose more time related bugs
> ---
>
> Key: FLINK-36564
> URL: https://issues.apache.org/jira/browse/FLINK-36564
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.3.0
>Reporter: LvYanquan
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.3.0
>
>
> Refer to [this 
> comment|https://github.com/apache/flink-cdc/pull/3648#pullrequestreview-2374769012]: 
> when running CI, setting a random time zone in the session can help expose 
> time-zone-related issues in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]

2024-10-17 Thread via GitHub


Zakelly commented on code in PR #25501:
URL: https://github.com/apache/flink/pull/25501#discussion_r1805798857


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java:
##
@@ -220,7 +220,7 @@ public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
 isUsingCustomRawKeyedState());
 
 stateHandler = new StreamOperatorStateHandler(context, 
getExecutionConfig(), cancelables);
-timeServiceManager = context.internalTimerServiceManager();
+timeServiceManager = context.asyncInternalTimerServiceManager();

Review Comment:
   I think this is not true. We'd better keep 
`context.internalTimerServiceManager` here, and re-assign `timeServiceManager` 
in `AbstractAsyncStateStreamOperatorV2` with `asyncInternalTimerServiceManager`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36567) Planner module didn't use the setting from flink-conf.yaml

2024-10-17 Thread liting liu (Jira)
liting liu created FLINK-36567:
--

 Summary: Planner module didn't use the setting from flink-conf.yaml
 Key: FLINK-36567
 URL: https://issues.apache.org/jira/browse/FLINK-36567
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.1
Reporter: liting liu


I found that the flink-table-planner_*.jar was generated in the /tmp dir, even 
though the config option 
`io.tmp.dirs` has been set to `/opt`. 

See the JobManager's log:
2024-10-17 08:52:30,330 INFO org.apache.flink.configuration.GlobalConfiguration 
[] - Loading configuration property: io.tmp.dirs, /opt

The relevant code is
```
org.apache.flink.table.planner.loader.PlannerModule#PlannerModule

private PlannerModule() {
    try {
        final ClassLoader flinkClassLoader = PlannerModule.class.getClassLoader();

        final Path tmpDirectory =
                Paths.get(ConfigurationUtils.parseTempDirectories(new Configuration())[0]);
        // ...
```
The PlannerModule creates a new Configuration instead of using the values from 
the loaded config file.
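
One possible direction (a sketch only, mirroring the snippet above rather than a tested patch) would be to resolve the temp directories from the configuration that was actually loaded from flink-conf.yaml:
```
// Sketch: use the loaded cluster configuration instead of an empty one.
final Configuration configuration = GlobalConfiguration.loadConfiguration();
final Path tmpDirectory =
        Paths.get(ConfigurationUtils.parseTempDirectories(configuration)[0]);
```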



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]

2024-10-17 Thread via GitHub


gaborgsomogyi commented on PR #25509:
URL: https://github.com/apache/flink/pull/25509#issuecomment-2419716967

   > Could you reproduce this issue with state compression enabled?
   
   I've briefly tested compressed state and yes, I've seen slowness and/or a 
huge number of re-opens. 
   The number of re-opens is controlled by the following settings (which I've 
played with to reduce re-opens):
   * `fs.s3a.readahead.range`
   * `fs.s3a.input.async.drain.threshold`
   
   I also tried to enable pre-fetching via `fs.s3a.prefetch.enabled` to do 
everything in memory and gain some speed.
   
   None of them helped. My finding is that `skip` in the S3 case reads the 
data into a buffer and then just drops it. I haven't gone through the whole 
snappy chain but assume the same happens with decompression on top.
   
   I'm not sure where the mentioned `4096 bytes` buffer size comes from, but 
having as many re-opens as there are elements in the list is also not optimal🙂
   Are you saying that the exact same state data with default S3 Hadoop configs 
is slow uncompressed and fast compressed? That would be the better case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 [flink]

2024-10-17 Thread via GitHub


ferenc-csaky commented on code in PR #25494:
URL: https://github.com/apache/flink/pull/25494#discussion_r1804959664


##
flink-rpc/flink-rpc-akka/pom.xml:
##
@@ -94,8 +94,8 @@ under the License.


io.netty
-   netty
-   3.10.6.Final
+   netty-all

Review Comment:
   Ah yeah, thanks for the detailed answer! Quite some time ago I started a 
POC to migrate to Artery but have since forgotten the whole context. :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36510][rpc] bump pekko to 1.1.2, remove netty 3 [flink]

2024-10-17 Thread via GitHub


ferenc-csaky commented on PR #25494:
URL: https://github.com/apache/flink/pull/25494#issuecomment-2419808148

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]

2024-10-17 Thread via GitHub


AHeise merged PR #757:
URL: https://github.com/apache/flink-web/pull/757


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33696][metrics] Add OpenTelemetryMetricReporter and OpenTelemetryTraceReporter [flink]

2024-10-17 Thread via GitHub


pnowojski commented on PR #25539:
URL: https://github.com/apache/flink/pull/25539#issuecomment-2419944369

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36356) HadoopRecoverableWriterTest.testRecoverWithState due to IOException

2024-10-17 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890500#comment-17890500
 ] 

Piotr Nowojski commented on FLINK-36356:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=63129&view=logs&j=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819&t=2dd510a3-5041-5201-6dc3-54d310f68906

> HadoopRecoverableWriterTest.testRecoverWithState due to IOException
> ---
>
> Key: FLINK-36356
> URL: https://issues.apache.org/jira/browse/FLINK-36356
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 2.0-preview
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=62378&view=logs&j=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819&t=2dd510a3-5041-5201-6dc3-54d310f68906&l=10514
> {code}
> Sep 23 07:55:16 07:55:16.451 [ERROR] Tests run: 12, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 20.05 s <<< FAILURE! -- in 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriterTest
> Sep 23 07:55:16 07:55:16.451 [ERROR] 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriterTest.testRecoverWithState
>  -- Time elapsed: 2.694 s <<< ERROR!
> Sep 23 07:55:16 java.io.IOException: All datanodes 
> [DatanodeInfoWithStorage[127.0.0.1:45240,DS-13a30476-dff5-4f3a-88b1-887571521a95,DISK]]
>  are bad. Aborting...
> Sep 23 07:55:16   at 
> org.apache.hadoop.hdfs.DataStreamer.handleBadDatanode(DataStreamer.java:1537)
> Sep 23 07:55:16   at 
> org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1472)
> Sep 23 07:55:16   at 
> org.apache.hadoop.hdfs.DataStreamer.processDatanodeError(DataStreamer.java:1244)
> Sep 23 07:55:16   at 
> org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:663)
> {code}
> The Maven logs reveal a bit more (I attached the extract of the failed build):
> {code}
> 07:55:13,491 [DataXceiver for client DFSClient_NONMAPREDUCE_211593080_35 at 
> /127.0.0.1:59360 [Receiving block 
> BP-289839883-172.27.0.2-1727078098659:blk_1073741832_1016]] ERROR 
> org.apache.hadoop.hdfs.server.datanode.DataNode  [] - 
> 127.0.0.1:46429:DataXceiver error processing WRITE_BLOCK operation  src: 
> /127.0.0.1:59360 dst: /127.0.0.1:46429
> java.nio.channels.ClosedByInterruptException: null
> at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
>  ~[?:1.8.0_292]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:406) 
> ~[?:1.8.0_292]
> at 
> org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
>  ~[hadoop-common-2.10.2.jar:?]
> at 
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142) 
> ~[hadoop-common-2.10.2.jar:?]
> at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161) 
> ~[hadoop-common-2.10.2.jar:?]
> at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131) 
> ~[hadoop-common-2.10.2.jar:?]
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
> ~[?:1.8.0_292]
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) 
> ~[?:1.8.0_292]
> at java.io.BufferedInputStream.read(BufferedInputStream.java:345) 
> ~[?:1.8.0_292]
> at java.io.DataInputStream.read(DataInputStream.java:149) 
> ~[?:1.8.0_292]
> at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:209) 
> ~[hadoop-common-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:211)
>  ~[hadoop-hdfs-client-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
>  ~[hadoop-hdfs-client-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
>  ~[hadoop-hdfs-client-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:528)
>  ~[hadoop-hdfs-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:968)
>  ~[hadoop-hdfs-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:877)
>  ~[hadoop-hdfs-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:166)
>  ~[hadoop-hdfs-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:103)
>  ~[hadoop-hdfs-2.10.2.jar:?]
> at 
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataX

[jira] [Updated] (FLINK-36561) ResultSet.wasNull() does not reflect null values in Flink JDBC Driver

2024-10-17 Thread Ilya Soin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Soin updated FLINK-36561:
--
Language:   (was: jav)

> ResultSet.wasNull() does not reflect null values in Flink JDBC Driver
> -
>
> Key: FLINK-36561
> URL: https://issues.apache.org/jira/browse/FLINK-36561
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / JDBC
>Affects Versions: 1.18.1, 1.20.0, 1.19.1
>Reporter: Ilya Soin
>Priority: Major
>
> As per JDBC 
> [standard|https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/ResultSet.html#wasNull()],
>  {{ResultSet.wasNull()}}
> {quote}Reports whether the last column read had a value of SQL NULL. Note 
> that you must first call one of the getter methods on a column to try to read 
> its value and then call the method wasNull to see if the value read was SQL 
> NULL.
> {quote}
> However, Flink JDBC driver currently does not update the {{wasNull}} flag 
> within the {{FlinkResultSet.get*()}} methods. Instead, it only sets this flag 
> during [iteration over 
> rows|https://github.com/apache/flink/blob/release-2.0-preview1-rc1/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java#L106]
>  fetched from the gateway endpoint. This behavior leads to {{wasNull}} 
> returning true only if the entire row is null, not when individual column 
> values are null. Consequently, reading a null value using 
> {{FlinkResultSet.get*()}} incorrectly results in {{wasNull()}} returning 
> false, which is not compliant with the JDBC specification.
> h4. Proposed solution
> Check if the underlying value accessed with {{FlinkResultSet.get*()}} method 
> is null, and update wasNull accordingly.
> h4. For discussion
> Can we skip null rows in FlinkResultSet.next()?
> h4. Steps to reproduce:
> Add
> {code:java}
> assertTrue(resultSet.wasNull());
> {code}
> after any call to resultSet.get*() in 
> [testStringResultSetNullData()|https://github.com/apache/flink/blob/release-2.0-preview1-rc1/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java#L115].
>  Run the test and see the failed check.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36561) ResultSet.wasNull() does not reflect null values in Flink JDBC Driver

2024-10-17 Thread Ilya Soin (Jira)
Ilya Soin created FLINK-36561:
-

 Summary: ResultSet.wasNull() does not reflect null values in Flink 
JDBC Driver
 Key: FLINK-36561
 URL: https://issues.apache.org/jira/browse/FLINK-36561
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / JDBC
Affects Versions: 1.19.1, 1.20.0, 1.18.1
Reporter: Ilya Soin


As per JDBC 
[standard|https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/ResultSet.html#wasNull()],
 {{ResultSet.wasNull()}}
{quote}Reports whether the last column read had a value of SQL NULL. Note that 
you must first call one of the getter methods on a column to try to read its 
value and then call the method wasNull to see if the value read was SQL NULL.
{quote}
However, Flink JDBC driver currently does not update the {{wasNull}} flag 
within the {{FlinkResultSet.get*()}} methods. Instead, it only sets this flag 
during [iteration over 
rows|https://github.com/apache/flink/blob/release-2.0-preview1-rc1/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java#L106]
 fetched from the gateway endpoint. This behavior leads to {{wasNull}} 
returning true only if the entire row is null, not when individual column 
values are null. Consequently, reading a null value using 
{{FlinkResultSet.get*()}} incorrectly results in {{wasNull()}} returning false, 
which is not compliant with the JDBC specification.
h4. Proposed solution

Check if the underlying value accessed with {{FlinkResultSet.get*()}} method is 
null, and update wasNull accordingly.
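
A minimal sketch of the idea (illustrative only, not the actual FlinkResultSet code):

{code:java}
import java.util.List;

/** Illustrative sketch only; not the actual FlinkResultSet implementation. */
class WasNullTrackingRow {

    private final List<Object> columns;
    private boolean wasNull;

    WasNullTrackingRow(List<Object> columns) {
        this.columns = columns;
    }

    /** Every getter routes its raw value through this helper so that
     *  wasNull() reflects the last column read, as required by JDBC. */
    private <T> T trackNull(T value) {
        wasNull = (value == null);
        return value;
    }

    String getString(int columnIndex) {
        return trackNull((String) columns.get(columnIndex - 1));
    }

    boolean wasNull() {
        return wasNull;
    }
}
{code}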
h4. For discussion

Can we skip null rows in FlinkResultSet.next()?
h4. Steps to reproduce:

Add
{code:java}
assertTrue(resultSet.wasNull());
{code}
after any call to resultSet.get*() in 
[testStringResultSetNullData()|https://github.com/apache/flink/blob/release-2.0-preview1-rc1/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java#L115].
 Run the test and see the failed check.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36530][state] Fix S3 performance issue with uncompressed state restore [flink]

2024-10-17 Thread via GitHub


mateczagany commented on PR #25509:
URL: https://github.com/apache/flink/pull/25509#issuecomment-2420140089

   > Are you saying that the exact same state data with default S3 Hadoop 
configs is slow uncompressed and fast compressed? That would be the better case.
   
   Yes, that's correct, I did not tune any S3 settings. The job was the same; 
the compressed state size was 290 MB, the uncompressed one was 11 MB. Recovering 
the uncompressed state resulted in one S3 `GET` for each `read()` that followed 
a `skip()`, while the compressed data did not. 
   
   It was a ListState of Strings, and for the compressed data I even induced an 
artificial `skip()` for each element by reading 1 less byte in 
`StringValue#read()`, and none of the `skip()` or `read()` calls resulted in 
any new S3 `GET` queries.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org