TemiChan created FLINK-38281:
--------------------------------
Summary: Schema changes still caused column size does not match the data size after enabling scan.parse.online.schema.changes.enabled
Key: FLINK-38281
URL: https://issues.apache.org/jira/browse/FLINK-38281
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.4.0, cdc-3.3.0
Reporter: TemiChan
In Flink CDC 3.3, suppose a column is added to a source table with pt-osc or gh-ost, and data is inserted into that table while the tool is still running. In that case, Flink CDC fails with the error "Column size does not match the data size". However, the error does not occur if the source table receives no data changes during the online column addition and is only modified after the tool has finished.

*1. Insert data continuously*
{{while true; do mysql --login-path=dba doris_sync -Be "insert into test(ptest21,tcc10) select ptest21,tcc10 from test limit 10"; done}}

*2. Add a column with pt-online-schema-change*
{{pt-online-schema-change --charset=utf8mb4 --recursion-method=none --no-version-check --skip-check-slave-lag --user=root --password="xx" --host=127.0.0.1 u=chenqimi,P=3006,D=doris_sync,t=test,A=utf8mb4 --alter "add column tax_code VARCHAR(30) NULL" --execute}}

*3. Flink CDC error*
{noformat}
java.lang.IllegalStateException: Column size does not match the data size
    at org.apache.flink.cdc.common.utils.Preconditions.checkState(Preconditions.java:161)
    at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serializerRecord(DorisEventSerializer.java:121)
    at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.applyDataChangeEvent(DorisEventSerializer.java:100)
    at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serialize(DorisEventSerializer.java:71)
    at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serialize(DorisEventSerializer.java:49)
    at org.apache.doris.flink.sink.batch.DorisBatchWriter.write(DorisBatchWriter.java:120)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
    at org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:178)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
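The trace shows the check failing in the Doris sink serializer. The sketch below illustrates one plausible reading of that failure. It is a minimal, hypothetical Java model and not the actual DorisEventSerializer code. It assumes that the serializer pairs the columns of the schema it has cached with the fields of each data change event by position, and that it guards this pairing with a checkState-style size check. Under that assumption, a row written after pt-osc/gh-ost adds `tax_code` would carry one extra field. If that row reaches the sink before the matching add-column change has been applied to the cached schema, the check fails. The class name, the helper methods, and the two-column cached schema are all illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a sink-side serializer. This is NOT the real
// Flink CDC DorisEventSerializer; it only illustrates a column/field size check.
public class ColumnSizeCheckSketch {

    // Same shape as a Preconditions.checkState(boolean, String) style guard.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Pairs each cached column name with the field at the same position.
    static Map<String, Object> serializeRecord(List<String> cachedColumns, List<Object> fields) {
        checkState(cachedColumns.size() == fields.size(),
                "Column size does not match the data size");
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < cachedColumns.size(); i++) {
            row.put(cachedColumns.get(i), fields.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // Assumed schema known to the sink before the online column addition.
        List<String> cached = List.of("ptest21", "tcc10");

        // A row matching the cached schema serializes fine.
        System.out.println(serializeRecord(cached, List.of(1, "a")));

        // A row written after tax_code was added, arriving before the sink has seen
        // the corresponding add-column change, has one extra field.
        try {
            serializeRecord(cached, List.of(1, "a", "TX-001"));
        } catch (IllegalStateException e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```

Running `main` prints the serialized first row, followed by `Failed: Column size does not match the data size` for the second row. In this model, the mismatch depends only on event ordering: applying the schema change before any widened row passes the check.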