[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
caixiaowei updated FLINK-37254: ------------------------------- Summary: Flink CDC OceanBase Connector Throws Exception for Column Default Value ‘DEFAULT ‘0’ (was: flink cdc的oceanbase cdc 存在表字段的默认值为default '0'抛出异常) > Flink CDC OceanBase Connector Throws Exception for Column Default Value > ‘DEFAULT ‘0’ > ------------------------------------------------------------------------------------ > > Key: FLINK-37254 > URL: https://issues.apache.org/jira/browse/FLINK-37254 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.1.0 > Environment: flink 1.18.1 > flink cdc 3.1.0 > oceanbase 5.7.25-OceanBase_CE-v4.1.0.2 > > Reporter: caixiaowei > Priority: Major > Labels: pull-request-available > > 当表结构的字段存在: > {color:#000000}{color:#006464}`tenant_id`{color} > {color:#000080}bigint{color}({color:#0000ff}20{color}) > {color:#800000}unsigned{color} {color:#800000}NOT{color} > {color:#800000}NULL{color} {color:#800000}DEFAULT{color} > {color:#008000}'0'{color} {color:#000000}COMMENT{color} > {color:#008000}'租户ID'{color},{color} > > 会抛出以下异常: > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) > at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) > at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) > at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) > at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) > at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > at > org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) > at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) > at > org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) > at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) > at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) > at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) > at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) > at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) > at > java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) > Caused by: io.debezium.DebeziumException: Failed to set field default value > for 'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the > default value is 0 of type class java.lang.String > at > io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421) > at > io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) > at > java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) > at > io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:147) > at > io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:135) > at > io.debezium.relational.RelationalDatabaseSchema.refreshSchema(RelationalDatabaseSchema.java:209) > at > io.debezium.relational.RelationalDatabaseSchema.refresh(RelationalDatabaseSchema.java:200) > at > org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.getTableSchema(OceanBaseRichSourceFunction.java:304) > at > org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.lambda$readSnapshotRecordsByTable$2(OceanBaseRichSourceFunction.java:320) > at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:555) > at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496) > at > org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readSnapshotRecordsByTable(OceanBaseRichSourceFunction.java:317) > at java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readSnapshotRecords(OceanBaseRichSourceFunction.java:309) > at > org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:189) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338) > Caused by: > org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: > Invalid default value > at > org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131) > at > io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:415) > ... 25 more > Caused by: > org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: > Invalid Java object for schema > "org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal" > with type BYTES: class java.lang.String > at > org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242) > at > org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213) > at > org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129) -- This message was sent by Atlassian Jira (v8.20.10#820010)