[ https://issues.apache.org/jira/browse/HUDI-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Harsha Teja Kanna updated HUDI-3335:
------------------------------------
    Description: 
I have a COW table with the metadata table enabled. Loading it from a Spark query fails with
java.lang.NullPointerException.

*Environment*

Spark 3.1.2

Hudi 0.10.1

*Query*

import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.config.HoodieMetadataConfig

val basePath = "s3a://datalake-hudi/v1"

val df = spark.
    read.
    format("org.apache.hudi").
    option(HoodieMetadataConfig.ENABLE.key(), "true").
    option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).
    load(s"${basePath}/sessions/")
df.createOrReplaceTempView(table)

*Passing an individual partition works, though:*
val df = spark.
    read.
    format("org.apache.hudi").
    option(HoodieMetadataConfig.ENABLE.key(), "true").
    option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).
    load(s"${basePath}/sessions/date=2022/01/25")
df.createOrReplaceTempView(table)

*Also, disabling metadata works, but the query takes a very long time:*
val df = spark.
    read.
    format("org.apache.hudi").
    option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).
    load(s"${basePath}/sessions/")
df.createOrReplaceTempView(table)
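
For completeness, the same metadata-off read can be written with the flag spelled out explicitly. This is only a sketch; it assumes that omitting HoodieMetadataConfig.ENABLE and setting it to "false" are equivalent for readers on 0.10.1:

// Reuses the imports, spark session and basePath from the snippets above.
val dfNoMetadata = spark.
    read.
    format("org.apache.hudi").
    option(HoodieMetadataConfig.ENABLE.key(), "false").
    option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).
    load(s"${basePath}/sessions/")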

*Stack trace from the failing load:*

  at org.sparkproject.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
  at org.sparkproject.guava.cache.LocalCache.put(LocalCache.java:4210)
  at org.sparkproject.guava.cache.LocalCache$LocalManualCache.put(LocalCache.java:4804)
  at org.apache.spark.sql.execution.datasources.SharedInMemoryCache$$anon$3.putLeafFiles(FileStatusCache.scala:161)
  at org.apache.hudi.HoodieFileIndex.$anonfun$loadPartitionPathFiles$4(HoodieFileIndex.scala:631)
  at org.apache.hudi.HoodieFileIndex.$anonfun$loadPartitionPathFiles$4$adapted(HoodieFileIndex.scala:629)
  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468)
  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468)
  at org.apache.hudi.HoodieFileIndex.loadPartitionPathFiles(HoodieFileIndex.scala:629)
  at org.apache.hudi.HoodieFileIndex.refresh0(HoodieFileIndex.scala:387)
  at org.apache.hudi.HoodieFileIndex.<init>(HoodieFileIndex.scala:184)
  at org.apache.hudi.DefaultSource.getBaseFileOnlyView(DefaultSource.scala:199)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:119)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:69)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
  at $anonfun$res3$1(<console>:46)
  at $anonfun$res3$1$adapted(<console>:40)
  at scala.collection.Iterator.foreach(Iterator.scala:941)
  at scala.collection.Iterator.foreach$(Iterator.scala:941)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
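
The top frames are consistent with a null partition listing being handed to Spark's shared file-status cache: Guava caches reject null keys and values via Preconditions.checkNotNull. A minimal, standalone sketch of that behaviour (plain Guava here; Spark ships the same classes shaded under org.sparkproject.guava, and the stand-in key/value types below are illustrative only):

import com.google.common.cache.{Cache, CacheBuilder}

// Stand-in for Spark's FileStatusCache, which maps a partition path to the
// array of leaf-file statuses listed under it.
val cache: Cache[String, Array[String]] =
  CacheBuilder.newBuilder().maximumSize(1000).build[String, Array[String]]()

val leafFiles: Array[String] = null // e.g. a partition whose listing came back null
// Throws java.lang.NullPointerException from Preconditions.checkNotNull,
// matching the LocalCache.put frame at the top of the trace.
cache.put("date=2022/01/25", leafFiles)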

*Writer config*


spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-cores 4 \
--driver-memory 4g \
--executor-cores 4 \
--executor-memory 6g \
--num-executors 8 \
--jars s3://datalake/jars/unused-1.0.0.jar,s3://datalake/jars/spark-avro_2.12-3.1.2.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.sources.parallelPartitionDiscovery.parallelism=25000 \
s3://datalake/jars/hudi-0.10.1/hudi-utilities-bundle_2.12-0.10.1.jar \
--table-type COPY_ON_WRITE \
--source-ordering-field timestamp \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-base-path s3a://datalake-hudi/sessions \
--target-table sessions \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--op INSERT \
--hoodie-conf hoodie.clean.automatic=true \
--hoodie-conf hoodie.cleaner.commits.retained=10 \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
--hoodie-conf hoodie.clustering.inline=true \
--hoodie-conf hoodie.clustering.inline.max.commits=5 \
--hoodie-conf hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy \
--hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=1000 \
--hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=268435456 \
--hoodie-conf hoodie.clustering.plan.strategy.sort.columns=survey_dbid,session_dbid \
--hoodie-conf hoodie.clustering.plan.strategy.target.file.max.bytes=536870912 \
--hoodie-conf hoodie.clustering.preserve.commit.metadata=true \
--hoodie-conf hoodie.datasource.hive_sync.database=datalake-hudi \
--hoodie-conf hoodie.datasource.hive_sync.enable=false \
--hoodie-conf hoodie.datasource.hive_sync.ignore_exceptions=true \
--hoodie-conf hoodie.datasource.hive_sync.mode=hms \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor \
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=date \
--hoodie-conf hoodie.datasource.hive_sync.table=sessions \
--hoodie-conf hoodie.datasource.hive_sync.use_jdbc=false \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
--hoodie-conf hoodie.datasource.write.operation=insert \
--hoodie-conf hoodie.datasource.write.partitionpath.field=date:TIMESTAMP \
--hoodie-conf hoodie.datasource.write.precombine.field=timestamp \
--hoodie-conf hoodie.datasource.write.recordkey.field=session_dbid,question_id,answer \
--hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy/MM/dd \
--hoodie-conf hoodie.deltastreamer.keygen.timebased.input.timezone=GMT \
--hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
--hoodie-conf hoodie.deltastreamer.keygen.timebased.output.timezone=GMT \
--hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://datalake-hudi/raw/parquet/data/sessions/year=2022/month=01/day=26/hour=02 \
--hoodie-conf hoodie.deltastreamer.source.input.selector=org.apache.hudi.utilities.sources.helpers.DFSPathSelector \
--hoodie-conf "\"hoodie.deltastreamer.transformer.sql=SELECT question_id, answer, to_timestamp(timestamp) as timestamp, session_dbid, survey_dbid, date_format(to_timestamp(timestamp), 'yyyy/MM/dd') AS date FROM <SRC> a \"" \
--hoodie-conf hoodie.file.listing.parallelism=256 \
--hoodie-conf hoodie.finalize.write.parallelism=256 \
--hoodie-conf hoodie.generate.consistent.timestamp.logical.for.key.generator=true \
--hoodie-conf hoodie.insert.shuffle.parallelism=1000 \
--hoodie-conf hoodie.metadata.enable=true \
--hoodie-conf hoodie.metadata.metrics.enable=true \
--hoodie-conf hoodie.metrics.cloudwatch.metric.prefix=emr.datalake.prd.insert.sessions \
--hoodie-conf hoodie.metrics.on=false \
--hoodie-conf hoodie.metrics.reporter.type=CLOUDWATCH \
--hoodie-conf hoodie.parquet.block.size=536870912 \
--hoodie-conf hoodie.parquet.compression.codec=snappy \
--hoodie-conf hoodie.parquet.max.file.size=536870912 \
--hoodie-conf hoodie.parquet.small.file.limit=268435456
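
As a quick sanity check of the partition layout this writer config produces (plain java.time below, not Hudi's CustomKeyGenerator, so treat it as an approximation): the output date format yyyy/MM/dd combined with hive-style partitioning yields three-level paths such as date=2022/01/25, which is exactly the shape of the single partition that loads successfully above.

import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter

// hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd, output.timezone=GMT
val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.of("GMT"))
val partitionValue = fmt.format(Instant.parse("2022-01-25T12:00:00Z"))

// hoodie.datasource.write.hive_style_partitioning=true prefixes the field name,
// so the partition folder becomes date=2022/01/25 (only the first of the three
// path levels carries the "date=" prefix).
println(s"date=$partitionValue") // date=2022/01/25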

 

 

 

*Commit timeline*

20220125224502471.clean

20220125224502471.clean.inflight

20220125224502471.clean.requested

20220125225810828.clean

20220125225810828.clean.inflight

20220125225810828.clean.requested

20220125230125674.clean

20220125230125674.clean.inflight

20220125230125674.clean.requested

20220125230854957.clean

20220125230854957.clean.inflight

20220125230854957.clean.requested

20220125232236767.clean

20220125232236767.clean.inflight

20220125232236767.clean.requested

20220125232638588.clean

20220125232638588.clean.inflight

20220125232638588.clean.requested

20220125233355290.clean

20220125233355290.clean.inflight

20220125233355290.clean.requested

20220125234539672.clean

20220125234539672.clean.inflight

20220125234539672.clean.requested

20220125234944271.clean

20220125234944271.clean.inflight

20220125234944271.clean.requested

20220125235718218.clean

20220125235718218.clean.inflight

20220125235718218.clean.requested

20220126000225375.clean

20220126000225375.clean.inflight

20220126000225375.clean.requested

20220126000937875.clean

20220126000937875.clean.inflight

20220126000937875.clean.requested

20220126003307449.clean

20220126003307449.clean.inflight

20220126003307449.clean.requested

20220126003617137.clean

20220126003617137.clean.inflight

20220126003617137.clean.requested

20220126004518227.clean

20220126004518227.clean.inflight

20220126004518227.clean.requested

20220126005806798.clean

20220126005806798.clean.inflight

20220126005806798.clean.requested

20220126010011407.commit

20220126010011407.commit.requested

20220126010011407.inflight

20220126010227320.clean

20220126010227320.clean.inflight

20220126010227320.clean.requested

20220126010242754.replacecommit

20220126010242754.replacecommit.inflight

20220126010242754.replacecommit.requested

20220126010800207.commit

20220126010800207.commit.requested

20220126010800207.inflight

20220126010920192.clean

20220126010920192.clean.inflight

20220126010920192.clean.requested

20220126011114529.commit

20220126011114529.commit.requested

20220126011114529.inflight

20220126011230532.clean

20220126011230532.clean.inflight

20220126011230532.clean.requested

20220126011426028.commit

20220126011426028.commit.requested

20220126011426028.inflight

20220126011818299.commit

20220126011818299.commit.requested

20220126011818299.inflight

20220126012003045.clean

20220126012003045.clean.inflight

20220126012003045.clean.requested

20220126012240288.commit

20220126012240288.commit.requested

20220126012240288.inflight

20220126012443455.clean

20220126012443455.clean.inflight

20220126012443455.clean.requested

20220126012508460.replacecommit

20220126012508460.replacecommit.inflight

20220126012508460.replacecommit.requested

20220126013218816.commit

20220126013218816.commit.requested

20220126013218816.inflight

20220126013428875.clean

20220126013428875.clean.inflight

20220126013428875.clean.requested

20220126013648751.commit

20220126013648751.commit.requested

20220126013648751.inflight

20220126013859643.clean

20220126013859643.clean.inflight

20220126013859643.clean.requested

20220126014254294.commit

20220126014254294.commit.requested

20220126014254294.inflight

20220126014516195.clean

20220126014516195.clean.inflight

20220126014516195.clean.requested

20220126014711043.commit

20220126014711043.commit.requested

20220126014711043.inflight

20220126014808898.clean

20220126014808898.clean.inflight

20220126014808898.clean.requested

20220126015008443.commit

20220126015008443.commit.requested

20220126015008443.inflight

20220126015119193.replacecommit

20220126015119193.replacecommit.inflight

20220126015119193.replacecommit.requested

20220126015653770.commit

20220126015653770.commit.requested

20220126015653770.inflight

20220126020011172.commit

20220126020011172.commit.requested

20220126020011172.inflight

20220126020405299.commit

20220126020405299.commit.requested

20220126020405299.inflight

20220126020813841.commit

20220126020813841.commit.requested

20220126020813841.inflight

20220126021002748.clean

20220126021002748.clean.inflight

20220126021002748.clean.requested

20220126021231085.commit

20220126021231085.commit.requested

20220126021231085.inflight

20220126021429124.clean

20220126021429124.clean.inflight

20220126021429124.clean.requested

20220126021445188.replacecommit

20220126021445188.replacecommit.inflight

20220126021445188.replacecommit.requested

20220126021949824.commit

20220126021949824.commit.requested

20220126021949824.inflight

20220126022154561.clean

20220126022154561.clean.inflight

20220126022154561.clean.requested

20220126022523011.commit

20220126022523011.commit.requested

20220126022523011.inflight

20220126023054200.commit

20220126023054200.commit.requested

20220126023054200.inflight

20220126023530250.commit

20220126023530250.commit.requested

20220126023530250.inflight

20220126023637109.clean

20220126023637109.clean.inflight

20220126023637109.clean.requested

20220126024028688.commit

20220126024028688.commit.requested

20220126024028688.inflight

20220126024137627.replacecommit

20220126024137627.replacecommit.inflight

20220126024137627.replacecommit.requested

20220126024720121.commit

20220126024720121.commit.requested

20220126024720121.inflight

archived

hoodie.properties

metadata

 

*Metadata hoodie timeline*

20220125154001455002.clean

20220125154001455002.clean.inflight

20220125154001455002.clean.requested

20220125160751769002.clean

20220125160751769002.clean.inflight

20220125160751769002.clean.requested

20220125163020781002.clean

20220125163020781002.clean.inflight

20220125163020781002.clean.requested

20220125165722170002.clean

20220125165722170002.clean.inflight

20220125165722170002.clean.requested

20220125172016239002.clean

20220125172016239002.clean.inflight

20220125172016239002.clean.requested

20220125174427654002.clean

20220125174427654002.clean.inflight

20220125174427654002.clean.requested

20220125181218237002.clean

20220125181218237002.clean.inflight

20220125181218237002.clean.requested

20220125184343588002.clean

20220125184343588002.clean.inflight

20220125184343588002.clean.requested

20220125191038318002.clean

20220125191038318002.clean.inflight

20220125191038318002.clean.requested

20220125193445223002.clean

20220125193445223002.clean.inflight

20220125193445223002.clean.requested

20220125200741168002.clean

20220125200741168002.clean.inflight

20220125200741168002.clean.requested

20220125203814934002.clean

20220125203814934002.clean.inflight

20220125203814934002.clean.requested

20220125211447323002.clean

20220125211447323002.clean.inflight

20220125211447323002.clean.requested

20220125214421740002.clean

20220125214421740002.clean.inflight

20220125214421740002.clean.requested

20220125221009798002.clean

20220125221009798002.clean.inflight

20220125221009798002.clean.requested

20220125224319264002.clean

20220125224319264002.clean.inflight

20220125224319264002.clean.requested

20220125231128580002.clean

20220125231128580002.clean.inflight

20220125231128580002.clean.requested

20220125234345790002.clean

20220125234345790002.clean.inflight

20220125234345790002.clean.requested

20220126001130415002.clean

20220126001130415002.clean.inflight

20220126001130415002.clean.requested

20220126004341130002.clean

20220126004341130002.clean.inflight

20220126004341130002.clean.requested

20220126011114529002.clean

20220126011114529002.clean.inflight

20220126011114529002.clean.requested

20220126013648751002.clean

20220126013648751002.clean.inflight

20220126013648751002.clean.requested

20220126013859643.deltacommit

20220126013859643.deltacommit.inflight

20220126013859643.deltacommit.requested

20220126014254294.deltacommit

20220126014254294.deltacommit.inflight

20220126014254294.deltacommit.requested

20220126014516195.deltacommit

20220126014516195.deltacommit.inflight

20220126014516195.deltacommit.requested

20220126014711043.deltacommit

20220126014711043.deltacommit.inflight

20220126014711043.deltacommit.requested

20220126014808898.deltacommit

20220126014808898.deltacommit.inflight

20220126014808898.deltacommit.requested

20220126015008443.deltacommit

20220126015008443.deltacommit.inflight

20220126015008443.deltacommit.requested

20220126015119193.deltacommit

20220126015119193.deltacommit.inflight

20220126015119193.deltacommit.requested

20220126015119193001.commit

20220126015119193001.compaction.inflight

20220126015119193001.compaction.requested

20220126015653770.deltacommit

20220126015653770.deltacommit.inflight

20220126015653770.deltacommit.requested

20220126020011172.deltacommit

20220126020011172.deltacommit.inflight

20220126020011172.deltacommit.requested

20220126020405299.deltacommit

20220126020405299.deltacommit.inflight

20220126020405299.deltacommit.requested

20220126020405299002.clean

20220126020405299002.clean.inflight

20220126020405299002.clean.requested

20220126020813841.deltacommit

20220126020813841.deltacommit.inflight

20220126020813841.deltacommit.requested

20220126021002748.deltacommit

20220126021002748.deltacommit.inflight

20220126021002748.deltacommit.requested

20220126021231085.deltacommit

20220126021231085.deltacommit.inflight

20220126021231085.deltacommit.requested

20220126021429124.deltacommit

20220126021429124.deltacommit.inflight

20220126021429124.deltacommit.requested

20220126021445188.deltacommit

20220126021445188.deltacommit.inflight

20220126021445188.deltacommit.requested

20220126021949824.deltacommit

20220126021949824.deltacommit.inflight

20220126021949824.deltacommit.requested

20220126022154561.deltacommit

20220126022154561.deltacommit.inflight

20220126022154561.deltacommit.requested

20220126022154561001.commit

20220126022154561001.compaction.inflight

20220126022154561001.compaction.requested

20220126022523011.deltacommit

20220126022523011.deltacommit.inflight

20220126022523011.deltacommit.requested

20220126023054200.deltacommit

20220126023054200.deltacommit.inflight

20220126023054200.deltacommit.requested

20220126023530250.deltacommit

20220126023530250.deltacommit.inflight

20220126023530250.deltacommit.requested

20220126023530250002.clean

20220126023530250002.clean.inflight

20220126023530250002.clean.requested

20220126023637109.deltacommit

20220126023637109.deltacommit.inflight

20220126023637109.deltacommit.requested

20220126024028688.deltacommit

20220126024028688.deltacommit.inflight

20220126024028688.deltacommit.requested

20220126024137627.deltacommit

20220126024137627.deltacommit.inflight

20220126024137627.deltacommit.requested

20220126024720121.deltacommit

20220126024720121.deltacommit.inflight

20220126024720121.deltacommit.requested

archived

hoodie.properties
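
The listings above can be regenerated with a small Hadoop FileSystem helper like the one below. This is a hypothetical sketch, not how they were captured; it assumes the writer's target base path s3a://datalake-hudi/sessions and that the metadata table lives under <base>/.hoodie/metadata.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def listTimeline(dir: String): Unit = {
  val path = new Path(dir)
  // Picks up the S3A filesystem from the Hadoop configuration on the classpath.
  val fs: FileSystem = path.getFileSystem(new Configuration())
  fs.listStatus(path).map(_.getPath.getName).sorted.foreach(println)
}

listTimeline("s3a://datalake-hudi/sessions/.hoodie")                   // data table timeline
listTimeline("s3a://datalake-hudi/sessions/.hoodie/metadata/.hoodie")  // metadata table timeline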

 

 

 



> Loading Hudi table fails with NullPointerException
> --------------------------------------------------
>
>                 Key: HUDI-3335
>                 URL: https://issues.apache.org/jira/browse/HUDI-3335
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.10.1
>            Reporter: Harsha Teja Kanna
>            Priority: Critical
>


