Maybe post the before-code as in what was the code before you did the
loop (that worked)? I had similar situations where reviewing code
before (worked) and after (does not work) helped. Also, what helped is
the Scala REPL because I can see what are the object types being
returned by each statement.

Other than code, in the driver logs, you should see events that say
"Registered receiver for stream 0 from
akka.tcp://sp...@node5.acme.net:53135"

Now, if you goto "node5" and look at Spark or YarnContainer logs
(depending on who's doing RM), you should be able to see if the
receiver has any errors when trying to talk to kafka.



On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
> To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
> but this may cause waves for me upstream (e.g., non-Java)
>
> Thanks for looking at this.  If anyone else can see a glaring issue in the 
> Java approach that would be appreciated.
>
> Thanks,
> Matt
>
> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>> equivalent (that I have tested to work):
>>
>> val kInStreams = (1 to 10).map{_ =>
>> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
>> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
>> across the cluster, one for each partition, potentially but active
>> receivers are only as many kafka partitions you have
>>
>> val kInMsg = 
>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>
>>
>>
>>
>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>> follows.  Note, that if I substitute the commented section for the loop, I 
>>> receive messages from the topic.
>>>
>>> SparkConf sparkConf = new SparkConf();
>>> sparkConf.set("spark.streaming.unpersist", "true");
>>> sparkConf.set("spark.logConf", "true");
>>>
>>> Map<String, String> kafkaProps = new HashMap<>();
>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>> kafkaProps.put("group.id", groupId);
>>>
>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
>>> Seconds.apply(1));
>>> jsc.checkpoint("hdfs://<some_location>");
>>>
>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new 
>>> ArrayList<>(5);
>>>
>>> for (int i = 0; i < 5; i++) {
>>>    streamList.add(KafkaUtils.createStream(jsc,
>>>                                           String.class, ProtobufModel.class,
>>>                                           StringDecoder.class, 
>>> ProtobufModelDecoder.class,
>>>                                           kafkaProps,
>>>                                           Collections.singletonMap(topic, 
>>> 1),
>>>                                           StorageLevel.MEMORY_ONLY_SER()));
>>> }
>>>
>>> final JavaPairDStream<String, ProtobufModel> stream = 
>>> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>
>>> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>> //                  KafkaUtils.createStream(jsc,
>>> //                                          String.class, 
>>> ProtobufModel.class,
>>> //                                          StringDecoder.class, 
>>> ProtobufModelDecoder.class,
>>> //                                          kafkaProps,
>>> //                                          Collections.singletonMap(topic, 
>>> 5),
>>> //                                          StorageLevel.MEMORY_ONLY_SER());
>>>
>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>        new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>            @Override
>>>            public Tuple2<String, Integer> call(Tuple2<String, 
>>> ProtobufModel> tuple) throws Exception {
>>>                return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>            }
>>>        });
>>>
>>> … and futher Spark functions ...
>>>
>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>
>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>
>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> 
>>>> wrote:
>>>>> Hey,
>>>>>
>>>>> Spark 1.1.0
>>>>> Kafka 0.8.1.1
>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>
>>>>> I have a five partition Kafka topic.  I can create a single Kafka receiver
>>>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>>>> messages fine.  Sifting through the user list and Google, I see that its
>>>>> possible to split the Kafka receiver among the Spark workers such that I 
>>>>> can
>>>>> have a receiver per topic, and have this distributed to workers rather 
>>>>> than
>>>>> localized to the driver.  I’m following something like this:
>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>> But for Kafka obviously.  From the Streaming Programming Guide “ Receiving
>>>>> multiple data streams can therefore be achieved by creating multiple input
>>>>> DStreams and configuring them to receive different partitions of the data
>>>>> stream from the source(s)."
>>>>>
>>>>> However, I’m not able to consume any messages from Kafka after I perform 
>>>>> the
>>>>> union operation.  Again, if I create a single, multi-threaded, receiver I
>>>>> can consume messages fine.  If I create 5 receivers in a loop, and call
>>>>> jssc.union(…) i get:
>>>>>
>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>
>>>>>
>>>>> Do I need to do anything to the unioned DStream?  Am I going about this
>>>>> incorrectly?
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> Matt
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to