So, this is scrubbed some for confidentiality, but the meat of it is as follows. Note that if I substitute the commented section for the loop, I do receive messages from the topic.
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.streaming.unpersist", "true");
sparkConf.set("spark.logConf", "true");

Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
kafkaProps.put("group.id", groupId);

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
jsc.checkpoint("hdfs://<some_location>");

// One single-threaded receiver per Kafka partition, so the receivers can be
// distributed across the workers.
List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
for (int i = 0; i < 5; i++) {
    streamList.add(KafkaUtils.createStream(jsc,
                                           String.class, ProtobufModel.class,
                                           StringDecoder.class, ProtobufModelDecoder.class,
                                           kafkaProps,
                                           Collections.singletonMap(topic, 1),
                                           StorageLevel.MEMORY_ONLY_SER()));
}

// Union the five receiver streams into a single DStream.
final JavaPairDStream<String, ProtobufModel> stream =
    jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

// Substituting this single receiver with five consumer threads for the loop
// above consumes messages fine:
// final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
//     KafkaUtils.createStream(jsc,
//                             String.class, ProtobufModel.class,
//                             StringDecoder.class, ProtobufModelDecoder.class,
//                             kafkaProps,
//                             Collections.singletonMap(topic, 5),
//                             StorageLevel.MEMORY_ONLY_SER());

final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
    new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
            return new Tuple2<>(tuple._2().getDeviceId(), 1);
        }
    });

… and further Spark functions … (a rough sketch of the remainder is below, after the quoted thread)

On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:

> Posting your code would be really helpful in figuring out gotchas.
>
> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>> Hey,
>>
>> Spark 1.1.0
>> Kafka 0.8.1.1
>> Hadoop (YARN/HDFS) 2.5.1
>>
>> I have a five-partition Kafka topic. I can create a single Kafka receiver
>> via KafkaUtils.createStream with five threads in the topic map and consume
>> messages fine. Sifting through the user list and Google, I see that it's
>> possible to split the Kafka receiver among the Spark workers such that I
>> have a receiver per partition, with these distributed to the workers rather
>> than localized to the driver. I'm following something like this:
>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>> but for Kafka, obviously. From the Streaming Programming Guide: "Receiving
>> multiple data streams can therefore be achieved by creating multiple input
>> DStreams and configuring them to receive different partitions of the data
>> stream from the source(s)."
>>
>> However, I'm not able to consume any messages from Kafka after I perform
>> the union operation. Again, if I create a single, multi-threaded receiver
>> I can consume messages fine. If I create 5 receivers in a loop and call
>> jssc.union(…) I get:
>>
>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>
>> Do I need to do anything to the unioned DStream? Am I going about this
>> incorrectly?
>>
>> Thanks in advance.
>>
>> Matt
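
For reference, here is a minimal sketch of how the scrubbed tail of the driver might look. reduceByKey and print are hypothetical stand-ins for the elided "further Spark functions"; the concrete point is that some output operation must be registered and the context started before any receiver pulls data. Also note that each receiver pins one core for its lifetime, so a job with five receivers needs more than five cores allocated (per the Streaming Programming Guide), otherwise the receivers starve the processing tasks and the ReceiverTracker reports 0 blocks.

// Hypothetical continuation; uses org.apache.spark.api.java.function.Function2.
JavaPairDStream<String, Integer> counts = tuples.reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) throws Exception {
            return a + b;  // stand-in aggregation: count per device id
        }
    });

counts.print();          // a DStream is lazy; without an output operation nothing runs

jsc.start();             // receivers only begin consuming after start()
jsc.awaitTermination();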