Re: [PR] [FLINK-36151] Add schema evolution related docs [flink-cdc]

2024-08-27 Thread via GitHub


monologuist commented on PR #3575:
URL: https://github.com/apache/flink-cdc/pull/3575#issuecomment-2311755677

   @yuxiqian Hello, I would like to know which Flink CDC version your document 
corresponds to. I only seem to see three schema change behaviors in Flink CDC 
3.0.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Run schema coordinator logic asynchronously to avoid blocking the main thread [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang commented on PR #3577:
URL: https://github.com/apache/flink-cdc/pull/3577#issuecomment-2311762537

   @loserwang1024 Would you like to review this PR when you have time?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36151] Add schema evolution related docs [flink-cdc]

2024-08-27 Thread via GitHub


yuxiqian commented on PR #3575:
URL: https://github.com/apache/flink-cdc/pull/3575#issuecomment-2311766702

   Hi @monologuist, this PR reflects the current `master` situation 
(`release-3.3-SNAPSHOT`). We will backport it to previous release branches later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36148][pipeline-connector][mysql] Add custom parser for CreateTableEvent [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang merged PR #3570:
URL: https://github.com/apache/flink-cdc/pull/3570


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-36148) Support parsing DDL to new CreateTableEvent in binlog reading phase.

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36148.

Resolution: Fixed

fixed in master: 3c1517f70f728e1ef8a4b6bed942d87d66d874df

> Support parsing DDL to new CreateTableEvent in binlog reading phase.
> 
>
> Key: FLINK-36148
> URL: https://issues.apache.org/jira/browse/FLINK-36148
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Assignee: LvYanquan
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, we don't parse DDL into a new CreateTableEvent in the binlog 
> reading phase; this leads to a NullPointerException when a new table 
> creation DDL is met, because a CreateTableEvent was not sent before the 
> DataChangeEvent.
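
For illustration, here is a minimal hypothetical sketch (class and event names are invented, not Flink CDC's actual code) of why a missing CreateTableEvent surfaces as a NullPointerException downstream:

```java
import java.util.HashMap;
import java.util.Map;

// Downstream operators resolve the schema that a CreateTableEvent registered
// before they can process any DataChangeEvent for that table.
class SchemaCacheSketch {
    private final Map<String, String[]> schemas = new HashMap<>();

    void onCreateTableEvent(String tableId, String[] columns) {
        schemas.put(tableId, columns);
    }

    void onDataChangeEvent(String tableId) {
        // If no CreateTableEvent arrived first, the lookup returns null and
        // the dereference below throws a NullPointerException.
        String[] columns = schemas.get(tableId);
        System.out.println(tableId + " has " + columns.length + " columns");
    }
}
```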



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36150][pipeline-connector/mysql] tables.exclude is still valid if scan.binlog.newly-added-table.enabled is true. [flink-cdc]

2024-08-27 Thread via GitHub


ruanhang1993 commented on code in PR #3573:
URL: https://github.com/apache/flink-cdc/pull/3573#discussion_r1732312221


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/Selectors.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.schema;
+
+import org.apache.flink.cdc.common.utils.Predicates;
+
+import io.debezium.relational.TableId;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/** Selectors for filtering tables. */
+public class Selectors {
+
+    private List<Selector> selectors;
+
+    private Selectors() {}
+
+    /**
+     * A {@link Selector} that determines whether a table identified by a given {@link TableId} is
+     * to be included.
+     */
+    private static class Selector {
+        private final Predicate<String> namespacePred;
+        private final Predicate<String> tableNamePred;
+
+        public Selector(String namespace, String schemaName, String tableName) {

Review Comment:
   `schemaName` is never used. Just delete it.
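
For illustration, a minimal sketch of the suggested cleanup; the exact-match predicates below stand in for the pattern-based ones built via `Predicates`, so this is an assumption, not the PR's code:

```java
import java.util.function.Predicate;

// Hypothetical form of the constructor after dropping the unused schemaName.
final class SelectorSketch {
    private final Predicate<String> namespacePred;
    private final Predicate<String> tableNamePred;

    SelectorSketch(String namespace, String tableName) {
        this.namespacePred = ns -> ns.equals(namespace);
        this.tableNamePred = tbl -> tbl.equals(tableName);
    }

    boolean isIncluded(String namespace, String tableName) {
        return namespacePred.test(namespace) && tableNamePred.test(tableName);
    }
}
```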



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36133) Remove subclass implementations of TableSource and TableSink, along with their corresponding Factory classes.

2024-08-27 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu updated FLINK-36133:
--
Parent: FLINK-33748
Issue Type: Sub-task  (was: Technical Debt)

> Remove subclass implementations of TableSource and TableSink, along with 
> their corresponding Factory classes.
> -
>
> Key: FLINK-36133
> URL: https://issues.apache.org/jira/browse/FLINK-36133
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> This Jira is a sub-task in https://issues.apache.org/jira/browse/FLINK-33748
> Remove subclasses such as 
>  * ArrowTableSource
>  * CsvTableSource
>  * ...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Run schema coordinator logic asynchronously to avoid blocking the main thread [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang commented on code in PR #3577:
URL: https://github.com/apache/flink-cdc/pull/3577#discussion_r1732395696


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java:
##
@@ -113,18 +119,27 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestHandler {
     public SchemaRegistry(
             String operatorName,
             OperatorCoordinator.Context context,
+            ExecutorService executorService,
             MetadataApplier metadataApplier,
             List routes) {
-        this(operatorName, context, metadataApplier, routes, SchemaChangeBehavior.EVOLVE);
+        this(
+                operatorName,
+                context,
+                executorService,
+                metadataApplier,
+                routes,
+                SchemaChangeBehavior.EVOLVE);

Review Comment:
   LENIENT ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36066) Introducing the AdaptiveGraphGenerator component

2024-08-27 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-36066:
---

Assignee: Lei Yang

> Introducing the AdaptiveGraphGenerator component
> 
>
> Key: FLINK-36066
> URL: https://issues.apache.org/jira/browse/FLINK-36066
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Lei Yang
>Priority: Major
>
> To support the incremental generation of JobGraph and update the StreamGraph 
> based on runtime information, we plan to introduce the AdaptiveGraphManager 
> component, which will implement the AdaptiveGraphGenerator interface. The 
> AdaptiveGraphGenerator interface is responsible for:
>  # Generating the JobGraph
>  # Responding to upstream job vertex finished events, generating JobVertexes, 
> and updating the JobGraph with them
>  # Providing StreamGraphContext to allow StreamGraphOptimizer to add, delete, 
> and modify StreamNode and StreamEdge
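
Read literally, those responsibilities suggest an interface shaped roughly like the following sketch; the method names and the StreamGraphContext placeholder are assumptions, not the FLIP's actual signatures:

```java
import java.util.List;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;

// Placeholder for the context type described in the ticket.
interface StreamGraphContextSketch {}

interface AdaptiveGraphGeneratorSketch {
    // 1. Generate the (initially partial) JobGraph.
    JobGraph getJobGraph();

    // 2. React to an upstream job vertex finishing by generating the next
    //    JobVertexes and attaching them to the JobGraph.
    List<JobVertex> onJobVertexFinished(JobVertexID finishedVertex);

    // 3. Expose a context that lets StreamGraphOptimizer add, delete, and
    //    modify StreamNodes and StreamEdges.
    StreamGraphContextSketch getStreamGraphContext();
}
```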



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36033) FLIP-469: Supports Adaptive Optimization of StreamGraph

2024-08-27 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-36033:
---

Assignee: Junrui Li

> FLIP-469: Supports Adaptive Optimization of StreamGraph
> ---
>
> Key: FLINK-36033
> URL: https://issues.apache.org/jira/browse/FLINK-36033
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>
> This is the umbrella ticket for 
> [FLIP-469|https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36032) FLIP-468: Introducing StreamGraph-Based Job Submission

2024-08-27 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-36032:
---

Assignee: Junrui Li

> FLIP-468: Introducing StreamGraph-Based Job Submission
> --
>
> Key: FLINK-36032
> URL: https://issues.apache.org/jira/browse/FLINK-36032
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination, Runtime / REST
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>
> This is the umbrella ticket for 
> [FLIP-468|https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35579] update frocksdb version to v8.10.0 [flink]

2024-08-27 Thread via GitHub


mayuehappy commented on PR #25253:
URL: https://github.com/apache/flink/pull/25253#issuecomment-2311939034

   > Caused by: org.rocksdb.RocksDBException: `/dev/null' exists but is not a 
directory
   
   This is because when running the e2e tests, the RocksDB log directory is 
specified as `/dev/null` by default. But RocksDB v8.10 calls `s = 
env->CreateDirIfMissing(options.db_log_dir);`, which causes the creation of 
the Logger to fail.
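
A tiny standalone sketch (plain Java, not Flink or RocksDB code) that reproduces the underlying condition on a POSIX system, where `/dev/null` exists as a character device rather than a directory:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DevNullDirSketch {
    public static void main(String[] args) {
        Path logDir = Paths.get("/dev/null");
        // /dev/null exists but is not a directory, so a "create the directory
        // if missing" step cannot succeed, mirroring RocksDB's
        // CreateDirIfMissing(options.db_log_dir) failure described above.
        System.out.println("exists: " + Files.exists(logDir));           // true
        System.out.println("isDirectory: " + Files.isDirectory(logDir)); // false
    }
}
```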
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35579] update frocksdb version to v8.10.0 [flink]

2024-08-27 Thread via GitHub


mayuehappy commented on PR #25253:
URL: https://github.com/apache/flink/pull/25253#issuecomment-2311939990

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36159) FLIP-470: Support Adaptive Broadcast Join

2024-08-27 Thread xingbe (Jira)
xingbe created FLINK-36159:
--

 Summary: FLIP-470: Support Adaptive Broadcast Join
 Key: FLINK-36159
 URL: https://issues.apache.org/jira/browse/FLINK-36159
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination, Table SQL / Planner, Table SQL / 
Runtime
Reporter: xingbe


This is the umbrella ticket for 
[FLIP-470|https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36159) FLIP-470: Support Adaptive Broadcast Join

2024-08-27 Thread xingbe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xingbe updated FLINK-36159:
---
Description: This is the umbrella ticket for FLIP-470 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join.
  (was: This is the umbrella ticket for 
[FLIP-470|[http://example.com|https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join]].)

> FLIP-470: Support Adaptive Broadcast Join
> -
>
> Key: FLINK-36159
> URL: https://issues.apache.org/jira/browse/FLINK-36159
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination, Table SQL / Planner, Table SQL / 
> Runtime
>Reporter: xingbe
>Priority: Major
>
> This is the umbrella ticket for FLIP-470 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35773][docs] Document s5cmd [flink]

2024-08-27 Thread via GitHub


pnowojski commented on code in PR #25235:
URL: https://github.com/apache/flink/pull/25235#discussion_r1732443462


##
docs/content/docs/deployment/filesystems/s3.md:
##
@@ -164,4 +164,38 @@ The `s3.entropy.key` defines the string in paths that is replaced by the random
 If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed.
 The `s3.entropy.length` defines the number of random alphanumeric characters used for entropy.
 
+## s5cmd
+
+Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` can be configured to use the [s5cmd tool](https://github.com/peak/s5cmd) for faster file upload and download.
+[Benchmark results](https://cwiki.apache.org/confluence/display/FLINK/FLIP-444%3A+Native+file+copy+support) show that `s5cmd` can be over 2 times more CPU efficient, which means either using half the CPU to upload or download the same set of files, or doing so twice as fast with the same amount of available CPU.
+
+In order to use this feature, the `s5cmd` binary has to be present and accessible to Flink's task managers, for example by embedding it in the Docker image used.
+Secondly, the path to `s5cmd` has to be configured via:
+```yaml
+s3.s5cmd.path: /path/to/the/s5cmd
+```
+
+The remaining configuration options (with their default value listed below) are:

Review Comment:
   I'm doing that in a section below. Or do you mean something else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35773][docs] Document s5cmd [flink]

2024-08-27 Thread via GitHub


pnowojski commented on PR #25235:
URL: https://github.com/apache/flink/pull/25235#issuecomment-2311981366

   > And I guess we also need to update zh version?
   
   Yep, I wanted to wait until the English version is mostly done/approved, in 
order to avoid copy-pasting fixups.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36124][filesystem][s3]Make S3RecoverableFsDataOutputStream.sync not to close the stream [flink]

2024-08-27 Thread via GitHub


ferenc-csaky commented on PR #25231:
URL: https://github.com/apache/flink/pull/25231#issuecomment-2312059662

   Thanks for the fix @gaborgsomogyi!
   
   As I tried to understand the problem we have here, I checked the history of 
what happened before and why. The previous change to 
`S3RecoverableFsDataOutputStream#sync()` was made in 
[FLINK-28513](https://issues.apache.org/jira/browse/FLINK-28513). The error 
msg. there states:
   ```
   S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to 
create a persistent recoverable intermediate point.
   ```
   If we check the [`persist()` 
implementation](https://github.com/apache/flink/blob/9bcd8f4b8f48c8d9ad05575b60779c9216ee4965/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java#L158),
 it will upload the local part to S3, but it will not enforce closure of that 
local part file, nor will it commit the snapshotted state. IIUC, committing is 
crucial inside `sync()`, because it is a void method: if we simply called 
`persist()` inside, we would lose the `S3Recoverable` object it returns, and 
with it the ability to resume correctly in case of a failure. So without 
changing the interfaces, it is necessary to commit here and make sure we have 
a consistent state before we leave the `sync()` method and release the lock.
   
   Based on these conclusions, if the current solution performs well in actual 
usage, the fix itself LGTM.
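
To restate the contract difference in code, a simplified sketch with assumed shapes, not the actual Flink interfaces:

```java
import java.io.IOException;

// Stand-in for the recovery handle that persist() returns.
final class S3RecoverableSketch {}

interface RecoverableStreamSketch {
    // Uploads the current local part and returns a resumable recovery point.
    S3RecoverableSketch persist() throws IOException;

    // Void by contract: the handle persist() would return is lost here, so
    // sync() itself must commit the snapshotted state before releasing the
    // lock, leaving the stream in a consistent state.
    void sync() throws IOException;
}
```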
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36143][ES6][ES7] Intro retry-on-conflict param to resolve Sink ES occurred "version conflict" [flink-connector-elasticsearch]

2024-08-27 Thread via GitHub


leosanqing commented on PR #109:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/109#issuecomment-2312079949

   @schulzp @alpinegizmo @aljoscha @rmetzger Could anybody review my code? 
Thanks ❤️❤️


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36117] Implement AsyncKeyedStateBackend for RocksDBKeyedStateBackend and HeapKeyedStateBackend [flink]

2024-08-27 Thread via GitHub


Zakelly commented on code in PR #25233:
URL: https://github.com/apache/flink/pull/25233#discussion_r1732498188


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+

Review Comment:
   Please provide some javadoc here. Same for other newly introduced classes.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptorUtils.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+public class StateDescriptorUtils {
+    private StateDescriptorUtils() {}
+
+    public static org.apache.flink.api.common.state.StateDescriptor transformFromV2ToV1(
+            StateDescriptor stateDescriptorV2) {
+        org.apache.flink.api.common.state.StateDescriptor stateDescriptorV1 =
+                new org.apache.flink.api.common.state.StateDescriptor(
+                        "", stateDescriptorV2.getSerializer(), null) {
+                    @Override
+                    public Type getType() {
+                        StateDescriptor.Type typeV2 = stateDescriptorV2.getType();
+                        switch (typeV2) {
+                            case VALUE:
+                                return org.apache.flink.api.common.state.StateDescriptor.Type.VALUE;
+                            case LIST:
+                                return org.apache.flink.api.common.state.StateDescriptor.Type.LIST;
+                            case REDUCING:
+                                return org.apache.flink.api.common.state.StateDescriptor.Type.REDUCING;
+                            case AGGREGATING:
+                                return org.apache.flink.api.common.state.StateDescriptor.Type.AGGREGATING;
+                            case MAP:
+                                return org.apache.flink.api.common.state.StateDescriptor.Type.MAP;
+                            default:
+                                throw new IllegalArgumentException(
+                                        "Unsupported state type: " + typeV2);
+                        }
+                    }
+                };
+        return stateDescriptorV1;

Review Comment:
   We should not use anonymous class here.
   A preferred way:
   ```
   @SuppressWarnings({"unchecked", "rawtypes"})
   public static org.apache.flink.api.common.state.StateDescriptor transformFromV2ToV1(
           StateDescriptor stateDescriptorV2) {
       switch (stateDescriptorV2.getType()) {
           case VALUE:
               return new org.apache.flink.api.common.state.ValueStateDescriptor(
                       stateDescriptorV2.getStateId(), stateDescriptorV2.getSerializer());
           case MAP:
               //xxx
           case LIST:
               //xxx
           case REDUCING:
               //xxx
           case AGGREGATING:
               //xxx
           default:
               throw new IllegalArgumentException(
                       "Unsupported state type: " + stateDescriptorV2.getType());
       }
   }
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ValueStateV2Wrapper.java:
#

Re: [PR] fixup! [FLINK-35886][task] Fix watermark idleness timeout accounting when subtask is backpressured/blocked [flink]

2024-08-27 Thread via GitHub


pnowojski commented on code in PR #25245:
URL: https://github.com/apache/flink/pull/25245#discussion_r1732604961


##
pom.xml:
##
@@ -2355,8 +2355,6 @@ under the License.

@org.apache.flink.annotation.Experimental

@org.apache.flink.annotation.Internal

-   
-   
org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier$Context


org.apache.flink.util.clock.Clock

Review Comment:
   I'm keeping this incompatibility excluded as is. I think/hope it should be 
fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fixup! [FLINK-35886][task] Fix watermark idleness timeout accounting when subtask is backpressured/blocked [flink]

2024-08-27 Thread via GitHub


pnowojski commented on PR #25245:
URL: https://github.com/apache/flink/pull/25245#issuecomment-2312186890

   Thanks @rkhachatryan for looking into this. Let's try without reverting. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fixup! (1.20 backport) [FLINK-35886][task] Fix watermark idleness timeout accounting when subtask is backpressured/blocked [flink]

2024-08-27 Thread via GitHub


pnowojski commented on PR #25246:
URL: https://github.com/apache/flink/pull/25246#issuecomment-2312192390

   Thanks @rkhachatryan for looking into this. Let's try without reverting.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-36063) Migrate StreamGraph and its related classes to flink-runtime

2024-08-27 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-36063.
---
Fix Version/s: 2.0.0
   Resolution: Fixed

master: 51f1684be0214a1a8c47ffb4ae20b6ebf6f0fc0c

> Migrate StreamGraph and its related classes to flink-runtime
> 
>
> Key: FLINK-36063
> URL: https://issues.apache.org/jira/browse/FLINK-36063
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> In order to allow relevant components in the runtime layer to access the 
> StreamGraph and its related classes, we should migrate StreamGraph and its 
> related classes from flink-streaming-java to flink-runtime.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36063][runtime] Migrate StreamGraph and its related classes from flink-streaming-java to flink-runtime. [flink]

2024-08-27 Thread via GitHub


zhuzhurk closed pull request #25219: [FLINK-36063][runtime] Migrate StreamGraph 
and its related classes from flink-streaming-java to flink-runtime.
URL: https://github.com/apache/flink/pull/25219


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34510][Runtime/State]Rename RestoreMode to RecoveryClaimMode [flink]

2024-08-27 Thread via GitHub


Zakelly commented on code in PR #25192:
URL: https://github.com/apache/flink/pull/25192#discussion_r1732616249


##
flink-runtime-web/src/test/resources/rest_api_v1.snapshot:
##
@@ -4190,4 +4190,4 @@
   }
 }
   } ]
-}

Review Comment:
   Ah, it's better to revert this change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35579] update frocksdb version to v8.10.0 [flink]

2024-08-27 Thread via GitHub


mayuehappy commented on PR #25253:
URL: https://github.com/apache/flink/pull/25253#issuecomment-2312392245

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35709][Table SQL / API] Allow defining a schema in REPLACE TABLE AS (RTAS) [flink]

2024-08-27 Thread via GitHub


spena commented on code in PR #25247:
URL: https://github.com/apache/flink/pull/25247#discussion_r1732752338


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java:
##
@@ -63,9 +79,90 @@ public class MergeTableAsUtil {
     }
 
     /**
-     * Merges the schema part of the {@code sqlCreateTableAs} with the {@code sourceSchema}.
+     * Rewrites the query operation to include only the fields that may be persisted in the sink.
+     */
+    public PlannerQueryOperation maybeRewriteQuery(
+            CatalogManager catalogManager,
+            FlinkPlannerImpl flinkPlanner,
+            PlannerQueryOperation origQueryOperation,
+            SqlNode origQueryNode,
+            ResolvedCatalogTable sinkTable) {
+        FlinkCalciteSqlValidator sqlValidator = flinkPlanner.getOrCreateSqlValidator();
+        SqlRewriterUtils rewriterUtils = new SqlRewriterUtils(sqlValidator);
+        FlinkTypeFactory typeFactory = (FlinkTypeFactory) sqlValidator.getTypeFactory();
+
+        // Only fields that may be persisted will be included in the select query
+        RowType sinkRowType =
+                ((RowType) sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());
+
+        Map sourceFields =
+                IntStream.range(0, origQueryOperation.getResolvedSchema().getColumnNames().size())
+                        .boxed()
+                        .collect(
+                                Collectors.toMap(
+                                        origQueryOperation.getResolvedSchema().getColumnNames()
+                                                ::get,
+                                        Function.identity()));
+
+        // assignedFields contains the new sink fields that are not present in the source
+        // and that will be included in the select query
+        LinkedHashMap assignedFields = new LinkedHashMap<>();
+
+        // targetPositions contains the positions of the source fields that will be
+        // included in the select query
+        List targetPositions = new ArrayList<>();

Review Comment:
   I thought that too, but it doesn't work. The method 
`rewriterUtils.rewriteSelect()` in line 147 doesn't let me pass a List, 
probably because it is Scala code. I even tried the cast `(List) 
targetPositions` and it still doesn't let me call it that way.
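
A small illustration of the interop problem being described; the stand-in parameter type below is an assumption (the real signature presumably compiles down to a Scala collection type such as scala.collection.Seq):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a Scala-compiled parameter type; a java.util.List is simply
// not assignable to it, which is why casting the List does not help.
final class ScalaSeqStandIn<T> {}

public class ScalaInteropSketch {
    static void rewriteSelect(ScalaSeqStandIn<Object> positions) {}

    public static void main(String[] args) {
        List<Integer> targetPositions = new ArrayList<>();
        targetPositions.add(0);
        // rewriteSelect(targetPositions);                   // does not compile
        // rewriteSelect((ScalaSeqStandIn) targetPositions); // inconvertible types
        // A real conversion (e.g. via Scala's Java converters), rather than a
        // cast, would be needed to cross the boundary.
    }
}
```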



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36124][filesystem][s3]Make S3RecoverableFsDataOutputStream.sync not to close the stream [flink]

2024-08-27 Thread via GitHub


gaborgsomogyi commented on code in PR #25231:
URL: https://github.com/apache/flink/pull/25231#discussion_r1732792672


##
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:
##
@@ -129,10 +129,9 @@ public void sync() throws IOException {
 lock();
 try {
 fileStream.flush();
-openNewPartIfNecessary(userDefinedMinPartSize);
+uploadCurrentAndOpenNewPart(fileStream.getPos());

Review Comment:
   Does anybody know where these tests are? It would be good to proceed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36124][filesystem][s3]Make S3RecoverableFsDataOutputStream.sync not to close the stream [flink]

2024-08-27 Thread via GitHub


gaborgsomogyi commented on code in PR #25231:
URL: https://github.com/apache/flink/pull/25231#discussion_r1732814002


##
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java:
##
@@ -129,10 +129,9 @@ public void sync() throws IOException {
 lock();
 try {
 fileStream.flush();
-openNewPartIfNecessary(userDefinedMinPartSize);
+uploadCurrentAndOpenNewPart(fileStream.getPos());

Review Comment:
   @MartijnVisser could you please lend a helping hand here? That said, this is 
working in our internal system, but I'm happy to do the homework and execute the 
`S3 connector test suite`, though I have no idea what we're talking about...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Backport bug fixes from `master` to `release 3.2` branch [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang commented on PR #3566:
URL: https://github.com/apache/flink-cdc/pull/3566#issuecomment-2312518458

   Thanks @yuxiqian for the backport, all CI passed, I'll organize the commits 
and merge it soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36160) Support hive advanced configuration

2024-08-27 Thread zhuanshenbsj1 (Jira)
zhuanshenbsj1 created FLINK-36160:
-

 Summary: Support hive advanced configuration
 Key: FLINK-36160
 URL: https://issues.apache.org/jira/browse/FLINK-36160
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 2.0.0
Reporter: zhuanshenbsj1
 Fix For: 2.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36160] Support hive advanced configuration [flink]

2024-08-27 Thread via GitHub


zhuanshenbsj1 opened a new pull request, #25258:
URL: https://github.com/apache/flink/pull/25258

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36161) Integration Test Example uses deprecated SinkFunction

2024-08-27 Thread Wolff Bock von Wuelfingen (Jira)
Wolff Bock von Wuelfingen created FLINK-36161:
-

 Summary: Integration Test Example uses deprecated SinkFunction
 Key: FLINK-36161
 URL: https://issues.apache.org/jira/browse/FLINK-36161
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Tests
Affects Versions: 1.20.0, 1.20.1
Reporter: Wolff Bock von Wuelfingen


[This 
example|https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/testing/]
 uses SinkFunction, which is deprecated. It should be replaced by something 
else; I'm sure someone who is actually proficient in Flink knows what's best 
for an example.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36160] Support hive advanced configuration [flink]

2024-08-27 Thread via GitHub


flinkbot commented on PR #25258:
URL: https://github.com/apache/flink/pull/25258#issuecomment-2312648935

   
   ## CI report:
   
   * 65a84f3a32e91294c6227f460c6ce97bc0d73868 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35709][Table SQL / API] Allow defining a schema in REPLACE TABLE AS (RTAS) [flink]

2024-08-27 Thread via GitHub


twalthr merged PR #25247:
URL: https://github.com/apache/flink/pull/25247


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35708) Allow defining a schema in REPLACE TABLE AS (RTAS)

2024-08-27 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-35708.

Fix Version/s: 2.0.0
   Resolution: Fixed

Fixed in master: c1fa29ac91aeef64701d8d97b13644a4267e8f8e

> Allow defining a schema in REPLACE TABLE AS (RTAS)
> --
>
> Key: FLINK-35708
> URL: https://issues.apache.org/jira/browse/FLINK-35708
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergio Peña
>Assignee: Sergio Peña
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Allow a schema definition on RTAS statements.
> For following syntax should be allowed:
> {noformat}
> [CREATE OR] REPLACE TABLE table_name [( {  } )]
> WITH (table_properties) AS SELECT query_expression; 
> :  
>   [ [, ...n] ],  
>   [  ],  
>   [ [, ...n] ]{noformat}
> The behavior is similar to the CTAS statement (see 
> https://issues.apache.org/jira/browse/FLINK-35707)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35790] Update docs for new schema definition in CTAS and RTAS [flink]

2024-08-27 Thread via GitHub


twalthr merged PR #25238:
URL: https://github.com/apache/flink/pull/25238


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35790) Update docs for new schema definition in CTAS and RTAS

2024-08-27 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-35790.

Fix Version/s: 2.0.0
 Assignee: Sergio Peña
   Resolution: Fixed

Fixed in master: 8c116def0b514236da1e95ef20e81dc364aa2b71

> Update docs for new schema definition in CTAS and RTAS
> --
>
> Key: FLINK-35790
> URL: https://issues.apache.org/jira/browse/FLINK-35790
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergio Peña
>Assignee: Sergio Peña
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36145] Rename flinkStateSnapshotReference to initialState [flink-kubernetes-operator]

2024-08-27 Thread via GitHub


gyfora opened a new pull request, #873:
URL: https://github.com/apache/flink-kubernetes-operator/pull/873

   ## What is the purpose of the change
   
   Rename JobSpec.flinkStateSnapshotReference to JobSpec.initialState. This 
should make it less verbose and easier to memorize for users. 
   
   The flinkStateSnapshotReference field has not been released officially yet, 
so this change should not affect production users.
   
   ## Brief change log
   
   - Rename in Java, Markdown files and examples
   - Regenerate CRDs
   
   ## Verifying this change
   
   - Unit tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
yes
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? updated
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36145][snapshot] Rename flinkStateSnapshotReference to snapshotReference [flink-kubernetes-operator]

2024-08-27 Thread via GitHub


gyfora commented on PR #870:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/870#issuecomment-2312765821

   I have opened a new PR with the further shortened `initialState` version
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35177] Fix DataGen Connector documentation [flink]

2024-08-27 Thread via GitHub


GOODBOY008 commented on code in PR #24692:
URL: https://github.com/apache/flink/pull/24692#discussion_r1733042779


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java:
##
@@ -76,14 +76,14 @@
  * Long values at the overall source rate (across all source subtasks) of 100 
events per second.

Review Comment:
   ```suggestion
* String values at the overall source rate (across all source subtasks) of 
100 events per second.
   ```



##
docs/content.zh/docs/connectors/datastream/datagen.md:
##
@@ -65,14 +65,13 @@ Rate Limiting
 `Long` values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.

Review Comment:
   ```suggestion
   `String` values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
   ```



##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -65,14 +65,13 @@ Rate Limiting
 `Long` values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.

Review Comment:
   ```suggestion
   `String` values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36145] Rename flinkStateSnapshotReference to initialState [flink-kubernetes-operator]

2024-08-27 Thread via GitHub


gyfora commented on PR #873:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/873#issuecomment-2312807325

   Actually @tweise @mateczagany, I have a slight concern about this 
initialState reference field overall. It may make sense to simply keep the 
current `initialSavepointPath` field, because having a reference-like field may 
allow users to "steal" the savepoint path of other users through the operator's 
permissions.
   
   With the old way, you either know the path or you don't.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36162) Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot jobReference

2024-08-27 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-36162:
--

 Summary: Remove flinkStateSnapshotReference and namespace from 
FlinkStateSnapshot jobReference
 Key: FLINK-36162
 URL: https://issues.apache.org/jira/browse/FLINK-36162
 Project: Flink
  Issue Type: Sub-task
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


I think in the initial version we should remove both the newly introduced 
job.spec.flinkStateSnapshotReference and 
FlinkStateSnapshot.jobReference.namespace fields, as they generally allow users 
to trigger and access savepoint paths from namespaces where the user may not 
have permissions.

Let me give you 2 examples:

jobReference.namespace allows us to trigger a savepoint for a job in a 
different namespace. This works as long as the operator has access, and it 
does not verify that the current user in fact does. This may ultimately allow 
us to trigger a savepoint to a custom place and even steal the state.

In a similar way, the initial flinkStateSnapshot reference would allow us to 
steal a savepoint path that we normally don't know/have access to and store it 
in our resource.

I suggest simply removing these until we have a good way to solve these 
issues; I think there is generally not much use for these fields overall.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36145] Rename flinkStateSnapshotReference to initialState [flink-kubernetes-operator]

2024-08-27 Thread via GitHub


gyfora closed pull request #873: [FLINK-36145] Rename 
flinkStateSnapshotReference to initialState
URL: https://github.com/apache/flink-kubernetes-operator/pull/873


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36145) Change JobSpec.flinkStateSnapshotReference to snapshotReference

2024-08-27 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877085#comment-17877085
 ] 

Gyula Fora commented on FLINK-36145:


I will probably close this in favour of 
https://issues.apache.org/jira/browse/FLINK-36162

 

> Change JobSpec.flinkStateSnapshotReference to snapshotReference
> ---
>
> Key: FLINK-36145
> URL: https://issues.apache.org/jira/browse/FLINK-36145
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.10.0
>
>
> To avoid redundant / verbose naming we should change this field name in the 
> spec before it's released:
> JobSpec.flinkStateSnapshotReference -> JobSpec.snapshotReference or 
> JobSpec.stateSnapshotReference



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36162) Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot jobReference

2024-08-27 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877086#comment-17877086
 ] 

Gyula Fora commented on FLINK-36162:


cc [~thw] 

> Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot 
> jobReference
> -
>
> Key: FLINK-36162
> URL: https://issues.apache.org/jira/browse/FLINK-36162
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
>
> I think in the initial version we should remove both the newly introduced 
> job.spec.flinkStateSnapshotReference and 
> FlinkStateSnapshot.jobReference.namespace fields, as they generally allow 
> users to trigger and access savepoint paths from namespaces where the user 
> may not have permissions.
> Let me give you 2 examples:
> jobReference.namespace allows us to trigger a savepoint for a job in a 
> different namespace. This works as long as the operator has access, and it 
> does not verify that the current user in fact does. This may ultimately 
> allow us to trigger a savepoint to a custom place and even steal the state.
> In a similar way, the initial flinkStateSnapshot reference would allow us to 
> steal a savepoint path that we normally don't know/have access to and store 
> it in our resource.
> I suggest simply removing these until we have a good way to solve these 
> issues; I think there is generally not much use for these fields overall.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36150) tables.exclude is not valid if scan.binlog.newly-added-table.enabled is true.

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-36150:
---
Issue Type: Bug  (was: Improvement)

> tables.exclude is not valid if scan.binlog.newly-added-table.enabled is true.
> -
>
> Key: FLINK-36150
> URL: https://issues.apache.org/jira/browse/FLINK-36150
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, `tables.exclude` is provided for users to exclude some tables, 
> because the tables passed to the DataSource are filtered when 
> MySqlDataSourceFactory creates the DataSource.
> However, when scan.binlog.newly-added-table.enabled is true, new table DDL 
> from the binlog will be read and won't be filtered by `tables.exclude`.
>  
> This Jira aims to pass `tables.exclude` to MySqlSourceConfig and to use it 
> when finding tables in the MySQL database.
>  
>  
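
For illustration, a hypothetical sketch of the fix's idea: keep the exclude patterns in the source config so they can be re-applied when newly added tables are discovered from the binlog, not only at DataSource creation time (names invented, not the PR's code):

```java
import java.util.List;
import java.util.function.Predicate;

final class TableFilterSketch {
    private final Predicate<String> excluded;

    TableFilterSketch(List<String> excludePatterns) {
        // tables.exclude is a list of patterns; a table is kept only if it
        // matches none of them.
        this.excluded = table -> excludePatterns.stream().anyMatch(table::matches);
    }

    boolean accept(String tableId) {
        return !excluded.test(tableId);
    }
}
```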



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36150) tables.exclude is not valid if scan.binlog.newly-added-table.enabled is true.

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-36150:
--

Assignee: Hongshun Wang

> tables.exclude is not valid if scan.binlog.newly-added-table.enabled is true.
> -
>
> Key: FLINK-36150
> URL: https://issues.apache.org/jira/browse/FLINK-36150
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, `tables.exclude` is provided for users to exclude some tables, 
> because the tables passed to the DataSource are filtered when 
> MySqlDataSourceFactory creates the DataSource.
> However, when scan.binlog.newly-added-table.enabled is true, new table DDL 
> from the binlog will be read and won't be filtered by `tables.exclude`.
>  
> This Jira aims to pass `tables.exclude` to MySqlSourceConfig and to use it 
> when finding tables in the MySQL database.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36150][pipeline-connector/mysql] tables.exclude is still valid if scan.binlog.newly-added-table.enabled is true. [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang merged PR #3573:
URL: https://github.com/apache/flink-cdc/pull/3573


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [minor][cdc-runtime] Run schema coordinator logic asynchronously to avoid blocking the main thread [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang merged PR #3577:
URL: https://github.com/apache/flink-cdc/pull/3577


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-36150) tables.exclude is not valid if scan.binlog.newly-added-table.enabled is true.

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36150.

Resolution: Fixed

master: afe9c3c0adb77594565b24c004daa6385d50d5fc

3.2: todo

> tables.exclude is not valid if scan.binlog.newly-added-table.enabled is true.
> -
>
> Key: FLINK-36150
> URL: https://issues.apache.org/jira/browse/FLINK-36150
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, `tables.exclude` is provided for users to exclude some tables, 
> because the tables passed to the DataSource are filtered when 
> MySqlDataSourceFactory creates the DataSource.
> However, when scan.binlog.newly-added-table.enabled is true, new table DDL 
> from the binlog will be read and won't be filtered by `tables.exclude`.
>  
> This Jira aims to pass `tables.exclude` to MySqlSourceConfig and to use it 
> when finding tables in the MySQL database.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Backport bug fixes from `master` to `release 3.2` branch [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang merged PR #3566:
URL: https://github.com/apache/flink-cdc/pull/3566


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35890][cdc] Select only the increment but still enter the SNAPSHOT lead to ckp failed [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang commented on PR #3568:
URL: https://github.com/apache/flink-cdc/pull/3568#issuecomment-2313070670

   Thanks @leosanqing for the contribution, could you also check other 
connectors? I think some Debezium startup modes should not be passed to the 
underlying Debezium engine once the Flink CDC incremental snapshot mode is 
enabled.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36128) The in-flight data will meet unrecoverable issue when restore from checkpoint once old schema has been flushed to downstream systems

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-36128:
---
Fix Version/s: cdc-3.3.0

> The in-flight data will meet unrecoverable issue when restore from checkpoint 
> once old schema has been flushed to downstream systems
> 
>
> Key: FLINK-36128
> URL: https://issues.apache.org/jira/browse/FLINK-36128
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: yux
>Assignee: yux
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.3.0
>
>
> Currently, the default schema evolution mode "EVOLVE" cannot handle 
> exceptions and might not restore from existing state correctly after 
> failover. Until we can rely on the "manually trigger checkpoint" feature 
> introduced in Flink 1.19, making "LENIENT" the default option might be more 
> suitable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36128) The in-flight data will meet unrecoverable issue when restore from checkpoint once old schema has been flushed to downstream systems

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36128.

Resolution: Fixed

master: 2e938a92f5335515f7eb46077254aee25acd9107

release-3.2: debd43cdd75eb53ceed8c2483e0a14edc1b265c6

> The in-flight data will meet unrecoverable issue when restore from checkpoint 
> once old schema has been flushed to downstream systems
> 
>
> Key: FLINK-36128
> URL: https://issues.apache.org/jira/browse/FLINK-36128
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: yux
>Assignee: yux
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, the default schema evolution mode "EVOLVE" cannot handle 
> exceptions and might not restore from existing state correctly after 
> failover. Until we can rely on the "manually trigger checkpoint" feature 
> introduced in Flink 1.19, making "LENIENT" the default option might be more 
> suitable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35056) when initial sqlserver table that's primary key is datetime type, it org.apache.flink.table.api.ValidationException: Timestamp precision must be between 0 and 9 (both

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-35056.

Resolution: Fixed

> when initial sqlserver table that's primary key is datetime type,  it 
> org.apache.flink.table.api.ValidationException: Timestamp precision must be 
> between 0 and 9 (both inclusive)
> --
>
> Key: FLINK-35056
> URL: https://issues.apache.org/jira/browse/FLINK-35056
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: fengfengzai
>Assignee: Sergei Morozov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
> Attachments: sqlserver-bug.png
>
>
> When taking the initial snapshot of a SQL Server table whose primary key is 
> of datetime type, it fails with:
>  org.apache.flink.table.api.ValidationException: Timestamp precision must be 
> between 0 and 9 (both inclusive)
>  
> I found that datetime's column length is 23, which exceeds 9, so it errors.
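> A sketch of the kind of fix involved (an assumption, not necessarily the 
> merged patch): map SQL Server DATETIME to a fixed, valid precision instead 
> of feeding its column display size (23) into the timestamp precision:
> {code:java}
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.types.DataType;
>
> final class SqlServerDatetimeMappingSketch {
>     // DATETIME's JDBC column display size is 23, which is not a valid
>     // timestamp precision; its millisecond resolution fits in TIMESTAMP(3).
>     static DataType mapDatetime() {
>         return DataTypes.TIMESTAMP(3);
>     }
> }
> {code}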



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35056) when initial sqlserver table that's primary key is datetime type, it org.apache.flink.table.api.ValidationException: Timestamp precision must be between 0 and 9

2024-08-27 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876041#comment-17876041
 ] 

Leonard Xu edited comment on FLINK-35056 at 8/27/24 4:56 PM:
-

Fixed in master: 52f2019469cdeb63246317b6cb8c6b825233c27c

release-3.2:daf28ddc99ae5678f50e6f3c3de0d959567d3522


was (Author: leonard xu):
Fixed in master: 52f2019469cdeb63246317b6cb8c6b825233c27c

release-3.2:TODO

> when initial sqlserver table that's primary key is datetime type,  it 
> org.apache.flink.table.api.ValidationException: Timestamp precision must be 
> between 0 and 9 (both inclusive)
> --
>
> Key: FLINK-35056
> URL: https://issues.apache.org/jira/browse/FLINK-35056
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: fengfengzai
>Assignee: Sergei Morozov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
> Attachments: sqlserver-bug.png
>
>
> when initial sqlserver table that's primary key is datetime type.
> it error:
>  org.apache.flink.table.api.ValidationException: Timestamp precision must be 
> between 0 and 9 (both inclusive)
>  
> i find datetime's length is 23. it exceed 9. so it errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36114) Schema registry should block when handling existing requests

2024-08-27 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875816#comment-17875816
 ] 

Leonard Xu edited comment on FLINK-36114 at 8/27/24 4:58 PM:
-

Fixed in master: ee843e2f246c300ecfeaf5cfd529693fadf2625f

release-3.2: 6dd1c11fe13f29fcaf202c35234040f44383e3bf


was (Author: leonard xu):
Fixed in master: ee843e2f246c300ecfeaf5cfd529693fadf2625f

> Schema registry should block when handling existing requests
> 
>
> Key: FLINK-36114
> URL: https://issues.apache.org/jira/browse/FLINK-36114
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: yux
>Assignee: yux
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, SchemaRegistry asynchronously receives schema change requests from 
> SchemaOperator, and the results of multiple requests might get mixed up, 
> causing an incorrect logic flow in multi-parallelism cases.
> Changing SchemaRegistry's behavior to accept requests serially should 
> resolve this problem.
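> A minimal sketch of the serial-handling idea (class and method names are 
> assumptions, not the actual patch):
> {code:java}
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.function.Supplier;
>
> final class SerialRequestHandlerSketch {
>     // All requests run on one thread, strictly in submission order, so
>     // responses to different parallel subtasks can no longer interleave.
>     private final ExecutorService serialExecutor =
>             Executors.newSingleThreadExecutor();
>
>     <T> CompletableFuture<T> submit(Supplier<T> request) {
>         return CompletableFuture.supplyAsync(request, serialExecutor);
>     }
> }
> {code}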



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36092) Transform doesn't fully support with schema evolution

2024-08-27 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875826#comment-17875826
 ] 

Leonard Xu edited comment on FLINK-36092 at 8/27/24 4:57 PM:
-

Fixed in master:3837887819021638a7b82018887d0dd146a4b4de

release-3.2: dacbe7c34a4f310fdfaf0abd38f7efb0adc74fde


was (Author: leonard xu):
Fixed in master:3837887819021638a7b82018887d0dd146a4b4de

> Transform doesn't fully support with schema evolution
> -
>
> Key: FLINK-36092
> URL: https://issues.apache.org/jira/browse/FLINK-36092
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: yux
>Assignee: yux
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, transformed tables do not fully support upstream schema 
> evolution.
> More test cases are needed to verify that add / alterType / rename / drop 
> column events work with both wildcard and non-wildcard matchers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36081) Flink CDC MySQL source connector missing some columns data of newly added tables

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-36081:
---
Fix Version/s: cdc-3.2.0

> Flink CDC MySQL source connector missing some columns data of newly added 
> tables
> 
>
> Key: FLINK-36081
> URL: https://issues.apache.org/jira/browse/FLINK-36081
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
> Environment: jdk 11
> flink 1.17
> flinkcdc 3.0.0
>Reporter: Mingya Wang
>Assignee: Mingya Wang
>Priority: Major
>  Labels: mysql-cdc-connector, pull-request-available
> Fix For: cdc-3.2.0, cdc-3.3.0
>
>
> *Problem Description:*
> When adding a new table, the Flink CDC MySQL source connector experiences 
> missing data for some columns of the newly added table.
> *Reproduction Scenario:*
>  # Remove a table from a cdc job that is running normally, then start the job 
> with resume functionality.
>  # Perform a column addition operation on the removed table.
>  # Add the table back to the job. The job continues to run without 
> interruption upon table addition, but data for the newly added columns is 
> missing in the synchronized data.
> *Cause Analysis:*
> The issue arises because the MySQL CDC Source maintains the table schema in 
> state. When adding a new table, it recovers the schema from the previous 
> state. Since the prior schema exists and represents the structure before the 
> column addition, the MySQL CDC Source provides the downstream with data based 
> on the schema cached in the state. Consequently, records outputted to 
> downstream systems are missing the fields corresponding to the newly added 
> columns.
> *Proposed Solution:*
> Upon removing a table from the cdc job, it is necessary to also 
> correspondingly remove the table from the MySQLBinlogSplit.
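> For illustration, a hypothetical sketch of that pruning (names assumed; the 
> real schema state lives in the MySqlBinlogSplit):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Set;
>
> final class BinlogSplitStateSketch<SchemaT> {
>     private final Map<String, SchemaT> tableSchemas = new HashMap<>();
>
>     // When the captured-table list shrinks, drop the stale cached schemas
>     // so a re-added table is re-discovered with its current columns.
>     void pruneRemovedTables(Set<String> currentlyCapturedTables) {
>         tableSchemas.keySet()
>                 .removeIf(id -> !currentlyCapturedTables.contains(id));
>     }
> }
> {code}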



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36094) CDC SchemaRegistryRequestHandler should throw exception which is not SchemaEvolveException

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36094.

Resolution: Fixed

fixed master: 6205a5a0f16d2cf72ff751573351c4e15ea59efb

release-3.2: 5a80757443ecc816ff6db0cb7618aaa4d4bce9b7

> CDC SchemaRegistryRequestHandler should throw exception which is not 
> SchemaEvolveException
> --
>
> Key: FLINK-36094
> URL: https://issues.apache.org/jira/browse/FLINK-36094
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, SchemaRegistryRequestHandler only throws 
> SchemaEvolveException and does not handle other failures (like network 
> errors, OOM, and so on).
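> A minimal sketch of the desired behavior, with a local stand-in for the 
> exception type:
> {code:java}
> final class RequestHandlerSketch {
>     void handle(Runnable applySchemaChange) {
>         try {
>             applySchemaChange.run();
>         } catch (SchemaEvolveException e) {
>             // only evolution failures are handled according to the
>             // configured schema change behavior...
>         }
>         // ...anything else (network errors, OOM wrappers, ...) propagates
>         // and fails the job fast instead of being silently swallowed.
>     }
>
>     // local stand-in for the real exception type referenced by this issue
>     static class SchemaEvolveException extends RuntimeException {}
> }
> {code}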



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36081) Flink CDC MySQL source connector missing some columns data of newly added tables

2024-08-27 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875326#comment-17875326
 ] 

Leonard Xu edited comment on FLINK-36081 at 8/27/24 5:00 PM:
-

Fixed in master(3.3-SNAPSHOT): 77c63385d947f3bb8e726561a7f01cd383941a96

release-3.2: 39afc4fc891abcb937fa0517e651caad34df196d


was (Author: ruanhang1993):
Fixed in master(3.3-SNAPSHOT): 77c63385d947f3bb8e726561a7f01cd383941a96

> Flink CDC MySQL source connector missing some columns data of newly added 
> tables
> 
>
> Key: FLINK-36081
> URL: https://issues.apache.org/jira/browse/FLINK-36081
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
> Environment: jdk 11
> flink 1.17
> flinkcdc 3.0.0
>Reporter: Mingya Wang
>Assignee: Mingya Wang
>Priority: Major
>  Labels: mysql-cdc-connector, pull-request-available
> Fix For: cdc-3.3.0
>
>
> *Problem Description:*
> When adding a new table, the Flink CDC MySQL source connector experiences 
> missing data for some columns of the newly added table.
> *Reproduction Scenario:*
>  # Remove a table from a cdc job that is running normally, then start the job 
> with resume functionality.
>  # Perform a column addition operation on the removed table.
>  # Add the table back to the job. The job continues to run without 
> interruption upon table addition, but data for the newly added columns is 
> missing in the synchronized data.
> *Cause Analysis:*
> The issue arises because the MySQL CDC Source maintains the table schema in 
> state. When adding a new table, it recovers the schema from the previous 
> state. Since the prior schema exists and represents the structure before the 
> column addition, the MySQL CDC Source provides the downstream with data based 
> on the schema cached in the state. Consequently, records outputted to 
> downstream systems are missing the fields corresponding to the newly added 
> columns.
> *Proposed Solution:*
> Upon removing a table from the cdc job, it is necessary to also 
> correspondingly remove the table from the MySQLBinlogSplit.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36088) Fix NPE in PaimonDataSink.

2024-08-27 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17874938#comment-17874938
 ] 

Leonard Xu edited comment on FLINK-36088 at 8/27/24 5:01 PM:
-

master: 7f08c6caba082915b9684ea384363f1df85dab8d

release-3.2:56af7513db3eeb817c7e27e27bd8a9063561


was (Author: leonard xu):
master: 7f08c6caba082915b9684ea384363f1df85dab8d

release-3.2:TODO

> Fix NPE in PaimonDataSink.
> --
>
> Key: FLINK-36088
> URL: https://issues.apache.org/jira/browse/FLINK-36088
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Assignee: LvYanquan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Fix NoPointException in BucketAssignOperator when job restarted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36076) Hotfix: Set isSchemaChangeApplying as volatile for thread safe.

2024-08-27 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17874940#comment-17874940
 ] 

Leonard Xu edited comment on FLINK-36076 at 8/27/24 5:01 PM:
-

master: d3473de4db92229eb09e1e4c698a2db5448c8805

3.2: 9f358ab1d7c2693d475560dd02d62a75ebe659ef


was (Author: leonard xu):
master: d3473de4db92229eb09e1e4c698a2db5448c8805

3.2: TODO

> Hotfix: Set isSchemaChangeApplying as volatile for thread safe.
> ---
>
> Key: FLINK-36076
> URL: https://issues.apache.org/jira/browse/FLINK-36076
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, we apply schema changes asynchronously.
> The SchemaChangeThreadPool sets isSchemaChangeApplying to true when applying 
> a schema change event, and the main thread, which handles operator events, 
> checks isSchemaChangeApplying.
>  
> For thread safety, this field should be volatile.
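> A minimal sketch of the flag handoff described above (names assumed):
> {code:java}
> class SchemaChangeStatusSketch {
>     // volatile guarantees the main thread sees updates made by the
>     // schema-change thread; the issue is visibility, not atomicity.
>     private volatile boolean isSchemaChangeApplying = false;
>
>     void applySchemaChange(Runnable change) { // runs on the thread pool
>         isSchemaChangeApplying = true;
>         try {
>             change.run();
>         } finally {
>             isSchemaChangeApplying = false;
>         }
>     }
>
>     boolean isApplying() { // polled by the main thread
>         return isSchemaChangeApplying;
>     }
> }
> {code}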



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36088) Fix NPE in PaimonDataSink.

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36088.

Resolution: Fixed

> Fix NPE in PaimonDataSink.
> --
>
> Key: FLINK-36088
> URL: https://issues.apache.org/jira/browse/FLINK-36088
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Assignee: LvYanquan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Fix NoPointException in BucketAssignOperator when job restarted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36076) Hotfix: Set isSchemaChangeApplying as volatile for thread safe.

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36076.

Resolution: Fixed

> Hotfix: Set isSchemaChangeApplying as volatile for thread safe.
> ---
>
> Key: FLINK-36076
> URL: https://issues.apache.org/jira/browse/FLINK-36076
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, we apply schema changes asynchronously.
> The SchemaChangeThreadPool sets isSchemaChangeApplying to true when applying 
> a schema change event, and the main thread, which handles operator events, 
> checks isSchemaChangeApplying.
>  
> For thread safety, this field should be volatile.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36082) Fix NotSerializableException for KafkaDataSink

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36082.

Resolution: Fixed

> Fix NotSerializableException for KafkaDataSink
> --
>
> Key: FLINK-36082
> URL: https://issues.apache.org/jira/browse/FLINK-36082
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Assignee: LvYanquan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> When I submitted a job to sync data from MySQL to Kafka using yaml, I met the 
> following exception:
> {code:java}
> Caused by: java.io.NotSerializableException: 
> org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSink
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> ~[?:1.8.0_372]
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
> ~[?:1.8.0_372] {code}
> and when I used CDC version 3.1, it didn't happen. 
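> For illustration only (not the actual KafkaDataSink code), this is the usual 
> shape of such failures: a Serializable class holding a non-serializable 
> field, typically fixed by marking the field transient and re-creating it at 
> runtime:
> {code:java}
> import java.io.Serializable;
>
> class NotSerializableDependency {} // does not implement Serializable
>
> class SketchDataSink implements Serializable {
>     // Serializing SketchDataSink would fail with NotSerializableException
>     // if this field were not transient (rebuild it in open()/at runtime).
>     private transient NotSerializableDependency dependency;
> }
> {code}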



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36082) Fix NotSerializableException for KafkaDataSink

2024-08-27 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17874876#comment-17874876
 ] 

Leonard Xu edited comment on FLINK-36082 at 8/27/24 5:02 PM:
-

master: 0470fdbbc13a4281083488274e891a74cc8e81a8

release-3.2: 74f58cd31757970c6d784aa7982275cd18583b55


was (Author: leonard xu):
master: 0470fdbbc13a4281083488274e891a74cc8e81a8

release-3.2: TODO

> Fix NotSerializableException for KafkaDataSink
> --
>
> Key: FLINK-36082
> URL: https://issues.apache.org/jira/browse/FLINK-36082
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: LvYanquan
>Assignee: LvYanquan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> When I submitted a job to sync data from MySQL to Kafka using yaml, I met the 
> following exception:
> {code:java}
> Caused by: java.io.NotSerializableException: 
> org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSink
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> ~[?:1.8.0_372]
>   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> ~[?:1.8.0_372]
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
> ~[?:1.8.0_372] {code}
> and when I used CDC version 3.1, it didn't happen. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36056) change flink.connector.elasticsearch.version to a release version in branch release-3.2

2024-08-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-36056.

Resolution: Fixed

fixed in master(3.2-SNAPSHOT): ab64eb3b574ebbeb81cf6b6500acbd211eea39ab

> change flink.connector.elasticsearch.version to a release version in branch 
> release-3.2
> ---
>
> Key: FLINK-36056
> URL: https://issues.apache.org/jira/browse/FLINK-36056
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: wuzexian
>Assignee: wuzexian
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, the ES connector version is incorrectly set to a SNAPSHOT 
> version; we should change flink.connector.elasticsearch.version to a release 
> version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [cdc-connector][sqlserver]Flink cdc pipeline support sqlserver source [flink-cdc]

2024-08-27 Thread via GitHub


zakhalex commented on PR #3445:
URL: https://github.com/apache/flink-cdc/pull/3445#issuecomment-2313494810

   @ChengJie1053 - do you have the privilege to approve the workflows?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36162) Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot jobReference

2024-08-27 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877194#comment-17877194
 ] 

Thomas Weise commented on FLINK-36162:
--

[~gyfora] I was surprised when you closed your PR but now it makes sense :) 
Agreed that it is better to remove these fields, they aren't solid enough and 
potentially hard to straighten out in the future. They also don't solve 
anything beyond what initialSavepointPath already covers.

> Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot 
> jobReference
> -
>
> Key: FLINK-36162
> URL: https://issues.apache.org/jira/browse/FLINK-36162
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
>
> I think in the initial version we should remove both the newly introduced 
> job.spec.flinkStateSnapshotReference and 
> FlinkStateSnapshot.jobReference.namespace fields, as they generally allow 
> users to trigger and access savepoint paths from namespaces where the user 
> may not have permissions.
> Let me give you 2 examples:
> jobReference.namespace allows us to trigger a savepoint for a job in a 
> different namespace. This works as long as the operator has access to that 
> namespace, and the operator does not verify that the current user in fact 
> does. This may ultimately allow us to trigger a savepoint to a custom place 
> and even steal the state.
> In a similar way, the initial flinkStateSnapshot reference would allow us to 
> steal a savepoint path that we normally don't know or have access to and 
> store it in our resource.
> I suggest simply removing these until we have a good way to solve these 
> issues; I think there is generally not much use for these fields overall.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-08-27 Thread via GitHub


klam-shop commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1733569499


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java:
##
@@ -188,6 +188,120 @@ public void testKafkaSourceSink() throws Exception {
 deleteTestTopic(topic);
 }
 
+@Test
+public void testKafkaSourceSinkWithTopicList() throws Exception {
+// we always use a different topic name for each parameterized topic,
+// in order to make sure the topic can be created.
+final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
+final String topic2 = "topics2_" + format + "_" + UUID.randomUUID();
+createTestTopic(topic2, 1, 1);
+createTestTopic(topic1, 1, 1);
+
+// -- Produce an event time stream into Kafka 
---
+String groupId = getStandardProps().getProperty("group.id");
+String bootstraps = getBootstrapServers();
+final String createTable =
+String.format(
+"CREATE TABLE kafka (\n"
++ "  `topic` STRING METADATA,\n"
++ "  `user_id` INT,\n"
++ "  `item_id` INT,\n"
++ "  `behavior` STRING\n"
++ ") WITH (\n"
++ "  'connector' = '%s',\n"
++ "  'topic' = '%s;%s',\n"
++ "  'properties.bootstrap.servers' = '%s',\n"
++ "  'properties.group.id' = '%s',\n"
++ "  'scan.startup.mode' = 
'earliest-offset',\n"
++ "  %s\n"
++ ")\n",
+KafkaDynamicTableFactory.IDENTIFIER,
+topic1,
+topic2,
+bootstraps,
+groupId,
+formatOptions());
+
+tEnv.executeSql(createTable);
+
+List values =
+Arrays.asList(
+Row.of(topic1, 1, 1102, "behavior 1"),
+Row.of(topic2, 2, 1103, "behavior 2"));
+tEnv.fromValues(values).insertInto("kafka").execute().await();
+
+// -- Consume stream from Kafka ---
+
+List results = collectAllRows(tEnv.sqlQuery("SELECT * from 
kafka"));
+
+assertThat(results)
+.containsExactly(
+Row.of(topic1, 1, 1102, "behavior 1"),
+Row.of(topic2, 2, 1103, "behavior 2"));

Review Comment:
   Fair points, added assertions to check the specific topics and also use 
`containsExactlyInAnyOrder`
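   
   For reference, a sketch of that assertion style (AssertJ; variable names 
as in the test above):
   ```java
   // assumes: import static org.assertj.core.api.Assertions.assertThat;
   assertThat(results)
           .containsExactlyInAnyOrder(
                   Row.of(topic1, 1, 1102, "behavior 1"),
                   Row.of(topic2, 2, 1103, "behavior 2"));
   ```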



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-08-27 Thread via GitHub


klam-shop commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1733569499


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java:
##
@@ -188,6 +188,120 @@ public void testKafkaSourceSink() throws Exception {
 deleteTestTopic(topic);
 }
 
+@Test
+public void testKafkaSourceSinkWithTopicList() throws Exception {
+// we always use a different topic name for each parameterized topic,
+// in order to make sure the topic can be created.
+final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
+final String topic2 = "topics2_" + format + "_" + UUID.randomUUID();
+createTestTopic(topic2, 1, 1);
+createTestTopic(topic1, 1, 1);
+
+// -- Produce an event time stream into Kafka 
---
+String groupId = getStandardProps().getProperty("group.id");
+String bootstraps = getBootstrapServers();
+final String createTable =
+String.format(
+"CREATE TABLE kafka (\n"
++ "  `topic` STRING METADATA,\n"
++ "  `user_id` INT,\n"
++ "  `item_id` INT,\n"
++ "  `behavior` STRING\n"
++ ") WITH (\n"
++ "  'connector' = '%s',\n"
++ "  'topic' = '%s;%s',\n"
++ "  'properties.bootstrap.servers' = '%s',\n"
++ "  'properties.group.id' = '%s',\n"
++ "  'scan.startup.mode' = 
'earliest-offset',\n"
++ "  %s\n"
++ ")\n",
+KafkaDynamicTableFactory.IDENTIFIER,
+topic1,
+topic2,
+bootstraps,
+groupId,
+formatOptions());
+
+tEnv.executeSql(createTable);
+
+List values =
+Arrays.asList(
+Row.of(topic1, 1, 1102, "behavior 1"),
+Row.of(topic2, 2, 1103, "behavior 2"));
+tEnv.fromValues(values).insertInto("kafka").execute().await();
+
+// -- Consume stream from Kafka ---
+
+List results = collectAllRows(tEnv.sqlQuery("SELECT * from 
kafka"));
+
+assertThat(results)
+.containsExactly(
+Row.of(topic1, 1, 1102, "behavior 1"),
+Row.of(topic2, 2, 1103, "behavior 2"));

Review Comment:
   Good points, added assertions to check the specific topics and also use 
`containsExactlyInAnyOrder`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [FLINK-XXXX] Generalized watermark wip [flink]

2024-08-27 Thread via GitHub


jeyhunkarimov commented on PR #24997:
URL: https://github.com/apache/flink/pull/24997#issuecomment-2313665000

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35718][mysql] cherry pick DBZ-5333 from debezium MySqlErrorHandler and add logs. [flink-cdc]

2024-08-27 Thread via GitHub


github-actions[bot] commented on PR #3440:
URL: https://github.com/apache/flink-cdc/pull/3440#issuecomment-2313776551

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35718) Cherrypick DBZ-5333 to fix frequently failover by EOFException.

2024-08-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35718:
---
Labels: pull-request-available  (was: )

> Cherrypick DBZ-5333 to fix frequently failover by EOFException.
> ---
>
> Key: FLINK-35718
> URL: https://issues.apache.org/jira/browse/FLINK-35718
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: LvYanquan
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> This EOFException issue happens occasionally, and Debezium provides a 
> retry mechanism to avoid frequent failovers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]

2024-08-27 Thread via GitHub


github-actions[bot] commented on PR #3286:
URL: https://github.com/apache/flink-cdc/pull/3286#issuecomment-2313776592

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [e2e][tests] Improve the stability of pipeline e2e tests [flink-cdc]

2024-08-27 Thread via GitHub


yuxiqian commented on code in PR #3578:
URL: https://github.com/apache/flink-cdc/pull/3578#discussion_r1733730877


##
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java:
##
@@ -719,6 +718,11 @@ public void testMergeTableRouteWithTransform() throws 
Exception {
 
 @Test
 public void testReplacementSymbol() throws Exception {
+String defaultFlinkProperties = getFlinkProperties(flinkVersion);
+overrideFlinkProperties(
+defaultFlinkProperties.replace(
+"execution.checkpointing.interval: 300",
+"execution.checkpointing.interval: 1"));

Review Comment:
   I've tried to run this locally and `overrideFlinkProperties` seems not to 
work, since this environment variable only takes effect once, when the 
container is started in the `before()` hook function.
   
   (screenshot: 
https://github.com/user-attachments/assets/c2402109-451f-49be-859f-493bf2e31a49)
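   
   A tiny sketch of why (testcontainers semantics; image name and property 
are illustrative):
   ```java
   import org.testcontainers.containers.GenericContainer;
   import org.testcontainers.utility.DockerImageName;
   
   public class EnvCaptureSketch {
       public static void main(String[] args) {
           try (GenericContainer<?> jobManager =
                   new GenericContainer<>(DockerImageName.parse("flink:1.19-scala_2.12"))
                           .withEnv("FLINK_PROPERTIES",
                                   "execution.checkpointing.interval: 1")) {
               jobManager.start();
               // The environment is captured at start(); overriding it
               // afterwards has no effect until the container is recreated.
           }
       }
   }
   ```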
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-connector][sqlserver]Flink cdc pipeline support sqlserver source [flink-cdc]

2024-08-27 Thread via GitHub


ChengJie1053 commented on PR #3445:
URL: https://github.com/apache/flink-cdc/pull/3445#issuecomment-2313885858

   > @ChengJie1053 - do you have the privilege to approve the workflows?
   
   I'm sorry. I don't have that clearance
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35917) Failed to deserialize data of EventHeaderV4

2024-08-27 Thread wangkang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangkang updated FLINK-35917:
-
Description: Caused by: java.lang.RuntimeException: One or more fetchers 
have encountered exception at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:229)
 at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:173)
 at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:134)
 at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at 
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: 
SplitFetcher thread 0 received unexpected exception while polling the records 
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:154)
 at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:109)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
... 1 more Caused by: 
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped. at 
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) at 
org.apache.flink.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:86)
 at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1545)
 at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
 at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
 at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932) 
... 1 more Caused by: io.debezium.DebeziumException: Failed to deserialize data 
of EventHeaderV4\{timestamp=1722030104000, eventType=EXT_WRITE_ROWS, 
serverId=140168246, headerLength=19, dataLength=189, nextPosition=34794428, 
flags=0} at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1489)
 ... 5 more Caused by: 
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
 Failed to deserialize data of EventHeaderV4\{timestamp=1722030104000, 
eventType=EXT_WRITE_ROWS, serverId=140168246, headerLength=19, dataLength=189, 
nextPosition=34794428, flags=0} at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:341)
 at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:244)
 at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
 at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051)
 ... 3 more Caused by: java.io.EOFException: Failed to read remaining 8 of 18 
bytes from position 108151391. Block length: 51. Initial block length: 185. at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.fill(ByteArrayInputStream.java:115)
 at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:105)
 at 
io.debezium.connector.mysql.RowDeserializers.deserializeVarString(RowDeserializers.java:264)
 at 
io.debezium.connector.mysql.RowDeserializers$WriteRowsDeserializer.deserializeVarString(RowDeserializers.java:192)
 at 
com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataDeserializer.java:189)
 at 
com.github.shyiko.mysql.binlog.event.deser

Re: [PR] [e2e][tests] Improve the stability of pipeline e2e tests [flink-cdc]

2024-08-27 Thread via GitHub


leonardBang commented on code in PR #3578:
URL: https://github.com/apache/flink-cdc/pull/3578#discussion_r1733762640


##
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java:
##
@@ -719,6 +718,11 @@ public void testMergeTableRouteWithTransform() throws 
Exception {
 
 @Test
 public void testReplacementSymbol() throws Exception {
+String defaultFlinkProperties = getFlinkProperties(flinkVersion);
+overrideFlinkProperties(
+defaultFlinkProperties.replace(
+"execution.checkpointing.interval: 300",
+"execution.checkpointing.interval: 1"));

Review Comment:
   good catch, looks like we use overrideFlinkProperties wrongly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35917) Failed to deserialize data of EventHeaderV4

2024-08-27 Thread wangkang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangkang updated FLINK-35917:
-
Description: 
it occurred during the full to incremental phase:

Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
exception at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:229)
 at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:173)
 at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:134)
 at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at 
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: 
SplitFetcher thread 0 received unexpected exception while polling the records 
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:154)
 at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:109)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
... 1 more Caused by: 
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped. at 
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) at 
org.apache.flink.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:86)
 at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1545)
 at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
 at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
 at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932) 
... 1 more Caused by: io.debezium.DebeziumException: Failed to deserialize data 
of EventHeaderV4\{timestamp=1722030104000, eventType=EXT_WRITE_ROWS, 
serverId=140168246, headerLength=19, dataLength=189, nextPosition=34794428, 
flags=0} at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1489)
 ... 5 more Caused by: 
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
 Failed to deserialize data of EventHeaderV4\{timestamp=1722030104000, 
eventType=EXT_WRITE_ROWS, serverId=140168246, headerLength=19, dataLength=189, 
nextPosition=34794428, flags=0} at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:341)
 at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:244)
 at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
 at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051)
 ... 3 more Caused by: java.io.EOFException: Failed to read remaining 8 of 18 
bytes from position 108151391. Block length: 51. Initial block length: 185. at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.fill(ByteArrayInputStream.java:115)
 at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:105)
 at 
io.debezium.connector.mysql.RowDeserializers.deserializeVarString(RowDeserializers.java:264)
 at 
io.debezium.connector.mysql.RowDeserializers$WriteRowsDeserializer.deserializeVarString(RowDeserializers.java:192)
 at 
com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataDeserializer.java:

Re: [PR] [FLINK-35599][cdc-connector][jdbc-mysql]Flink cdc pipeline sink jdbc mysql [flink-cdc]

2024-08-27 Thread via GitHub


cobolbaby commented on PR #3433:
URL: https://github.com/apache/flink-cdc/pull/3433#issuecomment-2313956284

   Does it only support jdbc-mysql? Can the jdbc be extended to a general sink 
connector?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35599) Add JDBC Sink Plugin to Flink-CDC-Pipeline

2024-08-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35599:
---
Labels: pull-request-available  (was: )

> Add JDBC Sink Plugin to Flink-CDC-Pipeline
> --
>
> Key: FLINK-35599
> URL: https://issues.apache.org/jira/browse/FLINK-35599
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: JerryZhou
>Assignee: JerryZhou
>Priority: Minor
>  Labels: pull-request-available
>
> TODO



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35177] Fix DataGen Connector documentation [flink]

2024-08-27 Thread via GitHub


GOODBOY008 merged PR #24692:
URL: https://github.com/apache/flink/pull/24692


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35177] Fix DataGen Connector documentation [flink]

2024-08-27 Thread via GitHub


GOODBOY008 commented on PR #24692:
URL: https://github.com/apache/flink/pull/24692#issuecomment-2314045037

   @morozov LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35177) Datagen examples in documentation do not compile

2024-08-27 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong resolved FLINK-35177.
-
Fix Version/s: 2.0.0
   Resolution: Fixed

Merged to branch master via bc3afba5cb97265c4833818563f61b3fb6b48b6a.

> Datagen examples in documentation do not compile
> 
>
> Key: FLINK-35177
> URL: https://issues.apache.org/jira/browse/FLINK-35177
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.19.0
>Reporter: Sergei Morozov
>Assignee: Sergei Morozov
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Currently, the 
> [examples|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/#rate-limiting]
>  look like this:
> {code:java}
> GeneratorFunction<Long, Long> generatorFunction = index -> index;
> double recordsPerSecond = 100;
> DataGeneratorSource<String> source =
>         new DataGeneratorSource<>(
>                 generatorFunction,
>                 Long.MAX_VALUE,
>                 RateLimiterStrategy.perSecond(recordsPerSecond),
>                 Types.STRING);
> {code}
> The generator function returns Long but the DataGeneratorSource uses String, 
> so their types do not match.
> Either the generator function needs to be changed to return a string, or the 
> source needs to use Long.
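> For reference, a corrected variant taking the string-returning option (a 
> sketch, not necessarily the exact wording of the merged docs fix):
> {code:java}
> GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
> double recordsPerSecond = 100;
> DataGeneratorSource<String> source =
>         new DataGeneratorSource<>(
>                 generatorFunction,
>                 Long.MAX_VALUE,
>                 RateLimiterStrategy.perSecond(recordsPerSecond),
>                 Types.STRING);
> {code}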



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35177] Fix DataGen Connector documentation [flink]

2024-08-27 Thread via GitHub


GOODBOY008 commented on PR #24692:
URL: https://github.com/apache/flink/pull/24692#issuecomment-2314102983

   @morozov Can you open another pr to branch `release-1.20` ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36163) Kafka cdc pipeline source supports protobuf format.

2024-08-27 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36163:
-

 Summary: Kafka cdc pipeline source supports protobuf format.
 Key: FLINK-36163
 URL: https://issues.apache.org/jira/browse/FLINK-36163
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Hongshun Wang
 Fix For: cdc-3.3.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35579] update frocksdb version to v8.10.0 [flink]

2024-08-27 Thread via GitHub


mayuehappy commented on PR #25253:
URL: https://github.com/apache/flink/pull/25253#issuecomment-2314121275

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36164) JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table

2024-08-27 Thread tchivs (Jira)
tchivs created FLINK-36164:
--

 Summary: JdbcIncrementalSource CheckPoint Timeout Due to 
Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned 
PostgreSQL Table
 Key: FLINK-36164
 URL: https://issues.apache.org/jira/browse/FLINK-36164
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: tchivs


When synchronizing a PostgreSQL table using a connector, if the table contains 
a large number of partitions, the checkpoint always fails. By tracing the 
source code, it was found that the PostgresDialect's queryTableSchema queries 
the schema of each table. This schema querying during each checkpoint causes 
connection timeouts.
 
 

 

tableName="(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)"

```
JdbcIncrementalSource incrSource =
        PostgresSourceBuilder.PostgresIncrementalSource.builder()
                .hostname(hostname)
                .port(port)
                .database(databaseName)
                .schemaList(schemaName)
                .tableList(tableName)
                .username(username)
                .password(password)
                .deserializer(schema)
                .slotName(slotName)
                .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
                .includeSchemaChanges(true)
                .debeziumProperties(debeziumProperties)
                .startupOptions(startupOptions)
                .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                .connectTimeout(config.get(CONNECT_TIMEOUT))
                .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                .distributionFactorUpper(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                .distributionFactorLower(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                .build();
return env.fromSource(
        incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
```
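
A hypothetical mitigation sketch (not from the actual codebase): memoize
per-table schemas so checkpoint-time lookups don't query the database once
per partition.

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class SchemaCacheSketch<S> {
    private final Map<String, S> cache = new ConcurrentHashMap<>();
    private final Function<String, S> loader; // e.g. the dialect's queryTableSchema

    SchemaCacheSketch(Function<String, S> loader) {
        this.loader = loader;
    }

    S schemaFor(String tableId) {
        // computeIfAbsent queries the schema at most once per table id
        return cache.computeIfAbsent(tableId, loader);
    }
}
```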

 

 


Checkpoint exception:

```
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold. The latest checkpoint failed due to Checkpoint expired 
before completing., view the Checkpoint History tab or the Job Manager log to 
find out why continuous checkpoints failed.
```
![image](https://github.com/user-attachments/assets/183f44fd-0ab1-4302-9f13-ed5242c43636)


error log:

```logger
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O 
exception (java.net.SocketException) caught when processing request to 
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)

2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping
the embedded en
```
[jira] [Updated] (FLINK-36164) JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table

2024-08-27 Thread tchivs (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-36164:
---
Description: 
When synchronizing a partitioned PostgreSQL table with the connector,
checkpoints consistently fail when the table contains a large number of
partitions. Tracing the source code shows that PostgresDialect's
queryTableSchema queries the schema of every matched table, including each
partition, and repeating these schema queries on every checkpoint causes
connection timeouts.
tableName value is
"(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)"

{code:java}
JdbcIncrementalSource incrSource =
        PostgresSourceBuilder.PostgresIncrementalSource.builder()
                .hostname(hostname)
                .port(port)
                .database(databaseName)
                .schemaList(schemaName)
                .tableList(tableName)
                .username(username)
                .password(password)
                .deserializer(schema)
                .slotName(slotName)
                .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
                .includeSchemaChanges(true)
                .debeziumProperties(debeziumProperties)
                .startupOptions(startupOptions)
                .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                .connectTimeout(config.get(CONNECT_TIMEOUT))
                .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                .distributionFactorUpper(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                .distributionFactorLower(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                .build();
return env.fromSource(
        incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
{code}


Checkpoint exception:

```
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold. The latest checkpoint failed due to Checkpoint expired 
before completing., view the Checkpoint History tab or the Job Manager log to 
find out why continuous checkpoints failed.
```
![image](https://github.com/user-attachments/assets/183f44fd-0ab1-4302-9f13-ed5242c43636)

error log:

```log
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O 
exception (java.net.SocketException) caught when processing request to 
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)

2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - 
Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica
```

[jira] [Updated] (FLINK-36164) JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table

2024-08-27 Thread tchivs (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-36164:
---
Description: 
What's Wrong?
When synchronizing a partitioned PostgreSQL table with the connector,
checkpoints consistently fail when the table contains a large number of
partitions. Tracing the source code shows that PostgresDialect's
queryTableSchema queries the schema of every matched table, including each
partition, and repeating these schema queries on every checkpoint causes
connection timeouts.


Checkpoint exception:

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold. The latest checkpoint failed due to Checkpoint expired 
before completing., view the Checkpoint History tab or the Job Manager log to 
find out why continuous checkpoints failed.
image

error log:

 
{code:java}
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O 
exception (java.net.SocketException) caught when processing request to 
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)
2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - 
Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822) 
[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
 [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the task and engine
2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] - 
Stopping down connector
2024-05-28 15:41:07,376 WARN io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Coordinator didn't stop in the expected time, shutting down executor now
2024-
{code}

[PR] [FLINK-35177] Fix DataGen Connector documentation [flink]

2024-08-27 Thread via GitHub


morozov opened a new pull request, #25259:
URL: https://github.com/apache/flink/pull/25259

   This is a backport of https://github.com/apache/flink/pull/24692.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35177] Fix DataGen Connector documentation [flink]

2024-08-27 Thread via GitHub


morozov commented on PR #24692:
URL: https://github.com/apache/flink/pull/24692#issuecomment-2314127171

   @GOODBOY008, please see https://github.com/apache/flink/pull/25259.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36164) JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table

2024-08-27 Thread tchivs (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-36164:
---
Description: 
What's Wrong?
When synchronizing a partitioned PostgreSQL table with the connector,
checkpoints consistently fail when the table contains a large number of
partitions. Tracing the source code shows that PostgresDialect's
queryTableSchema queries the schema of every matched table, including each
partition, and repeating these schema queries on every checkpoint causes
connection timeouts.
{code:java}
JdbcIncrementalSource incrSource =
        PostgresSourceBuilder.PostgresIncrementalSource.builder()
                .hostname(hostname)
                .port(port)
                .database(databaseName)
                .schemaList(schemaName)
                .tableList(tableName)
                .username(username)
                .password(password)
                .deserializer(schema)
                .slotName(slotName)
                .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
                .includeSchemaChanges(true)
                .debeziumProperties(debeziumProperties)
                .startupOptions(startupOptions)
                .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                .connectTimeout(config.get(CONNECT_TIMEOUT))
                .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                .distributionFactorUpper(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                .distributionFactorLower(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                .build();
{code}
 

Checkpoint exception:
{quote}org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
tolerable failure threshold. The latest checkpoint failed due to Checkpoint 
expired before completing., view the Checkpoint History tab or the Job Manager 
log to find out why continuous checkpoints failed.
image
{quote}
error log:

 
{code:java}
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O 
exception (java.net.SocketException) caught when processing request to 
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)
2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - 
Reporting error:
java.lang.InterruptedException:
{code}

[jira] [Updated] (FLINK-36164) JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table

2024-08-27 Thread tchivs (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-36164:
---
Attachment: image-2024-08-28-11-28-33-016.png

> JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All 
> Subpartitions When Synchronizing a Partitioned PostgreSQL Table
> --
>
> Key: FLINK-36164
> URL: https://issues.apache.org/jira/browse/FLINK-36164
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: tchivs
>Priority: Major
> Attachments: image-2024-08-28-11-28-21-496.png, 
> image-2024-08-28-11-28-33-016.png
>
>
> What's Wrong?
> When synchronizing a partitioned PostgreSQL table with the connector,
> checkpoints consistently fail when the table contains a large number of
> partitions. Tracing the source code shows that PostgresDialect's
> queryTableSchema queries the schema of every matched table, including
> each partition, and repeating these schema queries on every checkpoint
> causes connection timeouts.
> {code:java}
> JdbcIncrementalSource incrSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.builder()
>                 .hostname(hostname)
>                 .port(port)
>                 .database(databaseName)
>                 .schemaList(schemaName)
>                 .tableList(tableName)
>                 .username(username)
>                 .password(password)
>                 .deserializer(schema)
>                 .slotName(slotName)
>                 .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
>                 .includeSchemaChanges(true)
>                 .debeziumProperties(debeziumProperties)
>                 .startupOptions(startupOptions)
>                 .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
>                 .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
>                 .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
>                 .connectTimeout(config.get(CONNECT_TIMEOUT))
>                 .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
>                 .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
>                 .distributionFactorUpper(
>                         config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
>                 .distributionFactorLower(
>                         config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
>                 .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
>                 .build();
> {code}
>  
> Checkpoint exception:
> {quote}org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
> tolerable failure threshold. The latest checkpoint failed due to Checkpoint 
> expired before completing., view the Checkpoint History tab or the Job 
> Manager log to find out why continuous checkpoints failed.
> image
> {quote}
> error log:
>  
> {code:java}
> 2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
> Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
> (dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
> 2024-05-28 15:39:07,370 INFO 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state CANCELED to JobManager for task fkdb: 
> Writer -> fkdb: Committer (1/1)#0 
> dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
> 2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - 
> I/O exception (java.net.SocketException) caught when processing request to 
> {}->http://192.168.0.168:8040: Socket closed
> 2024-05-28 15:39:07,378 INFO 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state CANCELED to JobManager for task jjdb: 
> Writer -> jjdb: Committer (1/1)#0 
> dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
> 2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
> Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
> signal - interrupting; it is stuck for 30 seconds in method:
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail
> {code}

[jira] [Updated] (FLINK-36164) JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table

2024-08-27 Thread tchivs (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-36164:
---
Attachment: image-2024-08-28-11-28-21-496.png

> JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All 
> Subpartitions When Synchronizing a Partitioned PostgreSQL Table
> --
>
> Key: FLINK-36164
> URL: https://issues.apache.org/jira/browse/FLINK-36164
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: tchivs
>Priority: Major
> Attachments: image-2024-08-28-11-28-21-496.png, 
> image-2024-08-28-11-28-33-016.png
>
>
> What's Wrong?
> When synchronizing a partitioned PostgreSQL table with the connector,
> checkpoints consistently fail when the table contains a large number of
> partitions. Tracing the source code shows that PostgresDialect's
> queryTableSchema queries the schema of every matched table, including
> each partition, and repeating these schema queries on every checkpoint
> causes connection timeouts.
> {code:java}
> JdbcIncrementalSource incrSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.builder()
>                 .hostname(hostname)
>                 .port(port)
>                 .database(databaseName)
>                 .schemaList(schemaName)
>                 .tableList(tableName)
>                 .username(username)
>                 .password(password)
>                 .deserializer(schema)
>                 .slotName(slotName)
>                 .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
>                 .includeSchemaChanges(true)
>                 .debeziumProperties(debeziumProperties)
>                 .startupOptions(startupOptions)
>                 .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
>                 .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
>                 .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
>                 .connectTimeout(config.get(CONNECT_TIMEOUT))
>                 .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
>                 .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
>                 .distributionFactorUpper(
>                         config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
>                 .distributionFactorLower(
>                         config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
>                 .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
>                 .build();
> {code}
>  
> Checkpoint exception:
> {quote}org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
> tolerable failure threshold. The latest checkpoint failed due to Checkpoint 
> expired before completing., view the Checkpoint History tab or the Job 
> Manager log to find out why continuous checkpoints failed.
> image
> {quote}
> error log:
>  
> {code:java}
> 2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
> Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
> (dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
> 2024-05-28 15:39:07,370 INFO 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state CANCELED to JobManager for task fkdb: 
> Writer -> fkdb: Committer (1/1)#0 
> dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
> 2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - 
> I/O exception (java.net.SocketException) caught when processing request to 
> {}->http://192.168.0.168:8040: Socket closed
> 2024-05-28 15:39:07,378 INFO 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state CANCELED to JobManager for task jjdb: 
> Writer -> jjdb: Committer (1/1)#0 
> dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
> 2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
> Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
> signal - interrupting; it is stuck for 30 seconds in method:
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail
> {code}

[jira] [Updated] (FLINK-36164) JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table

2024-08-27 Thread tchivs (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-36164:
---
Description: 
What's Wrong?
When synchronizing a partitioned PostgreSQL table with the connector,
checkpoints consistently fail when the table contains a large number of
partitions. Tracing the source code shows that PostgresDialect's
queryTableSchema queries the schema of every matched table, including each
partition, and repeating these schema queries on every checkpoint causes
connection timeouts.
{code:java}
JdbcIncrementalSource incrSource =
        PostgresSourceBuilder.PostgresIncrementalSource.builder()
                .hostname(hostname)
                .port(port)
                .database(databaseName)
                .schemaList(schemaName)
                .tableList(tableName)
                .username(username)
                .password(password)
                .deserializer(schema)
                .slotName(slotName)
                .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
                .includeSchemaChanges(true)
                .debeziumProperties(debeziumProperties)
                .startupOptions(startupOptions)
                .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                .connectTimeout(config.get(CONNECT_TIMEOUT))
                .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                .distributionFactorUpper(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                .distributionFactorLower(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                .build();
{code}
!image-2024-08-28-11-28-21-496.png!

Checkpoint exception:
{quote}org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
tolerable failure threshold. The latest checkpoint failed due to Checkpoint 
expired before completing., view the Checkpoint History tab or the Job Manager 
log to find out why continuous checkpoints failed.
image
{quote}
error log:

 
{code:java}
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O 
exception (java.net.SocketException) caught when processing request to 
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)
2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - 
Reporting error
{code}
