[ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caixiaowei updated FLINK-37254:
-------------------------------
    Description: 
When the table schema contains the following field:

{code:sql}
`tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',
{code}

The following exception is thrown:

{code}
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
    at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.debezium.DebeziumException: Failed to set field default value for 'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default value is 0 of type class java.lang.String
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421)
    at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:147)
    at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:135)
    at io.debezium.relational.RelationalDatabaseSchema.refreshSchema(RelationalDatabaseSchema.java:209)
    at io.debezium.relational.RelationalDatabaseSchema.refresh(RelationalDatabaseSchema.java:200)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.getTableSchema(OceanBaseRichSourceFunction.java:304)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.lambda$readSnapshotRecordsByTable$2(OceanBaseRichSourceFunction.java:320)
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:555)
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readSnapshotRecordsByTable(OceanBaseRichSourceFunction.java:317)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readSnapshotRecords(OceanBaseRichSourceFunction.java:309)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:189)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:415)
    ... 25 more
Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema "org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal" with type BYTES: class java.lang.String
    at org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
    at org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
{code}

 

 

The failing check was traced to the following class inside the shaded connector jar:

/Users/david/.m2/repository/org/apache/flink/flink-sql-connector-mysql-cdc/3.2-SNAPSHOT/flink-sql-connector-mysql-cdc-3.2-SNAPSHOT.jar!/org/apache/flink/cdc/connectors/shaded/org/apache/kafka/connect/data/ConnectSchema.class

!1.png!
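The innermost {{DataException}} shows the mismatch: Kafka Connect represents BIGINT UNSIGNED with the {{Decimal}} logical type (physical type BYTES, Java representation {{java.math.BigDecimal}}), but the default value parsed from the DDL arrives as the String {{"0"}}, so {{ConnectSchema.validateValue}} rejects it. Below is a minimal, self-contained model of that check and one plausible coercion fix; {{isValidDecimalDefault}} and {{coerceDefault}} are hypothetical names for illustration, not the actual Connect or Debezium APIs.

```java
import java.math.BigDecimal;

// Hypothetical sketch, not the real ConnectSchema: models why validateValue
// throws DataException for a String default on a Decimal (BYTES) schema.
public class DecimalDefaultSketch {

    // Mirrors the class check the Decimal logical type performs:
    // only java.math.BigDecimal values are accepted.
    static boolean isValidDecimalDefault(Object value) {
        return value instanceof BigDecimal;
    }

    // One plausible fix: coerce the DDL-supplied string default into a
    // BigDecimal before handing it to the schema builder.
    static BigDecimal coerceDefault(Object value) {
        if (value instanceof BigDecimal) {
            return (BigDecimal) value;
        }
        if (value instanceof String) {
            return new BigDecimal((String) value); // "0" -> BigDecimal 0
        }
        throw new IllegalArgumentException("Unsupported default value: " + value);
    }

    public static void main(String[] args) {
        Object rawDefault = "0"; // what the parsed column DEFAULT '0' looks like here
        System.out.println(isValidDecimalDefault(rawDefault));                 // false -> DataException in the real code
        System.out.println(isValidDecimalDefault(coerceDefault(rawDefault))); // true after coercion
    }
}
```

With a coercion of this kind applied before {{SchemaBuilder.defaultValue}} is called, the string default would no longer trip the BYTES/Decimal validation.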

 

> Flink CDC OceanBase Connector Throws Exception for Column Default Value 
> DEFAULT '0'
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-37254
>                 URL: https://issues.apache.org/jira/browse/FLINK-37254
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.0
>         Environment: flink 1.18.1
> flink cdc 3.1.0
> oceanbase 5.7.25-OceanBase_CE-v4.1.0.2
>  
>            Reporter: caixiaowei
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 1.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
