Ah, that's unfortunate. Basically, we have an existing stream-processing
pipeline which relies on different partitioning schemes, and we are writing
some upstream Samza jobs. Is the only way to get them to write with a particular
partitioning scheme, then, to write a different KafkaSystemFactory? Or perhaps
to patch the existing one? I don't see a reason why it always has to use the
default partitioning.
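One workaround that might avoid both, sketched here under the assumption that Samza keeps the hashing quoted below (abs(partitionKey.hashCode) % numPartitions): compute the target partition with your own scheme and pass it as a java.lang.Integer partition key. Because Integer.hashCode returns the int value itself, any partition number in [0, numPartitions) comes out of Samza's arithmetic unchanged. ExplicitPartition, customScheme and partitionKeyFor are hypothetical names, customScheme is only a placeholder for the downstream pipeline's real scheme, and the job is assumed to know the topic's partition count.

```scala
// Sketch only: choose the Kafka partition yourself and pass it through the
// partition key. Assumes Samza keeps computing
//   abs(partitionKey.hashCode) % numPartitions
// as in the KafkaUtil.getIntegerPartitionKey code quoted in this thread.
object ExplicitPartition {

  // Mirror of the quoted Samza arithmetic, for illustration only.
  def samzaPartition(partitionKey: AnyRef, numPartitions: Int): Int =
    math.abs(partitionKey.hashCode) % numPartitions

  // Hypothetical stand-in for the downstream pipeline's own scheme:
  // the first character of the key, modulo the partition count.
  def customScheme(key: String, numPartitions: Int): Int =
    if (key.isEmpty) 0 else key.charAt(0).toInt % numPartitions

  // java.lang.Integer.hashCode is the value itself, so an Integer in
  // [0, numPartitions) survives abs(...) % numPartitions unchanged.
  def partitionKeyFor(key: String, numPartitions: Int): Integer =
    Integer.valueOf(customScheme(key, numPartitions))

  def main(args: Array[String]): Unit = {
    val n = 8 // assumed partition count of the output topic
    for (k <- Seq("alpha", "bravo", "charlie")) {
      val wanted = customScheme(k, n)
      val actual = samzaPartition(partitionKeyFor(k, n), n)
      println(s"$k -> wanted $wanted, Samza picks $actual")
    }
  }
}
```

The resulting Integer would be used as the partition key of the OutgoingMessageEnvelope, while the message key stays whatever the downstream consumers expect. The catch is that the partition count used by the job must match the topic's, or the computed partitions will no longer line up with the downstream scheme.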

On 17 December 2015 at 07:59, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Michal,
>
> Sorry for the late reply. You are right that the "partitioner.class"
> configuration is not used by Samza to determine the outgoing partition. In
> Samza, the partition is determined by the following code sections:
> {code}
>
> val topicName = envelope.getSystemStream.getStream
> val partitions: java.util.List[PartitionInfo] = producer.partitionsFor(topicName)
> // A null partition leaves the choice to the Kafka producer itself.
> val partitionKey = if (envelope.getPartitionKey != null)
>   KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
> val record = new ProducerRecord(envelope.getSystemStream.getStream,
>                                 partitionKey,
>                                 envelope.getKey.asInstanceOf[Array[Byte]],
>                                 envelope.getMessage.asInstanceOf[Array[Byte]])
>
> {code}
>
> {code}
>
> def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope,
>                            partitions: java.util.List[PartitionInfo]): Integer = {
>   val numPartitions = partitions.size
>   abs(envelope.getPartitionKey.hashCode()) % numPartitions
> }
>
> {code}
>
>
> Hence, the partitioner.class setting in the producer configuration is not used.
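The two quoted snippets can be modelled in a few lines of self-contained Scala. PartitionChoice, integerPartitionKey and choosePartition are hypothetical names, not Samza classes; Option stands in for the nullable partition passed to ProducerRecord. The model also exposes one edge case of the arithmetic as written: math.abs(Int.MinValue) is still Int.MinValue, so a key whose hashCode is Int.MinValue yields a negative result, which is not a valid partition number.

```scala
// Illustrative model of the partition choice in the quoted Samza code.
// Not Samza's actual API; names here are hypothetical.
object PartitionChoice {

  // Same arithmetic as KafkaUtil.getIntegerPartitionKey above.
  def integerPartitionKey(partitionKey: Any, numPartitions: Int): Int =
    math.abs(partitionKey.hashCode) % numPartitions

  // None mirrors passing a null partition to ProducerRecord, which leaves
  // the choice to the Kafka producer.
  def choosePartition(partitionKey: Option[Any], numPartitions: Int): Option[Int] =
    partitionKey.map(k => integerPartitionKey(k, numPartitions))

  def main(args: Array[String]): Unit = {
    println(choosePartition(Some("user-42"), 8)) // Some(p), p from the key's hashCode
    println(choosePartition(None, 8))            // prints None: Kafka decides
    // Edge case: abs(Int.MinValue) == Int.MinValue, so the result is negative.
    println(integerPartitionKey(Int.MinValue, 3)) // prints -2
  }
}
```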
>
>
> On Fri, Dec 11, 2015 at 4:46 AM, Michal Hariš <michal.har...@gmail.com> wrote:
>
> > Hi all, I am not sure if this is the right mailing list, but
> > us...@samza.apache.org doesn't seem to exist.
> >
> > I am just looking at the code of KafkaSystemProducer and am a bit confused
> > as to how partitioning of Samza output is handled.
> >
> > Firstly, it seems to be hard-coded to take the hashCode of the envelope's
> > partitionKey modulo the number of partitions if a key is provided, and
> > otherwise to use null, which hands the partitioning decision to the
> > underlying Kafka producer.
> >
> > Now, when I try to override `systems.<..>.producer.partitioner.class`, I see
> > a warning during initialization that partitioner.class is not a known
> > config. However, the Samza configuration docs say that any configuration
> > available for the Kafka producer can be passed via `systems.<..>.producer...`.
> > I have checked that both the new and the old Kafka producer APIs have
> > `partitioner.class` configurable.
> >
> > I think I am missing something; otherwise it means that Samza doesn't allow
> > custom partitioning strategies for output to Kafka.
> >
> > Michal,
> >
>



-- 
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033
31 Old Nichol Street
London
E2 7HR
