new datasource

2015-11-19 Thread james.gre...@baesystems.com


We have written a new Spark DataSource that uses both Parquet and 
ElasticSearch. It is based on the existing Parquet DataSource. When I look 
at the filters being pushed down to buildScan, I don’t see anything representing 
filters based on UDFs, or filters on fields generated by an explode. I had 
thought that if I made it a CatalystScan I would get everything I needed.
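For context, the two scan traits expose predicates very differently; this is a 
simplified sketch of the relevant Spark 1.5/1.6 traits (after sql/core's 
interfaces.scala), not our actual relation:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources.Filter

// PrunedFilteredScan only receives predicates Spark can translate into the
// stable sources.Filter algebra (EqualTo, GreaterThan, ...); UDF calls and
// columns produced by a Generate/explode never arrive here.
trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

// CatalystScan receives the raw Catalyst expressions instead, so UDF
// predicates (ScalaUDF nodes) are visible, at the cost of binding to an
// unstable internal API.
trait CatalystScan {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}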



This is fine from the Parquet point of view, but we are using ElasticSearch to 
index and filter the data we are searching, and I need to be able to capture 
the UDF conditions, or have access to the plan AST, so that I can construct a 
query for ElasticSearch.



I am thinking I might just need to patch Spark to do this, but I’d prefer not 
to if there is a way of getting round this without hacking the core code. Any 
ideas?



Thanks



James





RE: new datasource

2015-11-19 Thread james.gre...@baesystems.com
Thanks Hao

I have written a new Data Source based on ParquetRelation, and I have just 
retested what I said about not getting anything extra when I changed it over 
to a CatalystScan instead of a PrunedFilteredScan, and oops, it seems to work fine.





From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: 19 November 2015 15:30
To: Green, James (UK Guildford); dev@spark.apache.org
Subject: RE: new datasource

I think you probably need to write some code to support ES; there are two 
options as I understand it:

Create a new Data Source from scratch, but you will probably need to implement 
the interface at:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L751

Or you can reuse most of the code in ParquetRelation in the new DataSource, but 
you will also need to add your own logic; see
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L285
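For the first option, a minimal sketch might look like the following; the ES 
translation hook is hypothetical, and only the Spark-facing pieces 
(BaseRelation, PrunedFilteredScan, RelationProvider) are from the real 1.x API:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan, RelationProvider}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Sketch of a from-scratch relation that could translate pushed-down
// filters into an ElasticSearch query before reading Parquet.
class EsParquetRelation(path: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  // Placeholder schema; a real relation would derive this from the data.
  override def schema: StructType =
    StructType(Seq(StructField("id", LongType, nullable = false)))

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // Translate `filters` into an ES query here, then scan only the
    // matching Parquet files. Placeholder: return an empty scan.
    sqlContext.sparkContext.emptyRDD[Row]
  }
}

class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new EsParquetRelation(parameters("path"))(sqlContext)
}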

Hope it helps.

Hao


spark hivethriftserver problem on 1.5.0 -> 1.6.0 upgrade

2016-01-26 Thread james.gre...@baesystems.com
Hi

I posted this on the user list yesterday; I am posting it here now because, on 
further investigation, I am pretty sure this is a bug:


On upgrading from 1.5.0 to 1.6.0 I have a problem with HiveThriftServer2; I 
have this code:

val hiveContext = new HiveContext(SparkContext.getOrCreate(conf))

val thing = hiveContext.read.parquet("hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208")

thing.registerTempTable("thing")

HiveThriftServer2.startWithContext(hiveContext)


When I start things up on the cluster, my hive-site.xml is found; I can see 
the metastore connect:


INFO  metastore - Trying to connect to metastore with URI thrift://dkclusterm2.imp.net:9083
INFO  metastore - Connected to metastore.


But then later on the Thrift server seems not to connect to the remote Hive 
metastore, and instead starts a Derby instance:

INFO  AbstractService - Service:CLIService is started.
INFO  ObjectStore - ObjectStore, initialize called
INFO  Query - Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
INFO  MetaStoreDirectSql - Using direct SQL, underlying DB is DERBY
INFO  ObjectStore - Initialized ObjectStore
INFO  HiveMetaStore - 0: get_databases: default
INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=get_databases: default
INFO  HiveMetaStore - 0: Shutting down the object store...
INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=Shutting down the object store...
INFO  HiveMetaStore - 0: Metastore shutdown complete.
INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=Metastore shutdown complete.
INFO  AbstractService - Service:ThriftBinaryCLIService is started.
INFO  AbstractService - Service:HiveServer2 is started.

On 1.5.0 the same bit of the log reads:

INFO  AbstractService - Service:CLIService is started.
INFO  metastore - Trying to connect to metastore with URI thrift://dkclusterm2.imp.net:9083  *** ie 1.5.0 connects to remote hive
INFO  metastore - Connected to metastore.
INFO  AbstractService - Service:ThriftBinaryCLIService is started.
INFO  AbstractService - Service:HiveServer2 is started.
INFO  ThriftCLIService - Starting ThriftBinaryCLIService on port 1 with 5...500 worker threads



So if I connect to this with JDBC I can see all the tables on the Hive server, 
but not anything temporary; I guess they are going to Derby.
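(For anyone reproducing this, a hedged way to check what the Thrift server 
exposes over JDBC, using the standard Hive JDBC driver; the host, port 10000, 
and user below are assumptions for this cluster:)

import java.sql.DriverManager

// Hypothetical connection details; 10000 is the HiveServer2 default port.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://dkclusterm1.imp.net:10000", "jegreen1", "")
val rs = conn.createStatement().executeQuery("SHOW TABLES")
while (rs.next()) println(rs.getString(1)) // on 1.6.0 the temp table "thing" never appears
conn.close()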

I see someone on the Databricks website is also having this problem.


Thanks

James


RE: spark hivethriftserver problem on 1.5.0 -> 1.6.0 upgrade

2016-01-27 Thread james.gre...@baesystems.com
b8da62/_tmp_space.db
INFO  ParquetRelation - Listing hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208 on driver
INFO  SparkContext - Starting job: parquet at ThriftTest.scala:39
INFO  DAGScheduler - Got job 0 (parquet at ThriftTest.scala:39) with 32 output partitions
INFO  DAGScheduler - Final stage: ResultStage 0 (parquet at ThriftTest.scala:39)
INFO  DAGScheduler - Parents of final stage: List()
INFO  DAGScheduler - Missing parents: List()
INFO  DAGScheduler - Submitting ResultStage 0 (MapPartitionsRDD[1] at parquet at ThriftTest.scala:39), which has no missing parents
INFO  MemoryStore - Block broadcast_0 stored as values in memory (estimated size 65.5 KB, free 65.5 KB)
INFO  MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 88.3 KB)
INFO  BlockManagerInfo - Added broadcast_0_piece0 in memory on localhost:41220 (size: 22.9 KB, free: 511.1 MB)
INFO  SparkContext - Created broadcast 0 from broadcast at DAGScheduler.scala:1006
INFO  DAGScheduler - Submitting 32 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at parquet at ThriftTest.scala:39)
INFO  TaskSchedulerImpl - Adding task set 0.0 with 32 tasks
INFO  TaskSetManager - Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 6528 bytes)
INFO  TaskSetManager - Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 6528 bytes)
INFO  TaskSetManager - Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,PROCESS_LOCAL, 6528 bytes)
INFO  TaskSetManager - Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3,PROCESS_LOCAL, 6528 bytes)
INFO  TaskSetManager - Starting task 4.0 in stage 0.0 (TID 4, localhost, partition 4,PROCESS_LOCAL, 6528 bytes)
INFO  TaskSetManager - Starting task 5.0 in stage 0.0 (TID 5, localhost, partition 5,PROCESS_LOCAL, 6528 bytes)


From: Yin Huai [mailto:yh...@databricks.com]
Sent: 26 January 2016 17:48
To: Green, James (UK Guildford)
Cc: dev@spark.apache.org
Subject: Re: spark hivethriftserver problem on 1.5.0 -> 1.6.0 upgrade

Can you post more logs, especially the lines around "Initializing execution hive 
..." (this is for an internally used fake metastore, and it is Derby) and 
"Initializing HiveMetastoreConnection version ..." (this is for the real 
metastore; it should be your remote one)? Also, temp tables are stored in 
memory and are associated with a HiveContext. If you cannot see temp tables, 
it usually means that the HiveContext you used over JDBC was different from 
the one used to create the temp table. However, in your case you are using 
HiveThriftServer2.startWithContext(hiveContext), so it would be good to have 
more logs and see what happened.
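To illustrate the scoping (a sketch against the Spark 1.6 API; the path is the 
one from your snippet):

val session1 = hiveContext.newSession()
val session2 = hiveContext.newSession()

// Temp tables live only in the session that registered them.
session1.read.parquet("hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208").registerTempTable("thing")

session1.sql("SELECT count(*) FROM thing").show() // resolves fine
session2.sql("SELECT count(*) FROM thing").show() // fails: Table not found: thing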

Thanks,

Yin


RE: spark hivethriftserver problem on 1.5.0 -> 1.6.0 upgrade

2016-02-03 Thread james.gre...@baesystems.com
I have a workaround for this issue, which is to go back to single-session mode 
for the Thrift server:

conf.set("spark.sql.hive.thriftServer.singleSession", "true")

This seems to mean that temp tables can be registered in 1.6.0 with a remote 
metastore.
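
In context, the whole thing looks roughly like this (a sketch; the app name is 
made up, the rest follows the snippet from my first mail):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

// Force all JDBC sessions to share the context's session, so they can
// see the temp table registered below.
val conf = new SparkConf().setAppName("ThriftTest")
conf.set("spark.sql.hive.thriftServer.singleSession", "true")

val hiveContext = new HiveContext(SparkContext.getOrCreate(conf))
hiveContext.read.parquet("hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208").registerTempTable("thing")

HiveThriftServer2.startWithContext(hiveContext)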

Cheers

James


