zyclove commented on issue #9902:
URL: https://github.com/apache/hudi/issues/9902#issuecomment-1780588181
This issue could be fixed by adding the following fallback, so that `avroSchema` is never null or empty when it reaches the Avro parser:
```java
// set latest schema
if (StringUtils.isNullOrEmpty(avroSchema)) {
  avroSchema = latestHistorySchema;
}
```
```
Caused by: org.apache.avro.SchemaParseException: Cannot parse <null> schema
	at org.apache.avro.Schema.parse(Schema.java:1633)
	at org.apache.avro.Schema$Parser.parse(Schema.java:1430)
	at org.apache.avro.Schema$Parser.parse(Schema.java:1418)
	at org.apache.hudi.common.util.InternalSchemaCache.getInternalSchemaByVersionId(InternalSchemaCache.java:220)
	at org.apache.hudi.common.util.InternalSchemaCache.getInternalSchemaByVersionId(InternalSchemaCache.java:226)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.composeSchemaEvolutionTransformer(HoodieMergeHelper.java:177)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:94)
```
```java
public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath,
    Configuration hadoopConf, String validCommits) {
  String avroSchema = "";
  Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
  List<String> validateCommitList =
      commitSet.stream().map(HoodieInstant::extractTimestamp).collect(Collectors.toList());
  FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
  Path hoodieMetaPath = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
  // step 1: try to read the internal schema from the candidate commit file.
  Path candidateCommitFile = commitSet.stream()
      .filter(fileName -> HoodieInstant.extractTimestamp(fileName).equals(versionId + ""))
      .findFirst().map(f -> new Path(hoodieMetaPath, f)).orElse(null);
  if (candidateCommitFile != null) {
    try {
      byte[] data;
      try (FSDataInputStream is = fs.open(candidateCommitFile)) {
        data = FileIOUtils.readAsByteArray(is);
      }
      HoodieCommitMetadata metadata =
          HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
      String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
      avroSchema = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
      if (latestInternalSchemaStr != null) {
        return SerDeHelper.fromJson(latestInternalSchemaStr).orElse(null);
      }
    } catch (Exception e1) {
      // swallow this exception.
      LOG.warn(String.format("Cannot find internal schema from commit file %s."
          + " Falling back to parsing historical internal schema",
          candidateCommitFile.toString()));
    }
  }
  // step 2: fall back to the history schema files.
  FileBasedInternalSchemaStorageManager fileBasedInternalSchemaStorageManager =
      new FileBasedInternalSchemaStorageManager(hadoopConf, new Path(tablePath));
  String latestHistorySchema =
      fileBasedInternalSchemaStorageManager.getHistorySchemaStrByGivenValidCommits(validateCommitList);
  if (latestHistorySchema.isEmpty()) {
    return InternalSchema.getEmptyInternalSchema();
  }
  InternalSchema fileSchema =
      InternalSchemaUtils.searchSchema(versionId, SerDeHelper.parseSchemas(latestHistorySchema));
  // >>> proposed fix: set latest schema so avroSchema is never null/empty below.
  if (StringUtils.isNullOrEmpty(avroSchema)) {
    avroSchema = latestHistorySchema;
  }
  // <<< end of proposed fix
  // step 3: parse the avro schema only if no matching internal schema was found.
  return fileSchema.isEmptySchema()
      ? AvroInternalSchemaConverter.convert(
          HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(avroSchema)))
      : fileSchema;
}
```
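The failure mode is that `metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)` can return null when the commit metadata carries no schema entry, and that null eventually reaches `Schema.Parser().parse(...)` in step 3, which throws `SchemaParseException: Cannot parse <null> schema`. The guard proposed above can be sketched in isolation as plain Java (the class and method names below are hypothetical, only `isNullOrEmpty` mirrors `org.apache.hudi.common.util.StringUtils`):

```java
// Minimal sketch of the proposed null/empty guard. SchemaFallbackSketch and
// resolveAvroSchema are illustrative names, not part of the Hudi codebase.
public class SchemaFallbackSketch {

  // Mirrors org.apache.hudi.common.util.StringUtils.isNullOrEmpty.
  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Proposed fix: fall back to the latest history schema so the downstream
  // Avro Schema.Parser never receives a null or empty string.
  static String resolveAvroSchema(String avroSchema, String latestHistorySchema) {
    if (isNullOrEmpty(avroSchema)) {
      return latestHistorySchema;
    }
    return avroSchema;
  }

  public static void main(String[] args) {
    System.out.println(resolveAvroSchema(null, "historySchemaJson"));       // historySchemaJson
    System.out.println(resolveAvroSchema("", "historySchemaJson"));         // historySchemaJson
    System.out.println(resolveAvroSchema("avroJson", "historySchemaJson")); // avroJson
  }
}
```

With this fallback in place, the ternary in step 3 always hands the parser a non-empty string, matching the intent of the snippet quoted at the top of this comment.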