So running it using spark-submit doesn't change anything; it still works. Reading the code at https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100, it looks like the receivers are definitely being serialized and deserialized (ser/de). I think this is the issue; now I need to find a way to confirm it...
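The ser/de hypothesis in the quoted message can be illustrated without Spark. This is a minimal sketch under an assumption: that the serializer in play rebuilds a collection from its entries using the declared type (`mutable.HashMap`) rather than the runtime class. `SyncMarker`, `MixinLossSketch`, and `rebuildFromEntries` are hypothetical names, not Spark or serializer APIs; `SyncMarker` stands in for `mutable.SynchronizedMap`, which is deprecated in Scala 2.11 and gone in 2.13. No real serializer is used here; the sketch only shows that such a rebuild keeps the entries but drops the mixin, so a later cast of the kind that fails at `KinesisReceiver.scala:175` would throw `ClassCastException`.

```scala
import scala.collection.mutable

// Hypothetical marker trait standing in for mutable.SynchronizedMap, so the
// sketch also compiles on Scala versions where SynchronizedMap was removed.
trait SyncMarker

object MixinLossSketch {

  // Mirrors the quoted KinesisReceiver field declaration: a HashMap whose
  // runtime class carries an extra trait mixin.
  def newSyncedMap(): mutable.HashMap[String, Int] =
    new mutable.HashMap[String, Int] with SyncMarker

  // Hypothetical stand-in for a serializer that rebuilds a collection from
  // its entries using the declared type instead of the runtime class.
  // The entries survive the rebuild; the mixin does not.
  def rebuildFromEntries[K, V](m: mutable.HashMap[K, V]): mutable.HashMap[K, V] = {
    val rebuilt = mutable.HashMap.empty[K, V]
    rebuilt ++= m
    rebuilt
  }

  // Performs a checked cast to the mixin type and reports whether it
  // succeeded instead of letting the ClassCastException escape.
  def castSucceeds(m: AnyRef): Boolean =
    try {
      val asMarker: SyncMarker = m.asInstanceOf[SyncMarker]
      asMarker != null
    } catch {
      case _: ClassCastException => false
    }

  def main(args: Array[String]): Unit = {
    val original = newSyncedMap()
    original += ("shard-0" -> 42)
    val rebuilt = rebuildFromEntries(original)
    println(s"original keeps mixin: ${original.isInstanceOf[SyncMarker]}")
    println(s"rebuilt keeps mixin:  ${rebuilt.isInstanceOf[SyncMarker]}")
    println(s"entries preserved:    ${rebuilt == original}")
    println(s"cast after rebuild:   ${castSucceeds(rebuilt)}")
  }
}
```

If the hypothesis holds, the receiver that reaches the executor would be in the "rebuilt" state: same contents, plain `HashMap` class.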
2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
> Hey,
>
> A quick update on other things that have been tested.
>
> When looking at the compiled code of the spark-streaming-kinesis-asl jar,
> everything looks normal (there is a class that implements SynchronizedMap,
> and it is used inside the receiver).
> Starting a Spark shell and using introspection to instantiate a receiver
> and check that blockIdToSeqNumRanges implements SynchronizedMap works too,
> so according to that it clearly has the correct type.
>
> Another thing to test could be to do the same introspection inside a
> Spark job, to make sure the problem is not in the way the jobs are run.
> The other idea is that this is a ser/de problem. For example, if the
> receiver were serialized and then deserialized, then depending on the
> library used and its configuration, it could definitely happen that the
> concrete type is not preserved: the field would be deserialized using its
> compile-time type instead of its runtime type.
>
> Cheers,
> Eugen
>
>
> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>
>> Thanks for the update, Phil.
>>
>> I'm preparing an environment to reproduce it.
>>
>> I'll keep you posted.
>>
>> Thanks again,
>> Regards
>> JB
>>
>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>
>>> Not a dumb question, but yes, I updated all of the library references
>>> to 1.5, including the Kinesis extra (I even tried 1.5.1):
>>>
>>> // Versions.spark set elsewhere to "1.5.0"
>>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark % "provided"
>>>
>>> I am experiencing the issue in my own Spark project, but also when I
>>> try to run the Spark Streaming Kinesis example that comes in
>>> spark/examples.
>>>
>>> I tried running the streaming job locally, and also on EMR with release
>>> 4.1.0, which includes Spark 1.5.
>>>
>>> Very strange!
>>>
>>> ---------- Forwarded message ----------
>>> From: "Jean-Baptiste Onofré" <j...@nanthrax.net>
>>> To: user@spark.apache.org
>>> Cc:
>>> Date: Thu, 15 Oct 2015 08:03:55 +0200
>>> Subject: Re: Spark 1.5 Streaming and Kinesis
>>>
>>> Hi Phil,
>>>
>>> KinesisReceiver is part of the extras. Just a dumb question: did you
>>> update everything, including the Spark Kinesis extra containing the
>>> KinesisReceiver?
>>>
>>> I checked tag v1.5.0, and at line 175 of KinesisReceiver we see:
>>>
>>>     blockIdToSeqNumRanges.clear()
>>>
>>> which is declared as:
>>>
>>>     private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>>       with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>>>
>>> So it doesn't look fully correct to me.
>>> Let me investigate a bit this morning.
>>>
>>> Regards
>>> JB
>>>
>>> On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>>
>>> We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
>>> streaming applications, to take advantage of the new Kinesis
>>> checkpointing improvements in 1.5.
>>>
>>> However, after upgrading, we are consistently seeing the following
>>> error:
>>>
>>> java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
>>>     at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>>     at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>>     at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>>     at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>>     at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> I even get this when running the Kinesis example from
>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>> with:
>>>
>>>     bin/run-example streaming.KinesisWordCountASL
>>>
>>> Am I doing something incorrect?
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
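Eugen's other suggestion, repeating the shell introspection inside a running job, needs nothing Spark-specific: plain Java reflection can report the runtime class of a private field and whether it still implements the expected trait. The sketch below is hedged and hypothetical throughout: `FieldIntrospection`, `fieldImplements`, `ExpectedMixin` (standing in for `mutable.SynchronizedMap`) and the two holder classes are illustrative names, not Spark APIs. It also assumes scalac may emit a private `val` under a name-mangled JVM field name, so it accepts a `$`-prefixed suffix match. Obtaining the actual receiver instance on an executor is not shown and would need to be adapted.

```scala
import java.lang.reflect.Field
import scala.collection.mutable

// Hypothetical marker trait standing in for mutable.SynchronizedMap.
trait ExpectedMixin

// Hypothetical demo classes: one field carries the mixin, one does not.
class SyncedHolder {
  private val counts = new mutable.HashMap[String, Int] with ExpectedMixin
  def size: Int = counts.size
}

class PlainHolder {
  private val counts = mutable.HashMap.empty[String, Int]
  def size: Int = counts.size
}

object FieldIntrospection {

  // Searches the class and then its superclasses for a declared field.
  // Accepts an exact name match or a name ending in "$<name>", in case the
  // compiler mangled the JVM field name.
  @annotation.tailrec
  def findField(cls: Class[_], name: String): Option[Field] =
    if (cls == null) None
    else {
      val here = cls.getDeclaredFields.find { f =>
        f.getName == name || f.getName.endsWith("$" + name)
      }
      if (here.isDefined) here else findField(cls.getSuperclass, name)
    }

  // Returns the field value's runtime class name and whether that value
  // implements `iface`, or None when no matching field exists.
  def fieldImplements(obj: AnyRef, name: String, iface: Class[_]): Option[(String, Boolean)] =
    findField(obj.getClass, name).map { f =>
      f.setAccessible(true)
      val value = f.get(obj)
      val runtimeClass = if (value == null) "null" else value.getClass.getName
      (runtimeClass, value != null && iface.isInstance(value))
    }

  def main(args: Array[String]): Unit = {
    println(fieldImplements(new SyncedHolder, "counts", classOf[ExpectedMixin]))
    println(fieldImplements(new PlainHolder, "counts", classOf[ExpectedMixin]))
  }
}
```

Checking the receiver both before it is shipped and again inside `onStart` would distinguish "wrong class in the jar" from "type lost in transit".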