Re: [PR] [FLINK-29741][scala] Delete flink-scala and flink-streaming-scala modules [flink]

2024-09-25 Thread via GitHub


reswqa merged PR #25393:
URL: https://github.com/apache/flink/pull/25393





[jira] [Updated] (FLINK-36375) Missing default value in AddColumnEvent

2024-09-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36375:
---
Labels: pull-request-available  (was: )

> Missing default value in AddColumnEvent
> ---
>
> Key: FLINK-36375
> URL: https://issues.apache.org/jira/browse/FLINK-36375
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: linqigeng
>Priority: Major
>  Labels: pull-request-available
>
> When SchemaOperator receives an AddColumnEvent, the default value is lost
> when the event is sent downstream.
> !https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!
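
For illustration, a minimal sketch of the reported behavior (assuming the Flink CDC 3.2 event API; the table and column are made up, and the 4-arg `Column.physicalColumn` overload carrying a default value expression is an assumption):

{code:java}
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Collections;

// A column whose default value expression is "0".
Column newCol = Column.physicalColumn("score", DataTypes.INT(), null, "0");
AddColumnEvent event =
        new AddColumnEvent(
                TableId.tableId("db", "orders"),
                Collections.singletonList(
                        new AddColumnEvent.ColumnWithPosition(newCol)));
// Reported bug: after the event passes through SchemaOperator, the downstream
// column's getDefaultValueExpression() no longer returns "0".
{code}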





Re: [PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent [flink-cdc]

2024-09-25 Thread via GitHub


qg-lin commented on PR #3622:
URL: https://github.com/apache/flink-cdc/pull/3622#issuecomment-2375891374

   @lvyanquan @yuxiqian PTAL





[jira] [Assigned] (FLINK-36374) Bundle forst statebackend in flink-dist and provide shortcut to enable

2024-09-25 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-36374:
---

Assignee: Zakelly Lan

> Bundle forst statebackend in flink-dist and provide shortcut to enable
> --
>
> Key: FLINK-36374
> URL: https://issues.apache.org/jira/browse/FLINK-36374
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> Currently, the forst statebackend is built under flink-statebackend-forst,
> but is not included in the flink-dist jar. It would be better to distribute
> it the same way as rocksdb.





[jira] [Created] (FLINK-36374) Bundle forst statebackend in flink-dist and provide shortcut to enable

2024-09-25 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-36374:
---

 Summary: Bundle forst statebackend in flink-dist and provide 
shortcut to enable
 Key: FLINK-36374
 URL: https://issues.apache.org/jira/browse/FLINK-36374
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan


Currently, the forst statebackend is built under flink-statebackend-forst, but 
is not included in the flink-dist jar. It would be better to distribute it the 
same way as rocksdb.





Re: [PR] [minor][cdc-connector][mongodb] Add `op_type` metadata column [flink-cdc]

2024-09-25 Thread via GitHub


qg-lin commented on PR #3615:
URL: https://github.com/apache/flink-cdc/pull/3615#issuecomment-2375852444

   Since DeduplicateFunctionHelper#processLastRowOnChangelog returns the row 
before deletion, the `delete` type is temporarily missing.





[jira] [Created] (FLINK-36375) Missing default value in AddColumnEvent

2024-09-25 Thread linqigeng (Jira)
linqigeng created FLINK-36375:
-

 Summary: Missing default value in AddColumnEvent
 Key: FLINK-36375
 URL: https://issues.apache.org/jira/browse/FLINK-36375
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: linqigeng


When SchemaOperator receives an AddColumnEvent, the default value is lost when 
the event is sent downstream.

!https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!





[PR] [FLINK-36374] Bundle forst statebackend in flink-dist and provide shortcut to enable [flink]

2024-09-25 Thread via GitHub


Zakelly opened a new pull request, #25402:
URL: https://github.com/apache/flink/pull/25402

   ## What is the purpose of the change
   
   Currently, the forst statebackend is built under `flink-statebackend-forst` 
but is not included in the `flink-dist` jar. This PR bundles the forst 
statebackend in `flink-dist` and provides a shortcut to load forst.
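   
   For reference, a minimal sketch of how a user would pick the backend once 
the shortcut exists (the shortcut name `forst` is an assumption based on this 
PR; `StateBackendOptions.STATE_BACKEND` is the existing config option):
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.StateBackendOptions;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   Configuration conf = new Configuration();
   // Assumed shortcut: with forst bundled in flink-dist, StateBackendLoader
   // can resolve the backend by name instead of a factory class name.
   conf.set(StateBackendOptions.STATE_BACKEND, "forst");
   StreamExecutionEnvironment env =
           StreamExecutionEnvironment.getExecutionEnvironment(conf);
   ```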
   
   ## Brief change log
   
- Add forst in `flink-dist`'s pom file
- Add shortcut in `StateBackendLoader`. 
   
   
   ## Verifying this change
   
   Modified `ForStStateBackendFactoryTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





[jira] [Updated] (FLINK-36374) Bundle forst statebackend in flink-dist and provide shortcut to enable

2024-09-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36374:
---
Labels: pull-request-available  (was: )

> Bundle forst statebackend in flink-dist and provide shortcut to enable
> --
>
> Key: FLINK-36374
> URL: https://issues.apache.org/jira/browse/FLINK-36374
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the forst statebackend is built under flink-statebackend-forst,
> but is not included in the flink-dist jar. It would be better to distribute
> it the same way as rocksdb.





Re: [PR] [FLINK-36374][state/forst] Bundle forst statebackend in flink-dist and provide shortcut to enable [flink]

2024-09-25 Thread via GitHub


flinkbot commented on PR #25402:
URL: https://github.com/apache/flink/pull/25402#issuecomment-2375878995

   
   ## CI report:
   
   * 3b63165442e17ced06c405d00f575fa02307cb2d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [Flink 36245] Remove legacy SourceFunction / SinkFunction / Sink V1 API in 2.0 [flink]

2024-09-25 Thread via GitHub


lvyanquan commented on PR #25331:
URL: https://github.com/apache/flink/pull/25331#issuecomment-2375887294

   @flinkbot run azure
   





[PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent [flink-cdc]

2024-09-25 Thread via GitHub


qg-lin opened a new pull request, #3622:
URL: https://github.com/apache/flink-cdc/pull/3622

   When SchemaOperator receives an AddColumnEvent, the default value is lost 
when the event is sent downstream.
   JIRA: https://issues.apache.org/jira/browse/FLINK-36375





[jira] [Updated] (FLINK-36375) Missing default value in AddColumnEvent/RenameColumnEvent

2024-09-25 Thread linqigeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

linqigeng updated FLINK-36375:
--
Description: 
When SchemaOperator receives an AddColumnEvent or RenameColumnEvent, the 
default value is lost when the event is sent downstream.

!https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!

  was:
When SchemaOperator receives an AddColumnEvent, the default value is lost when 
the event is sent downstream.

!https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!

Summary: Missing default value in AddColumnEvent/RenameColumnEvent  
(was: Missing default value in AddColumnEvent)

> Missing default value in AddColumnEvent/RenameColumnEvent
> -
>
> Key: FLINK-36375
> URL: https://issues.apache.org/jira/browse/FLINK-36375
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: linqigeng
>Priority: Major
>  Labels: pull-request-available
>
> When SchemaOperator receives an AddColumnEvent or RenameColumnEvent, the
> default value is lost when the event is sent downstream.
> !https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!





[PR] [FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS [flink]

2024-09-25 Thread via GitHub


spena opened a new pull request, #25401:
URL: https://github.com/apache/flink/pull/25401

   ## What is the purpose of the change
   
   This PR allows you to use CTAS to reorder the columns produced by the 
`SELECT` part by specifying all column names, without data types, in the 
`CREATE` part. This mirrors the behavior of `INSERT INTO` with an explicit 
column list.
   
   Example:
   
   ```sql
   CREATE TABLE my_ctas_table (
   order_time, price, quantity, id
   ) WITH (
   'connector' = 'kafka',
   ...
   ) AS SELECT id, price, quantity, order_time FROM source_table;
   ```
   
   The resulting table `my_ctas_table` is equivalent to creating the following 
table and inserting the data with the following statements:
   
   ```sql
   CREATE TABLE my_ctas_table (
   order_time TIMESTAMP(3),
   price DOUBLE,
   quantity DOUBLE,
   id BIGINT
   ) WITH (
   'connector' = 'kafka',
   ...
   );
   
   INSERT INTO my_ctas_table (order_time, price, quantity, id)
   SELECT id, price, quantity, order_time FROM source_table;
   ```
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - Added unit tests and integration tests to verify the feature
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? Updated the `create.md` 
documentation
   





Re: [PR] [FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS [flink]

2024-09-25 Thread via GitHub


flinkbot commented on PR #25401:
URL: https://github.com/apache/flink/pull/25401#issuecomment-2375351231

   
   ## CI report:
   
   * dfb06e8b80b48a4c25a0c50a1fb3841426ccb7e5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-36292][Connectors/Common]: remove timeout to avoid timeout exception [flink]

2024-09-25 Thread via GitHub


showuon commented on code in PR #25371:
URL: https://github.com/apache/flink/pull/25371#discussion_r1776302945


##
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java:
##
@@ -71,7 +71,7 @@ public void testCloseFetcherWithException() throws Exception {
 .hasRootCauseMessage("Artificial exception on closing the 
split reader.");
 }
 
-@Test(timeout = 3)
+@Test

Review Comment:
   Sorry @becketqin, I misunderstood your comment. I thought you were asking 
me to change this:
   ```
   @Test(timeout = Long.MAX_VALUE)
   ```
   
   My bad! Having read it again, I understand what you said now. We should set 
the max timeout on the `close` method; otherwise, it will only log a warning 
and the test passes. So everything makes sense now. I've just updated the PR. 
Please take another look. Thanks.
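   
   For context, a rough sketch of that direction (`fetcherManager` is the 
instance built by the surrounding test; the 30s budget is an assumption):
   
   ```java
   @Test
   public void testCloseFetcherWithException() throws Exception {
       // Bound the shutdown wait inside close() rather than via a JUnit
       // timeout, so a hanging fetcher fails the test instead of only
       // producing a warning log.
       fetcherManager.close(30_000L); // SplitFetcherManager#close(long timeoutMs)
   }
   ```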






Re: [PR] [FLINK-30614][serializer] Remove old method of resolving schema compa… [flink]

2024-09-25 Thread via GitHub


Zakelly commented on PR #25384:
URL: https://github.com/apache/flink/pull/25384#issuecomment-2375806647

   I'd suggest removing `GenericArraySerializerConfigSnapshot` and 
`EitherSerializerSnapshot` with another commit by this PR.





[jira] [Created] (FLINK-36372) Can not insert data to sink with reserved keywords column name

2024-09-25 Thread Tan Nguyen (Jira)
Tan Nguyen created FLINK-36372:
--

 Summary: Can not insert data to sink with reserved keywords column 
name
 Key: FLINK-36372
 URL: https://issues.apache.org/jira/browse/FLINK-36372
 Project: Flink
  Issue Type: Bug
Reporter: Tan Nguyen
 Attachments: image-2024-09-26-10-33-37-134.png, 
image-2024-09-26-10-40-07-605.png, image-2024-09-26-10-41-10-327.png, 
image-2024-09-26-10-48-11-315.png

Hi team,

I created a Flink application running in batch mode: we read data from Kafka, 
then run a transformation query from a SQL file, and finally execute an insert 
query from a SQL file to sink the data. Here is the insert query:

!image-2024-09-26-10-33-37-134.png!

Even though I use backticks to escape the reserved keywords, it still fails. 
Here is the error:
{code:java}
// code placeholder
java.lang.RuntimeException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.close(JdbcOutputFormat.java:265)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:70)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1062)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:930)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:813)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO 
native_program_domain.member_program_aux_data(id, member_program_id, program_provider_id, 
enrollment_event_type, cancellation_reason, member_start_date, house_hold_id, 
enrollment_source, plan_contract_id, protocol, enrollment_priority, member_disenroll_date, 
pbp, hras_needed, hra_due_date, completion_status, hra_completion_method, 
hra_completion_description, enrollment_lob, clock_start_date, csp_group_id, csp_plan_id, 
group, segment_id, division, created_by, created_date, last_modified_by, last_modified_date) 
VALUES (('3319eb7d-e247-4eec-9c3e-e0ba9ee5b7aa'), ('5e4fd136-2b14-3b63-9b9c-e3c1af424c55'), 
(NULL), ('R'), (NULL), ('20140608'), (NULL), (NULL), ('CONT-8'), ('PR01'), (NULL), (NULL), 
('PBP-0008'), ('FALSE'::boolean), (NULL), (NULL), (NULL), ('NextGen'), (NULL), (NULL), 
(NULL), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL)) ON CONFLICT (id) 
DO UPDATE SET id=EXCLUDED.id, member_program_id=EXCLUDED.member_program_id, 
program_provider_id=EXCLUDED.program_provider_id, 
enrollment_event_type=EXCLUDED.enrollment_event_type, 
cancellation_reason=EXCLUDED.cancellation_reason, 
member_start_date=EXCLUDED.member_start_date, house_hold_id=EXCLUDED.house_hold_id, 
enrollment_source=EXCLUDED.enrollment_source, plan_contract_id=EXCLUDED.plan_contract_id, 
protocol=EXCLUDED.protocol, enrollment_priority=EXCLUDED.enrollment_priority, 
member_disenroll_date=EXCLUDED.member_disenroll_date, pbp=EXCLUDED.pbp, 
hras_needed=EXCLUDED.hras_needed, hra_due_date=EXCLUDED.hra_due_date, 
completion_status=EXCLUDED.completion_status, 
hra_completion_method=EXCLUDED.hra_completion_method, 
hra_completion_description=EXCLUDED.hra_completion_description, 
enrollment_lob=EXCLUDED.enrollment_lob, clock_start_date=EXCLUDED.clock_start_date, 
csp_group_id=EXCLUDED.csp_group_id, csp_plan_id=EXCLUDED.csp_plan_id, group=EXCLUDED.group, 
segment_id=EXCLUDED.segment_id, division=EXCLUDED.division, created_by=EXCLUDED.created_by, 
created_date=EXCLUDED.created_date, last_modified_by=EXCLUDED.last_modified_by, 
last_modified_date=EXCLUDED.last_modified_date was aborted: ERROR: syntax error at or near 
"group"  Position: 435  Call getNextException to see other errors in the batch.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.close(JdbcOutputFormat.java:262)
    ... 13 more
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO 
native_program_domain.member_program_aux_data(id, member_program_id, program_provider_id, 
enrollment_event_type, cancellation_reason, member_start_date, house_hold_id, 
enrollment_source, plan_contract_id, protocol, enr
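
For illustration, the failure appears to come from the generated PostgreSQL 
statement, in which the column `group` is unquoted; in PostgreSQL a reserved 
word used as an identifier must be double-quoted (backticks are MySQL/Flink SQL 
syntax). A minimal plain-JDBC sketch of the distinction (URL and table are 
made up):

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ReservedWordDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                        DriverManager.getConnection("jdbc:postgresql://localhost/demo");
                Statement st = conn.createStatement()) {
            // Fails with: syntax error at or near "group"
            // st.executeUpdate("INSERT INTO t (group) VALUES ('a')");

            // Works: the reserved word is double-quoted.
            st.executeUpdate("INSERT INTO t (\"group\") VALUES ('a')");
        }
    }
}
{code}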


[jira] [Created] (FLINK-36373) Support distribute state requests in task thread

2024-09-25 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-36373:
--

 Summary: Support distribute state requests in task thread
 Key: FLINK-36373
 URL: https://issues.apache.org/jira/browse/FLINK-36373
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei








[jira] [Commented] (FLINK-36364) Do not reuse serialized key in Forst map state and/or other namespaces

2024-09-25 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884842#comment-17884842
 ] 

Zakelly Lan commented on FLINK-36364:
-

Merge 82582b3a7b75b7ffae9d48895088189b1324caa3 into master

> Do not reuse serialized key in Forst map state and/or other namespaces
> --
>
> Key: FLINK-36364
> URL: https://issues.apache.org/jira/browse/FLINK-36364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Assigned] (FLINK-35928) ForSt supports compiling with RocksDB

2024-09-25 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-35928:
---

Assignee: Yanfei Lei  (was: Hangxiang Yu)

> ForSt supports compiling with RocksDB
> -
>
> Key: FLINK-35928
> URL: https://issues.apache.org/jira/browse/FLINK-35928
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hangxiang Yu
>Assignee: Yanfei Lei
>Priority: Major
>






[jira] [Resolved] (FLINK-36364) Do not reuse serialized key in Forst map state and/or other namespaces

2024-09-25 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan resolved FLINK-36364.
-
Fix Version/s: 2.0-preview
   Resolution: Fixed

> Do not reuse serialized key in Forst map state and/or other namespaces
> --
>
> Key: FLINK-36364
> URL: https://issues.apache.org/jira/browse/FLINK-36364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>






Re: [PR] [FLINK-36364][state/forst] Do not reuse serialized key in Forst map state and/or other namespaces [flink]

2024-09-25 Thread via GitHub


Zakelly merged PR #25394:
URL: https://github.com/apache/flink/pull/25394





Re: [PR] [FLINK-35291] Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme [flink-cdc]

2024-09-25 Thread via GitHub


github-actions[bot] commented on PR #3289:
URL: https://github.com/apache/flink-cdc/pull/3289#issuecomment-2375474952

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.





[jira] [Commented] (FLINK-21820) JDBC connector shouldn't read all rows in per statement by default

2024-09-25 Thread zhanghuaibei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884829#comment-17884829
 ] 

zhanghuaibei commented on FLINK-21820:
--

h4. [~leonard] [~jark] [~yangsanity] [~nicholasjiang] 

 Is fetch size not effective yet? In version 3.2 of mysql-cdc, 
useCursorFetch=true has been set, but when the chunk size is set to 4, it 
still causes an OOM error

> JDBC connector shouldn't read all rows in per statement by default
> --
>
> Key: FLINK-21820
> URL: https://issues.apache.org/jira/browse/FLINK-21820
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC, Table SQL / Ecosystem
>Reporter: Leonard Xu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The default value for the JDBC option 'scan.fetch-size' is 0, which means all 
> rows are read per statement; this may lead to OOM or IO timeouts.
> We'd better set a reasonable default value.
>  
>  
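
For context, the option maps to java.sql.Statement#setFetchSize. A minimal 
plain-JDBC sketch (URL and table are made up; for MySQL, streaming additionally 
requires useCursorFetch=true on the JDBC URL, as noted in the comment above):

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class FetchSizeDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/demo?useCursorFetch=true");
                Statement st = conn.createStatement()) {
            // Fetch size 0 lets many drivers materialize the whole result set
            // in memory; a positive value streams rows in batches instead.
            st.setFetchSize(1024);
            try (ResultSet rs = st.executeQuery("SELECT * FROM big_table")) {
                while (rs.next()) {
                    // process one row at a time
                }
            }
        }
    }
}
{code}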





Re: [PR] [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator with JobVertexBuildContext [flink]

2024-09-25 Thread via GitHub


noorall commented on PR #25366:
URL: https://github.com/apache/flink/pull/25366#issuecomment-2375736309

   @flinkbot run azure





Re: [PR] [FLINK-36355][runtime] Remove deprecated KeydStream#asQueryableState [flink]

2024-09-25 Thread via GitHub


yunfengzhou-hub closed pull request #25391: [FLINK-36355][runtime] Remove 
deprecated KeydStream#asQueryableState
URL: https://github.com/apache/flink/pull/25391





Re: [PR] [FLINK-36355][runtime] Remove deprecated KeydStream#asQueryableState [flink]

2024-09-25 Thread via GitHub


yunfengzhou-hub commented on PR #25391:
URL: https://github.com/apache/flink/pull/25391#issuecomment-2376037028

   According to offline discussion with @Zakelly, this component will 
temporarily not be removed until a new query state solution is proposed in 
Flink 2.x.





[jira] [Comment Edited] (FLINK-36166) testJoinDisorderChangeLog failed on AZP

2024-09-25 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884867#comment-17884867
 ] 

xuyang edited comment on FLINK-36166 at 9/26/24 6:37 AM:
-

Everything is working fine in my local environment. I will temporarily ignore 
this test to ensure the normal release candidate for version 2.0-preview.

[~Weijie Guo] Can you assign this jira to me? I'll trace this test after 
2.0-preview and before 2.0.0.


was (Author: xuyangzhong):
I'm not quite sure if this is related to 'replacing 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#addSource with 
the Java StreamExecutionEnvironment' in FLINK-36327.

Everything is working fine in my local environment. I will temporarily ignore 
this test to ensure the normal release candidate for version 2.0-preview.

[~Weijie Guo] Can you assign this jira to me? I'll trace this test after 
2.0-preview and before 2.0.0.

> testJoinDisorderChangeLog failed on AZP
> ---
>
> Key: FLINK-36166
> URL: https://issues.apache.org/jira/browse/FLINK-36166
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 2.0-preview
>Reporter: Weijie Guo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> Aug 23 03:59:33 03:59:33.972 [ERROR] Failures: 
> Aug 23 03:59:33 03:59:33.972 [ERROR] 
> org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase.testJoinDisorderChangeLog
> Aug 23 03:59:33 03:59:33.972 [ERROR]   Run 1: 
> TableSinkITCase.testJoinDisorderChangeLog:119 
> Aug 23 03:59:33 expected: List(+I[jason, 4, 22.5, 22])
> Aug 23 03:59:33  but was: ArrayBuffer(+U[jason, 4, 22.5, 22])
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61572&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=12536





[jira] [Comment Edited] (FLINK-36166) testJoinDisorderChangeLog failed on AZP

2024-09-25 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884867#comment-17884867
 ] 

xuyang edited comment on FLINK-36166 at 9/26/24 6:14 AM:
-

I'm not quite sure if this is related to 'replacing 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#addSource with 
the Java StreamExecutionEnvironment' in FLINK-36327.

Everything is working fine in my local environment. I will temporarily ignore 
this test to ensure the normal release candidate for version 2.0-preview.

[~Weijie Guo] Can you assign this jira to me? I'll trace this test after 
2.0-preview and before 2.0.0.


was (Author: xuyangzhong):
I'm not quite sure if this is related to 'replacing 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#addSource with 
the Java StreamExecutionEnvironment' in FLINK-36327.

Everything is working fine in my local environment. I will temporarily ignore 
this test to ensure the normal release candidate for version 2.0-preview.

> testJoinDisorderChangeLog failed on AZP
> ---
>
> Key: FLINK-36166
> URL: https://issues.apache.org/jira/browse/FLINK-36166
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 2.0-preview
>Reporter: Weijie Guo
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> Aug 23 03:59:33 03:59:33.972 [ERROR] Failures: 
> Aug 23 03:59:33 03:59:33.972 [ERROR] 
> org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase.testJoinDisorderChangeLog
> Aug 23 03:59:33 03:59:33.972 [ERROR]   Run 1: 
> TableSinkITCase.testJoinDisorderChangeLog:119 
> Aug 23 03:59:33 expected: List(+I[jason, 4, 22.5, 22])
> Aug 23 03:59:33  but was: ArrayBuffer(+U[jason, 4, 22.5, 22])
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61572&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=12536





Re: [PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent [flink-cdc]

2024-09-25 Thread via GitHub


qg-lin commented on PR #3622:
URL: https://github.com/apache/flink-cdc/pull/3622#issuecomment-2376090149

   > Thanks for @qg-lin's contribution! Seems lenientized `RenameColumnEvent` 
is also prone to this issue. Would you like to fix this in this PR?
   
   My pleasure.





Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]

2024-09-25 Thread via GitHub


SamBarker commented on code in PR #881:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1776061626


##
pom.xml:
##
@@ -110,6 +110,7 @@ under the License.
 
 
 10.15.2.0
+24.1.0

Review Comment:
   I've removed `@NotNull` from `KubernetesAutoScalerStateStore` as `@NonNull` 
is the default. 






Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]

2024-09-25 Thread via GitHub


SamBarker commented on code in PR #881:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1776062522


##
.github/workflows/ci.yml:
##
@@ -78,6 +79,7 @@ jobs:
 namespace: ["default","flink"]
 mode: ["native", "standalone"]
 java-version: [ 11, 17, 21 ]
+http-client: [ "okhttp", "vertx", "jetty",  "jdk" ]

Review Comment:
   I think 
https://github.com/apache/flink-kubernetes-operator/pull/881/commits/7b74210e8d64acb881a268ac8f37271de155c060
 does the job






Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]

2024-09-25 Thread via GitHub


SamBarker commented on code in PR #881:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1776062259


##
.github/workflows/ci.yml:
##
@@ -47,7 +48,7 @@ jobs:
 ${{ runner.os }}-maven-
   - name: Build with Maven
 run: |
-  mvn -B clean install javadoc:javadoc -Pgenerate-docs
+  mvn -B clean install javadoc:javadoc -Pgenerate-docs 
-Dfabric8.httpclinent.impl=${{ matrix.httpclient }}

Review Comment:
   Reverted in 
[57b341b](https://github.com/apache/flink-kubernetes-operator/pull/881/commits/57b341bd841404c4d31d083adcc0d1385f22dffc)






Re: [PR] [FLINK-36364][state/forst] Do not reuse serialized key in Forst map state and/or other namespaces [flink]

2024-09-25 Thread via GitHub


fredia commented on code in PR #25394:
URL: https://github.com/apache/flink/pull/25394#discussion_r1776219418


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+
+import java.io.IOException;
+
+/** A utility of serialization keys in ForSt. */
+public class ForStSerializerUtils {
+
+    /**
+     * Serialize a key and namespace. No user key.
+     *
+     * @param contextKey the context key of current request
+     * @param builder key builder
+     * @param defaultNamespace default namespace of the state
+     * @param namespaceSerializer the namespace serializer
+     * @param enableKeyReuse whether to enable key reuse
+     */
+    public static <K, N> byte[] serializeKeyAndNamespace(
+            ContextKey<K, N> contextKey,
+            SerializedCompositeKeyBuilder<K> builder,
+            N defaultNamespace,
+            TypeSerializer<N> namespaceSerializer,
+            boolean enableKeyReuse)
+            throws IOException {
+        N namespace = contextKey.getNamespace();
+        namespace = (namespace == null ? defaultNamespace : namespace);
+        if (enableKeyReuse && namespace == defaultNamespace) {

Review Comment:
   Thanks for the clarification.






Re: [PR] [FLINK-36344][runtime/metrics] Introduce lastCheckpointCompletedTimestamp metric [flink]

2024-09-25 Thread via GitHub


Myasuka commented on PR #25399:
URL: https://github.com/apache/flink/pull/25399#issuecomment-2375646639

   @flinkbot run azure





Re: [PR] [FLINK-34702][table-planner] Refactor Deduplicate optimization to defer to StreamPhysicalRank for valid StreamExecDeduplicate node conversion to avoid exceptions [flink]

2024-09-25 Thread via GitHub


lincoln-lil commented on code in PR #25380:
URL: https://github.com/apache/flink/pull/25380#discussion_r1774875365


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala:
##
@@ -352,21 +352,55 @@ object RankUtil {
 }
 
 val inputRowType = rank.getInput.getRowType
-val isSortOnTimeAttribute = sortOnTimeAttribute(sortCollation, 
inputRowType)
+val isSortOnTimeAttribute = sortOnTimeAttributeOnly(sortCollation, 
inputRowType)
 
 !rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && 
isRowNumberType
   }
 
-  private def sortOnTimeAttribute(
+  private def sortOnTimeAttributeOnly(
   sortCollation: RelCollation,
   inputRowType: RelDataType): Boolean = {
 if (sortCollation.getFieldCollations.size() != 1) {
-  false
-} else {
-  val firstSortField = sortCollation.getFieldCollations.get(0)
-  val fieldType = 
inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
-  FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
-  FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+  return false
+}
+val firstSortField = sortCollation.getFieldCollations.get(0)
+val fieldType = 
inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
+FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
+FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+  }
+
+  /**
+   * Checks if the given sort collation has a field collation which based on a 
rowtime attribute.
+   */
+  def sortOnRowTime(sortCollation: RelCollation, inputRowType: RelDataType): 
Boolean = {
+sortCollation.getFieldCollations.exists {
+  firstSortField =>
+val fieldType = 
inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
+FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+}
+  }
+
+  /** Whether the given rank is logically a deduplication. */
+  def isDeduplication(rank: Rank): Boolean = {
+!rank.outputRankNumber && rank.rankType == RankType.ROW_NUMBER && 
isTop1(rank.rankRange)
+  }
+
+  /** Whether the given [[StreamPhysicalRank]] could be converted to 
[[StreamExecDeduplicate]]. */
+  def canConvertToDeduplicate(rank: StreamPhysicalRank): Boolean = {
+lazy val inputInsertOnly = ChangelogPlanUtils.inputInsertOnly(rank)
+lazy val sortOnTimeAttributeOnly =
+  RankUtil.sortOnTimeAttributeOnly(rank.orderKey, rank.getInput.getRowType)
+
+isDeduplication(rank) && inputInsertOnly && sortOnTimeAttributeOnly
+  }
+
+  /** Determines if the given order key indicates that the last row should be 
kept. */
+  def keepLastRow(orderKey: RelCollation): Boolean = {

Review Comment:
   TODO rename






[jira] [Created] (FLINK-36367) Use RexNormalize in EquivalentExprShuttle

2024-09-25 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-36367:
---

 Summary: Use RexNormalize in EquivalentExprShuttle
 Key: FLINK-36367
 URL: https://issues.apache.org/jira/browse/FLINK-36367
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


This is a follow-up to the discussion in the comments of 
https://github.com/apache/flink/pull/25388#discussion_r1774177868





Re: [PR] [FLINK-36361][table] Do not use StringBuilder for EquivalentExprShuttle [flink]

2024-09-25 Thread via GitHub


snuyanzin commented on code in PR #25388:
URL: https://github.com/apache/flink/pull/25388#discussion_r1774883976


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##
@@ -439,17 +439,16 @@ object FlinkRexUtil {
   })
 
   private class EquivalentExprShuttle(rexBuilder: RexBuilder) extends 
RexShuttle {
-private val equiExprMap = mutable.HashMap[String, RexNode]()
+private val equiExprSet = mutable.HashSet[RexNode]()
 
 override def visitCall(call: RexCall): RexNode = {
   call.getOperator match {
 case EQUALS | NOT_EQUALS | GREATER_THAN | LESS_THAN | 
GREATER_THAN_OR_EQUAL |
 LESS_THAN_OR_EQUAL =>
-  val swapped = swapOperands(call)
-  if (equiExprMap.contains(swapped.toString)) {
-swapped
+  if (equiExprSet.contains(call)) {

Review Comment:
   jira issue https://issues.apache.org/jira/browse/FLINK-36367
   
   I tried:
   1. It touches more than 100 plans, which is still doable.
   2. I faced one float-related issue for a couple of plans: depending on how 
the tests are run (just one test or the whole class), the order of operands 
differs (the operator expression is valid for both).
   So I need more time to debug and fix it.






[jira] [Created] (FLINK-36368) Fix subtask management in CommittableCollector

2024-09-25 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-36368:
---

 Summary: Fix subtask management in CommittableCollector 
 Key: FLINK-36368
 URL: https://issues.apache.org/jira/browse/FLINK-36368
 Project: Flink
  Issue Type: Bug
Reporter: Arvid Heise
Assignee: Arvid Heise
 Fix For: 2.0-preview


CommittableCollector is owned by a specific committer subtask and may handle a 
number of upstream tasks emitting committables.

Currently, CommittableCollector rewrites the subtask id to its owner upon 
receiving a committable (summary). However, this makes it impossible to track 
if a specific upstream subtask has sent all committables or not.

I propose to stop rewriting the subtask id while collecting the committables 
and instead rewrite them only on emission. This should also ease debugging the 
state.





Re: [PR] [FLINK-36361][table] Do not use StringBuilder for EquivalentExprShuttle [flink]

2024-09-25 Thread via GitHub


snuyanzin commented on PR #25388:
URL: https://github.com/apache/flink/pull/25388#issuecomment-2373541247

   thanks for taking a look





Re: [PR] [FLINK-36287] Disallow UC for inner sink channels [flink]

2024-09-25 Thread via GitHub


AHeise commented on PR #25353:
URL: https://github.com/apache/flink/pull/25353#issuecomment-2373541699

   Addressed feedback and slimmed down PR a bit to the essentials. I have 
created FLINK-36368 for follow-up work.





Re: [PR] [FLINK-36275][rest] Remove deprecated ProgramArgsQueryParameter [flink]

2024-09-25 Thread via GitHub


reswqa merged PR #25323:
URL: https://github.com/apache/flink/pull/25323





[jira] [Resolved] (FLINK-36275) Remove deprecated ProgramArgsQueryParameter

2024-09-25 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo resolved FLINK-36275.

Fix Version/s: 2.0-preview
   Resolution: Done

master(2.0) via f58a3e02a63b094e8c64344a7a74808a3e13ec36.

> Remove deprecated ProgramArgsQueryParameter
> ---
>
> Key: FLINK-36275
> URL: https://issues.apache.org/jira/browse/FLINK-36275
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Affects Versions: 2.0-preview
>Reporter: Weijie Guo
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> ProgramArgsQueryParameter has been deprecated in Flink 1.7(see 
> https://issues.apache.org/jira/browse/FLINK-10295). We can remove 
> this(Breaking compatibility) in Flink 2.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36275) Remove deprecated ProgramArgsQueryParameter

2024-09-25 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-36275:
--

Assignee: Weijie Guo

> Remove deprecated ProgramArgsQueryParameter
> ---
>
> Key: FLINK-36275
> URL: https://issues.apache.org/jira/browse/FLINK-36275
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Affects Versions: 2.0-preview
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> ProgramArgsQueryParameter has been deprecated in Flink 1.7(see 
> https://issues.apache.org/jira/browse/FLINK-10295). We can remove 
> this(Breaking compatibility) in Flink 2.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30614][serializer] Remove old method of resolving schema compa… [flink]

2024-09-25 Thread via GitHub


AlexYinHan commented on PR #25384:
URL: https://github.com/apache/flink/pull/25384#issuecomment-2373451676

   @masteryhx Thanks for the comments. I have updated the docs. PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36325][statebackend] Implement basic restore from checkpoint for ForStStateBackend [flink]

2024-09-25 Thread via GitHub


flinkbot commented on PR #25397:
URL: https://github.com/apache/flink/pull/25397#issuecomment-2373503514

   
   ## CI report:
   
   * 28d1f2ed93431e3ec739b3f5af700dc0704accc9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-09-25 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-35035:
--
Fix Version/s: 2.0-preview

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Assignee: Zdenek Tison
>Priority: Minor
> Fix For: 2.0-preview
>
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots, the job graph change is triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot): no matter what environment 
> we add them in, we cannot guarantee that the new slots are added all at once, so 
> onNewResourcesAvailable triggers repeatedly.
> If each new slot arrives at a certain interval, the jobgraph will keep changing 
> during this period. What I hope for is a configurable stabilization time for the 
> cluster resources: trigger the jobgraph change only after the number of cluster 
> slots has been stable for a certain period, to avoid this situation.
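
A minimal sketch of the stabilization idea described above ({{SlotStabilityGate}} is 
illustrative; the adaptive scheduler's 
jobmanager.adaptive-scheduler.resource-stabilization-timeout option addresses a 
similar idea):

{code:java}
import java.time.Duration;
import java.time.Instant;

// Illustrative debounce: act on new resources only once the observed slot
// count has been stable for a configured cooldown period.
final class SlotStabilityGate {
    private final Duration cooldown;
    private int lastSlotCount = -1;
    private Instant lastChange;

    SlotStabilityGate(Duration cooldown, Instant now) {
        this.cooldown = cooldown;
        this.lastChange = now;
    }

    /** Called on every onNewResourcesAvailable-style notification. */
    boolean shouldRescale(int currentSlotCount, Instant now) {
        if (currentSlotCount != lastSlotCount) {
            lastSlotCount = currentSlotCount;
            lastChange = now;
            return false; // resources are still changing, keep waiting
        }
        return Duration.between(lastChange, now).compareTo(cooldown) >= 0;
    }
}
{code}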



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36292][Connectors/Common]: remove timeout to avoid timeout exception [flink]

2024-09-25 Thread via GitHub


showuon commented on code in PR #25371:
URL: https://github.com/apache/flink/pull/25371#discussion_r1774665859


##
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java:
##
@@ -71,7 +71,7 @@ public void testCloseFetcherWithException() throws Exception {
 .hasRootCauseMessage("Artificial exception on closing the 
split reader.");
 }
 
-@Test(timeout = 3)
+@Test

Review Comment:
   Sorry @becketqin , I don't get why adding a timeout of `Long.MAX_VALUE` will work 
when there are tasks not terminated. I did a test and confirmed that even if there 
are tasks not terminated, the test still passes, with or without `timeout = 
Long.MAX_VALUE`. 
   
   To catch the error when there are tasks failing to be closed, I changed the 
`close` method to return a `boolean`, so that we have a way to know if the tasks 
were closed completely in time; otherwise, we fail the test. Please take a look 
again. Thanks.
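   
   A minimal sketch of that shape (assuming `executors` is the fetchers' 
`ExecutorService` and `LOG` an SLF4J logger; this is a hypothetical signature, not 
the exact `SplitFetcherManager` API):
   
   ```java
   public boolean close(long timeoutMs) throws Exception {
       executors.shutdown();
       // Returns false if the fetcher tasks did not terminate within the timeout,
       // so callers (and tests) can detect leaked tasks instead of hanging.
       boolean terminated = executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
       if (!terminated) {
           LOG.warn("Split fetchers did not terminate within {} ms.", timeoutMs);
       }
       return terminated;
   }
   ```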






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-09-25 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-35035.
---
Resolution: Fixed

Closing this parent issue. All subtasks are addressed and no new test 
instabilities related to this change were identified in the past 1-2 weeks.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Assignee: Zdenek Tison
>Priority: Minor
> Fix For: 2.0-preview
>
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots, the job graph change is triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot): no matter what environment 
> we add them in, we cannot guarantee that the new slots are added all at once, so 
> onNewResourcesAvailable triggers repeatedly.
> If each new slot arrives at a certain interval, the jobgraph will keep changing 
> during this period. What I hope for is a configurable stabilization time for the 
> cluster resources: trigger the jobgraph change only after the number of cluster 
> slots has been stable for a certain period, to avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [DRAFT][FLINK-36366][core] Remove deprecate API in flink-core exclude connector and state part [flink]

2024-09-25 Thread via GitHub


flinkbot commented on PR #25396:
URL: https://github.com/apache/flink/pull/25396#issuecomment-2373400996

   
   ## CI report:
   
   * 9f67d79ffaceec279e944054d51880ebb8d340cd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36325][statebackend] Implement basic restore from checkpoint for ForStStateBackend [flink]

2024-09-25 Thread via GitHub


zoltar9264 opened a new pull request, #25397:
URL: https://github.com/apache/flink/pull/25397

   ## What is the purpose of the change
   
   Implement basic restore from checkpoint in ForStStateBackend; the checkpoint side 
was implemented in [FLINK-35510](https://issues.apache.org/jira/browse/FLINK-35510).
   Note that ForStStateBackend now supports two kinds of keyed state backend, sync 
and async; we make the async one support checkpoint and restore first.
   
   ## Brief change log
   
 - Introduce ForStIncrementalRestoreOperation
 - Implement restore from checkpoint in ForStKeyedStateBackendBuilder
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Add new test case in ForStStateDataTransferTest
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36325) Implement basic restore from checkpoint for ForStStateBackend

2024-09-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36325:
---
Labels: pull-request-available  (was: )

> Implement basic restore from checkpoint for ForStStateBackend
> -
>
> Key: FLINK-36325
> URL: https://issues.apache.org/jira/browse/FLINK-36325
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> As title, implement basic restore from checkpoint for ForStStateBackend, 
> rescale will be implemented later.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36011) Generalize RescaleManager to become StateTransitionManager

2024-09-25 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884537#comment-17884537
 ] 

Matthias Pohl commented on FLINK-36011:
---

master: 
[c869326d089705475481c2c2ea42a6efabb8c828|https://github.com/apache/flink/commit/c869326d089705475481c2c2ea42a6efabb8c828]

> Generalize RescaleManager to become StateTransitionManager
> --
>
> Key: FLINK-36011
> URL: https://issues.apache.org/jira/browse/FLINK-36011
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zdenek Tison
>Assignee: Zdenek Tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> The goal is to change the RescaleManager component to one with a broader 
> responsibility that will manage the adaptive scheduler's state transitions.   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [DRAFT][FLINK-29741][scala] Remove flink-dist-scala, flink-scala, flink-streaming-scala module [flink]

2024-09-25 Thread via GitHub


reswqa commented on PR #25393:
URL: https://github.com/apache/flink/pull/25393#issuecomment-2373310633

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36366) Remove deprecate API in flink-core exclude connector and state part

2024-09-25 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-36366:
--

 Summary: Remove deprecate API in flink-core exclude connector and 
state part
 Key: FLINK-36366
 URL: https://issues.apache.org/jira/browse/FLINK-36366
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core
Affects Versions: 2.0-preview
Reporter: Weijie Guo
Assignee: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36366) Remove deprecate API in flink-core exclude connector and state part

2024-09-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36366:
---
Labels: pull-request-available  (was: )

> Remove deprecate API in flink-core exclude connector and state part
> ---
>
> Key: FLINK-36366
> URL: https://issues.apache.org/jira/browse/FLINK-36366
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Core
>Affects Versions: 2.0-preview
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [DRAFT][FLINK-36366][core] Remove deprecate API in flink-core exclude connector and state part [flink]

2024-09-25 Thread via GitHub


reswqa opened a new pull request, #25396:
URL: https://github.com/apache/flink/pull/25396

   ## What is the purpose of the change
   
   *Remove deprecate API in flink-core exclude connector and state part*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator with JobVertexBuildContext [flink]

2024-09-25 Thread via GitHub


JunRuiLee commented on code in PR #25366:
URL: https://github.com/apache/flink/pull/25366#discussion_r1772938079


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Helper class encapsulates all necessary information and configurations required
+ * during the construction of job vertices.
+ */
+@Internal
+public class JobVertexBuildContext {
+
+private final StreamGraph streamGraph;
+
+/** The {@link OperatorChainInfo}s, key is the start node id of the chain. */
+private final Map<Integer, OperatorChainInfo> chainInfos;
+
+/** The {@link OperatorInfo}s, key is the id of the stream node. */
+private final Map<Integer, OperatorInfo> operatorInfosInorder;
+
+// The config for node chained with the current operator, for multi input operators
+// containing sourceChain, the key will be the ids of all nodes in the chain,
+// otherwise the key will only be the id of the head node in the chain
+private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
+
+// The created JobVertex, the key is start node id.
+private final Map<Integer, JobVertex> jobVerticesInorder;
+
+// Futures for the serialization of operator coordinators.
+private final Map<
+        JobVertexID,
+        List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
+        coordinatorSerializationFuturesPerJobVertex;
+
+// The order of StreamEdge connected to other vertices should be consistent with
+// the order in which JobEdge was created.
+private final List<StreamEdge> physicalEdgesInOrder;
+
+// We need to use AtomicBoolean to sense whether HybridResultPartition exists during the process

Review Comment:
   // We use AtomicBoolean to track the existence of HybridResultPartition 
   // during the incremental JobGraph generation process introduced by 
   // AdaptiveGraphManager. It is essential to globally monitor changes 
   // to this variable, thus necessitating the use of a Boolean object 
   // instead of a primitive boolean.




Re: [PR] [FLINK-36361][table] Do not use StringBuilder for EquivalentExprShuttle [flink]

2024-09-25 Thread via GitHub


snuyanzin merged PR #25388:
URL: https://github.com/apache/flink/pull/25388


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36361) Do not use StringBuilder for EquivalentExprShuttle

2024-09-25 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884527#comment-17884527
 ] 

Sergey Nuyanzin commented on FLINK-36361:
-

Merged as 
[0578c8e17dddc88444635e7ad8f159cb52982af3|https://github.com/apache/flink/commit/0578c8e17dddc88444635e7ad8f159cb52982af3]

> Do not use StringBuilder for EquivalentExprShuttle
> --
>
> Key: FLINK-36361
> URL: https://issues.apache.org/jira/browse/FLINK-36361
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently there is {{EquivalentExprShuttle}}, which keeps a map from a node's 
> {{toString}} to the {{RexNode}}. In fact there is no need for toString, which is 
> recalculated each time using a {{StringBuilder}} under the hood. Moreover, we 
> faced one weird case where it consumed excessive memory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36361) Do not use StringBuilder for EquivalentExprShuttle

2024-09-25 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-36361.
-
Fix Version/s: 2.0-preview
   Resolution: Fixed

> Do not use StringBuilder for EquivalentExprShuttle
> --
>
> Key: FLINK-36361
> URL: https://issues.apache.org/jira/browse/FLINK-36361
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> Currently there is {{EquivalentExprShuttle}}, which keeps a map from a node's 
> {{toString}} to the {{RexNode}}. In fact there is no need for toString, which is 
> recalculated each time using a {{StringBuilder}} under the hood. Moreover, we 
> faced one weird case where it consumed excessive memory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36273) Remove deprecated Table/SQL configuration in 2.0

2024-09-25 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36273.
---
  Assignee: xuyang
Resolution: Fixed

Fixed in master: 733a56682bcceabab6625b621547538c6d993976

> Remove deprecated Table/SQL configuration in 2.0
> 
>
> Key: FLINK-36273
> URL: https://issues.apache.org/jira/browse/FLINK-36273
> Project: Flink
>  Issue Type: Sub-task
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36273][table] Remove deprecated Table/SQL configuration in 2.0 [flink]

2024-09-25 Thread via GitHub


lincoln-lil merged PR #25334:
URL: https://github.com/apache/flink/pull/25334


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36188] Fix disable buffer flush lose efficacy [flink-connector-hbase]

2024-09-25 Thread via GitHub


boring-cyborg[bot] commented on PR #49:
URL: 
https://github.com/apache/flink-connector-hbase/pull/49#issuecomment-2373679524

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36188) HBase disable buffer flush lose efficacy

2024-09-25 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky reassigned FLINK-36188:


Assignee: MOBIN

> HBase disable buffer flush lose efficacy
> 
>
> Key: FLINK-36188
> URL: https://issues.apache.org/jira/browse/FLINK-36188
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.0
> Environment: Flink 1.16.3
>Reporter: MOBIN
>Assignee: MOBIN
>Priority: Critical
>  Labels: pull-request-available
>
> HBase table
> ||rowkey||col||
> |1|1|
> The user lookup joins the hbase table, adds 1 to the col value, and writes it 
> back to hbase
> {code:java}
> @Test
> void testTableSinkDisabledBufferFlush() throws Exception {
>         StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
> streamSettings);        
> tEnv.executeSql(
>                 "CREATE TABLE hTableForSink ("
>                         + " rowkey INT PRIMARY KEY NOT ENFORCED,"
>                         + " family1 ROW<col1 INT>"
>                         + ") WITH ("
>                         + " 'connector' = 'hbase-2.2',"
>                         + " 'sink.buffer-flush.max-size' = '0',"
>                         + " 'sink.buffer-flush.max-rows' = '0',"
>                         + " 'table-name' = '"
>                         + TEST_TABLE_6
>                         + "',"
>                         + " 'zookeeper.quorum' = '"
>                         + getZookeeperQuorum()
>                         + "'"
>                         + ")");       
> String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
>         tEnv.executeSql(insert).await();        
> tEnv.executeSql(
>                 "CREATE VIEW user_click AS "
>                         + " SELECT user_id, proctime() AS proc_time"
>                         + " FROM ( "
>                         + " VALUES(1), (1), (1), (1), (1)"
>                         + " ) AS t (user_id);");    
>     
>tEnv.executeSql(
>                 "INSERT INTO hTableForSink SELECT "
>                         + "    user_id as rowkey,"
>                         + "    ROW(CAST(family1.col1 + 1 AS INT))"
>                         + " FROM user_click INNER JOIN hTableForSink"
>                         + " FOR SYSTEM_TIME AS OF user_click.proc_time"
>                         + " ON hTableForSink.rowkey = user_click.user_id;");  
>       
> tEnv.executeSql(
>                 "CREATE TABLE hTableForQuery ("
>                         + " rowkey INT PRIMARY KEY NOT ENFORCED,"
>                         + " family1 ROW<col1 INT>"
>                         + ") WITH ("
>                         + " 'connector' = 'hbase-2.2',"
>                         + " 'table-name' = '"
>                         + TEST_TABLE_6
>                         + "',"
>                         + " 'zookeeper.quorum' = '"
>                         + getZookeeperQuorum()
>                         + "'"
>                         + ")");
>         String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";     
>    
> TableResult firstResult = tEnv.executeSql(query);
>         List<Row> firstResults = 
> CollectionUtil.iteratorToList(firstResult.collect());
>         String firstExpected = "+I[1, 6]";
>         TestBaseUtils.compareResultAsText(firstResults, firstExpected);
>     } {code}
> test failed
> {code:java}
> org.junit.ComparisonFailure: Different elements in arrays: expected 1 
> elements and received 1
>  expected: [+I[1, 6]]
>  received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
> Expected :+I[1, 6]
> Actual   :+I[1, 2] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36188] Fix disable buffer flush lose efficacy [flink-connector-hbase]

2024-09-25 Thread via GitHub


ferenc-csaky merged PR #49:
URL: https://github.com/apache/flink-connector-hbase/pull/49


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-36188) HBase disable buffer flush lose efficacy

2024-09-25 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky closed FLINK-36188.

Resolution: Fixed

[{{c6eb87f}}|https://github.com/apache/flink-connector-hbase/commit/c6eb87f809cd30ad352d1644980eefce658bb63f]
 in main

> HBase disable buffer flush lose efficacy
> 
>
> Key: FLINK-36188
> URL: https://issues.apache.org/jira/browse/FLINK-36188
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.0
> Environment: Flink 1.16.3
>Reporter: MOBIN
>Assignee: MOBIN
>Priority: Critical
>  Labels: pull-request-available
> Fix For: hbase-4.0.0
>
>
> HBase table
> ||rowkey||col||
> |1|1|
> The user lookup joins the hbase table, adds 1 to the col value, and writes it 
> back to hbase
> {code:java}
> @Test
> void testTableSinkDisabledBufferFlush() throws Exception {
>         StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
> streamSettings);        
> tEnv.executeSql(
>                 "CREATE TABLE hTableForSink ("
>                         + " rowkey INT PRIMARY KEY NOT ENFORCED,"
>                         + " family1 ROW<col1 INT>"
>                         + ") WITH ("
>                         + " 'connector' = 'hbase-2.2',"
>                         + " 'sink.buffer-flush.max-size' = '0',"
>                         + " 'sink.buffer-flush.max-rows' = '0',"
>                         + " 'table-name' = '"
>                         + TEST_TABLE_6
>                         + "',"
>                         + " 'zookeeper.quorum' = '"
>                         + getZookeeperQuorum()
>                         + "'"
>                         + ")");       
> String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
>         tEnv.executeSql(insert).await();        
> tEnv.executeSql(
>                 "CREATE VIEW user_click AS "
>                         + " SELECT user_id, proctime() AS proc_time"
>                         + " FROM ( "
>                         + " VALUES(1), (1), (1), (1), (1)"
>                         + " ) AS t (user_id);");    
>     
>tEnv.executeSql(
>                 "INSERT INTO hTableForSink SELECT "
>                         + "    user_id as rowkey,"
>                         + "    ROW(CAST(family1.col1 + 1 AS INT))"
>                         + " FROM user_click INNER JOIN hTableForSink"
>                         + " FOR SYSTEM_TIME AS OF user_click.proc_time"
>                         + " ON hTableForSink.rowkey = user_click.user_id;");  
>       
> tEnv.executeSql(
>                 "CREATE TABLE hTableForQuery ("
>                         + " rowkey INT PRIMARY KEY NOT ENFORCED,"
>                         + " family1 ROW<col1 INT>"
>                         + ") WITH ("
>                         + " 'connector' = 'hbase-2.2',"
>                         + " 'table-name' = '"
>                         + TEST_TABLE_6
>                         + "',"
>                         + " 'zookeeper.quorum' = '"
>                         + getZookeeperQuorum()
>                         + "'"
>                         + ")");
>         String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";     
>    
> TableResult firstResult = tEnv.executeSql(query);
>         List<Row> firstResults = 
> CollectionUtil.iteratorToList(firstResult.collect());
>         String firstExpected = "+I[1, 6]";
>         TestBaseUtils.compareResultAsText(firstResults, firstExpected);
>     } {code}
> test failed
> {code:java}
> org.junit.ComparisonFailure: Different elements in arrays: expected 1 
> elements and received 1
>  expected: [+I[1, 6]]
>  received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
> Expected :+I[1, 6]
> Actual   :+I[1, 2] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36188) HBase disable buffer flush lose efficacy

2024-09-25 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36188:
-
Fix Version/s: hbase-4.0.0

> HBase disable buffer flush lose efficacy
> 
>
> Key: FLINK-36188
> URL: https://issues.apache.org/jira/browse/FLINK-36188
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.0
> Environment: Flink 1.16.3
>Reporter: MOBIN
>Assignee: MOBIN
>Priority: Critical
>  Labels: pull-request-available
> Fix For: hbase-4.0.0
>
>
> HBase table
> ||rowkey||col||
> |1|1|
> The user lookup joins the hbase table, adds 1 to the col value, and writes it 
> back to hbase
> {code:java}
> @Test
> void testTableSinkDisabledBufferFlush() throws Exception {
>         StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
> streamSettings);        
> tEnv.executeSql(
>                 "CREATE TABLE hTableForSink ("
>                         + " rowkey INT PRIMARY KEY NOT ENFORCED,"
>                         + " family1 ROW<col1 INT>"
>                         + ") WITH ("
>                         + " 'connector' = 'hbase-2.2',"
>                         + " 'sink.buffer-flush.max-size' = '0',"
>                         + " 'sink.buffer-flush.max-rows' = '0',"
>                         + " 'table-name' = '"
>                         + TEST_TABLE_6
>                         + "',"
>                         + " 'zookeeper.quorum' = '"
>                         + getZookeeperQuorum()
>                         + "'"
>                         + ")");       
> String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
>         tEnv.executeSql(insert).await();        
> tEnv.executeSql(
>                 "CREATE VIEW user_click AS "
>                         + " SELECT user_id, proctime() AS proc_time"
>                         + " FROM ( "
>                         + " VALUES(1), (1), (1), (1), (1)"
>                         + " ) AS t (user_id);");    
>     
>tEnv.executeSql(
>                 "INSERT INTO hTableForSink SELECT "
>                         + "    user_id as rowkey,"
>                         + "    ROW(CAST(family1.col1 + 1 AS INT))"
>                         + " FROM user_click INNER JOIN hTableForSink"
>                         + " FOR SYSTEM_TIME AS OF user_click.proc_time"
>                         + " ON hTableForSink.rowkey = user_click.user_id;");  
>       
> tEnv.executeSql(
>                 "CREATE TABLE hTableForQuery ("
>                         + " rowkey INT PRIMARY KEY NOT ENFORCED,"
>                         + " family1 ROW<col1 INT>"
>                         + ") WITH ("
>                         + " 'connector' = 'hbase-2.2',"
>                         + " 'table-name' = '"
>                         + TEST_TABLE_6
>                         + "',"
>                         + " 'zookeeper.quorum' = '"
>                         + getZookeeperQuorum()
>                         + "'"
>                         + ")");
>         String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";     
>    
> TableResult firstResult = tEnv.executeSql(query);
>         List firstResults = 
>         List<Row> firstResults = 
>         String firstExpected = "+I[1, 6]";
>         TestBaseUtils.compareResultAsText(firstResults, firstExpected);
>     } {code}
> test failed
> {code:java}
> org.junit.ComparisonFailure: Different elements in arrays: expected 1 
> elements and received 1
>  expected: [+I[1, 6]]
>  received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
> Expected :+I[1, 6]
> Actual   :+I[1, 2] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35233][BP v3.0] Fix lookup cache reuse RowData object problem. [flink-connector-hbase]

2024-09-25 Thread via GitHub


ferenc-csaky merged PR #48:
URL: https://github.com/apache/flink-connector-hbase/pull/48


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36292][Connectors/Common]: remove timeout to avoid timeout exception [flink]

2024-09-25 Thread via GitHub


becketqin commented on code in PR #25371:
URL: https://github.com/apache/flink/pull/25371#discussion_r1775726090


##
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java:
##
@@ -71,7 +71,7 @@ public void testCloseFetcherWithException() throws Exception {
 .hasRootCauseMessage("Artificial exception on closing the 
split reader.");
 }
 
-@Test(timeout = 3)
+@Test

Review Comment:
   >  I did a test and confirmed even if there are tasks not terminated, the 
test still can pass, with or without the timeout = Long.MAX_VALUE.
   
   Hmm, this is not expected. The timeout is used in the 
`SplitFetcherManager.close()`
   ```
   executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)
   ```
   This is a blocking call and won't return until the executors finishes all 
the tasks, or otherwise interrupted.
   
   So if the timeout is `Long.MAX_VALUE`, and there are running tasks, 
`SplitFetcherManager.close()` will block.
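   
   A standalone sketch of that blocking behavior (plain JDK executor, not Flink 
code):
   
   ```java
   import java.util.concurrent.*;
   
   public class AwaitTerminationDemo {
       public static void main(String[] args) throws InterruptedException {
           ExecutorService executor = Executors.newSingleThreadExecutor();
           executor.submit(() -> {
               try {
                   Thread.sleep(60_000); // simulates a task that has not finished
               } catch (InterruptedException ignored) {
               }
           });
           executor.shutdown();
           // With Long.MAX_VALUE this call would effectively block forever while
           // the task runs; with a finite timeout it returns false instead.
           boolean done = executor.awaitTermination(1, TimeUnit.SECONDS);
           System.out.println("terminated = " + done); // prints "terminated = false"
           executor.shutdownNow(); // interrupt the sleeping task so the JVM can exit
       }
   }
   ```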



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





Re: [PR] [DRAFT][FLINK-36336] Remove deprecated dataset API dependency exclude table module [flink]

2024-09-25 Thread via GitHub


codenohup commented on PR #25365:
URL: https://github.com/apache/flink/pull/25365#issuecomment-2374836433

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Fix Stale label automatically removed with no update [flink-cdc]

2024-09-25 Thread via GitHub


leonardBang merged PR #3621:
URL: https://github.com/apache/flink-cdc/pull/3621


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36273][table] Remove deprecated Table/SQL configuration in 2.0 [flink]

2024-09-25 Thread via GitHub


lincoln-lil commented on PR #25334:
URL: https://github.com/apache/flink/pull/25334#issuecomment-2373644347

   For the 2nd commit, I went over all options mentioned in 
https://issues.apache.org/jira/browse/FLINK-35473.
   All deprecated options there were removed, except for the occurrences in the 
corresponding release notes.
   
   The changes look good!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36369][table] Move deprecated user-visible classes in table modules to the legacy package [flink]

2024-09-25 Thread via GitHub


xuyangzhong commented on PR #25400:
URL: https://github.com/apache/flink/pull/25400#issuecomment-2374388353

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-34702) Rank should not convert to StreamExecDuplicate when the input is not insert only

2024-09-25 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-34702.
---
Fix Version/s: 2.0-preview
   (was: 2.0.0)
   Resolution: Fixed

fixed in master: 2f8b2d81a97a681d67964ef17932ea0abf35730f

> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> 
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
> """
>   |SELECT *
>   |FROM (
>   |  SELECT *,
>   |ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as 
> rowNum
>   |  FROM (select a, count(b) as b from MyTable group by a)
>   |)
>   |WHERE rowNum = 1
> """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> This happens because StreamPhysicalDeduplicate cannot consume update changes yet, 
> while StreamExecRank can.
> So we should not convert the FlinkLogicalRank to StreamPhysicalDeduplicate in this 
> case, and we can defer the check of whether the input contains update changes to 
> the "optimize the physical plan" phase. 
> So we can add an option to solve it, and once StreamPhysicalDeduplicate supports 
> consuming update changes, we can deprecate the option.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34702][table-planner] Refactor Deduplicate optimization to defer to StreamPhysicalRank for valid StreamExecDeduplicate node conversion to avoid exceptions [flink]

2024-09-25 Thread via GitHub


lincoln-lil merged PR #25380:
URL: https://github.com/apache/flink/pull/25380


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]

2024-09-25 Thread via GitHub


gyfora commented on code in PR #881:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1774650443


##
.github/workflows/ci.yml:
##
@@ -47,7 +48,7 @@ jobs:
 ${{ runner.os }}-maven-
   - name: Build with Maven
 run: |
-  mvn -B clean install javadoc:javadoc -Pgenerate-docs
+  mvn -B clean install javadoc:javadoc -Pgenerate-docs 
-Dfabric8.httpclinent.impl=${{ matrix.httpclient }}

Review Comment:
   I prefer the e2e approach, unit tests mock most http interactions anyways



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]

2024-09-25 Thread via GitHub


gyfora commented on code in PR #881:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1774647193


##
.github/workflows/ci.yml:
##
@@ -78,6 +79,7 @@ jobs:
 namespace: ["default","flink"]
 mode: ["native", "standalone"]
 java-version: [ 11, 17, 21 ]
+http-client: [ "okhttp", "vertx", "jetty",  "jdk" ]

Review Comment:
   I suggest testing the different HTTP clients only in a single test case, such as:
   `v1_20, default, native, java21 and test_application_operations.sh`
   
   We don't want to blow up our matrix by a factor of 4 :D 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] 【update】Add new operator config "savepoint-create-subdir" to allow user to configure whether to create sub-directories named by job id under the 'state.savepoints.dir' [flink-kubernetes-opera

2024-09-25 Thread via GitHub


gyfora commented on PR #882:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/882#issuecomment-2373230412

   Hi @xieyang1994 !
   
   New features need a JIRA first, and the PR should be opened against main in any 
case, unless we are backporting a bug fix to one of the supported release 
versions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator with JobVertexBuildContext [flink]

2024-09-25 Thread via GitHub


noorall commented on code in PR #25366:
URL: https://github.com/apache/flink/pull/25366#discussion_r1774908113


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -593,15 +616,22 @@ private Map 
buildChainedInputsAndGetHeadInputs(
 
 if (targetChainingStrategy == 
ChainingStrategy.HEAD_WITH_SOURCES
 && isChainableInput(sourceOutEdge, streamGraph)) {
+// we cache the non-chainable outputs here, and set the 
non-chained config later
+OperatorInfo operatorInfo = new OperatorInfo();
+operatorInfo.setChainableOutputs(Collections.emptyList());

Review Comment:
   > maybe nonChainable?
   > 
   > ```
   > OperatorInfo operatorInfo = new OperatorInfo();
   > jobVertexBuildContext.addOperatorInfo(sourceNodeId, operatorInfo);
   > 
   > final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
   > final StreamConfig.SourceInputConfig inputConfig =
   > new 
StreamConfig.SourceInputConfig(sourceOutEdge);
   > final StreamConfig operatorConfig = new StreamConfig(new Configuration());
   > 
   > setOperatorConfig(
   > sourceNodeId,
   > operatorConfig,
   > Collections.emptyMap(),
   > jobVertexBuildContext);
   > setOperatorChainedOutputsConfig(
   > operatorConfig, Collections.emptyList(), 
jobVertexBuildContext);
   > 
   > // we cache the non-chainable outputs here, and set the non-chained config 
later
   > operatorInfo.setNonChainableOutputs(Collections.emptyList());
   > ```
   > 
   > And a unit test case is needed
   
   Thank you for the reminder; I have corrected this part. However, there is no 
need for an additional unit test, because the property in OperatorInfo is an empty 
list by default. Therefore, it doesn't matter whether it is set or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator with JobVertexBuildContext [flink]

2024-09-25 Thread via GitHub


noorall commented on PR #25366:
URL: https://github.com/apache/flink/pull/25366#issuecomment-2373601784

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36109) Improve FlinkStateSnapshot resource labels for efficient filtering

2024-09-25 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884626#comment-17884626
 ] 

Gyula Fora commented on FLINK-36109:


I think adding these labels makes sense for now 

> Improve FlinkStateSnapshot resource labels for efficient filtering
> --
>
> Key: FLINK-36109
> URL: https://issues.apache.org/jira/browse/FLINK-36109
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Critical
> Fix For: kubernetes-operator-1.10.0
>
>
> The new FlinkStateSnapshot CRs currently have the following label only:
> ```
> snapshot.type: XY
> ```
> We need to investigate if we can efficiently filter based on snapshot 
> resource type (savepoint/checkpoint), target job etc. I am not sure if we can 
> do this based on the spec directly or we need labels for it.
> Also we should probably rename the snapshot.type label to trigger.type to be 
> more straightforward.
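
For context, label-based filtering would then be a plain selector query; a sketch 
(the job label and the resource name are hypothetical until the labels are decided):
```
kubectl get flinkstatesnapshot -l snapshot.type=SAVEPOINT
kubectl get flinkstatesnapshot -l job-name=my-flink-job
```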



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34876][transform] Support UDF functions in transform [flink-cdc]

2024-09-25 Thread via GitHub


yuxiqian commented on PR #3465:
URL: https://github.com/apache/flink-cdc/pull/3465#issuecomment-2374215861

   Hi @mamineturki, `flink-cdc-pipeline-udf-examples` is for testing purposes only. 
If you'd like to test it, using 
`org.apache.flink.cdc.udf.examples.scala.AddOneFunctionClass` should work. It seems 
it wasn't successfully loaded by Flink classloaders in your case.
   
   Have you tried putting the `.jar` file into Flink's `/lib` path (not Flink CDC's 
`/lib`!) and restarting the Flink cluster? If that doesn't work, try passing it 
with the `--jar XXX.jar` argument when submitting the pipeline job with 
`bin/flink-cdc.sh`.
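   
   A quick sketch of both options (paths and the pipeline YAML name are 
placeholders):
   
   ```bash
   # Option 1: make the UDF jar visible to the Flink cluster's classloader,
   # then restart the cluster so the jar is picked up.
   cp flink-cdc-pipeline-udf-examples-3.2.0.jar "$FLINK_HOME/lib/"
   
   # Option 2: pass the jar explicitly when submitting the pipeline.
   bin/flink-cdc.sh my-pipeline.yaml \
     --jar /path/to/flink-cdc-pipeline-udf-examples-3.2.0.jar
   ```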


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34876][transform] Support UDF functions in transform [flink-cdc]

2024-09-25 Thread via GitHub


mamineturki commented on PR #3465:
URL: https://github.com/apache/flink-cdc/pull/3465#issuecomment-2374187978

   @yuxiqian Hi, I have been trying to run an example with this new feature but 
I always get `java.lang.ClassNotFoundException: 
org.example.udf.AddOneFunctionClass`
   
   I tried loading the udf function using the path in the PR description, as 
well as the package name 
`org.apache.flink.cdc.udf.examples.scala.AddOneFunctionClass` with no luck.
   
   I imported the function by downloading the pre-packaged jar `wget 
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-udf-examples/3.2.0/flink-cdc-pipeline-udf-examples-3.2.0.jar`
   
   Please advise on what I am missing exactly. Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34876][transform] Support UDF functions in transform [flink-cdc]

2024-09-25 Thread via GitHub


mamineturki commented on PR #3465:
URL: https://github.com/apache/flink-cdc/pull/3465#issuecomment-2374243091

   Thanks for the speedy reply @yuxiqian. Moving the jar to Flink's `/lib` did 
not help. Passing the jar with `--jar` caused the `flink-cdc.sh` command to get 
stuck with no logs and no submitted job.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34876][transform] Support UDF functions in transform [flink-cdc]

2024-09-25 Thread via GitHub


mamineturki commented on PR #3465:
URL: https://github.com/apache/flink-cdc/pull/3465#issuecomment-2374250013

   Thanks for the speedy response @yuxiqian. Passing the jar using `--jar` 
worked for me. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36369][table] Move deprecated user-visible classes in table modules to the legacy package [flink]

2024-09-25 Thread via GitHub


flinkbot commented on PR #25400:
URL: https://github.com/apache/flink/pull/25400#issuecomment-2374256737

   
   ## CI report:
   
   * 259abc16084da0205bae73b37033c29ecb079b29 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36370) Flink 1.18 fails with Empty server certificate chain when High Availability and mTLS both enabled

2024-09-25 Thread Aniruddh J (Jira)
Aniruddh J created FLINK-36370:
--

 Summary: Flink 1.18 fails with Empty server certificate chain when 
High Availability and mTLS both enabled
 Key: FLINK-36370
 URL: https://issues.apache.org/jira/browse/FLINK-36370
 Project: Flink
  Issue Type: Bug
Reporter: Aniruddh J


Hi, in my kubernetes cluster I have flink-kubernetes-operator v1.7.0 and 
apache-flink v1.18.1 installed. In the FlinkDeployment CR, when I enable 
Kubernetes high-availability services with mTLS, something like below:
```
high-availability.type: kubernetes
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: 'file:///mnt/pv/ha'
security.ssl.rest.authentication-enabled: 'true'
```
I am ending up with an `SSLHandshakeException` with an empty client certificate.
 
Though both of them work fine when implemented individually. Upon enabling 
`-Djavax.net.debug=all` I observed the client-server communication and figured 
out that 
[https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java]
 is where the client gets set up, and it happens from the operator side at 
[https://github.com/apache/flink-kubernetes-operator/blob/b081b75b72ddde643710e869b95b214912882363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L750]
 (correct me here please).
 
When we enable both mTLS and HA, the client doesn't seem to get set up. Not 
only that, it doesn't follow the same path of client creation. Below is the 
part of the ssl handshake log before getting the error:
```
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.508 GMT|null:-1|Produced CertificateRequest handshake message (
"CertificateRequest": {
"certificate types": [ecdsa_sign, rsa_sign, dss_sign]
"supported signature algorithms": [ecdsa_secp256r1_sha256, .., rsa_sha224, 
dsa_sha224, ecdsa_sha1, rsa_pkcs1_sha1, dsa_sha1]
"certificate authorities": [CN=FlinkCA, O=Apache Flink]
}
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.512 GMT|null:-1|Raw read (
: 1603030007 0B 0300 
)
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.513 GMT|null:-1|READ: TLSv1.2 handshake, length = 7
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.513 GMT|null:-1|Consuming client Certificate handshake message (
"Certificates": 
)
javax.net.ssl|ERROR|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.514 GMT|null:-1|Fatal (BAD_CERTIFICATE): Empty server certificate 
chain (
"throwable" : {
javax.net.ssl.SSLHandshakeException: Empty server certificate chain
 
```
From the initial looks, it seems that when the Flink server requests 
certificates from the client, the client doesn't send anything back since it 
does not have matching CAs?
 
Some client is sending a REST request to the Flink server, which the netty 
library is handling, but until we figure out which client it is we don't know 
whether it's the truststore on the client that's the problem or something else 
we don't see here. Thanks!
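 
For reference, a minimal sketch of the REST SSL options involved (option names 
as documented for Flink security; the paths and passwords are illustrative 
placeholders):
```
security.ssl.rest.enabled: 'true'
security.ssl.rest.authentication-enabled: 'true'
security.ssl.rest.keystore: /path/to/flink/rest.keystore
security.ssl.rest.keystore-password: rest_keystore_password
security.ssl.rest.key-password: rest_key_password
security.ssl.rest.truststore: /path/to/flink/rest.truststore
security.ssl.rest.truststore-password: rest_truststore_password
```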



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36370) Flink 1.18 fails with Empty server certificate chain when High Availability and mTLS both enabled

2024-09-25 Thread Aniruddh J (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aniruddh J updated FLINK-36370:
---
 Attachment: flink-cert-issue.log
Description: 
Hi, in my kubernetes cluster I have flink-kubernetes-operator v1.7.0 and 
apache-flink v1.18.1 installed. In the FlinkDeployment CR, when I enable 
Kubernetes high-availability services with mTLS, something like below:


{code:java}
high-availability.type: kubernetes
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: 'file:///mnt/pv/ha'
security.ssl.rest.authentication-enabled: 'true'{code}


I am ending up with an *SSLHandshakeException* with an empty client certificate.
 
Though both of them work fine when implemented individually. Upon enabling 
*-Djavax.net.debug=all* I observed the client-server communication and figured 
out that 
[https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java]
 is where the client gets set up, and it happens from the operator side at 
[https://github.com/apache/flink-kubernetes-operator/blob/b081b75b72ddde643710e869b95b214912882363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L750]
 (correct me here please).
 
When we enable both mTLS and HA, the client doesn't seem to get set up. Not 
only that, it doesn't follow the same path of client creation. Below is the 
part of the ssl handshake log before getting the error (attached the entire ssl 
handshake log):
{code:java}
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.508 GMT|null:-1|Produced CertificateRequest handshake message (
"CertificateRequest":
{ "certificate types": [ecdsa_sign, rsa_sign, dss_sign] "supported signature 
algorithms": [ecdsa_secp256r1_sha256, .., rsa_sha224, dsa_sha224, ecdsa_sha1, 
rsa_pkcs1_sha1, dsa_sha1] "certificate authorities": [CN=FlinkCA, O=Apache 
Flink] }
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.512 GMT|null:-1|Raw read (
: 1603030007 0B 0300 
)
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.513 GMT|null:-1|READ: TLSv1.2 handshake, length = 7
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.513 GMT|null:-1|Consuming client Certificate handshake message (
"Certificates": 
)
javax.net.ssl|ERROR|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.514 GMT|null:-1|Fatal (BAD_CERTIFICATE): Empty server certificate 
chain (
"throwable" : {
javax.net.ssl.SSLHandshakeException: Empty server certificate chain
{code}
From the initial looks, it seems that when the Flink server requests 
certificates from the client, the client doesn't send anything back since it 
does not have certificates matching the CA?
 
Some client is sending a REST request to the Flink server, which the netty 
library is handling, but until we figure out which client it is we don't know 
whether it's the truststore on the client that's the problem or something else 
we don't see here. Thanks!

  was:
Hi, in my kubernetes cluster I have flink-kubernetes-operator v1.7.0 and 
apache-flink v1.18.1 installed. In the FlinkDeployment CR, when I enable 
Kubernetes high-availability services with mTLS, something like below:
```
high-availability.type: kubernetes
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: 'file:///mnt/pv/ha'
security.ssl.rest.authentication-enabled: 'true'
```
I am ending up with an `SSLHandshakeException` with an empty client certificate.
 
Though both of them work fine when implemented individually. Upon enabling 
`-Djavax.net.debug=all` I observed the client-server communication and figured 
out that 
[https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java]
 is where the client gets set up, and it happens from the operator side at 
[https://github.com/apache/flink-kubernetes-operator/blob/b081b75b72ddde643710e869b95b214912882363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L750]
 (correct me here please).
 
When we enable both mTLS and HA, the client doesn't seem to get set up. Not 
only that, it doesn't follow the same path of client creation. Below is the 
part of the ssl handshake log before getting the error:
```
javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
15:16:12.508 GMT|null:-1|Produced CertificateRequest handshake message (
"CertificateRequest": {
"certificate types": [ecdsa_sign, rsa_sign, dss_sign]
"supported signature algorithms": [ecdsa_secp256r1_sha256, .., rsa_sha224, 
dsa_sha224, ecdsa_sha1, rsa_pkcs1_sha1, ds

[jira] [Updated] (FLINK-36371) Field BinaryStringData as keySelector would lead the same records hash shuffle into different tumble windows

2024-09-25 Thread Eason Ye (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eason Ye updated FLINK-36371:
-
Description: 
The pipeline is Source (Kafka messages deserialized to the RowData type) -> 
FlatMap --(keyBy)--> TumbleProcessWindow -> sink.

The Source emits two fields: UserName String and Amount Integer.

FlatMap collects the BinaryRowData-typed record and emits it to the 
TumbleProcessWindow operator.

The KeySelector uses the UserName BinaryStringData. The keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrived at 
TumbleProcessWindow simultaneously, but they were assigned to different 
windows.

When I use the keyBy below, the window result is correct.
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}
 

Reproduction Code:
{code:java}
env.fromSource(
        kafkaSource,
        WatermarkStrategy.noWatermarks(),
        sourceName())
        .flatMap(new FlatMapFunction<RowData, RowData>() {
            public void flatMap(RowData value, Collector<RowData> out)
                    throws Exception {
                System.out.println("is instanceof BinaryRowData =" + (value
                        instanceof BinaryRowData));
                out.collect(value);
            }
        })
        .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new FlinkTumbleWindowingProcessFunction())
        .print();

public static class FlinkTumbleWindowingProcessFunction extends
        ProcessWindowFunction<RowData, RowData, StringData, TimeWindow> {
    int subtask = -1;

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        subtask = getRuntimeContext().getIndexOfThisSubtask();
    }

    public void process(
            StringData s,
            Context context,
            Iterable<RowData> elements,
            Collector<RowData> out) throws Exception {
        System.out.printf(
                "subtask = %d, s = %s, elements = %s%n",
                subtask,
                s,
                elements);
    }
}
{code}
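
A workaround consistent with the observation above is to key by an immutable 
copy of the string; a minimal sketch (the class name is illustrative, and the 
root cause is presumably that the BinaryStringData returned by getString(0) is 
a view over reused binary memory):
{code:java}
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;

// Keys by an immutable copy of the username instead of the (possibly reused)
// BinaryStringData view over the row's memory segments.
public class UserNameKeySelector implements KeySelector<RowData, String> {
    @Override
    public String getKey(RowData row) {
        // toString() materializes the bytes into a plain java.lang.String,
        // whose hashCode/equals are stable across records and windows.
        return row.getString(0).toString();
    }
}
{code}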
 

 

 

 

 

  was:
The pipeline is Source (Kafka messages deserialized to the RowData type) -> 
FlatMap --(keyBy)--> TumbleProcessWindow -> sink.

The Source emits two fields: UserName String and Amount Integer.

FlatMap collects the BinaryRowData-typed record and emits it to the 
TumbleProcessWindow operator.

The KeySelector uses the UserName BinaryStringData. The keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrived at 
TumbleProcessWindow simultaneously, but they were assigned to different 
windows.

When I use the keyBy below, the window result is correct.
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}
 

Reproduction:

 
{code:java}
env.fromSource(
        kafkaSource,
        WatermarkStrategy.noWatermarks(),
        sourceName())
        .flatMap(new FlatMapFunction<RowData, RowData>() {
            public void flatMap(RowData value, Collector<RowData> out)
                    throws Exception {
                System.out.println("is instanceof BinaryRowData =" + (value
                        instanceof BinaryRowData));
                out.collect(value);
            }
        })
        .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new FlinkTumbleWindowingProcessFunction())
        .print(); {code}
 

 

 

 

 


> Field BinaryStringData as keySelector would lead the same records hash 
> shuffle into different tumble windows
> 
>
> Key: FLINK-36371
> URL: https://issues.apache.org/jira/browse/FLINK-36371
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 2.0.0
>Reporter: Eason Ye
>Priority: Major
>
> The pipeline is Source (Kafka messages deserialized to the RowData type) -> 
> FlatMap --(keyBy)--> TumbleProcessWindow -> sink.
> The Source emits two fields: UserName String and Amount Integer.
> FlatMap collects the BinaryRowData-typed record and emits it to the 
> TumbleProcessWindow operator.
> The KeySelector uses the UserName BinaryStringData. The keyBy code is:
> {code:java}
> .keyBy(rowData -> rowData.getString(0)){code}
> The window result is unexpected: records with the same username arrived at 
> TumbleProcessWindow simultaneously, but they were assigned to different 
> windows.
> When I use the keyBy below, the window result is correct.
> {code:java}
> .keyBy(rowData -> rowData.getString(0).toString()){code}
>  
> Reproduction Code:
> {code:java}
> env.fromSource(
> kafkaSource,
> W

[jira] [Created] (FLINK-36371) Field BinaryStringData as keySelector would lead the same records hash shuffle into different tumble windows

2024-09-25 Thread Eason Ye (Jira)
Eason Ye created FLINK-36371:


 Summary: Field BinaryStringData as keySelector would lead the same 
records hash shuffle into different tumble windows
 Key: FLINK-36371
 URL: https://issues.apache.org/jira/browse/FLINK-36371
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System
Affects Versions: 2.0.0
Reporter: Eason Ye


The pipeline is Source (Kafka messages deserialized to the RowData type) -> 
FlatMap --(keyBy)--> TumbleProcessWindow -> sink.

The Source emits two fields: UserName String and Amount Integer.

FlatMap collects the BinaryRowData-typed record and emits it to the 
TumbleProcessWindow operator.

The KeySelector uses the UserName BinaryStringData. The keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrived at 
TumbleProcessWindow simultaneously, but they were assigned to different 
windows.

When I use the keyBy below, the window result is correct.
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36371) Field BinaryStringData as keySelector would lead the same records hash shuffle into different tumble windows

2024-09-25 Thread Eason Ye (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eason Ye updated FLINK-36371:
-
Description: 
The pipeline is Source (Kafka messages deserialized to the RowData type) -> 
FlatMap --(keyBy)--> TumbleProcessWindow -> sink.

The Source emits two fields: UserName String and Amount Integer.

FlatMap collects the BinaryRowData-typed record and emits it to the 
TumbleProcessWindow operator.

The KeySelector uses the UserName BinaryStringData. The keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrived at 
TumbleProcessWindow simultaneously, but they were assigned to different 
windows.

When I use the keyBy below, the window result is correct.
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}
 

Reproduction:

 
{code:java}
env.fromSource(
        kafkaSource,
        WatermarkStrategy.noWatermarks(),
        sourceName())
        .flatMap(new FlatMapFunction<RowData, RowData>() {
            public void flatMap(RowData value, Collector<RowData> out)
                    throws Exception {
                System.out.println("is instanceof BinaryRowData =" + (value
                        instanceof BinaryRowData));
                out.collect(value);
            }
        })
        .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new FlinkTumbleWindowingProcessFunction())
        .print(); {code}
 

 

 

 

 

  was:
The pipeline is Source (Kafka messages deserialized to the RowData type) -> 
FlatMap --(keyBy)--> TumbleProcessWindow -> sink.

The Source emits two fields: UserName String and Amount Integer.

FlatMap collects the BinaryRowData-typed record and emits it to the 
TumbleProcessWindow operator.

The KeySelector uses the UserName BinaryStringData. The keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrived at 
TumbleProcessWindow simultaneously, but they were assigned to different 
windows.

When I use the keyBy below, the window result is correct.
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}


> Field BinaryStringData as keySelector would lead the same records hash 
> shuffle into different tumble windows
> 
>
> Key: FLINK-36371
> URL: https://issues.apache.org/jira/browse/FLINK-36371
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 2.0.0
>Reporter: Eason Ye
>Priority: Major
>
> The pipeline is Source (Kafka messages deserialized to the RowData type) -> 
> FlatMap --(keyBy)--> TumbleProcessWindow -> sink.
> The Source emits two fields: UserName String and Amount Integer.
> FlatMap collects the BinaryRowData-typed record and emits it to the 
> TumbleProcessWindow operator.
> The KeySelector uses the UserName BinaryStringData. The keyBy code is:
> {code:java}
> .keyBy(rowData -> rowData.getString(0)){code}
> The window result is unexpected: records with the same username arrived at 
> TumbleProcessWindow simultaneously, but they were assigned to different 
> windows.
> When I use the keyBy below, the window result is correct.
> {code:java}
> .keyBy(rowData -> rowData.getString(0).toString()){code}
>  
> Reproduction:
>  
> {code:java}
> env.fromSource(
>         kafkaSource,
>         WatermarkStrategy.noWatermarks(),
>         sourceName())
>         .flatMap(new FlatMapFunction<RowData, RowData>() {
>             public void flatMap(RowData value, Collector<RowData> out)
>                     throws Exception {
>                 System.out.println("is instanceof BinaryRowData =" + (value
>                         instanceof BinaryRowData));
>                 out.collect(value);
>             }
>         })
>         .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
>         .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
>         .process(new FlinkTumbleWindowingProcessFunction())
>         .print(); {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36369][table] Move deprecated user-visible classes in table modules to the legacy package [flink]

2024-09-25 Thread via GitHub


xuyangzhong commented on PR #25400:
URL: https://github.com/apache/flink/pull/25400#issuecomment-2374330934

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36370) Flink 1.18 fails with Empty server certificate chain when High Availability and mTLS both enabled

2024-09-25 Thread Aniruddh J (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aniruddh J updated FLINK-36370:
---
Priority: Major  (was: Minor)

> Flink 1.18 fails with Empty server certificate chain when High Availability 
> and mTLS both enabled
> -
>
> Key: FLINK-36370
> URL: https://issues.apache.org/jira/browse/FLINK-36370
> Project: Flink
>  Issue Type: Bug
>Reporter: Aniruddh J
>Priority: Major
> Attachments: flink-cert-issue.log
>
>
> Hi, in my kubernetes cluster I have flink-kubernetes-operator v1.7.0 and 
> apache-flink v1.18.1 installed. In the FlinkDeployment CR, when I enable 
> Kubernetes high-availability services with mTLS, something like below:
> {code:java}
> high-availability.type: kubernetes
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: 'file:///mnt/pv/ha'
> security.ssl.rest.authentication-enabled: 'true'{code}
> I am ending up with an *SSLHandshakeException* with an empty client 
> certificate.
>  
> Though both of them work fine when implemented individually. Upon enabling 
> *-Djavax.net.debug=all* I observed the client-server communication and figured 
> out that 
> [https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java]
>  is where the client gets set up, and it happens from the operator side at 
> [https://github.com/apache/flink-kubernetes-operator/blob/b081b75b72ddde643710e869b95b214912882363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L750]
>  (correct me here please).
>  
> When we enable both mTLS and HA, the client doesn't seem to get set up. Not 
> only that, it doesn't follow the same path of client creation. Below is 
> the part of the ssl handshake log before getting the error (attached the 
> entire ssl handshake log):
> {code:java}
> javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
> 15:16:12.508 GMT|null:-1|Produced CertificateRequest handshake message (
> "CertificateRequest":
> { "certificate types": [ecdsa_sign, rsa_sign, dss_sign] "supported signature 
> algorithms": [ecdsa_secp256r1_sha256, .., rsa_sha224, dsa_sha224, ecdsa_sha1, 
> rsa_pkcs1_sha1, dsa_sha1] "certificate authorities": [CN=FlinkCA, O=Apache 
> Flink] }
> javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
> 15:16:12.512 GMT|null:-1|Raw read (
> : 1603030007 0B 0300 
> )
> javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
> 15:16:12.513 GMT|null:-1|READ: TLSv1.2 handshake, length = 7
> javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
> 15:16:12.513 GMT|null:-1|Consuming client Certificate handshake message (
> "Certificates": 
> )
> javax.net.ssl|ERROR|53|flink-rest-server-netty-worker-thread-1|2024-09-19 
> 15:16:12.514 GMT|null:-1|Fatal (BAD_CERTIFICATE): Empty server certificate 
> chain (
> "throwable" : {
> javax.net.ssl.SSLHandshakeException: Empty server certificate chain
> {code}
> From the initial looks, it seems that when the Flink server requests 
> certificates from the client, the client doesn't send anything back since it 
> does not have certificates matching the CA?
>  
> Some client is sending a REST request to the Flink server, which the netty 
> library is handling, but until we figure out which client it is we don't know 
> whether it's the truststore on the client that's the problem or something 
> else we don't see here. Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [DRAFT][FLINK-29741][scala] Remove flink-dist-scala, flink-scala, flink-streaming-scala module [flink]

2024-09-25 Thread via GitHub


codenohup commented on PR #25393:
URL: https://github.com/apache/flink/pull/25393#issuecomment-2373796766

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33977][runtime] Adaptive scheduler may not minimize the number of TMs during downscaling [flink]

2024-09-25 Thread via GitHub


RocMarshal commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1775043513


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##
@@ -43,35 +47,43 @@ public Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        checkSlotsSufficient(jobInformation, freeSlots);
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
             allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
         }
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Collection<? extends SlotInfo> pickedSlots = freeSlots;
+        if (freeSlots.size() > allGroups.size()) {

Review Comment:
   Thanks for the comments.
   
   >Why do we need this condition? To avoid the extra effort of sorting the TMs 
based on their offered slot count? If yes, could we add this as a comment to 
underline the intention?
   
   Yes, you are right. I'd like to update it.
   
   >On the other hand, is it worth to save the sorting efforts if 
freeSlots.size() == allGroups.size()? 🤔 I guess it would make sense for big 
amounts of TaskManagers. 
   
   Yes, so we keep the condition `if (freeSlots.size() > allGroups.size())`, 
which means that the case you described would not go through the sorting logic.
   
   > I don't know the limits we have on that front in Flink at the moment.
   
   From my limited reading, there is no such limit.
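   
   To illustrate the intended selection (a sketch only, not the PR's actual 
code; it assumes `SlotInfo#getTaskManagerLocation` as the grouping key):
   
   ```java
   import java.util.Collection;
   import java.util.Comparator;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   import org.apache.flink.runtime.jobmaster.SlotInfo;
   import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

   class SlotPickingSketch {
       // Picks `required` slots, preferring TaskManagers that offer the most
       // free slots, so that the remaining TaskManagers can be released.
       static List<SlotInfo> pickSlots(Collection<SlotInfo> freeSlots, int required) {
           Map<TaskManagerLocation, List<SlotInfo>> byTm = freeSlots.stream()
                   .collect(Collectors.groupingBy(SlotInfo::getTaskManagerLocation));
           return byTm.values().stream()
                   .sorted(Comparator.comparingInt((List<SlotInfo> s) -> -s.size()))
                   .flatMap(List::stream)
                   .limit(required)
                   .collect(Collectors.toList());
       }
   }
   ```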
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >