Thanks, I'll try it. It would be nice if you could improve the error message.
2016-05-04 20:13 GMT+02:00 Matthias J. Sax <matth...@confluent.io>: > +1 > > I had the same thought and put it on my personal agenda already. > > -Matthias > > On 05/04/2016 06:37 PM, Jay Kreps wrote: > > Is it possible to make the error message give more an explanation? > > > > -Jay > > > > On Wed, May 4, 2016 at 8:46 AM, Matthias J. Sax <matth...@confluent.io> > > wrote: > > > >> Hi, > >> > >> I am still new to Kafka Streams by myself, but from my understanding if > >> you change the key, your partitioning changes, ie, is not valid anymore. > >> Thus, the joins (which assumes co-located data) cannot be performed > >> (this is the reason why sources get set to null). You can write to an > >> intermediate topic via .through(...) to get a valid partitioning: > >> > >> KStream dataStream = builder.stream(...).map(...).through(...); > >> > >> Afterward, your join should work. > >> > >> -Matthias > >> > >> > >> On 05/04/2016 04:43 PM, Gaspar Muñoz wrote: > >>> Hi there, > >>> > >>> I am not able to perform a Left Join between a KStream and KTable in > >> Kafka > >>> Streams. > >>> > >>> Exception in thread "main" > >>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid > >> topology > >>> building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are > not > >>> joinable > >>> at > >>> > >> > org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44) > >>> at > >>> > >> > org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383) > >>> > >>> > >>> My code looks > >>> > >>> KStream<String, String> dataStream = builder > >>> .stream(stringDeserializer, stringDeserializer, > >>> conf.getString(INPUT_TOPIC)) > >>> .map(App::keyByAap) > >>> > >>> KTable<String, String> aapTable = builder.table("lookup_topic"); > >>> > >>> KStream<String, String> result = dataStream.leftJoin(aapTable, new > >>> ValueJoinerAap()); > >>> > >>> > >>> My ValueJoiner simply concat two Strings. I think the problem is in > map > >>> where I change the key in order to join by this field. In the internals > >> of > >>> KafkaStreams we can see that map function set to null source nodes > >>> > >>> @Override > >>> public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, > >>> V1>> mapper) { > >>> String name = topology.newName(MAP_NAME); > >>> > >>> topology.addProcessor(name, new KStreamMap<>(mapper), this.name); > >>> > >>> return new KStreamImpl<>(topology, name, null); > >>> } > >>> > >>> > >>> And after that, in the left join precondition, in function Set<String> > >>> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); the > code > >>> checks if any source nodes is null and, obviously, the KStream which I > >>> applied the map has the source to null so throw the exception. > >>> > >>> if (thisSourceNodes == null || otherSourceNodes == null) > >>> throw new TopologyBuilderException(this.name + " and " + > >>> other.name + " are not joinable"); > >>> > >>> > >>> The question is, how can I change the key without lose parent data in > >> order > >>> to perform a join with KTable after that? > >>> > >>> Thanks. > >>> > >> > >> > > > > -- Gaspar Muñoz Vía de las dos Castillas, 33, Ática 4, 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd <https://twitter.com/StratioBD>*