Thanks a lot, it worked for me. I'm using a single direct stream which
retrieves data from all the topics.
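
For later readers, here is a minimal plain-Java sketch of the approach that worked: merge the topic lists into one set for a single createDirectStream call, then switch processing by topic. It deliberately does not use Spark. TopicRouterSketch, Range, mergeTopics and routeByTopic are hypothetical names, and Range only stands in for the per-partition topic information that HasOffsetRanges exposes, assuming entry i describes RDD partition i.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class TopicRouterSketch {

    // Hypothetical stand-in for one offset-range entry: it only records
    // which topic and Kafka partition an RDD partition was read from.
    static final class Range {
        final String topic;
        final int partition;

        Range(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Merge several comma-separated topic lists into one set, the shape
    // that a single createDirectStream call would take.
    static Set<String> mergeTopics(String... topicLists) {
        Set<String> merged = new TreeSet<>();
        for (String list : topicLists) {
            for (String topic : list.split(",")) {
                String trimmed = topic.trim();
                if (!trimmed.isEmpty()) {
                    merged.add(trimmed);
                }
            }
        }
        return merged;
    }

    // Group records by topic. partitions.get(i) holds the records of
    // partition i, and ranges[i] describes that same partition.
    static Map<String, List<String>> routeByTopic(Range[] ranges,
                                                  List<List<String>> partitions) {
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (int i = 0; i < ranges.length; i++) {
            byTopic.computeIfAbsent(ranges[i].topic, k -> new ArrayList<>())
                   .addAll(partitions.get(i));
        }
        return byTopic;
    }

    public static void main(String[] args) {
        // Two overlapping topic lists collapse into one set.
        System.out.println(mergeTopics("x,y", "y,z"));

        // Three partitions: one from topic x, two from topic y.
        Range[] ranges = {
            new Range("x", 0), new Range("y", 0), new Range("y", 1)
        };
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("x-msg-1"),
            Arrays.asList("y-msg-1", "y-msg-2"),
            Arrays.asList("y-msg-3"));
        System.out.println(routeByTopic(ranges, partitions));
    }
}
```

In the real job, the topic lookup would come from the RDD's offset ranges inside foreachRDD rather than from a hand-built array.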

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Mon, Nov 2, 2015 at 8:13 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Combine topicsSet_1 and topicsSet_2 in a single createDirectStream call.
> Then you can use HasOffsetRanges to see what the topic for a given
> partition is.
>
> On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> If I try the code snippet below, it throws an exception. How can I avoid
>> this exception, and how do I switch processing based on topic?
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.seconds(30));
>> HashSet<String> topicsSet_1 = new
>> HashSet<String>(Arrays.asList(topics.split(",")));
>> HashSet<String> topicsSet_2 = new
>> HashSet<String>(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>> JavaPairInputDStream<String, String> messages_1 =
>> KafkaUtils.createDirectStream(
>>            jssc,
>>            String.class,
>>            String.class,
>>            StringDecoder.class,
>>            StringDecoder.class,
>>            kafkaParams,
>>            topicsSet_1
>>        );
>>
>> JavaPairInputDStream<String, String> messages_2 =
>> KafkaUtils.createDirectStream(
>>            jssc,
>>            String.class,
>>            String.class,
>>            StringDecoder.class,
>>            StringDecoder.class,
>>            kafkaParams,
>>             topicsSet_2
>>        );
>>
>> *Log Trace*:
>>
>> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw
>> exception: java.io.IOException: Failed to delete
>> somedomain/user/hdfs/spark_output/kyt_req/part-00055
>> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception: java.io.IOException:
>> Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
>> remote=somedomain]. 59994 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain":8020;
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
>> remote=somedomain]. 59998 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain;
>> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw
>> exception: java.lang.NullPointerException
>> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception:
>> java.lang.NullPointerException)
>> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
>> ApplicationMaster with FAILED (diag message: User class threw exception:
>> java.lang.NullPointerException)
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
>> remote=somedomain]. 59991 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain":8020;
>> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>>
>>
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Just put them all in one stream and switch processing based on the topic
>>>
>>> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V <ramkumar.c...@gmail.com>
>>> wrote:
>>>
>>>> I want to join all those logs in some manner. That's what I'm trying to
>>>> do.
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>> wrote:
>>>>
>>>>> I don't think Spark Streaming supports multiple streaming contexts in
>>>>> one JVM, so you cannot use it that way. Instead, you could run multiple
>>>>> streaming applications, since you're using YARN.
>>>>>
>>>>> On Friday, October 30, 2015, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>
>>>>>> I found that the NPE occurs mainly because I'm using the same
>>>>>> JavaStreamingContext for another Kafka stream. If I change the name,
>>>>>> it runs successfully. How can I run multiple JavaStreamingContext
>>>>>> instances in one program? I get the following exception if I run
>>>>>> multiple JavaStreamingContext instances in a single file.
>>>>>>
>>>>>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status:
>>>>>> FAILED, exitCode: 15, (reason: User class threw exception:
>>>>>> java.lang.IllegalStateException: Only one StreamingContext may be started
>>>>>> in this JVM. Currently running StreamingContext was started
>>>>>> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>>>>>
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> From the code, I think the field "rememberDuration" shouldn't be
>>>>>>> null; it is verified at start, unless something changes its value
>>>>>>> at runtime and makes it null, but I cannot imagine how that would
>>>>>>> happen. Maybe you could add some logging around the place where the
>>>>>>> exception happens, if you can reproduce it.
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> No, this is the only exception, and I'm getting it multiple times in
>>>>>>>> my log. I was also reading some other topics earlier, but I did not
>>>>>>>> hit this NPE.
>>>>>>>>
>>>>>>>> *Thanks*,
>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <
>>>>>>>> sai.sai.s...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I just did a local test with your code and everything seems fine.
>>>>>>>>> The only difference is that I use the master branch, but I don't
>>>>>>>>> think much has changed in this part. Did you see any other
>>>>>>>>> exceptions or errors besides this one? This is probably caused by
>>>>>>>>> other exceptions that make the system unstable.
>>>>>>>>>
>>>>>>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <
>>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> No, I don't have any special settings. Even if I keep only the
>>>>>>>>>> reading line in my code, it throws the NPE.
>>>>>>>>>>
>>>>>>>>>> *Thanks*,
>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <
>>>>>>>>>> sai.sai.s...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Do you have any special settings? From your code, I don't think
>>>>>>>>>>> it would incur an NPE at that place.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <
>>>>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> spark version - spark 1.4.1
>>>>>>>>>>>>
>>>>>>>>>>>> my code snippet:
>>>>>>>>>>>>
>>>>>>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>>>>>>> String topics = "x,y,z";
>>>>>>>>>>>> HashSet<String> TopicsSet = new
>>>>>>>>>>>> HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String,
>>>>>>>>>>>> String>();
>>>>>>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>>>>>>
>>>>>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>>>>>> KafkaUtils.createDirectStream(
>>>>>>>>>>>>            jssc,
>>>>>>>>>>>>            String.class,
>>>>>>>>>>>>            String.class,
>>>>>>>>>>>>            StringDecoder.class,
>>>>>>>>>>>>            StringDecoder.class,
>>>>>>>>>>>>            kafkaParams,
>>>>>>>>>>>>             TopicsSet
>>>>>>>>>>>>        );
>>>>>>>>>>>>
>>>>>>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>>>>>>>>>>             public Void call(JavaPairRDD<String, String> tuple) {
>>>>>>>>>>>>                 JavaRDD<String> rdd = tuple.values();
>>>>>>>>>>>>                 rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>>>>>>                 return null;
>>>>>>>>>>>>             }
>>>>>>>>>>>>        });
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <
>>>>>>>>>>>> sai.sai.s...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What Spark version are you using? Also, a small code snippet of
>>>>>>>>>>>>> how you use Spark Streaming would be greatly helpful.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
>>>>>>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am able to read and print a few lines. After that, I get
>>>>>>>>>>>>>> this exception. Any idea what causes it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
>>>>>>>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm trying to read from a Kafka stream and write it to a
>>>>>>>>>>>>>>> text file. I'm using Java with Spark. I don't know why I'm
>>>>>>>>>>>>>>> getting the following exception, and the exception message is
>>>>>>>>>>>>>>> very abstract. Can anyone please help me?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Log Trace :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
>>>>>>>>>>>>>>> generator
>>>>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class
>>>>>>>>>>>>>>> threw exception: java.lang.NullPointerException
>>>>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>>>>>         at
>>>>>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
