Hello all,
I am using Spark 1.0.2 and I have a custom receiver that works well.
I tried adding Kryo serialization to SparkConf:
val spark = new SparkConf()
…..
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
and I am getting a strange error that I am not sure how to solve:
Exception in thread "Thread-37" org.apache.spark.SparkException: Job aborted
due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception
failure in TID 0 on host localhost: com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't
call Map.put
Serialization trace:
object (com.typesafe.config.impl.SimpleConfig)
atomFeedConf (com.twc.needle.cp.AtomFeedReceiver)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
……
Here is part of my custom receiver:
class AtomFeedReceiver
extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {
private val conf = ConfigFactory.load
private val atomFeedConf = conf.getConfig("cp.spark.atomfeed")
private val atomFeedUrl = atomFeedConf.getString("url")
private val urlConnTimeout = atomFeedConf.getInt("url_conn_timeout")
private val urlReadTimeout = atomFeedConf.getInt("url_read_timeout")
private val username = atomFeedConf.getString("username")
private val password = atomFeedConf.getString("password")
private val keepFeedCriteria = atomFeedConf.getString("keep_feed_criteria")
private val feedTrackerDir = atomFeedConf.getString("feed_tracker_dir")
private val feedTrackerFileName =
atomFeedConf.getString("feed_tracker_file_name")
private val enableSampling = atomFeedConf.getBoolean("enable_sampling")
…..
Here is how I am calling the receiver in the main method:
val logLineStreamRaw = ssc.receiverStream(new AtomFeedReceiver)
Any idea why Spark needs the Config object to be mutable only when Kryo
serialization is enabled?
Thanks.
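
A possible workaround, sketched under assumptions rather than verified on Spark 1.0.2: the serialization trace points at the atomFeedConf field, so it looks as if Kryo is serializing the receiver's fields and then trying to rebuild the Config's underlying map entry by entry with Map.put, which the immutable ConfigObject rejects. If that reading is right, keeping the Config out of the serialized receiver and reloading it on the worker may avoid the error. The thread name and the body of onStart below are hypothetical placeholders, since the real receiver logic is elided above.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class AtomFeedReceiver
  extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  // @transient keeps this field out of the serialized receiver; lazy makes
  // each JVM load the config on first access instead of deserializing it.
  @transient private lazy val atomFeedConf: Config =
    ConfigFactory.load.getConfig("cp.spark.atomfeed")

  // Derived settings are lazy too, so constructing the receiver on the
  // driver never touches atomFeedConf.
  private lazy val atomFeedUrl = atomFeedConf.getString("url")
  private lazy val urlConnTimeout = atomFeedConf.getInt("url_conn_timeout")

  override def onStart(): Unit = {
    // onStart must return quickly, so the work runs in its own thread.
    // "atom-feed-receiver" is a made-up name for illustration.
    new Thread("atom-feed-receiver") {
      override def run(): Unit = {
        // First access to the lazy vals happens here, on the worker.
        val url = atomFeedUrl
        val timeout = urlConnTimeout
        // Placeholder: the actual feed polling and store(...) calls are
        // elided in the original receiver and are not reproduced here.
      }
    }.start()
  }

  override def onStop(): Unit = ()
}
```

This assumes the same application.conf is available on the worker's classpath. An alternative with the same intent would be to read the individual String/Int/Boolean settings on the driver and pass them to the receiver as plain constructor arguments, so that no Config object is ever a field.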