Hi Michal,

Sorry for the late reply. You are right that the "partitioner.class" configuration is not used in Samza to determine the outgoing partition. In Samza, the partition is determined by the following code sections:
{code}
val topicName = envelope.getSystemStream.getStream
val partitions: java.util.List[PartitionInfo] = producer.partitionsFor(topicName)
val partitionKey =
  if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions)
  else null
val record = new ProducerRecord(envelope.getSystemStream.getStream,
                                partitionKey,
                                envelope.getKey.asInstanceOf[Array[Byte]],
                                envelope.getMessage.asInstanceOf[Array[Byte]])
{code}
{code}
def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = {
  val numPartitions = partitions.size
  abs(envelope.getPartitionKey.hashCode()) % numPartitions
}
{code}

Hence, the partitioner.class setting in the producer configuration is not used.

On Fri, Dec 11, 2015 at 4:46 AM, Michal Hariš <michal.har...@gmail.com> wrote:

> Hi all, I am not sure if this is the right mailing list but
> us...@samza.apache.org doesn't seem to exist.
>
> I am just looking at the code of KafkaSystemProducer and am a bit confused
> as to how the partitioning at Samza output is handled.
>
> Firstly it seems to be hard-coded to take a modulo of the hashCode of the
> envelope partitionKey if provided otherwise null which means that it hands
> the partitioning decision to the underlying kafka producer.
>
> Now when I try to override `systems.<..>.producer.partitioner.class` I see
> a warning in the initialization that partitioner.class is not a known
> config - however the configuration for samza says that any configuration
> available for kafka producer can be passed to `systems.<..>.producer...`. I
> have checked that both new and old kafka producer api have
> `partitioner.class` configurable.
>
> I think I am missing something or else it means that samza doesn't allow
> for custom partitioning strategies at the output to kafka.
>
> Michal,
>
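
P.S. To make the hash-modulo rule from getIntegerPartitionKey above easier to check, here is a minimal standalone sketch of it. This is not Samza code: the object PartitionSketch and the helper partitionFor are hypothetical names used only for illustration, and the sketch assumes the rule is exactly abs(partitionKey.hashCode()) % numPartitions as quoted above.

```scala
// Standalone sketch (NOT Samza code): reproduces the hash-modulo rule quoted
// above so that its behaviour can be checked in isolation.
object PartitionSketch {
  // Hypothetical helper mirroring abs(partitionKey.hashCode()) % numPartitions.
  def partitionFor(partitionKey: Any, numPartitions: Int): Int =
    math.abs(partitionKey.hashCode()) % numPartitions

  def main(args: Array[String]): Unit = {
    val numPartitions = 8
    // java.lang.Integer.hashCode returns the int value itself, so an Integer
    // key in [0, numPartitions) maps to the partition with the same number.
    for (p <- 0 until numPartitions)
      assert(partitionFor(Integer.valueOf(p), numPartitions) == p)
    // Any other key type is routed by its own hashCode.
    println(partitionFor("some-key", numPartitions))
  }
}
```

If that reading of the snippet is right, a job that needs its own partitioning strategy could compute the target partition itself and pass it as an Integer partition key in the outgoing envelope. I have not verified this against a running job, so please treat it as a suggestion rather than documented behaviour.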