Hi there, I am not able to perform a Left Join between a KStream and KTable in Kafka Streams.
Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are not joinable at org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44) at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383) My code looks KStream<String, String> dataStream = builder .stream(stringDeserializer, stringDeserializer, conf.getString(INPUT_TOPIC)) .map(App::keyByAap) KTable<String, String> aapTable = builder.table("lookup_topic"); KStream<String, String> result = dataStream.leftJoin(aapTable, new ValueJoinerAap()); My ValueJoiner simply concat two Strings. I think the problem is in map where I change the key in order to join by this field. In the internals of KafkaStreams we can see that map function set to null source nodes @Override public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) { String name = topology.newName(MAP_NAME); topology.addProcessor(name, new KStreamMap<>(mapper), this.name); return new KStreamImpl<>(topology, name, null); } And after that, in the left join precondition, in function Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); the code checks if any source nodes is null and, obviously, the KStream which I applied the map has the source to null so throw the exception. if (thisSourceNodes == null || otherSourceNodes == null) throw new TopologyBuilderException(this.name + " and " + other.name + " are not joinable"); The question is, how can I change the key without lose parent data in order to perform a join with KTable after that? Thanks. -- Gaspar Muñoz Vía de las dos Castillas, 33, Ática 4, 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd <https://twitter.com/StratioBD>*