[ https://issues.apache.org/jira/browse/HUDI-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Chen updated HUDI-2374:
-----------------------------
Fix Version/s: 0.11.0
(was: 0.10.0)
> AvroDFSSource does not use the overridden schema to deserialize Avro binaries.
> ------------------------------------------------------------------------------
>
> Key: HUDI-2374
> URL: https://issues.apache.org/jira/browse/HUDI-2374
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer
> Affects Versions: 0.9.0
> Reporter: Xuan Huy Pham
> Assignee: Alexey Kudinkin
> Priority: Major
> Labels: sev:critical
> Fix For: 0.11.0
>
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> Hi,
> I am not sure whether AvroDFSSource is intended to ignore the source schema
> from the designated schema provider class, but the current logic always uses
> the Avro writer schema as the reader schema.
> Logic as of release-0.9.0, class {{org.apache.hudi.utilities.sources.AvroDFSSource}}:
> {code:java}
> public class AvroDFSSource extends AvroSource {
>
>   private final DFSPathSelector pathSelector;
>
>   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext,
>       SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
>     super(props, sparkContext, sparkSession, schemaProvider);
>     this.pathSelector = DFSPathSelector
>         .createSourceSelector(props, sparkContext.hadoopConfiguration());
>   }
>
>   @Override
>   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
>     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
>         pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
>     return selectPathsWithMaxModificationTime.getLeft()
>         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)),
>             selectPathsWithMaxModificationTime.getRight()))
>         .orElseGet(() -> new InputBatch<>(Option.empty(),
>             selectPathsWithMaxModificationTime.getRight()));
>   }
>
>   private JavaRDD<GenericRecord> fromFiles(String pathStr) {
>     sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
>     JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
>         AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
>         sparkContext.hadoopConfiguration());
>     return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
>   }
> }
> {code}
> The {{schemaProvider}} parameter is never propagated to the Hadoop
> configuration in the constructor, so {{AvroKeyInputFormat}} always uses the
> writer schema to read.
> As a result, we often see this in the DeltaStreamer logs:
> {code:java}
> 21/08/30 10:17:24 WARN AvroKeyInputFormat: Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
> 21/08/30 10:17:24 INFO AvroKeyInputFormat: Using a reader schema equal to the writer schema.
> {code}
> This [https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer] is a
> nice blog post on making AvroKafkaSource support BACKWARD_TRANSITIVE schema
> evolution. For DFS data, this issue is the main blocker. If we pass the source
> schema from {{schemaProvider}}, we should be able to get the same
> BACKWARD_TRANSITIVE schema evolution feature for DFS Avro data.
>
> Suggested fix: pass the source schema from {{schemaProvider}} to the Hadoop
> configuration key {{avro.schema.input.key}}, as sketched below.
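> A minimal sketch of that fix (assuming {{SchemaProvider#getSourceSchema}} returns the overridden schema, and guarding for providers that have none), applied in the constructor so the configuration already carries the reader schema before any files are read:
> {code:java}
> public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext,
>     SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
>   super(props, sparkContext, sparkSession, schemaProvider);
>   // Propagate the overridden source schema so AvroKeyInputFormat uses it as
>   // the reader schema instead of falling back to each file's writer schema.
>   if (schemaProvider != null && schemaProvider.getSourceSchema() != null) {
>     sparkContext.hadoopConfiguration()
>         .set("avro.schema.input.key", schemaProvider.getSourceSchema().toString());
>   }
>   this.pathSelector = DFSPathSelector
>       .createSourceSelector(props, sparkContext.hadoopConfiguration());
> }
> {code}
> {{avro.schema.input.key}} is the same configuration key that {{AvroJob.setInputKeySchema()}} writes, so {{AvroKeyInputFormat}} should then resolve each file's writer schema against the provided reader schema, which is what BACKWARD_TRANSITIVE evolution needs.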
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)