+1 I had the same thought and put it on my personal agenda already.
-Matthias

On 05/04/2016 06:37 PM, Jay Kreps wrote:
> Is it possible to make the error message give more of an explanation?
>
> -Jay
>
> On Wed, May 4, 2016 at 8:46 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Hi,
>>
>> I am still new to Kafka Streams myself, but from my understanding, if
>> you change the key, your partitioning changes, i.e., it is not valid anymore.
>> Thus, the join (which assumes co-located data) cannot be performed;
>> this is why the source nodes get set to null. You can write to an
>> intermediate topic via .through(...) to get a valid partitioning:
>>
>> KStream<String, String> dataStream = builder.stream(...).map(...).through(...);
>>
>> Afterward, your join should work.
>>
>> -Matthias
>>
>>
>> On 05/04/2016 04:43 PM, Gaspar Muñoz wrote:
>>> Hi there,
>>>
>>> I am not able to perform a left join between a KStream and a KTable in
>>> Kafka Streams:
>>>
>>> Exception in thread "main"
>>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
>>> building: KSTREAM-FILTER-0000000003 and KTABLE-SOURCE-0000000005 are not
>>> joinable
>>>     at org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44)
>>>     at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383)
>>>
>>> My code looks like this:
>>>
>>> KStream<String, String> dataStream = builder
>>>     .stream(stringDeserializer, stringDeserializer,
>>>             conf.getString(INPUT_TOPIC))
>>>     .map(App::keyByAap);
>>>
>>> KTable<String, String> aapTable = builder.table("lookup_topic");
>>>
>>> KStream<String, String> result = dataStream.leftJoin(aapTable,
>>>     new ValueJoinerAap());
>>>
>>> My ValueJoiner simply concatenates two Strings. I think the problem is in
>>> map, where I change the key in order to join on this field. In the internals
>>> of Kafka Streams we can see that the map function sets the source nodes to null:
>>>
>>> @Override
>>> public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
>>>     String name = topology.newName(MAP_NAME);
>>>
>>>     topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
>>>
>>>     return new KStreamImpl<>(topology, name, null);
>>> }
>>>
>>> After that, the left join checks its precondition in
>>> Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
>>> That code checks whether either side's source nodes are null, and since the
>>> KStream to which I applied map has its source nodes set to null, it throws
>>> the exception:
>>>
>>> if (thisSourceNodes == null || otherSourceNodes == null)
>>>     throw new TopologyBuilderException(this.name + " and " +
>>>             other.name + " are not joinable");
>>>
>>> The question is: how can I change the key without losing the parent data,
>>> so that I can perform a join with the KTable afterwards?
>>>
>>> Thanks.
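The co-partitioning argument in this thread can be made concrete with a toy model in plain Java (no Kafka dependency). It is a sketch under simplifying assumptions, not Kafka code: a record's partition is chosen with String.hashCode() modulo the partition count (Kafka's default partitioner actually applies murmur2 to the serialized key bytes), each stream partition is joined only against the same-numbered table partition, and the hypothetical rekey() stands in for App::keyByAap by taking the value's text before the first comma. The class and all method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of co-partitioning; not Kafka code. All names here are hypothetical.
class CoPartitioningDemo {

    // A keyed record as it sits in one partition of a topic.
    static final class Rec {
        final String key, value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Assumption: String.hashCode() stands in for Kafka's default partitioner,
    // which actually applies murmur2 to the serialized key bytes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical stand-in for App::keyByAap: new key = value text before the first ','.
    static Rec rekey(Rec r) {
        return new Rec(r.value.split(",", 2)[0], r.value);
    }

    // Writes records to partitions by their current key, as a producer or through() would.
    static List<List<Rec>> partitionByKey(List<Rec> records, int numPartitions) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (Rec r : records) parts.get(partitionFor(r.key, numPartitions)).add(r);
        return parts;
    }

    // Partitions the lookup table by its key, as if it had been read from lookup_topic.
    static List<Map<String, String>> partitionTable(Map<String, String> table, int numPartitions) {
        List<Map<String, String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new HashMap<>());
        for (Map.Entry<String, String> e : table.entrySet())
            parts.get(partitionFor(e.getKey(), numPartitions)).put(e.getKey(), e.getValue());
        return parts;
    }

    // Like map(): every record gets a new key but stays in the partition it was read from.
    static List<List<Rec>> mapInPlace(List<List<Rec>> parts) {
        List<List<Rec>> out = new ArrayList<>();
        for (List<Rec> part : parts) {
            List<Rec> mapped = new ArrayList<>();
            for (Rec r : part) mapped.add(rekey(r));
            out.add(mapped);
        }
        return out;
    }

    // Like through(): flattens all partitions and re-routes each record by its current key.
    static List<List<Rec>> repartition(List<List<Rec>> parts, int numPartitions) {
        List<Rec> all = new ArrayList<>();
        for (List<Rec> part : parts) all.addAll(part);
        return partitionByKey(all, numPartitions);
    }

    // Per-partition left join: stream partition p only sees table partition p.
    // A missing match is concatenated as "null", like the null table value a left join passes on.
    static List<String> leftJoin(List<List<Rec>> streamParts, List<Map<String, String>> tableParts) {
        List<String> out = new ArrayList<>();
        for (int p = 0; p < streamParts.size(); p++) {
            Map<String, String> local = tableParts.get(p);
            for (Rec r : streamParts.get(p)) {
                out.add(r.value + "|" + local.get(r.key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int n = 2;
        // Input topic keyed by the original keys "a" and "b".
        List<List<Rec>> input = partitionByKey(List.of(new Rec("a", "d,x"), new Rec("b", "c,y")), n);
        // Lookup table keyed by the field the stream is re-keyed on.
        List<Map<String, String>> table = partitionTable(Map.of("c", "C", "d", "D"), n);

        List<List<Rec>> mapped = mapInPlace(input);
        System.out.println(leftJoin(mapped, table));                 // prints [c,y|null, d,x|null]
        System.out.println(leftJoin(repartition(mapped, n), table)); // prints [d,x|D, c,y|C]
    }
}
```

Without the repartition step every lookup misses, because each re-keyed record still sits in the partition of its old key while the matching table entry lives in the partition of the new key. After repartition() each record lands next to its table entry. In the real topology, writing to an intermediate topic with .through(...) plays that role, assuming the intermediate topic has the same number of partitions as lookup_topic.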