Additionally, if I dial the number of executor cores up or down, this does what
I want.  Thanks for the extra eyes!
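
For anyone who finds this thread later, the rule of thumb can be sketched as a
quick sanity check.  Each receiver runs as a long-lived task that holds one
core, so the job needs more cores in total than it has receivers.  The class
and method names below are hypothetical (they are not part of Spark), and the
core counts are only illustrative:

```java
// Hypothetical helper, not part of Spark: checks whether a receiver-based
// streaming job leaves any cores free for batch processing.
final class ReceiverCoreCheck {

    // Each receiver holds one core for the lifetime of the job, so only the
    // remaining cores are available to process the received batches.
    static int coresLeftForProcessing(int totalExecutorCores, int numReceivers) {
        if (totalExecutorCores < 0 || numReceivers < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return Math.max(0, totalExecutorCores - numReceivers);
    }

    // True when at least one core remains after every receiver has taken one.
    static boolean canProcess(int totalExecutorCores, int numReceivers) {
        return coresLeftForProcessing(totalExecutorCores, numReceivers) > 0;
    }

    public static void main(String[] args) {
        // Assumed example: five cores in total and five receivers,
        // so nothing is left to process the data.
        System.out.println(canProcess(5, 5));
        // Assumed example: the same five receivers with ten cores in total.
        System.out.println(canProcess(10, 5));
    }
}
```

On YARN the total is roughly the number of executors times the cores per
executor, so raising either one (or creating fewer receivers) should free up
cores for processing.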

mn

On Sep 25, 2014, at 12:34 PM, Matt Narrell <matt.narr...@gmail.com> wrote:

> Tim,
> 
> I think I understand this now.  I had a five-node Spark cluster and a
> five-partition topic, and I created five receivers.  I found this:
> http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
> which indicates that if I use all my workers as receivers, there are none
> left to do the processing.  If I drop the number of partitions/receivers
> down while still having multiple unioned receivers, I see messages.
> 
> mn
> 
> On Sep 25, 2014, at 10:18 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
> 
>> I suppose I have other problems as I can’t get the Scala example to work 
>> either.  Puzzling, as I have literally coded like the examples (that are 
>> purported to work), but no luck.
>> 
>> mn
>> 
>> On Sep 24, 2014, at 11:27 AM, Tim Smith <secs...@gmail.com> wrote:
>> 
>>> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>>> 
>>> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell <matt.narr...@gmail.com> 
>>> wrote:
>>>> The part that works is the commented-out, single-receiver stream below
>>>> the loop.  It seems that when I call KafkaUtils.createStream more than
>>>> once, I don’t receive any messages.
>>>> 
>>>> I’ll dig through the logs, but at first glance yesterday I didn’t see 
>>>> anything suspect.  I’ll have to look closer.
>>>> 
>>>> mn
>>>> 
>>>> On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:
>>>> 
>>>>> Maybe post the code as it was before you added the loop (the version
>>>>> that worked)? I have had similar situations where comparing the code
>>>>> before (working) and after (not working) helped. The Scala REPL also
>>>>> helped, because I can see the object type returned by each statement.
>>>>> 
>>>>> Other than code, in the driver logs, you should see events that say
>>>>> "Registered receiver for stream 0 from
>>>>> akka.tcp://sp...@node5.acme.net:53135"
>>>>> 
>>>>> Now, if you go to "node5" and look at the Spark or YARN container logs
>>>>> (depending on which resource manager is in use), you should be able to
>>>>> see if the receiver has any errors when trying to talk to Kafka.
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> 
>>>>> wrote:
>>>>>> To my eyes, these are functionally equivalent.  I’ll try a Scala
>>>>>> approach, but this may cause waves for me upstream (i.e., non-Java).
>>>>>> 
>>>>>> Thanks for looking at this.  If anyone else can see a glaring issue in
>>>>>> the Java approach, that would be appreciated.
>>>>>> 
>>>>>> Thanks,
>>>>>> Matt
>>>>>> 
>>>>>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>> 
>>>>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>>>>> equivalent (that I have tested to work):
>>>>>>> 
>>>>>>> import org.apache.spark.storage.StorageLevel
>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>> 
>>>>>>> // Create 10 receivers across the cluster, potentially one per partition;
>>>>>>> // only as many receivers are active as there are Kafka partitions.
>>>>>>> val kInStreams = (1 to 10).map { _ =>
>>>>>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>>>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>> }
>>>>>>> 
>>>>>>> // Union the per-receiver streams into a single DStream.
>>>>>>> val kInMsg = ssc.union(kInStreams).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> 
>>>>>>> wrote:
>>>>>>>> So, this is scrubbed some for confidentiality, but the meat of it is
>>>>>>>> as follows.  Note that if I substitute the commented section for the
>>>>>>>> loop, I receive messages from the topic.
>>>>>>>> 
>>>>>>>> SparkConf sparkConf = new SparkConf();
>>>>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>>>>> sparkConf.set("spark.logConf", "true");
>>>>>>>> 
>>>>>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>>>>> kafkaProps.put("group.id", groupId);
>>>>>>>> 
>>>>>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
>>>>>>>> jsc.checkpoint("hdfs://<some_location>");
>>>>>>>> 
>>>>>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>>>>>> 
>>>>>>>> for (int i = 0; i < 5; i++) {
>>>>>>>>     streamList.add(KafkaUtils.createStream(jsc,
>>>>>>>>             String.class, ProtobufModel.class,
>>>>>>>>             StringDecoder.class, ProtobufModelDecoder.class,
>>>>>>>>             kafkaProps,
>>>>>>>>             Collections.singletonMap(topic, 1),
>>>>>>>>             StorageLevel.MEMORY_ONLY_SER()));
>>>>>>>> }
>>>>>>>> 
>>>>>>>> final JavaPairDStream<String, ProtobufModel> stream = 
>>>>>>>> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>>>>> 
>>>>>>>> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>>>>>> //          KafkaUtils.createStream(jsc,
>>>>>>>> //                  String.class, ProtobufModel.class,
>>>>>>>> //                  StringDecoder.class, ProtobufModelDecoder.class,
>>>>>>>> //                  kafkaProps,
>>>>>>>> //                  Collections.singletonMap(topic, 5),
>>>>>>>> //                  StorageLevel.MEMORY_ONLY_SER());
>>>>>>>> 
>>>>>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>>>>>     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>>>>>         @Override
>>>>>>>>         public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple)
>>>>>>>>                 throws Exception {
>>>>>>>>             return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>> 
>>>>>>>> … and further Spark functions …
>>>>>>>> 
>>>>>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>>>>>> 
>>>>>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell 
>>>>>>>>> <matt.narr...@gmail.com> wrote:
>>>>>>>>>> Hey,
>>>>>>>>>> 
>>>>>>>>>> Spark 1.1.0
>>>>>>>>>> Kafka 0.8.1.1
>>>>>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>>>>>> 
>>>>>>>>>> I have a five-partition Kafka topic.  I can create a single Kafka
>>>>>>>>>> receiver via KafkaUtils.createStream with five threads in the topic
>>>>>>>>>> map and consume messages fine.  Sifting through the user list and
>>>>>>>>>> Google, I see that it’s possible to split the Kafka receiver among
>>>>>>>>>> the Spark workers such that I can have a receiver per topic, and
>>>>>>>>>> have this distributed to workers rather than localized to the
>>>>>>>>>> driver.  I’m following something like this:
>>>>>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>>>>>> but for Kafka, obviously.  From the Streaming Programming Guide:
>>>>>>>>>> “Receiving multiple data streams can therefore be achieved by
>>>>>>>>>> creating multiple input DStreams and configuring them to receive
>>>>>>>>>> different partitions of the data stream from the source(s).”
>>>>>>>>>> 
>>>>>>>>>> However, I’m not able to consume any messages from Kafka after I
>>>>>>>>>> perform the union operation.  Again, if I create a single,
>>>>>>>>>> multi-threaded receiver, I can consume messages fine.  If I create
>>>>>>>>>> five receivers in a loop and call jssc.union(…), I get:
>>>>>>>>>> 
>>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Do I need to do anything to the unioned DStream?  Am I going about
>>>>>>>>>> this incorrectly?
>>>>>>>>>> 
>>>>>>>>>> Thanks in advance.
>>>>>>>>>> 
>>>>>>>>>> Matt
>>>>>>>>> 
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
