Dan Burkert created KUDU-2518:
---------------------------------
Summary: SparkSQL queries without temporary tables
Key: KUDU-2518
URL: https://issues.apache.org/jira/browse/KUDU-2518
Project: Kudu
Issue Type: Improvement
Components: hms, spark
Affects Versions: 1.7.1
Reporter: Dan Burkert
One long-standing ergonomic issue with the Kudu/SparkSQL integration is the
requirement to register Kudu tables as temp tables before they can be scanned
using a SQL string ({{sql("SELECT * FROM my_kudu_table")}}). Ideally SparkSQL
could query Kudu tables that it discovers via the HMS with no additional
configuration. Yesterday I explored what it would take to get there, and I
found some interesting things.
If the HMS table contains a {{spark.sql.sources.provider}} table property with
a value like {{org.apache.kudu.spark.kudu.DefaultSource}}, SparkSQL will
automatically instantiate the corresponding {{RelationProvider}} class, passing
it a {{SQLContext}} and a map of parameters that it fills in with the table's
HDFS URI and storage properties. The current plan for Kudu + HMS integration
(KUDU-2191) is not to set any storage properties; instead, attributes like
master addresses and table ID will be stored as table properties. As a result,
SparkSQL instantiates a Kudu {{DefaultSource}} but doesn't pass it the
necessary arguments, such as the table name or master addresses. Getting this
far required adding a dummy {{org.apache.kudu.hive.KuduStorageHandler}} class
to the classpath so that the Hive client wouldn't choke on the bogus class name.
The stack trace from Spark attempting to instantiate the {{DefaultSource}} is
provided below.
{code:java}
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
Spark context available as 'sc' (master = local[*], app id =
local-1532719985143).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sql("DESCRIBE TABLE t1")
org.spark_project.guava.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Kudu table name must be specified in create
options using key 'kudu.table'. parameters: Map(), parameters-size: 0,
parameters-keys: Set(), path: None
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
at
org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
at
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
at
org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
... 49 elided
Caused by: java.lang.IllegalArgumentException: Kudu table name must be
specified in create options using key 'kudu.table'. parameters: Map(),
parameters-size: 0, parameters-keys: Set(), path: None
at
org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
at
org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at
org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
at
org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:81)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
at
org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
at
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 96 more
scala>{code}
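To make the gap in the trace above concrete, here is a minimal stand-alone sketch with no Spark or Kudu dependency. It models the option lookup that fails when the parameter map is empty, and one possible way a bridge could copy values from HMS table properties into that map. The {{kudu.table}} key comes from the error message and {{kudu.master}} is the option key kudu-spark uses for master addresses; the object name, the helper names, and the two HMS property names ({{kudu.table_name}}, {{kudu.master_addresses}}) are assumptions made for illustration, not names fixed by KUDU-2191.

```scala
// Stand-alone model of the option lookup that fails in the trace above.
// No Spark or Kudu dependency; names marked hypothetical are illustrative only.
object KuduOptionsSketch {
  // Key named in the error message raised from DefaultSource.createRelation.
  val TableKey = "kudu.table"
  // Option key kudu-spark uses for the master addresses.
  val MasterKey = "kudu.master"

  // Hypothetical HMS table property names; the real integration may differ.
  val HmsTableNameProp = "kudu.table_name"
  val HmsMasterAddrsProp = "kudu.master_addresses"

  // Mirrors the failing check: a missing key raises IllegalArgumentException.
  def requireTable(parameters: Map[String, String]): String =
    parameters.getOrElse(TableKey, throw new IllegalArgumentException(
      s"Kudu table name must be specified in create options using key '$TableKey'. " +
        s"parameters: $parameters"))

  // Hypothetical bridge: derive options from HMS table properties.
  // Explicitly supplied options win over derived ones.
  def withTableProperties(
      parameters: Map[String, String],
      tableProperties: Map[String, String]): Map[String, String] = {
    val derived = Seq(HmsTableNameProp -> TableKey, HmsMasterAddrsProp -> MasterKey)
      .flatMap { case (prop, key) => tableProperties.get(prop).map(value => key -> value) }
      .toMap
    derived ++ parameters
  }
}
```

With an empty map, {{requireTable}} throws just as in the trace; after {{withTableProperties}} merges in the table properties, the same lookup succeeds. Where such a merge would live (in Spark, in the Kudu data source, or in the HMS plugin) is exactly the open question of this issue.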
After striking out with the existing interfaces, I looked at the
{{DataSourceRegister}} API, which is a part of the {{DataSourceV2}} effort
underway in Spark. It's not clear that this API actually provides more context
when creating relations (we need the table name and master addresses from the
table properties, and options are still just passed as a map in
{{DataSourceOptions}}). More significantly, it doesn't appear that the
{{spark.sql.sources.provider}} property works correctly with {{DataSourceV2}}
instances. Spark rejects the class with an {{AnalysisException}}:
{code:java}
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
Spark context available as 'sc' (master = local[*], app id =
local-1532720634224).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sql("DESCRIBE TABLE t1")
org.apache.spark.sql.AnalysisException: org.apache.kudu.spark.KuduDataSource is
not a valid Spark SQL Data Source.;
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:415)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
at
org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
at
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
at
org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
at
org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
at
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
at
org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
... 49 elided
scala>{code}
{{org.apache.kudu.spark.KuduDataSource}} is a dummy class I put on the
classpath and referenced from the Hive metastore table's properties:
{code:java}
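package org.apache.kudu.spark

import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.StructType

// Dummy DataSourceV2 implementation (Spark 2.3 API); it only needs to be loadable.
class KuduDataSource extends DataSourceV2
  with DataSourceRegister
  with ReadSupport {
  override def shortName(): String = "kudu"

  override def createReader(options: DataSourceOptions): DataSourceReader = {
    new KuduDataSourceReader(options)
  }
}

// Placeholder reader; schema and scan planning are intentionally unimplemented.
class KuduDataSourceReader(val options: DataSourceOptions) extends DataSourceReader {
  override def readSchema(): StructType = ???

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
}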
{code}
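The rejection of the dummy class can be pictured with a simplified model. The sketch below is not Spark's code: it uses stand-in traits (all names here are hypothetical) to show the kind of type dispatch that would produce a "not a valid Spark SQL Data Source" message when the metastore lookup path only recognizes V1-style providers. Whether Spark 2.3.1 behaves exactly this way is an assumption based on the trace, which shows the error raised from {{DataSource.resolveRelation}}.

```scala
// Simplified model with stand-in traits; these are NOT Spark's real types.
object ProviderDispatchSketch {
  trait RelationProviderLike // stands in for a V1-style relation provider
  trait DataSourceV2Like     // stands in for the V2 marker interface

  class V1Source extends RelationProviderLike
  class V2OnlySource extends DataSourceV2Like

  // Accept only V1-style providers; anything else is reported as invalid.
  def resolve(provider: AnyRef): Either[String, String] = provider match {
    case _: RelationProviderLike => Right("relation created")
    case other => Left(s"${other.getClass.getName} is not a valid Spark SQL Data Source.")
  }
}
```

Under this model, a class that implements only the V2-style trait falls through to the error branch, which matches the symptom seen with {{KuduDataSource}}. Confirming the actual cause would require checking the Spark source for the version in use.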