Thanks a lot, it worked for me. I'm using a single direct stream which retrieves data from all the topics.
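In case it helps anyone else, a minimal sketch of that single-stream setup looks something like the code below. This is only an illustration: the broker addresses, topic names and the second topic group are placeholders based on the snippets earlier in this thread, not the actual job.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;

    import kafka.serializer.StringDecoder;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class CombinedTopicsStream {
        public static void main(String[] args) throws Exception {
            SparkConf sparkConf = new SparkConf().setAppName("CombinedTopicsStream");
            // Only one StreamingContext may be started per JVM, so everything
            // hangs off this single jssc.
            JavaStreamingContext jssc =
                new JavaStreamingContext(sparkConf, Durations.seconds(30));

            // Merge both topic groups into one set and create a single direct
            // stream, instead of one createDirectStream call per group.
            // "x,y,z" and "a,b" are placeholder topic names.
            HashSet<String> allTopics =
                new HashSet<String>(Arrays.asList("x,y,z".split(",")));
            allTopics.addAll(Arrays.asList("a,b".split(",")));

            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "ip:port,ip:port");

            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                allTopics);

            // Placeholder action; per-topic handling can be done with
            // hasOffsetRanges (see the sketch at the bottom of this thread).
            messages.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }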
*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>

On Mon, Nov 2, 2015 at 8:13 PM, Cody Koeninger <c...@koeninger.org> wrote:

> combine topicsSet_1 and topicsSet_2 in a single createDirectStream call.
> Then you can use hasOffsetRanges to see what the topic for a given
> partition is.
>
> On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>
>> If I try the below code snippet, it throws an exception. How can I avoid
>> this exception, and how can I switch processing based on the topic?
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
>> HashSet<String> topicsSet_1 = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashSet<String> topicsSet_2 = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>>
>> JavaPairInputDStream<String, String> messages_1 = KafkaUtils.createDirectStream(
>>     jssc,
>>     String.class,
>>     String.class,
>>     StringDecoder.class,
>>     StringDecoder.class,
>>     kafkaParams,
>>     topicsSet_1
>> );
>>
>> JavaPairInputDStream<String, String> messages_2 = KafkaUtils.createDirectStream(
>>     jssc,
>>     String.class,
>>     String.class,
>>     StringDecoder.class,
>>     StringDecoder.class,
>>     kafkaParams,
>>     topicsSet_2
>> );
>>
>> *Log Trace*:
>>
>> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw exception: java.io.IOException: Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055
>> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.io.IOException: Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
>> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770 remote=somedomain]. 59994 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain":8020;
>> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898 remote=somedomain]. 59998 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain";
>> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NullPointerException)
>> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.NullPointerException)
>> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482 remote=somedomain]. 59991 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain":8020;
>> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>> On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Just put them all in one stream and switch processing based on the topic.
>>>
>>> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>
>>>> I want to join all those logs in some manner; that's what I'm trying to do.
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>
>>>>> I don't think Spark Streaming supports multiple streaming contexts in
>>>>> one JVM, so you cannot use it that way. Instead you could run multiple
>>>>> streaming applications, since you're using YARN.
>>>>>
>>>>> On Friday, October 30, 2015, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>
>>>>>> I found that the NPE is mainly because I'm using the same
>>>>>> JavaStreamingContext for some other Kafka stream; if I change the name,
>>>>>> it runs successfully. How can I run multiple JavaStreamingContexts in
>>>>>> one program? I'm getting the following exception if I run multiple
>>>>>> JavaStreamingContexts in a single file.
>>>>>>
>>>>>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.IllegalStateException: Only one StreamingContext may be started in this JVM. Currently running StreamingContext was started at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>
>>>>>>> From the code, I think the field "rememberDuration" shouldn't be
>>>>>>> null; it is verified at start, unless some place changes its value at
>>>>>>> runtime and makes it null, but I cannot imagine how that happened.
>>>>>>> Maybe you could add some logs around the place where the exception
>>>>>>> happens, if you can reproduce it.
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>
>>>>>>>> No, this is the only exception I'm getting, and it appears multiple times in my log.
>>>>>>>> Also, I was reading some other topics earlier and did not face this NPE.
>>>>>>>>
>>>>>>>> *Thanks*,
>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I just did a local test with your code and everything seems fine.
>>>>>>>>> The only difference is that I use the master branch, but I don't
>>>>>>>>> think it changes much in this part. Did you meet any other exceptions
>>>>>>>>> or errors besides this one? Probably this is due to other exceptions
>>>>>>>>> that make the system unstable.
>>>>>>>>>
>>>>>>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> No, I don't have any special settings. Even if I keep only the
>>>>>>>>>> reading line in my code, it throws the NPE.
>>>>>>>>>>
>>>>>>>>>> *Thanks*,
>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Do you have any special settings? From your code, I don't think
>>>>>>>>>>> it would incur an NPE at that place.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Spark version: 1.4.1
>>>>>>>>>>>>
>>>>>>>>>>>> My code snippet:
>>>>>>>>>>>>
>>>>>>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>>>>>>> String topics = "x,y,z";
>>>>>>>>>>>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>>>>>>
>>>>>>>>>>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>>>>>>>>     jssc,
>>>>>>>>>>>>     String.class,
>>>>>>>>>>>>     String.class,
>>>>>>>>>>>>     StringDecoder.class,
>>>>>>>>>>>>     StringDecoder.class,
>>>>>>>>>>>>     kafkaParams,
>>>>>>>>>>>>     TopicsSet
>>>>>>>>>>>> );
>>>>>>>>>>>>
>>>>>>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>>>>>>>>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>>>>>>>>>>         JavaRDD<String> rdd = tuple.values();
>>>>>>>>>>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>>>>>>         return null;
>>>>>>>>>>>>     }
>>>>>>>>>>>> });
>>>>>>>>>>>>
>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What Spark version are you using? Also, a small code snippet of
>>>>>>>>>>>>> how you use Spark Streaming would be greatly helpful.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am able to read and print a few lines; after that I'm getting
>>>>>>>>>>>>>> this exception. Any idea what causes it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm trying to read from a Kafka stream and write it to a text file.
>>>>>>>>>>>>>>> I'm using Java over Spark. I don't know why I'm getting the
>>>>>>>>>>>>>>> following exception, and the exception message is very abstract.
>>>>>>>>>>>>>>> Can anyone please help me?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Log Trace:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>>>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>>>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>>>>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>>>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>>>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
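For the "how to switch processing based on topic" question quoted above, here is a rough sketch of Cody's hasOffsetRanges suggestion in Java. It assumes the combined "messages" stream from the sketch near the top of this thread; the branch on topic "x" and the string tagging are only illustrative stand-ins for real per-topic logic.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.OffsetRange;

    import scala.Tuple2;

    // "messages" is the single combined direct stream created earlier.
    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            // Each batch RDD of a direct stream exposes its Kafka offset ranges;
            // partition i of the RDD corresponds to offsetRanges[i].
            final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            JavaRDD<String> processed = rdd.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<Tuple2<String, String>>, Iterator<String>>() {
                    public Iterator<String> call(Integer partitionIndex,
                                                 Iterator<Tuple2<String, String>> records) {
                        // Look up which topic this partition came from.
                        String topic = offsetRanges[partitionIndex].topic();
                        List<String> out = new ArrayList<String>();
                        while (records.hasNext()) {
                            String value = records.next()._2();
                            // Switch processing based on the topic; the tagging
                            // below is just a placeholder for real per-topic logic.
                            if ("x".equals(topic)) {
                                out.add("topic-x: " + value);
                            } else {
                                out.add(topic + ": " + value);
                            }
                        }
                        return out.iterator();
                    }
                }, true);

            // Force evaluation; a real job would save or forward "processed".
            processed.count();
            return null;
        }
    });

The same OffsetRange objects also expose the partition and from/until offsets, which can be useful for bookkeeping.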