ksrihari93 opened a new issue, #5361:
URL: https://github.com/apache/hudi/issues/5361

   Hi Team,
   We are running CDC pipelines with MySQL, Kafka Connect, Debezium, and Hudi (DeltaStreamer), with Avro schemas kept in a schema registry.
   For the streaming pipelines, the Hudi job runs fine, with proper schema integration between MySQL and the Kafka topic.
   
   While backfilling the data using the DeltaStreamer JdbcSource, we are seeing a mismatch between the Debezium-generated schema in the schema registry and the one Hudi generates, and this makes the job fail.
   
   mysql> desc ev_test;
   +--------------+---------------------+------+-----+----------------------+----------------+
   | Field        | Type                | Null | Key | Default              | Extra          |
   +--------------+---------------------+------+-----+----------------------+----------------+
   | id           | bigint(20) unsigned | NO   | PRI | NULL                 | auto_increment |
   | Name         | varchar(20)         | NO   |     | NULL                 |                |
   | created_at   | timestamp           | NO   |     | CURRENT_TIMESTAMP    |                |
   | date_entered | datetime            | NO   |     | CURRENT_TIMESTAMP    |                |
   | dt1          | datetime(3)         | NO   |     | CURRENT_TIMESTAMP(3) |                |
   | ts1          | timestamp(3)        | NO   |     | CURRENT_TIMESTAMP(3) |                |
   | time1        | datetime            | NO   |     | CURRENT_TIMESTAMP    |                |
   +--------------+---------------------+------+-----+----------------------+----------------+
   
   Hudi-generated schema for the table:
   
   {
     "type": "record",
     "name": "hoodie_source",
     "namespace": "hoodie.source",
     "fields": [
       {
         "name": "id",
         "type": [
           "null",
           {
             "type": "fixed",
             "name": "fixed",
             "namespace": "hoodie.source.hoodie_source.id",
             "size": 9,
             "logicalType": "decimal",
             "precision": 20,
             "scale": 0
           }
         ],
         "default": null
       },
       {
         "name": "Name",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "created_at",
         "type": [
           "null",
           {
             "type": "long",
             "logicalType": "timestamp-micros"
           }
         ],
         "default": null
       },
       {
         "name": "date_entered",
         "type": [
           "null",
           {
             "type": "long",
             "logicalType": "timestamp-micros"
           }
         ],
         "default": null
       },
       {
         "name": "dt1",
         "type": [
           "null",
           {
             "type": "long",
             "logicalType": "timestamp-micros"
           }
         ],
         "default": null
       },
       {
         "name": "ts1",
         "type": [
           "null",
           {
             "type": "long",
             "logicalType": "timestamp-micros"
           }
         ],
         "default": null
       },
       {
         "name": "time1",
         "type": [
           "null",
           {
             "type": "long",
             "logicalType": "timestamp-micros"
           }
         ],
         "default": null
       }
     ]
   }
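   
   (For context, this appears to be what the JdbcSource derives through Spark's JDBC type mapping: bigint(20) unsigned comes back as decimal(20,0), hence the fixed/decimal id, and the timestamp/datetime columns come back as timestamp-micros longs. That is my understanding of Spark's MySQL dialect, worth double-checking for your Spark version.)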
   
   Debezium-generated schema:
   
   {
     "type": "record",
     "name": "Value",
     "namespace": "ev_orders_test.blackbox_prod.ev_test",
     "fields": [
       {
         "name": "id",
         "type": "long"
       },
       {
         "name": "Name",
         "type": "string"
       },
       {
         "name": "created_at",
         "type": [
           {
             "type": "string",
             "connect.default": "1970-01-01 05:30:00"
           },
           "null"
         ],
         "default": "1970-01-01 05:30:00"
       },
       {
         "name": "date_entered",
         "type": [
           {
             "type": "string",
             "connect.default": "0"
           },
           "null"
         ],
         "default": "0"
       },
       {
         "name": "dt1",
         "type": [
           {
             "type": "string",
             "connect.default": "0"
           },
           "null"
         ],
         "default": "0"
       },
       {
         "name": "ts1",
         "type": [
           {
             "type": "string",
             "connect.default": "1970-01-01 05:30:00"
           },
           "null"
         ],
         "default": "1970-01-01 05:30:00"
       },
       {
         "name": "time1",
         "type": [
           {
             "type": "string",
             "connect.default": "0"
           },
           "null"
         ],
         "default": "0"
       },
       {
         "name": "__op",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "__ts_ms",
         "type": [
           "null",
           "long"
         ],
         "default": null
       },
       {
         "name": "__source_ts_ms",
         "type": [
           "null",
           "long"
         ],
         "default": null
       },
       {
         "name": "__source_connector",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "__source_snapshot",
         "type": [
           "null",
           {
             "type": "string",
             "connect.version": 1,
             "connect.parameters": {
               "allowed": "true,last,false"
             },
             "connect.name": "io.debezium.data.Enum"
           }
         ],
         "default": null
       },
       {
         "name": "__source_query",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "__deleted",
         "type": [
           "null",
           "string"
         ],
         "default": null
       }
     ],
     "connect.name": "ev_test"
   }
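   
   Comparing the two, every overlapping field disagrees: id is a nullable fixed/decimal in the Hudi-generated schema but a plain long in the Debezium one, and all five timestamp/datetime columns are timestamp-micros longs on the Hudi side but strings (with connect.default values) on the Debezium side. The Debezium schema also carries the __op/__ts_ms/__source_* metadata fields, which do not exist in the JDBC-sourced rows.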
   When the Hudi DeltaStreamer job is pointed at the Debezium-generated schema, it fails with the error below. Can someone please help with maintaining schema integrity between the streaming and backfilling pipelines?
   
   22514 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle  - Error writing record HoodieRecord{key=HoodieKey { recordKey=106 partitionPath=dt=2021-12-28}, currentLocation='null', newLocation='null'}
   java.io.EOFException
        at org.apache.avro.io.BinaryDecoder$ByteArrayByteSource.readRaw(BinaryDecoder.java:944)
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:137)
        at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:127)
        at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75)
        at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105)
        at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
        at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
        at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
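   
   My reading of the trace is a writer/reader schema mismatch inside HoodieAvroUtils.bytesToAvro: the record bytes are serialized with the long/timestamp-micros schema above but deserialized with the registry schema, whose string fields make Avro misread a long's varint encoding as a string length and run off the end of the buffer. A minimal standalone sketch of that mechanism (the class name is hypothetical, and the field value 106 just echoes the failing record key):
   
   import java.io.ByteArrayOutputStream;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;
   
   public class AvroSchemaMismatchRepro {
     public static void main(String[] args) throws Exception {
       // Writer schema: id is a long.
       Schema writer = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"r\",\"fields\":"
         + "[{\"name\":\"id\",\"type\":\"long\"}]}");
       // Reader schema: id is a string -- deliberately incompatible.
       Schema reader = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"r\",\"fields\":"
         + "[{\"name\":\"id\",\"type\":\"string\"}]}");
   
       // Encode one record with the writer schema.
       GenericRecord rec = new GenericData.Record(writer);
       rec.put("id", 106L);
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(writer).write(rec, enc);
       enc.flush();
   
       // Decode the same bytes assuming the reader schema, the way
       // bytesToAvro(bytes, schema) does with a single schema: Avro reads
       // the long's varint as a string length, hits end of buffer, and
       // throws java.io.EOFException from BinaryDecoder.readString.
       new GenericDatumReader<GenericRecord>(reader).read(null,
           DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
     }
   }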
   
   Job conf:
   --target-base-path
   /Users/hudi_op55/
   --table-type
   COPY_ON_WRITE
   --target-table
   test
   --source-class
   org.apache.hudi.utilities.sources.JdbcSource
   --op
   BULK_INSERT
   --source-ordering-field
   ts1
   --props
   /Users/jdbcF.properties
   --schemaprovider-class
   org.apache.hudi.utilities.schema.SchemaRegistryProvider
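   
   (With SchemaRegistryProvider, the source and target schemas come from the endpoints configured via hoodie.deltastreamer.schemaprovider.registry.url and hoodie.deltastreamer.schemaprovider.registry.targetUrl in the properties file below, so the backfill job deserializes payloads against the Debezium-registered schema rather than the schema the JdbcSource infers.)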
   
   Properties File:
   
   hoodie.deltastreamer.jdbc.url=jdbc:mysql://localhost:3306/sys
   hoodie.deltastreamer.jdbc.driver.class=com.mysql.cj.jdbc.Driver
   
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=created_at
   hoodie.deltastreamer.jdbc.incr.pull=false
   
   hoodie.deltastreamer.jdbc.user=
   hoodie.deltastreamer.jdbc.password=
   hoodie.deltastreamer.jdbc.table.name=blackbox_prod.ev_test
   
   hoodie.deltastreamer.keygen.timebased.timestamp.type=SCALAR
   hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd hh:mm:ss
   hoodie.deltastreamer.keygen.timebased.timezone=IST
   hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd
   hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit=MICROSECONDS
   
   schema.registry.url=http://localhost:9000/
   hoodie.deltastreamer.schemaprovider.registry.url=
   hoodie.deltastreamer.schemaprovider.registry.targetUrl=
   
   hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor
   
        
        
        

