new datasource
We have written a new Spark DataSource that uses both Parquet and ElasticSearch. It is based on the existing Parquet DataSource. When I look at the filters being pushed down to buildScan, I don't get anything representing filters based on UDFs, or on fields generated by an explode. I had thought that if I made it a CatalystScan I would get everything I needed.

This is fine from the Parquet point of view, but we are using ElasticSearch to index and filter the data we are searching, and I need to be able to capture the UDF conditions, or have access to the plan AST, so that I can construct a query for ElasticSearch.

I am thinking I might just need to patch Spark to do this, but I'd prefer not to if there is a way of getting round this without hacking the core code. Any ideas?

Thanks

James
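[Editor's sketch, not part of the original mail: the behaviour James describes follows from the Spark 1.x PrunedFilteredScan contract, where predicates arrive only as the simplified org.apache.spark.sql.sources.Filter ADT. A UDF call or a predicate on an exploded column has no Filter representation, so Spark keeps it for post-scan evaluation and it never reaches buildScan. The relation name and empty body below are hypothetical; only the trait and signature come from the Spark 1.x sources API.]

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// Hypothetical relation: filters only ever contains sources.Filter instances
// (EqualTo, GreaterThan, In, ...). Anything Catalyst cannot translate into
// that closed ADT, such as a UDF call, is simply omitted from the array.
class MyParquetEsRelation(override val sqlContext: SQLContext,
                          override val schema: StructType)
    extends BaseRelation with PrunedFilteredScan {

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.emptyRDD[Row] // placeholder body
}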
RE: new datasource
Thanks Hao. I have written a new Data Source based on ParquetRelation, and I have just retested what I had said about not getting anything extra when I change it over to a CatalystScan instead of a PrunedFilteredScan, and, oops, it seems to work fine.

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: 19 November 2015 15:30
To: Green, James (UK Guildford); dev@spark.apache.org
Subject: RE: new datasource

I think you probably need to write some code, as you need to support ES. There are two options per my understanding:

Create a new Data Source from scratch, but you probably need to implement the interface at:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L751

Or you can reuse most of the code in ParquetRelation in the new DataSource, but you also need to add your own logic; see:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L285

Hope it helps.

Hao
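[Editor's sketch of the CatalystScan variant that worked, assuming Spark 1.5/1.6. The relation name, the ES translation step and the body are illustrative; only the trait and its buildScan signature come from the interfaces.scala file linked above.]

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ScalaUDF}
import org.apache.spark.sql.sources.{BaseRelation, CatalystScan}
import org.apache.spark.sql.types.StructType

// Hypothetical relation: unlike PrunedFilteredScan, CatalystScan hands over
// the raw Catalyst expressions, so UDF calls and predicates on exploded
// fields arrive intact and can be translated into an ElasticSearch query.
class EsParquetRelation(override val sqlContext: SQLContext,
                        override val schema: StructType)
    extends BaseRelation with CatalystScan {

  override def buildScan(requiredColumns: Seq[Attribute],
                         filters: Seq[Expression]): RDD[Row] = {
    filters.foreach {
      case udf: ScalaUDF => () // translate the UDF condition into an ES query clause
      case other         => () // push supported predicates down to the Parquet scan
    }
    sqlContext.sparkContext.emptyRDD[Row] // placeholder: real impl returns the scan RDD
  }
}

Worth noting: CatalystScan is marked experimental in the 1.x interfaces file, precisely because the raw expressions it exposes are not a stable, binary-compatible API across releases.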
spark hivethriftserver problem on 1.5.0 -> 1.6.0 upgrade
Hi

I posted this on the user list yesterday. I am posting it here now because, on further investigation, I am pretty sure this is a bug.

On upgrade from 1.5.0 to 1.6.0 I have a problem with HiveThriftServer2. I have this code:

val hiveContext = new HiveContext(SparkContext.getOrCreate(conf))
val thing = hiveContext.read.parquet("hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208")
thing.registerTempTable("thing")
HiveThriftServer2.startWithContext(hiveContext)

When I start things up on the cluster my hive-site.xml is found; I can see that the metastore connects:

INFO metastore - Trying to connect to metastore with URI thrift://dkclusterm2.imp.net:9083
INFO metastore - Connected to metastore.

But then later on the thrift server seems not to connect to the remote Hive metastore but to start a Derby instance instead:

INFO AbstractService - Service:CLIService is started.
INFO ObjectStore - ObjectStore, initialize called
INFO Query - Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
INFO MetaStoreDirectSql - Using direct SQL, underlying DB is DERBY
INFO ObjectStore - Initialized ObjectStore
INFO HiveMetaStore - 0: get_databases: default
INFO audit - ugi=jegreen1 ip=unknown-ip-addr cmd=get_databases: default
INFO HiveMetaStore - 0: Shutting down the object store...
INFO audit - ugi=jegreen1 ip=unknown-ip-addr cmd=Shutting down the object store...
INFO HiveMetaStore - 0: Metastore shutdown complete.
INFO audit - ugi=jegreen1 ip=unknown-ip-addr cmd=Metastore shutdown complete.
INFO AbstractService - Service:ThriftBinaryCLIService is started.
INFO AbstractService - Service:HiveServer2 is started.

On 1.5.0 the same bit of the log reads:

INFO AbstractService - Service:CLIService is started.
INFO metastore - Trying to connect to metastore with URI thrift://dkclusterm2.imp.net:9083   *** i.e. 1.5.0 connects to the remote Hive metastore
INFO metastore - Connected to metastore.
INFO AbstractService - Service:ThriftBinaryCLIService is started.
INFO AbstractService - Service:HiveServer2 is started.
INFO ThriftCLIService - Starting ThriftBinaryCLIService on port 1 with 5...500 worker threads

So if I connect to this with JDBC I can see all the tables on the Hive server, but not anything temporary; I guess they are going to Derby. I see someone on the Databricks website is also having this problem.

Thanks

James
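[Editor's sketch: a quick way to reproduce the symptom is to query the Thrift server over JDBC and look for the temp table. The host, port and credentials below are hypothetical; the driver class is the standard Hive JDBC one, assumed to be on the classpath.]

import java.sql.DriverManager

// Connect to the Thrift server started by HiveThriftServer2.startWithContext.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://dkclusterm1.imp.net:10000", "jegreen1", "")
val rs = conn.createStatement().executeQuery("SHOW TABLES")
while (rs.next()) println(rs.getString(1)) // on 1.6.0 the temp table "thing" does not appear
conn.close()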
RE: spark hivethriftserver problem on 1.5.0 -> 1.6.0 upgrade
b8da62/_tmp_space.db
INFO ParquetRelation - Listing hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208 on driver
INFO SparkContext - Starting job: parquet at ThriftTest.scala:39
INFO DAGScheduler - Got job 0 (parquet at ThriftTest.scala:39) with 32 output partitions
INFO DAGScheduler - Final stage: ResultStage 0 (parquet at ThriftTest.scala:39)
INFO DAGScheduler - Parents of final stage: List()
INFO DAGScheduler - Missing parents: List()
INFO DAGScheduler - Submitting ResultStage 0 (MapPartitionsRDD[1] at parquet at ThriftTest.scala:39), which has no missing parents
INFO MemoryStore - Block broadcast_0 stored as values in memory (estimated size 65.5 KB, free 65.5 KB)
INFO MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 88.3 KB)
INFO BlockManagerInfo - Added broadcast_0_piece0 in memory on localhost:41220 (size: 22.9 KB, free: 511.1 MB)
INFO SparkContext - Created broadcast 0 from broadcast at DAGScheduler.scala:1006
INFO DAGScheduler - Submitting 32 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at parquet at ThriftTest.scala:39)
INFO TaskSchedulerImpl - Adding task set 0.0 with 32 tasks
INFO TaskSetManager - Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 6528 bytes)
INFO TaskSetManager - Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 6528 bytes)
INFO TaskSetManager - Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,PROCESS_LOCAL, 6528 bytes)
INFO TaskSetManager - Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3,PROCESS_LOCAL, 6528 bytes)
INFO TaskSetManager - Starting task 4.0 in stage 0.0 (TID 4, localhost, partition 4,PROCESS_LOCAL, 6528 bytes)
INFO TaskSetManager - Starting task 5.0 in stage 0.0 (TID 5, localhost, partition 5,PROCESS_LOCAL, 6528 bytes)

From: Yin Huai [mailto:yh...@databricks.com]
Sent: 26 January 2016 17:48
To: Green, James (UK Guildford)
Cc: dev@spark.apache.org
Subject: Re: spark hivethriftserver problem on 1.5.0 -> 1.6.0 upgrade

Can you post more logs, especially lines around "Initializing execution hive ..." (this is for an internally used fake metastore, and it is Derby) and "Initializing HiveMetastoreConnection version ..." (this is for the real metastore; it should be your remote one)?

Also, those temp tables are stored in memory and are associated with a HiveContext. If you cannot see temp tables, it usually means that the HiveContext you used with JDBC was different from the one used to create the temp table. However, in your case, you are using HiveThriftServer2.startWithContext(hiveContext). So it would be good to provide more logs and see what happened.
Thanks,

Yin
RE: spark hivethriftserver problem on 1.5.0 -> 1.6.0 upgrade
I have a workaround for this issue, which is to go back to single-session mode for the Thrift server:

conf.set("spark.sql.hive.thriftServer.singleSession", "true")

This seems to mean that temp tables can be registered in 1.6.0 with a remote metastore.

Cheers

James
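[Editor's sketch of how the workaround slots into the snippet from earlier in the thread, assuming Spark 1.6. The app name is illustrative; conf is the SparkConf handed to the context, and everything else is unchanged from James's original code.]

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val conf = new SparkConf()
  .setAppName("ThriftTest")
  // Workaround: single-session mode makes all JDBC sessions share the one
  // HiveContext, so the temp table registered below stays visible over JDBC.
  .set("spark.sql.hive.thriftServer.singleSession", "true")

val hiveContext = new HiveContext(SparkContext.getOrCreate(conf))
val thing = hiveContext.read.parquet("hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208")
thing.registerTempTable("thing")
HiveThriftServer2.startWithContext(hiveContext)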