I use Spark 1.4.1 on an EMR cluster with YARN.

Sharing my code:

PARAGRAPH #1

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import scala.io.Source
@transient val suffixesStr = "hello\nworld"
@transient val suffList = suffixesStr.lines.filter(line =>
  !line.startsWith("//") && line.trim() != "")
val suffListRDD = sc.parallelize(suffList.toList)
val schemaString = "suffix"
val schemaItemsCount = schemaString.count(_ == ' ') + 1
// filtering to get only valid rows
val eventsRDD = suffListRDD.filter(x =>
  "\\t".r.findAllIn(x).length + 1 == schemaItemsCount)
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val schema = StructType(schemaString.split(" ").map(fieldName =>
  StructField(fieldName, StringType, true)))
// Convert records of the RDD (eventsRDD) to Rows.
// split with preserve tokens
val rowRDD = eventsRDD.map(_.split("\\t", -1)).map(p => Row(p(0)))
val eventsDF = sqlContext.createDataFrame(rowRDD, schema)
eventsDF.registerTempTable("events")
// Uncomment these if you like to verify the logic in same paragraph
//val cnt = sqlContext.sql("SELECT count(*) from events")
//cnt.take(10).foreach(println)
----------------OUTPUT---------------
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@36e407de
import scala.io.Source
suffixesStr: String = hello world
suffList: Iterator[String] = non-empty iterator
suffListRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:28
schemaString: String = suffix
schemaItemsCount: Int = 1
eventsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:36
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
schema: org.apache.spark.sql.types.StructType = StructType(StructField(suffix,StringType,true))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:41
eventsDF: org.apache.spark.sql.DataFrame = [suffix: string]

PARAGRAPH #2
%sql SELECT count(*) from events
----------------OUTPUT---------------
org.apache.spark.sql.AnalysisException: no such table events; line 1 pos 21
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)



As you asked, I also checked the bank example. It works just fine:

PARAGRAPH #1
@transient val suffixesStr = "hello;world"
@transient val suffList = suffixesStr.lines.filter(line =>
  !line.startsWith("//") && line.trim() != "")
val bankText = sc.parallelize(suffList.toList)
case class Bank(f1:String, f2:String)
val bank = bankText.map(s=>s.split(";")).map(s=>Bank(s(0),s(1)))
bank.toDF().registerTempTable("bank")
----------------OUTPUT---------------
suffixesStr: String = hello;world
suffList: Iterator[String] = non-empty iterator
bankText: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:30
defined class Bank
bank: org.apache.spark.rdd.RDD[Bank] = MapPartitionsRDD[13] at map at <console>:34

PARAGRAPH #2
%sql select count(*) from bank
----------------OUTPUT---------------
1


What could be the root cause?
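
One difference between the two cases that may matter, though it is not
confirmed in this thread: the failing paragraph creates its own SQLContext
with "new org.apache.spark.sql.SQLContext(sc)", while the bank example does
not. In Spark 1.4 temporary tables are registered per SQLContext, so if %sql
queries the SQLContext that Zeppelin provides, it would not see a table
registered in a separate one. Below is a minimal sketch of the first
paragraph without the extra SQLContext. It assumes a fresh interpreter
session in which `sc` and `sqlContext` are the objects supplied by the
Zeppelin Spark interpreter; the simplified schema and variable layout are
for illustration only.

```scala
// Assumption: this runs in a Zeppelin %spark paragraph, where `sc` and
// `sqlContext` already exist. Neither is created here.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val suffixesStr = "hello\nworld"

// Drop comment lines and blank lines, as in the original paragraph.
val suffList = suffixesStr.split("\n")
  .filter(line => !line.startsWith("//") && line.trim.nonEmpty)
  .toList

// Single string column named "suffix".
val schema = StructType(Seq(StructField("suffix", StringType, nullable = true)))

// Keep the first tab-separated token of each line (-1 preserves empty tokens).
val rowRDD = sc.parallelize(suffList).map(line => Row(line.split("\t", -1)(0)))

// Use the interpreter-provided sqlContext rather than a new one, so the
// temp table lands in the same context that %sql is assumed to query.
val eventsDF = sqlContext.createDataFrame(rowRDD, schema)
eventsDF.registerTempTable("events")
```

If an earlier paragraph already ran "val sqlContext = new ...", that name
stays shadowed in the session, so the interpreter would likely need a
restart before trying this.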

2015-09-09 15:50 GMT+04:00 IT CTO <[email protected]>:

> I just tested it on my code and had no problem doing so.
> Can you share the code before the line eventsDF.registerTempTable("events")?
> Can you run the "Load Data Into Table" paragraph from the Zeppelin
> Tutorial, then run the select from bank in another note, and see
> whether it works?
>
> Generally speaking, since they both run in the same JVM and share the same
> Spark context, you shouldn't have a problem doing what you described.
>
> Thanks,
> Eran
>
> On Wed, Sep 9, 2015 at 2:30 PM Eugene <[email protected]> wrote:
>
>> Hello,
>>
>> In the promo video tutorial for Zeppelin (
>> http://www.youtube.com/watch?v=_PQbVH_aO5E&feature=youtu.be) I see a
>> demonstration of how to declare a table 'bank' in one note and query
>> it in another note.
>> I tried doing this and it did not work.
>>
>> I have one notebook with two notes:
>>
>> Note #1: // basically all data definition for second note
>> eventsDF.registerTempTable("events")
>>
>> Note #2: // querying defined table
>> %sql select * from events limit 10
>>
>>
>> Note #1 executes fine, but Note #2 fails with:
>> org.apache.spark.sql.AnalysisException: no such table events;
>>
>> How can I make my table available to Note #2?
>>
>> Thanks!
>>
>>
>> --
>>
>>
>> Best regards,
>> Eugene.
>>
> --
> Eran | "You don't need eyes to see, you need vision" (Faithless)
>



-- 


Best regards,
Eugene.
