The part that works is the commented out, single receiver stream below the 
loop.  It seems that when I call KafkaUtils.createStream more than once, I 
don’t receive any messages.

I’ll dig through the logs, but at first glance yesterday I didn’t see anything 
suspect.  I’ll have to look closer.

mn

On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:

> Maybe post the before-code as in what was the code before you did the
> loop (that worked)? I had similar situations where reviewing code
> before (worked) and after (does not work) helped. Also, what helped is
> the Scala REPL because I can see what are the object types being
> returned by each statement.
> 
> Other than code, in the driver logs, you should see events that say
> "Registered receiver for stream 0 from
> akka.tcp://sp...@node5.acme.net:53135"
> 
> Now, if you goto "node5" and look at Spark or YarnContainer logs
> (depending on who's doing RM), you should be able to see if the
> receiver has any errors when trying to talk to kafka.
> 
> 
> 
> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>> To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
>> but this may cause waves for me upstream (e.g., non-Java)
>> 
>> Thanks for looking at this.  If anyone else can see a glaring issue in the 
>> Java approach that would be appreciated.
>> 
>> Thanks,
>> Matt
>> 
>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>> 
>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>> equivalent (that I have tested to work):
>>> 
>>> val kInStreams = (1 to 10).map{_ =>
>>> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
>>> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
>>> across the cluster, one for each partition, potentially but active
>>> receivers are only as many kafka partitions you have
>>> 
>>> val kInMsg = 
>>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> 
>>> wrote:
>>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>>> follows.  Note, that if I substitute the commented section for the loop, I 
>>>> receive messages from the topic.
>>>> 
>>>> SparkConf sparkConf = new SparkConf();
>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>> sparkConf.set("spark.logConf", "true");
>>>> 
>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>> kafkaProps.put("group.id", groupId);
>>>> 
>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
>>>> Seconds.apply(1));
>>>> jsc.checkpoint("hdfs://<some_location>");
>>>> 
>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new 
>>>> ArrayList<>(5);
>>>> 
>>>> for (int i = 0; i < 5; i++) {
>>>>   streamList.add(KafkaUtils.createStream(jsc,
>>>>                                          String.class, ProtobufModel.class,
>>>>                                          StringDecoder.class, 
>>>> ProtobufModelDecoder.class,
>>>>                                          kafkaProps,
>>>>                                          Collections.singletonMap(topic, 
>>>> 1),
>>>>                                          StorageLevel.MEMORY_ONLY_SER()));
>>>> }
>>>> 
>>>> final JavaPairDStream<String, ProtobufModel> stream = 
>>>> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>> 
>>>> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>> //                  KafkaUtils.createStream(jsc,
>>>> //                                          String.class, 
>>>> ProtobufModel.class,
>>>> //                                          StringDecoder.class, 
>>>> ProtobufModelDecoder.class,
>>>> //                                          kafkaProps,
>>>> //                                          
>>>> Collections.singletonMap(topic, 5),
>>>> //                                          
>>>> StorageLevel.MEMORY_ONLY_SER());
>>>> 
>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>       new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>           @Override
>>>>           public Tuple2<String, Integer> call(Tuple2<String, 
>>>> ProtobufModel> tuple) throws Exception {
>>>>               return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>           }
>>>>       });
>>>> 
>>>> … and futher Spark functions ...
>>>> 
>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>> 
>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>> 
>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> 
>>>>> wrote:
>>>>>> Hey,
>>>>>> 
>>>>>> Spark 1.1.0
>>>>>> Kafka 0.8.1.1
>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>> 
>>>>>> I have a five partition Kafka topic.  I can create a single Kafka 
>>>>>> receiver
>>>>>> via KafkaUtils.createStream with five threads in the topic map and 
>>>>>> consume
>>>>>> messages fine.  Sifting through the user list and Google, I see that its
>>>>>> possible to split the Kafka receiver among the Spark workers such that I 
>>>>>> can
>>>>>> have a receiver per topic, and have this distributed to workers rather 
>>>>>> than
>>>>>> localized to the driver.  I’m following something like this:
>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>> But for Kafka obviously.  From the Streaming Programming Guide “ 
>>>>>> Receiving
>>>>>> multiple data streams can therefore be achieved by creating multiple 
>>>>>> input
>>>>>> DStreams and configuring them to receive different partitions of the data
>>>>>> stream from the source(s)."
>>>>>> 
>>>>>> However, I’m not able to consume any messages from Kafka after I perform 
>>>>>> the
>>>>>> union operation.  Again, if I create a single, multi-threaded, receiver I
>>>>>> can consume messages fine.  If I create 5 receivers in a loop, and call
>>>>>> jssc.union(…) i get:
>>>>>> 
>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>> 
>>>>>> 
>>>>>> Do I need to do anything to the unioned DStream?  Am I going about this
>>>>>> incorrectly?
>>>>>> 
>>>>>> Thanks in advance.
>>>>>> 
>>>>>> Matt
>>>>> 
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>> 
>>>> 
>>> 
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to