Sure thing! Here's what main looks like:
--------------------------------------------------------------------------------------------------
val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
val kafkaConf = Map(
  "zookeeper.connect" -> zookeeper,
  "group.id" -> options.group,
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.commit.interval.ms" -> "1000",
  "rebalance.max.retries" -> "25",
  "bootstrap.servers" -> kafkaBrokers
)

val ssc = StreamingContext.getOrCreate(checkpointDirectory,
  () => {
    createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
  }, createOnError = true)

ssc.start()
ssc.awaitTermination()
--------------------------------------------------------------------------------------------------
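(As an aside, not something from the job above: if the backlog after a long outage is a concern, Spark's `spark.streaming.kafka.maxRatePerPartition` setting caps how many records per second per partition a direct stream will pull, which bounds the size of each recovery batch. A minimal sketch of setting it on the conf:)

```scala
import org.apache.spark.SparkConf

// Cap each Kafka partition at 1000 records/sec so recovery batches
// after a restart stay bounded instead of pulling the whole backlog at once.
val sparkConf = new SparkConf()
  .setAppName("***")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```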
And createContext is defined as:
--------------------------------------------------------------------------------------------------
val batchDuration = Seconds(5)
val checkpointDuration = Seconds(20)
private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

def createContext(kafkaConf: Map[String, String],
                  checkpointDirectory: String,
                  topic: String,
                  numThreads: Int,
                  isProd: Boolean): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("***")
  val ssc = new StreamingContext(sparkConf, batchDuration)
  ssc.checkpoint(checkpointDirectory)

  val topicSet = topic.split(",").toSet
  val groupId = kafkaConf.getOrElse("group.id", "")
  val directKStream = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
  directKStream.checkpoint(checkpointDuration)

  val table = ***

  directKStream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.flatMap(rec => someFunc(rec))
      .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
      .foreachPartition { partitionRec =>
        val dbWrite = DynamoDBWriter()
        partitionRec.foreach {
          /* Update Dynamo Here */
        }
      }

    /** Set up ZK connection **/
    val props = new Properties()
    kafkaConf.foreach(param => props.put(param._1, param._2))
    props.setProperty(AUTO_OFFSET_COMMIT, "false")
    val consumerConfig = new ConsumerConfig(props)
    assert(!consumerConfig.autoCommitEnable)

    val zkClient = new ZkClient(consumerConfig.zkConnect,
      consumerConfig.zkSessionTimeoutMs,
      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

    offsetRanges.foreach { osr =>
      val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
      val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
      ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
    }
  }
  ssc
}
--------------------------------------------------------------------------------------------------
On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <[email protected]> wrote:
> Sounds like something's not set up right... can you post a minimal code
> example that reproduces the issue?
>
> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <[email protected]> wrote:
>
>> Yeah. All messages are lost while the streaming job was down.
>>
>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <[email protected]>
>> wrote:
>>
>>> Are you actually losing messages then?
>>>
>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <[email protected]>
>>> wrote:
>>>
>>>> No; first batch only contains messages received after the second job
>>>> starts (messages come in at a steady rate of about 400/second).
>>>>
>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <[email protected]>
>>>> wrote:
>>>>
>>>>> Does the first batch after restart contain all the messages received
>>>>> while the job was down?
>>>>>
>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>>>>> everything works well until a restart. When I shut down (^C) the first
>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>>> series of 0
>>>>>> event batches that get queued (corresponding to the 1 minute when the
>>>>>> job
>>>>>> was down). Eventually, the batches would resume processing, and I
>>>>>> would see
>>>>>> that each batch has roughly 2000 events.
>>>>>>
>>>>>> I see that at the beginning of the second launch, the checkpoint dirs
>>>>>> are
>>>>>> found and "loaded", according to console output.
>>>>>>
>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>> something
>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>> streaming job
>>>>>> would resume from checkpoint and continue processing from there
>>>>>> (without
>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>
>>>>>> Also, if I were to wait > 10 minutes or so before re-launching, there
>>>>>> would
>>>>>> be so many 0 event batches that the job would hang. Is this merely
>>>>>> something
>>>>>> to be "waited out", or should I set up some restart behavior/make a
>>>>>> config
>>>>>> change to discard checkpointing if the elapsed time has been too long?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>
>>>>
>>>
>>
>