Hello,
I have a kafka cluster (version 1.0.1) with two brokers.
I have four topics on this cluster  (w, x, y, z) with replication factor 2 and 
2 partitions each.
To this cluster I connect with two consumers using the kafka-streams api 
version 1.0.1.

Like so:

 @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
WallclockTimestampExtractor.class.getName());
        
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
LogAndContinueExceptionHandler.class);
        return new StreamsConfig(props);
    }

    @Bean(name = "WTable")
    public KTable<String, WRecord>wKTable(ApplicationContext 
applicationContext, StreamsBuilder kStreamBuilder, WStore wStore) throws 
ClassNotFoundException {
        KTable<String, WRecord> table = kStreamBuilder.table(wTopic, 
Consumed.with(Serdes.String(), MySerdes.wSerde()), 
Materialized.with(Serdes.String(), 
MySerdes.wSerde()).as(wStore.getStoreName()));
        table.toStream().foreach((key, value) -> 
applicationContext.getBean(SomeService.class).processWEvent(key, value));
        return table;
    }

   @Bean(name = "XTable")
   ...

   @Bean(name = "YTable")
   ...

   @Bean(name = "ZTable")
    public KTable<String, ZRecord>zKTable(ApplicationContext 
applicationContext, StreamsBuilder kStreamBuilder, ZStore zStore) throws 
ClassNotFoundException {
        KTable<String, ZRecord> table = kStreamBuilder.table(zTopic, 
Consumed.with(Serdes.String(), MySerdes.zSerde()), 
Materialized.with(Serdes.String(), 
MySerdes.zSerde()).as(zStore.getStoreName()));
        table.toStream().foreach((key, value) -> 
applicationContext.getBean(SomeService.class).processZEvent(key, value));
        return table;
    }

I would expect the two consumer applications I start would connect to all 4 
topics and each one will consume from one partition of those topics like so:
Consumer1 would maybe connect wTopic-0, xTopic-1, yTopic-0, zTopic-0
Consumer2 would maybe connect wTopic-1, xTopic-0, yTopic-1, zTopic-1

but what actually happens is:
Consumer1 connects wTopic-0, xTopic-1, xTopic-0, zTopic-0 (consumer 1 is not 
even consuming from yTopic)
Consumer2 connects wTopic-1, yTopic-0, yTopic-1, zTopic-1 (consumer 2 is not 
even consuming from xTopic)

Is there a way to address this problem?

PS: I have only noticed this problem when I connect to more than three topics

Reply via email to