JB,

To clarify, are you able to run the Amazon Kinesis example provided in the
Spark examples dir?

bin/run-example streaming.KinesisWordCountASL [app name] [stream name] [endpoint url]

If it helps, below is the command I used to build Spark:

mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package

I built from revision 4f894dd6906311cb57add6757690069a18078783
(v1.5.1).
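
On Eugen's ser/de theory (quoted below), one rough way to check whether a
serialization round trip keeps an object's runtime class is something like
the following. This is only a sketch: SerDeCheck, roundTrip and
preservesRuntimeClass are names I made up for illustration, and it uses plain
Java serialization, which may not match the exact path Spark takes when it
ships receivers to the executors.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper, not part of Spark.
object SerDeCheck {

  /** Writes `value` with plain Java serialization and reads it back. */
  def roundTrip(value: AnyRef): AnyRef = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject() finally in.close()
  }

  /** True if the deserialized copy has exactly the same runtime class as the original. */
  def preservesRuntimeClass(value: AnyRef): Boolean =
    roundTrip(value).getClass == value.getClass
}
```

Passing a receiver instance (or just its map) to
SerDeCheck.preservesRuntimeClass from a spark shell, and again from inside a
job, should show whether the concrete type survives in each case. A result of
false would point toward ser/de, though true would not rule it out if Spark
uses a different serializer for receivers.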

Thanks,
Phil


On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:

> So running it using spark-submit doesn't change anything, it still works.
>
> When reading the code
> https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
> it looks like the receivers are definitely being serialized and
> deserialized. I think this is the issue; I need to find a way to confirm
> that now...
>
> 2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
>
>> Hey,
>>
>> A quick update on other things that have been tested.
>>
>> When looking at the compiled code of the spark-streaming-kinesis-asl jar
>> everything looks normal (there is a class that implements SyncMap and it is
>> used inside the receiver).
>> Starting a spark shell and using introspection to instantiate a receiver
>> and check that blockIdToSeqNumRanges implements SyncMap works too. So
>> obviously it has the correct type according to that.
>>
>> Another thing to test could be to do the same introspection stuff but
>> inside a spark job to make sure it is not a problem in the way the jobs are
>> run.
>> The other idea is that this is a problem related to ser/de. For example,
>> if the receiver were serialized and then deserialized, it could well
>> happen, depending on the library used and its configuration, that the
>> concrete type is not preserved. It would then deserialize using the
>> compile-time type instead of the runtime type.
>>
>> Cheers,
>> Eugen
>>
>>
>> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>
>>> Thanks for the update Phil.
>>>
>>> I'm preparing an environment to reproduce it.
>>>
>>> I'll keep you posted.
>>>
>>> Thanks again,
>>> Regards
>>> JB
>>>
>>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>>
>>>> Not a dumb question, but yes I updated all of the library references to
>>>> 1.5, including  (even tried 1.5.1).
>>>>
>>>> // Versions.spark set elsewhere to "1.5.0"
>>>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark % "provided"
>>>>
>>>> I am experiencing the issue in my own spark project, but also when I try
>>>> to run the spark streaming kinesis example that comes in spark/examples
>>>>
>>>> Tried running the streaming job locally, and also in EMR with release
>>>> 4.1.0 that includes Spark 1.5
>>>>
>>>> Very strange!
>>>>
>>>>     ---------- Forwarded message ----------
>>>>
>>>>     From: "Jean-Baptiste Onofré" <j...@nanthrax.net>
>>>>     To: user@spark.apache.org
>>>>
>>>>     Cc:
>>>>     Date: Thu, 15 Oct 2015 08:03:55 +0200
>>>>     Subject: Re: Spark 1.5 Streaming and Kinesis
>>>>     Hi Phil,
>>>>     KinesisReceiver is part of extra. Just a dumb question: did you
>>>>     update all, including the Spark Kinesis extra containing the
>>>>     KinesisReceiver ?
>>>>     I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
>>>>     blockIdToSeqNumRanges.clear()
>>>>     which is a:
>>>>     private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>>>          with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>>>>     So, it doesn't look fully correct to me.
>>>>     Let me investigate a bit this morning.
>>>>     Regards
>>>>     JB
>>>>     On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>>>     We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
>>>>     streaming applications, to take advantage of the new Kinesis
>>>>     checkpointing improvements in 1.5.
>>>>     However, after upgrading, we are consistently seeing the following error:
>>>>     java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
>>>>     at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>>>     at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>>>     at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>>>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>>>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>>>     at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>>>     at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>     I even get this when running the Kinesis examples
>>>>     (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html) with
>>>>     bin/run-example streaming.KinesisWordCountASL
>>>>     Am I doing something incorrect?
>>>>
>>>>
>>>>     --
>>>>     Jean-Baptiste Onofré
>>>>     jbono...@apache.org
>>>>     http://blog.nanthrax.net
>>>>     Talend - http://www.talend.com
>>>>
>>>>     Hi,
>>>>
>>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>>
>>
>
