Hi TD,

The scenario here is to let events from topic1 wait a fixed 10 minutes for events with the same key from topic2 to arrive, and then left-outer-join them by the key.
Does the query do what is expected? If not, what is the right way to achieve this?

Thanks,
Renyi.

On Tue, Oct 13, 2015 at 5:14 PM, Daniel Li <daniell...@gmail.com> wrote:
> We have a scenario where events from three Kafka topics sharing the same
> keys need to be merged. One topic has the master events; most events in
> the other two topics arrive within 10 minutes of the master event's
> arrival. I wrote pseudo code below. I'd love to hear your thoughts on
> whether I am on the right track.
>
> // Scenario
> // (1) Merge events from Kafka topic1, topic2 and topic3 sharing the same keys
> // (2) Events in topic1 are master events
> // (3) One master event may have an associated event in topic2 and/or
> //     topic3 sharing the same key
> // (4) Most events in topic2 and topic3 will arrive within 10 minutes
> //     of the master event arrival
> //
> // Pseudo code
> // Use a 1-minute window of events in topic1, left-outer-joined with the
> // next 10 minutes of events from topic2 and topic3
>
> // parse the event to form a key-value pair
> def parse(v: String) = {
>   (v.split(",")(0), v)
> }
>
> // Create context with 1-minute batch interval
> val sparkConf = new SparkConf().setAppName("MergeLogs")
> val ssc = new StreamingContext(sparkConf, Minutes(1))
> ssc.checkpoint(checkpointDirectory)
>
> // Create direct Kafka streams with brokers and topics
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>
> val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>   ssc, kafkaParams, Set("topic1"))
> stream1.checkpoint(Minutes(5))
> val pairStream1 = stream1.map(_._2).map(s => parse(s))
>
> val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>   ssc, kafkaParams, Set("topic2"))
> stream2.checkpoint(Minutes(5))
> val pairStream2 = stream2.map(_._2).map(s => parse(s))
>
> val stream3 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>   ssc, kafkaParams, Set("topic3"))
> stream3.checkpoint(Minutes(5))
> val pairStream3 = stream3.map(_._2).map(s => parse(s))
>
> // load 1 minute of master events from topic1
> val windowedStream1 = pairStream1.window(Minutes(1))
>
> // load 10 minutes of topic2 and topic3
> val windowedStream2 = pairStream2.window(Minutes(10), Minutes(1))
> val windowedStream3 = pairStream3.window(Minutes(10), Minutes(1))
>
> // left-outer-join topic1 with topic2 and topic3
> val joinedStream = windowedStream1.leftOuterJoin(windowedStream2).leftOuterJoin(windowedStream3)
>
> // dump merged events; create the connection per partition on the workers,
> // since a connection created at the driver is not serializable
> joinedStream.foreachRDD { rdd =>
>   rdd.foreachPartition { partition =>
>     val connection = createNewConnection() // executed at the worker
>     partition.foreach(record => connection.send(record))
>     connection.close()
>   }
> }
>
> // Start the computation (for driver fault-tolerance, the setup above would
> // normally live in a factory function passed to StreamingContext.getOrCreate)
> ssc.start()
> ssc.awaitTermination()
>
> thx
> Daniel
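Not part of the thread, but to make the expected merge concrete: below is a minimal plain-Scala sketch (no Spark, no Kafka) of the per-batch semantics of the chained `leftOuterJoin` above. The data and the helper `leftOuterJoin` are made up for illustration; Spark's `PairDStreamFunctions.leftOuterJoin` behaves the same way per key — every master record is kept, matches from the other topics appear as `Some(value)`, and missing ones as `None`.

```scala
// Sketch of leftOuterJoin-by-key semantics, assuming in-memory batches.
object JoinSketch {

  // Simplified left outer join over small in-memory key-value batches.
  def leftOuterJoin[A, B](left: Seq[(String, A)],
                          right: Seq[(String, B)]): Seq[(String, (A, Option[B]))] = {
    val rightByKey = right.groupBy(_._1)
    left.flatMap { case (k, a) =>
      rightByKey.get(k) match {
        case Some(matches) => matches.map { case (_, b) => (k, (a, Some(b))) }
        case None          => Seq((k, (a, None)))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // one 1-minute batch of master events from topic1
    val masters = Seq("k1" -> "m1", "k2" -> "m2")
    // 10-minute windows of topic2 and topic3; key k2 never shows up in topic3
    val topic2  = Seq("k1" -> "a1", "k2" -> "a2")
    val topic3  = Seq("k1" -> "b1")

    // chain the joins exactly as in the pseudo code:
    // k1 picks up both matches; k2 keeps its master event with None for topic3
    val joined = leftOuterJoin(leftOuterJoin(masters, topic2), topic3)
    joined.foreach(println)
  }
}
```

Note that each element of the chained result is nested as `(key, ((master, Option[topic2]), Option[topic3]))`, so downstream code has to unpack two levels of tuples.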