Hello everyone,
We are testing checkpointing against YARN 2.7.1 with Spark 1.5. We are
trying to make sure checkpointing works with both orderly shutdowns (i.e.
"yarn application -kill") and unexpected shutdowns, which we simulate with
"kill -9". If anyone has recently tested failover successfully with
Kinesis + YARN, I would appreciate even a confirmation that this should work.
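For anyone who wants to reproduce this, the two shutdown scenarios can be
scripted roughly as follows. This is a hypothetical sketch rather than our
actual harness: the function names and the DRY_RUN switch are made up for
illustration, and it assumes yarn-cluster mode (so the driver runs inside the
ApplicationMaster JVM) and that hard_kill is run on the NodeManager host where
the ApplicationMaster lives.

```shell
#!/bin/sh
# Hypothetical helpers for the two failover scenarios (not our real harness).
# Assumes yarn-cluster mode: the driver lives inside the ApplicationMaster JVM.
# Set DRY_RUN=1 to print the commands instead of executing them.

# Run a command, or only echo it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = 1 ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Orderly shutdown: ask the ResourceManager to kill the whole application.
orderly_kill() {
  run yarn application -kill "$1"
}

# Look up the ApplicationMaster JVM on this host (first match only).
# The [.] brackets keep pgrep from matching a shell that merely contains this pattern.
find_am_pid() {
  pgrep -f 'deploy[.]yarn[.]ApplicationMaster' | head -n 1
}

# Unexpected shutdown: SIGKILL the ApplicationMaster JVM, so no shutdown hooks run.
# Takes an explicit PID, or falls back to find_am_pid on the local host.
hard_kill() {
  pid=${1:-$(find_am_pid)}
  if [ -z "$pid" ]; then
    echo "no ApplicationMaster process found on this host" >&2
    return 1
  fi
  run kill -9 "$pid"
}
```

For example, "orderly_kill application_1447000000000_0001" (a placeholder ID;
the real one comes from "yarn application -list"), or plain "hard_kill" on the
ApplicationMaster's host. Because kill -9 sends SIGKILL, the JVM gets no chance
to clean up, which is what makes it a reasonable stand-in for a crash.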
We have a simple driver that computes aggregate counts per minute from a
Kinesis stream. We see an initial hang (~2-10 minutes) after both kinds of
relaunch. When we do an "unexpected" shutdown of the application master
by running "kill -9" on its JVM process, YARN successfully kills the orphaned
executors and launches a new driver with new executors. The logs indicate that
restoring from the checkpoint succeeded. However, the first two Spark
Streaming batches, which contain 0 events, intermittently hang for anywhere
between 2 and 10+ minutes with a SocketTimeoutException during a Kinesis
getRecords call (stack trace at the end of this mail).
Under normal circumstances, Spark skips the transform and mapToPair stages
when there are 0 events. However, when the executors hang, we notice that the
job goes through the transform stage and its tasks hang on a getRecords call
to Kinesis for 2 minutes before failing with "SocketTimeoutException: Read
timed out".
Kinesis as a service should behave more gracefully even if it is fed bad
parameters, but why does Spark call getRecords at all for a batch of 0 events
after a relaunch?
Any input is greatly appreciated, as we are stuck on testing failover.
Heji
I've put the stack trace below:
[2015-11-09 15:20:23,478] INFO Unable to execute HTTP request: Read timed out (com.amazonaws.http.AmazonHttpClient)
java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:170)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
	at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
	at sun.security.ssl.InputRecord.read(InputRecord.java:532)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
	at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
	at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
	at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
	at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
	at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
	at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
	at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:176)
	at com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:408)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2184)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2165)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:279)
	at com.amazonaws.transform.JsonUnmarshallerContextImpl.readCurrentJsonTokenValue(JsonUnmarshallerContextImpl.java:129)
	at com.amazonaws.transform.JsonUnmarshallerContextImpl.readText(JsonUnmarshallerContextImpl.java:123)
	at com.amazonaws.transform.SimpleTypeJsonUnmarshallers$ByteBufferJsonUnmarshaller.unmarshall(SimpleTypeJsonUnmarshallers.java:185)
	at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:58)
	at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:31)
	at com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:93)
	at com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:43)
	at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:50)
	at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:31)
	at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:106)
	at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)
	at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
	at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2490)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1142)
	at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
	at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
	at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:258)
	at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:213)
	at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:199)
	at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:155)
	at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:127)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)