ksrihari93 opened a new issue, #5361: URL: https://github.com/apache/hudi/issues/5361
Hi Team,

We are running CDC pipelines with MySQL, Kafka Connect, Debezium, and Hudi (DeltaStreamer), with the schemas managed in a schema registry. For the streaming pipelines, the Hudi job runs fine with proper schema integration between MySQL and the Kafka topic. However, while backfilling data using the DeltaStreamer JDBC source, we see a mismatch between the Debezium-generated schema in the schema registry and the schema Hudi generates, and this makes the job fail.

```
mysql> desc ev_test;
+--------------+---------------------+------+-----+----------------------+----------------+
| Field        | Type                | Null | Key | Default              | Extra          |
+--------------+---------------------+------+-----+----------------------+----------------+
| id           | bigint(20) unsigned | NO   | PRI | NULL                 | auto_increment |
| Name         | varchar(20)         | NO   |     | NULL                 |                |
| created_at   | timestamp           | NO   |     | CURRENT_TIMESTAMP    |                |
| date_entered | datetime            | NO   |     | CURRENT_TIMESTAMP    |                |
| dt1          | datetime(3)         | NO   |     | CURRENT_TIMESTAMP(3) |                |
| ts1          | timestamp(3)        | NO   |     | CURRENT_TIMESTAMP(3) |                |
| time1        | datetime            | NO   |     | CURRENT_TIMESTAMP    |                |
+--------------+---------------------+------+-----+----------------------+----------------+
```

Hudi-generated schema for the table:

```json
{
  "type": "record",
  "name": "hoodie_source",
  "namespace": "hoodie.source",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        {
          "type": "fixed",
          "name": "fixed",
          "namespace": "hoodie.source.hoodie_source.id",
          "size": 9,
          "logicalType": "decimal",
          "precision": 20,
          "scale": 0
        }
      ],
      "default": null
    },
    { "name": "Name", "type": ["null", "string"], "default": null },
    { "name": "created_at", "type": ["null", { "type": "long", "logicalType": "timestamp-micros" }], "default": null },
    { "name": "date_entered", "type": ["null", { "type": "long", "logicalType": "timestamp-micros" }], "default": null },
    { "name": "dt1", "type": ["null", { "type": "long", "logicalType": "timestamp-micros" }], "default": null },
    { "name": "ts1", "type": ["null", { "type": "long", "logicalType": "timestamp-micros" }], "default": null },
    { "name": "time1", "type": ["null", { "type": "long", "logicalType": "timestamp-micros" }], "default": null }
  ]
}
```

Debezium-generated schema:

```json
{
  "type": "record",
  "name": "Value",
  "namespace": "ev_orders_test.blackbox_prod.ev_test",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "Name", "type": "string" },
    { "name": "created_at", "type": [{ "type": "string", "connect.default": "1970-01-01 05:30:00" }, "null"], "default": "1970-01-01 05:30:00" },
    { "name": "date_entered", "type": [{ "type": "string", "connect.default": "0" }, "null"], "default": "0" },
    { "name": "dt1", "type": [{ "type": "string", "connect.default": "0" }, "null"], "default": "0" },
    { "name": "ts1", "type": [{ "type": "string", "connect.default": "1970-01-01 05:30:00" }, "null"], "default": "1970-01-01 05:30:00" },
    { "name": "time1", "type": [{ "type": "string", "connect.default": "0" }, "null"], "default": "0" },
    { "name": "__op", "type": ["null", "string"], "default": null },
    { "name": "__ts_ms", "type": ["null", "long"], "default": null },
    { "name": "__source_ts_ms", "type": ["null", "long"], "default": null },
    { "name": "__source_connector", "type": ["null", "string"], "default": null },
    {
      "name": "__source_snapshot",
      "type": [
        "null",
        {
          "type": "string",
          "connect.version": 1,
          "connect.parameters": { "allowed": "true,last,false" },
          "connect.name": "io.debezium.data.Enum"
        }
      ],
      "default": null
    },
    { "name": "__source_query", "type": ["null", "string"], "default": null },
    { "name": "__deleted", "type": ["null", "string"], "default": null }
  ],
  "connect.name": "ev_test"
}
```

When we point the DeltaStreamer job at the Debezium-generated schema, it fails with the error below. Can someone please help us maintain schema integrity between the streaming pipelines and the backfill pipelines?

```
22514 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle - Error writing record HoodieRecord{key=HoodieKey { recordKey=106 partitionPath=dt=2021-12-28}, currentLocation='null', newLocation='null'}
java.io.EOFException
	at org.apache.avro.io.BinaryDecoder$ByteArrayByteSource.readRaw(BinaryDecoder.java:944)
	at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349)
	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:137)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:127)
	at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75)
	at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105)
	at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
	at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
```
--target-base-path /Users/hudi_op55/
--table-type COPY_ON_WRITE
--target-table test
--source-class org.apache.hudi.utilities.sources.JdbcSource
--op BULK_INSERT
--source-ordering-field ts1
--props /Users/jdbcF.properties
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
```

Properties file:

```properties
hoodie.deltastreamer.jdbc.url=jdbc:mysql://localhost:3306/sys
hoodie.deltastreamer.jdbc.driver.class=com.mysql.cj.jdbc.Driver
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=created_at
hoodie.deltastreamer.jdbc.incr.pull=false
hoodie.deltastreamer.jdbc.user=
hoodie.deltastreamer.jdbc.password=
hoodie.deltastreamer.jdbc.table.name=blackbox_prod.ev_test
hoodie.deltastreamer.keygen.timebased.timestamp.type=SCALAR
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd hh:mm:ss
hoodie.deltastreamer.keygen.timebased.timezone=IST
hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd
hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit=MICROSECONDS
schema.registry.url=http://localhost:9000/
hoodie.deltastreamer.schemaprovider.registry.url=
hoodie.deltastreamer.schemaprovider.registry.targetUrl=
hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor
```
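If the transformer route works out, it would plug into the same job as follows (flag and property names as shipped in hudi-utilities; the query value is the sketch above, abbreviated here):

```properties
# Sketch: added to the DeltaStreamer arguments above
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer

# Sketch: added to jdbcF.properties (full query in the sketch above)
hoodie.deltastreamer.transformer.sql=SELECT CAST(id AS BIGINT) AS id, Name, DATE_FORMAT(created_at, 'yyyy-MM-dd HH:mm:ss') AS created_at, ... FROM <SRC>
```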