Alright, here is my full code:

*//paragraph one*
z.load("org.apache.spark:spark-streaming-kinesis-asl_2.10:1.5.2")

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.s3.AmazonS3Client


import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis._
import org.apache.spark.storage.StorageLevel

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

*//paragraph 2*
val credentials = new DefaultAWSCredentialsProviderChain().getCredentials
val endpointUrl = "kinesis.us-east-1.amazonaws.com"
val appName = "StoredashCollector-debug"
val streamName = "XXXXX" // stream name redacted
val windowDuration = Seconds(10)
val rememberDuration = Minutes(7)

val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numStreams = kinesisClient.describeStream(streamName).getStreamDescription.getShards.size
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()

// problematic line here
val ssc = new StreamingContext(sc,windowDuration)
ssc.remember(rememberDuration)

case class LogEvent(
                     DataType: Option[String]
                     , RequestType: Option[String]
                     , SessionId: Option[String]
                     , HostName: Option[String]
                     , TransactionAffiliation: Option[String]
                     , SkuStockOutFromProductDetail: List[String]
                     , SkuStockOutFromShelf: List[String]
                     , TransactionId: Option[String]
                     , ProductId: Option[String]
                   )

val streams = (0 until 1).map { i =>
  KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
    InitialPositionInStream.LATEST, windowDuration, StorageLevel.MEMORY_ONLY)
}

val unionStream = ssc.union(streams).map(byteArray => new String(byteArray))

// used by json4s
implicit val formats = DefaultFormats
val eventsStream = unionStream.map{
    str => parse(str).extract[LogEvent]
}

eventsStream.print(10)

ssc.start()

*// output of the previous paragraph including error message:*

credentials: com.amazonaws.auth.AWSCredentials = com.amazonaws.auth.BasicSessionCredentials@1181e472
endpointUrl: String = kinesis.us-east-1.amazonaws.com
appName: String = StoredashCollector-debug
streamName: String = XXXX
windowDuration: org.apache.spark.streaming.Duration = 10000 ms
rememberDuration: org.apache.spark.streaming.Duration = 420000 ms
kinesisClient: com.amazonaws.services.kinesis.AmazonKinesisClient = com.amazonaws.services.kinesis.AmazonKinesisClient@275e9422
numStreams: Int = 2
regionName: String = us-east-1

<console>:45: error: overloaded method constructor StreamingContext with alternatives:
  (path: String,sparkContext: org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext)org.apache.spark.streaming.StreamingContext <and>
  (path: String,hadoopConf: org.apache.hadoop.conf.Configuration)org.apache.spark.streaming.StreamingContext <and>
  (conf: org.apache.spark.SparkConf,batchDuration: org.apache.spark.streaming.Duration)org.apache.spark.streaming.StreamingContext <and>
  (sparkContext: org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext,batchDuration: org.apache.spark.streaming.Duration)org.apache.spark.streaming.StreamingContext
cannot be applied to (org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext, org.apache.spark.streaming.Duration)
val ssc = new StreamingContext(sc,windowDuration)
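
In case it helps, here is a small diagnostic I could run in the same Zeppelin paragraph (just a sketch using plain reflection; it only assumes `sc` is the SparkContext the interpreter provides). It prints where the class of `sc` and the org.apache.spark.SparkContext that the compiler resolves are loaded from; if they come from different jars or class loaders, that would explain the repeated org.apache.spark.org.apache.spark... prefix in the error:

// which class is `sc` at runtime, and where was it loaded from?
println(sc.getClass.getName)
println(sc.getClass.getClassLoader)
println(sc.getClass.getProtectionDomain.getCodeSource)

// which SparkContext does the compiler see, and does `sc` conform to it?
println(classOf[org.apache.spark.SparkContext].getProtectionDomain.getCodeSource)
println(classOf[org.apache.spark.SparkContext].isAssignableFrom(sc.getClass))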


That's it; I've sent you my cluster ID by e-mail.

FA

On 27 January 2016 at 21:51, Jonathan Kelly <jonathaka...@gmail.com> wrote:

> That's a really weird issue! Sorry though, I can't seem to reproduce it,
> not in spark-shell nor in Zeppelin (on emr-4.3.0). This is all I'm running:
>
> import org.apache.spark.streaming._
> val ssc = new StreamingContext(sc, Seconds(1))
>
> Is there anything else you're running? Are you specifying any special
> configuration when creating the cluster? Do you have a cluster id I can
> take a look at? Also, can you reproduce this in spark-shell or just in
> Zeppelin?
>
> ~ Jonathan
>
> On Wed, Jan 27, 2016 at 12:46 PM Felipe Almeida <falmeida1...@gmail.com>
> wrote:
>
>> Hi Jonathan.
>>
>> That issue (using *z.load("mavenproject")*) is solved indeed (thanks),
>> but now I can't create a StreamingContext from the current SparkContext
>> (variable sc).
>>
>> When trying to run *val ssc = new StreamingContext(sc,Seconds(1))*, I
>> get the following error:
>>
>> <console>:45: error: overloaded method constructor StreamingContext with
>> alternatives: (path: String,sparkContext:
>> org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext)org.apache.spark.streaming.StreamingContext
>> <and> (path: String,hadoopConf:
>> org.apache.hadoop.conf.Configuration)org.apache.spark.streaming.StreamingContext
>> <and> (conf: org.apache.spark.SparkConf,batchDuration:
>> org.apache.spark.streaming.Duration)org.apache.spark.streaming.StreamingContext
>> <and> (sparkContext:
>> org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext,batchDuration:
>> org.apache.spark.streaming.Duration)org.apache.spark.streaming.StreamingContext
>> cannot be applied to
>> (org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.org.apache.spark.SparkContext,
>> org.apache.spark.streaming.Duration) val ssc = new
>> StreamingContext(sc,Seconds(1))
>>
>> See how the package segments leading up to the class name are funny:
>> org.apache.spark.org.apache.... it looks like it's in a loop. And one can
>> see in the error message that what I did was indeed pass a SparkContext and
>> then a Duration, as per the docs.
>>
>> FA
>>
>> On 27 January 2016 at 02:14, Jonathan Kelly <jonathaka...@gmail.com>
>> wrote:
>>
>>> Yeah, I saw your self-reply on SO but did not have time to reply there
>>> also before leaving for the day. I hope emr-4.3.0 works well for you!
>>> Please let me know if you run into any other issues, especially with Spark
>>> or Zeppelin, or if you have any feature requests.
>>>
>>> Thanks,
>>> Jonathan
>>> On Tue, Jan 26, 2016 at 6:02 PM Felipe Almeida <falmeida1...@gmail.com>
>>> wrote:
>>>
>>>> Thank you very much Jonathan; I'll be sure to try out the new label as
>>>> soon as I can!
>>>>
>>>> I eventually found a workaround, which involved passing the package via
>>>> the *--packages* option for spark-submit, configured under
>>>> */usr/lib/zeppelin/conf/zeppelin.sh*:
>>>> http://stackoverflow.com/a/35006137/436721
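>>>>
>>>> Roughly, what ends up in that file is something like the line below (a
>>>> sketch only; the exact variable name and file location can differ between
>>>> EMR images, so treat these details as assumptions rather than the
>>>> verified fix):
>>>>
>>>> # e.g. appended to /usr/lib/zeppelin/conf/zeppelin.sh (assumed location)
>>>> export SPARK_SUBMIT_OPTIONS="--packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.5.2"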
>>>>
>>>> Anyway, thanks for the reply; I'll let you know if the problem persists.
>>>>
>>>> FA
>>>>
>>>> On 26 January 2016 at 23:19, Jonathan Kelly <jonathaka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Felipe,
>>>>>
>>>>> I'm really sorry you had to deal with this frustration, but the good
>>>>> news is that we on EMR have fixed this issue and just released the fix in
>>>>> emr-4.3.0 today. (At least, it is available in some regions today and
>>>>> should be available in the other regions by some time tomorrow.)
>>>>>
>>>>> BTW, this was discussed on another thread last month:
>>>>> http://apache-zeppelin-users-incubating-mailing-list.75479.x6.nabble.com/Zeppelin-notes-version-control-scheduler-and-external-deps-td1730.html
>>>>>  (Not
>>>>> sure why I show up with the name "Work". Oh well. =P)
>>>>>
>>>>> ~ Jonathan
>>>>>
>>>>> On Mon, Jan 25, 2016 at 5:21 PM Felipe Almeida <falmeida1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I cannot, for the love of ***, make `z.load("maven-project")` work on
>>>>>> Zeppelin 0.5.5 on Amazon Elastic MapReduce. I keep getting a *Null
>>>>>> Pointer Exception* due to, I think, something related to Sonatype.
>>>>>>
>>>>>> I've also asked a question on SO about this:
>>>>>> http://stackoverflow.com/questions/35005455/java-npe-when-loading-a-dependency-from-maven-from-within-zeppelin-on-aws-emr
>>>>>>
>>>>>> Has anyone gone through this?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> FA
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>


-- 
“Every time you stay out late; every time you sleep in; every time you miss
a workout; every time you don’t give 100% – You make it that much easier
for me to beat you.” - Unknown author
