Hi All,
We are facing a serialization issue.

This issue has been submitted to JIRA:

https://issues.apache.org/jira/browse/SPARK-2018

We have an application running on Spark on a Power7 system, but we have hit an important serialization issue. The HdfsWordCount example reproduces the problem:
./bin/run-example org.apache.spark.examples.streaming.HdfsWordCount localdir
We are using Power7 (a Big-Endian architecture) and Red Hat 6.4. Big-Endian is likely the main cause, since the same example runs successfully on another Power-based Little-Endian setup.
Here are the exception stack and log:
Spark Executor Command: "/opt/ibm/java-ppc64-70//bin/java" "-cp"
"/home/test1/spark-1.0.0-bin-hadoop2/lib::/home/test1/src/spark-1.0.0-bin-hadoop2/conf:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/"
"-XX:MaxPermSize=128m" "-Xdebug"
"-Xrunjdwp:transport=dt_socket,address=99999,server=y,suspend=n" "-Xms512M"
"-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend"
"akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler" "2"
"p7hvs7br16" "4" "akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker"
"app-20140604023054-0000"
========================================
14/06/04 02:31:20 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/06/04 02:31:21 INFO spark.SecurityManager: Changing view acls to:
test1,yifeng
14/06/04 02:31:21 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(test1, yifeng)
14/06/04 02:31:22 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/06/04 02:31:22 INFO Remoting: Starting remoting
14/06/04 02:31:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@p7hvs7br16:39658]
14/06/04 02:31:22 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@p7hvs7br16:39658]
14/06/04 02:31:22 INFO executor.CoarseGrainedExecutorBackend: Connecting to
driver: akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler
14/06/04 02:31:22 INFO worker.WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker
14/06/04 02:31:23 INFO worker.WorkerWatcher: Successfully connected to
akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker
14/06/04 02:31:24 INFO executor.CoarseGrainedExecutorBackend: Successfully
registered with driver
14/06/04 02:31:24 INFO spark.SecurityManager: Changing view acls to:
test1,yifeng
14/06/04 02:31:24 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(test1, yifeng)
14/06/04 02:31:24 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/06/04 02:31:24 INFO Remoting: Starting remoting
14/06/04 02:31:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@p7hvs7br16:58990]
14/06/04 02:31:24 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@p7hvs7br16:58990]
14/06/04 02:31:24 INFO spark.SparkEnv: Connecting to MapOutputTracker:
akka.tcp://spark@9.186.105.141:60253/user/MapOutputTracker
14/06/04 02:31:25 INFO spark.SparkEnv: Connecting to BlockManagerMaster:
akka.tcp://spark@9.186.105.141:60253/user/BlockManagerMaster
14/06/04 02:31:25 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140604023125-3f61
14/06/04 02:31:25 INFO storage.MemoryStore: MemoryStore started with
capacity 307.2 MB.
14/06/04 02:31:25 INFO network.ConnectionManager: Bound socket to port 39041
with id = ConnectionManagerId(p7hvs7br16,39041)
14/06/04 02:31:25 INFO storage.BlockManagerMaster: Trying to register
BlockManager
14/06/04 02:31:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/06/04 02:31:25 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-7bce4e43-2833-4666-93af-bd97c327497b
14/06/04 02:31:25 INFO spark.HttpServer: Starting HTTP Server
14/06/04 02:31:25 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/06/04 02:31:26 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:39958
14/06/04 02:31:26 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 2
14/06/04 02:31:26 INFO executor.Executor: Running task ID 2
14/06/04 02:31:26 ERROR executor.Executor: Exception in task ID 2
java.io.InvalidClassException: scala.reflect.ClassTag$$anon$1; local class
incompatible: stream classdesc serialVersionUID = -8102093212602380348,
local class serialVersionUID = -4937928798201944954
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:678)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1678)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1827)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:409)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:607)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1078)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1949)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:409)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:607)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1078)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1949)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:409)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:607)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1078)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1949)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:409)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:607)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1078)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1949)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:409)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:607)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1078)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1949)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2047)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1971)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1854)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:409)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1852)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1406)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:409)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:781)
14/06/04 02:31:26 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@p7hvs7br16:39658] ->
[akka.tcp://spark@9.186.105.141:60253] disassociated! Shutting down.

Now I have some new findings.

(1) Comparison

    Server               Spark exec mode  Result                                          JDK
    x86 (Little Endian)  local + cluster  pass                                            x86
    P8  (Little Endian)  local + cluster  pass                                            IBM (Little Endian)
    P7  (Big Endian)     local            pass (fails after changing some jar classpath)  IBM (Big Endian)
    P7  (Big Endian)     cluster          fail                                            IBM (Big Endian)
(2) The principle behind the exceptions
2.1 Main error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 3 on host arlab105.austin.ibm.com: java.io.InvalidClassException: org.apache.spark.SerializableWritable; local class incompatible: stream classdesc serialVersionUID = 6301214776158303468, local class serialVersionUID = -7785455416944904980
(the other exceptions probably have the same root cause)
2.2 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 1 on host localhost: java.io.InvalidClassException: scala.Tuple2; invalid descriptor for field _1
Now let's analyze bug 2.1.
For reference: a serialVersionUID can be produced in two ways.
1. Declared explicitly, e.g. private static final long serialVersionUID = 1L;
2. Computed as a hash of the class's structure; the class name, interface names, methods, and fields all affect the result.
The UIDs in our error are not 1L, so they were generated by method 2.
The UID is checked when a process deserializes a byte array: the receiving JVM reads the local class file, finds that class's UID, and throws an InvalidClassException if it differs from the UID recorded in the stream.
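To make this concrete, here is a minimal Scala sketch (not from the original report; the names Pinned, Unpinned, and UidDemo are hypothetical) showing both cases. java.io.ObjectStreamClass exposes the UID that would be written into a serialized stream:

    import java.io.ObjectStreamClass

    // Case 1: an explicit UID, stable across compilers and platforms.
    @SerialVersionUID(1L)
    class Pinned(val x: Int) extends Serializable

    // Case 2: no declaration, so the UID is a hash of the class's structure;
    // two differently compiled copies of the "same" class can disagree.
    class Unpinned(val x: Int) extends Serializable

    object UidDemo extends App {
      println(ObjectStreamClass.lookup(classOf[Pinned]).getSerialVersionUID)   // 1
      println(ObjectStreamClass.lookup(classOf[Unpinned]).getSerialVersionUID) // computed hash
    }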
Let's look at the workflow of Spark serialization.
Local mode (serialized once):
    object --serialize (thread 1 or thread 2)--> byte array
           --deserialize (thread 2 or process 2)--> object
Cluster mode (serialized twice):
    object --serialize (thread 1 or thread 2)--> byte array
           --Akka serializes the message--> message
           --Akka receives and deserializes--> byte array
           --deserialize (thread 2 or process 2)--> object
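As an illustration (a minimal sketch of plain Java serialization, not Spark's actual code path; RoundTrip is a hypothetical name), this is the single round trip that local mode performs; cluster mode effectively does it twice:

    import java.io._

    object RoundTrip extends App {
      def serialize(o: AnyRef): Array[Byte] = {
        val buf = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buf)
        try out.writeObject(o) finally out.close()
        buf.toByteArray
      }

      // readObject is where InvalidClassException is thrown when the UID in
      // the stream differs from the UID of the locally loaded class.
      def deserialize(bytes: Array[Byte]): AnyRef = {
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try in.readObject() finally in.close()
      }

      println(deserialize(serialize(List("word", "count")))) // List(word, count)
    }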
Summary:
Comparing the four situations in (1), I think the reason is that the IBM JDK and the Scala/Akka libraries ship overlapping copies of some classes, but those copies were compiled on different platforms with different compilers, so they can carry different serialVersionUIDs. At runtime, the JVMs on the two sides may then load the "same" class from different .class files.
(3) Methods to fix it
I think the root cause is that the same class is loaded from different class files. There are two methods I can see (maybe there are better ones):
3.1 Give the two class files the same serialVersionUID: compile the Scala and Akka libraries on the P7 platform.
3.2 Make the two class loaders load the same jar, using something like an extended class loader or OSGi, to force the JVM to load the identical class file. (The difficulty is that the classes are inside jars and the number of classes is very large.) A sketch of this idea follows.
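For 3.2, a minimal child-first class loader sketch (the jar path and the names ChildFirstLoader/LoaderDemo are hypothetical; this illustrates the idea, it is not a tested fix):

    import java.net.{URL, URLClassLoader}

    // Resolve classes from the pinned jar before delegating to the parent,
    // so every JVM sees byte-identical class files for the shared classes.
    class ChildFirstLoader(urls: Array[URL], parent: ClassLoader)
        extends URLClassLoader(urls, parent) {
      override protected def loadClass(name: String, resolve: Boolean): Class[_] =
        getClassLoadingLock(name).synchronized {
          val c = Option(findLoadedClass(name)).getOrElse {
            try findClass(name)                  // look in our jar first
            catch { case _: ClassNotFoundException =>
              super.loadClass(name, false)       // fall back to the parent
            }
          }
          if (resolve) resolveClass(c)
          c
        }
    }

    object LoaderDemo extends App {
      val jar = new URL("file:/opt/pinned/scala-library.jar") // hypothetical path
      val loader = new ChildFirstLoader(Array(jar), getClass.getClassLoader)
      val cls = loader.loadClass("scala.reflect.ClassTag")
      println(cls.getClassLoader) // should be the ChildFirstLoader instance
    }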




But I want to know: what is the real root cause of this bug?
And how can we fix it quickly?


Thanks a lot

Best Regards!
Yanjie Gao 


