joeytman opened a new issue, #7973:
URL: https://github.com/apache/hudi/issues/7973

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   Yes
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   Unknown yet if it's a bug or user error
   
   **Describe the problem you faced**
   
   I'm trying to set up a DeltaStreamer job that consumes an Avro changelog from 
Kafka and writes to S3. The changelog is created by Debezium from the Vitess 
binlog, with schemas written to the Confluent-API-compatible Apicurio registry. I've 
gone ahead and implemented a `VitessDebeziumSource` and `VitessDebeziumAvroPayload` 
and can see that the envelope is deserialized correctly and the flattened dataset 
looks good from logs within `VitessDebeziumSource.processDataset`. However, 
writes later fail when trying to deserialize records 
[here](https://github.com/apache/hudi/blob/60dfe4d766d6050bb811e4f7d51b88126b5d5575/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L169-L174)
 (invoked 
[here](https://github.com/apache/hudi/blob/2a3b0b5af8ac03d0b5940ca1d77bf9b71f9040eb/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java#L87)).
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Compile hudi-utilities-bundle w/ new `VitessDebeziumSource.java` and 
`VitessDebeziumAvroPayload.java`. My implementation of each is:
   
   `VitessDebeziumSource`:
   ```
   package org.apache.hudi.utilities.sources.debezium;
   
   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.hudi.common.model.debezium.DebeziumConstants;
   import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
   import org.apache.hudi.utilities.schema.SchemaProvider;
   
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   
   import org.apache.log4j.LogManager;
   import org.apache.log4j.Logger;
   
   /**
    * Source for incrementally ingesting debezium generated change logs for 
Vitess DB.
    */
   public class VitessDebeziumSource extends DebeziumSource {
   
       private static final Logger LOG = 
LogManager.getLogger(VitessDebeziumSource.class);
   
       public VitessDebeziumSource(TypedProperties props, JavaSparkContext 
sparkContext,
                                   SparkSession sparkSession,
                                   SchemaProvider schemaProvider,
                                   HoodieDeltaStreamerMetrics metrics) {
           super(props, sparkContext, sparkSession, schemaProvider, metrics);
       }
   
       /**
        * Debezium Kafka Payload has a nested structure (see 
https://debezium.io/documentation/reference/2.1/connectors/vitess.html).
        * This function flattens this nested structure for the Vitess data, and 
also extracts a subset of Debezium metadata fields.
        *
        * @param rowDataset Dataset containing Debezium Payloads
        * @return New dataset with flattened columns
        */
       @Override
       protected Dataset<Row> processDataset(Dataset<Row> rowDataset) {
           Dataset<Row> flattenedDataset = rowDataset;
           if (rowDataset.columns().length > 0) {
               // Only flatten for non-empty schemas
               Dataset<Row> insertedOrUpdatedData = rowDataset
                       .selectExpr(
                               String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME),
                               String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME), // TODO: use source.ts_ms
                               String.format("%s.*", DebeziumConstants.INCOMING_AFTER_FIELD)
                       )
                       .filter(rowDataset.col(DebeziumConstants.INCOMING_OP_FIELD).notEqual(DebeziumConstants.DELETE_OP));
   
               Dataset<Row> deletedData = rowDataset
                       .selectExpr(
                               String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME),
                               String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME), // TODO: use source.ts_ms
                               String.format("%s.*", DebeziumConstants.INCOMING_BEFORE_FIELD)
                       )
                       .filter(rowDataset.col(DebeziumConstants.INCOMING_OP_FIELD).equalTo(DebeziumConstants.DELETE_OP));
   
               flattenedDataset = insertedOrUpdatedData.union(deletedData);
           }
   
           // Temporary debug logging; logged at ERROR so it shows up in the driver logs
           LOG.error("VitessDebeziumSource.processDataset flattenedDataset.schema: " + flattenedDataset.schema());
           LOG.error("VitessDebeziumSource.processDataset flattenedDataset.showString: " + flattenedDataset.showString(10, 0, false));
   
           return flattenedDataset;
       }
   
   }
   ```
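
For readers who want the flattening rule without the Spark plumbing, here is a minimal sketch of what `processDataset` is meant to do to a single envelope. It is an illustration under assumptions, not Hudi code: the `FlattenSketch` class and the `Map`-based envelope are hypothetical stand-ins for a Spark `Row`, and only the two output column names are taken from the logged dataset.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSketch {
    // Output column names as they appear in the logged flattened dataset.
    static final String OP_COL = "_change_operation_type";
    static final String TS_COL = "_event_origin_ts_ms";

    /**
     * Flattens one envelope: deletes (op=d) take their columns from "before",
     * every other operation takes them from "after".
     * The Map-based envelope is a hypothetical stand-in for a Spark Row.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> flatten(Map<String, Object> envelope) {
        String op = (String) envelope.get("op");
        String imageField = "d".equals(op) ? "before" : "after";
        Map<String, Object> image = (Map<String, Object>) envelope.get(imageField);

        Map<String, Object> flat = new LinkedHashMap<>();
        flat.put(OP_COL, op);
        flat.put(TS_COL, envelope.get("ts_ms"));
        if (image != null) {
            flat.putAll(image);
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("col1", "a");
        after.put("col2", "b");

        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("before", null);
        envelope.put("after", after);
        envelope.put("op", "c");
        envelope.put("ts_ms", 1676473534195L);

        System.out.println(flatten(envelope));
    }
}
```

The point of the sketch is that the flattened record has a different shape from the envelope (two metadata columns followed by the table columns), so any later step that still assumes the envelope schema is working with the wrong layout.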
   
   `VitessDebeziumAvroPayload`:
   ```
   
   package org.apache.hudi.common.model.debezium;
   
   import org.apache.hudi.common.util.Option;
   import org.apache.hudi.exception.HoodieException;
   
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.generic.IndexedRecord;
   import org.apache.log4j.LogManager;
   import org.apache.log4j.Logger;
   
   import java.io.IOException;
   import java.util.Objects;
   
   /**
    * Provides support for seamlessly applying changes captured via Debezium for Vitess.
    * <p>
    * Debezium change event types are determined from the op field in the payload
    * <p>
    * - For inserts, op=c
    * - For deletes, op=d
    * - For updates, op=u
    * - For snapshot inserts, op=r
    * <p>
    * This payload implementation will issue matching insert, delete, updates against the hudi table
    */
   public class VitessDebeziumAvroPayload extends AbstractDebeziumAvroPayload {
   
       private static final Logger LOG = 
LogManager.getLogger(VitessDebeziumAvroPayload.class);
   
       public VitessDebeziumAvroPayload(GenericRecord record, Comparable 
orderingVal) {
           super(record, orderingVal);
       }
   
       public VitessDebeziumAvroPayload(Option<GenericRecord> record) {
           super(record);
       }
   
       private Option<String> extractSeq(IndexedRecord record) {
           Object value = ((GenericRecord) 
record).get(DebeziumConstants.FLATTENED_TS_COL_NAME);
           return Option.ofNullable(Objects.toString(value, null));
       }
   
       @Override
       protected boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRecord insertRecord, Schema schema) throws IOException {
           String insertSourceSeq = extractSeq(insertRecord)
                   .orElseThrow(() ->
                           new HoodieException(String.format("%s cannot be null in insert record: %s",
                                   DebeziumConstants.FLATTENED_TS_COL_NAME, insertRecord)));
           Option<String> currentSourceSeqOpt = extractSeq(currentRecord);
           // Pick the current value in storage only if its Seq (ts_ms) is later
           // than the Seq (ts_ms) of the insert value. Compare numerically,
           // because String.compareTo orders lexicographically.
           return currentSourceSeqOpt.isPresent()
                   && Long.parseLong(insertSourceSeq) < Long.parseLong(currentSourceSeqOpt.get());
       }
   }
   
   ```
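
A side note on ordering by the timestamp column: whether the value is compared as a number or as a string matters once the digit counts differ. The sketch below is a standalone illustration, not part of the payload class; `OrderingSketch` and both method names are hypothetical, and the short values in the second pair are made up to show the divergence.

```java
public class OrderingSketch {
    /** True if the stored value sorts after the incoming one, using String.compareTo. */
    static boolean currentIsNewerLexicographic(String current, String incoming) {
        return incoming.compareTo(current) < 0;
    }

    /** True if the stored value is later than the incoming one, parsed as epoch milliseconds. */
    static boolean currentIsNewerNumeric(String current, String incoming) {
        return Long.parseLong(incoming) < Long.parseLong(current);
    }

    public static void main(String[] args) {
        // Same digit count (values taken from the logged dataset): both approaches agree.
        System.out.println(currentIsNewerLexicographic("1676473536792", "1676473536740"));
        System.out.println(currentIsNewerNumeric("1676473536792", "1676473536740"));

        // Different digit count (hypothetical values): the two approaches disagree.
        System.out.println(currentIsNewerLexicographic("1000", "999"));
        System.out.println(currentIsNewerNumeric("1000", "999"));
    }
}
```

For 13-digit epoch-millisecond values the two comparisons give the same answer, so this is unlikely to be related to the deserialization failure reported here.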
   
   2. Launch DeltaStreamer job via EMR CLI w/ 
   ```
   spark-submit \
       --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
       /home/jthaidigsman/hudi-utilities-bundle_2.12-0.12.1.jar \
       --target-table db.table \
       --target-base-path s3://redacted/hudi/table/ \
       --table-type COPY_ON_WRITE \
       --source-class org.apache.hudi.utilities.sources.debezium.VitessDebeziumSource \
       --payload-class org.apache.hudi.common.model.debezium.VitessDebeziumAvroPayload \
       --source-ordering-field _event_origin_ts_ms \
       --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
       --checkpoint redacted.redacted.redacted,0:0 \
       --hoodie-conf hoodie.datasource.write.recordkey.field=col1,col2 \
       --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
       --hoodie-conf hoodie.datasource.write.partitionpath.field="" \
       --hoodie-conf bootstrap.servers=redacted:9092 \
       --hoodie-conf hoodie.deltastreamer.source.kafka.topic=redacted \
       --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://redacted:8080/apis/ccompat/v6/subjects/redacted-value/versions/latest/ \
       --hoodie-conf schema.registry.url=http://redacted:8080/apis/ccompat/v6/ \
       --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
   ```
   
   3. Observe deserialization error.
   
   **Expected behavior**
   
   In [RFC-39](https://github.com/apache/hudi/blob/release-0.12.1/rfc/rfc-39/rfc-39.md), it's stated that:
   > Since we change the schema of the incoming record in the source class, we 
have to provide a schema for the target record. We propose to implement 
DebeziumAvroSource.java as a RowSource and allow spark to infer the schema of 
the transformed record. An alternative approach is to implement a 
DebeziumSchemaRegistryProvider.java class that extends the current 
SchemaRegistryProvider.java, and implements the method getTargetSchema . It 
constructs the target schema from the original schema by including only the 
fields nested within the after field of the original record, along with the 
meta fields that were actually ingested.
   
   From my understanding of this paragraph, by the point in the write operation 
where we're failing, the schema should be inferred from the record that has 
already been deserialized and flattened by `VitessDebeziumSource`. However, 
adding some logs to the `bytesToAvro` method shows that the `writerSchema` and 
`readerSchema` are both set to the schema of the Debezium envelope, rather than 
the schema of the record we're writing. This goes against my understanding of how 
the flow should work: it seems to be using the target schema returned 
from the `SchemaRegistryProvider` rather than inferring the schema. 
   
   I also tried implementing DebeziumSchemaRegistryProvider with a new 
`getTargetSchema` as mentioned as an alternative in the RFC, and was able to 
see that the schemas passed to `bytesToAvro` were now set to the formatted 
schema produced by `DebeziumSchemaRegistryProvider`, confirming that the schema 
inference flow described by the RFC is not being triggered.
   
   
   Is there some config that needs to be set so that DeltaStreamer will write 
with the schema of the record produced by `VitessDebeziumSource`? Or am I 
misunderstanding how this is supposed to work?
   
   
   **Environment Description**
   
   * Hudi version : 0.12.1 (also tried on 0.13.0 and 0.14.0 and got the same 
error)
   
   * Spark version : 3.3.0-amzn-1
   
   * Hive version : Hive 3.1.3-amzn-2
   
   * Hadoop version : Hadoop 3.3.3-amzn-1
   
   * EMR version : emr-6.9.0
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   Debezium Envelope Schema (from schema registry):
   ```
   {
     "type": "record",
     "name": "Envelope",
     "namespace": "redacted.redacted.redacted",
     "fields": [
       {
         "name": "before",
         "type": [
           "null",
           {
             "type": "record",
             "name": "Value",
             "fields": [
               {
                 "name": "col1",
                 "type": "string"
               },
               {
                 "name": "col2",
                 "type": "string"
               },
               {
                 "name": "col3",
                 "type": "string"
               },
               {
                 "name": "col4",
                 "type": "long"
               },
               {
                 "name": "col5",
                 "type": "long"
               },
               {
                 "name": "col6",
                 "type": "string"
               },
               {
                 "name": "col7",
                 "type": "string"
               },
               {
                 "name": "col8",
                 "type": {
                   "type": "int",
                   "connect.type": "int16"
                 }
               },
               {
                 "name": "col9",
                 "type": {
                   "type": "int",
                   "connect.type": "int16"
                 }
               },
               {
                 "name": "col10",
                 "type": {
                   "type": "int",
                   "connect.type": "int16"
                 }
               },
               {
                 "name": "col11",
                 "type": {
                   "type": "int",
                   "connect.type": "int16"
                 }
               },
               {
                 "name": "col12",
                 "type": "string"
               },
               {
                 "name": "col13",
                 "type": "string"
               },
               {
                 "name": "col14",
                 "type": {
                   "type": "int",
                   "connect.type": "int16"
                 }
               },
               {
                 "name": "col15",
                 "type": "string"
               }
             ],
             "connect.name": "sandbox.byuser.channels_members.Value"
           }
         ],
         "default": null
       },
       {
         "name": "after",
         "type": [
           "null",
           "Value"
         ],
         "default": null
       },
       {
         "name": "source",
         "type": {
           "type": "record",
           "name": "Source",
           "namespace": "io.debezium.connector.vitess",
           "fields": [
             {
               "name": "version",
               "type": "string"
             },
             {
               "name": "connector",
               "type": "string"
             },
             {
               "name": "name",
               "type": "string"
             },
             {
               "name": "ts_ms",
               "type": "long"
             },
             {
               "name": "snapshot",
               "type": [
                 {
                   "type": "string",
                   "connect.version": 1,
                   "connect.parameters": {
                     "allowed": "true,last,false,incremental"
                   },
                   "connect.default": "false",
                   "connect.name": "io.debezium.data.Enum"
                 },
                 "null"
               ],
               "default": "false"
             },
             {
               "name": "db",
               "type": "string"
             },
             {
               "name": "sequence",
               "type": [
                 "null",
                 "string"
               ],
               "default": null
             },
             {
               "name": "keyspace",
               "type": "string"
             },
             {
               "name": "table",
               "type": "string"
             },
             {
               "name": "vgtid",
               "type": "string"
             }
           ],
           "connect.name": "io.debezium.connector.vitess.Source"
         }
       },
       {
         "name": "op",
         "type": "string"
       },
       {
         "name": "ts_ms",
         "type": [
           "null",
           "long"
         ],
         "default": null
       },
       {
         "name": "transaction",
         "type": [
           "null",
           {
             "type": "record",
             "name": "block",
             "namespace": "event",
             "fields": [
               {
                 "name": "id",
                 "type": "string"
               },
               {
                 "name": "total_order",
                 "type": "long"
               },
               {
                 "name": "data_collection_order",
                 "type": "long"
               }
             ],
             "connect.version": 1,
             "connect.name": "event.block"
           }
         ],
         "default": null
       }
     ],
     "connect.version": 1,
     "connect.name": "redacted.redacted.redacted.Envelope"
   }
   
   ```
   
   From within the `VitessDebeziumSource` logs we can see that the records from 
Kafka are ingested, deserialized, and flattened properly:
   ```
   23/02/15 20:40:34 ERROR VitessDebeziumSource: VitessDebeziumSource.processDataset flattenedDataset.showString:
   +----------------------+-------------------+-------------+-------------+-------------+----------+----+----+----+----+----+-----+-----+-------------+-------------+-----+-------------+
   |_change_operation_type|_event_origin_ts_ms|col1         |col2         |col3         |col4      |col5|col6|col7|col8|col9|col10|col11|col12        |col13        |col14|col15        |
   +----------------------+-------------------+-------------+-------------+-------------+----------+----+----+----+----+----+-----+-----+-------------+-------------+-----+-------------+
   |c                     |1676473534195      |1153411137283|1152398039042|1152398038962|1676473534|0   |0   |0   |0   |1   |1    |0    |1            |1152398038962|0    |1152398038962|
   |c                     |1676473536740      |1153411137283|1152398039074|1152398038962|1676473536|0   |0   |0   |0   |1   |1    |0    |1153411137283|0            |0    |1152398038962|
   |u                     |1676473536792      |1153411137283|1152398039074|1152398038962|1676473536|0   |0   |0   |1   |1   |1    |0    |1153411137283|0            |0    |1152398038962|
   |c                     |1676473542584      |1152398039026|1153411137507|1152398039010|1676473542|0   |0   |0   |0   |1   |1    |0    |1            |1152398039010|0    |1152398039010|
   |c                     |1676473544236      |1152398039026|1152398039234|1152398039010|1676473543|0   |0   |0   |0   |1   |1    |0    |1152398039026|0            |0    |1152398039010|
   |u                     |1676473544286      |1152398039026|1152398039234|1152398039010|1676473543|0   |0   |0   |1   |1   |1    |0    |1152398039026|0            |0    |1152398039010|
   |c                     |1676473544326      |1153411137283|1152398039154|1152398038962|1676473544|0   |0   |0   |0   |0   |0    |0    |0            |0            |0    |1152398038962|
   |c                     |1676473554065      |1152398039426|1153411137939|1152398039362|1676473554|0   |0   |0   |0   |1   |1    |0    |1            |1152398039362|0    |1152398039362|
   |c                     |1676473555721      |1152398039426|1152398039666|1152398039362|1676473555|0   |0   |0   |0   |1   |1    |0    |1152398039426|0            |0    |1152398039362|
   |u                     |1676473555815      |1152398039426|1152398039666|1152398039362|1676473555|0   |0   |0   |1   |1   |1    |0    |1152398039426|0            |0    |1152398039362|
   +----------------------+-------------------+-------------+-------------+-------------+----------+----+----+----+----+----+-----+-----+-------------+-------------+-----+-------------+
   ```
   
   But from logs within `bytesToAvro` we can see that the `writerSchema` and 
`readerSchema` are still set to the Debezium Envelope schema, rather than 
inferring the schema from the flattened dataset:
   ```
   23/02/15 20:29:49 ERROR HoodieAvroUtils: bytesToAvro writerSchema: {"type":"record","name":"Envelope","namespace":"redacted.redacted.redacted","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"long"},{"name":"col5","type":"long"},{"name":"col6","type":"string"},{"name":"col7","type":"string"},{"name":"col8","type":{"type":"int","connect.type":"int16"}},{"name":"col9","type":{"type":"int","connect.type":"int16"}},{"name":"col10","type":{"type":"int","connect.type":"int16"}},{"name":"col11","type":{"type":"int","connect.type":"int16"}},{"name":"col12","type":"string"},{"name":"col13","type":"string"},{"name":"col14","type":{"type":"int","connect.type":"int16"}},{"name":"col15","type":"string"}],"connect.name":"sandbox.byuser.channels_members.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.vitess","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"keyspace","type":"string"},{"name":"table","type":"string"},{"name":"vgtid","type":"string"}],"connect.name":"io.debezium.connector.vitess.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"block","namespace":"event","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}],"connect.version":1,"connect.name":"event.block"}],"default":null}],"connect.version":1,"connect.name":"redacted.redacted.redacted.Envelope"}
   23/02/15 20:29:49 ERROR HoodieAvroUtils: bytesToAvro readerSchema: {"type":"record","name":"Envelope","namespace":"redacted.redacted.redacted","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"long"},{"name":"col5","type":"long"},{"name":"col6","type":"string"},{"name":"col7","type":"string"},{"name":"col8","type":{"type":"int","connect.type":"int16"}},{"name":"col9","type":{"type":"int","connect.type":"int16"}},{"name":"col10","type":{"type":"int","connect.type":"int16"}},{"name":"col11","type":{"type":"int","connect.type":"int16"}},{"name":"col12","type":"string"},{"name":"col13","type":"string"},{"name":"col14","type":{"type":"int","connect.type":"int16"}},{"name":"col15","type":"string"}],"connect.name":"sandbox.byuser.channels_members.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.vitess","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"keyspace","type":"string"},{"name":"table","type":"string"},{"name":"vgtid","type":"string"}],"connect.name":"io.debezium.connector.vitess.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"block","namespace":"event","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}],"connect.version":1,"connect.name":"event.block"}],"default":null}],"connect.version":1,"connect.name":"redacted.redacted.redacted.Envelope"}
   ```
   
   
   **Stacktrace**
   Driver:
   ```
   Exception in thread "main" org.apache.hudi.exception.HoodieException: Commit 
20230215204036257 failed and rolled-back !
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:653)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:336)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:204)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:202)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:571)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   ```
   
   Executor logs show many stack traces, with two primary variations of errors.
   
   Negative Length error:
   ```
   23/02/15 20:29:50 ERROR HoodieWriteHandle: Error writing record 
HoodieRecord{key=HoodieKey { recordKey=col1:1153431876899,col2:1153431871443 
partitionPath=}, currentLocation='null', newLocation='null'}
   org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: 
-13097524020
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:164) 
~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:152) 
~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertRecord(AbstractDebeziumAvroPayload.java:87)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertValue(AbstractDebeziumAvroPayload.java:58)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_362]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_362]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_362]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_362]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_362]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
   
   ```
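
To illustrate how a writer/reader schema mismatch could plausibly lead to a "Length is negative" error like the one above, here is a minimal, dependency-free sketch of Avro's zigzag varint encoding. It is a simplification under stated assumptions: the `SchemaMismatchSketch` class and its helpers are hypothetical (not Avro or Hudi APIs), and the flattened record is assumed to start with a non-nullable string followed by a long, which may not match the bytes Hudi actually produces.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class SchemaMismatchSketch {
    /** Writes a long using zigzag variable-length encoding, as Avro's binary format does. */
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    /** Writes a string as a zigzag-encoded byte length followed by the UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Reads one zigzag varint starting at pos[0] and advances pos[0] past it. */
    static long readLong(byte[] buf, int[] pos) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos[0]++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    public static void main(String[] args) {
        // Writer side (assumed layout): a flattened record starting with op (string), ts (long).
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, "c");
        writeLong(out, 1676473534195L);
        byte[] bytes = out.toByteArray();

        // Reader side: the envelope schema starts with the ["null", Value] union for "before",
        // so the first varint is interpreted as the union branch index...
        int[] pos = {0};
        long unionIndex = readLong(bytes, pos);
        // ...and the next varint as the byte length of Value.col1, which is a string.
        long col1Length = readLong(bytes, pos);

        System.out.println("union index = " + unionIndex + ", col1 length = " + col1Length);
    }
}
```

In this sketch the byte for the character `c` ends up being read as a string length and decodes to a negative number. With different input bytes the same misreading could just as easily produce a very large positive length, which would be consistent with the second error variant.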
   
   String too long error:
   ```
   23/02/15 20:29:50 ERROR HoodieWriteHandle: Error writing record 
HoodieRecord{key=HoodieKey { recordKey=col1:1153415312659,col2:1153415313283 
partitionPath=}, currentLocation='null', newLocation='null'}
   java.lang.UnsupportedOperationException: Cannot read strings longer than 
2147483639 bytes
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:305) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
 ~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) 
~[avro-1.11.0.jar:1.11.0]
        at 
org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:164) 
~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:152) 
~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertRecord(AbstractDebeziumAvroPayload.java:87)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertValue(AbstractDebeziumAvroPayload.java:58)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
 ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_362]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_362]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_362]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_362]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_362]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
   ```
   
   Given that the records have already been deserialized once by this point, I 
suspect the error comes from a mismatch between the schema used to deserialize 
the payload and the bytes that were actually written, but let me know if you 
think something else could be going on.
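
To illustrate how such a mismatch could produce the "Cannot read strings longer than" error: Avro's binary encoding carries no field markers, so a decoder that expects a string where a long was written will treat the long's value as the string length. Below is a minimal, dependency-free sketch of that mechanism. It is not Hudi's or Avro's actual code: the class `SchemaMismatchSketch`, its helper methods, and the two-field record layout (a long followed by a string) are hypothetical, and the sample long is simply borrowed from the record key in the log above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; mimics Avro's binary encoding for long and string.
final class SchemaMismatchSketch {
    // Length limit copied from the error message in the stack trace above.
    static final long MAX_LEN = 2147483639L;

    // Avro writes a long as a zigzag value in little-endian base-128 varint form.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro writes a string as a long byte-length followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static long readLong(ByteBuffer in) {
        long z = 0;
        int shift = 0;
        byte b;
        do {
            b = in.get();
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1); // undo zigzag
    }

    static String readString(ByteBuffer in) {
        long len = readLong(in);
        if (len > MAX_LEN) {
            throw new UnsupportedOperationException(
                "Cannot read strings longer than " + MAX_LEN + " bytes");
        }
        if (len < 0 || len > in.remaining()) {
            throw new IllegalStateException("Bad string length: " + len);
        }
        byte[] utf8 = new byte[(int) len];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // "Writer schema" (assumed for this sketch): record { long id; string name; }
    static byte[] encodeRecord(long id, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeString(out, name);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeRecord(1153415312659L, "abc");

        // Reading with the matching field order works.
        ByteBuffer ok = ByteBuffer.wrap(bytes);
        System.out.println(readLong(ok) + " / " + readString(ok));

        // A "reader schema" whose first field is a string misreads the long as a length.
        try {
            readString(ByteBuffer.wrap(bytes));
        } catch (UnsupportedOperationException e) {
            System.out.println("Mismatched read failed: " + e.getMessage());
        }
    }
}
```

If this is what is happening here, comparing the schema the payload bytes were serialized with against the schema passed to `HoodieAvroUtils.bytesToAvro` should show where the field order or types diverge.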
   


