Ah, that's unfortunate. Basically, we have an existing stream-processing pipeline that relies on different partitioning schemes, and we are writing some upstream Samza jobs. Is the only way to get Samza to write with a particular partitioning scheme then to write a different KafkaSystemFactory? Or perhaps to patch the existing one? I don't see a reason why it always has to use the default partitioning.
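
One thing that might work without a custom factory, judging only by the quoted getIntegerPartitionKey: java.lang.Integer.hashCode returns the integer's own value, so if the job computes the target partition itself and passes it as the partition key, abs(hashCode) % numPartitions should give that same number back, provided it lies in [0, numPartitions). Below is a minimal sketch of that arithmetic in plain Scala, with no Samza dependency. The names samzaPartitionFor, legacyPartitioner and partitionKeyFor are hypothetical, and the legacy scheme is invented purely for illustration.

```scala
object PartitionKeySketch {
  // Mirrors the arithmetic of the quoted getIntegerPartitionKey, with the
  // envelope and the partition list replaced by plain arguments.
  def samzaPartitionFor(partitionKey: AnyRef, numPartitions: Int): Int =
    math.abs(partitionKey.hashCode()) % numPartitions

  // Hypothetical stand-in for the existing pipeline's partitioning scheme.
  // It must return a value in [0, numPartitions).
  def legacyPartitioner(key: String, numPartitions: Int): Int =
    (key.length * 31) % numPartitions

  // Box the desired partition number. Integer.hashCode returns the value
  // itself, so the modulo in samzaPartitionFor leaves it unchanged.
  def partitionKeyFor(key: String, numPartitions: Int): java.lang.Integer =
    Integer.valueOf(legacyPartitioner(key, numPartitions))

  def main(args: Array[String]): Unit = {
    val numPartitions = 8 // assumed to be known to the job
    for (key <- Seq("user-1", "user-22", "user-333")) {
      val wanted = legacyPartitioner(key, numPartitions)
      val actual = samzaPartitionFor(partitionKeyFor(key, numPartitions), numPartitions)
      println(s"$key: wanted=$wanted actual=$actual")
    }
  }
}
```

In a real job the boxed value would presumably be passed as the partitionKey argument of OutgoingMessageEnvelope. This depends on the internal behaviour shown in the quoted code and on the job knowing the partition count of the output topic, so it is a workaround rather than a proper pluggable partitioner.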

On 17 December 2015 at 07:59, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Michal,
>
> Sorry to reply late. Actually, you are right that the "partitioner.class"
> configuration is not used in Samza to determine the outgoing partition. In
> Samza, the partition is determined by the following code sections:
>
> {code}
> val topicName = envelope.getSystemStream.getStream
> val partitions: java.util.List[PartitionInfo] = producer.partitionsFor(topicName)
> val partitionKey = if (envelope.getPartitionKey != null)
>   KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
> val record = new ProducerRecord(envelope.getSystemStream.getStream,
>   partitionKey,
>   envelope.getKey.asInstanceOf[Array[Byte]],
>   envelope.getMessage.asInstanceOf[Array[Byte]])
> {code}
>
> {code}
> def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope,
>     partitions: java.util.List[PartitionInfo]): Integer = {
>   val numPartitions = partitions.size
>   abs(envelope.getPartitionKey.hashCode()) % numPartitions
> }
> {code}
>
> Hence, the partitioner.class in the producer configuration is not used.
>
> On Fri, Dec 11, 2015 at 4:46 AM, Michal Hariš <michal.har...@gmail.com>
> wrote:
>
> > Hi all, I am not sure if this is the right mailing list, but
> > us...@samza.apache.org doesn't seem to exist.
> >
> > I am just looking at the code of KafkaSystemProducer and am a bit
> > confused as to how the partitioning at Samza output is handled.
> >
> > Firstly, it seems to be hard-coded to take a modulo of the hashCode of
> > the envelope partitionKey if one is provided, and otherwise to pass
> > null, which means that it hands the partitioning decision to the
> > underlying Kafka producer.
> >
> > Now, when I try to override `systems.<..>.producer.partitioner.class`, I
> > see a warning in the initialization that partitioner.class is not a
> > known config. However, the configuration documentation for Samza says
> > that any configuration available for the Kafka producer can be passed
> > to `systems.<..>.producer...`. I have checked that both the new and the
> > old Kafka producer APIs have `partitioner.class` configurable.
> >
> > I think I am missing something, or else it means that Samza doesn't
> > allow for custom partitioning strategies at the output to Kafka.
> >
> > Michal

--
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033
31 Old Nichol Street
London E2 7HR