Hey Peter,

Thanks for the very detailed issue description. From my understanding, there are two possible reasons that could lead to this failure:

(1) A network issue. Could you check the network path from Flink to MySQL by running "telnet 192.168.10.32 3306" from inside the TaskManager?
(2) A version mismatch. Using a newer MySQL server version with an older MySQL driver version is not recommended, but the error should not be a connection timeout in that case.

Best,
Leonard

> On Sep 23, 2025, at 23:31, Peter Muller <[email protected]> wrote:
>
> Hi everybody,
>
> I am trying to replicate a MySQL database into StarRocks using a Flink CDC
> pipeline. The Flink job loads correctly, starts the incremental snapshot and
> transfers multiple GB (sometimes even multiple hundred GBs) into the sink DB.
> Then, after an arbitrary duration of time (it ranges from ~10m to about ~12h,
> depending on the run), the job fails with a 'Communications link failure' -
> see error message below. Since the job is still in the snapshot phase, it
> just perpetually restarts it. Unfortunately, I have been unable to debug this
> issue for over a week now and I am getting a bit desperate... Therefore, any
> help or pointers where to look further would be greatly appreciated!
>
> Relevant observations:
>
> - This job used to run fine in a past setup, where the MySQL DB, Flink and
>   StarRocks were all containers on the same server. The issue appeared after
>   migrating the MySQL DB onto a separate server.
> - The CPU, RAM and NICs on both machines seem to be far away from their max
>   capacity when the job fails
> - Flink/StarRocks server:
>   - Flink CDC 3.4.0 and Flink 1.20.2
>   - StarRocks 3.5.5
>   - Everything in Podman containers on Debian 13
>   - MySQL Connector/J 8.0.33 (the latest 8.0 jar that I could find)
>   - I can't see anything in journalctl or dmesg that would hint at a root cause
> - MySQL DB server:
>   - MySQL 8.0.43 in a Podman container on a Debian 13 VM that runs on Proxmox
>   - I can't see any irregularities in MySQL's error_log, general_log or
>     slow_query_log
>   - I can't see anything in journalctl or dmesg that would hint at a root cause
>   - I don't exceed max_connections
> - Networking:
>   - The problem persists across three completely separate network connections
>     with separate NICs, two of the connections are even P2P (the servers are
>     physically next to each other)
>   - Running tcpdump on the Flink server does not show any dropped packets
> - Configuration:
>   - Please see below my current configuration for Flink, the Flink job and MySQL
>   - I have tried to change any parameter in Flink, JDBC, Debezium and MySQL that
>     I thought could be relevant in terms of networking or timeouts - but still no
>     luck
>
>
> Root Exception:
>
> 2025-09-23 15:21:11,099 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (1/1) (c2b1d7b6ad8a31b175c49d96cc5e6b36_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on 10.89.2.141:38211-997a8a @ flinktaskmanager1 (dataPort=35119).
> java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.2.jar:1.20.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.2.jar:1.20.2]
>     at java.base/java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.RuntimeException: SplitFetcher thread 110 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
>     ... 1 more
> Caused by: io.debezium.DebeziumException: Error reading MySQL variables: Communications link failure
>
> The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
>     at io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:162) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84) ~[?:?]
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
>     ... 1 more
> Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
>
> The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
>     at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:175) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:825) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:446) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:239) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:188) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:548) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:146) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84) ~[?:?]
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
>     ... 1 more
> Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure
>
> The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
>     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
>     at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
>     at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
>     at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:62) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:150) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:166) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:89) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.NativeSession.connect(NativeSession.java:121) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:945) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:815) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:446) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:239) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:188) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:548) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:146) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84) ~[?:?]
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
>     ... 1 more
> Caused by: java.net.ConnectException: Connection timed out (Connection timed out)
>     at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:?]
>     at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source) ~[?:?]
>     at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source) ~[?:?]
>     at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source) ~[?:?]
>     at java.base/java.net.SocksSocketImpl.connect(Unknown Source) ~[?:?]
>     at java.base/java.net.Socket.connect(Unknown Source) ~[?:?]
>     at com.mysql.cj.protocol.StandardSocketFactory.connect(StandardSocketFactory.java:153) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:63) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.NativeSession.connect(NativeSession.java:121) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:945) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:815) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:446) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:239) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:188) ~[mysql-connector-j-8.0.33.jar:8.0.33]
>     at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:548) ~[?:?]
>     at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:146) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140) ~[?:?]
>     at io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118) ~[?:?]
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84) ~[?:?]
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.20.2.jar:1.20.2]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
>     ... 1 more
>
> Flink config.yaml
>
> # Flink CDC configuration
> # /opt/flink/conf/config.yaml
>
> blob.server.port: 6124
>
> taskmanager.bind-host: '0.0.0.0'
> taskmanager.memory.process.size: 200g
> taskmanager.memory.managed.size: 1g
> taskmanager.memory.network.max: 1g
> taskmanager.numberOfTaskSlots: 1
>
> taskmanager.network.partition-request-timeout: 3600s
> taskmanager.network.request-backoff.initial: 100
> taskmanager.network.request-backoff.max: 1800000
> taskmanager.network.retries: 20
>
> jobmanager.bind-host: '0.0.0.0'
> jobmanager.execution.failover-strategy: region
> jobmanager.memory.process.size: 100g
> jobmanager.memory.off-heap.size: 1g
> jobmanager.memory.enable-jvm-direct-memory-limit: true
> jobmanager.rpc.address: '10.89.0.40'
> jobmanager.rpc.port: 6123
>
> rest.address: '10.89.0.40'
> rest.bind-address: '0.0.0.0'
> rest.port: 8081
>
> parallelism.default: 1
>
> state.backend.type: 'hashmap'
>
> execution.checkpointing.incremental: true
> execution.checkpointing.interval: 5m
> execution.checkpointing.min-pause: 5m
> execution.checkpointing.timeout: 1h
> execution.checkpointing.storage: 'filesystem'
> execution.checkpointing.dir: 'file:///opt/flink/checkpoints/'
> execution.checkpointing.savepoint-dir: 'file:///opt/flink/savepoints/'
>
> env.java.opts.all:
>   --add-exports=java.base/sun.net.util=ALL-UNNAMED
>   --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
>   --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
>   --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
>   --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
>   --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
>   --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
>   --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED
>   --add-opens=java.base/java.lang=ALL-UNNAMED
>   --add-opens=java.base/java.net=ALL-UNNAMED
>   --add-opens=java.base/java.io=ALL-UNNAMED
>   --add-opens=java.base/java.nio=ALL-UNNAMED
>   --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
>   --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
>   --add-opens=java.base/java.text=ALL-UNNAMED
>   --add-opens=java.base/java.time=ALL-UNNAMED
>   --add-opens=java.base/java.util=ALL-UNNAMED
>   --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
>   --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
>   --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
>
> pekko.ask.timeout: 10m
> heartbeat.timeout: 10m
>
>
> Flink CDC pipeline job
>
> ################################################################################
> # Description: Sync all MySQL tables to StarRocks
> ################################################################################
>
> pipeline:
>   name: Sync all MySQL tables to StarRocks
>   parallelism: 1
>
> source:
>   type: mysql
>   hostname: 192.168.10.32
>   port: 3306
>   username: flink_mysql
>   password: "$FLINK_MYSQL_PASSWORD"
>   tables: db.\.*
>   tables.exclude: db.(tmptable_[a-zA-Z0-9_]+)
>   server-id: '5500-5599'
>   scan.incremental.snapshot.chunk.size: 131072 # 8192
>   scan.binlog.newly-added-table.enabled: 'true'
>   scan.incremental.close-idle-reader.enabled: 'true'
>   connect.timeout: 7200s
>   connect.max-retries: 10
>   server-time-zone: 'UTC'
>
>   jdbc.properties.allowPublicKeyRetrieval: 'true'
>   jdbc.properties.connectTimeout: 7200000
>   jdbc.properties.socketTimeout: 7200000
>   jdbc.properties.max_execution_time: 7200000
>
>   debezium.poll.interval.ms: 10000
>   debezium.connect.timeout.ms: 7200000
>   debezium.snapshot.lock.timeout.ms: 7200000
>   debezium.signal.kafka.poll.timeout.ms: 10000
>   debezium.connect.keep.alive: 'true'
>
> sink:
>   type: starrocks
>   name: StarRocks Sink
>   jdbc-url: jdbc:mysql://10.89.0.50:9030
>   load-url: http://10.89.0.50:8030
>   username: flink
>   password: "$FLINK_STARROCKS_PASSWORD"
>   table.create.properties.replication_num: 1
>   table.create.properties.fast_schema_evolution: 'true'
>   sink.properties.timeout: 1200
>
> my.cnf
>
> [mysqld]
> log_bin = ON
>
> join_buffer_size=128M
> sort_buffer_size=2M
> read_rnd_buffer_size=2M
>
> skip-name-resolve
> datadir=/var/lib/mysql
> socket=/var/lib/mysql/mysql.sock
> secure-file-priv=/var/lib/mysql-files
>
> pid-file=/var/run/mysqld/mysqld.pid
>
> tmpdir=/tmp
>
> bind-address = 0.0.0.0
>
> # Set up logging
> general_log=ON
> general_log_file=/var/lib/mysql/log_general
> slow_query_log=ON
> slow_query_log_file=/var/lib/mysql/log_slow_query
>
> default_storage_engine=InnoDB
>
> character_set_server=utf8mb4
> collation_server=utf8mb4_bin
> lower_case_table_names=1
> innodb_file_per_table=ON
>
> max_connections=100
>
> innodb_buffer_pool_size=64g
> innodb_redo_log_capacity=24G
> innodb_flush_log_at_trx_commit=1
> innodb_flush_method=O_DIRECT
> innodb_log_buffer_size=1G
> innodb_io_capacity=10000
> innodb_io_capacity_max=20000
> innodb_ddl_buffer_size=4G
> innodb_ddl_threads=10
> innodb_parallel_read_threads=10
>
> temptable_max_ram=4G
>
> # Server System Variables
> connect_timeout=3600
> interactive_timeout=28800
> max_execution_time=7200000
> net_read_timeout=3600
> wait_timeout=28800
>
> # InnoDB Startup Options
> innodb_lock_wait_timeout=3600
>
> mysqlx=OFF
>
> Thank you for your help!
> Peter
