joeytman opened a new issue, #7973: URL: https://github.com/apache/hudi/issues/7973
**_Tips before filing an issue_** - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly. Unknown yet if it's a bug or user error **Describe the problem you faced** I'm trying to set up a DeltaStreamer job that consumes Avro changelog from Kafka and writes to s3. The changelog is created by Debezium from Vitess binlog, writing schemas to the confluent-api-compatible apicurio registry. I've gone ahead and implemented a VitessDebeziumSource and VitessDebeziumAvroPayload and can see the envelope is deserialized correctly and the flattened dataset looks good from logs within `VitessDebeziumSource.processDataset`. However, writes are later failing when trying to deserialize records [here](https://github.com/apache/hudi/blob/60dfe4d766d6050bb811e4f7d51b88126b5d5575/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L169-L174) (invoked [here](https://github.com/apache/hudi/blob/2a3b0b5af8ac03d0b5940ca1d77bf9b71f9040eb/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java#L87)). **To Reproduce** Steps to reproduce the behavior: 1. Compile hudi-utilities-bundle w/ new `VitesssDebeziumSource.java` and `VitessDebeziumAvroPayload.java`. My implementation of each is: `VitessDebeziumSource`: ``` package org.apache.hudi.utilities.sources.debezium; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.debezium.DebeziumConstants; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; /** * Source for incrementally ingesting debezium generated change logs for Vitess DB. */ public class VitessDebeziumSource extends DebeziumSource { private static final Logger LOG = LogManager.getLogger(VitessDebeziumSource.class); public VitessDebeziumSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) { super(props, sparkContext, sparkSession, schemaProvider, metrics); } /** * Debezium Kafka Payload has a nested structure (see https://debezium.io/documentation/reference/2.1/connectors/vitess.html). * This function flattens this nested structure for the Vitess data, and also extracts a subset of Debezium metadata fields. * * @param rowDataset Dataset containing Debezium Payloads * @return New dataset with flattened columns */ @Override protected Dataset<Row> processDataset(Dataset<Row> rowDataset) { Dataset<Row> flattenedDataset = rowDataset; if (rowDataset.columns().length > 0) { // Only flatten for non-empty schemas Dataset<Row> insertedOrUpdatedData = rowDataset .selectExpr( String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME), // TODO: use source.ts_ms String.format("%s.*", DebeziumConstants.INCOMING_AFTER_FIELD) ) .filter(rowDataset.col(DebeziumConstants.INCOMING_OP_FIELD).notEqual(DebeziumConstants.DELETE_OP)); Dataset<Row> deletedData = rowDataset .selectExpr( String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME), // TODO: use source.ts_ms String.format("%s.*", DebeziumConstants.INCOMING_BEFORE_FIELD) ) .filter(rowDataset.col(DebeziumConstants.INCOMING_OP_FIELD).equalTo(DebeziumConstants.DELETE_OP)); flattenedDataset = insertedOrUpdatedData.union(deletedData); } LOG.error("VitessDebeziumSource.processDataset flattenedDataset.schema: " + flattenedDataset.schema()); LOG.error("VitessDebeziumSource.processDataset flattenedDataset.showString: " + flattenedDataset.showString(10, 0, false)); return flattenedDataset; } } ``` `VitessDebeziumAvroPayload`: ``` package org.apache.hudi.common.model.debezium; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Objects; /** * Provides support for seamlessly applying changes captured via Debezium for MysqlDB. * <p> * Debezium change event types are determined for the op field in the payload * <p> * - For inserts, op=i * - For deletes, op=d * - For updates, op=u * - For snapshot inserts, op=r * <p> * This payload implementation will issue matching insert, delete, updates against the hudi table */ public class VitessDebeziumAvroPayload extends AbstractDebeziumAvroPayload { private static final Logger LOG = LogManager.getLogger(VitessDebeziumAvroPayload.class); public VitessDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } public VitessDebeziumAvroPayload(Option<GenericRecord> record) { super(record); } private Option<String> extractSeq(IndexedRecord record) { Object value = ((GenericRecord) record).get(DebeziumConstants.FLATTENED_TS_COL_NAME); return Option.ofNullable(Objects.toString(value, null)); } @Override protected boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRecord insertRecord, Schema schema) throws IOException { String insertSourceSeq = extractSeq(insertRecord) .orElseThrow(() -> new HoodieException(String.format("%s cannot be null in insert record: %s", DebeziumConstants.FLATTENED_TS_COL_NAME, insertRecord))); Option<String> currentSourceSeqOpt = extractSeq(currentRecord); // Pick the current value in storage only if its Seq (source.ts_ms) is latest // compared to the Seq (source.ts_ms) of the insert value return currentSourceSeqOpt.isPresent() && insertSourceSeq.compareTo(currentSourceSeqOpt.get()) < 0; } } ``` 2. Launch DeltaStreamer job via EMR CLI w/ ``` spark-submit \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/jthaidigsman/hudi-utilities-bundle_2.12-0.12.1.jar \ --target-table db.table \ --target-base-path s3://redacted/hudi/table/ \ --table-type COPY_ON_WRITE \ --source-class org.apache.hudi.utilities.sources.debezium.VitessDebeziumSource \ --payload-class org.apache.hudi.common.model.debezium.VitessDebeziumAvroPayload \ --source-ordering-field _event_origin_ts_ms \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --checkpoint redacted.redacted.redacted,0:0 \ --hoodie-conf hoodie.datasource.write.recordkey.field=col1,col2 \ --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \ --hoodie-conf hoodie.datasource.write.partitionpath.field="" \ --hoodie-conf bootstrap.servers=redacted:9092 \ --hoodie-conf hoodie.deltastreamer.source.kafka.topic=redacted \ --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://redacted:8080/apis/ccompat/v6/subjects/redacted-value/versions/latest/ \ --hoodie-conf schema.registry.url=http://redacted:8080/apis/ccompat/v6/ \ --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer ``` 3. Observe deserialization error. **Expected behavior** In [RFC-39](https://github.com/apache/hudi/blob/release-0.12.1/rfc/rfc-39/rfc-39.md), its stated that: > Since we change the schema of the incoming record in the source class, we have to provide a schema for the target record. We propose to implement DebeziumAvroSource.java as a RowSource and allow spark to infer the schema of the transformed record. An alternative approach is to implement a DebeziumSchemaRegistryProvider.java class that extends the current SchemaRegistryProvider.java, and implements the method getTargetSchema . It constructs the target schema from the original schema by including only the fields nested within the after field of the original record, along with the meta fields that were actually ingested. From my understanding of this paragraph, at the point in the write operation we're failing at, we should be inferring the schema from the record which has already been deserialized and flattened via `VitessDebeziumSource`. However, adding some logs to the `bytesToAvro` method shows that the `writerSchema` and `readerSchema` are set to the schema of the debezium envelope, rather than the schema of the record we're writing. This goes against my understanding of how the flow should work, as it seems like it's using the target schema returned from the `SchemaRegistryProvider` rather than inferring the schema. I also tried implementing DebeziumSchemaRegistryProvider with a new `getTargetSchema` as mentioned as an alternative in the RFC, and was able to see that the schemas passed to `bytesToAvro` were now set to the formatted schema produced by `DebeziumSchemaRegistryProvider`, confirming that the schema inference flow described by the RFC is not being triggered. Is there some config that needs to be set so that Deltastreamer will write with schema of the record produced by VitessDebeziumSource ? Or am I misunderstanding how this is supposed to work? **Environment Description** * Hudi version : 0.12.1 (also tried on 0.13.0 and 0.14.0 and got the same error) * Spark version : 3.3.0-amzn-1 * Hive version : Hive 3.1.3-amzn-2 * Hadoop version : Hadoop 3.3.3-amzn-1 * EMR version : emr-6.9.0 * Storage (HDFS/S3/GCS..) : S3 * Running on Docker? (yes/no) : No **Additional context** Debezium Envelope Schema (from schema registry): ``` { "type": "record", "name": "Envelope", "namespace": "redacted.redacted.redacted", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "col1", "type": "string" }, { "name": "col2", "type": "string" }, { "name": "col3", "type": "string" }, { "name": "col4", "type": "long" }, { "name": "col5", "type": "long" }, { "name": "col6", "type": "string" }, { "name": "col7", "type": "string" }, { "name": "col8", "type": { "type": "int", "connect.type": "int16" } }, { "name": "col9", "type": { "type": "int", "connect.type": "int16" } }, { "name": "col10", "type": { "type": "int", "connect.type": "int16" } }, { "name": "col11", "type": { "type": "int", "connect.type": "int16" } }, { "name": "col12", "type": "string" }, { "name": "col13", "type": "string" }, { "name": "col14", "type": { "type": "int", "connect.type": "int16" } }, { "name": "col15", "type": "string" } ], "connect.name": "sandbox.byuser.channels_members.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.vitess", "fields": [ { "name": "version", "type": "string" }, { "name": "connector", "type": "string" }, { "name": "name", "type": "string" }, { "name": "ts_ms", "type": "long" }, { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false,incremental" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" }, { "name": "db", "type": "string" }, { "name": "sequence", "type": [ "null", "string" ], "default": null }, { "name": "keyspace", "type": "string" }, { "name": "table", "type": "string" }, { "name": "vgtid", "type": "string" } ], "connect.name": "io.debezium.connector.vitess.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], "default": null }, { "name": "transaction", "type": [ "null", { "type": "record", "name": "block", "namespace": "event", "fields": [ { "name": "id", "type": "string" }, { "name": "total_order", "type": "long" }, { "name": "data_collection_order", "type": "long" } ], "connect.version": 1, "connect.name": "event.block" } ], "default": null } ], "connect.version": 1, "connect.name": "redacted.redacted.redacted.Envelope" } ``` From within the `VitessDebeziumSource` logs we can see that the packets from kafka are ingested, deserialized, and flattened properly: ``` 23/02/15 20:40:34 ERROR VitessDebeziumSource: VitessDebeziumSource.processDataset flattenedDataset.showString: +----------------------+-------------------+-------------+-------------+---------------+-----------+------------+---------+-------------+-------+------------+--------------------+----------+--------------+-------------------+----------------------+-------------+ |_change_operation_type|_event_origin_ts_ms|col1 |col2 |col3 |col4 |col5 |col6 |col7 |col8 |col9 |col10 |col11 |col12 |col13 |col14 |col15 | +----------------------+-------------------+-------------+-------------+---------------+-----------+------------+---------+-------------+-------+------------+--------------------+----------+--------------+-------------------+----------------------+-------------+ |c |1676473534195 |1153411137283|1152398039042|1152398038962 |1676473534 |0 |0 |0 |0 |1 |1 |0 |1 |1152398038962 |0 |1152398038962| |c |1676473536740 |1153411137283|1152398039074|1152398038962 |1676473536 |0 |0 |0 |0 |1 |1 |0 |1153411137283 |0 |0 |1152398038962| |u |1676473536792 |1153411137283|1152398039074|1152398038962 |1676473536 |0 |0 |0 |1 |1 |1 |0 |1153411137283 |0 |0 |1152398038962| |c |1676473542584 |1152398039026|1153411137507|1152398039010 |1676473542 |0 |0 |0 |0 |1 |1 |0 |1 |1152398039010 |0 |1152398039010| |c |1676473544236 |1152398039026|1152398039234|1152398039010 |1676473543 |0 |0 |0 |0 |1 |1 |0 |1152398039026 |0 |0 |1152398039010| |u |1676473544286 |1152398039026|1152398039234|1152398039010 |1676473543 |0 |0 |0 |1 |1 |1 |0 |1152398039026 |0 |0 |1152398039010| |c |1676473544326 |1153411137283|1152398039154|1152398038962 |1676473544 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1152398038962| |c |1676473554065 |1152398039426|1153411137939|1152398039362 |1676473554 |0 |0 |0 |0 |1 |1 |0 |1 |1152398039362 |0 |1152398039362| |c |1676473555721 |1152398039426|1152398039666|1152398039362 |1676473555 |0 |0 |0 |0 |1 |1 |0 |1152398039426 |0 |0 |1152398039362| |u |1676473555815 |1152398039426|1152398039666|1152398039362 |1676473555 |0 |0 |0 |1 |1 |1 |0 |1152398039426 |0 |0 |1152398039362| +----------------------+-------------------+-------------+-------------+---------------+-----------+------------+---------+-------------+-------+------------+--------------------+----------+--------------+-------------------+----------------------+-------------+ ``` But from logs within `avroToBytes` we can see that the `writerSchema` and `readerSchema` are still set to the Debezium Envelope schema, rather than inferring the schema from the flattened dataset: ``` 23/02/15 20:29:49 ERROR HoodieAvroUtils: bytesToAvro writerSchema: {"type":"record","name":"Envelope","namespace":"redacted.redacted.redacted","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"long"},{"name":"col5","type":"long"},{"name":"col6","type":"string"},{"name":"col7","type":"string"},{"name":"col8","type":{"type":"int","connect.type":"int16"}},{"name":"col9","type":{"type":"int","connect.type":"int16"}},{"name":"col10","type":{"type":"int","connect.type":"int16"}},{"name":"col11","type":{"type":"int","connect.type":"int16"}},{"name":"col12","type":"string"},{"name":"col13","type":"string"},{"name":"col14","type":{"type":"int","connect.type":"int16"}},{"name":"col15","type":"string"}],"connect.name":"sandbox.byuser.channels_members.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"so urce","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.vitess","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"keyspace","type":"string"},{"name":"table","type":"string"},{"name":"vgtid","type":"string"}],"connect.name":"io.debezium.connector.vitess.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"block","namespace":"event","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long "}],"connect.version":1,"connect.name":"event.block"}],"default":null}],"connect.version":1,"connect.name":"redacted.redacted.redacted.Envelope"} 23/02/15 20:29:49 ERROR HoodieAvroUtils: bytesToAvro readerSchema: {"type":"record","name":"Envelope","namespace":"redacted.redacted.redacted","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"long"},{"name":"col5","type":"long"},{"name":"col6","type":"string"},{"name":"col7","type":"string"},{"name":"col8","type":{"type":"int","connect.type":"int16"}},{"name":"col9","type":{"type":"int","connect.type":"int16"}},{"name":"col10","type":{"type":"int","connect.type":"int16"}},{"name":"col11","type":{"type":"int","connect.type":"int16"}},{"name":"col12","type":"string"},{"name":"col13","type":"string"},{"name":"col14","type":{"type":"int","connect.type":"int16"}},{"name":"col15","type":"string"}],"connect.name":"sandbox.byuser.channels_members.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"so urce","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.vitess","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"keyspace","type":"string"},{"name":"table","type":"string"},{"name":"vgtid","type":"string"}],"connect.name":"io.debezium.connector.vitess.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"block","namespace":"event","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long "}],"connect.version":1,"connect.name":"event.block"}],"default":null}],"connect.version":1,"connect.name":"redacted.redacted.redacted.Envelope"} ``` **Stacktrace** Driver: ``` Exception in thread "main" org.apache.hudi.exception.HoodieException: Commit 20230215204036257 failed and rolled-back ! at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:653) at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:336) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:204) at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:202) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:571) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` Executor logs shows many stacktraces, with two primary variations of errors. Negative Length error: ``` 23/02/15 20:29:50 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=col1:1153431876899,col2:1153431871443 partitionPath=}, currentLocation='null', newLocation='null'} org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -13097524020 at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.0.jar:1.11.0] at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:164) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:152) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertRecord(AbstractDebeziumAvroPayload.java:87) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertValue(AbstractDebeziumAvroPayload.java:58) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_362] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_362] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_362] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362] ``` String too long error: ``` 3/02/15 20:29:50 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=col1:1153415312659,col2:1153415313283 partitionPath=}, currentLocation='null', newLocation='null'} java.lang.UnsupportedOperationException: Cannot read strings longer than 2147483639 bytes at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:305) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.0.jar:1.11.0] at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:164) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:152) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertRecord(AbstractDebeziumAvroPayload.java:87) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertValue(AbstractDebeziumAvroPayload.java:58) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_362] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_362] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_362] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362] ``` Given that we’ve already deserialized the records once at this point, I think the error message is arising from the mismatch between the schema used to deserialize and the actual bytes, but let me know if you think something else could be going on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
