Partitioner is an optional field when defining an RDD. KafkaRDD doesn't define one, so you can't assume anything about the way it's partitioned, because Spark doesn't know anything about the way it's partitioned. If you want to rely on some property of how records were partitioned as they were being produced into Kafka, you need to use foreachPartition or mapPartitions yourself. Otherwise, Spark will do a shuffle for any operation that would ordinarily require a shuffle, even if the keys are already in the "right" place.
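To see why Spark can't just trust Kafka's placement, compare how the two sides assign a key to a partition. The sketch below reproduces Spark's HashPartitioner logic (nonNegativeMod of Object.hashCode); the Kafka side is a stand-in only, since the real default partitioner hashes the serialized key bytes with murmur2, and Arrays.hashCode is used here purely to show that two different hash schemes need not agree:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionerMismatch {
    // Same logic as Spark's HashPartitioner.getPartition:
    // nonNegativeMod(key.hashCode(), numPartitions)
    static int sparkPartition(Object key, int numPartitions) {
        int raw = key.hashCode() % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;
    }

    // Stand-in for Kafka's default partitioner, which hashes the
    // *serialized* key bytes (real Kafka uses murmur2; Arrays.hashCode
    // is an assumption here, chosen only to illustrate the mismatch).
    static int kafkaPartition(String key, int numPartitions) {
        int raw = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)) % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;
    }

    public static void main(String[] args) {
        int n = 40;
        for (String key : new String[]{"user-1", "user-2", "user-3"}) {
            System.out.printf("%s -> kafka partition %d, spark partition %d%n",
                    key, kafkaPartition(key, n), sparkPartition(key, n));
        }
    }
}
```

Because the two functions hash differently, a key that Kafka routed to partition 5 may well hash to some other Spark partition, so a reduceByKey can't skip the shuffle even when Kafka already co-located the keys.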
Regarding the assignment of cores to partitions, that's not quite accurate. Each Kafka partition will correspond to a Spark partition. Once you do an operation that shuffles, that relationship no longer holds. Even if you're doing a straight map operation without a shuffle, you will probably get one executor core working on one partition, but there's no guarantee the scheduler will do that, and no guarantee it'll be the same core/partition relationship for the next batch.

On Mon, Nov 23, 2015 at 9:01 AM, Thúy Hằng Lê <[email protected]> wrote:

> Thanks Cody,
>
> I still have concerns about this.
> What do you mean by saying Spark direct stream doesn't have a default
> partitioner? Could you please explain more?
>
> When I assign 20 cores to 20 Kafka partitions, I am expecting each core
> will work on a partition. Is that correct?
>
> I still couldn't figure out how the RDD will be partitioned after the
> mapToPair function. It would be great if you could briefly explain (or
> send me some documentation, I couldn't find any) how shuffle works with
> the mapToPair function.
>
> Thank you very much.
>
> On Nov 23, 2015 12:26 AM, "Cody Koeninger" <[email protected]> wrote:
>
>> Spark direct stream doesn't have a default partitioner.
>>
>> If you know that you want to do an operation on keys that are already
>> partitioned by Kafka, just use mapPartitions or foreachPartition to
>> avoid a shuffle.
>>
>> On Sat, Nov 21, 2015 at 11:46 AM, trung kien <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I am having a problem understanding how the RDD will be partitioned
>>> after calling the mapToPair function.
>>> Could anyone give me more information about partitioning in this
>>> function?
>>>
>>> I have a simple application doing the following job:
>>>
>>> JavaPairInputDStream<String, String> messages =
>>>     KafkaUtils.createDirectStream(...)
>>>
>>> JavaPairDStream<String, Double> stats = messages.mapToPair(JSON_DECODE)
>>>     .reduceByKey(SUM);
>>>
>>> saveToDB(stats)
>>>
>>> I set up 2 workers (each dedicating 20 cores) for this job.
>>> My Kafka topic has 40 partitions (I want each core to handle a
>>> partition), and the messages sent to the queue are partitioned by the
>>> same key as in the mapToPair function.
>>> I'm using the default partitioner of both Kafka and Spark.
>>>
>>> Ideally, I shouldn't see data shuffled between cores in the mapToPair
>>> stage, right?
>>> However, in my Spark UI, I see that the "Locality Level" for this stage
>>> is "ANY", which means data needs to be transferred.
>>> Any comments on this?
>>>
>>> --
>>> Thanks
>>> Kien
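To make the mapPartitions/foreachPartition suggestion concrete: inside the per-partition function, each executor sums its own partition's records with a local map, so no shuffle is needed as long as all records for a given key live in one Kafka partition. The plain-Java sketch below (no Spark dependency; the Iterator stands in for one partition's records, and the names are illustrative only) shows the shape of that per-partition body:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PerPartitionSum {
    // What the body passed to mapPartitions/foreachPartition would do:
    // fold one partition's (key, value) records into local per-key totals.
    // Nothing here crosses partition boundaries, hence no shuffle.
    static Map<String, Double> sumPartition(Iterator<Map.Entry<String, Double>> records) {
        Map<String, Double> totals = new HashMap<>();
        while (records.hasNext()) {
            Map.Entry<String, Double> r = records.next();
            totals.merge(r.getKey(), r.getValue(), Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Simulated records of a single Kafka/Spark partition.
        List<Map.Entry<String, Double>> partition = List.of(
                Map.entry("a", 1.0), Map.entry("b", 2.0), Map.entry("a", 3.0));
        System.out.println(sumPartition(partition.iterator()));
    }
}
```

The trade-off is that this is only correct if the producer really does route every record for a key to the same Kafka partition; reduceByKey makes no such assumption, which is exactly why it shuffles.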
