The part that works is the commented-out, single-receiver stream below the loop. It seems that when I call KafkaUtils.createStream more than once, I don't receive any messages.
I'll dig through the logs, but at first glance yesterday I didn't see anything suspect. I'll have to look closer.

mn

On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:

> Maybe post the before-code, i.e., what the code was before you added the
> loop (the version that worked)? I've had similar situations where comparing
> the code before (worked) and after (does not work) helped. The Scala REPL
> also helped, because I could see the object types returned by each
> statement.
>
> Other than code, in the driver logs you should see events that say
> "Registered receiver for stream 0 from
> akka.tcp://sp...@node5.acme.net:53135"
>
> Now, if you go to "node5" and look at the Spark or YARN container logs
> (depending on who's doing resource management), you should be able to see
> whether the receiver hits any errors when trying to talk to Kafka.
>
> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>> To my eyes, these are functionally equivalent. I'll try a Scala approach,
>> but this may cause waves for me upstream (e.g., non-Java).
>>
>> Thanks for looking at this. If anyone else can see a glaring issue in the
>> Java approach, that would be appreciated.
>>
>> Thanks,
>> Matt
>>
>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>>
>>> Sorry, I am almost Java illiterate, but here's my Scala code to do the
>>> equivalent (which I have tested to work):
>>>
>>> // Create 10 receivers across the cluster, potentially one per partition;
>>> // only as many receivers will be active as you have Kafka partitions.
>>> val kInStreams = (1 to 10).map { _ =>
>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>> }
>>>
>>> val kInMsg = ssc.union(kInStreams)
>>>   .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
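A rough, untested Java sketch of the same fan-out pattern, for comparison with the Java code below. It assumes an existing JavaStreamingContext named jssc, borrows the placeholder host, group, and topic names from the Scala snippet above, and uses the plain String-payload overload of KafkaUtils.createStream; note that the Java union signature takes (first, rest) rather than a single sequence:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    // Sketch only: ten receivers created in a loop, then unioned.
    // jssc and the host/group/topic names are placeholders from the thread.
    List<JavaPairDStream<String, String>> receivers = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        receivers.add(KafkaUtils.createStream(jssc, "zkhost.acme.net:2182",
                                              "myGrp",
                                              Collections.singletonMap("myTopic", 1)));
    }
    // Unlike Scala's ssc.union(seq), the Java API takes the first stream
    // plus a list of the rest.
    JavaPairDStream<String, String> unioned =
        jssc.union(receivers.get(0), receivers.subList(1, receivers.size()));
    unioned.persist(StorageLevel.MEMORY_AND_DISK_SER());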
>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>> So, this is scrubbed some for confidentiality, but the meat of it is as
>>>> follows. Note that if I substitute the commented section for the loop, I
>>>> receive messages from the topic.
>>>>
>>>> SparkConf sparkConf = new SparkConf();
>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>> sparkConf.set("spark.logConf", "true");
>>>>
>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>> kafkaProps.put("group.id", groupId);
>>>>
>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
>>>> jsc.checkpoint("hdfs://<some_location>");
>>>>
>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>>
>>>> for (int i = 0; i < 5; i++) {
>>>>     streamList.add(KafkaUtils.createStream(jsc,
>>>>                                            String.class, ProtobufModel.class,
>>>>                                            StringDecoder.class, ProtobufModelDecoder.class,
>>>>                                            kafkaProps,
>>>>                                            Collections.singletonMap(topic, 1),
>>>>                                            StorageLevel.MEMORY_ONLY_SER()));
>>>> }
>>>>
>>>> final JavaPairDStream<String, ProtobufModel> stream =
>>>>     jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>
>>>> // final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>> //     KafkaUtils.createStream(jsc,
>>>> //                             String.class, ProtobufModel.class,
>>>> //                             StringDecoder.class, ProtobufModelDecoder.class,
>>>> //                             kafkaProps,
>>>> //                             Collections.singletonMap(topic, 5),
>>>> //                             StorageLevel.MEMORY_ONLY_SER());
>>>>
>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>         @Override
>>>>         public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
>>>>             return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>         }
>>>>     });
>>>>
>>>> … and further Spark functions ...
>>>>
>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>
>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>>
>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>>>> Hey,
>>>>>>
>>>>>> Spark 1.1.0
>>>>>> Kafka 0.8.1.1
>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>>
>>>>>> I have a five-partition Kafka topic. I can create a single Kafka
>>>>>> receiver via KafkaUtils.createStream with five threads in the topic map
>>>>>> and consume messages fine. Sifting through the user list and Google, I
>>>>>> see that it's possible to split the Kafka receiver among the Spark
>>>>>> workers so that I have a receiver per partition, distributed to the
>>>>>> workers rather than localized to the driver. I'm following something
>>>>>> like this:
>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>> but for Kafka, obviously. From the Streaming Programming Guide:
>>>>>> "Receiving multiple data streams can therefore be achieved by creating
>>>>>> multiple input DStreams and configuring them to receive different
>>>>>> partitions of the data stream from the source(s)."
>>>>>>
>>>>>> However, I'm not able to consume any messages from Kafka after I
>>>>>> perform the union operation. Again, if I create a single,
>>>>>> multi-threaded receiver I can consume messages fine.
>>>>>> If I create 5 receivers in a loop and call jssc.union(…), I get:
>>>>>>
>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>>
>>>>>> Do I need to do anything to the unioned DStream? Am I going about this
>>>>>> incorrectly?
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Matt
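For reference, the working baseline described above (one receiver whose topic map asks for five consumer threads) matches the commented-out block in the scrubbed code earlier in the thread. A minimal sketch, reusing that code's jsc, kafkaProps, topic, ProtobufModel, and ProtobufModelDecoder names, untested here:

    import java.util.Collections;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    // Single receiver with five consumer threads inside it: the variant
    // that reportedly consumes messages fine. All names come from the
    // scrubbed code in the thread above.
    JavaPairReceiverInputDStream<String, ProtobufModel> single =
        KafkaUtils.createStream(jsc,
                                String.class, ProtobufModel.class,
                                StringDecoder.class, ProtobufModelDecoder.class,
                                kafkaProps,
                                Collections.singletonMap(topic, 5),
                                StorageLevel.MEMORY_ONLY_SER());
    single.print();  // quick sanity check that messages arrive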