[ https://issues.apache.org/jira/browse/FLINK-36223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zjf updated FLINK-36223:
------------------------
    Labels: pull-request-available  (was: )

SplitFetcher thread 0 received unexpected exception while polling the records
------------------------------------------------------------------------------

                Key: FLINK-36223
                URL: https://issues.apache.org/jira/browse/FLINK-36223
            Project: Flink
         Issue Type: Bug
         Components: Flink CDC
  Affects Versions: cdc-3.1.1
       Environment: JDK 1.8, SQL Server 2019, Flink CDC 3.x
          Reporter: zjf
          Priority: Major
            Labels: pull-request-available
           Fix For: cdc-3.1.1
       Attachments: image-2024-09-05-14-17-56-066.png,
                    image-2024-09-05-14-23-48-144.png, image-2024-09-05-14-35-58-759.png,
                    image-2024-09-05-14-36-12-672.png, image-2024-09-05-14-37-46-581.png,
                    image-2024-09-05-14-38-30-542.png, image-2024-09-05-14-38-49-424.png,
                    image-2024-09-05-14-39-07-070.png

1. A SQL Server dynamic-table error occurred. The trigger: after a checkpoint was taken, I added a new table to my Flink CDC job, and the exception is thrown when the job is restored from that checkpoint.

2. The error log is as follows:

2024-09-04 18:08:56,577 INFO tracer[] [debezium-reader-0] i.d.c.s.SqlServerStreamingChangeEventSource:? - CDC is enabled for table Capture instance "T_BD_SUPPLIER_L" [sourceTableId=AIS20231222100348.dbo.T_BD_SUPPLIER_L, changeTableId=AIS20231222100348.cdc.T_BD_SUPPLIER_L_CT, startLsn=000abdbd:0000192b:0001, changeTableObjectId=627568271, stopLsn=NULL] but the table is not whitelisted by connector
2024-09-04 18:08:56,947 ERROR tracer[] [Source Data Fetcher for Source: kingdee-cdc-supply_test-source (1/1)#0] o.a.f.c.b.s.r.f.SplitFetcherManager:? - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:459)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138)
    at com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$StreamSplitReadTask.execute(SqlServerStreamFetchTask.java:161)
    at com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.execute(SqlServerStreamFetchTask.java:69)
    at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
    ... 6 common frames omitted
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue(JdbcSourceEventDispatcher.java:193)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:223)
    at io.debezium.connector.sqlserver.SqlServerSchemaChangeEventEmitter.emitSchemaChangeEvent(SqlServerSchemaChangeEventEmitter.java:47)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:147)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:62)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.getChangeTablesToQuery(SqlServerStreamingChangeEventSource.java:581)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:237)
    ... 10 common frames omitted
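For context on the final DataException: Kafka Connect's Struct only permits lookups of fields that its schema actually defines, and getString fails with exactly this message otherwise. A minimal sketch against the unshaded org.apache.kafka.connect.data API (the shaded copy in the connector behaves the same; the field set below is a made-up stand-in, only the missing "file" field matters):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.errors.DataException;

    public class StructFieldLookupDemo {
        public static void main(String[] args) {
            // A value schema that, like the schema-change record built for the
            // newly added table, defines no "file" field.
            Schema valueSchema = SchemaBuilder.struct()
                    .field("databaseName", Schema.STRING_SCHEMA)
                    .build();
            Struct value = new Struct(valueSchema)
                    .put("databaseName", "AIS20231222100348");

            try {
                value.getString("file"); // lookupField cannot resolve "file"
            } catch (DataException e) {
                System.out.println(e.getMessage()); // prints: file is not a valid field name
            }
        }
    }

So, per the trace, the dispatcher at JdbcSourceEventDispatcher.java:193 reads a "file" field from a Struct whose schema was built without one.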
3. The Maven configuration I use is as follows:

<properties>
    <flink.version>1.19.1</flink.version>
    <sql-connector.version>3.0.1</sql-connector.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>compile</scope> -->
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-cdc-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>${sql-connector.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
    <version>${sql-connector.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar</artifactId>
    <version>4.1.0-1.18</version>
</dependency>

The root cause: Flink CDC restores from the serialized checkpoint state, and that state contains only table A. When I add table B and data in table B then changes, the exception above is thrown, because table B is not part of the serialized historical state. Below is the image evidence from my debugging.

After I added the new table, it cannot be found here:
!image-2024-09-05-14-17-56-066.png!
this.schema.tableFor(currentTable.getSourceTableId()) == null
My guess is that at this check, when a table cannot be found in the deserialized checkpoint content, the newly added table should be initialized and added to the cache:
!image-2024-09-05-14-23-48-144.png!
What I do not understand is the next part: the Struct's schema local variable does not contain a "file" field, so why read it at all? Could there be a problem with the package versions I introduced? The same issue has occurred with flink-sql-connector-mysql-cdc before, where it can be solved by calling .scanNewlyAddedTableEnabled(true) on MySqlSourceBuilder (see the sketch below), but I could not find an equivalent method on SqlServerSourceBuilder in flink-sql-connector-sqlserver-cdc.
!image-2024-09-05-14-37-46-581.png!
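For comparison, a minimal sketch of that MySQL-side option against the com.ververica 3.0.x API; host, credentials, and table names are placeholders:

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    MySqlSource<String> source =
            MySqlSource.<String>builder()
                    .hostname("localhost")                      // placeholder
                    .port(3306)
                    .databaseList("mydb")                       // placeholder
                    .tableList("mydb.table_a", "mydb.table_b")  // table_b added after the checkpoint
                    .username("flink")                          // placeholder
                    .password("secret")                         // placeholder
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    // lets the source discover tables that are missing from the restored state
                    .scanNewlyAddedTableEnabled(true)
                    .build();

SqlServerSourceBuilder in flink-sql-connector-sqlserver-cdc 3.0.1 exposes no such switch, which is what this report is about.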
This code should be reading the "file" attribute:
!image-2024-09-05-14-38-30-542.png!
Because the field cannot be found, an exception is thrown here, and the concrete symptom is that the job keeps looping on the same error:
!image-2024-09-05-14-38-49-424.png!
!image-2024-09-05-14-39-07-070.png!
If this problem is not resolved, I cannot restore from the last checkpoint or savepoint and have to delete it, and any data modifications that happen in the database in the meantime cannot be captured.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)