Thanks for reporting this, Hamid. The ProcessorTopologyTestDriver is used
only in ProcessorTopologyTest and is currently not expected to be used
elsewhere, which is why we override the MockProducer's partitionsFor
function to return only an empty list. Is there any particular reason that
you want to use ProcessorTopologyTestDriver in your testing / staging
environment? In general I'd recommend using the full-fledged Kafka Streams
library for integration tests with synthetic testing data.

That said, in RecordCollector we can at least catch the case where
`numPartitions` = 0 and throw a more informative error. I will file a patch
for this improvement.
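
As a rough illustration of the kind of check meant here (a standalone
sketch only, not the actual RecordCollector code; the class PartitionGuard
and the method choosePartition are hypothetical names, and the plain int
hash stands in for whatever hashing the real partitioner does):

```java
// Hypothetical helper, not part of Kafka: shows how a zero partition count
// could be turned into a descriptive error instead of a division by zero.
final class PartitionGuard {
    private PartitionGuard() {}

    static int choosePartition(int keyHash, int numPartitions) {
        if (numPartitions <= 0) {
            // Without this check, the modulo below would throw
            // ArithmeticException("/ by zero") when numPartitions is 0.
            throw new IllegalStateException(
                "No partitions available for the target topic (numPartitions = "
                + numPartitions + "); cannot compute a partition for the record");
        }
        // Mask the sign bit so the result is always non-negative.
        return (keyHash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Normal case: a partition index in [0, 4).
        System.out.println(choosePartition("events".hashCode(), 4));
        // Empty partition list: fails with a readable message.
        try {
            choosePartition("events".hashCode(), 0);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a guard like this, an empty partitionsFor result would surface as an
explicit message about missing partitions rather than an arithmetic error.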


Guozhang


On Wed, Sep 28, 2016 at 2:48 AM, Hamidreza Afzali <
hamidreza.afz...@hivestreaming.com> wrote:

> Hi,
>
> We are using the latest Kafka 0.10.1 branch. The combination of
> ProcessorTopologyTestDriver and WindowedStreamPartitioner is resulting in a
> division by 0 exception because of the empty list of partitions:
>
> https://github.com/apache/kafka/blob/0.10.1/streams/src/
> test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java#L158
> https://github.com/apache/kafka/blob/0.10.1/streams/src/
> main/java/org/apache/kafka/streams/kstream/internals/
> WindowedStreamPartitioner.java#L47
>
> Our topology looks similar to this:
>
>   builder.stream("events")
>     .groupByKey(...)
>     .aggregate(...,
>       TimeWindows.of(1 * 60 * 1000L)
>     )
>     .mapValues(_.size: Integer)
>     .to(windowedSerde, Serdes.Integer(), "events-over-time")
>
> If we use our own partitioner in .to() it works.
>
>   class MyStreamPartitioner[K, V]() extends StreamPartitioner[K, V] {
>     override def partition(k: K, v: V, numPartitions: Int): Integer = {
>       // return an integer between 0 and numPartitions-1, or null if the
>       // default partitioning logic should be used
>       null
>     }
>   }
>
> Is this a bug?
>
> Thank you in advance,
> Hamid
>
>


-- 
-- Guozhang
