Is it possible to make the error message give more an explanation? -Jay
On Wed, May 4, 2016 at 8:46 AM, Matthias J. Sax <matth...@confluent.io> wrote: > Hi, > > I am still new to Kafka Streams by myself, but from my understanding if > you change the key, your partitioning changes, ie, is not valid anymore. > Thus, the joins (which assumes co-located data) cannot be performed > (this is the reason why sources get set to null). You can write to an > intermediate topic via .through(...) to get a valid partitioning: > > KStream dataStream = builder.stream(...).map(...).through(...); > > Afterward, your join should work. > > -Matthias > > > On 05/04/2016 04:43 PM, Gaspar Muñoz wrote: > > Hi there, > > > > I am not able to perform a Left Join between a KStream and KTable in > Kafka > > Streams. > > > > Exception in thread "main" > > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid > topology > > building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are not > > joinable > > at > > > org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44) > > at > > > org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383) > > > > > > My code looks > > > > KStream<String, String> dataStream = builder > > .stream(stringDeserializer, stringDeserializer, > > conf.getString(INPUT_TOPIC)) > > .map(App::keyByAap) > > > > KTable<String, String> aapTable = builder.table("lookup_topic"); > > > > KStream<String, String> result = dataStream.leftJoin(aapTable, new > > ValueJoinerAap()); > > > > > > My ValueJoiner simply concat two Strings. I think the problem is in map > > where I change the key in order to join by this field. In the internals > of > > KafkaStreams we can see that map function set to null source nodes > > > > @Override > > public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, > > V1>> mapper) { > > String name = topology.newName(MAP_NAME); > > > > topology.addProcessor(name, new KStreamMap<>(mapper), this.name); > > > > return new KStreamImpl<>(topology, name, null); > > } > > > > > > And after that, in the left join precondition, in function Set<String> > > allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); the code > > checks if any source nodes is null and, obviously, the KStream which I > > applied the map has the source to null so throw the exception. > > > > if (thisSourceNodes == null || otherSourceNodes == null) > > throw new TopologyBuilderException(this.name + " and " + > > other.name + " are not joinable"); > > > > > > The question is, how can I change the key without lose parent data in > order > > to perform a join with KTable after that? > > > > Thanks. > > > >