Re: [PR] [FLINK-29741][scala] Delete flink-scala and flink-streaming-scala modules [flink]
reswqa merged PR #25393: URL: https://github.com/apache/flink/pull/25393
[jira] [Updated] (FLINK-36375) Missing default value in AddColumnEvent
[ https://issues.apache.org/jira/browse/FLINK-36375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-36375:
    Labels: pull-request-available (was: )

> Missing default value in AddColumnEvent
>
> Key: FLINK-36375
> URL: https://issues.apache.org/jira/browse/FLINK-36375
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.0
> Reporter: linqigeng
> Priority: Major
> Labels: pull-request-available
>
> When SchemaOperator receives an AddColumnEvent, the default value is lost when the event is sent downstream.
> !https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!
Re: [PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent [flink-cdc]
qg-lin commented on PR #3622: URL: https://github.com/apache/flink-cdc/pull/3622#issuecomment-2375891374

@lvyanquan @yuxiqian PTAL
[jira] [Assigned] (FLINK-36374) Bundle forst statebackend in flink-dist and provide shortcut to enable
[ https://issues.apache.org/jira/browse/FLINK-36374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zakelly Lan reassigned FLINK-36374:
    Assignee: Zakelly Lan

> Bundle forst statebackend in flink-dist and provide shortcut to enable
>
> Key: FLINK-36374
> URL: https://issues.apache.org/jira/browse/FLINK-36374
> Project: Flink
> Issue Type: Sub-task
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
>
> Currently, the forst statebackend is built under flink-statebackend-forst, but it is not included in the flink-dist jar. It would be better to distribute it the same way as rocksdb.
[jira] [Created] (FLINK-36374) Bundle forst statebackend in flink-dist and provide shortcut to enable
Zakelly Lan created FLINK-36374:

Summary: Bundle forst statebackend in flink-dist and provide shortcut to enable
Key: FLINK-36374
URL: https://issues.apache.org/jira/browse/FLINK-36374
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan

Currently, the forst statebackend is built under flink-statebackend-forst, but it is not included in the flink-dist jar. It would be better to distribute it the same way as rocksdb.
Re: [PR] [minor][cdc-connector][mongodb] Add `op_type` metadata column [flink-cdc]
qg-lin commented on PR #3615: URL: https://github.com/apache/flink-cdc/pull/3615#issuecomment-2375852444

Since DeduplicateFunctionHelper#processLastRowOnChangelog returns the row before deletion, the `delete` type is temporarily missing.
[jira] [Created] (FLINK-36375) Missing default value in AddColumnEvent
linqigeng created FLINK-36375:

Summary: Missing default value in AddColumnEvent
Key: FLINK-36375
URL: https://issues.apache.org/jira/browse/FLINK-36375
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: linqigeng

When SchemaOperator receives an AddColumnEvent, the default value is lost when the event is sent downstream.

!https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!
[PR] [FLINK-36374] Bundle forst statebackend in flink-dist and provide shortcut to enable [flink]
Zakelly opened a new pull request, #25402: URL: https://github.com/apache/flink/pull/25402

## What is the purpose of the change

Currently, the forst statebackend is built under `flink-statebackend-forst`, but it is not included in the `flink-dist` jar. This PR bundles the forst statebackend in `flink-dist` and provides a shortcut to load forst.

## Brief change log

- Add forst to `flink-dist`'s pom file
- Add a shortcut in `StateBackendLoader`

## Verifying this change

Modified `ForStStateBackendFactoryTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
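For readers wondering what the shortcut buys: once bundled, the backend can be selected by name alone. A minimal sketch, assuming the shortcut registered in `StateBackendLoader` is `forst` (mirroring the existing `rocksdb` shortcut; the exact name is this PR's choice):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForStShortcutExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Select the bundled backend by shortcut instead of the factory's fully
        // qualified class name ("forst" is the assumed shortcut name here).
        config.set(StateBackendOptions.STATE_BACKEND, "forst");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // ... define the job and call env.execute()
    }
}
```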
[jira] [Updated] (FLINK-36374) Bundle forst statebackend in flink-dist and provide shortcut to enable
[ https://issues.apache.org/jira/browse/FLINK-36374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-36374:
    Labels: pull-request-available (was: )

> Bundle forst statebackend in flink-dist and provide shortcut to enable
>
> Key: FLINK-36374
> URL: https://issues.apache.org/jira/browse/FLINK-36374
> Project: Flink
> Issue Type: Sub-task
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Labels: pull-request-available
>
> Currently, the forst statebackend is built under flink-statebackend-forst, but it is not included in the flink-dist jar. It would be better to distribute it the same way as rocksdb.
Re: [PR] [FLINK-36374][state/forst] Bundle forst statebackend in flink-dist and provide shortcut to enable [flink]
flinkbot commented on PR #25402: URL: https://github.com/apache/flink/pull/25402#issuecomment-2375878995

## CI report:

* 3b63165442e17ced06c405d00f575fa02307cb2d UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [Flink 36245] Remove legacy SourceFunction / SinkFunction / Sink V1 API in 2.0 [flink]
lvyanquan commented on PR #25331: URL: https://github.com/apache/flink/pull/25331#issuecomment-2375887294

@flinkbot run azure
[PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent [flink-cdc]
qg-lin opened a new pull request, #3622: URL: https://github.com/apache/flink-cdc/pull/3622

When SchemaOperator receives an AddColumnEvent, the default value is lost when the event is sent downstream.

JIRA: https://issues.apache.org/jira/browse/FLINK-36375
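For context, a minimal sketch of the kind of fix involved (not the actual patch): when SchemaOperator copies the columns of an AddColumnEvent, the default-value expression has to be carried over too. It assumes the `Column.physicalColumn(name, type, comment, defaultValueExpression)` factory from Flink CDC's cdc-common; treat the exact signature as an assumption.

```java
import org.apache.flink.cdc.common.schema.Column;

public class PreserveDefaultValue {
    // Sketch only: rebuild a physical column while keeping its default-value
    // expression, which is the piece that was being dropped downstream.
    static Column copyWithDefault(Column original) {
        return Column.physicalColumn(
                original.getName(),
                original.getType(),
                original.getComment(),
                original.getDefaultValueExpression());
    }
}
```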
[jira] [Updated] (FLINK-36375) Missing default value in AddColumnEvent/RenameColumnEvent
[ https://issues.apache.org/jira/browse/FLINK-36375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

linqigeng updated FLINK-36375:
    Description: When SchemaOperator receives an AddColumnEvent or RenameColumnEvent, the default value is lost when the event is sent downstream. !https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!
    (was: When SchemaOperator receives an AddColumnEvent, the default value is lost when the event is sent downstream. !https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!)
    Summary: Missing default value in AddColumnEvent/RenameColumnEvent (was: Missing default value in AddColumnEvent)

> Missing default value in AddColumnEvent/RenameColumnEvent
>
> Key: FLINK-36375
> URL: https://issues.apache.org/jira/browse/FLINK-36375
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.0
> Reporter: linqigeng
> Priority: Major
> Labels: pull-request-available
>
> When SchemaOperator receives an AddColumnEvent or RenameColumnEvent, the default value is lost when the event is sent downstream.
> !https://static.dingtalk.com/media/lQLPJwNB2rjdYf_Mxc0Gi7B9jRZh-ndCUAbcobnpdTcA_1675_197.png|width=1675,height=197!
[PR] [FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS [flink]
spena opened a new pull request, #25401: URL: https://github.com/apache/flink/pull/25401

## What is the purpose of the change

This PR allows you to use CTAS to reorder the columns defined in the `SELECT` part by specifying all column names without data types in the `CREATE` part. This feature is equivalent to the `INSERT INTO` statement.

Example:

```sql
CREATE TABLE my_ctas_table (
  order_time,
  price,
  quantity,
  id
) WITH (
  'connector' = 'kafka',
  ...
) AS SELECT id, price, quantity, order_time FROM source_table;
```

The resulting table `my_ctas_table` will be equivalent to creating the following table and inserting the data with the following statement:

```
CREATE TABLE my_ctas_table (
  order_time TIMESTAMP(3),
  price DOUBLE,
  quantity DOUBLE,
  id BIGINT
) WITH (
  'connector' = 'kafka',
  ...
);

INSERT INTO my_ctas_table (order_time, price, quantity, id)
SELECT id, price, quantity, order_time FROM source_table;
```

## Verifying this change

This change added tests and can be verified as follows:

- Added unit tests and integration tests to verify the feature

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? Updated the `create.md` documentation
Re: [PR] [FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS [flink]
flinkbot commented on PR #25401: URL: https://github.com/apache/flink/pull/25401#issuecomment-2375351231

## CI report:

* dfb06e8b80b48a4c25a0c50a1fb3841426ccb7e5 UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-36292][Connectors/Common]: remove timeout to avoid timeout exception [flink]
showuon commented on code in PR #25371: URL: https://github.com/apache/flink/pull/25371#discussion_r1776302945

## flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java:

@@ -71,7 +71,7 @@ public void testCloseFetcherWithException() throws Exception {
                 .hasRootCauseMessage("Artificial exception on closing the split reader.");
     }

-    @Test(timeout = 3)
+    @Test

Review Comment:
Sorry @becketqin, I misunderstood your comment. I thought you were asking me to change this:

```
@Test(timeout = Long.MAX_VALUE)
```

My bad! Reading it again, I understand what you said now. We should set the max timeout on the `close` method; otherwise, it will only log a warning and the test passes. So everything makes sense now. I have just updated the PR. Please take a look again. Thanks.
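To make the discussed failure mode concrete, here is a minimal sketch of bounding the shutdown on `close()` itself rather than on the test; `createFetcherManager()` is a hypothetical stand-in for the existing fixture, and the `close(long)` signature is assumed from flink-connector-base:

```java
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;

import org.junit.jupiter.api.Test;

class SplitFetcherShutdownTest {

    @Test
    void closeFailsIfFetchersHang() throws Exception {
        // Hypothetical helper standing in for the test fixture in
        // SplitFetcherManagerTest (element/split types elided).
        SplitFetcherManager<?, ?> manager = createFetcherManager();

        // Bound the shutdown inside close() itself: if fetcher threads do not
        // terminate within the timeout, the failure surfaces here instead of
        // the test silently passing after only a logged warning.
        manager.close(30_000L);
    }

    private static SplitFetcherManager<?, ?> createFetcherManager() {
        throw new UnsupportedOperationException("test fixture omitted in this sketch");
    }
}
```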
Re: [PR] [FLINK-30614][serializer] Remove old method of resolving schema compa… [flink]
Zakelly commented on PR #25384: URL: https://github.com/apache/flink/pull/25384#issuecomment-2375806647

I'd suggest removing `GenericArraySerializerConfigSnapshot` and `EitherSerializerSnapshot` in another commit in this PR.
[jira] [Created] (FLINK-36372) Can not insert data to sink with reserved keywords column name
Tan Nguyen created FLINK-36372:

Summary: Can not insert data to sink with reserved keywords column name
Key: FLINK-36372
URL: https://issues.apache.org/jira/browse/FLINK-36372
Project: Flink
Issue Type: Bug
Reporter: Tan Nguyen
Attachments: image-2024-09-26-10-33-37-134.png, image-2024-09-26-10-40-07-605.png, image-2024-09-26-10-41-10-327.png, image-2024-09-26-10-48-11-315.png

Hi team, I created a Flink application running in batch mode. We read data from Kafka, then run a query from a SQL file to transform the data, and finally execute an insert query from a SQL file to sink the data. Here is the insert query:

!image-2024-09-26-10-33-37-134.png!

Even though I use backticks to escape the reserved keywords, it still fails. Here is the error:

{code:java}
java.lang.RuntimeException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.close(JdbcOutputFormat.java:265)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:70)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1062)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:930)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:813)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO native_program_domain.member_program_aux_data(id, member_program_id, program_provider_id, enrollment_event_type, cancellation_reason, member_start_date, house_hold_id, enrollment_source, plan_contract_id, protocol, enrollment_priority, member_disenroll_date, pbp, hras_needed, hra_due_date, completion_status, hra_completion_method, hra_completion_description, enrollment_lob, clock_start_date, csp_group_id, csp_plan_id, group, segment_id, division, created_by, created_date, last_modified_by, last_modified_date) VALUES (('3319eb7d-e247-4eec-9c3e-e0ba9ee5b7aa'), ('5e4fd136-2b14-3b63-9b9c-e3c1af424c55'), (NULL), ('R'), (NULL), ('20140608'), (NULL), (NULL), ('CONT-8'), ('PR01'), (NULL), (NULL), ('PBP-0008'), ('FALSE'::boolean), (NULL), (NULL), (NULL), ('NextGen'), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL), (NULL)) ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, member_program_id=EXCLUDED.member_program_id, program_provider_id=EXCLUDED.program_provider_id, enrollment_event_type=EXCLUDED.enrollment_event_type, cancellation_reason=EXCLUDED.cancellation_reason, member_start_date=EXCLUDED.member_start_date, house_hold_id=EXCLUDED.house_hold_id, enrollment_source=EXCLUDED.enrollment_source, plan_contract_id=EXCLUDED.plan_contract_id, protocol=EXCLUDED.protocol, enrollment_priority=EXCLUDED.enrollment_priority, member_disenroll_date=EXCLUDED.member_disenroll_date, pbp=EXCLUDED.pbp, hras_needed=EXCLUDED.hras_needed, hra_due_date=EXCLUDED.hra_due_date, completion_status=EXCLUDED.completion_status, hra_completion_method=EXCLUDED.hra_completion_method, hra_completion_description=EXCLUDED.hra_completion_description, enrollment_lob=EXCLUDED.enrollment_lob, clock_start_date=EXCLUDED.clock_start_date, csp_group_id=EXCLUDED.csp_group_id, csp_plan_id=EXCLUDED.csp_plan_id, group=EXCLUDED.group, segment_id=EXCLUDED.segment_id, division=EXCLUDED.division, created_by=EXCLUDED.created_by, created_date=EXCLUDED.created_date, last_modified_by=EXCLUDED.last_modified_by, last_modified_date=EXCLUDED.last_modified_date was aborted: ERROR: syntax error at or near "group"
  Position: 435
Call getNextException to see other errors in the batch.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.close(JdbcOutputFormat.java:262)
    ... 13 more
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO native_program_domain.member_program_aux_data(id, member_program_id, program_provider_id, enrollment_event_type, cancellation_reason, member_start_date, house_hold_id, enrollment_source, plan_contract_id, protocol, enr
{code}
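The root cause is that backticks only escape identifiers for Flink's SQL parser; the JDBC dialect must quote them again when generating the statement for the target database. A minimal sketch of the PostgreSQL-style quoting the generated INSERT would need (helper names are hypothetical, not the connector's actual code):

```java
import java.util.List;
import java.util.stream.Collectors;

public class IdentifierQuoting {
    // PostgreSQL quoting rules: wrap in double quotes and double any embedded quote.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    static String insertStatement(String table, List<String> columns) {
        String cols = columns.stream()
                .map(IdentifierQuoting::quoteIdentifier) // group -> "group"
                .collect(Collectors.joining(", "));
        String placeholders = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // The reserved word "group" no longer breaks the generated statement.
        System.out.println(insertStatement("member_program_aux_data", List.of("id", "group")));
    }
}
```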
[jira] [Created] (FLINK-36373) Support distribute state requests in task thread
Yanfei Lei created FLINK-36373:

Summary: Support distribute state requests in task thread
Key: FLINK-36373
URL: https://issues.apache.org/jira/browse/FLINK-36373
Project: Flink
Issue Type: Sub-task
Reporter: Yanfei Lei
[jira] [Commented] (FLINK-36364) Do not reuse serialized key in Forst map state and/or other namespaces
[ https://issues.apache.org/jira/browse/FLINK-36364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884842#comment-17884842 ]

Zakelly Lan commented on FLINK-36364:

Merge 82582b3a7b75b7ffae9d48895088189b1324caa3 into master

> Do not reuse serialized key in Forst map state and/or other namespaces
>
> Key: FLINK-36364
> URL: https://issues.apache.org/jira/browse/FLINK-36364
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / State Backends
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Labels: pull-request-available
[jira] [Assigned] (FLINK-35928) ForSt supports compiling with RocksDB
[ https://issues.apache.org/jira/browse/FLINK-35928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zakelly Lan reassigned FLINK-35928:
    Assignee: Yanfei Lei (was: Hangxiang Yu)

> ForSt supports compiling with RocksDB
>
> Key: FLINK-35928
> URL: https://issues.apache.org/jira/browse/FLINK-35928
> Project: Flink
> Issue Type: Sub-task
> Reporter: Hangxiang Yu
> Assignee: Yanfei Lei
> Priority: Major
[jira] [Resolved] (FLINK-36364) Do not reuse serialized key in Forst map state and/or other namespaces
[ https://issues.apache.org/jira/browse/FLINK-36364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zakelly Lan resolved FLINK-36364.
    Fix Version/s: 2.0-preview
    Resolution: Fixed

> Do not reuse serialized key in Forst map state and/or other namespaces
>
> Key: FLINK-36364
> URL: https://issues.apache.org/jira/browse/FLINK-36364
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / State Backends
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0-preview
Re: [PR] [FLINK-36364][state/forst] Do not reuse serialized key in Forst map state and/or other namespaces [flink]
Zakelly merged PR #25394: URL: https://github.com/apache/flink/pull/25394
Re: [PR] [FLINK-35291] Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme [flink-cdc]
github-actions[bot] commented on PR #3289: URL: https://github.com/apache/flink-cdc/pull/3289#issuecomment-2375474952

This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.
[jira] [Commented] (FLINK-21820) JDBC connector shouldn't read all rows in per statement by default
[ https://issues.apache.org/jira/browse/FLINK-21820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884829#comment-17884829 ]

zhanghuaibei commented on FLINK-21820:

[~leonard] [~jark] [~yangsanity] [~nicholasjiang] Is the fetch size still not effective? In version 3.2 of MySQL CDC, useCursorFetch=true has been set, but when the chunk size is set to 4, it still causes an OOM error.

> JDBC connector shouldn't read all rows in per statement by default
>
> Key: FLINK-21820
> URL: https://issues.apache.org/jira/browse/FLINK-21820
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC, Table SQL / Ecosystem
> Reporter: Leonard Xu
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The default value for the JDBC option 'scan.fetch-size' is 0, which means all rows are read per statement; this may lead to OOM or IO timeout.
> We'd better set a reasonable value as the default.
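Until the default changes, the workaround is to set the option explicitly. A sketch with placeholder connection settings (the `'scan.fetch-size'` option is the one named in this issue):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcFetchSizeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // The URL and table name are placeholders; the point is the explicit
        // 'scan.fetch-size', so the driver streams rows in batches instead of
        // materializing the whole result set (the OOM risk described above).
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  id BIGINT,"
                        + "  amount DOUBLE"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:mysql://localhost:3306/shop',"
                        + "  'table-name' = 'orders',"
                        + "  'scan.fetch-size' = '1000'"
                        + ")");
    }
}
```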
Re: [PR] [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator with JobVertexBuildContext [flink]
noorall commented on PR #25366: URL: https://github.com/apache/flink/pull/25366#issuecomment-2375736309

@flinkbot run azure
Re: [PR] [FLINK-36355][runtime] Remove deprecated KeydStream#asQueryableState [flink]
yunfengzhou-hub closed pull request #25391: [FLINK-36355][runtime] Remove deprecated KeydStream#asQueryableState URL: https://github.com/apache/flink/pull/25391
Re: [PR] [FLINK-36355][runtime] Remove deprecated KeydStream#asQueryableState [flink]
yunfengzhou-hub commented on PR #25391: URL: https://github.com/apache/flink/pull/25391#issuecomment-2376037028

According to an offline discussion with @Zakelly, this component will not be removed for now, until a new queryable state solution is proposed in Flink 2.x.
[jira] [Comment Edited] (FLINK-36166) testJoinDisorderChangeLog failed on AZP
[ https://issues.apache.org/jira/browse/FLINK-36166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884867#comment-17884867 ]

xuyang edited comment on FLINK-36166 at 9/26/24 6:37 AM:

Everything is working fine in my local environment. I will temporarily ignore this test to ensure the normal release candidate for version 2.0-preview. [~Weijie Guo] Can you assign this jira to me? I'll trace this test after 2.0-preview and before 2.0.0.

was (Author: xuyangzhong): I'm not quite sure if this is related to 'replacing org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#addSource with the Java StreamExecutionEnvironment' in FLINK-36327. Everything is working fine in my local environment. I will temporarily ignore this test to ensure the normal release candidate for version 2.0-preview. [~Weijie Guo] Can you assign this jira to me? I'll trace this test after 2.0-preview and before 2.0.0.

> testJoinDisorderChangeLog failed on AZP
>
> Key: FLINK-36166
> URL: https://issues.apache.org/jira/browse/FLINK-36166
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0-preview
> Reporter: Weijie Guo
> Priority: Critical
> Labels: test-stability
>
> {code:java}
> Aug 23 03:59:33 03:59:33.972 [ERROR] Failures:
> Aug 23 03:59:33 03:59:33.972 [ERROR] org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase.testJoinDisorderChangeLog
> Aug 23 03:59:33 03:59:33.972 [ERROR] Run 1: TableSinkITCase.testJoinDisorderChangeLog:119
> Aug 23 03:59:33 expected: List(+I[jason, 4, 22.5, 22])
> Aug 23 03:59:33 but was: ArrayBuffer(+U[jason, 4, 22.5, 22])
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61572&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=12536
[jira] [Comment Edited] (FLINK-36166) testJoinDisorderChangeLog failed on AZP
[ https://issues.apache.org/jira/browse/FLINK-36166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884867#comment-17884867 ]

xuyang edited comment on FLINK-36166 at 9/26/24 6:14 AM:

I'm not quite sure if this is related to 'replacing org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#addSource with the Java StreamExecutionEnvironment' in FLINK-36327. Everything is working fine in my local environment. I will temporarily ignore this test to ensure the normal release candidate for version 2.0-preview. [~Weijie Guo] Can you assign this jira to me? I'll trace this test after 2.0-preview and before 2.0.0.

was (Author: xuyangzhong): I'm not quite sure if this is related to 'replacing org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#addSource with the Java StreamExecutionEnvironment' in FLINK-36327. Everything is working fine in my local environment. I will temporarily ignore this test to ensure the normal release candidate for version 2.0-preview.

> testJoinDisorderChangeLog failed on AZP
>
> Key: FLINK-36166
> URL: https://issues.apache.org/jira/browse/FLINK-36166
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0-preview
> Reporter: Weijie Guo
> Priority: Critical
> Labels: test-stability
>
> {code:java}
> Aug 23 03:59:33 03:59:33.972 [ERROR] Failures:
> Aug 23 03:59:33 03:59:33.972 [ERROR] org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase.testJoinDisorderChangeLog
> Aug 23 03:59:33 03:59:33.972 [ERROR] Run 1: TableSinkITCase.testJoinDisorderChangeLog:119
> Aug 23 03:59:33 expected: List(+I[jason, 4, 22.5, 22])
> Aug 23 03:59:33 but was: ArrayBuffer(+U[jason, 4, 22.5, 22])
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61572&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=12536
Re: [PR] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent [flink-cdc]
qg-lin commented on PR #3622: URL: https://github.com/apache/flink-cdc/pull/3622#issuecomment-2376090149

> Thanks for @qg-lin's contribution! Seems lenientized `RenameColumnEvent` is also prone to this issue. Would you like to fix this in this PR?

my pleasure
Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]
SamBarker commented on code in PR #881: URL: https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1776061626

## pom.xml:

@@ -110,6 +110,7 @@ under the License.
 10.15.2.0
+24.1.0

Review Comment:
I've removed `@NotNull` from `KubernetesAutoScalerStateStore` as `@NonNull` is the default.
Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]
SamBarker commented on code in PR #881: URL: https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1776062522

## .github/workflows/ci.yml:

@@ -78,6 +79,7 @@ jobs:
         namespace: ["default","flink"]
         mode: ["native", "standalone"]
         java-version: [ 11, 17, 21 ]
+        http-client: [ "okhttp", "vertx", "jetty", "jdk" ]

Review Comment:
I think https://github.com/apache/flink-kubernetes-operator/pull/881/commits/7b74210e8d64acb881a268ac8f37271de155c060 does the job
Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]
SamBarker commented on code in PR #881: URL: https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1776062259

## .github/workflows/ci.yml:

@@ -47,7 +48,7 @@ jobs:
         ${{ runner.os }}-maven-
     - name: Build with Maven
       run: |
-        mvn -B clean install javadoc:javadoc -Pgenerate-docs
+        mvn -B clean install javadoc:javadoc -Pgenerate-docs -Dfabric8.httpclinent.impl=${{ matrix.httpclient }}

Review Comment:
Reverted in [57b341b](https://github.com/apache/flink-kubernetes-operator/pull/881/commits/57b341bd841404c4d31d083adcc0d1385f22dffc)
Re: [PR] [FLINK-36364][state/forst] Do not reuse serialized key in Forst map state and/or other namespaces [flink]
fredia commented on code in PR #25394: URL: https://github.com/apache/flink/pull/25394#discussion_r1776219418

## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java:

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+
+import java.io.IOException;
+
+/** A utility of serialization keys in ForSt. */
+public class ForStSerializerUtils {
+
+    /**
+     * Serialize a key and namespace. No user key.
+     *
+     * @param contextKey the context key of current request
+     * @param builder key builder
+     * @param defaultNamespace default namespace of the state
+     * @param namespaceSerializer the namespace serializer
+     * @param enableKeyReuse whether to enable key reuse
+     */
+    public static <K, N> byte[] serializeKeyAndNamespace(
+            ContextKey<K, N> contextKey,
+            SerializedCompositeKeyBuilder<K> builder,
+            N defaultNamespace,
+            TypeSerializer<N> namespaceSerializer,
+            boolean enableKeyReuse)
+            throws IOException {
+        N namespace = contextKey.getNamespace();
+        namespace = (namespace == null ? defaultNamespace : namespace);
+        if (enableKeyReuse && namespace == defaultNamespace) {

Review Comment:
Thanks for the clarification.
Re: [PR] [FLINK-36344][runtime/metrics] Introduce lastCheckpointCompletedTimestamp metric [flink]
Myasuka commented on PR #25399: URL: https://github.com/apache/flink/pull/25399#issuecomment-2375646639

@flinkbot run azure
Re: [PR] [FLINK-34702][table-planner] Refactor Deduplicate optimization to defer to StreamPhysicalRank for valid StreamExecDeduplicate node conversion to avoid exceptions [flink]
lincoln-lil commented on code in PR #25380: URL: https://github.com/apache/flink/pull/25380#discussion_r1774875365

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala:

@@ -352,21 +352,55 @@ object RankUtil {
   }

   val inputRowType = rank.getInput.getRowType
-  val isSortOnTimeAttribute = sortOnTimeAttribute(sortCollation, inputRowType)
+  val isSortOnTimeAttribute = sortOnTimeAttributeOnly(sortCollation, inputRowType)

   !rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType
 }

- private def sortOnTimeAttribute(
+ private def sortOnTimeAttributeOnly(
     sortCollation: RelCollation,
     inputRowType: RelDataType): Boolean = {
   if (sortCollation.getFieldCollations.size() != 1) {
-    false
-  } else {
-    val firstSortField = sortCollation.getFieldCollations.get(0)
-    val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
-    FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
-      FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+    return false
   }
+  val firstSortField = sortCollation.getFieldCollations.get(0)
+  val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
+  FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
+    FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+ }
+
+ /**
+  * Checks if the given sort collation has a field collation which based on a rowtime attribute.
+  */
+ def sortOnRowTime(sortCollation: RelCollation, inputRowType: RelDataType): Boolean = {
+   sortCollation.getFieldCollations.exists {
+     firstSortField =>
+       val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
+       FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+   }
+ }
+
+ /** Whether the given rank is logically a deduplication. */
+ def isDeduplication(rank: Rank): Boolean = {
+   !rank.outputRankNumber && rank.rankType == RankType.ROW_NUMBER && isTop1(rank.rankRange)
+ }
+
+ /** Whether the given [[StreamPhysicalRank]] could be converted to [[StreamExecDeduplicate]]. */
+ def canConvertToDeduplicate(rank: StreamPhysicalRank): Boolean = {
+   lazy val inputInsertOnly = ChangelogPlanUtils.inputInsertOnly(rank)
+   lazy val sortOnTimeAttributeOnly =
+     RankUtil.sortOnTimeAttributeOnly(rank.orderKey, rank.getInput.getRowType)
+
+   isDeduplication(rank) && inputInsertOnly && sortOnTimeAttributeOnly
+ }
+
+ /** Determines if the given order key indicates that the last row should be kept. */
+ def keepLastRow(orderKey: RelCollation): Boolean = {

Review Comment:
TODO rename
[jira] [Created] (FLINK-36367) Use RexNormalize in EquivalentExprShuttle
Sergey Nuyanzin created FLINK-36367:

Summary: Use RexNormalize in EquivalentExprShuttle
Key: FLINK-36367
URL: https://issues.apache.org/jira/browse/FLINK-36367
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin

This is a follow-up to the discussion in the comments at https://github.com/apache/flink/pull/25388#discussion_r1774177868
Re: [PR] [FLINK-36361][table] Do not use StringBuilder for EquivalentExprShuttle [flink]
snuyanzin commented on code in PR #25388: URL: https://github.com/apache/flink/pull/25388#discussion_r1774883976

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:

@@ -439,17 +439,16 @@ object FlinkRexUtil {
     })

   private class EquivalentExprShuttle(rexBuilder: RexBuilder) extends RexShuttle {
-    private val equiExprMap = mutable.HashMap[String, RexNode]()
+    private val equiExprSet = mutable.HashSet[RexNode]()

     override def visitCall(call: RexCall): RexNode = {
       call.getOperator match {
         case EQUALS | NOT_EQUALS | GREATER_THAN | LESS_THAN | GREATER_THAN_OR_EQUAL |
             LESS_THAN_OR_EQUAL =>
-          val swapped = swapOperands(call)
-          if (equiExprMap.contains(swapped.toString)) {
-            swapped
+          if (equiExprSet.contains(call)) {

Review Comment:
jira issue https://issues.apache.org/jira/browse/FLINK-36367

I tried it:
1. It touches more than 100 plans, which is still doable.
2. I faced one flaky issue for a couple of plans: depending on how the tests are run (just one, or the whole class), the order of operands is different (the operator expression is valid for both). So I need more time to debug and fix it.
[jira] [Created] (FLINK-36368) Fix subtask management in CommittableCollector
Arvid Heise created FLINK-36368:

Summary: Fix subtask management in CommittableCollector
Key: FLINK-36368
URL: https://issues.apache.org/jira/browse/FLINK-36368
Project: Flink
Issue Type: Bug
Reporter: Arvid Heise
Assignee: Arvid Heise
Fix For: 2.0-preview

CommittableCollector is owned by a specific committer subtask and may handle a number of upstream tasks emitting committables. Currently, CommittableCollector rewrites the subtask id to its owner upon receiving a committable (summary). However, this makes it impossible to track whether a specific upstream subtask has sent all committables or not.

I propose to stop rewriting the subtask id while collecting the committables and instead rewrite them only on emission. This should also ease debugging the state.
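A self-contained sketch of the proposed bookkeeping; all types and names here are hypothetical stand-ins, not Flink's actual sink2 classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the committable messages flowing to the committer.
record Committable(int subtaskId, Object payload) {}

class CollectorSketch {
    private final int ownerSubtaskId;
    // Keyed by the *upstream* subtask id, which is no longer rewritten on
    // receive, so we can tell whether a given upstream subtask has sent
    // all of its committables.
    private final Map<Integer, List<Committable>> byUpstream = new HashMap<>();

    CollectorSketch(int ownerSubtaskId) {
        this.ownerSubtaskId = ownerSubtaskId;
    }

    void collect(Committable c) {
        byUpstream.computeIfAbsent(c.subtaskId(), id -> new ArrayList<>()).add(c);
    }

    List<Committable> emitAll() {
        // Rewrite to the owning committer's subtask id only at emission time,
        // as proposed in the ticket.
        return byUpstream.values().stream()
                .flatMap(List::stream)
                .map(c -> new Committable(ownerSubtaskId, c.payload()))
                .toList();
    }
}
```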
Re: [PR] [FLINK-36361][table] Do not use StringBuilder for EquivalentExprShuttle [flink]
snuyanzin commented on PR #25388: URL: https://github.com/apache/flink/pull/25388#issuecomment-2373541247

thanks for taking a look
Re: [PR] [FLINK-36287] Disallow UC for inner sink channels [flink]
AHeise commented on PR #25353: URL: https://github.com/apache/flink/pull/25353#issuecomment-2373541699

Addressed feedback and slimmed the PR down a bit to the essentials. I have created FLINK-36368 for follow-up work.
Re: [PR] [FLINK-36275][rest] Remove deprecated ProgramArgsQueryParameter [flink]
reswqa merged PR #25323: URL: https://github.com/apache/flink/pull/25323
[jira] [Resolved] (FLINK-36275) Remove deprecated ProgramArgsQueryParameter
[ https://issues.apache.org/jira/browse/FLINK-36275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo resolved FLINK-36275.
    Fix Version/s: 2.0-preview
    Resolution: Done

master(2.0) via f58a3e02a63b094e8c64344a7a74808a3e13ec36.

> Remove deprecated ProgramArgsQueryParameter
>
> Key: FLINK-36275
> URL: https://issues.apache.org/jira/browse/FLINK-36275
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / REST
> Affects Versions: 2.0-preview
> Reporter: Weijie Guo
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 2.0-preview
>
> ProgramArgsQueryParameter has been deprecated since Flink 1.7 (see https://issues.apache.org/jira/browse/FLINK-10295). We can remove it (breaking compatibility) in Flink 2.0.
[jira] [Assigned] (FLINK-36275) Remove deprecated ProgramArgsQueryParameter
[ https://issues.apache.org/jira/browse/FLINK-36275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo reassigned FLINK-36275:
    Assignee: Weijie Guo

> Remove deprecated ProgramArgsQueryParameter
>
> Key: FLINK-36275
> URL: https://issues.apache.org/jira/browse/FLINK-36275
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / REST
> Affects Versions: 2.0-preview
> Reporter: Weijie Guo
> Assignee: Weijie Guo
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 2.0-preview
>
> ProgramArgsQueryParameter has been deprecated since Flink 1.7 (see https://issues.apache.org/jira/browse/FLINK-10295). We can remove it (breaking compatibility) in Flink 2.0.
Re: [PR] [FLINK-30614][serializer] Remove old method of resolving schema compa… [flink]
AlexYinHan commented on PR #25384: URL: https://github.com/apache/flink/pull/25384#issuecomment-2373451676

@masteryhx Thanks for the comments. I have updated the docs. PTAL.
Re: [PR] [FLINK-36325][statebackend] Implement basic restore from checkpoint for ForStStateBackend [flink]
flinkbot commented on PR #25397: URL: https://github.com/apache/flink/pull/25397#issuecomment-2373503514

## CI report:

* 28d1f2ed93431e3ec739b3f5af700dc0704accc9 UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-35035: -- Fix Version/s: 2.0-preview > Reduce job pause time when cluster resources are expanded in adaptive mode > -- > > Key: FLINK-35035 > URL: https://issues.apache.org/jira/browse/FLINK-35035 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: yuanfenghu >Assignee: Zdenek Tison >Priority: Minor > Fix For: 2.0-preview > > > When 'jobmanager.scheduler = adaptive', job graph changes triggered by > cluster expansion will cause long-term task stagnation. We should reduce this > impact. > As an example: > I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)] > When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5] > When I add slots, the task will trigger jobgraph changes via > org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable. > However, the five new slots I added were not discovered at the same time (for > convenience, I assume that a taskmanager has one slot): no matter what > environment we add to, we cannot guarantee that the new slots will be added > at once, so onNewResourcesAvailable triggers repeatedly. > If each new slot arrives after a certain interval, the jobgraph will > continue to change during this period. What I hope for is a configurable > stabilization time: jobgraph changes are only triggered after the number of > cluster slots has been stable for a certain period, to avoid this situation -- This message was sent by Atlassian Jira (v8.20.10#820010)
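The description above is, in scheduling terms, asking for a debounce: restart a countdown on every new-resources event and only trigger a rescale once no new slot has arrived for a configured window. A minimal sketch of that pattern in plain Java — the class and method names here are illustrative only, not the adaptive scheduler's actual API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Debounce resource events: rescale only after the slot count has been
// stable for `stabilizationMs`. Illustrative, not Flink's implementation.
final class ResourceStabilizer {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final long stabilizationMs;
    private final Runnable triggerRescale;
    private ScheduledFuture<?> pending;

    ResourceStabilizer(long stabilizationMs, Runnable triggerRescale) {
        this.stabilizationMs = stabilizationMs;
        this.triggerRescale = triggerRescale;
    }

    // Called from the resource listener; every new event resets the countdown.
    synchronized void onNewResourcesAvailable() {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = timer.schedule(triggerRescale, stabilizationMs, TimeUnit.MILLISECONDS);
    }
}
```

In the actual codebase this kind of decision is owned by the component that FLINK-36011 below generalizes from RescaleManager into StateTransitionManager.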
Re: [PR] [FLINK-36292][Connectors/Common]: remove timeout to avoid timeout exception [flink]
showuon commented on code in PR #25371: URL: https://github.com/apache/flink/pull/25371#discussion_r1774665859 ## flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java: ## @@ -71,7 +71,7 @@ public void testCloseFetcherWithException() throws Exception { .hasRootCauseMessage("Artificial exception on closing the split reader."); } -@Test(timeout = 3) +@Test Review Comment: Sorry @becketqin , I don't get why adding timeout to `Long.MAX_VALUE` will work when there are tasks not terminated. I did a test and confirmed even if there are tasks not terminated, the test still can pass, with or without the `timeout = Long.MAX_VALUE`. To catch the error when there are tasks failing to be closed, I changed the `close` method to return `boolean`, so that we have a way to know if tasks are completely closed in time; otherwise, we fail the test. Please take a look again. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-35035. --- Resolution: Fixed Closing this parent issue. All subtasks are addressed and no new test instabilities related to this change were identified in the past 1-2 weeks. > Reduce job pause time when cluster resources are expanded in adaptive mode > -- > > Key: FLINK-35035 > URL: https://issues.apache.org/jira/browse/FLINK-35035 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: yuanfenghu >Assignee: Zdenek Tison >Priority: Minor > Fix For: 2.0-preview > > > When 'jobmanager.scheduler = adaptive', job graph changes triggered by > cluster expansion will cause long-term task stagnation. We should reduce this > impact. > As an example: > I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)] > When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5] > When I add slots, the task will trigger jobgraph changes via > org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable. > However, the five new slots I added were not discovered at the same time (for > convenience, I assume that a taskmanager has one slot): no matter what > environment we add to, we cannot guarantee that the new slots will be added > at once, so onNewResourcesAvailable triggers repeatedly. > If each new slot arrives after a certain interval, the jobgraph will > continue to change during this period. What I hope for is a configurable > stabilization time: jobgraph changes are only triggered after the number of > cluster slots has been stable for a certain period, to avoid this situation -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [DRAFT][FLINK-36366][core] Remove deprecate API in flink-core exclude connector and state part [flink]
flinkbot commented on PR #25396: URL: https://github.com/apache/flink/pull/25396#issuecomment-2373400996 ## CI report: * 9f67d79ffaceec279e944054d51880ebb8d340cd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36325][statebackend] Implement basic restore from checkpoint for ForStStateBackend [flink]
zoltar9264 opened a new pull request, #25397: URL: https://github.com/apache/flink/pull/25397 ## What is the purpose of the change Implement basic restore from checkpoint in ForStStateBackend; the checkpoint side was implemented in [FLINK-35510](https://issues.apache.org/jira/browse/FLINK-35510). Note that ForStStateBackend now supports two kinds of keyed state backends, sync and async; we make the async one support checkpoint and restore first. ## Brief change log - Introduce ForStIncrementalRestoreOperation - Implement restore from checkpoint in ForStKeyedStateBackendBuilder ## Verifying this change This change added tests and can be verified as follows: - Add new test case in ForStStateDataTransferTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36325) Implement basic restore from checkpoint for ForStStateBackend
[ https://issues.apache.org/jira/browse/FLINK-36325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36325: --- Labels: pull-request-available (was: ) > Implement basic restore from checkpoint for ForStStateBackend > - > > Key: FLINK-36325 > URL: https://issues.apache.org/jira/browse/FLINK-36325 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Feifan Wang >Priority: Major > Labels: pull-request-available > > As title, implement basic restore from checkpoint for ForStStateBackend, > rescale will be implemented later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36011) Generalize RescaleManager to become StateTransitionManager
[ https://issues.apache.org/jira/browse/FLINK-36011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884537#comment-17884537 ] Matthias Pohl commented on FLINK-36011: --- master: [c869326d089705475481c2c2ea42a6efabb8c828|https://github.com/apache/flink/commit/c869326d089705475481c2c2ea42a6efabb8c828] > Generalize RescaleManager to become StateTransitionManager > -- > > Key: FLINK-36011 > URL: https://issues.apache.org/jira/browse/FLINK-36011 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Zdenek Tison >Assignee: Zdenek Tison >Priority: Major > Labels: pull-request-available > Fix For: 2.0-preview > > > The goal is to change the RescaleManager component to one with a broader > responsibility that will manage the adaptive scheduler's state transitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [DRAFT][FLINK-29741][scala] Remove flink-dist-scala, flink-scala, flink-streaming-scala module [flink]
reswqa commented on PR #25393: URL: https://github.com/apache/flink/pull/25393#issuecomment-2373310633 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36366) Remove deprecate API in flink-core exclude connector and state part
Weijie Guo created FLINK-36366: -- Summary: Remove deprecate API in flink-core exclude connector and state part Key: FLINK-36366 URL: https://issues.apache.org/jira/browse/FLINK-36366 Project: Flink Issue Type: Sub-task Components: API / Core Affects Versions: 2.0-preview Reporter: Weijie Guo Assignee: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36366) Remove deprecate API in flink-core exclude connector and state part
[ https://issues.apache.org/jira/browse/FLINK-36366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36366: --- Labels: pull-request-available (was: ) > Remove deprecate API in flink-core exclude connector and state part > --- > > Key: FLINK-36366 > URL: https://issues.apache.org/jira/browse/FLINK-36366 > Project: Flink > Issue Type: Sub-task > Components: API / Core >Affects Versions: 2.0-preview >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [DRAFT][FLINK-36366][core] Remove deprecate API in flink-core exclude connector and state part [flink]
reswqa opened a new pull request, #25396: URL: https://github.com/apache/flink/pull/25396 ## What is the purpose of the change *Remove deprecate API in flink-core exclude connector and state part* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: yes - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator with JobVertexBuildContext [flink]
JunRuiLee commented on code in PR #25366: URL: https://github.com/apache/flink/pull/25366#discussion_r1772938079 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.SerializedValue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Helper class encapsulates all necessary information and configurations required during the + * construction of job vertices. + */ +@Internal +public class JobVertexBuildContext { + +private final StreamGraph streamGraph; + +/** The {@link OperatorChainInfo}s, key is the start node id of the chain. */ +private final Map chainInfos; + +/** The {@link OperatorInfo}s, key is the id of the stream node. */ +private final Map operatorInfosInorder; + +// The config for node chained with the current operator,for multi input operators containing +// sourceChain, the key will be the ids of all nodes in the chain, otherwise the key will only +// be the id of the head node in the chain +private final Map> chainedConfigs; + +// The created JobVertex, the key is start node id. +private final Map jobVerticesInorder; + +// Futures for the serialization of operator coordinators. +private final Map< +JobVertexID, + List>>> +coordinatorSerializationFuturesPerJobVertex; + +// The order of StreamEdge connected to other vertices should be consistent with the order in +// which JobEdge was created. +private final List physicalEdgesInOrder; + +// We need to use AtomicBoolean to sense whether HybridResultPartition exists during the process Review Comment: // We use AtomicBoolean to track the existence of HybridResultPartition // during the incremental JobGraph generation process introduced by // AdaptiveGraphManager. It is essential to globally monitor changes // to this variable, thus necessitating the use of a Boolean object // instead of a primitive boolean. 
## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.Se
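Stepping back from the quoted diff: the suggested comment boils down to the fact that the flag is written from wherever a HybridResultPartition is created during incremental JobGraph generation and read globally afterwards, so it must be a shared mutable object rather than a primitive copied by value. A stripped-down illustration of the pattern (not the actual Flink code; names are illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Shared flag owned by the build context. A primitive boolean passed into
// helper methods could not propagate the update back to the context, which
// is why an AtomicBoolean (or another mutable holder) is used instead.
final class BuildContext {
    private final AtomicBoolean hasHybridResultPartition = new AtomicBoolean(false);

    // Called by any helper that creates a hybrid result partition.
    void markHybridResultPartition() {
        hasHybridResultPartition.set(true);
    }

    // Read globally once the (possibly incremental) generation has run.
    boolean hasHybridResultPartition() {
        return hasHybridResultPartition.get();
    }
}
```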
Re: [PR] [FLINK-36361][table] Do not use StringBuilder for EquivalentExprShuttle [flink]
snuyanzin merged PR #25388: URL: https://github.com/apache/flink/pull/25388 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36361) Do not use StringBuilder for EquivalentExprShuttle
[ https://issues.apache.org/jira/browse/FLINK-36361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884527#comment-17884527 ] Sergey Nuyanzin commented on FLINK-36361: - Merged as [0578c8e17dddc88444635e7ad8f159cb52982af3|https://github.com/apache/flink/commit/0578c8e17dddc88444635e7ad8f159cb52982af3] > Do not use StringBuilder for EquivalentExprShuttle > -- > > Key: FLINK-36361 > URL: https://issues.apache.org/jira/browse/FLINK-36361 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently there is {{EquivalentExprShuttle}} where there is a map of > {{toString}} node to {{RelNode}}. In fact there is no need for toString which > is calculated each time with usage of {{StringBuilder}} under the hood. And > even more we faced one weird case where it consumes memory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-36361) Do not use StringBuilder for EquivalentExprShuttle
[ https://issues.apache.org/jira/browse/FLINK-36361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-36361. - Fix Version/s: 2.0-preview Resolution: Fixed > Do not use StringBuilder for EquivalentExprShuttle > -- > > Key: FLINK-36361 > URL: https://issues.apache.org/jira/browse/FLINK-36361 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: 2.0-preview > > > Currently there is {{EquivalentExprShuttle}} where there is a map of > {{toString}} node to {{RelNode}}. In fact there is no need for toString which > is calculated each time with usage of {{StringBuilder}} under the hood. And > even more we faced one weird case where it consumes memory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36273) Remove deprecated Table/SQL configuration in 2.0
[ https://issues.apache.org/jira/browse/FLINK-36273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee closed FLINK-36273. --- Assignee: xuyang Resolution: Fixed Fixed in master: 733a56682bcceabab6625b621547538c6d993976 > Remove deprecated Table/SQL configuration in 2.0 > > > Key: FLINK-36273 > URL: https://issues.apache.org/jira/browse/FLINK-36273 > Project: Flink > Issue Type: Sub-task >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > Fix For: 2.0-preview > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36273][table] Remove deprecated Table/SQL configuration in 2.0 [flink]
lincoln-lil merged PR #25334: URL: https://github.com/apache/flink/pull/25334 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36188] Fix disable buffer flush lose efficacy [flink-connector-hbase]
boring-cyborg[bot] commented on PR #49: URL: https://github.com/apache/flink-connector-hbase/pull/49#issuecomment-2373679524 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-36188) HBase disable buffer flush lose efficacy
[ https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky reassigned FLINK-36188: Assignee: MOBIN > HBase disable buffer flush lose efficacy > > > Key: FLINK-36188 > URL: https://issues.apache.org/jira/browse/FLINK-36188 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.0 > Environment: Flink 1.16.3 >Reporter: MOBIN >Assignee: MOBIN >Priority: Critical > Labels: pull-request-available > > HBase table > ||rowkey||col|| > |1|1| > The user lookup joins the hbase table, adds 1 to the col value, and writes it > back to hbase > {code:java} > @Test > void testTableSinkDisabledBufferFlush() throws Exception { > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, > streamSettings); > tEnv.executeSql( > "CREATE TABLE hTableForSink (" > + " rowkey INT PRIMARY KEY NOT ENFORCED," > + " family1 ROW" > + ") WITH (" > + " 'connector' = 'hbase-2.2'," > + " 'sink.buffer-flush.max-size' = '0'," > + " 'sink.buffer-flush.max-rows' = '0'," > + " 'table-name' = '" > + TEST_TABLE_6 > + "'," > + " 'zookeeper.quorum' = '" > + getZookeeperQuorum() > + "'" > + ")"); > String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))"; > tEnv.executeSql(insert).await(); > tEnv.executeSql( > "CREATE VIEW user_click AS " > + " SELECT user_id, proctime() AS proc_time" > + " FROM ( " > + " VALUES(1), (1), (1), (1), (1)" > + " ) AS t (user_id);"); > >tEnv.executeSql( > "INSERT INTO hTableForSink SELECT " > + " user_id as rowkey," > + " ROW(CAST(family1.col1 + 1 AS INT))" > + " FROM user_click INNER JOIN hTableForSink" > + " FOR SYSTEM_TIME AS OF user_click.proc_time" > + " ON hTableForSink.rowkey = user_click.user_id;"); > > tEnv.executeSql( > "CREATE TABLE hTableForQuery (" > + " rowkey INT PRIMARY KEY NOT ENFORCED," > + " family1 ROW" > + ") WITH (" > + " 'connector' = 'hbase-2.2'," > + " 'table-name' = '" > + TEST_TABLE_6 > + "'," > + " 'zookeeper.quorum' = '" > + getZookeeperQuorum() > + "'" > + ")"); > String query = "SELECT rowkey, family1.col1 FROM hTableForQuery"; > > TableResult firstResult = tEnv.executeSql(query); > List firstResults = > CollectionUtil.iteratorToList(firstResult.collect()); > String firstExpected = "+I[1, 6]"; > TestBaseUtils.compareResultAsText(firstResults, firstExpected); > } {code} > test failed > {code:java} > org.junit.ComparisonFailure: Different elements in arrays: expected 1 > elements and received 1 > expected: [+I[1, 6]] > received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]> > Expected :+I[1, 6] > Actual :+I[1, 2] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36188] Fix disable buffer flush lose efficacy [flink-connector-hbase]
ferenc-csaky merged PR #49: URL: https://github.com/apache/flink-connector-hbase/pull/49 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-36188) HBase disable buffer flush lose efficacy
[ https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky closed FLINK-36188. Resolution: Fixed [{{c6eb87f}}|https://github.com/apache/flink-connector-hbase/commit/c6eb87f809cd30ad352d1644980eefce658bb63f] in main > HBase disable buffer flush lose efficacy > > > Key: FLINK-36188 > URL: https://issues.apache.org/jira/browse/FLINK-36188 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.0 > Environment: Flink 1.16.3 >Reporter: MOBIN >Assignee: MOBIN >Priority: Critical > Labels: pull-request-available > Fix For: hbase-4.0.0 > > > HBase table > ||rowkey||col|| > |1|1| > The user lookup joins the hbase table, adds 1 to the col value, and writes it > back to hbase > {code:java} > @Test > void testTableSinkDisabledBufferFlush() throws Exception { > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, > streamSettings); > tEnv.executeSql( > "CREATE TABLE hTableForSink (" > + " rowkey INT PRIMARY KEY NOT ENFORCED," > + " family1 ROW" > + ") WITH (" > + " 'connector' = 'hbase-2.2'," > + " 'sink.buffer-flush.max-size' = '0'," > + " 'sink.buffer-flush.max-rows' = '0'," > + " 'table-name' = '" > + TEST_TABLE_6 > + "'," > + " 'zookeeper.quorum' = '" > + getZookeeperQuorum() > + "'" > + ")"); > String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))"; > tEnv.executeSql(insert).await(); > tEnv.executeSql( > "CREATE VIEW user_click AS " > + " SELECT user_id, proctime() AS proc_time" > + " FROM ( " > + " VALUES(1), (1), (1), (1), (1)" > + " ) AS t (user_id);"); > >tEnv.executeSql( > "INSERT INTO hTableForSink SELECT " > + " user_id as rowkey," > + " ROW(CAST(family1.col1 + 1 AS INT))" > + " FROM user_click INNER JOIN hTableForSink" > + " FOR SYSTEM_TIME AS OF user_click.proc_time" > + " ON hTableForSink.rowkey = user_click.user_id;"); > > tEnv.executeSql( > "CREATE TABLE hTableForQuery (" > + " rowkey INT PRIMARY KEY NOT ENFORCED," > + " family1 ROW" > + ") WITH (" > + " 'connector' = 'hbase-2.2'," > + " 'table-name' = '" > + TEST_TABLE_6 > + "'," > + " 'zookeeper.quorum' = '" > + getZookeeperQuorum() > + "'" > + ")"); > String query = "SELECT rowkey, family1.col1 FROM hTableForQuery"; > > TableResult firstResult = tEnv.executeSql(query); > List firstResults = > CollectionUtil.iteratorToList(firstResult.collect()); > String firstExpected = "+I[1, 6]"; > TestBaseUtils.compareResultAsText(firstResults, firstExpected); > } {code} > test failed > {code:java} > org.junit.ComparisonFailure: Different elements in arrays: expected 1 > elements and received 1 > expected: [+I[1, 6]] > received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]> > Expected :+I[1, 6] > Actual :+I[1, 2] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36188) HBase disable buffer flush lose efficacy
[ https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-36188: - Fix Version/s: hbase-4.0.0 > HBase disable buffer flush lose efficacy > > > Key: FLINK-36188 > URL: https://issues.apache.org/jira/browse/FLINK-36188 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.0 > Environment: Flink 1.16.3 >Reporter: MOBIN >Assignee: MOBIN >Priority: Critical > Labels: pull-request-available > Fix For: hbase-4.0.0 > > > HBase table > ||rowkey||col|| > |1|1| > The user lookup joins the hbase table, adds 1 to the col value, and writes it > back to hbase > {code:java} > @Test > void testTableSinkDisabledBufferFlush() throws Exception { > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, > streamSettings); > tEnv.executeSql( > "CREATE TABLE hTableForSink (" > + " rowkey INT PRIMARY KEY NOT ENFORCED," > + " family1 ROW" > + ") WITH (" > + " 'connector' = 'hbase-2.2'," > + " 'sink.buffer-flush.max-size' = '0'," > + " 'sink.buffer-flush.max-rows' = '0'," > + " 'table-name' = '" > + TEST_TABLE_6 > + "'," > + " 'zookeeper.quorum' = '" > + getZookeeperQuorum() > + "'" > + ")"); > String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))"; > tEnv.executeSql(insert).await(); > tEnv.executeSql( > "CREATE VIEW user_click AS " > + " SELECT user_id, proctime() AS proc_time" > + " FROM ( " > + " VALUES(1), (1), (1), (1), (1)" > + " ) AS t (user_id);"); > >tEnv.executeSql( > "INSERT INTO hTableForSink SELECT " > + " user_id as rowkey," > + " ROW(CAST(family1.col1 + 1 AS INT))" > + " FROM user_click INNER JOIN hTableForSink" > + " FOR SYSTEM_TIME AS OF user_click.proc_time" > + " ON hTableForSink.rowkey = user_click.user_id;"); > > tEnv.executeSql( > "CREATE TABLE hTableForQuery (" > + " rowkey INT PRIMARY KEY NOT ENFORCED," > + " family1 ROW" > + ") WITH (" > + " 'connector' = 'hbase-2.2'," > + " 'table-name' = '" > + TEST_TABLE_6 > + "'," > + " 'zookeeper.quorum' = '" > + getZookeeperQuorum() > + "'" > + ")"); > String query = "SELECT rowkey, family1.col1 FROM hTableForQuery"; > > TableResult firstResult = tEnv.executeSql(query); > List firstResults = > CollectionUtil.iteratorToList(firstResult.collect()); > String firstExpected = "+I[1, 6]"; > TestBaseUtils.compareResultAsText(firstResults, firstExpected); > } {code} > test failed > {code:java} > org.junit.ComparisonFailure: Different elements in arrays: expected 1 > elements and received 1 > expected: [+I[1, 6]] > received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]> > Expected :+I[1, 6] > Actual :+I[1, 2] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
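The FLINK-36188 test above sets both `'sink.buffer-flush.max-size'` and `'sink.buffer-flush.max-rows'` to `'0'`, which is documented to disable buffering so every mutation is written through immediately — the bug being that this write-through mode lost efficacy. A hedged sketch of the intended semantics as a plain decision helper (illustrative only, not the connector's actual code; the mixed case where only one option is 0 is deliberately left out):

```java
// Illustrative: both thresholds 0 => buffering disabled, flush per record.
final class BufferFlushPolicy {
    private final long maxSizeInBytes; // 'sink.buffer-flush.max-size', 0 disables
    private final long maxRows;        // 'sink.buffer-flush.max-rows', 0 disables

    BufferFlushPolicy(long maxSizeInBytes, long maxRows) {
        this.maxSizeInBytes = maxSizeInBytes;
        this.maxRows = maxRows;
    }

    /** True when buffering is disabled and each mutation must flush at once. */
    boolean writeThrough() {
        return maxSizeInBytes <= 0 && maxRows <= 0;
    }

    /** Flush decision after buffering one more mutation. */
    boolean shouldFlush(long bufferedBytes, long bufferedRows) {
        if (writeThrough()) {
            return true;
        }
        return (maxSizeInBytes > 0 && bufferedBytes >= maxSizeInBytes)
                || (maxRows > 0 && bufferedRows >= maxRows);
    }
}
```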
Re: [PR] [FLINK-35233][BP v3.0] Fix lookup cache reuse RowData object problem. [flink-connector-hbase]
ferenc-csaky merged PR #48: URL: https://github.com/apache/flink-connector-hbase/pull/48 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36292][Connectors/Common]: remove timeout to avoid timeout exception [flink]
becketqin commented on code in PR #25371: URL: https://github.com/apache/flink/pull/25371#discussion_r1775726090 ## flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java: ## @@ -71,7 +71,7 @@ public void testCloseFetcherWithException() throws Exception { .hasRootCauseMessage("Artificial exception on closing the split reader."); } -@Test(timeout = 3) +@Test Review Comment: > I did a test and confirmed even if there are tasks not terminated, the test still can pass, with or without the timeout = Long.MAX_VALUE. Hmm, this is not expected. The timeout is used in the `SplitFetcherManager.close()` ``` executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS) ``` This is a blocking call and won't return until the executors finishes all the tasks, or otherwise interrupted. So if the timeout is `Long.MAX_VALUE`, and there are running tasks, `SplitFetcherManager.close()` will block. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
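Both sides of this thread hinge on the semantics of `ExecutorService.awaitTermination`: it blocks until the executor terminates or the timeout elapses, and it reports which of the two happened via its boolean result — which is exactly what a `close()` returning `boolean` can surface to a test. A self-contained illustration (not the Flink test code; class and method names are made up):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class CloseableFetchers {
    private final ExecutorService executors = Executors.newFixedThreadPool(2);

    /** Returns true iff all fetcher tasks terminated within the timeout. */
    public boolean close(long timeoutMs) throws InterruptedException {
        executors.shutdown(); // stop accepting new tasks, let running ones finish
        // Blocks until termination or timeout. With Long.MAX_VALUE this
        // effectively blocks forever while tasks are still running, which is
        // the behavior discussed above.
        return executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CloseableFetchers fetchers = new CloseableFetchers();
        // A test can now fail fast when tasks were not closed in time:
        if (!fetchers.close(1_000)) {
            throw new AssertionError("fetcher tasks failed to terminate in time");
        }
    }
}
```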
Re: [PR] [DRAFT][FLINK-36336] Remove deprecated dataset API dependency exclude table module [flink]
codenohup commented on PR #25365: URL: https://github.com/apache/flink/pull/25365#issuecomment-2374836433 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Fix Stale label automatically removed with no update [flink-cdc]
leonardBang merged PR #3621: URL: https://github.com/apache/flink-cdc/pull/3621 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36273][table] Remove deprecated Table/SQL configuration in 2.0 [flink]
lincoln-lil commented on PR #25334: URL: https://github.com/apache/flink/pull/25334#issuecomment-2373644347 For the 2nd commit, I went over all options mentioned in https://issues.apache.org/jira/browse/FLINK-35473 All deprecated options there were removed except the occurrences in corresponding release notes. The changes look good! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36369][table] Move deprecated user-visible classes in table modules to the legacy package [flink]
xuyangzhong commented on PR #25400: URL: https://github.com/apache/flink/pull/25400#issuecomment-2374388353 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-34702) Rank should not convert to StreamExecDuplicate when the input is not insert only
[ https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee closed FLINK-34702. --- Fix Version/s: 2.0-preview (was: 2.0.0) Resolution: Fixed Fixed in master: 2f8b2d81a97a681d67964ef17932ea0abf35730f > Rank should not convert to StreamExecDuplicate when the input is not insert > only > > > Key: FLINK-34702 > URL: https://issues.apache.org/jira/browse/FLINK-34702 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.20.0 >Reporter: Jacky Lau >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 2.0-preview > > > {code:java} > @Test > def testSimpleFirstRowOnBuiltinProctime1(): Unit = { > val sqlQuery = > """ > |SELECT * > |FROM ( > | SELECT *, > |ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as > rowNum > | FROM (select a, count(b) as b from MyTable group by a) > |) > |WHERE rowNum = 1 > """.stripMargin > util.verifyExecPlan(sqlQuery) > } {code} > Exception: > org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't > support consuming update changes which is produced by node > GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b]) > Because StreamPhysicalDeduplicate cannot consume update changes for now > while StreamExecRank can, we should not convert the FlinkLogicalRank to > StreamPhysicalDeduplicate in this case, and we can determine whether the > input contains update changes in the "optimize the physical plan" phase. > We can add an option to solve it, and when StreamPhysicalDeduplicate > supports consuming update changes, we can deprecate it -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34702][table-planner] Refactor Deduplicate optimization to defer to StreamPhysicalRank for valid StreamExecDeduplicate node conversion to avoid exceptions [flink]
lincoln-lil merged PR #25380: URL: https://github.com/apache/flink/pull/25380 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]
gyfora commented on code in PR #881: URL: https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1774650443 ## .github/workflows/ci.yml: ## @@ -47,7 +48,7 @@ jobs: ${{ runner.os }}-maven- - name: Build with Maven run: | - mvn -B clean install javadoc:javadoc -Pgenerate-docs + mvn -B clean install javadoc:javadoc -Pgenerate-docs -Dfabric8.httpclinent.impl=${{ matrix.httpclient }} Review Comment: I prefer the e2e approach, unit tests mock most http interactions anyways -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] FLINK-36332 Add option to select the Fabric8 httpclient implemention [flink-kubernetes-operator]
gyfora commented on code in PR #881: URL: https://github.com/apache/flink-kubernetes-operator/pull/881#discussion_r1774647193 ## .github/workflows/ci.yml: ## @@ -78,6 +79,7 @@ jobs: namespace: ["default","flink"] mode: ["native", "standalone"] java-version: [ 11, 17, 21 ] +http-client: [ "okhttp", "vertx", "jetty", "jdk" ] Review Comment: I suggest to only test the different HTTP clients in a single test case such as: `v1_20, default, native, java21 and test_application_operations.sh ` we don't want to blow up our matrix by a factor of 4 :D -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [update] Add new operator config "savepoint-create-subdir" to allow user to configure whether to create sub-directories named by job id under the 'state.savepoints.dir' [flink-kubernetes-opera
gyfora commented on PR #882: URL: https://github.com/apache/flink-kubernetes-operator/pull/882#issuecomment-2373230412 Hi @xieyang1994 ! New features need a JIRA first, and the PR should be opened against main in any case, unless we are backporting a bug fix to one of the supported release versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator with JobVertexBuildContext [flink]
noorall commented on code in PR #25366: URL: https://github.com/apache/flink/pull/25366#discussion_r1774908113 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ## @@ -593,15 +616,22 @@ private Map buildChainedInputsAndGetHeadInputs( if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES && isChainableInput(sourceOutEdge, streamGraph)) { +// we cache the non-chainable outputs here, and set the non-chained config later +OperatorInfo operatorInfo = new OperatorInfo(); +operatorInfo.setChainableOutputs(Collections.emptyList()); Review Comment: > maybe nonChainable? > > ``` > OperatorInfo operatorInfo = new OperatorInfo(); > jobVertexBuildContext.addOperatorInfo(sourceNodeId, operatorInfo); > > final OperatorID opId = new OperatorID(hashes.get(sourceNodeId)); > final StreamConfig.SourceInputConfig inputConfig = > new StreamConfig.SourceInputConfig(sourceOutEdge); > final StreamConfig operatorConfig = new StreamConfig(new Configuration()); > > setOperatorConfig( > sourceNodeId, > operatorConfig, > Collections.emptyMap(), > jobVertexBuildContext); > setOperatorChainedOutputsConfig( > operatorConfig, Collections.emptyList(), jobVertexBuildContext); > > // we cache the non-chainable outputs here, and set the non-chained config later > operatorInfo.setNonChainableOutputs(Collections.emptyList()); > ``` > > And a unit test case is needed Thank you for the reminder and I have corrected this part. However, there is no need for an additional unit test because the property in OperatorInfo is an empty list by default. Therefore, it doesn't matter whether it is set or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator with JobVertexBuildContext [flink]
noorall commented on PR #25366: URL: https://github.com/apache/flink/pull/25366#issuecomment-2373601784 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36109) Improve FlinkStateSnapshot resource labels for efficient filtering
[ https://issues.apache.org/jira/browse/FLINK-36109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884626#comment-17884626 ] Gyula Fora commented on FLINK-36109: I think adding these labels makes sense for now > Improve FlinkStateSnapshot resource labels for efficient filtering > -- > > Key: FLINK-36109 > URL: https://issues.apache.org/jira/browse/FLINK-36109 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Critical > Fix For: kubernetes-operator-1.10.0 > > > The new FlinkStateSnapshot CRs currently have the following label only: > ``` > snapshot.type: XY > ``` > We need to investigate if we can efficiently filter based on snapshot > resource type (savepoint/checkpoint), target job etc. I am not sure if we can > do this based on the spec directly or we need labels for it. > Also we should probably rename the snapshot.type label to trigger.type to be > more straightforward. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34876][transform] Support UDF functions in transform [flink-cdc]
yuxiqian commented on PR #3465: URL: https://github.com/apache/flink-cdc/pull/3465#issuecomment-2374215861 Hi @mamineturki, `flink-cdc-pipeline-udf-examples` are for testing purposes only. If you'd like to test it, using `org.apache.flink.cdc.udf.examples.scala.AddOneFunctionClass` should work. Seems it wasn't successfully loaded by Flink classloaders in your case. Have you tried putting `.jar` file into Flink's `/lib` path (not Flink CDC `/lib`!) and restart the Flink cluster? If that doesn't work, try passing it with `--jar XXX.jar` argument when submitting pipeline job with `bin/flink-cdc.sh`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
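For reference, a transform UDF of the kind being debugged in this thread can be as small as the sketch below. This assumes the Flink `ScalarFunction`-style UDF surface, which the CDC transform feature accepts among others — treat that as an assumption to verify against the CDC docs for your version. The package and class name are the user's own and must match what the pipeline definition references (here they mirror the `org.example.udf.AddOneFunctionClass` name from the error message further down):

```java
package org.example.udf; // must match the class name referenced in the pipeline YAML

import org.apache.flink.table.functions.ScalarFunction;

// Minimal "add one" UDF, analogous to the bundled example classes.
// The class must be public and shipped on the job's classpath
// (e.g. via the --jar argument mentioned above).
public class AddOneFunctionClass extends ScalarFunction {
    public Integer eval(Integer value) {
        return value == null ? null : value + 1;
    }
}
```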
Re: [PR] [FLINK-34876][transform] Support UDF functions in transform [flink-cdc]
mamineturki commented on PR #3465: URL: https://github.com/apache/flink-cdc/pull/3465#issuecomment-2374187978 @yuxiqian Hi, I have been trying to run an example with this new feature but I always get `java.lang.ClassNotFoundException: org.example.udf.AddOneFunctionClass` I tried loading the udf function using the path in the PR description, as well as the package name `org.apache.flink.cdc.udf.examples.scala.AddOneFunctionClass`, with no luck. I imported the function by downloading the pre-packaged jar `wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-udf-examples/3.2.0/flink-cdc-pipeline-udf-examples-3.2.0.jar` Please advise on what I am missing. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34876][transform] Support UDF functions in transform [flink-cdc]
mamineturki commented on PR #3465: URL: https://github.com/apache/flink-cdc/pull/3465#issuecomment-2374243091 Thanks for the speedy reply @yuxiqian . Moving the jar to Flink's `lib` did not help. Passing the jar with `--jar` caused the `flink-cdc.sh` command to get stuck, with no logs and no submitted job. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34876][transform] Support UDF functions in transform [flink-cdc]
mamineturki commented on PR #3465: URL: https://github.com/apache/flink-cdc/pull/3465#issuecomment-2374250013 Thanks for the speedy response @yuxiqian . Passing the jar using `--jar` worked for me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36369][table] Move deprecated user-visible classes in table modules to the legacy package [flink]
flinkbot commented on PR #25400: URL: https://github.com/apache/flink/pull/25400#issuecomment-2374256737 ## CI report: * 259abc16084da0205bae73b37033c29ecb079b29 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36370) Flink 1.18 fails with Empty server certificate chain when High Availability and mTLS both enabled
Aniruddh J created FLINK-36370: -- Summary: Flink 1.18 fails with Empty server certificate chain when High Availability and mTLS both enabled Key: FLINK-36370 URL: https://issues.apache.org/jira/browse/FLINK-36370 Project: Flink Issue Type: Bug Reporter: Aniruddh J Hi, in my kubernetes cluster I have flink-kubernetes-operator v1.7.0 and apache-flink v1.18.1 installed. In the FlinkDeployment CR when I enable Kubernetes high availability services with mTLS something like below: ``` high-availability.type: kubernetes high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: 'file:///mnt/pv/ha' security.ssl.rest.authentication-enabled: 'true' ``` I am ending up with `SSLHandshakeException with empty client certificate` . Though both of them work fine when implemented individually. Upon enabling `{{{}-{}}}{{{}[Djavax.net|http://djavax.net/]{}}}{{{}.debug=all`{}}} observed client server communication and figured out [https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java] is where Client gets setup and it happens from the operator side [https://github.com/apache/flink-kubernetes-operator/blob/b081b75b72ddde643710e869b95b214912882363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L750] (correct me here please) When we enable both mTLS and HA the client doesn't seem to be getting setup. Not only that, it doesn't follow the same path of client creation. Below is the part of the ssl handshake log before getting the error: ``` javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.508 GMT|null:-1|Produced CertificateRequest handshake message ( "CertificateRequest": { "certificate types": [ecdsa_sign, rsa_sign, dss_sign] "supported signature algorithms": [ecdsa_secp256r1_sha256, .., rsa_sha224, dsa_sha224, ecdsa_sha1, rsa_pkcs1_sha1, dsa_sha1] "certificate authorities": [CN=FlinkCA, O=Apache Flink] } javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.512 GMT|null:-1|Raw read ( : 1603030007 0B 0300 ) javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.513 GMT|null:-1|READ: TLSv1.2 handshake, length = 7 javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.513 GMT|null:-1|Consuming client Certificate handshake message ( "Certificates": ) javax.net.ssl|ERROR|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.514 GMT|null:-1|Fatal (BAD_CERTIFICATE): Empty server certificate chain ( "throwable" : { javax.net.ssl.SSLHandshakeException: Empty server certificate chain ``` >From the initial looks it seems when Flink server is requesting for >certificates from Client, the client doesn't send anything back since it does >not have matching CAs? Some client is sending a REST request to Flink server which the netty library is handling but until we figure out the client we don't know whether it's the truststore on client that's a problem or something else we don't see here. Thanks! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36370) Flink 1.18 fails with Empty server certificate chain when High Availability and mTLS both enabled
[ https://issues.apache.org/jira/browse/FLINK-36370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aniruddh J updated FLINK-36370: --- Attachment: flink-cert-issue.log Description: Hi, in my kubernetes cluster I have flink-kubernetes-operator v1.7.0 and apache-flink v1.18.1 installed. In the FlinkDeployment CR when I enable Kubernetes high availability services with mTLS something like below: {code:java} high-availability.type: kubernetes high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: 'file:///mnt/pv/ha' security.ssl.rest.authentication-enabled: 'true'{code} I am ending up with *SSLHandshakeException with empty client certificate* Though both of them work fine when implemented individually. Upon enabling *{{{}-{}}}{{{}[Djavax.net|http://djavax.net/]{}}}{{{}.debug=all{}}}* observed client server communication and figured out [https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java] is where Client gets setup and it happens from the operator side [https://github.com/apache/flink-kubernetes-operator/blob/b081b75b72ddde643710e869b95b214912882363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L750] (correct me here please) When we enable both mTLS and HA the client doesn't seem to be getting setup. Not only that, it doesn't follow the same path of client creation. Below is the part of the ssl handshake log before getting the error (attached the entire ssl handshake log): {code:java} javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.508 GMT|null:-1|Produced CertificateRequest handshake message ( "CertificateRequest": { "certificate types": [ecdsa_sign, rsa_sign, dss_sign] "supported signature algorithms": [ecdsa_secp256r1_sha256, .., rsa_sha224, dsa_sha224, ecdsa_sha1, rsa_pkcs1_sha1, dsa_sha1] "certificate authorities": [CN=FlinkCA, O=Apache Flink] } javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.512 GMT|null:-1|Raw read ( : 1603030007 0B 0300 ) javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.513 GMT|null:-1|READ: TLSv1.2 handshake, length = 7 javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.513 GMT|null:-1|Consuming client Certificate handshake message ( "Certificates": ) javax.net.ssl|ERROR|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.514 GMT|null:-1|Fatal (BAD_CERTIFICATE): Empty server certificate chain ( "throwable" : { javax.net.ssl.SSLHandshakeException: Empty server certificate chain {code} >From the initial looks it seems when Flink server is requesting for >certificates from Client, the client doesn't send anything back since it does >not have certificates matching the CA? Some client is sending a REST request to Flink server which the netty library is handling but until we figure out the client we don't know whether it's the truststore on client that's a problem or something else we don't see here. Thanks! was: Hi, in my kubernetes cluster I have flink-kubernetes-operator v1.7.0 and apache-flink v1.18.1 installed. 
In the FlinkDeployment CR when I enable Kubernetes high availability services with mTLS something like below: ``` high-availability.type: kubernetes high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: 'file:///mnt/pv/ha' security.ssl.rest.authentication-enabled: 'true' ``` I am ending up with `SSLHandshakeException with empty client certificate` . Though both of them work fine when implemented individually. Upon enabling `{{{}-{}}}{{{}[Djavax.net|http://djavax.net/]{}}}{{{}.debug=all`{}}} observed client server communication and figured out [https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java] is where Client gets setup and it happens from the operator side [https://github.com/apache/flink-kubernetes-operator/blob/b081b75b72ddde643710e869b95b214912882363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L750] (correct me here please) When we enable both mTLS and HA the client doesn't seem to be getting setup. Not only that, it doesn't follow the same path of client creation. Below is the part of the ssl handshake log before getting the error: ``` javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.508 GMT|null:-1|Produced CertificateRequest handshake message ( "CertificateRequest": { "certificate types": [ecdsa_sign, rsa_sign, dss_sign] "supported signature algorithms": [ecdsa_secp256r1_sha256, .., rsa_sha224, dsa_sha224, ecdsa_sha1, rsa_pkcs1_sha1, ds
[jira] [Updated] (FLINK-36371) Using a BinaryStringData field as the keySelector can hash-shuffle identical records into different tumble windows
[ https://issues.apache.org/jira/browse/FLINK-36371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eason Ye updated FLINK-36371: - Description: The pipeline is Source (Kafka, deserializing messages to RowData) -> FlatMap --(keyBy)--> TumbleProcessWindow -> sink. The source emits two fields: UserName (String) and Amount (Integer). FlatMap collects BinaryRowData records and emits them to the TumbleProcessWindow operator. The KeySelector uses the UserName field as BinaryStringData; the keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrive at TumbleProcessWindow at the same time, yet they are assigned to different windows. When I use the keyBy below instead, the window result is correct:
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}
Reproduction Code:
{code:java}
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), sourceName())
    .flatMap(new FlatMapFunction<RowData, RowData>() {
        public void flatMap(RowData value, Collector<RowData> out) throws Exception {
            System.out.println("is instanceof BinaryRowData = " + (value instanceof BinaryRowData));
            out.collect(value);
        }
    })
    .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .process(new FlinkTumbleWindowingProcessFunction())
    .print();

public static class FlinkTumbleWindowingProcessFunction
        extends ProcessWindowFunction<RowData, String, StringData, TimeWindow> {
    int subtask = -1;

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        subtask = getRuntimeContext().getIndexOfThisSubtask();
    }

    public void process(
            StringData s,
            Context context,
            Iterable<RowData> elements,
            Collector<String> out) throws Exception {
        System.out.printf("subtask = %d, s = %s, elements = %s%n", subtask, s, elements);
    }
}{code}

was: The pipeline is Source (Kafka, deserializing messages to RowData) -> FlatMap --(keyBy)--> TumbleProcessWindow -> sink. The source emits two fields: UserName (String) and Amount (Integer). FlatMap collects BinaryRowData records and emits them to the TumbleProcessWindow operator. The KeySelector uses the UserName field as BinaryStringData; the keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrive at TumbleProcessWindow at the same time, yet they are assigned to different windows. When I use the keyBy below instead, the window result is correct:
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}
Reproduction:
{code:java}
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), sourceName())
    .flatMap(new FlatMapFunction<RowData, RowData>() {
        public void flatMap(RowData value, Collector<RowData> out) throws Exception {
            System.out.println("is instanceof BinaryRowData = " + (value instanceof BinaryRowData));
            out.collect(value);
        }
    })
    .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .process(new FlinkTumbleWindowingProcessFunction())
    .print();{code}

> Using a BinaryStringData field as the keySelector can hash-shuffle identical
> records into different tumble windows
> ----------------------------------------------------------------------------
>
> Key: FLINK-36371
> URL: https://issues.apache.org/jira/browse/FLINK-36371
> Project: Flink
> Issue Type: Improvement
> Components: API / Type Serialization System
> Affects Versions: 2.0.0
> Reporter: Eason Ye
> Priority: Major
>
> The pipeline is Source (Kafka, deserializing messages to RowData) -> FlatMap
> --(keyBy)--> TumbleProcessWindow -> sink.
> The source emits two fields: UserName (String) and Amount (Integer).
> FlatMap collects BinaryRowData records and emits them to the
> TumbleProcessWindow operator.
> The KeySelector uses the UserName field as BinaryStringData; the keyBy code is:
> {code:java}
> .keyBy(rowData -> rowData.getString(0)){code}
> The window result is unexpected: records with the same username arrive at
> TumbleProcessWindow at the same time, yet they are assigned to different
> windows.
> When I use the keyBy below instead, the window result is correct:
> {code:java}
> .keyBy(rowData -> rowData.getString(0).toString()){code}
>
> Reproduction Code:
> {code:java}
> env.fromSource(
>     kafkaSource,
>     W
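The reporter's workaround points at a defensive pattern: materialize the key as a plain java.lang.String before keying, so that equal keys hash equally no matter how the underlying BinaryStringData is backed. A minimal sketch of that pattern (the class name StableRowKeySelector is a hypothetical helper, not part of the report):
{code:java}
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;

// Hypothetical helper: copy the key out of the (possibly memory-segment-backed)
// StringData into an ordinary heap String before it is hashed for the shuffle.
public class StableRowKeySelector implements KeySelector<RowData, String> {
    @Override
    public String getKey(RowData row) {
        return row.getString(0).toString();
    }
}{code}
Used as {{.keyBy(new StableRowKeySelector())}}, the rest of the pipeline stays unchanged.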
[jira] [Created] (FLINK-36371) Using a BinaryStringData field as the keySelector can hash-shuffle identical records into different tumble windows
Eason Ye created FLINK-36371: Summary: Using a BinaryStringData field as the keySelector can hash-shuffle identical records into different tumble windows Key: FLINK-36371 URL: https://issues.apache.org/jira/browse/FLINK-36371 Project: Flink Issue Type: Improvement Components: API / Type Serialization System Affects Versions: 2.0.0 Reporter: Eason Ye

The pipeline is Source (Kafka, deserializing messages to RowData) -> FlatMap --(keyBy)--> TumbleProcessWindow -> sink. The source emits two fields: UserName (String) and Amount (Integer). FlatMap collects BinaryRowData records and emits them to the TumbleProcessWindow operator. The KeySelector uses the UserName field as BinaryStringData; the keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrive at TumbleProcessWindow at the same time, yet they are assigned to different windows. When I use the keyBy below instead, the window result is correct:
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36371) Using a BinaryStringData field as the keySelector can hash-shuffle identical records into different tumble windows
[ https://issues.apache.org/jira/browse/FLINK-36371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eason Ye updated FLINK-36371: - Description: The pipeline is Source (Kafka, deserializing messages to RowData) -> FlatMap --(keyBy)--> TumbleProcessWindow -> sink. The source emits two fields: UserName (String) and Amount (Integer). FlatMap collects BinaryRowData records and emits them to the TumbleProcessWindow operator. The KeySelector uses the UserName field as BinaryStringData; the keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrive at TumbleProcessWindow at the same time, yet they are assigned to different windows. When I use the keyBy below instead, the window result is correct:
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}
Reproduction:
{code:java}
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), sourceName())
    .flatMap(new FlatMapFunction<RowData, RowData>() {
        public void flatMap(RowData value, Collector<RowData> out) throws Exception {
            System.out.println("is instanceof BinaryRowData = " + (value instanceof BinaryRowData));
            out.collect(value);
        }
    })
    .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .process(new FlinkTumbleWindowingProcessFunction())
    .print();{code}

was: The pipeline is Source (Kafka, deserializing messages to RowData) -> FlatMap --(keyBy)--> TumbleProcessWindow -> sink. The source emits two fields: UserName (String) and Amount (Integer). FlatMap collects BinaryRowData records and emits them to the TumbleProcessWindow operator. The KeySelector uses the UserName field as BinaryStringData; the keyBy code is:
{code:java}
.keyBy(rowData -> rowData.getString(0)){code}
The window result is unexpected: records with the same username arrive at TumbleProcessWindow at the same time, yet they are assigned to different windows. When I use the keyBy below instead, the window result is correct:
{code:java}
.keyBy(rowData -> rowData.getString(0).toString()){code}

> Using a BinaryStringData field as the keySelector can hash-shuffle identical
> records into different tumble windows
> ----------------------------------------------------------------------------
>
> Key: FLINK-36371
> URL: https://issues.apache.org/jira/browse/FLINK-36371
> Project: Flink
> Issue Type: Improvement
> Components: API / Type Serialization System
> Affects Versions: 2.0.0
> Reporter: Eason Ye
> Priority: Major
>
> The pipeline is Source (Kafka, deserializing messages to RowData) -> FlatMap
> --(keyBy)--> TumbleProcessWindow -> sink.
> The source emits two fields: UserName (String) and Amount (Integer).
> FlatMap collects BinaryRowData records and emits them to the
> TumbleProcessWindow operator.
> The KeySelector uses the UserName field as BinaryStringData; the keyBy code is:
> {code:java}
> .keyBy(rowData -> rowData.getString(0)){code}
> The window result is unexpected: records with the same username arrive at
> TumbleProcessWindow at the same time, yet they are assigned to different
> windows.
> When I use the keyBy below instead, the window result is correct:
> {code:java}
> .keyBy(rowData -> rowData.getString(0).toString()){code}
>
> Reproduction:
> {code:java}
> env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), sourceName())
>     .flatMap(new FlatMapFunction<RowData, RowData>() {
>         public void flatMap(RowData value, Collector<RowData> out) throws Exception {
>             System.out.println("is instanceof BinaryRowData = " + (value instanceof BinaryRowData));
>             out.collect(value);
>         }
>     })
>     .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
>     .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
>     .process(new FlinkTumbleWindowingProcessFunction())
>     .print();{code}
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36369][table] Move deprecated user-visible classes in table modules to the legacy package [flink]
xuyangzhong commented on PR #25400: URL: https://github.com/apache/flink/pull/25400#issuecomment-2374330934 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36370) Flink 1.18 fails with Empty server certificate chain when High Availability and mTLS both enabled
[ https://issues.apache.org/jira/browse/FLINK-36370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aniruddh J updated FLINK-36370: --- Priority: Major (was: Minor)

> Flink 1.18 fails with Empty server certificate chain when High Availability
> and mTLS both enabled
> ----------------------------------------------------------------------------
>
> Key: FLINK-36370
> URL: https://issues.apache.org/jira/browse/FLINK-36370
> Project: Flink
> Issue Type: Bug
> Reporter: Aniruddh J
> Priority: Major
> Attachments: flink-cert-issue.log
>
> Hi, in my kubernetes cluster I have flink-kubernetes-operator v1.7.0 and
> apache-flink v1.18.1 installed. In the FlinkDeployment CR, when I enable
> Kubernetes high availability services together with mTLS, something like below:
> {code:java}
> high-availability.type: kubernetes
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: 'file:///mnt/pv/ha'
> security.ssl.rest.authentication-enabled: 'true'{code}
> I end up with an *SSLHandshakeException with an empty client certificate chain*,
> though both features work fine when enabled individually. After enabling
> *-Djavax.net.debug=all* I observed the client-server communication and figured
> out that [https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java]
> is where the client gets set up, and that this happens from the operator side:
> [https://github.com/apache/flink-kubernetes-operator/blob/b081b75b72ddde643710e869b95b214912882363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L750]
> (correct me here please)
>
> When we enable both mTLS and HA, the client doesn't seem to get set up, and it
> doesn't follow the same client-creation path either. Below is the part of the
> SSL handshake log right before the error (the entire handshake log is attached):
> {code:java}
> javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.508 GMT|null:-1|Produced CertificateRequest handshake message (
> "CertificateRequest": {
>   "certificate types": [ecdsa_sign, rsa_sign, dss_sign]
>   "supported signature algorithms": [ecdsa_secp256r1_sha256, .., rsa_sha224, dsa_sha224, ecdsa_sha1, rsa_pkcs1_sha1, dsa_sha1]
>   "certificate authorities": [CN=FlinkCA, O=Apache Flink]
> }
> javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.512 GMT|null:-1|Raw read (
>   : 1603030007 0B 0300
> )
> javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.513 GMT|null:-1|READ: TLSv1.2 handshake, length = 7
> javax.net.ssl|DEBUG|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.513 GMT|null:-1|Consuming client Certificate handshake message (
> "Certificates":
> )
> javax.net.ssl|ERROR|53|flink-rest-server-netty-worker-thread-1|2024-09-19 15:16:12.514 GMT|null:-1|Fatal (BAD_CERTIFICATE): Empty server certificate chain (
> "throwable" : {
> javax.net.ssl.SSLHandshakeException: Empty server certificate chain
> {code}
> From the initial look, it seems that when the Flink server requests a
> certificate from the client, the client sends nothing back because it has no
> certificate matching the CA?
>
> Some client is sending a REST request to the Flink server, which the netty
> library handles, but until we identify that client we can't tell whether the
> problem is the client's truststore or something else we don't see here.
> Thanks!
-- This message was sent by Atlassian Jira (v8.20.10#820010)
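The verbose handshake trace in the report came from `-Djavax.net.debug=all`. A narrower way to reproduce it in a Flink-on-Kubernetes setup, assuming the deployment picks JVM options up from the Flink configuration, is to scope the flag to the JobManager via Flink's documented `env.java.opts.jobmanager` option:
{code:yaml}
# Sketch: 'ssl,handshake' is far less noisy than 'all' while still showing
# the CertificateRequest / Certificate exchange that fails here.
env.java.opts.jobmanager: '-Djavax.net.debug=ssl,handshake'{code}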
Re: [PR] [DRAFT][FLINK-29741][scala] Remove flink-dist-scala, flink-scala, flink-streaming-scala module [flink]
codenohup commented on PR #25393: URL: https://github.com/apache/flink/pull/25393#issuecomment-2373796766 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33977][runtime] Adaptive scheduler may not minimize the number of TMs during downscaling [flink]
RocMarshal commented on code in PR #25218: URL: https://github.com/apache/flink/pull/25218#discussion_r1775043513

##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##
@@ -43,35 +47,43 @@ public Collection<SlotAssignment> assignSlots(
         Collection<? extends SlotInfo> freeSlots,
         VertexParallelism vertexParallelism,
         JobAllocationsInformation previousAllocations) {
-    List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+    checkSlotsSufficient(jobInformation, freeSlots);
+
+    final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
     for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
         allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
     }
-    Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+    Collection<? extends SlotInfo> pickedSlots = freeSlots;
+    if (freeSlots.size() > allGroups.size()) {

Review Comment:
Thanks for the comments.

> Why do we need this condition? To avoid the extra effort of sorting the TMs based on their offered slot count? If yes, could we add this as a comment to underline the intention?

Yes, you are right. I'll update it.

> On the other hand, is it worth it to save the sorting effort if freeSlots.size() == allGroups.size()? 🤔 I guess it would make sense for big amounts of TaskManagers.

Yes. That is why we keep the condition `if (freeSlots.size() > allGroups.size()) {`: in the case you described, the sorting logic simply doesn't run.

> I don't know the limits we have on that front in Flink at the moment.

From my limited reading, there is no such limit.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
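To make the guard being discussed concrete, here is a standalone sketch (simplified stand-in names, not the actual DefaultSlotAssigner types) of "skip the per-TaskManager grouping and sorting when every free slot will be used anyway":
{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class SlotPickingSketch {

    private SlotPickingSketch() {}

    /** Picks {@code needed} slots, preferring TaskManagers that offer the most slots. */
    public static <S> List<S> pickSlots(List<S> freeSlots, int needed, Function<S, String> tmOf) {
        if (freeSlots.size() <= needed) {
            // Every free slot is needed anyway, so sorting buys nothing.
            return freeSlots;
        }
        // Group slots by TaskManager, then visit the TMs offering the most
        // free slots first, so the assignment lands on as few TMs as possible.
        Map<String, List<S>> byTm = freeSlots.stream().collect(Collectors.groupingBy(tmOf));
        List<S> picked = new ArrayList<>(needed);
        byTm.values().stream()
                .sorted(Comparator.comparingInt((List<S> slots) -> slots.size()).reversed())
                .forEach(slots -> slots.stream()
                        .limit(Math.max(0, needed - picked.size()))
                        .forEach(picked::add));
        return picked;
    }
}{code}
With freeSlots.size() == allGroups.size() the guard skips the sort entirely, which is exactly the case raised in the second quoted question.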